Merge pull request #3242 from cliffhall/clear-timeouts-in-cleanup

Clear initialization timeout on disconnect
This commit is contained in:
Ola Hungerford
2026-01-24 10:47:19 -07:00
committed by GitHub
8 changed files with 120 additions and 60 deletions

View File

@@ -89,13 +89,14 @@ Use `trigger-sampling-request-async` or `trigger-elicitation-request-async` to d
MCP Tasks are bidirectional - both server and client can be task executors:
| Direction | Request Type | Task Executor | Demo Tool |
|-----------|--------------|---------------|-----------|
| Client -> Server | `tools/call` | Server | `simulate-research-query` |
| Server -> Client | `sampling/createMessage` | Client | `trigger-sampling-request-async` |
| Server -> Client | `elicitation/create` | Client | `trigger-elicitation-request-async` |
| Direction | Request Type | Task Executor | Demo Tool |
| ---------------- | ------------------------ | ------------- | ----------------------------------- |
| Client -> Server | `tools/call` | Server | `simulate-research-query` |
| Server -> Client | `sampling/createMessage` | Client | `trigger-sampling-request-async` |
| Server -> Client | `elicitation/create` | Client | `trigger-elicitation-request-async` |
For client-side tasks:
1. Server sends request with task metadata (e.g., `params.task.ttl`)
2. Client creates task and returns `CreateTaskResult` with `taskId`
3. Server polls `tasks/get` for status updates

View File

@@ -40,6 +40,8 @@ export const createServer: () => ServerFactoryResponse = () => {
const taskStore = new InMemoryTaskStore();
const taskMessageQueue = new InMemoryTaskMessageQueue();
let initializeTimeout: NodeJS.Timeout | null = null;
// Create the server
const server = new McpServer(
{
@@ -98,7 +100,7 @@ export const createServer: () => ServerFactoryResponse = () => {
// This is delayed until after the `notifications/initialized` handler finishes,
// otherwise, the request gets lost.
const sessionId = server.server.transport?.sessionId;
setTimeout(() => syncRoots(server, sessionId), 350);
initializeTimeout = setTimeout(() => syncRoots(server, sessionId), 350);
};
// Return the ServerFactoryResponse
@@ -110,6 +112,7 @@ export const createServer: () => ServerFactoryResponse = () => {
stopSimulatedResourceUpdates(sessionId);
// Clean up task store timers
taskStore.cleanup();
if (initializeTimeout) clearTimeout(initializeTimeout);
},
} satisfies ServerFactoryResponse;
};

View File

@@ -63,15 +63,10 @@ export const syncRoots = async (server: McpServer, sessionId?: string) => {
);
}
} catch (error) {
await server.sendLoggingMessage(
{
level: "error",
logger: "everything-server",
data: `Failed to request roots from client: ${
error instanceof Error ? error.message : String(error)
}`,
},
sessionId
console.error(
`Failed to request roots from client ${sessionId}: ${
error instanceof Error ? error.message : String(error)
}`
);
}
};

View File

