diff --git a/src/main.rs b/src/main.rs index 6a13de4..4fbcf96 100644 --- a/src/main.rs +++ b/src/main.rs @@ -68,7 +68,24 @@ struct Place { } async fn init_db(db: &sqlx::PgPool) { - db.execute("CREATE TABLE IF NOT EXISTS raw_files (date TEXT PRIMARY KEY not null, sha256 TEXT not null, json JSONB NOT NULL)") + let init_query = r#" + CREATE TABLE IF NOT EXISTS place ( + place_id UUID PRIMARY KEY not null, + json JSONB NOT NULL + ); + + CREATE TABLE IF NOT EXISTS timeline_item ( + item_id UUID PRIMARY KEY not null, + json JSONB NOT NULL, + place_id UUID, + end_date TIMESTAMP WITH TIME ZONE NOT NULL, + foreign key (place_id) references place (place_id) + ); + + CREATE TABLE IF NOT EXISTS raw_files (date TEXT PRIMARY KEY not null, sha256 TEXT not null, json JSONB NOT NULL) + "#; + + db.execute(init_query) .await .unwrap(); } @@ -140,56 +157,10 @@ async fn main() { init_db(&db).await; - let date_hashes: Vec = sqlx::query_as("SELECT date, sha256 FROM raw_files") - .fetch_all(&db) - .await - .unwrap(); + let need_refresh = find_updated(&db, files).await; - let date_hashes: Arc> = Arc::new(date_hashes.into_iter() - .map(|dh| (dh.date, dh.sha256)) - .collect()); - - let date_hashes = &date_hashes; - - let need_refresh = files.into_iter() - .map(|f| f.unwrap().path()) - .map(|path| async move { - let date = { - let file_name = path.file_name().unwrap().to_str().unwrap(); - let i = file_name.find('.').unwrap(); - &file_name[..i] - }; - - let bytes = tokio::fs::read(&path).await.unwrap(); - let current_hash = sha256::digest(&bytes); - let existing_hash = date_hashes.get(date); - - if let Some(existing_hash) = existing_hash { - if current_hash == *existing_hash { - return None; - } - } - - return Some((date.to_string(), current_hash, bytes)); - }); - - let need_refresh = futures::future::join_all(need_refresh).await.into_iter() - .filter_map(|x| x) - .collect::>(); - - let decompressed = need_refresh.par_iter().map(|(date, new_hash, bytes)| { - let mut decoder = 
flate2::bufread::GzDecoder::new(&bytes[..]); - let mut string = String::new(); - decoder.read_to_string(&mut string).unwrap(); - - UpdatedFile { - date: date.clone(), - sha256: new_hash.clone(), - json: serde_json::from_str(&string).unwrap(), - } - }).collect::>(); - - for updated_file in &decompressed { + // Refresh the database with the new files + for updated_file in &need_refresh { sqlx::query("INSERT INTO raw_files (date, sha256, json) VALUES ($1, $2, $3 :: jsonb) ON CONFLICT (date) DO UPDATE SET sha256 = $2, json = $3 :: jsonb") .bind(&updated_file.date) .bind(&updated_file.sha256) @@ -199,37 +170,33 @@ async fn main() { // Take all of the changed files' timeline items, and group them by item_id, then take the latest one. // If we are needing to updated the database it will be with this one. - let possibly_timeline_items = decompressed.into_iter() + let possibly_timeline_items = need_refresh.into_iter() .flat_map(|f| f.json.timeline_items) .group_by(|item| item.item_id.clone()) .into_iter() - .map(|(_, items)| { - items.max_by_key(|item| item.end_date).unwrap() - }).collect_vec(); + .map(|(_, items)| items.max_by_key(|item| item.end_date).unwrap()).collect_vec(); let possibly_timeline_item_ids = possibly_timeline_items.iter().map(|item| item.item_id.clone()).collect::>(); - let existing: Vec = sqlx::query_as("SELECT item_id, end_date FROM timeline_item where item_id = ANY($1)") + let existing = sqlx::query_as("SELECT item_id, end_date FROM timeline_item where item_id = ANY($1)") .bind(&possibly_timeline_item_ids) .fetch_all(&db) .await - .unwrap(); + .unwrap() + .into_iter() + .map(|row: TimelineItemUpdatedCheckRow| (row.item_id, row.end_date)) + .collect::>(); - let updated_timeline_items = possibly_timeline_items.into_iter().filter(|item| { - let existing = existing.iter().find(|e| e.item_id == item.item_id); - if let Some(existing) = existing { - if existing.end_date < item.end_date { - return true; + for updated_timeline_item in possibly_timeline_items { 
+ // If we already have a timeline item with this id, and it has the same or a later end date, we don't need to update it. + if let Some(existing_end_date) = existing.get(&updated_timeline_item.item_id) { + if *existing_end_date >= updated_timeline_item.end_date { + continue; } - } else { - return true; } - false - }); - - for updated_timeline_item in updated_timeline_items { - // First we need to insert the place if it doesn't exist + // We have a new or updated timeline item; we need to insert its associated place first if + // it exists. if let Some(place) = &updated_timeline_item.place { sqlx::query("INSERT INTO place (place_id, json) VALUES ($1, $2 :: jsonb) ON CONFLICT (place_id) DO UPDATE SET json = $2 :: jsonb") .bind(&place.place_id) @@ -237,6 +204,7 @@ async fn main() { .execute(&db).await.unwrap(); } + // Then we can insert/update the timeline item. sqlx::query("INSERT INTO timeline_item (item_id, json, place_id, end_date) VALUES ($1, $2 :: jsonb, $3, $4) ON CONFLICT (item_id) DO UPDATE SET json = $2 :: jsonb, place_id = $3, end_date = $4") .bind(&updated_timeline_item.item_id) .bind(&serde_json::to_value(&updated_timeline_item).unwrap())