"Everything Server crashes when multiple clients reconnect"

* In index.ts
  - added a variable to hold the initialize timeout
  - store the timeout in the oninitialized handler
  - clear the timeout in the cleanup callback

* In roots.ts
  - In the catch block of syncRoots, log the error to the console via .error rather than attempting to send to the client because the most probable case here is that we don't have a connection.

* In simulate-research-query.ts
  - remove redundant local variable in getTask
* Everywhere else, prettier.
This commit is contained in:
cliffhall
2026-01-23 13:26:02 -05:00
parent eedb060099
commit 9ade57133f
6 changed files with 112 additions and 50 deletions

View File

@@ -89,13 +89,14 @@ Use `trigger-sampling-request-async` or `trigger-elicitation-request-async` to d
MCP Tasks are bidirectional - both server and client can be task executors:
| Direction | Request Type | Task Executor | Demo Tool |
|-----------|--------------|---------------|-----------|
| Client -> Server | `tools/call` | Server | `simulate-research-query` |
| Server -> Client | `sampling/createMessage` | Client | `trigger-sampling-request-async` |
| Server -> Client | `elicitation/create` | Client | `trigger-elicitation-request-async` |
| Direction | Request Type | Task Executor | Demo Tool |
| ---------------- | ------------------------ | ------------- | ----------------------------------- |
| Client -> Server | `tools/call` | Server | `simulate-research-query` |
| Server -> Client | `sampling/createMessage` | Client | `trigger-sampling-request-async` |
| Server -> Client | `elicitation/create` | Client | `trigger-elicitation-request-async` |
For client-side tasks:
1. Server sends request with task metadata (e.g., `params.task.ttl`)
2. Client creates task and returns `CreateTaskResult` with `taskId`
3. Server polls `tasks/get` for status updates

View File

