Database Migrations
This document provides comprehensive guidance on managing database schema changes in MockLoop MCP through the migration system. The migration system ensures safe, versioned, and reversible database schema updates.
Overview
MockLoop MCP uses a robust migration system to manage database schema changes across different environments. The system supports:
- Versioned Migrations: Each migration has a unique version number
- Reversible Changes: All migrations can be rolled back
- Environment Safety: Migrations work across SQLite, PostgreSQL, and MySQL
- Dependency Management: Migrations can depend on other migrations
- Validation: Schema validation before and after migrations
- Backup Integration: Automatic backups before major changes
Migration System Architecture
Migration Structure
from abc import ABC, abstractmethod
from typing import Optional, List
import logging
class Migration(ABC):
    """Base class for all database migrations.

    Subclasses declare a unique ``version``, a human-readable
    ``description`` and optional ``dependencies`` (versions that must be
    applied first), and implement ``up``/``down`` to apply and reverse
    the schema change.
    """

    # Unique schema version this migration produces.
    version: int
    # Short summary shown by the CLI status/pending commands.
    description: str
    # Versions that must already be applied before this one can run.
    # NOTE: this class-level list is shared; subclasses should assign
    # their own list (as the examples below do) rather than mutate it.
    dependencies: List[int] = []

    @abstractmethod
    def up(self, connection: DatabaseConnection) -> None:
        """Apply the migration."""
        # Redundant `pass` removed: the docstring is a sufficient body.

    @abstractmethod
    def down(self, connection: DatabaseConnection) -> None:
        """Reverse the migration."""

    def validate(self, connection: DatabaseConnection) -> bool:
        """Validate migration can be applied; override for real checks."""
        return True

    def backup_required(self) -> bool:
        """Whether this migration requires a backup before being applied."""
        return False
class MigrationManager:
    """Manages database migrations.

    Coordinates discovery, validation, application and rollback of
    migrations against the configured database.
    """

    def __init__(self, database_url: str, migrations_path: str):
        self.database = DatabaseConnection(database_url)
        self.migrations_path = migrations_path
        self.logger = logging.getLogger(__name__)

    async def get_current_version(self) -> int:
        """Get current database schema version."""

    async def get_pending_migrations(self) -> List[Migration]:
        """Get list of pending migrations."""

    async def apply_migration(self, migration: Migration) -> bool:
        """Apply a single migration."""

    async def rollback_migration(self, migration: Migration) -> bool:
        """Rollback a single migration."""

    async def migrate_to_version(self, target_version: int) -> bool:
        """Migrate to specific version."""

    # Added for consistency: MigrationCLI.migrate() calls this method,
    # but it was missing from the documented interface.
    async def migrate_to_latest(self) -> bool:
        """Apply all pending migrations up to the newest version."""

    # Added for consistency: MigrationCLI.rollback() calls this method,
    # but it was missing from the documented interface.
    async def rollback_to_version(self, target_version: int) -> bool:
        """Roll back applied migrations down to ``target_version``."""
Migration Discovery
class MigrationDiscovery:
    """Discovers and loads migration files.

    Scans ``migrations_path`` for files named ``migration_*.py`` and
    instantiates the Migration subclass defined in each.
    """

    def __init__(self, migrations_path: str):
        self.migrations_path = Path(migrations_path)
        # Fix: load_migration() logged through self.logger, but the
        # original never initialized it, raising AttributeError on the
        # first load failure.
        self.logger = logging.getLogger(__name__)

    def discover_migrations(self) -> "List[Migration]":
        """Discover all migration files, sorted by ascending version."""
        migrations = []
        for file_path in self.migrations_path.glob("*.py"):
            if file_path.name.startswith("migration_"):
                migration = self.load_migration(file_path)
                if migration:
                    migrations.append(migration)
        return sorted(migrations, key=lambda m: m.version)

    def load_migration(self, file_path: Path) -> "Optional[Migration]":
        """Load a migration instance from a file, or None on failure."""
        try:
            spec = importlib.util.spec_from_file_location("migration", file_path)
            # Fix: spec_from_file_location can return None (and spec.loader
            # can be None); the original would crash with AttributeError
            # outside the intended error path.
            if spec is None or spec.loader is None:
                raise ImportError(f"cannot create import spec for {file_path}")
            module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(module)
            # Find the first Migration subclass defined in the module.
            for attr_name in dir(module):
                attr = getattr(module, attr_name)
                if (isinstance(attr, type) and
                        issubclass(attr, Migration) and
                        attr != Migration):
                    return attr()
        except Exception as e:
            self.logger.error(f"Failed to load migration {file_path}: {e}")
        return None
