Compare commits


No commits in common. "e73123b7f92f407da96a29754dae34c7dba195ac" and "56a60d969d10779cad94515b517f6b065f03db41" have entirely different histories.

7 changed files with 21 additions and 114 deletions

View File

@@ -45,8 +45,6 @@ jobs:
       - name: Build release binary
         run: cargo build --release
-        env:
-          SQLX_OFFLINE: true

       - name: Upload binary
         uses: actions/upload-artifact@v3
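
The env block dropped here is what let CI run sqlx's compile-time query checking without a live database: with SQLX_OFFLINE=true the query!/query_file! macros read the cached .sqlx/query-*.json metadata instead of connecting to DATABASE_URL. The rest of this diff moves to runtime-constructed queries, so the variable has nothing left to do. A minimal sketch of the runtime style, assuming only a reachable Postgres and a DATABASE_URL variable (not code from this repository):

// Sketch only: runtime-checked sqlx usage needs no SQLX_OFFLINE flag or .sqlx cache,
// because the SQL is validated against the database when it runs, not at compile time.
use sqlx::postgres::PgPoolOptions;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let pool = PgPoolOptions::new()
        .connect(&std::env::var("DATABASE_URL")?)
        .await?;

    // Plain runtime query; compare with sqlx::query! / sqlx::query_file!,
    // which need DATABASE_URL at build time or the offline cache.
    let (count,): (i64,) = sqlx::query_as("SELECT count(*) FROM raw_files")
        .fetch_one(&pool)
        .await?;
    println!("{count} raw files");
    Ok(())
}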

View File

@@ -1,22 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "with timelineItems as (select jsonb_array_elements(raw_files.json -> 'timelineItems') as timelineItem\n from raw_files\n where date = ANY ($1)),\n max_last_saved as (select timelineItem ->> 'itemId' as itemId,\n max((timelineItem ->> 'lastSaved') :: timestamptz) as latest_last_saved\n from timelineItems\n group by timelineItem ->> 'itemId'),\n unique_timline_items as (select distinct on (max_last_saved.itemId) *\n from max_last_saved\n inner join timelineItems\n on timelineItems.timelineItem ->> 'itemId' = max_last_saved.itemId\n and (timelineItems.timelineItem ->> 'lastSaved') :: timestamptz =\n max_last_saved.latest_last_saved)\ninsert\ninto public.timeline_item (item_id, json, place_id, end_date, last_saved, server_last_updated)\nselect unique_timline_items.itemId :: uuid as item_id,\n unique_timline_items.timelineItem as json,\n (unique_timline_items.timelineItem -> 'place' ->> 'placeId') :: uuid as place_id,\n (unique_timline_items.timelineItem ->> 'endDate') :: timestamptz as end_date,\n unique_timline_items.latest_last_saved :: timestamptz as last_saved,\n now() as server_last_updated\nfrom unique_timline_items\non conflict (item_id) do update set json = excluded.json,\n place_id = excluded.place_id,\n end_date = excluded.end_date,\n last_saved = excluded.last_saved,\n server_last_updated = excluded.server_last_updated\nwhere excluded.last_saved > public.timeline_item.last_saved\nreturning item_id;\n",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "item_id",
"type_info": "Uuid"
}
],
"parameters": {
"Left": [
"TextArray"
]
},
"nullable": [
false
]
},
"hash": "ac15bfcd1737751e27388ffddfe8ec47fd7277be74c47a59484e4fc993671d43"
}

View File

@@ -1,14 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "with timelineItems as (select jsonb_array_elements(raw_files.json -> 'timelineItems') as timelineItem\n from raw_files\n where date = ANY ($1)),\n places as (select distinct on (md5(timelineItem ->> 'place' :: text)) timelineItem -> 'place' as place,\n timelineItem -> 'place' ->> 'placeId' as placeId,\n (timelineItem -> 'place' ->> 'lastSaved') :: timestamptz as lastSaved\n from timelineItems\n where timelineItem ->> 'place' is not null),\n places_with_max_last_saved as (select place -> 'placeId' as placeId,\n max((place ->> 'lastSaved') :: timestamptz) as latest_last_saved\n from places\n group by place -> 'placeId'),\n latest_places as (select places.*\n from places_with_max_last_saved\n inner join places on places.place -> 'placeId' = places_with_max_last_saved.placeId and\n places.lastSaved =\n places_with_max_last_saved.latest_last_saved)\ninsert\ninto public.place (place_id, json, last_saved, server_last_updated)\nselect (placeId :: uuid) as place_id, place as json, lastSaved as last_saved, now() as server_last_updated\nfrom latest_places\non conflict (place_id) do update set json = excluded.json,\n last_saved = excluded.last_saved,\n server_last_updated = excluded.server_last_updated\nwhere excluded.last_saved > public.place.last_saved;\n",
"describe": {
"columns": [],
"parameters": {
"Left": [
"TextArray"
]
},
"nullable": []
},
"hash": "f0402ce4a5c93837f39559bfb3358a7655f178080465f7abd7f7702d42474157"
}
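
Both JSON documents removed above are entries in sqlx's offline query cache (the .sqlx directory populated by cargo sqlx prepare): each holds the SQL text, the parameter and column metadata derived from the database, and a hash identifying the query. They existed to back the query_file! invocations that this diff deletes from main.rs further down, so there is nothing left for them to describe. A hedged sketch of the call shape such an entry caches, mirroring the removed code (path and parameter type as in this repository):

// Sketch of a compile-time-checked call backed by an offline cache entry.
// The macro loads functions/update_places.sql at build time and checks it either
// against DATABASE_URL or against the cached .sqlx metadata when SQLX_OFFLINE=true.
async fn refresh_places(db: &sqlx::PgPool, dates: &[String]) -> anyhow::Result<u64> {
    let result = sqlx::query_file!("functions/update_places.sql", dates)
        .execute(db)
        .await?;
    Ok(result.rows_affected())
}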

View File

@@ -2,5 +2,4 @@
 fn main() {
     // trigger recompilation when a new migration is added
     println!("cargo:rerun-if-changed=migrations");
-    println!("cargo:rustc-env=DATABASE_URL=postgres://joshuacoles@localhost/arc_test");
 }

View File

@@ -1,6 +1,5 @@
 with timelineItems as (select jsonb_array_elements(raw_files.json -> 'timelineItems') as timelineItem
-                       from raw_files
-                       where date = ANY ($1)),
+                       from raw_files),
     places as (select distinct on (md5(timelineItem ->> 'place' :: text)) timelineItem -> 'place' as place,
                timelineItem -> 'place' ->> 'placeId' as placeId,
                (timelineItem -> 'place' ->> 'lastSaved') :: timestamptz as lastSaved

View File

@@ -1,6 +1,5 @@
 with timelineItems as (select jsonb_array_elements(raw_files.json -> 'timelineItems') as timelineItem
-                       from raw_files
-                       where date = ANY ($1)),
+                       from raw_files),
     max_last_saved as (select timelineItem ->> 'itemId' as itemId,
                        max((timelineItem ->> 'lastSaved') :: timestamptz) as latest_last_saved
                        from timelineItems

View File

@@ -5,9 +5,8 @@ use std::path::PathBuf;
 use chrono::{DateTime, Utc};
 use clap::Parser;
 use itertools::Itertools;
-use sqlx::{Executor, FromRow, PgPool, Pool, Postgres};
+use sqlx::FromRow;
 use serde_json::Value;
-use tracing::instrument;
 use uuid::Uuid;

 #[derive(Parser, Debug)]
@@ -19,9 +18,6 @@ struct Cli {
     /// psql connection string
     #[arg(long, short, env = "ARC_DB")]
     conn: String,
-
-    #[arg(long)]
-    sql_only: bool,
 }

 #[derive(Debug, FromRow)]
@@ -87,8 +83,7 @@ fn hash_files(files: ReadDir) -> impl Iterator<Item=(String, PathBuf, String)> {
     })
 }

-#[instrument(skip(db, files))]
-async fn find_updated(db: &PgPool, files: ReadDir) -> Vec<UpdatedFile> {
+async fn find_updated(db: &sqlx::PgPool, files: ReadDir) -> Vec<UpdatedFile> {
     let date_hashes: Vec<DateHash> = sqlx::query_as("SELECT date, sha256 FROM raw_files")
         .fetch_all(db)
         .await
@@ -101,9 +96,6 @@ async fn find_updated(db: &PgPool, files: ReadDir) -> Vec<UpdatedFile> {
     let new_hashes = hash_files(files);

     new_hashes.filter_map(|(date, path, new_hash)| {
-        let span = tracing::span!(tracing::Level::DEBUG, "considering_file", path = ?path, date = ?date);
-        let _enter = span.enter();
-
         tracing::debug!("Considering file for updates {path:?} (read as date: {date})");
         if let Some(existing_hash) = date_hashes.get(&date) {
             if new_hash == *existing_hash {
@@ -137,7 +129,7 @@ async fn find_updated(db: &PgPool, files: ReadDir) -> Vec<UpdatedFile> {
 }

 #[tokio::main]
-async fn main() -> anyhow::Result<()> {
+async fn main() {
     tracing_subscriber::fmt::init();

     let cli = Cli::parse();
@@ -157,43 +149,17 @@ async fn main() -> anyhow::Result<()> {
     let need_refresh = find_updated(&db, files)
         .await;

-    upload_files(&db, &need_refresh)
-        .await?;
-
-    if cli.sql_only {
-        update_data_sql(&db, need_refresh)
-            .await?;
-    } else {
-        update_data(&db, need_refresh)
-            .await?;
-    }
-
-    Ok(())
-}
-
-#[instrument(skip(db, need_refresh))]
-async fn update_data_sql(db: &PgPool, need_refresh: Vec<UpdatedFile>) -> anyhow::Result<()> {
-    let vec = need_refresh.iter()
-        .map(|d| d.date.clone())
-        .collect_vec();
-
-    let result = sqlx::query_file!("functions/update_places.sql", &vec)
-        .execute(db)
-        .await?;
-
-    tracing::info!("Updated {} places", result.rows_affected());
-
-    let updated = sqlx::query_file!("functions/update_timeline_items.sql", &vec)
-        .fetch_all(db)
-        .await?;
-
-    tracing::info!("Updated {} timeline items", updated.len());
-
-    Ok(())
-}
-
-#[instrument(skip(db, need_refresh))]
-async fn update_data(db: &Pool<Postgres>, need_refresh: Vec<UpdatedFile>) -> anyhow::Result<()> {
+    // Refresh the database with the new files
+    for updated_file in &need_refresh {
+        sqlx::query("INSERT INTO raw_files (date, sha256, json) VALUES ($1, $2, $3 :: jsonb) ON CONFLICT (date) DO UPDATE SET sha256 = excluded.sha256, json = excluded.json")
+            .bind(&updated_file.date)
+            .bind(&updated_file.sha256)
+            .bind(&serde_json::to_value(&updated_file.json).unwrap())
+            .execute(&db)
+            .await
+            .unwrap();
+    }
+
     // Take all the changed files' timeline items, and group them by item_id, then take the latest one.
     // If we are needing to update the database it will be with this one.
     let possibly_timeline_items = need_refresh.into_iter()
@@ -210,8 +176,9 @@ async fn update_data(db: &Pool<Postgres>, need_refresh: Vec<UpdatedFile>) -> any

     let existing_last_saved_at_map = sqlx::query_as("SELECT item_id, end_date, last_saved FROM timeline_item where item_id = ANY($1)")
         .bind(&possibly_timeline_item_ids)
-        .fetch_all(db)
-        .await?
+        .fetch_all(&db)
+        .await
+        .unwrap()
         .into_iter()
         .map(|row: TimelineItemUpdatedCheckRow| (row.item_id, row.last_saved))
         .collect::<HashMap<_, _>>();
@@ -232,37 +199,18 @@ async fn update_data(db: &Pool<Postgres>, need_refresh: Vec<UpdatedFile>) -> any
         if let Some(place) = &updated_timeline_item.place {
             sqlx::query("INSERT INTO place (place_id, json, last_saved, server_last_updated) VALUES ($1, $2 :: jsonb, $3, now()) ON CONFLICT (place_id) DO UPDATE SET json = $2 :: jsonb, last_saved = $3, server_last_updated = now()")
                 .bind(&place.place_id)
-                .bind(&serde_json::to_value(&place)?)
+                .bind(&serde_json::to_value(&place).unwrap())
                 .bind(&place.last_saved)
-                .execute(db)
-                .await?;
+                .execute(&db).await.unwrap();
         }

         // Then we can insert/update the timeline item.
         sqlx::query("INSERT INTO timeline_item (item_id, json, place_id, end_date, last_saved, server_last_updated) VALUES ($1, $2 :: jsonb, $3, $4, $5, now()) ON CONFLICT (item_id) DO UPDATE SET json = $2 :: jsonb, place_id = $3, end_date = $4, last_saved = $5, server_last_updated = now()")
             .bind(&updated_timeline_item.item_id)
-            .bind(&serde_json::to_value(&updated_timeline_item)?)
+            .bind(&serde_json::to_value(&updated_timeline_item).unwrap())
             .bind(&updated_timeline_item.place.map(|place| place.place_id))
             .bind(&updated_timeline_item.end_date)
             .bind(&updated_timeline_item.last_saved)
-            .execute(db)
-            .await?;
+            .execute(&db).await.unwrap();
     }
-
-    Ok(())
-}
-
-#[tracing::instrument(skip(db, need_refresh), err)]
-async fn upload_files(db: &PgPool, need_refresh: &[UpdatedFile]) -> anyhow::Result<()> {
-    // Refresh the database with the new files
-    for updated_file in need_refresh {
-        sqlx::query("INSERT INTO raw_files (date, sha256, json) VALUES ($1, $2, $3 :: jsonb) ON CONFLICT (date) DO UPDATE SET sha256 = excluded.sha256, json = excluded.json")
-            .bind(&updated_file.date)
-            .bind(&updated_file.sha256)
-            .bind(&serde_json::to_value(&updated_file.json)?)
-            .execute(db)
-            .await?;
-    }
-
-    Ok(())
 }
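
Net effect of the main.rs hunks: the compile-time-checked sqlx::query_file! path, and with it the anyhow::Result / ? propagation in main, is replaced by runtime sqlx::query calls that unwrap on error, which is what makes the workflow env var, the build.rs DATABASE_URL injection, and the .sqlx cache above removable. For reference, a minimal sketch of the runtime INSERT ... ON CONFLICT shape the new code leans on; the table and column names are illustrative, not from the diff, and sqlx's "uuid" and "json" features are assumed:

// Hedged sketch of the runtime upsert pattern used above for raw_files, place,
// and timeline_item, with a hypothetical table name.
use sqlx::PgPool;

async fn upsert_doc(db: &PgPool, id: uuid::Uuid, body: &serde_json::Value) -> Result<(), sqlx::Error> {
    sqlx::query(
        "INSERT INTO doc (id, json, server_last_updated) VALUES ($1, $2 :: jsonb, now()) \
         ON CONFLICT (id) DO UPDATE SET json = excluded.json, server_last_updated = now()",
    )
    .bind(id)
    .bind(body)
    .execute(db)
    .await?;
    Ok(())
}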