Convert more to SOA

This commit is contained in:
Joshua Coles 2024-07-27 20:41:43 +01:00
parent 300ee37cf5
commit 6f7db97ab6
2 changed files with 83 additions and 80 deletions

View File

@ -1,9 +1,10 @@
use chrono::{DateTime, DurationRound, NaiveDate, TimeDelta, Utc}; use chrono::{DateTime, DurationRound, NaiveDate, TimeDelta, Utc};
use sqlx::{Connection, PgConnection, Postgres, QueryBuilder}; use sqlx::{Connection, PgConnection, Postgres, QueryBuilder};
use toggl::TogglApi; use toggl::TogglApi;
use crate::toggl::types::{Tag, TimeEntry, TogglReportFilters}; use crate::toggl::types::{Tag, TimeEntry, TogglReportFilters, TrackingClient};
use itertools::Itertools; use itertools::Itertools;
use soa_rs::Soa; use soa_rs::Soa;
use tracing_subscriber::fmt::time;
mod toggl; mod toggl;
mod sensitive; mod sensitive;
@ -129,11 +130,20 @@ impl Worker {
self.update_tags().await?; self.update_tags().await?;
} }
for entry in time_entries { for time_entries_chunk in time_entries.chunks(100) {
self.update_time_entries_chunk(time_entries_chunk).await?;
}
Ok(())
}
async fn update_time_entries_chunk(&mut self, time_entries: &[TimeEntry]) -> Result<(), AppError> {
let time_entries = Soa::from(time_entries);
sqlx::query!( sqlx::query!(
r#" r#"
INSERT INTO time_entries (id, workspace_id, user_id, project_id, task_id, start, stop, duration, updated_at, description, tag_ids, billable, server_deleted_at, permissions) INSERT INTO time_entries (id, workspace_id, user_id, project_id, task_id, start, stop, duration, updated_at, description, billable, server_deleted_at, permissions)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14) SELECT * FROM UNNEST($1::bigint[], $2::bigint[], $3::bigint[], $4::bigint[], $5::bigint[], $6::timestamptz[], $7::timestamptz[], $8::int[], $9::timestamptz[], $10::text[], $11::bool[], $12::timestamptz[], $13::text[])
ON CONFLICT (id) DO UPDATE SET ON CONFLICT (id) DO UPDATE SET
workspace_id = excluded.workspace_id, workspace_id = excluded.workspace_id,
user_id = excluded.user_id, user_id = excluded.user_id,
@ -144,29 +154,26 @@ impl Worker {
duration = excluded.duration, duration = excluded.duration,
updated_at = excluded.updated_at, updated_at = excluded.updated_at,
description = excluded.description, description = excluded.description,
tag_ids = excluded.tag_ids,
billable = excluded.billable, billable = excluded.billable,
server_deleted_at = excluded.server_deleted_at, server_deleted_at = excluded.server_deleted_at,
permissions = excluded.permissions permissions = excluded.permissions
"#, "#,
entry.id as i64, &time_entries.id().iter().map(|id| *id as i64).collect_vec()[..],
entry.workspace_id as i64, &time_entries.workspace_id().iter().map(|id| *id as i64).collect_vec()[..],
entry.user_id as i64, &time_entries.user_id().iter().map(|id| *id as i64).collect_vec()[..],
entry.project_id.map(|id| id as i64), time_entries.project_id().iter().map(|opt| opt.map(|id| id as i64)).collect_vec() as _,
entry.task_id.map(|id| id as i64), time_entries.task_id().iter().map(|opt| opt.map(|id| id as i64)).collect_vec() as _,
entry.start, time_entries.start(),
entry.stop, time_entries.stop() as _,
entry.duration.map(|d| d as i32), time_entries.duration().iter().map(|opt| opt.map(|d| d as i32)).collect_vec() as _,
entry.updated_at, time_entries.updated_at(),
entry.description, time_entries.description() as _,
&entry.tag_ids.iter().map(|id| *id as i64).collect::<Vec<_>>(), time_entries.billable(),
entry.billable, time_entries.server_deleted_at() as _,
entry.server_deleted_at, time_entries.permissions() as _,
entry.permissions,
) )
.execute(&mut self.db) .execute(&mut self.db)
.await?; .await?;
}
Ok(()) Ok(())
} }
@ -277,9 +284,9 @@ impl Worker {
&tags.creator_id().iter().map(|id| *id as i64).collect::<Vec<_>>()[..], &tags.creator_id().iter().map(|id| *id as i64).collect::<Vec<_>>()[..],
tags.updated_at(), tags.updated_at(),
// Nullable fields fail to type check with UNNEST batch inserts so we silence the // Nullable fields fail to type check with UNNEST batch inserts so we silence the
// errors using `: _`. // errors using ` as _`.
tags.deleted_at(): _, tags.deleted_at() as _,
tags.permissions(): _, tags.permissions() as _,
) )
.execute(&mut self.db) .execute(&mut self.db)
.await?; .await?;
@ -290,25 +297,12 @@ impl Worker {
async fn update_clients(&mut self) -> Result<(), AppError> { async fn update_clients(&mut self) -> Result<(), AppError> {
let clients = self.toggl_api.get_clients().await?; let clients = self.toggl_api.get_clients().await?;
let mut query_builder: QueryBuilder<Postgres> = QueryBuilder::new( let clients: Soa<TrackingClient> = Soa::from(clients.as_slice());
// Note the trailing space
"INSERT INTO tracking_clients (id, updated_at, archived, creator_id, integration_provider, notes, name, server_deleted_at, workspace_id, permissions) "
);
query_builder.push_values(clients.into_iter(), |mut b, client| { sqlx::query!(r#"
b.push_bind(client.id) INSERT INTO tracking_clients
.push_bind(client.updated_at) (id, updated_at, archived, creator_id, integration_provider, notes, name, server_deleted_at, workspace_id, permissions)
.push_bind(client.archived) SELECT * FROM UNNEST($1::bigint[], $2::timestamptz[], $3::bool[], $4::bigint[], $5::text[], $6::text[], $7::text[], $8::timestamptz[], $9::bigint[], $10::text[])
.push_bind(client.creator_id)
.push_bind(client.integration_provider)
.push_bind(client.notes)
.push_bind(client.name)
.push_bind(client.server_deleted_at)
.push_bind(client.workspace_id)
.push_bind(client.permissions);
});
query_builder.push(r#"
ON CONFLICT (id) DO UPDATE SET ON CONFLICT (id) DO UPDATE SET
updated_at = excluded.updated_at, updated_at = excluded.updated_at,
archived = excluded.archived, archived = excluded.archived,
@ -319,11 +313,20 @@ impl Worker {
server_deleted_at = excluded.server_deleted_at, server_deleted_at = excluded.server_deleted_at,
workspace_id = excluded.workspace_id, workspace_id = excluded.workspace_id,
permissions = excluded.permissions permissions = excluded.permissions
"#); "#,
&clients.id().iter().map(|id| *id as i64).collect::<Vec<_>>()[..],
let query = query_builder.build(); clients.updated_at(),
clients.archived(),
query.execute(&mut self.db) &clients.creator_id().iter().map(|id| *id as i64).collect::<Vec<_>>()[..],
// For the next two, we are assuming these are Option<String> as datatype. If these are different, please update accordingly.
clients.integration_provider() as _,
clients.notes() as _,
clients.name(),
clients.server_deleted_at() as _,
&clients.workspace_id().iter().map(|id| *id as i64).collect::<Vec<_>>()[..],
clients.permissions() as _
)
.execute(&mut self.db)
.await?; .await?;
Ok(()) Ok(())
@ -355,18 +358,18 @@ async fn main() {
.await .await
.unwrap(); .unwrap();
// let start = NaiveDate::from_ymd_opt(2024, 2, 1) let start = NaiveDate::from_ymd_opt(2024, 2, 1)
// .expect("Invalid date") .expect("Invalid date")
// .and_hms_opt(0, 0, 0) .and_hms_opt(0, 0, 0)
// .expect("Invalid time") .expect("Invalid time")
// .and_utc(); .and_utc();
//
// let end = NaiveDate::from_ymd_opt(2024, 5, 1) let end = NaiveDate::from_ymd_opt(2024, 2, 2)
// .expect("Invalid date") .expect("Invalid date")
// .and_hms_opt(0, 0, 0) .and_hms_opt(0, 0, 0)
// .expect("Invalid time") .expect("Invalid time")
// .and_utc(); .and_utc();
//
// worker.fetch_within(start, end).await worker.fetch_within(start, end).await
// .expect("Failed to fetch time entries"); .expect("Failed to fetch time entries");
} }

View File

@ -248,7 +248,7 @@ pub mod types {
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_with::skip_serializing_none; use serde_with::skip_serializing_none;
#[derive(Serialize, Deserialize, Debug, Clone)] #[derive(Serialize, Deserialize, Debug, Clone, Soars)]
pub struct TimeEntry { pub struct TimeEntry {
pub id: u64, pub id: u64,
pub workspace_id: u64, pub workspace_id: u64,
@ -349,7 +349,7 @@ pub mod types {
} }
} }
#[derive(Soars, Serialize, Deserialize, Debug, Clone)] #[derive(Serialize, Deserialize, Debug, Clone, Soars)]
pub struct TrackingClient { pub struct TrackingClient {
/// The unique identifier for the client. /// The unique identifier for the client.
pub id: i64, pub id: i64,