fix: fix imports and database bugs; remove legacy ETL code

This commit is contained in:
Eric Gullickson
2025-12-27 12:07:24 -06:00
parent 0d9edbe761
commit bfb0c23ae1
30 changed files with 239174 additions and 4441 deletions

View File

@@ -0,0 +1,386 @@
/**
* Bulk Vehicle Catalog CSV Import
*
* Processes large CSV files (250k+ rows) using batch processing to avoid
* memory and timeout issues that occur in the web import.
*
* Usage (from inside container):
* ts-node src/features/admin/scripts/bulk-import-catalog.ts
*
* CSV Format:
* Required columns: year, make, model, trim
* Optional columns: engine_name, transmission_type
*/
import * as fs from 'fs';
import * as readline from 'readline';
import { pool } from '../../../core/config/database';
// Rows upserted per transaction; batching keeps memory and statement size
// bounded for the 250k+ row files this script targets (see header comment).
const BATCH_SIZE = 5000;
// Fixed path the CSV must be placed at inside the container before running.
const CSV_PATH = '/tmp/catalog-import.csv';
// One parsed CSV data row, ready for upsert into vehicle_options.
interface ImportRow {
  year: number; // parsed from the CSV "year" column
  make: string;
  model: string;
  trim: string;
  engineName: string | null; // null when the optional engine_name column is absent/empty
  transmissionType: string | null; // null when the optional transmission_type column is absent/empty
}
// Running counters used for per-batch progress logging and the final summary.
interface ImportStats {
  totalRows: number; // rows successfully upserted so far
  batchesProcessed: number; // committed batches
  errors: number; // rows that failed to parse or whose batch failed to import
  startTime: Date; // used to compute elapsed time in log output
}
/**
 * Parse a single CSV line into trimmed fields.
 *
 * Handles commas inside double-quoted fields and RFC 4180 escaped quotes:
 * `""` inside a quoted field yields one literal `"` (the original
 * implementation just toggled the quote state and dropped the character).
 *
 * @param line - one raw CSV line (no trailing newline)
 * @returns the fields, each trimmed of surrounding whitespace
 */
function parseCSVLine(line: string): string[] {
  const result: string[] = [];
  let current = '';
  let inQuotes = false;
  for (let i = 0; i < line.length; i++) {
    const char = line[i];
    if (char === '"') {
      if (inQuotes && line[i + 1] === '"') {
        // Escaped quote inside a quoted field: emit one " and skip the pair.
        current += '"';
        i++;
      } else {
        inQuotes = !inQuotes;
      }
    } else if (char === ',' && !inQuotes) {
      result.push(current.trim());
      current = '';
    } else {
      current += char;
    }
  }
  result.push(current.trim());
  return result;
}
/**
 * Bulk get-or-create engines by name (case-insensitive upsert).
 *
 * Deduplicates names case-insensitively before building the VALUES list:
 * the `ON CONFLICT (LOWER(name))` upsert would otherwise fail with
 * "ON CONFLICT DO UPDATE command cannot affect row a second time" whenever
 * one batch contains the same name in different casings (e.g. "V6"/"v6"),
 * aborting the whole batch.
 *
 * @param client - pg client (expected to be inside the caller's transaction)
 * @param engineNames - engine names collected from the CSV batch
 * @returns map from each ORIGINAL input spelling -> engine id, so callers
 *          can look up by the raw CSV value regardless of casing
 */
