Allow multiple connections to the everything server.

For both sse and streamableHttp, a server instance needs to be created for each transport. Otherwise, when a new client connects and its new transport is connected to the single server, the previous transport is overwritten in the server instance and can no longer communicate.

* In sse.ts
  - remove global server, cleanup, and transport vars
  - add transports map
  - in sse GET handler,
    - check for sessionId, there shouldn't be one, so comment "Reconnecting?" and do nothing if present
    - if sessionId not present
      - create new server and transport instance
      - connect server to transport
      - add transport to transports map
      - in server.onclose, delete the transport from the transports map and call cleanup
  - in /message POST handler
    - get the sessionId from the request
    - get the transport from the map by sessionId
    - handle the message if the transport was found

* In streamableHttp.ts
  - remove the global server and cleanup vars
  - change transports var to Map
  - in /mcp POST handler
    - when creating a new session
      - create a server instance
      - in server.onclose, delete the transport from the transports map and call cleanup
  - remove the calls to cleanup and server.close in the SIGINT handler, because the transport is closed and its onclose handler closes the server.
This commit is contained in:
cliffhall
2025-05-22 11:41:16 -04:00
parent 626dc0aa2c
commit 145f893108
2 changed files with 51 additions and 39 deletions

View File

@@ -6,36 +6,46 @@ console.error('Starting SSE server...');
const app = express();
const { server, cleanup } = createServer();
let transport: SSEServerTransport;
const transports: Map<string, SSEServerTransport> = new Map<string, SSEServerTransport>();
app.get("/sse", async (req, res) => {
console.error("Received connection");
transport = new SSEServerTransport("/message", res);
await server.connect(transport);
let transport: SSEServerTransport;
const { server, cleanup } = createServer();
server.onclose = async () => {
await cleanup();
await server.close();
};
if (req?.query?.sessionId) {
const sessionId = (req?.query?.sessionId as string) || "none";
transport = transports.get(sessionId) as SSEServerTransport;
console.error("Client Reconnecting? ", transport.sessionId);
} else {
// Create and store transport for new session
transport = new SSEServerTransport("/message", res);
transports.set(transport.sessionId, transport);
// Connect server to transport
await server.connect(transport);
console.error("Client Connected: ", transport.sessionId);
// Handle close of connection
server.onclose = async () => {
console.error("Client Disconnected: ", transport.sessionId);
transports.delete(transport.sessionId);
await cleanup();
};
}
});
app.post("/message", async (req, res) => {
console.error("Received message");
await transport.handlePostMessage(req, res);
});
process.on("SIGINT", async () => {
await cleanup();
await server.close();
process.exit(0);
const sessionId = (req?.query?.sessionId as string) || "none";
const transport = transports.get(sessionId);
if (transport) {
console.error("Client Message from", sessionId);
await transport.handlePostMessage(req, res);
}
});
const PORT = process.env.PORT || 3001;
app.listen(PORT, () => {
console.error(`Server is running on port ${PORT}`);
});