Compare commits

...

8 Commits

Author SHA1 Message Date
Joshua Coles
ead72d3b4a Stash
Some checks failed
Ruby / Ruby ${{ matrix.ruby }} (3.4.1) (push) Failing after 9m34s
2025-03-18 14:35:10 +00:00
Joshua Coles
ef56b3ff74 Add a stub to make type checkers happy 2025-03-17 16:06:20 +00:00
Joshua Coles
60279f0b4e Enough to use 2025-03-17 16:00:48 +00:00
Joshua Coles
96ae3b2198 Stash 2025-03-17 15:55:55 +00:00
Joshua Coles
6ba2e9e8df Stash 2025-03-17 15:50:28 +00:00
Joshua Coles
8cfd28d3f6 Stash 2025-03-17 15:34:35 +00:00
Joshua Coles
36eb457898 Stash 2025-03-17 15:10:01 +00:00
Joshua Coles
7ddc1196d5 Stash 2025-03-17 10:46:32 +00:00
13 changed files with 274 additions and 179 deletions

1
.ruby-version Normal file
View File

@ -0,0 +1 @@
3.4.1

View File

@ -5,6 +5,8 @@ source "https://rubygems.org"
# Specify your gem's dependencies in mcp.gemspec
gemspec
gem "pry"
gem "rake", "~> 13.0"
gem "rake-compiler", "~> 1.2.0"

View File

@ -8,14 +8,19 @@ GEM
remote: https://rubygems.org/
specs:
ast (2.4.2)
coderay (1.1.3)
diff-lcs (1.6.0)
json (2.10.2)
language_server-protocol (3.17.0.4)
lint_roller (1.1.0)
method_source (1.1.0)
parallel (1.26.3)
parser (3.3.7.1)
ast (~> 2.4.1)
racc
pry (0.15.2)
coderay (~> 1.1)
method_source (~> 1.0)
racc (1.8.1)
rainbow (3.1.1)
rake (13.2.1)
@ -78,6 +83,7 @@ PLATFORMS
DEPENDENCIES
mcp!
pry
rake (~> 13.0)
rake-compiler (~> 1.2.0)
rspec (~> 3.0)

11
ext/mcp/Cargo.lock generated
View File

