Skip to content

Custom AI Workflows

MockLoop MCP provides a flexible foundation for creating custom AI workflow integrations beyond the standard frameworks. This guide covers patterns, techniques, and best practices for integrating MockLoop with custom AI systems, proprietary frameworks, and specialized workflows.

Overview

Custom AI workflows often have unique requirements that don't fit standard frameworks. MockLoop MCP enables:

  • Framework-Agnostic Integration: Work with any AI system that makes HTTP requests
  • Custom Protocol Support: Adapt to proprietary communication protocols
  • Specialized Testing Scenarios: Create domain-specific testing patterns
  • Hybrid Architectures: Integrate multiple AI systems and frameworks
  • Legacy System Integration: Connect with existing AI infrastructure

Core Integration Principles

1. HTTP-Based Integration

Most AI systems communicate via HTTP APIs, making integration straightforward:

from mockloop_mcp import MockLoopClient
import asyncio
import httpx

class CustomAIWorkflow:
    def __init__(self, workflow_name: str):
        self.workflow_name = workflow_name
        self.mockloop = MockLoopClient()
        self.services = {}
        self.workflow_state = {}

    async def register_service(self, service_name: str, spec_path: str, port: int):
        """Register a mock service for the workflow"""
        await self.mockloop.generate_mock_api(
            spec_url_or_path=spec_path,
            output_dir_name=f"{self.workflow_name}_{service_name}"
        )

        self.services[service_name] = f"http://localhost:{port}"

    async def configure_workflow_scenario(self, scenario_name: str, scenario_config: dict):
        """Configure a complete workflow scenario"""
        for service_name, service_config in scenario_config.items():
            if service_name in self.services:
                service_url = self.services[service_name]

                for endpoint, response_data in service_config.items():
                    await self.mockloop.manage_mock_data(
                        server_url=service_url,
                        operation="update_response",
                        endpoint_path=endpoint,
                        response_data=response_data
                    )

    async def execute_workflow_step(self, step_name: str, step_config: dict):
        """Execute a single workflow step"""
        service_name = step_config["service"]
        endpoint = step_config["endpoint"]
        method = step_config.get("method", "GET")
        data = step_config.get("data", {})

        if service_name not in self.services:
            raise ValueError(f"Service {service_name} not registered")

        service_url = self.services[service_name]

        async with httpx.AsyncClient() as client:
            if method.upper() == "GET":
                response = await client.get(f"{service_url}{endpoint}")
            elif method.upper() == "POST":
                response = await client.post(f"{service_url}{endpoint}", json=data)
            elif method.upper() == "PUT":
                response = await client.put(f"{service_url}{endpoint}", json=data)
            else:
                raise ValueError(f"Unsupported method: {method}")

        result = {
            "step": step_name,
            "status_code": response.status_code,
            "response": response.json() if response.headers.get("content-type", "").startswith("application/json") else response.text,
            "success": 200 <= response.status_code < 300
        }

        # Update workflow state
        self.workflow_state[step_name] = result

        return result

2. State-Driven Workflows

Create workflows that adapt based on current state:

