Files
servers/src/everything/resources/subscriptions.ts
cliffhall 16ed05957c [WIP] Refactor everything server to be more modular and use recommended APIs.
Adding simulated logging and refactoring subscriptions to not need to track transports

* Updated architecture.md

* In server/index.ts
  - remove import of Transport
  - import beginSimulatedLogging and stopSimulatedLogging
  - in clientConnected()
    - change argument to sessionId? instead of transport
    - add call to beginSimulatedLogging
    - send server and sessionId to beginSimulatedResourceUpdates and beginSimulatedLogging
  - in cleanup()
    - add call to stopSimulatedLogging passing sessionId

* Added server/logging.ts
  - Initialize logsUpdateIntervals to Map session ID to the interval for sending logging messages to the client
  - in beginSimulatedLogging()
    - create an array of logging meesages, customized with the sessionId if present
    - if the interval for the sessionId hasn't been set, create one, calling server.sendLoggingMessage with a random message to the client each time the interval elapses
  - in stopSimulatedLogging()
    - if a logging interval exists for the sessionId, clear it and remove it

* In subscriptions.ts
  - remove import of Transport
  - remove transports map
  - in beginSimulatedResourceUpdates()
    - change arguments to server and sessionId
    - check for the subsUpdateInterval for the session
    - remove all transport storage and interaction
    - instead use the server to send the notification
 - in stopSimulatedResourceUpdates()
   - remove management of transports map

* In sse.ts and streamableHttp.ts
  - when calling clientConnected, pass sessionId instead of transport

* In stdio.ts,
- when calling clientConnected, pass nothing instead of transport

* In subscriptions.ts
  - updated inline doc
2025-12-07 19:32:18 -05:00

152 lines
4.9 KiB
TypeScript

import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import {
SubscribeRequestSchema,
UnsubscribeRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
// Track subscriber session id lists by URI
const subscriptions: Map<string, Set<string | undefined>> = new Map<
string,
Set<string | undefined>
>();
// Interval to send notifications to subscribers
const subsUpdateIntervals: Map<string | undefined, NodeJS.Timeout | undefined> =
new Map<string | undefined, NodeJS.Timeout | undefined>();
/**
* Sets up the subscription and unsubscription handlers for the provided server.
*
* The function defines two request handlers:
* 1. A `Subscribe` handler that allows clients to subscribe to specific resource URIs.
* 2. An `Unsubscribe` handler that allows clients to unsubscribe from specific resource URIs.
*
* The `Subscribe` handler performs the following actions:
* - Extracts the URI and session ID from the request.
* - Logs a message acknowledging the subscription request.
* - Updates the internal tracking of subscribers for the given URI.
*
* The `Unsubscribe` handler performs the following actions:
* - Extracts the URI and session ID from the request.
* - Logs a message acknowledging the unsubscription request.
* - Removes the subscriber for the specified URI.
*
* @param {McpServer} server - The server instance to which subscription handlers will be attached.
*/
export const setSubscriptionHandlers = (server: McpServer) => {
// Set the subscription handler
server.server.setRequestHandler(
SubscribeRequestSchema,
async (request, extra) => {
// Get the URI to subscribe to
const { uri } = request.params;
// Get the session id (can be undefined for stdio)
const sessionId = extra.sessionId as string;
// Acknowledge the subscribe request
await server.sendLoggingMessage(
{
level: "info",
data: `Received Subscribe Resource request for URI: ${uri} ${
sessionId ? `from session ${sessionId}` : ""
}`,
},
sessionId
);
// Get the subscribers for this URI
const subscribers = subscriptions.has(uri)
? (subscriptions.get(uri) as Set<string>)
: new Set<string>();
subscribers.add(sessionId);
subscriptions.set(uri, subscribers);
return {};
}
);
// Set the unsubscription handler
server.server.setRequestHandler(
UnsubscribeRequestSchema,
async (request, extra) => {
// Get the URI to subscribe to
const { uri } = request.params;
// Get the session id (can be undefined for stdio)
const sessionId = extra.sessionId as string;
// Acknowledge the subscribe request
await server.sendLoggingMessage(
{
level: "info",
data: `Received Unsubscribe Resource request: ${uri} ${
sessionId ? `from session ${sessionId}` : ""
}`,
},
sessionId
);
// Remove the subscriber
if (subscriptions.has(uri)) {
const subscribers = subscriptions.get(uri) as Set<string>;
if (subscribers.has(sessionId)) subscribers.delete(sessionId);
}
return {};
}
);
};
/**
* Starts the process of simulating resource updates and sending server notifications
* to the client for the resources they are subscribed to. If the update interval is
* already active, invoking this function will not start another interval.
*
* @param server
* @param sessionId
*/
export const beginSimulatedResourceUpdates = (
server: McpServer,
sessionId: string | undefined
) => {
if (!subsUpdateIntervals.has(sessionId)) {
// Set the interval to send resource update notifications to this client
subsUpdateIntervals.set(
sessionId,
setInterval(async () => {
// Search all URIs for ones this client is subscribed to
for (const uri of subscriptions.keys()) {
const subscribers = subscriptions.get(uri) as Set<string | undefined>;
// If this client is subscribed, send the notification
if (subscribers.has(sessionId)) {
await server.server.notification({
method: "notifications/resources/updated",
params: { uri },
});
} else {
subscribers.delete(sessionId); // subscriber has disconnected
}
}
}, 10000)
);
}
};
/**
* Stops simulated resource updates for a given session.
*
* This function halts any active intervals associated with the provided session ID
* and removes the session's corresponding entries from resource management collections.
* Session ID can be undefined for stdio.
*
* @param {string} [sessionId]
*/
export const stopSimulatedResourceUpdates = (sessionId?: string) => {
// Remove active intervals
if (subsUpdateIntervals.has(sessionId)) {
const subsUpdateInterval = subsUpdateIntervals.get(sessionId);
clearInterval(subsUpdateInterval);
subsUpdateIntervals.delete(sessionId);
}
};