[WIP] Refactor everything server to be more modular and use recommended APIs.

Added tools to toggle simulated logging and resource updates on and off rather than have them start immediately upon connection

* Updated architecture.md

* In server/index.ts
  - remove import of beginSimulatedResourceUpdates and beginSimulatedLogging
  - remove clientConnected from createServer factory result

* In tools/index.ts
  - import registerToggleLoggingTool and registerToggleSubscriberUpdatesTool
  - in registerTools
    - call registerToggleLoggingTool and registerToggleSubscriberUpdatesTool

* In logging.ts
  - in beginSimulatedLogging
    - refactor extract inline interval callback into function sendSimulatedLoggingMessage
  - call sendSimulatedLoggingMessage right away to send the first message
  - supply sendSimulatedLoggingMessage as interval callback

* In subscriptions.ts
  - remove import of Transport
  - remove transports map
  - in beginSimulatedResourceUpdates()
    - change arguments to server and sessionId
    - check for the subsUpdateInterval for the session
    - remove all transport storage and interaction
    - instead use the server to send the notification
 - in stopSimulatedResourceUpdates()
   - remove management of transports map

* In stdio.ts, sse.ts, and streamableHttp.ts
  - remove destructure and calling of clientConnected

* Added tools/toggle-logging.ts
  - registers a tool that
    - takes no arguments
    - tracks clients that have been enabled by session id in a set
    - if client isn't enabled,
       - calls beginSimulatedLogging
       - adds session id to client set
    - else
      - calls stopSimulatedLogging
      - deletes session id from client set
    - returns a message explaining what was done including what to expect when logging is enabled

 * Added tools/toggle-subscriber-updates.ts
   - registers a tool that
     - takes no arguments
     - tracks clients that have been enabled by session id in a set
     - if client isn't enabled,
        - calls beginSimulatedResourceUpdates
        - adds session id to client set
     - else
       - calls stopSimulatedResourceUpdates
       - deletes session id from client set
     - returns a message explaining what was done including what to expect when logging is enabled
This commit is contained in:
cliffhall
2025-12-08 17:13:42 -05:00
parent 16ed05957c
commit 346c29a086
10 changed files with 182 additions and 62 deletions

View File