class StateDrivenAIWorkflow:
    def __init__(self, workflow_definition: dict):
        self.workflow_definition = workflow_definition
        self.mockloop = MockLoopClient()
        self.current_state = "initial"
        self.state_history = []
        self.context = {}

    async def setup_workflow(self):
        """Setup all services defined in the workflow"""
        for service_name, service_config in self.workflow_definition["services"].items():
            await self.mockloop.generate_mock_api(
                spec_url_or_path=service_config["spec"],
                output_dir_name=f"workflow_{service_name}"
            )

    async def transition_to_state(self, new_state: str, context_updates: dict = None):
        """Transition workflow to a new state"""
        if new_state not in self.workflow_definition["states"]:
            raise ValueError(f"Unknown state: {new_state}")

        # Update context
        if context_updates:
            self.context.update(context_updates)

        # Record state transition
        self.state_history.append({
            "from_state": self.current_state,
            "to_state": new_state,
            "timestamp": time.time(),
            "context": self.context.copy()
        })

        self.current_state = new_state

        # Configure mock services for new state
        state_config = self.workflow_definition["states"][new_state]
        await self.configure_state_services(state_config)

    async def configure_state_services(self, state_config: dict):
        """Configure mock services for current state"""
        for service_name, service_responses in state_config.get("services", {}).items():
            service_url = f"http://localhost:{self.workflow_definition['services'][service_name]['port']}"

            for endpoint, response_template in service_responses.items():
                # Process response template with current context
                response_data = self.process_response_template(response_template, self.context)

                await self.mockloop.manage_mock_data(
                    server_url=service_url,
                    operation="update_response",
                    endpoint_path=endpoint,
                    response_data=response_data
                )

    def process_response_template(self, template: dict, context: dict) -> dict:
        """Process response template with context variables"""
        import json

        # Convert template to string, substitute variables, convert back
        template_str = json.dumps(template)

        for key, value in context.items():
            template_str = template_str.replace(f"${{{key}}}", str(value))

        return json.loads(template_str)

    async def execute_state_actions(self):
        """Execute actions defined for current state"""
        state_config = self.workflow_definition["states"][self.current_state]
        actions = state_config.get("actions", [])

        results = []
        for action in actions:
            result = await self.execute_action(action)
            results.append(result)

            # Check for state transitions based on action results
            if result.get("trigger_transition"):
                next_state = result["trigger_transition"]
                await self.transition_to_state(next_state, result.get("context_updates", {}))

        return results

    async def execute_action(self, action: dict):
        """Execute a single action"""
        action_type = action["type"]

        if action_type == "api_call":
            return await self.execute_api_call_action(action)
        elif action_type == "condition_check":
            return await self.execute_condition_check(action)
        elif action_type == "data_transform":
            return await self.execute_data_transform(action)
        else:
            raise ValueError(f"Unknown action type: {action_type}")

    async def execute_api_call_action(self, action: dict):
        """Execute API call action"""
        service_name = action["service"]
        endpoint = action["endpoint"]
        method = action.get("method", "GET")

        service_config = self.workflow_definition["services"][service_name]
        service_url = f"http://localhost:{service_config['port']}"

        async with httpx.AsyncClient() as client:
            if method.upper() == "GET":
                response = await client.get(f"{service_url}{endpoint}")
            elif method.upper() == "POST":
                data = action.get("data", {})
                # Process data template with context
                processed_data = self.process_response_template(data, self.context)
                response = await client.post(f"{service_url}{endpoint}", json=processed_data)

        result = {
            "action": action["name"],
            "type": "api_call",
            "status_code": response.status_code,
            "response": response.json() if response.headers.get("content-type", "").startswith("application/json") else response.text,
            "success": 200 <= response.status_code < 300
        }

        # Check for transition conditions
        if "transition_on_success" in action and result["success"]:
            result["trigger_transition"] = action["transition_on_success"]
        elif "transition_on_failure" in action and not result["success"]:
            result["trigger_transition"] = action["transition_on_failure"]

        return result

# Example workflow definition
ml_pipeline_workflow = {
    "services": {
        "data_service": {"spec": "./data-api.yaml", "port": 8001},
        "model_service": {"spec": "./model-api.yaml", "port": 8002},
        "validation_service": {"spec": "./validation-api.yaml", "port": 8003}
    },
    "states": {
        "initial": {
            "services": {
                "data_service": {
                    "/status": {"status": "ready", "data_available": True}
                }
            },
            "actions": [
                {
                    "name": "check_data_availability",
                    "type": "api_call",
                    "service": "data_service",
                    "endpoint": "/status",
                    "transition_on_success": "data_loading"
                }
            ]
        },
        "data_loading": {
            "services": {
                "data_service": {
                    "/data": {"data": "${data_batch}", "size": "${batch_size}"}
                }
            },
            "actions": [
                {
                    "name": "load_data_batch",
                    "type": "api_call",
                    "service": "data_service",
                    "endpoint": "/data",
                    "transition_on_success": "model_training"
                }
            ]
        },
        "model_training": {
            "services": {
                "model_service": {
                    "/train": {"status": "training", "progress": "${training_progress}"},
                    "/status": {"status": "training", "epoch": "${current_epoch}"}
                }
            },
            "actions": [
                {
                    "name": "start_training",
                    "type": "api_call",
                    "service": "model_service",
                    "endpoint": "/train",
                    "method": "POST",
                    "data": {"batch_size": "${batch_size}", "epochs": "${max_epochs}"},
                    "transition_on_success": "model_validation"
                }
            ]
        },
        "model_validation": {
            "services": {
                "validation_service": {
                    "/validate": {"accuracy": "${model_accuracy}", "status": "complete"}
                }
            },
            "actions": [
                {
                    "name": "validate_model",
                    "type": "api_call",
                    "service": "validation_service",
                    "endpoint": "/validate",
                    "method": "POST",
                    "transition_on_success": "complete"
                }
            ]
        },
        "complete": {
            "services": {},
            "actions": []
        }
    }
}

