System Architecture
This document provides a comprehensive overview of MockLoop MCP's system architecture, including core components, design patterns, data flow, and integration points.
Overview
MockLoop MCP is designed as a modular, scalable system for generating and managing sophisticated mock API servers. The architecture follows modern software engineering principles including separation of concerns, dependency injection, and event-driven design.
High-Level Architecture
graph TB
    subgraph "Client Layer"
        CLI[CLI Interface]
        MCP[MCP Tools]
        API[Admin API]
        UI[Web UI]
    end
    subgraph "Application Layer"
        Core[Core Engine]
        Gen[Generator]
        Mgr[Server Manager]
        Log[Log Analyzer]
    end
    subgraph "Service Layer"
        Auth[Authentication]
        Webhook[Webhook Service]
        Storage[Storage Service]
        Cache[Cache Service]
    end
    subgraph "Data Layer"
        DB[(Database)]
        FS[File System]
        Redis[(Redis Cache)]
    end
    subgraph "Generated Servers"
        Mock1[Mock Server 1]
        Mock2[Mock Server 2]
        MockN[Mock Server N]
    end
    CLI --> Core
    MCP --> Core
    API --> Core
    UI --> API
    Core --> Gen
    Core --> Mgr
    Core --> Log
    Gen --> Auth
    Gen --> Storage
    Mgr --> Webhook
    Log --> Cache
    Auth --> DB
    Storage --> FS
    Cache --> Redis
    Webhook --> DB
    Mgr --> Mock1
    Mgr --> Mock2
    Mgr --> MockN
    Mock1 --> DB
    Mock2 --> DB
    MockN --> DBCore Components
1. Core Engine
The central orchestrator that coordinates all system operations.
class MockLoopCore:
    """
    Central engine for MockLoop MCP operations.
    Coordinates between generators, managers, and services
    to provide unified functionality.
    """
    def __init__(self, config: Configuration):
        self.config = config
        self.generator = MockServerGenerator(config.generation)
        self.manager = MockServerManager(config.server)
        self.log_analyzer = LogAnalyzer(config.logging)
        self.auth_service = AuthenticationService(config.auth)
        self.webhook_service = WebhookService(config.webhooks)
        self.storage_service = StorageService(config.storage)
    async def generate_mock_server(self, spec_source: str, options: GenerationOptions) -> GenerationResult:
        """Generate a new mock server from API specification."""
    async def manage_server_lifecycle(self, server_id: str, action: str) -> OperationResult:
        """Manage server lifecycle (start, stop, restart)."""
    async def analyze_server_logs(self, server_id: str, query: LogQuery) -> LogAnalysis:
        """Analyze server request logs."""
2. Mock Server Generator
Responsible for parsing API specifications and generating FastAPI mock servers.
class MockServerGenerator:
    """
    Generates FastAPI mock servers from OpenAPI specifications.
    Handles specification parsing, code generation, and file creation.
    """
    def __init__(self, config: GenerationConfig):
        self.config = config
        self.parser = SpecificationParser()
        self.code_generator = CodeGenerator()
        self.template_engine = TemplateEngine()
    async def generate(self, spec_source: str, output_dir: str, options: GenerationOptions) -> GenerationResult:
        """
        Main generation workflow:
        1. Parse API specification
        2. Generate route definitions
        3. Create FastAPI application
        4. Generate middleware and utilities
        5. Create configuration files
        """
        # Parse specification
        parsed_spec = await self.parser.parse(spec_source)
        # Generate routes
        routes = self.code_generator.generate_routes(parsed_spec, options)
        # Generate application
        app_code = self.code_generator.generate_application(routes, options)
        # Generate middleware
        middleware = self.code_generator.generate_middleware(options)
        # Write files
        await self.write_generated_files(output_dir, app_code, middleware, routes)
        return GenerationResult(
            server_id=generate_server_id(),
            output_directory=output_dir,
            routes=routes,
            config=options
        )
