🏆 Day 30 · ⏱ 90 min · 🎓 Capstone · 🚀 Ascend

Capstone: Enterprise MCP Platform

You've spent 29 days learning MCP. Today, you combine everything into a single production-grade enterprise platform: multi-server architecture, API key + OAuth auth, Redis rate limiting, AWS service integrations, Bedrock AI routing, SSE streaming, context management, monitoring, and IaC deployment.

This is the final test. A junior engineer can get one MCP server running locally. A senior engineer ships an enterprise platform that handles 10,000 requests per hour, authenticates every tool call, integrates five AWS services, streams results in real time, monitors its own health, and deploys with a single CDK command. That's you, today.
🏗️ Architecture

Full Architecture Blueprint

The enterprise MCP platform is a fleet of specialised servers behind a secure, observable gateway. Each server is responsible for one domain. The router aggregates them and presents a unified tool surface to Claude and custom MCP clients.

Platform Architecture

Claude / Client
  → API Gateway + WAF
  → Auth + Rate Limit (verified requests only)
  → MCP Router (ECS)
      → S3/DynamoDB Server
      → Bedrock AI Server
      → Lambda/SQS Server
      → Secrets/Config Server

Supporting services: CloudWatch Logs + X-Ray Traces + ElastiCache Redis

| Component | Technology | Purpose |
|---|---|---|
| API Gateway | AWS API GW + WAF | TLS termination, DDoS protection, throttling |
| Auth layer | API key + Cognito JWT | Authenticate every tool call |
| Rate limiter | ElastiCache Redis | Sliding-window 1,000 req/min per client |
| Router | FastMCP + ECS Fargate | Aggregate servers, namespace tools |
| Servers | FastMCP (Python) | Domain-specific tools (4 servers) |
| AI server | Bedrock Converse API | Specialists, KB search, caching |
| Observability | CloudWatch + X-Ray | Latency, errors, tool usage metrics |
| Deployment | AWS CDK (Python) | Infra-as-code; single cdk deploy |
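
To make the router's "aggregate servers, namespace tools" job concrete, here is a minimal, framework-free sketch of a namespaced tool registry. It is not the FastMCP implementation: `NamespacedRegistry`, its methods, and the example tool names are hypothetical, and the separator is a constructor argument because the exact naming format depends on how your router exposes tools.

```python
from typing import Callable, Dict, List


class NamespacedRegistry:
    """Toy registry that exposes tools from several domains under one flat namespace."""

    def __init__(self, separator: str = "__") -> None:
        self.separator = separator
        self._tools: Dict[str, Callable[..., str]] = {}

    def register(self, namespace: str, name: str, fn: Callable[..., str]) -> str:
        """Register a tool under '<namespace><separator><name>' and return that name."""
        full_name = f"{namespace}{self.separator}{name}"
        if full_name in self._tools:
            raise ValueError(f"duplicate tool: {full_name}")
        self._tools[full_name] = fn
        return full_name

    def list_tools(self) -> List[str]:
        """Return the unified tool surface, sorted for stable output."""
        return sorted(self._tools)

    def call(self, full_name: str, **kwargs) -> str:
        """Dispatch a call to the tool that owns the namespaced name."""
        try:
            fn = self._tools[full_name]
        except KeyError:
            raise LookupError(f"unknown tool: {full_name}") from None
        return fn(**kwargs)


if __name__ == "__main__":
    registry = NamespacedRegistry()
    registry.register("data", "s3_get", lambda bucket, key: f"{bucket}/{key}")
    registry.register("ai", "ask", lambda prompt: prompt.upper())
    print(registry.list_tools())
    print(registry.call("data__s3_get", bucket="reports", key="q1.csv"))
```

The namespace prefix keeps two servers from colliding on a tool name such as `get`, and it lets a client see at a glance which domain server owns a tool.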
⚙️ Bootstrap

Platform Bootstrap & Config

Every server loads its configuration from AWS Secrets Manager at startup. A shared platform_config.py module resolves secrets and exposes typed settings — no environment variable spaghetti, no hardcoded credentials.

# platform_config.py — shared across all servers
import boto3, json, time
from dataclasses import dataclass
from functools import lru_cache

@dataclass
class PlatformConfig:
    aws_region: str
    bedrock_model: str
    kb_id: str
    dynamo_table: str
    redis_host: str
    allowed_origins: list[str]
    rate_limit_rpm: int
    log_level: str

@lru_cache(maxsize=1)
def load_config() -> PlatformConfig:
    sm = boto3.client("secretsmanager")
    raw = sm.get_secret_value(SecretId="mcp-platform/config")
    cfg = json.loads(raw["SecretString"])
    return PlatformConfig(**cfg)

# Each server imports this at top:
# from platform_config import load_config
# cfg = load_config()

# router.py: the MCP router (aggregates all servers)
# Assumes FastMCP 2.x, where FastMCP.as_proxy() and mount() are available.
# The proxy API has changed between releases, so check your installed version.
from fastmcp import FastMCP, Client
from fastmcp.client.transports import SSETransport
from platform_config import load_config

cfg = load_config()
router = FastMCP("enterprise-router")

# Namespace -> transport for each domain server
SERVERS = {
    "data":    SSETransport("http://data-server:8080/sse"),
    "ai":      SSETransport("http://ai-server:8081/sse"),
    "ops":     SSETransport("http://ops-server:8082/sse"),
    "secrets": SSETransport("http://secrets-server:8083/sse"),
}

def register_all():
    """Mount every backend server under its namespace prefix."""
    for ns, transport in SERVERS.items():
        # A proxy forwards list/call requests to the remote server and keeps each
        # tool's input schema. A generic `async def proxy(**kwargs)` wrapper loses
        # the schema, and FastMCP rejects **kwargs functions as tools.
        backend = FastMCP.as_proxy(Client(transport))
        # Mounted tools are exposed with the namespace as a prefix, e.g. data_s3_get.
        router.mount(server=backend, prefix=ns)

if __name__ == "__main__":
    register_all()
    router.run(transport="sse", host="0.0.0.0", port=8080)
🔐 Auth + Rate Limit

Auth + Rate Limiting Layer

All requests pass through an auth + rate limiting middleware before reaching any MCP tool. This is a FastAPI gateway that wraps the MCP router — validating API keys, checking Redis for rate limits, and forwarding clean requests.

# gateway.py — FastAPI wrapper around the MCP router
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import StreamingResponse
import redis.asyncio as aioredis
import httpx, time, hmac, hashlib, json
from platform_config import load_config

cfg = load_config()
app = FastAPI()
redis = aioredis.from_url(f"redis://{cfg.redis_host}")

VALID_KEYS = set()  # loaded from Secrets Manager on startup

async def check_auth(request: Request) -> str:
    key = request.headers.get("X-API-Key", "")
    hashed = hashlib.sha256(key.encode()).hexdigest()
    if hashed not in VALID_KEYS:
        raise HTTPException(401, "Invalid API key")
    return hashed  # return key hash as client ID

async def check_rate_limit(client_id: str):
    key = f"rl:{client_id}"
    now_ms = int(time.time() * 1000)
    window_ms = 60_000
    limit = cfg.rate_limit_rpm

    lua = """
    local key,now,window,limit = KEYS[1],tonumber(ARGV[1]),tonumber(ARGV[2]),tonumber(ARGV[3])
    redis.call('ZREMRANGEBYSCORE',key,0,now-window)
    local count = redis.call('ZCARD',key)
    if count >= limit then return 0 end
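    -- Member must be unique: ZADD with member=now would merge hits from the same millisecond
    redis.call('ZADD',key,now,now..'-'..count)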
    redis.call('PEXPIRE',key,window)
    return 1
    """
    allowed = await redis.eval(lua, 1, key, now_ms, window_ms, limit)
    if not allowed:
        raise HTTPException(429, "Rate limit exceeded")

@app.post("/mcp/{path:path}")
async def proxy(path: str, request: Request):
    client_id = await check_auth(request)
    await check_rate_limit(client_id)
    body = await request.body()
    # Forward to internal MCP router
    async with httpx.AsyncClient() as client:
        resp = await client.post(
            f"http://router:8080/{path}",
            content=body,
            headers={"Content-Type": "application/json"}
        )
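    # Relay the router's status code and content type along with the body
    return StreamingResponse(
        iter([resp.content]),
        status_code=resp.status_code,
        media_type=resp.headers.get("content-type", "application/json"),
    )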
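
To see what the Lua script is doing without a Redis instance, here is an in-memory sketch of the same sliding-window idea: drop hits older than the window, count what is left, and admit the request only if the count is under the limit. `SlidingWindowLimiter` is a hypothetical class for experimentation, not a replacement for the Redis version, which is what lets several gateway replicas share one counter.

```python
import time
from collections import defaultdict, deque
from typing import Deque, Dict, Optional


class SlidingWindowLimiter:
    """Single-process sliding-window limiter keyed by client ID."""

    def __init__(self, limit: int, window_ms: int = 60_000) -> None:
        self.limit = limit
        self.window_ms = window_ms
        self._hits: Dict[str, Deque[int]] = defaultdict(deque)

    def allow(self, client_id: str, now_ms: Optional[int] = None) -> bool:
        """Return True and record the hit if the client is under its limit."""
        if now_ms is None:
            now_ms = int(time.time() * 1000)
        hits = self._hits[client_id]
        # Mirrors ZREMRANGEBYSCORE key 0 (now - window): drop expired hits.
        cutoff = now_ms - self.window_ms
        while hits and hits[0] <= cutoff:
            hits.popleft()
        # Mirrors ZCARD: reject when the window is already full.
        if len(hits) >= self.limit:
            return False
        # Mirrors ZADD: record this request's timestamp.
        hits.append(now_ms)
        return True


if __name__ == "__main__":
    limiter = SlidingWindowLimiter(limit=2, window_ms=1000)
    for t in (0, 0, 500, 1000):
        print(t, limiter.allow("client-a", now_ms=t))
```

Passing `now_ms` explicitly makes the behaviour easy to test: two hits at t=0 fill the window, a third at t=500 is rejected, and a request at t=1000 is admitted again because both earlier hits have aged out.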
☁️ AWS Pack

AWS Service Integration Pack

The data server bundles all AWS service tools into a single deployable unit. Each tool enforces least-privilege IAM and uses IAM role credentials from the ECS task definition — zero hardcoded keys.

# data_server.py — S3 + DynamoDB + Bedrock KB
from fastmcp import FastMCP
import boto3, json
from platform_config import load_config
from pydantic import BaseModel, constr
from typing import Optional

cfg = load_config()
mcp = FastMCP("data-server")
s3  = boto3.client("s3")
ddb = boto3.resource("dynamodb")
br  = boto3.client("bedrock-agent-runtime")
table = ddb.Table(cfg.dynamo_table)

@mcp.tool()
async def s3_get(bucket: str, key: str) -> str:
    """Read an S3 object (max 50KB returned)."""
    obj = s3.get_object(Bucket=bucket, Key=key)
    body = obj["Body"].read(51200)
    try:
        return body.decode()
    except UnicodeDecodeError:
        # Non-text object: report its size instead of returning raw bytes
        return f"[Binary: {len(body)} bytes]"

@mcp.tool()
async def dynamo_get(pk: str, sk: str) -> str:
    """Retrieve a DynamoDB item by primary key."""
    resp = table.get_item(Key={"pk": pk, "sk": sk})
    item = resp.get("Item")
    return json.dumps(item, default=str) if item else "Not found."

@mcp.tool()
async def kb_search(query: str, top_k: int = 5) -> str:
    """Semantic search over the Bedrock Knowledge Base."""
    resp = br.retrieve(
        knowledgeBaseId=cfg.kb_id,
        retrievalQuery={"text": query},
        retrievalConfiguration={"vectorSearchConfiguration": {"numberOfResults": top_k}}
    )
    out = []
    for r in resp["retrievalResults"]:
        out.append(f"[{round(r['score'],3)}] {r['content']['text'][:400]}")
    return "\n\n".join(out)

if __name__ == "__main__":
    mcp.run(transport="sse", host="0.0.0.0", port=8080)
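
One detail worth noting about the tools above: they are declared `async`, but boto3 calls are blocking, so a slow S3 or DynamoDB request holds up the event loop for every other caller. A common way around this, sketched below under the assumption of Python 3.9+, is to push the blocking call onto a worker thread with `asyncio.to_thread`. Here `blocking_fetch` is a hypothetical stand-in for a boto3 call such as `s3.get_object`.

```python
import asyncio
import time
from typing import List


def blocking_fetch(name: str, delay_s: float = 0.2) -> str:
    """Hypothetical stand-in for a blocking SDK call (e.g. a boto3 request)."""
    time.sleep(delay_s)
    return f"payload:{name}"


async def fetch_async(name: str) -> str:
    """Run the blocking call in a worker thread so the event loop stays free."""
    return await asyncio.to_thread(blocking_fetch, name)


async def fetch_many(names: List[str]) -> List[str]:
    """Issue several fetches concurrently; results keep the input order."""
    return list(await asyncio.gather(*(fetch_async(n) for n in names)))


if __name__ == "__main__":
    started = time.perf_counter()
    results = asyncio.run(fetch_many(["a", "b", "c"]))
    print(results, f"{time.perf_counter() - started:.2f}s")
```

Inside a tool, the same pattern would look like `obj = await asyncio.to_thread(s3.get_object, Bucket=bucket, Key=key)`.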
📊 Monitoring

Monitoring & Observability

Production MCP platforms need three observability pillars: structured logs for debugging, custom CloudWatch metrics for dashboards and alarms, and X-Ray traces for request flow across services.

# observability.py — shared middleware for all servers
import boto3, json, time, logging, uuid
from functools import wraps

cw = boto3.client("cloudwatch")
logger = logging.getLogger("mcp-platform")
logging.basicConfig(format='%(message)s', level=logging.INFO)

def instrument(server_name: str):
    """Decorator: logs every tool call with latency + emits CloudWatch metrics."""
    def decorator(fn):
        @wraps(fn)
        async def wrapper(*args, **kwargs):
            rid = str(uuid.uuid4())[:8]
            t0 = time.time()
            status = "success"
            try:
                result = await fn(*args, **kwargs)
                return result
            except Exception as e:
                status = "error"
                logger.error(json.dumps({
                    "rid": rid, "server": server_name,
                    "tool": fn.__name__, "error": str(e)
                }))
                raise
            finally:
                ms = (time.time() - t0) * 1000
                logger.info(json.dumps({
                    "rid": rid, "server": server_name,
                    "tool": fn.__name__, "latency_ms": round(ms, 2),
                    "status": status
                }))
                # Emit custom metrics; a CloudWatch failure must not fail the tool call
                try:
                    cw.put_metric_data(
                        Namespace="MCPPlatform",
                        MetricData=[
                            {"MetricName":"ToolLatency","Value":ms,"Unit":"Milliseconds",
                             "Dimensions":[{"Name":"Server","Value":server_name},{"Name":"Tool","Value":fn.__name__}]},
                            {"MetricName":"ToolInvocations","Value":1,"Unit":"Count",
                             "Dimensions":[{"Name":"Status","Value":status}]},
                        ]
                    )
                except Exception as metric_err:
                    logger.warning(json.dumps({"rid": rid, "metric_error": str(metric_err)}))
        return wrapper
    return decorator

# Usage: @instrument("data-server") on every @mcp.tool()

📈 Key Metrics

ToolLatency P99, ToolInvocations/min, ErrorRate, RateLimitHits, CacheHitRate. Set CloudWatch alarms on P99 > 3s and ErrorRate > 1%.
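
To get a feel for those two alarm thresholds, here is a small offline sketch that computes a nearest-rank P99 and an error rate from sample data and reports which alarms would fire. In production CloudWatch evaluates the alarms for you; `percentile` and `breached_alarms` are hypothetical helpers, and nearest-rank is only one of several percentile definitions.

```python
import math
from typing import List, Sequence


def percentile(values: Sequence[float], pct: float) -> float:
    """Nearest-rank percentile: the smallest value with at least pct% of samples at or below it."""
    if not values:
        raise ValueError("percentile() needs at least one sample")
    ordered = sorted(values)
    rank = max(1, math.ceil(pct * len(ordered) / 100))
    return ordered[rank - 1]


def breached_alarms(
    latencies_ms: Sequence[float],
    errors: int,
    total: int,
    p99_limit_ms: float = 3000.0,
    error_rate_limit: float = 0.01,
) -> List[str]:
    """Return the names of the thresholds that the sample data exceeds."""
    alarms = []
    if percentile(latencies_ms, 99) > p99_limit_ms:
        alarms.append("ToolLatencyP99")
    if total and errors / total > error_rate_limit:
        alarms.append("ErrorRate")
    return alarms


if __name__ == "__main__":
    slow_tail = [100.0] * 98 + [5000.0] * 2
    print(breached_alarms(slow_tail, errors=2, total=100))
    print(breached_alarms([100.0] * 100, errors=1, total=100))
```

The first sample set shows why P99 is a useful alarm metric: 98% of calls are fast, yet the slow tail is enough to breach the 3-second threshold.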

🔍 X-Ray Tracing

Add aws_xray_sdk to ECS tasks. Traces flow: API GW → Gateway → Router → Server → AWS Service. Visualise latency in the X-Ray Service Map.

🚀 CDK Deploy

CDK Deployment Stack

The entire platform deploys with AWS CDK. One stack creates the VPC, ECS cluster, five Fargate services (the router plus four domain servers), ElastiCache Redis, API Gateway, and all IAM roles. One command deploys everything: cdk deploy MCPPlatformStack.

# cdk_stack.py — abbreviated key resources
from aws_cdk import (
    Stack, aws_ecs as ecs, aws_ec2 as ec2,
    aws_elasticache as cache, aws_iam as iam,
    aws_apigateway as apigw, aws_logs as logs
)
from constructs import Construct

class MCPPlatformStack(Stack):
    def __init__(self, scope, id, **kwargs):
        super().__init__(scope, id, **kwargs)

        # VPC
        vpc = ec2.Vpc(self, "PlatformVPC", max_azs=2)

        # ECS Cluster
        cluster = ecs.Cluster(self, "MCPCluster", vpc=vpc, container_insights=True)

        # Task IAM role. The managed policy and resources=["*"] below are broad
        # placeholders; scope both to specific ARNs to get true least privilege.
        task_role = iam.Role(self, "MCPTaskRole",
            assumed_by=iam.ServicePrincipal("ecs-tasks.amazonaws.com"))
        task_role.add_managed_policy(iam.ManagedPolicy.from_aws_managed_policy_name(
            "AmazonBedrockFullAccess"))
        task_role.add_to_policy(iam.PolicyStatement(
            actions=["s3:GetObject","dynamodb:GetItem","dynamodb:PutItem",
                     "secretsmanager:GetSecretValue","cloudwatch:PutMetricData"],
            resources=["*"]
        ))

        # One Fargate service per server (router + 4 domain servers)
        for name, port in [("router",8080),("data",8080),("ai",8081),
                             ("ops",8082),("secrets",8083)]:
            td = ecs.FargateTaskDefinition(self, f"TD-{name}",
                task_role=task_role, cpu=256, memory_limit_mib=512)
            td.add_container(f"mcp-{name}",
                image=ecs.ContainerImage.from_registry(f"123456789.dkr.ecr.us-east-1.amazonaws.com/mcp-{name}:latest"),
                port_mappings=[ecs.PortMapping(container_port=port)],
                logging=ecs.LogDrivers.aws_logs(
                    stream_prefix=f"mcp-{name}",
                    log_group=logs.LogGroup(self, f"LG-{name}",
                        retention=logs.RetentionDays.ONE_WEEK)
                )
            )
            ecs.FargateService(self, f"SVC-{name}",
                cluster=cluster, task_definition=td,
                desired_count=2)  # 2 replicas per service
🚀 Deploy Commands

cdk bootstrap → docker build & push × 5 images → cdk deploy MCPPlatformStack. The stack outputs the API Gateway URL; paste it into your Claude Code mcp_servers.json.

🏆 Recap

30-Day Mastery Recap

You've completed the full MCP curriculum. Here's every skill you've built — each one a building block of the enterprise platform you just deployed.

| Phase | Days | Skills Mastered |
|---|---|---|
| 🌱 Spark | 1–10 | MCP fundamentals, FastMCP setup, tools/resources/prompts, stdio transport, Claude Code integration, file tools, web tools, database tools, async patterns |
| 🔥 Forge | 11–20 | Testing/debugging, performance optimisation, error handling, production packaging, monitoring, Docker, composing servers, protocol deep-dive, versioning, SSE streaming |
| ⚡ Ascend | 21–30 | AWS integrations (S3/DDB/Lambda), Bedrock AI server, auth (API key + OAuth + RBAC), rate limiting (Redis), security & input validation, multi-server orchestration, MCP clients, context engineering, capstone platform |

🏆 Final Assessment

QUESTION 1 OF 3
In the enterprise platform, why does the gateway hash API keys before storing them in the VALID_KEYS set?
A. Hashing makes keys shorter and faster to compare
B. So that even if memory is leaked, raw API keys are not exposed
C. SHA-256 is required by the MCP specification
D. Hashed keys can be rotated without restarting the service
Answer: B. Security best practice: avoid keeping raw secrets in long-lived memory. If a memory dump or heap snapshot leaks, the attacker gets only SHA-256 hashes, which cannot be used to authenticate without the original keys (the pre-images).
QUESTION 2 OF 3
The CDK stack sets desired_count=2 for each Fargate service. What does this achieve?
A. Runs exactly 2 tasks globally to save cost
B. Ensures high availability: if one task fails, the other continues serving requests
C. ECS requires a minimum of 2 tasks per service
D. Doubles throughput by running the same tool twice in parallel
Answer: B. Two replicas across multiple AZs means the service survives a single task failure or AZ disruption. ECS replaces failed tasks automatically, maintaining the desired count of 2.
QUESTION 3 OF 3
What is the role of the @lru_cache(maxsize=1) on load_config()?
A. Limits the number of config objects to one per CPU core
B. Ensures Secrets Manager is called only once per container lifetime, caching the config in memory
C. Encrypts the config in memory using LRU encryption
D. Prevents concurrent threads from reading the config simultaneously
Answer: B. Secrets Manager has API call limits and costs money per call. lru_cache(maxsize=1) memoises the result: the first call fetches from Secrets Manager, and every subsequent call in the same process returns the cached PlatformConfig object instantly.

🎉 Congratulations!

You've completed 30 Days of Mastering MCP — from running your first tool to deploying a production enterprise platform on AWS. You're now ready to build, secure, and scale agentic AI systems that power real-world workflows.
