System Architecture
This document provides a comprehensive overview of MockLoop MCP's system architecture, including core components, design patterns, data flow, and integration points.
Overview
MockLoop MCP is designed as a modular, scalable system for generating and managing sophisticated mock API servers. The architecture follows modern software engineering principles including separation of concerns, dependency injection, and event-driven design.
High-Level Architecture
graph TB
subgraph "Client Layer"
CLI[CLI Interface]
MCP[MCP Tools]
API[Admin API]
UI[Web UI]
end
subgraph "Application Layer"
Core[Core Engine]
Gen[Generator]
Mgr[Server Manager]
Log[Log Analyzer]
end
subgraph "Service Layer"
Auth[Authentication]
Webhook[Webhook Service]
Storage[Storage Service]
Cache[Cache Service]
end
subgraph "Data Layer"
DB[(Database)]
FS[File System]
Redis[(Redis Cache)]
end
subgraph "Generated Servers"
Mock1[Mock Server 1]
Mock2[Mock Server 2]
MockN[Mock Server N]
end
CLI --> Core
MCP --> Core
API --> Core
UI --> API
Core --> Gen
Core --> Mgr
Core --> Log
Gen --> Auth
Gen --> Storage
Mgr --> Webhook
Log --> Cache
Auth --> DB
Storage --> FS
Cache --> Redis
Webhook --> DB
Mgr --> Mock1
Mgr --> Mock2
Mgr --> MockN
Mock1 --> DB
Mock2 --> DB
MockN --> DB
Core Components
1. Core Engine
The central orchestrator that coordinates all system operations.
class MockLoopCore:
"""
Central engine for MockLoop MCP operations.
Coordinates between generators, managers, and services
to provide unified functionality.
"""
def __init__(self, config: Configuration):
self.config = config
self.generator = MockServerGenerator(config.generation)
self.manager = MockServerManager(config.server)
self.log_analyzer = LogAnalyzer(config.logging)
self.auth_service = AuthenticationService(config.auth)
self.webhook_service = WebhookService(config.webhooks)
self.storage_service = StorageService(config.storage)
async def generate_mock_server(self, spec_source: str, options: GenerationOptions) -> GenerationResult:
"""Generate a new mock server from API specification."""
async def manage_server_lifecycle(self, server_id: str, action: str) -> OperationResult:
"""Manage server lifecycle (start, stop, restart)."""
async def analyze_server_logs(self, server_id: str, query: LogQuery) -> LogAnalysis:
"""Analyze server request logs."""
2. Mock Server Generator
Responsible for parsing API specifications and generating FastAPI mock servers.
class MockServerGenerator:
"""
Generates FastAPI mock servers from OpenAPI specifications.
Handles specification parsing, code generation, and file creation.
"""
def __init__(self, config: GenerationConfig):
self.config = config
self.parser = SpecificationParser()
self.code_generator = CodeGenerator()
self.template_engine = TemplateEngine()
async def generate(self, spec_source: str, output_dir: str, options: GenerationOptions) -> GenerationResult:
"""
Main generation workflow:
1. Parse API specification
2. Generate route definitions
3. Create FastAPI application
4. Generate middleware and utilities
5. Create configuration files
"""
# Parse specification
parsed_spec = await self.parser.parse(spec_source)
# Generate routes
routes = self.code_generator.generate_routes(parsed_spec, options)
# Generate application
app_code = self.code_generator.generate_application(routes, options)
# Generate middleware
middleware = self.code_generator.generate_middleware(options)
# Write files
await self.write_generated_files(output_dir, app_code, middleware, routes)
return GenerationResult(
server_id=generate_server_id(),
output_directory=output_dir,
routes=routes,
config=options
)
3. Server Manager
Manages the lifecycle of generated mock servers.
class MockServerManager:
"""
Manages mock server instances and their lifecycle.
Handles starting, stopping, monitoring, and configuration
of generated mock servers.
"""
def __init__(self, config: ServerConfig):
self.config = config
self.servers: Dict[str, ServerInstance] = {}
self.process_monitor = ProcessMonitor()
async def start_server(self, server_config: ServerConfiguration) -> ServerInstance:
"""Start a mock server instance."""
async def stop_server(self, server_id: str) -> bool:
"""Stop a running mock server."""
async def restart_server(self, server_id: str) -> ServerInstance:
"""Restart a mock server."""
async def monitor_servers(self) -> List[ServerStatus]:
"""Monitor health and status of all servers."""
4. Log Analyzer
Analyzes request logs and provides insights.
class LogAnalyzer:
"""
Analyzes request logs to provide insights and metrics.
Processes log data to generate statistics, identify patterns,
and detect anomalies.
"""
def __init__(self, config: LoggingConfig):
self.config = config
self.database = DatabaseConnection(config.database)
self.metrics_calculator = MetricsCalculator()
async def analyze_logs(self, query: LogQuery) -> LogAnalysis:
"""Analyze logs based on query parameters."""
async def generate_report(self, server_id: str, time_range: TimeRange) -> AnalysisReport:
"""Generate comprehensive analysis report."""
async def detect_anomalies(self, server_id: str) -> List[Anomaly]:
"""Detect anomalies in request patterns."""
Data Flow Architecture
Request Processing Flow
sequenceDiagram
participant Client
participant MockServer
participant Middleware
participant Database
participant Webhook
Client->>MockServer: HTTP Request
MockServer->>Middleware: Process Request
Middleware->>Database: Log Request
Middleware->>MockServer: Continue Processing
MockServer->>Database: Get Mock Response
MockServer->>Client: HTTP Response
MockServer->>Database: Log Response
MockServer->>Webhook: Send Event (async)
Mock Generation Flow
sequenceDiagram
participant User
participant Core
participant Generator
participant Parser
participant CodeGen
participant FileSystem
participant Database
User->>Core: Generate Mock Server
Core->>Generator: Parse Specification
Generator->>Parser: Parse OpenAPI Spec
Parser->>Generator: Parsed Specification
Generator->>CodeGen: Generate Code
CodeGen->>Generator: Generated Files
Generator->>FileSystem: Write Files
Generator->>Database: Store Configuration
Generator->>Core: Generation Result
Core->>User: Server Created
Component Interactions
Service Dependencies
# Dependency injection container
class ServiceContainer:
def __init__(self, config: Configuration):
# Core services
self.database = DatabaseService(config.database)
self.cache = CacheService(config.cache)
self.storage = StorageService(config.storage)
# Application services
self.auth_service = AuthenticationService(config.auth, self.database)
self.webhook_service = WebhookService(config.webhooks, self.database)
self.log_service = LogService(self.database, self.cache)
# Core components
self.generator = MockServerGenerator(config.generation, self.storage)
self.manager = MockServerManager(config.server, self.database)
self.analyzer = LogAnalyzer(config.logging, self.database, self.cache)
# API layer
self.admin_api = AdminAPI(
self.generator,
self.manager,
self.analyzer,
self.auth_service
)
Event System
MockLoop MCP uses an event-driven architecture for loose coupling:
class EventBus:
"""Central event bus for system-wide communication."""
def __init__(self):
self.handlers: Dict[str, List[EventHandler]] = {}
def subscribe(self, event_type: str, handler: EventHandler):
"""Subscribe to events of a specific type."""
async def publish(self, event: Event):
"""Publish an event to all subscribers."""
async def publish_async(self, event: Event):
"""Publish event asynchronously."""
# Event types
class Events:
SERVER_STARTED = "server.started"
SERVER_STOPPED = "server.stopped"
REQUEST_RECEIVED = "request.received"
RESPONSE_SENT = "response.sent"
SCENARIO_CHANGED = "scenario.changed"
ERROR_OCCURRED = "error.occurred"
# Event handlers
class WebhookEventHandler:
async def handle(self, event: Event):
"""Send webhook notifications for events."""
class LogEventHandler:
async def handle(self, event: Event):
"""Log events to database."""
class MetricsEventHandler:
async def handle(self, event: Event):
"""Update metrics based on events."""
Design Patterns
1. Factory Pattern
Used for creating different types of mock servers and components:
class MockServerFactory:
"""Factory for creating different types of mock servers."""
@staticmethod
def create_server(spec_type: str, config: ServerConfig) -> MockServer:
if spec_type == "openapi":
return OpenAPIMockServer(config)
elif spec_type == "graphql":
return GraphQLMockServer(config)
elif spec_type == "grpc":
return GRPCMockServer(config)
else:
raise ValueError(f"Unsupported spec type: {spec_type}")
class MiddlewareFactory:
"""Factory for creating middleware components."""
@staticmethod
def create_middleware(middleware_type: str, config: dict) -> Middleware:
if middleware_type == "auth":
return AuthenticationMiddleware(config)
elif middleware_type == "logging":
return LoggingMiddleware(config)
elif middleware_type == "cors":
return CORSMiddleware(config)
else:
raise ValueError(f"Unknown middleware type: {middleware_type}")
2. Strategy Pattern
Used for different authentication and storage strategies:
class AuthenticationStrategy(ABC):
"""Abstract base class for authentication strategies."""
@abstractmethod
async def authenticate(self, request: Request) -> AuthResult:
pass
class APIKeyAuthentication(AuthenticationStrategy):
async def authenticate(self, request: Request) -> AuthResult:
# API key authentication logic
pass
class JWTAuthentication(AuthenticationStrategy):
async def authenticate(self, request: Request) -> AuthResult:
# JWT authentication logic
pass
class StorageStrategy(ABC):
"""Abstract base class for storage strategies."""
@abstractmethod
async def store(self, key: str, data: bytes) -> bool:
pass
@abstractmethod
async def retrieve(self, key: str) -> Optional[bytes]:
pass
class FileSystemStorage(StorageStrategy):
async def store(self, key: str, data: bytes) -> bool:
# File system storage logic
pass
class S3Storage(StorageStrategy):
async def store(self, key: str, data: bytes) -> bool:
# S3 storage logic
pass
3. Observer Pattern
Used for monitoring and event handling:
class ServerObserver(ABC):
"""Abstract observer for server events."""
@abstractmethod
async def on_server_started(self, server: ServerInstance):
pass
@abstractmethod
async def on_server_stopped(self, server: ServerInstance):
pass
@abstractmethod
async def on_request_received(self, server: ServerInstance, request: Request):
pass
class MetricsObserver(ServerObserver):
"""Observer that collects metrics."""
async def on_request_received(self, server: ServerInstance, request: Request):
# Update request metrics
pass
class LoggingObserver(ServerObserver):
"""Observer that logs server events."""
async def on_server_started(self, server: ServerInstance):
logger.info(f"Server {server.id} started on port {server.port}")
Scalability Considerations
Horizontal Scaling
MockLoop MCP is designed to scale horizontally:
class ClusterManager:
"""Manages MockLoop MCP cluster nodes."""
def __init__(self, config: ClusterConfig):
self.config = config
self.nodes: Dict[str, NodeInfo] = {}
self.load_balancer = LoadBalancer()
async def add_node(self, node_info: NodeInfo):
"""Add a new node to the cluster."""
async def remove_node(self, node_id: str):
"""Remove a node from the cluster."""
async def distribute_load(self, request: Request) -> NodeInfo:
"""Distribute requests across cluster nodes."""
Database Scaling
Support for database scaling strategies:
class DatabaseCluster:
"""Manages database cluster for high availability."""
def __init__(self, config: DatabaseClusterConfig):
self.primary = DatabaseConnection(config.primary)
self.replicas = [DatabaseConnection(replica) for replica in config.replicas]
self.connection_pool = ConnectionPool()
async def read_query(self, query: str) -> QueryResult:
"""Execute read query on replica."""
replica = self.select_replica()
return await replica.execute(query)
async def write_query(self, query: str) -> QueryResult:
"""Execute write query on primary."""
return await self.primary.execute(query)
Caching Strategy
Multi-level caching for performance:
class CacheManager:
"""Manages multi-level caching strategy."""
def __init__(self, config: CacheConfig):
self.l1_cache = MemoryCache(config.memory) # In-memory cache
self.l2_cache = RedisCache(config.redis) # Distributed cache
self.l3_cache = DatabaseCache(config.db) # Persistent cache
async def get(self, key: str) -> Optional[Any]:
"""Get value from cache hierarchy."""
# Try L1 cache first
value = await self.l1_cache.get(key)
if value is not None:
return value
# Try L2 cache
value = await self.l2_cache.get(key)
if value is not None:
await self.l1_cache.set(key, value)
return value
# Try L3 cache
value = await self.l3_cache.get(key)
if value is not None:
await self.l2_cache.set(key, value)
await self.l1_cache.set(key, value)
return value
return None
Security Architecture
Authentication Flow
sequenceDiagram
participant Client
participant Gateway
participant Auth
participant MockServer
participant Database
Client->>Gateway: Request with Credentials
Gateway->>Auth: Validate Credentials
Auth->>Database: Check User/API Key
Database->>Auth: User Info
Auth->>Gateway: Auth Result
Gateway->>MockServer: Authorized Request
MockServer->>Client: Response
Authorization Model
class AuthorizationManager:
"""Manages authorization policies and permissions."""
def __init__(self, config: AuthConfig):
self.policies = PolicyEngine(config.policies)
self.roles = RoleManager(config.roles)
async def authorize(self, user: User, resource: Resource, action: str) -> bool:
"""Check if user is authorized to perform action on resource."""
# Check user roles
user_roles = await self.roles.get_user_roles(user.id)
# Evaluate policies
for role in user_roles:
if await self.policies.evaluate(role, resource, action):
return True
return False
class SecurityMiddleware:
"""Security middleware for request validation."""
async def __call__(self, request: Request, call_next):
# Validate request headers
await self.validate_headers(request)
# Check rate limits
await self.check_rate_limits(request)
# Validate input
await self.validate_input(request)
response = await call_next(request)
# Add security headers
self.add_security_headers(response)
return response
Performance Optimization
Asynchronous Processing
MockLoop MCP uses async/await throughout for optimal performance:
class AsyncRequestProcessor:
"""Asynchronous request processor for high throughput."""
def __init__(self, config: ProcessorConfig):
self.semaphore = asyncio.Semaphore(config.max_concurrent_requests)
self.request_queue = asyncio.Queue(maxsize=config.queue_size)
async def process_request(self, request: Request) -> Response:
"""Process request asynchronously."""
async with self.semaphore:
# Process request
response = await self.handle_request(request)
# Log asynchronously
asyncio.create_task(self.log_request(request, response))
# Send webhooks asynchronously
asyncio.create_task(self.send_webhooks(request, response))
return response
Connection Pooling
Efficient database and HTTP connection management:
class ConnectionManager:
"""Manages database and HTTP connection pools."""
def __init__(self, config: ConnectionConfig):
self.db_pool = DatabasePool(
min_connections=config.db_min_connections,
max_connections=config.db_max_connections,
timeout=config.db_timeout
)
self.http_pool = HTTPPool(
max_connections=config.http_max_connections,
max_keepalive_connections=config.http_keepalive_connections,
keepalive_expiry=config.http_keepalive_expiry
)
Monitoring and Observability
Metrics Collection
class MetricsCollector:
"""Collects system and application metrics."""
def __init__(self, config: MetricsConfig):
self.prometheus_registry = CollectorRegistry()
self.request_counter = Counter('requests_total', 'Total requests')
self.response_time_histogram = Histogram('response_time_seconds', 'Response time')
self.error_counter = Counter('errors_total', 'Total errors')
async def record_request(self, request: Request, response: Response, duration: float):
"""Record request metrics."""
self.request_counter.inc()
self.response_time_histogram.observe(duration)
if response.status_code >= 400:
self.error_counter.inc()
Health Checks
class HealthChecker:
"""Performs health checks on system components."""
def __init__(self, config: HealthConfig):
self.checks = [
DatabaseHealthCheck(config.database),
CacheHealthCheck(config.cache),
StorageHealthCheck(config.storage),
ExternalServiceHealthCheck(config.external_services)
]
async def check_health(self) -> HealthStatus:
"""Perform all health checks."""
results = await asyncio.gather(
*[check.perform() for check in self.checks],
return_exceptions=True
)
return HealthStatus.from_results(results)
Deployment Architecture
Container Architecture
# Multi-stage build for optimal image size
FROM python:3.11-slim as builder
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
FROM python:3.11-slim as runtime
WORKDIR /app
COPY --from=builder /usr/local/lib/python3.11/site-packages /usr/local/lib/python3.11/site-packages
COPY src/ ./src/
COPY templates/ ./templates/
EXPOSE 8000
CMD ["python", "-m", "mockloop_mcp.main"]
Kubernetes Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
name: mockloop-mcp
spec:
replicas: 3
selector:
matchLabels:
app: mockloop-mcp
template:
metadata:
labels:
app: mockloop-mcp
spec:
containers:
- name: mockloop-mcp
image: mockloop/mcp:latest
ports:
- containerPort: 8000
env:
- name: MOCKLOOP_DATABASE_TYPE
value: "postgresql"
- name: MOCKLOOP_DATABASE_HOST
value: "postgres-service"
resources:
requests:
memory: "256Mi"
cpu: "250m"
limits:
memory: "512Mi"
cpu: "500m"
Extension Points
Plugin Architecture
class PluginManager:
"""Manages system plugins and extensions."""
def __init__(self, config: PluginConfig):
self.plugins: Dict[str, Plugin] = {}
self.hooks: Dict[str, List[HookFunction]] = {}
async def load_plugin(self, plugin_path: str):
"""Load a plugin from file."""
async def execute_hook(self, hook_name: str, context: dict) -> dict:
"""Execute all functions registered for a hook."""
def register_hook(self, hook_name: str, function: HookFunction):
"""Register a function for a specific hook."""
class Plugin(ABC):
"""Abstract base class for plugins."""
@abstractmethod
def initialize(self, context: PluginContext):
"""Initialize the plugin."""
@abstractmethod
def register_hooks(self, hook_registry: HookRegistry):
"""Register plugin hooks."""
See Also
- Database Schema: Database structure and relationships
- Configuration Options: System configuration reference
- Performance Optimization: Performance tuning guide
- Security Considerations: Security architecture details