diff --git a/src/everything/docs/architecture.md b/src/everything/docs/architecture.md index 1b44dc88..b8204ebd 100644 --- a/src/everything/docs/architecture.md +++ b/src/everything/docs/architecture.md @@ -7,6 +7,7 @@ This document summarizes the current layout and runtime architecture of the `src - Purpose: A minimal, modular MCP server showcasing core Model Context Protocol features. It exposes a simple tool, several prompts, and both static and dynamic resources, and can be run over multiple transports (STDIO, SSE, and Streamable HTTP). - Design: A small “server factory” constructs the MCP server and registers features. Transports are separate entry points that create/connect the server and handle network concerns. Tools, prompts, and resources are organized in their own submodules. - Two server implementations exist: + - `server/index.ts`: The lightweight, modular server used by transports in this package. - `server/everything.ts`: A comprehensive reference server (much larger, many tools/prompts/resources) kept for reference/testing but not wired up by default in the entry points. @@ -19,6 +20,7 @@ src/everything ├── index.ts ├── server │ ├── index.ts +│ ├── logging.ts │ └── everything.ts ├── transports │ ├── stdio.ts @@ -57,21 +59,23 @@ At `src/everything`: - Server factory that creates an `McpServer` with declared capabilities, loads server instructions, and registers tools, prompts, and resources. - Sets resource subscription handlers via `setSubscriptionHandlers(server)`. - Exposes `{ server, clientConnected, cleanup }` to the chosen transport. + - logging.ts + - Implements simulated logging. Periodically sends randomized log messages at various levels to the connected client session. Started/stopped via the server factory lifecycle hooks. - everything.ts - A full “reference/monolith” implementation demonstrating most MCP features. Not the default path used by the transports in this package. - transports/ - stdio.ts - - Starts a `StdioServerTransport`, creates the server via `createServer()`, connects it, and invokes `clientConnected(transport)` so simulated resource updates can begin. Handles `SIGINT` to close cleanly. + - Starts a `StdioServerTransport`, creates the server via `createServer()`, connects it, and invokes `clientConnected()` so simulated resource updates and logging can begin. Handles `SIGINT` to close cleanly. - sse.ts - Express server exposing: - `GET /sse` to establish an SSE connection per session. - `POST /message` for client messages. - - Manages a `Map` for sessions. Calls `clientConnected(transport)` after connect so per‑session simulated resource updates start. + - Manages a `Map` for sessions. Calls `clientConnected(sessionId)` after connect so per‑session simulated resource updates and logging start. - streamableHttp.ts - Express server exposing a single `/mcp` endpoint for POST (JSON‑RPC), GET (SSE stream), and DELETE (session termination) using `StreamableHTTPServerTransport`. - - Uses an `InMemoryEventStore` for resumable sessions and tracks transports by `sessionId`. Connects a fresh server instance on initialization POST, invokes `clientConnected(transport)`, then reuses the transport for subsequent requests. + - Uses an `InMemoryEventStore` for resumable sessions and tracks transports by `sessionId`. Connects a fresh server instance on initialization POST, invokes `clientConnected(sessionId)`, then reuses the transport for subsequent requests. - tools/ @@ -146,13 +150,13 @@ At `src/everything`: - Registers prompts via `registerPrompts(server)`. - Sets up resource subscription handlers via `setSubscriptionHandlers(server)`. - Returns the server and two lifecycle hooks: - - `clientConnected(transport)`: transports call this after connecting so the server can begin per‑session simulated resource update notifications over that specific transport. - - `cleanup(sessionId?)`: transports call this on session termination to stop simulated updates and remove session‑scoped state. + - `clientConnected(sessionId?)`: transports call this after connecting so the server can begin per‑session simulated resource update notifications and simulated logging for that session. + - `cleanup(sessionId?)`: transports call this on session termination to stop simulated resource updates and simulated logging, and remove session‑scoped state. 4. Each transport is responsible for network/session lifecycle: - - STDIO: simple process‑bound connection; calls `clientConnected(transport)` after connect; closes on `SIGINT` and calls `cleanup()`. - - SSE: maintains a session map keyed by `sessionId`, calls `clientConnected(transport)` after connect, hooks server’s `onclose` to clean and remove session, exposes `/sse` (GET) and `/message` (POST) endpoints. - - Streamable HTTP: exposes `/mcp` for POST (JSON‑RPC messages), GET (SSE stream), and DELETE (termination). Uses an event store for resumability and stores transports by `sessionId`. Calls `clientConnected(transport)` on initialization and `cleanup(sessionId)` on DELETE. + - STDIO: simple process‑bound connection; calls `clientConnected()` after connect; closes on `SIGINT` and calls `cleanup()`. + - SSE: maintains a session map keyed by `sessionId`, calls `clientConnected(sessionId)` after connect, hooks server’s `onclose` to clean and remove session, exposes `/sse` (GET) and `/message` (POST) endpoints. + - Streamable HTTP: exposes `/mcp` for POST (JSON‑RPC messages), GET (SSE stream), and DELETE (termination). Uses an event store for resumability and stores transports by `sessionId`. Calls `clientConnected(sessionId)` on initialization and `cleanup(sessionId)` on DELETE. ## Registered Features (current minimal set) @@ -169,14 +173,18 @@ At `src/everything`: - `resource-prompt` (prompts/resource.ts): Accepts `resourceType` ("Text" or "Blob") and `resourceId` (string convertible to integer) and returns messages that include an embedded dynamic resource of the selected type generated via `resources/templates.ts`. - Resources + - Dynamic Text: `demo://resource/dynamic/text/{index}` (content generated on the fly) - Dynamic Blob: `demo://resource/dynamic/blob/{index}` (base64 payload generated on the fly) - Static Docs: `demo://resource/static/document/` (serves files from `src/everything/docs/` as static file-based resources) - Resource Subscriptions and Notifications - Clients may subscribe/unsubscribe to resource URIs using the MCP `resources/subscribe` and `resources/unsubscribe` requests. - - The server sends simulated update notifications with method `notifications/resources/updated { uri }` only to transports (sessions) that subscribed to that URI. - - Multiple concurrent clients are supported; each client’s subscriptions are tracked per session and notifications are delivered independently over that client’s transport. + - The server sends simulated update notifications with method `notifications/resources/updated { uri }` only to sessions that subscribed to that URI. + - Multiple concurrent clients are supported; each client’s subscriptions are tracked per session and notifications are delivered independently via the server instance associated with that session. + +- Logging + - Simulated logging is enabled. The server emits periodic log messages of varying levels (debug, info, notice, warning, error, critical, alert, emergency) per session. Clients can control the minimum level they receive via standard MCP `logging/setLevel` request. ## Extension Points @@ -198,13 +206,21 @@ At `src/everything`: ## Resource Subscriptions – How It Works - Module: `resources/subscriptions.ts` - - Tracks subscribers per URI: `Map>`. - - Tracks active transports per session: `Map`. - - Installs handlers via `setSubscriptionHandlers(server)` to process subscribe/unsubscribe requests and keep the maps updated. - - `clientConnected(transport)` (from the server factory) calls `beginSimulatedResourceUpdates(transport)`, which starts a per‑session interval that scans subscribed URIs and emits `notifications/resources/updated` to that session only when applicable. - - `cleanup(sessionId?)` calls `stopSimulatedResourceUpdates(sessionId)` to clear intervals and remove transport/state for the session. -- Design note: Notifications are sent over the specific subscriber’s transport rather than broadcasting via `server.notification`, ensuring that each client receives only the updates for its own subscriptions. + - Tracks subscribers per URI: `Map>`. + - Installs handlers via `setSubscriptionHandlers(server)` to process subscribe/unsubscribe requests and keep the map updated. + - `clientConnected(sessionId?)` (from the server factory) calls `beginSimulatedResourceUpdates(server, sessionId)`, which starts a per‑session interval that scans subscribed URIs and emits `notifications/resources/updated` from that session’s server instance only when applicable. + - `cleanup(sessionId?)` calls `stopSimulatedResourceUpdates(sessionId)` to clear intervals and remove session‑scoped state. + +- Design note: Each client session has its own `McpServer` instance; periodic checks run per session and invoke `server.notification(...)` on that instance, so messages are delivered only to the intended client. + +## Simulated Logging – How It Works + +- Module: `server/logging.ts` + + - Periodically sends randomized log messages at different levels. Messages can include the session ID for clarity during demos. + - Started via `beginSimulatedLogging(server, sessionId?)` when a client connects and stopped via `stopSimulatedLogging(sessionId?)` during cleanup. + - Uses `server.sendLoggingMessage({ level, data }, sessionId?)` so that the client’s configured minimum logging level is respected by the SDK. - Adding Transports - Implement a new transport module under `transports/`. diff --git a/src/everything/resources/subscriptions.ts b/src/everything/resources/subscriptions.ts index 339cc30b..c10ed922 100644 --- a/src/everything/resources/subscriptions.ts +++ b/src/everything/resources/subscriptions.ts @@ -1,5 +1,4 @@ import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"; -import { Transport } from "@modelcontextprotocol/sdk/shared/transport.js"; import { SubscribeRequestSchema, UnsubscribeRequestSchema, @@ -11,12 +10,6 @@ const subscriptions: Map> = new Map< Set >(); -// Track transport by session id -const transports: Map = new Map< - string | undefined, - Transport ->(); - // Interval to send notifications to subscribers const subsUpdateIntervals: Map = new Map(); @@ -105,35 +98,28 @@ export const setSubscriptionHandlers = (server: McpServer) => { /** * Starts the process of simulating resource updates and sending server notifications - * to subscribed clients at regular intervals. If the update interval is already active, - * invoking this function will not start another interval. + * to the client for the resources they are subscribed to. If the update interval is + * already active, invoking this function will not start another interval. * - * Note that tracking and sending updates on the transport of the subscriber allows for - * multiple clients to be connected and independently receive only updates about their - * own subscriptions. Had we used `server.notification` instead, all clients would - * receive updates for all subscriptions. - * - * @param {Transport} transport - The transport to the subscriber + * @param server + * @param sessionId */ -export const beginSimulatedResourceUpdates = (transport: Transport) => { - const sessionId = transport?.sessionId; - if (!transports.has(sessionId)) { - // Store the transport - transports.set(sessionId, transport); - - // Set the interval to send notifications to the subscribers +export const beginSimulatedResourceUpdates = ( + server: McpServer, + sessionId: string | undefined +) => { + if (!subsUpdateIntervals.has(sessionId)) { + // Set the interval to send resource update notifications to this client subsUpdateIntervals.set( sessionId, setInterval(async () => { - // Send notifications to all subscribers for each URI + // Search all URIs for ones this client is subscribed to for (const uri of subscriptions.keys()) { const subscribers = subscriptions.get(uri) as Set; - // Get the transport for the subscriber and send the notification + // If this client is subscribed, send the notification if (subscribers.has(sessionId)) { - const transport = transports.get(sessionId) as Transport; - await transport.send({ - jsonrpc: "2.0", + await server.server.notification({ method: "notifications/resources/updated", params: { uri }, }); @@ -162,8 +148,4 @@ export const stopSimulatedResourceUpdates = (sessionId?: string) => { clearInterval(subsUpdateInterval); subsUpdateIntervals.delete(sessionId); } - // Remove transport for the session - if (transports.has(sessionId)) { - transports.delete(sessionId); - } }; diff --git a/src/everything/server/everything.ts b/src/everything/server/everything.ts index 7e686ea4..33863063 100644 --- a/src/everything/server/everything.ts +++ b/src/everything/server/everything.ts @@ -171,7 +171,6 @@ export const createServer = () => { let subscriptions: Set = new Set(); let subsUpdateInterval: NodeJS.Timeout | undefined; - let stdErrUpdateInterval: NodeJS.Timeout | undefined; let logsUpdateInterval: NodeJS.Timeout | undefined; // Store client capabilities @@ -1147,7 +1146,6 @@ export const createServer = () => { const cleanup = async () => { if (subsUpdateInterval) clearInterval(subsUpdateInterval); if (logsUpdateInterval) clearInterval(logsUpdateInterval); - if (stdErrUpdateInterval) clearInterval(stdErrUpdateInterval); }; return { server, cleanup, startNotificationIntervals }; diff --git a/src/everything/server/index.ts b/src/everything/server/index.ts index fd16d736..2013b481 100644 --- a/src/everything/server/index.ts +++ b/src/everything/server/index.ts @@ -2,7 +2,6 @@ import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"; import { dirname, join } from "path"; import { readFileSync } from "fs"; import { fileURLToPath } from "url"; -import { Transport } from "@modelcontextprotocol/sdk/shared/transport.js"; import { setSubscriptionHandlers, beginSimulatedResourceUpdates, @@ -11,6 +10,7 @@ import { import { registerTools } from "../tools/index.js"; import { registerResources } from "../resources/index.js"; import { registerPrompts } from "../prompts/index.js"; +import { beginSimulatedLogging, stopSimulatedLogging } from "./logging.js"; // Everything Server factory export const createServer = () => { @@ -51,12 +51,15 @@ export const createServer = () => { return { server, - clientConnected: (transport: Transport) => { - beginSimulatedResourceUpdates(transport); - // TODO simulated logging + // When the client connects, begin simulated resource updates and logging + clientConnected: (sessionId?: string) => { + beginSimulatedResourceUpdates(server, sessionId); + beginSimulatedLogging(server, sessionId); }, + // When the client disconnects, stop simulated resource updates and logging cleanup: (sessionId?: string) => { stopSimulatedResourceUpdates(sessionId); + stopSimulatedLogging(sessionId); }, }; }; diff --git a/src/everything/server/logging.ts b/src/everything/server/logging.ts new file mode 100644 index 00000000..5612d724 --- /dev/null +++ b/src/everything/server/logging.ts @@ -0,0 +1,73 @@ +import { LoggingLevel } from "@modelcontextprotocol/sdk/types.js"; +import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"; + +// Map session ID to the interval for sending logging messages to the client +const logsUpdateIntervals: Map = + new Map(); + +/** + * Initiates a simulated logging process by sending random log messages to the client at a + * fixed interval. Each log message contains a random logging level and optional session ID. + * + * @param {McpServer} server - The server instance responsible for handling the logging messages. + * @param {string | undefined} sessionId - An optional identifier for the session. If provided, + * the session ID will be appended to log messages. + */ +export const beginSimulatedLogging = ( + server: McpServer, + sessionId: string | undefined +) => { + const maybeAppendSessionId = sessionId ? ` - SessionId ${sessionId}` : ""; + const messages: { level: LoggingLevel; data: string }[] = [ + { level: "debug", data: `Debug-level message${maybeAppendSessionId}` }, + { level: "info", data: `Info-level message${maybeAppendSessionId}` }, + { level: "notice", data: `Notice-level message${maybeAppendSessionId}` }, + { + level: "warning", + data: `Warning-level message${maybeAppendSessionId}`, + }, + { level: "error", data: `Error-level message${maybeAppendSessionId}` }, + { + level: "critical", + data: `Critical-level message${maybeAppendSessionId}`, + }, + { level: "alert", data: `Alert level-message${maybeAppendSessionId}` }, + { + level: "emergency", + data: `Emergency-level message${maybeAppendSessionId}`, + }, + ]; + + // Set the interval to send logging messages to this client + if (!logsUpdateIntervals.has(sessionId)) { + logsUpdateIntervals.set( + sessionId, + setInterval(async () => { + // By using the `sendLoggingMessage` function to send the message, we + // ensure that the client's chosen logging level will be respected + await server.sendLoggingMessage( + messages[Math.floor(Math.random() * messages.length)], + sessionId + ); + }, 15000) + ); + } +}; + +/** + * Stops the simulated logging process for a given session. + * + * This function halts the periodic logging updates associated with the specified + * session ID by clearing the interval and removing the session's tracking + * reference. Session ID can be undefined for stdio. + * + * @param {string} [sessionId] - The optional unique identifier of the session. + */ +export const stopSimulatedLogging = (sessionId?: string) => { + // Remove active intervals + if (logsUpdateIntervals.has(sessionId)) { + const logsUpdateInterval = logsUpdateIntervals.get(sessionId); + clearInterval(logsUpdateInterval); + logsUpdateIntervals.delete(sessionId); + } +}; diff --git a/src/everything/transports/sse.ts b/src/everything/transports/sse.ts index 9791fdf2..06da202b 100644 --- a/src/everything/transports/sse.ts +++ b/src/everything/transports/sse.ts @@ -37,10 +37,11 @@ app.get("/sse", async (req, res) => { // Connect server to transport await server.connect(transport); - console.error("Client Connected: ", transport.sessionId); + const sessionId = transport.sessionId; + console.error("Client Connected: ", sessionId); // Start simulated logging and subscription updates when a client connects - clientConnected(transport); + clientConnected(sessionId); // Handle close of connection server.server.onclose = async () => { diff --git a/src/everything/transports/stdio.ts b/src/everything/transports/stdio.ts index f9329bb4..4e61c5be 100644 --- a/src/everything/transports/stdio.ts +++ b/src/everything/transports/stdio.ts @@ -10,7 +10,7 @@ async function main() { const { server, clientConnected, cleanup } = createServer(); await server.connect(transport); - clientConnected(transport); + clientConnected(); // Cleanup on exit process.on("SIGINT", async () => { diff --git a/src/everything/transports/streamableHttp.ts b/src/everything/transports/streamableHttp.ts index 3443116c..23872a27 100644 --- a/src/everything/transports/streamableHttp.ts +++ b/src/everything/transports/streamableHttp.ts @@ -49,7 +49,7 @@ app.post("/mcp", async (req: Request, res: Response) => { transports.set(sessionId, transport); // Start simulated logging and subscription updates when a client connects - clientConnected(transport); + clientConnected(sessionId); }, });