Try bulk insert with query builder

This commit is contained in:
Joshua Coles 2024-07-27 20:03:26 +01:00
parent ead22776c3
commit ce46739f30
4 changed files with 67 additions and 46 deletions

10
Cargo.lock generated
View File

@ -954,6 +954,15 @@ version = "2.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3"
[[package]]
name = "itertools"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186"
dependencies = [
"either",
]
[[package]]
name = "itoa"
version = "1.0.11"
@ -2325,6 +2334,7 @@ dependencies = [
"dotenv",
"futures",
"governor",
"itertools",
"reqwest",
"reqwest-middleware",
"reqwest-ratelimit",

View File

@ -24,3 +24,4 @@ dotenv = "0.15.0"
futures = "0.3.30"
tracing = "0.1.40"
tracing-subscriber = "0.3.18"
itertools = "0.13.0"

View File

@ -1,7 +1,8 @@
use chrono::{DateTime, DurationRound, NaiveDate, TimeDelta, Utc};
use sqlx::{Connection, PgConnection};
use sqlx::{Connection, PgConnection, Postgres, QueryBuilder};
use toggl::TogglApi;
use crate::toggl::types::{TimeEntry, TogglReportFilters};
use itertools::Itertools;
mod toggl;
mod sensitive;
@ -72,7 +73,7 @@ impl Worker {
pub async fn fetch_changed_since(&mut self, look_back: TimeDelta) -> Result<(), AppError> {
if look_back > TimeDelta::days(90) {
return Err(AppError::LookBackTooLarge)
return Err(AppError::LookBackTooLarge);
}
self.update_time_entries(Utc::now() - look_back).await
@ -287,36 +288,41 @@ impl Worker {
async fn update_clients(&mut self) -> Result<(), AppError> {
let clients = self.toggl_api.get_clients().await?;
for client in clients {
sqlx::query!(
r#"
INSERT INTO tracking_clients (id, updated_at, archived, creator_id, integration_provider, notes, name, server_deleted_at, workspace_id, permissions)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
ON CONFLICT (id) DO UPDATE SET
updated_at = excluded.updated_at,
archived = excluded.archived,
creator_id = excluded.creator_id,
integration_provider = excluded.integration_provider,
notes = excluded.notes,
name = excluded.name,
server_deleted_at = excluded.server_deleted_at,
workspace_id = excluded.workspace_id,
permissions = excluded.permissions
"#,
client.id,
client.updated_at,
client.archived,
client.creator_id,
client.integration_provider,
client.notes,
client.name,
client.server_deleted_at,
client.workspace_id,
client.permissions,
)
.execute(&mut self.db)
.await?;
}
let mut query_builder: QueryBuilder<Postgres> = QueryBuilder::new(
// Note the trailing space
"INSERT INTO tracking_clients (id, updated_at, archived, creator_id, integration_provider, notes, name, server_deleted_at, workspace_id, permissions) "
);
query_builder.push_values(clients.into_iter(), |mut b, client| {
b.push_bind(client.id)
.push_bind(client.updated_at)
.push_bind(client.archived)
.push_bind(client.creator_id)
.push_bind(client.integration_provider)
.push_bind(client.notes)
.push_bind(client.name)
.push_bind(client.server_deleted_at)
.push_bind(client.workspace_id)
.push_bind(client.permissions);
});
query_builder.push(r#"
ON CONFLICT (id) DO UPDATE SET
updated_at = excluded.updated_at,
archived = excluded.archived,
creator_id = excluded.creator_id,
integration_provider = excluded.integration_provider,
notes = excluded.notes,
name = excluded.name,
server_deleted_at = excluded.server_deleted_at,
workspace_id = excluded.workspace_id,
permissions = excluded.permissions
"#);
let query = query_builder.build();
query.execute(&mut self.db)
.await?;
Ok(())
}
@ -343,18 +349,22 @@ async fn main() {
toggl_api: api,
};
let start = NaiveDate::from_ymd_opt(2024, 2, 1)
.expect("Invalid date")
.and_hms_opt(0, 0, 0)
.expect("Invalid time")
.and_utc();
worker.update_clients()
.await
.unwrap();
let end = NaiveDate::from_ymd_opt(2024, 5, 1)
.expect("Invalid date")
.and_hms_opt(0, 0, 0)
.expect("Invalid time")
.and_utc();
worker.fetch_within(start, end).await
.expect("Failed to fetch time entries");
// let start = NaiveDate::from_ymd_opt(2024, 2, 1)
// .expect("Invalid date")
// .and_hms_opt(0, 0, 0)
// .expect("Invalid time")
// .and_utc();
//
// let end = NaiveDate::from_ymd_opt(2024, 5, 1)
// .expect("Invalid date")
// .and_hms_opt(0, 0, 0)
// .expect("Invalid time")
// .and_utc();
//
// worker.fetch_within(start, end).await
// .expect("Failed to fetch time entries");
}

View File

@ -47,7 +47,7 @@ const BASE_URL: &str = "https://api.track.toggl.com/api/v9";
const REPORTS_BASE_URL: &str = "https://api.track.toggl.com/reports/api/v3";
impl TogglApi {
pub fn new(api_key: &str, workspace_id: u32) -> Self {
pub fn new(api_key: &str, workspace_id: u64) -> Self {
let rate_limiter = ReqwestRateLimiter::new();
let backoff = ExponentialBackoff::builder()
.retry_bounds(Duration::from_secs(1), Duration::from_secs(60))