Improve updates, perform proper normalisation

Joshua Coles 2024-02-07 20:12:25 +00:00
parent 500488f6c4
commit 65664947f4


@@ -68,7 +68,24 @@ struct Place {
 }
 
 async fn init_db(db: &sqlx::PgPool) {
-    db.execute("CREATE TABLE IF NOT EXISTS raw_files (date TEXT PRIMARY KEY not null, sha256 TEXT not null, json JSONB NOT NULL)")
+    let init_query = r#"
+        CREATE TABLE IF NOT EXISTS place (
+            place_id UUID PRIMARY KEY not null,
+            json JSONB NOT NULL
+        );
+
+        CREATE TABLE IF NOT EXISTS timeline_item (
+            item_id UUID PRIMARY KEY not null,
+            json JSONB NOT NULL,
+            place_id UUID,
+            end_date TIMESTAMP WITH TIME ZONE NOT NULL,
+            foreign key (place_id) references place (place_id)
+        );
+
+        CREATE TABLE IF NOT EXISTS raw_files (date TEXT PRIMARY KEY not null, sha256 TEXT not null, json JSONB NOT NULL)
+    "#;
+
+    db.execute(init_query)
         .await
         .unwrap();
 }
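With place_id and end_date promoted out of the JSON blob into real columns, and the place foreign key in the schema, related rows can now be queried relationally. As a hypothetical illustration (not part of this commit, and assuming sqlx's uuid and json features are enabled), items can be joined to their place without unpacking JSON in application code:

// Hypothetical query against the new schema; not in this commit.
let items_with_places: Vec<(uuid::Uuid, serde_json::Value)> =
    sqlx::query_as("SELECT ti.item_id, p.json FROM timeline_item ti JOIN place p USING (place_id)")
        .fetch_all(&db)
        .await
        .unwrap();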
@@ -140,56 +157,10 @@ async fn main() {
     init_db(&db).await;
 
-    let date_hashes: Vec<DateHash> = sqlx::query_as("SELECT date, sha256 FROM raw_files")
-        .fetch_all(&db)
-        .await
-        .unwrap();
+    let need_refresh = find_updated(&db, files).await;
 
-    let date_hashes: Arc<HashMap<String, String>> = Arc::new(date_hashes.into_iter()
-        .map(|dh| (dh.date, dh.sha256))
-        .collect());
-    let date_hashes = &date_hashes;
-
-    let need_refresh = files.into_iter()
-        .map(|f| f.unwrap().path())
-        .map(|path| async move {
-            let date = {
-                let file_name = path.file_name().unwrap().to_str().unwrap();
-                let i = file_name.find('.').unwrap();
-                &file_name[..i]
-            };
-
-            let bytes = tokio::fs::read(&path).await.unwrap();
-            let current_hash = sha256::digest(&bytes);
-            let existing_hash = date_hashes.get(date);
-
-            if let Some(existing_hash) = existing_hash {
-                if current_hash == *existing_hash {
-                    return None;
-                }
-            }
-
-            return Some((date.to_string(), current_hash, bytes));
-        });
-
-    let need_refresh = futures::future::join_all(need_refresh).await.into_iter()
-        .filter_map(|x| x)
-        .collect::<Vec<_>>();
-
-    let decompressed = need_refresh.par_iter().map(|(date, new_hash, bytes)| {
-        let mut decoder = flate2::bufread::GzDecoder::new(&bytes[..]);
-        let mut string = String::new();
-        decoder.read_to_string(&mut string).unwrap();
-
-        UpdatedFile {
-            date: date.clone(),
-            sha256: new_hash.clone(),
-            json: serde_json::from_str(&string).unwrap(),
-        }
-    }).collect::<Vec<_>>();
-
-    for updated_file in &decompressed {
+    // Refresh the database with the new files
+    for updated_file in &need_refresh {
         sqlx::query("INSERT INTO raw_files (date, sha256, json) VALUES ($1, $2, $3 :: jsonb) ON CONFLICT (date) DO UPDATE SET sha256 = $2, json = $3 :: jsonb")
             .bind(&updated_file.date)
             .bind(&updated_file.sha256)
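The body of find_updated itself is not shown in this diff. Judging by the inline code it replaces above, it bundles the hash comparison against raw_files together with the gzip decompression and JSON parsing, returning the items whose date, sha256, and json fields are used below. A rough sketch of what it might look like follows; the signature and the sequential loop are assumptions (the removed code ran the hash checks concurrently with futures::future::join_all and decompressed on rayon's par_iter):

// Hypothetical reconstruction of find_updated from the removed inline logic;
// the actual implementation in this commit is not shown.
async fn find_updated(db: &sqlx::PgPool, files: std::fs::ReadDir) -> Vec<UpdatedFile> {
    // Stored (date, sha256) pairs let us skip files that haven't changed.
    let date_hashes: Vec<DateHash> = sqlx::query_as("SELECT date, sha256 FROM raw_files")
        .fetch_all(db)
        .await
        .unwrap();
    let date_hashes: std::collections::HashMap<String, String> = date_hashes
        .into_iter()
        .map(|dh| (dh.date, dh.sha256))
        .collect();

    let mut updated = Vec::new();
    for path in files.map(|f| f.unwrap().path()) {
        // The date key is the file name up to its first '.'.
        let file_name = path.file_name().unwrap().to_str().unwrap().to_owned();
        let date = file_name[..file_name.find('.').unwrap()].to_owned();

        let bytes = tokio::fs::read(&path).await.unwrap();
        let current_hash = sha256::digest(&bytes);

        // Unchanged file: the stored hash matches, so there is nothing to refresh.
        if date_hashes.get(&date) == Some(&current_hash) {
            continue;
        }

        // New or changed file: gunzip and parse the JSON payload.
        let mut decoder = flate2::bufread::GzDecoder::new(&bytes[..]);
        let mut string = String::new();
        std::io::Read::read_to_string(&mut decoder, &mut string).unwrap();

        updated.push(UpdatedFile {
            date,
            sha256: current_hash,
            json: serde_json::from_str(&string).unwrap(),
        });
    }
    updated
}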
@@ -199,37 +170,33 @@ async fn main() {
     // Take all of the changed files' timeline items, group them by item_id, then take the latest one.
     // If we need to update the database, it will be with this one.
-    let possibly_timeline_items = decompressed.into_iter()
+    let possibly_timeline_items = need_refresh.into_iter()
         .flat_map(|f| f.json.timeline_items)
         .group_by(|item| item.item_id.clone())
         .into_iter()
-        .map(|(_, items)| {
-            items.max_by_key(|item| item.end_date).unwrap()
-        }).collect_vec();
+        .map(|(_, items)| items.max_by_key(|item| item.end_date).unwrap()).collect_vec();
 
     let possibly_timeline_item_ids = possibly_timeline_items.iter().map(|item| item.item_id.clone()).collect::<Vec<_>>();
 
-    let existing: Vec<TimelineItemUpdatedCheckRow> = sqlx::query_as("SELECT item_id, end_date FROM timeline_item where item_id = ANY($1)")
+    let existing = sqlx::query_as("SELECT item_id, end_date FROM timeline_item where item_id = ANY($1)")
         .bind(&possibly_timeline_item_ids)
         .fetch_all(&db)
         .await
-        .unwrap();
+        .unwrap()
+        .into_iter()
+        .map(|row: TimelineItemUpdatedCheckRow| (row.item_id, row.end_date))
+        .collect::<HashMap<_, _>>();
 
-    let updated_timeline_items = possibly_timeline_items.into_iter().filter(|item| {
-        let existing = existing.iter().find(|e| e.item_id == item.item_id);
-        if let Some(existing) = existing {
-            if existing.end_date < item.end_date {
-                return true;
+    for updated_timeline_item in possibly_timeline_items {
+        // If we already have a timeline item with this id and an end date at least as late, we don't need to update it.
+        if let Some(existing_end_date) = existing.get(&updated_timeline_item.item_id) {
+            if *existing_end_date >= updated_timeline_item.end_date {
+                continue;
             }
-        } else {
-            return true;
         }
-        false
-    });
 
-    for updated_timeline_item in updated_timeline_items {
-        // First we need to insert the place if it doesn't exist
+        // We have a new or updated timeline item; we need to insert its associated place first,
+        // if it has one.
         if let Some(place) = &updated_timeline_item.place {
             sqlx::query("INSERT INTO place (place_id, json) VALUES ($1, $2 :: jsonb) ON CONFLICT (place_id) DO UPDATE SET json = $2 :: jsonb")
                 .bind(&place.place_id)
@@ -237,6 +204,7 @@ async fn main() {
                 .execute(&db).await.unwrap();
         }
 
+        // Then we can insert/update the timeline item.
         sqlx::query("INSERT INTO timeline_item (item_id, json, place_id, end_date) VALUES ($1, $2 :: jsonb, $3, $4) ON CONFLICT (item_id) DO UPDATE SET json = $2 :: jsonb, place_id = $3, end_date = $4")
             .bind(&updated_timeline_item.item_id)
             .bind(&serde_json::to_value(&updated_timeline_item).unwrap())