Compare commits
3 Commits
bf47520d31
...
af0588c5ef
| Author | SHA1 | Date | |
|---|---|---|---|
| af0588c5ef | |||
| 21a63a10a4 | |||
| 08766dc0e0 |
89
.github/workflows/build.yml
vendored
89
.github/workflows/build.yml
vendored
@ -1,39 +1,98 @@
|
||||
name: Build and Publish Docker Container
|
||||
name: Rust CI
|
||||
|
||||
on:
|
||||
push:
|
||||
branches:
|
||||
- main
|
||||
branches: [ main ]
|
||||
pull_request:
|
||||
branches: [ main ]
|
||||
|
||||
env:
|
||||
CARGO_TERM_COLOR: always
|
||||
|
||||
jobs:
|
||||
build:
|
||||
name: Build and Test
|
||||
runs-on: ubuntu-latest
|
||||
container:
|
||||
image: catthehacker/ubuntu:act-latest
|
||||
container: catthehacker/ubuntu:act-latest
|
||||
|
||||
steps:
|
||||
- name: Checkout code
|
||||
uses: actions/checkout@v2
|
||||
- uses: actions/checkout@v3
|
||||
|
||||
- name: Install Rust
|
||||
uses: actions-rs/toolchain@v1
|
||||
with:
|
||||
toolchain: stable
|
||||
profile: minimal
|
||||
override: true
|
||||
components: rustfmt, clippy
|
||||
|
||||
- name: Add ARM64 target
|
||||
run: rustup target add aarch64-unknown-linux-gnu
|
||||
|
||||
- name: Install ARM64 toolchain
|
||||
run: |
|
||||
apt-get update
|
||||
apt-get install -y gcc-aarch64-linux-gnu
|
||||
|
||||
- name: Cache dependencies
|
||||
uses: actions/cache@v3
|
||||
with:
|
||||
path: |
|
||||
~/.cargo
|
||||
target/
|
||||
key: "${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}"
|
||||
restore-keys: |
|
||||
${{ runner.os }}-cargo-
|
||||
|
||||
- name: Build (x86_64)
|
||||
uses: actions-rs/cargo@v1
|
||||
with:
|
||||
command: build
|
||||
args: --release --all-features
|
||||
|
||||
- name: Build (ARM64)
|
||||
uses: actions-rs/cargo@v1
|
||||
with:
|
||||
command: build
|
||||
args: --release --all-features --target aarch64-unknown-linux-gnu
|
||||
env:
|
||||
CARGO_TARGET_AARCH64_UNKNOWN_LINUX_GNU_LINKER: aarch64-linux-gnu-gcc
|
||||
|
||||
- name: Run tests
|
||||
uses: actions-rs/cargo@v1
|
||||
with:
|
||||
command: test
|
||||
args: --all-features
|
||||
|
||||
- name: Upload artifacts
|
||||
uses: actions/upload-artifact@v3
|
||||
with:
|
||||
name: binaries
|
||||
path: |
|
||||
target/release/${{ github.event.repository.name }}
|
||||
target/aarch64-unknown-linux-gnu/release/${{ github.event.repository.name }}
|
||||
|
||||
- name: Set up QEMU
|
||||
uses: docker/setup-qemu-action@v2
|
||||
|
||||
- name: Set up Docker Buildx
|
||||
uses: docker/setup-buildx-action@v1
|
||||
uses: docker/setup-buildx-action@v2
|
||||
|
||||
- name: Login to Docker
|
||||
uses: docker/login-action@v1
|
||||
- name: Login to DockerHub
|
||||
uses: docker/login-action@v2
|
||||
with:
|
||||
registry: git.joshuacoles.me
|
||||
username: ${{ secrets.DOCKER_USERNAME }}
|
||||
password: ${{ secrets.DOCKER_PASSWORD }}
|
||||
|
||||
- name: Build and Push Docker image
|
||||
uses: docker/build-push-action@v5
|
||||
- name: Build and push multi-arch Docker image
|
||||
uses: docker/build-push-action@v4
|
||||
with:
|
||||
context: .
|
||||
file: ./Dockerfile
|
||||
platforms: linux/amd64,linux/arm64
|
||||
push: true
|
||||
file: ./Dockerfile.cache
|
||||
tags: git.joshuacoles.me/${{ github.repository }}:latest,git.joshuacoles.me/${{ github.repository }}:${{ github.sha }}
|
||||
cache-from: type=gha
|
||||
cache-to: type=gha,mode=max
|
||||
|
||||
- uses: robiningelbrecht/ntfy-action@v1.0.0
|
||||
name: Notify via ntfy.sh
|
||||
|
||||
625
Cargo.lock
generated
625
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@ -31,6 +31,7 @@ testcontainers-modules = { version = "0.5.0", features = ["postgres"] }
|
||||
sqlx = { version = "0.7.4", features = ["postgres"] }
|
||||
tower-http = { version = "0.5.2", features = ["trace"] }
|
||||
bytes = "1.6.0"
|
||||
once_cell = "1.19.0"
|
||||
|
||||
[workspace]
|
||||
members = [".", "migration", "entity"]
|
||||
|
||||
88
Dockerfile
88
Dockerfile
@ -1,78 +1,14 @@
|
||||
# syntax=docker/dockerfile:1
|
||||
|
||||
# Comments are provided throughout this file to help you get started.
|
||||
# If you need more help, visit the Dockerfile reference guide at
|
||||
# https://docs.docker.com/engine/reference/builder/
|
||||
|
||||
################################################################################
|
||||
# Create a stage for building the application.
|
||||
|
||||
ARG RUST_VERSION=1.76.0
|
||||
ARG APP_NAME=monzo-ingestion
|
||||
FROM rust:${RUST_VERSION}-slim-bullseye AS build
|
||||
ARG APP_NAME
|
||||
FROM --platform=$BUILDPLATFORM debian:bullseye-slim AS builder
|
||||
ARG TARGETPLATFORM
|
||||
WORKDIR /app
|
||||
COPY . .
|
||||
RUN case "$TARGETPLATFORM" in \
|
||||
"linux/amd64") BINARY_PATH="target/release/toggl-bridge" ;; \
|
||||
"linux/arm64") BINARY_PATH="target/aarch64-unknown-linux-gnu/release/toggl-bridge" ;; \
|
||||
*) exit 1 ;; \
|
||||
esac && \
|
||||
mv "$BINARY_PATH" /usr/local/bin/toggl-bridge
|
||||
|
||||
# Build the application.
|
||||
# Leverage a cache mount to /usr/local/cargo/registry/
|
||||
# for downloaded dependencies and a cache mount to /app/target/ for
|
||||
# compiled dependencies which will speed up subsequent builds.
|
||||
# Leverage a bind mount to the src directory to avoid having to copy the
|
||||
# source code into the container. Once built, copy the executable to an
|
||||
# output directory before the cache mounted /app/target is unmounted.
|
||||
RUN --mount=type=bind,source=src,target=src \
|
||||
--mount=type=bind,source=entity,target=entity \
|
||||
--mount=type=bind,source=migration,target=migration \
|
||||
--mount=type=bind,source=Cargo.toml,target=Cargo.toml \
|
||||
--mount=type=bind,source=Cargo.lock,target=Cargo.lock \
|
||||
--mount=type=cache,target=/app/target/ \
|
||||
--mount=type=cache,target=/usr/local/cargo/registry/ \
|
||||
<<EOF
|
||||
set -e
|
||||
cargo build --locked --release
|
||||
cp ./target/release/$APP_NAME /bin/server
|
||||
EOF
|
||||
|
||||
################################################################################
|
||||
# Create a new stage for running the application that contains the minimal
|
||||
# runtime dependencies for the application. This often uses a different base
|
||||
# image from the build stage where the necessary files are copied from the build
|
||||
# stage.
|
||||
#
|
||||
# The example below uses the debian bullseye image as the foundation for running the app.
|
||||
# By specifying the "bullseye-slim" tag, it will also use whatever happens to be the
|
||||
# most recent version of that tag when you build your Dockerfile. If
|
||||
# reproducability is important, consider using a digest
|
||||
# (e.g., debian@sha256:ac707220fbd7b67fc19b112cee8170b41a9e97f703f588b2cdbbcdcecdd8af57).
|
||||
FROM debian:bullseye-slim AS final
|
||||
|
||||
RUN set -ex; \
|
||||
apt-get update && \
|
||||
apt-get -y install --no-install-recommends \
|
||||
ca-certificates curl && \
|
||||
rm -rf /var/lib/apt/lists/*
|
||||
|
||||
# Create a non-privileged user that the app will run under.
|
||||
# See https://docs.docker.com/develop/develop-images/dockerfile_best-practices/#user
|
||||
ARG UID=10001
|
||||
RUN adduser \
|
||||
--disabled-password \
|
||||
--gecos "" \
|
||||
--home "/nonexistent" \
|
||||
--shell "/sbin/nologin" \
|
||||
--no-create-home \
|
||||
--uid "${UID}" \
|
||||
appuser
|
||||
USER appuser
|
||||
|
||||
# Copy the executable from the "build" stage.
|
||||
COPY --from=build /bin/server /bin/
|
||||
|
||||
# Expose the port that the application listens on.
|
||||
EXPOSE 3000
|
||||
|
||||
HEALTHCHECK --interval=5s --timeout=3s --retries=3 \
|
||||
CMD curl -f http://localhost:3000/health || exit 1
|
||||
|
||||
# What the container should run when it is started.
|
||||
CMD ["/bin/server", "web", "--addr", "0.0.0.0:3000"]
|
||||
FROM --platform=$TARGETPLATFORM debian:bullseye-slim
|
||||
COPY --from=builder /usr/local/bin/toggl-bridge /usr/local/bin/
|
||||
CMD ["toggl-bridge"]
|
||||
|
||||
@ -1,58 +0,0 @@
|
||||
# Stage 1: Build
|
||||
ARG RUST_VERSION=1.76.0
|
||||
FROM lukemathwalker/cargo-chef:latest-rust-${RUST_VERSION} as chef
|
||||
WORKDIR /build/
|
||||
# hadolint ignore=DL3008
|
||||
RUN apt-get update && \
|
||||
apt-get install -y --no-install-recommends \
|
||||
lld \
|
||||
clang \
|
||||
libclang-dev \
|
||||
&& apt-get clean \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
FROM chef as planner
|
||||
COPY . .
|
||||
RUN cargo chef prepare --recipe-path recipe.json
|
||||
|
||||
FROM chef as builder
|
||||
COPY --from=planner /build/recipe.json recipe.json
|
||||
# Build dependencies - this is the caching Docker layer!
|
||||
RUN cargo chef cook --release -p monzo-ingestion --recipe-path recipe.json
|
||||
# Build application
|
||||
COPY . .
|
||||
RUN cargo build --release -p monzo-ingestion
|
||||
|
||||
# Stage 2: Run
|
||||
FROM debian:bookworm-slim AS runtime
|
||||
|
||||
RUN set -ex; \
|
||||
apt-get update && \
|
||||
apt-get -y install --no-install-recommends \
|
||||
ca-certificates curl && \
|
||||
rm -rf /var/lib/apt/lists/*
|
||||
|
||||
# Create a non-privileged user that the app will run under.
|
||||
# See https://docs.docker.com/develop/develop-images/dockerfile_best-practices/#user
|
||||
ARG UID=10001
|
||||
RUN adduser \
|
||||
--disabled-password \
|
||||
--gecos "" \
|
||||
--home "/nonexistent" \
|
||||
--shell "/sbin/nologin" \
|
||||
--no-create-home \
|
||||
--uid "${UID}" \
|
||||
appuser
|
||||
USER appuser
|
||||
|
||||
# Copy the executable from the "build" stage.
|
||||
COPY --from=builder /build/target/release/monzo-ingestion /bin/server
|
||||
|
||||
# Expose the port that the application listens on.
|
||||
EXPOSE 3000
|
||||
|
||||
HEALTHCHECK --interval=5s --timeout=3s --retries=3 \
|
||||
CMD curl -f http://localhost:3000/health || exit 1
|
||||
|
||||
# What the container should run when it is started.
|
||||
CMD ["/bin/server", "serve", "--addr", "0.0.0.0:3000"]
|
||||
@ -29,6 +29,7 @@ impl AppError {
|
||||
impl IntoResponse for AppError {
|
||||
fn into_response(self) -> Response {
|
||||
let status_code = match self {
|
||||
AppError::BadRequest(_) => StatusCode::BAD_REQUEST,
|
||||
_ => StatusCode::INTERNAL_SERVER_ERROR,
|
||||
};
|
||||
|
||||
|
||||
@ -29,7 +29,7 @@ pub async fn insert(
|
||||
|
||||
for insertions in insertions.chunks(400) {
|
||||
let (new_or_updated_insertions, inserted_transaction_ids) =
|
||||
whittle_insertions(insertions, &db).await?;
|
||||
whittle_insertions(insertions, db).await?;
|
||||
|
||||
if new_or_updated_insertions.is_empty() {
|
||||
continue;
|
||||
@ -69,20 +69,20 @@ async fn update_expenditures(
|
||||
|
||||
expenditure::Entity::insert_many(
|
||||
new_or_updated_insertions
|
||||
.into_iter()
|
||||
.iter()
|
||||
.flat_map(|i| &i.contained_expenditures)
|
||||
.cloned(),
|
||||
)
|
||||
.on_conflict(
|
||||
OnConflict::columns(vec![
|
||||
expenditure::Column::TransactionId,
|
||||
expenditure::Column::Category,
|
||||
])
|
||||
.update_columns(expenditure::Column::iter())
|
||||
.to_owned(),
|
||||
)
|
||||
.exec(tx)
|
||||
.await?;
|
||||
.on_conflict(
|
||||
OnConflict::columns(vec![
|
||||
expenditure::Column::TransactionId,
|
||||
expenditure::Column::Category,
|
||||
])
|
||||
.update_columns(expenditure::Column::iter())
|
||||
.to_owned(),
|
||||
)
|
||||
.exec(tx)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -118,19 +118,21 @@ async fn whittle_insertions<'a>(
|
||||
.select_only()
|
||||
.columns([transaction::Column::IdentityHash])
|
||||
.filter(transaction::Column::IdentityHash.is_not_null())
|
||||
.into_tuple::<(i64,)>()
|
||||
.into_tuple::<(i64, )>()
|
||||
.all(tx)
|
||||
.await?;
|
||||
|
||||
tracing::debug!("Found existing entries: {existing_hashes:?}");
|
||||
|
||||
// We will only update those where the hash is different to avoid unnecessary updates and
|
||||
// notifications.
|
||||
let new_or_updated_insertions = insertions
|
||||
.into_iter()
|
||||
.iter()
|
||||
.filter(|i| {
|
||||
let hash = i.identity_hash;
|
||||
!existing_hashes
|
||||
.iter()
|
||||
.any(|(existing_hash,)| *existing_hash == hash)
|
||||
.any(|(existing_hash, )| *existing_hash == hash)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
@ -155,23 +157,28 @@ async fn notify_new_transactions(
|
||||
}
|
||||
|
||||
mod tests {
|
||||
use super::{notify_new_transactions, update_expenditures, update_transactions};
|
||||
use super::{insert, notify_new_transactions, update_expenditures, update_transactions};
|
||||
use anyhow::Error;
|
||||
use tokio::sync::OnceCell;
|
||||
use migration::MigratorTrait;
|
||||
use sea_orm::{DatabaseConnection, TransactionTrait};
|
||||
use serde_json::Value;
|
||||
use sqlx::postgres::PgListener;
|
||||
use sqlx::PgPool;
|
||||
use testcontainers::runners::AsyncRunner;
|
||||
use testcontainers::ContainerAsync;
|
||||
use crate::ingestion::ingestion_logic::from_json_row;
|
||||
|
||||
async fn initialise() -> Result<
|
||||
(
|
||||
ContainerAsync<testcontainers_modules::postgres::Postgres>,
|
||||
DatabaseConnection,
|
||||
PgPool,
|
||||
),
|
||||
Error,
|
||||
> {
|
||||
#[derive(Debug)]
|
||||
struct DatabaseInstance {
|
||||
container: ContainerAsync<testcontainers_modules::postgres::Postgres>,
|
||||
db: DatabaseConnection,
|
||||
pool: PgPool,
|
||||
}
|
||||
|
||||
static INSTANCE: OnceCell<DatabaseInstance> = OnceCell::const_new();
|
||||
|
||||
async fn initialise_db() -> Result<DatabaseInstance, Error> {
|
||||
let container = testcontainers_modules::postgres::Postgres::default()
|
||||
.start()
|
||||
.await?;
|
||||
@ -186,15 +193,29 @@ mod tests {
|
||||
migration::Migrator::up(&db, None).await?;
|
||||
|
||||
let pool = PgPool::connect(connection_string).await?;
|
||||
let instance = DatabaseInstance {
|
||||
container,
|
||||
db,
|
||||
pool,
|
||||
};
|
||||
|
||||
Ok((container, db, pool))
|
||||
Ok(instance)
|
||||
}
|
||||
|
||||
async fn get_or_initialize_db_instance() -> Result<
|
||||
&'static DatabaseInstance,
|
||||
Error,
|
||||
> {
|
||||
Ok(INSTANCE.get_or_init(|| async {
|
||||
initialise_db().await.unwrap()
|
||||
}).await)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_no_new_insertions() -> Result<(), Error> {
|
||||
let (_container, db, _pool) = initialise().await?;
|
||||
async fn test_empty_insertion_list() -> Result<(), Error> {
|
||||
let db = get_or_initialize_db_instance().await?;
|
||||
let insertions = vec![];
|
||||
let tx = db.begin().await?;
|
||||
let tx = db.db.begin().await?;
|
||||
update_transactions(&tx, &insertions).await?;
|
||||
update_expenditures(&tx, &insertions, &vec![]).await?;
|
||||
tx.commit().await?;
|
||||
@ -204,8 +225,8 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_notify() -> Result<(), Error> {
|
||||
let (_container, db, pool) = initialise().await?;
|
||||
let mut listener = PgListener::connect_with(&pool).await?;
|
||||
let dbi = get_or_initialize_db_instance().await?;
|
||||
let mut listener = PgListener::connect_with(&dbi.pool).await?;
|
||||
listener.listen("monzo_new_transactions").await?;
|
||||
|
||||
let ids = vec![
|
||||
@ -214,7 +235,7 @@ mod tests {
|
||||
"test3".to_string(),
|
||||
];
|
||||
|
||||
notify_new_transactions(&db, &ids).await?;
|
||||
notify_new_transactions(&dbi.db, &ids).await?;
|
||||
|
||||
let notification = listener.recv().await?;
|
||||
let payload = notification.payload();
|
||||
@ -227,4 +248,52 @@ mod tests {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_notify_on_insert() -> Result<(), Error> {
|
||||
let dbi = get_or_initialize_db_instance().await?;
|
||||
let mut listener = PgListener::connect_with(&dbi.pool).await?;
|
||||
listener.listen("monzo_new_transactions").await?;
|
||||
|
||||
let json = include_str!("../../fixtures/transactions.json");
|
||||
let json: Vec<Vec<Value>> = serde_json::from_str(json).unwrap();
|
||||
let data = json
|
||||
.iter()
|
||||
.map(|row| from_json_row(row.clone()))
|
||||
.collect::<Result<Vec<_>, anyhow::Error>>()
|
||||
.unwrap();
|
||||
|
||||
insert(&dbi.db, data.clone()).await?;
|
||||
let notification = listener.recv().await?;
|
||||
let payload = notification.payload();
|
||||
let mut payload = serde_json::from_str::<Vec<String>>(&payload)?;
|
||||
payload.sort();
|
||||
|
||||
let mut ids = data.iter()
|
||||
.map(|row| row.transaction_id.clone())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
ids.sort();
|
||||
|
||||
assert_eq!(payload, ids, "Inserted IDs do not match");
|
||||
|
||||
insert(&dbi.db, data.clone()).await?;
|
||||
let notification = listener.recv().await?;
|
||||
let payload = notification.payload();
|
||||
let payload = serde_json::from_str::<Vec<String>>(&payload)?;
|
||||
assert_eq!(payload, Vec::<String>::new(), "Re-inserting identical rows triggered double notification");
|
||||
|
||||
let mut altered_data = data.clone();
|
||||
altered_data[0].description = Some("New description".to_string());
|
||||
|
||||
assert_ne!(altered_data[0].compute_hash(), data[0].compute_hash(), "Alterations have the same hash");
|
||||
|
||||
insert(&dbi.db, altered_data.clone()).await?;
|
||||
let notification = listener.recv().await?;
|
||||
let payload = notification.payload();
|
||||
let payload = serde_json::from_str::<Vec<String>>(&payload)?;
|
||||
assert_eq!(payload, vec![altered_data[0].transaction_id.clone()], "Re-inserting altered row failed to re-trigger notification");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
9
src/ingestion/flex.rs
Normal file
9
src/ingestion/flex.rs
Normal file
@ -0,0 +1,9 @@
|
||||
#[allow(dead_code)]
|
||||
mod headings {
|
||||
#[allow(unused_imports)]
|
||||
pub use super::super::ingestion_logic::headings::*;
|
||||
|
||||
// Additional FLex headings
|
||||
pub const MONEY_OUT: usize = 16;
|
||||
pub const MONEY_IN: usize = 17;
|
||||
}
|
||||
@ -11,7 +11,7 @@ use serde_json::Value;
|
||||
use std::hash::Hash;
|
||||
|
||||
#[allow(dead_code)]
|
||||
mod headings {
|
||||
pub(crate) mod headings {
|
||||
pub const TRANSACTION_ID: usize = 0;
|
||||
pub const DATE: usize = 1;
|
||||
pub const TIME: usize = 2;
|
||||
@ -30,19 +30,19 @@ mod headings {
|
||||
pub const CATEGORY_SPLIT: usize = 15;
|
||||
}
|
||||
|
||||
#[derive(Debug, Eq, PartialEq, Hash)]
|
||||
#[derive(Debug, Eq, PartialEq, Hash, Clone)]
|
||||
pub struct MonzoRow {
|
||||
category_split: Option<String>,
|
||||
primary_category: String,
|
||||
total_amount: Decimal,
|
||||
receipt: Option<String>,
|
||||
notes: Option<String>,
|
||||
emoji: Option<String>,
|
||||
description: Option<String>,
|
||||
transaction_type: String,
|
||||
title: Option<String>,
|
||||
timestamp: NaiveDateTime,
|
||||
transaction_id: String,
|
||||
pub category_split: Option<String>,
|
||||
pub primary_category: String,
|
||||
pub total_amount: Decimal,
|
||||
pub receipt: Option<String>,
|
||||
pub notes: Option<String>,
|
||||
pub emoji: Option<String>,
|
||||
pub description: Option<String>,
|
||||
pub transaction_type: String,
|
||||
pub title: Option<String>,
|
||||
pub timestamp: NaiveDateTime,
|
||||
pub transaction_id: String,
|
||||
}
|
||||
|
||||
impl MonzoRow {
|
||||
|
||||
@ -1,3 +1,4 @@
|
||||
pub mod db;
|
||||
pub mod ingestion_logic;
|
||||
pub mod flex;
|
||||
pub mod routes;
|
||||
|
||||
@ -43,7 +43,7 @@ pub async fn monzo_batched_csv(
|
||||
Extension(db): Extension<DatabaseConnection>,
|
||||
multipart: Multipart,
|
||||
) -> Result<&'static str, AppError> {
|
||||
static CSV_MISSING_ERR_MSG: &'static str = "No CSV file provided. Expected a multipart request with a `csv` field containing the contents of the CSV.";
|
||||
static CSV_MISSING_ERR_MSG: &str = "No CSV file provided. Expected a multipart request with a `csv` field containing the contents of the CSV.";
|
||||
|
||||
let csv = extract_csv(multipart)
|
||||
.await
|
||||
|
||||
Loading…
Reference in New Issue
Block a user