Move retry and rate limiting to a library

This commit is contained in:
Joshua Coles 2024-07-15 13:26:21 +01:00
parent d8206cd99b
commit 72aaf40f4b
3 changed files with 149 additions and 102 deletions

118
Cargo.lock generated
View File

@ -219,7 +219,7 @@ dependencies = [
"hashbrown",
"lock_api",
"once_cell",
"parking_lot_core",
"parking_lot_core 0.9.10",
]
[[package]]
@ -385,8 +385,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7"
dependencies = [
"cfg-if",
"js-sys",
"libc",
"wasi",
"wasm-bindgen",
]
[[package]]
@ -407,7 +409,7 @@ dependencies = [
"futures-timer",
"no-std-compat",
"nonzero_ext",
"parking_lot",
"parking_lot 0.12.3",
"portable-atomic",
"quanta",
"rand",
@ -609,6 +611,18 @@ dependencies = [
"hashbrown",
]
[[package]]
name = "instant"
version = "0.1.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222"
dependencies = [
"cfg-if",
"js-sys",
"wasm-bindgen",
"web-sys",
]
[[package]]
name = "ipnet"
version = "2.9.0"
@ -803,6 +817,17 @@ dependencies = [
"vcpkg",
]
[[package]]
name = "parking_lot"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99"
dependencies = [
"instant",
"lock_api",
"parking_lot_core 0.8.6",
]
[[package]]
name = "parking_lot"
version = "0.12.3"
@ -810,7 +835,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27"
dependencies = [
"lock_api",
"parking_lot_core",
"parking_lot_core 0.9.10",
]
[[package]]
name = "parking_lot_core"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "60a2cfe6f0ad2bfc16aefa463b497d5c7a5ecd44a23efa72aa342d90177356dc"
dependencies = [
"cfg-if",
"instant",
"libc",
"redox_syscall 0.2.16",
"smallvec",
"winapi",
]
[[package]]
@ -821,7 +860,7 @@ checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8"
dependencies = [
"cfg-if",
"libc",
"redox_syscall",
"redox_syscall 0.5.2",
"smallvec",
"windows-targets 0.52.6",
]
@ -954,6 +993,15 @@ dependencies = [
"bitflags 2.6.0",
]
[[package]]
name = "redox_syscall"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a"
dependencies = [
"bitflags 1.3.2",
]
[[package]]
name = "redox_syscall"
version = "0.5.2"
@ -1033,6 +1081,37 @@ dependencies = [
"reqwest-middleware",
]
[[package]]
name = "reqwest-retry"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf2a94ba69ceb30c42079a137e2793d6d0f62e581a24c06cd4e9bb32e973c7da"
dependencies = [
"anyhow",
"async-trait",
"chrono",
"futures",
"getrandom",
"http",
"hyper",
"parking_lot 0.11.2",
"reqwest",
"reqwest-middleware",
"retry-policies",
"tokio",
"tracing",
"wasm-timer",
]
[[package]]
name = "retry-policies"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5875471e6cab2871bc150ecb8c727db5113c9338cc3354dc5ee3425b6aa40a1c"
dependencies = [
"rand",
]
[[package]]
name = "ring"
version = "0.17.8"
@ -1364,7 +1443,9 @@ dependencies = [
"chrono",
"governor",
"reqwest",
"reqwest-middleware",
"reqwest-ratelimit",
"reqwest-retry",
"thiserror",
"tokio",
]
@ -1380,7 +1461,7 @@ dependencies = [
"libc",
"mio",
"num_cpus",
"parking_lot",
"parking_lot 0.12.3",
"pin-project-lite",
"signal-hook-registry",
"socket2",
@ -1469,9 +1550,21 @@ checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef"
dependencies = [
"log",
"pin-project-lite",
"tracing-attributes",
"tracing-core",
]
[[package]]
name = "tracing-attributes"
version = "0.1.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "tracing-core"
version = "0.1.32"
@ -1612,6 +1705,21 @@ version = "0.2.92"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af190c94f2773fdb3729c55b007a722abb5384da03bc0986df4c289bf5567e96"
[[package]]
name = "wasm-timer"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be0ecb0db480561e9a7642b5d3e4187c128914e58aa84330b9493e3eb68c5e7f"
dependencies = [
"futures",
"js-sys",
"parking_lot 0.11.2",
"pin-utils",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
]
[[package]]
name = "web-sys"
version = "0.3.69"

View File

@ -9,5 +9,7 @@ chrono = "0.4.38"
governor = "0.6.3"
reqwest = "0.12.5"
reqwest-ratelimit = "0.2.0"
reqwest-middleware = "0.3"
reqwest-retry = "0.6"
thiserror = "1.0.62"
tokio = { version = "1.38.0", features = ["full"] }

View File

@ -1,120 +1,57 @@
use std::cmp::max;
use std::num::NonZero;
use chrono::Utc;
use std::time::Duration;
use axum::async_trait;
use governor::clock::DefaultClock;
use governor::state::{InMemoryState, NotKeyed};
use reqwest::{Client, Error, Method, Request, RequestBuilder, Response, StatusCode};
use tokio::sync::oneshot;
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
use reqwest_retry::{RetryTransientMiddleware, policies::ExponentialBackoff, Jitter};
#[derive(Debug, thiserror::Error)]
enum TogglApiError {
#[error("Reqwest error: {0}")]
ReqwestError(#[from] reqwest::Error),
#[error("Retries exceeded")]
RetriesExceeded,
}
struct TogglApiRequest(
Request,
oneshot::Sender<Result<Response, TogglApiError>>,
);
struct TogglApiWorker {
struct ReqwestRateLimiter {
rate_limiter: governor::RateLimiter<NotKeyed, InMemoryState, DefaultClock>,
rx: tokio::sync::mpsc::Receiver<TogglApiRequest>,
client: Client,
max_retries: Option<u32>,
default_delay: f64,
}
impl TogglApiWorker {
fn new() -> (Self, TogglApi) {
let (tx, rx) = tokio::sync::mpsc::channel(100);
let client = Client::new();
let rate_limiter = governor::RateLimiter::direct(
governor::Quota::per_second(NonZero::new(1u32).unwrap())
);
(Self { rate_limiter, client, rx, max_retries: Some(3), default_delay: 3.14 }, TogglApi { tx })
}
pub async fn start(&mut self) {
loop {
// We limit ourselves to the recommended rate of 1 req/s
self.rate_limiter.until_ready().await;
let TogglApiRequest(request, tx) = self.rx.recv().await.unwrap();
let response = self.make_request(request).await;
tx.send(response).unwrap();
impl ReqwestRateLimiter {
fn new() -> Self {
Self {
rate_limiter: governor::RateLimiter::direct(
governor::Quota::per_second(NonZero::new(1u32).unwrap())
),
}
}
}
async fn make_request(&self, request: RequestBuilder) -> Result<Response, TogglApiError> {
let max_retries = self.max_retries.unwrap_or(1);
for _ in 0..max_retries {
let response = self.client.execute(request.clone()).await?;
if response.status().is_server_error() || response.status() == StatusCode::TOO_MANY_REQUESTS {
let delay = self.parse_retry_after_header(&response)
.unwrap_or(self.default_delay);
tokio::time::sleep(tokio::time::Duration::from_secs_f64(delay)).await;
} else {
return Ok(response);
}
}
Err(TogglApiError::RetriesExceeded)
}
fn parse_retry_after_header(&self, response: &Response) -> Option<f64> {
match response.headers().get("Retry-After") {
Some(retry_after) => {
let retry_after = retry_after.to_str()
.unwrap();
let a = retry_after.parse::<f64>()
.ok()
.or_else(|_| {
let date = chrono::NaiveDateTime::parse_from_str(
retry_after,
"%a, %d %b %Y %H:%M:%S GMT",
);
date.map(|date| date.and_utc())
.map(|date| Utc::now().signed_duration_since(date))
.map(|time_delta| time_delta.num_seconds() as f64)
.ok()
});
a
}
None => Some(self.default_delay),
}
#[async_trait]
impl reqwest_ratelimit::RateLimiter for ReqwestRateLimiter {
async fn acquire_permit(&self) {
self.rate_limiter.until_ready().await;
}
}
struct TogglApi {
tx: tokio::sync::mpsc::Sender<TogglApiRequest>,
client: ClientWithMiddleware,
api_key: String,
workspace_id: u32,
}
impl TogglApi {
pub async fn request(&self, request: Request) -> Result<Response, TogglApiError> {
let (tx, rx) = oneshot::channel();
self.tx.send(TogglApiRequest(request, tx)).await.expect("send request");
rx.await.unwrap()
fn new(api_key: String, workspace_id: u32) -> Self {
let rate_limiter = ReqwestRateLimiter::new();
let backoff = ExponentialBackoff::builder()
.retry_bounds(Duration::from_secs(1), Duration::from_secs(60))
.jitter(Jitter::Bounded)
.base(2)
.build_with_total_retry_duration(Duration::from_secs(24 * 60 * 60));
let client = ClientBuilder::new(reqwest::Client::new())
.with(reqwest_ratelimit::all(rate_limiter))
.with(RetryTransientMiddleware::new_with_policy(backoff))
.build();
Self { client, api_key, workspace_id }
}
}
#[tokio::main]
async fn main() {
let (mut worker, api_client) = TogglApiWorker::new();
tokio::spawn(async move { worker.start().await; });
dbg!(api_client.request(Request::new(Method::GET, "https://www.google.com".parse().unwrap())).await.unwrap());
let api = TogglApi::new("api_key".to_string(), 123);
}