Compare commits
	
		
			No commits in common. "8cf9d86f32b063d0683aea41b24307e1b5151287" and "99523430c650016f1ecf2f5f9c2b740fbec3459b" have entirely different histories.
		
	
	
		
			8cf9d86f32
			...
			99523430c6
		
	
		
							
								
								
									
										117
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										117
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							| @ -99,20 +99,10 @@ dependencies = [ | ||||
|  "windows-sys 0.52.0", | ||||
| ] | ||||
| 
 | ||||
| [[package]] | ||||
| name = "anyhow" | ||||
| version = "1.0.82" | ||||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||||
| checksum = "f538837af36e6f6a9be0faa67f9a314f8119e4e4b5867c6ab40ed60360142519" | ||||
| dependencies = [ | ||||
|  "backtrace", | ||||
| ] | ||||
| 
 | ||||
| [[package]] | ||||
| name = "arc-ingester" | ||||
| version = "0.1.1" | ||||
| dependencies = [ | ||||
|  "anyhow", | ||||
|  "chrono", | ||||
|  "clap", | ||||
|  "flate2", | ||||
| @ -123,8 +113,6 @@ dependencies = [ | ||||
|  "serde_json", | ||||
|  "sha256", | ||||
|  "sqlx", | ||||
|  "testcontainers", | ||||
|  "testcontainers-modules", | ||||
|  "thiserror", | ||||
|  "tokio", | ||||
|  "uuid", | ||||
| @ -217,16 +205,6 @@ dependencies = [ | ||||
|  "generic-array", | ||||
| ] | ||||
| 
 | ||||
| [[package]] | ||||
| name = "bollard-stubs" | ||||
| version = "1.42.0-rc.3" | ||||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||||
| checksum = "ed59b5c00048f48d7af971b71f800fdf23e858844a6f9e4d32ca72e9399e7864" | ||||
| dependencies = [ | ||||
|  "serde", | ||||
|  "serde_with", | ||||
| ] | ||||
| 
 | ||||
| [[package]] | ||||
| name = "bumpalo" | ||||
| version = "3.14.0" | ||||
| @ -410,41 +388,6 @@ dependencies = [ | ||||
|  "typenum", | ||||
| ] | ||||
| 
 | ||||
| [[package]] | ||||
| name = "darling" | ||||
| version = "0.13.4" | ||||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||||
| checksum = "a01d95850c592940db9b8194bc39f4bc0e89dee5c4265e4b1807c34a9aba453c" | ||||
| dependencies = [ | ||||
|  "darling_core", | ||||
|  "darling_macro", | ||||
| ] | ||||
| 
 | ||||
| [[package]] | ||||
| name = "darling_core" | ||||
| version = "0.13.4" | ||||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||||
| checksum = "859d65a907b6852c9361e3185c862aae7fafd2887876799fa55f5f99dc40d610" | ||||
| dependencies = [ | ||||
|  "fnv", | ||||
|  "ident_case", | ||||
|  "proc-macro2", | ||||
|  "quote", | ||||
|  "strsim", | ||||
|  "syn 1.0.109", | ||||
| ] | ||||
| 
 | ||||
| [[package]] | ||||
| name = "darling_macro" | ||||
| version = "0.13.4" | ||||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||||
| checksum = "9c972679f83bdf9c42bd905396b6c3588a843a17f0f16dfcfa3e2c5d57441835" | ||||
| dependencies = [ | ||||
|  "darling_core", | ||||
|  "quote", | ||||
|  "syn 1.0.109", | ||||
| ] | ||||
| 
 | ||||
| [[package]] | ||||
| name = "der" | ||||
| version = "0.7.8" | ||||
| @ -549,12 +492,6 @@ dependencies = [ | ||||
|  "spin 0.9.8", | ||||
| ] | ||||
| 
 | ||||
| [[package]] | ||||
| name = "fnv" | ||||
| version = "1.0.7" | ||||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||||
| checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" | ||||
| 
 | ||||
| [[package]] | ||||
| name = "form_urlencoded" | ||||
| version = "1.2.1" | ||||
| @ -781,12 +718,6 @@ dependencies = [ | ||||
|  "cc", | ||||
| ] | ||||
| 
 | ||||
| [[package]] | ||||
| name = "ident_case" | ||||
| version = "1.0.1" | ||||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||||
| checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" | ||||
| 
 | ||||
| [[package]] | ||||
| name = "idna" | ||||
| version = "0.5.0" | ||||
| @ -1323,28 +1254,6 @@ dependencies = [ | ||||
|  "serde", | ||||
| ] | ||||
| 
 | ||||
| [[package]] | ||||
| name = "serde_with" | ||||
| version = "1.14.0" | ||||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||||
| checksum = "678b5a069e50bf00ecd22d0cd8ddf7c236f68581b03db652061ed5eb13a312ff" | ||||
| dependencies = [ | ||||
|  "serde", | ||||
|  "serde_with_macros", | ||||
| ] | ||||
| 
 | ||||
| [[package]] | ||||
| name = "serde_with_macros" | ||||
| version = "1.5.2" | ||||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||||
| checksum = "e182d6ec6f05393cc0e5ed1bf81ad6db3a8feedf8ee515ecdd369809bcce8082" | ||||
| dependencies = [ | ||||
|  "darling", | ||||
|  "proc-macro2", | ||||
|  "quote", | ||||
|  "syn 1.0.109", | ||||
| ] | ||||
| 
 | ||||
| [[package]] | ||||
| name = "sha1" | ||||
| version = "0.10.6" | ||||
| @ -1725,32 +1634,6 @@ dependencies = [ | ||||
|  "windows-sys 0.52.0", | ||||
| ] | ||||
| 
 | ||||
| [[package]] | ||||
| name = "testcontainers" | ||||
| version = "0.15.0" | ||||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||||
| checksum = "f83d2931d7f521af5bae989f716c3fa43a6af9af7ec7a5e21b59ae40878cec00" | ||||
| dependencies = [ | ||||
|  "bollard-stubs", | ||||
|  "futures", | ||||
|  "hex", | ||||
|  "hmac", | ||||
|  "log", | ||||
|  "rand", | ||||
|  "serde", | ||||
|  "serde_json", | ||||
|  "sha2", | ||||
| ] | ||||
| 
 | ||||
| [[package]] | ||||
| name = "testcontainers-modules" | ||||
| version = "0.3.6" | ||||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||||
| checksum = "204d1c7516bfdc8a01bb85d3e30145e5bbeb2351812e5e8aa6971769109b45b5" | ||||
| dependencies = [ | ||||
|  "testcontainers", | ||||
| ] | ||||
| 
 | ||||
| [[package]] | ||||
| name = "thiserror" | ||||
| version = "1.0.56" | ||||
|  | ||||
							
								
								
									
										12
									
								
								Cargo.toml
									
									
									
									
									
								
							
							
						
						
									
										12
									
								
								Cargo.toml
									
									
									
									
									
								
							| @ -4,15 +4,6 @@ version = "0.1.1" | ||||
| edition = "2021" | ||||
| authors = ["Joshua Coles"] | ||||
| license = "MIT OR Apache-2.0" | ||||
| default-run = "arc-ingester" | ||||
| 
 | ||||
| [[bin]] | ||||
| name = "arc-ingester" | ||||
| path = "src/main.rs" | ||||
| 
 | ||||
| [[bin]] | ||||
| name = "t1" | ||||
| path = "src/t1.rs" | ||||
| 
 | ||||
| [dependencies] | ||||
| clap = { version = "4.4.18", features = ["derive", "env"] } | ||||
| @ -28,6 +19,3 @@ futures = "0.3.30" | ||||
| itertools = "0.12.1" | ||||
| chrono = { version = "0.4.33", features = ["serde"] } | ||||
| uuid = { version = "1.7.0", features = ["serde"] } | ||||
| testcontainers = "0.15.0" | ||||
| testcontainers-modules = { version = "0.3.6", features = ["postgres"] } | ||||
| anyhow = { version = "1.0.82", features = ["backtrace"] } | ||||
|  | ||||
| @ -1,24 +0,0 @@ | ||||
| with timelineItems as (select jsonb_array_elements(raw_files.json -> 'timelineItems') as timelineItem | ||||
|                        from raw_files), | ||||
|      places as (select distinct on (md5(timelineItem ->> 'place' :: text)) timelineItem -> 'place'                                  as place, | ||||
|                                                                            timelineItem -> 'place' ->> 'placeId'                    as placeId, | ||||
|                                                                            (timelineItem -> 'place' ->> 'lastSaved') :: timestamptz as lastSaved | ||||
|                 from timelineItems | ||||
|                 where timelineItem ->> 'place' is not null), | ||||
|      places_with_max_last_saved as (select place -> 'placeId'                          as placeId, | ||||
|                                            max((place ->> 'lastSaved') :: timestamptz) as latest_last_saved | ||||
|                                     from places | ||||
|                                     group by place -> 'placeId'), | ||||
|      latest_places as (select places.* | ||||
|                        from places_with_max_last_saved | ||||
|                                 inner join places on places.place -> 'placeId' = places_with_max_last_saved.placeId and | ||||
|                                                      places.lastSaved = | ||||
|                                                      places_with_max_last_saved.latest_last_saved) | ||||
| insert | ||||
| into public.place (place_id, json, last_saved, server_last_updated) | ||||
| select (placeId :: uuid) as place_id, place as json, lastSaved as last_saved, now() as server_last_updated | ||||
| from latest_places | ||||
| on conflict (place_id) do update set json                = excluded.json, | ||||
|                                      last_saved          = excluded.last_saved, | ||||
|                                      server_last_updated = excluded.server_last_updated | ||||
| where excluded.last_saved > public.place.last_saved; | ||||
| @ -1,28 +0,0 @@ | ||||
| with timelineItems as (select jsonb_array_elements(raw_files.json -> 'timelineItems') as timelineItem | ||||
|                        from raw_files), | ||||
|      max_last_saved as (select timelineItem ->> 'itemId'                          as itemId, | ||||
|                                max((timelineItem ->> 'lastSaved') :: timestamptz) as latest_last_saved | ||||
|                         from timelineItems | ||||
|                         group by timelineItem ->> 'itemId'), | ||||
|      unique_timline_items as (select distinct on (max_last_saved.itemId) * | ||||
|                               from max_last_saved | ||||
|                                        inner join timelineItems | ||||
|                                                   on timelineItems.timelineItem ->> 'itemId' = max_last_saved.itemId | ||||
|                                                       and (timelineItems.timelineItem ->> 'lastSaved') :: timestamptz = | ||||
|                                                           max_last_saved.latest_last_saved) | ||||
| insert | ||||
| into public.timeline_item (item_id, json, place_id, end_date, last_saved, server_last_updated) | ||||
| select unique_timline_items.itemId :: uuid                                  as item_id, | ||||
|        unique_timline_items.timelineItem                                    as json, | ||||
|        (unique_timline_items.timelineItem -> 'place' ->> 'placeId') :: uuid as place_id, | ||||
|        (unique_timline_items.timelineItem ->> 'endDate') :: timestamptz     as end_date, | ||||
|        unique_timline_items.latest_last_saved :: timestamptz                as last_saved, | ||||
|        now()                                                                as server_last_updated | ||||
| from unique_timline_items | ||||
| on conflict (item_id) do update set json                = excluded.json, | ||||
|                                     place_id            = excluded.place_id, | ||||
|                                     end_date            = excluded.end_date, | ||||
|                                     last_saved          = excluded.last_saved, | ||||
|                                     server_last_updated = excluded.server_last_updated | ||||
| where excluded.last_saved > public.timeline_item.last_saved | ||||
| returning item_id; | ||||
							
								
								
									
										80
									
								
								src/main.rs
									
									
									
									
									
								
							
							
						
						
									
										80
									
								
								src/main.rs
									
									
									
									
									
								
							| @ -2,10 +2,12 @@ use std::collections::HashMap; | ||||
| use std::fs::ReadDir; | ||||
| use std::io::Read; | ||||
| use std::path::PathBuf; | ||||
| use std::sync::Arc; | ||||
| use chrono::{DateTime, Utc}; | ||||
| use clap::Parser; | ||||
| use itertools::Itertools; | ||||
| use sqlx::FromRow; | ||||
| use rayon::prelude::*; | ||||
| use serde_json::Value; | ||||
| use uuid::Uuid; | ||||
| 
 | ||||
| @ -67,82 +69,80 @@ struct Place { | ||||
|     rest: HashMap<String, Value>, | ||||
| } | ||||
| 
 | ||||
| fn hash_files(files: ReadDir) -> impl Iterator<Item=(String, PathBuf, String)> { | ||||
|     files.into_iter() | ||||
| async fn find_updated(db: &sqlx::PgPool, files: ReadDir) -> Vec<UpdatedFile> { | ||||
|     let date_hashes: Vec<DateHash> = sqlx::query_as("SELECT date, sha256 FROM raw_files") | ||||
|         .fetch_all(db) | ||||
|         .await | ||||
|         .unwrap(); | ||||
| 
 | ||||
|     let date_hashes: Arc<HashMap<String, String>> = Arc::new(date_hashes.into_iter() | ||||
|         .map(|dh| (dh.date, dh.sha256)) | ||||
|         .collect()); | ||||
| 
 | ||||
|     let date_hashes = &date_hashes; | ||||
| 
 | ||||
|     // Find the files that need to be refreshed, based on the sha256 hash of the file compared to
 | ||||
|     // the hash stored in the database.
 | ||||
|     let need_refresh = files.into_iter() | ||||
|         .map(|f| f.unwrap().path()) | ||||
|         .map(|path| { | ||||
|         .filter_map(|path| { | ||||
|             // Extract the date from the file name
 | ||||
|             let date = { | ||||
|                 let file_name = path.file_name().unwrap().to_str().unwrap(); | ||||
|                 let i = file_name.find('.').unwrap(); | ||||
|                 &file_name[..i] | ||||
|             }; | ||||
| 
 | ||||
|             let hash = sha256::try_digest(&path).unwrap(); | ||||
|             (date.to_string(), path, hash) | ||||
|         }) | ||||
| } | ||||
|             let current_hash = sha256::try_digest(&path).unwrap(); | ||||
|             let existing_hash = date_hashes.get(date); | ||||
| 
 | ||||
| async fn find_updated(db: &sqlx::PgPool, files: ReadDir) -> Vec<UpdatedFile> { | ||||
|     let date_hashes: Vec<DateHash> = sqlx::query_as("SELECT date, sha256 FROM raw_files") | ||||
|         .fetch_all(db) | ||||
|         .await | ||||
|         .expect("Failed to fetch date hashes from database"); | ||||
| 
 | ||||
|     let date_hashes: HashMap<String, String> = date_hashes.into_iter() | ||||
|         .map(|dh| (dh.date, dh.sha256)) | ||||
|         .collect(); | ||||
| 
 | ||||
|     let new_hashes = hash_files(files); | ||||
| 
 | ||||
|     new_hashes.filter_map(|(date, path, new_hash)| { | ||||
|         if let Some(existing_hash) = date_hashes.get(&date) { | ||||
|             if new_hash == *existing_hash { | ||||
|             if let Some(existing_hash) = existing_hash { | ||||
|                 if current_hash == *existing_hash { | ||||
|                     return None; | ||||
|                 } | ||||
|             } | ||||
| 
 | ||||
|             let bytes = std::fs::read(&path).unwrap(); | ||||
| 
 | ||||
|             return Some((date.to_string(), current_hash, bytes)); | ||||
|         }) | ||||
|         .collect_vec(); | ||||
| 
 | ||||
|     let decompressed = need_refresh.par_iter().map(|(date, new_hash, bytes)| { | ||||
|         let mut decoder = flate2::bufread::GzDecoder::new(&bytes[..]); | ||||
|         let mut string = String::new(); | ||||
|         match decoder.read_to_string(&mut string) { | ||||
|             Err(err) => { | ||||
|                 eprintln!("Failed to parse file {path:?}"); | ||||
|                 eprintln!("Error {err:?}"); | ||||
|                 panic!("Error") | ||||
|             } | ||||
|             _ => {} | ||||
|         } | ||||
|         decoder.read_to_string(&mut string).unwrap(); | ||||
| 
 | ||||
|         Some(UpdatedFile { | ||||
|         UpdatedFile { | ||||
|             date: date.clone(), | ||||
|             sha256: new_hash.clone(), | ||||
|             json: serde_json::from_str(&string).unwrap(), | ||||
|         }) | ||||
|     }).collect_vec() | ||||
|         } | ||||
|     }).collect::<Vec<_>>(); | ||||
| 
 | ||||
|     decompressed | ||||
| } | ||||
| 
 | ||||
| #[tokio::main] | ||||
| async fn main() { | ||||
|     let cli = Cli::parse(); | ||||
|     let daily_exports = cli.root.join("Export/JSON/Daily"); | ||||
|     let files = std::fs::read_dir(daily_exports) | ||||
|         .expect("Failed to access daily exports directory"); | ||||
|     let files = std::fs::read_dir(daily_exports).unwrap(); | ||||
| 
 | ||||
|     let db = sqlx::PgPool::connect(&cli.conn) | ||||
|         .await | ||||
|         .expect("Failed to connect to postgres database"); | ||||
|         .unwrap(); | ||||
| 
 | ||||
|     sqlx::migrate!() | ||||
|         .run(&db) | ||||
|         .await | ||||
|         .expect("Failed to migrate postgres database"); | ||||
|         .unwrap(); | ||||
| 
 | ||||
|     let need_refresh = find_updated(&db, files) | ||||
|         .await; | ||||
|     let need_refresh = find_updated(&db, files).await; | ||||
| 
 | ||||
|     // Refresh the database with the new files
 | ||||
|     for updated_file in &need_refresh { | ||||
|         sqlx::query("INSERT INTO raw_files (date, sha256, json) VALUES ($1, $2, $3 :: jsonb) ON CONFLICT (date) DO UPDATE SET sha256 = excluded.sha256, json = excluded.json") | ||||
|         sqlx::query("INSERT INTO raw_files (date, sha256, json) VALUES ($1, $2, $3 :: jsonb) ON CONFLICT (date) DO UPDATE SET sha256 = $2, json = $3 :: jsonb") | ||||
|             .bind(&updated_file.date) | ||||
|             .bind(&updated_file.sha256) | ||||
|             .bind(&serde_json::to_value(&updated_file.json).unwrap()) | ||||
|  | ||||
							
								
								
									
										16
									
								
								src/t1.rs
									
									
									
									
									
								
							
							
						
						
									
										16
									
								
								src/t1.rs
									
									
									
									
									
								
							| @ -1,16 +0,0 @@ | ||||
| use sqlx::{Connection, Executor}; | ||||
| use testcontainers::core::env::Os; | ||||
| 
 | ||||
| #[tokio::main] | ||||
| async fn main() { | ||||
|     let tc = testcontainers::clients::Cli::new::<Os>(); | ||||
|     let pg_spec = testcontainers_modules::postgres::Postgres::default(); | ||||
|     let pg_container = tc.run(pg_spec); | ||||
|     pg_container.start(); | ||||
|     println!("postgres running"); | ||||
|     let mut pg = sqlx::postgres::PgConnection::connect(&format!("postgres://postgres:postgres@localhost:{}/postgres", pg_container.get_host_port_ipv4(5432)),) | ||||
|         .await | ||||
|         .unwrap(); | ||||
| 
 | ||||
|     dbg!(pg.execute("select 1").await.unwrap()); | ||||
| } | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user