/** * @ai-summary Service for importing user data from exported archives * @ai-context Orchestrates import process with merge/replace modes and batch operations */ import * as fsp from 'fs/promises'; import * as path from 'path'; import { Pool, PoolClient } from 'pg'; import { logger } from '../../../core/logging/logger'; import { getStorageService } from '../../../core/storage/storage.service'; import { VehiclesRepository } from '../../vehicles/data/vehicles.repository'; import { FuelLogsRepository } from '../../fuel-logs/data/fuel-logs.repository'; import { DocumentsRepository } from '../../documents/data/documents.repository'; import { MaintenanceRepository } from '../../maintenance/data/maintenance.repository'; import { UserImportArchiveService } from './user-import-archive.service'; import { ImportPreview, ImportResult, USER_IMPORT_CONFIG } from './user-import.types'; export class UserImportService { private readonly archiveService: UserImportArchiveService; private readonly vehiclesRepo: VehiclesRepository; private readonly fuelLogsRepo: FuelLogsRepository; private readonly maintenanceRepo: MaintenanceRepository; private readonly documentsRepo: DocumentsRepository; private readonly storageService: ReturnType; constructor(private pool: Pool) { this.archiveService = new UserImportArchiveService(); this.vehiclesRepo = new VehiclesRepository(pool); this.fuelLogsRepo = new FuelLogsRepository(pool); this.maintenanceRepo = new MaintenanceRepository(pool); this.documentsRepo = new DocumentsRepository(pool); this.storageService = getStorageService(); } /** * Generates preview of import data including conflict detection */ async generatePreview(userId: string, archivePath: string): Promise { logger.info('Generating import preview', { userId, archivePath }); // Extract and validate archive const validation = await this.archiveService.extractAndValidate(archivePath, userId); if (!validation.valid || !validation.manifest || !validation.extractedPath) { throw new Error(`Invalid archive: ${validation.errors.join(', ')}`); } const { manifest, extractedPath } = validation; try { // Detect VIN conflicts const vehicles = await this.archiveService.readDataFile(extractedPath, 'vehicles.json'); const vinsToCheck = vehicles .filter((v: any) => v.vin && v.vin.trim().length > 0) .map((v: any) => v.vin.trim()); let vinConflictCount = 0; if (vinsToCheck.length > 0) { const query = ` SELECT COUNT(DISTINCT vin) as count FROM vehicles WHERE user_id = $1 AND vin = ANY($2::text[]) AND is_active = true `; const result = await this.pool.query(query, [userId, vinsToCheck]); vinConflictCount = parseInt(result.rows[0].count, 10); } // Get sample records (first 3 of each type) const fuelLogs = await this.archiveService.readDataFile(extractedPath, 'fuel-logs.json'); const documents = await this.archiveService.readDataFile(extractedPath, 'documents.json'); const maintenanceRecords = await this.archiveService.readDataFile(extractedPath, 'maintenance-records.json'); const maintenanceSchedules = await this.archiveService.readDataFile(extractedPath, 'maintenance-schedules.json'); return { manifest, conflicts: { vehicles: vinConflictCount, }, sampleRecords: { vehicles: vehicles.slice(0, 3), fuelLogs: fuelLogs.slice(0, 3), documents: documents.slice(0, 3), maintenanceRecords: maintenanceRecords.slice(0, 3), maintenanceSchedules: maintenanceSchedules.slice(0, 3), }, }; } catch (error) { logger.error('Error generating preview', { userId, error: error instanceof Error ? error.message : String(error), }); throw error; } } /** * Executes merge mode import: UPDATE existing records, INSERT new records * Partial success - continues on errors, reports in summary */ async executeMerge(userId: string, archivePath: string): Promise { logger.info('Executing merge mode import', { userId, archivePath }); const validation = await this.archiveService.extractAndValidate(archivePath, userId); if (!validation.valid || !validation.manifest || !validation.extractedPath) { throw new Error(`Invalid archive: ${validation.errors.join(', ')}`); } const { extractedPath } = validation; const summary = { imported: 0, updated: 0, skipped: 0, errors: [] as string[], }; const warnings: string[] = []; try { // Import vehicles with conflict resolution await this.mergeVehicles(userId, extractedPath, summary); // Import fuel logs (batch insert, skip conflicts) await this.mergeFuelLogs(userId, extractedPath, summary); // Import maintenance records await this.mergeMaintenanceRecords(userId, extractedPath, summary); // Import maintenance schedules await this.mergeMaintenanceSchedules(userId, extractedPath, summary); // Import documents await this.mergeDocuments(userId, extractedPath, summary); // Copy files from archive await this.copyFiles(userId, extractedPath, warnings); return { success: summary.errors.length === 0, mode: 'merge', summary, warnings, }; } finally { // Always cleanup temp directory await this.archiveService.cleanup(extractedPath); } } /** * Executes replace mode import: DELETE all user data, INSERT all records * All-or-nothing transaction */ async executeReplace(userId: string, archivePath: string): Promise { logger.info('Executing replace mode import', { userId, archivePath }); const validation = await this.archiveService.extractAndValidate(archivePath, userId); if (!validation.valid || !validation.manifest || !validation.extractedPath) { throw new Error(`Invalid archive: ${validation.errors.join(', ')}`); } const { extractedPath } = validation; const summary = { imported: 0, updated: 0, skipped: 0, errors: [] as string[], }; const warnings: string[] = []; const client = await this.pool.connect(); try { await client.query('BEGIN'); // Delete existing data in correct order to avoid FK violations logger.info('Deleting existing user data', { userId }); // Delete maintenance records (no FK to vehicles) await client.query('DELETE FROM maintenance_records WHERE user_id = $1', [userId]); // Delete maintenance schedules (no FK to vehicles) await client.query('DELETE FROM maintenance_schedules WHERE user_id = $1', [userId]); // Delete vehicles (CASCADE to fuel_logs and documents) await client.query('DELETE FROM vehicles WHERE user_id = $1', [userId]); // Import all data using batch operations await this.insertVehicles(userId, extractedPath, summary, client); await this.insertFuelLogs(userId, extractedPath, summary, client); await this.insertMaintenanceRecords(userId, extractedPath, summary, client); await this.insertMaintenanceSchedules(userId, extractedPath, summary, client); await this.insertDocuments(userId, extractedPath, summary, client); // Copy files from archive await this.copyFiles(userId, extractedPath, warnings); await client.query('COMMIT'); return { success: true, mode: 'replace', summary, warnings, }; } catch (error) { await client.query('ROLLBACK'); logger.error('Replace mode import failed, rolled back', { userId, error: error instanceof Error ? error.message : String(error), }); throw error; } finally { client.release(); await this.archiveService.cleanup(extractedPath); } } /** * Merge vehicles: UPDATE existing by VIN, INSERT new */ private async mergeVehicles( userId: string, extractedPath: string, summary: ImportResult['summary'] ): Promise { const vehicles = await this.archiveService.readDataFile(extractedPath, 'vehicles.json'); if (vehicles.length === 0) { return; } // Process in chunks const chunkSize = USER_IMPORT_CONFIG.chunkSize; for (let i = 0; i < vehicles.length; i += chunkSize) { const chunk = vehicles.slice(i, i + chunkSize); for (const vehicle of chunk) { try { // Check if vehicle exists by VIN if (vehicle.vin && vehicle.vin.trim().length > 0) { const existing = await this.vehiclesRepo.findByUserAndVIN(userId, vehicle.vin.trim()); if (existing) { // Update existing vehicle await this.vehiclesRepo.update(existing.id, { make: vehicle.make, model: vehicle.model, year: vehicle.year, engine: vehicle.engine, transmission: vehicle.transmission, trimLevel: vehicle.trimLevel, driveType: vehicle.driveType, fuelType: vehicle.fuelType, nickname: vehicle.nickname, color: vehicle.color, licensePlate: vehicle.licensePlate, odometerReading: vehicle.odometerReading, }); summary.updated++; continue; } } // Insert new vehicle await this.vehiclesRepo.create({ ...vehicle, userId, }); summary.imported++; } catch (error) { summary.errors.push(`Vehicle import failed: ${error instanceof Error ? error.message : String(error)}`); } } } } /** * Merge fuel logs: batch insert new records */ private async mergeFuelLogs( userId: string, extractedPath: string, summary: ImportResult['summary'] ): Promise { const fuelLogs = await this.archiveService.readDataFile(extractedPath, 'fuel-logs.json'); if (fuelLogs.length === 0) { return; } const chunkSize = USER_IMPORT_CONFIG.chunkSize; for (let i = 0; i < fuelLogs.length; i += chunkSize) { const chunk = fuelLogs.slice(i, i + chunkSize); try { const inserted = await this.fuelLogsRepo.batchInsert( chunk.map((log: any) => ({ ...log, userId })) ); summary.imported += inserted.length; } catch (error) { summary.errors.push(`Fuel logs batch import failed: ${error instanceof Error ? error.message : String(error)}`); } } } /** * Merge maintenance records: batch insert new records */ private async mergeMaintenanceRecords( userId: string, extractedPath: string, summary: ImportResult['summary'] ): Promise { const records = await this.archiveService.readDataFile(extractedPath, 'maintenance-records.json'); if (records.length === 0) { return; } const chunkSize = USER_IMPORT_CONFIG.chunkSize; for (let i = 0; i < records.length; i += chunkSize) { const chunk = records.slice(i, i + chunkSize); try { const inserted = await this.maintenanceRepo.batchInsertRecords( chunk.map((record: any) => ({ ...record, userId })) ); summary.imported += inserted.length; } catch (error) { summary.errors.push(`Maintenance records batch import failed: ${error instanceof Error ? error.message : String(error)}`); } } } /** * Merge maintenance schedules: batch insert new records */ private async mergeMaintenanceSchedules( userId: string, extractedPath: string, summary: ImportResult['summary'] ): Promise { const schedules = await this.archiveService.readDataFile(extractedPath, 'maintenance-schedules.json'); if (schedules.length === 0) { return; } const chunkSize = USER_IMPORT_CONFIG.chunkSize; for (let i = 0; i < schedules.length; i += chunkSize) { const chunk = schedules.slice(i, i + chunkSize); try { const inserted = await this.maintenanceRepo.batchInsertSchedules( chunk.map((schedule: any) => ({ ...schedule, userId })) ); summary.imported += inserted.length; } catch (error) { summary.errors.push(`Maintenance schedules batch import failed: ${error instanceof Error ? error.message : String(error)}`); } } } /** * Merge documents: batch insert new records */ private async mergeDocuments( userId: string, extractedPath: string, summary: ImportResult['summary'] ): Promise { const documents = await this.archiveService.readDataFile(extractedPath, 'documents.json'); if (documents.length === 0) { return; } const chunkSize = USER_IMPORT_CONFIG.chunkSize; for (let i = 0; i < documents.length; i += chunkSize) { const chunk = documents.slice(i, i + chunkSize); try { const inserted = await this.documentsRepo.batchInsert( chunk.map((doc: any) => ({ ...doc, userId })) ); summary.imported += inserted.length; } catch (error) { summary.errors.push(`Documents batch import failed: ${error instanceof Error ? error.message : String(error)}`); } } } /** * Insert vehicles using batch operation (for replace mode) */ private async insertVehicles( userId: string, extractedPath: string, summary: ImportResult['summary'], client: PoolClient ): Promise { const vehicles = await this.archiveService.readDataFile(extractedPath, 'vehicles.json'); if (vehicles.length === 0) { return; } const chunkSize = USER_IMPORT_CONFIG.chunkSize; for (let i = 0; i < vehicles.length; i += chunkSize) { const chunk = vehicles.slice(i, i + chunkSize); const inserted = await this.vehiclesRepo.batchInsert( chunk.map((vehicle: any) => ({ ...vehicle, userId })), client ); summary.imported += inserted.length; } } /** * Insert fuel logs using batch operation (for replace mode) */ private async insertFuelLogs( userId: string, extractedPath: string, summary: ImportResult['summary'], client: PoolClient ): Promise { const fuelLogs = await this.archiveService.readDataFile(extractedPath, 'fuel-logs.json'); if (fuelLogs.length === 0) { return; } const chunkSize = USER_IMPORT_CONFIG.chunkSize; for (let i = 0; i < fuelLogs.length; i += chunkSize) { const chunk = fuelLogs.slice(i, i + chunkSize); const inserted = await this.fuelLogsRepo.batchInsert( chunk.map((log: any) => ({ ...log, userId })), client ); summary.imported += inserted.length; } } /** * Insert maintenance records using batch operation (for replace mode) */ private async insertMaintenanceRecords( userId: string, extractedPath: string, summary: ImportResult['summary'], client: PoolClient ): Promise { const records = await this.archiveService.readDataFile(extractedPath, 'maintenance-records.json'); if (records.length === 0) { return; } const chunkSize = USER_IMPORT_CONFIG.chunkSize; for (let i = 0; i < records.length; i += chunkSize) { const chunk = records.slice(i, i + chunkSize); const inserted = await this.maintenanceRepo.batchInsertRecords( chunk.map((record: any) => ({ ...record, userId })), client ); summary.imported += inserted.length; } } /** * Insert maintenance schedules using batch operation (for replace mode) */ private async insertMaintenanceSchedules( userId: string, extractedPath: string, summary: ImportResult['summary'], client: PoolClient ): Promise { const schedules = await this.archiveService.readDataFile(extractedPath, 'maintenance-schedules.json'); if (schedules.length === 0) { return; } const chunkSize = USER_IMPORT_CONFIG.chunkSize; for (let i = 0; i < schedules.length; i += chunkSize) { const chunk = schedules.slice(i, i + chunkSize); const inserted = await this.maintenanceRepo.batchInsertSchedules( chunk.map((schedule: any) => ({ ...schedule, userId })), client ); summary.imported += inserted.length; } } /** * Insert documents using batch operation (for replace mode) */ private async insertDocuments( userId: string, extractedPath: string, summary: ImportResult['summary'], client: PoolClient ): Promise { const documents = await this.archiveService.readDataFile(extractedPath, 'documents.json'); if (documents.length === 0) { return; } const chunkSize = USER_IMPORT_CONFIG.chunkSize; for (let i = 0; i < documents.length; i += chunkSize) { const chunk = documents.slice(i, i + chunkSize); const inserted = await this.documentsRepo.batchInsert( chunk.map((doc: any) => ({ ...doc, userId })), client ); summary.imported += inserted.length; } } /** * Copy vehicle images and document files from archive to storage */ private async copyFiles( _userId: string, extractedPath: string, warnings: string[] ): Promise { const filesPath = path.join(extractedPath, 'files'); try { await fsp.access(filesPath); } catch { // No files directory in archive return; } // Copy vehicle images const vehicleImagesPath = path.join(filesPath, 'vehicle-images'); try { await fsp.access(vehicleImagesPath); const vehicleIds = await fsp.readdir(vehicleImagesPath); for (const vehicleId of vehicleIds) { const vehicleDir = path.join(vehicleImagesPath, vehicleId); const stat = await fsp.stat(vehicleDir); if (!stat.isDirectory()) continue; const images = await fsp.readdir(vehicleDir); for (const image of images) { try { const sourcePath = path.join(vehicleDir, image); const fileBuffer = await fsp.readFile(sourcePath); const key = `vehicle-images/${vehicleId}/${image}`; await this.storageService.putObject('documents', key, fileBuffer); } catch (error) { warnings.push(`Failed to copy vehicle image ${vehicleId}/${image}: ${error instanceof Error ? error.message : String(error)}`); } } } } catch { // No vehicle images } // Copy document files const documentsPath = path.join(filesPath, 'documents'); try { await fsp.access(documentsPath); const documentIds = await fsp.readdir(documentsPath); for (const documentId of documentIds) { const documentDir = path.join(documentsPath, documentId); const stat = await fsp.stat(documentDir); if (!stat.isDirectory()) continue; const files = await fsp.readdir(documentDir); for (const file of files) { try { const sourcePath = path.join(documentDir, file); const fileBuffer = await fsp.readFile(sourcePath); const key = `documents/${documentId}/${file}`; await this.storageService.putObject('documents', key, fileBuffer); } catch (error) { warnings.push(`Failed to copy document file ${documentId}/${file}: ${error instanceof Error ? error.message : String(error)}`); } } } } catch { // No document files } } }