Simplify code to determine if files need to be changed and prep to move data transformation into PSQL
This commit is contained in:
		
							parent
							
								
									99523430c6
								
							
						
					
					
						commit
						bcfa0c2d89
					
				| @ -4,6 +4,15 @@ version = "0.1.1" | ||||
| edition = "2021" | ||||
| authors = ["Joshua Coles"] | ||||
| license = "MIT OR Apache-2.0" | ||||
| default-run = "arc-ingester" | ||||
| 
 | ||||
| [[bin]] | ||||
| name = "arc-ingester" | ||||
| path = "src/main.rs" | ||||
| 
 | ||||
| [[bin]] | ||||
| name = "t1" | ||||
| path = "src/t1.rs" | ||||
| 
 | ||||
| [dependencies] | ||||
| clap = { version = "4.4.18", features = ["derive", "env"] } | ||||
|  | ||||
							
								
								
									
										24
									
								
								functions/update_places.sql
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										24
									
								
								functions/update_places.sql
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,24 @@ | ||||
-- Upserts the most recently saved version of every place referenced by a
-- timeline item in raw_files into public.place.
--
-- An existing row is only overwritten when the incoming version has a strictly
-- newer lastSaved than the stored one.
with timeline_items as (
    -- One row per element of each raw file's `timelineItems` array.
    select jsonb_array_elements(raw_files.json -> 'timelineItems') as timeline_item
    from raw_files
),
latest_places as (
    -- DISTINCT ON combined with the ORDER BY below keeps exactly one row per
    -- placeId: the one with the greatest lastSaved. Guaranteeing a single row
    -- per key matters because INSERT ... ON CONFLICT DO UPDATE fails if the
    -- same place_id is proposed twice in one statement. If several versions
    -- share the greatest lastSaved, an arbitrary one of them is kept.
    select distinct on (timeline_item -> 'place' ->> 'placeId')
           timeline_item -> 'place'                                  as place,
           timeline_item -> 'place' ->> 'placeId'                    as place_id,
           (timeline_item -> 'place' ->> 'lastSaved') :: timestamptz as last_saved
    from timeline_items
    where (timeline_item ->> 'place') is not null
      -- Rows without a key or a timestamp cannot be ranked or upserted.
      and (timeline_item -> 'place' ->> 'placeId') is not null
      and (timeline_item -> 'place' ->> 'lastSaved') is not null
    order by timeline_item -> 'place' ->> 'placeId',
             (timeline_item -> 'place' ->> 'lastSaved') :: timestamptz desc
)
insert
into public.place (place_id, json, last_saved, server_last_updated)
select place_id :: uuid as place_id,
       place            as json,
       last_saved       as last_saved,
       now()            as server_last_updated
from latest_places
on conflict (place_id) do update set json                = excluded.json,
                                     last_saved          = excluded.last_saved,
                                     server_last_updated = excluded.server_last_updated
where excluded.last_saved > public.place.last_saved;
							
								
								
									
										28
									
								
								functions/update_timeline_items.sql
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										28
									
								
								functions/update_timeline_items.sql
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,28 @@ | ||||
-- Upserts the most recently saved version of every timeline item found in
-- raw_files into public.timeline_item and returns the item_id of each row
-- that was inserted or updated.
--
-- An existing row is only overwritten when the incoming version has a strictly
-- newer lastSaved than the stored one.
with timeline_items as (
    -- One row per element of each raw file's `timelineItems` array.
    select jsonb_array_elements(raw_files.json -> 'timelineItems') as timeline_item
    from raw_files
),
latest_timeline_items as (
    -- DISTINCT ON combined with the ORDER BY below keeps exactly one row per
    -- itemId: the one with the greatest lastSaved. If several versions share
    -- the greatest lastSaved, an arbitrary one of them is kept.
    select distinct on (timeline_item ->> 'itemId')
           timeline_item ->> 'itemId'                     as item_id,
           timeline_item                                  as timeline_item,
           (timeline_item ->> 'lastSaved') :: timestamptz as last_saved
    from timeline_items
    -- Rows without a key or a timestamp cannot be ranked or upserted.
    where (timeline_item ->> 'itemId') is not null
      and (timeline_item ->> 'lastSaved') is not null
    order by timeline_item ->> 'itemId',
             (timeline_item ->> 'lastSaved') :: timestamptz desc
)
insert
into public.timeline_item (item_id, json, place_id, end_date, last_saved, server_last_updated)
select latest_timeline_items.item_id :: uuid                                  as item_id,
       latest_timeline_items.timeline_item                                    as json,
       (latest_timeline_items.timeline_item -> 'place' ->> 'placeId') :: uuid as place_id,
       (latest_timeline_items.timeline_item ->> 'endDate') :: timestamptz     as end_date,
       latest_timeline_items.last_saved                                       as last_saved,
       now()                                                                  as server_last_updated
from latest_timeline_items
on conflict (item_id) do update set json                = excluded.json,
                                    place_id            = excluded.place_id,
                                    end_date            = excluded.end_date,
                                    last_saved          = excluded.last_saved,
                                    server_last_updated = excluded.server_last_updated
where excluded.last_saved > public.timeline_item.last_saved
returning item_id;
							
								
								
									
										59
									
								
								src/main.rs
									
									
									
									
									
								
							
							
						
						
									
										59
									
								
								src/main.rs
									
									
									
									
									
								
							| @ -2,12 +2,10 @@ use std::collections::HashMap; | ||||
