Claude MCP 集成指南:模型上下文协议深度实践
深入了解 Claude MCP(Model Context Protocol)的核心概念和实践应用,掌握多模型协作和上下文共享的先进技术。
MCP 协议概述
Model Context Protocol (MCP) 是 Anthropic 开发的开放标准,旨在实现 AI 应用与外部数据源和工具的安全连接。MCP 使得 Claude 能够访问实时信息、执行复杂操作,并与各种系统无缝集成。
🔗 统一连接
标准化的接口协议,简化 AI 应用与外部系统的集成
- 数据库连接
- API 集成
- 文件系统访问
- 云服务对接
🛡️ 安全可控
内置安全机制,确保数据访问的安全性和可控性
- 权限管理
- 访问控制
- 数据加密
- 审计日志
⚡ 高效协作
优化的上下文共享机制,提升多模型协作效率
- 上下文缓存
- 智能路由
- 负载均衡
- 故障恢复
MCP 架构与核心组件
系统架构
🖥️ MCP Client
Claude Desktop, IDE 插件等
↕️
📡 MCP Protocol
标准化通信协议
↕️
🔧 MCP Server
数据源和工具连接器
核心概念
📋 Resources
MCP 服务器提供的数据资源,如文件、数据库记录、API 响应等
{
"uri": "file:///path/to/document.txt",
"name": "Project Documentation",
"description": "Main project documentation file",
"mimeType": "text/plain"
}
🛠️ Tools
Claude 可以调用的功能工具,如数据查询、文件操作、API 调用等
{
"name": "search_database",
"description": "Search the customer database",
"inputSchema": {
"type": "object",
"properties": {
"query": {"type": "string"},
"limit": {"type": "number", "default": 10}
}
}
}
💬 Prompts
预定义的提示模板,用于标准化常见的交互模式
{
"name": "analyze_data",
"description": "Analyze dataset with specific focus",
"arguments": [
{
"name": "dataset",
"description": "Path to the dataset",
"required": true
},
{
"name": "focus_area",
"description": "Analysis focus area",
"required": false
}
]
}
MCP 服务器开发实战
基础服务器实现
# mcp_server.py
import asyncio
import json
from typing import Any, Dict, List
from mcp import Server, types
from mcp.server.models import InitializationOptions
class CustomMCPServer:
    """Example MCP server exposing two resources and two user tools.

    The request logic lives in the ``handle_*`` coroutine methods so that
    subclasses can override or extend it through ``super()``;
    ``setup_handlers`` only registers thin adapters with ``self.server``.
    """

    _USERS_URI = "custom://database/users"
    _LOGS_URI = "custom://files/logs"

    def __init__(self):
        self.server = Server("custom-mcp-server")
        self.setup_handlers()

    def setup_handlers(self):
        """Register the MCP request handlers with ``self.server``.

        Each adapter resolves the ``handle_*`` method on ``self`` at call
        time, so an override defined in a subclass is what actually runs.
        """

        @self.server.list_resources()
        async def _list_resources() -> List[types.Resource]:
            return await self.handle_list_resources()

        @self.server.read_resource()
        async def _read_resource(uri: str) -> str:
            return await self.handle_read_resource(uri)

        @self.server.list_tools()
        async def _list_tools() -> List[types.Tool]:
            return await self.handle_list_tools()

        @self.server.call_tool()
        async def _call_tool(name: str, arguments: Dict[str, Any]) -> List[types.TextContent]:
            return await self.handle_call_tool(name, arguments)

    async def handle_list_resources(self) -> "List[types.Resource]":
        """Return the resources this server advertises."""
        return [
            types.Resource(
                uri=self._USERS_URI,
                name="User Database",
                description="Customer user database",
                mimeType="application/json",
            ),
            types.Resource(
                uri=self._LOGS_URI,
                name="Application Logs",
                description="System application logs",
                mimeType="text/plain",
            ),
        ]

    async def handle_read_resource(self, uri: str) -> str:
        """Return the content of the resource identified by ``uri``.

        Raises:
            ValueError: if ``uri`` is not one of the advertised resources.
        """
        # NOTE(review): the SDK may hand over a URL object instead of a plain
        # string; normalising with str() is harmless when it already is one.
        uri = str(uri)
        if uri == self._USERS_URI:
            users = await self.query_user_database()
            return json.dumps(users, indent=2)
        if uri == self._LOGS_URI:
            return await self.read_log_files()
        raise ValueError(f"Unknown resource: {uri}")

    async def handle_list_tools(self) -> "List[types.Tool]":
        """Return the tool definitions (name, description, JSON schema)."""
        return [
            types.Tool(
                name="search_users",
                description="Search users in the database",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "query": {
                            "type": "string",
                            "description": "Search query",
                        },
                        "limit": {
                            "type": "number",
                            "description": "Maximum results",
                            "default": 10,
                        },
                    },
                    "required": ["query"],
                },
            ),
            types.Tool(
                name="create_user",
                description="Create a new user",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "name": {"type": "string"},
                        "email": {"type": "string"},
                        "role": {"type": "string", "default": "user"},
                    },
                    "required": ["name", "email"],
                },
            ),
        ]

    async def handle_call_tool(self, name: str, arguments: Dict[str, Any]) -> "List[types.TextContent]":
        """Execute the tool called ``name`` and return its output as text.

        Raises:
            KeyError: if a required argument is missing from ``arguments``.
            ValueError: if ``name`` is not a known tool.
        """
        if name == "search_users":
            results = await self.search_users(
                arguments["query"],
                arguments.get("limit", 10),
            )
            return [types.TextContent(type="text", text=json.dumps(results, indent=2))]
        if name == "create_user":
            user = await self.create_user(
                arguments["name"],
                arguments["email"],
                arguments.get("role", "user"),
            )
            return [types.TextContent(
                type="text",
                text=f"Created user: {json.dumps(user, indent=2)}",
            )]
        raise ValueError(f"Unknown tool: {name}")

    async def query_user_database(self) -> List[Dict]:
        """Return all users (hard-coded sample data standing in for a query)."""
        return [
            {"id": 1, "name": "Alice", "email": "alice@example.com", "role": "admin"},
            {"id": 2, "name": "Bob", "email": "bob@example.com", "role": "user"},
            {"id": 3, "name": "Charlie", "email": "charlie@example.com", "role": "user"},
        ]

    async def search_users(self, query: str, limit: int) -> List[Dict]:
        """Return up to ``limit`` users whose name or email contains ``query``.

        Matching is case-insensitive.
        """
        needle = query.lower()
        all_users = await self.query_user_database()
        matches = [
            user for user in all_users
            if needle in user["name"].lower() or needle in user["email"].lower()
        ]
        # The tool schema declares ``limit`` as a JSON "number", so it can
        # arrive as a float; slices need a non-negative int.
        return matches[:max(int(limit), 0)]

    async def create_user(self, name: str, email: str, role: str) -> Dict:
        """Build and return a new user record; nothing is persisted here."""
        existing = await self.query_user_database()
        # A real implementation would insert into the database at this point.
        return {
            "id": len(existing) + 1,
            "name": name,
            "email": email,
            "role": role,
        }

    async def read_log_files(self) -> str:
        """Return the application log text (hard-coded sample lines)."""
        return "\n".join([
            "2024-01-15 10:30:00 INFO: Application started",
            "2024-01-15 10:30:05 INFO: Database connection established",
            "2024-01-15 10:31:00 INFO: User login: alice@example.com",
            "2024-01-15 10:32:15 WARN: High memory usage detected",
            "2024-01-15 10:33:00 INFO: User logout: alice@example.com",
        ])

    async def run(self, transport_type: str = "stdio"):
        """Serve requests until the transport closes.

        Raises:
            ValueError: if ``transport_type`` is anything other than "stdio".
        """
        if transport_type != "stdio":
            # Previously an unknown transport returned silently, so the
            # process exited without ever serving anything.
            raise ValueError(f"Unsupported transport: {transport_type}")

        from mcp.server.stdio import stdio_server

        async with stdio_server() as (read_stream, write_stream):
            await self.server.run(
                read_stream,
                write_stream,
                InitializationOptions(
                    server_name="custom-mcp-server",
                    server_version="1.0.0",
                    # NOTE(review): some SDK releases require explicit
                    # notification/experimental arguments here — confirm
                    # against the installed version.
                    capabilities=self.server.get_capabilities(),
                ),
            )
