From f19f861297ec224665e0429d9adb97b6d37c16d0 Mon Sep 17 00:00:00 2001 From: Joshua Coles Date: Wed, 29 May 2024 20:49:48 +0100 Subject: [PATCH] Introduce MonzoRow as an intermediary between json/csv and the database entities. This will allow for hashing to perform identity checks --- src/ingestion/db.rs | 7 +- src/ingestion/ingestion_logic.rs | 224 +++++++++++++++---------------- 2 files changed, 117 insertions(+), 114 deletions(-) diff --git a/src/ingestion/db.rs b/src/ingestion/db.rs index 553b874..0edcdec 100644 --- a/src/ingestion/db.rs +++ b/src/ingestion/db.rs @@ -7,6 +7,7 @@ use sea_orm::{ ConnectionTrait, DatabaseBackend, DatabaseTransaction, DbErr, QueryFilter, QueryTrait, Statement, }; +use crate::ingestion::ingestion_logic::MonzoRow; pub struct Insertion { pub transaction: transaction::ActiveModel, @@ -52,9 +53,13 @@ async fn update_expenditures( // trying to move failures earlier and improve reporting. pub async fn insert( db: &DatabaseConnection, - insertions: Vec, + monzo_rows: Vec, ) -> Result, AppError> { let mut new_transaction_ids = Vec::new(); + let insertions = monzo_rows + .into_iter() + .map(MonzoRow::into_insertion) + .collect::, _>>()?; for insertions in insertions.chunks(400) { let tx = db.begin().await?; diff --git a/src/ingestion/ingestion_logic.rs b/src/ingestion/ingestion_logic.rs index 8fcd468..de42078 100644 --- a/src/ingestion/ingestion_logic.rs +++ b/src/ingestion/ingestion_logic.rs @@ -1,13 +1,13 @@ use crate::ingestion::db::Insertion; -use anyhow::Context; +use anyhow::{anyhow, Context}; use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime}; use csv::StringRecord; use entity::expenditure::ActiveModel; use entity::transaction; use num_traits::FromPrimitive; use sea_orm::prelude::Decimal; -use sea_orm::ActiveValue::*; use sea_orm::IntoActiveModel; +use serde_json::Value; #[allow(dead_code)] mod headings { @@ -29,41 +29,98 @@ mod headings { pub const CATEGORY_SPLIT: usize = 15; } -fn parse_section(monzo_transaction_id: &str, section: &str) -> anyhow::Result { - let mut components = section.split(':'); - let category: String = components - .next() - .context("Missing Missing category")? - .to_string(); - - let amount = components - .next() - .context("Missing amount")? - .parse::()?; - - Ok(entity::expenditure::Model { - transaction_id: monzo_transaction_id.to_string(), - category, - amount, - } - .into_active_model()) +#[derive(Debug, Eq, PartialEq, Hash)] +pub struct MonzoRow { + category_split: Option, + primary_category: String, + total_amount: Decimal, + receipt: Option, + notes: Option, + emoji: Option, + description: Option, + transaction_type: String, + title: String, + timestamp: NaiveDateTime, + transaction_id: String, } -fn json_opt(value: &serde_json::Value) -> Option { +impl MonzoRow { + fn parse_section(monzo_transaction_id: &str, section: &str) -> anyhow::Result { + let mut components = section.split(':'); + let category: String = components + .next() + .context("Missing Missing category")? + .to_string(); + + let amount = components + .next() + .context("Missing amount")? + .parse::()?; + + Ok(entity::expenditure::Model { + transaction_id: monzo_transaction_id.to_string(), + category, + amount, + }.into_active_model()) + } + + pub fn into_insertion(self) -> Result { + let expenditures: Vec<_> = match self.category_split { + Some(split) if !split.is_empty() => split + .split(',') + .map(|section| Self::parse_section(&self.transaction_id, section)) + .collect::, anyhow::Error>>()?, + + _ => vec![entity::expenditure::Model { + category: self.primary_category.clone(), + amount: self.total_amount, + transaction_id: self.transaction_id.clone(), + } + .into_active_model()], + }; + + Ok(Insertion { + transaction: transaction::Model { + id: self.transaction_id, + transaction_type: self.transaction_type, + timestamp: self.timestamp, + title: self.title, + emoji: self.emoji, + notes: self.notes, + receipt: self.receipt, + total_amount: self.total_amount, + description: self.description, + }.into_active_model(), + + contained_expenditures: expenditures, + }) + } +} + +fn json_opt(value: &Value) -> Option { match value { - serde_json::Value::String(string) if string.is_empty() => None, - serde_json::Value::String(string) => Some(string.to_string()), + Value::String(string) if string.is_empty() => None, + Value::String(string) => Some(string.to_string()), _ => None, } } -pub fn from_json_row(row: Vec) -> anyhow::Result { - use serde_json::Value; - let monzo_transaction_id = row[headings::TRANSACTION_ID] - .as_str() - .context("No transaction id")? - .to_string(); +fn json_required_str(value: &Value, label: &str) -> anyhow::Result { + match value { + Value::String(string) if string.is_empty() => Err(anyhow!("{} is empty", label)), + Value::String(string) => Ok(string.to_string()), + _ => Err(anyhow!("{} is not a string", label)), + } +} +fn parse_timestamp(date: &str, time: &str) -> anyhow::Result { + let date = NaiveDate::parse_from_str(date, "%Y-%m-%d")?; + let time = NaiveTime::parse_from_str(time, "%H:%M:%S")?; + + Ok(date.and_time(time)) +} + +pub fn from_json_row(row: Vec) -> anyhow::Result { let date = DateTime::parse_from_rfc3339(row[headings::DATE].as_str().context("No date")?) .context("Failed to parse date")?; @@ -73,54 +130,21 @@ pub fn from_json_row(row: Vec) -> anyhow::Result { let timestamp = date.date_naive().and_time(time); - let title = row[headings::NAME] - .as_str() - .context("No title")? - .to_string(); - - let monzo_transaction_type = row[headings::TYPE] - .as_str() - .context("No transaction type")? - .to_string(); - - let description = json_opt(&row[headings::DESCRIPTION]); - let emoji = json_opt(&row[headings::EMOJI]); - let notes = json_opt(&row[headings::NOTES_AND_TAGS]); - let receipt = json_opt(&row[headings::RECEIPT]); let total_amount = Decimal::from_f64(row[headings::AMOUNT].as_f64().context("No amount")?) .context("Failed to parse date")?; - let expenditures: Vec<_> = match row.get(headings::CATEGORY_SPLIT) { - Some(Value::String(split)) if !split.is_empty() => split - .split(',') - .map(|section| parse_section(&monzo_transaction_id, section)) - .collect::, anyhow::Error>>()?, - - _ => vec![entity::expenditure::Model { - category: row[headings::CATEGORY] - .as_str() - .context("No context")? - .to_string(), - amount: total_amount, - transaction_id: monzo_transaction_id.clone(), - } - .into_active_model()], - }; - - Ok(Insertion { - transaction: transaction::ActiveModel { - id: Set(monzo_transaction_id), - transaction_type: Set(monzo_transaction_type), - timestamp: Set(timestamp), - title: Set(title), - emoji: Set(emoji), - notes: Set(notes), - receipt: Set(receipt), - total_amount: Set(total_amount), - description: Set(description), - }, - - contained_expenditures: expenditures, + Ok(MonzoRow { + transaction_id: json_required_str(&row[headings::TRANSACTION_ID], "Transaction ID")?, + title: json_required_str(&row[headings::NAME], "Title")?, + transaction_type: json_required_str(&row[headings::TYPE], "Transaction type")?, + description: json_opt(&row[headings::DESCRIPTION]), + emoji: json_opt(&row[headings::EMOJI]), + notes: json_opt(&row[headings::NOTES_AND_TAGS]), + receipt: json_opt(&row[headings::RECEIPT]), + primary_category: json_required_str(&row[headings::CATEGORY], "Primary Category")?, + category_split: json_opt(&row[headings::CATEGORY_SPLIT]), + total_amount, + timestamp, }) } @@ -131,9 +155,7 @@ fn csv_opt(s: &str) -> Option { } } -pub fn from_csv_row(row: StringRecord) -> anyhow::Result { - let monzo_transaction_id = row[headings::TRANSACTION_ID].to_string(); - +pub fn from_csv_row(row: StringRecord) -> anyhow::Result { let date = NaiveDate::parse_from_str(&row[headings::DATE], "%d/%m/%Y") .context("Failed to parse date from csv")?; @@ -142,41 +164,17 @@ pub fn from_csv_row(row: StringRecord) -> anyhow::Result { let timestamp = NaiveDateTime::new(date, time); - let title = row[headings::NAME].to_string(); - let monzo_transaction_type = row[headings::TYPE].to_string(); - - let description = csv_opt(&row[headings::DESCRIPTION]); - let emoji = csv_opt(&row[headings::EMOJI]); - let notes = csv_opt(&row[headings::NOTES_AND_TAGS]); - let receipt = csv_opt(&row[headings::RECEIPT]); - let total_amount = row[headings::AMOUNT].parse::()?; - - let expenditures: Vec<_> = match row.get(headings::CATEGORY_SPLIT) { - Some(split) if !split.is_empty() => split - .split(',') - .map(|section| parse_section(&monzo_transaction_id, section)) - .collect::, anyhow::Error>>()?, - - _ => vec![entity::expenditure::Model { - transaction_id: monzo_transaction_id.clone(), - category: row[headings::CATEGORY].to_string(), - amount: total_amount, - } - .into_active_model()], - }; - - Ok(Insertion { - transaction: transaction::ActiveModel { - id: Set(monzo_transaction_id), - transaction_type: Set(monzo_transaction_type), - timestamp: Set(timestamp), - title: Set(title), - emoji: Set(emoji), - notes: Set(notes), - receipt: Set(receipt), - total_amount: Set(total_amount), - description: Set(description), - }, - contained_expenditures: expenditures, + Ok(MonzoRow { + timestamp, + transaction_id: row[headings::TRANSACTION_ID].to_string(), + title: row[headings::NAME].to_string(), + transaction_type: row[headings::TYPE].to_string(), + description: csv_opt(&row[headings::DESCRIPTION]), + emoji: csv_opt(&row[headings::EMOJI]), + notes: csv_opt(&row[headings::NOTES_AND_TAGS]), + receipt: csv_opt(&row[headings::RECEIPT]), + total_amount: row[headings::AMOUNT].parse::()?, + category_split: csv_opt(&row[headings::CATEGORY_SPLIT]), + primary_category: row[headings::CATEGORY].to_string(), }) }