3. Server Manager
Manages the lifecycle of generated mock servers.
class MockServerManager:
    """
    Manages mock server instances and their lifecycle.
    Handles starting, stopping, monitoring, and configuration
    of generated mock servers.
    """
    def __init__(self, config: ServerConfig):
        self.config = config
        self.servers: Dict[str, ServerInstance] = {}
        self.process_monitor = ProcessMonitor()
    async def start_server(self, server_config: ServerConfiguration) -> ServerInstance:
        """Start a mock server instance."""
    async def stop_server(self, server_id: str) -> bool:
        """Stop a running mock server."""
    async def restart_server(self, server_id: str) -> ServerInstance:
        """Restart a mock server."""
    async def monitor_servers(self) -> List[ServerStatus]:
        """Monitor health and status of all servers."""
4. Log Analyzer
Analyzes request logs and provides insights.
class LogAnalyzer:
    """
    Analyzes request logs to provide insights and metrics.
    Processes log data to generate statistics, identify patterns,
    and detect anomalies.
    """
    def __init__(self, config: LoggingConfig):
        self.config = config
        self.database = DatabaseConnection(config.database)
        self.metrics_calculator = MetricsCalculator()
    async def analyze_logs(self, query: LogQuery) -> LogAnalysis:
        """Analyze logs based on query parameters."""
    async def generate_report(self, server_id: str, time_range: TimeRange) -> AnalysisReport:
        """Generate comprehensive analysis report."""
    async def detect_anomalies(self, server_id: str) -> List[Anomaly]:
        """Detect anomalies in request patterns."""
Data Flow Architecture
Request Processing Flow
sequenceDiagram
    participant Client
    participant MockServer
    participant Middleware
    participant Database
    participant Webhook
    Client->>MockServer: HTTP Request
    MockServer->>Middleware: Process Request
    Middleware->>Database: Log Request
    Middleware->>MockServer: Continue Processing
    MockServer->>Database: Get Mock Response
    MockServer->>Client: HTTP Response
    MockServer->>Database: Log Response
    MockServer->>Webhook: Send Event (async)Mock Generation Flow
sequenceDiagram
    participant User
    participant Core
    participant Generator
    participant Parser
    participant CodeGen
    participant FileSystem
    participant Database
    User->>Core: Generate Mock Server
    Core->>Generator: Parse Specification
    Generator->>Parser: Parse OpenAPI Spec
    Parser->>Generator: Parsed Specification
    Generator->>CodeGen: Generate Code
    CodeGen->>Generator: Generated Files
    Generator->>FileSystem: Write Files
    Generator->>Database: Store Configuration
    Generator->>Core: Generation Result
    Core->>User: Server CreatedComponent Interactions
Service Dependencies
# Dependency injection container
class ServiceContainer:
    def __init__(self, config: Configuration):
        # Core services
        self.database = DatabaseService(config.database)
        self.cache = CacheService(config.cache)
        self.storage = StorageService(config.storage)
        # Application services
        self.auth_service = AuthenticationService(config.auth, self.database)
        self.webhook_service = WebhookService(config.webhooks, self.database)
        self.log_service = LogService(self.database, self.cache)
        # Core components
        self.generator = MockServerGenerator(config.generation, self.storage)
        self.manager = MockServerManager(config.server, self.database)
        self.analyzer = LogAnalyzer(config.logging, self.database, self.cache)
        # API layer
        self.admin_api = AdminAPI(
            self.generator,
            self.manager,
            self.analyzer,
            self.auth_service
        )
Event System
MockLoop MCP uses an event-driven architecture for loose coupling:
class EventBus:
    """Central event bus for system-wide communication."""
    def __init__(self):
        self.handlers: Dict[str, List[EventHandler]] = {}
    def subscribe(self, event_type: str, handler: EventHandler):
        """Subscribe to events of a specific type."""
    async def publish(self, event: Event):
        """Publish an event to all subscribers."""
    async def publish_async(self, event: Event):
        """Publish event asynchronously."""
