Files
servers-modelcontextprotocol-1/src/everything/transports/streamableHttp.ts
cliffhall 346c29a086 [WIP] Refactor everything server to be more modular and use recommended APIs.
Added tools to toggle simulated logging and resource updates on and off rather than have them start immediately upon connection

* Updated architecture.md

* In server/index.ts
  - remove import of beginSimulatedResourceUpdates and beginSimulatedLogging
  - remove clientConnected from createServer factory result

* In tools/index.ts
  - import registerToggleLoggingTool and registerToggleSubscriberUpdatesTool
  - in registerTools
    - call registerToggleLoggingTool and registerToggleSubscriberUpdatesTool

* In logging.ts
  - in beginSimulatedLogging
    - extract the inline interval callback into a function, sendSimulatedLoggingMessage
  - call sendSimulatedLoggingMessage right away to send the first message
  - supply sendSimulatedLoggingMessage as interval callback

* In subscriptions.ts
  - remove import of Transport
  - remove transports map
  - in beginSimulatedResourceUpdates()
    - change arguments to server and sessionId
    - check for the subsUpdateInterval for the session
    - remove all transport storage and interaction
    - instead use the server to send the notification
 - in stopSimulatedResourceUpdates()
   - remove management of transports map

* In stdio.ts, sse.ts, and streamableHttp.ts
  - remove destructure and calling of clientConnected

* Added tools/toggle-logging.ts
  - registers a tool that
    - takes no arguments
    - tracks clients that have been enabled by session id in a set
    - if client isn't enabled,
       - calls beginSimulatedLogging
       - adds session id to client set
    - else
      - calls stopSimulatedLogging
      - deletes session id from client set
    - returns a message explaining what was done including what to expect when logging is enabled

 * Added tools/toggle-subscriber-updates.ts
   - registers a tool that
     - takes no arguments
     - tracks clients that have been enabled by session id in a set
     - if client isn't enabled,
        - calls beginSimulatedResourceUpdates
        - adds session id to client set
     - else
       - calls stopSimulatedResourceUpdates
       - deletes session id from client set
     - returns a message explaining what was done, including what to expect when subscriber updates are enabled
2025-12-08 17:13:42 -05:00

208 lines
6.4 KiB
TypeScript

import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import { InMemoryEventStore } from "@modelcontextprotocol/sdk/examples/shared/inMemoryEventStore.js";
import express, { Request, Response } from "express";
import { createServer } from "../server/index.js";
import { randomUUID } from "node:crypto";
import cors from "cors";
console.log("Starting Streamable HTTP server...");

const app = express();

// CORS settings applied to every route so that the Inspector can connect.
const corsOptions = {
  origin: "*", // use "*" with caution in production
  methods: "GET,POST,DELETE",
  preflightContinue: false,
  optionsSuccessStatus: 204,
  exposedHeaders: ["mcp-session-id", "last-event-id", "mcp-protocol-version"],
};
app.use(cors(corsOptions));

// Live transports, keyed by the MCP session ID they were initialized with.
// The route handlers below look sessions up here via the "mcp-session-id" header.
const transports = new Map<string, StreamableHTTPServerTransport>();
/**
 * POST /mcp
 *
 * Routes the request by its "mcp-session-id" header:
 *  - header present and known   -> forwarded to the stored transport;
 *  - header present but unknown -> 400 JSON-RPC error (code -32000);
 *  - header absent              -> a new server + transport pair is created,
 *                                  connected, and asked to handle the request.
 *
 * Any exception is logged and, if no response has been started yet, reported
 * as a 500 JSON-RPC error (code -32603).
 */
app.post("/mcp", async (req: Request, res: Response) => {
  console.log("Received MCP POST request");
  try {
    const sessionId = req.headers["mcp-session-id"] as string | undefined;

    if (sessionId) {
      const known = transports.get(sessionId);
      if (known) {
        // The stored transport is already connected to its server, so the
        // request can be handed over without reconnecting.
        await known.handleRequest(req, res);
        return;
      }

      // A session ID was supplied but no transport is registered under it.
      res.status(400).json({
        jsonrpc: "2.0",
        error: {
          code: -32000,
          message: "Bad Request: No valid session ID provided",
        },
        id: req?.body?.id,
      });
      return;
    }

    // No session header: treat this as a new initialization request.
    const { server, cleanup } = createServer();
    const eventStore = new InMemoryEventStore();
    const transport: StreamableHTTPServerTransport =
      new StreamableHTTPServerTransport({
        sessionIdGenerator: () => randomUUID(),
        eventStore, // Enable resumability
        onsessioninitialized: (newSessionId: string) => {
          // Register the transport as soon as its session exists, so that
          // follow-up requests cannot arrive before it is stored.
          console.log(`Session initialized with ID: ${newSessionId}`);
          transports.set(newSessionId, transport);
        },
      });

    // On close, drop the transport from the map and run the per-session cleanup.
    server.server.onclose = async () => {
      const sid = transport.sessionId;
      if (!sid || !transports.has(sid)) {
        return;
      }
      console.log(
        `Transport closed for session ${sid}, removing from transports map`
      );
      transports.delete(sid);
      cleanup(sid);
    };

    // Connect BEFORE handling the request so that responses can flow back
    // through this same transport.
    await server.connect(transport);
    await transport.handleRequest(req, res);
  } catch (error) {
    console.log("Error handling MCP request:", error);
    if (res.headersSent) {
      return;
    }
    res.status(500).json({
      jsonrpc: "2.0",
      error: {
        code: -32603,
        message: "Internal server error",
      },
      id: req?.body?.id,
    });
  }
});
// Handle GET requests for SSE streams (using built-in support from StreamableHTTP)
/**
 * GET /mcp
 *
 * Requires an "mcp-session-id" header that matches a stored transport;
 * otherwise responds with a 400 JSON-RPC error (code -32000). On success the
 * request is handed to that session's transport.
 *
 * A "last-event-id" header is only logged here (as a reconnect); the handler
 * itself does nothing else with it.
 *
 * Failures from the transport are caught, logged, and (if no response has been
 * started) reported as a 500 JSON-RPC error, matching the POST and DELETE
 * handlers, so a rejected promise never escapes the async route handler.
 */