@@ -6,6 +6,7 @@ This document summarizes the current layout and runtime architecture of the `src
- Purpose: A minimal, modular MCP server showcasing core Model Context Protocol features. It exposes a simple tool, several prompts, and both static and dynamic resources, and can be run over multiple transports (STDIO, SSE, and Streamable HTTP).
- Design: A small “server factory” constructs the MCP server and registers features. Transports are separate entry points that create/connect the server and handle network concerns. Tools, prompts, and resources are organized in their own submodules.
- Design: A small “server factory” constructs the MCP server and registers features. Transports are separate entry points that create/connect the server and handle network concerns. Tools, prompts, and resources are organized in their own submodules. Simulated logging and resourceupdate notifications are optin and controlled by tools.
- Two server implementations exist:
- `server/index.ts`: The lightweight, modular server used by transports in this package.
@@ -58,33 +59,39 @@ At `src/everything`:
- index.ts
- Server factory that creates an `McpServer` with declared capabilities, loads server instructions, and registers tools, prompts, and resources.
- Sets resource subscription handlers via `setSubscriptionHandlers(server)`.
- Exposes `{ server, clientConnected, cleanup }` to the chosen transport.
- Exposes `{ server, cleanup }` to the chosen transport. Cleanup stops any running intervals in the server when the transport disconencts.
- logging.ts
- Implements simulated logging. Periodically sends randomized log messages at various levels to the connected client session. Started/stopped via the server factory lifecycle hooks.
- Implements simulated logging. Periodically sends randomized log messages at various levels to the connected client session. Started/stopped on demand via a dedicated tool.
- everything.ts
- A full “reference/monolith” implementation demonstrating most MCP features. Not the default path used by the transports in this package.
- transports/
- stdio.ts
- Starts a `StdioServerTransport`, creates the server via `createServer()`, connects it, and invokes `clientConnected()` so simulated resource updates and logging can begin. Handles `SIGINT` to close cleanly.
- Starts a `StdioServerTransport`, created the server via `createServer()`, and connects it. Handles `SIGINT` to close cleanly and calls `cleanup()` to remove any live intervals.
- sse.ts
- Express server exposing:
- `GET /sse` to establish an SSE connection per session.
- `POST /message` for client messages.
- Manages a `Map<sessionId, SSEServerTransport>` for sessions. Calls `clientConnected(sessionId)` after connect so persession simulated resource updates and logging start.
- Manages multiple connected clients via a transport map.
- Starts an `SSEServerTransport`, created the server via `createServer()`, and connects it to a new transport.
- On server disconnect, calls `cleanup()` to remove any live intervals.
- streamableHttp.ts
- Express server exposing a single `/mcp` endpoint for POST (JSONRPC), GET (SSE stream), and DELETE (session termination) using `StreamableHTTPServerTransport`.
- Uses an `InMemoryEventStore` for resumable sessions and tracks transports by `sessionId`. Connects a fresh server instance on initialization POST, invokes `clientConnected(sessionId)`, then reuses the transport for subsequent requests.
- Uses an `InMemoryEventStore` for resumable sessions and tracks transports by `sessionId`. Connects a fresh server instance on initialization POST and reuses the transport for subsequent requests.
- tools/
- index.ts
- `registerTools(server)` orchestrator, currently delegates to `registerEchoTool` and `registerAddTool`.
- `registerTools(server)` orchestrator; delegates to basic tools and control tools.
- echo.ts
- Defines a minimal `echo` tool with a Zod input schema and returns `Echo: {message}`.
- add.ts
- Defines an `add` tool with a Zod input schema that sums two numbers `a` and `b` and returns the result.
- toggle-logging.ts
- Defines `toggle-logging`: starts/stops simulated logging for the invoking session.
- toggle-subscriber-updates.ts
- Defines `toggle-subscriber-updates`: starts/stops simulated resource subscription update checks for the invoking session.
- prompts/
@@ -149,14 +156,12 @@ At `src/everything`:
- Registers resources via `registerResources(server)`.
- Registers prompts via `registerPrompts(server)`.
- Sets up resource subscription handlers via `setSubscriptionHandlers(server)`.
- Returns the server and two lifecycle hooks:
- `clientConnected(sessionId?)`: transports call this after connecting so the server can begin persession simulated resource update notifications and simulated logging for that session.
- `cleanup(sessionId?)`: transports call this on session termination to stop simulated resource updates and simulated logging, and remove sessionscoped state.
- Returns the server and a `cleanup(sessionId?)` hook that stops any active intervals and removes any sessionscoped state.
4. Each transport is responsible for network/session lifecycle:
- STDIO: simple processbound connection; calls `clientConnected()` after connect; closes on `SIGINT` and calls `cleanup()`.
- SSE: maintains a session map keyed by `sessionId`, calls `clientConnected(sessionId)` after connect, hooks servers `onclose` to clean and remove session, exposes `/sse` (GET) and `/message` (POST) endpoints.
- Streamable HTTP: exposes `/mcp` for POST (JSONRPC messages), GET (SSE stream), and DELETE (termination). Uses an event store for resumability and stores transports by `sessionId`. Calls `clientConnected(sessionId)` on initialization and `cleanup(sessionId)` on DELETE.
- STDIO: simple processbound connection; closes on `SIGINT` and calls `cleanup()`.
- SSE: maintains a session map keyed by `sessionId`; hooks servers `onclose` to clean and remove session; exposes `/sse` (GET) and `/message` (POST) endpoints.
- Streamable HTTP: exposes `/mcp` for POST (JSONRPC messages), GET (SSE stream), and DELETE (termination). Uses an event store for resumability and stores transports by `sessionId`. Does not autostart simulated features; calls `cleanup(sessionId)` on DELETE.
## Registered Features (current minimal set)
@@ -164,6 +169,8 @@ At `src/everything`:
- `echo` (tools/echo.ts): Echoes the provided `message: string`. Uses Zod to validate inputs.
- `add` (tools/add.ts): Adds two numbers `a` and `b` and returns their sum. Uses Zod to validate inputs.
- `toggle-logging` (tools/toggle-logging.ts): Starts or stops simulated, randomleveled logging for the invoking session. Respects the clients selected minimum logging level.
- `toggle-subscriber-updates` (tools/toggle-subscriber-updates.ts): Starts or stops simulated resource update notifications for URIs the invoking session has subscribed to.
- Prompts
@@ -179,12 +186,13 @@ At `src/everything`:
- Static Docs: `demo://resource/static/document/<filename>` (serves files from `src/everything/docs/` as static file-based resources)
- Resource Subscriptions and Notifications
- Clients may subscribe/unsubscribe to resource URIs using the MCP `resources/subscribe` and `resources/unsubscribe` requests.
- The server sends simulated update notifications with method `notifications/resources/updated { uri }` only to sessions that subscribed to that URI.
- Simulated update notifications are optin and off by default. Use the `toggle-subscriber-updates` tool to start/stop a persession interval that emits `notifications/resources/updated { uri }` only for URIs that session has subscribed to.
- Multiple concurrent clients are supported; each clients subscriptions are tracked per session and notifications are delivered independently via the server instance associated with that session.
- Logging
- Simulated logging is enabled. The server emits periodic log messages of varying levels (debug, info, notice, warning, error, critical, alert, emergency) per session. Clients can control the minimum level they receive via standard MCP `logging/setLevel` request.
- Simulated logging is available but off by default. Use the `toggle-logging` tool to start/stop periodic log messages of varying levels (debug, info, notice, warning, error, critical, alert, emergency) per session. Clients can control the minimum level they receive via the standard MCP `logging/setLevel` request.
## Extension Points
@@ -209,7 +217,7 @@ At `src/everything`:
- Tracks subscribers per URI: `Map<uri, Set<sessionId>>`.
- Installs handlers via `setSubscriptionHandlers(server)` to process subscribe/unsubscribe requests and keep the map updated.
- `clientConnected(sessionId?)` (from the server factory) calls `beginSimulatedResourceUpdates(server, sessionId)`, which starts a persession interval that scans subscribed URIs and emits `notifications/resources/updated` from that sessions server instance only when applicable.
- Updates are started/stopped on demand by the `toggle-subscriber-updates` tool, which calls `beginSimulatedResourceUpdates(server, sessionId)` and `stopSimulatedResourceUpdates(sessionId)`.
- `cleanup(sessionId?)` calls `stopSimulatedResourceUpdates(sessionId)` to clear intervals and remove sessionscoped state.
- Design note: Each client session has its own `McpServer` instance; periodic checks run per session and invoke `server.notification(...)` on that instance, so messages are delivered only to the intended client.
@@ -219,7 +227,7 @@ At `src/everything`:
- Module: `server/logging.ts`
- Periodically sends randomized log messages at different levels. Messages can include the session ID for clarity during demos.
- Started via `beginSimulatedLogging(server, sessionId?)` when a client connects and stopped via `stopSimulatedLogging(sessionId?)` during cleanup.
- Started/stopped on demand via the `toggle-logging` tool, which calls `beginSimulatedLogging(server, sessionId?)` and `stopSimulatedLogging(sessionId?)`. Note that transport disconnect triggers `cleanup()` which also stops any active intervals.
- Uses `server.sendLoggingMessage({ level, data }, sessionId?)` so that the clients configured minimum logging level is respected by the SDK.
- Adding Transports