# Event types
class Events:
    SERVER_STARTED = "server.started"
    SERVER_STOPPED = "server.stopped"
    REQUEST_RECEIVED = "request.received"
    RESPONSE_SENT = "response.sent"
    SCENARIO_CHANGED = "scenario.changed"
    ERROR_OCCURRED = "error.occurred"
# Event handlers
class WebhookEventHandler:
    async def handle(self, event: Event):
        """Send webhook notifications for events."""
class LogEventHandler:
    async def handle(self, event: Event):
        """Log events to database."""
class MetricsEventHandler:
    async def handle(self, event: Event):
        """Update metrics based on events."""
Design Patterns
1. Factory Pattern
Used for creating different types of mock servers and components:
class MockServerFactory:
    """Factory for creating different types of mock servers."""
    @staticmethod
    def create_server(spec_type: str, config: ServerConfig) -> MockServer:
        if spec_type == "openapi":
            return OpenAPIMockServer(config)
        elif spec_type == "graphql":
            return GraphQLMockServer(config)
        elif spec_type == "grpc":
            return GRPCMockServer(config)
        else:
            raise ValueError(f"Unsupported spec type: {spec_type}")
class MiddlewareFactory:
    """Factory for creating middleware components."""
    @staticmethod
    def create_middleware(middleware_type: str, config: dict) -> Middleware:
        if middleware_type == "auth":
            return AuthenticationMiddleware(config)
        elif middleware_type == "logging":
            return LoggingMiddleware(config)
        elif middleware_type == "cors":
            return CORSMiddleware(config)
        else:
            raise ValueError(f"Unknown middleware type: {middleware_type}")
2. Strategy Pattern
Used for different authentication and storage strategies:
class AuthenticationStrategy(ABC):
    """Abstract base class for authentication strategies."""
    @abstractmethod
    async def authenticate(self, request: Request) -> AuthResult:
        pass
class APIKeyAuthentication(AuthenticationStrategy):
    async def authenticate(self, request: Request) -> AuthResult:
        # API key authentication logic
        pass
class JWTAuthentication(AuthenticationStrategy):
    async def authenticate(self, request: Request) -> AuthResult:
        # JWT authentication logic
        pass
class StorageStrategy(ABC):
    """Abstract base class for storage strategies."""
    @abstractmethod
    async def store(self, key: str, data: bytes) -> bool:
        pass
    @abstractmethod
    async def retrieve(self, key: str) -> Optional[bytes]:
        pass
class FileSystemStorage(StorageStrategy):
    async def store(self, key: str, data: bytes) -> bool:
        # File system storage logic
        pass
class S3Storage(StorageStrategy):
    async def store(self, key: str, data: bytes) -> bool:
        # S3 storage logic
        pass
3. Observer Pattern
Used for monitoring and event handling:
class ServerObserver(ABC):
    """Abstract observer for server events."""
    @abstractmethod
    async def on_server_started(self, server: ServerInstance):
        pass
    @abstractmethod
    async def on_server_stopped(self, server: ServerInstance):
        pass
    @abstractmethod
    async def on_request_received(self, server: ServerInstance, request: Request):
        pass
class MetricsObserver(ServerObserver):
    """Observer that collects metrics."""
    async def on_request_received(self, server: ServerInstance, request: Request):
        # Update request metrics
        pass
class LoggingObserver(ServerObserver):
    """Observer that logs server events."""
    async def on_server_started(self, server: ServerInstance):
        logger.info(f"Server {server.id} started on port {server.port}")
