This commit is contained in:
Joshua Coles 2024-07-27 20:51:30 +01:00
parent 71f9ed3312
commit 25a52fa0e5
2 changed files with 193 additions and 117 deletions

View File

@ -1,13 +1,13 @@
use chrono::{DateTime, DurationRound, NaiveDate, TimeDelta, Utc};
use sqlx::{Connection, PgConnection, Postgres, QueryBuilder};
use toggl::TogglApi;
use crate::toggl::types::{Project, Tag, TimeEntry, TogglReportFilters, TrackingClient};
use chrono::{DateTime, DurationRound, NaiveDate, TimeDelta, Utc};
use itertools::Itertools;
use soa_rs::Soa;
use sqlx::{Connection, PgConnection, Postgres, QueryBuilder};
use toggl::TogglApi;
use tracing_subscriber::fmt::time;
mod toggl;
mod sensitive;
mod toggl;
#[derive(Debug, thiserror::Error)]
enum AppError {
@ -59,14 +59,25 @@ impl Worker {
})
}
pub async fn fetch_within(&mut self, start: DateTime<Utc>, end: DateTime<Utc>) -> Result<(), AppError> {
let results = self.toggl_api.search(self.toggl_api.workspace_id, TogglReportFilters {
start_date: Some(start.date_naive()),
end_date: Some(end.date_naive()),
..Default::default()
}).await?;
pub async fn fetch_within(
&mut self,
start: DateTime<Utc>,
end: DateTime<Utc>,
) -> Result<(), AppError> {
let results = self
.toggl_api
.search(
self.toggl_api.workspace_id,
TogglReportFilters {
start_date: Some(start.date_naive()),
end_date: Some(end.date_naive()),
..Default::default()
},
)
.await?;
let time_entries = results.into_iter()
let time_entries = results
.into_iter()
.map(|entry| entry.into_time_entry(self.toggl_api.workspace_id))
.collect::<Vec<_>>();
@ -95,8 +106,10 @@ impl Worker {
}
async fn update_time_entries(&mut self, fetch_since: DateTime<Utc>) -> Result<(), AppError> {
let time_entries = self.toggl_api
.get_time_entries_for_user_modified_since(fetch_since).await?;
let time_entries = self
.toggl_api
.get_time_entries_for_user_modified_since(fetch_since)
.await?;
self.update_database(time_entries).await
}
@ -104,17 +117,20 @@ impl Worker {
async fn update_database(&mut self, time_entries: Vec<TimeEntry>) -> Result<(), AppError> {
let existing_ids = self.get_ids().await?;
let fetch_workspaces = time_entries.iter()
let fetch_workspaces = time_entries
.iter()
.map(|entry| entry.workspace_id)
.filter(|workspace_id| !existing_ids.workspace_ids.contains(&workspace_id))
.collect::<Vec<_>>();
let fetch_projects = time_entries.iter()
let fetch_projects = time_entries
.iter()
.map(|entry| entry.project_id)
.filter_map(|project_id| project_id)
.any(|project_id| !existing_ids.project_ids.contains(&project_id));
let fetch_tags = time_entries.iter()
let fetch_tags = time_entries
.iter()
.flat_map(|entry| entry.tag_ids.iter())
.any(|tag| !existing_ids.tag_ids.contains(&tag));
@ -137,7 +153,10 @@ impl Worker {
Ok(())
}
async fn update_time_entries_chunk(&mut self, time_entries: &[TimeEntry]) -> Result<(), AppError> {
async fn update_time_entries_chunk(
&mut self,
time_entries: &[TimeEntry],
) -> Result<(), AppError> {
let time_entries = Soa::from(time_entries);
sqlx::query!(
@ -179,10 +198,12 @@ impl Worker {
}
async fn update_workspaces(&mut self, workspace_ids: &[u64]) -> Result<(), AppError> {
let workspaces = workspace_ids.iter()
let workspaces = workspace_ids
.iter()
.map(|id| self.toggl_api.get_workspace(*id));
let workspaces = futures::future::join_all(workspaces).await
let workspaces = futures::future::join_all(workspaces)
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()?;
@ -196,12 +217,20 @@ impl Worker {
organization_id = excluded.organization_id,
name = excluded.name
"#,
&workspaces.id().iter().map(|id| *id as i64).collect::<Vec<_>>()[..],
&workspaces.organization_id().iter().map(|id| *id as i64).collect::<Vec<_>>()[..],
&workspaces
.id()
.iter()
.map(|id| *id as i64)
.collect::<Vec<_>>()[..],
&workspaces
.organization_id()
.iter()
.map(|id| *id as i64)
.collect::<Vec<_>>()[..],
workspaces.name(),
)
.execute(&mut self.db)
.await?;
.execute(&mut self.db)
.await?;
Ok(())
}
@ -209,7 +238,8 @@ impl Worker {
async fn update_projects(&mut self, existing_ids: &TableSummary) -> Result<(), AppError> {
let projects = self.toggl_api.get_projects().await?;
let fetch_clients = projects.iter()
let fetch_clients = projects
.iter()
.map(|project| project.client_id)
.filter_map(|client_id| client_id)
.any(|client_id| !existing_ids.client_ids.contains(&(client_id as u64)));
@ -335,28 +365,21 @@ impl Worker {
#[tokio::main]
async fn main() {
dotenv::dotenv()
.expect("Failed to load .env file");
dotenv::dotenv().expect("Failed to load .env file");
// Init tracing
tracing_subscriber::fmt::init();
let api = TogglApi::new(
sensitive::API_TOKEN,
sensitive::WORKSPACE_ID,
);
let api = TogglApi::new(sensitive::API_TOKEN, sensitive::WORKSPACE_ID);
let database_url = std::env::var("DATABASE_URL")
.expect("DATABASE_URL must be set");
let database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL must be set");
let mut worker = Worker {
db: PgConnection::connect(&database_url).await.unwrap(),
toggl_api: api,
};
worker.update_tags()
.await
.unwrap();
worker.update_tags().await.unwrap();
let start = NaiveDate::from_ymd_opt(2024, 2, 1)
.expect("Invalid date")
@ -370,6 +393,8 @@ async fn main() {
.expect("Invalid time")
.and_utc();
worker.fetch_within(start, end).await
worker
.fetch_within(start, end)
.await
.expect("Failed to fetch time entries");
}

View File

@ -1,17 +1,17 @@
use reqwest_retry::policies::ExponentialBackoff;
use std::time::Duration;
use reqwest_retry::{Jitter, RetryTransientMiddleware};
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
use std::num::NonZero;
use axum::async_trait;
use governor::state::{InMemoryState, NotKeyed};
use governor::clock::DefaultClock;
use reqwest::header::{HeaderMap, HeaderValue};
use base64::engine::general_purpose::STANDARD;
use base64::Engine;
use chrono::{DateTime, SecondsFormat, Utc};
use governor::clock::DefaultClock;
use governor::state::{InMemoryState, NotKeyed};
use reqwest::header::{HeaderMap, HeaderValue};
use reqwest::Response;
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
use reqwest_retry::policies::ExponentialBackoff;
use reqwest_retry::{Jitter, RetryTransientMiddleware};
use serde::de::DeserializeOwned;
use std::num::NonZero;
use std::time::Duration;
struct ReqwestRateLimiter {
rate_limiter: governor::RateLimiter<NotKeyed, InMemoryState, DefaultClock>,
@ -20,9 +20,9 @@ struct ReqwestRateLimiter {
impl ReqwestRateLimiter {
fn new() -> Self {
Self {
rate_limiter: governor::RateLimiter::direct(
governor::Quota::per_second(NonZero::new(1u32).unwrap())
),
rate_limiter: governor::RateLimiter::direct(governor::Quota::per_second(
NonZero::new(1u32).unwrap(),
)),
}
}
}
@ -63,7 +63,11 @@ impl TogglApi {
let toggl_auth = &STANDARD.encode(format!("{}:api_token", api_key));
let headers = Self::authorisation_headers(toggl_auth);
Self { client, workspace_id, headers }
Self {
client,
workspace_id,
headers,
}
}
fn authorisation_headers(toggl_auth: &str) -> HeaderMap {
@ -77,14 +81,15 @@ impl TogglApi {
/// Get the workspaces that a user is a part of
#[tracing::instrument(skip(self))]
async fn get_users_workspaces(
&self
) -> Result<Vec<types::Workspace>, TogglError> {
async fn get_users_workspaces(&self) -> Result<Vec<types::Workspace>, TogglError> {
let url = format!("{base_url}/me/workspaces", base_url = BASE_URL);
let response = self.client.get(&url)
let response = self
.client
.get(&url)
.headers(self.headers.clone())
.send().await?;
.send()
.await?;
let data = response.text().await?;
let workspaces: Vec<types::Workspace> = serde_json::from_str(&data)?;
@ -95,62 +100,80 @@ impl TogglApi {
/// Get a specific workspace by its ID
#[tracing::instrument(skip(self))]
pub async fn get_workspace(&self, id: u64) -> Result<types::Workspace, TogglError> {
let url = format!(
"{base_url}/workspaces/{id}",
base_url = BASE_URL,
id = id
);
let url = format!("{base_url}/workspaces/{id}", base_url = BASE_URL, id = id);
Self::parse(self.client.get(&url)
.headers(self.headers.clone())
.send().await?).await
Self::parse(
self.client
.get(&url)
.headers(self.headers.clone())
.send()
.await?,
)
.await
}
/// Fetches all time entries for this user from Toggl that have been modified since the given
/// date.
#[tracing::instrument(skip(self))]
pub async fn get_time_entries_for_user_modified_since(&self, since: DateTime<Utc>) -> Result<Vec<types::TimeEntry>, TogglError> {
pub async fn get_time_entries_for_user_modified_since(
&self,
since: DateTime<Utc>,
) -> Result<Vec<types::TimeEntry>, TogglError> {
let url = format!(
"{base_url}/me/time_entries?since={since}",
base_url = BASE_URL,
since = since.timestamp()
);
Self::parse(self.client.get(&url)
.headers(self.headers.clone())
.send().await?).await
Self::parse(
self.client
.get(&url)
.headers(self.headers.clone())
.send()
.await?,
)
.await
}
/// Fetches all time entries for this user from Toggl that have a start time between the given
/// start and end times.
#[tracing::instrument(skip(self))]
pub async fn get_time_user_entries_between(&self, start: DateTime<Utc>, until: DateTime<Utc>) -> Result<Vec<types::TimeEntry>, TogglError> {
let url = format!(
"{base_url}/me/time_entries",
base_url = BASE_URL,
);
pub async fn get_time_user_entries_between(
&self,
start: DateTime<Utc>,
until: DateTime<Utc>,
) -> Result<Vec<types::TimeEntry>, TogglError> {
let url = format!("{base_url}/me/time_entries", base_url = BASE_URL,);
Self::parse(self.client.get(url)
.headers(self.headers.clone())
.query(&[
("start_date", start.to_rfc3339_opts(SecondsFormat::Secs, true)),
("end_date", until.to_rfc3339_opts(SecondsFormat::Secs, true)),
])
.send().await?).await
Self::parse(
self.client
.get(url)
.headers(self.headers.clone())
.query(&[
(
"start_date",
start.to_rfc3339_opts(SecondsFormat::Secs, true),
),
("end_date", until.to_rfc3339_opts(SecondsFormat::Secs, true)),
])
.send()
.await?,
)
.await
}
#[tracing::instrument(skip(self))]
pub async fn get_current_time_entry(&self) -> Result<Option<types::TimeEntry>, TogglError> {
let url = format!(
"{base_url}/me/time_entries/current",
base_url = BASE_URL
);
let url = format!("{base_url}/me/time_entries/current", base_url = BASE_URL);
Ok(self.client.get(&url)
Ok(self
.client
.get(&url)
.headers(self.headers.clone())
.send()
.await?
.json().await?)
.json()
.await?)
}
#[tracing::instrument(skip(self))]
@ -161,9 +184,14 @@ impl TogglApi {
workspace_id = self.workspace_id
);
Self::parse(self.client.get(&url)
.headers(self.headers.clone())
.send().await?).await
Self::parse(
self.client
.get(&url)
.headers(self.headers.clone())
.send()
.await?,
)
.await
}
#[tracing::instrument(skip(self))]
@ -174,20 +202,21 @@ impl TogglApi {
workspace_id = self.workspace_id
);
Self::parse(self.client.get(&url)
.headers(self.headers.clone())
.send().await?).await
Self::parse(
self.client
.get(&url)
.headers(self.headers.clone())
.send()
.await?,
)
.await
}
async fn parse<T: DeserializeOwned>(response: Response) -> Result<T, TogglError> {
let data = response.text().await?;
let result = serde_json_path_to_error::from_str(&data);
let result = result
.map_err(|error| TogglError::JsonWithDataError {
data,
error,
})?;
let result = result.map_err(|error| TogglError::JsonWithDataError { data, error })?;
Ok(result)
}
@ -200,19 +229,31 @@ impl TogglApi {
workspace_id = self.workspace_id
);
Self::parse(self.client.get(&url)
.headers(self.headers.clone())
.send().await?).await
Self::parse(
self.client
.get(&url)
.headers(self.headers.clone())
.send()
.await?,
)
.await
}
fn paginate_filters(original_filters: &types::TogglReportFilters, last_row_id: u64) -> types::TogglReportFilters {
fn paginate_filters(
original_filters: &types::TogglReportFilters,
last_row_id: u64,
) -> types::TogglReportFilters {
let mut filters = original_filters.clone();
filters.first_row_number = Some(last_row_id + 1);
filters
}
#[tracing::instrument(skip(self))]
pub async fn search(&self, workspace_id: u64, filters: types::TogglReportFilters) -> Result<Vec<types::ReportEntry>, TogglError> {
pub async fn search(
&self,
workspace_id: u64,
filters: types::TogglReportFilters,
) -> Result<Vec<types::ReportEntry>, TogglError> {
let url = format!(
"{base_url}/workspace/{workspace_id}/search/time_entries",
base_url = &REPORTS_BASE_URL,
@ -228,10 +269,15 @@ impl TogglApi {
tokio::time::sleep(Duration::from_secs(1)).await;
}
let data: Vec<types::ReportEntry> = Self::parse(self.client.post(&url)
.headers(self.headers.clone())
.json(&Self::paginate_filters(&filters, last_row_number_n))
.send().await?).await?;
let data: Vec<types::ReportEntry> = Self::parse(
self.client
.post(&url)
.headers(self.headers.clone())
.json(&Self::paginate_filters(&filters, last_row_number_n))
.send()
.await?,
)
.await?;
last_row_number = data.last().map(|e| e.row_number as u64);
@ -243,10 +289,10 @@ impl TogglApi {
}
pub mod types {
use std::collections::HashMap;
use chrono::{DateTime, NaiveDate, Utc};
use serde::{Deserialize, Serialize};
use serde_with::skip_serializing_none;
use std::collections::HashMap;
#[derive(Serialize, Deserialize, Debug, Clone, Soars)]
pub struct TimeEntry {
@ -282,8 +328,8 @@ pub mod types {
use serde::{Deserialize, Serialize};
pub fn serialize<S>(duration: &Option<u32>, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
where
S: serde::Serializer,
{
match duration {
None => i32::serialize(&-1, serializer),
@ -292,8 +338,8 @@ pub mod types {
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Option<u32>, D::Error>
where
D: serde::Deserializer<'de>,
where
D: serde::Deserializer<'de>,
{
let duration = i32::deserialize(deserializer)?;
if duration < 0 {
@ -339,13 +385,17 @@ pub mod types {
impl fmt::Display for ProjectStatus {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", match self {
ProjectStatus::Upcoming => "upcoming",
ProjectStatus::Active => "active",
ProjectStatus::Archived => "archived",
ProjectStatus::Ended => "ended",
ProjectStatus::Deleted => "deleted",
})
write!(
f,
"{}",
match self {
ProjectStatus::Upcoming => "upcoming",
ProjectStatus::Active => "active",
ProjectStatus::Archived => "archived",
ProjectStatus::Ended => "ended",
ProjectStatus::Deleted => "deleted",
}
)
}
}
@ -444,7 +494,8 @@ pub mod types {
duration: Some(self.time_entries[0].seconds),
updated_at: self.time_entries[0].updated_at,
description: self.description,
tags: self.enriched_information
tags: self
.enriched_information
.map(|e| e.tag_names.clone())
.unwrap_or_default(),
@ -492,8 +543,8 @@ pub mod types {
pub rest: HashMap<String, serde_json::Value>,
}
use std::fmt;
use soa_rs::Soars;
use std::fmt;
impl fmt::Debug for TogglReportFilters {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {