diff --git a/src/everything/package.json b/src/everything/package.json index bfcbfae4..ffe6b7cd 100644 --- a/src/everything/package.json +++ b/src/everything/package.json @@ -18,10 +18,11 @@ "prepare": "npm run build", "watch": "tsc --watch", "start": "node dist/index.js", - "start:sse": "node dist/sse.js" + "start:sse": "node dist/sse.js", + "start:streamableHttp": "node dist/streamableHttp.js" }, "dependencies": { - "@modelcontextprotocol/sdk": "^1.9.0", + "@modelcontextprotocol/sdk": "^1.10.1", "express": "^4.21.1", "zod": "^3.23.8", "zod-to-json-schema": "^3.23.5" diff --git a/src/everything/streamableHttp.ts b/src/everything/streamableHttp.ts new file mode 100644 index 00000000..4267b2f4 --- /dev/null +++ b/src/everything/streamableHttp.ts @@ -0,0 +1,151 @@ +import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js"; +import { isInitializeRequest } from '@modelcontextprotocol/sdk/types.js'; +import { InMemoryEventStore } from '@modelcontextprotocol/sdk/examples/shared/inMemoryEventStore.js'; +import express, { Request, Response } from "express"; +import { createServer } from "./everything.js"; +import { randomUUID } from 'node:crypto'; + +const app = express(); + +app.use(express.json()); + +const { server, cleanup } = createServer(); + +const transports: { [sessionId: string]: StreamableHTTPServerTransport } = {}; + +app.post('/mcp', async (req: Request, res: Response) => { + console.log('Received MCP request:', req.body); + try { + // Check for existing session ID + const sessionId = req.headers['mcp-session-id'] as string | undefined; + let transport: StreamableHTTPServerTransport; + + if (sessionId && transports[sessionId]) { + // Reuse existing transport + transport = transports[sessionId]; + } else if (!sessionId && isInitializeRequest(req.body)) { + // New initialization request + const eventStore = new InMemoryEventStore(); + transport = new StreamableHTTPServerTransport({ + sessionIdGenerator: () => randomUUID(), + eventStore, // Enable resumability + onsessioninitialized: (sessionId) => { + // Store the transport by session ID when session is initialized + // This avoids race conditions where requests might come in before the session is stored + console.log(`Session initialized with ID: ${sessionId}`); + transports[sessionId] = transport; + } + }); + + // Set up onclose handler to clean up transport when closed + transport.onclose = () => { + const sid = transport.sessionId; + if (sid && transports[sid]) { + console.log(`Transport closed for session ${sid}, removing from transports map`); + delete transports[sid]; + } + }; + + // Connect the transport to the MCP server BEFORE handling the request + // so responses can flow back through the same transport + await server.connect(transport); + + await transport.handleRequest(req, res, req.body); + return; // Already handled + } else { + // Invalid request - no session ID or not initialization request + res.status(400).json({ + jsonrpc: '2.0', + error: { + code: -32000, + message: 'Bad Request: No valid session ID provided', + }, + id: null, + }); + return; + } + + // Handle the request with existing transport - no need to reconnect + // The existing transport is already connected to the server + await transport.handleRequest(req, res, req.body); + } catch (error) { + console.error('Error handling MCP request:', error); + if (!res.headersSent) { + res.status(500).json({ + jsonrpc: '2.0', + error: { + code: -32603, + message: 'Internal server error', + }, + id: null, + }); + } + } +}); + +// Handle GET requests for SSE streams (using built-in support from StreamableHTTP) +app.get('/mcp', async (req: Request, res: Response) => { + const sessionId = req.headers['mcp-session-id'] as string | undefined; + if (!sessionId || !transports[sessionId]) { + res.status(400).send('Invalid or missing session ID'); + return; + } + + // Check for Last-Event-ID header for resumability + const lastEventId = req.headers['last-event-id'] as string | undefined; + if (lastEventId) { + console.log(`Client reconnecting with Last-Event-ID: ${lastEventId}`); + } else { + console.log(`Establishing new SSE stream for session ${sessionId}`); + } + + const transport = transports[sessionId]; + await transport.handleRequest(req, res); +}); + +// Handle DELETE requests for session termination (according to MCP spec) +app.delete('/mcp', async (req: Request, res: Response) => { + const sessionId = req.headers['mcp-session-id'] as string | undefined; + if (!sessionId || !transports[sessionId]) { + res.status(400).send('Invalid or missing session ID'); + return; + } + + console.log(`Received session termination request for session ${sessionId}`); + + try { + const transport = transports[sessionId]; + await transport.handleRequest(req, res); + } catch (error) { + console.error('Error handling session termination:', error); + if (!res.headersSent) { + res.status(500).send('Error processing session termination'); + } + } +}); + +// Start the server +const PORT = process.env.PORT || 3001; +app.listen(PORT, () => { + console.log(`MCP Streamable HTTP Server listening on port ${PORT}`); +}); + +// Handle server shutdown +process.on('SIGINT', async () => { + console.log('Shutting down server...'); + + // Close all active transports to properly clean up resources + for (const sessionId in transports) { + try { + console.log(`Closing transport for session ${sessionId}`); + await transports[sessionId].close(); + delete transports[sessionId]; + } catch (error) { + console.error(`Error closing transport for session ${sessionId}:`, error); + } + } + await cleanup(); + await server.close(); + console.log('Server shutdown complete'); + process.exit(0); +});