#!/usr/bin/env python3
"""
Merges two VehAPI snapshot databases into a single consolidated database.
Handles deduplication via PRIMARY KEY constraint.
"""

from __future__ import annotations

import sqlite3
import sys
from pathlib import Path


def _remove_stale_output(output_path: Path) -> None:
    """Delete a previous output database and any leftover WAL/SHM sidecars."""
    if output_path.exists():
        output_path.unlink()
        print(f"[info] Removed existing output file: {output_path}")
    # The output is opened in WAL mode below; a "-wal"/"-shm" file left behind
    # by an interrupted run must not be picked up by the fresh database.
    for suffix in ("-wal", "-shm"):
        sidecar = Path(f"{output_path}{suffix}")
        if sidecar.exists():
            sidecar.unlink()


def _copy_pairs(conn: sqlite3.Connection, schema: str) -> int:
    """Copy ``<schema>.pairs`` into the main ``pairs`` table and commit.

    ``schema`` is always one of the fixed aliases chosen by
    ``merge_databases`` ("db1"/"db2"), never external input, so interpolating
    it into the statement is safe (schema names cannot be bound parameters).

    Returns the cursor's ``rowcount`` for the INSERT OR IGNORE statement.
    """
    cursor = conn.execute(f"""
        INSERT OR IGNORE INTO pairs
        SELECT year, make, model, trim,
               engine_display, engine_canon, engine_bucket,
               trans_display, trans_canon, trans_bucket
        FROM {schema}.pairs
    """)
    inserted = cursor.rowcount
    conn.commit()
    return inserted


def merge_databases(db1_path: Path, db2_path: Path, output_path: Path) -> dict:
    """Merge two snapshot databases into one, deduplicating by PRIMARY KEY.

    A fresh database is created at ``output_path`` (replacing any existing
    file) with a single ``pairs`` table. Rows from ``db1_path`` are inserted
    first, then rows from ``db2_path``; rows whose primary key
    ``(year, make, model, trim, engine_canon, trans_canon)`` is already
    present are ignored, so on conflict the first database wins.

    Args:
        db1_path: First source database; must contain a ``pairs`` table.
        db2_path: Second source database; must contain a ``pairs`` table.
        output_path: Destination file for the merged database.

    Returns:
        A dict with keys ``db1_inserted``, ``db2_inserted``,
        ``total_records``, ``min_year``, ``max_year`` and ``output_path``
        (the latter as a string).

    Raises:
        ValueError: If ``output_path`` refers to one of the source files
            (it would otherwise be deleted before being read).
        sqlite3.Error: If any SQL statement fails, e.g. a source database
            has no ``pairs`` table.
    """
    # Guard first: the output file is unlinked below, which would destroy a
    # source database if the two paths coincide.
    resolved_output = output_path.resolve()
    if resolved_output in (db1_path.resolve(), db2_path.resolve()):
        raise ValueError(
            f"Output path must differ from the source databases: {output_path}"
        )

    _remove_stale_output(output_path)

    conn = sqlite3.connect(output_path)
    try:
        conn.execute("PRAGMA journal_mode=WAL;")
        conn.execute("PRAGMA synchronous=NORMAL;")

        # Create target schema (pairs table only)
        conn.execute("""
            CREATE TABLE pairs(
                year INT,
                make TEXT,
                model TEXT,
                trim TEXT,
                engine_display TEXT,
                engine_canon TEXT,
                engine_bucket TEXT,
                trans_display TEXT,
                trans_canon TEXT,
                trans_bucket TEXT,
                PRIMARY KEY(year, make, model, trim, engine_canon, trans_canon)
            )
        """)
        conn.commit()

        # Attach source databases. The filename is bound as a parameter so
        # paths containing quote characters cannot break the statement.
        conn.execute("ATTACH DATABASE ? AS db1", (str(db1_path),))
        conn.execute("ATTACH DATABASE ? AS db2", (str(db2_path),))

        # Insert from first database
        print(f"[info] Inserting records from {db1_path.name}...")
        db1_inserted = _copy_pairs(conn, "db1")
        print(f"[info] Inserted {db1_inserted:,} records from {db1_path.name}")

        # Insert from second database (duplicates ignored)
        print(f"[info] Inserting records from {db2_path.name}...")
        db2_inserted = _copy_pairs(conn, "db2")
        print(f"[info] Inserted {db2_inserted:,} new records from {db2_path.name}")

        # Detach source databases
        conn.execute("DETACH DATABASE db1")
        conn.execute("DETACH DATABASE db2")

        # Get final stats
        total_count = conn.execute("SELECT COUNT(*) FROM pairs").fetchone()[0]
        min_year = conn.execute("SELECT MIN(year) FROM pairs").fetchone()[0]
        max_year = conn.execute("SELECT MAX(year) FROM pairs").fetchone()[0]

        # Optimize the database
        print("[info] Running VACUUM to optimize database...")
        conn.execute("VACUUM")
    finally:
        # Always release the file handle, including when a statement fails.
        conn.close()

    return {
        "db1_inserted": db1_inserted,
        "db2_inserted": db2_inserted,
        "total_records": total_count,
        "min_year": min_year,
        "max_year": max_year,
        "output_path": str(output_path),
    }


def main() -> int:
    """Merge the two fixed snapshot files that live next to this script.

    Reads ``snapshots/1980-2007.sqlite`` and ``snapshots/2007-2022.sqlite``
    (relative to this file's directory) and writes
    ``snapshots/1980-2022-vehicles.sqlite``.

    Returns:
        Process exit code: 0 on success, 1 if a source database is missing.
    """
    base_dir = Path(__file__).resolve().parent
    snapshots_dir = base_dir / "snapshots"

    db1_path = snapshots_dir / "1980-2007.sqlite"
    db2_path = snapshots_dir / "2007-2022.sqlite"
    output_path = snapshots_dir / "1980-2022-vehicles.sqlite"

    # Validate source files exist
    for source in (db1_path, db2_path):
        if not source.exists():
            print(f"[error] Source database not found: {source}", file=sys.stderr)
            return 1

    print("[info] Merging databases...")
    print(f" Source 1: {db1_path}")
    print(f" Source 2: {db2_path}")
    print(f" Output: {output_path}")
    print()

    stats = merge_databases(db1_path, db2_path, output_path)

    print()
    print("[done] Merge complete!")
    print(f" Total records: {stats['total_records']:,}")
    print(f" Year range: {stats['min_year']} - {stats['max_year']}")
    print(f" Output: {stats['output_path']}")

    return 0


if __name__ == "__main__":
    sys.exit(main())