3. Event-Driven Workflows

Create workflows that respond to events and webhooks:

import asyncio
from typing import Dict, List, Callable

class EventDrivenAIWorkflow:
    def __init__(self, workflow_name: str):
        self.workflow_name = workflow_name
        self.mockloop = MockLoopClient()
        self.event_handlers: Dict[str, List[Callable]] = {}
        self.webhook_servers = {}
        self.event_queue = asyncio.Queue()
        self.running = False

    async def setup_webhook_service(self, service_name: str, port: int):
        """Setup webhook service for receiving events"""
        await self.mockloop.generate_mock_api(
            spec_url_or_path="./webhook-api.yaml",
            output_dir_name=f"{self.workflow_name}_webhook_{service_name}"
        )

        self.webhook_servers[service_name] = f"http://localhost:{port}"

        # Configure webhook endpoint to capture events
        await self.mockloop.manage_mock_data(
            server_url=self.webhook_servers[service_name],
            operation="update_response",
            endpoint_path="/webhook",
            response_data={"status": "received", "timestamp": "{{timestamp}}"}
        )

    def register_event_handler(self, event_type: str, handler: Callable):
        """Register handler for specific event type"""
        if event_type not in self.event_handlers:
            self.event_handlers[event_type] = []

        self.event_handlers[event_type].append(handler)

    async def emit_event(self, event_type: str, event_data: dict):
        """Emit an event to the workflow"""
        event = {
            "type": event_type,
            "data": event_data,
            "timestamp": time.time()
        }

        await self.event_queue.put(event)

    async def start_event_processing(self):
        """Start processing events from the queue"""
        self.running = True

        while self.running:
            try:
                # Wait for event with timeout
                event = await asyncio.wait_for(self.event_queue.get(), timeout=1.0)
                await self.process_event(event)
            except asyncio.TimeoutError:
                continue
            except Exception as e:
                print(f"Error processing event: {e}")

    async def process_event(self, event: dict):
        """Process a single event"""
        event_type = event["type"]

        if event_type in self.event_handlers:
            for handler in self.event_handlers[event_type]:
                try:
                    await handler(event)
                except Exception as e:
                    print(f"Error in event handler: {e}")

    async def stop_event_processing(self):
        """Stop event processing"""
        self.running = False

