This commit is contained in:
Joshua Coles 2024-08-09 09:43:10 +01:00
parent bf47520d31
commit 08766dc0e0
7 changed files with 149 additions and 80 deletions

1
Cargo.lock generated
View File

@ -1757,6 +1757,7 @@ dependencies = [
"http",
"migration",
"num-traits",
"once_cell",
"sea-orm",
"serde",
"serde_json",

View File

@ -31,6 +31,7 @@ testcontainers-modules = { version = "0.5.0", features = ["postgres"] }
sqlx = { version = "0.7.4", features = ["postgres"] }
tower-http = { version = "0.5.2", features = ["trace"] }
bytes = "1.6.0"
once_cell = "1.19.0"
[workspace]
members = [".", "migration", "entity"]

View File

@ -1,58 +1,48 @@
# Stage 1: Build
# Setup the base build image, this will be used for planning (to cache dependencies) and actually building the image
ARG RUST_VERSION=1.76.0
FROM lukemathwalker/cargo-chef:latest-rust-${RUST_VERSION} as chef
WORKDIR /build/
# hadolint ignore=DL3008
FROM clux/muslrust:${RUST_VERSION}-stable AS chef
USER root
RUN apt-get update && \
apt-get install -y --no-install-recommends \
lld \
clang \
libclang-dev \
lld musl-tools clang libclang-dev llvm \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
ARG RUST_TARGET_ARCH=aarch64
ARG RUST_TARGET=${RUST_TARGET_ARCH}-unknown-linux-musl
ENV CC_${RUST_TARGET_ARCH}_unknown_linux_musl=clang
ENV AR_${RUST_TARGET_ARCH}_unknown_linux_musl=llvm-ar
ENV CARGO_TARGET_${RUST_TARGET_ARCH}_UNKNOWN_LINUX_MUSL_RUSTFLAGS="-Clink-self-contained=yes -Clinker=rust-lld"
RUN cargo install cargo-chef
WORKDIR /app
FROM chef as planner
COPY . .
RUN cargo chef prepare --recipe-path recipe.json
FROM chef as builder
COPY --from=planner /build/recipe.json recipe.json
# Build dependencies - this is the caching Docker layer!
RUN cargo chef cook --release -p monzo-ingestion --recipe-path recipe.json
# Build application
FROM chef AS builder
ARG BINARY=monzo-ingestion
ARG RUST_TARGET_ARCH=aarch64
ARG RUST_TARGET=${RUST_TARGET_ARCH}-unknown-linux-musl
COPY --from=planner /app/recipe.json recipe.json
RUN cargo chef cook --release --target "${RUST_TARGET}" --recipe-path recipe.json
COPY . .
RUN cargo build --release -p monzo-ingestion
RUN cargo build --release --target "${RUST_TARGET}" --bin "${BINARY}"
# Stage 2: Run
FROM debian:bookworm-slim AS runtime
FROM alpine AS runtime
ARG APP_USER=appuser
RUN addgroup -S ${APP_USER} && adduser -S ${APP_USER} -G ${APP_USER}
RUN apk add --no-cache ca-certificates curl
RUN set -ex; \
apt-get update && \
apt-get -y install --no-install-recommends \
ca-certificates curl && \
rm -rf /var/lib/apt/lists/*
COPY --from=builder /app/target/${RUST_TARGET}/release/${BINARY} /usr/local/bin/server
# Create a non-privileged user that the app will run under.
# See https://docs.docker.com/develop/develop-images/dockerfile_best-practices/#user
ARG UID=10001
RUN adduser \
--disabled-password \
--gecos "" \
--home "/nonexistent" \
--shell "/sbin/nologin" \
--no-create-home \
--uid "${UID}" \
appuser
USER appuser
# Copy the executable from the "build" stage.
COPY --from=builder /build/target/release/monzo-ingestion /bin/server
# Expose the port that the application listens on.
EXPOSE 3000
HEALTHCHECK --interval=5s --timeout=3s --retries=3 \
CMD curl -f http://localhost:3000/health || exit 1
# What the container should run when it is started.
CMD ["/bin/server", "serve", "--addr", "0.0.0.0:3000"]
USER ${APP_USER}
CMD ["/usr/local/bin/server", "serve", "--addr", "0.0.0.0:3000"]

View File

@ -73,16 +73,16 @@ async fn update_expenditures(
.flat_map(|i| &i.contained_expenditures)
.cloned(),
)
.on_conflict(
OnConflict::columns(vec![
expenditure::Column::TransactionId,
expenditure::Column::Category,
])
.update_columns(expenditure::Column::iter())
.to_owned(),
)
.exec(tx)
.await?;
.on_conflict(
OnConflict::columns(vec![
expenditure::Column::TransactionId,
expenditure::Column::Category,
])
.update_columns(expenditure::Column::iter())
.to_owned(),
)
.exec(tx)
.await?;
Ok(())
}
@ -118,10 +118,12 @@ async fn whittle_insertions<'a>(
.select_only()
.columns([transaction::Column::IdentityHash])
.filter(transaction::Column::IdentityHash.is_not_null())
.into_tuple::<(i64,)>()
.into_tuple::<(i64, )>()
.all(tx)
.await?;
tracing::debug!("Found existing entries: {existing_hashes:?}");
// We will only update those where the hash is different to avoid unnecessary updates and
// notifications.
let new_or_updated_insertions = insertions
@ -130,7 +132,7 @@ async fn whittle_insertions<'a>(
let hash = i.identity_hash;
!existing_hashes
.iter()
.any(|(existing_hash,)| *existing_hash == hash)
.any(|(existing_hash, )| *existing_hash == hash)
})
.collect::<Vec<_>>();
@ -155,23 +157,28 @@ async fn notify_new_transactions(
}
mod tests {
use super::{notify_new_transactions, update_expenditures, update_transactions};
use super::{insert, notify_new_transactions, update_expenditures, update_transactions};
use anyhow::Error;
use tokio::sync::OnceCell;
use migration::MigratorTrait;
use sea_orm::{DatabaseConnection, TransactionTrait};
use serde_json::Value;
use sqlx::postgres::PgListener;
use sqlx::PgPool;
use testcontainers::runners::AsyncRunner;
use testcontainers::ContainerAsync;
use crate::ingestion::ingestion_logic::from_json_row;
async fn initialise() -> Result<
(
ContainerAsync<testcontainers_modules::postgres::Postgres>,
DatabaseConnection,
PgPool,
),
Error,
> {
/// Everything the integration tests need from one shared Postgres instance:
/// the testcontainer handle, a SeaORM connection, and a raw sqlx pool.
#[derive(Debug)]
struct DatabaseInstance {
// NOTE(review): presumably held so the container is not dropped (and the
// database torn down) while tests run — confirm drop semantics.
container: ContainerAsync<testcontainers_modules::postgres::Postgres>,
// SeaORM connection used by the ingestion code under test.
db: DatabaseConnection,
// Raw sqlx pool; used for `PgListener` (LISTEN/NOTIFY) in the tests.
pool: PgPool,
}
static INSTANCE: OnceCell<DatabaseInstance> = OnceCell::const_new();
async fn initialise_db() -> Result<DatabaseInstance, Error> {
let container = testcontainers_modules::postgres::Postgres::default()
.start()
.await?;
@ -186,15 +193,29 @@ mod tests {
migration::Migrator::up(&db, None).await?;
let pool = PgPool::connect(connection_string).await?;
let instance = DatabaseInstance {
container,
db,
pool,
};
Ok((container, db, pool))
Ok(instance)
}
async fn get_or_initialize_db_instance() -> Result<
&'static DatabaseInstance,
Error,
> {
Ok(INSTANCE.get_or_init(|| async {
initialise_db().await.unwrap()
}).await)
}
#[tokio::test]
async fn test_no_new_insertions() -> Result<(), Error> {
let (_container, db, _pool) = initialise().await?;
async fn test_empty_insertion_list() -> Result<(), Error> {
let db = get_or_initialize_db_instance().await?;
let insertions = vec![];
let tx = db.begin().await?;
let tx = db.db.begin().await?;
update_transactions(&tx, &insertions).await?;
update_expenditures(&tx, &insertions, &vec![]).await?;
tx.commit().await?;
@ -204,8 +225,8 @@ mod tests {
#[tokio::test]
async fn test_notify() -> Result<(), Error> {
let (_container, db, pool) = initialise().await?;
let mut listener = PgListener::connect_with(&pool).await?;
let dbi = get_or_initialize_db_instance().await?;
let mut listener = PgListener::connect_with(&dbi.pool).await?;
listener.listen("monzo_new_transactions").await?;
let ids = vec![
@ -214,7 +235,7 @@ mod tests {
"test3".to_string(),
];
notify_new_transactions(&db, &ids).await?;
notify_new_transactions(&dbi.db, &ids).await?;
let notification = listener.recv().await?;
let payload = notification.payload();
@ -227,4 +248,52 @@ mod tests {
Ok(())
}
// End-to-end check of `insert` against the `monzo_new_transactions` Postgres
// NOTIFY channel: a fresh insert notifies with the inserted ids, an identical
// re-insert notifies with an empty payload, and altering one row re-notifies
// with just that row's id.
#[tokio::test]
async fn test_notify_on_insert() -> Result<(), Error> {
let dbi = get_or_initialize_db_instance().await?;
// Listen on the raw sqlx pool — notifications are received via PgListener,
// not through the SeaORM connection.
let mut listener = PgListener::connect_with(&dbi.pool).await?;
listener.listen("monzo_new_transactions").await?;
// Parse the fixture rows through the same `from_json_row` path production
// ingestion uses.
let json = include_str!("../../fixtures/transactions.json");
let json: Vec<Vec<Value>> = serde_json::from_str(json).unwrap();
let data = json
.iter()
.map(|row| from_json_row(row.clone()))
.collect::<Result<Vec<_>, anyhow::Error>>()
.unwrap();
insert(&dbi.db, data.clone()).await?;
let notification = listener.recv().await?;
let payload = notification.payload();
// Sort both sides before comparing: the id order in the NOTIFY payload is
// not assumed to match fixture order.
let mut payload = serde_json::from_str::<Vec<String>>(&payload)?;
payload.sort();
let mut ids = data.iter()
.map(|row| row.transaction_id.clone())
.collect::<Vec<_>>();
ids.sort();
assert_eq!(payload, ids, "Inserted IDs do not match");
// NOTE(review): this expects an identical re-insert to still emit a
// notification, just with an empty id list — if no notification were sent
// at all, `recv()` would block here. Confirm the notify path always fires.
insert(&dbi.db, data.clone()).await?;
let notification = listener.recv().await?;
let payload = notification.payload();
let payload = serde_json::from_str::<Vec<String>>(&payload)?;
assert_eq!(payload, Vec::<String>::new(), "Re-inserting identical rows triggered double notification");
// Changing the description must change the identity hash, so the row is
// treated as new/updated and re-notified.
let mut altered_data = data.clone();
altered_data[0].description = Some("New description".to_string());
assert_ne!(altered_data[0].compute_hash(), data[0].compute_hash(), "Alterations have the same hash");
insert(&dbi.db, altered_data.clone()).await?;
let notification = listener.recv().await?;
let payload = notification.payload();
let payload = serde_json::from_str::<Vec<String>>(&payload)?;
assert_eq!(payload, vec![altered_data[0].transaction_id.clone()], "Re-inserting altered row failed to re-trigger notification");
Ok(())
}
}

7
src/ingestion/flex.rs Normal file
View File

@ -0,0 +1,7 @@
// Column indices for Flex rows: re-exports the shared Monzo headings and adds
// the two Flex-only columns. NOTE(review): assumed these match the Flex
// export layout — confirm against a real export.
mod headings {
pub use super::super::ingestion_logic::headings::*;
// Additional Flex headings
pub const MONEY_OUT: usize = 16;
pub const MONEY_IN: usize = 17;
}

View File

@ -11,7 +11,7 @@ use serde_json::Value;
use std::hash::Hash;
#[allow(dead_code)]
mod headings {
pub(crate) mod headings {
pub const TRANSACTION_ID: usize = 0;
pub const DATE: usize = 1;
pub const TIME: usize = 2;
@ -30,19 +30,19 @@ mod headings {
pub const CATEGORY_SPLIT: usize = 15;
}
#[derive(Debug, Eq, PartialEq, Hash)]
#[derive(Debug, Eq, PartialEq, Hash, Clone)]
pub struct MonzoRow {
category_split: Option<String>,
primary_category: String,
total_amount: Decimal,
receipt: Option<String>,
notes: Option<String>,
emoji: Option<String>,
description: Option<String>,
transaction_type: String,
title: Option<String>,
timestamp: NaiveDateTime,
transaction_id: String,
pub category_split: Option<String>,
pub primary_category: String,
pub total_amount: Decimal,
pub receipt: Option<String>,
pub notes: Option<String>,
pub emoji: Option<String>,
pub description: Option<String>,
pub transaction_type: String,
pub title: Option<String>,
pub timestamp: NaiveDateTime,
pub transaction_id: String,
}
impl MonzoRow {

View File

@ -1,3 +1,4 @@
pub mod db;
pub mod ingestion_logic;
// Monzo Flex support: builds on `ingestion_logic::headings` with Flex-only
// column indices.
pub mod flex;
pub mod routes;