Use ? operator for errors

Joshua Coles 2024-04-25 14:27:12 +01:00
parent 56a60d969d
commit 56829cfa58

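This commit swaps the .unwrap() calls for the ? operator, with main (and the new upload_files / update_data helpers) returning anyhow::Result<()> so that sqlx and serde_json errors propagate instead of panicking. A minimal sketch of the pattern, independent of the real schema and types in this repository:

    use anyhow::Result;

    #[tokio::main]
    async fn main() -> Result<()> {
        // Before this commit the equivalent call sites used .unwrap(),
        // so any serde_json or sqlx error aborted the whole run.
        let value = serde_json::to_value("example")?; // the error converts into anyhow::Error and bubbles up
        println!("{value}");
        Ok(())
    }

The new upload_files helper is also annotated with #[tracing::instrument(skip(db, need_refresh), err)]; the err option makes tracing record the error whenever the function returns Err, which pairs naturally with ?-style propagation.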

@@ -5,7 +5,7 @@ use std::path::PathBuf;
 use chrono::{DateTime, Utc};
 use clap::Parser;
 use itertools::Itertools;
-use sqlx::FromRow;
+use sqlx::{FromRow, PgPool, Pool, Postgres};
 use serde_json::Value;
 use uuid::Uuid;
@@ -129,7 +129,7 @@ async fn find_updated(db: &sqlx::PgPool, files: ReadDir) -> Vec<UpdatedFile> {
 }
 #[tokio::main]
-async fn main() {
+async fn main() -> anyhow::Result<()> {
     tracing_subscriber::fmt::init();
     let cli = Cli::parse();
@@ -149,17 +149,16 @@ async fn main() {
     let need_refresh = find_updated(&db, files)
         .await;
-    // Refresh the database with the new files
-    for updated_file in &need_refresh {
-        sqlx::query("INSERT INTO raw_files (date, sha256, json) VALUES ($1, $2, $3 :: jsonb) ON CONFLICT (date) DO UPDATE SET sha256 = excluded.sha256, json = excluded.json")
-            .bind(&updated_file.date)
-            .bind(&updated_file.sha256)
-            .bind(&serde_json::to_value(&updated_file.json).unwrap())
-            .execute(&db)
-            .await
-            .unwrap();
-    }
+    upload_files(&db, &need_refresh)
+        .await?;
+    update_data(&db, need_refresh)
+        .await?;
+    Ok(())
+}
+
+async fn update_data(db: &Pool<Postgres>, need_refresh: Vec<UpdatedFile>) -> anyhow::Result<()> {
     // Take all the changed files' timeline items, and group them by item_id, then take the latest one.
     // If we are needing to update the database it will be with this one.
     let possibly_timeline_items = need_refresh.into_iter()
@@ -176,9 +175,8 @@ async fn main() {
     let existing_last_saved_at_map = sqlx::query_as("SELECT item_id, end_date, last_saved FROM timeline_item where item_id = ANY($1)")
         .bind(&possibly_timeline_item_ids)
-        .fetch_all(&db)
-        .await
-        .unwrap()
+        .fetch_all(db)
+        .await?
         .into_iter()
         .map(|row: TimelineItemUpdatedCheckRow| (row.item_id, row.last_saved))
         .collect::<HashMap<_, _>>();
@@ -199,18 +197,37 @@ async fn main() {
         if let Some(place) = &updated_timeline_item.place {
             sqlx::query("INSERT INTO place (place_id, json, last_saved, server_last_updated) VALUES ($1, $2 :: jsonb, $3, now()) ON CONFLICT (place_id) DO UPDATE SET json = $2 :: jsonb, last_saved = $3, server_last_updated = now()")
                 .bind(&place.place_id)
-                .bind(&serde_json::to_value(&place).unwrap())
+                .bind(&serde_json::to_value(&place)?)
                 .bind(&place.last_saved)
-                .execute(&db).await.unwrap();
+                .execute(db)
+                .await?;
         }
         // Then we can insert/update the timeline item.
         sqlx::query("INSERT INTO timeline_item (item_id, json, place_id, end_date, last_saved, server_last_updated) VALUES ($1, $2 :: jsonb, $3, $4, $5, now()) ON CONFLICT (item_id) DO UPDATE SET json = $2 :: jsonb, place_id = $3, end_date = $4, last_saved = $5, server_last_updated = now()")
             .bind(&updated_timeline_item.item_id)
-            .bind(&serde_json::to_value(&updated_timeline_item).unwrap())
+            .bind(&serde_json::to_value(&updated_timeline_item)?)
             .bind(&updated_timeline_item.place.map(|place| place.place_id))
            .bind(&updated_timeline_item.end_date)
             .bind(&updated_timeline_item.last_saved)
-            .execute(&db).await.unwrap();
+            .execute(db)
+            .await?;
     }
+    Ok(())
 }
+
+#[tracing::instrument(skip(db, need_refresh), err)]
+async fn upload_files(db: &PgPool, need_refresh: &[UpdatedFile]) -> anyhow::Result<()> {
+    // Refresh the database with the new files
+    for updated_file in need_refresh {
+        sqlx::query("INSERT INTO raw_files (date, sha256, json) VALUES ($1, $2, $3 :: jsonb) ON CONFLICT (date) DO UPDATE SET sha256 = excluded.sha256, json = excluded.json")
+            .bind(&updated_file.date)
+            .bind(&updated_file.sha256)
+            .bind(&serde_json::to_value(&updated_file.json)?)
+            .execute(db)
+            .await?;
+    }
+    Ok(())
+}
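For context on the comment in update_data about grouping the changed files' timeline items by item_id and keeping only the latest one: with the Itertools import already present in this file, that step could look roughly like the sketch below. The TimelineItem struct here is a hypothetical, trimmed-down stand-in (only item_id and last_saved), since the real types are not part of this diff.

    use chrono::{DateTime, Utc};
    use itertools::Itertools;

    // Hypothetical, trimmed-down item; the real struct has more fields.
    struct TimelineItem {
        item_id: String,
        last_saved: DateTime<Utc>,
    }

    // Group all candidate items by item_id and keep only the most recently saved version per id.
    fn latest_per_item(items: Vec<TimelineItem>) -> Vec<TimelineItem> {
        items
            .into_iter()
            .into_group_map_by(|item| item.item_id.clone())
            .into_values()
            .filter_map(|versions| versions.into_iter().max_by_key(|item| item.last_saved))
            .collect()
    }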