Creating Migrations
Migration File Structure
Migration files follow a specific naming convention and structure:
# migrations/migration_001_initial_schema.py
from mockloop_mcp.database.migration import Migration
from mockloop_mcp.database.connection import DatabaseConnection
class InitialSchema(Migration):
    """Migration 1: create the baseline MockLoop schema.

    Creates the request_logs, mock_servers and schema_version tables,
    plus indexes on the request_logs filter columns.
    """

    version = 1
    description = "Create initial database schema"

    def up(self, connection: DatabaseConnection) -> None:
        """Create initial tables."""
        # Create request_logs table: one row per logged HTTP request/response pair.
        connection.execute("""
            CREATE TABLE request_logs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
                method VARCHAR(10) NOT NULL,
                path TEXT NOT NULL,
                query_params TEXT,
                headers TEXT,
                request_body TEXT,
                response_status INTEGER NOT NULL,
                response_headers TEXT,
                response_body TEXT,
                response_time_ms INTEGER,
                server_id VARCHAR(255),
                client_ip VARCHAR(45),
                user_agent TEXT,
                request_id VARCHAR(255),
                scenario_name VARCHAR(255),
                created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
                updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
            )
        """)
        # Create indexes on the timestamp and server_id filter columns.
        connection.execute("""
            CREATE INDEX idx_request_logs_timestamp ON request_logs(timestamp)
        """)
        connection.execute("""
            CREATE INDEX idx_request_logs_server_id ON request_logs(server_id)
        """)
        # Create mock_servers table: registry of generated mock servers
        # (spec, port, process id and lifecycle timestamps).
        connection.execute("""
            CREATE TABLE mock_servers (
                id VARCHAR(255) PRIMARY KEY,
                name VARCHAR(255) NOT NULL,
                spec_path TEXT NOT NULL,
                spec_content TEXT,
                output_directory TEXT NOT NULL,
                port INTEGER,
                status VARCHAR(50) NOT NULL DEFAULT 'stopped',
                pid INTEGER,
                config TEXT,
                created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
                updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
                started_at DATETIME,
                stopped_at DATETIME
            )
        """)
        # Create schema_version table: bookkeeping for the migration system.
        connection.execute("""
            CREATE TABLE schema_version (
                version INTEGER PRIMARY KEY,
                description TEXT,
                applied_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
            )
        """)
        # Insert initial version so the schema_version table records version 1.
        connection.execute("""
            INSERT INTO schema_version (version, description)
            VALUES (1, 'Initial schema')
        """)

    def down(self, connection: DatabaseConnection) -> None:
        """Drop all tables created by this migration."""
        connection.execute("DROP TABLE IF EXISTS request_logs")
        connection.execute("DROP TABLE IF EXISTS mock_servers")
        connection.execute("DROP TABLE IF EXISTS schema_version")
Adding New Tables
# migrations/migration_002_add_scenarios.py
class AddScenariosTable(Migration):
    """Migration 2: add the scenarios table for mock response management."""

    version = 2
    description = "Add scenarios table for mock response management"
    dependencies = [1]  # Depends on initial schema

    def up(self, connection: DatabaseConnection) -> None:
        """Add scenarios table."""
        # One row per named scenario. Names are unique per server, and rows
        # are deleted automatically when their mock server is removed
        # (ON DELETE CASCADE).
        connection.execute("""
            CREATE TABLE scenarios (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name VARCHAR(255) NOT NULL,
                server_id VARCHAR(255) NOT NULL,
                description TEXT,
                config TEXT NOT NULL,
                is_active BOOLEAN NOT NULL DEFAULT FALSE,
                created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
                updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
                created_by VARCHAR(255),
                FOREIGN KEY (server_id) REFERENCES mock_servers(id) ON DELETE CASCADE,
                UNIQUE(name, server_id)
            )
        """)
        # Add indexes on the lookup columns.
        connection.execute("""
            CREATE INDEX idx_scenarios_name ON scenarios(name)
        """)
        connection.execute("""
            CREATE INDEX idx_scenarios_server_id ON scenarios(server_id)
        """)
        connection.execute("""
            CREATE INDEX idx_scenarios_is_active ON scenarios(is_active)
        """)
        # Record this migration in schema_version.
        connection.execute("""
            INSERT INTO schema_version (version, description)
            VALUES (2, 'Add scenarios table')
        """)

    def down(self, connection: DatabaseConnection) -> None:
        """Remove scenarios table and its schema_version record."""
        connection.execute("DROP TABLE IF EXISTS scenarios")
        connection.execute("DELETE FROM schema_version WHERE version = 2")