async function getOrCreateEngines(
  client: any,
  engineNames: string[]
): Promise<Map<string, number>> {
  if (engineNames.length === 0) {
    return new Map();
  }
  // Group original spellings under one canonical (first-seen) name per
  // lowercase key so each target row is written at most once.
  const variantsByLower = new Map<string, string[]>();
  for (const name of engineNames) {
    const key = name.toLowerCase();
    const variants = variantsByLower.get(key);
    if (variants) {
      variants.push(name);
    } else {
      variantsByLower.set(key, [name]);
    }
  }
  const canonicalNames = Array.from(variantsByLower.values(), (v) => v[0]);
  // Build VALUES clause for the bulk upsert; fuel_type defaults to 'Gas'.
  const values: any[] = [];
  const placeholders = canonicalNames
    .map((name, idx) => {
      values.push(name, 'Gas');
      return `($${idx * 2 + 1}, $${idx * 2 + 2})`;
    })
    .join(', ');
  const query = `
    INSERT INTO engines (name, fuel_type)
    VALUES ${placeholders}
    ON CONFLICT (LOWER(name)) DO UPDATE
    SET name = EXCLUDED.name
    RETURNING id, name
  `;
  const result = await client.query(query, values);
  const map = new Map<string, number>();
  for (const row of result.rows) {
    // Map every original casing back to the id so lookups by raw CSV
    // values succeed.
    const variants = variantsByLower.get(row.name.toLowerCase()) ?? [row.name];
    for (const variant of variants) {
      map.set(variant, row.id);
    }
  }
  return map;
}
/**
 * Bulk get-or-create transmissions by type (case-insensitive upsert).
 *
 * Deduplicates types case-insensitively before building the VALUES list:
 * the `ON CONFLICT (LOWER(type))` upsert would otherwise fail with
 * "ON CONFLICT DO UPDATE command cannot affect row a second time" whenever
 * one batch contains the same type in different casings, aborting the batch.
 *
 * @param client - pg client (expected to be inside the caller's transaction)
 * @param transmissionTypes - transmission types collected from the CSV batch
 * @returns map from each ORIGINAL input spelling -> transmission id
 */
async function getOrCreateTransmissions(
  client: any,
  transmissionTypes: string[]
): Promise<Map<string, number>> {
  if (transmissionTypes.length === 0) {
    return new Map();
  }
  // Group original spellings under one canonical (first-seen) type per
  // lowercase key so each target row is written at most once.
  const variantsByLower = new Map<string, string[]>();
  for (const type of transmissionTypes) {
    const key = type.toLowerCase();
    const variants = variantsByLower.get(key);
    if (variants) {
      variants.push(type);
    } else {
      variantsByLower.set(key, [type]);
    }
  }
  const canonicalTypes = Array.from(variantsByLower.values(), (v) => v[0]);
  // Build VALUES clause for bulk insert
  const values: any[] = [];
  const placeholders = canonicalTypes
    .map((type, idx) => {
      values.push(type);
      return `($${idx + 1})`;
    })
    .join(', ');
  const query = `
    INSERT INTO transmissions (type)
    VALUES ${placeholders}
    ON CONFLICT (LOWER(type)) DO UPDATE
    SET type = EXCLUDED.type
    RETURNING id, type
  `;
  const result = await client.query(query, values);
  const map = new Map<string, number>();
  for (const row of result.rows) {
    // Map every original casing back to the id so lookups by raw CSV
    // values succeed.
    const variants = variantsByLower.get(row.type.toLowerCase()) ?? [row.type];
    for (const variant of variants) {
      map.set(variant, row.id);
    }
  }
  return map;
}
/**
 * Process one batch of parsed CSV rows.
 *
 * Resolves engine/transmission names to ids (creating them as needed),
 * then performs a single multi-row upsert into vehicle_options. Runs
 * inside the caller's transaction; on success, adds the batch size to
 * stats.totalRows.
 *
 * @param client - pg client with an open transaction
 * @param batch - rows to upsert (no-op when empty)
 * @param stats - mutated: totalRows incremented by batch.length
 */
