From 6f7db97ab691c06700b215c830d5db7e57193d1c Mon Sep 17 00:00:00 2001 From: Joshua Coles Date: Sat, 27 Jul 2024 20:41:43 +0100 Subject: [PATCH] Convert more to SOA --- src/main.rs | 159 ++++++++++++++++++++++++----------------------- src/toggl/mod.rs | 4 +- 2 files changed, 83 insertions(+), 80 deletions(-) diff --git a/src/main.rs b/src/main.rs index 9140436..b194136 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,9 +1,10 @@ use chrono::{DateTime, DurationRound, NaiveDate, TimeDelta, Utc}; use sqlx::{Connection, PgConnection, Postgres, QueryBuilder}; use toggl::TogglApi; -use crate::toggl::types::{Tag, TimeEntry, TogglReportFilters}; +use crate::toggl::types::{Tag, TimeEntry, TogglReportFilters, TrackingClient}; use itertools::Itertools; use soa_rs::Soa; +use tracing_subscriber::fmt::time; mod toggl; mod sensitive; @@ -129,48 +130,54 @@ impl Worker { self.update_tags().await?; } - for entry in time_entries { - sqlx::query!( - r#" - INSERT INTO time_entries (id, workspace_id, user_id, project_id, task_id, start, stop, duration, updated_at, description, tag_ids, billable, server_deleted_at, permissions) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14) - ON CONFLICT (id) DO UPDATE SET - workspace_id = excluded.workspace_id, - user_id = excluded.user_id, - project_id = excluded.project_id, - task_id = excluded.task_id, - start = excluded.start, - stop = excluded.stop, - duration = excluded.duration, - updated_at = excluded.updated_at, - description = excluded.description, - tag_ids = excluded.tag_ids, - billable = excluded.billable, - server_deleted_at = excluded.server_deleted_at, - permissions = excluded.permissions - "#, - entry.id as i64, - entry.workspace_id as i64, - entry.user_id as i64, - entry.project_id.map(|id| id as i64), - entry.task_id.map(|id| id as i64), - entry.start, - entry.stop, - entry.duration.map(|d| d as i32), - entry.updated_at, - entry.description, - &entry.tag_ids.iter().map(|id| *id as i64).collect::>(), - entry.billable, - entry.server_deleted_at, - entry.permissions, - ) - .execute(&mut self.db) - .await?; + for time_entries_chunk in time_entries.chunks(100) { + self.update_time_entries_chunk(time_entries_chunk).await?; } Ok(()) } + async fn update_time_entries_chunk(&mut self, time_entries: &[TimeEntry]) -> Result<(), AppError> { + let time_entries = Soa::from(time_entries); + + sqlx::query!( + r#" + INSERT INTO time_entries (id, workspace_id, user_id, project_id, task_id, start, stop, duration, updated_at, description, billable, server_deleted_at, permissions) + SELECT * FROM UNNEST($1::bigint[], $2::bigint[], $3::bigint[], $4::bigint[], $5::bigint[], $6::timestamptz[], $7::timestamptz[], $8::int[], $9::timestamptz[], $10::text[], $11::bool[], $12::timestamptz[], $13::text[]) + ON CONFLICT (id) DO UPDATE SET + workspace_id = excluded.workspace_id, + user_id = excluded.user_id, + project_id = excluded.project_id, + task_id = excluded.task_id, + start = excluded.start, + stop = excluded.stop, + duration = excluded.duration, + updated_at = excluded.updated_at, + description = excluded.description, + billable = excluded.billable, + server_deleted_at = excluded.server_deleted_at, + permissions = excluded.permissions + "#, + &time_entries.id().iter().map(|id| *id as i64).collect_vec()[..], + &time_entries.workspace_id().iter().map(|id| *id as i64).collect_vec()[..], + &time_entries.user_id().iter().map(|id| *id as i64).collect_vec()[..], + time_entries.project_id().iter().map(|opt| opt.map(|id| id as i64)).collect_vec() as _, + time_entries.task_id().iter().map(|opt| opt.map(|id| id as i64)).collect_vec() as _, + time_entries.start(), + time_entries.stop() as _, + time_entries.duration().iter().map(|opt| opt.map(|d| d as i32)).collect_vec() as _, + time_entries.updated_at(), + time_entries.description() as _, + time_entries.billable(), + time_entries.server_deleted_at() as _, + time_entries.permissions() as _, + ) + .execute(&mut self.db) + .await?; + + Ok(()) + } + async fn update_workspaces(&mut self, workspace_ids: &[u64]) -> Result<(), AppError> { let workspaces = workspace_ids.iter() .map(|id| self.toggl_api.get_workspace(*id)); @@ -277,9 +284,9 @@ impl Worker { &tags.creator_id().iter().map(|id| *id as i64).collect::>()[..], tags.updated_at(), // Nullable fields fail to type check with UNNEST batch inserts so we silence the - // errors using `: _`. - tags.deleted_at(): _, - tags.permissions(): _, + // errors using ` as _`. + tags.deleted_at() as _, + tags.permissions() as _, ) .execute(&mut self.db) .await?; @@ -290,25 +297,12 @@ impl Worker { async fn update_clients(&mut self) -> Result<(), AppError> { let clients = self.toggl_api.get_clients().await?; - let mut query_builder: QueryBuilder = QueryBuilder::new( - // Note the trailing space - "INSERT INTO tracking_clients (id, updated_at, archived, creator_id, integration_provider, notes, name, server_deleted_at, workspace_id, permissions) " - ); + let clients: Soa = Soa::from(clients.as_slice()); - query_builder.push_values(clients.into_iter(), |mut b, client| { - b.push_bind(client.id) - .push_bind(client.updated_at) - .push_bind(client.archived) - .push_bind(client.creator_id) - .push_bind(client.integration_provider) - .push_bind(client.notes) - .push_bind(client.name) - .push_bind(client.server_deleted_at) - .push_bind(client.workspace_id) - .push_bind(client.permissions); - }); - - query_builder.push(r#" + sqlx::query!(r#" + INSERT INTO tracking_clients + (id, updated_at, archived, creator_id, integration_provider, notes, name, server_deleted_at, workspace_id, permissions) + SELECT * FROM UNNEST($1::bigint[], $2::timestamptz[], $3::bool[], $4::bigint[], $5::text[], $6::text[], $7::text[], $8::timestamptz[], $9::bigint[], $10::text[]) ON CONFLICT (id) DO UPDATE SET updated_at = excluded.updated_at, archived = excluded.archived, @@ -319,11 +313,20 @@ impl Worker { server_deleted_at = excluded.server_deleted_at, workspace_id = excluded.workspace_id, permissions = excluded.permissions - "#); - - let query = query_builder.build(); - - query.execute(&mut self.db) + "#, + &clients.id().iter().map(|id| *id as i64).collect::>()[..], + clients.updated_at(), + clients.archived(), + &clients.creator_id().iter().map(|id| *id as i64).collect::>()[..], + // For the next two, we are assuming these are Option as datatype. If these are different, please update accordingly. + clients.integration_provider() as _, + clients.notes() as _, + clients.name(), + clients.server_deleted_at() as _, + &clients.workspace_id().iter().map(|id| *id as i64).collect::>()[..], + clients.permissions() as _ + ) + .execute(&mut self.db) .await?; Ok(()) @@ -355,18 +358,18 @@ async fn main() { .await .unwrap(); - // let start = NaiveDate::from_ymd_opt(2024, 2, 1) - // .expect("Invalid date") - // .and_hms_opt(0, 0, 0) - // .expect("Invalid time") - // .and_utc(); - // - // let end = NaiveDate::from_ymd_opt(2024, 5, 1) - // .expect("Invalid date") - // .and_hms_opt(0, 0, 0) - // .expect("Invalid time") - // .and_utc(); - // - // worker.fetch_within(start, end).await - // .expect("Failed to fetch time entries"); + let start = NaiveDate::from_ymd_opt(2024, 2, 1) + .expect("Invalid date") + .and_hms_opt(0, 0, 0) + .expect("Invalid time") + .and_utc(); + + let end = NaiveDate::from_ymd_opt(2024, 2, 2) + .expect("Invalid date") + .and_hms_opt(0, 0, 0) + .expect("Invalid time") + .and_utc(); + + worker.fetch_within(start, end).await + .expect("Failed to fetch time entries"); } diff --git a/src/toggl/mod.rs b/src/toggl/mod.rs index 16afbbb..1de4ee1 100644 --- a/src/toggl/mod.rs +++ b/src/toggl/mod.rs @@ -248,7 +248,7 @@ pub mod types { use serde::{Deserialize, Serialize}; use serde_with::skip_serializing_none; - #[derive(Serialize, Deserialize, Debug, Clone)] + #[derive(Serialize, Deserialize, Debug, Clone, Soars)] pub struct TimeEntry { pub id: u64, pub workspace_id: u64, @@ -349,7 +349,7 @@ pub mod types { } } - #[derive(Soars, Serialize, Deserialize, Debug, Clone)] + #[derive(Serialize, Deserialize, Debug, Clone, Soars)] pub struct TrackingClient { /// The unique identifier for the client. pub id: i64,