@@ -106,7 +106,8 @@ async function runResearchProcess(
interpretation: {
type: "string",
title: "Clarification",
description: "Which interpretation of the topic do you mean?",
description:
"Which interpretation of the topic do you mean?",
oneOf: getInterpretationsForTopic(state.topic),
},
},
@@ -187,18 +188,28 @@ This tool demonstrates MCP's task-based execution pattern for long-running opera
**Task Lifecycle Demonstrated:**
1. \`tools/call\` with \`task\` parameter → Server returns \`CreateTaskResult\` (not the final result)
2. Client polls \`tasks/get\` → Server returns current status and \`statusMessage\`
3. Status progressed: \`working\`${state.clarification ? `\`input_required\`\`working\`` : ""}\`completed\`
3. Status progressed: \`working\`${
state.clarification ? `\`input_required\`\`working\`` : ""
}\`completed\`
4. Client calls \`tasks/result\` → Server returns this final result
${state.clarification ? `**Elicitation Flow:**
${
state.clarification
? `**Elicitation Flow:**
When the query was ambiguous, the server sent an \`elicitation/create\` request
to the client. The task status changed to \`input_required\` while awaiting user input.
${state.clarification.includes("unavailable on HTTP") ? `
${
state.clarification.includes("unavailable on HTTP")
? `
**Note:** Elicitation was skipped because this server is running over HTTP transport.
The current SDK's \`sendRequest\` only works over STDIO. Full HTTP elicitation support
requires SDK PR #1210's streaming \`elicitInputStream\` API.
` : `After receiving clarification ("${state.clarification}"), the task resumed processing and completed.`}
` : ""}
`
: `After receiving clarification ("${state.clarification}"), the task resumed processing and completed.`
}
`
: ""
}
**Key Concepts:**
- Tasks enable "call now, fetch later" patterns
- \`statusMessage\` provides human-readable progress updates
@@ -288,9 +299,7 @@ export const registerSimulateResearchQueryTool = (server: McpServer) => {
* Returns the current status of the research task.
*/
getTask: async (args, extra): Promise<GetTaskResult> => {
const task = await extra.taskStore.getTask(extra.taskId);
// The SDK's RequestTaskStore.getTask throws if not found, so task is always defined
return task;
return await extra.taskStore.getTask(extra.taskId);
},
/**

View File

@@ -31,15 +31,20 @@ const MAX_POLL_ATTEMPTS = 600;
*
* @param {McpServer} server - The McpServer instance where the tool will be registered.
*/
export const registerTriggerElicitationRequestAsyncTool = (server: McpServer) => {
export const registerTriggerElicitationRequestAsyncTool = (
server: McpServer
) => {
// Check client capabilities
const clientCapabilities = server.server.getClientCapabilities() || {};
// Client must support elicitation AND tasks.requests.elicitation
const clientSupportsElicitation = clientCapabilities.elicitation !== undefined;
const clientTasksCapability = clientCapabilities.tasks as {
requests?: { elicitation?: { create?: object } };
} | undefined;
const clientSupportsElicitation =
clientCapabilities.elicitation !== undefined;
const clientTasksCapability = clientCapabilities.tasks as
| {
requests?: { elicitation?: { create?: object } };
}
| undefined;
const clientSupportsAsyncElicitation =
clientTasksCapability?.requests?.elicitation?.create !== undefined;
@@ -56,7 +61,8 @@ export const registerTriggerElicitationRequestAsyncTool = (server: McpServer) =>
task: {
ttl: 600000, // 10 minutes (user input may take a while)
},
message: "Please provide inputs for the following fields (async task demo):",
message:
"Please provide inputs for the following fields (async task demo):",
requestedSchema: {
type: "object" as const,
properties: {
@@ -107,14 +113,18 @@ export const registerTriggerElicitationRequestAsyncTool = (server: McpServer) =>
);
// Check if client returned CreateTaskResult (has task object)
const isTaskResult = 'task' in elicitResponse && elicitResponse.task;
const isTaskResult = "task" in elicitResponse && elicitResponse.task;
if (!isTaskResult) {
// Client executed synchronously - return the direct response
return {
content: [
{
type: "text",
text: `[SYNC] Client executed synchronously:\n${JSON.stringify(elicitResponse, null, 2)}`,
text: `[SYNC] Client executed synchronously:\n${JSON.stringify(
elicitResponse,
null,
2
)}`,
},
],
};
@@ -145,19 +155,27 @@ export const registerTriggerElicitationRequestAsyncTool = (server: McpServer) =>
method: "tasks/get",
params: { taskId },
},
z.object({
status: z.string(),
statusMessage: z.string().optional(),
}).passthrough()
z
.object({
status: z.string(),
statusMessage: z.string().optional(),
})
.passthrough()
);
taskStatus = pollResult.status;
taskStatusMessage = pollResult.statusMessage;
// Only log status changes or every 10 polls to avoid spam
if (attempts === 1 || attempts % 10 === 0 || taskStatus !== "input_required") {
if (
attempts === 1 ||
attempts % 10 === 0 ||
taskStatus !== "input_required"
) {
statusMessages.push(
`Poll ${attempts}: ${taskStatus}${taskStatusMessage ? ` - ${taskStatusMessage}` : ""}`
`Poll ${attempts}: ${taskStatus}${
taskStatusMessage ? ` - ${taskStatusMessage}` : ""
}`
);
}
}
@@ -168,7 +186,9 @@ export const registerTriggerElicitationRequestAsyncTool = (server: McpServer) =>
content: [
{
type: "text",
text: `[TIMEOUT] Task timed out after ${MAX_POLL_ATTEMPTS} poll attempts\n\nProgress:\n${statusMessages.join("\n")}`,
text: `[TIMEOUT] Task timed out after ${MAX_POLL_ATTEMPTS} poll attempts\n\nProgress:\n${statusMessages.join(
"\n"
)}`,
},
],
};
@@ -180,7 +200,9 @@ export const registerTriggerElicitationRequestAsyncTool = (server: McpServer) =>
content: [
{
type: "text",
text: `[${taskStatus.toUpperCase()}] ${taskStatusMessage || "No message"}\n\nProgress:\n${statusMessages.join("\n")}`,
text: `[${taskStatus.toUpperCase()}] ${
taskStatusMessage || "No message"
}\n\nProgress:\n${statusMessages.join("\n")}`,
},
],
};
@@ -207,8 +229,10 @@ export const registerTriggerElicitationRequestAsyncTool = (server: McpServer) =>
const userData = result.content as Record<string, unknown>;
const lines = [];
if (userData.name) lines.push(`- Name: ${userData.name}`);
if (userData.favoriteColor) lines.push(`- Favorite Color: ${userData.favoriteColor}`);
if (userData.agreeToTerms !== undefined) lines.push(`- Agreed to terms: ${userData.agreeToTerms}`);
if (userData.favoriteColor)
lines.push(`- Favorite Color: ${userData.favoriteColor}`);
if (userData.agreeToTerms !== undefined)
lines.push(`- Agreed to terms: ${userData.agreeToTerms}`);
content.push({
type: "text",
@@ -229,7 +253,9 @@ export const registerTriggerElicitationRequestAsyncTool = (server: McpServer) =>
// Include progress and raw result for debugging
content.push({
type: "text",
text: `\nProgress:\n${statusMessages.join("\n")}\n\nRaw result: ${JSON.stringify(result, null, 2)}`,
text: `\nProgress:\n${statusMessages.join(
"\n"
)}\n\nRaw result: ${JSON.stringify(result, null, 2)}`,
});
return { content };

View File

@@ -1,5 +1,8 @@
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { ElicitResultSchema, CallToolResult } from "@modelcontextprotocol/sdk/types.js";
import {
ElicitResultSchema,
CallToolResult,
} from "@modelcontextprotocol/sdk/types.js";
// Tool configuration
const name = "trigger-elicitation-request";

View File

@@ -48,9 +48,11 @@ export const registerTriggerSamplingRequestAsyncTool = (server: McpServer) => {
// Client must support sampling AND tasks.requests.sampling
const clientSupportsSampling = clientCapabilities.sampling !== undefined;
const clientTasksCapability = clientCapabilities.tasks as {
requests?: { sampling?: { createMessage?: object } };
} | undefined;
const clientTasksCapability = clientCapabilities.tasks as
| {
requests?: { sampling?: { createMessage?: object } };
}
| undefined;
const clientSupportsAsyncSampling =
clientTasksCapability?.requests?.sampling?.createMessage !== undefined;
@@ -64,7 +66,9 @@ export const registerTriggerSamplingRequestAsyncTool = (server: McpServer) => {
// Create the sampling request WITH task metadata
// The params.task field signals to the client that this should be executed as a task
const request: CreateMessageRequest & { params: { task?: { ttl: number } } } = {
const request: CreateMessageRequest & {
params: { task?: { ttl: number } };
} = {
method: "sampling/createMessage",
params: {
task: {
@@ -112,14 +116,19 @@ export const registerTriggerSamplingRequestAsyncTool = (server: McpServer) => {
);
// Check if client returned CreateTaskResult (has task object)
const isTaskResult = 'task' in samplingResponse && samplingResponse.task;
const isTaskResult =
"task" in samplingResponse && samplingResponse.task;
if (!isTaskResult) {
// Client executed synchronously - return the direct response
return {
content: [
{
type: "text",
text: `[SYNC] Client executed synchronously:\n${JSON.stringify(samplingResponse, null, 2)}`,
text: `[SYNC] Client executed synchronously:\n${JSON.stringify(
samplingResponse,
null,
2
)}`,
},
],
};
@@ -150,16 +159,20 @@ export const registerTriggerSamplingRequestAsyncTool = (server: McpServer) => {
method: "tasks/get",
params: { taskId },
},
z.object({
status: z.string(),
statusMessage: z.string().optional(),
}).passthrough()
z
.object({
status: z.string(),
statusMessage: z.string().optional(),
})
.passthrough()
);
taskStatus = pollResult.status;
taskStatusMessage = pollResult.statusMessage;
statusMessages.push(
`Poll ${attempts}: ${taskStatus}${taskStatusMessage ? ` - ${taskStatusMessage}` : ""}`
`Poll ${attempts}: ${taskStatus}${
taskStatusMessage ? ` - ${taskStatusMessage}` : ""
}`
);
}
@@ -169,7 +182,9 @@ export const registerTriggerSamplingRequestAsyncTool = (server: McpServer) => {
content: [
{
type: "text",
text: `[TIMEOUT] Task timed out after ${MAX_POLL_ATTEMPTS} poll attempts\n\nProgress:\n${statusMessages.join("\n")}`,
text: `[TIMEOUT] Task timed out after ${MAX_POLL_ATTEMPTS} poll attempts\n\nProgress:\n${statusMessages.join(
"\n"
)}`,
},
],
};
@@ -181,7 +196,9 @@ export const registerTriggerSamplingRequestAsyncTool = (server: McpServer) => {
content: [
{
type: "text",
text: `[${taskStatus.toUpperCase()}] ${taskStatusMessage || "No message"}\n\nProgress:\n${statusMessages.join("\n")}`,
text: `[${taskStatus.toUpperCase()}] ${
taskStatusMessage || "No message"
}\n\nProgress:\n${statusMessages.join("\n")}`,
},
],
};
@@ -201,7 +218,9 @@ export const registerTriggerSamplingRequestAsyncTool = (server: McpServer) => {
content: [
{
type: "text",
text: `[COMPLETED] Async sampling completed!\n\n**Progress:**\n${statusMessages.join("\n")}\n\n**Result:**\n${JSON.stringify(result, null, 2)}`,
text: `[COMPLETED] Async sampling completed!\n\n**Progress:**\n${statusMessages.join(
"\n"
)}\n\n**Result:**\n${JSON.stringify(result, null, 2)}`,
},
],
};

View File

@@ -1,4 +1,7 @@
import { StreamableHTTPServerTransport, EventStore } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import {
StreamableHTTPServerTransport,
EventStore,
} from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import express, { Request, Response } from "express";
import { createServer } from "../server/index.js";
import { randomUUID } from "node:crypto";
@@ -6,7 +9,8 @@ import cors from "cors";
// Simple in-memory event store for SSE resumability
class InMemoryEventStore implements EventStore {
private events: Map<string, { streamId: string; message: unknown }> = new Map();
private events: Map<string, { streamId: string; message: unknown }> =
new Map();
async storeEvent(streamId: string, message: unknown): Promise<string> {
const eventId = randomUUID();