From 96ae3b219879f5412cb4837fec7c29ebea8774bf Mon Sep 17 00:00:00 2001 From: Joshua Coles Date: Mon, 17 Mar 2025 15:55:55 +0000 Subject: [PATCH] Stash --- ext/mcp/src/internal-test.rs | 19 +++++----- ext/mcp/src/lib.rs | 44 +++++++++-------------- ext/mcp/src/mcp_client.rs | 64 ++++++++++++++++++++++++---------- ext/mcp/src/rpc_helpers.rs | 2 +- ext/mcp/src/stdio_transport.rs | 7 ++-- 5 files changed, 77 insertions(+), 59 deletions(-) diff --git a/ext/mcp/src/internal-test.rs b/ext/mcp/src/internal-test.rs index dcb59a7..10dbd4a 100644 --- a/ext/mcp/src/internal-test.rs +++ b/ext/mcp/src/internal-test.rs @@ -1,21 +1,23 @@ -use std::process::Stdio; +use crate::types::{ + CallToolRequestParams, ClientCapabilities, ListToolsRequestParams, ListToolsResult, +}; use jsonrpsee::core::client::{ Client, ClientBuilder, ClientT, TransportReceiverT, TransportSenderT, }; use jsonrpsee::core::traits::ToRpcParams; -use tokio::io::{AsyncBufReadExt, AsyncWriteExt}; use serde::Serialize; use serde_json::json; +use std::process::Stdio; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt}; use tokio::select; use types::{Implementation, InitializeRequestParams, InitializeResult}; -use crate::types::{CallToolRequestParams, ClientCapabilities, ListToolsRequestParams, ListToolsResult}; -mod types; -mod rpc_helpers; mod mcp_client; +mod rpc_helpers; mod stdio_transport; -use rpc_helpers::*; +mod types; use crate::mcp_client::McpClientConnection; +use rpc_helpers::*; #[tokio::main] async fn main() -> anyhow::Result<()> { @@ -39,8 +41,9 @@ async fn main() -> anyhow::Result<()> { version: "0.0.1".to_string(), }, protocol_version: "2024-11-05".to_string(), - } - ).await?; + }, + ) + .await?; dbg!(client.list_tools()); diff --git a/ext/mcp/src/lib.rs b/ext/mcp/src/lib.rs index 69e6e13..82903ce 100644 --- a/ext/mcp/src/lib.rs +++ b/ext/mcp/src/lib.rs @@ -1,27 +1,17 @@ use crate::mcp_client::McpClientConnection; -use jsonrpsee::async_client::{Client, ClientBuilder}; -use jsonrpsee::core::client::ClientT; use magnus::prelude::*; use once_cell::sync::Lazy; -use tokio::process::Command; mod mcp_client; mod rpc_helpers; mod stdio_transport; mod types; -use crate::types::{ - CallToolRequestParams, Implementation, InitializeRequestParams, InitializeResult, -}; +use crate::types::{CallToolRequestParams, Implementation, InitializeRequestParams}; use magnus::{ - function, method, - prelude::*, - scan_args::{get_kwargs, scan_args}, - typed_data, Error, RHash, Ruby, Symbol, TryConvert, Value, + eval, function, method, prelude::*, Error, ExceptionClass, RHash, Ruby, TryConvert, Value, }; use serde_magnus::serialize; -use std::hash::{Hash, Hasher}; -use std::sync::{Arc, MutexGuard}; use tokio::sync::Mutex; // Create global runtime @@ -58,7 +48,7 @@ impl McpClientRb { }) } - fn dispose(&self) { + fn disconnect(&self) { RUNTIME.block_on(async { self.client.lock().await.take(); }) @@ -71,7 +61,11 @@ impl McpClientRb { .lock() .await .as_ref() - .unwrap() + .ok_or(Error::new( + ExceptionClass::from_value(eval("Mcp::ClientDisconnectedError").unwrap()) + .unwrap(), + "Client is not connected", + ))? .list_tools() .await; @@ -85,16 +79,8 @@ impl McpClientRb { }) } - fn call_tool(&self, values: &[Value]) -> Result { - let args = scan_args::<(Value,), (), (), (), RHash, ()>(values)?; - let ((name,)) = args.required; - let kwargs: RHash = args.keywords; - let kwargs: serde_json::Map = serde_magnus::deserialize(kwargs)?; - - let name = match Symbol::from_value(name) { - Some(symbol) => symbol.name()?.to_string(), - None => String::try_convert(name)?, - }; + fn call_tool(&self, name: String, rhash: RHash) -> Result { + let kwargs: serde_json::Map = serde_magnus::deserialize(rhash)?; RUNTIME.block_on(async { let a = self @@ -102,7 +88,11 @@ impl McpClientRb { .lock() .await .as_ref() - .unwrap() + .ok_or(Error::new( + ExceptionClass::from_value(eval("Mcp::ClientDisconnectedError").unwrap()) + .unwrap(), + "Client is not connected", + ))? .call_tool::(CallToolRequestParams { name, arguments: kwargs, @@ -137,8 +127,8 @@ fn init(ruby: &Ruby) -> Result<(), Error> { client_class.define_singleton_method("new", function!(McpClientRb::new, 2))?; client_class.define_method("list_tools", method!(McpClientRb::list_tools, 0))?; - client_class.define_method("call_tool", method!(McpClientRb::call_tool, -1))?; - client_class.define_method("dispose", method!(McpClientRb::dispose, 0))?; + client_class.define_method("call_tool", method!(McpClientRb::call_tool, 2))?; + client_class.define_method("disconnect", method!(McpClientRb::disconnect, 0))?; Ok(()) } diff --git a/ext/mcp/src/mcp_client.rs b/ext/mcp/src/mcp_client.rs index d2916c3..89f79a1 100644 --- a/ext/mcp/src/mcp_client.rs +++ b/ext/mcp/src/mcp_client.rs @@ -1,13 +1,15 @@ -use std::path::Path; -use std::sync::Arc; -use jsonrpsee::async_client::{Client, ClientBuilder}; -use jsonrpsee::core::client::ClientT; -use tokio::io::{BufReader}; -use tokio::process::{Child, Command, ChildStdin, ChildStdout}; -use tokio::sync::Mutex; use crate::rpc_helpers::{NoParams, ToRpcArg}; use crate::stdio_transport::Adapter; -use crate::types::{CallToolRequestParams, InitializeRequestParams, InitializeResult, ListToolsRequestParams, ListToolsResult, Tool}; +use crate::types::{ + CallToolRequestParams, InitializeRequestParams, InitializeResult, ListToolsRequestParams, + ListToolsResult, Tool, +}; +use jsonrpsee::async_client::{Client, ClientBuilder}; +use jsonrpsee::core::client::ClientT; +use std::sync::Arc; +use tokio::io::BufReader; +use tokio::process::{Child, ChildStdin, ChildStdout, Command}; +use tokio::sync::Mutex; enum TransportHandle { Stdio { @@ -24,7 +26,11 @@ pub struct McpClientConnection { } impl McpClientConnection { - pub async fn new_stdio(command: String, args: Vec, init_params: InitializeRequestParams) -> Result { + pub async fn new_stdio( + command: String, + args: Vec, + init_params: InitializeRequestParams, + ) -> Result { let mut child = Command::new(command) .args(args) .stdin(std::process::Stdio::piped()) @@ -36,20 +42,30 @@ impl McpClientConnection { let stdin = Arc::new(Mutex::new(child.stdin.take().unwrap())); let stdout = Arc::new(Mutex::new(BufReader::new(child.stdout.take().unwrap()))); - let client = ClientBuilder::default().build_with_tokio( - Adapter(stdin.clone()), - Adapter(stdout.clone()), - ); + let client = ClientBuilder::default() + .build_with_tokio(Adapter(stdin.clone()), Adapter(stdout.clone())); - let new_client = Self { transport: TransportHandle::Stdio { child, stdin, stdout }, client }; + let new_client = Self { + transport: TransportHandle::Stdio { + child, + stdin, + stdout, + }, + client, + }; new_client.initialize(init_params).await?; Ok(new_client) } - async fn initialize(&self, params: InitializeRequestParams) -> Result { + async fn initialize( + &self, + params: InitializeRequestParams, + ) -> Result { let result: InitializeResult = self.client.request("initialize", params.to_rpc()).await?; - self.client.notification("notifications/initialized", NoParams).await?; + self.client + .notification("notifications/initialized", NoParams) + .await?; Ok(result) } @@ -60,14 +76,26 @@ impl McpClientConnection { tools.extend(result.tools); while let Some(cursor) = result.next_cursor.as_ref() { - let result: ListToolsResult = self.client.request("tools/list", ListToolsRequestParams { cursor: Some(cursor.clone()) }.to_rpc()).await?; + let result: ListToolsResult = self + .client + .request( + "tools/list", + ListToolsRequestParams { + cursor: Some(cursor.clone()), + } + .to_rpc(), + ) + .await?; tools.extend(result.tools); } Ok(tools) } - pub async fn call_tool(&self, params: CallToolRequestParams) -> Result { + pub async fn call_tool( + &self, + params: CallToolRequestParams, + ) -> Result { Ok(self.client.request("tools/call", params.to_rpc()).await?) } } diff --git a/ext/mcp/src/rpc_helpers.rs b/ext/mcp/src/rpc_helpers.rs index 6e303e8..2a3fb27 100644 --- a/ext/mcp/src/rpc_helpers.rs +++ b/ext/mcp/src/rpc_helpers.rs @@ -1,7 +1,7 @@ use jsonrpsee::core::traits::ToRpcParams; use serde::Serialize; -use serde_json::Error; use serde_json::value::RawValue; +use serde_json::Error; pub struct RpcArg(T); diff --git a/ext/mcp/src/stdio_transport.rs b/ext/mcp/src/stdio_transport.rs index 49bd106..1af5861 100644 --- a/ext/mcp/src/stdio_transport.rs +++ b/ext/mcp/src/stdio_transport.rs @@ -1,11 +1,8 @@ -use std::sync::Arc; -use tokio::process::{Child, ChildStdin, ChildStdout, Command}; -use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncWriteExt, BufReader}; use jsonrpsee::core::async_trait; use jsonrpsee::core::client::{ReceivedMessage, TransportReceiverT, TransportSenderT}; -use keepcalm::SharedMut; +use std::sync::Arc; +use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncWriteExt}; use tokio::sync::Mutex; -use tracing::debug; pub struct Adapter(pub Arc>);