From 08766dc0e07e7f6192a73c404264105930e04c93 Mon Sep 17 00:00:00 2001 From: Joshua Coles Date: Fri, 9 Aug 2024 09:43:10 +0100 Subject: [PATCH] Stash --- Cargo.lock | 1 + Cargo.toml | 1 + Dockerfile.cache | 68 +++++++---------- src/ingestion/db.rs | 125 ++++++++++++++++++++++++------- src/ingestion/flex.rs | 7 ++ src/ingestion/ingestion_logic.rs | 26 +++---- src/ingestion/mod.rs | 1 + 7 files changed, 149 insertions(+), 80 deletions(-) create mode 100644 src/ingestion/flex.rs diff --git a/Cargo.lock b/Cargo.lock index 8a87891..04ed5f7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1757,6 +1757,7 @@ dependencies = [ "http", "migration", "num-traits", + "once_cell", "sea-orm", "serde", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index a03576e..a22c28b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,7 @@ testcontainers-modules = { version = "0.5.0", features = ["postgres"] } sqlx = { version = "0.7.4", features = ["postgres"] } tower-http = { version = "0.5.2", features = ["trace"] } bytes = "1.6.0" +once_cell = "1.19.0" [workspace] members = [".", "migration", "entity"] diff --git a/Dockerfile.cache b/Dockerfile.cache index 41422d3..7ecf8ac 100644 --- a/Dockerfile.cache +++ b/Dockerfile.cache @@ -1,58 +1,48 @@ -# Stage 1: Build +# Setup the base build image, this will be used for planning (to cache dependencies) and axctually building the image ARG RUST_VERSION=1.76.0 -FROM lukemathwalker/cargo-chef:latest-rust-${RUST_VERSION} as chef -WORKDIR /build/ -# hadolint ignore=DL3008 +FROM clux/muslrust:${RUST_VERSION}-stable AS chef +USER root RUN apt-get update && \ apt-get install -y --no-install-recommends \ - lld \ - clang \ - libclang-dev \ + lld musl-tools clang libclang-dev llvm \ && apt-get clean \ && rm -rf /var/lib/apt/lists/* +ARG RUST_TARGET_ARCH=aarch64 +ARG RUST_TARGET=${RUST_TARGET_ARCH}-unknown-linux-musl + +ENV CC_${RUST_TARGET_ARCH}_unknown_linux_musl=clang +ENV AR_${RUST_TARGET_ARCH}_unknown_linux_musl=llvm-ar +ENV 
CARGO_TARGET_${RUST_TARGET_ARCH}_UNKNOWN_LINUX_MUSL_RUSTFLAGS="-Clink-self-contained=yes -Clinker=rust-lld" + +RUN cargo install cargo-chef +WORKDIR /app FROM chef as planner COPY . . RUN cargo chef prepare --recipe-path recipe.json -FROM chef as builder -COPY --from=planner /build/recipe.json recipe.json -# Build dependencies - this is the caching Docker layer! -RUN cargo chef cook --release -p monzo-ingestion --recipe-path recipe.json -# Build application +FROM chef AS builder +ARG BINARY=monzo-ingestion +ARG RUST_TARGET_ARCH=aarch64 +ARG RUST_TARGET=${RUST_TARGET_ARCH}-unknown-linux-musl + +COPY --from=planner /app/recipe.json recipe.json + +RUN cargo chef cook --release --target "${RUST_TARGET}" --recipe-path recipe.json COPY . . -RUN cargo build --release -p monzo-ingestion +RUN cargo build --release --target "${RUST_TARGET}" --bin "${BINARY}" -# Stage 2: Run -FROM debian:bookworm-slim AS runtime +FROM alpine AS runtime +ARG APP_USER=appuser +# ARG values do not cross stage boundaries: redeclare the build args used by the COPY below, +# otherwise ${RUST_TARGET} and ${BINARY} expand to empty strings and the binary path is wrong. +ARG BINARY=monzo-ingestion +ARG RUST_TARGET_ARCH=aarch64 +ARG RUST_TARGET=${RUST_TARGET_ARCH}-unknown-linux-musl +RUN addgroup -S ${APP_USER} && adduser -S ${APP_USER} -G ${APP_USER} +RUN apk add --no-cache ca-certificates curl -RUN set -ex; \ - apt-get update && \ - apt-get -y install --no-install-recommends \ - ca-certificates curl && \ - rm -rf /var/lib/apt/lists/* +COPY --from=builder /app/target/${RUST_TARGET}/release/${BINARY} /usr/local/bin/server -# Create a non-privileged user that the app will run under. -# See https://docs.docker.com/develop/develop-images/dockerfile_best-practices/#user -ARG UID=10001 -RUN adduser \ - --disabled-password \ - --gecos "" \ - --home "/nonexistent" \ - --shell "/sbin/nologin" \ - --no-create-home \ - --uid "${UID}" \ - appuser -USER appuser - -# Copy the executable from the "build" stage. -COPY --from=builder /build/target/release/monzo-ingestion /bin/server - -# Expose the port that the application listens on. EXPOSE 3000 HEALTHCHECK --interval=5s --timeout=3s --retries=3 \ CMD curl -f http://localhost:3000/health || exit 1 -# What the container should run when it is started. 
-CMD ["/bin/server", "serve", "--addr", "0.0.0.0:3000"] +USER ${APP_USER} +CMD ["/usr/local/bin/server", "serve", "--addr", "0.0.0.0:3000"] diff --git a/src/ingestion/db.rs b/src/ingestion/db.rs index 762658e..0d04174 100644 --- a/src/ingestion/db.rs +++ b/src/ingestion/db.rs @@ -73,16 +73,16 @@ async fn update_expenditures( .flat_map(|i| &i.contained_expenditures) .cloned(), ) - .on_conflict( - OnConflict::columns(vec![ - expenditure::Column::TransactionId, - expenditure::Column::Category, - ]) - .update_columns(expenditure::Column::iter()) - .to_owned(), - ) - .exec(tx) - .await?; + .on_conflict( + OnConflict::columns(vec![ + expenditure::Column::TransactionId, + expenditure::Column::Category, + ]) + .update_columns(expenditure::Column::iter()) + .to_owned(), + ) + .exec(tx) + .await?; Ok(()) } @@ -118,10 +118,12 @@ async fn whittle_insertions<'a>( .select_only() .columns([transaction::Column::IdentityHash]) .filter(transaction::Column::IdentityHash.is_not_null()) - .into_tuple::<(i64,)>() + .into_tuple::<(i64, )>() .all(tx) .await?; + tracing::debug!("Found existing entries: {existing_hashes:?}"); + // We will only update those where the hash is different to avoid unnecessary updates and // notifications. 
let new_or_updated_insertions = insertions @@ -130,7 +132,7 @@ let hash = i.identity_hash; !existing_hashes .iter() - .any(|(existing_hash,)| *existing_hash == hash) + .any(|(existing_hash, )| *existing_hash == hash) }) .collect::<Vec<_>>(); @@ -155,23 +157,28 @@ } mod tests { - use super::{notify_new_transactions, update_expenditures, update_transactions}; + use super::{insert, notify_new_transactions, update_expenditures, update_transactions}; use anyhow::Error; + use tokio::sync::OnceCell; use migration::MigratorTrait; use sea_orm::{DatabaseConnection, TransactionTrait}; + use serde_json::Value; use sqlx::postgres::PgListener; use sqlx::PgPool; use testcontainers::runners::AsyncRunner; use testcontainers::ContainerAsync; + use crate::ingestion::ingestion_logic::from_json_row; - async fn initialise() -> Result< - ( - ContainerAsync<testcontainers_modules::postgres::Postgres>, - DatabaseConnection, - PgPool, - ), - Error, - > { + #[derive(Debug)] + struct DatabaseInstance { + container: ContainerAsync<testcontainers_modules::postgres::Postgres>, + db: DatabaseConnection, + pool: PgPool, + } + + static INSTANCE: OnceCell<DatabaseInstance> = OnceCell::const_new(); + + async fn initialise_db() -> Result<DatabaseInstance, Error> { let container = testcontainers_modules::postgres::Postgres::default() .start() .await?; @@ -186,15 +193,29 @@ mod tests { migration::Migrator::up(&db, None).await?; let pool = PgPool::connect(connection_string).await?; + let instance = DatabaseInstance { + container, + db, + pool, + }; - Ok((container, db, pool)) + Ok(instance) + } + + async fn get_or_initialize_db_instance() -> Result< + &'static DatabaseInstance, + Error, + > { + Ok(INSTANCE.get_or_init(|| async { + initialise_db().await.unwrap() + }).await) } #[tokio::test] - async fn test_no_new_insertions() -> Result<(), Error> { - let (_container, db, _pool) = initialise().await?; + async fn test_empty_insertion_list() -> Result<(), Error> { + let db = get_or_initialize_db_instance().await?; let insertions = vec![]; - let tx = db.begin().await?; + let tx 
= db.db.begin().await?; update_transactions(&tx, &insertions).await?; update_expenditures(&tx, &insertions, &vec![]).await?; tx.commit().await?; @@ -204,8 +225,8 @@ #[tokio::test] async fn test_notify() -> Result<(), Error> { - let (_container, db, pool) = initialise().await?; - let mut listener = PgListener::connect_with(&pool).await?; + let dbi = get_or_initialize_db_instance().await?; + let mut listener = PgListener::connect_with(&dbi.pool).await?; listener.listen("monzo_new_transactions").await?; let ids = vec![ @@ -214,7 +235,7 @@ "test3".to_string(), ]; - notify_new_transactions(&db, &ids).await?; + notify_new_transactions(&dbi.db, &ids).await?; let notification = listener.recv().await?; let payload = notification.payload(); @@ -227,4 +248,52 @@ Ok(()) } + + #[tokio::test] + async fn test_notify_on_insert() -> Result<(), Error> { + let dbi = get_or_initialize_db_instance().await?; + let mut listener = PgListener::connect_with(&dbi.pool).await?; + listener.listen("monzo_new_transactions").await?; + + let json = include_str!("../../fixtures/transactions.json"); + let json: Vec<Vec<Value>> = serde_json::from_str(json).unwrap(); + let data = json + .iter() + .map(|row| from_json_row(row.clone())) + .collect::<Result<Vec<_>, anyhow::Error>>() + .unwrap(); + + insert(&dbi.db, data.clone()).await?; + let notification = listener.recv().await?; + let payload = notification.payload(); + let mut payload = serde_json::from_str::<Vec<String>>(&payload)?; + payload.sort(); + + let mut ids = data.iter() + .map(|row| row.transaction_id.clone()) + .collect::<Vec<_>>(); + + ids.sort(); + + assert_eq!(payload, ids, "Inserted IDs do not match"); + + insert(&dbi.db, data.clone()).await?; + let notification = listener.recv().await?; + let payload = notification.payload(); + let payload = serde_json::from_str::<Vec<String>>(&payload)?; + assert_eq!(payload, Vec::<String>::new(), "Re-inserting identical rows triggered double notification"); + + let mut altered_data = data.clone(); + altered_data[0].description 
= Some("New description".to_string()); + + assert_ne!(altered_data[0].compute_hash(), data[0].compute_hash(), "Alterations have the same hash"); + + insert(&dbi.db, altered_data.clone()).await?; + let notification = listener.recv().await?; + let payload = notification.payload(); + let payload = serde_json::from_str::<Vec<String>>(&payload)?; + assert_eq!(payload, vec![altered_data[0].transaction_id.clone()], "Re-inserting altered row failed to re-trigger notification"); + + Ok(()) + } } diff --git a/src/ingestion/flex.rs b/src/ingestion/flex.rs new file mode 100644 index 0000000..2ab3bac --- /dev/null +++ b/src/ingestion/flex.rs @@ -0,0 +1,7 @@ +mod headings { + pub use super::super::ingestion_logic::headings::*; + + // Additional Flex headings + pub const MONEY_OUT: usize = 16; + pub const MONEY_IN: usize = 17; +} diff --git a/src/ingestion/ingestion_logic.rs b/src/ingestion/ingestion_logic.rs index 4ad534e..1e67948 100644 --- a/src/ingestion/ingestion_logic.rs +++ b/src/ingestion/ingestion_logic.rs @@ -11,7 +11,7 @@ use serde_json::Value; use std::hash::Hash; #[allow(dead_code)] -mod headings { +pub(crate) mod headings { pub const TRANSACTION_ID: usize = 0; pub const DATE: usize = 1; pub const TIME: usize = 2; @@ -30,19 +30,19 @@ pub const CATEGORY_SPLIT: usize = 15; } -#[derive(Debug, Eq, PartialEq, Hash)] +#[derive(Debug, Eq, PartialEq, Hash, Clone)] pub struct MonzoRow { - category_split: Option<String>, - primary_category: String, - total_amount: Decimal, - receipt: Option<String>, - notes: Option<String>, - emoji: Option<String>, - description: Option<String>, - transaction_type: String, - title: Option<String>, - timestamp: NaiveDateTime, - transaction_id: String, + pub category_split: Option<String>, + pub primary_category: String, + pub total_amount: Decimal, + pub receipt: Option<String>, + pub notes: Option<String>, + pub emoji: Option<String>, + pub description: Option<String>, + pub transaction_type: String, + pub title: Option<String>, + pub timestamp: NaiveDateTime, + pub transaction_id: String, } impl MonzoRow { diff --git 
a/src/ingestion/mod.rs b/src/ingestion/mod.rs index 8460d27..59d7f8b 100644 --- a/src/ingestion/mod.rs +++ b/src/ingestion/mod.rs @@ -1,3 +1,4 @@ pub mod db; pub mod ingestion_logic; +pub mod flex; pub mod routes;