Compare commits: 56a60d969d...e73123b7f9

3 commits

| Author | SHA1 | Date |
|---|---|---|
|  | e73123b7f9 |  |
|  | ef15a630d6 |  |
|  | 56829cfa58 |  |
.github/workflows/build.yml (vendored, 2 changed lines)

@@ -45,6 +45,8 @@ jobs:
       - name: Build release binary
         run: cargo build --release
+        env:
+          SQLX_OFFLINE: true
 
       - name: Upload binary
         uses: actions/upload-artifact@v3
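Setting `SQLX_OFFLINE: true` makes sqlx's compile-time macros read query metadata from the committed `.sqlx/` cache below instead of connecting to `DATABASE_URL` during `cargo build --release`, so CI needs no running PostgreSQL. A minimal sketch of the kind of query this affects; the helper and its query are illustrative, not from this repository, and would need their own cache entry:

```rust
// Sketch: with SQLX_OFFLINE=true, sqlx::query! resolves parameter and column
// types from the checked-in .sqlx/*.json files at compile time rather than
// from a live database.
use sqlx::PgPool;

async fn raw_file_count(pool: &PgPool) -> anyhow::Result<i64> {
    // `AS "count!"` tells sqlx the column is non-null, so the field is i64.
    let row = sqlx::query!(r#"SELECT count(*) AS "count!" FROM raw_files"#)
        .fetch_one(pool)
        .await?;
    Ok(row.count)
}
```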
.sqlx/query-ac15bfcd1737751e27388ffddfe8ec47fd7277be74c47a59484e4fc993671d43.json (generated, new file, 22 lines)

@@ -0,0 +1,22 @@
+{
+  "db_name": "PostgreSQL",
+  "query": "with timelineItems as (select jsonb_array_elements(raw_files.json -> 'timelineItems') as timelineItem\n                        from raw_files\n                        where date = ANY ($1)),\n     max_last_saved as (select timelineItem ->> 'itemId' as itemId,\n                               max((timelineItem ->> 'lastSaved') :: timestamptz) as latest_last_saved\n                        from timelineItems\n                        group by timelineItem ->> 'itemId'),\n     unique_timline_items as (select distinct on (max_last_saved.itemId) *\n                              from max_last_saved\n                                       inner join timelineItems\n                                                  on timelineItems.timelineItem ->> 'itemId' = max_last_saved.itemId\n                                                      and (timelineItems.timelineItem ->> 'lastSaved') :: timestamptz =\n                                                          max_last_saved.latest_last_saved)\ninsert\ninto public.timeline_item (item_id, json, place_id, end_date, last_saved, server_last_updated)\nselect unique_timline_items.itemId :: uuid as item_id,\n       unique_timline_items.timelineItem as json,\n       (unique_timline_items.timelineItem -> 'place' ->> 'placeId') :: uuid as place_id,\n       (unique_timline_items.timelineItem ->> 'endDate') :: timestamptz as end_date,\n       unique_timline_items.latest_last_saved :: timestamptz as last_saved,\n       now() as server_last_updated\nfrom unique_timline_items\non conflict (item_id) do update set json = excluded.json,\n                                    place_id = excluded.place_id,\n                                    end_date = excluded.end_date,\n                                    last_saved = excluded.last_saved,\n                                    server_last_updated = excluded.server_last_updated\nwhere excluded.last_saved > public.timeline_item.last_saved\nreturning item_id;\n",
+  "describe": {
+    "columns": [
+      {
+        "ordinal": 0,
+        "name": "item_id",
+        "type_info": "Uuid"
+      }
+    ],
+    "parameters": {
+      "Left": [
+        "TextArray"
+      ]
+    },
+    "nullable": [
+      false
+    ]
+  },
+  "hash": "ac15bfcd1737751e27388ffddfe8ec47fd7277be74c47a59484e4fc993671d43"
+}
.sqlx/query-f0402ce4a5c93837f39559bfb3358a7655f178080465f7abd7f7702d42474157.json (generated, new file, 14 lines)

@@ -0,0 +1,14 @@
+{
+  "db_name": "PostgreSQL",
+  "query": "with timelineItems as (select jsonb_array_elements(raw_files.json -> 'timelineItems') as timelineItem\n                        from raw_files\n                        where date = ANY ($1)),\n     places as (select distinct on (md5(timelineItem ->> 'place' :: text)) timelineItem -> 'place' as place,\n                       timelineItem -> 'place' ->> 'placeId' as placeId,\n                       (timelineItem -> 'place' ->> 'lastSaved') :: timestamptz as lastSaved\n                from timelineItems\n                where timelineItem ->> 'place' is not null),\n     places_with_max_last_saved as (select place -> 'placeId' as placeId,\n                                           max((place ->> 'lastSaved') :: timestamptz) as latest_last_saved\n                                    from places\n                                    group by place -> 'placeId'),\n     latest_places as (select places.*\n                       from places_with_max_last_saved\n                                inner join places on places.place -> 'placeId' = places_with_max_last_saved.placeId and\n                                                     places.lastSaved =\n                                                     places_with_max_last_saved.latest_last_saved)\ninsert\ninto public.place (place_id, json, last_saved, server_last_updated)\nselect (placeId :: uuid) as place_id, place as json, lastSaved as last_saved, now() as server_last_updated\nfrom latest_places\non conflict (place_id) do update set json = excluded.json,\n                                     last_saved = excluded.last_saved,\n                                     server_last_updated = excluded.server_last_updated\nwhere excluded.last_saved > public.place.last_saved;\n",
+  "describe": {
+    "columns": [],
+    "parameters": {
+      "Left": [
+        "TextArray"
+      ]
+    },
+    "nullable": []
+  },
+  "hash": "f0402ce4a5c93837f39559bfb3358a7655f178080465f7abd7f7702d42474157"
+}
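Each cache entry records the exact SQL text, the inferred parameter types (`TextArray`, i.e. `text[]`, for the `$1` list of dates), and the output columns with their nullability, keyed by the hash in the filename; `cargo sqlx prepare` regenerates these files and the offline build consumes them. A hedged sketch of how a `Vec<String>` of dates lines up with that `text[]` parameter, mirroring `update_data_sql` in `src/main.rs` further down (the standalone helper itself is hypothetical):

```rust
// Sketch: a Vec<String> binds to the $1 parameter recorded as "TextArray" in
// the cache; query_file! reads the SQL from the repository and checks it
// against the cached metadata when SQLX_OFFLINE is set.
use sqlx::PgPool;

async fn refresh_places(pool: &PgPool, dates: Vec<String>) -> anyhow::Result<()> {
    let result = sqlx::query_file!("functions/update_places.sql", &dates)
        .execute(pool)
        .await?;
    tracing::info!("Updated {} places", result.rows_affected());
    Ok(())
}
```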
build.rs (1 changed line)

@@ -2,4 +2,5 @@
 fn main() {
     // trigger recompilation when a new migration is added
     println!("cargo:rerun-if-changed=migrations");
+    println!("cargo:rustc-env=DATABASE_URL=postgres://joshuacoles@localhost/arc_test");
 }
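The added `cargo:rustc-env=...` line exports `DATABASE_URL` into the environment of the `rustc` invocation that compiles the crate, presumably so the sqlx macros can check queries against the local database during development even when the variable is not set in the shell; in CI the offline cache plus `SQLX_OFFLINE=true` covers this instead. A small sketch of reading a build-script-exported variable (the printing `main` below is illustrative, not this repository's):

```rust
// Sketch: a value exported with `cargo:rustc-env=NAME=value` in build.rs is
// visible at compile time through env!() in the crate being built, and to
// proc macros that read the build-time environment (assumed to be how the
// sqlx macros pick up DATABASE_URL here).
const DB_URL: &str = env!("DATABASE_URL");

fn main() {
    println!("compile-time DATABASE_URL: {DB_URL}");
}
```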
functions/update_places.sql

@@ -1,5 +1,6 @@
 with timelineItems as (select jsonb_array_elements(raw_files.json -> 'timelineItems') as timelineItem
-                       from raw_files),
+                       from raw_files
+                       where date = ANY ($1)),
     places as (select distinct on (md5(timelineItem ->> 'place' :: text)) timelineItem -> 'place' as place,
                       timelineItem -> 'place' ->> 'placeId' as placeId,
                       (timelineItem -> 'place' ->> 'lastSaved') :: timestamptz as lastSaved
functions/update_timeline_items.sql

@@ -1,5 +1,6 @@
 with timelineItems as (select jsonb_array_elements(raw_files.json -> 'timelineItems') as timelineItem
-                       from raw_files),
+                       from raw_files
+                       where date = ANY ($1)),
     max_last_saved as (select timelineItem ->> 'itemId' as itemId,
                               max((timelineItem ->> 'lastSaved') :: timestamptz) as latest_last_saved
                        from timelineItems
src/main.rs (90 changed lines)

@@ -5,8 +5,9 @@ use std::path::PathBuf;
 use chrono::{DateTime, Utc};
 use clap::Parser;
 use itertools::Itertools;
-use sqlx::FromRow;
+use sqlx::{Executor, FromRow, PgPool, Pool, Postgres};
 use serde_json::Value;
+use tracing::instrument;
 use uuid::Uuid;
 
 #[derive(Parser, Debug)]
@@ -18,6 +19,9 @@ struct Cli {
     /// psql connection string
     #[arg(long, short, env = "ARC_DB")]
     conn: String,
+
+    #[arg(long)]
+    sql_only: bool,
 }
 
 #[derive(Debug, FromRow)]
@@ -83,7 +87,8 @@ fn hash_files(files: ReadDir) -> impl Iterator<Item=(String, PathBuf, String)> {
     })
 }
 
-async fn find_updated(db: &sqlx::PgPool, files: ReadDir) -> Vec<UpdatedFile> {
+#[instrument(skip(db, files))]
+async fn find_updated(db: &PgPool, files: ReadDir) -> Vec<UpdatedFile> {
     let date_hashes: Vec<DateHash> = sqlx::query_as("SELECT date, sha256 FROM raw_files")
         .fetch_all(db)
         .await
@@ -96,6 +101,9 @@ async fn find_updated(db: &sqlx::PgPool, files: ReadDir) -> Vec<UpdatedFile> {
     let new_hashes = hash_files(files);
 
     new_hashes.filter_map(|(date, path, new_hash)| {
+        let span = tracing::span!(tracing::Level::DEBUG, "considering_file", path = ?path, date = ?date);
+        let _enter = span.enter();
+
         tracing::debug!("Considering file for updates {path:?} (read as date: {date})");
         if let Some(existing_hash) = date_hashes.get(&date) {
             if new_hash == *existing_hash {
@@ -129,7 +137,7 @@ async fn find_updated(db: &sqlx::PgPool, files: ReadDir) -> Vec<UpdatedFile> {
 }
 
 #[tokio::main]
-async fn main() {
+async fn main() -> anyhow::Result<()> {
     tracing_subscriber::fmt::init();
 
     let cli = Cli::parse();
@@ -149,17 +157,43 @@ async fn main() {
     let need_refresh = find_updated(&db, files)
         .await;
 
-    // Refresh the database with the new files
-    for updated_file in &need_refresh {
-        sqlx::query("INSERT INTO raw_files (date, sha256, json) VALUES ($1, $2, $3 :: jsonb) ON CONFLICT (date) DO UPDATE SET sha256 = excluded.sha256, json = excluded.json")
-            .bind(&updated_file.date)
-            .bind(&updated_file.sha256)
-            .bind(&serde_json::to_value(&updated_file.json).unwrap())
-            .execute(&db)
-            .await
-            .unwrap();
-    }
-
+    upload_files(&db, &need_refresh)
+        .await?;
+
+    if cli.sql_only {
+        update_data_sql(&db, need_refresh)
+            .await?;
+    } else {
+        update_data(&db, need_refresh)
+            .await?;
+    }
+
+    Ok(())
+}
+
+#[instrument(skip(db, need_refresh))]
+async fn update_data_sql(db: &PgPool, need_refresh: Vec<UpdatedFile>) -> anyhow::Result<()> {
+    let vec = need_refresh.iter()
+        .map(|d| d.date.clone())
+        .collect_vec();
+
+    let result = sqlx::query_file!("functions/update_places.sql", &vec)
+        .execute(db)
+        .await?;
+
+    tracing::info!("Updated {} places", result.rows_affected());
+
+    let updated = sqlx::query_file!("functions/update_timeline_items.sql", &vec)
+        .fetch_all(db)
+        .await?;
+
+    tracing::info!("Updated {} timeline items", updated.len());
+
+    Ok(())
+}
+
+#[instrument(skip(db, need_refresh))]
+async fn update_data(db: &Pool<Postgres>, need_refresh: Vec<UpdatedFile>) -> anyhow::Result<()> {
     // Take all the changed files' timeline items, and group them by item_id, then take the latest one.
     // If we are needing to update the database it will be with this one.
     let possibly_timeline_items = need_refresh.into_iter()
@@ -176,9 +210,8 @@ async fn main() {
 
     let existing_last_saved_at_map = sqlx::query_as("SELECT item_id, end_date, last_saved FROM timeline_item where item_id = ANY($1)")
         .bind(&possibly_timeline_item_ids)
-        .fetch_all(&db)
-        .await
-        .unwrap()
+        .fetch_all(db)
+        .await?
         .into_iter()
         .map(|row: TimelineItemUpdatedCheckRow| (row.item_id, row.last_saved))
         .collect::<HashMap<_, _>>();
@@ -199,18 +232,37 @@ async fn main() {
         if let Some(place) = &updated_timeline_item.place {
             sqlx::query("INSERT INTO place (place_id, json, last_saved, server_last_updated) VALUES ($1, $2 :: jsonb, $3, now()) ON CONFLICT (place_id) DO UPDATE SET json = $2 :: jsonb, last_saved = $3, server_last_updated = now()")
                 .bind(&place.place_id)
-                .bind(&serde_json::to_value(&place).unwrap())
+                .bind(&serde_json::to_value(&place)?)
                 .bind(&place.last_saved)
-                .execute(&db).await.unwrap();
+                .execute(db)
+                .await?;
         }
 
         // Then we can insert/update the timeline item.
         sqlx::query("INSERT INTO timeline_item (item_id, json, place_id, end_date, last_saved, server_last_updated) VALUES ($1, $2 :: jsonb, $3, $4, $5, now()) ON CONFLICT (item_id) DO UPDATE SET json = $2 :: jsonb, place_id = $3, end_date = $4, last_saved = $5, server_last_updated = now()")
             .bind(&updated_timeline_item.item_id)
-            .bind(&serde_json::to_value(&updated_timeline_item).unwrap())
+            .bind(&serde_json::to_value(&updated_timeline_item)?)
            .bind(&updated_timeline_item.place.map(|place| place.place_id))
             .bind(&updated_timeline_item.end_date)
             .bind(&updated_timeline_item.last_saved)
-            .execute(&db).await.unwrap();
+            .execute(db)
+            .await?;
     }
 
+    Ok(())
+}
+
+#[tracing::instrument(skip(db, need_refresh), err)]
+async fn upload_files(db: &PgPool, need_refresh: &[UpdatedFile]) -> anyhow::Result<()> {
+    // Refresh the database with the new files
+    for updated_file in need_refresh {
+        sqlx::query("INSERT INTO raw_files (date, sha256, json) VALUES ($1, $2, $3 :: jsonb) ON CONFLICT (date) DO UPDATE SET sha256 = excluded.sha256, json = excluded.json")
+            .bind(&updated_file.date)
+            .bind(&updated_file.sha256)
+            .bind(&serde_json::to_value(&updated_file.json)?)
+            .execute(db)
+            .await?;
+    }
+
+    Ok(())
+}
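Overall, `main` now returns `anyhow::Result<()>` so the `.unwrap()` calls become `?`, the raw-file upsert moves into `upload_files`, and the new `sql_only` flag chooses between the set-based SQL files (`update_data_sql`) and the existing per-row path (`update_data`). The `#[instrument]` attributes wrap each call in a tracing span; a minimal sketch of that pattern with a hypothetical function:

```rust
// Sketch of the tracing pattern used above: #[instrument] opens a span per
// call, skip(...) keeps large arguments out of the span fields, and `err`
// records the error if the function returns Err. Function and argument
// names here are hypothetical.
use tracing::instrument;

#[instrument(skip(payload), err)]
async fn store_payload(payload: &serde_json::Value) -> anyhow::Result<()> {
    tracing::debug!(size = payload.to_string().len(), "storing payload");
    Ok(())
}
```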