#!/usr/bin/env python3
|
|
"""
|
|
MSSQL Database Loader
|
|
Handles loading .bak files into MSSQL Server for ETL processing
|
|
"""
|
|
import os
|
|
import logging
|
|
import pyodbc
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Optional, List
|
|
from ..config import config
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class MSSQLLoader:
|
|
"""Loads database files into MSSQL Server"""
|
|
|
|
def __init__(self):
|
|
self.server = config.MSSQL_HOST
|
|
self.port = config.MSSQL_PORT
|
|
self.database = config.MSSQL_DATABASE
|
|
self.username = config.MSSQL_USER
|
|
self.password = config.MSSQL_PASSWORD
|
|
|
|
def get_connection_string(self, database: str = "master") -> str:
|
|
"""Get MSSQL connection string"""
|
|
return (
|
|
f"DRIVER={{ODBC Driver 17 for SQL Server}};"
|
|
f"SERVER={self.server},{self.port};"
|
|
f"DATABASE={database};"
|
|
f"UID={self.username};"
|
|
f"PWD={self.password};"
|
|
f"TrustServerCertificate=yes;"
|
|
)
|
|
|
|
def test_connection(self) -> bool:
|
|
"""Test MSSQL connection"""
|
|
try:
|
|
conn_str = self.get_connection_string()
|
|
logger.info(f"Testing MSSQL connection to: {self.server}")
|
|
|
|
with pyodbc.connect(conn_str, timeout=30) as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT @@VERSION")
|
|
version = cursor.fetchone()[0]
|
|
logger.info(f"MSSQL connection successful: {version[:100]}...")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"MSSQL connection failed: {e}")
|
|
return False
|
|
|
|
def database_exists(self, database_name: str) -> bool:
|
|
"""Check if database exists"""
|
|
try:
|
|
conn_str = self.get_connection_string()
|
|
with pyodbc.connect(conn_str, timeout=30) as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"SELECT COUNT(*) FROM sys.databases WHERE name = ?",
|
|
(database_name,)
|
|
)
|
|
count = cursor.fetchone()[0]
|
|
return count > 0
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to check if database exists: {e}")
|
|
return False
|
|
|
|
def get_database_state(self, database_name: str) -> Optional[str]:
|
|
"""Return the state_desc for a database or None if not found"""
|
|
try:
|
|
conn_str = self.get_connection_string()
|
|
with pyodbc.connect(conn_str, timeout=30) as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"SELECT state_desc FROM sys.databases WHERE name = ?",
|
|
(database_name,)
|
|
)
|
|
row = cursor.fetchone()
|
|
return row[0] if row else None
|
|
except Exception as e:
|
|
logger.error(f"Failed to get database state: {e}")
|
|
return None
|
|
|
|
def drop_database(self, database_name: str) -> bool:
|
|
"""Drop database if it exists"""
|
|
try:
|
|
if not self.database_exists(database_name):
|
|
logger.info(f"Database {database_name} does not exist, skipping drop")
|
|
return True
|
|
|
|
logger.info(f"Dropping database: {database_name}")
|
|
conn_str = self.get_connection_string()
|
|
|
|
with pyodbc.connect(conn_str, timeout=30) as conn:
|
|
conn.autocommit = True
|
|
cursor = conn.cursor()
|
|
|
|
# Kill existing connections
|
|
cursor.execute(f"""
|
|
ALTER DATABASE [{database_name}] SET SINGLE_USER WITH ROLLBACK IMMEDIATE;
|
|
DROP DATABASE [{database_name}];
|
|
""")
|
|
|
|
logger.info(f"Successfully dropped database: {database_name}")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to drop database {database_name}: {e}")
|
|
return False
|
|
|
|
def get_backup_file_info(self, bak_path: Path) -> Optional[dict]:
|
|
"""Get information about backup file"""
|
|
try:
|
|
# Use the MSSQL container's mounted backup directory
|
|
container_path = f"/backups/{bak_path.name}"
|
|
|
|
# For now, assume the file is accessible
|
|
# In production, this would copy the file into the MSSQL container
|
|
|
|
conn_str = self.get_connection_string()
|
|
with pyodbc.connect(conn_str, timeout=30) as conn:
|
|
cursor = conn.cursor()
|
|
|
|
# Get backup file information
|
|
cursor.execute(f"RESTORE HEADERONLY FROM DISK = '{container_path}'")
|
|
headers = cursor.fetchall()
|
|
|
|
if headers:
|
|
header = headers[0]
|
|
return {
|
|
"database_name": header.DatabaseName,
|
|
"server_name": header.ServerName,
|
|
"backup_start_date": header.BackupStartDate,
|
|
"backup_finish_date": header.BackupFinishDate,
|
|
"backup_size": header.BackupSize,
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Could not get backup file info: {e}")
|
|
|
|
return None
|
|
|
|
def restore_database(self, bak_path: Path, target_database: str = None) -> bool:
|
|
"""
|
|
Restore database from .bak file
|
|
|
|
Args:
|
|
bak_path: Path to .bak file
|
|
target_database: Target database name (defaults to VPICList)
|
|
|
|
Returns:
|
|
True if successful
|
|
"""
|
|
if target_database is None:
|
|
target_database = self.database
|
|
|
|
if not bak_path.exists():
|
|
logger.error(f"Backup file does not exist: {bak_path}")
|
|
return False
|
|
|
|
logger.info(f"Starting database restore: {bak_path} -> {target_database}")
|
|
|
|
try:
|
|
# Copy backup file to MSSQL container
|
|
container_bak_path = self.copy_backup_to_container(bak_path)
|
|
|
|
if not container_bak_path:
|
|
logger.error("Failed to copy backup file to container")
|
|
return False
|
|
|
|
# If database exists, note the state; we will handle exclusivity in the same session below
|
|
if self.database_exists(target_database):
|
|
state = self.get_database_state(target_database)
|
|
logger.info(f"Existing database detected: {target_database} (state={state})")
|
|
else:
|
|
logger.info(f"Target database does not exist yet: {target_database} — proceeding with restore")
|
|
|
|
# Restore database using a single master connection for exclusivity
|
|
logger.info(f"Restoring database from: {container_bak_path}")
|
|
|
|
conn_str = self.get_connection_string()
|
|
with pyodbc.connect(conn_str, timeout=600) as conn: # 10 minute timeout
|
|
conn.autocommit = True
|
|
cursor = conn.cursor()
|
|
|
|
# If DB exists, ensure exclusive access: kill sessions + SINGLE_USER in this session
|
|
if self.database_exists(target_database):
|
|
try:
|
|
logger.info(f"Preparing exclusive access for restore: killing active sessions on {target_database}")
|
|
kill_sql = f"""
|
|
DECLARE @db sysname = N'{target_database}';
|
|
DECLARE @kill nvarchar(max) = N'';
|
|
SELECT @kill = @kill + N'KILL ' + CONVERT(nvarchar(10), session_id) + N';'
|
|
FROM sys.dm_exec_sessions
|
|
WHERE database_id = DB_ID(@db) AND session_id <> @@SPID;
|
|
IF LEN(@kill) > 0 EXEC (@kill);
|
|
"""
|
|
cursor.execute(kill_sql)
|
|
# Force SINGLE_USER in current session
|
|
cursor.execute(f"ALTER DATABASE [{target_database}] SET SINGLE_USER WITH ROLLBACK IMMEDIATE;")
|
|
logger.info(f"Exclusive access prepared (SINGLE_USER) for {target_database}")
|
|
except Exception as e:
|
|
logger.warning(f"Could not fully prepare exclusive access: {e}")
|
|
|
|
# Get logical file names from backup
|
|
cursor.execute(f"RESTORE FILELISTONLY FROM DISK = '{container_bak_path}'")
|
|
files = cursor.fetchall()
|
|
|
|
if not files:
|
|
logger.error("No files found in backup")
|
|
return False
|
|
|
|
# Build RESTORE command with MOVE options
|
|
data_file = None
|
|
log_file = None
|
|
|
|
for file_info in files:
|
|
logical_name = file_info.LogicalName
|
|
file_type = file_info.Type
|
|
|
|
if file_type == 'D': # Data file
|
|
data_file = logical_name
|
|
elif file_type == 'L': # Log file
|
|
log_file = logical_name
|
|
|
|
if not data_file:
|
|
logger.error("No data file found in backup")
|
|
return False
|
|
|
|
# Construct restore command
|
|
restore_sql = f"""
|
|
RESTORE DATABASE [{target_database}]
|
|
FROM DISK = '{container_bak_path}'
|
|
WITH
|
|
MOVE '{data_file}' TO '/var/opt/mssql/data/{target_database}.mdf',
|
|
"""
|
|
|
|
if log_file:
|
|
restore_sql += f" MOVE '{log_file}' TO '/var/opt/mssql/data/{target_database}.ldf',"
|
|
|
|
restore_sql += """
|
|
REPLACE,
|
|
RECOVERY,
|
|
STATS = 10
|
|
"""
|
|
|
|
logger.info(f"Executing restore command for database: {target_database}")
|
|
logger.debug(f"Restore SQL: {restore_sql}")
|
|
|
|
try:
|
|
cursor.execute(restore_sql)
|
|
except Exception as e:
|
|
# If we hit exclusive access error, retry once after killing sessions again
|
|
if 'Exclusive access could not be obtained' in str(e):
|
|
logger.warning("Exclusive access error on RESTORE; retrying after killing sessions and reasserting SINGLE_USER...")
|
|
try:
|
|
cursor.execute(kill_sql)
|
|
cursor.execute(f"ALTER DATABASE [{target_database}] SET SINGLE_USER WITH ROLLBACK IMMEDIATE;")
|
|
except Exception as e2:
|
|
logger.warning(f"Retry exclusive prep failed: {e2}")
|
|
cursor.execute(restore_sql)
|
|
else:
|
|
raise
|
|
|
|
# Poll for database to be ONLINE
|
|
if not self._wait_for_database_online(target_database):
|
|
logger.error(f"Database did not come ONLINE in time: {target_database}")
|
|
return False
|
|
|
|
# Small retry around database_exists to handle late readiness
|
|
if self._retry_database_exists(target_database):
|
|
logger.info(f"Database restore successful and ONLINE: {target_database}")
|
|
|
|
# Get basic database info
|
|
cursor.execute(f"""
|
|
SELECT
|
|
name,
|
|
create_date,
|
|
compatibility_level,
|
|
state_desc
|
|
FROM sys.databases
|
|
WHERE name = '{target_database}'
|
|
""")
|
|
|
|
db_info = cursor.fetchone()
|
|
if db_info:
|
|
logger.info(f"Database info: Name={db_info.name}, Created={db_info.create_date}, Level={db_info.compatibility_level}, State={db_info.state_desc}")
|
|
|
|
# Optional: quick content verification with small retry window
|
|
if not self._retry_verify_content(target_database):
|
|
logger.warning("Database restored but content verification is inconclusive")
|
|
|
|
# Try to set MULTI_USER back in same session
|
|
try:
|
|
cursor.execute(f"ALTER DATABASE [{target_database}] SET MULTI_USER;")
|
|
logger.info(f"Set {target_database} back to MULTI_USER")
|
|
except Exception as e:
|
|
logger.warning(f"Could not set MULTI_USER on {target_database}: {e}")
|
|
|
|
return True
|
|
else:
|
|
logger.error(f"Database restore failed - database not found: {target_database}")
|
|
return False
|
|
|
|
except Exception as e:
|
|
logger.error(f"Database restore failed: {e}")
|
|
return False
|
|
|
|
def copy_backup_to_container(self, bak_path: Path) -> Optional[str]:
|
|
"""
|
|
Copy backup file to shared volume accessible by MSSQL container
|
|
|
|
Args:
|
|
bak_path: Local path to .bak file
|
|
|
|
Returns:
|
|
Container path to .bak file or None if failed
|
|
"""
|
|
try:
|
|
# Use shared volume instead of docker cp
|
|
shared_dir = Path("/app/shared")
|
|
shared_bak_path = shared_dir / bak_path.name
|
|
|
|
# If the file is already in the shared dir, skip copying
|
|
if bak_path.resolve().parent == shared_dir.resolve():
|
|
logger.info(f"Backup already in shared volume: {bak_path}")
|
|
else:
|
|
logger.info(f"Copying {bak_path} to shared volume...")
|
|
import shutil
|
|
shutil.copy2(bak_path, shared_bak_path)
|
|
|
|
# Container path from MSSQL perspective
|
|
container_path = f"/backups/{shared_bak_path.name}"
|
|
|
|
logger.info(f"Successfully copied to shared volume: {container_path}")
|
|
return container_path
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to copy backup to shared volume: {e}")
|
|
return None
|
|
|
|
def _wait_for_database_online(self, database_name: str, timeout_seconds: int = 600, interval_seconds: int = 5) -> bool:
|
|
"""Poll MSSQL until the specified database state becomes ONLINE or timeout.
|
|
|
|
Returns True if ONLINE, False on timeout/error.
|
|
"""
|
|
logger.info(f"Waiting for database to become ONLINE: {database_name}")
|
|
deadline = time.time() + timeout_seconds
|
|
last_state = None
|
|
try:
|
|
conn_str = self.get_connection_string()
|
|
while time.time() < deadline:
|
|
with pyodbc.connect(conn_str, timeout=30) as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT state_desc FROM sys.databases WHERE name = ?", (database_name,))
|
|
row = cursor.fetchone()
|
|
if row:
|
|
state = row[0]
|
|
if state != last_state:
|
|
logger.info(f"Database state: {state}")
|
|
last_state = state
|
|
if state == 'ONLINE':
|
|
# Optional: verify updateability is READ_WRITE
|
|
try:
|
|
cursor.execute("SELECT DATABASEPROPERTYEX(?, 'Updateability')", (database_name,))
|
|
up = cursor.fetchone()[0]
|
|
logger.info(f"Database updateability: {up}")
|
|
except Exception:
|
|
pass
|
|
return True
|
|
else:
|
|
logger.info("Database entry not found yet in sys.databases")
|
|
time.sleep(interval_seconds)
|
|
except Exception as e:
|
|
logger.error(f"Error while waiting for database ONLINE: {e}")
|
|
return False
|
|
logger.error("Timed out waiting for database to become ONLINE")
|
|
return False
|
|
|
|
def _retry_database_exists(self, database_name: str, attempts: int = 6, delay_seconds: int = 5) -> bool:
|
|
"""Retry wrapper for database existence checks."""
|
|
for i in range(1, attempts + 1):
|
|
if self.database_exists(database_name):
|
|
return True
|
|
logger.info(f"database_exists() false, retrying ({i}/{attempts})...")
|
|
time.sleep(delay_seconds)
|
|
return False
|
|
|
|
def _retry_verify_content(self, database_name: str, attempts: int = 3, delay_seconds: int = 5) -> bool:
|
|
"""Retry wrapper around verify_database_content to allow late readiness."""
|
|
for i in range(1, attempts + 1):
|
|
try:
|
|
counts = self.verify_database_content(database_name)
|
|
if counts:
|
|
logger.info(f"Content verification counts: {counts}")
|
|
return True
|
|
except Exception as e:
|
|
logger.info(f"Content verification attempt {i} failed: {e}")
|
|
time.sleep(delay_seconds)
|
|
return False
|
|
|
|
def verify_database_content(self, database_name: str = None) -> dict:
|
|
"""
|
|
Verify database has expected content
|
|
|
|
Returns:
|
|
Dictionary with table counts
|
|
"""
|
|
if database_name is None:
|
|
database_name = self.database
|
|
|
|
try:
|
|
conn_str = self.get_connection_string(database_name)
|
|
with pyodbc.connect(conn_str, timeout=30) as conn:
|
|
cursor = conn.cursor()
|
|
|
|
# Get table counts for key tables
|
|
tables_to_check = ['Make', 'Model', 'VehicleType', 'Manufacturer']
|
|
counts = {}
|
|
|
|
for table in tables_to_check:
|
|
try:
|
|
cursor.execute(f"SELECT COUNT(*) FROM {table}")
|
|
count = cursor.fetchone()[0]
|
|
counts[table] = count
|
|
logger.info(f"Table {table}: {count:,} rows")
|
|
except:
|
|
counts[table] = 0
|
|
|
|
return counts
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to verify database content: {e}")
|
|
return {}
|