Update Dockerfile with curl
This commit is contained in:
@@ -34,8 +34,8 @@ Step 1: Fetch Data from VehAPI
|
||||
python3 vehapi_fetch_snapshot.py --min-year 2020 --max-year 2020
|
||||
|
||||
# Full ETL workflow
|
||||
./reset_database.sh # Clear old data
|
||||
python3 vehapi_fetch_snapshot.py # Fetch from API
|
||||
python3 etl_generate_sql.py --snapshot-path snapshots/<date> # Generate SQL
|
||||
./import_data.sh # Import to Postgres
|
||||
docker compose exec mvp-redis redis-cli FLUSHALL # Flush Redis Cache for front end
|
||||
./reset_database.sh # Clear old data
|
||||
python3 vehapi_fetch_snapshot.py # Fetch from API
|
||||
python3 etl_generate_sql.py --snapshot-path snapshots/*.sqlite # Generate SQL
|
||||
./import_data.sh # Import to Postgres
|
||||
docker compose exec mvp-redis redis-cli FLUSHALL # Flush Redis Cache for front end
|
||||
|
||||
131
data/vehicle-etl/merge_snapshots.py
Normal file
@@ -0,0 +1,131 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Merges two VehAPI snapshot databases into a single consolidated database.
|
||||
Handles deduplication via PRIMARY KEY constraint.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import sqlite3
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def merge_databases(db1_path: Path, db2_path: Path, output_path: Path) -> dict:
    """Merge two snapshot databases into one, deduplicating by PRIMARY KEY.

    Rows from *db1_path* are inserted first; rows from *db2_path* that
    collide on the PRIMARY KEY (year, make, model, trim, engine_canon,
    trans_canon) are silently skipped via INSERT OR IGNORE.

    Args:
        db1_path: First source snapshot database (its rows win on conflict).
        db2_path: Second source snapshot database.
        output_path: Destination database; replaced if it already exists.

    Returns:
        dict with per-source insert counts, total record count, min/max
        year, and the output path as a string.
    """
    if output_path.exists():
        output_path.unlink()
        print(f"[info] Removed existing output file: {output_path}")

    # Both copy passes are identical except for the source alias; the
    # PRIMARY KEY on the target table plus OR IGNORE does the dedup.
    copy_sql = """
        INSERT OR IGNORE INTO pairs
        SELECT year, make, model, trim, engine_display, engine_canon, engine_bucket,
               trans_display, trans_canon, trans_bucket
        FROM {alias}.pairs
    """

    conn = sqlite3.connect(output_path)
    try:
        conn.execute("PRAGMA journal_mode=WAL;")
        conn.execute("PRAGMA synchronous=NORMAL;")

        # Create target schema (pairs table only)
        conn.execute("""
            CREATE TABLE pairs(
                year INT,
                make TEXT,
                model TEXT,
                trim TEXT,
                engine_display TEXT,
                engine_canon TEXT,
                engine_bucket TEXT,
                trans_display TEXT,
                trans_canon TEXT,
                trans_bucket TEXT,
                PRIMARY KEY(year, make, model, trim, engine_canon, trans_canon)
            )
        """)
        conn.commit()

        # Attach source databases. Parameter binding (instead of an
        # f-string) keeps paths containing quotes from breaking the SQL.
        conn.execute("ATTACH DATABASE ? AS db1", (str(db1_path),))
        conn.execute("ATTACH DATABASE ? AS db2", (str(db2_path),))

        # Insert from first database
        print(f"[info] Inserting records from {db1_path.name}...")
        cursor = conn.execute(copy_sql.format(alias="db1"))
        # rowcount counts only rows actually inserted (ignored conflicts
        # are excluded), so it is the dedup-aware insert count.
        db1_inserted = cursor.rowcount
        conn.commit()
        print(f"[info] Inserted {db1_inserted:,} records from {db1_path.name}")

        # Insert from second database (duplicates ignored)
        print(f"[info] Inserting records from {db2_path.name}...")
        cursor = conn.execute(copy_sql.format(alias="db2"))
        db2_inserted = cursor.rowcount
        conn.commit()
        print(f"[info] Inserted {db2_inserted:,} new records from {db2_path.name}")

        # Detach source databases
        conn.execute("DETACH DATABASE db1")
        conn.execute("DETACH DATABASE db2")

        # Get final stats
        total_count = conn.execute("SELECT COUNT(*) FROM pairs").fetchone()[0]
        min_year = conn.execute("SELECT MIN(year) FROM pairs").fetchone()[0]
        max_year = conn.execute("SELECT MAX(year) FROM pairs").fetchone()[0]

        # Optimize the database
        print("[info] Running VACUUM to optimize database...")
        conn.execute("VACUUM")
    finally:
        # Close even on error so the output file is not left locked.
        conn.close()

    return {
        "db1_inserted": db1_inserted,
        "db2_inserted": db2_inserted,
        "total_records": total_count,
        "min_year": min_year,
        "max_year": max_year,
        "output_path": str(output_path),
    }
|
||||
|
||||
|
||||
def main() -> int:
    """CLI entry point: merge the two fixed-range snapshots into one database.

    Returns 0 on success, 1 if either source snapshot is missing.
    """
    snapshots_dir = Path(__file__).resolve().parent / "snapshots"

    db1_path = snapshots_dir / "1980-2007.sqlite"
    db2_path = snapshots_dir / "2007-2022.sqlite"
    output_path = snapshots_dir / "1980-2022-vehicles.sqlite"

    # Guard clause: refuse to run unless both source snapshots exist.
    for source in (db1_path, db2_path):
        if not source.exists():
            print(f"[error] Source database not found: {source}", file=sys.stderr)
            return 1

    print(f"[info] Merging databases...")
    print(f"  Source 1: {db1_path}")
    print(f"  Source 2: {db2_path}")
    print(f"  Output: {output_path}")
    print()

    stats = merge_databases(db1_path, db2_path, output_path)

    print()
    print(f"[done] Merge complete!")
    print(f"  Total records: {stats['total_records']:,}")
    print(f"  Year range: {stats['min_year']} - {stats['max_year']}")
    print(f"  Output: {stats['output_path']}")

    return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Propagate main()'s return code to the shell as the exit status.
    raise SystemExit(main())
|
||||
Reference in New Issue
Block a user