Compare commits

...

2 Commits

Author SHA1 Message Date
Joshua Coles
3f439c8a31 Add additional validation
Some checks failed
Build and Publish / Build and Test (push) Failing after 10m39s
2025-02-01 19:45:43 +00:00
Joshua Coles
5286c49bd3 Bump dep versions 2025-02-01 19:19:36 +00:00
5 changed files with 119 additions and 25 deletions

View File

@ -4,14 +4,14 @@ version = "0.1.0"
edition = "2021"
[dependencies]
axum = "0.7.5"
axum = "0.8.1"
chrono = { version = "0.4.38", features = ["serde"] }
governor = "0.6.3"
governor = "0.8.0"
reqwest = { version = "0.12.5", default-features = false, features = ["json", "rustls-tls", "http2", "macos-system-configuration", "charset"] }
reqwest-ratelimit = "0.2.0"
reqwest-middleware = { version = "0.3", features = ["json"] }
reqwest-retry = "0.6"
thiserror = "1.0.62"
reqwest-ratelimit = "0.3.0"
reqwest-middleware = { version = "0.4.0", features = ["json"] }
reqwest-retry = "0.7.0"
thiserror = "2.0.11"
tokio = { version = "1.38.0", features = ["full"] }
base64 = "0.22.1"
serde = { version = "1.0.204", features = ["derive"] }
@ -19,11 +19,12 @@ serde_json = "1.0.120"
serde_json_path_to_error = "0.1.4"
url = "2.5.2"
serde_with = "3.9.0"
sqlx = { version = "0.7.4", features = ["postgres", "macros", "chrono", "runtime-tokio-rustls"] }
sqlx = { version = "0.8.3", features = ["postgres", "macros", "chrono", "runtime-tokio-rustls"] }
futures = "0.3.30"
tracing = "0.1.40"
tracing-subscriber = "0.3.18"
itertools = "0.13.0"
soa-rs = "0.6.1"
itertools = "0.14.0"
soa-rs = "0.7.5"
clap = { version = "4.5.11", features = ["derive", "env"] }
dotenvy = "0.15.7"
async-trait = "0.1.86"

View File

@ -1,5 +1,5 @@
use axum::response::IntoResponse;
use sqlx::{Connection, PgPool};
use sqlx::{PgPool};
use toggl::TogglApi;
use worker::Worker;
@ -97,7 +97,7 @@ async fn main() {
let cli = Cli::parse();
let toggl_api = TogglApi::new(&cli.api_token, cli.default_workspace_id);
let mut db = PgPool::connect(&cli.database_url).await.unwrap();
let db = PgPool::connect(&cli.database_url).await.unwrap();
sqlx::migrate!("./migrations")
.run(&db)
@ -117,6 +117,7 @@ async fn main() {
.await
.expect("Failed to start server");
}
Commands::Sync => {
worker
.update(TimeDelta::days(30))

View File

@ -1,14 +1,73 @@
use axum::response::IntoResponse;
use axum::{
http::StatusCode,
routing::{get, post},
Extension, Json, Router,
};
use chrono::TimeDelta;
use std::net::IpAddr;
use crate::worker::Worker;
use crate::AppError;
use axum::response::IntoResponse;
use axum::{
routing::{get, post},
Extension, Router,
};
use chrono::TimeDelta;
use sqlx::postgres::PgListener;
use std::net::IpAddr;
#[derive(Debug, serde::Deserialize)]
struct Payload(Vec<PayloadItem>);
#[derive(Debug, serde::Deserialize)]
enum PayloadItem {
Num(i32),
Str(String),
}
impl PayloadItem {
fn to_i32(self) -> Option<i32> {
match self {
PayloadItem::Num(n) => Some(n),
PayloadItem::Str(s) => s.parse().ok(),
}
}
}
async fn listen_for_changes(pool: sqlx::PgPool, worker: Worker) {
let mut listener = PgListener::connect_with(&pool)
.await
.expect("Failed to connect to database");
// Enable LISTEN on the channel
listener
.listen("toggl_external_changes")
.await
.expect("Failed to LISTEN on channel");
while let Ok(notification) = listener.recv().await {
let payload = notification.payload();
if payload == "" {
worker
.update(TimeDelta::minutes(30))
.await
.expect("Failed to sync");
continue;
}
match serde_json::from_str::<Payload>(payload) {
Ok(payload) => {
let ids: Vec<i32> = payload
.0
.into_iter()
.filter_map(|v| v.to_i32())
.collect();
if !ids.is_empty() {
if let Err(e) = worker.fetch_time_entries_by_ids(&ids).await {
eprintln!("Error fetching time entries: {}", e);
}
}
}
Err(e) => eprintln!("Error parsing notification payload: {}", e),
}
}
}
async fn sync(Extension(worker): Extension<Worker>) -> Result<impl IntoResponse, AppError> {
worker.update(TimeDelta::days(30)).await?;
@ -17,6 +76,15 @@ async fn sync(Extension(worker): Extension<Worker>) -> Result<impl IntoResponse,
}
pub async fn serve(worker: Worker, ip: IpAddr, port: u16) -> Result<(), AppError> {
// Clone the pool and worker for the notification listener
let notification_pool = worker.db.clone();
let notification_worker = worker.clone();
// Spawn the notification listener
tokio::spawn(async move {
listen_for_changes(notification_pool, notification_worker).await;
});
// build our application with a route
let app = Router::new()
.route("/health", get(|| async { "Ok" }))

View File

@ -1,4 +1,4 @@
use axum::async_trait;
use async_trait::async_trait;
use governor::clock::DefaultClock;
use governor::state::{InMemoryState, NotKeyed};
use std::num::NonZero;

View File

@ -28,6 +28,30 @@ pub struct Worker {
}
impl Worker {
pub async fn fetch_time_entries_by_ids(&self, ids: &[i32]) -> Result<(), AppError> {
// Convert i32 IDs to u64 as that's what the Toggl API expects
let ids: Vec<u64> = ids.iter().map(|&id| id as u64).collect();
// Use the search API with time_entry_ids filter
let time_entries = self
.toggl_api
.search(
self.toggl_api.workspace_id,
TogglReportFilters {
time_entry_ids: Some(ids),
..Default::default()
},
)
.await?;
let time_entries = time_entries
.into_iter()
.map(|entry| entry.into_time_entry(self.toggl_api.workspace_id))
.collect::<Vec<_>>();
self.update_database(time_entries).await
}
async fn get_ids(&self) -> Result<TableSummary, AppError> {
let client_ids = sqlx::query!("select id from tracking_clients")
.fetch_all(&self.db)
@ -75,10 +99,6 @@ impl Worker {
}
pub async fn fetch_changed_since(&self, look_back: TimeDelta) -> Result<(), AppError> {
if look_back > TimeDelta::days(90) {
return Err(AppError::LookBackTooLarge);
}
self.update_time_entries(Utc::now() - look_back).await
}
@ -96,6 +116,10 @@ impl Worker {
}
async fn update_time_entries(&self, fetch_since: DateTime<Utc>) -> Result<(), AppError> {
if Utc::now() - fetch_since > TimeDelta::days(90) {
return Err(AppError::LookBackTooLarge);
}
let time_entries = self
.toggl_api
.get_time_entries_for_user_modified_since(fetch_since)