Import of old stuff, it might work now

This commit is contained in:
Joshua Coles 2023-09-04 17:00:36 +01:00
parent 554e645f92
commit b6e7c870a7
11 changed files with 480 additions and 33 deletions

102
Cargo.lock generated
View File

@ -124,6 +124,12 @@ dependencies = [
"windows-sys", "windows-sys",
] ]
[[package]]
name = "anyhow"
version = "1.0.75"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6"
[[package]] [[package]]
name = "arrayvec" name = "arrayvec"
version = "0.7.4" version = "0.7.4"
@ -315,6 +321,7 @@ dependencies = [
"matchit", "matchit",
"memchr", "memchr",
"mime", "mime",
"multer",
"percent-encoding", "percent-encoding",
"pin-project-lite", "pin-project-lite",
"rustversion", "rustversion",
@ -543,8 +550,11 @@ checksum = "95ed24df0632f708f5f6d8082675bef2596f7084dee3dd55f632290bf35bfe0f"
dependencies = [ dependencies = [
"android-tzdata", "android-tzdata",
"iana-time-zone", "iana-time-zone",
"js-sys",
"num-traits", "num-traits",
"serde", "serde",
"time 0.1.45",
"wasm-bindgen",
"windows-targets", "windows-targets",
] ]
@ -668,6 +678,27 @@ dependencies = [
"typenum", "typenum",
] ]
[[package]]
name = "csv"
version = "1.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "626ae34994d3d8d668f4269922248239db4ae42d538b14c398b74a52208e8086"
dependencies = [
"csv-core",
"itoa",
"ryu",
"serde",
]
[[package]]
name = "csv-core"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b2466559f260f48ad25fe6317b3c8dac77b5bdb5763ac7d9d6103530663bc90"
dependencies = [
"memchr",
]
[[package]] [[package]]
name = "der" name = "der"
version = "0.7.8" version = "0.7.8"
@ -726,6 +757,15 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "encoding_rs"
version = "0.8.33"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7268b386296a025e474d5140678f75d6de9493ae55a5d709eeb9dd08149945e1"
dependencies = [
"cfg-if",
]
[[package]] [[package]]
name = "entity" name = "entity"
version = "0.1.0" version = "0.1.0"
@ -946,7 +986,7 @@ checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"libc", "libc",
"wasi", "wasi 0.11.0+wasi-snapshot-preview1",
] ]
[[package]] [[package]]
@ -1351,7 +1391,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2" checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2"
dependencies = [ dependencies = [
"libc", "libc",
"wasi", "wasi 0.11.0+wasi-snapshot-preview1",
"windows-sys", "windows-sys",
] ]
@ -1359,15 +1399,42 @@ dependencies = [
name = "monzo-ingestion" name = "monzo-ingestion"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow",
"axum", "axum",
"chrono",
"clap",
"csv",
"entity",
"http",
"migration",
"num-traits",
"sea-orm", "sea-orm",
"serde", "serde",
"serde_json", "serde_json",
"thiserror",
"tokio", "tokio",
"tracing", "tracing",
"tracing-subscriber", "tracing-subscriber",
] ]
[[package]]
name = "multer"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "01acbdc23469fd8fe07ab135923371d5f5a422fbf9c522158677c8eb15bc51c2"
dependencies = [
"bytes",
"encoding_rs",
"futures-util",
"http",
"httparse",
"log",
"memchr",
"mime",
"spin 0.9.8",
"version_check",
]
[[package]] [[package]]
name = "nom" name = "nom"
version = "7.1.3" version = "7.1.3"
@ -2018,7 +2085,7 @@ dependencies = [
"sqlx", "sqlx",
"strum", "strum",
"thiserror", "thiserror",
"time", "time 0.3.28",
"tracing", "tracing",
"url", "url",
"uuid", "uuid",
@ -2086,7 +2153,7 @@ dependencies = [
"rust_decimal", "rust_decimal",
"sea-query-derive", "sea-query-derive",
"serde_json", "serde_json",
"time", "time 0.3.28",
"uuid", "uuid",
] ]
@ -2102,7 +2169,7 @@ dependencies = [
"sea-query", "sea-query",
"serde_json", "serde_json",
"sqlx", "sqlx",
"time", "time 0.3.28",
"uuid", "uuid",
] ]
@ -2380,7 +2447,7 @@ dependencies = [
"smallvec", "smallvec",
"sqlformat", "sqlformat",
"thiserror", "thiserror",
"time", "time 0.3.28",
"tokio", "tokio",
"tokio-stream", "tokio-stream",
"tracing", "tracing",
@ -2469,7 +2536,7 @@ dependencies = [
"sqlx-core", "sqlx-core",
"stringprep", "stringprep",
"thiserror", "thiserror",
"time", "time 0.3.28",
"tracing", "tracing",
"uuid", "uuid",
"whoami", "whoami",
@ -2514,7 +2581,7 @@ dependencies = [
"sqlx-core", "sqlx-core",
"stringprep", "stringprep",
"thiserror", "thiserror",
"time", "time 0.3.28",
"tracing", "tracing",
"uuid", "uuid",
"whoami", "whoami",
@ -2539,7 +2606,7 @@ dependencies = [
"percent-encoding", "percent-encoding",
"serde", "serde",
"sqlx-core", "sqlx-core",
"time", "time 0.3.28",
"tracing", "tracing",
"url", "url",
"uuid", "uuid",
@ -2656,6 +2723,17 @@ dependencies = [
"once_cell", "once_cell",
] ]
[[package]]
name = "time"
version = "0.1.45"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b797afad3f312d1c66a56d11d0316f916356d11bd158fbc6ca6389ff6bf805a"
dependencies = [
"libc",
"wasi 0.10.0+wasi-snapshot-preview1",
"winapi",
]
[[package]] [[package]]
name = "time" name = "time"
version = "0.3.28" version = "0.3.28"
@ -2956,6 +3034,12 @@ dependencies = [
"try-lock", "try-lock",
] ]
[[package]]
name = "wasi"
version = "0.10.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f"
[[package]] [[package]]
name = "wasi" name = "wasi"
version = "0.11.0+wasi-snapshot-preview1" version = "0.11.0+wasi-snapshot-preview1"

View File

@ -4,7 +4,10 @@ version = "0.1.0"
edition = "2021" edition = "2021"
[dependencies] [dependencies]
axum = "0.6.20" entity = { path = "entity" }
migration = { path = "migration" }
axum = { version = "0.6.20", features = ["multipart"] }
tokio = { version = "1.32.0", features = ["full"] } tokio = { version = "1.32.0", features = ["full"] }
sea-orm = { version = "0.12", features = [ sea-orm = { version = "0.12", features = [
"sqlx-postgres", "sqlx-postgres",
@ -16,6 +19,13 @@ serde = { version = "1.0.188", features = ["derive"] }
serde_json = "1.0.105" serde_json = "1.0.105"
tracing-subscriber = "0.3.17" tracing-subscriber = "0.3.17"
tracing = "0.1.37" tracing = "0.1.37"
anyhow = "1.0.75"
thiserror = "1.0.48"
http = "0.2.9"
chrono = { version = "0.4.28", features = ["serde"] }
num-traits = "0.2.16"
csv = "1.2.2"
clap = "4.4.2"
[workspace] [workspace]
members = [".", "migration", "entity"] members = [".", "migration", "entity"]

View File

@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize};
#[sea_orm(table_name = "expenditure")] #[sea_orm(table_name = "expenditure")]
pub struct Model { pub struct Model {
#[sea_orm(primary_key, auto_increment = false)] #[sea_orm(primary_key, auto_increment = false)]
pub transaction_id: i32, pub transaction_id: String,
#[sea_orm(primary_key, auto_increment = false)] #[sea_orm(primary_key, auto_increment = false)]
pub category: String, pub category: String,
pub amount: Decimal, pub amount: Decimal,

View File

@ -62,7 +62,7 @@ impl MigrationTrait for Migration {
.if_not_exists() .if_not_exists()
.col( .col(
ColumnDef::new(Expenditure::TransactionId) ColumnDef::new(Expenditure::TransactionId)
.integer() .string()
.not_null(), .not_null(),
) )
.col(ColumnDef::new(Expenditure::Category).string().not_null()) .col(ColumnDef::new(Expenditure::Category).string().not_null())

30
src/error.rs Normal file
View File

@ -0,0 +1,30 @@
use axum::response::{IntoResponse, Response};
use http::StatusCode;
use sea_orm::DbErr;
use tracing::log::error;
/// Errors a request handler can return; turned into an HTTP response by the
/// `IntoResponse` implementation for this type.
#[derive(thiserror::Error, Debug)]
pub enum AppError {
    /// SeaORM error, separated for ease of use allowing us to `?` db operations.
    #[error("Internal error")]
    DbError(#[from] DbErr),

    /// The request could not be used as sent; the wrapped error is included in the
    /// displayed message.
    #[error("Invalid request {0}")]
    BadRequest(anyhow::Error),

    /// Catch-all for errors we don't care to expose publicly.
    #[error("Internal error")]
    Anyhow(#[from] anyhow::Error),
}
impl IntoResponse for AppError {
fn into_response(self) -> Response {
error!("Internal server error: {self:?}");
let status_code = match self {
_ => StatusCode::INTERNAL_SERVER_ERROR,
};
(status_code, self.to_string()).into_response()
}
}

62
src/ingestion/db.rs Normal file
View File

@ -0,0 +1,62 @@
use entity::{expenditure, transaction};
use sea_orm::sea_query::OnConflict;
use sea_orm::QueryFilter;
use sea_orm::{ColumnTrait, DatabaseConnection, DbErr, EntityTrait, Iterable, TransactionTrait};
/// A transaction together with the expenditures that belong to it, as consumed by `insert`.
pub struct Insertion {
    /// The transaction row; `insert` upserts it keyed on its id.
    pub transaction: transaction::ActiveModel,
    /// The expenditure rows for this transaction; `insert` deletes the rows stored for
    /// the transaction id before inserting these.
    pub contained_expenditures: Vec<expenditure::ActiveModel>,
}
/// Writes the given insertions to the database in batches of 400, each batch inside
/// its own database transaction.
///
/// Batching keeps the number of db calls low, at the cost of tying a whole batch
/// together: one failing row fails its entire batch. The batch size is a compromise
/// between speed and failure rate; surfacing failures earlier and reporting them
/// better is still worth doing.
///
/// Returns the first `DbErr` encountered.
pub async fn insert(db: &DatabaseConnection, insertions: Vec<Insertion>) -> Result<(), DbErr> {
    for batch in insertions.chunks(400) {
        let tx = db.begin().await?;

        // Upsert the transactions themselves, overwriting every column on an id clash.
        let transactions = batch.iter().map(|entry| entry.transaction.clone());
        let overwrite_transaction = OnConflict::column(transaction::Column::Id)
            .update_columns(transaction::Column::iter())
            .to_owned();
        transaction::Entity::insert_many(transactions)
            .on_conflict(overwrite_transaction)
            .exec(&tx)
            .await?;

        // Re-categorising a transaction changes its set of expenditures, so drop whatever
        // is stored for these transactions and write a completely fresh set; otherwise
        // stale rows would be left behind.
        let transaction_ids = batch.iter().map(|entry| entry.transaction.id.as_ref());
        expenditure::Entity::delete_many()
            .filter(expenditure::Column::TransactionId.is_in(transaction_ids))
            .exec(&tx)
            .await?;

        let expenditures = batch
            .iter()
            .flat_map(|entry| entry.contained_expenditures.iter().cloned());
        let overwrite_expenditure = OnConflict::columns(vec![
            expenditure::Column::TransactionId,
            expenditure::Column::Category,
        ])
        .update_columns(expenditure::Column::iter())
        .to_owned();
        expenditure::Entity::insert_many(expenditures)
            .on_conflict(overwrite_expenditure)
            .exec(&tx)
            .await?;

        tx.commit().await?;
    }

    Ok(())
}

View File

@ -0,0 +1,67 @@
use anyhow::anyhow;
use axum::extract::{Extension, Json, Multipart};
use sea_orm::DatabaseConnection;
use serde_json::Value;
use std::io::Cursor;
use crate::error::AppError;
use crate::ingestion::db;
use crate::ingestion::ingestion_logic::{from_csv_row, from_json_row};
/// Handler for a single transaction row posted as a JSON array.
///
/// Parses the row with `from_json_row` and stores it via `db::insert`.
///
/// # Errors
///
/// Returns an `AppError` if the row cannot be parsed or the database write fails.
/// Database failures are propagated rather than unwrapped, so they produce an error
/// response instead of panicking the handler.
pub async fn monzo_updated(
    Extension(db): Extension<DatabaseConnection>,
    Json(row): Json<Vec<Value>>,
) -> Result<&'static str, AppError> {
    let insertion = from_json_row(row)?;
    db::insert(&db, vec![insertion]).await?;

    Ok("Ok")
}
pub async fn monzo_batched_json(
Extension(db): Extension<DatabaseConnection>,
Json(data): Json<Vec<Vec<Value>>>,
) -> Result<&'static str, AppError> {
let insertions = data
.into_iter()
.skip(1)
.map(|row| from_json_row(row))
.collect::<Result<_, _>>()?;
db::insert(&db, insertions).await.unwrap();
Ok("Ok")
}
pub async fn monzo_batched_csv(
Extension(db): Extension<DatabaseConnection>,
mut multipart: Multipart,
) -> Result<&'static str, AppError> {
let csv = loop {
match multipart.next_field().await.unwrap() {
Some(field) if field.name() == Some("csv") => {
break Some(field.bytes().await.unwrap());
}
Some(_) => {}
None => break None,
}
};
let Some(csv) = csv else {
return Err(AppError::BadRequest(anyhow!("No CSV file provided")));
};
let csv = Cursor::new(csv);
let mut csv = csv::Reader::from_reader(csv);
let data = csv.records();
db::insert(
&db,
data.filter_map(|f| f.ok())
.map(from_csv_row)
.collect::<Result<_, _>>()?,
)
.await
.unwrap();
Ok("Ok")
}

View File

@ -0,0 +1,182 @@
use anyhow::Context;
use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime};
use entity::expenditure::ActiveModel;
use entity::transaction;
use num_traits::FromPrimitive;
use sea_orm::prelude::Decimal;
use sea_orm::ActiveValue::*;
use sea_orm::IntoActiveModel;
use crate::ingestion::db::Insertion;
use csv::StringRecord;
/// Zero-based column positions within a transaction row, used by `from_json_row` and
/// `from_csv_row` to pick individual cells out of a row.
// Not every column is read by the parsers in this file, hence the dead_code allowance.
#[allow(dead_code)]
mod headings {
    pub const TRANSACTION_ID: usize = 0;
    pub const DATE: usize = 1;
    pub const TIME: usize = 2;
    pub const TYPE: usize = 3;
    pub const NAME: usize = 4;
    pub const EMOJI: usize = 5;
    pub const CATEGORY: usize = 6;
    pub const AMOUNT: usize = 7;
    pub const CURRENCY: usize = 8;
    pub const LOCAL_AMOUNT: usize = 9;
    pub const LOCAL_CURRENCY: usize = 10;
    pub const NOTES_AND_TAGS: usize = 11;
    pub const ADDRESS: usize = 12;
    pub const RECEIPT: usize = 13;
    pub const DESCRIPTION: usize = 14;
    // Read with `get` by the parsers, so rows may omit this trailing column.
    pub const CATEGORY_SPLIT: usize = 15;
}
/// Parses one `category:amount` section of a category split into an expenditure
/// belonging to `monzo_transaction_id`.
///
/// Only the first two `:`-separated components are used; anything after the amount
/// is ignored.
///
/// # Errors
///
/// Fails when the amount component is missing or is not a valid decimal; the error
/// names the offending section.
fn parse_section(monzo_transaction_id: &str, section: &str) -> anyhow::Result<ActiveModel> {
    let mut components = section.split(':');

    let category = components
        .next()
        .context("Missing category")?
        .to_string();

    let amount = components
        .next()
        .context("Missing amount")?
        .parse::<Decimal>()
        .with_context(|| format!("Invalid amount in category split section {section:?}"))?;

    Ok(entity::expenditure::Model {
        transaction_id: monzo_transaction_id.to_string(),
        category,
        amount,
    }
    .into_active_model())
}
fn json_opt(value: &serde_json::Value) -> Option<String> {
match value {
serde_json::Value::String(string) if string.is_empty() => None,
serde_json::Value::String(string) => Some(string.to_string()),
_ => None,
}
}
pub fn from_json_row(row: Vec<serde_json::Value>) -> anyhow::Result<Insertion> {
use serde_json::Value;
let monzo_transaction_id = row[headings::TRANSACTION_ID]
.as_str()
.context("No transaction id")?
.to_string();
let date = DateTime::parse_from_rfc3339(row[headings::DATE].as_str().context("No date")?)
.context("Failed to parse date")?;
let time = DateTime::parse_from_rfc3339(row[headings::TIME].as_str().context("No time")?)
.context("Failed to parse date")?
.time();
let timestamp = date.date_naive().and_time(time);
let title = row[headings::NAME]
.as_str()
.context("No title")?
.to_string();
let monzo_transaction_type = row[headings::TYPE]
.as_str()
.context("No transaction type")?
.to_string();
let description = json_opt(&row[headings::DESCRIPTION]);
let emoji = json_opt(&row[headings::EMOJI]);
let notes = json_opt(&row[headings::NOTES_AND_TAGS]);
let receipt = json_opt(&row[headings::RECEIPT]);
let total_amount = Decimal::from_f64(row[headings::AMOUNT].as_f64().context("No amount")?)
.context("Failed to parse date")?;
let expenditures: Vec<_> = match row.get(headings::CATEGORY_SPLIT) {
Some(Value::String(split)) if !split.is_empty() => split
.split(',')
.map(|section| parse_section(&monzo_transaction_id, section))
.collect::<Result<Vec<_>, anyhow::Error>>()?,
_ => vec![entity::expenditure::Model {
category: row[headings::CATEGORY]
.as_str()
.context("No context")?
.to_string(),
amount: total_amount,
transaction_id: monzo_transaction_id.clone(),
}
.into_active_model()],
};
Ok(Insertion {
transaction: transaction::ActiveModel {
id: Set(monzo_transaction_id),
transaction_type: Set(monzo_transaction_type),
timestamp: Set(timestamp),
title: Set(title),
emoji: Set(emoji),
notes: Set(notes),
receipt: Set(receipt),
total_amount: Set(total_amount),
description: Set(description),
},
contained_expenditures: expenditures,
})
}
/// Maps an empty CSV cell to `None` and any other cell to an owned copy of its text.
fn csv_opt(s: &str) -> Option<String> {
    if s.is_empty() {
        None
    } else {
        Some(s.to_string())
    }
}
pub fn from_csv_row(row: StringRecord) -> anyhow::Result<Insertion> {
let monzo_transaction_id = row[headings::TRANSACTION_ID].to_string();
let date = NaiveDate::parse_from_str(&row[headings::DATE], "%d/%m/%Y")
.context("Failed to parse date from csv")?;
let time = NaiveTime::parse_from_str(&row[headings::TIME], "%H:%M:%S")
.context("Failed to parse time from csv")?;
let timestamp = NaiveDateTime::new(date, time);
let title = row[headings::NAME].to_string();
let monzo_transaction_type = row[headings::TYPE].to_string();
let description = csv_opt(&row[headings::DESCRIPTION]);
let emoji = csv_opt(&row[headings::EMOJI]);
let notes = csv_opt(&row[headings::NOTES_AND_TAGS]);
let receipt = csv_opt(&row[headings::RECEIPT]);
let total_amount = row[headings::AMOUNT].parse::<Decimal>()?;
let expenditures: Vec<_> = match row.get(headings::CATEGORY_SPLIT) {
Some(split) if !split.is_empty() => split
.split(',')
.map(|section| parse_section(&monzo_transaction_id, section))
.collect::<Result<Vec<_>, anyhow::Error>>()?,
_ => vec![entity::expenditure::Model {
transaction_id: monzo_transaction_id.clone(),
category: row[headings::CATEGORY].to_string(),
amount: total_amount,
}
.into_active_model()],
};
Ok(Insertion {
transaction: transaction::ActiveModel {
id: Set(monzo_transaction_id),
transaction_type: Set(monzo_transaction_type),
timestamp: Set(timestamp),
title: Set(title),
emoji: Set(emoji),
notes: Set(notes),
receipt: Set(receipt),
total_amount: Set(total_amount),
description: Set(description),
},
contained_expenditures: expenditures,
})
}

3
src/ingestion/mod.rs Normal file
View File

@ -0,0 +1,3 @@
pub mod db;
pub mod ingestion;
pub mod ingestion_logic;

View File

@ -1,30 +1,39 @@
use axum::{ mod ingestion;
routing::{get, post}, mod error;
http::StatusCode,
response::IntoResponse, use axum::{Extension, Router};
Json, Router,
};
use serde::{Deserialize, Serialize};
use std::net::SocketAddr; use std::net::SocketAddr;
use axum::routing::post;
use clap::Parser;
use migration::{Migrator, MigratorTrait};
use crate::ingestion::ingestion::{monzo_batched_csv, monzo_batched_json, monzo_updated};
#[derive(Debug, clap::Parser)]
struct Config {
#[clap(short, long, env)]
addr: SocketAddr,
#[clap(short, long = "db", env)]
database_url: String,
}
#[tokio::main] #[tokio::main]
async fn main() { async fn main() -> anyhow::Result<()> {
// initialize tracing let config: Config = Config::parse();
let connection = sea_orm::Database::connect(&config.database_url).await?;
Migrator::up(&connection, None).await?;
tracing_subscriber::fmt::init(); tracing_subscriber::fmt::init();
// build our application with a route
let app = Router::new() let app = Router::new()
// `GET /` goes to `root` .route("/monzo-updated", post(monzo_updated))
.route("/", get(root)) .route("/monzo-batch-export", post(monzo_batched_json))
// `POST /users` goes to `create_user` .route("/monzo-csv-ingestion", post(monzo_batched_csv))
.route("/users", post(create_user)); .layer(Extension(connection.clone()));
// run our app with hyper tracing::debug!("listening on {}", &config.addr);
// `axum::Server` is a re-export of `hyper::Server` axum::Server::bind(&config.addr)
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
tracing::debug!("listening on {}", addr);
axum::Server::bind(&addr)
.serve(app.into_make_service()) .serve(app.into_make_service())
.await .await
.unwrap(); .unwrap();
}
Ok(())
}