Modifying Existing Tables
# migrations/migration_003_add_webhook_support.py
class AddWebhookSupport(Migration):
    """Migration 3: add webhook tables and a webhook_url column on request_logs."""

    version = 3
    description = "Add webhook tables and modify request_logs"
    dependencies = [2]

    def backup_required(self) -> bool:
        """This migration modifies existing data."""
        return True

    def up(self, connection: DatabaseConnection) -> None:
        """Add webhook support."""
        # Add webhook_url column to request_logs.
        connection.execute("""
            ALTER TABLE request_logs
            ADD COLUMN webhook_url TEXT
        """)
        # Create webhooks table: one row per registered webhook endpoint;
        # rows are removed with their mock server (ON DELETE CASCADE).
        connection.execute("""
            CREATE TABLE webhooks (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                server_id VARCHAR(255) NOT NULL,
                name VARCHAR(255) NOT NULL,
                url TEXT NOT NULL,
                method VARCHAR(10) NOT NULL DEFAULT 'POST',
                headers TEXT,
                events TEXT NOT NULL,
                secret_key VARCHAR(255),
                is_active BOOLEAN NOT NULL DEFAULT TRUE,
                created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
                updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (server_id) REFERENCES mock_servers(id) ON DELETE CASCADE
            )
        """)
        # Create webhook_deliveries table: one row per delivery attempt.
        connection.execute("""
            CREATE TABLE webhook_deliveries (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                webhook_id INTEGER NOT NULL,
                event_type VARCHAR(255) NOT NULL,
                payload TEXT NOT NULL,
                response_status INTEGER,
                response_body TEXT,
                delivery_time_ms INTEGER,
                attempt_number INTEGER NOT NULL DEFAULT 1,
                success BOOLEAN NOT NULL DEFAULT FALSE,
                error_message TEXT,
                delivered_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (webhook_id) REFERENCES webhooks(id) ON DELETE CASCADE
            )
        """)
        # Add indexes on the foreign-key lookup columns.
        connection.execute("""
            CREATE INDEX idx_webhooks_server_id ON webhooks(server_id)
        """)
        connection.execute("""
            CREATE INDEX idx_webhook_deliveries_webhook_id ON webhook_deliveries(webhook_id)
        """)
        # Record this migration in schema_version.
        connection.execute("""
            INSERT INTO schema_version (version, description)
            VALUES (3, 'Add webhook support')
        """)

    def down(self, connection: DatabaseConnection) -> None:
        """Remove webhook support."""
        # Drop webhook tables; deliveries first since it references webhooks.
        connection.execute("DROP TABLE IF EXISTS webhook_deliveries")
        connection.execute("DROP TABLE IF EXISTS webhooks")
        # Remove webhook_url column (SQLite doesn't support DROP COLUMN,
        # so the table is rebuilt without it and the data copied over).
        if connection.database_type == "sqlite":
            # Recreate table without webhook_url column.
            connection.execute("""
                CREATE TABLE request_logs_new (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
                    method VARCHAR(10) NOT NULL,
                    path TEXT NOT NULL,
                    query_params TEXT,
                    headers TEXT,
                    request_body TEXT,
                    response_status INTEGER NOT NULL,
                    response_headers TEXT,
                    response_body TEXT,
                    response_time_ms INTEGER,
                    server_id VARCHAR(255),
                    client_ip VARCHAR(45),
                    user_agent TEXT,
                    request_id VARCHAR(255),
                    scenario_name VARCHAR(255),
                    created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
                    updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
                )
            """)
            # Copy every column except webhook_url into the new table.
            connection.execute("""
                INSERT INTO request_logs_new
                SELECT id, timestamp, method, path, query_params, headers,
                       request_body, response_status, response_headers,
                       response_body, response_time_ms, server_id, client_ip,
                       user_agent, request_id, scenario_name, created_at, updated_at
                FROM request_logs
            """)
            connection.execute("DROP TABLE request_logs")
            connection.execute("ALTER TABLE request_logs_new RENAME TO request_logs")
            # Recreate indexes: the rebuild dropped them with the old table.
            connection.execute("CREATE INDEX idx_request_logs_timestamp ON request_logs(timestamp)")
            connection.execute("CREATE INDEX idx_request_logs_server_id ON request_logs(server_id)")
        else:
            # PostgreSQL/MySQL support DROP COLUMN directly.
            connection.execute("ALTER TABLE request_logs DROP COLUMN webhook_url")
        connection.execute("DELETE FROM schema_version WHERE version = 3")
