This commit is contained in:
Joshua Coles 2025-03-17 15:55:55 +00:00
parent 6ba2e9e8df
commit 96ae3b2198
5 changed files with 77 additions and 59 deletions

View File

@ -1,21 +1,23 @@
use std::process::Stdio; use crate::types::{
CallToolRequestParams, ClientCapabilities, ListToolsRequestParams, ListToolsResult,
};
use jsonrpsee::core::client::{ use jsonrpsee::core::client::{
Client, ClientBuilder, ClientT, TransportReceiverT, TransportSenderT, Client, ClientBuilder, ClientT, TransportReceiverT, TransportSenderT,
}; };
use jsonrpsee::core::traits::ToRpcParams; use jsonrpsee::core::traits::ToRpcParams;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
use serde::Serialize; use serde::Serialize;
use serde_json::json; use serde_json::json;
use std::process::Stdio;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
use tokio::select; use tokio::select;
use types::{Implementation, InitializeRequestParams, InitializeResult}; use types::{Implementation, InitializeRequestParams, InitializeResult};
use crate::types::{CallToolRequestParams, ClientCapabilities, ListToolsRequestParams, ListToolsResult};
mod types;
mod rpc_helpers;
mod mcp_client; mod mcp_client;
mod rpc_helpers;
mod stdio_transport; mod stdio_transport;
use rpc_helpers::*; mod types;
use crate::mcp_client::McpClientConnection; use crate::mcp_client::McpClientConnection;
use rpc_helpers::*;
#[tokio::main] #[tokio::main]
async fn main() -> anyhow::Result<()> { async fn main() -> anyhow::Result<()> {
@ -39,8 +41,9 @@ async fn main() -> anyhow::Result<()> {
version: "0.0.1".to_string(), version: "0.0.1".to_string(),
}, },
protocol_version: "2024-11-05".to_string(), protocol_version: "2024-11-05".to_string(),
} },
).await?; )
.await?;
dbg!(client.list_tools()); dbg!(client.list_tools());

View File

