From 32db8446b58e229159dade08121ffa2222d3e68e Mon Sep 17 00:00:00 2001 From: Joshua Coles Date: Sun, 10 Mar 2024 18:56:40 +0000 Subject: [PATCH] Add last_saved and related fields to the database --- Cargo.toml | 2 +- DATA_OBSERVATIONS.md | 29 ++++++++++++ ...240310183925_add_last_saved_timestamps.sql | 32 ++++++++++++++ src/main.rs | 44 ++++++++++++------- 4 files changed, 91 insertions(+), 16 deletions(-) create mode 100644 DATA_OBSERVATIONS.md create mode 100644 migrations/20240310183925_add_last_saved_timestamps.sql diff --git a/Cargo.toml b/Cargo.toml index be1334d..ad8b927 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,7 @@ license = "MIT OR Apache-2.0" [dependencies] clap = { version = "4.4.18", features = ["derive", "env"] } -sqlx = { version = "0.7.3", features = ["json", "runtime-tokio-rustls", "postgres", "chrono", 'uuid'] } +sqlx = { version = "0.7.3", features = ["json", "runtime-tokio-rustls", "postgres", "chrono", "uuid"] } tokio = { version = "1", features = ["full"] } flate2 = "1" thiserror = "1" diff --git a/DATA_OBSERVATIONS.md b/DATA_OBSERVATIONS.md new file mode 100644 index 0000000..b5e97de --- /dev/null +++ b/DATA_OBSERVATIONS.md @@ -0,0 +1,29 @@ +# Repeated Entries + +```postgresql +with timeline_items as (select jsonb_array_elements(json -> 'timelineItems') - 'samples' as entry + from raw_files) +select entry ->> 'itemId' as item_id, + entry ->> 'lastSaved' as last_saved, + entry ->> 'endDate' as end_date, + entry ->> 'startDate' as start_date +from timeline_items +WHERE entry ->> 'itemId' IN (SELECT entry ->> 'itemId' + FROM timeline_items + GROUP BY entry ->> 'itemId' + HAVING COUNT(entry ->> 'itemId') > 1); +``` + +```postgresql +select md5(rfe.entry) +from (select jsonb_array_elements(json -> 'timelineItems') - 'samples' as entry + from raw_files) as rfe +where rfe.entry ->> 'itemId' = '01300b32-a3f7-4911-ab63-a0cbf0f312e0'; +``` + +```csv +c7e2a142aabdcdddd00143da6612c235 +c7e2a142aabdcdddd00143da6612c235 +``` + +running this query we see that the JSON content of the repeated entry is the same so it can be discounted. diff --git a/migrations/20240310183925_add_last_saved_timestamps.sql b/migrations/20240310183925_add_last_saved_timestamps.sql new file mode 100644 index 0000000..4a9f3cf --- /dev/null +++ b/migrations/20240310183925_add_last_saved_timestamps.sql @@ -0,0 +1,32 @@ +-- Add last_saved columns + +-- timeline_item +alter table timeline_item + add column last_saved timestamptz; + +update timeline_item +set last_saved = (json ->> 'lastSaved') :: timestamptz +where last_saved is null; + +alter table timeline_item + alter column last_saved set not null; + +-- place + +alter table place + add column last_saved timestamptz; + +update place +set last_saved = (json ->> 'lastSaved') :: timestamptz +where last_saved is null; + +alter table place + alter column last_saved set not null; + +-- Add server_last_updated columns + +alter table timeline_item + add column server_last_updated timestamptz not null default now(); + +alter table place + add column server_last_updated timestamptz not null default now(); diff --git a/src/main.rs b/src/main.rs index 5c32437..370a126 100644 --- a/src/main.rs +++ b/src/main.rs @@ -13,7 +13,7 @@ use uuid::Uuid; #[derive(Parser, Debug)] struct Cli { - /// Root of the arc iCloud directory + /// Root of the Arc iCloud directory #[arg(long, short, env = "ARC_ROOT")] root: PathBuf, @@ -31,7 +31,7 @@ struct DateHash { #[derive(Debug, FromRow)] struct TimelineItemUpdatedCheckRow { item_id: Uuid, - end_date: DateTime, + last_saved: DateTime, } #[derive(Debug)] @@ -53,6 +53,7 @@ struct TimelineItem { item_id: Uuid, place: Option, end_date: DateTime, + last_saved: DateTime, #[serde(flatten)] rest: HashMap, @@ -62,6 +63,7 @@ struct TimelineItem { #[serde(rename_all = "camelCase")] struct Place { place_id: Uuid, + last_saved: DateTime, #[serde(flatten)] rest: HashMap, @@ -79,9 +81,12 @@ async fn find_updated(db: &sqlx::PgPool, files: ReadDir) -> Vec { let date_hashes = &date_hashes; + // Find the files that need to be refreshed, based on the sha256 hash of the file compared to + // the hash stored in the database. let need_refresh = files.into_iter() .map(|f| f.unwrap().path()) .map(|path| async move { + // Extract the date from the file name let date = { let file_name = path.file_name().unwrap().to_str().unwrap(); let i = file_name.find('.').unwrap(); @@ -123,9 +128,7 @@ async fn find_updated(db: &sqlx::PgPool, files: ReadDir) -> Vec { #[tokio::main] async fn main() { let cli = Cli::parse(); - - let root = cli.root; - let daily_exports = root.join("Export/JSON/Daily"); + let daily_exports = cli.root.join("Export/JSON/Daily"); let files = std::fs::read_dir(daily_exports).unwrap(); let db = sqlx::PgPool::connect(&cli.conn) @@ -145,7 +148,9 @@ async fn main() { .bind(&updated_file.date) .bind(&updated_file.sha256) .bind(&serde_json::to_value(&updated_file.json).unwrap()) - .execute(&db).await.unwrap(); + .execute(&db) + .await + .unwrap(); } // Take all the changed files' timeline items, and group them by item_id, then take the latest one. @@ -154,42 +159,51 @@ async fn main() { .flat_map(|f| f.json.timeline_items) .group_by(|item| item.item_id.clone()) .into_iter() - .map(|(_, items)| items.max_by_key(|item| item.end_date).unwrap()).collect_vec(); + .map(|(_, items)| items.max_by_key(|item| item.last_saved).unwrap()) + .collect_vec(); - let possibly_timeline_item_ids = possibly_timeline_items.iter().map(|item| item.item_id.clone()).collect::>(); + let possibly_timeline_item_ids = possibly_timeline_items + .iter() + .map(|item| item.item_id.clone()) + .collect::>(); - let existing = sqlx::query_as("SELECT item_id, end_date FROM timeline_item where item_id = ANY($1)") + let existing_last_saved_at_map = sqlx::query_as("SELECT item_id, end_date, last_saved FROM timeline_item where item_id = ANY($1)") .bind(&possibly_timeline_item_ids) .fetch_all(&db) .await .unwrap() .into_iter() - .map(|row: TimelineItemUpdatedCheckRow| (row.item_id, row.end_date)) + .map(|row: TimelineItemUpdatedCheckRow| (row.item_id, row.last_saved)) .collect::>(); for updated_timeline_item in possibly_timeline_items { - // If we already have a timeline item with this id, and it has a later end date, we don't need to update it. - if let Some(existing_end_date) = existing.get(&updated_timeline_item.item_id) { - if *existing_end_date >= updated_timeline_item.end_date { + // The contents of duplicated entries appears to be identical, so we only check to see if + // the last_saved date is newer than the one we have in the database. + if let Some(existing_last_saved_at) = existing_last_saved_at_map.get(&updated_timeline_item.item_id) { + if updated_timeline_item.last_saved <= *existing_last_saved_at { continue; } } // We have a new or updated timeline item, we need to insert its associated place first if // it exists. + // ASSUMPTION: This assumes that all place changes as associated with the timeline item + // changes. Is this true? if let Some(place) = &updated_timeline_item.place { - sqlx::query("INSERT INTO place (place_id, json) VALUES ($1, $2 :: jsonb) ON CONFLICT (place_id) DO UPDATE SET json = $2 :: jsonb") + sqlx::query("INSERT INTO place (place_id, json, last_saved, server_last_updated) VALUES ($1, $2 :: jsonb, $3, now()) ON CONFLICT (place_id) DO UPDATE SET json = $2 :: jsonb, last_saved = $3, server_last_updated = now()") .bind(&place.place_id) .bind(&serde_json::to_value(&place).unwrap()) + .bind(&place.last_saved) .execute(&db).await.unwrap(); } // Then we can insert/update the timeline item. - sqlx::query("INSERT INTO timeline_item (item_id, json, place_id, end_date) VALUES ($1, $2 :: jsonb, $3, $4) ON CONFLICT (item_id) DO UPDATE SET json = $2 :: jsonb, place_id = $3, end_date = $4") + sqlx::query("INSERT INTO timeline_item (item_id, json, place_id, end_date, last_saved, server_last_updated) VALUES ($1, $2 :: jsonb, $3, $4, $5, now()) ON CONFLICT (item_id) DO UPDATE SET json = $2 :: jsonb, place_id = $3, end_date = $4, last_saved = $5, server_last_updated = now()") .bind(&updated_timeline_item.item_id) .bind(&serde_json::to_value(&updated_timeline_item).unwrap()) .bind(&updated_timeline_item.place.map(|place| place.place_id)) .bind(&updated_timeline_item.end_date) + .bind(&updated_timeline_item.last_saved) .execute(&db).await.unwrap(); } }