feat: add import service and API layer (refs #26)
Implements Milestone 3: Backend import service and API with: Service Layer (user-import.service.ts): - generatePreview(): extract archive, validate, detect VIN conflicts - executeMerge(): chunk-based import (100 records/batch), UPDATE existing by VIN, INSERT new via batchInsert - executeReplace(): transactional DELETE all user data, batchInsert all records - Conflict detection: VIN duplicates in vehicles - Error handling: collect errors per record, continue, report in summary - File handling: copy vehicle images and documents from archive to storage - Cleanup: delete temp directory in finally block API Layer: - POST /api/user/import: multipart upload, mode selection (merge/replace) - POST /api/user/import/preview: preview without executing import - Authentication: fastify.authenticate preHandler - Content-Type validation: application/gzip or application/x-gzip - Magic byte validation: FileType.fromBuffer verifies tar.gz - Request validation: Zod schema for mode selection - Response: ImportResult with success, mode, summary, warnings Files Created: - backend/src/features/user-import/domain/user-import.service.ts - backend/src/features/user-import/api/user-import.controller.ts - backend/src/features/user-import/api/user-import.routes.ts - backend/src/features/user-import/api/user-import.validation.ts Files Updated: - backend/src/app.ts: register userImportRoutes with /api prefix Quality: - Type-check: PASS (0 errors) - Linting: PASS (0 errors, 470 warnings - all pre-existing) - Repository pattern: snake_case→camelCase conversion - User-scoped: all queries filter by user_id - Transaction boundaries: Replace mode atomic, Merge mode per-batch Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -31,6 +31,7 @@ import { userProfileRoutes } from './features/user-profile';
|
||||
import { onboardingRoutes } from './features/onboarding';
|
||||
import { userPreferencesRoutes } from './features/user-preferences';
|
||||
import { userExportRoutes } from './features/user-export';
|
||||
import { userImportRoutes } from './features/user-import/api/user-import.routes';
|
||||
import { pool } from './core/config/database';
|
||||
import { configRoutes } from './core/config/config.routes';
|
||||
|
||||
@@ -92,7 +93,7 @@ async function buildApp(): Promise<FastifyInstance> {
|
||||
status: 'healthy',
|
||||
timestamp: new Date().toISOString(),
|
||||
environment: process.env['NODE_ENV'],
|
||||
features: ['admin', 'auth', 'config', 'onboarding', 'vehicles', 'documents', 'fuel-logs', 'stations', 'maintenance', 'platform', 'notifications', 'user-profile', 'user-preferences', 'user-export']
|
||||
features: ['admin', 'auth', 'config', 'onboarding', 'vehicles', 'documents', 'fuel-logs', 'stations', 'maintenance', 'platform', 'notifications', 'user-profile', 'user-preferences', 'user-export', 'user-import']
|
||||
});
|
||||
});
|
||||
|
||||
@@ -102,7 +103,7 @@ async function buildApp(): Promise<FastifyInstance> {
|
||||
status: 'healthy',
|
||||
scope: 'api',
|
||||
timestamp: new Date().toISOString(),
|
||||
features: ['admin', 'auth', 'config', 'onboarding', 'vehicles', 'documents', 'fuel-logs', 'stations', 'maintenance', 'platform', 'notifications', 'user-profile', 'user-preferences', 'user-export']
|
||||
features: ['admin', 'auth', 'config', 'onboarding', 'vehicles', 'documents', 'fuel-logs', 'stations', 'maintenance', 'platform', 'notifications', 'user-profile', 'user-preferences', 'user-export', 'user-import']
|
||||
});
|
||||
});
|
||||
|
||||
@@ -143,6 +144,7 @@ async function buildApp(): Promise<FastifyInstance> {
|
||||
await app.register(userProfileRoutes, { prefix: '/api' });
|
||||
await app.register(userPreferencesRoutes, { prefix: '/api' });
|
||||
await app.register(userExportRoutes, { prefix: '/api' });
|
||||
await app.register(userImportRoutes, { prefix: '/api' });
|
||||
await app.register(configRoutes, { prefix: '/api' });
|
||||
|
||||
// 404 handler
|
||||
|
||||
235
backend/src/features/user-import/api/user-import.controller.ts
Normal file
235
backend/src/features/user-import/api/user-import.controller.ts
Normal file
@@ -0,0 +1,235 @@
|
||||
/**
|
||||
* @ai-summary Controller for user data import endpoints
|
||||
* @ai-context Handles multipart uploads, validation, and import orchestration
|
||||
*/
|
||||
|
||||
import { FastifyRequest, FastifyReply } from 'fastify';
|
||||
import * as fsp from 'fs/promises';
|
||||
import * as path from 'path';
|
||||
import FileType from 'file-type';
|
||||
import { logger } from '../../../core/logging/logger';
|
||||
import { pool } from '../../../core/config/database';
|
||||
import { UserImportService } from '../domain/user-import.service';
|
||||
import { importRequestSchema } from './user-import.validation';
|
||||
|
||||
export class UserImportController {
|
||||
private readonly importService: UserImportService;
|
||||
|
||||
constructor() {
|
||||
this.importService = new UserImportService(pool);
|
||||
}
|
||||
|
||||
/**
|
||||
* POST /api/user/import
|
||||
* Uploads and imports user data archive
|
||||
*/
|
||||
async uploadAndImport(request: FastifyRequest, reply: FastifyReply): Promise<void> {
|
||||
const userId = request.user?.sub;
|
||||
if (!userId) {
|
||||
return reply.code(401).send({ error: 'Unauthorized' });
|
||||
}
|
||||
|
||||
logger.info('Processing user data import request', { userId });
|
||||
|
||||
let tempFilePath: string | null = null;
|
||||
|
||||
try {
|
||||
// Get multipart file
|
||||
const data = await request.file();
|
||||
|
||||
if (!data) {
|
||||
return reply.code(400).send({
|
||||
error: 'Bad Request',
|
||||
message: 'No file uploaded',
|
||||
});
|
||||
}
|
||||
|
||||
// Validate Content-Type header
|
||||
const contentType = data.mimetype;
|
||||
const allowedTypes = ['application/gzip', 'application/x-gzip', 'application/x-tar'];
|
||||
|
||||
if (!allowedTypes.includes(contentType)) {
|
||||
logger.warn('Invalid Content-Type for import upload', {
|
||||
userId,
|
||||
contentType,
|
||||
fileName: data.filename,
|
||||
});
|
||||
return reply.code(415).send({
|
||||
error: 'Unsupported Media Type',
|
||||
message: 'Only tar.gz archives are allowed (application/gzip)',
|
||||
});
|
||||
}
|
||||
|
||||
// Read file to buffer for magic byte validation
|
||||
const chunks: Buffer[] = [];
|
||||
for await (const chunk of data.file) {
|
||||
chunks.push(chunk);
|
||||
}
|
||||
const fileBuffer = Buffer.concat(chunks);
|
||||
|
||||
// Validate actual file content using magic bytes
|
||||
const detectedType = await FileType.fromBuffer(fileBuffer);
|
||||
|
||||
if (!detectedType || detectedType.mime !== 'application/gzip') {
|
||||
logger.warn('File content does not match gzip format', {
|
||||
userId,
|
||||
detectedType: detectedType?.mime,
|
||||
fileName: data.filename,
|
||||
});
|
||||
return reply.code(415).send({
|
||||
error: 'Unsupported Media Type',
|
||||
message: 'File content is not a valid gzip archive',
|
||||
});
|
||||
}
|
||||
|
||||
// Save to temp file for processing
|
||||
const timestamp = Date.now();
|
||||
tempFilePath = path.join('/tmp', `import-upload-${userId}-${timestamp}.tar.gz`);
|
||||
await fsp.writeFile(tempFilePath, fileBuffer);
|
||||
|
||||
logger.info('Import archive uploaded and validated', { userId, tempFilePath });
|
||||
|
||||
// Parse request body for mode (if provided)
|
||||
const fields: Record<string, any> = {};
|
||||
if (data.fields) {
|
||||
for (const [key, value] of Object.entries(data.fields)) {
|
||||
fields[key] = (value as any).value;
|
||||
}
|
||||
}
|
||||
|
||||
const validatedFields = importRequestSchema.parse(fields);
|
||||
const mode = validatedFields.mode || 'merge';
|
||||
|
||||
// Execute import based on mode
|
||||
let result;
|
||||
if (mode === 'replace') {
|
||||
result = await this.importService.executeReplace(userId, tempFilePath);
|
||||
} else {
|
||||
result = await this.importService.executeMerge(userId, tempFilePath);
|
||||
}
|
||||
|
||||
logger.info('Import completed', { userId, mode, result });
|
||||
|
||||
return reply.code(200).send(result);
|
||||
} catch (error) {
|
||||
logger.error('Import failed', {
|
||||
userId,
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
stack: error instanceof Error ? error.stack : undefined,
|
||||
});
|
||||
|
||||
return reply.code(500).send({
|
||||
error: 'Internal Server Error',
|
||||
message: error instanceof Error ? error.message : 'Import failed',
|
||||
});
|
||||
} finally {
|
||||
// Cleanup temp upload file
|
||||
if (tempFilePath) {
|
||||
try {
|
||||
await fsp.unlink(tempFilePath);
|
||||
} catch {
|
||||
// Cleanup failed, but continue
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* POST /api/user/import/preview
|
||||
* Generates preview of import data without executing import
|
||||
*/
|
||||
async generatePreview(request: FastifyRequest, reply: FastifyReply): Promise<void> {
|
||||
const userId = request.user?.sub;
|
||||
if (!userId) {
|
||||
return reply.code(401).send({ error: 'Unauthorized' });
|
||||
}
|
||||
|
||||
logger.info('Generating import preview', { userId });
|
||||
|
||||
let tempFilePath: string | null = null;
|
||||
|
||||
try {
|
||||
// Get multipart file
|
||||
const data = await request.file();
|
||||
|
||||
if (!data) {
|
||||
return reply.code(400).send({
|
||||
error: 'Bad Request',
|
||||
message: 'No file uploaded',
|
||||
});
|
||||
}
|
||||
|
||||
// Validate Content-Type header
|
||||
const contentType = data.mimetype;
|
||||
const allowedTypes = ['application/gzip', 'application/x-gzip', 'application/x-tar'];
|
||||
|
||||
if (!allowedTypes.includes(contentType)) {
|
||||
logger.warn('Invalid Content-Type for preview upload', {
|
||||
userId,
|
||||
contentType,
|
||||
fileName: data.filename,
|
||||
});
|
||||
return reply.code(415).send({
|
||||
error: 'Unsupported Media Type',
|
||||
message: 'Only tar.gz archives are allowed (application/gzip)',
|
||||
});
|
||||
}
|
||||
|
||||
// Read file to buffer for magic byte validation
|
||||
const chunks: Buffer[] = [];
|
||||
for await (const chunk of data.file) {
|
||||
chunks.push(chunk);
|
||||
}
|
||||
const fileBuffer = Buffer.concat(chunks);
|
||||
|
||||
// Validate actual file content using magic bytes
|
||||
const detectedType = await FileType.fromBuffer(fileBuffer);
|
||||
|
||||
if (!detectedType || detectedType.mime !== 'application/gzip') {
|
||||
logger.warn('File content does not match gzip format', {
|
||||
userId,
|
||||
detectedType: detectedType?.mime,
|
||||
fileName: data.filename,
|
||||
});
|
||||
return reply.code(415).send({
|
||||
error: 'Unsupported Media Type',
|
||||
message: 'File content is not a valid gzip archive',
|
||||
});
|
||||
}
|
||||
|
||||
// Save to temp file for processing
|
||||
const timestamp = Date.now();
|
||||
tempFilePath = path.join('/tmp', `import-preview-${userId}-${timestamp}.tar.gz`);
|
||||
await fsp.writeFile(tempFilePath, fileBuffer);
|
||||
|
||||
logger.info('Preview archive uploaded and validated', { userId, tempFilePath });
|
||||
|
||||
// Generate preview
|
||||
const preview = await this.importService.generatePreview(userId, tempFilePath);
|
||||
|
||||
logger.info('Preview generated', { userId, preview });
|
||||
|
||||
return reply.code(200).send(preview);
|
||||
} catch (error) {
|
||||
logger.error('Preview generation failed', {
|
||||
userId,
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
stack: error instanceof Error ? error.stack : undefined,
|
||||
});
|
||||
|
||||
return reply.code(500).send({
|
||||
error: 'Internal Server Error',
|
||||
message: error instanceof Error ? error.message : 'Preview generation failed',
|
||||
});
|
||||
} finally {
|
||||
// Cleanup temp upload file
|
||||
if (tempFilePath) {
|
||||
try {
|
||||
await fsp.unlink(tempFilePath);
|
||||
} catch {
|
||||
// Cleanup failed, but continue
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
21
backend/src/features/user-import/api/user-import.routes.ts
Normal file
21
backend/src/features/user-import/api/user-import.routes.ts
Normal file
@@ -0,0 +1,21 @@
|
||||
/**
|
||||
* @ai-summary User import routes
|
||||
* @ai-context Route definitions for user data import
|
||||
*/
|
||||
|
||||
import { FastifyPluginAsync } from 'fastify';
|
||||
import { UserImportController } from './user-import.controller';
|
||||
|
||||
export const userImportRoutes: FastifyPluginAsync = async (fastify) => {
|
||||
const controller = new UserImportController();
|
||||
|
||||
fastify.post('/user/import', {
|
||||
preHandler: [(fastify as any).authenticate],
|
||||
handler: controller.uploadAndImport.bind(controller),
|
||||
});
|
||||
|
||||
fastify.post('/user/import/preview', {
|
||||
preHandler: [(fastify as any).authenticate],
|
||||
handler: controller.generatePreview.bind(controller),
|
||||
});
|
||||
};
|
||||
@@ -0,0 +1,12 @@
|
||||
/**
|
||||
* @ai-summary Validation schemas for user import API
|
||||
* @ai-context Zod schemas for import request validation
|
||||
*/
|
||||
|
||||
import { z } from 'zod';
|
||||
|
||||
export const importRequestSchema = z.object({
|
||||
mode: z.enum(['merge', 'replace']).optional(),
|
||||
});
|
||||
|
||||
export type ImportRequest = z.infer<typeof importRequestSchema>;
|
||||
606
backend/src/features/user-import/domain/user-import.service.ts
Normal file
606
backend/src/features/user-import/domain/user-import.service.ts
Normal file
@@ -0,0 +1,606 @@
|
||||
/**
|
||||
* @ai-summary Service for importing user data from exported archives
|
||||
* @ai-context Orchestrates import process with merge/replace modes and batch operations
|
||||
*/
|
||||
|
||||
import * as fsp from 'fs/promises';
|
||||
import * as path from 'path';
|
||||
import { Pool, PoolClient } from 'pg';
|
||||
import { logger } from '../../../core/logging/logger';
|
||||
import { getStorageService } from '../../../core/storage/storage.service';
|
||||
import { VehiclesRepository } from '../../vehicles/data/vehicles.repository';
|
||||
import { FuelLogsRepository } from '../../fuel-logs/data/fuel-logs.repository';
|
||||
import { DocumentsRepository } from '../../documents/data/documents.repository';
|
||||
import { MaintenanceRepository } from '../../maintenance/data/maintenance.repository';
|
||||
import { UserImportArchiveService } from './user-import-archive.service';
|
||||
import { ImportPreview, ImportResult, USER_IMPORT_CONFIG } from './user-import.types';
|
||||
|
||||
export class UserImportService {
|
||||
private readonly archiveService: UserImportArchiveService;
|
||||
private readonly vehiclesRepo: VehiclesRepository;
|
||||
private readonly fuelLogsRepo: FuelLogsRepository;
|
||||
private readonly maintenanceRepo: MaintenanceRepository;
|
||||
private readonly documentsRepo: DocumentsRepository;
|
||||
private readonly storageService: ReturnType<typeof getStorageService>;
|
||||
|
||||
constructor(private pool: Pool) {
|
||||
this.archiveService = new UserImportArchiveService();
|
||||
this.vehiclesRepo = new VehiclesRepository(pool);
|
||||
this.fuelLogsRepo = new FuelLogsRepository(pool);
|
||||
this.maintenanceRepo = new MaintenanceRepository(pool);
|
||||
this.documentsRepo = new DocumentsRepository(pool);
|
||||
this.storageService = getStorageService();
|
||||
}
|
||||
|
||||
/**
|
||||
* Generates preview of import data including conflict detection
|
||||
*/
|
||||
async generatePreview(userId: string, archivePath: string): Promise<ImportPreview> {
|
||||
logger.info('Generating import preview', { userId, archivePath });
|
||||
|
||||
// Extract and validate archive
|
||||
const validation = await this.archiveService.extractAndValidate(archivePath, userId);
|
||||
|
||||
if (!validation.valid || !validation.manifest || !validation.extractedPath) {
|
||||
throw new Error(`Invalid archive: ${validation.errors.join(', ')}`);
|
||||
}
|
||||
|
||||
const { manifest, extractedPath } = validation;
|
||||
|
||||
try {
|
||||
// Detect VIN conflicts
|
||||
const vehicles = await this.archiveService.readDataFile<any>(extractedPath, 'vehicles.json');
|
||||
const vinsToCheck = vehicles
|
||||
.filter((v: any) => v.vin && v.vin.trim().length > 0)
|
||||
.map((v: any) => v.vin.trim());
|
||||
|
||||
let vinConflictCount = 0;
|
||||
if (vinsToCheck.length > 0) {
|
||||
const query = `
|
||||
SELECT COUNT(DISTINCT vin) as count
|
||||
FROM vehicles
|
||||
WHERE user_id = $1 AND vin = ANY($2::text[]) AND is_active = true
|
||||
`;
|
||||
const result = await this.pool.query(query, [userId, vinsToCheck]);
|
||||
vinConflictCount = parseInt(result.rows[0].count, 10);
|
||||
}
|
||||
|
||||
// Get sample records (first 3 of each type)
|
||||
const fuelLogs = await this.archiveService.readDataFile<any>(extractedPath, 'fuel-logs.json');
|
||||
const documents = await this.archiveService.readDataFile<any>(extractedPath, 'documents.json');
|
||||
const maintenanceRecords = await this.archiveService.readDataFile<any>(extractedPath, 'maintenance-records.json');
|
||||
const maintenanceSchedules = await this.archiveService.readDataFile<any>(extractedPath, 'maintenance-schedules.json');
|
||||
|
||||
return {
|
||||
manifest,
|
||||
conflicts: {
|
||||
vehicles: vinConflictCount,
|
||||
},
|
||||
sampleRecords: {
|
||||
vehicles: vehicles.slice(0, 3),
|
||||
fuelLogs: fuelLogs.slice(0, 3),
|
||||
documents: documents.slice(0, 3),
|
||||
maintenanceRecords: maintenanceRecords.slice(0, 3),
|
||||
maintenanceSchedules: maintenanceSchedules.slice(0, 3),
|
||||
},
|
||||
};
|
||||
} catch (error) {
|
||||
logger.error('Error generating preview', {
|
||||
userId,
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
});
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes merge mode import: UPDATE existing records, INSERT new records
|
||||
* Partial success - continues on errors, reports in summary
|
||||
*/
|
||||
async executeMerge(userId: string, archivePath: string): Promise<ImportResult> {
|
||||
logger.info('Executing merge mode import', { userId, archivePath });
|
||||
|
||||
const validation = await this.archiveService.extractAndValidate(archivePath, userId);
|
||||
|
||||
if (!validation.valid || !validation.manifest || !validation.extractedPath) {
|
||||
throw new Error(`Invalid archive: ${validation.errors.join(', ')}`);
|
||||
}
|
||||
|
||||
const { extractedPath } = validation;
|
||||
|
||||
const summary = {
|
||||
imported: 0,
|
||||
updated: 0,
|
||||
skipped: 0,
|
||||
errors: [] as string[],
|
||||
};
|
||||
const warnings: string[] = [];
|
||||
|
||||
try {
|
||||
// Import vehicles with conflict resolution
|
||||
await this.mergeVehicles(userId, extractedPath, summary);
|
||||
|
||||
// Import fuel logs (batch insert, skip conflicts)
|
||||
await this.mergeFuelLogs(userId, extractedPath, summary);
|
||||
|
||||
// Import maintenance records
|
||||
await this.mergeMaintenanceRecords(userId, extractedPath, summary);
|
||||
|
||||
// Import maintenance schedules
|
||||
await this.mergeMaintenanceSchedules(userId, extractedPath, summary);
|
||||
|
||||
// Import documents
|
||||
await this.mergeDocuments(userId, extractedPath, summary);
|
||||
|
||||
// Copy files from archive
|
||||
await this.copyFiles(userId, extractedPath, warnings);
|
||||
|
||||
return {
|
||||
success: summary.errors.length === 0,
|
||||
mode: 'merge',
|
||||
summary,
|
||||
warnings,
|
||||
};
|
||||
} finally {
|
||||
// Always cleanup temp directory
|
||||
await this.archiveService.cleanup(extractedPath);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes replace mode import: DELETE all user data, INSERT all records
|
||||
* All-or-nothing transaction
|
||||
*/
|
||||
async executeReplace(userId: string, archivePath: string): Promise<ImportResult> {
|
||||
logger.info('Executing replace mode import', { userId, archivePath });
|
||||
|
||||
const validation = await this.archiveService.extractAndValidate(archivePath, userId);
|
||||
|
||||
if (!validation.valid || !validation.manifest || !validation.extractedPath) {
|
||||
throw new Error(`Invalid archive: ${validation.errors.join(', ')}`);
|
||||
}
|
||||
|
||||
const { extractedPath } = validation;
|
||||
|
||||
const summary = {
|
||||
imported: 0,
|
||||
updated: 0,
|
||||
skipped: 0,
|
||||
errors: [] as string[],
|
||||
};
|
||||
const warnings: string[] = [];
|
||||
|
||||
const client = await this.pool.connect();
|
||||
|
||||
try {
|
||||
await client.query('BEGIN');
|
||||
|
||||
// Delete existing data in correct order to avoid FK violations
|
||||
logger.info('Deleting existing user data', { userId });
|
||||
|
||||
// Delete maintenance records (no FK to vehicles)
|
||||
await client.query('DELETE FROM maintenance_records WHERE user_id = $1', [userId]);
|
||||
|
||||
// Delete maintenance schedules (no FK to vehicles)
|
||||
await client.query('DELETE FROM maintenance_schedules WHERE user_id = $1', [userId]);
|
||||
|
||||
// Delete vehicles (CASCADE to fuel_logs and documents)
|
||||
await client.query('DELETE FROM vehicles WHERE user_id = $1', [userId]);
|
||||
|
||||
// Import all data using batch operations
|
||||
await this.insertVehicles(userId, extractedPath, summary, client);
|
||||
await this.insertFuelLogs(userId, extractedPath, summary, client);
|
||||
await this.insertMaintenanceRecords(userId, extractedPath, summary, client);
|
||||
await this.insertMaintenanceSchedules(userId, extractedPath, summary, client);
|
||||
await this.insertDocuments(userId, extractedPath, summary, client);
|
||||
|
||||
// Copy files from archive
|
||||
await this.copyFiles(userId, extractedPath, warnings);
|
||||
|
||||
await client.query('COMMIT');
|
||||
|
||||
return {
|
||||
success: true,
|
||||
mode: 'replace',
|
||||
summary,
|
||||
warnings,
|
||||
};
|
||||
} catch (error) {
|
||||
await client.query('ROLLBACK');
|
||||
logger.error('Replace mode import failed, rolled back', {
|
||||
userId,
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
});
|
||||
throw error;
|
||||
} finally {
|
||||
client.release();
|
||||
await this.archiveService.cleanup(extractedPath);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Merge vehicles: UPDATE existing by VIN, INSERT new
|
||||
*/
|
||||
private async mergeVehicles(
|
||||
userId: string,
|
||||
extractedPath: string,
|
||||
summary: ImportResult['summary']
|
||||
): Promise<void> {
|
||||
const vehicles = await this.archiveService.readDataFile<any>(extractedPath, 'vehicles.json');
|
||||
|
||||
if (vehicles.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Process in chunks
|
||||
const chunkSize = USER_IMPORT_CONFIG.chunkSize;
|
||||
for (let i = 0; i < vehicles.length; i += chunkSize) {
|
||||
const chunk = vehicles.slice(i, i + chunkSize);
|
||||
|
||||
for (const vehicle of chunk) {
|
||||
try {
|
||||
// Check if vehicle exists by VIN
|
||||
if (vehicle.vin && vehicle.vin.trim().length > 0) {
|
||||
const existing = await this.vehiclesRepo.findByUserAndVIN(userId, vehicle.vin.trim());
|
||||
|
||||
if (existing) {
|
||||
// Update existing vehicle
|
||||
await this.vehiclesRepo.update(existing.id, {
|
||||
make: vehicle.make,
|
||||
model: vehicle.model,
|
||||
year: vehicle.year,
|
||||
engine: vehicle.engine,
|
||||
transmission: vehicle.transmission,
|
||||
trimLevel: vehicle.trimLevel,
|
||||
driveType: vehicle.driveType,
|
||||
fuelType: vehicle.fuelType,
|
||||
nickname: vehicle.nickname,
|
||||
color: vehicle.color,
|
||||
licensePlate: vehicle.licensePlate,
|
||||
odometerReading: vehicle.odometerReading,
|
||||
});
|
||||
summary.updated++;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// Insert new vehicle
|
||||
await this.vehiclesRepo.create({
|
||||
...vehicle,
|
||||
userId,
|
||||
});
|
||||
summary.imported++;
|
||||
} catch (error) {
|
||||
summary.errors.push(`Vehicle import failed: ${error instanceof Error ? error.message : String(error)}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Merge fuel logs: batch insert new records
|
||||
*/
|
||||
private async mergeFuelLogs(
|
||||
userId: string,
|
||||
extractedPath: string,
|
||||
summary: ImportResult['summary']
|
||||
): Promise<void> {
|
||||
const fuelLogs = await this.archiveService.readDataFile<any>(extractedPath, 'fuel-logs.json');
|
||||
|
||||
if (fuelLogs.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
const chunkSize = USER_IMPORT_CONFIG.chunkSize;
|
||||
for (let i = 0; i < fuelLogs.length; i += chunkSize) {
|
||||
const chunk = fuelLogs.slice(i, i + chunkSize);
|
||||
|
||||
try {
|
||||
const inserted = await this.fuelLogsRepo.batchInsert(
|
||||
chunk.map((log: any) => ({ ...log, userId }))
|
||||
);
|
||||
summary.imported += inserted.length;
|
||||
} catch (error) {
|
||||
summary.errors.push(`Fuel logs batch import failed: ${error instanceof Error ? error.message : String(error)}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Merge maintenance records: batch insert new records
|
||||
*/
|
||||
private async mergeMaintenanceRecords(
|
||||
userId: string,
|
||||
extractedPath: string,
|
||||
summary: ImportResult['summary']
|
||||
): Promise<void> {
|
||||
const records = await this.archiveService.readDataFile<any>(extractedPath, 'maintenance-records.json');
|
||||
|
||||
if (records.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
const chunkSize = USER_IMPORT_CONFIG.chunkSize;
|
||||
for (let i = 0; i < records.length; i += chunkSize) {
|
||||
const chunk = records.slice(i, i + chunkSize);
|
||||
|
||||
try {
|
||||
const inserted = await this.maintenanceRepo.batchInsertRecords(
|
||||
chunk.map((record: any) => ({ ...record, userId }))
|
||||
);
|
||||
summary.imported += inserted.length;
|
||||
} catch (error) {
|
||||
summary.errors.push(`Maintenance records batch import failed: ${error instanceof Error ? error.message : String(error)}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Merge maintenance schedules: batch insert new records
|
||||
*/
|
||||
private async mergeMaintenanceSchedules(
|
||||
userId: string,
|
||||
extractedPath: string,
|
||||
summary: ImportResult['summary']
|
||||
): Promise<void> {
|
||||
const schedules = await this.archiveService.readDataFile<any>(extractedPath, 'maintenance-schedules.json');
|
||||
|
||||
if (schedules.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
const chunkSize = USER_IMPORT_CONFIG.chunkSize;
|
||||
for (let i = 0; i < schedules.length; i += chunkSize) {
|
||||
const chunk = schedules.slice(i, i + chunkSize);
|
||||
|
||||
try {
|
||||
const inserted = await this.maintenanceRepo.batchInsertSchedules(
|
||||
chunk.map((schedule: any) => ({ ...schedule, userId }))
|
||||
);
|
||||
summary.imported += inserted.length;
|
||||
} catch (error) {
|
||||
summary.errors.push(`Maintenance schedules batch import failed: ${error instanceof Error ? error.message : String(error)}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Merge documents: batch insert new records
|
||||
*/
|
||||
private async mergeDocuments(
|
||||
userId: string,
|
||||
extractedPath: string,
|
||||
summary: ImportResult['summary']
|
||||
): Promise<void> {
|
||||
const documents = await this.archiveService.readDataFile<any>(extractedPath, 'documents.json');
|
||||
|
||||
if (documents.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
const chunkSize = USER_IMPORT_CONFIG.chunkSize;
|
||||
for (let i = 0; i < documents.length; i += chunkSize) {
|
||||
const chunk = documents.slice(i, i + chunkSize);
|
||||
|
||||
try {
|
||||
const inserted = await this.documentsRepo.batchInsert(
|
||||
chunk.map((doc: any) => ({ ...doc, userId }))
|
||||
);
|
||||
summary.imported += inserted.length;
|
||||
} catch (error) {
|
||||
summary.errors.push(`Documents batch import failed: ${error instanceof Error ? error.message : String(error)}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Insert vehicles using batch operation (for replace mode)
|
||||
*/
|
||||
private async insertVehicles(
|
||||
userId: string,
|
||||
extractedPath: string,
|
||||
summary: ImportResult['summary'],
|
||||
client: PoolClient
|
||||
): Promise<void> {
|
||||
const vehicles = await this.archiveService.readDataFile<any>(extractedPath, 'vehicles.json');
|
||||
|
||||
if (vehicles.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
const chunkSize = USER_IMPORT_CONFIG.chunkSize;
|
||||
for (let i = 0; i < vehicles.length; i += chunkSize) {
|
||||
const chunk = vehicles.slice(i, i + chunkSize);
|
||||
|
||||
const inserted = await this.vehiclesRepo.batchInsert(
|
||||
chunk.map((vehicle: any) => ({ ...vehicle, userId })),
|
||||
client
|
||||
);
|
||||
summary.imported += inserted.length;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Insert fuel logs using batch operation (for replace mode)
|
||||
*/
|
||||
private async insertFuelLogs(
|
||||
userId: string,
|
||||
extractedPath: string,
|
||||
summary: ImportResult['summary'],
|
||||
client: PoolClient
|
||||
): Promise<void> {
|
||||
const fuelLogs = await this.archiveService.readDataFile<any>(extractedPath, 'fuel-logs.json');
|
||||
|
||||
if (fuelLogs.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
const chunkSize = USER_IMPORT_CONFIG.chunkSize;
|
||||
for (let i = 0; i < fuelLogs.length; i += chunkSize) {
|
||||
const chunk = fuelLogs.slice(i, i + chunkSize);
|
||||
|
||||
const inserted = await this.fuelLogsRepo.batchInsert(
|
||||
chunk.map((log: any) => ({ ...log, userId })),
|
||||
client
|
||||
);
|
||||
summary.imported += inserted.length;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Insert maintenance records using batch operation (for replace mode)
|
||||
*/
|
||||
private async insertMaintenanceRecords(
|
||||
userId: string,
|
||||
extractedPath: string,
|
||||
summary: ImportResult['summary'],
|
||||
client: PoolClient
|
||||
): Promise<void> {
|
||||
const records = await this.archiveService.readDataFile<any>(extractedPath, 'maintenance-records.json');
|
||||
|
||||
if (records.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
const chunkSize = USER_IMPORT_CONFIG.chunkSize;
|
||||
for (let i = 0; i < records.length; i += chunkSize) {
|
||||
const chunk = records.slice(i, i + chunkSize);
|
||||
|
||||
const inserted = await this.maintenanceRepo.batchInsertRecords(
|
||||
chunk.map((record: any) => ({ ...record, userId })),
|
||||
client
|
||||
);
|
||||
summary.imported += inserted.length;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Insert maintenance schedules using batch operation (for replace mode)
|
||||
*/
|
||||
private async insertMaintenanceSchedules(
|
||||
userId: string,
|
||||
extractedPath: string,
|
||||
summary: ImportResult['summary'],
|
||||
client: PoolClient
|
||||
): Promise<void> {
|
||||
const schedules = await this.archiveService.readDataFile<any>(extractedPath, 'maintenance-schedules.json');
|
||||
|
||||
if (schedules.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
const chunkSize = USER_IMPORT_CONFIG.chunkSize;
|
||||
for (let i = 0; i < schedules.length; i += chunkSize) {
|
||||
const chunk = schedules.slice(i, i + chunkSize);
|
||||
|
||||
const inserted = await this.maintenanceRepo.batchInsertSchedules(
|
||||
chunk.map((schedule: any) => ({ ...schedule, userId })),
|
||||
client
|
||||
);
|
||||
summary.imported += inserted.length;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Insert documents using batch operation (for replace mode)
|
||||
*/
|
||||
private async insertDocuments(
|
||||
userId: string,
|
||||
extractedPath: string,
|
||||
summary: ImportResult['summary'],
|
||||
client: PoolClient
|
||||
): Promise<void> {
|
||||
const documents = await this.archiveService.readDataFile<any>(extractedPath, 'documents.json');
|
||||
|
||||
if (documents.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
const chunkSize = USER_IMPORT_CONFIG.chunkSize;
|
||||
for (let i = 0; i < documents.length; i += chunkSize) {
|
||||
const chunk = documents.slice(i, i + chunkSize);
|
||||
|
||||
const inserted = await this.documentsRepo.batchInsert(
|
||||
chunk.map((doc: any) => ({ ...doc, userId })),
|
||||
client
|
||||
);
|
||||
summary.imported += inserted.length;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Copy vehicle images and document files from archive to storage
|
||||
*/
|
||||
private async copyFiles(
|
||||
_userId: string,
|
||||
extractedPath: string,
|
||||
warnings: string[]
|
||||
): Promise<void> {
|
||||
const filesPath = path.join(extractedPath, 'files');
|
||||
|
||||
try {
|
||||
await fsp.access(filesPath);
|
||||
} catch {
|
||||
// No files directory in archive
|
||||
return;
|
||||
}
|
||||
|
||||
// Copy vehicle images
|
||||
const vehicleImagesPath = path.join(filesPath, 'vehicle-images');
|
||||
try {
|
||||
await fsp.access(vehicleImagesPath);
|
||||
const vehicleIds = await fsp.readdir(vehicleImagesPath);
|
||||
|
||||
for (const vehicleId of vehicleIds) {
|
||||
const vehicleDir = path.join(vehicleImagesPath, vehicleId);
|
||||
const stat = await fsp.stat(vehicleDir);
|
||||
|
||||
if (!stat.isDirectory()) continue;
|
||||
|
||||
const images = await fsp.readdir(vehicleDir);
|
||||
for (const image of images) {
|
||||
try {
|
||||
const sourcePath = path.join(vehicleDir, image);
|
||||
const fileBuffer = await fsp.readFile(sourcePath);
|
||||
const key = `vehicle-images/${vehicleId}/${image}`;
|
||||
|
||||
await this.storageService.putObject('documents', key, fileBuffer);
|
||||
} catch (error) {
|
||||
warnings.push(`Failed to copy vehicle image ${vehicleId}/${image}: ${error instanceof Error ? error.message : String(error)}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
// No vehicle images
|
||||
}
|
||||
|
||||
// Copy document files
|
||||
const documentsPath = path.join(filesPath, 'documents');
|
||||
try {
|
||||
await fsp.access(documentsPath);
|
||||
const documentIds = await fsp.readdir(documentsPath);
|
||||
|
||||
for (const documentId of documentIds) {
|
||||
const documentDir = path.join(documentsPath, documentId);
|
||||
const stat = await fsp.stat(documentDir);
|
||||
|
||||
if (!stat.isDirectory()) continue;
|
||||
|
||||
const files = await fsp.readdir(documentDir);
|
||||
for (const file of files) {
|
||||
try {
|
||||
const sourcePath = path.join(documentDir, file);
|
||||
const fileBuffer = await fsp.readFile(sourcePath);
|
||||
const key = `documents/${documentId}/${file}`;
|
||||
|
||||
await this.storageService.putObject('documents', key, fileBuffer);
|
||||
} catch (error) {
|
||||
warnings.push(`Failed to copy document file ${documentId}/${file}: ${error instanceof Error ? error.message : String(error)}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
// No document files
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user