| use std::fs::ReadDir; | ||||
| use std::io::Read; | ||||
| use std::path::PathBuf; | ||||
| use std::sync::Arc; | ||||
| use chrono::{DateTime, Utc}; | ||||
| use clap::Parser; | ||||
| use itertools::Itertools; | ||||
| use sqlx::FromRow; | ||||
| use rayon::prelude::*; | ||||
| use serde_json::Value; | ||||
| use uuid::Uuid; | ||||
| 
 | ||||
| @ -69,58 +67,51 @@ struct Place { | ||||
|     rest: HashMap<String, Value>, | ||||
| } | ||||
| 
 | ||||
| async fn find_updated(db: &sqlx::PgPool, files: ReadDir) -> Vec<UpdatedFile> { | ||||
|     let date_hashes: Vec<DateHash> = sqlx::query_as("SELECT date, sha256 FROM raw_files") | ||||
|         .fetch_all(db) | ||||
|         .await | ||||
|         .unwrap(); | ||||
| 
 | ||||
|     let date_hashes: Arc<HashMap<String, String>> = Arc::new(date_hashes.into_iter() | ||||
|         .map(|dh| (dh.date, dh.sha256)) | ||||
|         .collect()); | ||||
| 
 | ||||
|     let date_hashes = &date_hashes; | ||||
| 
 | ||||
|     // Find the files that need to be refreshed, based on the sha256 hash of the file compared to
 | ||||
|     // the hash stored in the database.
 | ||||
|     let need_refresh = files.into_iter() | ||||
| fn hash_files(files: ReadDir) -> impl Iterator<Item=(String, PathBuf, String)> { | ||||
|     files.into_iter() | ||||
|         .map(|f| f.unwrap().path()) | ||||
|         .filter_map(|path| { | ||||
|             // Extract the date from the file name
 | ||||
|         .map(|path| { | ||||
|             let date = { | ||||
|                 let file_name = path.file_name().unwrap().to_str().unwrap(); | ||||
|                 let i = file_name.find('.').unwrap(); | ||||
|                 &file_name[..i] | ||||
|             }; | ||||
| 
 | ||||
|             let current_hash = sha256::try_digest(&path).unwrap(); | ||||
|             let existing_hash = date_hashes.get(date); | ||||
|             let hash = sha256::try_digest(&path).unwrap(); | ||||
|             (date.to_string(), path, hash) | ||||
|         }) | ||||
| } | ||||
| 
 | ||||
|             if let Some(existing_hash) = existing_hash { | ||||
|                 if current_hash == *existing_hash { | ||||
| async fn find_updated(db: &sqlx::PgPool, files: ReadDir) -> Vec<UpdatedFile> { | ||||
|     let date_hashes: Vec<DateHash> = sqlx::query_as("SELECT date, sha256 FROM raw_files") | ||||
|         .fetch_all(db) | ||||
|         .await | ||||
|         .unwrap(); | ||||
| 
 | ||||
|     let date_hashes: HashMap<String, String> = date_hashes.into_iter() | ||||
|         .map(|dh| (dh.date, dh.sha256)) | ||||
|         .collect(); | ||||
| 
 | ||||
|     let new_hashes = hash_files(files); | ||||
| 
 | ||||
|     new_hashes.filter_map(|(date, path, new_hash)| { | ||||
|         if let Some(existing_hash) = date_hashes.get(&date) { | ||||
|             if new_hash == *existing_hash { | ||||
|                 return None; | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|         let bytes = std::fs::read(&path).unwrap(); | ||||
| 
 | ||||
|             return Some((date.to_string(), current_hash, bytes)); | ||||
|         }) | ||||
|         .collect_vec(); | ||||
| 
 | ||||
|     let decompressed = need_refresh.par_iter().map(|(date, new_hash, bytes)| { | ||||
|         let mut decoder = flate2::bufread::GzDecoder::new(&bytes[..]); | ||||
|         let mut string = String::new(); | ||||
|         decoder.read_to_string(&mut string).unwrap(); | ||||
| 
 | ||||
|         UpdatedFile { | ||||
|         Some(UpdatedFile { | ||||
|             date: date.clone(), | ||||
|             sha256: new_hash.clone(), | ||||
|             json: serde_json::from_str(&string).unwrap(), | ||||
|         } | ||||
|     }).collect::<Vec<_>>(); | ||||
| 
 | ||||
|     decompressed | ||||
|         }) | ||||
|     }).collect_vec() | ||||
| } | ||||
| 
 | ||||
| #[tokio::main] | ||||
|  | ||||
							
								
								
									
										7
									
								
								src/t1.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										7
									
								
								src/t1.rs
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,7 @@ | ||||
| fn main() { | ||||
|     let files = std::fs::read_dir("/Users/joshuacoles/Library/Mobile Documents/iCloud~com~bigpaua~LearnerCoacher/Documents/Export/JSON/Daily/").unwrap(); | ||||
|     for file in files { | ||||
|         let file = file.unwrap().path(); | ||||
|         println!("{}\t{}", file.display(), sha256::try_digest(&file).unwrap()) | ||||
|     } | ||||
| } | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user