Implementation Plan - Manual JSON ETL
Implementation Overview
Add manual JSON processing capability to the existing MVP Platform Vehicles ETL system without disrupting the current MSSQL-based pipeline.
Development Phases
Phase 1: Core Utilities ⏳
Objective: Create foundational utilities for JSON processing
1.1 Make Name Mapper (etl/utils/make_name_mapper.py)
```python
class MakeNameMapper:
    def normalize_make_name(self, filename: str) -> str:
        """Convert 'alfa_romeo' to 'Alfa Romeo'"""

    def get_display_name_mapping(self) -> Dict[str, str]:
        """Get complete filename -> display name mapping"""

    def validate_against_sources(self) -> List[str]:
        """Cross-validate with sources/makes.json"""
```
Implementation Requirements:
- Handle underscore → space conversion
- Title-case each word
- Special cases: BMW, GMC (all caps)
- Validate against the existing `sources/makes.json`
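The normalization rules above can be sketched as follows. This is illustrative only: the special-case table is an assumption (the plan names only BMW and GMC), and the real mapper additionally validates results against `sources/makes.json`.

```python
# Illustrative sketch of make-name normalization. The special-case
# table is an assumption; extend it as other all-caps makes appear.
SPECIAL_CASES = {"bmw": "BMW", "gmc": "GMC"}

def normalize_make_name(filename: str) -> str:
    """Convert a filename stem like 'alfa_romeo' to 'Alfa Romeo'."""
    key = filename.lower().strip()
    if key in SPECIAL_CASES:
        return SPECIAL_CASES[key]
    # underscore -> space, then title-case each word
    return " ".join(word.capitalize() for word in key.split("_"))
```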
1.2 Engine Spec Parser (etl/utils/engine_spec_parser.py)
```python
@dataclass
class EngineSpec:
    displacement_l: float
    configuration: str  # I, V, H
    cylinders: int
    fuel_type: str      # Gasoline, Hybrid, Electric, Flex Fuel
    aspiration: str     # Natural, Turbo, Supercharged
    raw_string: str

class EngineSpecParser:
    def parse_engine_string(self, engine_str: str) -> EngineSpec:
        """Parse '2.0L I4 PLUG-IN HYBRID EV- (PHEV)' into components"""

    def normalize_configuration(self, config: str) -> str:
        """Convert L → I (L3 becomes I3)"""

    def extract_fuel_type(self, engine_str: str) -> str:
        """Extract fuel type from modifiers"""
```
Implementation Requirements:
- CRITICAL: L-configuration → I (Inline) normalization
- Regex patterns for the standard format: `{displacement}L {config}{cylinders}`
- Hybrid/electric detection: PHEV, FHEV, ELECTRIC patterns
- Flex-fuel detection: FLEX modifier
- Handle parsing failures gracefully
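A minimal parsing sketch, assuming the standard `{displacement}L {config}{cylinders}` format. The regex and the fuel-type keyword checks are illustrative, not the final patterns:

```python
import re

# Illustrative regex for the '{displacement}L {config}{cylinders}' format.
ENGINE_RE = re.compile(
    r"(?P<disp>\d+(?:\.\d+)?)L\s+(?P<config>[IVLH])(?P<cyl>\d+)",
    re.IGNORECASE,
)

def parse_engine_string(engine_str: str):
    """Return (displacement_l, configuration, cylinders, fuel_type) or None."""
    m = ENGINE_RE.search(engine_str)
    if m is None:
        return None  # caller logs or fails per ENGINE_PARSING_STRICT
    config = m.group("config").upper()
    if config == "L":          # CRITICAL: L-configuration -> I (inline)
        config = "I"
    upper = engine_str.upper()
    if "ELECTRIC" in upper:
        fuel = "Electric"
    elif any(tag in upper for tag in ("PHEV", "FHEV", "HYBRID")):
        fuel = "Hybrid"
    elif "FLEX" in upper:
        fuel = "Flex Fuel"
    else:
        fuel = "Gasoline"
    return float(m.group("disp")), config, int(m.group("cyl")), fuel
```

Returning `None` on failure (rather than raising) matches the "handle parsing failures gracefully" requirement; a strict mode could raise instead.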
Phase 2: Data Extraction ⏳
Objective: Extract data from JSON files into normalized structures
2.1 JSON Extractor (etl/extractors/json_extractor.py)
```python
class JsonExtractor:
    def __init__(self, make_mapper: MakeNameMapper,
                 engine_parser: EngineSpecParser):
        pass

    def extract_make_data(self, json_file_path: str) -> MakeData:
        """Extract complete make data from JSON file"""

    def extract_all_makes(self, sources_dir: str) -> List[MakeData]:
        """Process all JSON files in directory"""

    def validate_json_structure(self, json_data: dict) -> ValidationResult:
        """Validate JSON structure before processing"""
```
Data Structures:
```python
@dataclass
class MakeData:
    name: str                 # Normalized display name
    models: List[ModelData]

@dataclass
class ModelData:
    name: str
    years: List[int]
    engines: List[EngineSpec]
    trims: List[str]          # From submodels
```
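The plan does not spell out the JSON schema of the source files. Assuming a shape like the one in the comment below (an assumption, not the real schema), flattening one make file into `ModelData`-like records might look like:

```python
import json

# ASSUMED input shape -- the real schema may differ:
# {"models": {"Giulia": {"years": [2020, 2021],
#                        "engines": ["2.0L I4"],
#                        "submodels": ["Ti", "Quadrifoglio"]}}}

def extract_models(json_file_path: str) -> list:
    """Flatten one make file into ModelData-like dicts."""
    with open(json_file_path) as fh:
        data = json.load(fh)
    models = []
    for name, body in data.get("models", {}).items():
        models.append({
            "name": name,
            "years": sorted(body.get("years", [])),
            "engines": body.get("engines", []),
            "trims": body.get("submodels", []),  # trims come from submodels
        })
    return models
```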
2.2 Electric Vehicle Handler
```python
class ElectricVehicleHandler:
    def create_default_engine(self) -> EngineSpec:
        """Create default 'Electric Motor' engine for empty arrays"""

    def is_electric_vehicle(self, model_data: ModelData) -> bool:
        """Detect electric vehicles by empty engines + make patterns"""
```
Phase 3: Data Loading ⏳
Objective: Load JSON-extracted data into PostgreSQL
3.1 JSON Manual Loader (etl/loaders/json_manual_loader.py)
```python
class JsonManualLoader:
    def __init__(self, postgres_loader: PostgreSQLLoader):
        pass

    def load_make_data(self, make_data: MakeData, mode: LoadMode):
        """Load complete make data with referential integrity"""

    def load_all_makes(self, makes_data: List[MakeData],
                       mode: LoadMode) -> LoadResult:
        """Batch load all makes with progress tracking"""

    def handle_duplicates(self, table: str, data: List[Dict]) -> int:
        """Handle duplicate records based on natural keys"""
```
Load Modes:
- CLEAR: `TRUNCATE ... CASCADE`, then insert (destructive)
- APPEND: insert with `ON CONFLICT DO NOTHING` (safe)
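The two modes map to different SQL. A hypothetical helper (names and table/column handling are illustrative) that builds the statements each mode would execute:

```python
def build_load_sql(table: str, columns: list, mode: str) -> list:
    """Return the SQL statements each load mode would execute, in order."""
    cols = ", ".join(columns)
    params = ", ".join(["%s"] * len(columns))
    insert = (f"INSERT INTO {table} ({cols}) VALUES ({params}) "
              "ON CONFLICT DO NOTHING")
    if mode == "clear":
        # destructive: wipe the table and dependent rows first
        return [f"TRUNCATE TABLE {table} CASCADE", insert]
    # append: duplicates on natural keys are silently skipped
    return [insert]
```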
3.2 Extend PostgreSQL Loader
Enhance `etl/loaders/postgres_loader.py` with JSON-specific methods:
```python
def load_json_makes(self, makes: List[Dict], clear_existing: bool) -> int
def load_json_engines(self, engines: List[EngineSpec], clear_existing: bool) -> int
def create_model_year_relationships(self, model_years: List[Dict]) -> int
```
Phase 4: Pipeline Integration ⏳
Objective: Create manual JSON processing pipeline
4.1 Manual JSON Pipeline (etl/pipelines/manual_json_pipeline.py)
```python
class ManualJsonPipeline:
    def __init__(self, sources_dir: str):
        self.extractor = JsonExtractor(...)
        self.loader = JsonManualLoader(...)

    def run_manual_pipeline(self, mode: LoadMode,
                            specific_make: Optional[str] = None) -> PipelineResult:
        """Complete JSON → PostgreSQL pipeline"""

    def validate_before_load(self) -> ValidationReport:
        """Pre-flight validation of all JSON files"""

    def generate_load_report(self) -> LoadReport:
        """Post-load statistics and data quality report"""
```
4.2 Pipeline Result Tracking
```python
@dataclass
class PipelineResult:
    success: bool
    makes_processed: int
    models_loaded: int
    engines_loaded: int
    trims_loaded: int
    errors: List[str]
    warnings: List[str]
    duration: timedelta
```
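One way to populate the `success`, `errors`, and `duration` fields is to wrap the pipeline body in a small timing helper. This is a sketch; the real pipeline would also tally the per-entity row counts:

```python
from datetime import datetime, timedelta

def timed_run(step_fn):
    """Run one pipeline body, capturing failures and wall-clock duration."""
    start = datetime.now()
    errors = []
    try:
        step_fn()
    except Exception as exc:  # collect the error instead of crashing the CLI
        errors.append(str(exc))
    return (not errors, errors, datetime.now() - start)
```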
Phase 5: CLI Integration ⏳
Objective: Add CLI commands for manual processing
5.1 Main CLI Updates (etl/main.py)
```python
@cli.command()
@click.option('--mode', type=click.Choice(['clear', 'append']),
              default='append', help='Load mode')
@click.option('--make', help='Process specific make only')
@click.option('--validate-only', is_flag=True,
              help='Validate JSON files without loading')
def load_manual(mode, make, validate_only):
    """Load vehicle data from JSON files"""

@cli.command()
def validate_json():
    """Validate all JSON files structure and data quality"""
```
5.2 Configuration Updates (etl/config.py)
```python
# JSON processing settings
JSON_SOURCES_DIR: str = "sources/makes"
MANUAL_LOAD_DEFAULT_MODE: str = "append"
ELECTRIC_DEFAULT_ENGINE: str = "Electric Motor"
ENGINE_PARSING_STRICT: bool = False  # Log vs. fail on parse errors
```
Phase 6: Testing & Validation ⏳
Objective: Comprehensive testing and validation
6.1 Unit Tests
- `test_make_name_mapper.py` - Make name normalization
- `test_engine_spec_parser.py` - Engine parsing with L→I normalization
- `test_json_extractor.py` - JSON data extraction
- `test_manual_loader.py` - Database loading
6.2 Integration Tests
- `test_manual_pipeline.py` - End-to-end JSON processing
- `test_api_integration.py` - Verify API endpoints work with JSON data
- `test_data_quality.py` - Data quality validation
6.3 Data Validation Scripts
```python
# examples/validate_all_json.py
def validate_all_makes() -> ValidationReport:
    """Validate all 55 JSON files and report issues"""

# examples/compare_data_sources.py
def compare_mssql_vs_json() -> ComparisonReport:
    """Compare MSSQL vs JSON data for overlapping makes"""
```
File Structure Changes
New Files to Create
etl/
├── utils/
│ ├── make_name_mapper.py # Make name normalization
│ └── engine_spec_parser.py # Engine specification parsing
├── extractors/
│ └── json_extractor.py # JSON data extraction
├── loaders/
│ └── json_manual_loader.py # JSON-specific data loading
└── pipelines/
└── manual_json_pipeline.py # JSON processing pipeline
Files to Modify
etl/
├── main.py # Add load-manual command
├── config.py # Add JSON processing config
└── loaders/
└── postgres_loader.py # Extend for JSON data types
Implementation Order
Week 1: Foundation
- ✅ Create documentation structure
- ⏳ Implement `MakeNameMapper` with validation
- ⏳ Implement `EngineSpecParser` with L→I normalization
- ⏳ Unit tests for utilities
Week 2: Data Processing
- ⏳ Implement `JsonExtractor` with validation
- ⏳ Implement `ElectricVehicleHandler`
- ⏳ Create data structures and type definitions
- ⏳ Integration tests for extraction
Week 3: Data Loading
- ⏳ Implement `JsonManualLoader` with clear/append modes
- ⏳ Extend `PostgreSQLLoader` for JSON data types
- ⏳ Implement duplicate handling strategy
- ⏳ Database integration tests
Week 4: Pipeline & CLI
- ⏳ Implement `ManualJsonPipeline`
- ⏳ Add CLI commands with options
- ⏳ Add configuration management
- ⏳ End-to-end testing
Week 5: Validation & Polish
- ⏳ Comprehensive data validation
- ⏳ Performance testing with all 55 files
- ⏳ Error handling improvements
- ⏳ Documentation completion
Success Metrics
- Process all 55 JSON files without errors
- Correct make name normalization (alfa_romeo → Alfa Romeo)
- Engine parsing with L→I normalization working
- Electric vehicle handling (default engines created)
- Clear/append modes working correctly
- API endpoints return data from JSON sources
- Performance acceptable (<5 minutes for full load)
- Comprehensive error reporting and logging
Risk Mitigation
Data Quality Risks
- Mitigation: Extensive validation before loading
- Fallback: Report data quality issues, continue processing
Performance Risks
- Mitigation: Batch processing, progress tracking
- Fallback: Process makes individually if batch fails
Schema Compatibility Risks
- Mitigation: Thorough testing against existing schema
- Fallback: Schema migration scripts if needed
Integration Risks
- Mitigation: Maintain existing MSSQL pipeline compatibility
- Fallback: Feature flag to disable JSON processing