# 启动服务器
if __name__ == "__main__":
    # Script entry point: build the server and serve with the default transport.
    asyncio.run(CustomMCPServer().run())
高级功能实现
# advanced_mcp_server.py
import asyncio
import aiohttp
import sqlite3
from datetime import datetime
from typing import Optional
class AdvancedMCPServer(CustomMCPServer):
    """MCP server adding HTTP fetching, API-call logging and reporting tools.

    State is kept in a SQLite database at ``db_path`` with two tables,
    ``users`` and ``api_logs``, created on construction if missing.
    """

    def __init__(self, db_path: str = "mcp_data.db"):
        super().__init__()
        self.db_path = db_path
        self.init_database()
        self.setup_advanced_handlers()

    def init_database(self):
        """Create the ``users`` and ``api_logs`` tables if they do not exist."""
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS users (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT NOT NULL,
                    email TEXT UNIQUE NOT NULL,
                    role TEXT DEFAULT 'user',
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS api_logs (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    endpoint TEXT NOT NULL,
                    method TEXT NOT NULL,
                    status_code INTEGER,
                    response_time REAL,
                    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)
            conn.commit()
        finally:
            # Close even when a statement fails so the handle is not leaked.
            conn.close()

    def setup_advanced_handlers(self):
        """Register list/call handlers covering the advanced tools.

        These registrations replace the ones made by the parent constructor,
        so the handlers fall back to the parent's tool logic when available.
        """
        # Zero-argument super() does not work inside the nested handlers
        # (their first argument is not the instance), so resolve the parent
        # methods here. getattr() keeps this usable when the parent does not
        # expose them as methods.
        parent = super()
        base_list_tools = getattr(parent, "handle_list_tools", None)
        base_call_tool = getattr(parent, "handle_call_tool", None)

        advanced_tools = [
            types.Tool(
                name="fetch_api_data",
                description="Fetch data from external API",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "url": {"type": "string"},
                        "method": {"type": "string", "default": "GET"},
                        "headers": {"type": "object"},
                        "params": {"type": "object"},
                    },
                    "required": ["url"],
                },
            ),
            types.Tool(
                name="analyze_logs",
                description="Analyze API usage logs",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "start_date": {"type": "string"},
                        "end_date": {"type": "string"},
                        "endpoint_filter": {"type": "string"},
                    },
                },
            ),
            types.Tool(
                name="generate_report",
                description="Generate usage report",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "report_type": {
                            "type": "string",
                            "enum": ["user_activity", "api_usage", "error_summary"],
                        },
                        "format": {
                            "type": "string",
                            "enum": ["json", "csv", "html"],
                            "default": "json",
                        },
                    },
                    "required": ["report_type"],
                },
            ),
        ]

        dispatch = {
            "fetch_api_data": self.fetch_api_data,
            "analyze_logs": self.analyze_logs,
            "generate_report": self.generate_report,
        }

        @self.server.list_tools()
        async def handle_advanced_tools() -> List[types.Tool]:
            """Return the parent's tools followed by the advanced ones."""
            basic_tools = await base_list_tools() if base_list_tools else []
            return list(basic_tools) + advanced_tools

        @self.server.call_tool()
        async def handle_advanced_tool_calls(name: str, arguments: Dict[str, Any]) -> List[types.TextContent]:
            """Run an advanced tool, or defer to the parent for other names."""
            handler = dispatch.get(name)
            if handler is not None:
                return await handler(arguments)
            if base_call_tool is None:
                raise ValueError(f"Unknown tool: {name}")
            return await base_call_tool(name, arguments)

    async def fetch_api_data(self, args: Dict[str, Any]) -> List[types.TextContent]:
        """Perform an HTTP request described by ``args`` and return the body.

        ``args`` must contain ``url``; ``method`` (default "GET"), ``headers``
        and ``params`` are optional. Every attempt is recorded in
        ``api_logs``; a failed request is logged with status 0 and reported
        to the caller as text rather than raised.
        """
        # NOTE(review): ``url`` comes straight from the tool arguments, so
        # this can reach any host the server can (SSRF) — consider an
        # allow-list before exposing it.
        url = args["url"]
        method = args.get("method", "GET")
        headers = args.get("headers", {})
        params = args.get("params", {})
        start_time = datetime.now()

        try:
            async with aiohttp.ClientSession() as session:
                async with session.request(
                    method, url, headers=headers, params=params
                ) as response:
                    data = await response.text()
                    status_code = response.status
        except Exception as e:
            # Deliberate best-effort boundary: any transport error becomes a
            # textual result for the model.
            await self.log_api_call(url, method, 0, 0)
            return [types.TextContent(
                type="text",
                text=f"API call failed: {str(e)}"
            )]

        # Logging happens outside the try block so that a database error is
        # not misreported as a failed API call.
        response_time = (datetime.now() - start_time).total_seconds()
        await self.log_api_call(url, method, status_code, response_time)
        return [types.TextContent(
            type="text",
            text=f"API Response (Status: {status_code}):\n{data}"
        )]

    async def log_api_call(self, endpoint: str, method: str, status_code: int, response_time: float):
        """Insert one row describing an API call into ``api_logs``."""
        conn = sqlite3.connect(self.db_path)
        try:
            conn.execute(
                """
                INSERT INTO api_logs (endpoint, method, status_code, response_time)
                VALUES (?, ?, ?, ?)
                """,
                (endpoint, method, status_code, response_time),
            )
            conn.commit()
        finally:
            conn.close()

    async def analyze_logs(self, args: Dict[str, Any]) -> List[types.TextContent]:
        """Summarise ``api_logs`` rows matching the optional filters.

        ``args`` may contain ``start_date``, ``end_date`` (compared against
        the ``timestamp`` column) and ``endpoint_filter`` (substring match).
        The result is JSON with the call count, success rate, average
        response time and the ten most recent matching rows.
        """
        query = (
            "SELECT id, endpoint, method, status_code, response_time, timestamp "
            "FROM api_logs WHERE 1=1"
        )
        params = []
        if args.get("start_date"):
            query += " AND timestamp >= ?"
            params.append(args["start_date"])
        if args.get("end_date"):
            query += " AND timestamp <= ?"
            params.append(args["end_date"])
        if args.get("endpoint_filter"):
            query += " AND endpoint LIKE ?"
            params.append(f"%{args['endpoint_filter']}%")
        # Explicit ordering makes the "most recent" slice below well defined.
        query += " ORDER BY id"

        conn = sqlite3.connect(self.db_path)
        try:
            logs = conn.execute(query, params).fetchall()
        finally:
            conn.close()

        total_calls = len(logs)
        # Both columns are nullable in the schema, so skip NULLs instead of
        # letting a comparison or sum with None raise.
        success_calls = sum(
            1 for row in logs if row[3] is not None and 200 <= row[3] < 300
        )
        timings = [row[4] for row in logs if row[4] is not None]
        analysis = {
            "total_calls": total_calls,
            "success_rate": success_calls / total_calls if total_calls > 0 else 0,
            "average_response_time": sum(timings) / len(timings) if timings else 0,
            "recent_logs": logs[-10:],
        }
        return [types.TextContent(
            type="text",
            text=json.dumps(analysis, indent=2, default=str)
        )]

    async def generate_report(self, args: Dict[str, Any]) -> List[types.TextContent]:
        """Build a usage report and render it in the requested format.

        ``args["report_type"]`` selects "user_activity" or "api_usage"; any
        other value yields the error summary. ``args["format"]`` is "json"
        (default) or "csv"; any other value yields HTML.
        """
        report_type = args["report_type"]
        format_type = args.get("format", "json")
        report_data = self._collect_report_data(report_type)
        report_text = self._render_report(report_type, report_data, format_type)
        return [types.TextContent(type="text", text=report_text)]

    def _collect_report_data(self, report_type: str) -> Dict[str, Any]:
        """Query the database and return the report as a plain dict."""
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            if report_type == "user_activity":
                cursor.execute("SELECT COUNT(*) as total_users FROM users")
                user_count = cursor.fetchone()[0]
                cursor.execute("""
                    SELECT role, COUNT(*) as count
                    FROM users
                    GROUP BY role
                """)
                return {
                    "total_users": user_count,
                    "role_distribution": dict(cursor.fetchall()),
                    "generated_at": datetime.now().isoformat(),
                }
            if report_type == "api_usage":
                cursor.execute("""
                    SELECT
                        COUNT(*) as total_calls,
                        AVG(response_time) as avg_response_time,
                        COUNT(CASE WHEN status_code >= 200 AND status_code < 300 THEN 1 END) as success_calls
                    FROM api_logs
                """)
                total, average, successes = cursor.fetchone()
                return {
                    "total_api_calls": total,
                    "average_response_time": average,
                    "success_calls": successes,
                    "success_rate": successes / total if total > 0 else 0,
                    "generated_at": datetime.now().isoformat(),
                }
            # error_summary (also the fallback for unrecognised types)
            cursor.execute("""
                SELECT status_code, COUNT(*) as count
                FROM api_logs
                WHERE status_code >= 400
                GROUP BY status_code
                ORDER BY count DESC
            """)
            error_stats = cursor.fetchall()
            return {
                "error_distribution": dict(error_stats),
                "total_errors": sum(count for _, count in error_stats),
                "generated_at": datetime.now().isoformat(),
            }
        finally:
            conn.close()

    @staticmethod
    def _render_report(report_type: str, report_data: Dict[str, Any], format_type: str) -> str:
        """Serialise ``report_data`` as JSON, CSV or a small HTML page."""
        if format_type == "json":
            return json.dumps(report_data, indent=2)

        if format_type == "csv":
            import csv
            import io

            # csv.writer quotes values containing commas or quotes (such as
            # the nested distribution dicts), which plain string joining
            # did not.
            buffer = io.StringIO()
            writer = csv.writer(buffer, lineterminator="\n")
            writer.writerow(["key", "value"])
            for key, value in report_data.items():
                writer.writerow([key, str(value)])
            return buffer.getvalue().rstrip("\n")

        import html

        # The "html" format must actually be HTML; escape interpolated text
        # so report contents cannot inject markup.
        title = html.escape(f"{report_type.title()} Report")
        body = html.escape(json.dumps(report_data, indent=2))
        return (
            "<html>\n"
            f"<head><title>{title}</title></head>\n"
            "<body>\n"
            f"<h1>{title}</h1>\n"
            f"<pre>{body}</pre>\n"
            "</body>\n"
            "</html>"
        )