Database-Specific Migrations
SQLite Migrations
class SQLiteMigration(Migration):
    """Base class for SQLite-specific migrations."""

    def recreate_table_without_column(self, connection: DatabaseConnection,
                                      table_name: str, column_to_remove: str) -> None:
        """Helper to remove a column from a SQLite table.

        SQLite (before 3.35) has no ``ALTER TABLE ... DROP COLUMN``, so
        the table is rebuilt without the column and the data copied over.
        NOTE(review): foreign keys and triggers on the table are still
        lost by this rebuild — confirm callers recreate them if needed.
        """
        # Get table schema; PRAGMA table_info rows are
        # (cid, name, type, notnull, dflt_value, pk).
        result = connection.execute(f"PRAGMA table_info({table_name})")
        columns = [row for row in result if row[1] != column_to_remove]
        # Fix: capture index definitions BEFORE dropping the table — the
        # original implementation silently lost every index on the table.
        index_rows = connection.execute(f"""
            SELECT sql FROM sqlite_master
            WHERE type = 'index' AND tbl_name = '{table_name}' AND sql IS NOT NULL
        """).fetchall()
        # Skip indexes that reference the removed column.
        surviving_index_sql = [row[0] for row in index_rows
                               if column_to_remove not in row[0]]
        # Build the new table definition from the surviving columns.
        column_defs = []
        for col in columns:
            col_def = f"{col[1]} {col[2]}"
            if col[3]:  # NOT NULL
                col_def += " NOT NULL"
            if col[4] is not None:  # DEFAULT (dflt_value is SQL text or None)
                col_def += f" DEFAULT {col[4]}"
            if col[5]:  # PRIMARY KEY
                col_def += " PRIMARY KEY"
            column_defs.append(col_def)
        new_table_sql = f"""
            CREATE TABLE {table_name}_new (
                {', '.join(column_defs)}
            )
        """
        connection.execute(new_table_sql)
        # Copy data column-by-column into the new table.
        column_names = [col[1] for col in columns]
        connection.execute(f"""
            INSERT INTO {table_name}_new ({', '.join(column_names)})
            SELECT {', '.join(column_names)} FROM {table_name}
        """)
        # Replace the old table with the rebuilt one.
        connection.execute(f"DROP TABLE {table_name}")
        connection.execute(f"ALTER TABLE {table_name}_new RENAME TO {table_name}")
        # Fix: recreate the surviving indexes lost by the rebuild.
        for index_sql in surviving_index_sql:
            connection.execute(index_sql)
PostgreSQL Migrations
class PostgreSQLMigration(Migration):
    """Base class for PostgreSQL-specific migrations."""

    def create_enum(self, connection: DatabaseConnection, enum_name: str, values: List[str]) -> None:
        """Create PostgreSQL enum type."""
        values_str = "', '".join(values)
        connection.execute(f"CREATE TYPE {enum_name} AS ENUM ('{values_str}')")

    def add_column_with_default(self, connection: DatabaseConnection,
                                table_name: str, column_def: str, default_value: str) -> None:
        """Add a column with a default value efficiently.

        Fix: the original added the column with no DEFAULT and then ran a
        full-table UPDATE, which (a) rewrote every row and (b) left future
        inserts without the default. On PostgreSQL 11+ a single
        ``ADD COLUMN ... DEFAULT`` is a metadata-only change that also
        backfills existing rows.
        """
        connection.execute(
            f"ALTER TABLE {table_name} ADD COLUMN {column_def} DEFAULT {default_value}"
        )