@ -1,27 +1,17 @@
use crate::mcp_client::McpClientConnection; use crate::mcp_client::McpClientConnection;
use jsonrpsee::async_client::{Client, ClientBuilder};
use jsonrpsee::core::client::ClientT;
use magnus::prelude::*; use magnus::prelude::*;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use tokio::process::Command;
mod mcp_client; mod mcp_client;
mod rpc_helpers; mod rpc_helpers;
mod stdio_transport; mod stdio_transport;
mod types; mod types;
use crate::types::{ use crate::types::{CallToolRequestParams, Implementation, InitializeRequestParams};
CallToolRequestParams, Implementation, InitializeRequestParams, InitializeResult,
};
use magnus::{ use magnus::{
function, method, eval, function, method, prelude::*, Error, ExceptionClass, RHash, Ruby, TryConvert, Value,
prelude::*,
scan_args::{get_kwargs, scan_args},
typed_data, Error, RHash, Ruby, Symbol, TryConvert, Value,
}; };
use serde_magnus::serialize; use serde_magnus::serialize;
use std::hash::{Hash, Hasher};
use std::sync::{Arc, MutexGuard};
use tokio::sync::Mutex; use tokio::sync::Mutex;
// Create global runtime // Create global runtime
@ -58,7 +48,7 @@ impl McpClientRb {
}) })
} }
fn dispose(&self) { fn disconnect(&self) {
RUNTIME.block_on(async { RUNTIME.block_on(async {
self.client.lock().await.take(); self.client.lock().await.take();
}) })
@ -71,7 +61,11 @@ impl McpClientRb {
.lock() .lock()
.await .await
.as_ref() .as_ref()
.unwrap() .ok_or(Error::new(
ExceptionClass::from_value(eval("Mcp::ClientDisconnectedError").unwrap())
.unwrap(),
"Client is not connected",
))?
.list_tools() .list_tools()
.await; .await;
@ -85,16 +79,8 @@ impl McpClientRb {
}) })
} }
fn call_tool(&self, values: &[Value]) -> Result<Value, magnus::Error> { fn call_tool(&self, name: String, rhash: RHash) -> Result<Value, magnus::Error> {
let args = scan_args::<(Value,), (), (), (), RHash, ()>(values)?; let kwargs: serde_json::Map<String, serde_json::Value> = serde_magnus::deserialize(rhash)?;
let ((name,)) = args.required;
let kwargs: RHash = args.keywords;
let kwargs: serde_json::Map<String, serde_json::Value> = serde_magnus::deserialize(kwargs)?;
let name = match Symbol::from_value(name) {
Some(symbol) => symbol.name()?.to_string(),
None => String::try_convert(name)?,
};
RUNTIME.block_on(async { RUNTIME.block_on(async {
let a = self let a = self
@ -102,7 +88,11 @@ impl McpClientRb {
.lock() .lock()
.await .await
.as_ref() .as_ref()
.unwrap() .ok_or(Error::new(
ExceptionClass::from_value(eval("Mcp::ClientDisconnectedError").unwrap())
.unwrap(),
"Client is not connected",
))?
.call_tool::<serde_json::Value>(CallToolRequestParams { .call_tool::<serde_json::Value>(CallToolRequestParams {
name, name,
arguments: kwargs, arguments: kwargs,
@ -137,8 +127,8 @@ fn init(ruby: &Ruby) -> Result<(), Error> {
client_class.define_singleton_method("new", function!(McpClientRb::new, 2))?; client_class.define_singleton_method("new", function!(McpClientRb::new, 2))?;
client_class.define_method("list_tools", method!(McpClientRb::list_tools, 0))?; client_class.define_method("list_tools", method!(McpClientRb::list_tools, 0))?;
client_class.define_method("call_tool", method!(McpClientRb::call_tool, -1))?; client_class.define_method("call_tool", method!(McpClientRb::call_tool, 2))?;
client_class.define_method("dispose", method!(McpClientRb::dispose, 0))?; client_class.define_method("disconnect", method!(McpClientRb::disconnect, 0))?;
Ok(()) Ok(())
} }

View File

@ -1,13 +1,15 @@
use std::path::Path;
use std::sync::Arc;
use jsonrpsee::async_client::{Client, ClientBuilder};
use jsonrpsee::core::client::ClientT;
use tokio::io::{BufReader};
use tokio::process::{Child, Command, ChildStdin, ChildStdout};
use tokio::sync::Mutex;
use crate::rpc_helpers::{NoParams, ToRpcArg}; use crate::rpc_helpers::{NoParams, ToRpcArg};
use crate::stdio_transport::Adapter; use crate::stdio_transport::Adapter;
use crate::types::{CallToolRequestParams, InitializeRequestParams, InitializeResult, ListToolsRequestParams, ListToolsResult, Tool}; use crate::types::{
CallToolRequestParams, InitializeRequestParams, InitializeResult, ListToolsRequestParams,
ListToolsResult, Tool,
};
use jsonrpsee::async_client::{Client, ClientBuilder};
use jsonrpsee::core::client::ClientT;
use std::sync::Arc;
use tokio::io::BufReader;
use tokio::process::{Child, ChildStdin, ChildStdout, Command};
use tokio::sync::Mutex;
enum TransportHandle { enum TransportHandle {
Stdio { Stdio {
@ -24,7 +26,11 @@ pub struct McpClientConnection {
} }
impl McpClientConnection { impl McpClientConnection {
pub async fn new_stdio(command: String, args: Vec<String>, init_params: InitializeRequestParams) -> Result<Self, anyhow::Error> { pub async fn new_stdio(
command: String,
args: Vec<String>,
init_params: InitializeRequestParams,
) -> Result<Self, anyhow::Error> {
let mut child = Command::new(command) let mut child = Command::new(command)
.args(args) .args(args)
.stdin(std::process::Stdio::piped()) .stdin(std::process::Stdio::piped())
@ -36,20 +42,30 @@ impl McpClientConnection {
let stdin = Arc::new(Mutex::new(child.stdin.take().unwrap())); let stdin = Arc::new(Mutex::new(child.stdin.take().unwrap()));
let stdout = Arc::new(Mutex::new(BufReader::new(child.stdout.take().unwrap()))); let stdout = Arc::new(Mutex::new(BufReader::new(child.stdout.take().unwrap())));
let client = ClientBuilder::default().build_with_tokio( let client = ClientBuilder::default()
Adapter(stdin.clone()), .build_with_tokio(Adapter(stdin.clone()), Adapter(stdout.clone()));
Adapter(stdout.clone()),
);
let new_client = Self { transport: TransportHandle::Stdio { child, stdin, stdout }, client }; let new_client = Self {
transport: TransportHandle::Stdio {
child,
stdin,
stdout,
},
client,
};
new_client.initialize(init_params).await?; new_client.initialize(init_params).await?;
Ok(new_client) Ok(new_client)
} }
async fn initialize(&self, params: InitializeRequestParams) -> Result<InitializeResult, anyhow::Error> { async fn initialize(
&self,
params: InitializeRequestParams,
) -> Result<InitializeResult, anyhow::Error> {
let result: InitializeResult = self.client.request("initialize", params.to_rpc()).await?; let result: InitializeResult = self.client.request("initialize", params.to_rpc()).await?;
self.client.notification("notifications/initialized", NoParams).await?; self.client
.notification("notifications/initialized", NoParams)
.await?;
Ok(result) Ok(result)
} }
@ -60,14 +76,26 @@ impl McpClientConnection {
tools.extend(result.tools); tools.extend(result.tools);
while let Some(cursor) = result.next_cursor.as_ref() { while let Some(cursor) = result.next_cursor.as_ref() {
let result: ListToolsResult = self.client.request("tools/list", ListToolsRequestParams { cursor: Some(cursor.clone()) }.to_rpc()).await?; let result: ListToolsResult = self
.client
.request(
"tools/list",
ListToolsRequestParams {
cursor: Some(cursor.clone()),
}
.to_rpc(),
)
.await?;
tools.extend(result.tools); tools.extend(result.tools);
} }
Ok(tools) Ok(tools)
} }
pub async fn call_tool<T: serde::de::DeserializeOwned>(&self, params: CallToolRequestParams) -> Result<T, anyhow::Error> { pub async fn call_tool<T: serde::de::DeserializeOwned>(
&self,
params: CallToolRequestParams,
) -> Result<T, anyhow::Error> {
Ok(self.client.request("tools/call", params.to_rpc()).await?) Ok(self.client.request("tools/call", params.to_rpc()).await?)
} }
} }

View File

@ -1,7 +1,7 @@
use jsonrpsee::core::traits::ToRpcParams; use jsonrpsee::core::traits::ToRpcParams;
use serde::Serialize; use serde::Serialize;
use serde_json::Error;
use serde_json::value::RawValue; use serde_json::value::RawValue;
use serde_json::Error;
pub struct RpcArg<T>(T); pub struct RpcArg<T>(T);

View File

@ -1,11 +1,8 @@
use std::sync::Arc;
use tokio::process::{Child, ChildStdin, ChildStdout, Command};
use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncWriteExt, BufReader};
use jsonrpsee::core::async_trait; use jsonrpsee::core::async_trait;
use jsonrpsee::core::client::{ReceivedMessage, TransportReceiverT, TransportSenderT}; use jsonrpsee::core::client::{ReceivedMessage, TransportReceiverT, TransportSenderT};
use keepcalm::SharedMut; use std::sync::Arc;
use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncWriteExt};
use tokio::sync::Mutex; use tokio::sync::Mutex;
use tracing::debug;
pub struct Adapter<T>(pub Arc<Mutex<T>>); pub struct Adapter<T>(pub Arc<Mutex<T>>);