Scalability Considerations
Horizontal Scaling
MockLoop MCP is designed to scale horizontally:
class ClusterManager:
    """Manages MockLoop MCP cluster nodes."""
    def __init__(self, config: ClusterConfig):
        self.config = config
        self.nodes: Dict[str, NodeInfo] = {}
        self.load_balancer = LoadBalancer()
    async def add_node(self, node_info: NodeInfo):
        """Add a new node to the cluster."""
    async def remove_node(self, node_id: str):
        """Remove a node from the cluster."""
    async def distribute_load(self, request: Request) -> NodeInfo:
        """Distribute requests across cluster nodes."""
Database Scaling
Support for database scaling strategies:
class DatabaseCluster:
    """Manages database cluster for high availability."""
    def __init__(self, config: DatabaseClusterConfig):
        self.primary = DatabaseConnection(config.primary)
        self.replicas = [DatabaseConnection(replica) for replica in config.replicas]
        self.connection_pool = ConnectionPool()
    async def read_query(self, query: str) -> QueryResult:
        """Execute read query on replica."""
        replica = self.select_replica()
        return await replica.execute(query)
    async def write_query(self, query: str) -> QueryResult:
        """Execute write query on primary."""
        return await self.primary.execute(query)
Caching Strategy
Multi-level caching for performance:
class CacheManager:
    """Manages multi-level caching strategy."""
    def __init__(self, config: CacheConfig):
        self.l1_cache = MemoryCache(config.memory)  # In-memory cache
        self.l2_cache = RedisCache(config.redis)    # Distributed cache
        self.l3_cache = DatabaseCache(config.db)    # Persistent cache
    async def get(self, key: str) -> Optional[Any]:
        """Get value from cache hierarchy."""
        # Try L1 cache first
        value = await self.l1_cache.get(key)
        if value is not None:
            return value
        # Try L2 cache
        value = await self.l2_cache.get(key)
        if value is not None:
            await self.l1_cache.set(key, value)
            return value
        # Try L3 cache
        value = await self.l3_cache.get(key)
        if value is not None:
            await self.l2_cache.set(key, value)
            await self.l1_cache.set(key, value)
            return value
        return None
Security Architecture
Authentication Flow
sequenceDiagram
    participant Client
    participant Gateway
    participant Auth
    participant MockServer
    participant Database
    Client->>Gateway: Request with Credentials
    Gateway->>Auth: Validate Credentials
    Auth->>Database: Check User/API Key
    Database->>Auth: User Info
    Auth->>Gateway: Auth Result
    Gateway->>MockServer: Authorized Request
    MockServer->>Client: ResponseAuthorization Model
class AuthorizationManager:
    """Manages authorization policies and permissions."""
    def __init__(self, config: AuthConfig):
        self.policies = PolicyEngine(config.policies)
        self.roles = RoleManager(config.roles)
    async def authorize(self, user: User, resource: Resource, action: str) -> bool:
        """Check if user is authorized to perform action on resource."""
        # Check user roles
        user_roles = await self.roles.get_user_roles(user.id)
        # Evaluate policies
        for role in user_roles:
            if await self.policies.evaluate(role, resource, action):
                return True
        return False
class SecurityMiddleware:
    """Security middleware for request validation."""
    async def __call__(self, request: Request, call_next):
        # Validate request headers
        await self.validate_headers(request)
        # Check rate limits
        await self.check_rate_limits(request)
        # Validate input
        await self.validate_input(request)
        response = await call_next(request)
        # Add security headers
        self.add_security_headers(response)
        return response
Performance Optimization
Asynchronous Processing
MockLoop MCP uses async/await throughout for optimal performance:
class AsyncRequestProcessor:
    """Asynchronous request processor for high throughput."""
    def __init__(self, config: ProcessorConfig):
        self.semaphore = asyncio.Semaphore(config.max_concurrent_requests)
        self.request_queue = asyncio.Queue(maxsize=config.queue_size)
    async def process_request(self, request: Request) -> Response:
        """Process request asynchronously."""
        async with self.semaphore:
            # Process request
            response = await self.handle_request(request)
            # Log asynchronously
            asyncio.create_task(self.log_request(request, response))
            # Send webhooks asynchronously
            asyncio.create_task(self.send_webhooks(request, response))
            return response
