#!/usr/bin/env python3
"""
Export PostgreSQL database to SQL files.

Extracts current state from running mvp-postgres container and generates
SQL import files compatible with the GitLab CI/CD pipeline.

Usage:
    python3 export_from_postgres.py
    python3 export_from_postgres.py --output-dir custom/path

Output files:
    - output/01_engines.sql
    - output/02_transmissions.sql
    - output/03_vehicle_options.sql
"""

import argparse
import csv
import io
import subprocess
import sys
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Sequence, Tuple

# Rows per INSERT statement; batching keeps individual statements a
# manageable size for psql to parse while minimizing round-trip overhead.
BATCH_SIZE = 1000


def check_python_version() -> None:
    """Ensure Python 3.7+ is being used.

    Raises:
        RuntimeError: If the running interpreter is older than 3.7.
    """
    if sys.version_info < (3, 7):
        raise RuntimeError(
            f"Python 3.7 or higher required. Current version: "
            f"{sys.version_info.major}.{sys.version_info.minor}"
        )


def check_container_running() -> None:
    """Verify mvp-postgres container is running.

    Raises:
        RuntimeError: If `docker ps` fails or no container whose name
            contains "mvp-postgres" is currently running.
    """
    try:
        result = subprocess.run(
            ["docker", "ps", "--filter", "name=mvp-postgres", "--format", "{{.Names}}"],
            capture_output=True,
            text=True,
            check=True,
        )
        if "mvp-postgres" not in result.stdout:
            raise RuntimeError(
                "mvp-postgres container is not running.\n"
                "Start with: docker compose up -d mvp-postgres"
            )
    except subprocess.CalledProcessError as e:
        raise RuntimeError(f"Failed to check Docker containers: {e}")


def sql_value(value) -> str:
    """
    Convert a Python value to its SQL representation.

    - None -> NULL
    - str  -> 'escaped string' (single quotes doubled)
    - int/other -> str(value)
    """
    if value is None:
        return "NULL"
    if isinstance(value, str):
        # Double embedded single quotes per SQL string-literal rules.
        return "'" + value.replace("'", "''") + "'"
    return str(value)


def chunked(seq: Iterable[Dict], size: int) -> Iterable[List[Dict]]:
    """
    Yield successive chunks of `size` from sequence.

    Used to batch INSERT statements for better performance. The final
    chunk may be shorter than `size`.
    """
    chunk: List[Dict] = []
    for item in seq:
        chunk.append(item)
        if len(chunk) >= size:
            yield chunk
            chunk = []
    if chunk:
        yield chunk


