fix: ETL vehicle db import fixes

This commit is contained in:
Eric Gullickson
2025-12-26 14:54:51 -06:00
parent fb52ce398b
commit 09410c3c3f
10 changed files with 67243 additions and 61980 deletions

View File

@@ -0,0 +1,322 @@
#!/usr/bin/env python3
"""
Export PostgreSQL database to SQL files.
Extracts current state from running mvp-postgres container and generates
SQL import files compatible with the GitLab CI/CD pipeline.
Usage:
python3 export_from_postgres.py
python3 export_from_postgres.py --output-dir custom/path
Output files:
- output/01_engines.sql
- output/02_transmissions.sql
- output/03_vehicle_options.sql
"""
import argparse
import csv
import io
import subprocess
import sys
from pathlib import Path
from typing import Dict, Iterable, List, Sequence
BATCH_SIZE = 1000
def check_python_version():
"""Ensure Python 3.7+ is being used."""
if sys.version_info < (3, 7):
raise RuntimeError(
f"Python 3.7 or higher required. Current version: {sys.version_info.major}.{sys.version_info.minor}"
)
def check_container_running():
"""Verify mvp-postgres container is running."""
try:
result = subprocess.run(
["docker", "ps", "--filter", "name=mvp-postgres", "--format", "{{.Names}}"],
capture_output=True,
text=True,
check=True,
)
if "mvp-postgres" not in result.stdout:
raise RuntimeError(
"mvp-postgres container is not running.\n"
"Start with: docker compose up -d mvp-postgres"
)
except subprocess.CalledProcessError as e:
raise RuntimeError(f"Failed to check Docker containers: {e}")
def sql_value(value):
"""
Convert a Python value to its SQL representation.
- None -> NULL
- str -> 'escaped string' (single quotes doubled)
- int/other -> str(value)
"""
if value is None:
return "NULL"
if isinstance(value, str):
return "'" + value.replace("'", "''") + "'"
return str(value)
def chunked(seq: Iterable[Dict], size: int) -> Iterable[List[Dict]]:
"""
Yield successive chunks of `size` from sequence.
Used to batch INSERT statements for better performance.
"""
chunk: List[Dict] = []
for item in seq:
chunk.append(item)
if len(chunk) >= size:
yield chunk
chunk = []
if chunk:
yield chunk
def write_insert_file(
path: Path,
table: str,
columns: Sequence[str],
rows: Sequence[Dict],
):
"""
Write batched INSERT statements to a SQL file.
Args:
path: Output file path
table: Table name
columns: Column names to insert
rows: List of row dictionaries
"""
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w", encoding="utf-8") as f:
f.write(f"-- Auto-generated by export_from_postgres.py\n")
if not rows:
f.write(f"-- No rows for {table}\n")
return
for batch in chunked(rows, BATCH_SIZE):
values_sql = ",\n".join(
"(" + ",".join(sql_value(row[col]) for col in columns) + ")"
for row in batch
)
f.write(f"INSERT INTO {table} ({', '.join(columns)}) VALUES\n{values_sql};\n\n")
def execute_psql_copy(query: str) -> str:
"""
Execute a PostgreSQL COPY command via docker exec.
Args:
query: SQL COPY query to execute
Returns:
CSV output as string
Raises:
RuntimeError: If command fails
"""
try:
result = subprocess.run(
[
"docker",
"exec",
"mvp-postgres",
"psql",
"-U",
"postgres",
"-d",
"motovaultpro",
"-c",
query,
],
capture_output=True,
text=True,
check=True,
)
return result.stdout
except subprocess.CalledProcessError as e:
error_msg = e.stderr if e.stderr else str(e)
raise RuntimeError(f"PostgreSQL query failed: {error_msg}")
def export_engines(output_dir: Path) -> int:
"""
Export engines table to 01_engines.sql.
Returns:
Number of records exported
"""
query = "COPY (SELECT id, name, fuel_type FROM engines ORDER BY id) TO STDOUT WITH CSV HEADER"
csv_output = execute_psql_copy(query)
rows = []
try:
reader = csv.DictReader(io.StringIO(csv_output))
for row in reader:
rows.append({
"id": int(row["id"]),
"name": row["name"],
"fuel_type": row["fuel_type"] if row["fuel_type"] else None,
})
except (csv.Error, KeyError, ValueError) as e:
raise RuntimeError(f"Failed to parse engines CSV output: {e}")
write_insert_file(
output_dir / "01_engines.sql",
"engines",
["id", "name", "fuel_type"],
rows,
)
return len(rows)
def export_transmissions(output_dir: Path) -> int:
"""
Export transmissions table to 02_transmissions.sql.
Returns:
Number of records exported
"""
query = "COPY (SELECT id, type FROM transmissions ORDER BY id) TO STDOUT WITH CSV HEADER"
csv_output = execute_psql_copy(query)
rows = []
try:
reader = csv.DictReader(io.StringIO(csv_output))
for row in reader:
rows.append({
"id": int(row["id"]),
"type": row["type"],
})
except (csv.Error, KeyError, ValueError) as e:
raise RuntimeError(f"Failed to parse transmissions CSV output: {e}")
write_insert_file(
output_dir / "02_transmissions.sql",
"transmissions",
["id", "type"],
rows,
)
return len(rows)
def export_vehicle_options(output_dir: Path) -> tuple:
"""
Export vehicle_options table to 03_vehicle_options.sql.
Returns:
Tuple of (record_count, min_year, max_year)
"""
query = """COPY (
SELECT year, make, model, trim, engine_id, transmission_id
FROM vehicle_options
ORDER BY year, make, model, trim
) TO STDOUT WITH CSV HEADER"""
csv_output = execute_psql_copy(query)
rows = []
years = []
try:
reader = csv.DictReader(io.StringIO(csv_output))
for row in reader:
year = int(row["year"])
years.append(year)
rows.append({
"year": year,
"make": row["make"],
"model": row["model"],
"trim": row["trim"],
"engine_id": int(row["engine_id"]) if row["engine_id"] else None,
"transmission_id": int(row["transmission_id"]) if row["transmission_id"] else None,
})
except (csv.Error, KeyError, ValueError) as e:
raise RuntimeError(f"Failed to parse vehicle_options CSV output: {e}")
write_insert_file(
output_dir / "03_vehicle_options.sql",
"vehicle_options",
["year", "make", "model", "trim", "engine_id", "transmission_id"],
rows,
)
min_year = min(years) if years else None
max_year = max(years) if years else None
return len(rows), min_year, max_year
def parse_args() -> argparse.Namespace:
"""Parse command-line arguments."""
parser = argparse.ArgumentParser(
description="Export PostgreSQL vehicle catalog to SQL files.",
)
parser.add_argument(
"--output-dir",
type=Path,
default=Path("output"),
help="Directory to write SQL output files (default: output)",
)
return parser.parse_args()
def main():
"""Main export workflow."""
check_python_version()
args = parse_args()
output_dir: Path = args.output_dir
print("Exporting from PostgreSQL database...")
print()
# Verify container is running
try:
check_container_running()
except RuntimeError as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
# Export each table
try:
engines_count = export_engines(output_dir)
print(f" Engines: {engines_count:,} records")
trans_count = export_transmissions(output_dir)
print(f" Transmissions: {trans_count:,} records")
vehicles_count, min_year, max_year = export_vehicle_options(output_dir)
print(f" Vehicle options: {vehicles_count:,} records")
print()
except RuntimeError as e:
print(f"Error during export: {e}", file=sys.stderr)
sys.exit(1)
# Print summary
print("SQL files generated:")
for sql_file in sorted(output_dir.glob("*.sql")):
size_kb = sql_file.stat().st_size / 1024
print(f" - {sql_file} ({size_kb:.0f}KB)")
print()
if min_year and max_year:
print(f"Year coverage: {min_year}-{max_year}")
print()
print("Export complete! Commit these files to deploy:")
print(f" git add {output_dir}/*.sql")
print(f" git commit -m \"Update vehicle catalog from PostgreSQL export ({min_year}-{max_year})\"")
if __name__ == "__main__":
main()