diff --git a/src/main.rs b/src/main.rs index a77334b..f19b71a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,13 +1,13 @@ -use chrono::{DateTime, DurationRound, NaiveDate, TimeDelta, Utc}; -use sqlx::{Connection, PgConnection, Postgres, QueryBuilder}; -use toggl::TogglApi; use crate::toggl::types::{Project, Tag, TimeEntry, TogglReportFilters, TrackingClient}; +use chrono::{DateTime, DurationRound, NaiveDate, TimeDelta, Utc}; use itertools::Itertools; use soa_rs::Soa; +use sqlx::{Connection, PgConnection, Postgres, QueryBuilder}; +use toggl::TogglApi; use tracing_subscriber::fmt::time; -mod toggl; mod sensitive; +mod toggl; #[derive(Debug, thiserror::Error)] enum AppError { @@ -59,14 +59,25 @@ impl Worker { }) } - pub async fn fetch_within(&mut self, start: DateTime, end: DateTime) -> Result<(), AppError> { - let results = self.toggl_api.search(self.toggl_api.workspace_id, TogglReportFilters { - start_date: Some(start.date_naive()), - end_date: Some(end.date_naive()), - ..Default::default() - }).await?; + pub async fn fetch_within( + &mut self, + start: DateTime, + end: DateTime, + ) -> Result<(), AppError> { + let results = self + .toggl_api + .search( + self.toggl_api.workspace_id, + TogglReportFilters { + start_date: Some(start.date_naive()), + end_date: Some(end.date_naive()), + ..Default::default() + }, + ) + .await?; - let time_entries = results.into_iter() + let time_entries = results + .into_iter() .map(|entry| entry.into_time_entry(self.toggl_api.workspace_id)) .collect::>(); @@ -95,8 +106,10 @@ impl Worker { } async fn update_time_entries(&mut self, fetch_since: DateTime) -> Result<(), AppError> { - let time_entries = self.toggl_api - .get_time_entries_for_user_modified_since(fetch_since).await?; + let time_entries = self + .toggl_api + .get_time_entries_for_user_modified_since(fetch_since) + .await?; self.update_database(time_entries).await } @@ -104,17 +117,20 @@ impl Worker { async fn update_database(&mut self, time_entries: Vec) -> Result<(), AppError> { let existing_ids = self.get_ids().await?; - let fetch_workspaces = time_entries.iter() + let fetch_workspaces = time_entries + .iter() .map(|entry| entry.workspace_id) .filter(|workspace_id| !existing_ids.workspace_ids.contains(&workspace_id)) .collect::>(); - let fetch_projects = time_entries.iter() + let fetch_projects = time_entries + .iter() .map(|entry| entry.project_id) .filter_map(|project_id| project_id) .any(|project_id| !existing_ids.project_ids.contains(&project_id)); - let fetch_tags = time_entries.iter() + let fetch_tags = time_entries + .iter() .flat_map(|entry| entry.tag_ids.iter()) .any(|tag| !existing_ids.tag_ids.contains(&tag)); @@ -137,7 +153,10 @@ impl Worker { Ok(()) } - async fn update_time_entries_chunk(&mut self, time_entries: &[TimeEntry]) -> Result<(), AppError> { + async fn update_time_entries_chunk( + &mut self, + time_entries: &[TimeEntry], + ) -> Result<(), AppError> { let time_entries = Soa::from(time_entries); sqlx::query!( @@ -179,10 +198,12 @@ impl Worker { } async fn update_workspaces(&mut self, workspace_ids: &[u64]) -> Result<(), AppError> { - let workspaces = workspace_ids.iter() + let workspaces = workspace_ids + .iter() .map(|id| self.toggl_api.get_workspace(*id)); - let workspaces = futures::future::join_all(workspaces).await + let workspaces = futures::future::join_all(workspaces) + .await .into_iter() .collect::, _>>()?; @@ -196,12 +217,20 @@ impl Worker { organization_id = excluded.organization_id, name = excluded.name "#, - &workspaces.id().iter().map(|id| *id as i64).collect::>()[..], - &workspaces.organization_id().iter().map(|id| *id as i64).collect::>()[..], + &workspaces + .id() + .iter() + .map(|id| *id as i64) + .collect::>()[..], + &workspaces + .organization_id() + .iter() + .map(|id| *id as i64) + .collect::>()[..], workspaces.name(), ) - .execute(&mut self.db) - .await?; + .execute(&mut self.db) + .await?; Ok(()) } @@ -209,7 +238,8 @@ impl Worker { async fn update_projects(&mut self, existing_ids: &TableSummary) -> Result<(), AppError> { let projects = self.toggl_api.get_projects().await?; - let fetch_clients = projects.iter() + let fetch_clients = projects + .iter() .map(|project| project.client_id) .filter_map(|client_id| client_id) .any(|client_id| !existing_ids.client_ids.contains(&(client_id as u64))); @@ -335,28 +365,21 @@ impl Worker { #[tokio::main] async fn main() { - dotenv::dotenv() - .expect("Failed to load .env file"); + dotenv::dotenv().expect("Failed to load .env file"); // Init tracing tracing_subscriber::fmt::init(); - let api = TogglApi::new( - sensitive::API_TOKEN, - sensitive::WORKSPACE_ID, - ); + let api = TogglApi::new(sensitive::API_TOKEN, sensitive::WORKSPACE_ID); - let database_url = std::env::var("DATABASE_URL") - .expect("DATABASE_URL must be set"); + let database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL must be set"); let mut worker = Worker { db: PgConnection::connect(&database_url).await.unwrap(), toggl_api: api, }; - worker.update_tags() - .await - .unwrap(); + worker.update_tags().await.unwrap(); let start = NaiveDate::from_ymd_opt(2024, 2, 1) .expect("Invalid date") @@ -370,6 +393,8 @@ async fn main() { .expect("Invalid time") .and_utc(); - worker.fetch_within(start, end).await + worker + .fetch_within(start, end) + .await .expect("Failed to fetch time entries"); } diff --git a/src/toggl/mod.rs b/src/toggl/mod.rs index 530ea74..fbafa6e 100644 --- a/src/toggl/mod.rs +++ b/src/toggl/mod.rs @@ -1,17 +1,17 @@ -use reqwest_retry::policies::ExponentialBackoff; -use std::time::Duration; -use reqwest_retry::{Jitter, RetryTransientMiddleware}; -use reqwest_middleware::{ClientBuilder, ClientWithMiddleware}; -use std::num::NonZero; use axum::async_trait; -use governor::state::{InMemoryState, NotKeyed}; -use governor::clock::DefaultClock; -use reqwest::header::{HeaderMap, HeaderValue}; use base64::engine::general_purpose::STANDARD; use base64::Engine; use chrono::{DateTime, SecondsFormat, Utc}; +use governor::clock::DefaultClock; +use governor::state::{InMemoryState, NotKeyed}; +use reqwest::header::{HeaderMap, HeaderValue}; use reqwest::Response; +use reqwest_middleware::{ClientBuilder, ClientWithMiddleware}; +use reqwest_retry::policies::ExponentialBackoff; +use reqwest_retry::{Jitter, RetryTransientMiddleware}; use serde::de::DeserializeOwned; +use std::num::NonZero; +use std::time::Duration; struct ReqwestRateLimiter { rate_limiter: governor::RateLimiter, @@ -20,9 +20,9 @@ struct ReqwestRateLimiter { impl ReqwestRateLimiter { fn new() -> Self { Self { - rate_limiter: governor::RateLimiter::direct( - governor::Quota::per_second(NonZero::new(1u32).unwrap()) - ), + rate_limiter: governor::RateLimiter::direct(governor::Quota::per_second( + NonZero::new(1u32).unwrap(), + )), } } } @@ -63,7 +63,11 @@ impl TogglApi { let toggl_auth = &STANDARD.encode(format!("{}:api_token", api_key)); let headers = Self::authorisation_headers(toggl_auth); - Self { client, workspace_id, headers } + Self { + client, + workspace_id, + headers, + } } fn authorisation_headers(toggl_auth: &str) -> HeaderMap { @@ -77,14 +81,15 @@ impl TogglApi { /// Get the workspaces that a user is a part of #[tracing::instrument(skip(self))] - async fn get_users_workspaces( - &self - ) -> Result, TogglError> { + async fn get_users_workspaces(&self) -> Result, TogglError> { let url = format!("{base_url}/me/workspaces", base_url = BASE_URL); - let response = self.client.get(&url) + let response = self + .client + .get(&url) .headers(self.headers.clone()) - .send().await?; + .send() + .await?; let data = response.text().await?; let workspaces: Vec = serde_json::from_str(&data)?; @@ -95,62 +100,80 @@ impl TogglApi { /// Get a specific workspace by its ID #[tracing::instrument(skip(self))] pub async fn get_workspace(&self, id: u64) -> Result { - let url = format!( - "{base_url}/workspaces/{id}", - base_url = BASE_URL, - id = id - ); + let url = format!("{base_url}/workspaces/{id}", base_url = BASE_URL, id = id); - Self::parse(self.client.get(&url) - .headers(self.headers.clone()) - .send().await?).await + Self::parse( + self.client + .get(&url) + .headers(self.headers.clone()) + .send() + .await?, + ) + .await } /// Fetches all time entries for this user from Toggl that have been modified since the given /// date. #[tracing::instrument(skip(self))] - pub async fn get_time_entries_for_user_modified_since(&self, since: DateTime) -> Result, TogglError> { + pub async fn get_time_entries_for_user_modified_since( + &self, + since: DateTime, + ) -> Result, TogglError> { let url = format!( "{base_url}/me/time_entries?since={since}", base_url = BASE_URL, since = since.timestamp() ); - Self::parse(self.client.get(&url) - .headers(self.headers.clone()) - .send().await?).await + Self::parse( + self.client + .get(&url) + .headers(self.headers.clone()) + .send() + .await?, + ) + .await } /// Fetches all time entries for this user from Toggl that have a start time between the given /// start and end times. #[tracing::instrument(skip(self))] - pub async fn get_time_user_entries_between(&self, start: DateTime, until: DateTime) -> Result, TogglError> { - let url = format!( - "{base_url}/me/time_entries", - base_url = BASE_URL, - ); + pub async fn get_time_user_entries_between( + &self, + start: DateTime, + until: DateTime, + ) -> Result, TogglError> { + let url = format!("{base_url}/me/time_entries", base_url = BASE_URL,); - Self::parse(self.client.get(url) - .headers(self.headers.clone()) - .query(&[ - ("start_date", start.to_rfc3339_opts(SecondsFormat::Secs, true)), - ("end_date", until.to_rfc3339_opts(SecondsFormat::Secs, true)), - ]) - .send().await?).await + Self::parse( + self.client + .get(url) + .headers(self.headers.clone()) + .query(&[ + ( + "start_date", + start.to_rfc3339_opts(SecondsFormat::Secs, true), + ), + ("end_date", until.to_rfc3339_opts(SecondsFormat::Secs, true)), + ]) + .send() + .await?, + ) + .await } #[tracing::instrument(skip(self))] pub async fn get_current_time_entry(&self) -> Result, TogglError> { - let url = format!( - "{base_url}/me/time_entries/current", - base_url = BASE_URL - ); + let url = format!("{base_url}/me/time_entries/current", base_url = BASE_URL); - Ok(self.client.get(&url) + Ok(self + .client + .get(&url) .headers(self.headers.clone()) .send() .await? - .json().await?) + .json() + .await?) } #[tracing::instrument(skip(self))] @@ -161,9 +184,14 @@ impl TogglApi { workspace_id = self.workspace_id ); - Self::parse(self.client.get(&url) - .headers(self.headers.clone()) - .send().await?).await + Self::parse( + self.client + .get(&url) + .headers(self.headers.clone()) + .send() + .await?, + ) + .await } #[tracing::instrument(skip(self))] @@ -174,20 +202,21 @@ impl TogglApi { workspace_id = self.workspace_id ); - Self::parse(self.client.get(&url) - .headers(self.headers.clone()) - .send().await?).await + Self::parse( + self.client + .get(&url) + .headers(self.headers.clone()) + .send() + .await?, + ) + .await } async fn parse(response: Response) -> Result { let data = response.text().await?; let result = serde_json_path_to_error::from_str(&data); - let result = result - .map_err(|error| TogglError::JsonWithDataError { - data, - error, - })?; + let result = result.map_err(|error| TogglError::JsonWithDataError { data, error })?; Ok(result) } @@ -200,19 +229,31 @@ impl TogglApi { workspace_id = self.workspace_id ); - Self::parse(self.client.get(&url) - .headers(self.headers.clone()) - .send().await?).await + Self::parse( + self.client + .get(&url) + .headers(self.headers.clone()) + .send() + .await?, + ) + .await } - fn paginate_filters(original_filters: &types::TogglReportFilters, last_row_id: u64) -> types::TogglReportFilters { + fn paginate_filters( + original_filters: &types::TogglReportFilters, + last_row_id: u64, + ) -> types::TogglReportFilters { let mut filters = original_filters.clone(); filters.first_row_number = Some(last_row_id + 1); filters } #[tracing::instrument(skip(self))] - pub async fn search(&self, workspace_id: u64, filters: types::TogglReportFilters) -> Result, TogglError> { + pub async fn search( + &self, + workspace_id: u64, + filters: types::TogglReportFilters, + ) -> Result, TogglError> { let url = format!( "{base_url}/workspace/{workspace_id}/search/time_entries", base_url = &REPORTS_BASE_URL, @@ -228,10 +269,15 @@ impl TogglApi { tokio::time::sleep(Duration::from_secs(1)).await; } - let data: Vec = Self::parse(self.client.post(&url) - .headers(self.headers.clone()) - .json(&Self::paginate_filters(&filters, last_row_number_n)) - .send().await?).await?; + let data: Vec = Self::parse( + self.client + .post(&url) + .headers(self.headers.clone()) + .json(&Self::paginate_filters(&filters, last_row_number_n)) + .send() + .await?, + ) + .await?; last_row_number = data.last().map(|e| e.row_number as u64); @@ -243,10 +289,10 @@ impl TogglApi { } pub mod types { - use std::collections::HashMap; use chrono::{DateTime, NaiveDate, Utc}; use serde::{Deserialize, Serialize}; use serde_with::skip_serializing_none; + use std::collections::HashMap; #[derive(Serialize, Deserialize, Debug, Clone, Soars)] pub struct TimeEntry { @@ -282,8 +328,8 @@ pub mod types { use serde::{Deserialize, Serialize}; pub fn serialize(duration: &Option, serializer: S) -> Result - where - S: serde::Serializer, + where + S: serde::Serializer, { match duration { None => i32::serialize(&-1, serializer), @@ -292,8 +338,8 @@ pub mod types { } pub fn deserialize<'de, D>(deserializer: D) -> Result, D::Error> - where - D: serde::Deserializer<'de>, + where + D: serde::Deserializer<'de>, { let duration = i32::deserialize(deserializer)?; if duration < 0 { @@ -339,13 +385,17 @@ pub mod types { impl fmt::Display for ProjectStatus { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}", match self { - ProjectStatus::Upcoming => "upcoming", - ProjectStatus::Active => "active", - ProjectStatus::Archived => "archived", - ProjectStatus::Ended => "ended", - ProjectStatus::Deleted => "deleted", - }) + write!( + f, + "{}", + match self { + ProjectStatus::Upcoming => "upcoming", + ProjectStatus::Active => "active", + ProjectStatus::Archived => "archived", + ProjectStatus::Ended => "ended", + ProjectStatus::Deleted => "deleted", + } + ) } } @@ -444,7 +494,8 @@ pub mod types { duration: Some(self.time_entries[0].seconds), updated_at: self.time_entries[0].updated_at, description: self.description, - tags: self.enriched_information + tags: self + .enriched_information .map(|e| e.tag_names.clone()) .unwrap_or_default(), @@ -492,8 +543,8 @@ pub mod types { pub rest: HashMap, } - use std::fmt; use soa_rs::Soars; + use std::fmt; impl fmt::Debug for TogglReportFilters { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {