Compare commits

...

3 Commits

Author SHA1 Message Date
e73123b7f9 Add SQL mode and make it use the query! macro for type checking 2024-04-26 21:44:04 +01:00
ef15a630d6 Add some logging 2024-04-26 20:37:30 +01:00
56829cfa58 Use ? operator for errors 2024-04-25 14:27:12 +01:00
7 changed files with 114 additions and 21 deletions
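
The headline change is moving from sqlx's runtime-checked query API to the compile-time-checked query!/query_file! macros, together with the pieces those macros need: SQLX_OFFLINE for CI, cached query metadata under .sqlx/, and a DATABASE_URL for local builds. As a rough illustration of the distinction (pool is an assumed PgPool; the table and columns are taken from queries elsewhere in this diff), not code from this change:

// Runtime-checked: the SQL is only validated when the query actually executes.
let rows = sqlx::query("SELECT date, sha256 FROM raw_files")
    .fetch_all(&pool)
    .await?;

// Compile-time-checked: query! verifies the SQL and derives a typed result
// record at build time, using DATABASE_URL or the offline cache in .sqlx/.
let checked = sqlx::query!("SELECT date, sha256 FROM raw_files")
    .fetch_all(&pool)
    .await?;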

View File

@@ -45,6 +45,8 @@ jobs:
      - name: Build release binary
        run: cargo build --release
        env:
          SQLX_OFFLINE: true
      - name: Upload binary
        uses: actions/upload-artifact@v3

View File

@@ -0,0 +1,22 @@
{
"db_name": "PostgreSQL",
"query": "with timelineItems as (select jsonb_array_elements(raw_files.json -> 'timelineItems') as timelineItem\n from raw_files\n where date = ANY ($1)),\n max_last_saved as (select timelineItem ->> 'itemId' as itemId,\n max((timelineItem ->> 'lastSaved') :: timestamptz) as latest_last_saved\n from timelineItems\n group by timelineItem ->> 'itemId'),\n unique_timline_items as (select distinct on (max_last_saved.itemId) *\n from max_last_saved\n inner join timelineItems\n on timelineItems.timelineItem ->> 'itemId' = max_last_saved.itemId\n and (timelineItems.timelineItem ->> 'lastSaved') :: timestamptz =\n max_last_saved.latest_last_saved)\ninsert\ninto public.timeline_item (item_id, json, place_id, end_date, last_saved, server_last_updated)\nselect unique_timline_items.itemId :: uuid as item_id,\n unique_timline_items.timelineItem as json,\n (unique_timline_items.timelineItem -> 'place' ->> 'placeId') :: uuid as place_id,\n (unique_timline_items.timelineItem ->> 'endDate') :: timestamptz as end_date,\n unique_timline_items.latest_last_saved :: timestamptz as last_saved,\n now() as server_last_updated\nfrom unique_timline_items\non conflict (item_id) do update set json = excluded.json,\n place_id = excluded.place_id,\n end_date = excluded.end_date,\n last_saved = excluded.last_saved,\n server_last_updated = excluded.server_last_updated\nwhere excluded.last_saved > public.timeline_item.last_saved\nreturning item_id;\n",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "item_id",
"type_info": "Uuid"
}
],
"parameters": {
"Left": [
"TextArray"
]
},
"nullable": [
false
]
},
"hash": "ac15bfcd1737751e27388ffddfe8ec47fd7277be74c47a59484e4fc993671d43"
}
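
These .sqlx/query-*.json files are sqlx's offline query cache, generated by `cargo sqlx prepare` from sqlx-cli: each entry is keyed by a hash of the SQL text and stores the "describe" metadata (output columns, parameter types, nullability) that the macros would otherwise fetch from a live database. With SQLX_OFFLINE=true the macros read this cache instead of connecting to Postgres. This particular entry backs the update_timeline_items.sql call: the SQL ends in `returning item_id`, so the one output column is a non-null Uuid, and the single parameter is a text array. A hedged sketch of the call it supports (pool and the example dates are assumptions; the real call appears in src/main.rs below):

// With SQLX_OFFLINE=true, query_file! type-checks against the cached metadata above,
// so each returned row has a typed `item_id: uuid::Uuid` field.
let dates: Vec<String> = vec!["2024-04-25".to_string()];
let updated = sqlx::query_file!("functions/update_timeline_items.sql", &dates)
    .fetch_all(&pool)
    .await?;
tracing::info!("Updated {} timeline items", updated.len());

The second cache entry below has an empty "columns" list because update_places.sql has no RETURNING clause, which is why main.rs runs it with execute(...) and reports rows_affected() rather than fetching rows.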

View File

@@ -0,0 +1,14 @@
{
"db_name": "PostgreSQL",
"query": "with timelineItems as (select jsonb_array_elements(raw_files.json -> 'timelineItems') as timelineItem\n from raw_files\n where date = ANY ($1)),\n places as (select distinct on (md5(timelineItem ->> 'place' :: text)) timelineItem -> 'place' as place,\n timelineItem -> 'place' ->> 'placeId' as placeId,\n (timelineItem -> 'place' ->> 'lastSaved') :: timestamptz as lastSaved\n from timelineItems\n where timelineItem ->> 'place' is not null),\n places_with_max_last_saved as (select place -> 'placeId' as placeId,\n max((place ->> 'lastSaved') :: timestamptz) as latest_last_saved\n from places\n group by place -> 'placeId'),\n latest_places as (select places.*\n from places_with_max_last_saved\n inner join places on places.place -> 'placeId' = places_with_max_last_saved.placeId and\n places.lastSaved =\n places_with_max_last_saved.latest_last_saved)\ninsert\ninto public.place (place_id, json, last_saved, server_last_updated)\nselect (placeId :: uuid) as place_id, place as json, lastSaved as last_saved, now() as server_last_updated\nfrom latest_places\non conflict (place_id) do update set json = excluded.json,\n last_saved = excluded.last_saved,\n server_last_updated = excluded.server_last_updated\nwhere excluded.last_saved > public.place.last_saved;\n",
"describe": {
"columns": [],
"parameters": {
"Left": [
"TextArray"
]
},
"nullable": []
},
"hash": "f0402ce4a5c93837f39559bfb3358a7655f178080465f7abd7f7702d42474157"
}

View File

@@ -2,4 +2,5 @@
fn main() {
// trigger recompilation when a new migration is added
println!("cargo:rerun-if-changed=migrations");
println!("cargo:rustc-env=DATABASE_URL=postgres://joshuacoles@localhost/arc_test");
}
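
For context on the new build.rs line: `cargo:rustc-env` makes DATABASE_URL available in the compiler's environment, so the sqlx macros can verify queries against that local database when building without SQLX_OFFLINE, while the existing rerun-if-changed line forces a rebuild (and hence re-verification) when the migrations change. A minimal sketch of a call that relies on this (pool is assumed; the table and column come from the schema used elsewhere in this diff):

// Checked at compile time against DATABASE_URL (or the .sqlx cache when offline).
let existing = sqlx::query!("SELECT item_id FROM timeline_item")
    .fetch_all(&pool)
    .await?;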

View File

@@ -1,5 +1,6 @@
with timelineItems as (select jsonb_array_elements(raw_files.json -> 'timelineItems') as timelineItem
from raw_files),
from raw_files
where date = ANY ($1)),
places as (select distinct on (md5(timelineItem ->> 'place' :: text)) timelineItem -> 'place' as place,
timelineItem -> 'place' ->> 'placeId' as placeId,
(timelineItem -> 'place' ->> 'lastSaved') :: timestamptz as lastSaved

View File

@@ -1,5 +1,6 @@
with timelineItems as (select jsonb_array_elements(raw_files.json -> 'timelineItems') as timelineItem
from raw_files),
from raw_files
where date = ANY ($1)),
max_last_saved as (select timelineItem ->> 'itemId' as itemId,
max((timelineItem ->> 'lastSaved') :: timestamptz) as latest_last_saved
from timelineItems
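
Both SQL files gain the same `where date = ANY ($1)` filter: instead of rescanning every row in raw_files, each statement now only touches the dates whose files actually changed, with the list of dates passed in as the single text-array parameter recorded in the cache files above. As an illustration of how an array binds to `= ANY($1)` on the Rust side (names here are assumptions; the real calls use query_file! as shown in src/main.rs below):

// Binding a Vec<String> sends a Postgres text[] for $1, so ANY($1) matches the changed dates only.
let changed_dates: Vec<String> = vec!["2024-04-25".into(), "2024-04-26".into()];
let rows = sqlx::query("SELECT date, sha256 FROM raw_files WHERE date = ANY($1)")
    .bind(&changed_dates)
    .fetch_all(&pool)
    .await?;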

View File

@@ -5,8 +5,9 @@ use std::path::PathBuf;
use chrono::{DateTime, Utc};
use clap::Parser;
use itertools::Itertools;
use sqlx::FromRow;
use sqlx::{Executor, FromRow, PgPool, Pool, Postgres};
use serde_json::Value;
use tracing::instrument;
use uuid::Uuid;
#[derive(Parser, Debug)]
@@ -18,6 +19,9 @@ struct Cli {
/// psql connection string
#[arg(long, short, env = "ARC_DB")]
conn: String,
#[arg(long)]
sql_only: bool,
}
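
The hunk above only shows the tail of the Cli struct; a hedged sketch of how the relevant clap fields fit together (any field other than conn and sql_only is an assumption, not shown in this diff):

#[derive(Parser, Debug)]
struct Cli {
    // ... other arguments elided ...

    /// psql connection string
    #[arg(long, short, env = "ARC_DB")]
    conn: String,

    /// Use the single-statement SQL path instead of the row-by-row updater
    #[arg(long)]
    sql_only: bool,
}

With clap's default kebab-case mapping this is enabled as `--sql-only` on the command line, which is how main() below chooses between update_data_sql and update_data.
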
#[derive(Debug, FromRow)]
@@ -83,7 +87,8 @@ fn hash_files(files: ReadDir) -> impl Iterator<Item=(String, PathBuf, String)> {
})
}
async fn find_updated(db: &sqlx::PgPool, files: ReadDir) -> Vec<UpdatedFile> {
#[instrument(skip(db, files))]
async fn find_updated(db: &PgPool, files: ReadDir) -> Vec<UpdatedFile> {
let date_hashes: Vec<DateHash> = sqlx::query_as("SELECT date, sha256 FROM raw_files")
.fetch_all(db)
.await
@@ -96,6 +101,9 @@ async fn find_updated(db: &sqlx::PgPool, files: ReadDir) -> Vec<UpdatedFile> {
let new_hashes = hash_files(files);
new_hashes.filter_map(|(date, path, new_hash)| {
let span = tracing::span!(tracing::Level::DEBUG, "considering_file", path = ?path, date = ?date);
let _enter = span.enter();
tracing::debug!("Considering file for updates {path:?} (read as date: {date})");
if let Some(existing_hash) = date_hashes.get(&date) {
if new_hash == *existing_hash {
@@ -129,7 +137,7 @@ async fn find_updated(db: &sqlx::PgPool, files: ReadDir) -> Vec<UpdatedFile> {
}
#[tokio::main]
async fn main() {
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let cli = Cli::parse();
@@ -149,17 +157,43 @@ async fn main() {
let need_refresh = find_updated(&db, files)
.await;
// Refresh the database with the new files
for updated_file in &need_refresh {
sqlx::query("INSERT INTO raw_files (date, sha256, json) VALUES ($1, $2, $3 :: jsonb) ON CONFLICT (date) DO UPDATE SET sha256 = excluded.sha256, json = excluded.json")
.bind(&updated_file.date)
.bind(&updated_file.sha256)
.bind(&serde_json::to_value(&updated_file.json).unwrap())
.execute(&db)
.await
.unwrap();
upload_files(&db, &need_refresh)
.await?;
if cli.sql_only {
update_data_sql(&db, need_refresh)
.await?;
} else {
update_data(&db, need_refresh)
.await?;
}
Ok(())
}
#[instrument(skip(db, need_refresh))]
async fn update_data_sql(db: &PgPool, need_refresh: Vec<UpdatedFile>) -> anyhow::Result<()> {
let vec = need_refresh.iter()
.map(|d| d.date.clone())
.collect_vec();
let result = sqlx::query_file!("functions/update_places.sql", &vec)
.execute(db)
.await?;
tracing::info!("Updated {} places", result.rows_affected());
let updated = sqlx::query_file!("functions/update_timeline_items.sql", &vec)
.fetch_all(db)
.await?;
tracing::info!("Updated {} timeline items", updated.len());
Ok(())
}
#[instrument(skip(db, need_refresh))]
async fn update_data(db: &Pool<Postgres>, need_refresh: Vec<UpdatedFile>) -> anyhow::Result<()> {
// Take all the changed files' timeline items, and group them by item_id, then take the latest one.
// If we need to update the database, it will be with this one.
let possibly_timeline_items = need_refresh.into_iter()
@@ -176,9 +210,8 @@ async fn main() {
let existing_last_saved_at_map = sqlx::query_as("SELECT item_id, end_date, last_saved FROM timeline_item where item_id = ANY($1)")
.bind(&possibly_timeline_item_ids)
.fetch_all(&db)
.await
.unwrap()
.fetch_all(db)
.await?
.into_iter()
.map(|row: TimelineItemUpdatedCheckRow| (row.item_id, row.last_saved))
.collect::<HashMap<_, _>>();
@@ -199,18 +232,37 @@ async fn main() {
if let Some(place) = &updated_timeline_item.place {
sqlx::query("INSERT INTO place (place_id, json, last_saved, server_last_updated) VALUES ($1, $2 :: jsonb, $3, now()) ON CONFLICT (place_id) DO UPDATE SET json = $2 :: jsonb, last_saved = $3, server_last_updated = now()")
.bind(&place.place_id)
.bind(&serde_json::to_value(&place).unwrap())
.bind(&serde_json::to_value(&place)?)
.bind(&place.last_saved)
.execute(&db).await.unwrap();
.execute(db)
.await?;
}
// Then we can insert/update the timeline item.
sqlx::query("INSERT INTO timeline_item (item_id, json, place_id, end_date, last_saved, server_last_updated) VALUES ($1, $2 :: jsonb, $3, $4, $5, now()) ON CONFLICT (item_id) DO UPDATE SET json = $2 :: jsonb, place_id = $3, end_date = $4, last_saved = $5, server_last_updated = now()")
.bind(&updated_timeline_item.item_id)
.bind(&serde_json::to_value(&updated_timeline_item).unwrap())
.bind(&serde_json::to_value(&updated_timeline_item)?)
.bind(&updated_timeline_item.place.map(|place| place.place_id))
.bind(&updated_timeline_item.end_date)
.bind(&updated_timeline_item.last_saved)
.execute(&db).await.unwrap();
.execute(db)
.await?;
}
Ok(())
}
#[tracing::instrument(skip(db, need_refresh), err)]
async fn upload_files(db: &PgPool, need_refresh: &[UpdatedFile]) -> anyhow::Result<()> {
// Refresh the database with the new files
for updated_file in need_refresh {
sqlx::query("INSERT INTO raw_files (date, sha256, json) VALUES ($1, $2, $3 :: jsonb) ON CONFLICT (date) DO UPDATE SET sha256 = excluded.sha256, json = excluded.json")
.bind(&updated_file.date)
.bind(&updated_file.sha256)
.bind(&serde_json::to_value(&updated_file.json)?)
.execute(db)
.await?;
}
Ok(())
}