客户端集成与配置
Claude Desktop 配置
# macOS: ~/Library/Application Support/Claude/claude_desktop_config.json(Windows: %APPDATA%\Claude\claude_desktop_config.json)
{
"mcpServers": {
"custom-server": {
"command": "python",
"args": ["/path/to/mcp_server.py"],
"env": {
"DATABASE_URL": "sqlite:///mcp_data.db",
"LOG_LEVEL": "INFO"
}
},
"filesystem": {
"command": "uvx",
"args": ["mcp-server-filesystem", "/path/to/allowed/directory"],
"env": {}
},
"github": {
"command": "uvx",
"args": ["mcp-server-github"],
"env": {
"GITHUB_PERSONAL_ACCESS_TOKEN": "your_token_here"
}
}
}
}
编程式客户端
# mcp_client.py
import asyncio
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
class MCPClient:
    """Small programmatic client for a stdio-launched MCP server.

    ``session`` is ``None`` until ``connect_to_server`` succeeds and again
    after ``close``; every other method raises ``RuntimeError`` while it is
    ``None``.
    """

    def __init__(self):
        self.session = None
        # Owns the transport and session context managers so close() can
        # unwind them in the right order.
        self._exit_stack = None

    def _require_session(self):
        """Return the active session or raise if not connected."""
        if not self.session:
            raise RuntimeError("Not connected to server")
        return self.session

    async def connect_to_server(self, server_params: "StdioServerParameters"):
        """Launch the server process and initialise an MCP session with it.

        Any existing connection is closed first. If start-up fails part-way,
        whatever was opened is closed again before the error propagates.
        """
        from contextlib import AsyncExitStack

        await self.close()

        # stdio_client() is an async context manager yielding the
        # (read, write) streams; the session is built on top of them.
        stack = AsyncExitStack()
        try:
            read_stream, write_stream = await stack.enter_async_context(
                stdio_client(server_params)
            )
            session = await stack.enter_async_context(
                ClientSession(read_stream, write_stream)
            )
            await session.initialize()
        except BaseException:
            await stack.aclose()
            raise

        self._exit_stack = stack
        self.session = session
        print("Connected to MCP server successfully!")

    async def list_available_resources(self):
        """Print and return the resources offered by the server."""
        result = await self._require_session().list_resources()
        # The session returns a result object wrapping the list; fall back to
        # the value itself if it is already a plain sequence.
        resources = getattr(result, "resources", result)
        print("Available resources:")
        for resource in resources:
            print(f"  - {resource.name}: {resource.uri}")
        return resources

    async def list_available_tools(self):
        """Print and return the tools offered by the server."""
        result = await self._require_session().list_tools()
        tools = getattr(result, "tools", result)
        print("Available tools:")
        for tool in tools:
            print(f"  - {tool.name}: {tool.description}")
        return tools

    async def call_tool(self, tool_name: str, arguments: dict):
        """Invoke ``tool_name`` with ``arguments`` and return the raw result."""
        return await self._require_session().call_tool(tool_name, arguments)

    async def read_resource(self, uri: str):
        """Read the resource at ``uri`` and return the raw result."""
        return await self._require_session().read_resource(uri)

    async def close(self):
        """Close the session and transport; a no-op when not connected."""
        stack, self._exit_stack = self._exit_stack, None
        self.session = None
        if stack is not None:
            await stack.aclose()
