Compare commits
No commits in common. "8cf9d86f32b063d0683aea41b24307e1b5151287" and "99523430c650016f1ecf2f5f9c2b740fbec3459b" have entirely different histories.
8cf9d86f32...99523430c6
Cargo.lock (generated): 117 changed lines
@@ -99,20 +99,10 @@ dependencies = [
  "windows-sys 0.52.0",
 ]
 
-[[package]]
-name = "anyhow"
-version = "1.0.82"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f538837af36e6f6a9be0faa67f9a314f8119e4e4b5867c6ab40ed60360142519"
-dependencies = [
- "backtrace",
-]
-
 [[package]]
 name = "arc-ingester"
 version = "0.1.1"
 dependencies = [
- "anyhow",
  "chrono",
  "clap",
  "flate2",
@@ -123,8 +113,6 @@ dependencies = [
  "serde_json",
  "sha256",
  "sqlx",
- "testcontainers",
- "testcontainers-modules",
  "thiserror",
  "tokio",
  "uuid",
@@ -217,16 +205,6 @@ dependencies = [
  "generic-array",
 ]
 
-[[package]]
-name = "bollard-stubs"
-version = "1.42.0-rc.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ed59b5c00048f48d7af971b71f800fdf23e858844a6f9e4d32ca72e9399e7864"
-dependencies = [
- "serde",
- "serde_with",
-]
-
 [[package]]
 name = "bumpalo"
 version = "3.14.0"
@@ -410,41 +388,6 @@ dependencies = [
  "typenum",
 ]
 
-[[package]]
-name = "darling"
-version = "0.13.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a01d95850c592940db9b8194bc39f4bc0e89dee5c4265e4b1807c34a9aba453c"
-dependencies = [
- "darling_core",
- "darling_macro",
-]
-
-[[package]]
-name = "darling_core"
-version = "0.13.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "859d65a907b6852c9361e3185c862aae7fafd2887876799fa55f5f99dc40d610"
-dependencies = [
- "fnv",
- "ident_case",
- "proc-macro2",
- "quote",
- "strsim",
- "syn 1.0.109",
-]
-
-[[package]]
-name = "darling_macro"
-version = "0.13.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9c972679f83bdf9c42bd905396b6c3588a843a17f0f16dfcfa3e2c5d57441835"
-dependencies = [
- "darling_core",
- "quote",
- "syn 1.0.109",
-]
-
 [[package]]
 name = "der"
 version = "0.7.8"
@@ -549,12 +492,6 @@ dependencies = [
  "spin 0.9.8",
 ]
 
-[[package]]
-name = "fnv"
-version = "1.0.7"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
-
 [[package]]
 name = "form_urlencoded"
 version = "1.2.1"
@@ -781,12 +718,6 @@ dependencies = [
  "cc",
 ]
 
-[[package]]
-name = "ident_case"
-version = "1.0.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39"
-
 [[package]]
 name = "idna"
 version = "0.5.0"
@@ -1323,28 +1254,6 @@ dependencies = [
  "serde",
 ]
 
-[[package]]
-name = "serde_with"
-version = "1.14.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "678b5a069e50bf00ecd22d0cd8ddf7c236f68581b03db652061ed5eb13a312ff"
-dependencies = [
- "serde",
- "serde_with_macros",
-]
-
-[[package]]
-name = "serde_with_macros"
-version = "1.5.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e182d6ec6f05393cc0e5ed1bf81ad6db3a8feedf8ee515ecdd369809bcce8082"
-dependencies = [
- "darling",
- "proc-macro2",
- "quote",
- "syn 1.0.109",
-]
-
 [[package]]
 name = "sha1"
 version = "0.10.6"
@@ -1725,32 +1634,6 @@ dependencies = [
  "windows-sys 0.52.0",
 ]
 
-[[package]]
-name = "testcontainers"
-version = "0.15.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f83d2931d7f521af5bae989f716c3fa43a6af9af7ec7a5e21b59ae40878cec00"
-dependencies = [
- "bollard-stubs",
- "futures",
- "hex",
- "hmac",
- "log",
- "rand",
- "serde",
- "serde_json",
- "sha2",
-]
-
-[[package]]
-name = "testcontainers-modules"
-version = "0.3.6"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "204d1c7516bfdc8a01bb85d3e30145e5bbeb2351812e5e8aa6971769109b45b5"
-dependencies = [
- "testcontainers",
-]
-
 [[package]]
 name = "thiserror"
 version = "1.0.56"
Cargo.toml: 12 changed lines
@@ -4,15 +4,6 @@ version = "0.1.1"
 edition = "2021"
 authors = ["Joshua Coles"]
 license = "MIT OR Apache-2.0"
-default-run = "arc-ingester"
-
-[[bin]]
-name = "arc-ingester"
-path = "src/main.rs"
-
-[[bin]]
-name = "t1"
-path = "src/t1.rs"
-
 [dependencies]
 clap = { version = "4.4.18", features = ["derive", "env"] }
@@ -28,6 +19,3 @@ futures = "0.3.30"
 itertools = "0.12.1"
 chrono = { version = "0.4.33", features = ["serde"] }
 uuid = { version = "1.7.0", features = ["serde"] }
-testcontainers = "0.15.0"
-testcontainers-modules = { version = "0.3.6", features = ["postgres"] }
-anyhow = { version = "1.0.82", features = ["backtrace"] }
(deleted SQL file, name not shown in this view): 24 changed lines
@@ -1,24 +0,0 @@
-with timelineItems as (select jsonb_array_elements(raw_files.json -> 'timelineItems') as timelineItem
-                       from raw_files),
-     places as (select distinct on (md5(timelineItem ->> 'place' :: text)) timelineItem -> 'place' as place,
-                       timelineItem -> 'place' ->> 'placeId' as placeId,
-                       (timelineItem -> 'place' ->> 'lastSaved') :: timestamptz as lastSaved
-                from timelineItems
-                where timelineItem ->> 'place' is not null),
-     places_with_max_last_saved as (select place -> 'placeId' as placeId,
-                       max((place ->> 'lastSaved') :: timestamptz) as latest_last_saved
-                from places
-                group by place -> 'placeId'),
-     latest_places as (select places.*
-                from places_with_max_last_saved
-                       inner join places on places.place -> 'placeId' = places_with_max_last_saved.placeId and
-                              places.lastSaved =
-                              places_with_max_last_saved.latest_last_saved)
-insert
-into public.place (place_id, json, last_saved, server_last_updated)
-select (placeId :: uuid) as place_id, place as json, lastSaved as last_saved, now() as server_last_updated
-from latest_places
-on conflict (place_id) do update set json = excluded.json,
-                                     last_saved = excluded.last_saved,
-                                     server_last_updated = excluded.server_last_updated
-where excluded.last_saved > public.place.last_saved;
(deleted SQL file, name not shown in this view): 28 changed lines
@@ -1,28 +0,0 @@
-with timelineItems as (select jsonb_array_elements(raw_files.json -> 'timelineItems') as timelineItem
-                       from raw_files),
-     max_last_saved as (select timelineItem ->> 'itemId' as itemId,
-                               max((timelineItem ->> 'lastSaved') :: timestamptz) as latest_last_saved
-                        from timelineItems
-                        group by timelineItem ->> 'itemId'),
-     unique_timline_items as (select distinct on (max_last_saved.itemId) *
-                              from max_last_saved
-                                     inner join timelineItems
-                                            on timelineItems.timelineItem ->> 'itemId' = max_last_saved.itemId
-                                            and (timelineItems.timelineItem ->> 'lastSaved') :: timestamptz =
-                                                max_last_saved.latest_last_saved)
-insert
-into public.timeline_item (item_id, json, place_id, end_date, last_saved, server_last_updated)
-select unique_timline_items.itemId :: uuid as item_id,
-       unique_timline_items.timelineItem as json,
-       (unique_timline_items.timelineItem -> 'place' ->> 'placeId') :: uuid as place_id,
-       (unique_timline_items.timelineItem ->> 'endDate') :: timestamptz as end_date,
-       unique_timline_items.latest_last_saved :: timestamptz as last_saved,
-       now() as server_last_updated
-from unique_timline_items
-on conflict (item_id) do update set json = excluded.json,
-                                    place_id = excluded.place_id,
-                                    end_date = excluded.end_date,
-                                    last_saved = excluded.last_saved,
-                                    server_last_updated = excluded.server_last_updated
-where excluded.last_saved > public.timeline_item.last_saved
-returning item_id;
src/main.rs: 80 changed lines
@@ -2,10 +2,12 @@ use std::collections::HashMap;
 use std::fs::ReadDir;
 use std::io::Read;
 use std::path::PathBuf;
+use std::sync::Arc;
 use chrono::{DateTime, Utc};
 use clap::Parser;
 use itertools::Itertools;
 use sqlx::FromRow;
+use rayon::prelude::*;
 use serde_json::Value;
 use uuid::Uuid;
 
@@ -67,82 +69,80 @@ struct Place {
     rest: HashMap<String, Value>,
 }
 
-fn hash_files(files: ReadDir) -> impl Iterator<Item=(String, PathBuf, String)> {
-    files.into_iter()
+async fn find_updated(db: &sqlx::PgPool, files: ReadDir) -> Vec<UpdatedFile> {
+    let date_hashes: Vec<DateHash> = sqlx::query_as("SELECT date, sha256 FROM raw_files")
+        .fetch_all(db)
+        .await
+        .unwrap();
+
+    let date_hashes: Arc<HashMap<String, String>> = Arc::new(date_hashes.into_iter()
+        .map(|dh| (dh.date, dh.sha256))
+        .collect());
+
+    let date_hashes = &date_hashes;
+
+    // Find the files that need to be refreshed, based on the sha256 hash of the file compared to
+    // the hash stored in the database.
+    let need_refresh = files.into_iter()
         .map(|f| f.unwrap().path())
-        .map(|path| {
+        .filter_map(|path| {
+            // Extract the date from the file name
             let date = {
                 let file_name = path.file_name().unwrap().to_str().unwrap();
                 let i = file_name.find('.').unwrap();
                 &file_name[..i]
             };
 
-            let hash = sha256::try_digest(&path).unwrap();
-            (date.to_string(), path, hash)
-        })
-}
-
-async fn find_updated(db: &sqlx::PgPool, files: ReadDir) -> Vec<UpdatedFile> {
-    let date_hashes: Vec<DateHash> = sqlx::query_as("SELECT date, sha256 FROM raw_files")
-        .fetch_all(db)
-        .await
-        .expect("Failed to fetch date hashes from database");
-
-    let date_hashes: HashMap<String, String> = date_hashes.into_iter()
-        .map(|dh| (dh.date, dh.sha256))
-        .collect();
-
-    let new_hashes = hash_files(files);
-
-    new_hashes.filter_map(|(date, path, new_hash)| {
-        if let Some(existing_hash) = date_hashes.get(&date) {
-            if new_hash == *existing_hash {
+            let current_hash = sha256::try_digest(&path).unwrap();
+            let existing_hash = date_hashes.get(date);
+
+            if let Some(existing_hash) = existing_hash {
+                if current_hash == *existing_hash {
                     return None;
                 }
             }
 
             let bytes = std::fs::read(&path).unwrap();
 
+            return Some((date.to_string(), current_hash, bytes));
+        })
+        .collect_vec();
+
+    let decompressed = need_refresh.par_iter().map(|(date, new_hash, bytes)| {
         let mut decoder = flate2::bufread::GzDecoder::new(&bytes[..]);
         let mut string = String::new();
-        match decoder.read_to_string(&mut string) {
-            Err(err) => {
-                eprintln!("Failed to parse file {path:?}");
-                eprintln!("Error {err:?}");
-                panic!("Error")
-            }
-            _ => {}
-        }
+        decoder.read_to_string(&mut string).unwrap();
 
-        Some(UpdatedFile {
+        UpdatedFile {
             date: date.clone(),
             sha256: new_hash.clone(),
             json: serde_json::from_str(&string).unwrap(),
-        })
-    }).collect_vec()
+        }
+    }).collect::<Vec<_>>();
+
+    decompressed
 }
 
 #[tokio::main]
 async fn main() {
     let cli = Cli::parse();
     let daily_exports = cli.root.join("Export/JSON/Daily");
-    let files = std::fs::read_dir(daily_exports)
-        .expect("Failed to access daily exports directory");
+    let files = std::fs::read_dir(daily_exports).unwrap();
 
     let db = sqlx::PgPool::connect(&cli.conn)
         .await
-        .expect("Failed to connect to postgres database");
+        .unwrap();
 
     sqlx::migrate!()
         .run(&db)
         .await
-        .expect("Failed to migrate postgres database");
+        .unwrap();
 
-    let need_refresh = find_updated(&db, files)
-        .await;
+    let need_refresh = find_updated(&db, files).await;
 
     // Refresh the database with the new files
     for updated_file in &need_refresh {
-        sqlx::query("INSERT INTO raw_files (date, sha256, json) VALUES ($1, $2, $3 :: jsonb) ON CONFLICT (date) DO UPDATE SET sha256 = excluded.sha256, json = excluded.json")
+        sqlx::query("INSERT INTO raw_files (date, sha256, json) VALUES ($1, $2, $3 :: jsonb) ON CONFLICT (date) DO UPDATE SET sha256 = $2, json = $3 :: jsonb")
             .bind(&updated_file.date)
             .bind(&updated_file.sha256)
             .bind(&serde_json::to_value(&updated_file.json).unwrap())
src/t1.rs: 16 changed lines
@@ -1,16 +0,0 @@
-use sqlx::{Connection, Executor};
-use testcontainers::core::env::Os;
-
-#[tokio::main]
-async fn main() {
-    let tc = testcontainers::clients::Cli::new::<Os>();
-    let pg_spec = testcontainers_modules::postgres::Postgres::default();
-    let pg_container = tc.run(pg_spec);
-    pg_container.start();
-    println!("postgres running");
-    let mut pg = sqlx::postgres::PgConnection::connect(&format!("postgres://postgres:postgres@localhost:{}/postgres", pg_container.get_host_port_ipv4(5432)),)
-        .await
-        .unwrap();
-
-    dbg!(pg.execute("select 1").await.unwrap());
-}