@@ -106,7 +106,8 @@ async function runResearchProcess(
interpretation: {
type: "string",
title: "Clarification",
description: "Which interpretation of the topic do you mean?",
description:
"Which interpretation of the topic do you mean?",
oneOf: getInterpretationsForTopic(state.topic),
},
},
@@ -187,18 +188,28 @@ This tool demonstrates MCP's task-based execution pattern for long-running opera
**Task Lifecycle Demonstrated:**
1. \`tools/call\` with \`task\` parameter → Server returns \`CreateTaskResult\` (not the final result)
2. Client polls \`tasks/get\` → Server returns current status and \`statusMessage\`
3. Status progressed: \`working\`${state.clarification ? `\`input_required\`\`working\`` : ""}\`completed\`
3. Status progressed: \`working\`${
state.clarification ? `\`input_required\`\`working\`` : ""
}\`completed\`
4. Client calls \`tasks/result\` → Server returns this final result
${state.clarification ? `**Elicitation Flow:**
${
state.clarification
? `**Elicitation Flow:**
When the query was ambiguous, the server sent an \`elicitation/create\` request
to the client. The task status changed to \`input_required\` while awaiting user input.
${state.clarification.includes("unavailable on HTTP") ? `
${
state.clarification.includes("unavailable on HTTP")
? `
**Note:** Elicitation was skipped because this server is running over HTTP transport.
The current SDK's \`sendRequest\` only works over STDIO. Full HTTP elicitation support
requires SDK PR #1210's streaming \`elicitInputStream\` API.
` : `After receiving clarification ("${state.clarification}"), the task resumed processing and completed.`}
` : ""}
`
: `After receiving clarification ("${state.clarification}"), the task resumed processing and completed.`
}
`
: ""
}
**Key Concepts:**
- Tasks enable "call now, fetch later" patterns
- \`statusMessage\` provides human-readable progress updates
@@ -288,9 +299,7 @@ export const registerSimulateResearchQueryTool = (server: McpServer) => {
* Returns the current status of the research task.
*/
getTask: async (args, extra): Promise<GetTaskResult> => {
const task = await extra.taskStore.getTask(extra.taskId);
// The SDK's RequestTaskStore.getTask throws if not found, so task is always defined
return task;
return await extra.taskStore.getTask(extra.taskId);
},
/**

View File

@@ -31,15 +31,20 @@ const MAX_POLL_ATTEMPTS = 600;
*
* @param {McpServer} server - The McpServer instance where the tool will be registered.
*/
export const registerTriggerElicitationRequestAsyncTool = (server: McpServer) => {
export const registerTriggerElicitationRequestAsyncTool = (
server: McpServer
) => {
// Check client capabilities
const clientCapabilities = server.server.getClientCapabilities() || {};
// Client must support elicitation AND tasks.requests.elicitation
const clientSupportsElicitation = clientCapabilities.elicitation !== undefined;
const clientTasksCapability = clientCapabilities.tasks as {
requests?: { elicitation?: { create?: object } };
} | undefined;
const clientSupportsElicitation =
clientCapabilities.elicitation !== undefined;
const clientTasksCapability = clientCapabilities.tasks as
| {
requests?: { elicitation?: { create?: object } };
}
| undefined;
const clientSupportsAsyncElicitation =
clientTasksCapability?.requests?.elicitation?.create !== undefined;
@@ -56,7 +61,8 @@ export const registerTriggerElicitationRequestAsyncTool = (server: McpServer) =>
task: {
ttl: 600000, // 10 minutes (user input may take a while)
},
message: "Please provide inputs for the following fields (async task demo):",
message:
"Please provide inputs for the following fields (async task demo):",
requestedSchema: {
type: "object" as const,
properties: {
@@ -107,14 +113,18 @@ export const registerTriggerElicitationRequestAsyncTool = (server: McpServer) =>
);
// Check if client returned CreateTaskResult (has task object)
const isTaskResult = 'task' in elicitResponse && elicitResponse.task;
const isTaskResult = "task" in elicitResponse && elicitResponse.task;
if (!isTaskResult) {
// Client executed synchronously - return the direct response
return {
content: [
{
type: "text",
text: `[SYNC] Client executed synchronously:\n${JSON.stringify(elicitResponse, null, 2)}`,
text: `[SYNC] Client executed synchronously:\n${JSON.stringify(
elicitResponse,
null,
2
)}`,
},
],
};
@@ -145,19 +155,27 @@ export const registerTriggerElicitationRequestAsyncTool = (server: McpServer) =>
method: "tasks/get",
params: { taskId },
},
z.object({
status: z.string(),
statusMessage: z.string().optional(),
}).passthrough()
z
.object({
status: z.string(),
statusMessage: z.string().optional(),
})
.passthrough()
);
taskStatus = pollResult.status;
taskStatusMessage = pollResult.statusMessage;
// Only log status changes or every 10 polls to avoid spam
if (attempts === 1 || attempts % 10 === 0 || taskStatus !== "input_required") {
if (
attempts === 1 ||
attempts % 10 === 0 ||
taskStatus !== "input_required"
) {
statusMessages.push(
`Poll ${attempts}: ${taskStatus}${taskStatusMessage ? ` - ${taskStatusMessage}` : ""}`
`Poll ${attempts}: ${taskStatus}${
taskStatusMessage ? ` - ${taskStatusMessage}` : ""
}`
);
}
}
@@ -168,7 +186,9 @@ export const registerTriggerElicitationRequestAsyncTool = (server: McpServer) =>
content: [
{
type: "text",
text: `[TIMEOUT] Task timed out after ${MAX_POLL_ATTEMPTS} poll attempts\n\nProgress:\n${statusMessages.join("\n")}`,
text: `[TIMEOUT] Task timed out after ${MAX_POLL_ATTEMPTS} poll attempts\n\nProgress:\n${statusMessages.join(
"\n"
)}`,
},
],
};
@@ -180,7 +200,9 @@ export const registerTriggerElicitationRequestAsyncTool = (server: McpServer) =>
content: [
{
type: "text",
text: `[${taskStatus.toUpperCase()}] ${taskStatusMessage || "No message"}\n\nProgress:\n${statusMessages.join("\n")}`,
text: `[${taskStatus.toUpperCase()}] ${
taskStatusMessage || "No message"
}\n\nProgress:\n${statusMessages.join("\n")}`,
},
],
};
@@ -207,8 +229,10 @@ export const registerTriggerElicitationRequestAsyncTool = (server: McpServer) =>
const userData = result.content as Record<string, unknown>;
const lines = [];
if (userData.name) lines.push(`- Name: ${userData.name}`);
if (userData.favoriteColor) lines.push(`- Favorite Color: ${userData.favoriteColor}`);
if (userData.agreeToTerms !== undefined) lines.push(`- Agreed to terms: ${userData.agreeToTerms}`);
if (userData.favoriteColor)
lines.push(`- Favorite Color: ${userData.favoriteColor}`);
if (userData.agreeToTerms !== undefined)
lines.push(`- Agreed to terms: ${userData.agreeToTerms}`);
content.push({
type: "text",
@@ -229,7 +253,9 @@ export const registerTriggerElicitationRequestAsyncTool = (server: McpServer) =>
// Include progress and raw result for debugging
content.push({
type: "text",
text: `\nProgress:\n${statusMessages.join("\n")}\n\nRaw result: ${JSON.stringify(result, null, 2)}`,
text: `\nProgress:\n${statusMessages.join(
"\n"
)}\n\nRaw result: ${JSON.stringify(result, null, 2)}`,
});
return { content };

View File

@@ -1,5 +1,8 @@
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { ElicitResultSchema, CallToolResult } from "@modelcontextprotocol/sdk/types.js";
import {
ElicitResultSchema,
CallToolResult,
} from "@modelcontextprotocol/sdk/types.js";
// Tool configuration
const name = "trigger-elicitation-request";

View File

@@ -48,9 +48,11 @@ export const registerTriggerSamplingRequestAsyncTool = (server: McpServer) => {
// Client must support sampling AND tasks.requests.sampling
const clientSupportsSampling = clientCapabilities.sampling !== undefined;
const clientTasksCapability = clientCapabilities.tasks as {
requests?: { sampling?: { createMessage?: object } };
} | undefined;
const clientTasksCapability = clientCapabilities.tasks as
| {
requests?: { sampling?: { createMessage?: object } };
}
| undefined;
const clientSupportsAsyncSampling =
clientTasksCapability?.requests?.sampling?.createMessage !== undefined;
@@ -64,7 +66,9 @@ export const registerTriggerSamplingRequestAsyncTool = (server: McpServer) => {
// Create the sampling request WITH task metadata
// The params.task field signals to the client that this should be executed as a task
const request: CreateMessageRequest & { params: { task?: { ttl: number } } } = {
const request: CreateMessageRequest & {
params: { task?: { ttl: number } };
} = {
method: "sampling/createMessage",
params: {
task: {
@@ -112,14 +116,19 @@ export const registerTriggerSamplingRequestAsyncTool = (server: McpServer) => {
);
// Check if client returned CreateTaskResult (has task object)
const isTaskResult = 'task' in samplingResponse && samplingResponse.task;
const isTaskResult =
"task" in samplingResponse && samplingResponse.task;
if (!isTaskResult) {
// Client executed synchronously - return the direct response
return {
content: [
{
type: "text",
text: `[SYNC] Client executed synchronously:\n${JSON.stringify(samplingResponse, null, 2)}`,
text: `[SYNC] Client executed synchronously:\n${JSON.stringify(
samplingResponse,
null,
2
)}`,
},
],
};
@@ -150,16 +159,20 @@ export const registerTriggerSamplingRequestAsyncTool = (server: McpServer) => {
method: "tasks/get",
params: { taskId },
},
z.object({
status: z.string(),
statusMessage: z.string().optional(),
}).passthrough()
z
.object({
status: z.string(),
statusMessage: z.string().optional(),
})
.passthrough()
);
taskStatus = pollResult.status;
taskStatusMessage = pollResult.statusMessage;
statusMessages.push(
`Poll ${attempts}: ${taskStatus}${taskStatusMessage ? ` - ${taskStatusMessage}` : ""}`
`Poll ${attempts}: ${taskStatus}${
taskStatusMessage ? ` - ${taskStatusMessage}` : ""
}`
);
}
@@ -169,7 +182,9 @@ export const registerTriggerSamplingRequestAsyncTool = (server: McpServer) => {
content: [
{
type: "text",
text: `[TIMEOUT] Task timed out after ${MAX_POLL_ATTEMPTS} poll attempts\n\nProgress:\n${statusMessages.join("\n")}`,
text: `[TIMEOUT] Task timed out after ${MAX_POLL_ATTEMPTS} poll attempts\n\nProgress:\n${statusMessages.join(
"\n"
)}`,
},
],
};
@@ -181,7 +196,9 @@ export const registerTriggerSamplingRequestAsyncTool = (server: McpServer) => {
content: [
{
type: "text",
text: `[${taskStatus.toUpperCase()}] ${taskStatusMessage || "No message"}\n\nProgress:\n${statusMessages.join("\n")}`,
text: `[${taskStatus.toUpperCase()}] ${
taskStatusMessage || "No message"
}\n\nProgress:\n${statusMessages.join("\n")}`,
},
],
};
@@ -201,7 +218,9 @@ export const registerTriggerSamplingRequestAsyncTool = (server: McpServer) => {
content: [
{
type: "text",
text: `[COMPLETED] Async sampling completed!\n\n**Progress:**\n${statusMessages.join("\n")}\n\n**Result:**\n${JSON.stringify(result, null, 2)}`,
text: `[COMPLETED] Async sampling completed!\n\n**Progress:**\n${statusMessages.join(
"\n"
)}\n\n**Result:**\n${JSON.stringify(result, null, 2)}`,
},
],
};

View File

@@ -1,4 +1,7 @@
import { StreamableHTTPServerTransport, EventStore } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import {
StreamableHTTPServerTransport,
EventStore,
} from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import express, { Request, Response } from "express";
import { createServer } from "../server/index.js";
import { randomUUID } from "node:crypto";
@@ -6,7 +9,8 @@ import cors from "cors";
// Simple in-memory event store for SSE resumability
class InMemoryEventStore implements EventStore {
private events: Map<string, { streamId: string; message: unknown }> = new Map();
private events: Map<string, { streamId: string; message: unknown }> =
new Map();
async storeEvent(streamId: string, message: unknown): Promise<string> {
const eventId = randomUUID();