From 72aaf40f4bba37427458b631d5ec84f162de2bcf Mon Sep 17 00:00:00 2001 From: Joshua Coles Date: Mon, 15 Jul 2024 13:26:21 +0100 Subject: [PATCH] Move retry and rate limiting to a library --- Cargo.lock | 118 ++++++++++++++++++++++++++++++++++++++++++++-- Cargo.toml | 2 + src/main.rs | 131 ++++++++++++++-------------------------------------- 3 files changed, 149 insertions(+), 102 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a55261c..a75ad98 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -219,7 +219,7 @@ dependencies = [ "hashbrown", "lock_api", "once_cell", - "parking_lot_core", + "parking_lot_core 0.9.10", ] [[package]] @@ -385,8 +385,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi", + "wasm-bindgen", ] [[package]] @@ -407,7 +409,7 @@ dependencies = [ "futures-timer", "no-std-compat", "nonzero_ext", - "parking_lot", + "parking_lot 0.12.3", "portable-atomic", "quanta", "rand", @@ -609,6 +611,18 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "instant" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222" +dependencies = [ + "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "ipnet" version = "2.9.0" @@ -803,6 +817,17 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "parking_lot" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" +dependencies = [ + "instant", + "lock_api", + "parking_lot_core 0.8.6", +] + [[package]] name = "parking_lot" version = "0.12.3" @@ -810,7 +835,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27" dependencies = [ "lock_api", - "parking_lot_core", + "parking_lot_core 0.9.10", +] + +[[package]] +name = "parking_lot_core" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60a2cfe6f0ad2bfc16aefa463b497d5c7a5ecd44a23efa72aa342d90177356dc" +dependencies = [ + "cfg-if", + "instant", + "libc", + "redox_syscall 0.2.16", + "smallvec", + "winapi", ] [[package]] @@ -821,7 +860,7 @@ checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" dependencies = [ "cfg-if", "libc", - "redox_syscall", + "redox_syscall 0.5.2", "smallvec", "windows-targets 0.52.6", ] @@ -954,6 +993,15 @@ dependencies = [ "bitflags 2.6.0", ] +[[package]] +name = "redox_syscall" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "redox_syscall" version = "0.5.2" @@ -1033,6 +1081,37 @@ dependencies = [ "reqwest-middleware", ] +[[package]] +name = "reqwest-retry" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf2a94ba69ceb30c42079a137e2793d6d0f62e581a24c06cd4e9bb32e973c7da" +dependencies = [ + "anyhow", + "async-trait", + "chrono", + "futures", + "getrandom", + "http", + "hyper", + "parking_lot 0.11.2", + "reqwest", + "reqwest-middleware", + "retry-policies", + "tokio", + "tracing", + "wasm-timer", +] + +[[package]] +name = "retry-policies" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5875471e6cab2871bc150ecb8c727db5113c9338cc3354dc5ee3425b6aa40a1c" +dependencies = [ + "rand", +] + [[package]] name = "ring" version = "0.17.8" @@ -1364,7 +1443,9 @@ dependencies = [ "chrono", "governor", "reqwest", + "reqwest-middleware", "reqwest-ratelimit", + "reqwest-retry", "thiserror", "tokio", ] @@ -1380,7 +1461,7 @@ dependencies = [ "libc", "mio", "num_cpus", - "parking_lot", + "parking_lot 0.12.3", "pin-project-lite", "signal-hook-registry", "socket2", @@ -1469,9 +1550,21 @@ checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ "log", "pin-project-lite", + "tracing-attributes", "tracing-core", ] +[[package]] +name = "tracing-attributes" +version = "0.1.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tracing-core" version = "0.1.32" @@ -1612,6 +1705,21 @@ version = "0.2.92" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af190c94f2773fdb3729c55b007a722abb5384da03bc0986df4c289bf5567e96" +[[package]] +name = "wasm-timer" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be0ecb0db480561e9a7642b5d3e4187c128914e58aa84330b9493e3eb68c5e7f" +dependencies = [ + "futures", + "js-sys", + "parking_lot 0.11.2", + "pin-utils", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "web-sys" version = "0.3.69" diff --git a/Cargo.toml b/Cargo.toml index d52419b..5fb7ae4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,5 +9,7 @@ chrono = "0.4.38" governor = "0.6.3" reqwest = "0.12.5" reqwest-ratelimit = "0.2.0" +reqwest-middleware = "0.3" +reqwest-retry = "0.6" thiserror = "1.0.62" tokio = { version = "1.38.0", features = ["full"] } diff --git a/src/main.rs b/src/main.rs index 372250b..82572c5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,120 +1,57 @@ -use std::cmp::max; use std::num::NonZero; -use chrono::Utc; +use std::time::Duration; +use axum::async_trait; use governor::clock::DefaultClock; use governor::state::{InMemoryState, NotKeyed}; -use reqwest::{Client, Error, Method, Request, RequestBuilder, Response, StatusCode}; -use tokio::sync::oneshot; +use reqwest_middleware::{ClientBuilder, ClientWithMiddleware}; +use reqwest_retry::{RetryTransientMiddleware, policies::ExponentialBackoff, Jitter}; -#[derive(Debug, thiserror::Error)] -enum TogglApiError { - #[error("Reqwest error: {0}")] - ReqwestError(#[from] reqwest::Error), - #[error("Retries exceeded")] - RetriesExceeded, -} - -struct TogglApiRequest( - Request, - oneshot::Sender>, -); - -struct TogglApiWorker { +struct ReqwestRateLimiter { rate_limiter: governor::RateLimiter, - rx: tokio::sync::mpsc::Receiver, - client: Client, - max_retries: Option, - default_delay: f64, } -impl TogglApiWorker { - fn new() -> (Self, TogglApi) { - let (tx, rx) = tokio::sync::mpsc::channel(100); - - let client = Client::new(); - - let rate_limiter = governor::RateLimiter::direct( - governor::Quota::per_second(NonZero::new(1u32).unwrap()) - ); - - (Self { rate_limiter, client, rx, max_retries: Some(3), default_delay: 3.14 }, TogglApi { tx }) - } - - pub async fn start(&mut self) { - loop { - // We limit ourselves to the recommended rate of 1 req/s - self.rate_limiter.until_ready().await; - - let TogglApiRequest(request, tx) = self.rx.recv().await.unwrap(); - let response = self.make_request(request).await; - tx.send(response).unwrap(); +impl ReqwestRateLimiter { + fn new() -> Self { + Self { + rate_limiter: governor::RateLimiter::direct( + governor::Quota::per_second(NonZero::new(1u32).unwrap()) + ), } } +} - async fn make_request(&self, request: RequestBuilder) -> Result { - let max_retries = self.max_retries.unwrap_or(1); - - for _ in 0..max_retries { - let response = self.client.execute(request.clone()).await?; - - if response.status().is_server_error() || response.status() == StatusCode::TOO_MANY_REQUESTS { - let delay = self.parse_retry_after_header(&response) - .unwrap_or(self.default_delay); - - tokio::time::sleep(tokio::time::Duration::from_secs_f64(delay)).await; - } else { - return Ok(response); - } - } - - Err(TogglApiError::RetriesExceeded) - } - - fn parse_retry_after_header(&self, response: &Response) -> Option { - match response.headers().get("Retry-After") { - Some(retry_after) => { - let retry_after = retry_after.to_str() - .unwrap(); - - let a = retry_after.parse::() - .ok() - .or_else(|_| { - let date = chrono::NaiveDateTime::parse_from_str( - retry_after, - "%a, %d %b %Y %H:%M:%S GMT", - ); - - date.map(|date| date.and_utc()) - .map(|date| Utc::now().signed_duration_since(date)) - .map(|time_delta| time_delta.num_seconds() as f64) - .ok() - }); - - a - } - - None => Some(self.default_delay), - } +#[async_trait] +impl reqwest_ratelimit::RateLimiter for ReqwestRateLimiter { + async fn acquire_permit(&self) { + self.rate_limiter.until_ready().await; } } struct TogglApi { - tx: tokio::sync::mpsc::Sender, + client: ClientWithMiddleware, + api_key: String, + workspace_id: u32, } impl TogglApi { - pub async fn request(&self, request: Request) -> Result { - let (tx, rx) = oneshot::channel(); - self.tx.send(TogglApiRequest(request, tx)).await.expect("send request"); - rx.await.unwrap() + fn new(api_key: String, workspace_id: u32) -> Self { + let rate_limiter = ReqwestRateLimiter::new(); + let backoff = ExponentialBackoff::builder() + .retry_bounds(Duration::from_secs(1), Duration::from_secs(60)) + .jitter(Jitter::Bounded) + .base(2) + .build_with_total_retry_duration(Duration::from_secs(24 * 60 * 60)); + + let client = ClientBuilder::new(reqwest::Client::new()) + .with(reqwest_ratelimit::all(rate_limiter)) + .with(RetryTransientMiddleware::new_with_policy(backoff)) + .build(); + + Self { client, api_key, workspace_id } } } #[tokio::main] async fn main() { - let (mut worker, api_client) = TogglApiWorker::new(); - - tokio::spawn(async move { worker.start().await; }); - - dbg!(api_client.request(Request::new(Method::GET, "https://www.google.com".parse().unwrap())).await.unwrap()); + let api = TogglApi::new("api_key".to_string(), 123); }