Security Considerations
This document provides comprehensive security guidance for MockLoop MCP deployments, covering authentication, authorization, data protection, network security, and compliance considerations.
Overview
Security in MockLoop MCP involves multiple layers:
- Authentication: Verifying user and system identities
- Authorization: Controlling access to resources and operations
- Data Protection: Securing data at rest and in transit
- Network Security: Protecting network communications
- Audit and Compliance: Logging and monitoring for security compliance
- Vulnerability Management: Identifying and mitigating security risks
Authentication
Multi-Factor Authentication
class MultiFactorAuthentication:
"""Multi-factor authentication implementation."""
def __init__(self, config: MFAConfig):
self.config = config
self.totp_generator = TOTPGenerator()
self.sms_provider = SMSProvider(config.sms)
self.email_provider = EmailProvider(config.email)
async def authenticate_user(self, credentials: UserCredentials) -> AuthResult:
"""Authenticate user with multiple factors."""
# First factor: username/password
primary_auth = await self.verify_primary_credentials(credentials)
if not primary_auth.success:
return AuthResult(False, "Invalid credentials")
user = primary_auth.user
# Check if MFA is required
if not user.mfa_enabled:
return AuthResult(True, "Authentication successful", user)
# Second factor: TOTP, SMS, or email
mfa_result = await self.verify_second_factor(user, credentials.mfa_token)
if not mfa_result.success:
return AuthResult(False, "Invalid MFA token")
return AuthResult(True, "MFA authentication successful", user)
async def verify_primary_credentials(self, credentials: UserCredentials) -> AuthResult:
"""Verify username and password."""
user = await self.get_user_by_username(credentials.username)
if not user:
return AuthResult(False, "User not found")
# Verify password hash
if not self.verify_password_hash(credentials.password, user.password_hash):
# Log failed attempt
await self.log_failed_login_attempt(user.id, credentials.client_ip)
return AuthResult(False, "Invalid password")
# Check account status
if not user.is_active:
return AuthResult(False, "Account disabled")
# Check for account lockout
if await self.is_account_locked(user.id):
return AuthResult(False, "Account locked due to failed attempts")
return AuthResult(True, "Primary authentication successful", user)
async def verify_second_factor(self, user: User, mfa_token: str) -> AuthResult:
"""Verify second factor authentication."""
if user.mfa_method == "totp":
return await self.verify_totp_token(user, mfa_token)
elif user.mfa_method == "sms":
return await self.verify_sms_token(user, mfa_token)
elif user.mfa_method == "email":
return await self.verify_email_token(user, mfa_token)
return AuthResult(False, "Unknown MFA method")
async def verify_totp_token(self, user: User, token: str) -> AuthResult:
"""Verify TOTP token."""
# Get user's TOTP secret
totp_secret = await self.get_user_totp_secret(user.id)
# Verify token with time window tolerance
if self.totp_generator.verify_token(totp_secret, token, window=1):
return AuthResult(True, "TOTP verification successful")
return AuthResult(False, "Invalid TOTP token")
API Key Management
class APIKeyManager:
"""Manages API keys for authentication."""
def __init__(self, config: APIKeyConfig):
self.config = config
self.key_store = APIKeyStore(config.storage)
self.rate_limiter = RateLimiter(config.rate_limits)
async def generate_api_key(self, user_id: str, permissions: List[str],
expires_at: Optional[datetime] = None) -> APIKey:
"""Generate new API key for user."""
# Generate secure random key
key_value = self.generate_secure_key()
# Create API key record
api_key = APIKey(
id=generate_uuid(),
user_id=user_id,
key_hash=self.hash_key(key_value),
permissions=permissions,
created_at=datetime.utcnow(),
expires_at=expires_at,
last_used_at=None,
is_active=True
)
# Store in database
await self.key_store.store_api_key(api_key)
# Return key with plaintext value (only time it's available)
api_key.key_value = key_value
return api_key
async def validate_api_key(self, key_value: str, required_permission: str = None) -> ValidationResult:
"""Validate API key and check permissions."""
# Hash the provided key
key_hash = self.hash_key(key_value)
# Look up API key
api_key = await self.key_store.get_api_key_by_hash(key_hash)
if not api_key:
return ValidationResult(False, "Invalid API key")
# Check if key is active
if not api_key.is_active:
return ValidationResult(False, "API key is disabled")
# Check expiration
if api_key.expires_at and api_key.expires_at < datetime.utcnow():
return ValidationResult(False, "API key has expired")
# Check rate limits
if not await self.rate_limiter.check_rate_limit(api_key.id):
return ValidationResult(False, "Rate limit exceeded")
# Check permissions
if required_permission and required_permission not in api_key.permissions:
return ValidationResult(False, f"Missing permission: {required_permission}")
# Update last used timestamp
await self.key_store.update_last_used(api_key.id)
return ValidationResult(True, "API key valid", api_key)
def generate_secure_key(self) -> str:
"""Generate cryptographically secure API key."""
# Generate 32 bytes of random data
random_bytes = secrets.token_bytes(32)
# Encode as base64 with URL-safe characters
key_value = base64.urlsafe_b64encode(random_bytes).decode('ascii')
# Add prefix for identification
return f"mlcp_{key_value}"
def hash_key(self, key_value: str) -> str:
"""Hash API key for secure storage."""
# Use SHA-256 with salt
salt = self.config.key_salt.encode('utf-8')
key_bytes = key_value.encode('utf-8')
hash_obj = hashlib.sha256(salt + key_bytes)
return hash_obj.hexdigest()
class APIKeyMiddleware:
"""Middleware for API key authentication."""
def __init__(self, api_key_manager: APIKeyManager):
self.api_key_manager = api_key_manager
async def __call__(self, request: Request, call_next):
# Skip authentication for public endpoints
if self.is_public_endpoint(request.url.path):
return await call_next(request)
# Extract API key from header
api_key = request.headers.get("X-API-Key")
if not api_key:
return Response(
status_code=401,
content={"error": "API key required"}
)
# Validate API key
validation_result = await self.api_key_manager.validate_api_key(api_key)
if not validation_result.success:
return Response(
status_code=401,
content={"error": validation_result.message}
)
# Add user context to request
request.state.user = validation_result.api_key.user_id
request.state.permissions = validation_result.api_key.permissions
return await call_next(request)
JWT Authentication
class JWTAuthenticator:
"""JWT-based authentication system."""
def __init__(self, config: JWTConfig):
self.config = config
self.secret_key = config.secret_key
self.algorithm = config.algorithm
self.expiration_time = config.expiration_hours * 3600
async def generate_token(self, user: User) -> str:
"""Generate JWT token for user."""
now = datetime.utcnow()
payload = {
"sub": user.id,
"username": user.username,
"email": user.email,
"roles": user.roles,
"permissions": user.permissions,
"iat": now,
"exp": now + timedelta(seconds=self.expiration_time),
"iss": self.config.issuer,
"aud": self.config.audience
}
token = jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
# Store token for revocation checking
await self.store_active_token(user.id, token, payload["exp"])
return token
async def validate_token(self, token: str) -> TokenValidationResult:
"""Validate JWT token."""
try:
# Decode and verify token
payload = jwt.decode(
token,
self.secret_key,
algorithms=[self.algorithm],
audience=self.config.audience,
issuer=self.config.issuer
)
# Check if token is revoked
if await self.is_token_revoked(token):
return TokenValidationResult(False, "Token has been revoked")
# Get user information
user_id = payload["sub"]
user = await self.get_user_by_id(user_id)
if not user or not user.is_active:
return TokenValidationResult(False, "User not found or inactive")
return TokenValidationResult(True, "Token valid", user, payload)
except jwt.ExpiredSignatureError:
return TokenValidationResult(False, "Token has expired")
except jwt.InvalidTokenError as e:
return TokenValidationResult(False, f"Invalid token: {str(e)}")
async def revoke_token(self, token: str) -> bool:
"""Revoke JWT token."""
try:
# Decode token to get expiration
payload = jwt.decode(
token,
self.secret_key,
algorithms=[self.algorithm],
options={"verify_exp": False} # Don't verify expiration for revocation
)
# Add to revocation list
await self.add_to_revocation_list(token, payload["exp"])
return True
except jwt.InvalidTokenError:
return False
async def refresh_token(self, token: str) -> Optional[str]:
"""Refresh JWT token if valid and not expired."""
validation_result = await self.validate_token(token)
if not validation_result.success:
return None
# Generate new token
new_token = await self.generate_token(validation_result.user)
# Revoke old token
await self.revoke_token(token)
return new_token
Authorization
Role-Based Access Control (RBAC)
class RoleBasedAccessControl:
"""Role-based access control system."""
def __init__(self, config: RBACConfig):
self.config = config
self.role_store = RoleStore(config.storage)
self.permission_cache = PermissionCache(config.cache)
async def check_permission(self, user_id: str, resource: str, action: str) -> bool:
"""Check if user has permission for action on resource."""
# Get user permissions from cache or database
permissions = await self.get_user_permissions(user_id)
# Check direct permissions
if self.has_direct_permission(permissions, resource, action):
return True
# Check role-based permissions
user_roles = await self.get_user_roles(user_id)
for role in user_roles:
role_permissions = await self.get_role_permissions(role.id)
if self.has_direct_permission(role_permissions, resource, action):
return True
return False
def has_direct_permission(self, permissions: List[Permission], resource: str, action: str) -> bool:
"""Check if permissions list contains required permission."""
for permission in permissions:
if self.permission_matches(permission, resource, action):
return True
return False
def permission_matches(self, permission: Permission, resource: str, action: str) -> bool:
"""Check if permission matches resource and action."""
# Exact match
if permission.resource == resource and permission.action == action:
return True
# Wildcard matching
if permission.resource == "*" or permission.action == "*":
return True
# Pattern matching
if self.matches_pattern(permission.resource, resource) and \
self.matches_pattern(permission.action, action):
return True
return False
def matches_pattern(self, pattern: str, value: str) -> bool:
"""Check if value matches pattern (supports wildcards)."""
import fnmatch
return fnmatch.fnmatch(value, pattern)
async def assign_role_to_user(self, user_id: str, role_id: str) -> bool:
"""Assign role to user."""
# Verify role exists
role = await self.role_store.get_role(role_id)
if not role:
return False
# Assign role
await self.role_store.assign_user_role(user_id, role_id)
# Clear permission cache for user
await self.permission_cache.clear_user_permissions(user_id)
return True
async def create_role(self, role_data: RoleData) -> Role:
"""Create new role with permissions."""
role = Role(
id=generate_uuid(),
name=role_data.name,
description=role_data.description,
permissions=role_data.permissions,
created_at=datetime.utcnow()
)
await self.role_store.store_role(role)
return role
# Predefined roles and permissions
class DefaultRoles:
"""Default roles and permissions for MockLoop MCP."""
ADMIN = Role(
name="admin",
description="Full system administrator",
permissions=[
Permission("*", "*"), # All permissions
]
)
DEVELOPER = Role(
name="developer",
description="Developer with server management access",
permissions=[
Permission("servers", "create"),
Permission("servers", "read"),
Permission("servers", "update"),
Permission("servers", "delete"),
Permission("scenarios", "*"),
Permission("logs", "read"),
Permission("webhooks", "*")
]
)
VIEWER = Role(
name="viewer",
description="Read-only access to servers and logs",
permissions=[
Permission("servers", "read"),
Permission("scenarios", "read"),
Permission("logs", "read")
]
)
API_USER = Role(
name="api_user",
description="API access for external integrations",
permissions=[
Permission("servers", "read"),
Permission("scenarios", "read"),
Permission("scenarios", "switch"),
Permission("mock_data", "update")
]
)
class AuthorizationMiddleware:
"""Middleware for authorization checking."""
def __init__(self, rbac: RoleBasedAccessControl):
self.rbac = rbac
async def __call__(self, request: Request, call_next):
# Skip authorization for public endpoints
if self.is_public_endpoint(request.url.path):
return await call_next(request)
# Get user from request (set by authentication middleware)
user_id = getattr(request.state, "user", None)
if not user_id:
return Response(
status_code=401,
content={"error": "Authentication required"}
)
# Determine required permission
resource, action = self.extract_permission_requirements(request)
# Check authorization
if not await self.rbac.check_permission(user_id, resource, action):
return Response(
status_code=403,
content={"error": f"Insufficient permissions for {action} on {resource}"}
)
return await call_next(request)
def extract_permission_requirements(self, request: Request) -> Tuple[str, str]:
"""Extract resource and action from request."""
path = request.url.path
method = request.method
# Map HTTP methods to actions
method_action_map = {
"GET": "read",
"POST": "create",
"PUT": "update",
"PATCH": "update",
"DELETE": "delete"
}
action = method_action_map.get(method, "unknown")
# Extract resource from path
if path.startswith("/admin/api/v1/servers"):
resource = "servers"
elif path.startswith("/admin/api/v1/scenarios"):
resource = "scenarios"
elif path.startswith("/admin/api/v1/logs"):
resource = "logs"
elif path.startswith("/admin/api/v1/webhooks"):
resource = "webhooks"
else:
resource = "unknown"
return resource, action
Data Protection
Encryption at Rest
class DataEncryption:
"""Data encryption for sensitive information."""
def __init__(self, config: EncryptionConfig):
self.config = config
self.cipher_suite = self.create_cipher_suite()
self.key_manager = KeyManager(config.key_management)
def create_cipher_suite(self) -> Fernet:
"""Create encryption cipher suite."""
# Get encryption key from key manager
encryption_key = self.key_manager.get_encryption_key()
# Create Fernet cipher
return Fernet(encryption_key)
def encrypt_sensitive_data(self, data: str) -> str:
"""Encrypt sensitive data."""
if not data:
return data
# Convert to bytes
data_bytes = data.encode('utf-8')
# Encrypt
encrypted_bytes = self.cipher_suite.encrypt(data_bytes)
# Return base64 encoded string
return base64.b64encode(encrypted_bytes).decode('ascii')
def decrypt_sensitive_data(self, encrypted_data: str) -> str:
"""Decrypt sensitive data."""
if not encrypted_data:
return encrypted_data
try:
# Decode from base64
encrypted_bytes = base64.b64decode(encrypted_data.encode('ascii'))
# Decrypt
decrypted_bytes = self.cipher_suite.decrypt(encrypted_bytes)
# Return string
return decrypted_bytes.decode('utf-8')
except Exception as e:
raise DecryptionError(f"Failed to decrypt data: {str(e)}")
class SecureStorage:
"""Secure storage for sensitive configuration data."""
def __init__(self, encryption: DataEncryption):
self.encryption = encryption
self.sensitive_fields = {
"password", "secret", "key", "token", "credential"
}
async def store_configuration(self, config: dict) -> dict:
"""Store configuration with sensitive data encrypted."""
encrypted_config = {}
for key, value in config.items():
if self.is_sensitive_field(key) and isinstance(value, str):
# Encrypt sensitive fields
encrypted_config[key] = self.encryption.encrypt_sensitive_data(value)
encrypted_config[f"{key}_encrypted"] = True
else:
encrypted_config[key] = value
return encrypted_config
async def load_configuration(self, encrypted_config: dict) -> dict:
"""Load configuration with sensitive data decrypted."""
decrypted_config = {}
for key, value in encrypted_config.items():
if key.endswith("_encrypted"):
continue # Skip encryption flags
if encrypted_config.get(f"{key}_encrypted", False):
# Decrypt sensitive fields
decrypted_config[key] = self.encryption.decrypt_sensitive_data(value)
else:
decrypted_config[key] = value
return decrypted_config
def is_sensitive_field(self, field_name: str) -> bool:
"""Check if field contains sensitive data."""
field_lower = field_name.lower()
return any(sensitive in field_lower for sensitive in self.sensitive_fields)
Data Masking and Anonymization
class DataMasking:
"""Data masking for logs and exports."""
def __init__(self, config: MaskingConfig):
self.config = config
self.masking_rules = self.load_masking_rules()
def load_masking_rules(self) -> List[MaskingRule]:
"""Load data masking rules."""
return [
MaskingRule(
field_pattern=r".*password.*",
mask_type="replace",
replacement="[MASKED]"
),
MaskingRule(
field_pattern=r".*email.*",
mask_type="partial",
visible_chars=3,
mask_char="*"
),
MaskingRule(
field_pattern=r".*phone.*",
mask_type="format",
format_pattern="XXX-XXX-{last4}"
),
MaskingRule(
field_pattern=r".*credit_card.*",
mask_type="format",
format_pattern="****-****-****-{last4}"
)
]
def mask_data(self, data: dict) -> dict:
"""Apply masking rules to data."""
masked_data = {}
for key, value in data.items():
if isinstance(value, dict):
# Recursively mask nested objects
masked_data[key] = self.mask_data(value)
elif isinstance(value, list):
# Mask list items
masked_data[key] = [
self.mask_data(item) if isinstance(item, dict) else self.mask_value(key, item)
for item in value
]
else:
# Apply masking rules
masked_data[key] = self.mask_value(key, value)
return masked_data
def mask_value(self, field_name: str, value: Any) -> Any:
"""Apply masking to a single value."""
if not isinstance(value, str):
return value
for rule in self.masking_rules:
if re.match(rule.field_pattern, field_name, re.IGNORECASE):
return self.apply_masking_rule(rule, value)
return value
def apply_masking_rule(self, rule: MaskingRule, value: str) -> str:
"""Apply specific masking rule."""
if rule.mask_type == "replace":
return rule.replacement
elif rule.mask_type == "partial":
if len(value) <= rule.visible_chars:
return rule.mask_char * len(value)
visible_part = value[:rule.visible_chars]
masked_part = rule.mask_char * (len(value) - rule.visible_chars)
return visible_part + masked_part
elif rule.mask_type == "format":
if "{last4}" in rule.format_pattern:
last4 = value[-4:] if len(value) >= 4 else value
return rule.format_pattern.replace("{last4}", last4)
return value
class LogSanitizer:
"""Sanitizes logs to remove sensitive information."""
def __init__(self, data_masking: DataMasking):
self.data_masking = data_masking
self.sensitive_patterns = [
(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', '[EMAIL]'), # Email
(r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b', '[CARD]'), # Credit card
(r'\b\d{3}-\d{2}-\d{4}\b', '[SSN]'), # SSN
(r'\b(?:\d{1,3}\.){3}\d{1,3}\b', '[IP]'), # IP address
]
def sanitize_log_entry(self, log_entry: LogEntry) -> LogEntry:
"""Sanitize log entry."""
sanitized_entry = log_entry.copy()
# Sanitize request headers
if sanitized_entry.headers:
sanitized_entry.headers = self.data_masking.mask_data(
json.loads(sanitized_entry.headers)
)
# Sanitize request body
if sanitized_entry.request_body:
sanitized_entry.request_body = self.sanitize_text(sanitized_entry.request_body)
# Sanitize response body
if sanitized_entry.response_body:
sanitized_entry.response_body = self.sanitize_text(sanitized_entry.response_body)
return sanitized_entry
def sanitize_text(self, text: str) -> str:
"""Sanitize text content."""
sanitized = text
for pattern, replacement in self.sensitive_patterns:
sanitized = re.sub(pattern, replacement, sanitized)
return sanitized
Network Security
TLS/SSL Configuration
class TLSConfiguration:
"""TLS/SSL configuration for secure communications."""
def __init__(self, config: TLSConfig):
self.config = config
self.ssl_context = self.create_ssl_context()
def create_ssl_context(self) -> ssl.SSLContext:
"""Create SSL context with secure configuration."""
# Create SSL context
context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
# Load certificate and private key
context.load_cert_chain(
certfile=self.config.cert_file,
keyfile=self.config.key_file
)
# Configure security options
context.minimum_version = ssl.TLSVersion.TLSv1_2
context.maximum_version = ssl.TLSVersion.TLSv1_3
# Disable weak ciphers
context.set_ciphers('ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20:!aNULL:!MD5:!DSS')
# Configure certificate verification
if self.config.verify_mode == "required":
context.verify_mode = ssl.CERT_REQUIRED
elif self.config.verify_mode == "optional":
context.verify_mode = ssl.CERT_OPTIONAL
else:
context.verify_mode = ssl.CERT_NONE
# Load CA certificates if provided
if self.config.ca_file:
context.load_verify_locations(cafile=self.config.ca_file)
return context
def create_secure_server(self, app, host: str, port: int) -> None:
"""Create secure HTTPS server."""
import uvicorn
uvicorn.run(
app,
host=host,
port=port,
ssl_keyfile=self.config.key_file,
ssl_certfile=self.config.cert_file,
ssl_ca_certs=self.config.ca_file,
ssl_version=ssl.PROTOCOL_TLS_SERVER,
ssl_cert_reqs=ssl.CERT_NONE if self.config.verify_mode == "none" else ssl.CERT_REQUIRED
)
class SecurityHeaders:
"""Security headers middleware."""
def __init__(self, config: SecurityHeadersConfig):
self.config = config
async def __call__(self, request: Request, call_next):
response = await call_next(request)
# Add security headers
self.add_security_headers(response)
return response
def add_security_headers(self, response: Response) -> None:
"""Add security headers to response."""
# Strict Transport Security
if self.config.hsts_enabled:
response.headers["Strict-Transport-Security"] = (
f"max-age={self.config.hsts_max_age}; "
f"includeSubDomains; preload"
)
# Content Security Policy
if self.config.csp_policy:
response.headers["Content-Security-Policy"] = self.config.csp_policy
# X-Frame-Options
response.headers["X-Frame-Options"] = "DENY"
# X-Content-Type-Options
response.headers["X-Content-Type-Options"] = "nosniff"
# X-XSS-Protection
response.headers["X-XSS-Protection"] = "1; mode=block"
# Referrer Policy
response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
# Permissions Policy
response.headers["Permissions-Policy"] = (
"geolocation=(), microphone=(), camera=()"
)
Rate Limiting and DDoS Protection
```python class AdvancedRateLimiter: """Advanced rate limiting with DDoS protection."""
def __init__(self, config: RateLimitConfig):
self.config = config
self.redis_client = redis.Redis(
host=config.redis_host,
port=config.redis_port,
db=config.redis_db
)
self.suspicious_ips = set()
async def check_rate_limit(self, client_ip: str, endpoint: str) -> RateLimitResult:
"""Check rate limit for client and endpoint."""
# Check if IP is suspicious
if client_ip in self.suspicious_ips:
return RateLimitResult(False, "IP blocked due to suspicious activity")
# Get rate limit rules for endpoint
rules = self.get_rate_limit_rules(endpoint)
for rule in rules:
if not await self