app.get("/mcp", async (req: Request, res: Response) => {
  console.log("Received MCP GET request");
  const sessionId = req.headers["mcp-session-id"] as string | undefined;
  // Single lookup: avoids a has()/get() pair and a non-null assertion.
  const transport = sessionId ? transports.get(sessionId) : undefined;
  if (!sessionId || !transport) {
    res.status(400).json({
      jsonrpc: "2.0",
      error: {
        code: -32000,
        message: "Bad Request: No valid session ID provided",
      },
      id: req?.body?.id,
    });
    return;
  }

  // Check for Last-Event-ID header for resumability
  const lastEventId = req.headers["last-event-id"] as string | undefined;
  if (lastEventId) {
    console.log(`Client reconnecting with Last-Event-ID: ${lastEventId}`);
  } else {
    console.log(`Establishing new SSE stream for session ${sessionId}`);
  }

  try {
    await transport.handleRequest(req, res);
  } catch (error) {
    console.log("Error handling MCP GET request:", error);
    // If the response has already started, a JSON error body can no longer
    // be sent; the failure is only logged in that case.
    if (!res.headersSent) {
      res.status(500).json({
        jsonrpc: "2.0",
        error: {
          code: -32603,
          message: "Internal server error",
        },
        id: req?.body?.id,
      });
    }
  }
});
// Handle DELETE requests for session termination (according to MCP spec)
/**
 * DELETE /mcp
 *
 * Looks up the transport named by the "mcp-session-id" header and hands the
 * request to it. A missing or unknown session ID yields a 400 JSON-RPC error
 * (code -32000); an exception while handling yields a 500 JSON-RPC error
 * (code -32603) unless a response has already been started.
 */
app.delete("/mcp", async (req: Request, res: Response) => {
  const sessionId = req.headers["mcp-session-id"] as string | undefined;
  const transport = sessionId ? transports.get(sessionId) : undefined;

  if (!transport) {
    res.status(400).json({
      jsonrpc: "2.0",
      error: {
        code: -32000,
        message: "Bad Request: No valid session ID provided",
      },
      id: req?.body?.id,
    });
    return;
  }

  console.log(`Received session termination request for session ${sessionId}`);

  try {
    await transport.handleRequest(req, res);
  } catch (error) {
    console.log("Error handling session termination:", error);
    if (res.headersSent) {
      return;
    }
    res.status(500).json({
      jsonrpc: "2.0",
      error: {
        code: -32603,
        message: "Error handling session termination",
      },
      id: req?.body?.id,
    });
  }
});
// Start the server
// The port comes from the PORT environment variable, falling back to 3001.
const PORT = process.env.PORT || 3001;
const server = app.listen(PORT, () => {
  console.error(`MCP Streamable HTTP Server listening on port ${PORT}`);
});

// Report server errors and exit non-zero; a port conflict gets a dedicated,
// actionable message.
server.on("error", (err: unknown) => {
  // Pull out `code` only when the error value is an object that carries one.
  let code: unknown;
  if (typeof err === "object" && err !== null && "code" in err) {
    code = (err as { code?: unknown }).code;
  }

  if (code !== "EADDRINUSE") {
    console.error("HTTP server encountered an error while starting:", err);
  } else {
    console.error(
      `Failed to start: Port ${PORT} is already in use. Set PORT to a free port or stop the conflicting process.`
    );
  }

  // Ensure a non-zero exit so npm reports the failure instead of silently exiting
  process.exit(1);
});
// Handle server shutdown
/**
 * On SIGINT, close every active transport, then exit with status 0.
 *
 * Each transport is closed in turn; a failure to close one is logged and does
 * not prevent the remaining transports from being closed.
 */
process.on("SIGINT", async () => {
  console.log("Shutting down server...");

  // `transports` is a Map, so its entries must be walked with `for...of`.
  // A `for...in` loop enumerates object properties rather than Map entries,
  // so it would never reach any stored transport.
  // A snapshot is iterated so that entries removed from the map while a
  // transport is closing (e.g. by a close handler) do not affect the loop.
  for (const [sessionId, transport] of Array.from(transports.entries())) {
    try {
      console.log(`Closing transport for session ${sessionId}`);
      await transport.close();
      transports.delete(sessionId);
    } catch (error) {
      console.log(`Error closing transport for session ${sessionId}:`, error);
    }
  }

  console.log("Server shutdown complete");
  process.exit(0);
});