diff --git a/src/main.rs b/src/main.rs index 0dcc507..6d2b66e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,7 +5,7 @@ use std::path::PathBuf; use chrono::{DateTime, Utc}; use clap::Parser; use itertools::Itertools; -use sqlx::FromRow; +use sqlx::{FromRow, PgPool, Pool, Postgres}; use serde_json::Value; use uuid::Uuid; @@ -129,7 +129,7 @@ async fn find_updated(db: &sqlx::PgPool, files: ReadDir) -> Vec { } #[tokio::main] -async fn main() { +async fn main() -> anyhow::Result<()> { tracing_subscriber::fmt::init(); let cli = Cli::parse(); @@ -149,17 +149,16 @@ async fn main() { let need_refresh = find_updated(&db, files) .await; - // Refresh the database with the new files - for updated_file in &need_refresh { - sqlx::query("INSERT INTO raw_files (date, sha256, json) VALUES ($1, $2, $3 :: jsonb) ON CONFLICT (date) DO UPDATE SET sha256 = excluded.sha256, json = excluded.json") - .bind(&updated_file.date) - .bind(&updated_file.sha256) - .bind(&serde_json::to_value(&updated_file.json).unwrap()) - .execute(&db) - .await - .unwrap(); - } + upload_files(&db, &need_refresh) + .await?; + update_data(&db, need_refresh) + .await?; + + Ok(()) +} + +async fn update_data(db: &Pool, need_refresh: Vec) -> anyhow::Result<()> { // Take all the changed files' timeline items, and group them by item_id, then take the latest one. // If we are needing to update the database it will be with this one. let possibly_timeline_items = need_refresh.into_iter() @@ -176,9 +175,8 @@ async fn main() { let existing_last_saved_at_map = sqlx::query_as("SELECT item_id, end_date, last_saved FROM timeline_item where item_id = ANY($1)") .bind(&possibly_timeline_item_ids) - .fetch_all(&db) - .await - .unwrap() + .fetch_all(db) + .await? .into_iter() .map(|row: TimelineItemUpdatedCheckRow| (row.item_id, row.last_saved)) .collect::>(); @@ -199,18 +197,37 @@ async fn main() { if let Some(place) = &updated_timeline_item.place { sqlx::query("INSERT INTO place (place_id, json, last_saved, server_last_updated) VALUES ($1, $2 :: jsonb, $3, now()) ON CONFLICT (place_id) DO UPDATE SET json = $2 :: jsonb, last_saved = $3, server_last_updated = now()") .bind(&place.place_id) - .bind(&serde_json::to_value(&place).unwrap()) + .bind(&serde_json::to_value(&place)?) .bind(&place.last_saved) - .execute(&db).await.unwrap(); + .execute(db) + .await?; } // Then we can insert/update the timeline item. sqlx::query("INSERT INTO timeline_item (item_id, json, place_id, end_date, last_saved, server_last_updated) VALUES ($1, $2 :: jsonb, $3, $4, $5, now()) ON CONFLICT (item_id) DO UPDATE SET json = $2 :: jsonb, place_id = $3, end_date = $4, last_saved = $5, server_last_updated = now()") .bind(&updated_timeline_item.item_id) - .bind(&serde_json::to_value(&updated_timeline_item).unwrap()) + .bind(&serde_json::to_value(&updated_timeline_item)?) .bind(&updated_timeline_item.place.map(|place| place.place_id)) .bind(&updated_timeline_item.end_date) .bind(&updated_timeline_item.last_saved) - .execute(&db).await.unwrap(); + .execute(db) + .await?; } + + Ok(()) +} + +#[tracing::instrument(skip(db, need_refresh), err)] +async fn upload_files(db: &PgPool, need_refresh: &[UpdatedFile]) -> anyhow::Result<()> { + // Refresh the database with the new files + for updated_file in need_refresh { + sqlx::query("INSERT INTO raw_files (date, sha256, json) VALUES ($1, $2, $3 :: jsonb) ON CONFLICT (date) DO UPDATE SET sha256 = excluded.sha256, json = excluded.json") + .bind(&updated_file.date) + .bind(&updated_file.sha256) + .bind(&serde_json::to_value(&updated_file.json)?) + .execute(db) + .await?; + } + + Ok(()) }