This commit is contained in:
Joshua Coles 2025-03-17 15:34:35 +00:00
parent 36eb457898
commit 8cfd28d3f6
4 changed files with 24 additions and 19 deletions

View File

@ -15,7 +15,7 @@ mod rpc_helpers;
mod mcp_client; mod mcp_client;
mod stdio_transport; mod stdio_transport;
use rpc_helpers::*; use rpc_helpers::*;
use crate::mcp_client::McpClient; use crate::mcp_client::McpClientConnection;
#[tokio::main] #[tokio::main]
async fn main() -> anyhow::Result<()> { async fn main() -> anyhow::Result<()> {
@ -29,7 +29,7 @@ async fn main() -> anyhow::Result<()> {
.with_level(true) .with_level(true)
.init(); .init();
let client = McpClient::new_stdio( let client = McpClientConnection::new_stdio(
"/Users/joshuacoles/.local/bin/mcp-server-fetch".to_string(), "/Users/joshuacoles/.local/bin/mcp-server-fetch".to_string(),
vec![], vec![],
InitializeRequestParams { InitializeRequestParams {

View File

@ -1,4 +1,4 @@
use crate::mcp_client::McpClient; use crate::mcp_client::McpClientConnection;
use jsonrpsee::async_client::{Client, ClientBuilder}; use jsonrpsee::async_client::{Client, ClientBuilder};
use jsonrpsee::core::client::ClientT; use jsonrpsee::core::client::ClientT;
use magnus::prelude::*; use magnus::prelude::*;
@ -30,14 +30,14 @@ static RUNTIME: Lazy<tokio::runtime::Runtime> =
#[magnus::wrap(class = "Mcp::Client", free_immediately, size)] #[magnus::wrap(class = "Mcp::Client", free_immediately, size)]
struct McpClientRb { struct McpClientRb {
client: Mutex<Option<McpClient>>, client: Mutex<Option<McpClientConnection>>,
} }
impl McpClientRb { impl McpClientRb {
fn new(command: String, args: Vec<String>) -> Result<Self, magnus::Error> { fn new(command: String, args: Vec<String>) -> Result<Self, magnus::Error> {
let client = RUNTIME let client = RUNTIME
.block_on(async { .block_on(async {
McpClient::new_stdio( McpClientConnection::new_stdio(
command, command,
args, args,
InitializeRequestParams { InitializeRequestParams {
@ -58,7 +58,7 @@ impl McpClientRb {
}) })
} }
async fn client<'a>(&'a self) -> Result<&'a McpClient, Error> { async fn client<'a>(&'a self) -> Result<&'a McpClientConnection, Error> {
self.client.lock().await.ok_or(Error::new( self.client.lock().await.ok_or(Error::new(
magnus::exception::runtime_error(), magnus::exception::runtime_error(),
"Client is not initialized".to_string(), "Client is not initialized".to_string(),

View File

@ -1,46 +1,50 @@
use std::path::Path; use std::path::Path;
use jsonrpsee::async_client::{Client, ClientBuilder}; use jsonrpsee::async_client::{Client, ClientBuilder};
use jsonrpsee::core::client::ClientT; use jsonrpsee::core::client::ClientT;
use tokio::io::BufReader; use keepcalm::SharedMut;
use tokio::io::{BufReader, Stdin};
use tokio::process::{Child, Command}; use tokio::process::{Child, Command};
use crate::rpc_helpers::{NoParams, ToRpcArg}; use crate::rpc_helpers::{NoParams, ToRpcArg};
use crate::stdio_transport;
use crate::stdio_transport::Adapter; use crate::stdio_transport::Adapter;
use crate::types::{CallToolRequestParams, InitializeRequestParams, InitializeResult, ListToolsRequestParams, ListToolsResult, Tool}; use crate::types::{CallToolRequestParams, InitializeRequestParams, InitializeResult, ListToolsRequestParams, ListToolsResult, Tool};
enum TransportHandle { enum TransportHandle {
Stdio(Child), Stdio {
child: Child,
stdin: SharedMut<Stdin>,
stdout: SharedMut<BufReader<tokio::process::ChildStdout>>,
},
} }
pub struct McpClient { pub struct McpClientConnection {
pub(crate) transport: TransportHandle, pub(crate) transport: TransportHandle,
pub(crate) client: Client, pub(crate) client: Client,
} }
impl McpClient { /// This represents a live MCP connection to an MCP server.
impl McpClientConnection {
pub async fn new_stdio(command: String, args: Vec<String>, init_params: InitializeRequestParams) -> Result<Self, anyhow::Error> { pub async fn new_stdio(command: String, args: Vec<String>, init_params: InitializeRequestParams) -> Result<Self, anyhow::Error> {
let mut child = Command::new(command) let mut child = Command::new(command)
.args(args) .args(args)
.stdin(std::process::Stdio::piped()) .stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped()) .stdout(std::process::Stdio::piped())
.spawn() .spawn()?;
.unwrap();
let stdin = child.stdin.take().unwrap(); let stdin = SharedMut::new_mutex(child.stdin.take().unwrap());
let stdout = BufReader::new(child.stdout.take().unwrap()); let stdout = SharedMut::new_mutex(BufReader::new(child.stdout.take().unwrap()));
let client = ClientBuilder::default().build_with_tokio( let client = ClientBuilder::default().build_with_tokio(
Adapter(stdin), Adapter(stdin),
Adapter(stdout), Adapter(stdout),
); );
let new_client = Self { transport: TransportHandle::Stdio(child), client }; let new_client = Self { transport: TransportHandle::Stdio { child }, client };
new_client.initialize(init_params).await?; new_client.initialize(init_params).await?;
Ok(new_client) Ok(new_client)
} }
pub async fn initialize(&self, params: InitializeRequestParams) -> Result<InitializeResult, anyhow::Error> { async fn initialize(&self, params: InitializeRequestParams) -> Result<InitializeResult, anyhow::Error> {
let result: InitializeResult = self.client.request("initialize", params.to_rpc()).await?; let result: InitializeResult = self.client.request("initialize", params.to_rpc()).await?;
self.client.notification("notifications/initialized", NoParams).await?; self.client.notification("notifications/initialized", NoParams).await?;
Ok(result) Ok(result)

View File

@ -3,13 +3,14 @@ use tokio::process::{Child, ChildStdin, ChildStdout, Command};
use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncWriteExt, BufReader};
use jsonrpsee::core::async_trait; use jsonrpsee::core::async_trait;
use jsonrpsee::core::client::{ReceivedMessage, TransportReceiverT, TransportSenderT}; use jsonrpsee::core::client::{ReceivedMessage, TransportReceiverT, TransportSenderT};
use keepcalm::SharedMut;
use tokio::sync::Mutex; use tokio::sync::Mutex;
use tracing::debug; use tracing::debug;
pub struct Adapter<T>(pub T); pub struct Adapter<T>(pub SharedMut<T>);
#[async_trait] #[async_trait]
impl<T: AsyncWriteExt + Unpin + Send + 'static> TransportSenderT for Adapter<T> { impl<T: AsyncWriteExt> TransportSenderT for Adapter<T> {
type Error = tokio::io::Error; type Error = tokio::io::Error;
#[tracing::instrument(skip(self), level = "trace")] #[tracing::instrument(skip(self), level = "trace")]