@ -251,6 +251,16 @@ dependencies = [
"thiserror",
]
[[package]]
name = "keepcalm"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "031ddc7e27bbb011c78958881a3723873608397b8b10e146717fc05cf3364d78"
dependencies = [
"once_cell",
"parking_lot",
]
[[package]]
name = "lazy_static"
version = "1.5.0"
@ -333,6 +343,7 @@ version = "0.1.0"
dependencies = [
"anyhow",
"jsonrpsee",
"keepcalm",
"magnus",
"once_cell",
"serde",

View File

@ -21,3 +21,4 @@ serde_json = "1.0.140"
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
once_cell = "1.19.0"
serde_magnus = "0.9.0"
keepcalm = "0.3.5"

View File

@ -1,21 +1,23 @@
use std::process::Stdio;
use crate::types::{
CallToolRequestParams, ClientCapabilities, ListToolsRequestParams, ListToolsResult,
};
use jsonrpsee::core::client::{
Client, ClientBuilder, ClientT, TransportReceiverT, TransportSenderT,
};
use jsonrpsee::core::traits::ToRpcParams;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
use serde::Serialize;
use serde_json::json;
use std::process::Stdio;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
use tokio::select;
use types::{Implementation, InitializeRequestParams, InitializeResult};
use crate::types::{CallToolRequestParams, ClientCapabilities, ListToolsRequestParams, ListToolsResult};
mod types;
mod mcp_client;
mod rpc_helpers;
mod stdio_transport;
mod types;
use crate::mcp_client::McpClientConnection;
use rpc_helpers::*;
use stdio_transport::StdioTransport;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
@ -29,51 +31,54 @@ async fn main() -> anyhow::Result<()> {
.with_level(true)
.init();
let client = McpClientConnection::new_stdio(
"/Users/joshuacoles/.local/bin/mcp-server-fetch".to_string(),
vec![],
InitializeRequestParams {
capabilities: Default::default(),
client_info: Implementation {
name: "ABC".to_string(),
version: "0.0.1".to_string(),
},
protocol_version: "2024-11-05".to_string(),
},
)
.await?;
let mut cmd = tokio::process::Command::new("/Users/joshuacoles/.local/bin/mcp-server-fetch")
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn()?;
dbg!(client.list_tools());
let transport = StdioTransport::new(&mut cmd);
let client: Client = ClientBuilder::default().build_with_tokio(
transport.clone(),
transport.clone(),
);
let response: InitializeResult = client.request("initialize", InitializeRequestParams {
capabilities: ClientCapabilities::default(),
client_info: Implementation { name: "Rust MCP".to_string(), version: "0.1.0".to_string() },
protocol_version: "2024-11-05".to_string(),
}.to_rpc()).await?;
client.notification("notifications/initialized", NoParams).await?;
println!("Hey");
// drop(transport.stdin);
select! {
_ = cmd.wait() => {
println!("Command exited");
}
_ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {
cmd.kill().await?;
}
}
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
// let response: ListToolsResult = client.request("tools/list", ListToolsRequestParams::default().to_rpc()).await?;
// let response: serde_json::Value = client.request("tools/call", CallToolRequestParams {
// arguments: json!({ "url": "http://example.com" }).as_object().unwrap().clone(),
// name: "fetch".to_string(),
// let response: InitializeResult = client.request("initialize", InitializeRequestParams {
// capabilities: ClientCapabilities::default(),
// client_info: Implementation { name: "Rust MCP".to_string(), version: "0.1.0".to_string() },
// protocol_version: "2024-11-05".to_string(),
// }.to_rpc()).await?;
// println!("Response: {:#?}", response);
//
// client.notification("notifications/initialized", NoParams).await?;
//
// println!("Hey");
//
// // drop(transport.stdin);
//
// select! {
// _ = cmd.wait() => {
// println!("Command exited");
// }
//
// _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {
// cmd.kill().await?;
// }
// }
//
// tokio::time::sleep(std::time::Duration::from_secs(1)).await;
//
// // let response: ListToolsResult = client.request("tools/list", ListToolsRequestParams::default().to_rpc()).await?;
//
// // let response: serde_json::Value = client.request("tools/call", CallToolRequestParams {
// // arguments: json!({ "url": "http://example.com" }).as_object().unwrap().clone(),
// // name: "fetch".to_string(),
// // }.to_rpc()).await?;
//
// // println!("Response: {:#?}", response);
Ok(())
}

View File

@ -1,75 +1,73 @@
use jsonrpsee::async_client::{Client, ClientBuilder};
use tokio::process::Command;
use crate::mcp_client::McpClient;
use jsonrpsee::core::client::ClientT;
use once_cell::sync::Lazy;
use crate::mcp_client::McpClientConnection;
use magnus::prelude::*;
use once_cell::sync::Lazy;
mod mcp_client;
mod types;
mod rpc_helpers;
mod stdio_transport;
mod types;
use std::{
hash::{Hash, Hasher},
use crate::types::{CallToolRequestParams, Implementation, InitializeRequestParams};
use magnus::{
eval, function, method, prelude::*, Error, ExceptionClass, RHash, Ruby, TryConvert, Value,
};
use magnus::{function, method, prelude::*, scan_args::{get_kwargs, scan_args}, typed_data, Error, RHash, Ruby, Symbol, TryConvert, Value};
use serde_magnus::serialize;
use crate::types::{CallToolRequestParams, Implementation, InitializeRequestParams, InitializeResult};
use tokio::sync::Mutex;
// Create global runtime
static RUNTIME: Lazy<tokio::runtime::Runtime> = Lazy::new(|| {
tokio::runtime::Runtime::new().expect("Failed to create Tokio runtime")
});
static RUNTIME: Lazy<tokio::runtime::Runtime> =
Lazy::new(|| tokio::runtime::Runtime::new().expect("Failed to create Tokio runtime"));
#[magnus::wrap(class = "Mcp::Client", free_immediately, size)]
struct McpClientRb {
client: McpClient,
client: Mutex<Option<McpClientConnection>>,
}
impl McpClientRb {
fn new(command: String, args: Vec<String>) -> Result<Self, magnus::Error> {
let client = RUNTIME.block_on(async {
let child = Command::new(command)
.args(args)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.spawn()
.unwrap();
let client = RUNTIME
.block_on(async {
McpClientConnection::new_stdio(
command,
args,
InitializeRequestParams {
capabilities: Default::default(),
client_info: Implementation {
name: "ABC".to_string(),
version: "0.0.1".to_string(),
},
protocol_version: "2024-11-05".to_string(),
},
)
.await
})
.map_err(|err| Error::new(magnus::exception::runtime_error(), err.to_string()))?;
let transport = stdio_transport::StdioTransport::new(child);
ClientBuilder::default().build_with_tokio(
transport.clone(),
transport.clone(),
)
});
Ok(Self { client: McpClient { client } })
Ok(Self {
client: Mutex::new(Some(client)),
})
}
fn connect(&self) -> Result<bool, magnus::Error> {
fn disconnect(&self) {
RUNTIME.block_on(async {
let a = self.client.initialize(InitializeRequestParams {
capabilities: Default::default(),
client_info: Implementation { name: "ABC".to_string(), version: "0.0.1".to_string() },
protocol_version: "2024-11-05".to_string(),
}).await;
match a {
Ok(_) => Ok(true),
Err(e) => Err(magnus::Error::new(
magnus::exception::runtime_error(),
e.to_string(),
)),
}
self.client.lock().await.take();
})
}
fn list_tools(&self) -> Result<Value, magnus::Error> {
RUNTIME.block_on(async {
let a = self.client.list_tools().await;
let a = self
.client
.lock()
.await
.as_ref()
.ok_or(Error::new(
ExceptionClass::from_value(eval("Mcp::ClientDisconnectedError").unwrap())
.unwrap(),
"Client is not connected",
))?
.list_tools()
.await;
match a {
Ok(tools) => serialize::<_, Value>(&tools),
@ -81,22 +79,25 @@ impl McpClientRb {
})
}
fn call_tool(&self, values: &[Value]) -> Result<Value, magnus::Error> {
let args = scan_args::<(Value,), (), (), (), RHash, ()>(values)?;
let ((name,)) = args.required;
let kwargs: RHash = args.keywords;
let kwargs: serde_json::Map<String, serde_json::Value> = serde_magnus::deserialize(kwargs)?;
let name = match Symbol::from_value(name) {
Some(symbol) => symbol.name()?.to_string(),
None => String::try_convert(name)?,
};
fn call_tool(&self, name: String, rhash: RHash) -> Result<Value, magnus::Error> {
let kwargs: serde_json::Map<String, serde_json::Value> = serde_magnus::deserialize(rhash)?;
RUNTIME.block_on(async {
let a = self.client.call_tool::<serde_json::Value>(CallToolRequestParams {
name,
arguments: kwargs,
}).await;
let a = self
.client
.lock()
.await
.as_ref()
.ok_or(Error::new(
ExceptionClass::from_value(eval("Mcp::ClientDisconnectedError").unwrap())
.unwrap(),
"Client is not connected",
))?
.call_tool::<serde_json::Value>(CallToolRequestParams {
name,
arguments: kwargs,
})
.await;
match a {
Ok(a) => Ok(serde_magnus::serialize(&a)?),
@ -111,13 +112,23 @@ impl McpClientRb {
#[magnus::init]
fn init(ruby: &Ruby) -> Result<(), Error> {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.with_file(true)
.with_line_number(true)
.with_thread_ids(true)
.with_thread_names(true)
.with_target(true)
.with_level(true)
.init();
let module = ruby.define_module("Mcp")?;
let client_class = module.define_class("Client", ruby.class_object())?;
client_class.define_singleton_method("new", function!(McpClientRb::new, 2))?;
client_class.define_method("connect", method!(McpClientRb::connect, 0))?;
client_class.define_method("list_tools", method!(McpClientRb::list_tools, 0))?;
client_class.define_method("call_tool", method!(McpClientRb::call_tool, -1))?;
client_class.define_method("call_tool", method!(McpClientRb::call_tool, 2))?;
client_class.define_method("disconnect", method!(McpClientRb::disconnect, 0))?;
Ok(())
}

View File

@ -1,25 +1,72 @@
use jsonrpsee::async_client::Client;
use jsonrpsee::core::client::ClientT;
use tokio::process::Child;
use stdio_transport::StdioTransport;
use crate::rpc_helpers::{NoParams, ToRpcArg};
use crate::stdio_transport;
use crate::types::{CallToolRequestParams, InitializeRequestParams, InitializeResult, ListToolsRequestParams, ListToolsResult, Tool};
use crate::stdio_transport::Adapter;
use crate::types::{
CallToolRequestParams, InitializeRequestParams, InitializeResult, ListToolsRequestParams,
ListToolsResult, Tool,
};
use jsonrpsee::async_client::{Client, ClientBuilder};
use jsonrpsee::core::client::ClientT;
use std::sync::Arc;
use tokio::io::BufReader;
use tokio::process::{Child, ChildStdin, ChildStdout, Command};
use tokio::sync::Mutex;
pub struct McpClient {
pub(crate) transport: StdioTransport,
enum TransportHandle {
Stdio {
child: Child,
stdin: Arc<Mutex<ChildStdin>>,
stdout: Arc<Mutex<BufReader<ChildStdout>>>,
},
}
/// This represents a live MCP connection to an MCP server. It will close the connection when dropped on a best effort basis.
pub struct McpClientConnection {
pub(crate) transport: TransportHandle,
pub(crate) client: Client,
}
impl McpClient {
pub async fn initialize(&self, params: InitializeRequestParams) -> Result<InitializeResult, anyhow::Error> {
let result: InitializeResult = self.client.request("initialize", params.to_rpc()).await?;
self.client.notification("notifications/initialized", NoParams).await?;
Ok(result)
impl McpClientConnection {
pub async fn new_stdio(
command: String,
args: Vec<String>,
init_params: InitializeRequestParams,
) -> Result<Self, anyhow::Error> {
let mut child = Command::new(command)
.args(args)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.kill_on_drop(true)
.spawn()?;
// We take ownership of the stdin and stdout here to pass them to the transport, wrapping them in an Arc and Mutex to allow them to be shared between threads in an async context.
let stdin = Arc::new(Mutex::new(child.stdin.take().unwrap()));
let stdout = Arc::new(Mutex::new(BufReader::new(child.stdout.take().unwrap())));
let client = ClientBuilder::default()
.build_with_tokio(Adapter(stdin.clone()), Adapter(stdout.clone()));
let new_client = Self {
transport: TransportHandle::Stdio {
child,
stdin,
stdout,
},
client,
};
new_client.initialize(init_params).await?;
Ok(new_client)
}
pub async fn shutdown(mut self) {
self.transport.shutdown();
async fn initialize(
&self,
params: InitializeRequestParams,
) -> Result<InitializeResult, anyhow::Error> {
let result: InitializeResult = self.client.request("initialize", params.to_rpc()).await?;
self.client
.notification("notifications/initialized", NoParams)
.await?;
Ok(result)
}
pub async fn list_tools(&self) -> Result<Vec<Tool>, anyhow::Error> {
@ -29,14 +76,26 @@ impl McpClient {
tools.extend(result.tools);
while let Some(cursor) = result.next_cursor.as_ref() {
let result: ListToolsResult = self.client.request("tools/list", ListToolsRequestParams { cursor: Some(cursor.clone()) }.to_rpc()).await?;
let result: ListToolsResult = self
.client
.request(
"tools/list",
ListToolsRequestParams {
cursor: Some(cursor.clone()),
}
.to_rpc(),
)
.await?;
tools.extend(result.tools);
}
Ok(tools)
}
pub async fn call_tool<T: serde::de::DeserializeOwned>(&self, params: CallToolRequestParams) -> Result<T, anyhow::Error> {
pub async fn call_tool<T: serde::de::DeserializeOwned>(
&self,
params: CallToolRequestParams,
) -> Result<T, anyhow::Error> {
Ok(self.client.request("tools/call", params.to_rpc()).await?)
}
}

View File

@ -1,7 +1,7 @@
use jsonrpsee::core::traits::ToRpcParams;
use serde::Serialize;
use serde_json::Error;
use serde_json::value::RawValue;
use serde_json::Error;
pub struct RpcArg<T>(T);

View File

@ -1,53 +1,33 @@
use std::sync::Arc;
use tokio::process::{Child, ChildStdin, ChildStdout, Command};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use jsonrpsee::core::async_trait;
use jsonrpsee::core::client::{ReceivedMessage, TransportReceiverT, TransportSenderT};
use std::sync::Arc;
use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncWriteExt};
use tokio::sync::Mutex;
use tracing::debug;
#[derive(Debug, Clone)]
pub struct StdioTransport {
pub stdin: Arc<Mutex<ChildStdin>>,
pub stdout: Arc<Mutex<BufReader<ChildStdout>>>,
}
impl StdioTransport {
pub fn new(child: &mut Child) -> Self {
let stdin = Arc::new(Mutex::new(child.stdin.take().unwrap()));
let stdout = Arc::new(Mutex::new(BufReader::new(child.stdout.take().unwrap())));
Self { stdin, stdout }
}
pub(crate) async fn shutdown(mut self) -> Result<(), tokio::io::Error> {
Ok(())
}
}
pub struct Adapter<T>(pub Arc<Mutex<T>>);
#[async_trait]
impl TransportSenderT for StdioTransport {
impl<T: Unpin + Send + 'static + AsyncWriteExt> TransportSenderT for Adapter<T> {
type Error = tokio::io::Error;
#[tracing::instrument(skip(self), level = "trace")]
async fn send(&mut self, msg: String) -> Result<(), Self::Error> {
debug!("Sending: {}", msg);
let mut stdin = self.stdin.lock().await;
stdin.write_all(msg.as_bytes()).await?;
stdin.write_all(b"\n").await?;
let mut guard = self.0.lock().await;
guard.write_all(msg.as_bytes()).await?;
guard.write_all(b"\n").await?;
Ok(())
}
}
#[async_trait]
impl TransportReceiverT for StdioTransport {
impl<T: AsyncBufRead + Unpin + Send + 'static> TransportReceiverT for Adapter<T> {
type Error = tokio::io::Error;
#[tracing::instrument(skip(self), level = "trace")]
async fn receive(&mut self) -> Result<ReceivedMessage, Self::Error> {
let mut stdout = self.stdout.lock().await;
let mut str = String::new();
stdout.read_line(&mut str).await?;
debug!("Received: {}", str);
self.0.lock().await.read_line(&mut str).await?;
Ok(ReceivedMessage::Text(str))
}
}
}

View File

@ -5,7 +5,18 @@ require_relative "mcp/version"
module Mcp
class Error < StandardError; end
class ClientDisconnectedError < Error; end
class Client
def initialize(command, args)
# This is implemented in rust
raise NotImplementedError
end
def list_tools
raise NotImplementedError
end
def tools
ToolsProxy.new(self)
end
@ -19,12 +30,10 @@ module Mcp
private
def respond_to_missing?(name, include_private = false)
@tools.any? { |tool| tool["name"] == name.to_s } || super
end
def respond_to_missing?(name, include_private = false) end
def method_missing(name, **kwargs)
@client.call_tool(name, **kwargs)
@client.call_tool(name.to_s, kwargs)
end
end
end

View File

@ -6,19 +6,19 @@ Gem::Specification.new do |spec|
spec.name = "mcp"
spec.version = Mcp::VERSION
spec.authors = ["Joshua Coles"]
spec.email = ["josh@coles.to"]
spec.email = ["joshuac@amphora-research.com"]
spec.summary = "TODO: Write a short summary, because RubyGems requires one."
spec.description = "TODO: Write a longer description or delete this line."
spec.homepage = "TODO: Put your gem's website or public repo URL here."
spec.summary = "MCP Client implementation"
# spec.description = "TODO: Write a longer description or delete this line."
# spec.homepage = "TODO: Put your gem's website or public repo URL here."
spec.license = "MIT"
spec.required_ruby_version = ">= 3.1.0"
spec.metadata["allowed_push_host"] = "TODO: Set to your gem server 'https://example.com'"
spec.metadata["homepage_uri"] = spec.homepage
spec.metadata["source_code_uri"] = "TODO: Put your gem's public repo URL here."
spec.metadata["changelog_uri"] = "TODO: Put your gem's CHANGELOG.md URL here."
# spec.metadata["allowed_push_host"] = "TODO: Set to your gem server 'https://example.com'"
#
# spec.metadata["homepage_uri"] = spec.homepage
# spec.metadata["source_code_uri"] = "TODO: Put your gem's public repo URL here."
# spec.metadata["changelog_uri"] = "TODO: Put your gem's CHANGELOG.md URL here."
# Specify which files should be added to the gem when it is released.
# The `git ls-files -z` loads the files in the RubyGem that have been added into git.

View File

@ -1,14 +1,24 @@
# frozen_string_literal: true
RSpec.describe Mcp do
it "has a version number" do
expect(distance([1, 1], [1, 2])).to eq(1)
it "can list tools" do
client = Mcp::Client.new("/Users/joshuacoles/.local/bin/mcp-server-fetch", [])
expect(client.list_tools).to_not be_nil
end
it "does something useful" do
a = Mcp::Client.new("/Users/joshuacoles/.local/bin/mcp-server-fetch", [])
expect(a.connect).to eq(true)
puts a.list_tools
puts a.tools.fetch(url: 'http://example.com')
it 'can call tools' do
client = Mcp::Client.new("/Users/joshuacoles/.local/bin/mcp-server-fetch", [])
result = client.tools.fetch(url: 'http://example.com')
expect(result).to be_a(Hash)
expect(result['content'][0]).to be_a(Hash)
expect(result['content'][0]['text']).to include('Contents of http://example.com/')
end
it 'handles calls after disconnect' do
client = Mcp::Client.new("/Users/joshuacoles/.local/bin/mcp-server-fetch", [])
client.disconnect
expect { client.tools.fetch(url: 'http://example.com') }.to raise_error(Mcp::ClientDisconnectedError)
end
end