Testing Guide
This comprehensive guide covers testing practices, strategies, and tools for MockLoop MCP. Our testing approach ensures reliability, maintainability, and confidence in the codebase through multiple layers of testing.
Overview
MockLoop MCP uses a multi-layered testing strategy (marker selection for the slower layers is sketched after this list):
- Unit Tests: Test individual functions and classes in isolation
- Integration Tests: Test component interactions and workflows
- End-to-End Tests: Test complete user scenarios
- Performance Tests: Validate performance characteristics
- Security Tests: Verify security controls and data protection
- Contract Tests: Ensure API compatibility
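The slower layers are selected with pytest markers. A minimal registration sketch (the marker names match the @pytest.mark.e2e and @pytest.mark.performance decorators used later in this guide; run only the fast layers with pytest -m "not e2e and not performance"):
# conftest.py - register custom markers so pytest does not warn on unknown marks
def pytest_configure(config):
    config.addinivalue_line("markers", "e2e: end-to-end tests (slow, may require Docker)")
    config.addinivalue_line("markers", "performance: performance and load tests")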
Testing Framework and Tools
Core Testing Stack
# pytest - Primary testing framework
import pytest
import pytest_asyncio
import pytest_mock
# Testing utilities
from unittest.mock import Mock, patch, AsyncMock
from faker import Faker
from factory import Factory, Sequence  # the factory_boy package imports as "factory"
# HTTP testing
import httpx
from fastapi.testclient import TestClient
# Database testing
import pytest_postgresql
from sqlalchemy import create_engine
from alembic import command
from alembic.config import Config
# Performance testing
import pytest_benchmark
import locust
# Security testing (bandit and safety are typically invoked as CLI tools)
import bandit
import safety
Test Configuration
# conftest.py - Global test configuration
import pytest
import pytest_asyncio
import httpx
from typing import AsyncGenerator
from alembic import command
from alembic.config import Config
from fastapi.testclient import TestClient
from mockloop_mcp.core import create_app
from mockloop_mcp.database import Database, get_database
from mockloop_mcp.config import Settings, get_settings
# Test settings
@pytest.fixture(scope="session")
def test_settings() -> Settings:
"""Provide test-specific settings."""
return Settings(
database_url="sqlite:///test.db",
environment="test",
log_level="DEBUG",
auth_enabled=False,
storage_enabled=True,
webhook_enabled=False
)
# Database fixtures
@pytest_asyncio.fixture(scope="session")
async def database(test_settings: Settings) -> AsyncGenerator[Database, None]:
"""Provide test database connection."""
db = Database(test_settings.database_url)
await db.connect()
# Run migrations
alembic_cfg = Config("alembic.ini")
alembic_cfg.set_main_option("sqlalchemy.url", test_settings.database_url)
command.upgrade(alembic_cfg, "head")
yield db
await db.disconnect()
@pytest_asyncio.fixture
async def clean_database(database: Database) -> AsyncGenerator[Database, None]:
"""Provide clean database for each test."""
# Start transaction
async with database.transaction():
yield database
# Rollback transaction after test
# Application fixtures
@pytest.fixture
def app(test_settings: Settings, clean_database: Database):
"""Provide FastAPI test application."""
app = create_app(test_settings)
# Override dependencies
app.dependency_overrides[get_settings] = lambda: test_settings
app.dependency_overrides[get_database] = lambda: clean_database
return app
@pytest.fixture
def client(app) -> TestClient:
"""Provide HTTP test client."""
return TestClient(app)
@pytest_asyncio.fixture
async def async_client(app) -> AsyncGenerator[httpx.AsyncClient, None]:
"""Provide async HTTP test client."""
    # httpx 0.27+ expects in-process ASGI apps to go through ASGITransport
    async with httpx.AsyncClient(transport=httpx.ASGITransport(app=app), base_url="http://test") as client:
yield client
Unit Testing
Testing Core Classes
# test_mock_generator.py
import json
import pytest
from unittest.mock import patch
from pathlib import Path
from mockloop_mcp.generator import MockServerGenerator
from mockloop_mcp.models import GenerationOptions, GenerationResult
from mockloop_mcp.exceptions import SpecificationError, GenerationError
class TestMockServerGenerator:
"""Test mock server generation functionality."""
@pytest.fixture
def generator(self):
"""Provide generator instance."""
return MockServerGenerator()
@pytest.fixture
def valid_openapi_spec(self, tmp_path):
"""Provide valid OpenAPI specification."""
spec_content = {
"openapi": "3.0.0",
"info": {"title": "Test API", "version": "1.0.0"},
"paths": {
"/users": {
"get": {
"responses": {
"200": {
"description": "Success",
"content": {
"application/json": {
"schema": {
"type": "array",
"items": {"$ref": "#/components/schemas/User"}
}
}
}
}
}
}
}
},
"components": {
"schemas": {
"User": {
"type": "object",
"properties": {
"id": {"type": "integer"},
"name": {"type": "string"},
"email": {"type": "string", "format": "email"}
}
}
}
}
}
spec_file = tmp_path / "openapi.json"
spec_file.write_text(json.dumps(spec_content))
return str(spec_file)
def test_generate_with_valid_spec(self, generator, valid_openapi_spec, tmp_path):
"""Test successful generation with valid specification."""
options = GenerationOptions(
output_dir=str(tmp_path / "output"),
auth_enabled=True,
storage_enabled=True
)
result = generator.generate(valid_openapi_spec, options)
assert isinstance(result, GenerationResult)
assert result.success is True
assert result.server_info is not None
assert result.server_info.name == "test-api"
assert len(result.generated_files) > 0
# Verify generated files exist
output_dir = Path(options.output_dir)
assert (output_dir / "main.py").exists()
assert (output_dir / "requirements.txt").exists()
assert (output_dir / "Dockerfile").exists()
def test_generate_with_invalid_spec_raises_error(self, generator, tmp_path):
"""Test that invalid specification raises SpecificationError."""
invalid_spec = tmp_path / "invalid.json"
invalid_spec.write_text('{"invalid": "spec"}')
options = GenerationOptions(output_dir=str(tmp_path / "output"))
with pytest.raises(SpecificationError) as exc_info:
generator.generate(str(invalid_spec), options)
assert "Invalid OpenAPI specification" in str(exc_info.value)
assert exc_info.value.spec_path == str(invalid_spec)
def test_generate_with_nonexistent_spec_raises_error(self, generator, tmp_path):
"""Test that nonexistent specification file raises error."""
nonexistent_spec = str(tmp_path / "nonexistent.json")
options = GenerationOptions(output_dir=str(tmp_path / "output"))
with pytest.raises(SpecificationError):
generator.generate(nonexistent_spec, options)
@patch('mockloop_mcp.generator.template_engine')
def test_generate_with_template_error_raises_generation_error(
self, mock_template_engine, generator, valid_openapi_spec, tmp_path
):
"""Test that template errors raise GenerationError."""
mock_template_engine.render.side_effect = Exception("Template error")
options = GenerationOptions(output_dir=str(tmp_path / "output"))
with pytest.raises(GenerationError) as exc_info:
generator.generate(valid_openapi_spec, options)
assert exc_info.value.stage == "template_rendering"
def test_validate_specification_with_valid_spec(self, generator, valid_openapi_spec):
"""Test specification validation with valid spec."""
result = generator.validate_specification(valid_openapi_spec)
assert result.is_valid is True
assert len(result.errors) == 0
assert result.spec_type == "openapi"
assert result.version == "3.0.0"
def test_validate_specification_with_invalid_spec(self, generator, tmp_path):
"""Test specification validation with invalid spec."""
invalid_spec = tmp_path / "invalid.json"
invalid_spec.write_text('{"openapi": "3.0.0"}') # Missing required fields
result = generator.validate_specification(str(invalid_spec))
assert result.is_valid is False
assert len(result.errors) > 0
assert any("info" in error for error in result.errors)
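The Faker and factory_boy entries in the core stack can generate varied test data for cases like the database tests below. A minimal sketch (RequestLogFactory is hypothetical; its fields mirror the log dicts inserted in the next section):
# factories.py - hypothetical test-data factory built on factory_boy and Faker
import factory
from datetime import datetime
from faker import Faker

fake = Faker()

class RequestLogFactory(factory.DictFactory):
    """Build request-log dicts shaped like those used in the database tests."""
    method = "GET"
    path = factory.LazyFunction(lambda: f"/users/{fake.random_int(min=1, max=100)}")
    status_code = 200
    timestamp = factory.LazyFunction(datetime.utcnow)

# Usage: override any field per test, e.g. RequestLogFactory(method="POST")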
Testing Database Operations
# test_database.py
import pytest
from datetime import datetime, timedelta
from mockloop_mcp.database import Database
class TestDatabase:
"""Test database operations."""
@pytest.mark.asyncio
async def test_insert_request_log(self, clean_database: Database):
"""Test inserting request log."""
log_data = {
"method": "GET",
"path": "/users",
"query_params": {"limit": "10"},
"headers": {"Accept": "application/json"},
"body": None,
"status_code": 200,
"response_body": {"users": []},
"response_time": 0.123,
"timestamp": datetime.utcnow()
}
log_id = await clean_database.insert_request_log(log_data)
assert log_id is not None
# Verify log was inserted
log = await clean_database.get_request_log(log_id)
assert log is not None
assert log.method == "GET"
assert log.path == "/users"
assert log.status_code == 200
@pytest.mark.asyncio
async def test_query_request_logs_with_filters(self, clean_database: Database):
"""Test querying request logs with filters."""
# Insert test data
base_time = datetime.utcnow()
logs = [
{
"method": "GET",
"path": "/users",
"status_code": 200,
"timestamp": base_time
},
{
"method": "POST",
"path": "/users",
"status_code": 201,
"timestamp": base_time + timedelta(minutes=1)
},
{
"method": "GET",
"path": "/posts",
"status_code": 200,
"timestamp": base_time + timedelta(minutes=2)
}
]
for log_data in logs:
await clean_database.insert_request_log(log_data)
# Test method filter
get_logs = await clean_database.query_request_logs(method="GET")
assert len(get_logs) == 2
assert all(log.method == "GET" for log in get_logs)
# Test path filter
user_logs = await clean_database.query_request_logs(path_pattern="/users")
assert len(user_logs) == 2
assert all("/users" in log.path for log in user_logs)
# Test time range filter
recent_logs = await clean_database.query_request_logs(
time_from=base_time + timedelta(minutes=0.5),
time_to=base_time + timedelta(minutes=1.5)
)
assert len(recent_logs) == 1
assert recent_logs[0].method == "POST"
@pytest.mark.asyncio
async def test_transaction_rollback(self, clean_database: Database):
"""Test transaction rollback on error."""
initial_count = await clean_database.count_request_logs()
try:
async with clean_database.transaction():
# Insert valid log
await clean_database.insert_request_log({
"method": "GET",
"path": "/test",
"status_code": 200,
"timestamp": datetime.utcnow()
})
# Raise error to trigger rollback
raise ValueError("Test error")
except ValueError:
pass
# Verify rollback occurred
final_count = await clean_database.count_request_logs()
assert final_count == initial_count
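The pytest-postgresql plugin from the core stack can back these tests with a real PostgreSQL instance instead of SQLite. A minimal sketch, assuming the plugin's postgresql fixture (it yields a live psycopg connection to a throwaway database):
def test_request_log_roundtrip_postgres(postgresql):
    """Exercise raw SQL against a disposable PostgreSQL database."""
    cur = postgresql.cursor()
    cur.execute("CREATE TABLE request_logs (id serial PRIMARY KEY, method text, path text)")
    cur.execute("INSERT INTO request_logs (method, path) VALUES (%s, %s)", ("GET", "/users"))
    cur.execute("SELECT method, path FROM request_logs")
    assert cur.fetchone() == ("GET", "/users")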
Testing API Endpoints
# test_api.py
import pytest
from datetime import datetime
from fastapi.testclient import TestClient
class TestMockServerAPI:
"""Test mock server API endpoints."""
    def test_health_check(self, client: TestClient):
        """Test health check endpoint."""
        response = client.get("/health")
        assert response.status_code == 200
        data = response.json()
        assert data["status"] == "healthy"
        assert data["timestamp"] == pytest.approx(datetime.utcnow().timestamp(), abs=1)
def test_get_server_info(self, client: TestClient):
"""Test server info endpoint."""
response = client.get("/admin/info")
assert response.status_code == 200
data = response.json()
assert "name" in data
assert "version" in data
assert "uptime" in data
assert "request_count" in data
def test_list_request_logs(self, client: TestClient):
"""Test listing request logs."""
# Make some requests to generate logs
client.get("/users")
client.post("/users", json={"name": "Test User"})
response = client.get("/admin/logs")
assert response.status_code == 200
data = response.json()
assert "logs" in data
assert "total" in data
assert len(data["logs"]) >= 2
def test_list_request_logs_with_filters(self, client: TestClient):
"""Test listing request logs with filters."""
# Generate test requests
client.get("/users")
client.post("/users", json={"name": "Test"})
client.get("/posts")
# Filter by method
response = client.get("/admin/logs?method=GET")
assert response.status_code == 200
data = response.json()
assert all(log["method"] == "GET" for log in data["logs"])
# Filter by path
response = client.get("/admin/logs?path_pattern=/users")
assert response.status_code == 200
data = response.json()
assert all("/users" in log["path"] for log in data["logs"])
def test_update_response_data(self, client: TestClient):
"""Test updating response data."""
new_data = {"users": [{"id": 1, "name": "Updated User"}]}
response = client.put(
"/admin/responses/users",
json=new_data
)
assert response.status_code == 200
# Verify updated data is returned
response = client.get("/users")
assert response.status_code == 200
assert response.json() == new_data
def test_create_scenario(self, client: TestClient):
"""Test creating a new scenario."""
scenario_config = {
"name": "test_scenario",
"description": "Test scenario",
"responses": {
"/users": {"users": [{"id": 1, "name": "Test User"}]},
"/posts": {"posts": []}
}
}
response = client.post("/admin/scenarios", json=scenario_config)
assert response.status_code == 201
data = response.json()
assert data["name"] == "test_scenario"
assert "id" in data
def test_switch_scenario(self, client: TestClient):
"""Test switching to a different scenario."""
# Create scenario first
scenario_config = {
"name": "test_scenario",
"responses": {
"/users": {"users": [{"id": 999, "name": "Scenario User"}]}
}
}
create_response = client.post("/admin/scenarios", json=scenario_config)
scenario_id = create_response.json()["id"]
# Switch to scenario
response = client.post(f"/admin/scenarios/{scenario_id}/activate")
assert response.status_code == 200
# Verify scenario is active
response = client.get("/users")
assert response.status_code == 200
data = response.json()
assert data["users"][0]["id"] == 999
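The async_client fixture from conftest.py supports the same assertions asynchronously, which is useful when a test also needs to await database calls. A brief sketch:
@pytest.mark.asyncio
async def test_health_check_async(async_client):
    """Same health check as above, via the async HTTP client."""
    response = await async_client.get("/health")
    assert response.status_code == 200
    assert response.json()["status"] == "healthy"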
Integration Testing
Testing Component Interactions
# test_integration.py
import json
import httpx
import pytest
from pathlib import Path
from mockloop_mcp.generator import MockServerGenerator
from mockloop_mcp.models import GenerationOptions, ServerConfig
from mockloop_mcp.server import MockServer
class TestMockServerIntegration:
"""Test integration between components."""
@pytest.mark.asyncio
async def test_full_generation_and_startup_workflow(self, tmp_path, test_settings):
"""Test complete workflow from generation to server startup."""
# 1. Generate mock server
generator = MockServerGenerator()
spec_path = self.create_test_spec(tmp_path)
output_dir = tmp_path / "generated_server"
options = GenerationOptions(
output_dir=str(output_dir),
auth_enabled=True,
storage_enabled=True
)
result = generator.generate(spec_path, options)
assert result.success is True
# 2. Start generated server
server = MockServer.from_directory(output_dir)
await server.start()
try:
# 3. Test server functionality
async with httpx.AsyncClient(base_url=server.base_url) as client:
# Test health endpoint
response = await client.get("/health")
assert response.status_code == 200
# Test generated API endpoint
response = await client.get("/users")
assert response.status_code == 200
# Test admin endpoints
response = await client.get("/admin/info")
assert response.status_code == 200
# Test request logging
response = await client.get("/admin/logs")
assert response.status_code == 200
logs = response.json()["logs"]
assert len(logs) >= 3 # health, users, admin/info
finally:
await server.stop()
@pytest.mark.asyncio
async def test_database_integration_with_server(self, tmp_path, test_settings):
"""Test database integration with running server."""
# Setup server with database
server = MockServer(
config=ServerConfig(
name="test_server",
port=8080,
storage_enabled=True,
database_url="sqlite:///test_integration.db"
)
)
await server.start()
try:
# Make requests to generate logs
async with httpx.AsyncClient(base_url=server.base_url) as client:
await client.get("/users")
await client.post("/users", json={"name": "Test User"})
await client.get("/users/1")
# Verify logs in database
db = server.database
logs = await db.query_request_logs(limit=10)
assert len(logs) == 3
assert logs[0].method in ["GET", "POST"]
assert all(log.path.startswith("/users") for log in logs)
# Test log filtering
get_logs = await db.query_request_logs(method="GET")
assert len(get_logs) == 2
post_logs = await db.query_request_logs(method="POST")
assert len(post_logs) == 1
finally:
await server.stop()
def create_test_spec(self, tmp_path: Path) -> str:
"""Create test OpenAPI specification."""
spec = {
"openapi": "3.0.0",
"info": {"title": "Test API", "version": "1.0.0"},
"paths": {
"/users": {
"get": {
"responses": {
"200": {
"description": "List users",
"content": {
"application/json": {
"schema": {
"type": "object",
"properties": {
"users": {
"type": "array",
"items": {"$ref": "#/components/schemas/User"}
}
}
}
}
}
}
}
},
"post": {
"requestBody": {
"content": {
"application/json": {
"schema": {"$ref": "#/components/schemas/User"}
}
}
},
"responses": {
"201": {
"description": "User created",
"content": {
"application/json": {
"schema": {"$ref": "#/components/schemas/User"}
}
}
}
}
}
},
"/users/{id}": {
"get": {
"parameters": [
{
"name": "id",
"in": "path",
"required": True,
"schema": {"type": "integer"}
}
],
"responses": {
"200": {
"description": "Get user",
"content": {
"application/json": {
"schema": {"$ref": "#/components/schemas/User"}
}
}
}
}
}
}
},
"components": {
"schemas": {
"User": {
"type": "object",
"properties": {
"id": {"type": "integer"},
"name": {"type": "string"},
"email": {"type": "string", "format": "email"}
}
}
}
}
}
spec_file = tmp_path / "test_spec.json"
spec_file.write_text(json.dumps(spec))
return str(spec_file)
End-to-End Testing
User Workflow Testing
# test_e2e.py
import json
import pytest
import subprocess
import time
import requests
from pathlib import Path
class TestEndToEndWorkflows:
"""Test complete user workflows."""
@pytest.mark.e2e
def test_cli_generate_and_run_workflow(self, tmp_path):
"""Test CLI workflow: generate server and run it."""
spec_file = self.create_openapi_spec(tmp_path)
output_dir = tmp_path / "generated_server"
# 1. Generate server using CLI
result = subprocess.run([
"python", "-m", "mockloop_mcp.cli",
"generate",
"--spec", str(spec_file),
"--output", str(output_dir),
"--auth-enabled",
"--storage-enabled"
], capture_output=True, text=True)
assert result.returncode == 0
assert "Server generated successfully" in result.stdout
# 2. Verify generated files
assert (output_dir / "main.py").exists()
assert (output_dir / "requirements.txt").exists()
assert (output_dir / "Dockerfile").exists()
assert (output_dir / "docker-compose.yml").exists()
        # 3. Start server using Docker Compose (detached, so run() returns immediately)
        subprocess.run(["docker-compose", "up", "-d"], cwd=output_dir, check=True)
try:
            # Wait for the server to become ready (a polling helper is sketched below)
            time.sleep(10)
# 4. Test server endpoints
base_url = "http://localhost:8080"
# Health check
response = requests.get(f"{base_url}/health")
assert response.status_code == 200
# API endpoints
response = requests.get(f"{base_url}/users")
assert response.status_code == 200
# Admin endpoints
response = requests.get(f"{base_url}/admin/info")
assert response.status_code == 200
# Test request logging
response = requests.get(f"{base_url}/admin/logs")
assert response.status_code == 200
logs = response.json()["logs"]
assert len(logs) >= 3
finally:
# 5. Stop server
subprocess.run([
"docker-compose", "down"
], cwd=output_dir)
@pytest.mark.e2e
def test_mcp_tool_integration(self, tmp_path):
"""Test MCP tool integration workflow."""
# This would test the actual MCP tool usage
# in a real MCP client environment
pass
def create_openapi_spec(self, tmp_path: Path) -> Path:
"""Create test OpenAPI specification file."""
spec = {
"openapi": "3.0.0",
"info": {
"title": "E2E Test API",
"version": "1.0.0",
"description": "API for end-to-end testing"
},
"servers": [
{"url": "http://localhost:8080", "description": "Local server"}
],
"paths": {
"/users": {
"get": {
"summary": "List users",
"responses": {
"200": {
"description": "Success",
"content": {
"application/json": {
"schema": {
"type": "object",
"properties": {
"users": {
"type": "array",
"items": {"$ref": "#/components/schemas/User"}
}
}
}
}
}
}
}
}
}
},
"components": {
"schemas": {
"User": {
"type": "object",
"properties": {
"id": {"type": "integer"},
"name": {"type": "string"},
"email": {"type": "string", "format": "email"}
}
}
}
}
}
spec_file = tmp_path / "e2e_spec.json"
spec_file.write_text(json.dumps(spec, indent=2))
return spec_file
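The fixed time.sleep(10) in the Docker Compose workflow above can be flaky on slow machines. A polling helper is a more robust sketch (the endpoint and timeout values are assumptions):
import time
import requests

def wait_for_server(base_url: str, timeout: float = 30.0) -> None:
    """Poll the health endpoint until the server responds or the timeout elapses."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            if requests.get(f"{base_url}/health", timeout=1).status_code == 200:
                return
        except requests.exceptions.ConnectionError:
            pass
        time.sleep(0.5)
    raise TimeoutError(f"Server at {base_url} did not become ready within {timeout}s")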
Performance Testing
Load Testing
# test_performance.py
import pytest
import asyncio
import time
import requests
from concurrent.futures import ThreadPoolExecutor
import httpx
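# The tests below assume a `running_server` fixture that conftest.py does not
# define. A minimal sketch follows (MockServer/ServerConfig usage mirrors the
# integration tests; the exact constructor arguments are assumptions, and the
# sync memory test would need a sync wrapper around this async fixture):
import pytest_asyncio
from mockloop_mcp.server import MockServer
from mockloop_mcp.models import ServerConfig

@pytest_asyncio.fixture
async def running_server():
    """Start a throwaway MockServer for the duration of a test."""
    server = MockServer(config=ServerConfig(name="perf_server", port=8081))
    await server.start()
    yield server
    await server.stop()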
class TestPerformance:
"""Test performance characteristics."""
@pytest.mark.performance
@pytest.mark.asyncio
async def test_concurrent_request_handling(self, running_server):
"""Test server performance under concurrent load."""
base_url = running_server.base_url
num_requests = 100
concurrent_clients = 10
async def make_requests(client_id: int) -> list:
"""Make requests from a single client."""
results = []
async with httpx.AsyncClient(base_url=base_url) as client:
for i in range(num_requests // concurrent_clients):
start_time = time.time()
response = await client.get("/users")
end_time = time.time()
results.append({
"client_id": client_id,
"request_id": i,
"status_code": response.status_code,
"response_time": end_time - start_time
})
return results
# Run concurrent clients
start_time = time.time()
tasks = [
make_requests(client_id)
for client_id in range(concurrent_clients)
]
results = await asyncio.gather(*tasks)
end_time = time.time()
# Analyze results
all_results = [result for client_results in results for result in client_results]
# Verify all requests succeeded
assert all(r["status_code"] == 200 for r in all_results)
assert len(all_results) == num_requests
# Performance assertions
total_time = end_time - start_time
requests_per_second = num_requests / total_time
assert requests_per_second > 50 # Minimum throughput
response_times = [r["response_time"] for r in all_results]
avg_response_time = sum(response_times) / len(response_times)
max_response_time = max(response_times)
assert avg_response_time < 0.1 # Average response time under 100ms
assert max_response_time < 1.0 # Max response time under 1s
@pytest.mark.performance
def test_memory_usage_under_load(self, running_server):
"""Test memory usage during sustained load."""
        import psutil
        import gc
        # Note: this measures the test process itself, so it reflects server
        # memory only when the server runs in-process
        process = psutil.Process()
        initial_memory = process.memory_info().rss
# Generate sustained load
with ThreadPoolExecutor(max_workers=5) as executor:
futures = []
for _ in range(1000):
future = executor.submit(
requests.get,
f"{running_server.base_url}/users"
)
futures.append(future)
# Wait for completion
for future in futures:
response = future.result()
assert response.status_code == 200
# Force garbage collection
gc.collect()
final_memory = process.memory_info().rss
memory_increase = final_memory - initial_memory
# Memory increase should be reasonable (less than 100MB)
assert memory_increase < 100 * 1024 * 1024
@pytest.mark.performance
@pytest.mark.benchmark
def test_request_processing_benchmark(self, benchmark, client):
"""Benchmark request processing performance."""
def process_request():
response = client.get("/users")
assert response.status_code == 200
return response
        result = benchmark(process_request)
        # pytest-benchmark exposes timing statistics on benchmark.stats.stats
        assert benchmark.stats.stats.mean < 0.01  # Mean time under 10ms
        assert benchmark.stats.stats.max < 0.1  # Max time under 100ms
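The locust entry in the core stack also supports load testing from outside the test suite. A minimal locustfile sketch (host and endpoint paths are assumed from the examples above; run with locust -f locustfile.py --host http://localhost:8080):
# locustfile.py - external load test against a running mock server
from locust import HttpUser, task, between

class MockServerUser(HttpUser):
    """Simulated user hitting the mock server's main endpoints."""
    wait_time = between(0.1, 0.5)

    @task(3)
    def list_users(self):
        self.client.get("/users")

    @task(1)
    def check_health(self):
        self.client.get("/health")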