View File

@@ -96,6 +96,38 @@ export const setSubscriptionHandlers = (server: McpServer) => {
);
};
/**
* Sends simulated resource update notifications to the subscribed client.
*
* This function iterates through all resource URIs stored in the subscriptions
* and checks if the specified session ID is subscribed to them. If so, it sends
* a notification through the provided server. If the session ID is no longer valid
* (disconnected), it removes the session ID from the list of subscribers.
*
* @param {McpServer} server - The server instance used to send notifications.
* @param {string | undefined} sessionId - The session ID of the client to check for subscriptions.
* @returns {Promise<void>} Resolves once all applicable notifications are sent.
*/
const sendSimulatedResourceUpdates = async (
server: McpServer,
sessionId: string | undefined
): Promise<void> => {
// Search all URIs for ones this client is subscribed to
for (const uri of subscriptions.keys()) {
const subscribers = subscriptions.get(uri) as Set<string | undefined>;
// If this client is subscribed, send the notification
if (subscribers.has(sessionId)) {
await server.server.notification({
method: "notifications/resources/updated",
params: { uri },
});
} else {
subscribers.delete(sessionId); // subscriber has disconnected
}
}
};
/**
* Starts the process of simulating resource updates and sending server notifications
* to the client for the resources they are subscribed to. If the update interval is
@@ -109,25 +141,13 @@ export const beginSimulatedResourceUpdates = (
sessionId: string | undefined
) => {
if (!subsUpdateIntervals.has(sessionId)) {
// Set the interval to send resource update notifications to this client
// Send once immediately
sendSimulatedResourceUpdates(server, sessionId);
// Set the interval to send later resource update notifications to this client
subsUpdateIntervals.set(
sessionId,
setInterval(async () => {
// Search all URIs for ones this client is subscribed to
for (const uri of subscriptions.keys()) {
const subscribers = subscriptions.get(uri) as Set<string | undefined>;
// If this client is subscribed, send the notification
if (subscribers.has(sessionId)) {
await server.server.notification({
method: "notifications/resources/updated",
params: { uri },
});
} else {
subscribers.delete(sessionId); // subscriber has disconnected
}
}
}, 10000)
setInterval(() => sendSimulatedResourceUpdates(server, sessionId), 5000)
);
}
};

View File

@@ -4,13 +4,12 @@ import { readFileSync } from "fs";
import { fileURLToPath } from "url";
import {
setSubscriptionHandlers,
beginSimulatedResourceUpdates,
stopSimulatedResourceUpdates
stopSimulatedResourceUpdates,
} from "../resources/subscriptions.js";
import { registerTools } from "../tools/index.js";
import { registerResources } from "../resources/index.js";
import { registerPrompts } from "../prompts/index.js";
import { beginSimulatedLogging, stopSimulatedLogging } from "./logging.js";
import { stopSimulatedLogging } from "./logging.js";
// Everything Server factory
export const createServer = () => {
@@ -49,17 +48,13 @@ export const createServer = () => {
// Set resource subscription handlers
setSubscriptionHandlers(server);
// Return server instance and cleanup function
return {
server,
// When the client connects, begin simulated resource updates and logging
clientConnected: (sessionId?: string) => {
beginSimulatedResourceUpdates(server, sessionId);
beginSimulatedLogging(server, sessionId);
},
// When the client disconnects, stop simulated resource updates and logging
cleanup: (sessionId?: string) => {
stopSimulatedResourceUpdates(sessionId);
// Stop any simulated logging or resource updates that may have been initiated.
stopSimulatedLogging(sessionId);
stopSimulatedResourceUpdates(sessionId);
},
};
};

View File

@@ -38,18 +38,27 @@ export const beginSimulatedLogging = (
},
];
// Set the interval to send logging messages to this client
/**
* Send a simulated logging message to the client
*/
const sendSimulatedLoggingMessage = async (sessionId: string | undefined) => {
// By using the `sendLoggingMessage` function to send the message, we
// ensure that the client's chosen logging level will be respected
await server.sendLoggingMessage(
messages[Math.floor(Math.random() * messages.length)],
sessionId
);
};
// Set the interval to send later logging messages to this client
if (!logsUpdateIntervals.has(sessionId)) {
// Send once immediately
sendSimulatedLoggingMessage(sessionId);
// Sen
logsUpdateIntervals.set(
sessionId,
setInterval(async () => {
// By using the `sendLoggingMessage` function to send the message, we
// ensure that the client's chosen logging level will be respected
await server.sendLoggingMessage(
messages[Math.floor(Math.random() * messages.length)],
sessionId
);
}, 15000)
setInterval(() => sendSimulatedLoggingMessage(sessionId), 5000)
);
}
};