Connection Pooling
Efficient database and HTTP connection management:
class ConnectionManager:
    """Manages database and HTTP connection pools."""
    def __init__(self, config: ConnectionConfig):
        self.db_pool = DatabasePool(
            min_connections=config.db_min_connections,
            max_connections=config.db_max_connections,
            timeout=config.db_timeout
        )
        self.http_pool = HTTPPool(
            max_connections=config.http_max_connections,
            max_keepalive_connections=config.http_keepalive_connections,
            keepalive_expiry=config.http_keepalive_expiry
        )
Monitoring and Observability
Metrics Collection
class MetricsCollector:
    """Collects system and application metrics."""
    def __init__(self, config: MetricsConfig):
        self.prometheus_registry = CollectorRegistry()
        self.request_counter = Counter('requests_total', 'Total requests')
        self.response_time_histogram = Histogram('response_time_seconds', 'Response time')
        self.error_counter = Counter('errors_total', 'Total errors')
    async def record_request(self, request: Request, response: Response, duration: float):
        """Record request metrics."""
        self.request_counter.inc()
        self.response_time_histogram.observe(duration)
        if response.status_code >= 400:
            self.error_counter.inc()
Health Checks
class HealthChecker:
    """Performs health checks on system components."""
    def __init__(self, config: HealthConfig):
        self.checks = [
            DatabaseHealthCheck(config.database),
            CacheHealthCheck(config.cache),
            StorageHealthCheck(config.storage),
            ExternalServiceHealthCheck(config.external_services)
        ]
    async def check_health(self) -> HealthStatus:
        """Perform all health checks."""
        results = await asyncio.gather(
            *[check.perform() for check in self.checks],
            return_exceptions=True
        )
        return HealthStatus.from_results(results)
Deployment Architecture
Container Architecture
# Multi-stage build for optimal image size
FROM python:3.11-slim as builder
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
FROM python:3.11-slim as runtime
WORKDIR /app
COPY --from=builder /usr/local/lib/python3.11/site-packages /usr/local/lib/python3.11/site-packages
COPY src/ ./src/
COPY templates/ ./templates/
EXPOSE 8000
CMD ["python", "-m", "mockloop_mcp.main"]
Kubernetes Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mockloop-mcp
spec:
  replicas: 3
  selector:
    matchLabels:
      app: mockloop-mcp
  template:
    metadata:
      labels:
        app: mockloop-mcp
    spec:
      containers:
      - name: mockloop-mcp
        image: mockloop/mcp:latest
        ports:
        - containerPort: 8000
        env:
        - name: MOCKLOOP_DATABASE_TYPE
          value: "postgresql"
        - name: MOCKLOOP_DATABASE_HOST
          value: "postgres-service"
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"
Extension Points
Plugin Architecture
class PluginManager:
    """Manages system plugins and extensions."""
    def __init__(self, config: PluginConfig):
        self.plugins: Dict[str, Plugin] = {}
        self.hooks: Dict[str, List[HookFunction]] = {}
    async def load_plugin(self, plugin_path: str):
        """Load a plugin from file."""
    async def execute_hook(self, hook_name: str, context: dict) -> dict:
        """Execute all functions registered for a hook."""
    def register_hook(self, hook_name: str, function: HookFunction):
        """Register a function for a specific hook."""
class Plugin(ABC):
    """Abstract base class for plugins."""
    @abstractmethod
    def initialize(self, context: PluginContext):
        """Initialize the plugin."""
    @abstractmethod
    def register_hooks(self, hook_registry: HookRegistry):
        """Register plugin hooks."""
See Also
- Database Schema: Database structure and relationships
- Configuration Options: System configuration reference
- Performance Optimization: Performance tuning guide
- Security Considerations: Security architecture details