Compare commits

..

No commits in common. "848da0f5f3a17f416120c32c3295d2c67037188e" and "89fccc8d85b4e0007db3a9fdeebd93ad2225b299" have entirely different histories.

12 changed files with 528 additions and 539 deletions

1
.gitignore vendored
View File

@ -1,3 +1,2 @@
/target /target
/.idea /.idea
client_secret.json

851
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -7,25 +7,25 @@ edition = "2021"
entity = { path = "entity" } entity = { path = "entity" }
migration = { path = "migration" } migration = { path = "migration" }
axum = { version = "0.7.5", features = ["multipart"] } axum = { version = "0.7.4", features = ["multipart"] }
tokio = { version = "1.37.0", features = ["full"] } tokio = { version = "1.36.0", features = ["full"] }
sea-orm = { version = "1.0.0-rc.4", features = [ sea-orm = { version = "1.0.0-rc.1", features = [
"sqlx-postgres", "sqlx-postgres",
"runtime-tokio-rustls", "runtime-tokio-rustls",
"macros" "macros"
] } ] }
serde = { version = "1.0.203", features = ["derive"] } serde = { version = "1.0.197", features = ["derive"] }
serde_json = "1.0.117" serde_json = "1.0.114"
tracing-subscriber = "0.3.18" tracing-subscriber = "0.3.18"
tracing = "0.1.40" tracing = "0.1.40"
anyhow = "1.0.86" anyhow = "1.0.80"
thiserror = "1.0.61" thiserror = "1.0.57"
http = "1.1.0" http = "1.0.0"
chrono = { version = "0.4.38", features = ["serde"] } chrono = { version = "0.4.34", features = ["serde"] }
num-traits = "0.2.19" num-traits = "0.2.18"
csv = "1.3.0" csv = "1.3.0"
clap = "4.5.4" clap = "4.5.1"
[workspace] [workspace]
members = [".", "migration", "entity"] members = [".", "migration", "entity"]

View File

@ -9,5 +9,5 @@ name = "entity"
path = "src/lib.rs" path = "src/lib.rs"
[dependencies] [dependencies]
sea-orm = { version = "1.0.0-rc.4" } sea-orm = { version = "1.0.0-rc.1" }
serde = { version = "1.0.203", features = ["derive"] } serde = { version = "1.0.197", features = ["derive"] }

View File

@ -12,7 +12,7 @@ path = "src/lib.rs"
async-std = { version = "1", features = ["attributes", "tokio1"] } async-std = { version = "1", features = ["attributes", "tokio1"] }
[dependencies.sea-orm-migration] [dependencies.sea-orm-migration]
version = "1.0.0-rc.4" version = "1.0.0-rc.1"
features = [ features = [
"runtime-tokio-rustls", # `ASYNC_RUNTIME` feature "runtime-tokio-rustls", # `ASYNC_RUNTIME` feature
"sqlx-postgres", # `DATABASE_DRIVER` feature "sqlx-postgres", # `DATABASE_DRIVER` feature

View File

@ -6,40 +6,54 @@ pub struct Migration;
#[async_trait::async_trait] #[async_trait::async_trait]
impl MigrationTrait for Migration { impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager manager.create_table(
.create_table( Table::create()
Table::create() .table(Transaction::Table)
.table(Transaction::Table) .if_not_exists()
.if_not_exists() .col(
.col( ColumnDef::new(Transaction::Id)
ColumnDef::new(Transaction::Id) .string()
.string() .not_null()
.not_null() .primary_key(),
.primary_key(), )
) .col(
.col( ColumnDef::new(Transaction::TransactionType)
ColumnDef::new(Transaction::TransactionType) .string()
.string() .not_null(),
.not_null(), )
) .col(
.col( ColumnDef::new(Transaction::TotalAmount)
ColumnDef::new(Transaction::TotalAmount) .decimal()
.decimal() .not_null(),
.not_null(), )
) .col(
.col( ColumnDef::new(Transaction::Timestamp)
ColumnDef::new(Transaction::Timestamp) .timestamp()
.timestamp() .not_null(),
.not_null(), )
) .col(
.col(ColumnDef::new(Transaction::Title).string().not_null()) ColumnDef::new(Transaction::Title)
.col(ColumnDef::new(Transaction::Emoji).string()) .string()
.col(ColumnDef::new(Transaction::Notes).string()) .not_null(),
.col(ColumnDef::new(Transaction::Receipt).string()) )
.col(ColumnDef::new(Transaction::Description).string()) .col(
.to_owned(), ColumnDef::new(Transaction::Emoji)
) .string(),
.await?; )
.col(
ColumnDef::new(Transaction::Notes)
.string(),
)
.col(
ColumnDef::new(Transaction::Receipt)
.string(),
)
.col(
ColumnDef::new(Transaction::Description)
.string(),
)
.to_owned()
).await?;
manager manager
.create_table( .create_table(
@ -59,8 +73,7 @@ impl MigrationTrait for Migration {
) )
.col(ColumnDef::new(Expenditure::Amount).decimal().not_null()) .col(ColumnDef::new(Expenditure::Amount).decimal().not_null())
.to_owned(), .to_owned(),
) ).await?;
.await?;
Ok(()) Ok(())
} }

