Initial db sync work

This commit is contained in:
Joshua Coles 2024-07-27 17:58:34 +01:00
parent 5969a1b3d8
commit 5887ae4946
4 changed files with 260 additions and 42 deletions

25
Cargo.lock generated
View File

@ -398,6 +398,12 @@ dependencies = [
"subtle",
]
[[package]]
name = "dotenv"
version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f"
[[package]]
name = "dotenvy"
version = "0.15.7"
@ -1943,6 +1949,7 @@ dependencies = [
"atoi",
"byteorder",
"bytes",
"chrono",
"crc",
"crossbeam-queue",
"either",
@ -1966,6 +1973,8 @@ dependencies = [
"smallvec",
"sqlformat",
"thiserror",
"tokio",
"tokio-stream",
"tracing",
"url",
]
@ -2005,6 +2014,7 @@ dependencies = [
"sqlx-sqlite",
"syn 1.0.109",
"tempfile",
"tokio",
"url",
]
@ -2019,6 +2029,7 @@ dependencies = [
"bitflags 2.6.0",
"byteorder",
"bytes",
"chrono",
"crc",
"digest",
"dotenvy",
@ -2060,6 +2071,7 @@ dependencies = [
"base64 0.21.7",
"bitflags 2.6.0",
"byteorder",
"chrono",
"crc",
"dotenvy",
"etcetera",
@ -2095,6 +2107,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b244ef0a8414da0bed4bb1910426e890b19e5e9bccc27ada6b797d05c55ae0aa"
dependencies = [
"atoi",
"chrono",
"flume",
"futures-channel",
"futures-core",
@ -2274,6 +2287,7 @@ dependencies = [
"axum",
"base64 0.22.1",
"chrono",
"dotenv",
"governor",
"reqwest",
"reqwest-middleware",
@ -2340,6 +2354,17 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-stream"
version = "0.1.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "267ac89e0bec6e691e5813911606935d77c476ff49024f98abcea3e7b15e37af"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
]
[[package]]
name = "tokio-util"
version = "0.7.11"

View File

@ -19,4 +19,5 @@ serde_json = "1.0.120"
serde_json_path_to_error = "0.1.4"
url = "2.5.2"
serde_with = "3.9.0"
sqlx = { version = "0.7.4", features = ["postgres"] }
sqlx = { version = "0.7.4", features = ["postgres", "runtime-tokio", "macros", "chrono"] }
dotenv = "0.15.0"

View File

@ -1,21 +1,149 @@
use std::ops::Sub;
use chrono::NaiveDate;
use chrono::{NaiveDate, TimeDelta, Utc};
use sqlx::{Connection, PgConnection};
use toggl::TogglApi;
mod toggl;
mod sensitive;
/// Top-level error type for the sync worker.
///
/// Each variant wraps its source via `#[from]`, so `?` converts
/// `sqlx::Error` / `toggl::TogglError` into `AppError` automatically.
#[derive(Debug, thiserror::Error)]
enum AppError {
// A database connection or query failure surfaced by sqlx.
#[error("Database error: {0}")]
SqlxError(#[from] sqlx::Error),
// A failure reported by the Toggl API client.
#[error("Toggl error: {0}")]
TogglError(#[from] toggl::TogglError),
}
// Long-lived resources for the sync run: an open Postgres connection and
// the authenticated Toggl API client. Methods take `&mut self` because
// `PgConnection` requires exclusive access per query.
struct Worker {
db: PgConnection,
toggl_api: TogglApi,
}
// Snapshot of the primary keys currently present in each lookup table.
// `Worker::update` compares incoming time entries against these to decide
// whether referenced projects/tags still need to be fetched.
struct TableSummary {
client_ids: Vec<u64>,
workspace_ids: Vec<u64>,
project_ids: Vec<u64>,
tag_ids: Vec<u64>,
}
impl Worker {
    /// Load the primary keys currently stored in each lookup table.
    ///
    /// Used by [`Worker::update`] to detect time entries that reference
    /// projects or tags we have not synced yet.
    async fn get_ids(&mut self) -> Result<TableSummary, AppError> {
        let client_ids = sqlx::query!("select id from tracking_clients")
            .fetch_all(&mut self.db)
            .await?;
        let workspace_ids = sqlx::query!("select id from workspaces")
            .fetch_all(&mut self.db)
            .await?;
        let project_ids = sqlx::query!("select id from projects")
            .fetch_all(&mut self.db)
            .await?;
        let tag_ids = sqlx::query!("select id from tags")
            .fetch_all(&mut self.db)
            .await?;

        // NOTE(review): the DB returns signed integers; the `as u64` casts
        // assume every id is non-negative — TODO confirm against the schema.
        Ok(TableSummary {
            client_ids: client_ids.iter().map(|row| row.id as u64).collect(),
            workspace_ids: workspace_ids.iter().map(|row| row.id as u64).collect(),
            project_ids: project_ids.iter().map(|row| row.id as u64).collect(),
            tag_ids: tag_ids.iter().map(|row| row.id as u64).collect(),
        })
    }

    /// Fetch time entries modified since the newest `updated_at` we have
    /// stored (or `now - default_look_back` when `time_entries` is empty)
    /// and work out which lookup tables are stale.
    async fn update(&mut self, default_look_back: TimeDelta) -> Result<(), AppError> {
        // Propagate DB failures as `AppError` via `?` instead of panicking
        // (the original used `.expect(..)` despite returning a Result).
        let result = sqlx::query!("select max(updated_at) as last_updated_at from time_entries")
            .fetch_one(&mut self.db)
            .await?;

        let existing_ids = self.get_ids().await?;

        let fetch_since = result.last_updated_at
            .unwrap_or_else(|| Utc::now() - default_look_back);

        let time_entries = self.toggl_api
            .get_time_entries_for_user_modified_since(fetch_since).await?;

        // True when any entry references a project id we have not stored yet.
        // `filter_map` on the Option directly replaces the original
        // `map(..).filter_map(|id| id)` pair.
        let refetch_projects = time_entries.iter()
            .filter_map(|entry| entry.project_id)
            .any(|project_id| !existing_ids.project_ids.contains(&project_id));

        // True when any entry carries a tag id we have not stored yet.
        // Binding `|&tag|` by value yields a `u64`, so `contains(&tag)` gets
        // the `&u64` it expects (the original passed `&&u64`).
        let refetch_tags = time_entries.iter()
            .flat_map(|entry| entry.tag_ids.iter())
            .any(|&tag| !existing_ids.tag_ids.contains(&tag));

        // TODO: act on these flags (refetch projects/tags) and upsert the
        // fetched time entries — this sync pass is still incomplete.
        let _ = (refetch_projects, refetch_tags);

        Ok(())
    }

    /// Fetch every project from Toggl and upsert it into `projects`,
    /// overwriting all columns on id conflict so the row mirrors Toggl.
    async fn update_projects(&mut self) -> Result<(), AppError> {
        let projects = self.toggl_api.get_projects().await?;

        for project in projects {
            sqlx::query!(
                r#"
                INSERT INTO projects (id, workspace_id, client_id, name, color, status, active, updated_at, start_date, created_at, server_deleted_at, actual_hours, actual_seconds, can_track_time, permissions)
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
                ON CONFLICT (id) DO UPDATE SET
                    workspace_id = excluded.workspace_id,
                    client_id = excluded.client_id,
                    name = excluded.name,
                    color = excluded.color,
                    status = excluded.status,
                    active = excluded.active,
                    updated_at = excluded.updated_at,
                    start_date = excluded.start_date,
                    created_at = excluded.created_at,
                    server_deleted_at = excluded.server_deleted_at,
                    actual_hours = excluded.actual_hours,
                    actual_seconds = excluded.actual_seconds,
                    can_track_time = excluded.can_track_time,
                    permissions = excluded.permissions
                "#,
                project.id,
                project.workspace_id,
                project.client_id,
                project.name,
                project.color,
                // `status` is stored as text; `ProjectStatus` implements Display.
                project.status.to_string(),
                project.active,
                project.updated_at,
                project.start_date,
                project.created_at,
                project.server_deleted_at,
                project.actual_hours,
                project.actual_seconds,
                project.can_track_time,
                project.permissions,
            )
            .execute(&mut self.db)
            .await?;
        }

        Ok(())
    }
}
// Entry point: load env config, build the Toggl client, run a sample report
// search, then sync projects into Postgres.
#[tokio::main]
async fn main() {
// NOTE(review): this panics when no .env file exists; consider tolerating
// its absence (`.ok()`) so environment-only deployments still start.
dotenv::dotenv()
.expect("Failed to load .env file");
let api = TogglApi::new(
sensitive::API_TOKEN,
sensitive::WORKSPACE_ID,
);
// NOTE(review): leftover debugging — `dbg!` hits the live report-search
// endpoint on every run and should be removed before this ships.
dbg!(api.search(toggl::types::TogglReportFilters {
start_date: Some(NaiveDate::from_ymd_opt(2024, 07, 10).unwrap()),
end_date: Some(NaiveDate::from_ymd_opt(2024, 07, 16).unwrap()),
enrich_response: Some(true),
..Default::default()
}).await);
let database_url = std::env::var("DATABASE_URL")
.expect("DATABASE_URL must be set");
// NOTE(review): `.unwrap()` on connect/update aborts with a raw panic;
// fine for an initial spike, but map these to AppError eventually.
let mut worker = Worker {
db: PgConnection::connect(&database_url).await.unwrap(),
toggl_api: api,
};
worker.update_projects()
.await
.unwrap();
}

View File

@ -10,7 +10,7 @@ use reqwest::header::{HeaderMap, HeaderValue};
use base64::engine::general_purpose::STANDARD;
use base64::Engine;
use chrono::{DateTime, SecondsFormat, Utc};
use reqwest::{Response, Url};
use reqwest::Response;
use serde::de::DeserializeOwned;
struct ReqwestRateLimiter {
@ -89,6 +89,19 @@ impl TogglApi {
Ok(workspaces)
}
/// Fetch a single workspace from Toggl by its numeric ID.
pub async fn get_workspace(&self, id: u64) -> Result<types::Workspace, TogglError> {
    let url = format!("{BASE_URL}/workspaces/{id}");
    let response = self.client
        .get(&url)
        .headers(self.headers.clone())
        .send()
        .await?;
    Self::parse(response).await
}
/// Fetches all time entries for this user from Toggl that have been modified since the given
/// date.
pub async fn get_time_entries_for_user_modified_since(&self, since: DateTime<Utc>) -> Result<Vec<types::TimeEntry>, TogglError> {
@ -214,7 +227,8 @@ impl TogglApi {
data.into_iter().for_each(|e| results.push(e));
}
Ok(results) }
Ok(results)
}
}
pub mod types {
@ -225,32 +239,32 @@ pub mod types {
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TimeEntry {
id: u64,
workspace_id: u64,
user_id: u64,
project_id: Option<u64>,
task_id: Option<u64>,
pub id: u64,
pub workspace_id: u64,
pub user_id: u64,
pub project_id: Option<u64>,
pub task_id: Option<u64>,
start: DateTime<Utc>,
stop: Option<DateTime<Utc>>,
pub start: DateTime<Utc>,
pub stop: Option<DateTime<Utc>>,
#[serde(with = "duration_field")]
duration: Option<u32>,
pub duration: Option<u32>,
#[serde(rename = "at")]
updated_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
description: String,
pub description: String,
#[serde(default)]
tags: Vec<String>,
pub tags: Vec<String>,
#[serde(default)]
tag_ids: Vec<u64>,
pub tag_ids: Vec<u64>,
billable: bool,
server_deleted_at: Option<DateTime<Utc>>,
permissions: Option<String>,
pub billable: bool,
pub server_deleted_at: Option<DateTime<Utc>>,
pub permissions: Option<String>,
}
mod duration_field {
@ -281,25 +295,25 @@ pub mod types {
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Project {
id: i64,
workspace_id: i64,
client_id: Option<i64>,
pub id: i64,
pub workspace_id: i64,
pub client_id: Option<i64>,
name: String,
color: String,
status: ProjectStatus,
active: bool,
pub name: String,
pub color: String,
pub status: ProjectStatus,
pub active: bool,
#[serde(rename = "at")]
updated_at: DateTime<Utc>,
start_date: NaiveDate,
created_at: DateTime<Utc>,
server_deleted_at: Option<DateTime<Utc>>,
pub updated_at: DateTime<Utc>,
pub start_date: NaiveDate,
pub created_at: DateTime<Utc>,
pub server_deleted_at: Option<DateTime<Utc>>,
actual_hours: Option<i64>,
actual_seconds: Option<i64>,
can_track_time: bool,
permissions: Option<String>,
pub actual_hours: Option<i64>,
pub actual_seconds: Option<i64>,
pub can_track_time: bool,
pub permissions: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
@ -312,6 +326,18 @@ pub mod types {
Deleted,
}
impl fmt::Display for ProjectStatus {
    /// Render the status as the lowercase string form used by the database.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            ProjectStatus::Upcoming => "upcoming",
            ProjectStatus::Active => "active",
            ProjectStatus::Archived => "archived",
            ProjectStatus::Ended => "ended",
            ProjectStatus::Deleted => "deleted",
        };
        f.write_str(label)
    }
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TrackingClient {
/// The unique identifier for the client.
@ -355,7 +381,7 @@ pub mod types {
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ReportEntry {
pub user_id: u32,
pub user_id: u64,
pub username: String,
pub project_id: Option<u64>,
pub task_id: Option<u64>,
@ -368,10 +394,22 @@ pub mod types {
pub time_entries: Vec<ReportEntryTimeDetails>,
pub row_number: u32,
#[serde(flatten)]
pub enriched_information: Option<ReportEntryEnrichedInfo>,
#[serde(flatten)]
pub rest: HashMap<String, serde_json::Value>,
}
// Extra per-row detail present when `enrich_response` is set on a report
// query; flattened into `ReportEntry` via `#[serde(flatten)]`.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ReportEntryEnrichedInfo {
pub project_id: Option<u64>,
pub project_name: Option<String>,
// Presumably the project's display color as a hex string — TODO confirm.
pub project_hex: Option<String>,
pub tag_names: Vec<String>,
}
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct ReportEntryTimeDetails {
pub id: u64,
@ -382,6 +420,31 @@ pub mod types {
pub updated_at: DateTime<Utc>,
}
impl ReportEntry {
    /// Collapse this report row into a single [`TimeEntry`], using the FIRST
    /// raw entry in `time_entries` for the time-specific fields.
    ///
    /// `server_deleted_at` and `permissions` are not present in report rows,
    /// so they are filled with `None`.
    ///
    /// # Panics
    /// Panics if `self.time_entries` is empty.
    fn into_time_entry(self, workspace_id: u64) -> TimeEntry {
        // Clone the representative entry up front so the rest of `self`
        // can be moved out field-by-field below.
        let first = self.time_entries[0].clone();
        TimeEntry {
            id: first.id,
            workspace_id,
            user_id: self.user_id,
            project_id: self.project_id,
            task_id: self.task_id,
            start: first.start,
            stop: Some(first.stop),
            duration: Some(first.seconds),
            updated_at: first.updated_at,
            description: self.description,
            // `map` consumes the enriched info, so the tag names move out
            // directly — the original's `.clone()` here was redundant.
            tags: self.enriched_information
                .map(|e| e.tag_names)
                .unwrap_or_default(),
            tag_ids: self.tag_ids,
            billable: self.billable,
            server_deleted_at: None,
            permissions: None,
        }
    }
}
#[skip_serializing_none]
#[derive(Serialize, Deserialize, Clone, Default)]
pub struct TogglReportFilters {
@ -419,6 +482,7 @@ pub mod types {
}
use std::fmt;
use std::path::Display;
impl fmt::Debug for TogglReportFilters {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {