[WIP] Refactor everything server to be more modular and use recommended APIs.

Adding resource subscriptions:

* Updated architecture.md

* In server/index.ts
  - imported Transport, setSubscriptionHandlers,beginSimulatedResourceUpdates, and stopSimulatedResourceUpdates
  - call setSubscriptionHandlers passing server
  - in returned object,
    - refactor/renamed startNotificationIntervals placehodler to clientConnected, which takes a transport argument and calls beginSimulatedResourceUpdates, passing the transport
    - replaced cleanup placeholder with a function that takes an optional sessionId and calls stopSimulatedResourceUpdates, passing the sessionId

* In sse.ts, stdio.ts, and streamableHttp.ts
  - when transport is connected, called clientConnect, passing transport
  - when disconnecting, called cleanup, passing sessionId

* Added subscriptions.ts
  - tracks subscriber session id lists by URI
  - tracks transport by session id
  - tracks subscription update intervals by sessionId
  - in setSubscriptionHandlers
    - set request handlers for SubscribeRequestSchema and UnsubscribeRequestSchema
  - in beginSimulatedResourceUpdates
    - starts an interval to send updates to the transport for all subscribed resources
  - in stopSimulatedResourceUpdates
    - removes intervals and transport for gien session id
This commit is contained in:
cliffhall
2025-12-06 19:44:07 -05:00
parent 7b2ff6b064
commit 07867a5dd5
6 changed files with 231 additions and 21 deletions

View File