# 使用示例
async def main():
    """Demo: launch ``mcp_server.py`` over stdio and exercise it end to end."""
    mcp_client = MCPClient()
    await mcp_client.connect_to_server(
        StdioServerParameters(command="python", args=["mcp_server.py"])
    )

    try:
        # Discovery first: print what the server offers.
        await mcp_client.list_available_resources()
        await mcp_client.list_available_tools()

        # Tool invocation.
        search_args = {"query": "alice", "limit": 5}
        search_result = await mcp_client.call_tool("search_users", search_args)
        print("Search result:", search_result)

        # Resource read.
        users_uri = "custom://database/users"
        user_data = await mcp_client.read_resource(users_uri)
        print("User data:", user_data)
    finally:
        # Always release the connection, even if a step above failed.
        await mcp_client.close()


if __name__ == "__main__":
    asyncio.run(main())
安全性与最佳实践
安全配置
🔐 认证与授权
# 安全认证示例
class SecureMCPServer(AdvancedMCPServer):
    """Server variant that checks a bearer API key and the request origin."""

    def __init__(self, api_key: str, allowed_origins: List[str]):
        super().__init__()
        self.api_key = api_key
        self.allowed_origins = allowed_origins
        self.setup_security()

    def setup_security(self):
        """Register a request hook that authenticates every request.

        The hook raises ``PermissionError`` when the ``Authorization`` header
        is missing or malformed, when the bearer token differs from
        ``api_key``, or when an ``Origin`` header is present but not listed
        in ``allowed_origins``.
        """
        import hmac

        # Zero-argument super() cannot be used inside the nested handler (its
        # first argument is the request, not the instance), so bind the
        # parent proxy here.
        parent = super()

        # NOTE(review): neither ``Server.request_handler()`` nor a parent
        # ``handle_request`` is defined in the code shown in this file —
        # confirm both exist before relying on this hook.
        @self.server.request_handler()
        async def handle_request(request):
            auth_header = request.headers.get("Authorization")
            if not auth_header or not auth_header.startswith("Bearer "):
                raise PermissionError("Missing or invalid authorization")

            token = auth_header[len("Bearer "):]
            # Constant-time comparison so response timing does not reveal how
            # much of the key matched. Bytes are used because the str form
            # only accepts ASCII.
            if not hmac.compare_digest(token.encode(), self.api_key.encode()):
                raise PermissionError("Invalid API key")

            # A missing Origin header is accepted.
            origin = request.headers.get("Origin")
            if origin and origin not in self.allowed_origins:
                raise PermissionError("Origin not allowed")

            return await parent.handle_request(request)
