From bcfa0c2d8914079e42e09d6daf8ed046f9a32c67 Mon Sep 17 00:00:00 2001
From: Joshua Coles
Date: Thu, 4 Apr 2024 21:49:52 +0100
Subject: [PATCH] Simplify code to determine if files need to be changed and
 prep to move data transformation into PSQL

---
 Cargo.toml                          |  9 ++++
 functions/update_places.sql         | 24 +++++++++++
 functions/update_timeline_items.sql | 28 ++++++++++++
 src/main.rs                         | 67 +++++++++++++----------
 src/t1.rs                           |  7 +++
 5 files changed, 97 insertions(+), 38 deletions(-)
 create mode 100644 functions/update_places.sql
 create mode 100644 functions/update_timeline_items.sql
 create mode 100644 src/t1.rs

diff --git a/Cargo.toml b/Cargo.toml
index 2593a13..6177e2a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -4,6 +4,15 @@ version = "0.1.1"
 edition = "2021"
 authors = ["Joshua Coles"]
 license = "MIT OR Apache-2.0"
+default-run = "arc-ingester"
+
+[[bin]]
+name = "arc-ingester"
+path = "src/main.rs"
+
+[[bin]]
+name = "t1"
+path = "src/t1.rs"
 
 [dependencies]
 clap = { version = "4.4.18", features = ["derive", "env"] }
diff --git a/functions/update_places.sql b/functions/update_places.sql
new file mode 100644
index 0000000..9d339ec
--- /dev/null
+++ b/functions/update_places.sql
@@ -0,0 +1,24 @@
+with timelineItems as (select jsonb_array_elements(raw_files.json -> 'timelineItems') as timelineItem
+                       from raw_files),
+     places as (select distinct on (md5(timelineItem ->> 'place' :: text)) timelineItem -> 'place' as place,
+                       timelineItem -> 'place' ->> 'placeId' as placeId,
+                       (timelineItem -> 'place' ->> 'lastSaved') :: timestamptz as lastSaved
+                from timelineItems
+                where timelineItem ->> 'place' is not null),
+     places_with_max_last_saved as (select place -> 'placeId' as placeId,
+                                           max((place ->> 'lastSaved') :: timestamptz) as latest_last_saved
+                                    from places
+                                    group by place -> 'placeId'),
+     latest_places as (select places.*
+                       from places_with_max_last_saved
+                                inner join places on places.place -> 'placeId' = places_with_max_last_saved.placeId and
+                                                     places.lastSaved =
+                                                     places_with_max_last_saved.latest_last_saved)
+insert
+into public.place (place_id, json, last_saved, server_last_updated)
+select (placeId :: uuid) as place_id, place as json, lastSaved as last_saved, now() as server_last_updated
+from latest_places
+on conflict (place_id) do update set json = excluded.json,
+                                     last_saved = excluded.last_saved,
+                                     server_last_updated = excluded.server_last_updated
+where excluded.last_saved > public.place.last_saved;
diff --git a/functions/update_timeline_items.sql b/functions/update_timeline_items.sql
new file mode 100644
index 0000000..5265475
--- /dev/null
+++ b/functions/update_timeline_items.sql
@@ -0,0 +1,28 @@
+with timelineItems as (select jsonb_array_elements(raw_files.json -> 'timelineItems') as timelineItem
+                       from raw_files),
+     max_last_saved as (select timelineItem ->> 'itemId' as itemId,
+                               max((timelineItem ->> 'lastSaved') :: timestamptz) as latest_last_saved
+                        from timelineItems
+                        group by timelineItem ->> 'itemId'),
+     unique_timline_items as (select distinct on (max_last_saved.itemId) *
+                              from max_last_saved
+                                       inner join timelineItems
+                                                  on timelineItems.timelineItem ->> 'itemId' = max_last_saved.itemId
+                                                      and (timelineItems.timelineItem ->> 'lastSaved') :: timestamptz =
+                                                          max_last_saved.latest_last_saved)
+insert
+into public.timeline_item (item_id, json, place_id, end_date, last_saved, server_last_updated)
+select unique_timline_items.itemId :: uuid as item_id,
+       unique_timline_items.timelineItem as json,
+       (unique_timline_items.timelineItem -> 'place' ->> 'placeId') :: uuid as place_id,
+       (unique_timline_items.timelineItem ->> 'endDate') :: timestamptz as end_date,
+       unique_timline_items.latest_last_saved :: timestamptz as last_saved,
+       now() as server_last_updated
+from unique_timline_items
+on conflict (item_id) do update set json = excluded.json,
+                                    place_id = excluded.place_id,
+                                    end_date = excluded.end_date,
+                                    last_saved = excluded.last_saved,
+                                    server_last_updated = excluded.server_last_updated
+where excluded.last_saved > public.timeline_item.last_saved
+returning item_id;
diff --git a/src/main.rs b/src/main.rs
index e25bfe5..c6570c8 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -2,12 +2,10 @@ use std::collections::HashMap;
 use std::fs::ReadDir;
 use std::io::Read;
 use std::path::PathBuf;
-use std::sync::Arc;
 use chrono::{DateTime, Utc};
 use clap::Parser;
 use itertools::Itertools;
 use sqlx::FromRow;
-use rayon::prelude::*;
 use serde_json::Value;
 use uuid::Uuid;
 
@@ -69,58 +67,51 @@ struct Place {
     rest: HashMap,
 }
 
-async fn find_updated(db: &sqlx::PgPool, files: ReadDir) -> Vec<UpdatedFile> {
-    let date_hashes: Vec = sqlx::query_as("SELECT date, sha256 FROM raw_files")
-        .fetch_all(db)
-        .await
-        .unwrap();
-
-    let date_hashes: Arc<HashMap<String, String>> = Arc::new(date_hashes.into_iter()
-        .map(|dh| (dh.date, dh.sha256))
-        .collect());
-
-    let date_hashes = &date_hashes;
-
-    // Find the files that need to be refreshed, based on the sha256 hash of the file compared to
-    // the hash stored in the database.
-    let need_refresh = files.into_iter()
+fn hash_files(files: ReadDir) -> impl Iterator<Item = (String, PathBuf, String)> {
+    files.into_iter()
         .map(|f| f.unwrap().path())
-        .filter_map(|path| {
-            // Extract the date from the file name
+        .map(|path| {
             let date = {
                 let file_name = path.file_name().unwrap().to_str().unwrap();
                 let i = file_name.find('.').unwrap();
                 &file_name[..i]
            };
 
-            let current_hash = sha256::try_digest(&path).unwrap();
-            let existing_hash = date_hashes.get(date);
-
-            if let Some(existing_hash) = existing_hash {
-                if current_hash == *existing_hash {
-                    return None;
-                }
-            }
-
-            let bytes = std::fs::read(&path).unwrap();
-
-            return Some((date.to_string(), current_hash, bytes));
+            let hash = sha256::try_digest(&path).unwrap();
+            (date.to_string(), path, hash)
         })
-        .collect_vec();
+}
 
-    let decompressed = need_refresh.par_iter().map(|(date, new_hash, bytes)| {
+async fn find_updated(db: &sqlx::PgPool, files: ReadDir) -> Vec<UpdatedFile> {
+    let date_hashes: Vec = sqlx::query_as("SELECT date, sha256 FROM raw_files")
+        .fetch_all(db)
+        .await
+        .unwrap();
+
+    let date_hashes: HashMap<String, String> = date_hashes.into_iter()
+        .map(|dh| (dh.date, dh.sha256))
+        .collect();
+
+    let new_hashes = hash_files(files);
+
+    new_hashes.filter_map(|(date, path, new_hash)| {
+        if let Some(existing_hash) = date_hashes.get(&date) {
+            if new_hash == *existing_hash {
+                return None;
+            }
+        }
+
+        let bytes = std::fs::read(&path).unwrap();
         let mut decoder = flate2::bufread::GzDecoder::new(&bytes[..]);
         let mut string = String::new();
         decoder.read_to_string(&mut string).unwrap();
 
-        UpdatedFile {
+        Some(UpdatedFile {
             date: date.clone(),
             sha256: new_hash.clone(),
             json: serde_json::from_str(&string).unwrap(),
-        }
-    }).collect::<Vec<_>>();
-
-    decompressed
+        })
+    }).collect_vec()
 }
 
 #[tokio::main]
diff --git a/src/t1.rs b/src/t1.rs
new file mode 100644
index 0000000..7cf18de
--- /dev/null
+++ b/src/t1.rs
@@ -0,0 +1,7 @@
+fn main() {
+    let files = std::fs::read_dir("/Users/joshuacoles/Library/Mobile Documents/iCloud~com~bigpaua~LearnerCoacher/Documents/Export/JSON/Daily/").unwrap();
+    for file in files {
+        let file = file.unwrap().path();
+        println!("{}\t{}", file.display(), sha256::try_digest(&file).unwrap())
+    }
+}
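
Note (follow-up sketch, not part of this patch): the two new SQL files are groundwork for
moving the JSON-to-table transformation into Postgres; nothing in this diff executes them
yet. Assuming the existing sqlx::PgPool, one way they could later be wired into the
ingester is shown below. The helper name run_upserts is illustrative only.

    use sqlx::PgPool;

    // Sketch: run the new upsert statements once raw_files has been refreshed.
    // include_str! paths are relative to src/, so ../functions points at the
    // functions/ directory added by this patch.
    async fn run_upserts(db: &PgPool) -> Result<(), sqlx::Error> {
        sqlx::query(include_str!("../functions/update_places.sql"))
            .execute(db)
            .await?;
        sqlx::query(include_str!("../functions/update_timeline_items.sql"))
            .execute(db)
            .await?;
        Ok(())
    }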