# Example event-driven workflow
class AIModelMonitoringWorkflow(EventDrivenAIWorkflow):
    def __init__(self):
        super().__init__("ai_model_monitoring")
        self.model_metrics = {}
        self.alert_thresholds = {
            "accuracy": 0.85,
            "latency_ms": 1000,
            "error_rate": 0.05
        }

    async def setup(self):
        """Setup monitoring workflow"""
        await self.setup_webhook_service("metrics", 8001)
        await self.setup_webhook_service("alerts", 8002)

        # Register event handlers
        self.register_event_handler("model_prediction", self.handle_prediction_event)
        self.register_event_handler("model_error", self.handle_error_event)
        self.register_event_handler("performance_metric", self.handle_metric_event)

    async def handle_prediction_event(self, event: dict):
        """Handle model prediction events"""
        prediction_data = event["data"]
        model_id = prediction_data["model_id"]

        # Update model metrics
        if model_id not in self.model_metrics:
            self.model_metrics[model_id] = {
                "predictions": 0,
                "errors": 0,
                "total_latency": 0
            }

        metrics = self.model_metrics[model_id]
        metrics["predictions"] += 1
        metrics["total_latency"] += prediction_data.get("latency_ms", 0)

        # Check for performance issues
        avg_latency = metrics["total_latency"] / metrics["predictions"]
        if avg_latency > self.alert_thresholds["latency_ms"]:
            await self.emit_event("performance_alert", {
                "model_id": model_id,
                "alert_type": "high_latency",
                "value": avg_latency,
                "threshold": self.alert_thresholds["latency_ms"]
            })

    async def handle_error_event(self, event: dict):
        """Handle model error events"""
        error_data = event["data"]
        model_id = error_data["model_id"]

        if model_id in self.model_metrics:
            self.model_metrics[model_id]["errors"] += 1

            # Calculate error rate
            metrics = self.model_metrics[model_id]
            error_rate = metrics["errors"] / metrics["predictions"]

            if error_rate > self.alert_thresholds["error_rate"]:
                await self.emit_event("performance_alert", {
                    "model_id": model_id,
                    "alert_type": "high_error_rate",
                    "value": error_rate,
                    "threshold": self.alert_thresholds["error_rate"]
                })

    async def handle_metric_event(self, event: dict):
        """Handle performance metric events"""
        metric_data = event["data"]

        # Configure mock responses based on metrics
        if metric_data["metric_type"] == "accuracy":
            accuracy = metric_data["value"]

            if accuracy < self.alert_thresholds["accuracy"]:
                # Configure alert service to return alert
                await self.mockloop.manage_mock_data(
                    server_url=self.webhook_servers["alerts"],
                    operation="update_response",
                    endpoint_path="/alert",
                    response_data={
                        "alert": True,
                        "type": "low_accuracy",
                        "model_id": metric_data["model_id"],
                        "accuracy": accuracy
                    }
                )

Advanced Integration Patterns

1. Multi-Framework Orchestration

Orchestrate multiple AI frameworks in a single workflow:

class MultiFrameworkOrchestrator:
    def __init__(self):
        self.mockloop = MockLoopClient()
        self.frameworks = {}
        self.data_flow = {}

    async def register_framework(self, framework_name: str, framework_config: dict):
        """Register an AI framework with its mock services"""
        self.frameworks[framework_name] = framework_config

        # Setup mock services for framework
        for service_name, service_config in framework_config["services"].items():
            await self.mockloop.generate_mock_api(
                spec_url_or_path=service_config["spec"],
                output_dir_name=f"{framework_name}_{service_name}"
            )

    async def configure_data_flow(self, flow_definition: dict):
        """Configure data flow between frameworks"""
        self.data_flow = flow_definition

        # Setup data transformation endpoints
        for step_name, step_config in flow_definition["steps"].items():
            source_framework = step_config["source_framework"]
            target_framework = step_config["target_framework"]

            # Configure mock endpoints for data transformation
            await self.setup_data_transformation(step_name, step_config)

    async def setup_data_transformation(self, step_name: str, step_config: dict):
        """Setup data transformation between frameworks"""
        transform_service_url = f"http://localhost:{step_config['transform_port']}"

        # Generate transformation service
        await self.mockloop.generate_mock_api(
            spec_url_or_path="./data-transform-api.yaml",
            output_dir_name=f"transform_{step_name}"
        )

        # Configure transformation logic
        await self.mockloop.manage_mock_data(
            server_url=transform_service_url,
            operation="update_response",
            endpoint_path="/transform",
            response_data={
                "transformed_data": step_config["transformation_template"],
                "source_format": step_config["source_format"],
                "target_format": step_config["target_format"]
            }
        )

    async def execute_orchestrated_workflow(self, workflow_input: dict):
        """Execute workflow across multiple frameworks"""
        results = {}
        current_data = workflow_input

        for step_name in self.data_flow["execution_order"]:
            step_config = self.data_flow["steps"][step_name]

            # Execute step
            step_result = await self.execute_framework_step(
                step_name, step_config, current_data
            )

            results[step_name] = step_result

            # Update current data for next step
            if step_result["success"]:
                current_data = step_result["output_data"]
            else:
                # Handle step failure
                break

        return results

    async def execute_framework_step(self, step_name: str, step_config: dict, input_data: dict):
        """Execute a single framework step"""
        framework_name = step_config["framework"]
        framework_config = self.frameworks[framework_name]

        # Call framework service
        service_url = f"http://localhost:{framework_config['services']['main']['port']}"
        endpoint = step_config["endpoint"]

        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{service_url}{endpoint}",
                json=input_data
            )

        if response.status_code == 200:
            output_data = response.json()

            # Apply data transformation if needed
            if "transform_to" in step_config:
                output_data = await self.transform_data(
                    output_data,
                    step_config["transform_to"]
                )

            return {
                "step": step_name,
                "framework": framework_name,
                "success": True,
                "output_data": output_data
            }
        else:
            return {
                "step": step_name,
                "framework": framework_name,
                "success": False,
                "error": response.text
            }

    async def transform_data(self, data: dict, transform_config: dict):
        """Transform data between framework formats"""
        transform_service_url = f"http://localhost:{transform_config['port']}"

        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{transform_service_url}/transform",
                json={"data": data, "config": transform_config}
            )

        if response.status_code == 200:
            return response.json()["transformed_data"]
        else:
            raise Exception(f"Data transformation failed: {response.text}")

