Compare commits

...

3 Commits

SHA1 Message Date
af0588c5ef Replace the Docker build with the one from toggl-bridge as it's better 2024-08-09 09:57:48 +01:00 (CI: Build and Test cancelled)
21a63a10a4 Fix some clippy lints 2024-08-09 09:56:23 +01:00
08766dc0e0 Stash 2024-08-09 09:43:10 +01:00
11 changed files with 512 additions and 519 deletions

View File

@@ -1,39 +1,98 @@
name: Build and Publish Docker Container
name: Rust CI
on:
push:
branches:
- main
branches: [ main ]
pull_request:
branches: [ main ]
env:
CARGO_TERM_COLOR: always
jobs:
build:
name: Build and Test
runs-on: ubuntu-latest
container:
image: catthehacker/ubuntu:act-latest
container: catthehacker/ubuntu:act-latest
steps:
- name: Checkout code
uses: actions/checkout@v2
- uses: actions/checkout@v3
- name: Install Rust
uses: actions-rs/toolchain@v1
with:
toolchain: stable
profile: minimal
override: true
components: rustfmt, clippy
- name: Add ARM64 target
run: rustup target add aarch64-unknown-linux-gnu
- name: Install ARM64 toolchain
run: |
apt-get update
apt-get install -y gcc-aarch64-linux-gnu
- name: Cache dependencies
uses: actions/cache@v3
with:
path: |
~/.cargo
target/
key: "${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}"
restore-keys: |
${{ runner.os }}-cargo-
- name: Build (x86_64)
uses: actions-rs/cargo@v1
with:
command: build
args: --release --all-features
- name: Build (ARM64)
uses: actions-rs/cargo@v1
with:
command: build
args: --release --all-features --target aarch64-unknown-linux-gnu
env:
CARGO_TARGET_AARCH64_UNKNOWN_LINUX_GNU_LINKER: aarch64-linux-gnu-gcc
- name: Run tests
uses: actions-rs/cargo@v1
with:
command: test
args: --all-features
- name: Upload artifacts
uses: actions/upload-artifact@v3
with:
name: binaries
path: |
target/release/${{ github.event.repository.name }}
target/aarch64-unknown-linux-gnu/release/${{ github.event.repository.name }}
- name: Set up QEMU
uses: docker/setup-qemu-action@v2
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v1
uses: docker/setup-buildx-action@v2
- name: Login to Docker
uses: docker/login-action@v1
- name: Login to DockerHub
uses: docker/login-action@v2
with:
registry: git.joshuacoles.me
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Build and Push Docker image
uses: docker/build-push-action@v5
- name: Build and push multi-arch Docker image
uses: docker/build-push-action@v4
with:
context: .
file: ./Dockerfile
platforms: linux/amd64,linux/arm64
push: true
file: ./Dockerfile.cache
tags: git.joshuacoles.me/${{ github.repository }}:latest,git.joshuacoles.me/${{ github.repository }}:${{ github.sha }}
cache-from: type=gha
cache-to: type=gha,mode=max
- uses: robiningelbrecht/ntfy-action@v1.0.0
name: Notify via ntfy.sh

Cargo.lock (generated, 625 lines changed)

File diff suppressed because it is too large.

View File

@@ -31,6 +31,7 @@ testcontainers-modules = { version = "0.5.0", features = ["postgres"] }
sqlx = { version = "0.7.4", features = ["postgres"] }
tower-http = { version = "0.5.2", features = ["trace"] }
bytes = "1.6.0"
once_cell = "1.19.0"
[workspace]
members = [".", "migration", "entity"]

View File

@@ -1,78 +1,14 @@
# syntax=docker/dockerfile:1
# Comments are provided throughout this file to help you get started.
# If you need more help, visit the Dockerfile reference guide at
# https://docs.docker.com/engine/reference/builder/
################################################################################
# Create a stage for building the application.
ARG RUST_VERSION=1.76.0
ARG APP_NAME=monzo-ingestion
FROM rust:${RUST_VERSION}-slim-bullseye AS build
ARG APP_NAME
FROM --platform=$BUILDPLATFORM debian:bullseye-slim AS builder
ARG TARGETPLATFORM
WORKDIR /app
COPY . .
RUN case "$TARGETPLATFORM" in \
"linux/amd64") BINARY_PATH="target/release/toggl-bridge" ;; \
"linux/arm64") BINARY_PATH="target/aarch64-unknown-linux-gnu/release/toggl-bridge" ;; \
*) exit 1 ;; \
esac && \
mv "$BINARY_PATH" /usr/local/bin/toggl-bridge
# Build the application.
# Leverage a cache mount to /usr/local/cargo/registry/
# for downloaded dependencies and a cache mount to /app/target/ for
# compiled dependencies which will speed up subsequent builds.
# Leverage a bind mount to the src directory to avoid having to copy the
# source code into the container. Once built, copy the executable to an
# output directory before the cache mounted /app/target is unmounted.
RUN --mount=type=bind,source=src,target=src \
--mount=type=bind,source=entity,target=entity \
--mount=type=bind,source=migration,target=migration \
--mount=type=bind,source=Cargo.toml,target=Cargo.toml \
--mount=type=bind,source=Cargo.lock,target=Cargo.lock \
--mount=type=cache,target=/app/target/ \
--mount=type=cache,target=/usr/local/cargo/registry/ \
<<EOF
set -e
cargo build --locked --release
cp ./target/release/$APP_NAME /bin/server
EOF
################################################################################
# Create a new stage for running the application that contains the minimal
# runtime dependencies for the application. This often uses a different base
# image from the build stage where the necessary files are copied from the build
# stage.
#
# The example below uses the debian bullseye image as the foundation for running the app.
# By specifying the "bullseye-slim" tag, it will also use whatever happens to be the
# most recent version of that tag when you build your Dockerfile. If
reproducibility is important, consider using a digest
# (e.g., debian@sha256:ac707220fbd7b67fc19b112cee8170b41a9e97f703f588b2cdbbcdcecdd8af57).
FROM debian:bullseye-slim AS final
RUN set -ex; \
apt-get update && \
apt-get -y install --no-install-recommends \
ca-certificates curl && \
rm -rf /var/lib/apt/lists/*
# Create a non-privileged user that the app will run under.
# See https://docs.docker.com/develop/develop-images/dockerfile_best-practices/#user
ARG UID=10001
RUN adduser \
--disabled-password \
--gecos "" \
--home "/nonexistent" \
--shell "/sbin/nologin" \
--no-create-home \
--uid "${UID}" \
appuser
USER appuser
# Copy the executable from the "build" stage.
COPY --from=build /bin/server /bin/
# Expose the port that the application listens on.
EXPOSE 3000
HEALTHCHECK --interval=5s --timeout=3s --retries=3 \
CMD curl -f http://localhost:3000/health || exit 1
# What the container should run when it is started.
CMD ["/bin/server", "web", "--addr", "0.0.0.0:3000"]
FROM --platform=$TARGETPLATFORM debian:bullseye-slim
COPY --from=builder /usr/local/bin/toggl-bridge /usr/local/bin/
CMD ["toggl-bridge"]

View File

@@ -1,58 +0,0 @@
# Stage 1: Build
ARG RUST_VERSION=1.76.0
FROM lukemathwalker/cargo-chef:latest-rust-${RUST_VERSION} as chef
WORKDIR /build/
# hadolint ignore=DL3008
RUN apt-get update && \
apt-get install -y --no-install-recommends \
lld \
clang \
libclang-dev \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
FROM chef as planner
COPY . .
RUN cargo chef prepare --recipe-path recipe.json
FROM chef as builder
COPY --from=planner /build/recipe.json recipe.json
# Build dependencies - this is the caching Docker layer!
RUN cargo chef cook --release -p monzo-ingestion --recipe-path recipe.json
# Build application
COPY . .
RUN cargo build --release -p monzo-ingestion
# Stage 2: Run
FROM debian:bookworm-slim AS runtime
RUN set -ex; \
apt-get update && \
apt-get -y install --no-install-recommends \
ca-certificates curl && \
rm -rf /var/lib/apt/lists/*
# Create a non-privileged user that the app will run under.
# See https://docs.docker.com/develop/develop-images/dockerfile_best-practices/#user
ARG UID=10001
RUN adduser \
--disabled-password \
--gecos "" \
--home "/nonexistent" \
--shell "/sbin/nologin" \
--no-create-home \
--uid "${UID}" \
appuser
USER appuser
# Copy the executable from the "build" stage.
COPY --from=builder /build/target/release/monzo-ingestion /bin/server
# Expose the port that the application listens on.
EXPOSE 3000
HEALTHCHECK --interval=5s --timeout=3s --retries=3 \
CMD curl -f http://localhost:3000/health || exit 1
# What the container should run when it is started.
CMD ["/bin/server", "serve", "--addr", "0.0.0.0:3000"]

View File

@@ -29,6 +29,7 @@ impl AppError {
impl IntoResponse for AppError {
fn into_response(self) -> Response {
let status_code = match self {
AppError::BadRequest(_) => StatusCode::BAD_REQUEST,
_ => StatusCode::INTERNAL_SERVER_ERROR,
};
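
Note: the added arm maps the BadRequest variant to HTTP 400 while every other variant keeps falling through to 500. A self-contained sketch of this Axum pattern, with an assumed enum shape and response body (the repo's actual variants are not shown in this hunk):

```rust
// Sketch only: the variant names and debug-formatted body are assumptions.
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};

#[derive(Debug)]
enum AppError {
    BadRequest(String),
    Internal(String),
}

impl IntoResponse for AppError {
    fn into_response(self) -> Response {
        let status_code = match self {
            // New arm from this commit: client errors surface as 400.
            AppError::BadRequest(_) => StatusCode::BAD_REQUEST,
            // Everything else stays an opaque 500.
            _ => StatusCode::INTERNAL_SERVER_ERROR,
        };
        (status_code, format!("{self:?}")).into_response()
    }
}
```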

View File

@@ -29,7 +29,7 @@ pub async fn insert(
for insertions in insertions.chunks(400) {
let (new_or_updated_insertions, inserted_transaction_ids) =
whittle_insertions(insertions, &db).await?;
whittle_insertions(insertions, db).await?;
if new_or_updated_insertions.is_empty() {
continue;
@@ -69,20 +69,20 @@ async fn update_expenditures(
expenditure::Entity::insert_many(
new_or_updated_insertions
.into_iter()
.iter()
.flat_map(|i| &i.contained_expenditures)
.cloned(),
)
.on_conflict(
OnConflict::columns(vec![
expenditure::Column::TransactionId,
expenditure::Column::Category,
])
.update_columns(expenditure::Column::iter())
.to_owned(),
)
.exec(tx)
.await?;
.on_conflict(
OnConflict::columns(vec![
expenditure::Column::TransactionId,
expenditure::Column::Category,
])
.update_columns(expenditure::Column::iter())
.to_owned(),
)
.exec(tx)
.await?;
Ok(())
}
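
Note: beneath the re-indentation, this hunk is a SeaORM bulk upsert keyed on the composite (TransactionId, Category) pair. A standalone sea-query sketch of roughly what that on_conflict clause builds (table and column names here are illustrative, and the sketch updates a single column where the diff updates every column via Column::iter()):

```rust
// Sketch only: "expenditure" and its columns are illustrative stand-ins for
// the generated entity; sea-orm re-exports these types as sea_orm::sea_query.
use sea_query::{Alias, OnConflict, PostgresQueryBuilder, Query};

fn main() {
    let sql = Query::insert()
        .into_table(Alias::new("expenditure"))
        .columns([
            Alias::new("transaction_id"),
            Alias::new("category"),
            Alias::new("amount"),
        ])
        .values_panic(["tx_1".into(), "groceries".into(), 12.5.into()])
        .on_conflict(
            // Same shape as the diff: the conflict target is the composite
            // (transaction_id, category) key.
            OnConflict::columns([Alias::new("transaction_id"), Alias::new("category")])
                .update_columns([Alias::new("amount")])
                .to_owned(),
        )
        .to_string(PostgresQueryBuilder);

    // Prints roughly: INSERT INTO "expenditure" (...) VALUES (...) ON CONFLICT
    //   ("transaction_id", "category") DO UPDATE SET "amount" = "excluded"."amount"
    println!("{sql}");
}
```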
@@ -118,19 +118,21 @@ async fn whittle_insertions<'a>(
.select_only()
.columns([transaction::Column::IdentityHash])
.filter(transaction::Column::IdentityHash.is_not_null())
.into_tuple::<(i64,)>()
.into_tuple::<(i64, )>()
.all(tx)
.await?;
tracing::debug!("Found existing entries: {existing_hashes:?}");
// We will only update those where the hash is different to avoid unnecessary updates and
// notifications.
let new_or_updated_insertions = insertions
.into_iter()
.iter()
.filter(|i| {
let hash = i.identity_hash;
!existing_hashes
.iter()
.any(|(existing_hash,)| *existing_hash == hash)
.any(|(existing_hash, )| *existing_hash == hash)
})
.collect::<Vec<_>>();
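
Note: the filter above is a plain hash dedupe: keep only insertions whose identity_hash is absent from the database. Stripped of SeaORM, the core logic looks like the following sketch (the Insertion type is an assumed stand-in); collecting the existing hashes into a HashSet would also turn the quadratic `.any` scan into a linear lookup:

```rust
// Sketch only: Insertion is an assumed stand-in for the repo's row type.
use std::collections::HashSet;

struct Insertion {
    identity_hash: i64,
    payload: &'static str,
}

// Keep only rows whose identity hash is not already in the database,
// mirroring whittle_insertions. The HashSet makes the membership test
// O(1) where the diff's nested `.any` scan is O(existing) per row.
fn whittle<'a>(insertions: &'a [Insertion], existing_hashes: &[(i64,)]) -> Vec<&'a Insertion> {
    let existing: HashSet<i64> = existing_hashes.iter().map(|(h,)| *h).collect();
    insertions
        .iter()
        .filter(|i| !existing.contains(&i.identity_hash))
        .collect()
}

fn main() {
    let rows = [
        Insertion { identity_hash: 1, payload: "already stored" },
        Insertion { identity_hash: 2, payload: "new" },
    ];
    let kept = whittle(&rows, &[(1,)]);
    assert_eq!(kept.len(), 1);
    println!("kept: {}", kept[0].payload);
}
```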
@@ -155,23 +157,28 @@
}
mod tests {
use super::{notify_new_transactions, update_expenditures, update_transactions};
use super::{insert, notify_new_transactions, update_expenditures, update_transactions};
use anyhow::Error;
use tokio::sync::OnceCell;
use migration::MigratorTrait;
use sea_orm::{DatabaseConnection, TransactionTrait};
use serde_json::Value;
use sqlx::postgres::PgListener;
use sqlx::PgPool;
use testcontainers::runners::AsyncRunner;
use testcontainers::ContainerAsync;
use crate::ingestion::ingestion_logic::from_json_row;
async fn initialise() -> Result<
(
ContainerAsync<testcontainers_modules::postgres::Postgres>,
DatabaseConnection,
PgPool,
),
Error,
> {
#[derive(Debug)]
struct DatabaseInstance {
container: ContainerAsync<testcontainers_modules::postgres::Postgres>,
db: DatabaseConnection,
pool: PgPool,
}
static INSTANCE: OnceCell<DatabaseInstance> = OnceCell::const_new();
async fn initialise_db() -> Result<DatabaseInstance, Error> {
let container = testcontainers_modules::postgres::Postgres::default()
.start()
.await?;
@@ -186,15 +193,29 @@ mod tests {
migration::Migrator::up(&db, None).await?;
let pool = PgPool::connect(connection_string).await?;
let instance = DatabaseInstance {
container,
db,
pool,
};
Ok((container, db, pool))
Ok(instance)
}
async fn get_or_initialize_db_instance() -> Result<
&'static DatabaseInstance,
Error,
> {
Ok(INSTANCE.get_or_init(|| async {
initialise_db().await.unwrap()
}).await)
}
#[tokio::test]
async fn test_no_new_insertions() -> Result<(), Error> {
let (_container, db, _pool) = initialise().await?;
async fn test_empty_insertion_list() -> Result<(), Error> {
let db = get_or_initialize_db_instance().await?;
let insertions = vec![];
let tx = db.begin().await?;
let tx = db.db.begin().await?;
update_transactions(&tx, &insertions).await?;
update_expenditures(&tx, &insertions, &vec![]).await?;
tx.commit().await?;
@@ -204,8 +225,8 @@ mod tests {
#[tokio::test]
async fn test_notify() -> Result<(), Error> {
let (_container, db, pool) = initialise().await?;
let mut listener = PgListener::connect_with(&pool).await?;
let dbi = get_or_initialize_db_instance().await?;
let mut listener = PgListener::connect_with(&dbi.pool).await?;
listener.listen("monzo_new_transactions").await?;
let ids = vec![
@@ -214,7 +235,7 @@
"test3".to_string(),
];
notify_new_transactions(&db, &ids).await?;
notify_new_transactions(&dbi.db, &ids).await?;
let notification = listener.recv().await?;
let payload = notification.payload();
@@ -227,4 +248,52 @@
Ok(())
}
#[tokio::test]
async fn test_notify_on_insert() -> Result<(), Error> {
let dbi = get_or_initialize_db_instance().await?;
let mut listener = PgListener::connect_with(&dbi.pool).await?;
listener.listen("monzo_new_transactions").await?;
let json = include_str!("../../fixtures/transactions.json");
let json: Vec<Vec<Value>> = serde_json::from_str(json).unwrap();
let data = json
.iter()
.map(|row| from_json_row(row.clone()))
.collect::<Result<Vec<_>, anyhow::Error>>()
.unwrap();
insert(&dbi.db, data.clone()).await?;
let notification = listener.recv().await?;
let payload = notification.payload();
let mut payload = serde_json::from_str::<Vec<String>>(&payload)?;
payload.sort();
let mut ids = data.iter()
.map(|row| row.transaction_id.clone())
.collect::<Vec<_>>();
ids.sort();
assert_eq!(payload, ids, "Inserted IDs do not match");
insert(&dbi.db, data.clone()).await?;
let notification = listener.recv().await?;
let payload = notification.payload();
let payload = serde_json::from_str::<Vec<String>>(&payload)?;
assert_eq!(payload, Vec::<String>::new(), "Re-inserting identical rows triggered double notification");
let mut altered_data = data.clone();
altered_data[0].description = Some("New description".to_string());
assert_ne!(altered_data[0].compute_hash(), data[0].compute_hash(), "Alterations have the same hash");
insert(&dbi.db, altered_data.clone()).await?;
let notification = listener.recv().await?;
let payload = notification.payload();
let payload = serde_json::from_str::<Vec<String>>(&payload)?;
assert_eq!(payload, vec![altered_data[0].transaction_id.clone()], "Re-inserting altered row failed to re-trigger notification");
Ok(())
}
}
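
Note: the main structural change in these tests is sharing one Postgres container across the suite via tokio::sync::OnceCell instead of starting a fresh one per test. A minimal, self-contained sketch of that pattern (the Fixture type stands in for the container/DatabaseConnection/PgPool trio):

```rust
// Sketch only: Fixture stands in for the expensive shared resource; the
// point is the once-per-process initialisation.
use tokio::sync::OnceCell;

#[derive(Debug)]
struct Fixture {
    value: u32,
}

static FIXTURE: OnceCell<Fixture> = OnceCell::const_new();

async fn fixture() -> &'static Fixture {
    // get_or_init runs the closure at most once; concurrent callers wait
    // for the first initialisation and all receive the same reference.
    FIXTURE.get_or_init(|| async { Fixture { value: 42 } }).await
}

#[tokio::test]
async fn tests_share_one_fixture() {
    let a = fixture().await;
    let b = fixture().await;
    assert!(std::ptr::eq(a, b)); // same instance, initialised once
}
```

One consequence worth noting: because the instance lives in a static, its Drop never runs, so tearing down the container is presumably left to testcontainers' reaper or process exit rather than per-test cleanup.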

src/ingestion/flex.rs (new file, 9 lines)
View File

@@ -0,0 +1,9 @@
#[allow(dead_code)]
mod headings {
#[allow(unused_imports)]
pub use super::super::ingestion_logic::headings::*;
// Additional Flex headings
pub const MONEY_OUT: usize = 16;
pub const MONEY_IN: usize = 17;
}
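
Note: modules of positional constants like this are consumed by indexing into an exported row. A hypothetical usage sketch (the row shape and net_amount helper are illustrative, not the repo's from_json_row):

```rust
// Sketch only: the row shape and helper are hypothetical; the constants
// mirror the flex.rs module above.
mod headings {
    pub const MONEY_OUT: usize = 16;
    pub const MONEY_IN: usize = 17;
}

// Pull a numeric cell out of a positional row, treating missing or
// unparseable cells as zero.
fn cell(row: &[Option<String>], i: usize) -> f64 {
    row.get(i)
        .and_then(|c| c.as_deref())
        .and_then(|s| s.parse::<f64>().ok())
        .unwrap_or(0.0)
}

fn net_amount(row: &[Option<String>]) -> f64 {
    cell(row, headings::MONEY_IN) - cell(row, headings::MONEY_OUT)
}

fn main() {
    let mut row: Vec<Option<String>> = vec![None; 18];
    row[headings::MONEY_OUT] = Some("12.50".to_string());
    assert_eq!(net_amount(&row), -12.5);
    println!("net: {}", net_amount(&row));
}
```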

View File

@@ -11,7 +11,7 @@ use serde_json::Value;
use std::hash::Hash;
#[allow(dead_code)]
mod headings {
pub(crate) mod headings {
pub const TRANSACTION_ID: usize = 0;
pub const DATE: usize = 1;
pub const TIME: usize = 2;
@@ -30,19 +30,19 @@
pub const CATEGORY_SPLIT: usize = 15;
}
#[derive(Debug, Eq, PartialEq, Hash)]
#[derive(Debug, Eq, PartialEq, Hash, Clone)]
pub struct MonzoRow {
category_split: Option<String>,
primary_category: String,
total_amount: Decimal,
receipt: Option<String>,
notes: Option<String>,
emoji: Option<String>,
description: Option<String>,
transaction_type: String,
title: Option<String>,
timestamp: NaiveDateTime,
transaction_id: String,
pub category_split: Option<String>,
pub primary_category: String,
pub total_amount: Decimal,
pub receipt: Option<String>,
pub notes: Option<String>,
pub emoji: Option<String>,
pub description: Option<String>,
pub transaction_type: String,
pub title: Option<String>,
pub timestamp: NaiveDateTime,
pub transaction_id: String,
}
impl MonzoRow {

View File

@@ -1,3 +1,4 @@
pub mod db;
pub mod ingestion_logic;
pub mod flex;
pub mod routes;

View File

@@ -43,7 +43,7 @@ pub async fn monzo_batched_csv(
Extension(db): Extension<DatabaseConnection>,
multipart: Multipart,
) -> Result<&'static str, AppError> {
static CSV_MISSING_ERR_MSG: &'static str = "No CSV file provided. Expected a multipart request with a `csv` field containing the contents of the CSV.";
static CSV_MISSING_ERR_MSG: &str = "No CSV file provided. Expected a multipart request with a `csv` field containing the contents of the CSV.";
let csv = extract_csv(multipart)
.await