From 852c9013b5a52ababcd74aba4934b3fddb31d3cd Mon Sep 17 00:00:00 2001 From: Eric Gullickson <16152721+ericgullickson@users.noreply.github.com> Date: Sun, 1 Feb 2026 16:02:11 -0600 Subject: [PATCH] feat: add core OCR API integration (refs #65) OCR Service (Python/FastAPI): - POST /extract for synchronous OCR extraction - POST /jobs and GET /jobs/{job_id} for async processing - Image preprocessing (deskew, denoise) for accuracy - HEIC conversion via pillow-heif - Redis job queue for async processing Backend (Fastify): - POST /api/ocr/extract - authenticated proxy to OCR - POST /api/ocr/jobs - async job submission - GET /api/ocr/jobs/:jobId - job polling - Multipart file upload handling - JWT authentication required File size limits: 10MB sync, 200MB async Processing time target: <3 seconds for typical photos Co-Authored-By: Claude Opus 4.5 --- .gitignore | 3 + backend/package.json | 1 + backend/src/app.ts | 6 +- backend/src/features/CLAUDE.md | 1 + backend/src/features/ocr/README.md | 54 ++++ .../src/features/ocr/api/ocr.controller.ts | 275 ++++++++++++++++++ backend/src/features/ocr/api/ocr.routes.ts | 31 ++ .../src/features/ocr/api/ocr.validation.ts | 18 ++ .../src/features/ocr/domain/ocr.service.ts | 208 +++++++++++++ backend/src/features/ocr/domain/ocr.types.ts | 53 ++++ .../src/features/ocr/external/ocr-client.ts | 229 +++++++++++++++ backend/src/features/ocr/index.ts | 11 + docker-compose.yml | 6 + ocr/app/config.py | 5 + ocr/app/main.py | 35 +++ ocr/app/models/__init__.py | 18 ++ ocr/app/models/schemas.py | 65 +++++ ocr/app/routers/__init__.py | 5 + ocr/app/routers/extract.py | 69 +++++ ocr/app/routers/jobs.py | 142 +++++++++ ocr/app/services/__init__.py | 6 + ocr/app/services/job_queue.py | 233 +++++++++++++++ ocr/app/services/ocr_service.py | 275 ++++++++++++++++++ ocr/app/services/preprocessor.py | 176 +++++++++++ ocr/requirements.txt | 9 +- 25 files changed, 1931 insertions(+), 3 deletions(-) create mode 100644 backend/src/features/ocr/README.md create 
mode 100644 backend/src/features/ocr/api/ocr.controller.ts create mode 100644 backend/src/features/ocr/api/ocr.routes.ts create mode 100644 backend/src/features/ocr/api/ocr.validation.ts create mode 100644 backend/src/features/ocr/domain/ocr.service.ts create mode 100644 backend/src/features/ocr/domain/ocr.types.ts create mode 100644 backend/src/features/ocr/external/ocr-client.ts create mode 100644 backend/src/features/ocr/index.ts create mode 100644 ocr/app/models/__init__.py create mode 100644 ocr/app/models/schemas.py create mode 100644 ocr/app/routers/__init__.py create mode 100644 ocr/app/routers/extract.py create mode 100644 ocr/app/routers/jobs.py create mode 100644 ocr/app/services/__init__.py create mode 100644 ocr/app/services/job_queue.py create mode 100644 ocr/app/services/ocr_service.py create mode 100644 ocr/app/services/preprocessor.py diff --git a/.gitignore b/.gitignore index dc5fa5f..d6d3432 100644 --- a/.gitignore +++ b/.gitignore @@ -12,6 +12,9 @@ coverage/ *.swo .venv .playwright-mcp +__pycache__/ +*.py[cod] +*$py.class # K8s-aligned secret mounts (real files ignored; examples committed) secrets/** diff --git a/backend/package.json b/backend/package.json index 69725ba..6df708f 100644 --- a/backend/package.json +++ b/backend/package.json @@ -30,6 +30,7 @@ "fastify": "^5.2.0", "fastify-plugin": "^5.0.1", "file-type": "^16.5.4", + "form-data": "^4.0.0", "get-jwks": "^11.0.3", "ioredis": "^5.4.2", "js-yaml": "^4.1.0", diff --git a/backend/src/app.ts b/backend/src/app.ts index 19a88b9..b944cb5 100644 --- a/backend/src/app.ts +++ b/backend/src/app.ts @@ -34,6 +34,7 @@ import { userExportRoutes } from './features/user-export'; import { userImportRoutes } from './features/user-import'; import { ownershipCostsRoutes } from './features/ownership-costs'; import { subscriptionsRoutes, donationsRoutes, webhooksRoutes } from './features/subscriptions'; +import { ocrRoutes } from './features/ocr'; import { pool } from './core/config/database'; import { 
configRoutes } from './core/config/config.routes'; @@ -95,7 +96,7 @@ async function buildApp(): Promise { status: 'healthy', timestamp: new Date().toISOString(), environment: process.env['NODE_ENV'], - features: ['admin', 'auth', 'config', 'onboarding', 'vehicles', 'documents', 'fuel-logs', 'stations', 'maintenance', 'platform', 'notifications', 'user-profile', 'user-preferences', 'user-export', 'user-import', 'ownership-costs', 'subscriptions', 'donations'] + features: ['admin', 'auth', 'config', 'onboarding', 'vehicles', 'documents', 'fuel-logs', 'stations', 'maintenance', 'platform', 'notifications', 'user-profile', 'user-preferences', 'user-export', 'user-import', 'ownership-costs', 'subscriptions', 'donations', 'ocr'] }); }); @@ -105,7 +106,7 @@ async function buildApp(): Promise { status: 'healthy', scope: 'api', timestamp: new Date().toISOString(), - features: ['admin', 'auth', 'config', 'onboarding', 'vehicles', 'documents', 'fuel-logs', 'stations', 'maintenance', 'platform', 'notifications', 'user-profile', 'user-preferences', 'user-export', 'user-import', 'ownership-costs', 'subscriptions', 'donations'] + features: ['admin', 'auth', 'config', 'onboarding', 'vehicles', 'documents', 'fuel-logs', 'stations', 'maintenance', 'platform', 'notifications', 'user-profile', 'user-preferences', 'user-export', 'user-import', 'ownership-costs', 'subscriptions', 'donations', 'ocr'] }); }); @@ -151,6 +152,7 @@ async function buildApp(): Promise { await app.register(subscriptionsRoutes, { prefix: '/api' }); await app.register(donationsRoutes, { prefix: '/api' }); await app.register(webhooksRoutes, { prefix: '/api' }); + await app.register(ocrRoutes, { prefix: '/api' }); await app.register(configRoutes, { prefix: '/api' }); // 404 handler diff --git a/backend/src/features/CLAUDE.md b/backend/src/features/CLAUDE.md index af6560c..096db1d 100644 --- a/backend/src/features/CLAUDE.md +++ b/backend/src/features/CLAUDE.md @@ -14,6 +14,7 @@ Feature capsule directory. 
Each feature is 100% self-contained with api/, domain | `fuel-logs/` | Fuel consumption tracking | Fuel log CRUD, statistics | | `maintenance/` | Maintenance record management | Service records, reminders | | `notifications/` | Email and push notifications | Alert system, email templates | +| `ocr/` | OCR proxy to mvp-ocr service | Image text extraction, async jobs | | `onboarding/` | User onboarding flow | First-time user setup | | `platform/` | Vehicle data and VIN decoding | Make/model lookup, VIN validation | | `stations/` | Gas station search and favorites | Google Maps integration, station data | diff --git a/backend/src/features/ocr/README.md b/backend/src/features/ocr/README.md new file mode 100644 index 0000000..20442a4 --- /dev/null +++ b/backend/src/features/ocr/README.md @@ -0,0 +1,54 @@ +# OCR Feature + +Backend proxy for OCR service communication. Handles authentication, validation, and file streaming to the OCR container. + +## API Endpoints + +| Method | Endpoint | Description | +|--------|----------|-------------| +| POST | `/api/ocr/extract` | Synchronous OCR extraction (max 10MB) | +| POST | `/api/ocr/jobs` | Submit async OCR job (max 200MB) | +| GET | `/api/ocr/jobs/:jobId` | Poll async job status | + +## Architecture + +``` +api/ + ocr.controller.ts # Request handlers + ocr.routes.ts # Route registration + ocr.validation.ts # Request validation types +domain/ + ocr.service.ts # Business logic + ocr.types.ts # TypeScript types +external/ + ocr-client.ts # HTTP client to OCR service +``` + +## Supported File Types + +- HEIC (converted server-side) +- JPEG +- PNG +- PDF (first page only) + +## Response Format + +```typescript +interface OcrResponse { + success: boolean; + documentType: 'vin' | 'receipt' | 'manual' | 'unknown'; + rawText: string; + confidence: number; // 0.0 - 1.0 + extractedFields: Record; + processingTimeMs: number; +} +``` + +## Async Job Flow + +1. POST `/api/ocr/jobs` with file +2. Receive `{ jobId, status: 'pending' }` +3. 
Poll GET `/api/ocr/jobs/:jobId` +4. When `status: 'completed'`, result contains OCR data + +Jobs expire after 1 hour. diff --git a/backend/src/features/ocr/api/ocr.controller.ts b/backend/src/features/ocr/api/ocr.controller.ts new file mode 100644 index 0000000..bf27063 --- /dev/null +++ b/backend/src/features/ocr/api/ocr.controller.ts @@ -0,0 +1,275 @@ +/** + * @ai-summary Controller for OCR API endpoints + */ +import { FastifyReply, FastifyRequest } from 'fastify'; +import { logger } from '../../../core/logging/logger'; +import { ocrService } from '../domain/ocr.service'; +import type { ExtractQuery, JobIdParams, JobSubmitBody } from './ocr.validation'; + +/** Supported MIME types for OCR */ +const SUPPORTED_TYPES = new Set([ + 'image/jpeg', + 'image/png', + 'image/heic', + 'image/heif', + 'application/pdf', +]); + +export class OcrController { + /** + * POST /api/ocr/extract + * Extract text from an uploaded image using synchronous OCR. + */ + async extract( + request: FastifyRequest<{ Querystring: ExtractQuery }>, + reply: FastifyReply + ) { + const userId = (request as any).user?.sub as string; + const preprocess = request.query.preprocess !== false; + + logger.info('OCR extract requested', { + operation: 'ocr.controller.extract', + userId, + preprocess, + }); + + // Get uploaded file + const file = await (request as any).file({ limits: { files: 1 } }); + if (!file) { + logger.warn('No file provided for OCR', { + operation: 'ocr.controller.extract.no_file', + userId, + }); + return reply.code(400).send({ + error: 'Bad Request', + message: 'No file provided', + }); + } + + // Validate content type + const contentType = file.mimetype as string; + if (!SUPPORTED_TYPES.has(contentType)) { + logger.warn('Unsupported file type for OCR', { + operation: 'ocr.controller.extract.unsupported_type', + userId, + contentType, + fileName: file.filename, + }); + return reply.code(415).send({ + error: 'Unsupported Media Type', + message: `Unsupported file type: ${contentType}. 
Supported: JPEG, PNG, HEIC, PDF`, + }); + } + + // Read file content + const chunks: Buffer[] = []; + for await (const chunk of file.file) { + chunks.push(chunk); + } + const fileBuffer = Buffer.concat(chunks); + + if (fileBuffer.length === 0) { + logger.warn('Empty file provided for OCR', { + operation: 'ocr.controller.extract.empty_file', + userId, + fileName: file.filename, + }); + return reply.code(400).send({ + error: 'Bad Request', + message: 'Empty file provided', + }); + } + + try { + const result = await ocrService.extract(userId, { + fileBuffer, + contentType, + preprocess, + }); + + logger.info('OCR extract completed', { + operation: 'ocr.controller.extract.success', + userId, + success: result.success, + documentType: result.documentType, + processingTimeMs: result.processingTimeMs, + }); + + return reply.code(200).send(result); + } catch (error: any) { + if (error.statusCode === 413) { + return reply.code(413).send({ + error: 'Payload Too Large', + message: error.message, + }); + } + if (error.statusCode === 415) { + return reply.code(415).send({ + error: 'Unsupported Media Type', + message: error.message, + }); + } + + logger.error('OCR extract failed', { + operation: 'ocr.controller.extract.error', + userId, + error: error.message, + }); + + return reply.code(500).send({ + error: 'Internal Server Error', + message: 'OCR processing failed', + }); + } + } + + /** + * POST /api/ocr/jobs + * Submit an async OCR job for large files. 
+ */ + async submitJob( + request: FastifyRequest<{ Body: JobSubmitBody }>, + reply: FastifyReply + ) { + const userId = (request as any).user?.sub as string; + + logger.info('OCR job submit requested', { + operation: 'ocr.controller.submitJob', + userId, + }); + + // Get uploaded file + const file = await (request as any).file({ limits: { files: 1 } }); + if (!file) { + logger.warn('No file provided for OCR job', { + operation: 'ocr.controller.submitJob.no_file', + userId, + }); + return reply.code(400).send({ + error: 'Bad Request', + message: 'No file provided', + }); + } + + // Validate content type + const contentType = file.mimetype as string; + if (!SUPPORTED_TYPES.has(contentType)) { + logger.warn('Unsupported file type for OCR job', { + operation: 'ocr.controller.submitJob.unsupported_type', + userId, + contentType, + fileName: file.filename, + }); + return reply.code(415).send({ + error: 'Unsupported Media Type', + message: `Unsupported file type: ${contentType}. Supported: JPEG, PNG, HEIC, PDF`, + }); + } + + // Read file content + const chunks: Buffer[] = []; + for await (const chunk of file.file) { + chunks.push(chunk); + } + const fileBuffer = Buffer.concat(chunks); + + if (fileBuffer.length === 0) { + logger.warn('Empty file provided for OCR job', { + operation: 'ocr.controller.submitJob.empty_file', + userId, + fileName: file.filename, + }); + return reply.code(400).send({ + error: 'Bad Request', + message: 'Empty file provided', + }); + } + + // Get callback URL from form data (if present) + const callbackUrl = file.fields?.callbackUrl?.value as string | undefined; + + try { + const result = await ocrService.submitJob(userId, { + fileBuffer, + contentType, + callbackUrl, + }); + + logger.info('OCR job submitted', { + operation: 'ocr.controller.submitJob.success', + userId, + jobId: result.jobId, + status: result.status, + }); + + return reply.code(202).send(result); + } catch (error: any) { + if (error.statusCode === 413) { + return 
reply.code(413).send({ + error: 'Payload Too Large', + message: error.message, + }); + } + if (error.statusCode === 415) { + return reply.code(415).send({ + error: 'Unsupported Media Type', + message: error.message, + }); + } + + logger.error('OCR job submit failed', { + operation: 'ocr.controller.submitJob.error', + userId, + error: error.message, + }); + + return reply.code(500).send({ + error: 'Internal Server Error', + message: 'Job submission failed', + }); + } + } + + /** + * GET /api/ocr/jobs/:jobId + * Get the status of an async OCR job. + */ + async getJobStatus( + request: FastifyRequest<{ Params: JobIdParams }>, + reply: FastifyReply + ) { + const userId = (request as any).user?.sub as string; + const { jobId } = request.params; + + logger.debug('OCR job status requested', { + operation: 'ocr.controller.getJobStatus', + userId, + jobId, + }); + + try { + const result = await ocrService.getJobStatus(userId, jobId); + + return reply.code(200).send(result); + } catch (error: any) { + if (error.statusCode === 404) { + return reply.code(404).send({ + error: 'Not Found', + message: error.message, + }); + } + + logger.error('OCR job status failed', { + operation: 'ocr.controller.getJobStatus.error', + userId, + jobId, + error: error.message, + }); + + return reply.code(500).send({ + error: 'Internal Server Error', + message: 'Failed to retrieve job status', + }); + } + } +} diff --git a/backend/src/features/ocr/api/ocr.routes.ts b/backend/src/features/ocr/api/ocr.routes.ts new file mode 100644 index 0000000..1251c2a --- /dev/null +++ b/backend/src/features/ocr/api/ocr.routes.ts @@ -0,0 +1,31 @@ +/** + * @ai-summary Fastify routes for OCR API + */ +import { FastifyInstance, FastifyPluginAsync, FastifyPluginOptions } from 'fastify'; +import { OcrController } from './ocr.controller'; + +export const ocrRoutes: FastifyPluginAsync = async ( + fastify: FastifyInstance, + _opts: FastifyPluginOptions +) => { + const ctrl = new OcrController(); + const requireAuth = 
fastify.authenticate.bind(fastify); + + // POST /api/ocr/extract - Synchronous OCR extraction + fastify.post('/ocr/extract', { + preHandler: [requireAuth], + handler: ctrl.extract.bind(ctrl), + }); + + // POST /api/ocr/jobs - Submit async OCR job + fastify.post('/ocr/jobs', { + preHandler: [requireAuth], + handler: ctrl.submitJob.bind(ctrl), + }); + + // GET /api/ocr/jobs/:jobId - Get job status + fastify.get<{ Params: { jobId: string } }>('/ocr/jobs/:jobId', { + preHandler: [requireAuth], + handler: ctrl.getJobStatus.bind(ctrl), + }); +}; diff --git a/backend/src/features/ocr/api/ocr.validation.ts b/backend/src/features/ocr/api/ocr.validation.ts new file mode 100644 index 0000000..faf102b --- /dev/null +++ b/backend/src/features/ocr/api/ocr.validation.ts @@ -0,0 +1,18 @@ +/** + * @ai-summary Validation types for OCR API + */ + +/** Query parameters for OCR extract endpoint */ +export interface ExtractQuery { + preprocess?: boolean; +} + +/** Path parameters for job status endpoint */ +export interface JobIdParams { + jobId: string; +} + +/** Form data for job submission */ +export interface JobSubmitBody { + callbackUrl?: string; +} diff --git a/backend/src/features/ocr/domain/ocr.service.ts b/backend/src/features/ocr/domain/ocr.service.ts new file mode 100644 index 0000000..6eaaaaa --- /dev/null +++ b/backend/src/features/ocr/domain/ocr.service.ts @@ -0,0 +1,208 @@ +/** + * @ai-summary Domain service for OCR operations + */ +import { logger } from '../../../core/logging/logger'; +import { ocrClient, JobNotFoundError } from '../external/ocr-client'; +import type { + JobResponse, + OcrExtractRequest, + OcrJobSubmitRequest, + OcrResponse, +} from './ocr.types'; + +/** Maximum file size for sync processing (10MB) */ +const MAX_SYNC_SIZE = 10 * 1024 * 1024; + +/** Maximum file size for async processing (200MB) */ +const MAX_ASYNC_SIZE = 200 * 1024 * 1024; + +/** Supported MIME types */ +const SUPPORTED_TYPES = new Set([ + 'image/jpeg', + 'image/png', + 'image/heic', + 
'image/heif', + 'application/pdf', +]); + +/** + * Domain service for OCR operations. + * Handles business logic and validation for OCR requests. + */ +export class OcrService { + /** + * Extract text from an image using synchronous OCR. + * + * @param userId - User ID for logging + * @param request - OCR extraction request + * @returns OCR extraction result + */ + async extract(userId: string, request: OcrExtractRequest): Promise { + // Validate file size for sync processing + if (request.fileBuffer.length > MAX_SYNC_SIZE) { + const err: any = new Error( + `File too large for sync processing. Max: ${MAX_SYNC_SIZE / (1024 * 1024)}MB. Use async job submission for larger files.` + ); + err.statusCode = 413; + throw err; + } + + // Validate content type + if (!SUPPORTED_TYPES.has(request.contentType)) { + const err: any = new Error( + `Unsupported file type: ${request.contentType}. Supported: ${[...SUPPORTED_TYPES].join(', ')}` + ); + err.statusCode = 415; + throw err; + } + + logger.info('OCR extract requested', { + operation: 'ocr.service.extract', + userId, + contentType: request.contentType, + fileSize: request.fileBuffer.length, + preprocess: request.preprocess ?? true, + }); + + try { + const result = await ocrClient.extract( + request.fileBuffer, + request.contentType, + request.preprocess ?? true + ); + + logger.info('OCR extract completed', { + operation: 'ocr.service.extract.success', + userId, + success: result.success, + documentType: result.documentType, + confidence: result.confidence, + processingTimeMs: result.processingTimeMs, + textLength: result.rawText.length, + }); + + return result; + } catch (error) { + logger.error('OCR extract failed', { + operation: 'ocr.service.extract.error', + userId, + error: error instanceof Error ? error.message : 'Unknown error', + }); + throw error; + } + } + + /** + * Submit an async OCR job for large files. 
+ * + * @param userId - User ID for logging + * @param request - Job submission request + * @returns Job response with job ID + */ + async submitJob(userId: string, request: OcrJobSubmitRequest): Promise { + // Validate file size for async processing + if (request.fileBuffer.length > MAX_ASYNC_SIZE) { + const err: any = new Error( + `File too large. Max: ${MAX_ASYNC_SIZE / (1024 * 1024)}MB.` + ); + err.statusCode = 413; + throw err; + } + + // Validate content type + if (!SUPPORTED_TYPES.has(request.contentType)) { + const err: any = new Error( + `Unsupported file type: ${request.contentType}. Supported: ${[...SUPPORTED_TYPES].join(', ')}` + ); + err.statusCode = 415; + throw err; + } + + logger.info('OCR job submit requested', { + operation: 'ocr.service.submitJob', + userId, + contentType: request.contentType, + fileSize: request.fileBuffer.length, + hasCallback: !!request.callbackUrl, + }); + + try { + const result = await ocrClient.submitJob( + request.fileBuffer, + request.contentType, + request.callbackUrl + ); + + logger.info('OCR job submitted', { + operation: 'ocr.service.submitJob.success', + userId, + jobId: result.jobId, + status: result.status, + }); + + return result; + } catch (error) { + logger.error('OCR job submit failed', { + operation: 'ocr.service.submitJob.error', + userId, + error: error instanceof Error ? error.message : 'Unknown error', + }); + throw error; + } + } + + /** + * Get the status of an async OCR job. 
+ * + * @param userId - User ID for logging + * @param jobId - Job ID to check + * @returns Job status response + */ + async getJobStatus(userId: string, jobId: string): Promise { + logger.debug('OCR job status requested', { + operation: 'ocr.service.getJobStatus', + userId, + jobId, + }); + + try { + const result = await ocrClient.getJobStatus(jobId); + + logger.debug('OCR job status retrieved', { + operation: 'ocr.service.getJobStatus.success', + userId, + jobId, + status: result.status, + progress: result.progress, + }); + + return result; + } catch (error) { + if (error instanceof JobNotFoundError) { + const err: any = new Error(`Job ${jobId} not found. Jobs expire after 1 hour.`); + err.statusCode = 404; + throw err; + } + + logger.error('OCR job status failed', { + operation: 'ocr.service.getJobStatus.error', + userId, + jobId, + error: error instanceof Error ? error.message : 'Unknown error', + }); + throw error; + } + } + + /** + * Check if the OCR service is available. + * + * @returns true if OCR service is healthy + */ + async isServiceHealthy(): Promise { + return ocrClient.isHealthy(); + } +} + +/** Singleton instance */ +export const ocrService = new OcrService(); diff --git a/backend/src/features/ocr/domain/ocr.types.ts b/backend/src/features/ocr/domain/ocr.types.ts new file mode 100644 index 0000000..314d0e5 --- /dev/null +++ b/backend/src/features/ocr/domain/ocr.types.ts @@ -0,0 +1,53 @@ +/** + * @ai-summary TypeScript types for OCR feature + */ + +/** Types of documents that can be processed by OCR */ +export type DocumentType = 'vin' | 'receipt' | 'manual' | 'unknown'; + +/** A single extracted field with confidence score */ +export interface ExtractedField { + value: string; + confidence: number; +} + +/** Response from OCR extraction */ +export interface OcrResponse { + success: boolean; + documentType: DocumentType; + rawText: string; + confidence: number; + extractedFields: Record; + processingTimeMs: number; +} + +/** Status of an async OCR 
job */ +export type JobStatus = 'pending' | 'processing' | 'completed' | 'failed'; + +/** Response for async job status */ +export interface JobResponse { + jobId: string; + status: JobStatus; + progress?: number; + result?: OcrResponse; + error?: string; +} + +/** Request to submit an async OCR job */ +export interface JobSubmitRequest { + callbackUrl?: string; +} + +/** Internal request to OCR service */ +export interface OcrExtractRequest { + fileBuffer: Buffer; + contentType: string; + preprocess?: boolean; +} + +/** Internal request to submit async job */ +export interface OcrJobSubmitRequest { + fileBuffer: Buffer; + contentType: string; + callbackUrl?: string; +} diff --git a/backend/src/features/ocr/external/ocr-client.ts b/backend/src/features/ocr/external/ocr-client.ts new file mode 100644 index 0000000..d8018eb --- /dev/null +++ b/backend/src/features/ocr/external/ocr-client.ts @@ -0,0 +1,229 @@ +/** + * @ai-summary HTTP client for OCR service communication + */ +import FormData from 'form-data'; +import { logger } from '../../../core/logging/logger'; +import type { JobResponse, OcrResponse } from '../domain/ocr.types'; + +/** OCR service configuration */ +const OCR_SERVICE_URL = process.env.OCR_SERVICE_URL || 'http://mvp-ocr:8000'; +const OCR_TIMEOUT_MS = 30000; // 30 seconds for sync operations + +/** + * HTTP client for communicating with the OCR service. + */ +export class OcrClient { + private readonly baseUrl: string; + + constructor(baseUrl: string = OCR_SERVICE_URL) { + this.baseUrl = baseUrl; + } + + /** + * Extract text from an image using OCR. 
+ * + * @param fileBuffer - Image file buffer + * @param contentType - MIME type of the file + * @param preprocess - Whether to apply preprocessing (default: true) + * @returns OCR extraction result + */ + async extract( + fileBuffer: Buffer, + contentType: string, + preprocess: boolean = true + ): Promise { + const formData = new FormData(); + formData.append('file', fileBuffer, { + filename: this.getFilenameFromContentType(contentType), + contentType, + }); + + const url = `${this.baseUrl}/extract?preprocess=${preprocess}`; + + logger.info('OCR extract request', { + operation: 'ocr.client.extract', + url, + contentType, + fileSize: fileBuffer.length, + preprocess, + }); + + const response = await this.fetchWithTimeout(url, { + method: 'POST', + body: formData as any, + headers: formData.getHeaders(), + }); + + if (!response.ok) { + const errorText = await response.text(); + logger.error('OCR extract failed', { + operation: 'ocr.client.extract.error', + status: response.status, + error: errorText, + }); + throw new Error(`OCR service error: ${response.status} - ${errorText}`); + } + + const result = (await response.json()) as OcrResponse; + + logger.info('OCR extract completed', { + operation: 'ocr.client.extract.success', + success: result.success, + documentType: result.documentType, + confidence: result.confidence, + processingTimeMs: result.processingTimeMs, + }); + + return result; + } + + /** + * Submit an async OCR job for large files. 
+ * + * @param fileBuffer - Image file buffer + * @param contentType - MIME type of the file + * @param callbackUrl - Optional URL to call when job completes + * @returns Job submission response + */ + async submitJob( + fileBuffer: Buffer, + contentType: string, + callbackUrl?: string + ): Promise { + const formData = new FormData(); + formData.append('file', fileBuffer, { + filename: this.getFilenameFromContentType(contentType), + contentType, + }); + if (callbackUrl) { + formData.append('callback_url', callbackUrl); + } + + const url = `${this.baseUrl}/jobs`; + + logger.info('OCR job submit request', { + operation: 'ocr.client.submitJob', + url, + contentType, + fileSize: fileBuffer.length, + hasCallback: !!callbackUrl, + }); + + const response = await this.fetchWithTimeout(url, { + method: 'POST', + body: formData as any, + headers: formData.getHeaders(), + }); + + if (!response.ok) { + const errorText = await response.text(); + logger.error('OCR job submit failed', { + operation: 'ocr.client.submitJob.error', + status: response.status, + error: errorText, + }); + throw new Error(`OCR service error: ${response.status} - ${errorText}`); + } + + const result = (await response.json()) as JobResponse; + + logger.info('OCR job submitted', { + operation: 'ocr.client.submitJob.success', + jobId: result.jobId, + status: result.status, + }); + + return result; + } + + /** + * Get the status of an async OCR job. 
+ * + * @param jobId - Job ID to check + * @returns Job status response + */ + async getJobStatus(jobId: string): Promise { + const url = `${this.baseUrl}/jobs/${jobId}`; + + logger.debug('OCR job status request', { + operation: 'ocr.client.getJobStatus', + jobId, + }); + + const response = await this.fetchWithTimeout(url, { + method: 'GET', + }); + + if (response.status === 404) { + throw new JobNotFoundError(jobId); + } + + if (!response.ok) { + const errorText = await response.text(); + logger.error('OCR job status failed', { + operation: 'ocr.client.getJobStatus.error', + jobId, + status: response.status, + error: errorText, + }); + throw new Error(`OCR service error: ${response.status} - ${errorText}`); + } + + return (await response.json()) as JobResponse; + } + + /** + * Check if the OCR service is healthy. + * + * @returns true if healthy, false otherwise + */ + async isHealthy(): Promise { + try { + const response = await this.fetchWithTimeout(`${this.baseUrl}/health`, { + method: 'GET', + }); + return response.ok; + } catch { + return false; + } + } + + private async fetchWithTimeout( + url: string, + options: RequestInit & { headers?: Record } + ): Promise { + const controller = new AbortController(); + const timeout = setTimeout(() => controller.abort(), OCR_TIMEOUT_MS); + + try { + return await fetch(url, { + ...options, + signal: controller.signal, + }); + } finally { + clearTimeout(timeout); + } + } + + private getFilenameFromContentType(contentType: string): string { + const extensions: Record = { + 'image/jpeg': 'image.jpg', + 'image/png': 'image.png', + 'image/heic': 'image.heic', + 'image/heif': 'image.heif', + 'application/pdf': 'document.pdf', + }; + return extensions[contentType] || 'file.bin'; + } +} + +/** Error thrown when a job is not found */ +export class JobNotFoundError extends Error { + constructor(jobId: string) { + super(`Job ${jobId} not found`); + this.name = 'JobNotFoundError'; + } +} + +/** Singleton instance */ +export const 
ocrClient = new OcrClient(); diff --git a/backend/src/features/ocr/index.ts b/backend/src/features/ocr/index.ts new file mode 100644 index 0000000..0ee79a7 --- /dev/null +++ b/backend/src/features/ocr/index.ts @@ -0,0 +1,11 @@ +/** + * @ai-summary Public API for OCR feature capsule + */ +export { ocrRoutes } from './api/ocr.routes'; +export type { + DocumentType, + ExtractedField, + JobResponse, + JobStatus, + OcrResponse, +} from './domain/ocr.types'; diff --git a/docker-compose.yml b/docker-compose.yml index 17abbf4..f4af2a4 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -173,8 +173,14 @@ services: restart: unless-stopped environment: LOG_LEVEL: info + REDIS_HOST: mvp-redis + REDIS_PORT: 6379 + REDIS_DB: 1 networks: - backend + - database + depends_on: + - mvp-redis healthcheck: test: ["CMD", "curl", "-f", "http://localhost:8000/health"] interval: 30s diff --git a/ocr/app/config.py b/ocr/app/config.py index a0f4ada..384ffdc 100644 --- a/ocr/app/config.py +++ b/ocr/app/config.py @@ -11,5 +11,10 @@ class Settings: self.port: int = int(os.getenv("PORT", "8000")) self.tesseract_cmd: str = os.getenv("TESSERACT_CMD", "/usr/bin/tesseract") + # Redis configuration for job queue + self.redis_host: str = os.getenv("REDIS_HOST", "mvp-redis") + self.redis_port: int = int(os.getenv("REDIS_PORT", "6379")) + self.redis_db: int = int(os.getenv("REDIS_DB", "1")) + settings = Settings() diff --git a/ocr/app/main.py b/ocr/app/main.py index 4553aa9..5af3c8c 100644 --- a/ocr/app/main.py +++ b/ocr/app/main.py @@ -1,14 +1,44 @@ """OCR Service FastAPI Application.""" +import logging +from contextlib import asynccontextmanager +from typing import AsyncIterator + from fastapi import FastAPI from app.config import settings +from app.routers import extract_router, jobs_router +from app.services import job_queue + +# Configure logging +logging.basicConfig( + level=getattr(logging, settings.log_level.upper(), logging.INFO), + format="%(asctime)s - %(name)s - %(levelname)s - 
"""Pydantic models for OCR API request/response validation."""
from enum import Enum
from typing import Optional

from pydantic import BaseModel, Field


class _ApiModel(BaseModel):
    """Shared base for API payloads.

    Accepts both the snake_case field names and their camelCase wire
    aliases when constructing models (populate_by_name).
    """

    model_config = {"populate_by_name": True}


class DocumentType(str, Enum):
    """Types of documents that can be processed."""

    VIN = "vin"
    RECEIPT = "receipt"
    MANUAL = "manual"
    UNKNOWN = "unknown"


class ExtractedField(BaseModel):
    """A single extracted field with confidence score."""

    value: str
    # Confidence normalized to the 0.0-1.0 range.
    confidence: float = Field(ge=0.0, le=1.0)


class OcrResponse(_ApiModel):
    """Response from OCR extraction."""

    success: bool
    document_type: DocumentType = Field(alias="documentType")
    raw_text: str = Field(alias="rawText")
    confidence: float = Field(ge=0.0, le=1.0)
    extracted_fields: dict[str, ExtractedField] = Field(
        default_factory=dict, alias="extractedFields"
    )
    processing_time_ms: int = Field(alias="processingTimeMs")


class JobStatus(str, Enum):
    """Status of an async OCR job."""

    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"


class JobResponse(_ApiModel):
    """Response for async job status."""

    job_id: str = Field(alias="jobId")
    status: JobStatus
    progress: Optional[int] = Field(default=None, ge=0, le=100)
    result: Optional[OcrResponse] = None
    error: Optional[str] = None


class JobSubmitRequest(_ApiModel):
    """Request to submit an async OCR job."""

    callback_url: Optional[str] = Field(default=None, alias="callbackUrl")
# Maximum file size for synchronous processing (10MB)
MAX_SYNC_SIZE = 10 * 1024 * 1024


@router.post("", response_model=OcrResponse)
async def extract_text(
    file: UploadFile = File(..., description="Image file to process"),
    preprocess: bool = Query(True, description="Apply image preprocessing"),
) -> OcrResponse:
    """
    Extract text from an uploaded image using OCR.

    Supports HEIC, JPEG, PNG, and PDF (first page only) formats.
    Processing time target: <3 seconds for typical photos.

    - **file**: Image file (max 10MB for sync processing)
    - **preprocess**: Whether to apply deskew/denoise preprocessing (default: true)
    """
    # Guard: the multipart part must actually carry a file.
    if not file.filename:
        raise HTTPException(status_code=400, detail="No file provided")

    payload = await file.read()
    size = len(payload)

    # Guard: enforce the synchronous size budget; bigger uploads belong on /jobs.
    if size > MAX_SYNC_SIZE:
        raise HTTPException(
            status_code=413,
            detail=f"File too large for sync processing. Max: {MAX_SYNC_SIZE // (1024*1024)}MB. Use /jobs for larger files.",
        )

    # Guard: a zero-byte upload can never contain text.
    if size == 0:
        raise HTTPException(status_code=400, detail="Empty file provided")

    logger.info(
        f"Processing file: {file.filename}, "
        f"size: {size} bytes, "
        f"content_type: {file.content_type}"
    )

    # Run the synchronous extraction and map failure to a 422.
    outcome = ocr_service.extract(
        file_bytes=payload,
        content_type=file.content_type,
        preprocess=preprocess,
    )

    if outcome.success:
        return outcome

    logger.warning(f"OCR extraction failed for {file.filename}")
    raise HTTPException(
        status_code=422,
        detail="Failed to extract text from image. Ensure the file is a valid image format.",
    )
@router.get("/{job_id}", response_model=JobResponse)
async def get_job_status(job_id: str) -> JobResponse:
    """
    Get the status of an async OCR job.

    Poll this endpoint to check job progress and retrieve results.

    Returns:
    - **pending**: Job is queued
    - **processing**: Job is being processed (includes progress %)
    - **completed**: Job finished successfully (includes result)
    - **failed**: Job failed (includes error message)
    """
    result = await job_queue.get_job_status(job_id)

    if result is None:
        raise HTTPException(
            status_code=404,
            detail=f"Job {job_id} not found. Jobs expire after 1 hour.",
        )

    return result


async def process_job(job_id: str) -> None:
    """
    Background task that runs OCR for a queued job.

    Progress milestones: 10 (started), 30 (data loaded), 50 (OCR running),
    90 (OCR done). Marks the job completed or failed in the queue; any
    unexpected exception is caught and recorded as a job failure.
    """
    logger.info(f"Starting job {job_id}")

    try:
        # Update status to processing
        await job_queue.update_job_progress(job_id, 10)

        # Load the uploaded bytes; without them the job cannot proceed.
        file_bytes = await job_queue.get_job_data(job_id)
        if not file_bytes:
            await job_queue.fail_job(job_id, "Job data not found")
            return

        await job_queue.update_job_progress(job_id, 30)

        # Sanity-check that the job metadata still exists (it shares the
        # data's TTL); bail out quietly if the job expired mid-flight.
        status = await job_queue.get_job_status(job_id)
        if not status:
            logger.warning(f"Job {job_id} metadata missing; aborting")
            return

        await job_queue.update_job_progress(job_id, 50)

        # OCR is CPU-bound: run it in a worker thread so the event loop stays
        # responsive. asyncio.to_thread replaces the deprecated
        # get_event_loop()/run_in_executor pattern inside a coroutine.
        result = await asyncio.to_thread(
            ocr_service.extract,
            file_bytes=file_bytes,
            preprocess=True,
        )

        await job_queue.update_job_progress(job_id, 90)

        if result.success:
            await job_queue.complete_job(job_id, result)
        else:
            await job_queue.fail_job(job_id, "OCR extraction failed")

    except Exception as e:
        logger.error(f"Job {job_id} failed: {e}", exc_info=True)
        await job_queue.fail_job(job_id, str(e))
logger = logging.getLogger(__name__)

# Job TTL in seconds (1 hour)
JOB_TTL = 3600

# Key prefixes
JOB_PREFIX = "ocr:job:"
JOB_DATA_PREFIX = "ocr:job:data:"
JOB_RESULT_PREFIX = "ocr:job:result:"


class JobQueue:
    """Manages async OCR jobs using Redis.

    Layout per job: metadata hash at ``ocr:job:<id>``, raw upload bytes at
    ``ocr:job:data:<id>``, serialized result JSON at ``ocr:job:result:<id>``.
    All keys carry a JOB_TTL expiry, so jobs vanish after one hour.
    """

    def __init__(self) -> None:
        """Initialize job queue; connections are created lazily."""
        # Decoding client for metadata/result (str values).
        self._redis: Optional[redis.Redis] = None
        # Binary client for file payloads (bytes values).
        self._redis_raw: Optional[redis.Redis] = None
        # Strong references to in-flight callback tasks: the event loop only
        # keeps weak references to tasks, so a fire-and-forget create_task()
        # result can be garbage-collected before it runs unless we hold it.
        self._callback_tasks: set = set()

    async def get_redis(self) -> redis.Redis:
        """Get or create the string-decoding Redis connection."""
        if self._redis is None:
            self._redis = redis.Redis(
                host=settings.redis_host,
                port=settings.redis_port,
                db=settings.redis_db,
                decode_responses=True,
            )
        return self._redis

    async def _get_redis_raw(self) -> redis.Redis:
        """Get or create the binary (non-decoding) Redis connection.

        Cached so that get_job_data does not open a fresh connection per call.
        """
        if self._redis_raw is None:
            self._redis_raw = redis.Redis(
                host=settings.redis_host,
                port=settings.redis_port,
                db=settings.redis_db,
                decode_responses=False,
            )
        return self._redis_raw

    async def close(self) -> None:
        """Close all Redis connections."""
        if self._redis:
            await self._redis.close()
            self._redis = None
        if self._redis_raw:
            await self._redis_raw.close()
            self._redis_raw = None

    async def submit_job(
        self,
        file_bytes: bytes,
        content_type: str,
        callback_url: Optional[str] = None,
    ) -> str:
        """
        Submit a new OCR job.

        Args:
            file_bytes: Raw file bytes to process
            content_type: MIME type of the file
            callback_url: Optional URL to call when job completes

        Returns:
            Job ID
        """
        r = await self.get_redis()
        job_id = str(uuid.uuid4())

        # Job metadata; callback_url stored as "" when absent because Redis
        # hashes cannot hold None.
        job_meta = {
            "status": JobStatus.PENDING.value,
            "progress": 0,
            "content_type": content_type,
            "callback_url": callback_url or "",
        }

        data_key = f"{JOB_DATA_PREFIX}{job_id}"
        meta_key = f"{JOB_PREFIX}{job_id}"

        # Pipeline commands are buffered synchronously in redis-py's asyncio
        # client (they return the pipeline, not an awaitable); only execute()
        # performs I/O and must be awaited.
        async with r.pipeline() as pipe:
            pipe.hset(meta_key, mapping=job_meta)  # type: ignore
            pipe.expire(meta_key, JOB_TTL)
            pipe.set(data_key, file_bytes)
            pipe.expire(data_key, JOB_TTL)
            await pipe.execute()

        logger.info(f"Job {job_id} submitted")
        return job_id

    async def get_job_status(self, job_id: str) -> Optional[JobResponse]:
        """
        Get the status of a job.

        Args:
            job_id: Job ID to check

        Returns:
            JobResponse or None if job doesn't exist
        """
        r = await self.get_redis()
        meta_key = f"{JOB_PREFIX}{job_id}"
        result_key = f"{JOB_RESULT_PREFIX}{job_id}"

        meta = await r.hgetall(meta_key)  # type: ignore
        if not meta:
            return None

        status = JobStatus(meta.get("status", JobStatus.PENDING.value))
        progress = int(meta.get("progress", 0))
        error = meta.get("error")

        # The serialized result is only present once the job completed.
        result = None
        if status == JobStatus.COMPLETED:
            result_json = await r.get(result_key)
            if result_json:
                result = OcrResponse(**json.loads(result_json))

        return JobResponse(
            jobId=job_id,
            status=status,
            progress=progress if status == JobStatus.PROCESSING else None,
            result=result,
            error=error if status == JobStatus.FAILED else None,
        )

    async def update_job_progress(self, job_id: str, progress: int) -> None:
        """Update job progress percentage (also forces status to PROCESSING)."""
        r = await self.get_redis()
        meta_key = f"{JOB_PREFIX}{job_id}"

        await r.hset(meta_key, mapping={  # type: ignore
            "status": JobStatus.PROCESSING.value,
            "progress": progress,
        })

    async def complete_job(self, job_id: str, result: OcrResponse) -> None:
        """Mark job as completed, store its result, and fire the callback."""
        r = await self.get_redis()
        meta_key = f"{JOB_PREFIX}{job_id}"
        result_key = f"{JOB_RESULT_PREFIX}{job_id}"
        data_key = f"{JOB_DATA_PREFIX}{job_id}"

        result_dict = result.model_dump(by_alias=True)
        result_json = json.dumps(result_dict)

        # Commands buffered, single round-trip on execute() (see submit_job).
        async with r.pipeline() as pipe:
            pipe.hset(meta_key, mapping={  # type: ignore
                "status": JobStatus.COMPLETED.value,
                "progress": 100,
            })
            pipe.set(result_key, result_json)
            pipe.expire(result_key, JOB_TTL)
            # File data is no longer needed once the result exists.
            pipe.delete(data_key)
            await pipe.execute()

        logger.info(f"Job {job_id} completed")

        # Fire the completion webhook if one was registered at submit time.
        meta = await r.hgetall(meta_key)  # type: ignore
        callback_url = meta.get("callback_url")
        if callback_url:
            # Fire-and-forget, but keep a strong reference until done so the
            # task cannot be garbage-collected before running.
            task = asyncio.create_task(
                self._send_callback(callback_url, job_id, result_dict)
            )
            self._callback_tasks.add(task)
            task.add_done_callback(self._callback_tasks.discard)

    async def fail_job(self, job_id: str, error: str) -> None:
        """Mark job as failed with error message and drop its file data."""
        r = await self.get_redis()
        meta_key = f"{JOB_PREFIX}{job_id}"
        data_key = f"{JOB_DATA_PREFIX}{job_id}"

        async with r.pipeline() as pipe:
            pipe.hset(meta_key, mapping={  # type: ignore
                "status": JobStatus.FAILED.value,
                "error": error,
            })
            pipe.delete(data_key)
            await pipe.execute()

        logger.error(f"Job {job_id} failed: {error}")

    async def get_job_data(self, job_id: str) -> Optional[bytes]:
        """Get the raw file bytes for a job (None once consumed or expired)."""
        r = await self._get_redis_raw()
        data = await r.get(f"{JOB_DATA_PREFIX}{job_id}")
        return data  # type: ignore

    async def _send_callback(
        self, url: str, job_id: str, result: dict
    ) -> None:
        """POST the job result to the registered callback URL (best effort)."""
        try:
            # Imported lazily so the queue works even without httpx installed.
            import httpx

            async with httpx.AsyncClient(timeout=10.0) as client:
                await client.post(
                    url,
                    json={"jobId": job_id, "result": result},
                )
            logger.info(f"Callback sent for job {job_id}")
        except Exception as e:
            # Best-effort: a failing webhook must not affect job state.
            logger.error(f"Callback failed for job {job_id}: {e}")


# Singleton instance
job_queue = JobQueue()
import re

import magic
import pytesseract
from PIL import Image
from pillow_heif import register_heif_opener

from app.config import settings
from app.models import DocumentType, ExtractedField, OcrResponse
from app.services.preprocessor import preprocessor

# Register HEIF/HEIC opener with Pillow
register_heif_opener()

logger = logging.getLogger(__name__)


class OcrService:
    """Core OCR processing service built on Tesseract."""

    # Supported MIME types
    SUPPORTED_TYPES = {
        "image/jpeg",
        "image/png",
        "image/heic",
        "image/heif",
        "application/pdf",
    }

    # 17-character VIN pattern; the letters I, O, Q never appear in VINs.
    _VIN_RE = re.compile(r"\b[A-HJ-NPR-Z0-9]{17}\b")

    def __init__(self) -> None:
        """Point pytesseract at the configured Tesseract binary."""
        pytesseract.pytesseract.tesseract_cmd = settings.tesseract_cmd

    def extract(
        self,
        file_bytes: bytes,
        content_type: Optional[str] = None,
        preprocess: bool = True,
    ) -> OcrResponse:
        """
        Extract text from an image file.

        Args:
            file_bytes: Raw file bytes
            content_type: MIME type (optional, will be detected if not provided)
            preprocess: Whether to apply preprocessing

        Returns:
            OcrResponse with extracted text and metadata
        """
        start_time = time.time()

        # Detect file type if not provided
        if not content_type:
            content_type = self._detect_mime_type(file_bytes)

        # Unsupported formats fail fast with an empty result.
        if content_type not in self.SUPPORTED_TYPES:
            return self._failure(start_time)

        try:
            # Convert HEIC/HEIF to standard format
            if content_type in ("image/heic", "image/heif"):
                file_bytes = self._convert_heic(file_bytes)
                content_type = "image/png"

            # Handle PDF (extract first page as image)
            if content_type == "application/pdf":
                file_bytes = self._extract_pdf_first_page(file_bytes)
                content_type = "image/png"
                if not file_bytes:
                    # PDF rendering unavailable or failed: report a clean
                    # failure rather than letting Image.open(b"") raise.
                    return self._failure(start_time)

            # Apply preprocessing if enabled
            if preprocess:
                file_bytes = preprocessor.preprocess(
                    file_bytes, deskew=True, denoise=True
                )

            # Perform OCR
            image = Image.open(io.BytesIO(file_bytes))
            ocr_data = pytesseract.image_to_data(
                image, output_type=pytesseract.Output.DICT
            )

            # Extract text and calculate confidence
            raw_text, confidence = self._process_ocr_data(ocr_data)

            # Detect document type from content
            document_type = self._detect_document_type(raw_text)

            # Extract fields based on document type
            extracted_fields = self._extract_fields(raw_text, document_type)

            processing_time_ms = int((time.time() - start_time) * 1000)

            logger.info(
                f"OCR completed: {len(raw_text)} chars, "
                f"{confidence:.2%} confidence, {processing_time_ms}ms"
            )

            return OcrResponse(
                success=True,
                documentType=document_type,
                rawText=raw_text,
                confidence=confidence,
                extractedFields=extracted_fields,
                processingTimeMs=processing_time_ms,
            )

        except Exception as e:
            logger.error(f"OCR extraction failed: {e}", exc_info=True)
            return self._failure(start_time)

    def _failure(self, start_time: float) -> OcrResponse:
        """Build the canonical empty failure response."""
        return OcrResponse(
            success=False,
            documentType=DocumentType.UNKNOWN,
            rawText="",
            confidence=0.0,
            extractedFields={},
            processingTimeMs=int((time.time() - start_time) * 1000),
        )

    def _detect_mime_type(self, file_bytes: bytes) -> str:
        """Detect MIME type using python-magic."""
        mime = magic.Magic(mime=True)
        detected = mime.from_buffer(file_bytes)
        return detected or "application/octet-stream"

    def _convert_heic(self, heic_bytes: bytes) -> bytes:
        """Convert HEIC/HEIF to PNG format."""
        # pillow-heif registers itself with PIL, so we can open HEIC directly
        image = Image.open(io.BytesIO(heic_bytes))
        buffer = io.BytesIO()
        image.save(buffer, format="PNG")
        return buffer.getvalue()

    def _extract_pdf_first_page(self, pdf_bytes: bytes) -> bytes:
        """Extract first page of PDF as PNG image (b"" when unavailable)."""
        try:
            # pdf2image is an optional dependency; degrade gracefully.
            from pdf2image import convert_from_bytes

            images = convert_from_bytes(pdf_bytes, first_page=1, last_page=1, dpi=300)
            if images:
                buffer = io.BytesIO()
                images[0].save(buffer, format="PNG")
                return buffer.getvalue()
        except ImportError:
            logger.warning("pdf2image not available, PDF support limited")
        except Exception as e:
            logger.error(f"PDF extraction failed: {e}")

        return b""

    def _process_ocr_data(
        self, ocr_data: dict
    ) -> tuple[str, float]:
        """Join recognized words and average their confidences.

        Tesseract reports confidence on a 0-100 scale and -1 for non-word
        boxes. Depending on the Tesseract version the values arrive as ints,
        floats, or numeric strings like "95.0", so parse with float() —
        int("95.0") would raise ValueError.
        """
        words = []
        confidences = []

        for text, raw_conf in zip(ocr_data["text"], ocr_data["conf"]):
            conf = float(raw_conf)
            # Skip empty boxes and non-word (-1 confidence) entries.
            if text.strip() and conf > 0:
                words.append(text)
                confidences.append(conf)

        raw_text = " ".join(words)
        avg_confidence = sum(confidences) / len(confidences) if confidences else 0.0

        # Normalize confidence to 0-1 range (Tesseract returns 0-100)
        return raw_text, avg_confidence / 100.0

    def _detect_document_type(self, text: str) -> DocumentType:
        """Detect document type from extracted text content."""
        text_lower = text.lower()

        # VIN document indicators
        vin_indicators = [
            "vin",
            "vehicle identification",
            "title",
            "registration",
            "certificate of title",
        ]
        if any(indicator in text_lower for indicator in vin_indicators):
            # Additional check: look for 17-character alphanumeric sequences
            if self._VIN_RE.search(text.upper()):
                return DocumentType.VIN

        # Receipt indicators: require two hits to reduce false positives.
        receipt_indicators = [
            "receipt",
            "total",
            "subtotal",
            "tax",
            "payment",
            "invoice",
            "amount due",
            "gallons",
            "price/gallon",
        ]
        if sum(1 for ind in receipt_indicators if ind in text_lower) >= 2:
            return DocumentType.RECEIPT

        # Manual indicators
        manual_indicators = [
            "owner's manual",
            "maintenance schedule",
            "service interval",
            "chapter",
            "table of contents",
            "specifications",
        ]
        if any(indicator in text_lower for indicator in manual_indicators):
            return DocumentType.MANUAL

        return DocumentType.UNKNOWN

    def _extract_fields(
        self, text: str, document_type: DocumentType
    ) -> dict[str, ExtractedField]:
        """Extract specific fields based on document type."""
        fields: dict[str, ExtractedField] = {}

        if document_type == DocumentType.VIN:
            # Extract VIN (17 alphanumeric characters, excluding I, O, Q)
            match = self._VIN_RE.search(text.upper())
            if match:
                fields["vin"] = ExtractedField(value=match.group(0), confidence=0.9)

        elif document_type == DocumentType.RECEIPT:
            # Extract amounts (currency patterns)
            amount_pattern = r"\$\s*(\d{1,3}(?:,\d{3})*(?:\.\d{2})?)"
            amounts = re.findall(amount_pattern, text)
            if amounts:
                # Last amount is often the total
                fields["total"] = ExtractedField(
                    value=f"${amounts[-1]}", confidence=0.7
                )

            # Extract date
            date_pattern = r"(\d{1,2}[/-]\d{1,2}[/-]\d{2,4})"
            date_match = re.search(date_pattern, text)
            if date_match:
                fields["date"] = ExtractedField(value=date_match.group(1), confidence=0.8)

            # Extract gallons (for fuel receipts)
            gallon_pattern = r"(\d+\.?\d*)\s*(?:gal|gallons)"
            gallon_match = re.search(gallon_pattern, text.lower())
            if gallon_match:
                fields["gallons"] = ExtractedField(
                    value=gallon_match.group(1), confidence=0.85
                )

        return fields


# Singleton instance
ocr_service = OcrService()
class ImagePreprocessor:
    """Applies OpenCV-based cleanup steps that improve OCR accuracy."""

    def preprocess(
        self,
        image_bytes: bytes,
        deskew: bool = True,
        denoise: bool = True,
        binarize: bool = False,
    ) -> bytes:
        """
        Apply preprocessing to an image for better OCR results.

        Args:
            image_bytes: Raw image bytes
            deskew: Whether to correct image rotation
            denoise: Whether to apply noise reduction
            binarize: Whether to convert to black and white

        Returns:
            Preprocessed image as PNG bytes
        """
        working = self._to_grayscale(image_bytes)

        if denoise:
            working = self._denoise(working)
        if deskew:
            working = self._deskew(working)
        if binarize:
            working = self._binarize(working)

        # Re-encode the processed array as PNG via PIL.
        out = io.BytesIO()
        Image.fromarray(working).save(out, format="PNG")
        return out.getvalue()

    def _to_grayscale(self, image_bytes: bytes) -> np.ndarray:
        """Decode raw bytes into a single-channel OpenCV image."""
        pil_image = Image.open(io.BytesIO(image_bytes))

        # Normalize exotic modes (RGBA, palette, ...) to RGB; keep L as-is.
        if pil_image.mode not in ("RGB", "L"):
            pil_image = pil_image.convert("RGB")

        arr = np.array(pil_image)

        # Color images: mirror the RGB->BGR->GRAY conversion chain so the
        # grayscale output matches OpenCV's BGR convention exactly.
        if arr.ndim == 3:
            arr = cv2.cvtColor(arr, cv2.COLOR_RGB2BGR)
            return cv2.cvtColor(arr, cv2.COLOR_BGR2GRAY)
        return arr

    def _denoise(self, image: np.ndarray) -> np.ndarray:
        """Reduce sensor noise via non-local means (grayscale variant)."""
        try:
            return cv2.fastNlMeansDenoising(
                image, h=10, templateWindowSize=7, searchWindowSize=21
            )
        except cv2.error as e:
            logger.warning(f"Denoising failed: {e}")
            return image

    def _deskew(self, image: np.ndarray) -> np.ndarray:
        """Straighten small rotations detected via Hough line angles."""
        try:
            edges = cv2.Canny(image, 50, 150, apertureSize=3)
            segments = cv2.HoughLinesP(
                edges,
                rho=1,
                theta=np.pi / 180,
                threshold=100,
                minLineLength=100,
                maxLineGap=10,
            )
            if segments is None:
                return image

            # Collect the slopes of roughly horizontal segments only.
            angles = []
            for segment in segments:
                x1, y1, x2, y2 = segment[0]
                if x2 == x1:
                    continue  # vertical segment; skip as the original did
                angle = np.degrees(np.arctan2(y2 - y1, x2 - x1))
                if -45 < angle < 45:
                    angles.append(angle)

            if not angles:
                return image

            # Median resists outlier segments better than the mean.
            median_angle = np.median(angles)

            # Ignore tiny skews (< 0.5°, not worth correcting) and extreme
            # ones (> 15°, more likely a detection artifact than real skew).
            if abs(median_angle) < 0.5 or abs(median_angle) > 15:
                return image

            height, width = image.shape[:2]
            matrix = cv2.getRotationMatrix2D(
                (width // 2, height // 2), median_angle, 1.0
            )

            # Grow the canvas so rotation does not clip the corners.
            cos_val = abs(matrix[0, 0])
            sin_val = abs(matrix[0, 1])
            new_width = int(height * sin_val + width * cos_val)
            new_height = int(height * cos_val + width * sin_val)
            matrix[0, 2] += (new_width - width) / 2
            matrix[1, 2] += (new_height - height) / 2

            straightened = cv2.warpAffine(
                image,
                matrix,
                (new_width, new_height),
                borderMode=cv2.BORDER_REPLICATE,
            )

            logger.debug(f"Deskewed image by {median_angle:.2f} degrees")
            return straightened

        except Exception as e:
            logger.warning(f"Deskewing failed: {e}")
            return image

    def _binarize(self, image: np.ndarray) -> np.ndarray:
        """Adaptive Gaussian thresholding to pure black and white."""
        try:
            return cv2.adaptiveThreshold(
                image,
                255,
                cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                cv2.THRESH_BINARY,
                blockSize=11,
                C=2,
            )
        except cv2.error as e:
            logger.warning(f"Binarization failed: {e}")
            return image

    def get_image_info(self, image_bytes: bytes) -> dict:
        """Return width/height/mode/format metadata for an image."""
        img = Image.open(io.BytesIO(image_bytes))
        return {
            "width": img.width,
            "height": img.height,
            "mode": img.mode,
            "format": img.format,
        }


# Singleton instance
preprocessor = ImagePreprocessor()