MySQL Migrations
class MySQLMigration(Migration):
    """Base class for MySQL-specific migrations."""

    def modify_column(self, connection: DatabaseConnection,
                      table_name: str, column_name: str, new_definition: str) -> None:
        """Modify column definition in MySQL."""
        connection.execute(f"ALTER TABLE {table_name} MODIFY COLUMN {column_name} {new_definition}")

    def add_index_if_not_exists(self, connection: DatabaseConnection,
                                table_name: str, index_name: str, columns: str) -> None:
        """Add index only if it doesn't exist.

        Fix: MySQL has no ``CREATE INDEX IF NOT EXISTS``; the original
        executed the CREATE unconditionally, so re-running the migration
        failed with a duplicate-key error. Consult information_schema
        before creating.
        """
        existing = connection.execute(f"""
            SELECT COUNT(*) FROM information_schema.statistics
            WHERE table_schema = DATABASE()
              AND table_name = '{table_name}'
              AND index_name = '{index_name}'
        """).fetchall()
        if not existing or existing[0][0] == 0:
            connection.execute(f"""
                CREATE INDEX {index_name} ON {table_name} ({columns})
            """)
Migration Management
Running Migrations
# Apply all pending migrations
mockloop db migrate
# Migrate to specific version
mockloop db migrate --version 5
# Check migration status
mockloop db status
# Show pending migrations
mockloop db pending
# Validate migrations without applying
mockloop db validate
Migration CLI Commands
class MigrationCLI:
    """Command-line interface for migrations."""

    def __init__(self, migration_manager: MigrationManager):
        self.manager = migration_manager

    async def migrate(self, target_version: Optional[int] = None) -> None:
        """Apply migrations, optionally stopping at ``target_version``."""
        if not target_version:
            # No explicit target: apply everything that is pending.
            await self.manager.migrate_to_latest()
            return
        await self.manager.migrate_to_version(target_version)

    async def rollback(self, target_version: int) -> None:
        """Rollback to a specific version after interactive confirmation."""
        current_version = await self.manager.get_current_version()
        # A rollback must move to a strictly lower version.
        if target_version >= current_version:
            print(f"Target version {target_version} is not lower than current version {current_version}")
            return
        # Destructive operation: require explicit operator confirmation.
        if not self.confirm_rollback(current_version, target_version):
            return
        await self.manager.rollback_to_version(target_version)

    async def status(self) -> None:
        """Show the current schema version and any pending migrations."""
        version_now = await self.manager.get_current_version()
        pending = await self.manager.get_pending_migrations()
        print(f"Current database version: {version_now}")
        print(f"Pending migrations: {len(pending)}")
        for pending_migration in pending:
            print(f" - Version {pending_migration.version}: {pending_migration.description}")

    def confirm_rollback(self, current_version: int, target_version: int) -> bool:
        """Ask the operator to confirm a potentially destructive rollback."""
        print(f"WARNING: This will rollback from version {current_version} to {target_version}")
        print("This operation may result in data loss.")
        answer = input("Are you sure you want to continue? (yes/no): ")
        return answer.lower() == "yes"
Automated Migration Testing
class MigrationTester:
    """Tests migration up and down operations."""

    def __init__(self, database_url: str):
        self.database_url = database_url
        self.test_database_url = self.create_test_database_url()

    async def test_migration(self, migration: Migration) -> TestResult:
        """Exercise one migration's up() and down() against a scratch database."""
        scratch_db = DatabaseConnection(self.test_database_url)
        try:
            # Apply, validate, roll back, validate again.
            migration.up(scratch_db)
            up_check = await self.validate_schema_after_up(scratch_db, migration)
            migration.down(scratch_db)
            down_check = await self.validate_schema_after_down(scratch_db, migration)
            return TestResult(
                migration_version=migration.version,
                up_success=up_check.success,
                down_success=down_check.success,
                errors=up_check.errors + down_check.errors,
            )
        finally:
            # Always tear down the scratch database, even on failure.
            await self.cleanup_test_database(scratch_db)

    async def test_all_migrations(self) -> List[TestResult]:
        """Run test_migration() for every discovered migration, in version order."""
        migrations = MigrationDiscovery("./migrations").discover_migrations()
        return [await self.test_migration(m) for m in migrations]