🛡️ 数据保护
# 数据加密和脱敏
import hashlib
from cryptography.fernet import Fernet
class DataProtectionMixin:
    """Mixin providing encryption, hashing and redaction helpers."""

    SENSITIVE_FIELDS = ("password", "ssn", "credit_card")
    REDACTED = "***REDACTED***"

    def __init__(self, *args, encryption_key=None, **kwargs):
        """Set up the cipher.

        Args:
            encryption_key: Fernet key to use. When omitted a fresh key is
                generated, so anything encrypted with it can only be
                decrypted while ``self.encryption_key`` is still available.
        """
        # Forward to the next class in the MRO so the mixin composes with
        # classes that define their own __init__.
        super().__init__(*args, **kwargs)
        if encryption_key is None:
            encryption_key = Fernet.generate_key()
        self.encryption_key = encryption_key
        self.cipher = Fernet(self.encryption_key)

    def encrypt_sensitive_data(self, data: str) -> str:
        """Encrypt ``data`` and return the token as text."""
        return self.cipher.encrypt(data.encode()).decode()

    def decrypt_sensitive_data(self, encrypted_data: str) -> str:
        """Decrypt a token produced by ``encrypt_sensitive_data``."""
        return self.cipher.decrypt(encrypted_data.encode()).decode()

    def hash_pii(self, pii_data: str) -> str:
        """Return the hex SHA-256 digest of ``pii_data``."""
        # NOTE(review): the digest is unsalted, so low-entropy values such as
        # phone numbers can be recovered by enumeration — consider a keyed
        # HMAC if these hashes are ever exposed.
        return hashlib.sha256(pii_data.encode()).hexdigest()

    def sanitize_output(self, data: dict) -> dict:
        """Return a copy of ``data`` with sensitive fields redacted.

        Keys listed in ``SENSITIVE_FIELDS`` are replaced with ``REDACTED`` at
        any nesting depth, including dicts inside lists. The input is not
        modified.
        """
        return self._sanitize(data)

    def _sanitize(self, value):
        """Recursively rebuild dicts and lists, redacting sensitive keys."""
        if isinstance(value, dict):
            return {
                key: self.REDACTED if key in self.SENSITIVE_FIELDS else self._sanitize(item)
                for key, item in value.items()
            }
        if isinstance(value, list):
            return [self._sanitize(item) for item in value]
        return value
