Compare commits

No commits in common. "af0588c5efcb75a8d8f45348d1801511f1041597" and "bf47520d3191fdc8dcadfd4b0add7e846e157f91" have entirely different histories.

af0588c5ef ... bf47520d31

.github/workflows/build.yml (vendored) — 89 lines changed
@@ -1,98 +1,39 @@
-name: Rust CI
+name: Build and Publish Docker Container
 
 on:
   push:
-    branches: [ main ]
-  pull_request:
-    branches: [ main ]
-
-env:
-  CARGO_TERM_COLOR: always
+    branches:
+      - main
 
 jobs:
   build:
-    name: Build and Test
     runs-on: ubuntu-latest
-    container: catthehacker/ubuntu:act-latest
+    container:
+      image: catthehacker/ubuntu:act-latest
 
     steps:
-      - uses: actions/checkout@v3
-
-      - name: Install Rust
-        uses: actions-rs/toolchain@v1
-        with:
-          toolchain: stable
-          profile: minimal
-          override: true
-          components: rustfmt, clippy
-
-      - name: Add ARM64 target
-        run: rustup target add aarch64-unknown-linux-gnu
-
-      - name: Install ARM64 toolchain
-        run: |
-          apt-get update
-          apt-get install -y gcc-aarch64-linux-gnu
-
-      - name: Cache dependencies
-        uses: actions/cache@v3
-        with:
-          path: |
-            ~/.cargo
-            target/
-          key: "${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}"
-          restore-keys: |
-            ${{ runner.os }}-cargo-
-
-      - name: Build (x86_64)
-        uses: actions-rs/cargo@v1
-        with:
-          command: build
-          args: --release --all-features
-
-      - name: Build (ARM64)
-        uses: actions-rs/cargo@v1
-        with:
-          command: build
-          args: --release --all-features --target aarch64-unknown-linux-gnu
-        env:
-          CARGO_TARGET_AARCH64_UNKNOWN_LINUX_GNU_LINKER: aarch64-linux-gnu-gcc
-
-      - name: Run tests
-        uses: actions-rs/cargo@v1
-        with:
-          command: test
-          args: --all-features
-
-      - name: Upload artifacts
-        uses: actions/upload-artifact@v3
-        with:
-          name: binaries
-          path: |
-            target/release/${{ github.event.repository.name }}
-            target/aarch64-unknown-linux-gnu/release/${{ github.event.repository.name }}
-
-      - name: Set up QEMU
-        uses: docker/setup-qemu-action@v2
+      - name: Checkout code
+        uses: actions/checkout@v2
 
       - name: Set up Docker Buildx
-        uses: docker/setup-buildx-action@v2
+        uses: docker/setup-buildx-action@v1
 
-      - name: Login to DockerHub
-        uses: docker/login-action@v2
+      - name: Login to Docker
+        uses: docker/login-action@v1
         with:
           registry: git.joshuacoles.me
           username: ${{ secrets.DOCKER_USERNAME }}
           password: ${{ secrets.DOCKER_PASSWORD }}
 
-      - name: Build and push multi-arch Docker image
-        uses: docker/build-push-action@v4
+      - name: Build and Push Docker image
+        uses: docker/build-push-action@v5
         with:
           context: .
-          file: ./Dockerfile
-          platforms: linux/amd64,linux/arm64
           push: true
+          file: ./Dockerfile.cache
           tags: git.joshuacoles.me/${{ github.repository }}:latest,git.joshuacoles.me/${{ github.repository }}:${{ github.sha }}
+          cache-from: type=gha
+          cache-to: type=gha,mode=max
 
       - uses: robiningelbrecht/ntfy-action@v1.0.0
         name: Notify via ntfy.sh
Cargo.lock (generated) — 627 lines changed

File diff suppressed because it is too large.
Cargo.toml

@@ -31,7 +31,6 @@ testcontainers-modules = { version = "0.5.0", features = ["postgres"] }
 sqlx = { version = "0.7.4", features = ["postgres"] }
 tower-http = { version = "0.5.2", features = ["trace"] }
 bytes = "1.6.0"
-once_cell = "1.19.0"
 
 [workspace]
 members = [".", "migration", "entity"]
Dockerfile — 90 lines changed
@@ -1,14 +1,78 @@
-FROM --platform=$BUILDPLATFORM debian:bullseye-slim AS builder
-ARG TARGETPLATFORM
-WORKDIR /app
-COPY . .
-RUN case "$TARGETPLATFORM" in \
-    "linux/amd64") BINARY_PATH="target/release/toggl-bridge" ;; \
-    "linux/arm64") BINARY_PATH="target/aarch64-unknown-linux-gnu/release/toggl-bridge" ;; \
-    *) exit 1 ;; \
-    esac && \
-    mv "$BINARY_PATH" /usr/local/bin/toggl-bridge
-
-FROM --platform=$TARGETPLATFORM debian:bullseye-slim
-COPY --from=builder /usr/local/bin/toggl-bridge /usr/local/bin/
-CMD ["toggl-bridge"]
+# syntax=docker/dockerfile:1
+
+# Comments are provided throughout this file to help you get started.
+# If you need more help, visit the Dockerfile reference guide at
+# https://docs.docker.com/engine/reference/builder/
+
+################################################################################
+# Create a stage for building the application.
+
+ARG RUST_VERSION=1.76.0
+ARG APP_NAME=monzo-ingestion
+FROM rust:${RUST_VERSION}-slim-bullseye AS build
+ARG APP_NAME
+WORKDIR /app
+
+# Build the application.
+# Leverage a cache mount to /usr/local/cargo/registry/
+# for downloaded dependencies and a cache mount to /app/target/ for
+# compiled dependencies which will speed up subsequent builds.
+# Leverage a bind mount to the src directory to avoid having to copy the
+# source code into the container. Once built, copy the executable to an
+# output directory before the cache mounted /app/target is unmounted.
+RUN --mount=type=bind,source=src,target=src \
+    --mount=type=bind,source=entity,target=entity \
+    --mount=type=bind,source=migration,target=migration \
+    --mount=type=bind,source=Cargo.toml,target=Cargo.toml \
+    --mount=type=bind,source=Cargo.lock,target=Cargo.lock \
+    --mount=type=cache,target=/app/target/ \
+    --mount=type=cache,target=/usr/local/cargo/registry/ \
+    <<EOF
+set -e
+cargo build --locked --release
+cp ./target/release/$APP_NAME /bin/server
+EOF
+
+################################################################################
+# Create a new stage for running the application that contains the minimal
+# runtime dependencies for the application. This often uses a different base
+# image from the build stage where the necessary files are copied from the build
+# stage.
+#
+# The example below uses the debian bullseye image as the foundation for running the app.
+# By specifying the "bullseye-slim" tag, it will also use whatever happens to be the
+# most recent version of that tag when you build your Dockerfile. If
+# reproducibility is important, consider using a digest
+# (e.g., debian@sha256:ac707220fbd7b67fc19b112cee8170b41a9e97f703f588b2cdbbcdcecdd8af57).
+FROM debian:bullseye-slim AS final
+
+RUN set -ex; \
+    apt-get update && \
+    apt-get -y install --no-install-recommends \
+    ca-certificates curl && \
+    rm -rf /var/lib/apt/lists/*
+
+# Create a non-privileged user that the app will run under.
+# See https://docs.docker.com/develop/develop-images/dockerfile_best-practices/#user
+ARG UID=10001
+RUN adduser \
+    --disabled-password \
+    --gecos "" \
+    --home "/nonexistent" \
+    --shell "/sbin/nologin" \
+    --no-create-home \
+    --uid "${UID}" \
+    appuser
+USER appuser
+
+# Copy the executable from the "build" stage.
+COPY --from=build /bin/server /bin/
+
+# Expose the port that the application listens on.
+EXPOSE 3000
+
+HEALTHCHECK --interval=5s --timeout=3s --retries=3 \
+    CMD curl -f http://localhost:3000/health || exit 1
+
+# What the container should run when it is started.
+CMD ["/bin/server", "web", "--addr", "0.0.0.0:3000"]
Dockerfile.cache (new file) — 58 lines changed
@@ -0,0 +1,58 @@
+# Stage 1: Build
+ARG RUST_VERSION=1.76.0
+FROM lukemathwalker/cargo-chef:latest-rust-${RUST_VERSION} as chef
+WORKDIR /build/
+# hadolint ignore=DL3008
+RUN apt-get update && \
+    apt-get install -y --no-install-recommends \
+    lld \
+    clang \
+    libclang-dev \
+    && apt-get clean \
+    && rm -rf /var/lib/apt/lists/*
+
+FROM chef as planner
+COPY . .
+RUN cargo chef prepare --recipe-path recipe.json
+
+FROM chef as builder
+COPY --from=planner /build/recipe.json recipe.json
+# Build dependencies - this is the caching Docker layer!
+RUN cargo chef cook --release -p monzo-ingestion --recipe-path recipe.json
+# Build application
+COPY . .
+RUN cargo build --release -p monzo-ingestion
+
+# Stage 2: Run
+FROM debian:bookworm-slim AS runtime
+
+RUN set -ex; \
+    apt-get update && \
+    apt-get -y install --no-install-recommends \
+    ca-certificates curl && \
+    rm -rf /var/lib/apt/lists/*
+
+# Create a non-privileged user that the app will run under.
+# See https://docs.docker.com/develop/develop-images/dockerfile_best-practices/#user
+ARG UID=10001
+RUN adduser \
+    --disabled-password \
+    --gecos "" \
+    --home "/nonexistent" \
+    --shell "/sbin/nologin" \
+    --no-create-home \
+    --uid "${UID}" \
+    appuser
+USER appuser
+
+# Copy the executable from the "build" stage.
+COPY --from=builder /build/target/release/monzo-ingestion /bin/server
+
+# Expose the port that the application listens on.
+EXPOSE 3000
+
+HEALTHCHECK --interval=5s --timeout=3s --retries=3 \
+    CMD curl -f http://localhost:3000/health || exit 1
+
+# What the container should run when it is started.
+CMD ["/bin/server", "serve", "--addr", "0.0.0.0:3000"]
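Note that both the old Dockerfile and the new Dockerfile.cache wire a HEALTHCHECK to GET /health on port 3000, so the binary has to answer on that route. The route itself is not part of this diff; a minimal sketch of what such an endpoint could look like, assuming axum 0.7's serve API (handler name and wiring are assumptions):

use axum::{routing::get, Router};

// Hypothetical handler: the container HEALTHCHECK only needs a 200 response.
async fn health() -> &'static str {
    "ok"
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let app = Router::new().route("/health", get(health));
    // Bind on 0.0.0.0:3000 to match EXPOSE 3000 and the CMD's --addr flag.
    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;
    axum::serve(listener, app).await?;
    Ok(())
}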
@@ -29,7 +29,6 @@ impl AppError {
 impl IntoResponse for AppError {
     fn into_response(self) -> Response {
         let status_code = match self {
-            AppError::BadRequest(_) => StatusCode::BAD_REQUEST,
             _ => StatusCode::INTERNAL_SERVER_ERROR,
         };
 
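With the BadRequest arm gone, every AppError now maps to a 500 through the catch-all arm. For reference, a self-contained sketch of the old shape of this impl, assuming axum and a simplified error enum (only BadRequest is attested in the hunk; the Internal variant is invented for illustration):

use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};

// Sketch of an error type like the one in this diff.
#[derive(Debug)]
enum AppError {
    BadRequest(String),
    Internal(anyhow::Error),
}

impl IntoResponse for AppError {
    fn into_response(self) -> Response {
        let status_code = match self {
            AppError::BadRequest(_) => StatusCode::BAD_REQUEST,
            _ => StatusCode::INTERNAL_SERVER_ERROR,
        };
        // Pair the status with a plain-text body describing the error.
        (status_code, format!("{self:?}")).into_response()
    }
}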
@@ -29,7 +29,7 @@ pub async fn insert(
 
     for insertions in insertions.chunks(400) {
         let (new_or_updated_insertions, inserted_transaction_ids) =
-            whittle_insertions(insertions, db).await?;
+            whittle_insertions(insertions, &db).await?;
 
         if new_or_updated_insertions.is_empty() {
             continue;
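Batching at 400 rows per insert_many is a common guard against Postgres's 65,535 bind-parameter cap per statement (each row consumes one parameter per column); the diff does not state the author's reason, so treat that as a plausible rationale. A standalone illustration of the slicing, with illustrative numbers:

fn main() {
    let insertions: Vec<u32> = (0..1000).collect();
    // chunks(400) yields slices of at most 400 elements: 400, 400, 200.
    for (i, chunk) in insertions.chunks(400).enumerate() {
        println!("batch {i}: {} rows", chunk.len());
    }
}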
@@ -69,20 +69,20 @@ async fn update_expenditures(
 
     expenditure::Entity::insert_many(
         new_or_updated_insertions
-            .iter()
+            .into_iter()
             .flat_map(|i| &i.contained_expenditures)
             .cloned(),
     )
     .on_conflict(
         OnConflict::columns(vec![
             expenditure::Column::TransactionId,
             expenditure::Column::Category,
         ])
         .update_columns(expenditure::Column::iter())
         .to_owned(),
     )
     .exec(tx)
     .await?;
     Ok(())
 }
 
@@ -118,21 +118,19 @@ async fn whittle_insertions<'a>(
         .select_only()
         .columns([transaction::Column::IdentityHash])
         .filter(transaction::Column::IdentityHash.is_not_null())
-        .into_tuple::<(i64, )>()
+        .into_tuple::<(i64,)>()
         .all(tx)
         .await?;
 
-    tracing::debug!("Found existing entries: {existing_hashes:?}");
-
     // We will only update those where the hash is different to avoid unnecessary updates and
     // notifications.
     let new_or_updated_insertions = insertions
-        .iter()
+        .into_iter()
         .filter(|i| {
             let hash = i.identity_hash;
             !existing_hashes
                 .iter()
-                .any(|(existing_hash, )| *existing_hash == hash)
+                .any(|(existing_hash,)| *existing_hash == hash)
         })
         .collect::<Vec<_>>();
 
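whittle_insertions is a set difference: keep only incoming rows whose identity hash is absent from the database, so unchanged rows trigger neither updates nor notifications. The same filter in miniature, with the database query swapped for an in-memory set (types simplified to i64 and string IDs):

use std::collections::HashSet;

fn main() {
    // Hashes already present in the transaction table (stand-in for the
    // select_only()/into_tuple() query in the diff).
    let existing_hashes: HashSet<i64> = HashSet::from([11, 22, 33]);

    // Incoming (identity_hash, transaction_id) pairs.
    let insertions = vec![(11, "t1"), (44, "t2"), (55, "t3")];

    // Keep only rows whose hash is not already stored, mirroring
    // whittle_insertions' filter over existing_hashes.
    let new_or_updated: Vec<_> = insertions
        .into_iter()
        .filter(|(hash, _)| !existing_hashes.contains(hash))
        .collect();

    assert_eq!(new_or_updated, vec![(44, "t2"), (55, "t3")]);
}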
@@ -157,28 +155,23 @@ async fn notify_new_transactions(
 }
 
 mod tests {
-    use super::{insert, notify_new_transactions, update_expenditures, update_transactions};
+    use super::{notify_new_transactions, update_expenditures, update_transactions};
     use anyhow::Error;
-    use tokio::sync::OnceCell;
     use migration::MigratorTrait;
     use sea_orm::{DatabaseConnection, TransactionTrait};
-    use serde_json::Value;
     use sqlx::postgres::PgListener;
     use sqlx::PgPool;
     use testcontainers::runners::AsyncRunner;
     use testcontainers::ContainerAsync;
-    use crate::ingestion::ingestion_logic::from_json_row;
 
-    #[derive(Debug)]
-    struct DatabaseInstance {
-        container: ContainerAsync<testcontainers_modules::postgres::Postgres>,
-        db: DatabaseConnection,
-        pool: PgPool,
-    }
-
-    static INSTANCE: OnceCell<DatabaseInstance> = OnceCell::const_new();
-
-    async fn initialise_db() -> Result<DatabaseInstance, Error> {
+    async fn initialise() -> Result<
+        (
+            ContainerAsync<testcontainers_modules::postgres::Postgres>,
+            DatabaseConnection,
+            PgPool,
+        ),
+        Error,
+    > {
         let container = testcontainers_modules::postgres::Postgres::default()
             .start()
             .await?;
@@ -193,29 +186,15 @@ mod tests {
         migration::Migrator::up(&db, None).await?;
 
         let pool = PgPool::connect(connection_string).await?;
-        let instance = DatabaseInstance {
-            container,
-            db,
-            pool,
-        };
-
-        Ok(instance)
-    }
-
-    async fn get_or_initialize_db_instance() -> Result<
-        &'static DatabaseInstance,
-        Error,
-    > {
-        Ok(INSTANCE.get_or_init(|| async {
-            initialise_db().await.unwrap()
-        }).await)
+        Ok((container, db, pool))
     }
 
     #[tokio::test]
-    async fn test_empty_insertion_list() -> Result<(), Error> {
-        let db = get_or_initialize_db_instance().await?;
+    async fn test_no_new_insertions() -> Result<(), Error> {
+        let (_container, db, _pool) = initialise().await?;
         let insertions = vec![];
-        let tx = db.db.begin().await?;
+        let tx = db.begin().await?;
         update_transactions(&tx, &insertions).await?;
         update_expenditures(&tx, &insertions, &vec![]).await?;
         tx.commit().await?;
@@ -225,8 +204,8 @@ mod tests {
 
     #[tokio::test]
     async fn test_notify() -> Result<(), Error> {
-        let dbi = get_or_initialize_db_instance().await?;
-        let mut listener = PgListener::connect_with(&dbi.pool).await?;
+        let (_container, db, pool) = initialise().await?;
+        let mut listener = PgListener::connect_with(&pool).await?;
         listener.listen("monzo_new_transactions").await?;
 
         let ids = vec![
@@ -235,7 +214,7 @@ mod tests {
             "test3".to_string(),
         ];
 
-        notify_new_transactions(&dbi.db, &ids).await?;
+        notify_new_transactions(&db, &ids).await?;
 
         let notification = listener.recv().await?;
         let payload = notification.payload();
@@ -248,52 +227,4 @@ mod tests {
 
         Ok(())
     }
-
-    #[tokio::test]
-    async fn test_notify_on_insert() -> Result<(), Error> {
-        let dbi = get_or_initialize_db_instance().await?;
-        let mut listener = PgListener::connect_with(&dbi.pool).await?;
-        listener.listen("monzo_new_transactions").await?;
-
-        let json = include_str!("../../fixtures/transactions.json");
-        let json: Vec<Vec<Value>> = serde_json::from_str(json).unwrap();
-        let data = json
-            .iter()
-            .map(|row| from_json_row(row.clone()))
-            .collect::<Result<Vec<_>, anyhow::Error>>()
-            .unwrap();
-
-        insert(&dbi.db, data.clone()).await?;
-        let notification = listener.recv().await?;
-        let payload = notification.payload();
-        let mut payload = serde_json::from_str::<Vec<String>>(&payload)?;
-        payload.sort();
-
-        let mut ids = data.iter()
-            .map(|row| row.transaction_id.clone())
-            .collect::<Vec<_>>();
-
-        ids.sort();
-
-        assert_eq!(payload, ids, "Inserted IDs do not match");
-
-        insert(&dbi.db, data.clone()).await?;
-        let notification = listener.recv().await?;
-        let payload = notification.payload();
-        let payload = serde_json::from_str::<Vec<String>>(&payload)?;
-        assert_eq!(payload, Vec::<String>::new(), "Re-inserting identical rows triggered double notification");
-
-        let mut altered_data = data.clone();
-        altered_data[0].description = Some("New description".to_string());
-
-        assert_ne!(altered_data[0].compute_hash(), data[0].compute_hash(), "Alterations have the same hash");
-
-        insert(&dbi.db, altered_data.clone()).await?;
-        let notification = listener.recv().await?;
-        let payload = notification.payload();
-        let payload = serde_json::from_str::<Vec<String>>(&payload)?;
-        assert_eq!(payload, vec![altered_data[0].transaction_id.clone()], "Re-inserting altered row failed to re-trigger notification");
-
-        Ok(())
-    }
 }
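These tests lean on Postgres LISTEN/NOTIFY: notify_new_transactions evidently publishes a JSON array of transaction IDs on the monzo_new_transactions channel, and the test's PgListener picks it up. A minimal sqlx round trip of that mechanism, assuming a reachable DATABASE_URL; the channel name comes from the diff, while the payload shape is inferred from the asserts:

use sqlx::postgres::{PgListener, PgPool};

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let url = std::env::var("DATABASE_URL").expect("DATABASE_URL must be set");
    let pool = PgPool::connect(&url).await?;

    // Subscribe before sending so the notification is not missed.
    let mut listener = PgListener::connect_with(&pool).await?;
    listener.listen("monzo_new_transactions").await?;

    // Emit a payload shaped like the tests expect: a JSON array of IDs.
    sqlx::query("SELECT pg_notify('monzo_new_transactions', $1)")
        .bind(r#"["test1","test2","test3"]"#)
        .execute(&pool)
        .await?;

    let notification = listener.recv().await?;
    let ids: Vec<String> = serde_json::from_str(notification.payload()).unwrap();
    assert_eq!(ids, vec!["test1", "test2", "test3"]);
    Ok(())
}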
@@ -1,9 +0,0 @@
-#[allow(dead_code)]
-mod headings {
-    #[allow(unused_imports)]
-    pub use super::super::ingestion_logic::headings::*;
-
-    // Additional Flex headings
-    pub const MONEY_OUT: usize = 16;
-    pub const MONEY_IN: usize = 17;
-}
@@ -11,7 +11,7 @@ use serde_json::Value;
 use std::hash::Hash;
 
 #[allow(dead_code)]
-pub(crate) mod headings {
+mod headings {
     pub const TRANSACTION_ID: usize = 0;
     pub const DATE: usize = 1;
     pub const TIME: usize = 2;
@@ -30,19 +30,19 @@ pub(crate) mod headings {
     pub const CATEGORY_SPLIT: usize = 15;
 }
 
-#[derive(Debug, Eq, PartialEq, Hash, Clone)]
+#[derive(Debug, Eq, PartialEq, Hash)]
 pub struct MonzoRow {
-    pub category_split: Option<String>,
-    pub primary_category: String,
-    pub total_amount: Decimal,
-    pub receipt: Option<String>,
-    pub notes: Option<String>,
-    pub emoji: Option<String>,
-    pub description: Option<String>,
-    pub transaction_type: String,
-    pub title: Option<String>,
-    pub timestamp: NaiveDateTime,
-    pub transaction_id: String,
+    category_split: Option<String>,
+    primary_category: String,
+    total_amount: Decimal,
+    receipt: Option<String>,
+    notes: Option<String>,
+    emoji: Option<String>,
+    description: Option<String>,
+    transaction_type: String,
+    title: Option<String>,
+    timestamp: NaiveDateTime,
+    transaction_id: String,
 }
 
 impl MonzoRow {
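MonzoRow derives Hash, which underpins the identity-hash dedup above: the removed test asserted that editing description changes compute_hash(). The actual compute_hash is not shown in this diff; a plausible sketch folds the derived Hash into the i64 stored in transaction::Column::IdentityHash (note that DefaultHasher's output is not guaranteed stable across Rust releases, a real caveat when persisting hashes):

use std::hash::{DefaultHasher, Hash, Hasher};

#[derive(Debug, Eq, PartialEq, Hash)]
struct MonzoRow {
    // Trimmed to two fields for illustration; the real struct has eleven.
    transaction_id: String,
    description: Option<String>,
}

impl MonzoRow {
    // Hypothetical: reduce the derived Hash impl to a single i64, matching
    // the (i64,) tuples queried back out of the transaction table.
    fn compute_hash(&self) -> i64 {
        let mut hasher = DefaultHasher::new();
        self.hash(&mut hasher);
        hasher.finish() as i64
    }
}

fn main() {
    let a = MonzoRow { transaction_id: "t1".into(), description: None };
    let mut b = MonzoRow { transaction_id: "t1".into(), description: None };
    b.description = Some("New description".into());
    assert_ne!(a.compute_hash(), b.compute_hash(), "Alterations have the same hash");
}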
@@ -1,4 +1,3 @@
 pub mod db;
 pub mod ingestion_logic;
-pub mod flex;
 pub mod routes;
@@ -43,7 +43,7 @@ pub async fn monzo_batched_csv(
     Extension(db): Extension<DatabaseConnection>,
     multipart: Multipart,
 ) -> Result<&'static str, AppError> {
-    static CSV_MISSING_ERR_MSG: &str = "No CSV file provided. Expected a multipart request with a `csv` field containing the contents of the CSV.";
+    static CSV_MISSING_ERR_MSG: &'static str = "No CSV file provided. Expected a multipart request with a `csv` field containing the contents of the CSV.";
 
     let csv = extract_csv(multipart)
         .await
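extract_csv itself is outside this hunk. A sketch of what it could look like with axum's Multipart extractor, matching the `csv` field named in the error message (this is an assumption about the shape of the helper, not the project's code):

use axum::extract::multipart::{Multipart, MultipartError};

// Hypothetical helper matching the call site above: walk the multipart
// fields and return the text of the one named "csv", if present.
async fn extract_csv(mut multipart: Multipart) -> Result<Option<String>, MultipartError> {
    while let Some(field) = multipart.next_field().await? {
        if field.name() == Some("csv") {
            return Ok(Some(field.text().await?));
        }
    }
    Ok(None)
}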