# Example multi-framework workflow
nlp_pipeline_config = {
    "frameworks": {
        "preprocessing": {
            "services": {
                "main": {"spec": "./preprocessing-api.yaml", "port": 8001}
            }
        },
        "langchain": {
            "services": {
                "main": {"spec": "./langchain-api.yaml", "port": 8002}
            }
        },
        "custom_ml": {
            "services": {
                "main": {"spec": "./custom-ml-api.yaml", "port": 8003}
            }
        }
    },
    "data_flow": {
        "execution_order": ["preprocess", "extract_features", "classify"],
        "steps": {
            "preprocess": {
                "framework": "preprocessing",
                "endpoint": "/preprocess",
                "transform_to": {"port": 8004, "format": "langchain_input"}
            },
            "extract_features": {
                "framework": "langchain",
                "endpoint": "/extract_features",
                "transform_to": {"port": 8005, "format": "ml_input"}
            },
            "classify": {
                "framework": "custom_ml",
                "endpoint": "/classify"
            }
        }
    }
}

2. Legacy System Integration

Integrate with legacy AI systems and proprietary protocols:

class LegacyAISystemIntegrator:
    def __init__(self):
        self.mockloop = MockLoopClient()
        self.protocol_adapters = {}
        self.legacy_systems = {}

    async def register_legacy_system(self, system_name: str, system_config: dict):
        """Register a legacy AI system"""
        self.legacy_systems[system_name] = system_config

        # Create protocol adapter
        adapter_config = system_config["protocol_adapter"]
        await self.create_protocol_adapter(system_name, adapter_config)

    async def create_protocol_adapter(self, system_name: str, adapter_config: dict):
        """Create protocol adapter for legacy system"""
        protocol_type = adapter_config["type"]

        if protocol_type == "soap":
            await self.create_soap_adapter(system_name, adapter_config)
        elif protocol_type == "xml_rpc":
            await self.create_xml_rpc_adapter(system_name, adapter_config)
        elif protocol_type == "custom_tcp":
            await self.create_tcp_adapter(system_name, adapter_config)
        else:
            raise ValueError(f"Unsupported protocol: {protocol_type}")

    async def create_soap_adapter(self, system_name: str, adapter_config: dict):
        """Create SOAP protocol adapter"""
        # Generate mock SOAP service
        await self.mockloop.generate_mock_api(
            spec_url_or_path="./soap-adapter-api.yaml",
            output_dir_name=f"{system_name}_soap_adapter"
        )

        adapter_url = f"http://localhost:{adapter_config['port']}"
        self.protocol_adapters[system_name] = {
            "type": "soap",
            "url": adapter_url,
            "config": adapter_config
        }

        # Configure SOAP response templates
        soap_responses = adapter_config.get("responses", {})
        for operation, response_template in soap_responses.items():
            await self.mockloop.manage_mock_data(
                server_url=adapter_url,
                operation="update_response",
                endpoint_path=f"/soap/{operation}",
                response_data=response_template
            )

    async def create_xml_rpc_adapter(self, system_name: str, adapter_config: dict):
        """Create XML-RPC protocol adapter"""
        await self.mockloop.generate_mock_api(
            spec_url_or_path="./xmlrpc-adapter-api.yaml",
            output_dir_name=f"{system_name}_xmlrpc_adapter"
        )

        adapter_url = f"http://localhost:{adapter_config['port']}"
        self.protocol_adapters[system_name] = {
            "type": "xml_rpc",
            "url": adapter_url,
            "config": adapter_config
        }

    async def call_legacy_system(self, system_name: str, operation: str, parameters: dict):
        """Call legacy system through protocol adapter"""
        if system_name not in self.protocol_adapters:
            raise ValueError(f"No adapter for system: {system_name}")

        adapter = self.protocol_adapters[system_name]
        adapter_type = adapter["type"]

        if adapter_type == "soap":
            return await self.call_soap_service(adapter, operation, parameters)
        elif adapter_type == "xml_rpc":
            return await self.call_xml_rpc_service(adapter, operation, parameters)
        else:
            raise ValueError(f"Unsupported adapter type: {adapter_type}")

    async def call_soap_service(self, adapter: dict, operation: str, parameters: dict):
        """Call SOAP service through adapter"""
        adapter_url = adapter["url"]

        # Create SOAP envelope
        soap_envelope = self.create_soap_envelope(operation, parameters)

        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{adapter_url}/soap/{operation}",
                content=soap_envelope,
                headers={"Content-Type": "text/xml; charset=utf-8"}
            )

        if response.status_code == 200:
            return self.parse_soap_response(response.text)
        else:
            raise Exception(f"SOAP call failed: {response.text}")

    def create_soap_envelope(self, operation: str, parameters: dict) -> str:
        """Create SOAP envelope for operation"""
        # Simplified SOAP envelope creation
        params_xml = ""
        for key, value in parameters.items():
            params_xml += f"<{key}>{value}</{key}>"

        return f"""<?xml version="1.0" encoding="utf-8"?>
        <soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/">
            <soap:Body>
                <{operation}>
                    {params_xml}
                </{operation}>
            </soap:Body>
        </soap:Envelope>"""

    def parse_soap_response(self, response_xml: str) -> dict:
        """Parse SOAP response"""
        # Simplified SOAP response parsing
        import xml.etree.ElementTree as ET

        root = ET.fromstring(response_xml)

        # Extract response data (simplified)
        result = {}
        for elem in root.iter():
            if elem.text and elem.tag.split('}')[-1] not in ['Envelope', 'Body']:
                result[elem.tag.split('}')[-1]] = elem.text

        return result