def write_insert_file(
    path: Path,
    table: str,
    columns: Sequence[str],
    rows: Sequence[Dict],
) -> None:
    """
    Write batched INSERT statements to a SQL file.

    Args:
        path: Output file path (parent directories are created as needed)
        table: Table name
        columns: Column names to insert
        rows: List of row dictionaries; each must contain every column key
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as f:
        f.write("-- Auto-generated by export_from_postgres.py\n")
        if not rows:
            # Leave a marker so the file's emptiness is clearly intentional.
            f.write(f"-- No rows for {table}\n")
            return
        for batch in chunked(rows, BATCH_SIZE):
            values_sql = ",\n".join(
                "(" + ",".join(sql_value(row[col]) for col in columns) + ")"
                for row in batch
            )
            f.write(
                f"INSERT INTO {table} ({', '.join(columns)}) VALUES\n{values_sql};\n\n"
            )


def execute_psql_copy(query: str) -> str:
    """
    Execute a PostgreSQL COPY command via docker exec.

    Args:
        query: SQL COPY query to execute

    Returns:
        CSV output as string (psql's stdout)

    Raises:
        RuntimeError: If the command fails (stderr included when available)
    """
    try:
        result = subprocess.run(
            [
                "docker",
                "exec",
                "mvp-postgres",
                "psql",
                "-U",
                "postgres",
                "-d",
                "motovaultpro",
                "-c",
                query,
            ],
            capture_output=True,
            text=True,
            check=True,
        )
        return result.stdout
    except subprocess.CalledProcessError as e:
        error_msg = e.stderr if e.stderr else str(e)
        raise RuntimeError(f"PostgreSQL query failed: {error_msg}")


def export_engines(output_dir: Path) -> int:
    """
    Export engines table to 01_engines.sql.

    Returns:
        Number of records exported
    """
    query = (
        "COPY (SELECT id, name, fuel_type FROM engines ORDER BY id) "
        "TO STDOUT WITH CSV HEADER"
    )
    csv_output = execute_psql_copy(query)
    rows = []
    try:
        reader = csv.DictReader(io.StringIO(csv_output))
        for row in reader:
            rows.append({
                "id": int(row["id"]),
                "name": row["name"],
                # Empty CSV field means SQL NULL.
                "fuel_type": row["fuel_type"] if row["fuel_type"] else None,
            })
    except (csv.Error, KeyError, ValueError) as e:
        raise RuntimeError(f"Failed to parse engines CSV output: {e}")
    write_insert_file(
        output_dir / "01_engines.sql",
        "engines",
        ["id", "name", "fuel_type"],
        rows,
    )
    return len(rows)


def export_transmissions(output_dir: Path) -> int:
    """
    Export transmissions table to 02_transmissions.sql.

    Returns:
        Number of records exported
    """
    query = (
        "COPY (SELECT id, type FROM transmissions ORDER BY id) "
        "TO STDOUT WITH CSV HEADER"
    )
    csv_output = execute_psql_copy(query)
    rows = []
    try:
        reader = csv.DictReader(io.StringIO(csv_output))
        for row in reader:
            rows.append({
                "id": int(row["id"]),
                "type": row["type"],
            })
    except (csv.Error, KeyError, ValueError) as e:
        raise RuntimeError(f"Failed to parse transmissions CSV output: {e}")
    write_insert_file(
        output_dir / "02_transmissions.sql",
        "transmissions",
        ["id", "type"],
        rows,
    )
    return len(rows)


def export_vehicle_options(output_dir: Path) -> Tuple[int, Optional[int], Optional[int]]:
    """
    Export vehicle_options table to 03_vehicle_options.sql.

    Returns:
        Tuple of (record_count, min_year, max_year); the year bounds are
        None when the table is empty.
    """
    query = """COPY (
        SELECT year, make, model, trim, engine_id, transmission_id
        FROM vehicle_options
        ORDER BY year, make, model, trim
    ) TO STDOUT WITH CSV HEADER"""
    csv_output = execute_psql_copy(query)
    rows = []
    try:
        reader = csv.DictReader(io.StringIO(csv_output))
        for row in reader:
            rows.append({
                "year": int(row["year"]),
                "make": row["make"],
                "model": row["model"],
                "trim": row["trim"],
                # Empty FK fields in the CSV mean SQL NULL.
                "engine_id": int(row["engine_id"]) if row["engine_id"] else None,
                "transmission_id": (
                    int(row["transmission_id"]) if row["transmission_id"] else None
                ),
            })
    except (csv.Error, KeyError, ValueError) as e:
        raise RuntimeError(f"Failed to parse vehicle_options CSV output: {e}")
    write_insert_file(
        output_dir / "03_vehicle_options.sql",
        "vehicle_options",
        ["year", "make", "model", "trim", "engine_id", "transmission_id"],
        rows,
    )
    # Rows are sorted by year first, but compute min/max explicitly so
    # correctness doesn't depend on the query's ORDER BY.
    min_year = min(r["year"] for r in rows) if rows else None
    max_year = max(r["year"] for r in rows) if rows else None
    return len(rows), min_year, max_year


def parse_args() -> argparse.Namespace:
    """Parse command-line arguments."""
    parser = argparse.ArgumentParser(
        description="Export PostgreSQL vehicle catalog to SQL files.",
    )
    parser.add_argument(
        "--output-dir",
        type=Path,
        default=Path("output"),
        help="Directory to write SQL output files (default: output)",
    )
    return parser.parse_args()


def main():
    """Main export workflow: verify environment, export tables, summarize."""
    check_python_version()
    args = parse_args()
    output_dir: Path = args.output_dir
    print("Exporting from PostgreSQL database...")
    print()

    # Verify container is running
    try:
        check_container_running()
    except RuntimeError as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)

    # Export each table
    try:
        engines_count = export_engines(output_dir)
        print(f" Engines: {engines_count:,} records")
        trans_count = export_transmissions(output_dir)
        print(f" Transmissions: {trans_count:,} records")
        vehicles_count, min_year, max_year = export_vehicle_options(output_dir)
        print(f" Vehicle options: {vehicles_count:,} records")
        print()
    except RuntimeError as e:
        print(f"Error during export: {e}", file=sys.stderr)
        sys.exit(1)

    # Print summary
    print("SQL files generated:")
    for sql_file in sorted(output_dir.glob("*.sql")):
        size_kb = sql_file.stat().st_size / 1024
        print(f" - {sql_file} ({size_kb:.0f}KB)")
    print()
    if min_year is not None and max_year is not None:
        print(f"Year coverage: {min_year}-{max_year}")
        print()
    # Avoid suggesting a commit message containing "None-None" when the
    # vehicle_options table was empty.
    year_range = (
        f"{min_year}-{max_year}" if min_year is not None else "no vehicle data"
    )
    print("Export complete! Commit these files to deploy:")
    print(f" git add {output_dir}/*.sql")
    print(f" git commit -m \"Update vehicle catalog from PostgreSQL export ({year_range})\"")


if __name__ == "__main__":
    main()