diff --git a/src/everything/docs/architecture.md b/src/everything/docs/architecture.md index cfa084d3..1b44dc88 100644 --- a/src/everything/docs/architecture.md +++ b/src/everything/docs/architecture.md @@ -10,6 +10,8 @@ This document summarizes the current layout and runtime architecture of the `src - `server/index.ts`: The lightweight, modular server used by transports in this package. - `server/everything.ts`: A comprehensive reference server (much larger, many tools/prompts/resources) kept for reference/testing but not wired up by default in the entry points. +- Multi‑client subscriptions: The server supports multiple concurrent clients. Each client manages its own resource subscriptions and receives notifications only for the URIs it subscribed to, independent of other clients. + ## Directory Layout ``` @@ -35,7 +37,8 @@ src/everything ├── resources │ ├── index.ts │ ├── templates.ts -│ └── files.ts +│ ├── files.ts +│ └── subscriptions.ts ├── docs │ ├── server-instructions.md │ └── architecture.md @@ -52,22 +55,23 @@ At `src/everything`: - index.ts - Server factory that creates an `McpServer` with declared capabilities, loads server instructions, and registers tools, prompts, and resources. - - Exposes `{ server, cleanup, startNotificationIntervals }` to the chosen transport. + - Sets resource subscription handlers via `setSubscriptionHandlers(server)`. + - Exposes `{ server, clientConnected, cleanup }` to the chosen transport. - everything.ts - A full “reference/monolith” implementation demonstrating most MCP features. Not the default path used by the transports in this package. - transports/ - stdio.ts - - Starts a `StdioServerTransport`, creates the server via `createServer()`, and connects it. Handles `SIGINT` to close cleanly. + - Starts a `StdioServerTransport`, creates the server via `createServer()`, connects it, and invokes `clientConnected(transport)` so simulated resource updates can begin. Handles `SIGINT` to close cleanly. - sse.ts - Express server exposing: - `GET /sse` to establish an SSE connection per session. - `POST /message` for client messages. - - Manages a `Map` for sessions. Calls `startNotificationIntervals(sessionId)` after connect (hook currently a no‑op in the factory). + - Manages a `Map` for sessions. Calls `clientConnected(transport)` after connect so per‑session simulated resource updates start. - streamableHttp.ts - Express server exposing a single `/mcp` endpoint for POST (JSON‑RPC), GET (SSE stream), and DELETE (session termination) using `StreamableHTTPServerTransport`. - - Uses an `InMemoryEventStore` for resumable sessions and tracks transports by `sessionId`. Connects a fresh server instance on initialization POST, then reuses transport for subsequent requests. + - Uses an `InMemoryEventStore` for resumable sessions and tracks transports by `sessionId`. Connects a fresh server instance on initialization POST, invokes `clientConnected(transport)`, then reuses the transport for subsequent requests. - tools/ @@ -140,14 +144,15 @@ At `src/everything`: - Registers tools via `registerTools(server)`. - Registers resources via `registerResources(server)`. - Registers prompts via `registerPrompts(server)`. + - Sets up resource subscription handlers via `setSubscriptionHandlers(server)`. - Returns the server and two lifecycle hooks: - - `cleanup`: transport may call on shutdown (currently a no‑op). - - `startNotificationIntervals(sessionId?)`: currently a no‑op; wired in SSE transport for future periodic notifications. + - `clientConnected(transport)`: transports call this after connecting so the server can begin per‑session simulated resource update notifications over that specific transport. + - `cleanup(sessionId?)`: transports call this on session termination to stop simulated updates and remove session‑scoped state. 4. Each transport is responsible for network/session lifecycle: - - STDIO: simple process‑bound connection; closes on `SIGINT`. - - SSE: maintains a session map keyed by `sessionId`, hooks server’s `onclose` to clean and remove session, exposes `/sse` (GET) and `/message` (POST) endpoints. - - Streamable HTTP: exposes `/mcp` for POST (JSON‑RPC messages), GET (SSE stream), and DELETE (termination). Uses an event store for resumability and stores transports by `sessionId`. + - STDIO: simple process‑bound connection; calls `clientConnected(transport)` after connect; closes on `SIGINT` and calls `cleanup()`. + - SSE: maintains a session map keyed by `sessionId`, calls `clientConnected(transport)` after connect, hooks server’s `onclose` to clean and remove session, exposes `/sse` (GET) and `/message` (POST) endpoints. + - Streamable HTTP: exposes `/mcp` for POST (JSON‑RPC messages), GET (SSE stream), and DELETE (termination). Uses an event store for resumability and stores transports by `sessionId`. Calls `clientConnected(transport)` on initialization and `cleanup(sessionId)` on DELETE. ## Registered Features (current minimal set) @@ -168,6 +173,11 @@ At `src/everything`: - Dynamic Blob: `demo://resource/dynamic/blob/{index}` (base64 payload generated on the fly) - Static Docs: `demo://resource/static/document/` (serves files from `src/everything/docs/` as static file-based resources) +- Resource Subscriptions and Notifications + - Clients may subscribe/unsubscribe to resource URIs using the MCP `resources/subscribe` and `resources/unsubscribe` requests. + - The server sends simulated update notifications with method `notifications/resources/updated { uri }` only to transports (sessions) that subscribed to that URI. + - Multiple concurrent clients are supported; each client’s subscriptions are tracked per session and notifications are delivered independently over that client’s transport. + ## Extension Points - Adding Tools @@ -185,6 +195,17 @@ At `src/everything`: - Create a new file under `resources/` with your `registerXResources(server)` function using `server.registerResource(...)` (optionally with `ResourceTemplate`). - Export and call it from `resources/index.ts` inside `registerResources(server)`. +## Resource Subscriptions – How It Works + +- Module: `resources/subscriptions.ts` + - Tracks subscribers per URI: `Map>`. + - Tracks active transports per session: `Map`. + - Installs handlers via `setSubscriptionHandlers(server)` to process subscribe/unsubscribe requests and keep the maps updated. + - `clientConnected(transport)` (from the server factory) calls `beginSimulatedResourceUpdates(transport)`, which starts a per‑session interval that scans subscribed URIs and emits `notifications/resources/updated` to that session only when applicable. + - `cleanup(sessionId?)` calls `stopSimulatedResourceUpdates(sessionId)` to clear intervals and remove transport/state for the session. + +- Design note: Notifications are sent over the specific subscriber’s transport rather than broadcasting via `server.notification`, ensuring that each client receives only the updates for its own subscriptions. + - Adding Transports - Implement a new transport module under `transports/`. - Add a case to `index.ts` so the CLI can select it. diff --git a/src/everything/resources/subscriptions.ts b/src/everything/resources/subscriptions.ts new file mode 100644 index 00000000..afe527c5 --- /dev/null +++ b/src/everything/resources/subscriptions.ts @@ -0,0 +1,168 @@ +import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"; +import { Transport } from "@modelcontextprotocol/sdk/shared/transport.js"; +import { + SubscribeRequestSchema, + UnsubscribeRequestSchema, +} from "@modelcontextprotocol/sdk/types.js"; + +// Track subscriber session id lists by URI +const subscriptions: Map> = new Map< + string, + Set +>(); + +// Track transport by session id +const transports: Map = new Map< + string | undefined, + Transport +>(); + +// Interval to send notifications to subscribers +let subsUpdateIntervals: Map = + new Map(); + +/** + * Sets up the subscription and unsubscription handlers for the provided server. + * + * The function defines two request handlers: + * 1. A `Subscribe` handler that allows clients to subscribe to specific resource URIs. + * 2. An `Unsubscribe` handler that allows clients to unsubscribe from specific resource URIs. + * + * The `Subscribe` handler performs the following actions: + * - Extracts the URI and session ID from the request. + * - Logs a message acknowledging the subscription request. + * - Updates the internal tracking of subscribers for the given URI. + * + * The `Unsubscribe` handler performs the following actions: + * - Extracts the URI and session ID from the request. + * - Logs a message acknowledging the unsubscription request. + * - Removes the subscriber for the specified URI. + * + * @param {McpServer} server - The server instance to which subscription handlers will be attached. + */ +export const setSubscriptionHandlers = (server: McpServer) => { + // Set the subscription handler + server.server.setRequestHandler( + SubscribeRequestSchema, + async (request, extra) => { + // Get the URI to subscribe to + const { uri } = request.params; + + // Get the session id (can be undefined for stdio) + const sessionId = extra.sessionId as string; + + // Acknowledge the subscribe request + await server.sendLoggingMessage( + { + level: "info", + data: `Received Subscribe Resource request for URI: ${uri} ${ + sessionId ? `from session ${sessionId}` : "" + }`, + }, + sessionId + ); + + // Get the subscribers for this URI + const subscribers = subscriptions.has(uri) + ? (subscriptions.get(uri) as Set) + : new Set(); + subscribers.add(sessionId); + subscriptions.set(uri, subscribers); + return {}; + } + ); + + // Set the unsubscription handler + server.server.setRequestHandler( + UnsubscribeRequestSchema, + async (request, extra) => { + // Get the URI to subscribe to + const { uri } = request.params; + + // Get the session id (can be undefined for stdio) + const sessionId = extra.sessionId as string; + + // Acknowledge the subscribe request + await server.sendLoggingMessage( + { + level: "info", + data: `Received Unsubscribe Resource request: ${uri} ${ + sessionId ? `from session ${sessionId}` : "" + }`, + }, + sessionId + ); + + // Remove the subscriber + if (subscriptions.has(uri)) { + const subscribers = subscriptions.get(uri) as Set; + if (subscribers.has(sessionId)) subscribers.delete(sessionId); + } + return {}; + } + ); +}; + +/** + * Starts the process of simulating resource updates and sending server notifications + * to subscribed clients at regular intervals. If the update interval is already active, + * invoking this function will not start another interval. + * + * Note that tracking and sending updates on the transport of the subscriber allows for + * multiple clients to be connected and independently receive only updates about their + * own subscriptions. Had we used `server.notification` instead, all clients would + * receive updates for all subscriptions. + * + * @param {Transport} transport - The transport to the subscriber + */ +export const beginSimulatedResourceUpdates = (transport: Transport) => { + const sessionId = transport?.sessionId; + if (!transports.has(sessionId)) { + // Store the transport + transports.set(sessionId, transport); + + // Set the interval to send notifications to the subscribers + subsUpdateIntervals.set( + sessionId, + setInterval(async () => { + // Send notifications to all subscribers for each URI + for (const uri of subscriptions.keys()) { + const subscribers = subscriptions.get(uri) as Set; + + // Get the transport for the subscriber and send the notification + if (subscribers.has(sessionId)) { + const transport = transports.get(sessionId) as Transport; + await transport.send({ + jsonrpc: "2.0", + method: "notifications/resources/updated", + params: { uri }, + }); + } else { + subscribers.delete(sessionId); // subscriber has disconnected + } + } + }, 10000) + ); + } +}; + +/** + * Stops simulated resource updates for a given session. + * + * This function halts any active intervals associated with the provided session ID + * and removes the session's corresponding entries from resource management collections. + * + * @param {string} [sessionId] - The unique identifier of the session for which simulated resource updates should be stopped. If not provided, no action is performed. + */ +export const stopSimulatedResourceUpdates = (sessionId?: string) => { + // Remove active intervals + if (subsUpdateIntervals.has(sessionId)) { + const subsUpdateInterval = subsUpdateIntervals.get(sessionId); + clearInterval(subsUpdateInterval); + subsUpdateIntervals.delete(sessionId); + } + // Remove transport for the session + if (transports.has(sessionId)) { + transports.delete(sessionId); + } +}; diff --git a/src/everything/server/index.ts b/src/everything/server/index.ts index d9489674..fd16d736 100644 --- a/src/everything/server/index.ts +++ b/src/everything/server/index.ts @@ -2,6 +2,12 @@ import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"; import { dirname, join } from "path"; import { readFileSync } from "fs"; import { fileURLToPath } from "url"; +import { Transport } from "@modelcontextprotocol/sdk/shared/transport.js"; +import { + setSubscriptionHandlers, + beginSimulatedResourceUpdates, + stopSimulatedResourceUpdates +} from "../resources/subscriptions.js"; import { registerTools } from "../tools/index.js"; import { registerResources } from "../resources/index.js"; import { registerPrompts } from "../prompts/index.js"; @@ -40,10 +46,18 @@ export const createServer = () => { // Register the prompts registerPrompts(server); + // Set resource subscription handlers + setSubscriptionHandlers(server); + return { server, - cleanup: () => {}, - startNotificationIntervals: (sessionId?: string) => {}, + clientConnected: (transport: Transport) => { + beginSimulatedResourceUpdates(transport); + // TODO simulated logging + }, + cleanup: (sessionId?: string) => { + stopSimulatedResourceUpdates(sessionId); + }, }; }; diff --git a/src/everything/transports/sse.ts b/src/everything/transports/sse.ts index 68690c0b..9791fdf2 100644 --- a/src/everything/transports/sse.ts +++ b/src/everything/transports/sse.ts @@ -21,7 +21,7 @@ const transports: Map = new Map< app.get("/sse", async (req, res) => { let transport: SSEServerTransport; - const { server, cleanup, startNotificationIntervals } = createServer(); + const { server, clientConnected, cleanup } = createServer(); if (req?.query?.sessionId) { const sessionId = req?.query?.sessionId as string; @@ -39,14 +39,15 @@ app.get("/sse", async (req, res) => { await server.connect(transport); console.error("Client Connected: ", transport.sessionId); - // Start notification intervals after client connects - startNotificationIntervals(transport.sessionId); + // Start simulated logging and subscription updates when a client connects + clientConnected(transport); // Handle close of connection server.server.onclose = async () => { - console.error("Client Disconnected: ", transport.sessionId); - transports.delete(transport.sessionId); - await cleanup(); + const sessionId = transport.sessionId; + console.error("Client Disconnected: ", sessionId); + transports.delete(sessionId); + await cleanup(sessionId); }; } }); diff --git a/src/everything/transports/stdio.ts b/src/everything/transports/stdio.ts index c5060f4e..f9329bb4 100644 --- a/src/everything/transports/stdio.ts +++ b/src/everything/transports/stdio.ts @@ -7,13 +7,15 @@ console.error("Starting default (STDIO) server..."); async function main() { const transport = new StdioServerTransport(); - const { server } = createServer(); + const { server, clientConnected, cleanup } = createServer(); await server.connect(transport); + clientConnected(transport); // Cleanup on exit process.on("SIGINT", async () => { await server.close(); + cleanup(); process.exit(0); }); } diff --git a/src/everything/transports/streamableHttp.ts b/src/everything/transports/streamableHttp.ts index 96a7be4c..3443116c 100644 --- a/src/everything/transports/streamableHttp.ts +++ b/src/everything/transports/streamableHttp.ts @@ -35,7 +35,7 @@ app.post("/mcp", async (req: Request, res: Response) => { // Reuse existing transport transport = transports.get(sessionId)!; } else if (!sessionId) { - const { server } = createServer(); + const { server, clientConnected, cleanup } = createServer(); // New initialization request const eventStore = new InMemoryEventStore(); @@ -43,10 +43,13 @@ app.post("/mcp", async (req: Request, res: Response) => { sessionIdGenerator: () => randomUUID(), eventStore, // Enable resumability onsessioninitialized: (sessionId: string) => { - // Store the transport by session ID when session is initialized + // Store the transport by session ID when a session is initialized // This avoids race conditions where requests might come in before the session is stored console.log(`Session initialized with ID: ${sessionId}`); transports.set(sessionId, transport); + + // Start simulated logging and subscription updates when a client connects + clientConnected(transport); }, }); @@ -58,6 +61,7 @@ app.post("/mcp", async (req: Request, res: Response) => { `Transport closed for session ${sid}, removing from transports map` ); transports.delete(sid); + cleanup(sid); } };