View File

@ -12,7 +12,7 @@ pub enum AppError {
#[error("Invalid request {0}")] #[error("Invalid request {0}")]
BadRequest(anyhow::Error), BadRequest(anyhow::Error),
/// Catch all for error we don't care to expose publicly. /// Catch all for error we dont care to expose publicly.
#[error("Internal error")] #[error("Internal error")]
Anyhow(#[from] anyhow::Error), Anyhow(#[from] anyhow::Error),
} }

View File

@ -1,10 +1,7 @@
use anyhow::anyhow;
use entity::{expenditure, transaction}; use entity::{expenditure, transaction};
use sea_orm::sea_query::OnConflict; use sea_orm::sea_query::OnConflict;
use sea_orm::{ConnectionTrait, DatabaseBackend, QueryFilter, QueryTrait, Statement}; use sea_orm::QueryFilter;
use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, Iterable, TransactionTrait}; use sea_orm::{ColumnTrait, DatabaseConnection, DbErr, EntityTrait, Iterable, TransactionTrait};
use migration::PostgresQueryBuilder;
use crate::error::AppError;
pub struct Insertion { pub struct Insertion {
pub transaction: transaction::ActiveModel, pub transaction: transaction::ActiveModel,
@ -12,42 +9,34 @@ pub struct Insertion {
} }
// Note while this is more efficient in db calls, it does bind together the entire group. // Note while this is more efficient in db calls, it does bind together the entire group.
// We employ a batching process for now to try balance speed and failure rate, but it is worth // We employ a batching process for now to try balance speed and failure rate but it is worth trying
// trying to move failures earlier and improve reporting. // to move failures earlier and improve reporting.
pub async fn insert(db: &DatabaseConnection, insertions: Vec<Insertion>) -> Result<Vec<String>, AppError> { pub async fn insert(db: &DatabaseConnection, insertions: Vec<Insertion>) -> Result<(), DbErr> {
let mut new_transaction_ids = Vec::new();
for insertions in insertions.chunks(400) { for insertions in insertions.chunks(400) {
let tx = db.begin().await?; let tx = db.begin().await?;
let insert = transaction::Entity::insert_many(insertions.iter().map(|i| &i.transaction).cloned()) transaction::Entity::insert_many(
.on_conflict( insertions.iter().map(|i| &i.transaction).cloned(),
OnConflict::column(transaction::Column::Id) )
.update_columns(transaction::Column::iter()) .on_conflict(
.to_owned(), OnConflict::column(transaction::Column::Id)
) .update_columns(transaction::Column::iter())
.into_query() .to_owned(),
.returning_col(transaction::Column::Id) )
.build(PostgresQueryBuilder); .exec(&tx)
.await?;
let inserted_transaction_ids = tx.query_all(Statement::from_sql_and_values( // Expenditures can change as we recagegorise them, so we delete all the old ones and insert
DatabaseBackend::Postgres, // an entirely new set to ensure we don't end up leaving old ones around.
insert.0,
insert.1,
)).await?
.iter()
.map(|r| r.try_get_by("id"))
.collect::<Result<Vec<String>, _>>()?;
// Expenditures can change as we re-categorise them, so we delete all the old ones and
// insert an entirely new set to ensure we don't end up leaving old ones around.
expenditure::Entity::delete_many() expenditure::Entity::delete_many()
.filter( .filter(
expenditure::Column::TransactionId expenditure::Column::TransactionId.is_in(
.is_in(insertions.iter().map(|i| i.transaction.id.as_ref())), insertions
.iter()
.map(|i| i.transaction.id.as_ref()),
),
) )
.exec(&tx) .exec(&tx).await?;
.await?;
expenditure::Entity::insert_many( expenditure::Entity::insert_many(
insertions insertions
@ -67,19 +56,7 @@ pub async fn insert(db: &DatabaseConnection, insertions: Vec<Insertion>) -> Resu
.await?; .await?;
tx.commit().await?; tx.commit().await?;
new_transaction_ids.extend(inserted_transaction_ids);
} }
let payload = serde_json::to_string(&new_transaction_ids) Ok(())
.map_err(|e| anyhow!(e))?;
db.execute(
Statement::from_sql_and_values(
DatabaseBackend::Postgres,
"NOTIFY monzo_new_transactions, $1",
vec![sea_orm::Value::from(payload)],
)
).await?;
Ok(new_transaction_ids)
} }

View File

@ -1,13 +1,13 @@
use crate::ingestion::db::Insertion;
use anyhow::Context; use anyhow::Context;
use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime}; use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime};
use csv::StringRecord;
use entity::expenditure::ActiveModel; use entity::expenditure::ActiveModel;
use entity::transaction; use entity::transaction;
use num_traits::FromPrimitive; use num_traits::FromPrimitive;
use sea_orm::prelude::Decimal; use sea_orm::prelude::Decimal;
use sea_orm::ActiveValue::*; use sea_orm::ActiveValue::*;
use sea_orm::IntoActiveModel; use sea_orm::IntoActiveModel;
use crate::ingestion::db::Insertion;
use csv::StringRecord;
#[allow(dead_code)] #[allow(dead_code)]
mod headings { mod headings {

View File

@ -1,3 +1,3 @@
pub mod db; pub mod db;
pub mod ingestion_logic;
pub mod routes; pub mod routes;
pub mod ingestion_logic;

View File

@ -1,11 +1,11 @@
use crate::error::AppError;
use crate::ingestion::db;
use crate::ingestion::ingestion_logic::{from_csv_row, from_json_row};
use anyhow::anyhow; use anyhow::anyhow;
use axum::extract::{Extension, Json, Multipart}; use axum::extract::{Extension, Json, Multipart};
use sea_orm::DatabaseConnection; use sea_orm::DatabaseConnection;
use serde_json::Value; use serde_json::Value;
use std::io::Cursor; use std::io::Cursor;
use crate::error::AppError;
use crate::ingestion::db;
use crate::ingestion::ingestion_logic::{from_csv_row, from_json_row};
pub async fn monzo_updated( pub async fn monzo_updated(
Extension(db): Extension<DatabaseConnection>, Extension(db): Extension<DatabaseConnection>,

View File

@ -1,14 +1,14 @@
mod error;
mod ingestion; mod ingestion;
mod error;
use axum::{Extension, Router};
use std::net::SocketAddr;
use axum::routing::{get, post};
use clap::Parser;
use sea_orm::{ConnectionTrait, DatabaseConnection};
use migration::{Migrator, MigratorTrait};
use crate::error::AppError; use crate::error::AppError;
use crate::ingestion::routes::{monzo_batched_csv, monzo_batched_json, monzo_updated}; use crate::ingestion::routes::{monzo_batched_csv, monzo_batched_json, monzo_updated};
use axum::routing::{get, post};
use axum::{Extension, Router};
use clap::Parser;
use migration::{Migrator, MigratorTrait};
use sea_orm::{ConnectionTrait, DatabaseConnection};
use std::net::SocketAddr;
#[derive(Debug, clap::Parser)] #[derive(Debug, clap::Parser)]
struct Config { struct Config {
@ -28,7 +28,8 @@ struct Config {
async fn health_check( async fn health_check(
Extension(db): Extension<DatabaseConnection>, Extension(db): Extension<DatabaseConnection>,
) -> Result<&'static str, AppError> { ) -> Result<&'static str, AppError> {
db.execute_unprepared("SELECT 1").await?; db.execute_unprepared("SELECT 1")
.await?;
Ok("Ok") Ok("Ok")
} }