use crate::client::TogglClient; use crate::entity::prelude::TimeEntry; use crate::entity::time_entry; use crate::entity::time_entry::ActiveModel; use crate::types::{Current, Project, ProjectClient, ReportEntry, TogglQuery}; use anyhow::anyhow; use axum::extract::multipart::Field; use axum::extract::{Multipart, Query}; use axum::http::StatusCode; use axum::response::IntoResponse; use axum::routing::{get, post}; use axum::{Extension, Json, Router}; use base64::engine::general_purpose::STANDARD; use base64::Engine; use utils::{shutdown_signal, Result}; use chrono::{NaiveDate, NaiveTime}; use clap::Parser; use migration::{Migrator, MigratorTrait}; use sea_orm::sea_query::IntoCondition; use sea_orm::{ColumnTrait, Condition, DatabaseConnection, EntityTrait, QueryFilter}; use serde::Deserialize; use serde_json::Value; use std::collections::HashMap; use std::net::SocketAddr; use tower_http::trace::TraceLayer; use tracing::{debug, instrument}; mod client; mod db; mod entity; mod poll; mod types; mod utils; mod csv_parser; #[derive(Debug, Clone, Parser)] struct Config { #[arg(long = "workspace", short, env)] workspace_id: u32, #[arg(long = "token", short, env)] toggl_api_token: String, #[arg(long = "addr", short, env, default_value = "0.0.0.0:3000")] address: SocketAddr, #[arg(long = "db", short, env)] database_url: String, /// How often to poll the Toggl API for new time entries #[arg(long, short, env, default_value = "7200")] poll_period: u64, } #[instrument(skip(db, toggl_client))] pub async fn report( Extension(toggl_client): Extension, Extension(db): Extension, Json(query): Json, ) -> Result>> { let report = toggl_client.full_report(&query).await?; debug!("Returned results: {:?}", report); // We don't perform any deletes on report-fetched entries cache_report(&db, &report, None).await?; Ok(Json(report)) } #[instrument(skip_all)] async fn cache_report( db: &DatabaseConnection, models: &Vec, exclusive_on: Option, ) -> Result<()> { let models = models.iter().flat_map(|entry| entry.as_models()); let models = models.collect::>(); let ids = models .iter() .map(|entry| entry.toggl_id.clone().unwrap()) .collect::>(); debug!("Caching report entries: {:?}", models); // TODO: Why is this needed? if models.is_empty() { return Ok(()); } TimeEntry::insert_many(models) .on_conflict(ReportEntry::grafting_conflict_statement()) .exec(db) .await?; if let Some(exclusive_on) = exclusive_on { TimeEntry::delete_many() .filter( Condition::all() .add(exclusive_on) .add(time_entry::Column::TogglId.is_in(ids).not()), ) .exec(db) .await?; } Ok(()) } #[instrument(skip(toggl_client))] pub async fn current( Extension(toggl_client): Extension, ) -> Result>> { Ok(toggl_client.get_current().await.map(Json)?) } #[instrument(skip(toggl_client))] pub async fn start_time_entry( Extension(toggl_client): Extension, Json(body): Json>, ) -> Result { toggl_client.start_time_entry(body).await?; Ok((StatusCode::OK, "Ok")) } #[instrument(skip(db, toggl_client))] async fn projects( Extension(db): Extension, Extension(toggl_client): Extension, ) -> Result>> { let projects = toggl_client.fetch_projects().await?; entity::project::Entity::insert_many(projects.iter().map(Project::as_model)) .on_conflict(Project::grafting_conflict_statement()) .exec(&db) .await?; Ok(Json(projects)) } #[instrument(skip(toggl_client, db))] async fn clients( Extension(db): Extension, Extension(toggl_client): Extension, ) -> Result>> { let clients = toggl_client.fetch_clients().await?; entity::client::Entity::insert_many(clients.iter().map(ProjectClient::as_model)) .on_conflict(ProjectClient::grafting_conflict_statement()) .exec(&db) .await?; Ok(Json(clients)) } async fn health(Extension(toggl_client): Extension) -> Result<&'static str> { return if toggl_client.check_health().await { Ok("Ok") } else { Err(anyhow!("Panopto health check failed").into()) }; } #[derive(Debug, Clone, Deserialize)] struct RefreshQuery { start_date: Option, } #[instrument(skip(toggl_client, db))] async fn refresh( Extension(toggl_client): Extension, Extension(db): Extension, Query(RefreshQuery { start_date }): Query, ) -> Result<&'static str> { let end_date = chrono::Utc::now(); let end_date_query_string = end_date.date_naive().format("%Y-%m-%d").to_string(); let start_date_query_string = start_date.unwrap_or(end_date_query_string.clone()); let start_date = NaiveDate::parse_from_str(&start_date_query_string, "%Y-%m-%d")?; let query = TogglQuery { start_date: Some(start_date_query_string), end_date: Some(end_date_query_string), ..Default::default() }; let report = toggl_client.full_report(&query).await?; let exclusivity_condition = day_exclusivity_condition(start_date, end_date.date_naive()); cache_report(&db, &report, Some(exclusivity_condition)).await?; Ok("Ok") } fn day_exclusivity_condition(start: NaiveDate, end: NaiveDate) -> Condition { time_entry::Column::Start .between( start.and_time(NaiveTime::from_hms_opt(0, 0, 0).unwrap()), end.and_time(NaiveTime::from_hms_opt(23, 59, 59).unwrap()), ) .into_condition() } fn from_csv_row(row: csv::StringRecord) -> ActiveModel { unimplemented!("Need to refactor db first") } async fn import_csv( Extension(db): Extension, mut multipart: Multipart, ) -> Result { return Ok((StatusCode::NOT_IMPLEMENTED, "Not implemented")); // while let Some(field) = multipart.next_field().await? { // // if let Some("csv") = field.name() { // // let csv = field.bytes().await?; // // let mut csv = csv::Reader::from_reader(csv.as_ref()); // // let data = csv.records().filter_map(|f| f.ok()).map(from_csv_row); // // // // time_entry::Entity::insert_many(data.collect::>().unwrap()) // // .on_conflict(ReportEntry::grafting_conflict_statement()) // // .exec(&db) // // .await // // .unwrap() // // } // } } #[tokio::main] async fn main() -> Result<()> { // install global collector configured based on RUST_LOG env var. tracing_subscriber::fmt::init(); let config = Config::parse(); let toggl_client = TogglClient::new( &config.workspace_id.to_string(), &STANDARD.encode(&format!("{}:api_token", config.toggl_api_token)), ); let db = sea_orm::Database::connect(config.database_url) .await .unwrap(); Migrator::up(&db, None).await.expect("Failed to migrate"); tokio::spawn(poll::poll_job( toggl_client.clone(), db.clone(), config.poll_period, )); // build our application with a route let app = Router::new() .route("/import_csv", post(import_csv)) .route("/health", get(health)) .route("/current", get(current)) .route("/refresh", post(refresh)) .route("/report", post(report)) .route("/projects", get(projects)) .route("/clients", get(clients)) .route("/start_time_entry", post(start_time_entry)) .layer(Extension(toggl_client)) .layer(Extension(db)) .layer(TraceLayer::new_for_http()); tracing::info!("Listening on {}", config.address); axum::Server::try_bind(&config.address)? .serve(app.into_make_service()) .with_graceful_shutdown(shutdown_signal()) .await?; Ok(()) }