View File

@@ -1,6 +1,8 @@
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { registerEchoTool } from "./echo.js";
import { registerAddTool } from "./add.js";
import { registerToggleLoggingTool } from "./toggle-logging.js";
import { registerToggleSubscriberUpdatesTool } from "./toggle-subscriber-updates.js";
/**
* Register the tools with the MCP server.
@@ -9,4 +11,6 @@ import { registerAddTool } from "./add.js";
export const registerTools = (server: McpServer) => {
registerEchoTool(server);
registerAddTool(server);
registerToggleLoggingTool(server);
registerToggleSubscriberUpdatesTool(server);
};

View File

@@ -0,0 +1,51 @@
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { CallToolResult } from "@modelcontextprotocol/sdk/types.js";
import {
beginSimulatedLogging,
stopSimulatedLogging,
} from "../server/logging.js";
const name = "toggle-logging";
const config = {
title: "Toggle Logging",
description: "Toggles simulated logging on or off.",
inputSchema: {},
};
const clients: Set<string | undefined> = new Set<string | undefined>();
/**
* Registers a tool that toggles simulated logging for a session on or off.
*
* This function allows the server to manage simulated logging for client sessions.
* When invoked, it either starts or stops simulated logging based on the session's
* current state. If logging for the specified session is active, it will be stopped;
* if it is inactive it will be started.
*
* @param {McpServer} server - The server instance to which the tool is registered.
* @returns {void}
*/
export const registerToggleLoggingTool = (server: McpServer) => {
server.registerTool(
name,
config,
async (_args, extra): Promise<CallToolResult> => {
const sessionId = extra?.sessionId;
let response: string;
if (clients.has(sessionId)) {
stopSimulatedLogging(sessionId);
clients.delete(sessionId);
response = `Stopped simulated logging for session ${sessionId}`;
} else {
beginSimulatedLogging(server, sessionId);
clients.add(sessionId);
response = `Started simulated, random-leveled logging for session ${sessionId} at a 5 second pace. Client's selected logging level will be respected. If an interval elapses and the message to be sent is below the selected level, it will not be sent. Thus at higher chosen logging levels, messages should arrive further apart. `;
}
return {
content: [{ type: "text", text: `${response}` }],
};
}
);
};