Testing Custom Workflows

Comprehensive Workflow Testing

```python import pytest import asyncio

class CustomWorkflowTester: def init(self): self.mockloop = MockLoopClient() self.test_scenarios = {} self.performance_metrics = {}

async def setup_test_environment(self, workflow_config: dict):
    """Setup test environment for custom workflow"""
    # Generate all required mock services
    for service_name, service_config in workflow_config["services"].items():
        await self.mockloop.generate_mock_api(
            spec_url_or_path=service_config["spec"],
            output_dir_name=f"test_{service_name}"
        )

async def create_test_scenario(self, scenario_name: str, scenario_config: dict):
    """Create a test scenario"""
    self.test_scenarios[scenario_name] = scenario_config

    # Configure mock services for scenario
    for service_name, service_responses in scenario_config["services"].items():
        service_url = f"http://localhost:{scenario_config['ports'][service_name]}"

        for endpoint, response_data in service_responses.items():
            await self.mockloop.manage_mock_data(
                server_url=service_url,
                operation="update_response",
                endpoint_path=endpoint,
                response_data=response_data
            )

async def test_workflow_scenario(self, workflow, scenario_name: str, test_input: dict):
    """Test workflow with specific scenario"""
    if scenario_name not in self.test_scenarios:
        raise ValueError(f"Scenario {scenario_name} not found")

    # Switch to test scenario
    scenario_config = self.test_scenarios[scenario_name]
    await self.activate_scenario(scenario_config)

    # Execute workflow
    start_time = time.time()

    try:
        result = await workflow.execute