Compare commits
2 Commits
cfadd9f0aa
...
3f439c8a31
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3f439c8a31 | ||
|
|
5286c49bd3 |
19
Cargo.toml
19
Cargo.toml
@ -4,14 +4,14 @@ version = "0.1.0"
|
|||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
axum = "0.7.5"
|
axum = "0.8.1"
|
||||||
chrono = { version = "0.4.38", features = ["serde"] }
|
chrono = { version = "0.4.38", features = ["serde"] }
|
||||||
governor = "0.6.3"
|
governor = "0.8.0"
|
||||||
reqwest = { version = "0.12.5", default-features = false, features = ["json", "rustls-tls", "http2", "macos-system-configuration", "charset"] }
|
reqwest = { version = "0.12.5", default-features = false, features = ["json", "rustls-tls", "http2", "macos-system-configuration", "charset"] }
|
||||||
reqwest-ratelimit = "0.2.0"
|
reqwest-ratelimit = "0.3.0"
|
||||||
reqwest-middleware = { version = "0.3", features = ["json"] }
|
reqwest-middleware = { version = "0.4.0", features = ["json"] }
|
||||||
reqwest-retry = "0.6"
|
reqwest-retry = "0.7.0"
|
||||||
thiserror = "1.0.62"
|
thiserror = "2.0.11"
|
||||||
tokio = { version = "1.38.0", features = ["full"] }
|
tokio = { version = "1.38.0", features = ["full"] }
|
||||||
base64 = "0.22.1"
|
base64 = "0.22.1"
|
||||||
serde = { version = "1.0.204", features = ["derive"] }
|
serde = { version = "1.0.204", features = ["derive"] }
|
||||||
@ -19,11 +19,12 @@ serde_json = "1.0.120"
|
|||||||
serde_json_path_to_error = "0.1.4"
|
serde_json_path_to_error = "0.1.4"
|
||||||
url = "2.5.2"
|
url = "2.5.2"
|
||||||
serde_with = "3.9.0"
|
serde_with = "3.9.0"
|
||||||
sqlx = { version = "0.7.4", features = ["postgres", "macros", "chrono", "runtime-tokio-rustls"] }
|
sqlx = { version = "0.8.3", features = ["postgres", "macros", "chrono", "runtime-tokio-rustls"] }
|
||||||
futures = "0.3.30"
|
futures = "0.3.30"
|
||||||
tracing = "0.1.40"
|
tracing = "0.1.40"
|
||||||
tracing-subscriber = "0.3.18"
|
tracing-subscriber = "0.3.18"
|
||||||
itertools = "0.13.0"
|
itertools = "0.14.0"
|
||||||
soa-rs = "0.6.1"
|
soa-rs = "0.7.5"
|
||||||
clap = { version = "4.5.11", features = ["derive", "env"] }
|
clap = { version = "4.5.11", features = ["derive", "env"] }
|
||||||
dotenvy = "0.15.7"
|
dotenvy = "0.15.7"
|
||||||
|
async-trait = "0.1.86"
|
||||||
|
|||||||
@ -1,5 +1,5 @@
|
|||||||
use axum::response::IntoResponse;
|
use axum::response::IntoResponse;
|
||||||
use sqlx::{Connection, PgPool};
|
use sqlx::{PgPool};
|
||||||
use toggl::TogglApi;
|
use toggl::TogglApi;
|
||||||
use worker::Worker;
|
use worker::Worker;
|
||||||
|
|
||||||
@ -97,7 +97,7 @@ async fn main() {
|
|||||||
let cli = Cli::parse();
|
let cli = Cli::parse();
|
||||||
|
|
||||||
let toggl_api = TogglApi::new(&cli.api_token, cli.default_workspace_id);
|
let toggl_api = TogglApi::new(&cli.api_token, cli.default_workspace_id);
|
||||||
let mut db = PgPool::connect(&cli.database_url).await.unwrap();
|
let db = PgPool::connect(&cli.database_url).await.unwrap();
|
||||||
|
|
||||||
sqlx::migrate!("./migrations")
|
sqlx::migrate!("./migrations")
|
||||||
.run(&db)
|
.run(&db)
|
||||||
@ -117,6 +117,7 @@ async fn main() {
|
|||||||
.await
|
.await
|
||||||
.expect("Failed to start server");
|
.expect("Failed to start server");
|
||||||
}
|
}
|
||||||
|
|
||||||
Commands::Sync => {
|
Commands::Sync => {
|
||||||
worker
|
worker
|
||||||
.update(TimeDelta::days(30))
|
.update(TimeDelta::days(30))
|
||||||
|
|||||||
@ -1,14 +1,73 @@
|
|||||||
use axum::response::IntoResponse;
|
|
||||||
use axum::{
|
|
||||||
http::StatusCode,
|
|
||||||
routing::{get, post},
|
|
||||||
Extension, Json, Router,
|
|
||||||
};
|
|
||||||
use chrono::TimeDelta;
|
|
||||||
use std::net::IpAddr;
|
|
||||||
|
|
||||||
use crate::worker::Worker;
|
use crate::worker::Worker;
|
||||||
use crate::AppError;
|
use crate::AppError;
|
||||||
|
use axum::response::IntoResponse;
|
||||||
|
use axum::{
|
||||||
|
routing::{get, post},
|
||||||
|
Extension, Router,
|
||||||
|
};
|
||||||
|
use chrono::TimeDelta;
|
||||||
|
use sqlx::postgres::PgListener;
|
||||||
|
use std::net::IpAddr;
|
||||||
|
|
||||||
|
#[derive(Debug, serde::Deserialize)]
|
||||||
|
struct Payload(Vec<PayloadItem>);
|
||||||
|
|
||||||
|
#[derive(Debug, serde::Deserialize)]
|
||||||
|
enum PayloadItem {
|
||||||
|
Num(i32),
|
||||||
|
Str(String),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PayloadItem {
|
||||||
|
fn to_i32(self) -> Option<i32> {
|
||||||
|
match self {
|
||||||
|
PayloadItem::Num(n) => Some(n),
|
||||||
|
PayloadItem::Str(s) => s.parse().ok(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn listen_for_changes(pool: sqlx::PgPool, worker: Worker) {
|
||||||
|
let mut listener = PgListener::connect_with(&pool)
|
||||||
|
.await
|
||||||
|
.expect("Failed to connect to database");
|
||||||
|
|
||||||
|
// Enable LISTEN on the channel
|
||||||
|
listener
|
||||||
|
.listen("toggl_external_changes")
|
||||||
|
.await
|
||||||
|
.expect("Failed to LISTEN on channel");
|
||||||
|
|
||||||
|
while let Ok(notification) = listener.recv().await {
|
||||||
|
let payload = notification.payload();
|
||||||
|
|
||||||
|
if payload == "" {
|
||||||
|
worker
|
||||||
|
.update(TimeDelta::minutes(30))
|
||||||
|
.await
|
||||||
|
.expect("Failed to sync");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
match serde_json::from_str::<Payload>(payload) {
|
||||||
|
Ok(payload) => {
|
||||||
|
let ids: Vec<i32> = payload
|
||||||
|
.0
|
||||||
|
.into_iter()
|
||||||
|
.filter_map(|v| v.to_i32())
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
if !ids.is_empty() {
|
||||||
|
if let Err(e) = worker.fetch_time_entries_by_ids(&ids).await {
|
||||||
|
eprintln!("Error fetching time entries: {}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Err(e) => eprintln!("Error parsing notification payload: {}", e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async fn sync(Extension(worker): Extension<Worker>) -> Result<impl IntoResponse, AppError> {
|
async fn sync(Extension(worker): Extension<Worker>) -> Result<impl IntoResponse, AppError> {
|
||||||
worker.update(TimeDelta::days(30)).await?;
|
worker.update(TimeDelta::days(30)).await?;
|
||||||
@ -17,6 +76,15 @@ async fn sync(Extension(worker): Extension<Worker>) -> Result<impl IntoResponse,
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub async fn serve(worker: Worker, ip: IpAddr, port: u16) -> Result<(), AppError> {
|
pub async fn serve(worker: Worker, ip: IpAddr, port: u16) -> Result<(), AppError> {
|
||||||
|
// Clone the pool and worker for the notification listener
|
||||||
|
let notification_pool = worker.db.clone();
|
||||||
|
let notification_worker = worker.clone();
|
||||||
|
|
||||||
|
// Spawn the notification listener
|
||||||
|
tokio::spawn(async move {
|
||||||
|
listen_for_changes(notification_pool, notification_worker).await;
|
||||||
|
});
|
||||||
|
|
||||||
// build our application with a route
|
// build our application with a route
|
||||||
let app = Router::new()
|
let app = Router::new()
|
||||||
.route("/health", get(|| async { "Ok" }))
|
.route("/health", get(|| async { "Ok" }))
|
||||||
|
|||||||
@ -1,4 +1,4 @@
|
|||||||
use axum::async_trait;
|
use async_trait::async_trait;
|
||||||
use governor::clock::DefaultClock;
|
use governor::clock::DefaultClock;
|
||||||
use governor::state::{InMemoryState, NotKeyed};
|
use governor::state::{InMemoryState, NotKeyed};
|
||||||
use std::num::NonZero;
|
use std::num::NonZero;
|
||||||
|
|||||||
@ -28,6 +28,30 @@ pub struct Worker {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl Worker {
|
impl Worker {
|
||||||
|
pub async fn fetch_time_entries_by_ids(&self, ids: &[i32]) -> Result<(), AppError> {
|
||||||
|
// Convert i32 IDs to u64 as that's what the Toggl API expects
|
||||||
|
let ids: Vec<u64> = ids.iter().map(|&id| id as u64).collect();
|
||||||
|
|
||||||
|
// Use the search API with time_entry_ids filter
|
||||||
|
let time_entries = self
|
||||||
|
.toggl_api
|
||||||
|
.search(
|
||||||
|
self.toggl_api.workspace_id,
|
||||||
|
TogglReportFilters {
|
||||||
|
time_entry_ids: Some(ids),
|
||||||
|
..Default::default()
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let time_entries = time_entries
|
||||||
|
.into_iter()
|
||||||
|
.map(|entry| entry.into_time_entry(self.toggl_api.workspace_id))
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
self.update_database(time_entries).await
|
||||||
|
}
|
||||||
|
|
||||||
async fn get_ids(&self) -> Result<TableSummary, AppError> {
|
async fn get_ids(&self) -> Result<TableSummary, AppError> {
|
||||||
let client_ids = sqlx::query!("select id from tracking_clients")
|
let client_ids = sqlx::query!("select id from tracking_clients")
|
||||||
.fetch_all(&self.db)
|
.fetch_all(&self.db)
|
||||||
@ -75,10 +99,6 @@ impl Worker {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub async fn fetch_changed_since(&self, look_back: TimeDelta) -> Result<(), AppError> {
|
pub async fn fetch_changed_since(&self, look_back: TimeDelta) -> Result<(), AppError> {
|
||||||
if look_back > TimeDelta::days(90) {
|
|
||||||
return Err(AppError::LookBackTooLarge);
|
|
||||||
}
|
|
||||||
|
|
||||||
self.update_time_entries(Utc::now() - look_back).await
|
self.update_time_entries(Utc::now() - look_back).await
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -96,6 +116,10 @@ impl Worker {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn update_time_entries(&self, fetch_since: DateTime<Utc>) -> Result<(), AppError> {
|
async fn update_time_entries(&self, fetch_since: DateTime<Utc>) -> Result<(), AppError> {
|
||||||
|
if Utc::now() - fetch_since > TimeDelta::days(90) {
|
||||||
|
return Err(AppError::LookBackTooLarge);
|
||||||
|
}
|
||||||
|
|
||||||
let time_entries = self
|
let time_entries = self
|
||||||
.toggl_api
|
.toggl_api
|
||||||
.get_time_entries_for_user_modified_since(fetch_since)
|
.get_time_entries_for_user_modified_since(fetch_since)
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user