Skip to content

Testing Guide

This comprehensive guide covers testing practices, strategies, and tools for MockLoop MCP. Our testing approach ensures reliability, maintainability, and confidence in the codebase through multiple layers of testing.

Overview

MockLoop MCP uses a multi-layered testing strategy:

  • Unit Tests: Test individual functions and classes in isolation
  • Integration Tests: Test component interactions and workflows
  • End-to-End Tests: Test complete user scenarios
  • Performance Tests: Validate performance characteristics
  • Security Tests: Verify security controls and data protection
  • Contract Tests: Ensure API compatibility

Testing Framework and Tools

Core Testing Stack

# pytest - Primary testing framework
import pytest
import pytest_asyncio
import pytest_mock

# Testing utilities
from unittest.mock import Mock, patch, AsyncMock
from faker import Faker
from factory_boy import Factory, Sequence

# HTTP testing
import httpx
from fastapi.testclient import TestClient

# Database testing
import pytest_postgresql
from sqlalchemy import create_engine
from alembic import command
from alembic.config import Config

# Performance testing
import pytest_benchmark
import locust

# Security testing
import bandit
import safety

Test Configuration

# conftest.py - Global test configuration
import pytest
import asyncio
from pathlib import Path
from typing import AsyncGenerator, Generator

from mockloop_mcp.core import create_app
from mockloop_mcp.database import Database, get_database
from mockloop_mcp.config import Settings, get_settings

# Test settings
@pytest.fixture(scope="session")
def test_settings() -> Settings:
    """Provide test-specific settings."""
    return Settings(
        database_url="sqlite:///test.db",
        environment="test",
        log_level="DEBUG",
        auth_enabled=False,
        storage_enabled=True,
        webhook_enabled=False
    )

# Database fixtures
@pytest.fixture(scope="session")
async def database(test_settings: Settings) -> AsyncGenerator[Database, None]:
    """Provide test database connection."""
    db = Database(test_settings.database_url)
    await db.connect()

    # Run migrations
    alembic_cfg = Config("alembic.ini")
    alembic_cfg.set_main_option("sqlalchemy.url", test_settings.database_url)
    command.upgrade(alembic_cfg, "head")

    yield db

    await db.disconnect()

@pytest.fixture
async def clean_database(database: Database) -> AsyncGenerator[Database, None]:
    """Provide clean database for each test."""
    # Start transaction
    async with database.transaction():
        yield database
        # Rollback transaction after test

# Application fixtures
@pytest.fixture
def app(test_settings: Settings, clean_database: Database):
    """Provide FastAPI test application."""
    app = create_app(test_settings)

    # Override dependencies
    app.dependency_overrides[get_settings] = lambda: test_settings
    app.dependency_overrides[get_database] = lambda: clean_database

    return app

@pytest.fixture
def client(app) -> TestClient:
    """Provide HTTP test client."""
    return TestClient(app)

@pytest.fixture
async def async_client(app) -> AsyncGenerator[httpx.AsyncClient, None]:
    """Provide async HTTP test client."""
    async with httpx.AsyncClient(app=app, base_url="http://test") as client:
        yield client

Unit Testing

Testing Core Classes

# test_mock_generator.py
import pytest
from unittest.mock import Mock, patch
from pathlib import Path

from mockloop_mcp.generator import MockServerGenerator
from mockloop_mcp.models import GenerationOptions, GenerationResult
from mockloop_mcp.exceptions import SpecificationError, GenerationError

class TestMockServerGenerator:
    """Test mock server generation functionality."""

    @pytest.fixture
    def generator(self):
        """Provide generator instance."""
        return MockServerGenerator()

    @pytest.fixture
    def valid_openapi_spec(self, tmp_path):
        """Provide valid OpenAPI specification."""
        spec_content = {
            "openapi": "3.0.0",
            "info": {"title": "Test API", "version": "1.0.0"},
            "paths": {
                "/users": {
                    "get": {
                        "responses": {
                            "200": {
                                "description": "Success",
                                "content": {
                                    "application/json": {
                                        "schema": {
                                            "type": "array",
                                            "items": {"$ref": "#/components/schemas/User"}
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            },
            "components": {
                "schemas": {
                    "User": {
                        "type": "object",
                        "properties": {
                            "id": {"type": "integer"},
                            "name": {"type": "string"},
                            "email": {"type": "string", "format": "email"}
                        }
                    }
                }
            }
        }

        spec_file = tmp_path / "openapi.json"
        spec_file.write_text(json.dumps(spec_content))
        return str(spec_file)

    def test_generate_with_valid_spec(self, generator, valid_openapi_spec, tmp_path):
        """Test successful generation with valid specification."""
        options = GenerationOptions(
            output_dir=str(tmp_path / "output"),
            auth_enabled=True,
            storage_enabled=True
        )

        result = generator.generate(valid_openapi_spec, options)

        assert isinstance(result, GenerationResult)
        assert result.success is True
        assert result.server_info is not None
        assert result.server_info.name == "test-api"
        assert len(result.generated_files) > 0

        # Verify generated files exist
        output_dir = Path(options.output_dir)
        assert (output_dir / "main.py").exists()
        assert (output_dir / "requirements.txt").exists()
        assert (output_dir / "Dockerfile").exists()

    def test_generate_with_invalid_spec_raises_error(self, generator, tmp_path):
        """Test that invalid specification raises SpecificationError."""
        invalid_spec = tmp_path / "invalid.json"
        invalid_spec.write_text('{"invalid": "spec"}')

        options = GenerationOptions(output_dir=str(tmp_path / "output"))

        with pytest.raises(SpecificationError) as exc_info:
            generator.generate(str(invalid_spec), options)

        assert "Invalid OpenAPI specification" in str(exc_info.value)
        assert exc_info.value.spec_path == str(invalid_spec)

    def test_generate_with_nonexistent_spec_raises_error(self, generator, tmp_path):
        """Test that nonexistent specification file raises error."""
        nonexistent_spec = str(tmp_path / "nonexistent.json")
        options = GenerationOptions(output_dir=str(tmp_path / "output"))

        with pytest.raises(SpecificationError):
            generator.generate(nonexistent_spec, options)

    @patch('mockloop_mcp.generator.template_engine')
    def test_generate_with_template_error_raises_generation_error(
        self, mock_template_engine, generator, valid_openapi_spec, tmp_path
    ):
        """Test that template errors raise GenerationError."""
        mock_template_engine.render.side_effect = Exception("Template error")

        options = GenerationOptions(output_dir=str(tmp_path / "output"))

        with pytest.raises(GenerationError) as exc_info:
            generator.generate(valid_openapi_spec, options)

        assert exc_info.value.stage == "template_rendering"

    def test_validate_specification_with_valid_spec(self, generator, valid_openapi_spec):
        """Test specification validation with valid spec."""
        result = generator.validate_specification(valid_openapi_spec)

        assert result.is_valid is True
        assert len(result.errors) == 0
        assert result.spec_type == "openapi"
        assert result.version == "3.0.0"

    def test_validate_specification_with_invalid_spec(self, generator, tmp_path):
        """Test specification validation with invalid spec."""
        invalid_spec = tmp_path / "invalid.json"
        invalid_spec.write_text('{"openapi": "3.0.0"}')  # Missing required fields

        result = generator.validate_specification(str(invalid_spec))

        assert result.is_valid is False
        assert len(result.errors) > 0
        assert any("info" in error for error in result.errors)

Testing Database Operations

# test_database.py
import pytest
from datetime import datetime, timedelta

from mockloop_mcp.database import Database
from mockloop_mcp.models import RequestLog, ServerConfig

class TestDatabase:
    """Test database operations."""

    @pytest.mark.asyncio
    async def test_insert_request_log(self, clean_database: Database):
        """Test inserting request log."""
        log_data = {
            "method": "GET",
            "path": "/users",
            "query_params": {"limit": "10"},
            "headers": {"Accept": "application/json"},
            "body": None,
            "status_code": 200,
            "response_body": {"users": []},
            "response_time": 0.123,
            "timestamp": datetime.utcnow()
        }

        log_id = await clean_database.insert_request_log(log_data)

        assert log_id is not None

        # Verify log was inserted
        log = await clean_database.get_request_log(log_id)
        assert log is not None
        assert log.method == "GET"
        assert log.path == "/users"
        assert log.status_code == 200

    @pytest.mark.asyncio
    async def test_query_request_logs_with_filters(self, clean_database: Database):
        """Test querying request logs with filters."""
        # Insert test data
        base_time = datetime.utcnow()

        logs = [
            {
                "method": "GET",
                "path": "/users",
                "status_code": 200,
                "timestamp": base_time
            },
            {
                "method": "POST",
                "path": "/users",
                "status_code": 201,
                "timestamp": base_time + timedelta(minutes=1)
            },
            {
                "method": "GET",
                "path": "/posts",
                "status_code": 200,
                "timestamp": base_time + timedelta(minutes=2)
            }
        ]

        for log_data in logs:
            await clean_database.insert_request_log(log_data)

        # Test method filter
        get_logs = await clean_database.query_request_logs(method="GET")
        assert len(get_logs) == 2
        assert all(log.method == "GET" for log in get_logs)

        # Test path filter
        user_logs = await clean_database.query_request_logs(path_pattern="/users")
        assert len(user_logs) == 2
        assert all("/users" in log.path for log in user_logs)

        # Test time range filter
        recent_logs = await clean_database.query_request_logs(
            time_from=base_time + timedelta(minutes=0.5),
            time_to=base_time + timedelta(minutes=1.5)
        )
        assert len(recent_logs) == 1
        assert recent_logs[0].method == "POST"

    @pytest.mark.asyncio
    async def test_transaction_rollback(self, clean_database: Database):
        """Test transaction rollback on error."""
        initial_count = await clean_database.count_request_logs()

        try:
            async with clean_database.transaction():
                # Insert valid log
                await clean_database.insert_request_log({
                    "method": "GET",
                    "path": "/test",
                    "status_code": 200,
                    "timestamp": datetime.utcnow()
                })

                # Raise error to trigger rollback
                raise ValueError("Test error")

        except ValueError:
            pass

        # Verify rollback occurred
        final_count = await clean_database.count_request_logs()
        assert final_count == initial_count

Testing API Endpoints

# test_api.py
import pytest
from fastapi.testclient import TestClient

class TestMockServerAPI:
    """Test mock server API endpoints."""

    def test_health_check(self, client: TestClient):
        """Test health check endpoint."""
        response = client.get("/health")

        assert response.status_code == 200
        assert response.json() == {
            "status": "healthy",
            "timestamp": pytest.approx(datetime.utcnow().timestamp(), abs=1)
        }

    def test_get_server_info(self, client: TestClient):
        """Test server info endpoint."""
        response = client.get("/admin/info")

        assert response.status_code == 200
        data = response.json()
        assert "name" in data
        assert "version" in data
        assert "uptime" in data
        assert "request_count" in data

    def test_list_request_logs(self, client: TestClient):
        """Test listing request logs."""
        # Make some requests to generate logs
        client.get("/users")
        client.post("/users", json={"name": "Test User"})

        response = client.get("/admin/logs")

        assert response.status_code == 200
        data = response.json()
        assert "logs" in data
        assert "total" in data
        assert len(data["logs"]) >= 2

    def test_list_request_logs_with_filters(self, client: TestClient):
        """Test listing request logs with filters."""
        # Generate test requests
        client.get("/users")
        client.post("/users", json={"name": "Test"})
        client.get("/posts")

        # Filter by method
        response = client.get("/admin/logs?method=GET")
        assert response.status_code == 200
        data = response.json()
        assert all(log["method"] == "GET" for log in data["logs"])

        # Filter by path
        response = client.get("/admin/logs?path_pattern=/users")
        assert response.status_code == 200
        data = response.json()
        assert all("/users" in log["path"] for log in data["logs"])

    def test_update_response_data(self, client: TestClient):
        """Test updating response data."""
        new_data = {"users": [{"id": 1, "name": "Updated User"}]}

        response = client.put(
            "/admin/responses/users",
            json=new_data
        )

        assert response.status_code == 200

        # Verify updated data is returned
        response = client.get("/users")
        assert response.status_code == 200
        assert response.json() == new_data

    def test_create_scenario(self, client: TestClient):
        """Test creating a new scenario."""
        scenario_config = {
            "name": "test_scenario",
            "description": "Test scenario",
            "responses": {
                "/users": {"users": [{"id": 1, "name": "Test User"}]},
                "/posts": {"posts": []}
            }
        }

        response = client.post("/admin/scenarios", json=scenario_config)

        assert response.status_code == 201
        data = response.json()
        assert data["name"] == "test_scenario"
        assert "id" in data

    def test_switch_scenario(self, client: TestClient):
        """Test switching to a different scenario."""
        # Create scenario first
        scenario_config = {
            "name": "test_scenario",
            "responses": {
                "/users": {"users": [{"id": 999, "name": "Scenario User"}]}
            }
        }

        create_response = client.post("/admin/scenarios", json=scenario_config)
        scenario_id = create_response.json()["id"]

        # Switch to scenario
        response = client.post(f"/admin/scenarios/{scenario_id}/activate")
        assert response.status_code == 200

        # Verify scenario is active
        response = client.get("/users")
        assert response.status_code == 200
        data = response.json()
        assert data["users"][0]["id"] == 999

Integration Testing

Testing Component Interactions

# test_integration.py
import pytest
import asyncio
from pathlib import Path

from mockloop_mcp.generator import MockServerGenerator
from mockloop_mcp.server import MockServer
from mockloop_mcp.database import Database

class TestMockServerIntegration:
    """Test integration between components."""

    @pytest.mark.asyncio
    async def test_full_generation_and_startup_workflow(self, tmp_path, test_settings):
        """Test complete workflow from generation to server startup."""
        # 1. Generate mock server
        generator = MockServerGenerator()
        spec_path = self.create_test_spec(tmp_path)
        output_dir = tmp_path / "generated_server"

        options = GenerationOptions(
            output_dir=str(output_dir),
            auth_enabled=True,
            storage_enabled=True
        )

        result = generator.generate(spec_path, options)
        assert result.success is True

        # 2. Start generated server
        server = MockServer.from_directory(output_dir)
        await server.start()

        try:
            # 3. Test server functionality
            async with httpx.AsyncClient(base_url=server.base_url) as client:
                # Test health endpoint
                response = await client.get("/health")
                assert response.status_code == 200

                # Test generated API endpoint
                response = await client.get("/users")
                assert response.status_code == 200

                # Test admin endpoints
                response = await client.get("/admin/info")
                assert response.status_code == 200

                # Test request logging
                response = await client.get("/admin/logs")
                assert response.status_code == 200
                logs = response.json()["logs"]
                assert len(logs) >= 3  # health, users, admin/info

        finally:
            await server.stop()

    @pytest.mark.asyncio
    async def test_database_integration_with_server(self, tmp_path, test_settings):
        """Test database integration with running server."""
        # Setup server with database
        server = MockServer(
            config=ServerConfig(
                name="test_server",
                port=8080,
                storage_enabled=True,
                database_url="sqlite:///test_integration.db"
            )
        )

        await server.start()

        try:
            # Make requests to generate logs
            async with httpx.AsyncClient(base_url=server.base_url) as client:
                await client.get("/users")
                await client.post("/users", json={"name": "Test User"})
                await client.get("/users/1")

            # Verify logs in database
            db = server.database
            logs = await db.query_request_logs(limit=10)

            assert len(logs) == 3
            assert logs[0].method in ["GET", "POST"]
            assert all(log.path.startswith("/users") for log in logs)

            # Test log filtering
            get_logs = await db.query_request_logs(method="GET")
            assert len(get_logs) == 2

            post_logs = await db.query_request_logs(method="POST")
            assert len(post_logs) == 1

        finally:
            await server.stop()

    def create_test_spec(self, tmp_path: Path) -> str:
        """Create test OpenAPI specification."""
        spec = {
            "openapi": "3.0.0",
            "info": {"title": "Test API", "version": "1.0.0"},
            "paths": {
                "/users": {
                    "get": {
                        "responses": {
                            "200": {
                                "description": "List users",
                                "content": {
                                    "application/json": {
                                        "schema": {
                                            "type": "object",
                                            "properties": {
                                                "users": {
                                                    "type": "array",
                                                    "items": {"$ref": "#/components/schemas/User"}
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    },
                    "post": {
                        "requestBody": {
                            "content": {
                                "application/json": {
                                    "schema": {"$ref": "#/components/schemas/User"}
                                }
                            }
                        },
                        "responses": {
                            "201": {
                                "description": "User created",
                                "content": {
                                    "application/json": {
                                        "schema": {"$ref": "#/components/schemas/User"}
                                    }
                                }
                            }
                        }
                    }
                },
                "/users/{id}": {
                    "get": {
                        "parameters": [
                            {
                                "name": "id",
                                "in": "path",
                                "required": True,
                                "schema": {"type": "integer"}
                            }
                        ],
                        "responses": {
                            "200": {
                                "description": "Get user",
                                "content": {
                                    "application/json": {
                                        "schema": {"$ref": "#/components/schemas/User"}
                                    }
                                }
                            }
                        }
                    }
                }
            },
            "components": {
                "schemas": {
                    "User": {
                        "type": "object",
                        "properties": {
                            "id": {"type": "integer"},
                            "name": {"type": "string"},
                            "email": {"type": "string", "format": "email"}
                        }
                    }
                }
            }
        }

        spec_file = tmp_path / "test_spec.json"
        spec_file.write_text(json.dumps(spec))
        return str(spec_file)

End-to-End Testing

User Workflow Testing

# test_e2e.py
import pytest
import subprocess
import time
import requests
from pathlib import Path

class TestEndToEndWorkflows:
    """Test complete user workflows."""

    @pytest.mark.e2e
    def test_cli_generate_and_run_workflow(self, tmp_path):
        """Test CLI workflow: generate server and run it."""
        spec_file = self.create_openapi_spec(tmp_path)
        output_dir = tmp_path / "generated_server"

        # 1. Generate server using CLI
        result = subprocess.run([
            "python", "-m", "mockloop_mcp.cli",
            "generate",
            "--spec", str(spec_file),
            "--output", str(output_dir),
            "--auth-enabled",
            "--storage-enabled"
        ], capture_output=True, text=True)

        assert result.returncode == 0
        assert "Server generated successfully" in result.stdout

        # 2. Verify generated files
        assert (output_dir / "main.py").exists()
        assert (output_dir / "requirements.txt").exists()
        assert (output_dir / "Dockerfile").exists()
        assert (output_dir / "docker-compose.yml").exists()

        # 3. Start server using Docker Compose
        compose_process = subprocess.Popen([
            "docker-compose", "up", "-d"
        ], cwd=output_dir)

        try:
            # Wait for server to start
            time.sleep(10)

            # 4. Test server endpoints
            base_url = "http://localhost:8080"

            # Health check
            response = requests.get(f"{base_url}/health")
            assert response.status_code == 200

            # API endpoints
            response = requests.get(f"{base_url}/users")
            assert response.status_code == 200

            # Admin endpoints
            response = requests.get(f"{base_url}/admin/info")
            assert response.status_code == 200

            # Test request logging
            response = requests.get(f"{base_url}/admin/logs")
            assert response.status_code == 200
            logs = response.json()["logs"]
            assert len(logs) >= 3

        finally:
            # 5. Stop server
            subprocess.run([
                "docker-compose", "down"
            ], cwd=output_dir)

    @pytest.mark.e2e
    def test_mcp_tool_integration(self, tmp_path):
        """Test MCP tool integration workflow."""
        # This would test the actual MCP tool usage
        # in a real MCP client environment
        pass

    def create_openapi_spec(self, tmp_path: Path) -> Path:
        """Create test OpenAPI specification file."""
        spec = {
            "openapi": "3.0.0",
            "info": {
                "title": "E2E Test API",
                "version": "1.0.0",
                "description": "API for end-to-end testing"
            },
            "servers": [
                {"url": "http://localhost:8080", "description": "Local server"}
            ],
            "paths": {
                "/users": {
                    "get": {
                        "summary": "List users",
                        "responses": {
                            "200": {
                                "description": "Success",
                                "content": {
                                    "application/json": {
                                        "schema": {
                                            "type": "object",
                                            "properties": {
                                                "users": {
                                                    "type": "array",
                                                    "items": {"$ref": "#/components/schemas/User"}
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            },
            "components": {
                "schemas": {
                    "User": {
                        "type": "object",
                        "properties": {
                            "id": {"type": "integer"},
                            "name": {"type": "string"},
                            "email": {"type": "string", "format": "email"}
                        }
                    }
                }
            }
        }

        spec_file = tmp_path / "e2e_spec.json"
        spec_file.write_text(json.dumps(spec, indent=2))
        return spec_file

Performance Testing

Load Testing

# test_performance.py
import pytest
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
import httpx

class TestPerformance:
    """Test performance characteristics."""

    @pytest.mark.performance
    @pytest.mark.asyncio
    async def test_concurrent_request_handling(self, running_server):
        """Test server performance under concurrent load."""
        base_url = running_server.base_url
        num_requests = 100
        concurrent_clients = 10

        async def make_requests(client_id: int) -> list:
            """Make requests from a single client."""
            results = []
            async with httpx.AsyncClient(base_url=base_url) as client:
                for i in range(num_requests // concurrent_clients):
                    start_time = time.time()
                    response = await client.get("/users")
                    end_time = time.time()

                    results.append({
                        "client_id": client_id,
                        "request_id": i,
                        "status_code": response.status_code,
                        "response_time": end_time - start_time
                    })

            return results

        # Run concurrent clients
        start_time = time.time()
        tasks = [
            make_requests(client_id) 
            for client_id in range(concurrent_clients)
        ]

        results = await asyncio.gather(*tasks)
        end_time = time.time()

        # Analyze results
        all_results = [result for client_results in results for result in client_results]

        # Verify all requests succeeded
        assert all(r["status_code"] == 200 for r in all_results)
        assert len(all_results) == num_requests

        # Performance assertions
        total_time = end_time - start_time
        requests_per_second = num_requests / total_time

        assert requests_per_second > 50  # Minimum throughput

        response_times = [r["response_time"] for r in all_results]
        avg_response_time = sum(response_times) / len(response_times)
        max_response_time = max(response_times)

        assert avg_response_time < 0.1  # Average response time under 100ms
        assert max_response_time < 1.0  # Max response time under 1s

    @pytest.mark.performance
    def test_memory_usage_under_load(self, running_server):
        """Test memory usage during sustained load."""
        import psutil
        import gc

        process = psutil.Process()
        initial_memory = process.memory_info().rss

        # Generate sustained load
        with ThreadPoolExecutor(max_workers=5) as executor:
            futures = []

            for _ in range(1000):
                future = executor.submit(
                    requests.get, 
                    f"{running_server.base_url}/users"
                )
                futures.append(future)

            # Wait for completion
            for future in futures:
                response = future.result()
                assert response.status_code == 200

        # Force garbage collection
        gc.collect()

        final_memory = process.memory_info().rss
        memory_increase = final_memory - initial_memory

        # Memory increase should be reasonable (less than 100MB)
        assert memory_increase < 100 * 1024 * 1024

    @pytest.mark.performance
    @pytest.mark.benchmark
    def test_request_processing_benchmark(self, benchmark, client):
        """Benchmark request processing performance."""
        def process_request():
            response = client.get("/users")
            assert response.status_code == 200
            return response

        result = benchmark(process_request)

        # Benchmark assertions
        assert benchmark.stats.mean < 0.01  # Mean time under 10ms
        assert benchmark.stats.max < 0.1    # Max time under 100ms