use chrono::{DateTime, DurationRound, NaiveDate, TimeDelta, Utc}; use sqlx::{Connection, PgConnection, Postgres, QueryBuilder}; use toggl::TogglApi; use crate::toggl::types::{Tag, TimeEntry, TogglReportFilters}; use itertools::Itertools; use soa_rs::Soa; mod toggl; mod sensitive; #[derive(Debug, thiserror::Error)] enum AppError { #[error("Database error: {0}")] SqlxError(#[from] sqlx::Error), #[error("Toggl error: {0}")] TogglError(#[from] toggl::TogglError), #[error("User modified since time delta is too large. Max allowed is 90 days.")] LookBackTooLarge, } struct Worker { db: PgConnection, toggl_api: TogglApi, } struct TableSummary { client_ids: Vec, workspace_ids: Vec, project_ids: Vec, tag_ids: Vec, } impl Worker { async fn get_ids(&mut self) -> Result { let client_ids = sqlx::query!("select id from tracking_clients") .fetch_all(&mut self.db) .await?; let workspace_ids = sqlx::query!("select id from workspaces") .fetch_all(&mut self.db) .await?; let project_ids = sqlx::query!("select id from projects") .fetch_all(&mut self.db) .await?; let tag_ids = sqlx::query!("select id from tags") .fetch_all(&mut self.db) .await?; Ok(TableSummary { client_ids: client_ids.iter().map(|row| row.id as u64).collect(), workspace_ids: workspace_ids.iter().map(|row| row.id as u64).collect(), project_ids: project_ids.iter().map(|row| row.id as u64).collect(), tag_ids: tag_ids.iter().map(|row| row.id as u64).collect(), }) } pub async fn fetch_within(&mut self, start: DateTime, end: DateTime) -> Result<(), AppError> { let results = self.toggl_api.search(self.toggl_api.workspace_id, TogglReportFilters { start_date: Some(start.date_naive()), end_date: Some(end.date_naive()), ..Default::default() }).await?; let time_entries = results.into_iter() .map(|entry| entry.into_time_entry(self.toggl_api.workspace_id)) .collect::>(); self.update_database(time_entries).await } pub async fn fetch_changed_since(&mut self, look_back: TimeDelta) -> Result<(), AppError> { if look_back > TimeDelta::days(90) { return Err(AppError::LookBackTooLarge); } self.update_time_entries(Utc::now() - look_back).await } pub async fn update(&mut self, default_look_back: TimeDelta) -> Result<(), AppError> { let result = sqlx::query!("select max(updated_at) as last_updated_at from time_entries") .fetch_optional(&mut self.db) .await .expect("Could not fetch max updated_at from time_entries"); let fetch_since = result .and_then(|record| record.last_updated_at) .unwrap_or_else(|| Utc::now() - default_look_back); self.update_time_entries(fetch_since).await } async fn update_time_entries(&mut self, fetch_since: DateTime) -> Result<(), AppError> { let time_entries = self.toggl_api .get_time_entries_for_user_modified_since(fetch_since).await?; self.update_database(time_entries).await } async fn update_database(&mut self, time_entries: Vec) -> Result<(), AppError> { let existing_ids = self.get_ids().await?; let fetch_workspaces = time_entries.iter() .map(|entry| entry.workspace_id) .filter(|workspace_id| !existing_ids.workspace_ids.contains(&workspace_id)) .collect::>(); let fetch_projects = time_entries.iter() .map(|entry| entry.project_id) .filter_map(|project_id| project_id) .any(|project_id| !existing_ids.project_ids.contains(&project_id)); let fetch_tags = time_entries.iter() .flat_map(|entry| entry.tag_ids.iter()) .any(|tag| !existing_ids.tag_ids.contains(&tag)); if !fetch_workspaces.is_empty() { self.update_workspaces(&fetch_workspaces).await?; } if fetch_projects { self.update_projects(&existing_ids).await?; } if fetch_tags { self.update_tags().await?; } for entry in time_entries { sqlx::query!( r#" INSERT INTO time_entries (id, workspace_id, user_id, project_id, task_id, start, stop, duration, updated_at, description, tag_ids, billable, server_deleted_at, permissions) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14) ON CONFLICT (id) DO UPDATE SET workspace_id = excluded.workspace_id, user_id = excluded.user_id, project_id = excluded.project_id, task_id = excluded.task_id, start = excluded.start, stop = excluded.stop, duration = excluded.duration, updated_at = excluded.updated_at, description = excluded.description, tag_ids = excluded.tag_ids, billable = excluded.billable, server_deleted_at = excluded.server_deleted_at, permissions = excluded.permissions "#, entry.id as i64, entry.workspace_id as i64, entry.user_id as i64, entry.project_id.map(|id| id as i64), entry.task_id.map(|id| id as i64), entry.start, entry.stop, entry.duration.map(|d| d as i32), entry.updated_at, entry.description, &entry.tag_ids.iter().map(|id| *id as i64).collect::>(), entry.billable, entry.server_deleted_at, entry.permissions, ) .execute(&mut self.db) .await?; } Ok(()) } async fn update_workspaces(&mut self, workspace_ids: &[u64]) -> Result<(), AppError> { let workspaces = workspace_ids.iter() .map(|id| self.toggl_api.get_workspace(*id)); let workspaces = futures::future::join_all(workspaces).await .into_iter() .collect::, _>>()?; let workspaces = Soa::from(workspaces.as_slice()); sqlx::query!( r#" INSERT INTO workspaces (id, organization_id, name) SELECT * FROM UNNEST($1::bigint[], $2::bigint[], $3::text[]) ON CONFLICT (id) DO UPDATE SET organization_id = excluded.organization_id, name = excluded.name "#, &workspaces.id().iter().map(|id| *id as i64).collect::>()[..], &workspaces.organization_id().iter().map(|id| *id as i64).collect::>()[..], workspaces.name(), ) .execute(&mut self.db) .await?; Ok(()) } async fn update_projects(&mut self, existing_ids: &TableSummary) -> Result<(), AppError> { let projects = self.toggl_api.get_projects().await?; let fetch_clients = projects.iter() .map(|project| project.client_id) .filter_map(|client_id| client_id) .any(|client_id| !existing_ids.client_ids.contains(&(client_id as u64))); if fetch_clients { self.update_clients().await?; } for project in projects { sqlx::query!( r#" INSERT INTO projects (id, workspace_id, client_id, name, color, status, active, updated_at, start_date, created_at, server_deleted_at, actual_hours, actual_seconds, can_track_time, permissions) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15) ON CONFLICT (id) DO UPDATE SET workspace_id = excluded.workspace_id, client_id = excluded.client_id, name = excluded.name, color = excluded.color, status = excluded.status, active = excluded.active, updated_at = excluded.updated_at, start_date = excluded.start_date, created_at = excluded.created_at, server_deleted_at = excluded.server_deleted_at, actual_hours = excluded.actual_hours, actual_seconds = excluded.actual_seconds, can_track_time = excluded.can_track_time, permissions = excluded.permissions "#, project.id, project.workspace_id, project.client_id, project.name, project.color, project.status.to_string(), project.active, project.updated_at, project.start_date, project.created_at, project.server_deleted_at, project.actual_hours, project.actual_seconds, project.can_track_time, project.permissions, ) .execute(&mut self.db) .await?; } Ok(()) } async fn update_tags(&mut self) -> Result<(), AppError> { let tags = self.toggl_api.get_tags().await?; let tags: Soa = Soa::from(tags.as_slice()); sqlx::query!( r#" INSERT INTO tags (id, name, workspace_id, creator_id, updated_at, deleted_at, permissions) SELECT * FROM UNNEST($1::bigint[], $2::text[], $3::bigint[], $4::bigint[], $5::timestamptz[], $6::timestamptz[], $7::text[]) ON CONFLICT (id) DO UPDATE SET name = excluded.name, workspace_id = excluded.workspace_id, creator_id = excluded.creator_id, updated_at = excluded.updated_at, deleted_at = excluded.deleted_at, permissions = excluded.permissions "#, &tags.id().iter().map(|id| *id as i64).collect::>()[..], tags.name(), &tags.workspace_id().iter().map(|id| *id as i64).collect::>()[..], &tags.creator_id().iter().map(|id| *id as i64).collect::>()[..], tags.updated_at(), // Nullable fields fail to type check with UNNEST batch inserts so we silence the // errors using `: _`. tags.deleted_at(): _, tags.permissions(): _, ) .execute(&mut self.db) .await?; Ok(()) } async fn update_clients(&mut self) -> Result<(), AppError> { let clients = self.toggl_api.get_clients().await?; let mut query_builder: QueryBuilder = QueryBuilder::new( // Note the trailing space "INSERT INTO tracking_clients (id, updated_at, archived, creator_id, integration_provider, notes, name, server_deleted_at, workspace_id, permissions) " ); query_builder.push_values(clients.into_iter(), |mut b, client| { b.push_bind(client.id) .push_bind(client.updated_at) .push_bind(client.archived) .push_bind(client.creator_id) .push_bind(client.integration_provider) .push_bind(client.notes) .push_bind(client.name) .push_bind(client.server_deleted_at) .push_bind(client.workspace_id) .push_bind(client.permissions); }); query_builder.push(r#" ON CONFLICT (id) DO UPDATE SET updated_at = excluded.updated_at, archived = excluded.archived, creator_id = excluded.creator_id, integration_provider = excluded.integration_provider, notes = excluded.notes, name = excluded.name, server_deleted_at = excluded.server_deleted_at, workspace_id = excluded.workspace_id, permissions = excluded.permissions "#); let query = query_builder.build(); query.execute(&mut self.db) .await?; Ok(()) } } #[tokio::main] async fn main() { dotenv::dotenv() .expect("Failed to load .env file"); // Init tracing tracing_subscriber::fmt::init(); let api = TogglApi::new( sensitive::API_TOKEN, sensitive::WORKSPACE_ID, ); let database_url = std::env::var("DATABASE_URL") .expect("DATABASE_URL must be set"); let mut worker = Worker { db: PgConnection::connect(&database_url).await.unwrap(), toggl_api: api, }; worker.update_tags() .await .unwrap(); // let start = NaiveDate::from_ymd_opt(2024, 2, 1) // .expect("Invalid date") // .and_hms_opt(0, 0, 0) // .expect("Invalid time") // .and_utc(); // // let end = NaiveDate::from_ymd_opt(2024, 5, 1) // .expect("Invalid date") // .and_hms_opt(0, 0, 0) // .expect("Invalid time") // .and_utc(); // // worker.fetch_within(start, end).await // .expect("Failed to fetch time entries"); }