Add additional validation
Some checks failed
Build and Publish / Build and Test (push) Failing after 10m39s
Some checks failed
Build and Publish / Build and Test (push) Failing after 10m39s
This commit is contained in:
parent
5286c49bd3
commit
3f439c8a31
@ -1,5 +1,5 @@
|
|||||||
use axum::response::IntoResponse;
|
use axum::response::IntoResponse;
|
||||||
use sqlx::{Connection, PgPool};
|
use sqlx::{PgPool};
|
||||||
use toggl::TogglApi;
|
use toggl::TogglApi;
|
||||||
use worker::Worker;
|
use worker::Worker;
|
||||||
|
|
||||||
@ -97,7 +97,7 @@ async fn main() {
|
|||||||
let cli = Cli::parse();
|
let cli = Cli::parse();
|
||||||
|
|
||||||
let toggl_api = TogglApi::new(&cli.api_token, cli.default_workspace_id);
|
let toggl_api = TogglApi::new(&cli.api_token, cli.default_workspace_id);
|
||||||
let mut db = PgPool::connect(&cli.database_url).await.unwrap();
|
let db = PgPool::connect(&cli.database_url).await.unwrap();
|
||||||
|
|
||||||
sqlx::migrate!("./migrations")
|
sqlx::migrate!("./migrations")
|
||||||
.run(&db)
|
.run(&db)
|
||||||
|
|||||||
@ -1,14 +1,73 @@
|
|||||||
use axum::response::IntoResponse;
|
|
||||||
use axum::{
|
|
||||||
http::StatusCode,
|
|
||||||
routing::{get, post},
|
|
||||||
Extension, Json, Router,
|
|
||||||
};
|
|
||||||
use chrono::TimeDelta;
|
|
||||||
use std::net::IpAddr;
|
|
||||||
|
|
||||||
use crate::worker::Worker;
|
use crate::worker::Worker;
|
||||||
use crate::AppError;
|
use crate::AppError;
|
||||||
|
use axum::response::IntoResponse;
|
||||||
|
use axum::{
|
||||||
|
routing::{get, post},
|
||||||
|
Extension, Router,
|
||||||
|
};
|
||||||
|
use chrono::TimeDelta;
|
||||||
|
use sqlx::postgres::PgListener;
|
||||||
|
use std::net::IpAddr;
|
||||||
|
|
||||||
|
#[derive(Debug, serde::Deserialize)]
|
||||||
|
struct Payload(Vec<PayloadItem>);
|
||||||
|
|
||||||
|
#[derive(Debug, serde::Deserialize)]
|
||||||
|
enum PayloadItem {
|
||||||
|
Num(i32),
|
||||||
|
Str(String),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PayloadItem {
|
||||||
|
fn to_i32(self) -> Option<i32> {
|
||||||
|
match self {
|
||||||
|
PayloadItem::Num(n) => Some(n),
|
||||||
|
PayloadItem::Str(s) => s.parse().ok(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn listen_for_changes(pool: sqlx::PgPool, worker: Worker) {
|
||||||
|
let mut listener = PgListener::connect_with(&pool)
|
||||||
|
.await
|
||||||
|
.expect("Failed to connect to database");
|
||||||
|
|
||||||
|
// Enable LISTEN on the channel
|
||||||
|
listener
|
||||||
|
.listen("toggl_external_changes")
|
||||||
|
.await
|
||||||
|
.expect("Failed to LISTEN on channel");
|
||||||
|
|
||||||
|
while let Ok(notification) = listener.recv().await {
|
||||||
|
let payload = notification.payload();
|
||||||
|
|
||||||
|
if payload == "" {
|
||||||
|
worker
|
||||||
|
.update(TimeDelta::minutes(30))
|
||||||
|
.await
|
||||||
|
.expect("Failed to sync");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
match serde_json::from_str::<Payload>(payload) {
|
||||||
|
Ok(payload) => {
|
||||||
|
let ids: Vec<i32> = payload
|
||||||
|
.0
|
||||||
|
.into_iter()
|
||||||
|
.filter_map(|v| v.to_i32())
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
if !ids.is_empty() {
|
||||||
|
if let Err(e) = worker.fetch_time_entries_by_ids(&ids).await {
|
||||||
|
eprintln!("Error fetching time entries: {}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Err(e) => eprintln!("Error parsing notification payload: {}", e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async fn sync(Extension(worker): Extension<Worker>) -> Result<impl IntoResponse, AppError> {
|
async fn sync(Extension(worker): Extension<Worker>) -> Result<impl IntoResponse, AppError> {
|
||||||
worker.update(TimeDelta::days(30)).await?;
|
worker.update(TimeDelta::days(30)).await?;
|
||||||
@ -17,6 +76,15 @@ async fn sync(Extension(worker): Extension<Worker>) -> Result<impl IntoResponse,
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub async fn serve(worker: Worker, ip: IpAddr, port: u16) -> Result<(), AppError> {
|
pub async fn serve(worker: Worker, ip: IpAddr, port: u16) -> Result<(), AppError> {
|
||||||
|
// Clone the pool and worker for the notification listener
|
||||||
|
let notification_pool = worker.db.clone();
|
||||||
|
let notification_worker = worker.clone();
|
||||||
|
|
||||||
|
// Spawn the notification listener
|
||||||
|
tokio::spawn(async move {
|
||||||
|
listen_for_changes(notification_pool, notification_worker).await;
|
||||||
|
});
|
||||||
|
|
||||||
// build our application with a route
|
// build our application with a route
|
||||||
let app = Router::new()
|
let app = Router::new()
|
||||||
.route("/health", get(|| async { "Ok" }))
|
.route("/health", get(|| async { "Ok" }))
|
||||||
|
|||||||
@ -28,6 +28,30 @@ pub struct Worker {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl Worker {
|
impl Worker {
|
||||||
|
pub async fn fetch_time_entries_by_ids(&self, ids: &[i32]) -> Result<(), AppError> {
|
||||||
|
// Convert i32 IDs to u64 as that's what the Toggl API expects
|
||||||
|
let ids: Vec<u64> = ids.iter().map(|&id| id as u64).collect();
|
||||||
|
|
||||||
|
// Use the search API with time_entry_ids filter
|
||||||
|
let time_entries = self
|
||||||
|
.toggl_api
|
||||||
|
.search(
|
||||||
|
self.toggl_api.workspace_id,
|
||||||
|
TogglReportFilters {
|
||||||
|
time_entry_ids: Some(ids),
|
||||||
|
..Default::default()
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let time_entries = time_entries
|
||||||
|
.into_iter()
|
||||||
|
.map(|entry| entry.into_time_entry(self.toggl_api.workspace_id))
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
self.update_database(time_entries).await
|
||||||
|
}
|
||||||
|
|
||||||
async fn get_ids(&self) -> Result<TableSummary, AppError> {
|
async fn get_ids(&self) -> Result<TableSummary, AppError> {
|
||||||
let client_ids = sqlx::query!("select id from tracking_clients")
|
let client_ids = sqlx::query!("select id from tracking_clients")
|
||||||
.fetch_all(&self.db)
|
.fetch_all(&self.db)
|
||||||
@ -75,10 +99,6 @@ impl Worker {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub async fn fetch_changed_since(&self, look_back: TimeDelta) -> Result<(), AppError> {
|
pub async fn fetch_changed_since(&self, look_back: TimeDelta) -> Result<(), AppError> {
|
||||||
if look_back > TimeDelta::days(90) {
|
|
||||||
return Err(AppError::LookBackTooLarge);
|
|
||||||
}
|
|
||||||
|
|
||||||
self.update_time_entries(Utc::now() - look_back).await
|
self.update_time_entries(Utc::now() - look_back).await
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -96,6 +116,10 @@ impl Worker {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn update_time_entries(&self, fetch_since: DateTime<Utc>) -> Result<(), AppError> {
|
async fn update_time_entries(&self, fetch_since: DateTime<Utc>) -> Result<(), AppError> {
|
||||||
|
if Utc::now() - fetch_since > TimeDelta::days(90) {
|
||||||
|
return Err(AppError::LookBackTooLarge);
|
||||||
|
}
|
||||||
|
|
||||||
let time_entries = self
|
let time_entries = self
|
||||||
.toggl_api
|
.toggl_api
|
||||||
.get_time_entries_for_user_modified_since(fetch_since)
|
.get_time_entries_for_user_modified_since(fetch_since)
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user