async function processBatch(
  client: any,
  batch: ImportRow[],
  stats: ImportStats
): Promise<void> {
  if (batch.length === 0) {
    return;
  }
  // Collect the distinct engine/transmission names present in this batch.
  const uniqueEngines = new Set<string>();
  const uniqueTransmissions = new Set<string>();
  for (const row of batch) {
    if (row.engineName) {
      uniqueEngines.add(row.engineName);
    }
    if (row.transmissionType) {
      uniqueTransmissions.add(row.transmissionType);
    }
  }
  // Get/create engines and transmissions
  const engineMap = await getOrCreateEngines(client, Array.from(uniqueEngines));
  const transmissionMap = await getOrCreateTransmissions(
    client,
    Array.from(uniqueTransmissions)
  );
  // Build vehicle_options batch upsert. Use ?? (not ||) so a falsy id
  // (e.g. 0) would not be silently turned into null.
  const values: any[] = [];
  const placeholders = batch
    .map((row, idx) => {
      const engineId = row.engineName ? engineMap.get(row.engineName) ?? null : null;
      const transmissionId = row.transmissionType
        ? transmissionMap.get(row.transmissionType) ?? null
        : null;
      values.push(
        row.year,
        row.make,
        row.model,
        row.trim,
        engineId,
        transmissionId
      );
      const base = idx * 6;
      return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5}, $${base + 6})`;
    })
    .join(', ');
  const upsertQuery = `
    INSERT INTO vehicle_options (year, make, model, trim, engine_id, transmission_id)
    VALUES ${placeholders}
    ON CONFLICT (year, make, model, trim, engine_id, transmission_id)
    DO UPDATE SET
      updated_at = NOW()
  `;
  await client.query(upsertQuery, values);
  stats.totalRows += batch.length;
}
/**
 * Main import function.
 *
 * Streams the CSV line-by-line (constant memory), validates the header,
 * accumulates rows into batches of BATCH_SIZE, and upserts each batch in
 * its own transaction so one failed batch does not abort the whole import.
 * Prints per-batch progress, a summary, and post-import table counts.
 *
 * Exits the process with code 1 if the CSV file is missing or a required
 * header column is absent.
 */
async function importCatalog(): Promise<void> {
  const stats: ImportStats = {
    totalRows: 0,
    batchesProcessed: 0,
    errors: 0,
    startTime: new Date(),
  };

  // Upsert one accumulated batch inside its own transaction; on failure the
  // batch is rolled back and counted as errors, and the import continues.
  async function flushBatch(batch: ImportRow[], label: string): Promise<void> {
    if (batch.length === 0) {
      return;
    }
    const client = await pool.connect();
    try {
      await client.query('BEGIN');
      await processBatch(client, batch, stats);
      await client.query('COMMIT');
      stats.batchesProcessed++;
      const elapsed = (Date.now() - stats.startTime.getTime()) / 1000;
      console.log(
        `Batch ${stats.batchesProcessed}: ${stats.totalRows.toLocaleString()} rows processed (${elapsed.toFixed(1)}s)`
      );
    } catch (error: any) {
      await client.query('ROLLBACK');
      console.error(`Error processing ${label}:`, error.message);
      stats.errors += batch.length;
    } finally {
      client.release();
    }
  }

  console.log('='.repeat(60));
  console.log('Vehicle Catalog Bulk Import');
  console.log('='.repeat(60));
  console.log(`CSV File: ${CSV_PATH}`);
  console.log(`Batch Size: ${BATCH_SIZE}`);
  console.log('');

  // Validate file exists before opening a stream.
  if (!fs.existsSync(CSV_PATH)) {
    console.error(`Error: CSV file not found at ${CSV_PATH}`);
    process.exit(1);
  }

  const fileStream = fs.createReadStream(CSV_PATH);
  const rl = readline.createInterface({
    input: fileStream,
    crlfDelay: Infinity, // treat \r\n as a single line break
  });

  let headerIndices: Record<string, number> = {};
  let batch: ImportRow[] = [];
  let isFirstLine = true;

  for await (const line of rl) {
    // Parse and validate the header row.
    if (isFirstLine) {
      const headerLower = parseCSVLine(line).map((h) => h.toLowerCase().trim());
      const required = ['year', 'make', 'model', 'trim'];
      for (const req of required) {
        if (!headerLower.includes(req)) {
          console.error(`Error: Missing required header: ${req}`);
          process.exit(1);
        }
      }
      // Map logical field names to column positions (-1 when an optional
      // column is absent).
      headerIndices = {
        year: headerLower.indexOf('year'),
        make: headerLower.indexOf('make'),
        model: headerLower.indexOf('model'),
        trim: headerLower.indexOf('trim'),
        engineName: headerLower.indexOf('engine_name'),
        transmissionType: headerLower.indexOf('transmission_type'),
      };
      isFirstLine = false;
      continue;
    }

    // Skip blank lines (e.g. a trailing newline at EOF).
    if (line.trim() === '') {
      continue;
    }

    // Parse data row
    try {
      const fields = parseCSVLine(line);
      const row: ImportRow = {
        year: parseInt(fields[headerIndices.year], 10),
        make: fields[headerIndices.make]?.trim() || '',
        model: fields[headerIndices.model]?.trim() || '',
        trim: fields[headerIndices.trim]?.trim() || '',
        engineName:
          headerIndices.engineName >= 0
            ? fields[headerIndices.engineName]?.trim() || null
            : null,
        transmissionType:
          headerIndices.transmissionType >= 0
            ? fields[headerIndices.transmissionType]?.trim() || null
            : null,
      };
      // Reject rows with a non-numeric year or missing required fields up
      // front; otherwise one bad row (e.g. NaN year) would make the whole
      // 5000-row INSERT fail and roll back its batch.
      if (!Number.isFinite(row.year) || !row.make || !row.model || !row.trim) {
        stats.errors++;
        continue;
      }
      batch.push(row);
      // Process batch when full
      if (batch.length >= BATCH_SIZE) {
        await flushBatch(batch, `batch ${stats.batchesProcessed + 1}`);
        batch = [];
      }
    } catch (error: any) {
      stats.errors++;
      console.error(`Error parsing row: ${error.message}`);
    }
  }

  // Process remaining rows
  await flushBatch(batch, 'final batch');

  // Print summary
  const totalElapsed = (Date.now() - stats.startTime.getTime()) / 1000;
  console.log('');
  console.log('='.repeat(60));
  console.log('Import Summary');
  console.log('='.repeat(60));
  console.log(`Total rows processed: ${stats.totalRows.toLocaleString()}`);
  console.log(`Batches processed: ${stats.batchesProcessed}`);
  console.log(`Errors: ${stats.errors}`);
  console.log(`Elapsed time: ${Math.floor(totalElapsed / 60)}m ${(totalElapsed % 60).toFixed(0)}s`);
  console.log('');

  // Sanity-check table counts after the import.
  const client = await pool.connect();
  try {
    const voResult = await client.query('SELECT COUNT(*) FROM vehicle_options');
    const engResult = await client.query('SELECT COUNT(*) FROM engines');
    const transResult = await client.query('SELECT COUNT(*) FROM transmissions');
    console.log('Database Verification:');
    console.log(`  vehicle_options: ${parseInt(voResult.rows[0].count, 10).toLocaleString()}`);
    console.log(`  engines: ${parseInt(engResult.rows[0].count, 10).toLocaleString()}`);
    console.log(`  transmissions: ${parseInt(transResult.rows[0].count, 10).toLocaleString()}`);
  } finally {
    client.release();
  }
  console.log('');
  console.log('Import completed successfully!');
  console.log('='.repeat(60));
}
// Run the import, then shut down the connection pool BEFORE exiting.
// pool.end() returns a promise; the original called process.exit()
// immediately, which could kill the process before the pool finished
// closing its connections.
importCatalog()
  .then(async () => {
    await pool.end();
    process.exit(0);
  })
  .catch(async (error) => {
    console.error('Fatal error:', error);
    await pool.end();
    process.exit(1);
  });