Data Migrations
Migrating Existing Data
class DataMigration(Migration):
    """Base class for data migrations."""

    def migrate_data_in_batches(self, connection: DatabaseConnection,
                                query: str, batch_size: int = 1000) -> None:
        """Feed the results of ``query`` to process_batch() in fixed-size chunks.

        Pages through the result set with LIMIT/OFFSET so the whole result
        set never has to fit in memory.
        NOTE(review): if ``query`` has a WHERE clause that stops matching
        rows once they are processed, advancing the offset skips unprocessed
        rows — confirm callers use a stable query or keyset pagination.
        """
        offset = 0
        while batch := connection.execute(
                f"{query} LIMIT {batch_size} OFFSET {offset}").fetchall():
            self.process_batch(connection, batch)
            offset += batch_size

    def process_batch(self, connection: DatabaseConnection, rows: List[tuple]) -> None:
        """Transform one batch of rows; subclasses must override."""
        raise NotImplementedError
# Example data migration
class MigrateRequestLogFormat(DataMigration):
    """Migration 4: split JSON headers/query params into structured columns."""

    version = 4
    description = "Migrate request log format from JSON to structured columns"
    dependencies = [3]

    def backup_required(self) -> bool:
        """Data-rewriting migration: always back up first."""
        return True

    def up(self, connection: DatabaseConnection) -> None:
        """Migrate request log data format."""
        # Add new columns to hold the structured representations.
        connection.execute("ALTER TABLE request_logs ADD COLUMN parsed_headers TEXT")
        connection.execute("ALTER TABLE request_logs ADD COLUMN parsed_query_params TEXT")
        # Migrate existing data in batches (only rows not yet parsed).
        self.migrate_data_in_batches(
            connection,
            "SELECT id, headers, query_params FROM request_logs WHERE parsed_headers IS NULL"
        )
        # Record this migration in schema_version.
        connection.execute("""
            INSERT INTO schema_version (version, description)
            VALUES (4, 'Migrate request log format')
        """)

    def process_batch(self, connection: DatabaseConnection, rows: List[tuple]) -> None:
        """Parse each (id, headers, query_params) row and write the result back."""
        # Fix: the original logged through ``self.logger``, which is never
        # defined on Migration/DataMigration and raised AttributeError on
        # the first row failure.
        logger = logging.getLogger(__name__)
        for row_id, headers_json, query_params_json in rows:
            try:
                # Parse and restructure the JSON payloads.
                parsed_headers = self.parse_headers(headers_json)
                parsed_query_params = self.parse_query_params(query_params_json)
                # Write the structured values back onto the same row.
                connection.execute("""
                    UPDATE request_logs
                    SET parsed_headers = ?, parsed_query_params = ?
                    WHERE id = ?
                """, (parsed_headers, parsed_query_params, row_id))
            except Exception as e:
                # Best-effort: a bad row is logged and skipped, not fatal.
                logger.error(f"Failed to migrate row {row_id}: {e}")

    def parse_headers(self, headers_json: str) -> str:
        """Parse headers JSON into structured format.

        NOTE(review): documentation stub — returns None until implemented.
        """

    def parse_query_params(self, query_params_json: str) -> str:
        """Parse query parameters JSON into structured format.

        NOTE(review): documentation stub — returns None until implemented.
        """

    def down(self, connection: DatabaseConnection) -> None:
        """Rollback data migration.

        NOTE(review): ``DROP COLUMN`` is unsupported on SQLite < 3.35; for
        SQLite this should rebuild the table as migration 003's down() does.
        """
        connection.execute("ALTER TABLE request_logs DROP COLUMN parsed_headers")
        connection.execute("ALTER TABLE request_logs DROP COLUMN parsed_query_params")
        connection.execute("DELETE FROM schema_version WHERE version = 4")
Migration Best Practices
1. Migration Safety
class SafeMigration(Migration):
    """Template for safe migrations."""

    def validate(self, connection: DatabaseConnection) -> bool:
        """Run every pre-flight check; True only if all of them pass."""
        preflight_checks = (
            self.check_constraints,
            self.check_data_integrity,
            self.check_disk_space,
        )
        # all() short-circuits in order, exactly like the original if-chain.
        return all(check(connection) for check in preflight_checks)

    def check_constraints(self, connection: DatabaseConnection) -> bool:
        """Check database constraints.

        Intended checks: foreign keys, unique constraints, data types.
        """
        return True

    def check_data_integrity(self, connection: DatabaseConnection) -> bool:
        """Check data integrity before migration.

        Intended checks: consistency, orphaned records, data formats.
        """
        return True

    def check_disk_space(self, connection: DatabaseConnection) -> bool:
        """Check available disk space.

        Intended checks: estimated migration space vs. free disk space.
        """
        return True