View File

@@ -0,0 +1,40 @@
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { CallToolResult } from "@modelcontextprotocol/sdk/types.js";
import {
beginSimulatedResourceUpdates,
stopSimulatedResourceUpdates,
} from "../resources/subscriptions.js";
const name = "toggle-subscriber-updates";
const config = {
title: "Toggle Subscriber Updates",
description: "Toggles simulated resource subscription updates on or off.",
inputSchema: {},
};
const clients: Set<string | undefined> = new Set<string | undefined>();
export const registerToggleSubscriberUpdatesTool = (server: McpServer) => {
server.registerTool(
name,
config,
async (_args, extra): Promise<CallToolResult> => {
const sessionId = extra?.sessionId;
let response: string;
if (clients.has(sessionId)) {
stopSimulatedResourceUpdates(sessionId);
clients.delete(sessionId);
response = `Stopped simulated resource updates for session ${sessionId}`;
} else {
beginSimulatedResourceUpdates(server, sessionId);
clients.add(sessionId);
response = `Started simulated resource updated notifications for session ${sessionId} at a 5 second pace. Client will receive updates for any resources the it is subscribed to.`;
}
return {
content: [{ type: "text", text: `${response}` }],
};
}
);
};

View File

@@ -21,7 +21,7 @@ const transports: Map<string, SSEServerTransport> = new Map<
app.get("/sse", async (req, res) => {
let transport: SSEServerTransport;
const { server, clientConnected, cleanup } = createServer();
const { server, cleanup } = createServer();
if (req?.query?.sessionId) {
const sessionId = req?.query?.sessionId as string;
@@ -40,9 +40,6 @@ app.get("/sse", async (req, res) => {
const sessionId = transport.sessionId;
console.error("Client Connected: ", sessionId);
// Start simulated logging and subscription updates when a client connects
clientConnected(sessionId);
// Handle close of connection
server.server.onclose = async () => {
const sessionId = transport.sessionId;

View File

@@ -7,10 +7,9 @@ console.error("Starting default (STDIO) server...");
async function main() {
const transport = new StdioServerTransport();
const { server, clientConnected, cleanup } = createServer();
const { server, cleanup } = createServer();
await server.connect(transport);
clientConnected();
// Cleanup on exit
process.on("SIGINT", async () => {

View File

@@ -35,7 +35,7 @@ app.post("/mcp", async (req: Request, res: Response) => {
// Reuse existing transport
transport = transports.get(sessionId)!;
} else if (!sessionId) {
const { server, clientConnected, cleanup } = createServer();
const { server, cleanup } = createServer();
// New initialization request
const eventStore = new InMemoryEventStore();
@@ -47,9 +47,6 @@ app.post("/mcp", async (req: Request, res: Response) => {
// This avoids race conditions where requests might come in before the session is stored
console.log(`Session initialized with ID: ${sessionId}`);
transports.set(sessionId, transport);
// Start simulated logging and subscription updates when a client connects
clientConnected(sessionId);
},
});