@@ -10,6 +10,8 @@ This document summarizes the current layout and runtime architecture of the `src
- `server/index.ts`: The lightweight, modular server used by transports in this package.
- `server/everything.ts`: A comprehensive reference server (much larger, many tools/prompts/resources) kept for reference/testing but not wired up by default in the entry points.
- Multiclient subscriptions: The server supports multiple concurrent clients. Each client manages its own resource subscriptions and receives notifications only for the URIs it subscribed to, independent of other clients.
## Directory Layout
```
@@ -35,7 +37,8 @@ src/everything
├── resources
│ ├── index.ts
│ ├── templates.ts
── files.ts
── files.ts
│ └── subscriptions.ts
├── docs
│ ├── server-instructions.md
│ └── architecture.md
@@ -52,22 +55,23 @@ At `src/everything`:
- index.ts
- Server factory that creates an `McpServer` with declared capabilities, loads server instructions, and registers tools, prompts, and resources.
- Exposes `{ server, cleanup, startNotificationIntervals }` to the chosen transport.
- Sets resource subscription handlers via `setSubscriptionHandlers(server)`.
- Exposes `{ server, clientConnected, cleanup }` to the chosen transport.
- everything.ts
- A full “reference/monolith” implementation demonstrating most MCP features. Not the default path used by the transports in this package.
- transports/
- stdio.ts
- Starts a `StdioServerTransport`, creates the server via `createServer()`, and connects it. Handles `SIGINT` to close cleanly.
- Starts a `StdioServerTransport`, creates the server via `createServer()`, connects it, and invokes `clientConnected(transport)` so simulated resource updates can begin. Handles `SIGINT` to close cleanly.
- sse.ts
- Express server exposing:
- `GET /sse` to establish an SSE connection per session.
- `POST /message` for client messages.
- Manages a `Map<sessionId, SSEServerTransport>` for sessions. Calls `startNotificationIntervals(sessionId)` after connect (hook currently a noop in the factory).
- Manages a `Map<sessionId, SSEServerTransport>` for sessions. Calls `clientConnected(transport)` after connect so persession simulated resource updates start.
- streamableHttp.ts
- Express server exposing a single `/mcp` endpoint for POST (JSONRPC), GET (SSE stream), and DELETE (session termination) using `StreamableHTTPServerTransport`.
- Uses an `InMemoryEventStore` for resumable sessions and tracks transports by `sessionId`. Connects a fresh server instance on initialization POST, then reuses transport for subsequent requests.
- Uses an `InMemoryEventStore` for resumable sessions and tracks transports by `sessionId`. Connects a fresh server instance on initialization POST, invokes `clientConnected(transport)`, then reuses the transport for subsequent requests.
- tools/
@@ -140,14 +144,15 @@ At `src/everything`:
- Registers tools via `registerTools(server)`.
- Registers resources via `registerResources(server)`.
- Registers prompts via `registerPrompts(server)`.
- Sets up resource subscription handlers via `setSubscriptionHandlers(server)`.
- Returns the server and two lifecycle hooks:
- `cleanup`: transport may call on shutdown (currently a noop).
- `startNotificationIntervals(sessionId?)`: currently a noop; wired in SSE transport for future periodic notifications.
- `clientConnected(transport)`: transports call this after connecting so the server can begin persession simulated resource update notifications over that specific transport.
- `cleanup(sessionId?)`: transports call this on session termination to stop simulated updates and remove sessionscoped state.
4. Each transport is responsible for network/session lifecycle:
- STDIO: simple processbound connection; closes on `SIGINT`.
- SSE: maintains a session map keyed by `sessionId`, hooks servers `onclose` to clean and remove session, exposes `/sse` (GET) and `/message` (POST) endpoints.
- Streamable HTTP: exposes `/mcp` for POST (JSONRPC messages), GET (SSE stream), and DELETE (termination). Uses an event store for resumability and stores transports by `sessionId`.
- STDIO: simple processbound connection; calls `clientConnected(transport)` after connect; closes on `SIGINT` and calls `cleanup()`.
- SSE: maintains a session map keyed by `sessionId`, calls `clientConnected(transport)` after connect, hooks servers `onclose` to clean and remove session, exposes `/sse` (GET) and `/message` (POST) endpoints.
- Streamable HTTP: exposes `/mcp` for POST (JSONRPC messages), GET (SSE stream), and DELETE (termination). Uses an event store for resumability and stores transports by `sessionId`. Calls `clientConnected(transport)` on initialization and `cleanup(sessionId)` on DELETE.
## Registered Features (current minimal set)
@@ -168,6 +173,11 @@ At `src/everything`:
- Dynamic Blob: `demo://resource/dynamic/blob/{index}` (base64 payload generated on the fly)
- Static Docs: `demo://resource/static/document/<filename>` (serves files from `src/everything/docs/` as static file-based resources)
- Resource Subscriptions and Notifications
- Clients may subscribe/unsubscribe to resource URIs using the MCP `resources/subscribe` and `resources/unsubscribe` requests.
- The server sends simulated update notifications with method `notifications/resources/updated { uri }` only to transports (sessions) that subscribed to that URI.
- Multiple concurrent clients are supported; each clients subscriptions are tracked per session and notifications are delivered independently over that clients transport.
## Extension Points
- Adding Tools
@@ -185,6 +195,17 @@ At `src/everything`:
- Create a new file under `resources/` with your `registerXResources(server)` function using `server.registerResource(...)` (optionally with `ResourceTemplate`).
- Export and call it from `resources/index.ts` inside `registerResources(server)`.
## Resource Subscriptions How It Works
- Module: `resources/subscriptions.ts`
- Tracks subscribers per URI: `Map<uri, Set<sessionId>>`.
- Tracks active transports per session: `Map<sessionId, Transport>`.
- Installs handlers via `setSubscriptionHandlers(server)` to process subscribe/unsubscribe requests and keep the maps updated.
- `clientConnected(transport)` (from the server factory) calls `beginSimulatedResourceUpdates(transport)`, which starts a persession interval that scans subscribed URIs and emits `notifications/resources/updated` to that session only when applicable.
- `cleanup(sessionId?)` calls `stopSimulatedResourceUpdates(sessionId)` to clear intervals and remove transport/state for the session.
- Design note: Notifications are sent over the specific subscribers transport rather than broadcasting via `server.notification`, ensuring that each client receives only the updates for its own subscriptions.
- Adding Transports
- Implement a new transport module under `transports/`.
- Add a case to `index.ts` so the CLI can select it.

View File