性能优化
# 性能优化策略
import asyncio
from functools import lru_cache
import aioredis
class OptimizedMCPServer(SecureMCPServer):
    """Server variant with a Redis-backed cache and batch processing."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.redis_client = None
        self._cache_setup_task = None
        # setup_caching() is a coroutine, so calling it here without awaiting
        # would never run it. Schedule it when an event loop is already
        # running; otherwise the caller must ``await setup_caching()``.
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            pass
        else:
            self._cache_setup_task = loop.create_task(self.setup_caching())

    async def setup_caching(self):
        """Connect ``self.redis_client`` to the Redis server on localhost."""
        self.redis_client = await aioredis.from_url("redis://localhost")

    # NOTE(review): lru_cache on an instance method keys on ``self`` and
    # keeps instances alive for the cache's lifetime — revisit once this
    # placeholder gets a real implementation.
    @lru_cache(maxsize=128)
    def get_cached_resource(self, uri: str) -> str:
        """Placeholder for a local resource cache; currently returns None."""
        pass

    async def get_distributed_cache(self, key: str) -> Optional[str]:
        """Return the cached value for ``key``, or None when unavailable."""
        if self.redis_client:
            return await self.redis_client.get(key)
        return None

    async def set_distributed_cache(self, key: str, value: str, ttl: int = 3600):
        """Store ``value`` under ``key`` for ``ttl`` seconds.

        Does nothing when no Redis client is connected.
        """
        if self.redis_client:
            await self.redis_client.setex(key, ttl, value)

    async def batch_process_requests(self, requests: List[dict]) -> List[dict]:
        """Process ``requests`` concurrently and return results in order.

        A request that raised contributes its exception object at the
        matching position instead of aborting the whole batch.
        """
        tasks = [
            asyncio.create_task(self.process_single_request(request))
            for request in requests
        ]
        return await asyncio.gather(*tasks, return_exceptions=True)

    async def process_single_request(self, request: dict) -> dict:
        """Placeholder for per-request handling; currently returns None."""
        pass
监控与调试
日志和监控
# 监控和日志系统
import logging
import time
from prometheus_client import Counter, Histogram, Gauge
from functools import wraps
# Prometheus metrics, defined at module level and shared by all servers here.
# Request counter, labelled by handler name ('method') and outcome ('status').
mcp_requests_total = Counter('mcp_requests_total', 'Total MCP requests', ['method', 'status'])
# Histogram of handler durations, in seconds (per the metric name).
mcp_request_duration = Histogram('mcp_request_duration_seconds', 'MCP request duration')
# Gauge tracking the number of currently active connections.
mcp_active_connections = Gauge('mcp_active_connections', 'Active MCP connections')
def _monitored(method_name, owner=None):
    """Build a decorator adding metrics and logging to an async callable.

    ``owner`` supplies the logger. When it is omitted, the logger is taken
    from the wrapped callable's first positional argument, which lets the
    decorator be applied to methods inside a class body.
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            logger = (owner if owner is not None else args[0]).logger
            start_time = time.time()
            status = "success"
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                status = "error"
                logger.error(f"Error in {method_name}: {str(e)}")
                raise
            finally:
                # Runs on both paths so every call is counted exactly once.
                duration = time.time() - start_time
                mcp_requests_total.labels(method=method_name, status=status).inc()
                mcp_request_duration.observe(duration)
                logger.info(f"{method_name} completed in {duration:.3f}s with status: {status}")
        return wrapper
    return decorator


class MonitoringMCPServer(OptimizedMCPServer):
    """Server variant that logs requests and exports Prometheus metrics."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.setup_logging()
        self.active_connections = 0
        # Reference point for the uptime reported by get_health_status().
        self.start_time = time.time()

    def setup_logging(self):
        """Configure root logging to ``mcp_server.log`` and the console."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('mcp_server.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def monitor_request(self, method_name: str):
        """Return a decorator that monitors an async callable.

        The decorated callable is timed, counted in the Prometheus metrics
        under ``method_name`` and logged through this instance's logger;
        exceptions are logged and re-raised.
        """
        return _monitored(method_name, owner=self)

    # The class-body decorators use the module-level helper: the instance
    # method above cannot be called before an instance exists.
    # NOTE(review): the super() calls below need a parent class that defines
    # the same ``handle_*`` methods — confirm against the base classes.
    @_monitored("list_resources")
    async def handle_list_resources(self) -> List[types.Resource]:
        """List resources via the parent, with monitoring applied."""
        return await super().handle_list_resources()

    @_monitored("call_tool")
    async def handle_call_tool(self, name: str, arguments: Dict[str, Any]) -> List[types.TextContent]:
        """Run a tool via the parent, with monitoring applied."""
        return await super().handle_call_tool(name, arguments)

    async def handle_connection(self, read_stream, write_stream):
        """Serve one connection while keeping the active-connection gauge."""
        self.active_connections += 1
        mcp_active_connections.set(self.active_connections)
        try:
            # NOTE(review): no parent ``handle_connection`` is visible in
            # this file — confirm it exists.
            await super().handle_connection(read_stream, write_stream)
        finally:
            self.active_connections -= 1
            mcp_active_connections.set(self.active_connections)

    def get_health_status(self) -> dict:
        """Return status, active connection count, uptime seconds and version."""
        return {
            "status": "healthy",
            "active_connections": self.active_connections,
            "uptime": time.time() - self.start_time,
            "version": "1.0.0"
        }
