From 90fabce9d57c46fde3565aac6350fd4b4aa4d63c Mon Sep 17 00:00:00 2001 From: Ben Borla Date: Mon, 9 Dec 2024 15:11:37 +0800 Subject: [PATCH] refactored code for better readability and maintainability --- src/mysql/index.ts | 330 +++++++++++++++++++++++++-------------------- 1 file changed, 186 insertions(+), 144 deletions(-) diff --git a/src/mysql/index.ts b/src/mysql/index.ts index e15f2dae..82f0e4d9 100644 --- a/src/mysql/index.ts +++ b/src/mysql/index.ts @@ -8,7 +8,7 @@ import { ListToolsRequestSchema, ReadResourceRequestSchema, } from "@modelcontextprotocol/sdk/types.js"; -import mysql, { MysqlError, PoolConnection, OkPacket } from "mysql"; +import mysql, { MysqlError, PoolConnection } from "mysql"; type MySQLErrorType = MysqlError | null; @@ -21,187 +21,229 @@ interface ColumnRow { data_type: string; } -type QueryResult = OkPacket | any[] | any; -const server = new Server( - { +const config = { + server: { name: "example-servers/mysql", version: "0.1.0", }, - { - capabilities: { - resources: {}, - tools: {}, - }, + mysql: { + host: process.env.MYSQL_HOST || "127.0.0.1", + port: Number(process.env.MYSQL_PORT || "3306"), + user: process.env.MYSQL_USER || "root", + password: process.env.MYSQL_PASS || "", + database: process.env.MYSQL_DB || "", + connectionLimit: 10, }, -); + paths: { + schema: "schema", + }, +}; -const MYSQL_HOST = process.env.MYSQL_HOST || "127.0.0.1"; -const MYSQL_PORT = process.env.MYSQL_PORT || "3306"; -const MYSQL_USER = process.env.MYSQL_USER || "root"; -const MYSQL_PASS = process.env.MYSQL_PASS || ""; -const MYSQL_DB = process.env.MYSQL_DB || ""; +const mysqlQuery = ( + connection: PoolConnection, + sql: string, + params: any[] = [], +): Promise => { + return new Promise((resolve, reject) => { + connection.query(sql, params, (error: MySQLErrorType, results: any) => { + if (error) reject(error); + else resolve(results); + }); + }); +}; -const pool = mysql.createPool({ - connectionLimit: 10, - host: MYSQL_HOST, - port: Number(MYSQL_PORT), - user: MYSQL_USER, - password: MYSQL_PASS, - database: MYSQL_DB, +const mysqlGetConnection = (pool: mysql.Pool): Promise => { + return new Promise( + ( + resolve: (value: PoolConnection | PromiseLike) => void, + reject, + ) => { + pool.getConnection( + (error: MySQLErrorType, connection: PoolConnection) => { + if (error) reject(error); + else resolve(connection); + }, + ); + }, + ); +}; + +const mysqlBeginTransaction = (connection: PoolConnection): Promise => { + return new Promise((resolve, reject) => { + connection.beginTransaction((error: MySQLErrorType) => { + if (error) reject(error); + else resolve(); + }); + }); +}; + +const mysqlRollback = (connection: PoolConnection): Promise => { + return new Promise((resolve, _) => { + connection.rollback(() => resolve()); + }); +}; + +const pool = mysql.createPool(config.mysql); +const server = new Server(config.server, { + capabilities: { + resources: {}, + tools: {}, + }, }); -const SCHEMA_PATH = "schema"; +async function executeQuery(sql: string, params: any[] = []): Promise { + const connection = await mysqlGetConnection(pool); + try { + const results = await mysqlQuery(connection, sql, params); + return results; + } finally { + connection.release(); + } +} +async function executeReadOnlyQuery(sql: string): Promise { + const connection = await mysqlGetConnection(pool); + + try { + // Set read-only mode + await mysqlQuery(connection, "SET SESSION TRANSACTION READ ONLY"); + + // Begin transaction + await mysqlBeginTransaction(connection); + + // Execute query + const results = await mysqlQuery(connection, sql); + + // Rollback transaction (since it's read-only) + await mysqlRollback(connection); + + // Reset to read-write mode + await mysqlQuery(connection, "SET SESSION TRANSACTION READ WRITE"); + + return { + content: [ + { + type: "text", + text: JSON.stringify(results, null, 2), + }, + ], + isError: false, + }; + } catch (error) { + await mysqlRollback(connection); + throw error; + } finally { + connection.release(); + } +} + +// Request handlers server.setRequestHandler(ListResourcesRequestSchema, async () => { - return new Promise((resolve, reject) => { - pool.query( - "SELECT table_name FROM information_schema.tables WHERE table_schema = DATABASE()", - (error: MySQLErrorType, results: TableRow[]) => { - if (error) reject(error); - resolve({ - resources: results.map((row: TableRow) => ({ - uri: new URL( - `${row.table_name}/${SCHEMA_PATH}`, - `${MYSQL_HOST}:${MYSQL_PORT}`, - ).href, - mimeType: "application/json", - name: `"${row.table_name}" database schema`, - })), - }); - }, - ); - }); + const results = (await executeQuery( + "SELECT table_name FROM information_schema.tables WHERE table_schema = DATABASE()", + )) as TableRow[]; + + return { + resources: results.map((row: TableRow) => ({ + uri: new URL( + `${row.table_name}/${config.paths.schema}`, + `${config.mysql.host}:${config.mysql.port}`, + ).href, + mimeType: "application/json", + name: `"${row.table_name}" database schema`, + })), + }; }); server.setRequestHandler(ReadResourceRequestSchema, async (request) => { const resourceUrl = new URL(request.params.uri); - const pathComponents = resourceUrl.pathname.split("/"); const schema = pathComponents.pop(); const tableName = pathComponents.pop(); - if (schema !== SCHEMA_PATH) { + if (schema !== config.paths.schema) { throw new Error("Invalid resource URI"); } - return new Promise((resolve, reject) => { - pool.query( - "SELECT column_name, data_type FROM information_schema.columns WHERE table_name = ?", - [tableName], - (error: MySQLErrorType, results: ColumnRow[]) => { - if (error) reject(error); - resolve({ - contents: [ - { - uri: request.params.uri, - mimeType: "application/json", - text: JSON.stringify(results, null, 2), - }, - ], - }); - }, - ); - }); -}); + const results = (await executeQuery( + "SELECT column_name, data_type FROM information_schema.columns WHERE table_name = ?", + [tableName], + )) as ColumnRow[]; -server.setRequestHandler(ListToolsRequestSchema, async () => { return { - tools: [ + contents: [ { - name: "mysql_query", - description: "Run a read-only MySQL query", - inputSchema: { - type: "object", - properties: { - sql: { type: "string" }, - }, - }, + uri: request.params.uri, + mimeType: "application/json", + text: JSON.stringify(results, null, 2), }, ], }; }); +server.setRequestHandler(ListToolsRequestSchema, async () => ({ + tools: [ + { + name: "mysql_query", + description: "Run a read-only MySQL query", + inputSchema: { + type: "object", + properties: { + sql: { type: "string" }, + }, + }, + }, + ], +})); + server.setRequestHandler(CallToolRequestSchema, async (request) => { - if (request.params.name === "mysql_query") { - const sql = request.params.arguments?.sql as string; - - return new Promise((resolve, reject) => { - pool.getConnection((err: MySQLErrorType, connection: PoolConnection) => { - if (err) reject(err); - - // @INFO: Set session to read only BEFORE beginning the transaction - connection.query( - "SET SESSION TRANSACTION READ ONLY", - (err: MySQLErrorType) => { - if (err) { - connection.release(); - reject(err); - return; - } - - connection.beginTransaction((err: MySQLErrorType) => { - if (err) { - connection.release(); - reject(err); - return; - } - - connection.query( - sql, - (error: MySQLErrorType, results: QueryResult) => { - if (error) { - connection.rollback(() => { - connection.release(); - reject(error); - }); - return; - } - - // @INFO: Reset the transaction mode back to default before releasing - connection.rollback(() => { - connection.query( - "SET SESSION TRANSACTION READ WRITE", - (err: MySQLErrorType) => { - connection.release(); - if (err) { - console.warn( - "Failed to reset transaction mode:", - err, - ); - } - resolve({ - content: [ - { - type: "text", - text: JSON.stringify(results, null, 2), - }, - ], - isError: false, - }); - }, - ); - }); - }, - ); - }); - }, - ); - }); - }); + if (request.params.name !== "mysql_query") { + throw new Error(`Unknown tool: ${request.params.name}`); } - throw new Error(`Unknown tool: ${request.params.name}`); + + const sql = request.params.arguments?.sql as string; + return executeReadOnlyQuery(sql); }); +// Server startup and shutdown async function runServer() { const transport = new StdioServerTransport(); await server.connect(transport); } -process.on("SIGINT", () => { - pool.end((err: MySQLErrorType) => { - if (err) console.error("Error closing pool:", err); - process.exit(err ? 1 : 0); +const shutdown = async (signal: string) => { + console.log(`Received ${signal}. Shutting down...`); + return new Promise((resolve, reject) => { + pool.end((err: MySQLErrorType) => { + if (err) { + console.error("Error closing pool:", err); + reject(err); + } else { + resolve(); + } + }); }); +}; + +process.on("SIGINT", async () => { + try { + await shutdown("SIGINT"); + process.exit(0); + } catch (err) { + process.exit(1); + } }); -runServer().catch(console.error); +process.on("SIGTERM", async () => { + try { + await shutdown("SIGTERM"); + process.exit(0); + } catch (err) { + process.exit(1); + } +}); + +runServer().catch((error: unknown) => { + console.error("Server error:", error); + process.exit(1); +});