use crate::toggl::types::{Project, Tag, TimeEntry, TogglReportFilters, TrackingClient}; use crate::toggl::TogglApi; use crate::{AppError, TableSummary}; use chrono::{DateTime, NaiveDate, TimeDelta, Utc}; use itertools::Itertools; use soa_rs::Soa; use sqlx::PgPool; macro_rules! cast_slice { ($slice: expr, $typ: ty) => { &($slice.iter().map(|id| *id as $typ).collect_vec())[..] }; } macro_rules! cast_slice_opts { ($slice: expr, $typ: ty) => { $slice .iter() .map(|opt| opt.map(|id| id as $typ)) .collect_vec() }; } #[derive(Debug, Clone)] pub struct Worker { pub(crate) db: PgPool, pub(crate) toggl_api: TogglApi, } impl Worker { async fn get_ids(&self) -> Result { let client_ids = sqlx::query!("select id from tracking_clients") .fetch_all(&self.db) .await?; let workspace_ids = sqlx::query!("select id from workspaces") .fetch_all(&self.db) .await?; let project_ids = sqlx::query!("select id from projects") .fetch_all(&self.db) .await?; let tag_ids = sqlx::query!("select id from tags") .fetch_all(&self.db) .await?; Ok(TableSummary { client_ids: client_ids.iter().map(|row| row.id as u64).collect(), workspace_ids: workspace_ids.iter().map(|row| row.id as u64).collect(), project_ids: project_ids.iter().map(|row| row.id as u64).collect(), tag_ids: tag_ids.iter().map(|row| row.id as u64).collect(), }) } pub async fn fetch_within(&self, start: NaiveDate, end: NaiveDate) -> Result<(), AppError> { let results = self .toggl_api .search( self.toggl_api.workspace_id, TogglReportFilters { start_date: Some(start), end_date: Some(end), ..Default::default() }, ) .await?; let time_entries = results .into_iter() .map(|entry| entry.into_time_entry(self.toggl_api.workspace_id)) .collect::>(); self.update_database(time_entries).await } pub async fn fetch_changed_since(&self, look_back: TimeDelta) -> Result<(), AppError> { if look_back > TimeDelta::days(90) { return Err(AppError::LookBackTooLarge); } self.update_time_entries(Utc::now() - look_back).await } pub async fn update(&self, default_look_back: TimeDelta) -> Result<(), AppError> { let result = sqlx::query!("select max(updated_at) as last_updated_at from time_entries") .fetch_optional(&self.db) .await .expect("Could not fetch max updated_at from time_entries"); let fetch_since = result .and_then(|record| record.last_updated_at) .unwrap_or_else(|| Utc::now() - default_look_back); self.update_time_entries(fetch_since).await } async fn update_time_entries(&self, fetch_since: DateTime) -> Result<(), AppError> { let time_entries = self .toggl_api .get_time_entries_for_user_modified_since(fetch_since) .await?; self.update_database(time_entries).await } async fn update_database(&self, time_entries: Vec) -> Result<(), AppError> { let existing_ids = self.get_ids().await?; let fetch_workspaces = time_entries .iter() .map(|entry| entry.workspace_id) .filter(|workspace_id| !existing_ids.workspace_ids.contains(&workspace_id)) .unique() .collect::>(); let fetch_projects = time_entries .iter() .map(|entry| entry.project_id) .filter_map(|project_id| project_id) .any(|project_id| !existing_ids.project_ids.contains(&project_id)); let fetch_tags = time_entries .iter() .flat_map(|entry| entry.tag_ids.iter()) .any(|tag| !existing_ids.tag_ids.contains(&tag)); if !fetch_workspaces.is_empty() { self.update_workspaces(&fetch_workspaces).await?; } if fetch_projects { self.update_projects(&existing_ids).await?; } if fetch_tags { self.update_tags().await?; } for time_entries_chunk in time_entries.chunks(100) { self.update_time_entries_chunk(time_entries_chunk).await?; } Ok(()) } async fn update_time_entries_chunk(&self, time_entries: &[TimeEntry]) -> Result<(), AppError> { let time_entries = Soa::from(time_entries); sqlx::query!( r#" INSERT INTO time_entries (id, workspace_id, user_id, project_id, task_id, start, stop, duration, updated_at, description, billable, server_deleted_at, permissions) SELECT * FROM UNNEST($1::bigint[], $2::bigint[], $3::bigint[], $4::bigint[], $5::bigint[], $6::timestamptz[], $7::timestamptz[], $8::int[], $9::timestamptz[], $10::text[], $11::bool[], $12::timestamptz[], $13::text[]) ON CONFLICT (id) DO UPDATE SET workspace_id = excluded.workspace_id, user_id = excluded.user_id, project_id = excluded.project_id, task_id = excluded.task_id, start = excluded.start, stop = excluded.stop, duration = excluded.duration, updated_at = excluded.updated_at, description = excluded.description, billable = excluded.billable, server_deleted_at = excluded.server_deleted_at, permissions = excluded.permissions "#, cast_slice!(time_entries.id(), i64), cast_slice!(time_entries.workspace_id(), i64), cast_slice!(time_entries.user_id(), i64), cast_slice_opts!(time_entries.project_id(), i64) as _, cast_slice_opts!(time_entries.task_id(), i64) as _, time_entries.start(), time_entries.stop() as _, cast_slice_opts!(time_entries.duration(), i32) as _, time_entries.updated_at(), time_entries.description() as _, time_entries.billable(), time_entries.server_deleted_at() as _, time_entries.permissions() as _, ) .execute(&self.db) .await?; Ok(()) } #[tracing::instrument(skip(self))] async fn update_workspaces(&self, workspace_ids: &[u64]) -> Result<(), AppError> { let workspaces = workspace_ids .iter() .map(|id| self.toggl_api.get_workspace(*id)); let workspaces = futures::future::join_all(workspaces) .await .into_iter() .collect::, _>>()?; let workspaces = Soa::from(workspaces.as_slice()); sqlx::query!( r#" INSERT INTO workspaces (id, organization_id, name) SELECT * FROM UNNEST($1::bigint[], $2::bigint[], $3::text[]) ON CONFLICT (id) DO UPDATE SET organization_id = excluded.organization_id, name = excluded.name "#, cast_slice!(workspaces.id(), i64), cast_slice!(workspaces.organization_id(), i64), workspaces.name(), ) .execute(&self.db) .await?; Ok(()) } async fn update_projects(&self, existing_ids: &TableSummary) -> Result<(), AppError> { let projects = self.toggl_api.get_projects().await?; let fetch_clients = projects .iter() .map(|project| project.client_id) .filter_map(|client_id| client_id) .any(|client_id| !existing_ids.client_ids.contains(&(client_id as u64))); if fetch_clients { self.update_clients().await?; } let projects: Soa = Soa::from(projects.as_slice()); sqlx::query!( r#" INSERT INTO projects (id, workspace_id, client_id, name, color, status, active, updated_at, start_date, created_at, server_deleted_at, actual_hours, actual_seconds, can_track_time, permissions) SELECT * FROM UNNEST($1::bigint[], $2::bigint[], $3::bigint[], $4::text[], $5::text[], $6::text[], $7::bool[], $8::timestamptz[], $9::date[], $10::timestamptz[], $11::timestamptz[], $12::int[], $13::int[], $14::bool[], $15::text[]) ON CONFLICT (id) DO UPDATE SET workspace_id = excluded.workspace_id, client_id = excluded.client_id, name = excluded.name, color = excluded.color, status = excluded.status, active = excluded.active, updated_at = excluded.updated_at, start_date = excluded.start_date, created_at = excluded.created_at, server_deleted_at = excluded.server_deleted_at, actual_hours = excluded.actual_hours, actual_seconds = excluded.actual_seconds, can_track_time = excluded.can_track_time, permissions = excluded.permissions "#, cast_slice!(projects.id(), i64), cast_slice!(projects.workspace_id(), i64), cast_slice_opts!(projects.client_id(), i64) as _, projects.name(), projects.color(), &projects.status().iter().map(|s| s.to_string()).collect::>()[..], projects.active(), projects.updated_at(), projects.start_date() as _, projects.created_at(), projects.server_deleted_at() as _, cast_slice_opts!(projects.actual_hours(), i64) as _, cast_slice_opts!(projects.actual_seconds(), i64) as _, projects.can_track_time(), projects.permissions() as _, ) .execute(&self.db) .await?; Ok(()) } pub(crate) async fn update_tags(&self) -> Result<(), AppError> { let tags = self.toggl_api.get_tags().await?; let tags: Soa = Soa::from(tags.as_slice()); sqlx::query!( r#" INSERT INTO tags (id, name, workspace_id, creator_id, updated_at, deleted_at, permissions) SELECT * FROM UNNEST($1::bigint[], $2::text[], $3::bigint[], $4::bigint[], $5::timestamptz[], $6::timestamptz[], $7::text[]) ON CONFLICT (id) DO UPDATE SET name = excluded.name, workspace_id = excluded.workspace_id, creator_id = excluded.creator_id, updated_at = excluded.updated_at, deleted_at = excluded.deleted_at, permissions = excluded.permissions "#, cast_slice!(tags.id(), i64), tags.name(), cast_slice!(tags.workspace_id(), i64), cast_slice!(tags.creator_id(), i64), tags.updated_at(), // Nullable fields fail to type check with UNNEST batch inserts so we silence the // errors using ` as _`. tags.deleted_at() as _, tags.permissions() as _, ) .execute(&self.db) .await?; Ok(()) } async fn update_clients(&self) -> Result<(), AppError> { let clients = self.toggl_api.get_clients().await?; let clients: Soa = Soa::from(clients.as_slice()); sqlx::query!(r#" INSERT INTO tracking_clients (id, updated_at, archived, creator_id, integration_provider, notes, name, server_deleted_at, workspace_id, permissions) SELECT * FROM UNNEST($1::bigint[], $2::timestamptz[], $3::bool[], $4::bigint[], $5::text[], $6::text[], $7::text[], $8::timestamptz[], $9::bigint[], $10::text[]) ON CONFLICT (id) DO UPDATE SET updated_at = excluded.updated_at, archived = excluded.archived, creator_id = excluded.creator_id, integration_provider = excluded.integration_provider, notes = excluded.notes, name = excluded.name, server_deleted_at = excluded.server_deleted_at, workspace_id = excluded.workspace_id, permissions = excluded.permissions "#, cast_slice!(clients.id(), i64), clients.updated_at(), clients.archived(), cast_slice!(clients.creator_id(), i64), // For the next two, we are assuming these are Option as datatype. If these are different, please update accordingly. clients.integration_provider() as _, clients.notes() as _, clients.name(), clients.server_deleted_at() as _, cast_slice!(clients.workspace_id(), i64), clients.permissions() as _ ) .execute(&self.db) .await?; Ok(()) } }