# Spark MCP (Model Context Protocol) Optimizer

This project implements a Model Context Protocol (MCP) server and client for optimizing Apache Spark code. The system provides intelligent code optimization suggestions and performance analysis through a client-server architecture.

## How It Works

### Code Optimization Workflow
```mermaid
graph TB
    subgraph Input
        A[Input PySpark Code] -->|spark_code_input.py| B[run_client.py]
    end

    subgraph MCP Client
        B -->|Async HTTP| C[SparkMCPClient]
        C -->|Protocol Handler| D[Tools Interface]
    end

    subgraph MCP Server
        E[run_server.py] --> F[SparkMCPServer]
        F -->|Tool Registry| G[optimize_spark_code]
        F -->|Tool Registry| H[analyze_performance]
        F -->|Protocol Handler| I[Claude AI Integration]
    end

    subgraph Resources
        I -->|Code Analysis| J[Claude AI Model]
        J -->|Optimization| K[Optimized Code Generation]
        K -->|Validation| L[PySpark Runtime]
    end

    subgraph Output
        M[optimized_spark_code.py]
        N[performance_analysis.md]
    end

    D -->|MCP Request| F
    G -->|Generate| M
    H -->|Generate| N

    classDef client fill:#e1f5fe,stroke:#01579b
    classDef server fill:#f3e5f5,stroke:#4a148c
    classDef resource fill:#e8f5e9,stroke:#1b5e20
    classDef output fill:#fff3e0,stroke:#e65100
    class A,B,C,D client
    class E,F,G,H,I server
    class J,K,L resource
    class M,N output
```
### Component Details

- **Input Layer**
  * `spark_code_input.py`: Source PySpark code for optimization
  * `run_client.py`: Client startup and configuration
- **MCP Client Layer**
  * Tools Interface: Protocol-compliant tool invocation
- **MCP Server Layer**
  * `run_server.py`: Server initialization
  * Tool Registry: Optimization and analysis tools
  * Protocol Handler: MCP request/response management
- **Resource Layer**
  * Claude AI: Code analysis and optimization
  * PySpark Runtime: Code execution and validation
- **Output Layer**
  * `optimized_spark_code.py`: Optimized code
  * `performance_analysis.md`: Detailed analysis
This workflow illustrates:
- Input PySpark code submission
- MCP protocol handling and routing
- Claude AI analysis and optimization
- Code transformation and validation
- Performance analysis and reporting
## Architecture
This project follows the Model Context Protocol architecture for standardized AI model interactions:
```
┌──────────────────┐      ┌──────────────────┐      ┌──────────────────┐
│                  │      │    MCP Server    │      │    Resources     │
│    MCP Client    │      │ (SparkMCPServer) │      │                  │
│ (SparkMCPClient) │      │                  │      │ ┌──────────────┐ │
│                  │      │  ┌─────────┐     │      │ │  Claude AI   │ │
│  ┌─────────┐     │      │  │  Tools  │     │ <──> │ │    Model     │ │
│  │  Tools  │     │      │  │Registry │     │      │ └──────────────┘ │
│  │Interface│     │ <──> │  └─────────┘     │      │                  │
│  └─────────┘     │      │  ┌─────────┐     │      │ ┌──────────────┐ │
│                  │      │  │Protocol │     │      │ │   PySpark    │ │
│                  │      │  │ Handler │     │      │ │   Runtime    │ │
│                  │      │  └─────────┘     │      │ └──────────────┘ │
└──────────────────┘      └──────────────────┘      └──────────────────┘
         │                         │                         │
         │                         │                         │
         v                         v                         v
┌──────────────┐          ┌──────────────┐          ┌──────────────┐
│  Available   │          │  Registered  │          │   External   │
│    Tools     │          │    Tools     │          │  Resources   │
├──────────────┤          ├──────────────┤          ├──────────────┤
│optimize_code │          │optimize_code │          │  Claude API  │
│analyze_perf  │          │analyze_perf  │          │ Spark Engine │
└──────────────┘          └──────────────┘          └──────────────┘
```
### Components

- **MCP Client**
  * Provides tool interface for code optimization
  * Handles async communication with server
  * Manages file I/O for code generation
- **MCP Server**
  * Implements MCP protocol handlers
  * Manages tool registry and execution
  * Coordinates between client and resources
- **Resources**
  * Claude AI: Provides code optimization intelligence
  * PySpark Runtime: Executes and validates optimizations
### Protocol Flow

1. Client sends optimization request via MCP protocol
2. Server validates request and invokes the appropriate tool
3. Tool utilizes Claude AI for optimization
4. Optimized code is returned via MCP response
5. Client saves and validates the optimized code
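The flow above can be sketched as a JSON-RPC-style exchange. The `tools/call` method and tool name match this README; the envelope fields and the handler are illustrative, not the server's actual implementation:

```python
# Hypothetical sketch of one MCP exchange; field names beyond
# "method"/"params"/"name" are assumptions, not the exact wire format.
request = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "tools/call",
    "params": {
        "name": "optimize_spark_code",
        "arguments": {"code": "df.groupBy('dept').count()"},
    },
}

def handle_request(message):
    """Server-side sketch: validate the method and dispatch to a tool."""
    if message.get("method") != "tools/call":
        raise ValueError("unsupported method")
    tool = message["params"]["name"]
    # A real server would look the tool up in its registry and await it.
    return {"jsonrpc": "2.0", "id": message["id"],
            "result": {"tool": tool, "content": "# optimized code ..."}}

response = handle_request(request)
```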
## End-to-End Functionality

```mermaid
sequenceDiagram
    participant U as User
    participant C as MCP Client
    participant S as MCP Server
    participant AI as Claude AI
    participant P as PySpark Runtime
    U->>C: Submit Spark Code
    C->>S: Send Optimization Request
    S->>AI: Analyze Code
    AI-->>S: Optimization Suggestions
    S->>C: Return Optimized Code
    C->>P: Run Original Code
    C->>P: Run Optimized Code
    P-->>C: Execution Results
    C->>C: Generate Analysis
    C-->>U: Final Report
```
1. **Code Submission**
   * User places PySpark code in `v1/input/spark_code_input.py`
   * Code is read by the MCP client
2. **Optimization Process**
   * MCP client connects to the server via the standardized protocol
   * Server forwards code to Claude AI for analysis
   * AI suggests optimizations based on best practices
   * Server validates and processes the suggestions
3. **Code Generation**
   * Optimized code saved to `v1/output/optimized_spark_code.py`
   * Includes detailed comments explaining the optimizations
   * Maintains the original code structure while improving performance
4. **Performance Analysis**
   * Both versions executed in the PySpark runtime
   * Execution times compared
   * Results validated for correctness
   * Metrics collected and analyzed
5. **Results Generation**
   * Comprehensive analysis in `v1/output/performance_analysis.md`
   * Side-by-side execution comparison
   * Performance improvement statistics
   * Optimization explanations and rationale
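The report in step 5 can be assembled from the collected metrics; a minimal sketch (the function name and the exact report fields are illustrative):

```python
def render_analysis(original_s, optimized_s, notes):
    """Render a performance_analysis.md-style report from raw timings."""
    improvement = (original_s - optimized_s) / original_s * 100
    lines = [
        "## Execution Results Comparison",
        "",
        "### Timing Comparison",
        f"- Original Code: {original_s:.2f} seconds",
        f"- Optimized Code: {optimized_s:.2f} seconds",
        f"- Performance Improvement: {improvement:.1f}%",
        "",
        "### Optimization Details",
    ]
    lines += [f"- {note}" for note in notes]
    return "\n".join(lines)

report = render_analysis(10.0, 2.5, ["Caching frequently used DataFrames"])
```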
## Usage

### Requirements

- Python 3.8+
- PySpark 3.2.0+
- Anthropic API key (for Claude AI)

### Installation

```bash
pip install -r requirements.txt
```
### Quick Start

1. Add the Spark code you want to optimize to `input/spark_code_input.py`
2. Start the MCP server:

   ```bash
   python v1/run_server.py
   ```

3. Run the client to optimize your code:

   ```bash
   python v1/run_client.py
   ```

   This will generate two files:
   - `output/optimized_spark_example.py`: The optimized Spark code with detailed optimization comments
   - `output/performance_analysis.md`: Comprehensive performance analysis

4. Run and compare the code versions:

   ```bash
   python v1/run_optimized.py
   ```

   This will:
   - Execute both the original and optimized code
   - Compare execution times and results
   - Update the performance analysis with execution metrics
   - Show detailed performance improvement statistics
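The comparison step can be approximated by timing each version with `time.perf_counter` and checking that both produce the same result (a sketch; `run_optimized.py`'s actual mechanics may differ):

```python
import time

def time_run(fn):
    """Run fn once; return (result, elapsed seconds)."""
    start = time.perf_counter()
    result = fn()
    return result, time.perf_counter() - start

# Stand-ins for executing the original and optimized Spark jobs.
original = lambda: sum(x * x for x in range(100_000))
optimized = lambda: sum(x * x for x in range(100_000))

res_orig, t_orig = time_run(original)
res_opt, t_opt = time_run(optimized)
assert res_orig == res_opt  # results must match for the optimization to count
```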
## Project Structure

```
ai-mcp/
├── input/
│   └── spark_code_input.py         # Original Spark code to optimize
├── output/
│   ├── optimized_spark_example.py  # Generated optimized code
│   └── performance_analysis.md     # Detailed performance comparison
├── spark_mcp/
│   ├── client.py                   # MCP client implementation
│   └── server.py                   # MCP server implementation
├── run_client.py                   # Client script to optimize code
├── run_server.py                   # Server startup script
└── run_optimized.py                # Script to run and compare code versions
```
## Why MCP?

The Model Context Protocol (MCP) provides several key advantages for Spark code optimization:

### Direct Claude AI Call vs MCP Server

| Aspect | Direct Claude AI Call | MCP Server |
|--------|-----------------------|------------|
| Integration | • Custom integration per team<br>• Manual response handling<br>• Duplicate implementations | • Pre-built client libraries<br>• Automated workflows<br>• Unified interfaces |
| Infrastructure | • No built-in validation<br>• No result persistence<br>• Manual tracking | • Automatic validation<br>• Result persistence<br>• Version control |
| Context | • Basic code suggestions<br>• No execution context<br>• Limited optimization scope | • Context-aware optimization<br>• Full execution history<br>• Comprehensive improvements |
| Validation | • Manual testing required<br>• No performance metrics<br>• Uncertain outcomes | • Automated testing<br>• Performance metrics<br>• Validated results |
| Workflow | • Ad-hoc process<br>• No standardization<br>• Manual intervention needed | • Structured process<br>• Standard protocols<br>• Automated pipeline |
**Key Differences:**

#### 1. AI Integration

| Approach | Code Example | Benefits |
|----------|--------------|----------|
| Traditional | `client = anthropic.Client(api_key)`<br>`response = client.messages.create(...)` | • Complex setup<br>• Custom error handling<br>• Tight coupling |
| MCP | `client = SparkMCPClient()`<br>`result = await client.optimize_spark_code(code)` | • Simple interface<br>• Built-in validation<br>• Loose coupling |
#### 2. Tool Management

| Approach | Code Example | Benefits |
|----------|--------------|----------|
| Traditional | `class SparkOptimizer:`<br>`    def register_tool(self, name, func):`<br>`        self.tools[name] = func` | • Manual registration<br>• No validation<br>• Complex maintenance |
| MCP | `@register_tool("optimize_spark_code")`<br>`async def optimize_spark_code(code: str):` | • Auto-discovery<br>• Type checking<br>• Easy extension |
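A decorator-based registry like the `@register_tool` pattern above can be sketched in a few lines (a minimal illustration, not this project's actual server code):

```python
import asyncio

TOOLS = {}  # tool name -> coroutine function

def register_tool(name):
    """Register an async tool under `name` so the server can dispatch to it."""
    def decorator(func):
        TOOLS[name] = func
        return func
    return decorator

@register_tool("optimize_spark_code")
async def optimize_spark_code(code: str):
    # Placeholder body; the real tool would call Claude AI here.
    return f"# optimized\n{code}"

result = asyncio.run(TOOLS["optimize_spark_code"]("df.count()"))
```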
#### 3. Resource Management

| Approach | Code Example | Benefits |
|----------|--------------|----------|
| Traditional | `def __init__(self):`<br>`    self.claude = init_claude()`<br>`    self.spark = init_spark()` | • Manual orchestration<br>• Manual cleanup<br>• Error-prone |
| MCP | `@requires_resources(["claude_ai", "spark"])`<br>`async def optimize_spark_code(code: str):` | • Auto-coordination<br>• Lifecycle management<br>• Error handling |
#### 4. Communication Protocol

| Approach | Code Example | Benefits |
|----------|--------------|----------|
| Traditional | `{"type": "request", "payload": {"code": code}}` | • Custom format<br>• Manual validation<br>• Custom debugging |
| MCP | `{"method": "tools/call", "params": {"name": "optimize_code"}}` | • Standard format<br>• Auto-validation<br>• Easy debugging |
## Features

- **Intelligent Code Optimization**: Leverages Claude AI to analyze and optimize PySpark code
- **Performance Analysis**: Provides detailed analysis of performance differences between original and optimized code
- **MCP Architecture**: Implements the Model Context Protocol for standardized AI model interactions
- **Easy Integration**: Simple client interface for code optimization requests
- **Code Generation**: Automatically saves optimized code to separate files
## Advanced Usage

You can also use the client programmatically:

```python
import asyncio

from spark_mcp.client import SparkMCPClient

async def main():
    # Connect to the MCP server
    client = SparkMCPClient()
    await client.connect()

    # Your Spark code to optimize
    spark_code = '''
    # Your PySpark code here
    '''

    # Get optimized code
    optimized_code = await client.optimize_spark_code(
        code=spark_code,
        optimization_level="advanced",
        save_to_file=True,  # Save to output/optimized_spark_example.py
    )

    # Analyze performance differences
    analysis = await client.analyze_performance(
        original_code=spark_code,
        optimized_code=optimized_code,
        save_to_file=True,  # Save to output/performance_analysis.md
    )

    # To run both versions and compare, use the run_optimized.py script
    # or implement your own comparison.
    await client.close()

asyncio.run(main())
```
## Example Input and Output

The repository includes an example workflow:

1. **Input Code** (`input/spark_code_input.py`):

   ```python
   # Create DataFrames and join
   emp_df = spark.createDataFrame(employees, ["id", "name", "age", "dept", "salary"])
   dept_df = spark.createDataFrame(departments, ["dept", "location", "budget"])

   # Join and analyze
   result = emp_df.join(dept_df, "dept") \
       .groupBy("dept", "location") \
       .agg({"salary": "avg", "age": "avg", "id": "count"}) \
       .orderBy("dept")
   ```

2. **Optimized Code** (`output/optimized_spark_example.py`):

   ```python
   # Performance-optimized version with caching and improved configurations
   spark = SparkSession.builder \
       .appName("EmployeeAnalysis") \
       .config("spark.sql.shuffle.partitions", 200) \
       .getOrCreate()

   # Create and cache DataFrames
   emp_df = spark.createDataFrame(employees, ["id", "name", "age", "dept", "salary"]).cache()
   dept_df = spark.createDataFrame(departments, ["dept", "location", "budget"]).cache()

   # Optimized join and analysis
   result = emp_df.join(dept_df, "dept") \
       .groupBy("dept", "location") \
       .agg(
           avg("salary").alias("avg_salary"),
           avg("age").alias("avg_age"),
           count("id").alias("employee_count")
       ) \
       .orderBy("dept")
   ```

3. **Performance Analysis** (`output/performance_analysis.md`):

   ```markdown
   ## Execution Results Comparison

   ### Timing Comparison
   - Original Code: 5.18 seconds
   - Optimized Code: 0.65 seconds
   - Performance Improvement: 87.4%

   ### Optimization Details
   - Caching frequently used DataFrames
   - Optimized shuffle partitions
   - Improved column expressions
   - Better memory management
   ```
## Available Tools

- **optimize_spark_code**
  * Optimizes PySpark code for better performance
  * Supports basic and advanced optimization levels
  * Automatically saves optimized code to `examples/optimized_spark_example.py`
- **analyze_performance**
  * Analyzes performance differences between original and optimized code
  * Provides insights on:
    - Performance improvements
    - Resource utilization
    - Scalability considerations
    - Potential trade-offs
## Environment Variables

- `ANTHROPIC_API_KEY`: Your Anthropic API key for Claude AI
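The server reads this key at startup; a sketch of failing fast when it is missing (`require_api_key` is a hypothetical helper, not part of this codebase):

```python
import os

def require_api_key():
    """Return ANTHROPIC_API_KEY, or fail fast with a clear error."""
    key = os.environ.get("ANTHROPIC_API_KEY")
    if not key:
        raise RuntimeError("Set ANTHROPIC_API_KEY before starting the MCP server")
    return key
```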
## Example Optimizations

The system implements various PySpark optimizations, including:
- Broadcast joins for small-large table joins
- Efficient window function usage
- Strategic data caching
- Query plan optimizations
- Performance-oriented operation ordering
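To illustrate the first item: a broadcast join ships the small table to every executor, turning the join into a local hash lookup with no shuffle of the large side. A plain-Python sketch of the idea (conceptual only, not PySpark API):

```python
def broadcast_hash_join(large_rows, small_rows, key):
    """Conceptual broadcast join: hash the small ("broadcast") side once,
    then stream the large side against it -- no shuffle of large_rows."""
    small_by_key = {row[key]: row for row in small_rows}
    for row in large_rows:
        match = small_by_key.get(row[key])
        if match is not None:
            yield {**row, **match}

employees = [{"id": 1, "dept": "eng"}, {"id": 2, "dept": "hr"}]
departments = [{"dept": "eng", "location": "NYC"}]
joined = list(broadcast_hash_join(employees, departments, "dept"))
```

In PySpark, this corresponds to wrapping the small DataFrame in `pyspark.sql.functions.broadcast` before the join.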
## Contributing

Feel free to submit issues and enhancement requests!

## License

MIT License