@@ -0,0 +1,168 @@
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { Transport } from "@modelcontextprotocol/sdk/shared/transport.js";
import {
SubscribeRequestSchema,
UnsubscribeRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
// Track subscriber session id lists by URI
const subscriptions: Map<string, Set<string | undefined>> = new Map<
string,
Set<string | undefined>
>();
// Track transport by session id
const transports: Map<string | undefined, Transport> = new Map<
string | undefined,
Transport
>();
// Interval to send notifications to subscribers
let subsUpdateIntervals: Map<string | undefined, NodeJS.Timeout | undefined> =
new Map<string | undefined, NodeJS.Timeout | undefined>();
/**
* Sets up the subscription and unsubscription handlers for the provided server.
*
* The function defines two request handlers:
* 1. A `Subscribe` handler that allows clients to subscribe to specific resource URIs.
* 2. An `Unsubscribe` handler that allows clients to unsubscribe from specific resource URIs.
*
* The `Subscribe` handler performs the following actions:
* - Extracts the URI and session ID from the request.
* - Logs a message acknowledging the subscription request.
* - Updates the internal tracking of subscribers for the given URI.
*
* The `Unsubscribe` handler performs the following actions:
* - Extracts the URI and session ID from the request.
* - Logs a message acknowledging the unsubscription request.
* - Removes the subscriber for the specified URI.
*
* @param {McpServer} server - The server instance to which subscription handlers will be attached.
*/
export const setSubscriptionHandlers = (server: McpServer) => {
// Set the subscription handler
server.server.setRequestHandler(
SubscribeRequestSchema,
async (request, extra) => {
// Get the URI to subscribe to
const { uri } = request.params;
// Get the session id (can be undefined for stdio)
const sessionId = extra.sessionId as string;
// Acknowledge the subscribe request
await server.sendLoggingMessage(
{
level: "info",
data: `Received Subscribe Resource request for URI: ${uri} ${
sessionId ? `from session ${sessionId}` : ""
}`,
},
sessionId
);
// Get the subscribers for this URI
const subscribers = subscriptions.has(uri)
? (subscriptions.get(uri) as Set<string>)
: new Set<string>();
subscribers.add(sessionId);
subscriptions.set(uri, subscribers);
return {};
}
);
// Set the unsubscription handler
server.server.setRequestHandler(
UnsubscribeRequestSchema,
async (request, extra) => {
// Get the URI to subscribe to
const { uri } = request.params;
// Get the session id (can be undefined for stdio)
const sessionId = extra.sessionId as string;
// Acknowledge the subscribe request
await server.sendLoggingMessage(
{
level: "info",
data: `Received Unsubscribe Resource request: ${uri} ${
sessionId ? `from session ${sessionId}` : ""
}`,
},
sessionId
);
// Remove the subscriber
if (subscriptions.has(uri)) {
const subscribers = subscriptions.get(uri) as Set<string>;
if (subscribers.has(sessionId)) subscribers.delete(sessionId);
}
return {};
}
);
};
/**
* Starts the process of simulating resource updates and sending server notifications
* to subscribed clients at regular intervals. If the update interval is already active,
* invoking this function will not start another interval.
*
* Note that tracking and sending updates on the transport of the subscriber allows for
* multiple clients to be connected and independently receive only updates about their
* own subscriptions. Had we used `server.notification` instead, all clients would
* receive updates for all subscriptions.
*
* @param {Transport} transport - The transport to the subscriber
*/
export const beginSimulatedResourceUpdates = (transport: Transport) => {
const sessionId = transport?.sessionId;
if (!transports.has(sessionId)) {
// Store the transport
transports.set(sessionId, transport);
// Set the interval to send notifications to the subscribers
subsUpdateIntervals.set(
sessionId,
setInterval(async () => {
// Send notifications to all subscribers for each URI
for (const uri of subscriptions.keys()) {
const subscribers = subscriptions.get(uri) as Set<string | undefined>;
// Get the transport for the subscriber and send the notification
if (subscribers.has(sessionId)) {
const transport = transports.get(sessionId) as Transport;
await transport.send({
jsonrpc: "2.0",
method: "notifications/resources/updated",
params: { uri },
});
} else {
subscribers.delete(sessionId); // subscriber has disconnected
}
}
}, 10000)
);
}
};
/**
* Stops simulated resource updates for a given session.
*
* This function halts any active intervals associated with the provided session ID
* and removes the session's corresponding entries from resource management collections.
*
* @param {string} [sessionId] - The unique identifier of the session for which simulated resource updates should be stopped. If not provided, no action is performed.
*/
export const stopSimulatedResourceUpdates = (sessionId?: string) => {
// Remove active intervals
if (subsUpdateIntervals.has(sessionId)) {
const subsUpdateInterval = subsUpdateIntervals.get(sessionId);
clearInterval(subsUpdateInterval);
subsUpdateIntervals.delete(sessionId);
}
// Remove transport for the session
if (transports.has(sessionId)) {
transports.delete(sessionId);
}
};

View File

@@ -2,6 +2,12 @@ import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { dirname, join } from "path";
import { readFileSync } from "fs";
import { fileURLToPath } from "url";
import { Transport } from "@modelcontextprotocol/sdk/shared/transport.js";
import {
setSubscriptionHandlers,
beginSimulatedResourceUpdates,
stopSimulatedResourceUpdates
} from "../resources/subscriptions.js";
import { registerTools } from "../tools/index.js";
import { registerResources } from "../resources/index.js";
import { registerPrompts } from "../prompts/index.js";
@@ -40,10 +46,18 @@ export const createServer = () => {
// Register the prompts
registerPrompts(server);
// Set resource subscription handlers
setSubscriptionHandlers(server);
return {
server,
cleanup: () => {},
startNotificationIntervals: (sessionId?: string) => {},
clientConnected: (transport: Transport) => {
beginSimulatedResourceUpdates(transport);
// TODO simulated logging
},
cleanup: (sessionId?: string) => {
stopSimulatedResourceUpdates(sessionId);
},
};
};

View File

@@ -21,7 +21,7 @@ const transports: Map<string, SSEServerTransport> = new Map<
app.get("/sse", async (req, res) => {
let transport: SSEServerTransport;
const { server, cleanup, startNotificationIntervals } = createServer();
const { server, clientConnected, cleanup } = createServer();
if (req?.query?.sessionId) {
const sessionId = req?.query?.sessionId as string;
@@ -39,14 +39,15 @@ app.get("/sse", async (req, res) => {
await server.connect(transport);
console.error("Client Connected: ", transport.sessionId);
// Start notification intervals after client connects
startNotificationIntervals(transport.sessionId);
// Start simulated logging and subscription updates when a client connects
clientConnected(transport);
// Handle close of connection
server.server.onclose = async () => {
console.error("Client Disconnected: ", transport.sessionId);
transports.delete(transport.sessionId);
await cleanup();
const sessionId = transport.sessionId;
console.error("Client Disconnected: ", sessionId);
transports.delete(sessionId);
await cleanup(sessionId);
};
}
});

View File

@@ -7,13 +7,15 @@ console.error("Starting default (STDIO) server...");
async function main() {
const transport = new StdioServerTransport();
const { server } = createServer();
const { server, clientConnected, cleanup } = createServer();
await server.connect(transport);
clientConnected(transport);
// Cleanup on exit
process.on("SIGINT", async () => {
await server.close();
cleanup();
process.exit(0);
});
}

View File

@@ -35,7 +35,7 @@ app.post("/mcp", async (req: Request, res: Response) => {
// Reuse existing transport
transport = transports.get(sessionId)!;
} else if (!sessionId) {
const { server } = createServer();
const { server, clientConnected, cleanup } = createServer();
// New initialization request
const eventStore = new InMemoryEventStore();
@@ -43,10 +43,13 @@ app.post("/mcp", async (req: Request, res: Response) => {
sessionIdGenerator: () => randomUUID(),
eventStore, // Enable resumability
onsessioninitialized: (sessionId: string) => {
// Store the transport by session ID when session is initialized
// Store the transport by session ID when a session is initialized
// This avoids race conditions where requests might come in before the session is stored
console.log(`Session initialized with ID: ${sessionId}`);
transports.set(sessionId, transport);
// Start simulated logging and subscription updates when a client connects
clientConnected(transport);
},
});
@@ -58,6 +61,7 @@ app.post("/mcp", async (req: Request, res: Response) => {
`Transport closed for session ${sid}, removing from transports map`
);
transports.delete(sid);
cleanup(sid);
}
};