Add last_saved and related fields to the database
parent 057fa81e67
commit 32db8446b5

Cargo.toml
@@ -7,7 +7,7 @@ license = "MIT OR Apache-2.0"
 
 [dependencies]
 clap = { version = "4.4.18", features = ["derive", "env"] }
-sqlx = { version = "0.7.3", features = ["json", "runtime-tokio-rustls", "postgres", "chrono", 'uuid'] }
+sqlx = { version = "0.7.3", features = ["json", "runtime-tokio-rustls", "postgres", "chrono", "uuid"] }
 tokio = { version = "1", features = ["full"] }
 flate2 = "1"
 thiserror = "1"

DATA_OBSERVATIONS.md (new file, 29 lines)
@@ -0,0 +1,29 @@
# Repeated Entries

```postgresql
with timeline_items as (select jsonb_array_elements(json -> 'timelineItems') - 'samples' as entry
                        from raw_files)
select entry ->> 'itemId'    as item_id,
       entry ->> 'lastSaved' as last_saved,
       entry ->> 'endDate'   as end_date,
       entry ->> 'startDate' as start_date
from timeline_items
WHERE entry ->> 'itemId' IN (SELECT entry ->> 'itemId'
                             FROM timeline_items
                             GROUP BY entry ->> 'itemId'
                             HAVING COUNT(entry ->> 'itemId') > 1);
```

```postgresql
select md5(rfe.entry::text)
from (select jsonb_array_elements(json -> 'timelineItems') - 'samples' as entry
      from raw_files) as rfe
where rfe.entry ->> 'itemId' = '01300b32-a3f7-4911-ab63-a0cbf0f312e0';
```

```csv
c7e2a142aabdcdddd00143da6612c235
c7e2a142aabdcdddd00143da6612c235
```

Running this query shows that the JSON content of the repeated entry is identical, so the duplicate can be discounted.
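
A slightly broader version of the same check (a sketch, not part of the committed file, assuming the same `raw_files` layout) would confirm that every repeated `itemId` carries a single distinct payload, rather than spot-checking one id:

```postgresql
-- For each itemId that appears more than once, count its copies and its distinct
-- payload hashes (ignoring 'samples'); any row returned here would be a duplicate
-- whose JSON content actually differs.
with timeline_items as (select jsonb_array_elements(json -> 'timelineItems') - 'samples' as entry
                        from raw_files)
select entry ->> 'itemId'                 as item_id,
       count(*)                           as copies,
       count(distinct md5(entry::text))   as distinct_payloads
from timeline_items
group by entry ->> 'itemId'
having count(*) > 1
   and count(distinct md5(entry::text)) > 1;
```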

migrations/20240310183925_add_last_saved_timestamps.sql (new file, 32 lines)
@@ -0,0 +1,32 @@
-- Add last_saved columns

-- timeline_item
alter table timeline_item
    add column last_saved timestamptz;

update timeline_item
set last_saved = (json ->> 'lastSaved') :: timestamptz
where last_saved is null;

alter table timeline_item
    alter column last_saved set not null;

-- place

alter table place
    add column last_saved timestamptz;

update place
set last_saved = (json ->> 'lastSaved') :: timestamptz
where last_saved is null;

alter table place
    alter column last_saved set not null;

-- Add server_last_updated columns

alter table timeline_item
    add column server_last_updated timestamptz not null default now();

alter table place
    add column server_last_updated timestamptz not null default now();
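
A quick post-migration sanity check (a sketch, not part of the migration itself; it only uses the columns introduced above) can confirm that the backfilled values match the JSON they were derived from:

```postgresql
-- Any rows returned here mean the backfilled last_saved diverged from the source JSON.
select 'timeline_item' as source_table, item_id::text as id
from timeline_item
where last_saved is distinct from (json ->> 'lastSaved') :: timestamptz
union all
select 'place', place_id::text
from place
where last_saved is distinct from (json ->> 'lastSaved') :: timestamptz;
```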

src/main.rs
@@ -13,7 +13,7 @@ use uuid::Uuid;
 
 #[derive(Parser, Debug)]
 struct Cli {
-    /// Root of the arc iCloud directory
+    /// Root of the Arc iCloud directory
     #[arg(long, short, env = "ARC_ROOT")]
     root: PathBuf,
 
@@ -31,7 +31,7 @@ struct DateHash {
 #[derive(Debug, FromRow)]
 struct TimelineItemUpdatedCheckRow {
     item_id: Uuid,
-    end_date: DateTime<Utc>,
+    last_saved: DateTime<Utc>,
 }
 
 #[derive(Debug)]
@@ -53,6 +53,7 @@ struct TimelineItem {
     item_id: Uuid,
     place: Option<Place>,
     end_date: DateTime<Utc>,
+    last_saved: DateTime<Utc>,
 
     #[serde(flatten)]
     rest: HashMap<String, Value>,
@@ -62,6 +63,7 @@ struct TimelineItem {
 #[serde(rename_all = "camelCase")]
 struct Place {
     place_id: Uuid,
+    last_saved: DateTime<Utc>,
 
     #[serde(flatten)]
     rest: HashMap<String, Value>,
@@ -79,9 +81,12 @@ async fn find_updated(db: &sqlx::PgPool, files: ReadDir) -> Vec<UpdatedFile> {
 
     let date_hashes = &date_hashes;
 
+    // Find the files that need to be refreshed, based on the sha256 hash of the file compared to
+    // the hash stored in the database.
     let need_refresh = files.into_iter()
         .map(|f| f.unwrap().path())
         .map(|path| async move {
+            // Extract the date from the file name
             let date = {
                 let file_name = path.file_name().unwrap().to_str().unwrap();
                 let i = file_name.find('.').unwrap();
@@ -123,9 +128,7 @@ async fn find_updated(db: &sqlx::PgPool, files: ReadDir) -> Vec<UpdatedFile> {
 #[tokio::main]
 async fn main() {
     let cli = Cli::parse();
-    let root = cli.root;
-    let daily_exports = root.join("Export/JSON/Daily");
-
+    let daily_exports = cli.root.join("Export/JSON/Daily");
     let files = std::fs::read_dir(daily_exports).unwrap();
 
     let db = sqlx::PgPool::connect(&cli.conn)
@@ -145,7 +148,9 @@ async fn main() {
             .bind(&updated_file.date)
             .bind(&updated_file.sha256)
             .bind(&serde_json::to_value(&updated_file.json).unwrap())
-            .execute(&db).await.unwrap();
+            .execute(&db)
+            .await
+            .unwrap();
     }
 
     // Take all the changed files' timeline items, and group them by item_id, then take the latest one.
@@ -154,42 +159,51 @@ async fn main() {
         .flat_map(|f| f.json.timeline_items)
         .group_by(|item| item.item_id.clone())
         .into_iter()
-        .map(|(_, items)| items.max_by_key(|item| item.end_date).unwrap()).collect_vec();
+        .map(|(_, items)| items.max_by_key(|item| item.last_saved).unwrap())
+        .collect_vec();
 
-    let possibly_timeline_item_ids = possibly_timeline_items.iter().map(|item| item.item_id.clone()).collect::<Vec<_>>();
+    let possibly_timeline_item_ids = possibly_timeline_items
+        .iter()
+        .map(|item| item.item_id.clone())
+        .collect::<Vec<_>>();
 
-    let existing = sqlx::query_as("SELECT item_id, end_date FROM timeline_item where item_id = ANY($1)")
+    let existing_last_saved_at_map = sqlx::query_as("SELECT item_id, end_date, last_saved FROM timeline_item where item_id = ANY($1)")
         .bind(&possibly_timeline_item_ids)
         .fetch_all(&db)
         .await
         .unwrap()
         .into_iter()
-        .map(|row: TimelineItemUpdatedCheckRow| (row.item_id, row.end_date))
+        .map(|row: TimelineItemUpdatedCheckRow| (row.item_id, row.last_saved))
         .collect::<HashMap<_, _>>();
 
     for updated_timeline_item in possibly_timeline_items {
-        // If we already have a timeline item with this id, and it has a later end date, we don't need to update it.
-        if let Some(existing_end_date) = existing.get(&updated_timeline_item.item_id) {
-            if *existing_end_date >= updated_timeline_item.end_date {
+        // The contents of duplicated entries appear to be identical, so we only check to see if
+        // the last_saved date is newer than the one we have in the database.
+        if let Some(existing_last_saved_at) = existing_last_saved_at_map.get(&updated_timeline_item.item_id) {
+            if updated_timeline_item.last_saved <= *existing_last_saved_at {
                 continue;
             }
         }
 
         // We have a new or updated timeline item, we need to insert its associated place first if
         // it exists.
+        // ASSUMPTION: This assumes that all place changes are associated with the timeline item
+        // changes. Is this true?
         if let Some(place) = &updated_timeline_item.place {
-            sqlx::query("INSERT INTO place (place_id, json) VALUES ($1, $2 :: jsonb) ON CONFLICT (place_id) DO UPDATE SET json = $2 :: jsonb")
+            sqlx::query("INSERT INTO place (place_id, json, last_saved, server_last_updated) VALUES ($1, $2 :: jsonb, $3, now()) ON CONFLICT (place_id) DO UPDATE SET json = $2 :: jsonb, last_saved = $3, server_last_updated = now()")
                 .bind(&place.place_id)
                 .bind(&serde_json::to_value(&place).unwrap())
+                .bind(&place.last_saved)
                 .execute(&db).await.unwrap();
         }
 
         // Then we can insert/update the timeline item.
-        sqlx::query("INSERT INTO timeline_item (item_id, json, place_id, end_date) VALUES ($1, $2 :: jsonb, $3, $4) ON CONFLICT (item_id) DO UPDATE SET json = $2 :: jsonb, place_id = $3, end_date = $4")
+        sqlx::query("INSERT INTO timeline_item (item_id, json, place_id, end_date, last_saved, server_last_updated) VALUES ($1, $2 :: jsonb, $3, $4, $5, now()) ON CONFLICT (item_id) DO UPDATE SET json = $2 :: jsonb, place_id = $3, end_date = $4, last_saved = $5, server_last_updated = now()")
             .bind(&updated_timeline_item.item_id)
             .bind(&serde_json::to_value(&updated_timeline_item).unwrap())
             .bind(&updated_timeline_item.place.map(|place| place.place_id))
             .bind(&updated_timeline_item.end_date)
+            .bind(&updated_timeline_item.last_saved)
             .execute(&db).await.unwrap();
     }
 }
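
A possible follow-up to the newer-wins check above (a sketch, not part of this commit): the same `last_saved` comparison could be pushed into the upsert itself, so a stale payload can never overwrite a newer row even if the application-side check is bypassed. The columns and placeholders mirror the `timeline_item` query in the loop.

```postgresql
-- Hypothetical variant of the timeline_item upsert: the database only applies the
-- update when the incoming row is strictly newer than the stored one.
insert into timeline_item (item_id, json, place_id, end_date, last_saved, server_last_updated)
values ($1, $2 :: jsonb, $3, $4, $5, now())
on conflict (item_id) do update
    set json                = excluded.json,
        place_id            = excluded.place_id,
        end_date            = excluded.end_date,
        last_saved          = excluded.last_saved,
        server_last_updated = now()
where excluded.last_saved > timeline_item.last_saved;
```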