Move worker into its own module also
This commit is contained in:
parent
6d64044296
commit
eca0a825dc
360
src/main.rs
360
src/main.rs
@ -1,27 +1,11 @@
|
|||||||
use crate::toggl::types::{Project, Tag, TimeEntry, TogglReportFilters, TrackingClient};
|
use chrono::NaiveDate;
|
||||||
use chrono::{DateTime, NaiveDate, TimeDelta, Utc};
|
|
||||||
use itertools::Itertools;
|
|
||||||
use soa_rs::Soa;
|
|
||||||
use sqlx::{Connection, PgConnection};
|
use sqlx::{Connection, PgConnection};
|
||||||
use toggl::TogglApi;
|
use toggl::TogglApi;
|
||||||
|
use worker::Worker;
|
||||||
|
|
||||||
mod sensitive;
|
mod sensitive;
|
||||||
mod toggl;
|
mod toggl;
|
||||||
|
mod worker;
|
||||||
macro_rules! cast_slice {
|
|
||||||
($slice: expr, $typ: ty) => {
|
|
||||||
&($slice.iter().map(|id| *id as $typ).collect_vec())[..]
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
macro_rules! cast_slice_opts {
|
|
||||||
($slice: expr, $typ: ty) => {
|
|
||||||
$slice
|
|
||||||
.iter()
|
|
||||||
.map(|opt| opt.map(|id| id as $typ))
|
|
||||||
.collect_vec()
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, thiserror::Error)]
|
#[derive(Debug, thiserror::Error)]
|
||||||
enum AppError {
|
enum AppError {
|
||||||
@ -35,11 +19,6 @@ enum AppError {
|
|||||||
LookBackTooLarge,
|
LookBackTooLarge,
|
||||||
}
|
}
|
||||||
|
|
||||||
struct Worker {
|
|
||||||
db: PgConnection,
|
|
||||||
toggl_api: TogglApi,
|
|
||||||
}
|
|
||||||
|
|
||||||
struct TableSummary {
|
struct TableSummary {
|
||||||
client_ids: Vec<u64>,
|
client_ids: Vec<u64>,
|
||||||
workspace_ids: Vec<u64>,
|
workspace_ids: Vec<u64>,
|
||||||
@ -47,328 +26,6 @@ struct TableSummary {
|
|||||||
tag_ids: Vec<u64>,
|
tag_ids: Vec<u64>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Worker {
|
|
||||||
async fn get_ids(&mut self) -> Result<TableSummary, AppError> {
|
|
||||||
let client_ids = sqlx::query!("select id from tracking_clients")
|
|
||||||
.fetch_all(&mut self.db)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
let workspace_ids = sqlx::query!("select id from workspaces")
|
|
||||||
.fetch_all(&mut self.db)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
let project_ids = sqlx::query!("select id from projects")
|
|
||||||
.fetch_all(&mut self.db)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
let tag_ids = sqlx::query!("select id from tags")
|
|
||||||
.fetch_all(&mut self.db)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
Ok(TableSummary {
|
|
||||||
client_ids: client_ids.iter().map(|row| row.id as u64).collect(),
|
|
||||||
workspace_ids: workspace_ids.iter().map(|row| row.id as u64).collect(),
|
|
||||||
project_ids: project_ids.iter().map(|row| row.id as u64).collect(),
|
|
||||||
tag_ids: tag_ids.iter().map(|row| row.id as u64).collect(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn fetch_within(
|
|
||||||
&mut self,
|
|
||||||
start: NaiveDate,
|
|
||||||
end: NaiveDate,
|
|
||||||
) -> Result<(), AppError> {
|
|
||||||
let results = self
|
|
||||||
.toggl_api
|
|
||||||
.search(
|
|
||||||
self.toggl_api.workspace_id,
|
|
||||||
TogglReportFilters {
|
|
||||||
start_date: Some(start),
|
|
||||||
end_date: Some(end),
|
|
||||||
..Default::default()
|
|
||||||
},
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
let time_entries = results
|
|
||||||
.into_iter()
|
|
||||||
.map(|entry| entry.into_time_entry(self.toggl_api.workspace_id))
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
|
|
||||||
self.update_database(time_entries).await
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn fetch_changed_since(&mut self, look_back: TimeDelta) -> Result<(), AppError> {
|
|
||||||
if look_back > TimeDelta::days(90) {
|
|
||||||
return Err(AppError::LookBackTooLarge);
|
|
||||||
}
|
|
||||||
|
|
||||||
self.update_time_entries(Utc::now() - look_back).await
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn update(&mut self, default_look_back: TimeDelta) -> Result<(), AppError> {
|
|
||||||
let result = sqlx::query!("select max(updated_at) as last_updated_at from time_entries")
|
|
||||||
.fetch_optional(&mut self.db)
|
|
||||||
.await
|
|
||||||
.expect("Could not fetch max updated_at from time_entries");
|
|
||||||
|
|
||||||
let fetch_since = result
|
|
||||||
.and_then(|record| record.last_updated_at)
|
|
||||||
.unwrap_or_else(|| Utc::now() - default_look_back);
|
|
||||||
|
|
||||||
self.update_time_entries(fetch_since).await
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn update_time_entries(&mut self, fetch_since: DateTime<Utc>) -> Result<(), AppError> {
|
|
||||||
let time_entries = self
|
|
||||||
.toggl_api
|
|
||||||
.get_time_entries_for_user_modified_since(fetch_since)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
self.update_database(time_entries).await
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn update_database(&mut self, time_entries: Vec<TimeEntry>) -> Result<(), AppError> {
|
|
||||||
let existing_ids = self.get_ids().await?;
|
|
||||||
|
|
||||||
let fetch_workspaces = time_entries
|
|
||||||
.iter()
|
|
||||||
.map(|entry| entry.workspace_id)
|
|
||||||
.filter(|workspace_id| !existing_ids.workspace_ids.contains(&workspace_id))
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
|
|
||||||
let fetch_projects = time_entries
|
|
||||||
.iter()
|
|
||||||
.map(|entry| entry.project_id)
|
|
||||||
.filter_map(|project_id| project_id)
|
|
||||||
.any(|project_id| !existing_ids.project_ids.contains(&project_id));
|
|
||||||
|
|
||||||
let fetch_tags = time_entries
|
|
||||||
.iter()
|
|
||||||
.flat_map(|entry| entry.tag_ids.iter())
|
|
||||||
.any(|tag| !existing_ids.tag_ids.contains(&tag));
|
|
||||||
|
|
||||||
if !fetch_workspaces.is_empty() {
|
|
||||||
self.update_workspaces(&fetch_workspaces).await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
if fetch_projects {
|
|
||||||
self.update_projects(&existing_ids).await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
if fetch_tags {
|
|
||||||
self.update_tags().await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
for time_entries_chunk in time_entries.chunks(100) {
|
|
||||||
self.update_time_entries_chunk(time_entries_chunk).await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn update_time_entries_chunk(
|
|
||||||
&mut self,
|
|
||||||
time_entries: &[TimeEntry],
|
|
||||||
) -> Result<(), AppError> {
|
|
||||||
let time_entries = Soa::from(time_entries);
|
|
||||||
|
|
||||||
sqlx::query!(
|
|
||||||
r#"
|
|
||||||
INSERT INTO time_entries (id, workspace_id, user_id, project_id, task_id, start, stop, duration, updated_at, description, billable, server_deleted_at, permissions)
|
|
||||||
SELECT * FROM UNNEST($1::bigint[], $2::bigint[], $3::bigint[], $4::bigint[], $5::bigint[], $6::timestamptz[], $7::timestamptz[], $8::int[], $9::timestamptz[], $10::text[], $11::bool[], $12::timestamptz[], $13::text[])
|
|
||||||
ON CONFLICT (id) DO UPDATE SET
|
|
||||||
workspace_id = excluded.workspace_id,
|
|
||||||
user_id = excluded.user_id,
|
|
||||||
project_id = excluded.project_id,
|
|
||||||
task_id = excluded.task_id,
|
|
||||||
start = excluded.start,
|
|
||||||
stop = excluded.stop,
|
|
||||||
duration = excluded.duration,
|
|
||||||
updated_at = excluded.updated_at,
|
|
||||||
description = excluded.description,
|
|
||||||
billable = excluded.billable,
|
|
||||||
server_deleted_at = excluded.server_deleted_at,
|
|
||||||
permissions = excluded.permissions
|
|
||||||
"#,
|
|
||||||
cast_slice!(time_entries.id(), i64),
|
|
||||||
cast_slice!(time_entries.workspace_id(), i64),
|
|
||||||
cast_slice!(time_entries.user_id(), i64),
|
|
||||||
cast_slice_opts!(time_entries.project_id(), i64) as _,
|
|
||||||
cast_slice_opts!(time_entries.task_id(), i64) as _,
|
|
||||||
time_entries.start(),
|
|
||||||
time_entries.stop() as _,
|
|
||||||
cast_slice_opts!(time_entries.duration(), i32) as _,
|
|
||||||
time_entries.updated_at(),
|
|
||||||
time_entries.description() as _,
|
|
||||||
time_entries.billable(),
|
|
||||||
time_entries.server_deleted_at() as _,
|
|
||||||
time_entries.permissions() as _,
|
|
||||||
)
|
|
||||||
.execute(&mut self.db)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn update_workspaces(&mut self, workspace_ids: &[u64]) -> Result<(), AppError> {
|
|
||||||
let workspaces = workspace_ids
|
|
||||||
.iter()
|
|
||||||
.map(|id| self.toggl_api.get_workspace(*id));
|
|
||||||
|
|
||||||
let workspaces = futures::future::join_all(workspaces)
|
|
||||||
.await
|
|
||||||
.into_iter()
|
|
||||||
.collect::<Result<Vec<_>, _>>()?;
|
|
||||||
|
|
||||||
let workspaces = Soa::from(workspaces.as_slice());
|
|
||||||
|
|
||||||
sqlx::query!(
|
|
||||||
r#"
|
|
||||||
INSERT INTO workspaces (id, organization_id, name)
|
|
||||||
SELECT * FROM UNNEST($1::bigint[], $2::bigint[], $3::text[])
|
|
||||||
ON CONFLICT (id) DO UPDATE SET
|
|
||||||
organization_id = excluded.organization_id,
|
|
||||||
name = excluded.name
|
|
||||||
"#,
|
|
||||||
cast_slice!(workspaces.id(), i64),
|
|
||||||
cast_slice!(workspaces.organization_id(), i64),
|
|
||||||
workspaces.name(),
|
|
||||||
)
|
|
||||||
.execute(&mut self.db)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn update_projects(&mut self, existing_ids: &TableSummary) -> Result<(), AppError> {
|
|
||||||
let projects = self.toggl_api.get_projects().await?;
|
|
||||||
|
|
||||||
let fetch_clients = projects
|
|
||||||
.iter()
|
|
||||||
.map(|project| project.client_id)
|
|
||||||
.filter_map(|client_id| client_id)
|
|
||||||
.any(|client_id| !existing_ids.client_ids.contains(&(client_id as u64)));
|
|
||||||
|
|
||||||
if fetch_clients {
|
|
||||||
self.update_clients().await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
let projects: Soa<Project> = Soa::from(projects.as_slice());
|
|
||||||
|
|
||||||
sqlx::query!(
|
|
||||||
r#"
|
|
||||||
INSERT INTO projects (id, workspace_id, client_id, name, color, status, active, updated_at, start_date, created_at, server_deleted_at, actual_hours, actual_seconds, can_track_time, permissions)
|
|
||||||
SELECT * FROM UNNEST($1::bigint[], $2::bigint[], $3::bigint[], $4::text[], $5::text[], $6::text[], $7::bool[], $8::timestamptz[], $9::date[], $10::timestamptz[], $11::timestamptz[], $12::int[], $13::int[], $14::bool[], $15::text[])
|
|
||||||
ON CONFLICT (id) DO UPDATE SET
|
|
||||||
workspace_id = excluded.workspace_id,
|
|
||||||
client_id = excluded.client_id,
|
|
||||||
name = excluded.name,
|
|
||||||
color = excluded.color,
|
|
||||||
status = excluded.status,
|
|
||||||
active = excluded.active,
|
|
||||||
updated_at = excluded.updated_at,
|
|
||||||
start_date = excluded.start_date,
|
|
||||||
created_at = excluded.created_at,
|
|
||||||
server_deleted_at = excluded.server_deleted_at,
|
|
||||||
actual_hours = excluded.actual_hours,
|
|
||||||
actual_seconds = excluded.actual_seconds,
|
|
||||||
can_track_time = excluded.can_track_time,
|
|
||||||
permissions = excluded.permissions
|
|
||||||
"#,
|
|
||||||
cast_slice!(projects.id(), i64),
|
|
||||||
cast_slice!(projects.workspace_id(), i64),
|
|
||||||
cast_slice_opts!(projects.client_id(), i64) as _,
|
|
||||||
projects.name(),
|
|
||||||
projects.color(),
|
|
||||||
&projects.status().iter().map(|s| s.to_string()).collect::<Vec<_>>()[..],
|
|
||||||
projects.active(),
|
|
||||||
projects.updated_at(),
|
|
||||||
projects.start_date() as _,
|
|
||||||
projects.created_at(),
|
|
||||||
projects.server_deleted_at() as _,
|
|
||||||
cast_slice_opts!(projects.actual_hours(), i64) as _,
|
|
||||||
cast_slice_opts!(projects.actual_seconds(), i64) as _,
|
|
||||||
projects.can_track_time(),
|
|
||||||
projects.permissions() as _,
|
|
||||||
)
|
|
||||||
.execute(&mut self.db)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn update_tags(&mut self) -> Result<(), AppError> {
|
|
||||||
let tags = self.toggl_api.get_tags().await?;
|
|
||||||
let tags: Soa<Tag> = Soa::from(tags.as_slice());
|
|
||||||
|
|
||||||
sqlx::query!(
|
|
||||||
r#"
|
|
||||||
INSERT INTO tags (id, name, workspace_id, creator_id, updated_at, deleted_at, permissions)
|
|
||||||
SELECT * FROM UNNEST($1::bigint[], $2::text[], $3::bigint[], $4::bigint[], $5::timestamptz[], $6::timestamptz[], $7::text[])
|
|
||||||
ON CONFLICT (id) DO UPDATE SET
|
|
||||||
name = excluded.name,
|
|
||||||
workspace_id = excluded.workspace_id,
|
|
||||||
creator_id = excluded.creator_id,
|
|
||||||
updated_at = excluded.updated_at,
|
|
||||||
deleted_at = excluded.deleted_at,
|
|
||||||
permissions = excluded.permissions
|
|
||||||
"#,
|
|
||||||
cast_slice!(tags.id(), i64),
|
|
||||||
tags.name(),
|
|
||||||
cast_slice!(tags.workspace_id(), i64),
|
|
||||||
cast_slice!(tags.creator_id(), i64),
|
|
||||||
tags.updated_at(),
|
|
||||||
// Nullable fields fail to type check with UNNEST batch inserts so we silence the
|
|
||||||
// errors using ` as _`.
|
|
||||||
tags.deleted_at() as _,
|
|
||||||
tags.permissions() as _,
|
|
||||||
)
|
|
||||||
.execute(&mut self.db)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn update_clients(&mut self) -> Result<(), AppError> {
|
|
||||||
let clients = self.toggl_api.get_clients().await?;
|
|
||||||
|
|
||||||
let clients: Soa<TrackingClient> = Soa::from(clients.as_slice());
|
|
||||||
|
|
||||||
sqlx::query!(r#"
|
|
||||||
INSERT INTO tracking_clients
|
|
||||||
(id, updated_at, archived, creator_id, integration_provider, notes, name, server_deleted_at, workspace_id, permissions)
|
|
||||||
SELECT * FROM UNNEST($1::bigint[], $2::timestamptz[], $3::bool[], $4::bigint[], $5::text[], $6::text[], $7::text[], $8::timestamptz[], $9::bigint[], $10::text[])
|
|
||||||
ON CONFLICT (id) DO UPDATE SET
|
|
||||||
updated_at = excluded.updated_at,
|
|
||||||
archived = excluded.archived,
|
|
||||||
creator_id = excluded.creator_id,
|
|
||||||
integration_provider = excluded.integration_provider,
|
|
||||||
notes = excluded.notes,
|
|
||||||
name = excluded.name,
|
|
||||||
server_deleted_at = excluded.server_deleted_at,
|
|
||||||
workspace_id = excluded.workspace_id,
|
|
||||||
permissions = excluded.permissions
|
|
||||||
"#,
|
|
||||||
cast_slice!(clients.id(), i64),
|
|
||||||
clients.updated_at(),
|
|
||||||
clients.archived(),
|
|
||||||
cast_slice!(clients.creator_id(), i64),
|
|
||||||
// For the next two, we are assuming these are Option<String> as datatype. If these are different, please update accordingly.
|
|
||||||
clients.integration_provider() as _,
|
|
||||||
clients.notes() as _,
|
|
||||||
clients.name(),
|
|
||||||
clients.server_deleted_at() as _,
|
|
||||||
cast_slice!(clients.workspace_id(), i64),
|
|
||||||
clients.permissions() as _
|
|
||||||
)
|
|
||||||
.execute(&mut self.db)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() {
|
async fn main() {
|
||||||
dotenv::dotenv().expect("Failed to load .env file");
|
dotenv::dotenv().expect("Failed to load .env file");
|
||||||
@ -387,18 +44,13 @@ async fn main() {
|
|||||||
.await
|
.await
|
||||||
.expect("Failed to run migrations");
|
.expect("Failed to run migrations");
|
||||||
|
|
||||||
let mut worker = Worker {
|
let mut worker = Worker { db, toggl_api: api };
|
||||||
db,
|
|
||||||
toggl_api: api,
|
|
||||||
};
|
|
||||||
|
|
||||||
worker.update_tags().await.unwrap();
|
worker.update_tags().await.unwrap();
|
||||||
|
|
||||||
let start = NaiveDate::from_ymd_opt(2024, 2, 1)
|
let start = NaiveDate::from_ymd_opt(2024, 2, 1).expect("Invalid date");
|
||||||
.expect("Invalid date");
|
|
||||||
|
|
||||||
let end = NaiveDate::from_ymd_opt(2024, 2, 2)
|
let end = NaiveDate::from_ymd_opt(2024, 2, 2).expect("Invalid date");
|
||||||
.expect("Invalid date");
|
|
||||||
|
|
||||||
worker
|
worker
|
||||||
.fetch_within(start, end)
|
.fetch_within(start, end)
|
||||||
|
|||||||
@ -10,8 +10,8 @@ use serde::de::DeserializeOwned;
|
|||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use support::ReqwestRateLimiter;
|
use support::ReqwestRateLimiter;
|
||||||
|
|
||||||
pub mod types;
|
|
||||||
mod support;
|
mod support;
|
||||||
|
pub mod types;
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct TogglApi {
|
pub struct TogglApi {
|
||||||
|
|||||||
@ -1,7 +1,7 @@
|
|||||||
use axum::async_trait;
|
use axum::async_trait;
|
||||||
use std::num::NonZero;
|
|
||||||
use governor::state::{InMemoryState, NotKeyed};
|
|
||||||
use governor::clock::DefaultClock;
|
use governor::clock::DefaultClock;
|
||||||
|
use governor::state::{InMemoryState, NotKeyed};
|
||||||
|
use std::num::NonZero;
|
||||||
|
|
||||||
pub struct ReqwestRateLimiter {
|
pub struct ReqwestRateLimiter {
|
||||||
rate_limiter: governor::RateLimiter<NotKeyed, InMemoryState, DefaultClock>,
|
rate_limiter: governor::RateLimiter<NotKeyed, InMemoryState, DefaultClock>,
|
||||||
|
|||||||
345
src/worker.rs
Normal file
345
src/worker.rs
Normal file
@ -0,0 +1,345 @@
|
|||||||
|
use crate::toggl::types::{Project, Tag, TimeEntry, TogglReportFilters, TrackingClient};
|
||||||
|
use crate::toggl::TogglApi;
|
||||||
|
use crate::{AppError, TableSummary};
|
||||||
|
use chrono::{DateTime, NaiveDate, TimeDelta, Utc};
|
||||||
|
use itertools::Itertools;
|
||||||
|
use soa_rs::Soa;
|
||||||
|
use sqlx::PgConnection;
|
||||||
|
|
||||||
|
macro_rules! cast_slice {
|
||||||
|
($slice: expr, $typ: ty) => {
|
||||||
|
&($slice.iter().map(|id| *id as $typ).collect_vec())[..]
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
macro_rules! cast_slice_opts {
|
||||||
|
($slice: expr, $typ: ty) => {
|
||||||
|
$slice
|
||||||
|
.iter()
|
||||||
|
.map(|opt| opt.map(|id| id as $typ))
|
||||||
|
.collect_vec()
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct Worker {
|
||||||
|
pub(crate) db: PgConnection,
|
||||||
|
pub(crate) toggl_api: TogglApi,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Worker {
|
||||||
|
async fn get_ids(&mut self) -> Result<TableSummary, AppError> {
|
||||||
|
let client_ids = sqlx::query!("select id from tracking_clients")
|
||||||
|
.fetch_all(&mut self.db)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let workspace_ids = sqlx::query!("select id from workspaces")
|
||||||
|
.fetch_all(&mut self.db)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let project_ids = sqlx::query!("select id from projects")
|
||||||
|
.fetch_all(&mut self.db)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let tag_ids = sqlx::query!("select id from tags")
|
||||||
|
.fetch_all(&mut self.db)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(TableSummary {
|
||||||
|
client_ids: client_ids.iter().map(|row| row.id as u64).collect(),
|
||||||
|
workspace_ids: workspace_ids.iter().map(|row| row.id as u64).collect(),
|
||||||
|
project_ids: project_ids.iter().map(|row| row.id as u64).collect(),
|
||||||
|
tag_ids: tag_ids.iter().map(|row| row.id as u64).collect(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn fetch_within(&mut self, start: NaiveDate, end: NaiveDate) -> Result<(), AppError> {
|
||||||
|
let results = self
|
||||||
|
.toggl_api
|
||||||
|
.search(
|
||||||
|
self.toggl_api.workspace_id,
|
||||||
|
TogglReportFilters {
|
||||||
|
start_date: Some(start),
|
||||||
|
end_date: Some(end),
|
||||||
|
..Default::default()
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let time_entries = results
|
||||||
|
.into_iter()
|
||||||
|
.map(|entry| entry.into_time_entry(self.toggl_api.workspace_id))
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
self.update_database(time_entries).await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn fetch_changed_since(&mut self, look_back: TimeDelta) -> Result<(), AppError> {
|
||||||
|
if look_back > TimeDelta::days(90) {
|
||||||
|
return Err(AppError::LookBackTooLarge);
|
||||||
|
}
|
||||||
|
|
||||||
|
self.update_time_entries(Utc::now() - look_back).await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn update(&mut self, default_look_back: TimeDelta) -> Result<(), AppError> {
|
||||||
|
let result = sqlx::query!("select max(updated_at) as last_updated_at from time_entries")
|
||||||
|
.fetch_optional(&mut self.db)
|
||||||
|
.await
|
||||||
|
.expect("Could not fetch max updated_at from time_entries");
|
||||||
|
|
||||||
|
let fetch_since = result
|
||||||
|
.and_then(|record| record.last_updated_at)
|
||||||
|
.unwrap_or_else(|| Utc::now() - default_look_back);
|
||||||
|
|
||||||
|
self.update_time_entries(fetch_since).await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn update_time_entries(&mut self, fetch_since: DateTime<Utc>) -> Result<(), AppError> {
|
||||||
|
let time_entries = self
|
||||||
|
.toggl_api
|
||||||
|
.get_time_entries_for_user_modified_since(fetch_since)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
self.update_database(time_entries).await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn update_database(&mut self, time_entries: Vec<TimeEntry>) -> Result<(), AppError> {
|
||||||
|
let existing_ids = self.get_ids().await?;
|
||||||
|
|
||||||
|
let fetch_workspaces = time_entries
|
||||||
|
.iter()
|
||||||
|
.map(|entry| entry.workspace_id)
|
||||||
|
.filter(|workspace_id| !existing_ids.workspace_ids.contains(&workspace_id))
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
let fetch_projects = time_entries
|
||||||
|
.iter()
|
||||||
|
.map(|entry| entry.project_id)
|
||||||
|
.filter_map(|project_id| project_id)
|
||||||
|
.any(|project_id| !existing_ids.project_ids.contains(&project_id));
|
||||||
|
|
||||||
|
let fetch_tags = time_entries
|
||||||
|
.iter()
|
||||||
|
.flat_map(|entry| entry.tag_ids.iter())
|
||||||
|
.any(|tag| !existing_ids.tag_ids.contains(&tag));
|
||||||
|
|
||||||
|
if !fetch_workspaces.is_empty() {
|
||||||
|
self.update_workspaces(&fetch_workspaces).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
if fetch_projects {
|
||||||
|
self.update_projects(&existing_ids).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
if fetch_tags {
|
||||||
|
self.update_tags().await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
for time_entries_chunk in time_entries.chunks(100) {
|
||||||
|
self.update_time_entries_chunk(time_entries_chunk).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn update_time_entries_chunk(
|
||||||
|
&mut self,
|
||||||
|
time_entries: &[TimeEntry],
|
||||||
|
) -> Result<(), AppError> {
|
||||||
|
let time_entries = Soa::from(time_entries);
|
||||||
|
|
||||||
|
sqlx::query!(
|
||||||
|
r#"
|
||||||
|
INSERT INTO time_entries (id, workspace_id, user_id, project_id, task_id, start, stop, duration, updated_at, description, billable, server_deleted_at, permissions)
|
||||||
|
SELECT * FROM UNNEST($1::bigint[], $2::bigint[], $3::bigint[], $4::bigint[], $5::bigint[], $6::timestamptz[], $7::timestamptz[], $8::int[], $9::timestamptz[], $10::text[], $11::bool[], $12::timestamptz[], $13::text[])
|
||||||
|
ON CONFLICT (id) DO UPDATE SET
|
||||||
|
workspace_id = excluded.workspace_id,
|
||||||
|
user_id = excluded.user_id,
|
||||||
|
project_id = excluded.project_id,
|
||||||
|
task_id = excluded.task_id,
|
||||||
|
start = excluded.start,
|
||||||
|
stop = excluded.stop,
|
||||||
|
duration = excluded.duration,
|
||||||
|
updated_at = excluded.updated_at,
|
||||||
|
description = excluded.description,
|
||||||
|
billable = excluded.billable,
|
||||||
|
server_deleted_at = excluded.server_deleted_at,
|
||||||
|
permissions = excluded.permissions
|
||||||
|
"#,
|
||||||
|
cast_slice!(time_entries.id(), i64),
|
||||||
|
cast_slice!(time_entries.workspace_id(), i64),
|
||||||
|
cast_slice!(time_entries.user_id(), i64),
|
||||||
|
cast_slice_opts!(time_entries.project_id(), i64) as _,
|
||||||
|
cast_slice_opts!(time_entries.task_id(), i64) as _,
|
||||||
|
time_entries.start(),
|
||||||
|
time_entries.stop() as _,
|
||||||
|
cast_slice_opts!(time_entries.duration(), i32) as _,
|
||||||
|
time_entries.updated_at(),
|
||||||
|
time_entries.description() as _,
|
||||||
|
time_entries.billable(),
|
||||||
|
time_entries.server_deleted_at() as _,
|
||||||
|
time_entries.permissions() as _,
|
||||||
|
)
|
||||||
|
.execute(&mut self.db)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn update_workspaces(&mut self, workspace_ids: &[u64]) -> Result<(), AppError> {
|
||||||
|
let workspaces = workspace_ids
|
||||||
|
.iter()
|
||||||
|
.map(|id| self.toggl_api.get_workspace(*id));
|
||||||
|
|
||||||
|
let workspaces = futures::future::join_all(workspaces)
|
||||||
|
.await
|
||||||
|
.into_iter()
|
||||||
|
.collect::<Result<Vec<_>, _>>()?;
|
||||||
|
|
||||||
|
let workspaces = Soa::from(workspaces.as_slice());
|
||||||
|
|
||||||
|
sqlx::query!(
|
||||||
|
r#"
|
||||||
|
INSERT INTO workspaces (id, organization_id, name)
|
||||||
|
SELECT * FROM UNNEST($1::bigint[], $2::bigint[], $3::text[])
|
||||||
|
ON CONFLICT (id) DO UPDATE SET
|
||||||
|
organization_id = excluded.organization_id,
|
||||||
|
name = excluded.name
|
||||||
|
"#,
|
||||||
|
cast_slice!(workspaces.id(), i64),
|
||||||
|
cast_slice!(workspaces.organization_id(), i64),
|
||||||
|
workspaces.name(),
|
||||||
|
)
|
||||||
|
.execute(&mut self.db)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn update_projects(&mut self, existing_ids: &TableSummary) -> Result<(), AppError> {
|
||||||
|
let projects = self.toggl_api.get_projects().await?;
|
||||||
|
|
||||||
|
let fetch_clients = projects
|
||||||
|
.iter()
|
||||||
|
.map(|project| project.client_id)
|
||||||
|
.filter_map(|client_id| client_id)
|
||||||
|
.any(|client_id| !existing_ids.client_ids.contains(&(client_id as u64)));
|
||||||
|
|
||||||
|
if fetch_clients {
|
||||||
|
self.update_clients().await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
let projects: Soa<Project> = Soa::from(projects.as_slice());
|
||||||
|
|
||||||
|
sqlx::query!(
|
||||||
|
r#"
|
||||||
|
INSERT INTO projects (id, workspace_id, client_id, name, color, status, active, updated_at, start_date, created_at, server_deleted_at, actual_hours, actual_seconds, can_track_time, permissions)
|
||||||
|
SELECT * FROM UNNEST($1::bigint[], $2::bigint[], $3::bigint[], $4::text[], $5::text[], $6::text[], $7::bool[], $8::timestamptz[], $9::date[], $10::timestamptz[], $11::timestamptz[], $12::int[], $13::int[], $14::bool[], $15::text[])
|
||||||
|
ON CONFLICT (id) DO UPDATE SET
|
||||||
|
workspace_id = excluded.workspace_id,
|
||||||
|
client_id = excluded.client_id,
|
||||||
|
name = excluded.name,
|
||||||
|
color = excluded.color,
|
||||||
|
status = excluded.status,
|
||||||
|
active = excluded.active,
|
||||||
|
updated_at = excluded.updated_at,
|
||||||
|
start_date = excluded.start_date,
|
||||||
|
created_at = excluded.created_at,
|
||||||
|
server_deleted_at = excluded.server_deleted_at,
|
||||||
|
actual_hours = excluded.actual_hours,
|
||||||
|
actual_seconds = excluded.actual_seconds,
|
||||||
|
can_track_time = excluded.can_track_time,
|
||||||
|
permissions = excluded.permissions
|
||||||
|
"#,
|
||||||
|
cast_slice!(projects.id(), i64),
|
||||||
|
cast_slice!(projects.workspace_id(), i64),
|
||||||
|
cast_slice_opts!(projects.client_id(), i64) as _,
|
||||||
|
projects.name(),
|
||||||
|
projects.color(),
|
||||||
|
&projects.status().iter().map(|s| s.to_string()).collect::<Vec<_>>()[..],
|
||||||
|
projects.active(),
|
||||||
|
projects.updated_at(),
|
||||||
|
projects.start_date() as _,
|
||||||
|
projects.created_at(),
|
||||||
|
projects.server_deleted_at() as _,
|
||||||
|
cast_slice_opts!(projects.actual_hours(), i64) as _,
|
||||||
|
cast_slice_opts!(projects.actual_seconds(), i64) as _,
|
||||||
|
projects.can_track_time(),
|
||||||
|
projects.permissions() as _,
|
||||||
|
)
|
||||||
|
.execute(&mut self.db)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) async fn update_tags(&mut self) -> Result<(), AppError> {
|
||||||
|
let tags = self.toggl_api.get_tags().await?;
|
||||||
|
let tags: Soa<Tag> = Soa::from(tags.as_slice());
|
||||||
|
|
||||||
|
sqlx::query!(
|
||||||
|
r#"
|
||||||
|
INSERT INTO tags (id, name, workspace_id, creator_id, updated_at, deleted_at, permissions)
|
||||||
|
SELECT * FROM UNNEST($1::bigint[], $2::text[], $3::bigint[], $4::bigint[], $5::timestamptz[], $6::timestamptz[], $7::text[])
|
||||||
|
ON CONFLICT (id) DO UPDATE SET
|
||||||
|
name = excluded.name,
|
||||||
|
workspace_id = excluded.workspace_id,
|
||||||
|
creator_id = excluded.creator_id,
|
||||||
|
updated_at = excluded.updated_at,
|
||||||
|
deleted_at = excluded.deleted_at,
|
||||||
|
permissions = excluded.permissions
|
||||||
|
"#,
|
||||||
|
cast_slice!(tags.id(), i64),
|
||||||
|
tags.name(),
|
||||||
|
cast_slice!(tags.workspace_id(), i64),
|
||||||
|
cast_slice!(tags.creator_id(), i64),
|
||||||
|
tags.updated_at(),
|
||||||
|
// Nullable fields fail to type check with UNNEST batch inserts so we silence the
|
||||||
|
// errors using ` as _`.
|
||||||
|
tags.deleted_at() as _,
|
||||||
|
tags.permissions() as _,
|
||||||
|
)
|
||||||
|
.execute(&mut self.db)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn update_clients(&mut self) -> Result<(), AppError> {
|
||||||
|
let clients = self.toggl_api.get_clients().await?;
|
||||||
|
|
||||||
|
let clients: Soa<TrackingClient> = Soa::from(clients.as_slice());
|
||||||
|
|
||||||
|
sqlx::query!(r#"
|
||||||
|
INSERT INTO tracking_clients
|
||||||
|
(id, updated_at, archived, creator_id, integration_provider, notes, name, server_deleted_at, workspace_id, permissions)
|
||||||
|
SELECT * FROM UNNEST($1::bigint[], $2::timestamptz[], $3::bool[], $4::bigint[], $5::text[], $6::text[], $7::text[], $8::timestamptz[], $9::bigint[], $10::text[])
|
||||||
|
ON CONFLICT (id) DO UPDATE SET
|
||||||
|
updated_at = excluded.updated_at,
|
||||||
|
archived = excluded.archived,
|
||||||
|
creator_id = excluded.creator_id,
|
||||||
|
integration_provider = excluded.integration_provider,
|
||||||
|
notes = excluded.notes,
|
||||||
|
name = excluded.name,
|
||||||
|
server_deleted_at = excluded.server_deleted_at,
|
||||||
|
workspace_id = excluded.workspace_id,
|
||||||
|
permissions = excluded.permissions
|
||||||
|
"#,
|
||||||
|
cast_slice!(clients.id(), i64),
|
||||||
|
clients.updated_at(),
|
||||||
|
clients.archived(),
|
||||||
|
cast_slice!(clients.creator_id(), i64),
|
||||||
|
// For the next two, we are assuming these are Option<String> as datatype. If these are different, please update accordingly.
|
||||||
|
clients.integration_provider() as _,
|
||||||
|
clients.notes() as _,
|
||||||
|
clients.name(),
|
||||||
|
clients.server_deleted_at() as _,
|
||||||
|
cast_slice!(clients.workspace_id(), i64),
|
||||||
|
clients.permissions() as _
|
||||||
|
)
|
||||||
|
.execute(&mut self.db)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue
Block a user