diff --git a/Cargo.lock b/Cargo.lock index 4a05de8..64f53de 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -124,6 +124,12 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "anyhow" +version = "1.0.75" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6" + [[package]] name = "arrayvec" version = "0.7.4" @@ -315,6 +321,7 @@ dependencies = [ "matchit", "memchr", "mime", + "multer", "percent-encoding", "pin-project-lite", "rustversion", @@ -543,8 +550,11 @@ checksum = "95ed24df0632f708f5f6d8082675bef2596f7084dee3dd55f632290bf35bfe0f" dependencies = [ "android-tzdata", "iana-time-zone", + "js-sys", "num-traits", "serde", + "time 0.1.45", + "wasm-bindgen", "windows-targets", ] @@ -668,6 +678,27 @@ dependencies = [ "typenum", ] +[[package]] +name = "csv" +version = "1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "626ae34994d3d8d668f4269922248239db4ae42d538b14c398b74a52208e8086" +dependencies = [ + "csv-core", + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "csv-core" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b2466559f260f48ad25fe6317b3c8dac77b5bdb5763ac7d9d6103530663bc90" +dependencies = [ + "memchr", +] + [[package]] name = "der" version = "0.7.8" @@ -726,6 +757,15 @@ dependencies = [ "serde", ] +[[package]] +name = "encoding_rs" +version = "0.8.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7268b386296a025e474d5140678f75d6de9493ae55a5d709eeb9dd08149945e1" +dependencies = [ + "cfg-if", +] + [[package]] name = "entity" version = "0.1.0" @@ -946,7 +986,7 @@ checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427" dependencies = [ "cfg-if", "libc", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", ] [[package]] @@ -1351,7 +1391,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2" dependencies = [ "libc", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", "windows-sys", ] @@ -1359,15 +1399,42 @@ dependencies = [ name = "monzo-ingestion" version = "0.1.0" dependencies = [ + "anyhow", "axum", + "chrono", + "clap", + "csv", + "entity", + "http", + "migration", + "num-traits", "sea-orm", "serde", "serde_json", + "thiserror", "tokio", "tracing", "tracing-subscriber", ] +[[package]] +name = "multer" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01acbdc23469fd8fe07ab135923371d5f5a422fbf9c522158677c8eb15bc51c2" +dependencies = [ + "bytes", + "encoding_rs", + "futures-util", + "http", + "httparse", + "log", + "memchr", + "mime", + "spin 0.9.8", + "version_check", +] + [[package]] name = "nom" version = "7.1.3" @@ -2018,7 +2085,7 @@ dependencies = [ "sqlx", "strum", "thiserror", - "time", + "time 0.3.28", "tracing", "url", "uuid", @@ -2086,7 +2153,7 @@ dependencies = [ "rust_decimal", "sea-query-derive", "serde_json", - "time", + "time 0.3.28", "uuid", ] @@ -2102,7 +2169,7 @@ dependencies = [ "sea-query", "serde_json", "sqlx", - "time", + "time 0.3.28", "uuid", ] @@ -2380,7 +2447,7 @@ dependencies = [ "smallvec", "sqlformat", "thiserror", - "time", + "time 0.3.28", "tokio", "tokio-stream", "tracing", @@ -2469,7 +2536,7 @@ dependencies = [ "sqlx-core", "stringprep", "thiserror", - "time", + "time 0.3.28", "tracing", "uuid", "whoami", @@ -2514,7 +2581,7 @@ dependencies = [ "sqlx-core", "stringprep", "thiserror", - "time", + "time 0.3.28", "tracing", "uuid", "whoami", @@ -2539,7 +2606,7 @@ dependencies = [ "percent-encoding", "serde", "sqlx-core", - "time", + "time 0.3.28", "tracing", "url", "uuid", @@ -2656,6 +2723,17 @@ dependencies = [ "once_cell", ] +[[package]] +name = "time" +version = "0.1.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b797afad3f312d1c66a56d11d0316f916356d11bd158fbc6ca6389ff6bf805a" +dependencies = [ + "libc", + "wasi 0.10.0+wasi-snapshot-preview1", + "winapi", +] + [[package]] name = "time" version = "0.3.28" @@ -2956,6 +3034,12 @@ dependencies = [ "try-lock", ] +[[package]] +name = "wasi" +version = "0.10.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" diff --git a/Cargo.toml b/Cargo.toml index c38e926..d496aeb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,10 @@ version = "0.1.0" edition = "2021" [dependencies] -axum = "0.6.20" +entity = { path = "entity" } +migration = { path = "migration" } + +axum = { version = "0.6.20", features = ["multipart"] } tokio = { version = "1.32.0", features = ["full"] } sea-orm = { version = "0.12", features = [ "sqlx-postgres", @@ -16,6 +19,13 @@ serde = { version = "1.0.188", features = ["derive"] } serde_json = "1.0.105" tracing-subscriber = "0.3.17" tracing = "0.1.37" +anyhow = "1.0.75" +thiserror = "1.0.48" +http = "0.2.9" +chrono = { version = "0.4.28", features = ["serde"] } +num-traits = "0.2.16" +csv = "1.2.2" +clap = "4.4.2" [workspace] members = [".", "migration", "entity"] diff --git a/entity/src/expenditure.rs b/entity/src/expenditure.rs index 062593e..c457147 100644 --- a/entity/src/expenditure.rs +++ b/entity/src/expenditure.rs @@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize}; #[sea_orm(table_name = "expenditure")] pub struct Model { #[sea_orm(primary_key, auto_increment = false)] - pub transaction_id: i32, + pub transaction_id: String, #[sea_orm(primary_key, auto_increment = false)] pub category: String, pub amount: Decimal, diff --git a/entity/src/mod.rs b/entity/src/lib.rs similarity index 100% rename from entity/src/mod.rs rename to entity/src/lib.rs diff --git a/migration/src/m20230904_141851_create_monzo_tables.rs b/migration/src/m20230904_141851_create_monzo_tables.rs index f364ab7..3f86d3e 100644 --- a/migration/src/m20230904_141851_create_monzo_tables.rs +++ b/migration/src/m20230904_141851_create_monzo_tables.rs @@ -62,7 +62,7 @@ impl MigrationTrait for Migration { .if_not_exists() .col( ColumnDef::new(Expenditure::TransactionId) - .integer() + .string() .not_null(), ) .col(ColumnDef::new(Expenditure::Category).string().not_null()) diff --git a/src/error.rs b/src/error.rs new file mode 100644 index 0000000..af1787d --- /dev/null +++ b/src/error.rs @@ -0,0 +1,30 @@ +use axum::response::{IntoResponse, Response}; +use http::StatusCode; +use sea_orm::DbErr; +use tracing::log::error; + +#[derive(thiserror::Error, Debug)] +pub enum AppError { + /// SeaORM error, separated for ease of use allowing us to `?` db operations. + #[error("Internal error")] + DbError(#[from] DbErr), + + #[error("Invalid request {0}")] + BadRequest(anyhow::Error), + + /// Catch all for error we dont care to expose publicly. + #[error("Internal error")] + Anyhow(#[from] anyhow::Error), +} + +impl IntoResponse for AppError { + fn into_response(self) -> Response { + error!("Internal server error: {self:?}"); + + let status_code = match self { + _ => StatusCode::INTERNAL_SERVER_ERROR, + }; + + (status_code, self.to_string()).into_response() + } +} diff --git a/src/ingestion/db.rs b/src/ingestion/db.rs new file mode 100644 index 0000000..ea8996a --- /dev/null +++ b/src/ingestion/db.rs @@ -0,0 +1,62 @@ +use entity::{expenditure, transaction}; +use sea_orm::sea_query::OnConflict; +use sea_orm::QueryFilter; +use sea_orm::{ColumnTrait, DatabaseConnection, DbErr, EntityTrait, Iterable, TransactionTrait}; + +pub struct Insertion { + pub transaction: transaction::ActiveModel, + pub contained_expenditures: Vec, +} + +// Note while this is more efficient in db calls, it does bind together the entire group. +// We employ a batching process for now to try balance speed and failure rate but it is worth trying +// to move failures earlier and improve reporting. +pub async fn insert(db: &DatabaseConnection, insertions: Vec) -> Result<(), DbErr> { + for insertions in insertions.chunks(400) { + let tx = db.begin().await?; + + transaction::Entity::insert_many( + insertions.iter().map(|i| &i.transaction).cloned(), + ) + .on_conflict( + OnConflict::column(transaction::Column::Id) + .update_columns(transaction::Column::iter()) + .to_owned(), + ) + .exec(&tx) + .await?; + + // Expenditures can change as we recagegorise them, so we delete all the old ones and insert + // an entirely new set to ensure we don't end up leaving old ones around. + expenditure::Entity::delete_many() + .filter( + expenditure::Column::TransactionId.is_in( + insertions + .iter() + .map(|i| i.transaction.id.as_ref()), + ), + ) + .exec(&tx).await?; + + expenditure::Entity::insert_many( + insertions + .iter() + .flat_map(|i| &i.contained_expenditures) + .cloned(), + ) + .on_conflict( + OnConflict::columns(vec![ + expenditure::Column::TransactionId, + expenditure::Column::Category, + ]) + .update_columns(expenditure::Column::iter()) + .to_owned(), + ) + .exec(&tx) + .await?; + + tx.commit().await?; + } + + Ok(()) +} diff --git a/src/ingestion/ingestion.rs b/src/ingestion/ingestion.rs new file mode 100644 index 0000000..2bbaa44 --- /dev/null +++ b/src/ingestion/ingestion.rs @@ -0,0 +1,67 @@ +use anyhow::anyhow; +use axum::extract::{Extension, Json, Multipart}; +use sea_orm::DatabaseConnection; +use serde_json::Value; +use std::io::Cursor; +use crate::error::AppError; +use crate::ingestion::db; +use crate::ingestion::ingestion_logic::{from_csv_row, from_json_row}; + +pub async fn monzo_updated( + Extension(db): Extension, + Json(row): Json>, +) -> Result<&'static str, AppError> { + db::insert(&db, vec![from_json_row(row)?]).await.unwrap(); + + Ok("Ok") +} + +pub async fn monzo_batched_json( + Extension(db): Extension, + Json(data): Json>>, +) -> Result<&'static str, AppError> { + let insertions = data + .into_iter() + .skip(1) + .map(|row| from_json_row(row)) + .collect::>()?; + + db::insert(&db, insertions).await.unwrap(); + + Ok("Ok") +} + +pub async fn monzo_batched_csv( + Extension(db): Extension, + mut multipart: Multipart, +) -> Result<&'static str, AppError> { + let csv = loop { + match multipart.next_field().await.unwrap() { + Some(field) if field.name() == Some("csv") => { + break Some(field.bytes().await.unwrap()); + } + + Some(_) => {} + None => break None, + } + }; + + let Some(csv) = csv else { + return Err(AppError::BadRequest(anyhow!("No CSV file provided"))); + }; + + let csv = Cursor::new(csv); + let mut csv = csv::Reader::from_reader(csv); + let data = csv.records(); + + db::insert( + &db, + data.filter_map(|f| f.ok()) + .map(from_csv_row) + .collect::>()?, + ) + .await + .unwrap(); + + Ok("Ok") +} diff --git a/src/ingestion/ingestion_logic.rs b/src/ingestion/ingestion_logic.rs new file mode 100644 index 0000000..68a8a48 --- /dev/null +++ b/src/ingestion/ingestion_logic.rs @@ -0,0 +1,182 @@ +use anyhow::Context; +use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime}; +use entity::expenditure::ActiveModel; +use entity::transaction; +use num_traits::FromPrimitive; +use sea_orm::prelude::Decimal; +use sea_orm::ActiveValue::*; +use sea_orm::IntoActiveModel; +use crate::ingestion::db::Insertion; +use csv::StringRecord; + +#[allow(dead_code)] +mod headings { + pub const TRANSACTION_ID: usize = 0; + pub const DATE: usize = 1; + pub const TIME: usize = 2; + pub const TYPE: usize = 3; + pub const NAME: usize = 4; + pub const EMOJI: usize = 5; + pub const CATEGORY: usize = 6; + pub const AMOUNT: usize = 7; + pub const CURRENCY: usize = 8; + pub const LOCAL_AMOUNT: usize = 9; + pub const LOCAL_CURRENCY: usize = 10; + pub const NOTES_AND_TAGS: usize = 11; + pub const ADDRESS: usize = 12; + pub const RECEIPT: usize = 13; + pub const DESCRIPTION: usize = 14; + pub const CATEGORY_SPLIT: usize = 15; +} + +fn parse_section(monzo_transaction_id: &str, section: &str) -> anyhow::Result { + let mut components = section.split(':'); + let category: String = components + .next() + .context("Missing Missing category")? + .to_string(); + + let amount = components + .next() + .context("Missing amount")? + .parse::()?; + + Ok(entity::expenditure::Model { + transaction_id: monzo_transaction_id.to_string(), + category, + amount, + } + .into_active_model()) +} + +fn json_opt(value: &serde_json::Value) -> Option { + match value { + serde_json::Value::String(string) if string.is_empty() => None, + serde_json::Value::String(string) => Some(string.to_string()), + _ => None, + } +} + +pub fn from_json_row(row: Vec) -> anyhow::Result { + use serde_json::Value; + let monzo_transaction_id = row[headings::TRANSACTION_ID] + .as_str() + .context("No transaction id")? + .to_string(); + + let date = DateTime::parse_from_rfc3339(row[headings::DATE].as_str().context("No date")?) + .context("Failed to parse date")?; + + let time = DateTime::parse_from_rfc3339(row[headings::TIME].as_str().context("No time")?) + .context("Failed to parse date")? + .time(); + + let timestamp = date.date_naive().and_time(time); + + let title = row[headings::NAME] + .as_str() + .context("No title")? + .to_string(); + + let monzo_transaction_type = row[headings::TYPE] + .as_str() + .context("No transaction type")? + .to_string(); + + let description = json_opt(&row[headings::DESCRIPTION]); + let emoji = json_opt(&row[headings::EMOJI]); + let notes = json_opt(&row[headings::NOTES_AND_TAGS]); + let receipt = json_opt(&row[headings::RECEIPT]); + let total_amount = Decimal::from_f64(row[headings::AMOUNT].as_f64().context("No amount")?) + .context("Failed to parse date")?; + + let expenditures: Vec<_> = match row.get(headings::CATEGORY_SPLIT) { + Some(Value::String(split)) if !split.is_empty() => split + .split(',') + .map(|section| parse_section(&monzo_transaction_id, section)) + .collect::, anyhow::Error>>()?, + + _ => vec![entity::expenditure::Model { + category: row[headings::CATEGORY] + .as_str() + .context("No context")? + .to_string(), + amount: total_amount, + transaction_id: monzo_transaction_id.clone(), + } + .into_active_model()], + }; + + Ok(Insertion { + transaction: transaction::ActiveModel { + id: Set(monzo_transaction_id), + transaction_type: Set(monzo_transaction_type), + timestamp: Set(timestamp), + title: Set(title), + emoji: Set(emoji), + notes: Set(notes), + receipt: Set(receipt), + total_amount: Set(total_amount), + description: Set(description), + }, + + contained_expenditures: expenditures, + }) +} + +fn csv_opt(s: &str) -> Option { + match s { + "" => None, + v => Some(v.to_string()), + } +} + +pub fn from_csv_row(row: StringRecord) -> anyhow::Result { + let monzo_transaction_id = row[headings::TRANSACTION_ID].to_string(); + + let date = NaiveDate::parse_from_str(&row[headings::DATE], "%d/%m/%Y") + .context("Failed to parse date from csv")?; + + let time = NaiveTime::parse_from_str(&row[headings::TIME], "%H:%M:%S") + .context("Failed to parse time from csv")?; + + let timestamp = NaiveDateTime::new(date, time); + + let title = row[headings::NAME].to_string(); + let monzo_transaction_type = row[headings::TYPE].to_string(); + + let description = csv_opt(&row[headings::DESCRIPTION]); + let emoji = csv_opt(&row[headings::EMOJI]); + let notes = csv_opt(&row[headings::NOTES_AND_TAGS]); + let receipt = csv_opt(&row[headings::RECEIPT]); + let total_amount = row[headings::AMOUNT].parse::()?; + + let expenditures: Vec<_> = match row.get(headings::CATEGORY_SPLIT) { + Some(split) if !split.is_empty() => split + .split(',') + .map(|section| parse_section(&monzo_transaction_id, section)) + .collect::, anyhow::Error>>()?, + + _ => vec![entity::expenditure::Model { + transaction_id: monzo_transaction_id.clone(), + category: row[headings::CATEGORY].to_string(), + amount: total_amount, + } + .into_active_model()], + }; + + Ok(Insertion { + transaction: transaction::ActiveModel { + id: Set(monzo_transaction_id), + transaction_type: Set(monzo_transaction_type), + timestamp: Set(timestamp), + title: Set(title), + emoji: Set(emoji), + notes: Set(notes), + receipt: Set(receipt), + total_amount: Set(total_amount), + description: Set(description), + }, + contained_expenditures: expenditures, + }) +} diff --git a/src/ingestion/mod.rs b/src/ingestion/mod.rs new file mode 100644 index 0000000..14e76f1 --- /dev/null +++ b/src/ingestion/mod.rs @@ -0,0 +1,3 @@ +pub mod db; +pub mod ingestion; +pub mod ingestion_logic; diff --git a/src/main.rs b/src/main.rs index 693a042..4c9c4a9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,30 +1,39 @@ -use axum::{ - routing::{get, post}, - http::StatusCode, - response::IntoResponse, - Json, Router, -}; -use serde::{Deserialize, Serialize}; +mod ingestion; +mod error; + +use axum::{Extension, Router}; use std::net::SocketAddr; +use axum::routing::post; +use clap::Parser; +use migration::{Migrator, MigratorTrait}; +use crate::ingestion::ingestion::{monzo_batched_csv, monzo_batched_json, monzo_updated}; + +#[derive(Debug, clap::Parser)] +struct Config { + #[clap(short, long, env)] + addr: SocketAddr, + #[clap(short, long = "db", env)] + database_url: String, +} #[tokio::main] -async fn main() { - // initialize tracing +async fn main() -> anyhow::Result<()> { + let config: Config = Config::parse(); + let connection = sea_orm::Database::connect(&config.database_url).await?; + Migrator::up(&connection, None).await?; + tracing_subscriber::fmt::init(); - - // build our application with a route let app = Router::new() - // `GET /` goes to `root` - .route("/", get(root)) - // `POST /users` goes to `create_user` - .route("/users", post(create_user)); + .route("/monzo-updated", post(monzo_updated)) + .route("/monzo-batch-export", post(monzo_batched_json)) + .route("/monzo-csv-ingestion", post(monzo_batched_csv)) + .layer(Extension(connection.clone())); - // run our app with hyper - // `axum::Server` is a re-export of `hyper::Server` - let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); - tracing::debug!("listening on {}", addr); - axum::Server::bind(&addr) + tracing::debug!("listening on {}", &config.addr); + axum::Server::bind(&config.addr) .serve(app.into_make_service()) .await .unwrap(); -} \ No newline at end of file + + Ok(()) +}