2. Backup Integration
class BackupManager:
    """Manages database backups for migrations."""

    def __init__(self, database_url: str, backup_path: str):
        self.database_url = database_url
        self.backup_path = Path(backup_path)

    async def create_backup(self, migration_version: int) -> str:
        """Create a pre-migration backup file and return its path."""
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_file = self.backup_path / f"backup_v{migration_version}_{stamp}.sql"
        # Dispatch on the URL scheme to the engine-specific backup routine.
        backup_handlers = (
            ("sqlite", self.backup_sqlite),
            ("postgresql", self.backup_postgresql),
            ("mysql", self.backup_mysql),
        )
        for url_prefix, handler in backup_handlers:
            if self.database_url.startswith(url_prefix):
                await handler(backup_file)
                break
        return str(backup_file)

    async def restore_backup(self, backup_file: str) -> bool:
        """Restore database from backup."""
        # Implementation for restoring backup
3. Migration Monitoring
class MigrationMonitor:
    """Monitors migration progress and performance."""

    def __init__(self):
        self.start_time = None
        self.metrics = {}
        # Fix: log_metrics() referenced an undefined module-level ``logger``;
        # keep a properly-initialized logger on the instance instead.
        self.logger = logging.getLogger(__name__)

    def start_migration(self, migration: "Migration") -> None:
        """Begin timing a migration run and seed the metrics record."""
        self.start_time = time.time()
        self.metrics = {
            "version": migration.version,
            "description": migration.description,
            "start_time": self.start_time,
        }

    def end_migration(self, success: bool) -> None:
        """Stop timing, record the outcome and log the metrics."""
        # Fix: fail with a clear error instead of a TypeError from
        # ``end_time - None`` when start_migration() was never called.
        if self.start_time is None:
            raise RuntimeError("end_migration() called before start_migration()")
        end_time = time.time()
        self.metrics.update({
            "end_time": end_time,
            "duration": end_time - self.start_time,
            "success": success,
        })
        # Log metrics
        self.log_metrics()

    def log_metrics(self) -> None:
        """Log migration metrics."""
        # Lazy %-formatting; the full metrics dict rides along in ``extra``.
        self.logger.info("Migration %s completed", self.metrics["version"], extra=self.metrics)
Troubleshooting Migrations
Common Issues
1. Failed Migration Recovery
class MigrationRecovery:
    """Handles migration failure recovery."""

    async def recover_from_failed_migration(self, migration_version: int) -> bool:
        """Inspect a failed migration and either finish it or roll it back."""
        state = await self.check_migration_state(migration_version)
        # Route on the detected state; anything else is unrecoverable here.
        if state == "partial":
            # Partially applied: try to finish what the migration started.
            return await self.complete_partial_migration(migration_version)
        if state == "failed":
            # Fully failed: undo whatever was applied.
            return await self.rollback_failed_migration(migration_version)
        return False

    async def check_migration_state(self, migration_version: int) -> str:
        """Classify a migration's state ("partial", "failed", ...).

        Intended checks: the schema_version table, the expected schema
        changes, and any partially migrated data.
        """
2. Schema Validation
class SchemaValidator:
    """Validates database schema after migrations."""

    def __init__(self, connection: DatabaseConnection):
        self.connection = connection

    async def validate_schema(self, expected_version: int) -> ValidationResult:
        """Confirm the live schema matches ``expected_version``."""
        # The recorded version must match before structural checks run.
        current_version = await self.get_schema_version()
        if current_version != expected_version:
            return ValidationResult(False, f"Version mismatch: {current_version} != {expected_version}")
        # Run the structural checks in order, stopping at the first failure:
        # tables, then indexes, then constraints.
        structural_checks = (
            self.validate_tables,
            self.validate_indexes,
            self.validate_constraints,
        )
        for check in structural_checks:
            outcome = await check()
            if not outcome.success:
                return outcome
        return ValidationResult(True, "Schema validation passed")
Migration Debugging
# Enable debug logging
export MOCKLOOP_LOG_LEVEL=debug
# Run migration with verbose output
mockloop db migrate --verbose
# Check migration history
mockloop db history
# Validate current schema
mockloop db validate-schema
# Show migration details
mockloop db show-migration --version 3
See Also
- Database Schema: Complete database schema reference
- Configuration Options: Database configuration options
- Architecture: System architecture overview
- Troubleshooting: General troubleshooting guide