调试工具
# 调试和诊断工具
class DebugMCPServer(MonitoringMCPServer):
    """Server variant that records request history for diagnostics."""

    def __init__(self, *args, debug_mode: bool = False, **kwargs):
        super().__init__(*args, **kwargs)
        self.debug_mode = debug_mode
        self.request_history = []

    async def debug_request(self, request_data: dict):
        """Append ``request_data`` to the history when debug mode is on.

        Each entry stores the time, the request and the current call stack.
        Once the history exceeds 1000 entries it is cut back to the newest
        500.
        """
        if not self.debug_mode:
            return

        self.request_history.append({
            "timestamp": time.time(),
            "request": request_data,
            "stack_trace": self.get_stack_trace(),
        })
        # Trim in bulk rather than on every call.
        if len(self.request_history) > 1000:
            self.request_history = self.request_history[-500:]

    def get_stack_trace(self) -> str:
        """Return the current call stack formatted as a single string."""
        import traceback

        # format_stack() yields a list of frame strings; join them so the
        # return value matches the declared ``str`` type.
        return "".join(traceback.format_stack())

    def get_debug_info(self) -> dict:
        """Return debug mode, history size, recent requests and health status.

        ``recent_requests`` holds the last ten entries and is empty when
        debug mode is off.
        """
        return {
            "debug_mode": self.debug_mode,
            "request_history_count": len(self.request_history),
            "recent_requests": self.request_history[-10:] if self.debug_mode else [],
            "server_stats": self.get_health_status()
        }

    async def export_debug_data(self, format_type: str = "json") -> str:
        """Serialise the debug state.

        "json" gives an indented JSON document, "csv" gives one row per
        recorded request, and any other value gives ``str()`` of the data.
        """
        debug_data = {
            "server_info": self.get_debug_info(),
            "request_history": self.request_history,
            "exported_at": time.time()
        }

        if format_type == "json":
            return json.dumps(debug_data, indent=2, default=str)

        if format_type == "csv":
            import csv
            import io

            output = io.StringIO()
            writer = csv.writer(output)
            writer.writerow(["timestamp", "request_type", "status"])
            writer.writerows(
                [req["timestamp"], req["request"].get("method", "unknown"), "completed"]
                for req in self.request_history
            )
            return output.getvalue()

        return str(debug_data)
实际应用场景
🏢 企业数据集成
连接企业内部系统,实现数据统一访问
- CRM 系统集成
- ERP 数据访问
- 财务系统对接
- 人力资源管理
🔬 研发工具链
集成开发工具和服务,提升研发效率
- 代码仓库访问
- CI/CD 流水线
- 问题跟踪系统
- 文档管理平台
📊 数据分析平台
连接各种数据源,支持智能分析
- 数据库查询
- API 数据获取
- 文件系统访问
- 实时数据流
总结与展望
MCP 集成关键要点:
- ✅ 标准化的 AI 应用集成协议
- ✅ 安全可控的外部系统访问
- ✅ 灵活的资源和工具管理
- ✅ 高效的上下文共享机制
- ✅ 完善的监控和调试支持
- ✅ 广泛的应用场景和扩展性
技术前景:MCP 协议为 AI 应用的生态系统建设奠定了基础,随着更多工具和服务的支持,将极大地扩展 Claude 的能力边界,实现真正的智能化工作流程。