Compare commits

...

2 Commits

Author SHA1 Message Date
3df05b2d9c Add identity_hash column
Some checks failed
Build and Publish Docker Container / build (push) Failing after 9m27s
2024-05-29 21:08:51 +01:00
f19f861297 Introduce MonzoRow as an intermediary between json/csv and the database entities. This will allow for hashing to perform identity checks 2024-05-29 20:49:48 +01:00
5 changed files with 174 additions and 116 deletions

View File

@ -16,6 +16,8 @@ pub struct Model {
pub notes: Option<String>, pub notes: Option<String>,
pub receipt: Option<String>, pub receipt: Option<String>,
pub description: Option<String>, pub description: Option<String>,
#[sea_orm(unique)]
pub identity_hash: Option<i64>,
} }
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]

View File

@ -1,6 +1,7 @@
pub use sea_orm_migration::prelude::*; pub use sea_orm_migration::prelude::*;
pub mod m20230904_141851_create_monzo_tables; pub mod m20230904_141851_create_monzo_tables;
mod m20240529_195030_add_transaction_identity_hash;
pub struct Migrator; pub struct Migrator;
@ -11,6 +12,9 @@ impl MigratorTrait for Migrator {
} }
fn migrations() -> Vec<Box<dyn MigrationTrait>> { fn migrations() -> Vec<Box<dyn MigrationTrait>> {
vec![Box::new(m20230904_141851_create_monzo_tables::Migration)] vec![
Box::new(m20230904_141851_create_monzo_tables::Migration),
Box::new(m20240529_195030_add_transaction_identity_hash::Migration),
]
} }
} }

View File

@ -0,0 +1,34 @@
use sea_orm_migration::prelude::*;
/// Schema migration that adds the column identified by `Transaction::IdentityHash`
/// to the table identified by `Transaction::Table`.
#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
    /// Apply the migration: add a big-integer column carrying a unique key.
    ///
    /// No `NOT NULL` constraint is requested on the column definition.
    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        // Describe the new column first, then attach it to the alter statement.
        let mut identity_hash = ColumnDef::new(Transaction::IdentityHash);
        identity_hash.big_integer().unique_key();

        let mut alter = TableAlterStatement::new();
        alter
            .table(Transaction::Table)
            .add_column(&mut identity_hash);

        manager.alter_table(alter).await
    }

    /// Revert the migration by dropping the column added in `up`.
    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        let mut alter = TableAlterStatement::new();
        alter
            .table(Transaction::Table)
            .drop_column(Transaction::IdentityHash);

        manager.alter_table(alter).await
    }
}
/// Identifiers referenced by this migration's alter-table statements.
///
/// NOTE(review): the SQL names are generated by `DeriveIden` from the identifiers
/// below (presumably `transaction` / `identity_hash`) — confirm they match the
/// table created by the earlier migration.
#[derive(DeriveIden)]
enum Transaction {
    /// The table being altered.
    Table,
    /// The column added by `up` and dropped by `down`.
    IdentityHash,
}

View File

@ -7,6 +7,7 @@ use sea_orm::{
ConnectionTrait, DatabaseBackend, DatabaseTransaction, DbErr, QueryFilter, QueryTrait, ConnectionTrait, DatabaseBackend, DatabaseTransaction, DbErr, QueryFilter, QueryTrait,
Statement, Statement,
}; };
use crate::ingestion::ingestion_logic::MonzoRow;
pub struct Insertion { pub struct Insertion {
pub transaction: transaction::ActiveModel, pub transaction: transaction::ActiveModel,
@ -52,9 +53,13 @@ async fn update_expenditures(
// trying to move failures earlier and improve reporting. // trying to move failures earlier and improve reporting.
pub async fn insert( pub async fn insert(
db: &DatabaseConnection, db: &DatabaseConnection,
insertions: Vec<Insertion>, monzo_rows: Vec<MonzoRow>,
) -> Result<Vec<String>, AppError> { ) -> Result<Vec<String>, AppError> {
let mut new_transaction_ids = Vec::new(); let mut new_transaction_ids = Vec::new();
let insertions = monzo_rows
.into_iter()
.map(MonzoRow::into_insertion)
.collect::<Result<Vec<_>, _>>()?;
for insertions in insertions.chunks(400) { for insertions in insertions.chunks(400) {
let tx = db.begin().await?; let tx = db.begin().await?;
@ -77,10 +82,12 @@ async fn update_transactions(
insertions: &[Insertion], insertions: &[Insertion],
tx: &DatabaseTransaction, tx: &DatabaseTransaction,
) -> Result<Vec<String>, AppError> { ) -> Result<Vec<String>, AppError> {
let insert = let insert =
transaction::Entity::insert_many(insertions.iter().map(|i| &i.transaction).cloned()) transaction::Entity::insert_many(insertions.iter().map(|i| &i.transaction).cloned())
.on_conflict( .on_conflict(
OnConflict::column(transaction::Column::Id) OnConflict::columns([transaction::Column::Id, transaction::Column::IdentityHash])
.update_columns(transaction::Column::iter()) .update_columns(transaction::Column::iter())
.to_owned(), .to_owned(),
) )

View File

@ -1,13 +1,14 @@
use std::hash::Hash;
use crate::ingestion::db::Insertion; use crate::ingestion::db::Insertion;
use anyhow::Context; use anyhow::{anyhow, Context};
use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime}; use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime};
use csv::StringRecord; use csv::StringRecord;
use entity::expenditure::ActiveModel; use entity::expenditure::ActiveModel;
use entity::transaction; use entity::transaction;
use num_traits::FromPrimitive; use num_traits::FromPrimitive;
use sea_orm::prelude::Decimal; use sea_orm::prelude::Decimal;
use sea_orm::ActiveValue::*;
use sea_orm::IntoActiveModel; use sea_orm::IntoActiveModel;
use serde_json::Value;
#[allow(dead_code)] #[allow(dead_code)]
mod headings { mod headings {
@ -29,41 +30,110 @@ mod headings {
pub const CATEGORY_SPLIT: usize = 15; pub const CATEGORY_SPLIT: usize = 15;
} }
fn parse_section(monzo_transaction_id: &str, section: &str) -> anyhow::Result<ActiveModel> { #[derive(Debug, Eq, PartialEq, Hash)]
let mut components = section.split(':'); pub struct MonzoRow {
let category: String = components category_split: Option<String>,
.next() primary_category: String,
.context("Missing Missing category")? total_amount: Decimal,
.to_string(); receipt: Option<String>,
notes: Option<String>,
let amount = components emoji: Option<String>,
.next() description: Option<String>,
.context("Missing amount")? transaction_type: String,
.parse::<Decimal>()?; title: String,
timestamp: NaiveDateTime,
Ok(entity::expenditure::Model { transaction_id: String,
transaction_id: monzo_transaction_id.to_string(),
category,
amount,
}
.into_active_model())
} }
fn json_opt(value: &serde_json::Value) -> Option<String> { impl MonzoRow {
/// Parse one `category:amount` section of a category split into an expenditure
/// active model.
///
/// `monzo_transaction_id` is copied into the resulting model's `transaction_id`.
/// Only the first two `:`-separated components are read; any further components
/// are ignored.
///
/// # Errors
/// Fails when the section has no amount component, or when the amount cannot be
/// parsed as a `Decimal`.
fn parse_section(monzo_transaction_id: &str, section: &str) -> anyhow::Result<ActiveModel> {
    let mut components = section.split(':');

    // `split` always yields at least one item, so this guard is purely defensive.
    let category = components
        .next()
        .context("Missing category")?
        .to_string();

    // Name the offending section so a bad split can be traced back to its row.
    let amount = components
        .next()
        .context("Missing amount")?
        .parse::<Decimal>()
        .with_context(|| format!("Failed to parse amount in section '{}'", section))?;

    Ok(entity::expenditure::Model {
        transaction_id: monzo_transaction_id.to_string(),
        category,
        amount,
    }
    .into_active_model())
}
/// Feed this row through its `Hash` implementation and return the resulting 64-bit
/// digest as an `i64`, so it can be stored in a signed big-integer column and used
/// as a unique constraint in the database.
///
/// NOTE(review): the standard library documents `DefaultHasher`'s algorithm as
/// unspecified and subject to change between releases. Because this value is
/// persisted, confirm that a toolchain upgrade changing the digests is acceptable.
pub fn compute_hash(&self) -> i64 {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::Hasher;

    let mut state = DefaultHasher::new();
    self.hash(&mut state);

    // Reinterpret the unsigned digest bit-for-bit as a signed integer.
    i64::from_ne_bytes(state.finish().to_ne_bytes())
}
/// Convert this row into the database `Insertion`: one transaction active model plus
/// the expenditures it contains.
///
/// When `category_split` holds a non-empty string, each comma-separated section is
/// parsed with `parse_section`. Otherwise a single expenditure is produced from
/// `primary_category` and `total_amount`.
///
/// # Errors
/// Propagates the first error returned by `parse_section`.
pub fn into_insertion(self) -> Result<Insertion, anyhow::Error> {
    // The hash borrows the whole row, so it must be computed before any field is
    // moved out of `self` below.
    let identity_hash = self.compute_hash();

    // Borrow the split rather than moving it; `self` stays intact until the
    // transaction model is built.
    let expenditures: Vec<_> = match self.category_split.as_deref() {
        Some(split) if !split.is_empty() => split
            .split(',')
            .map(|section| Self::parse_section(&self.transaction_id, section))
            .collect::<Result<Vec<_>, anyhow::Error>>()?,
        _ => vec![entity::expenditure::Model {
            category: self.primary_category.clone(),
            amount: self.total_amount,
            transaction_id: self.transaction_id.clone(),
        }
        .into_active_model()],
    };

    Ok(Insertion {
        transaction: transaction::Model {
            id: self.transaction_id,
            transaction_type: self.transaction_type,
            timestamp: self.timestamp,
            title: self.title,
            emoji: self.emoji,
            notes: self.notes,
            receipt: self.receipt,
            total_amount: self.total_amount,
            description: self.description,
            identity_hash: Some(identity_hash),
        }
        .into_active_model(),
        contained_expenditures: expenditures,
    })
}
}
fn json_opt(value: &Value) -> Option<String> {
match value { match value {
serde_json::Value::String(string) if string.is_empty() => None, Value::String(string) if string.is_empty() => None,
serde_json::Value::String(string) => Some(string.to_string()), Value::String(string) => Some(string.to_string()),
_ => None, _ => None,
} }
} }
pub fn from_json_row(row: Vec<serde_json::Value>) -> anyhow::Result<Insertion> { fn json_required_str(value: &Value, label: &str) -> anyhow::Result<String> {
use serde_json::Value; match value {
let monzo_transaction_id = row[headings::TRANSACTION_ID] Value::String(string) if string.is_empty() => Err(anyhow!("{} is empty", label)),
.as_str() Value::String(string) => Ok(string.to_string()),
.context("No transaction id")? _ => Err(anyhow!("{} is not a string", label)),
.to_string(); }
}
/// Combine a `%Y-%m-%d` date string and a `%H:%M:%S` time string into a single
/// `NaiveDateTime`.
///
/// # Errors
/// Fails when either string does not match its expected format; the error context
/// names the value that could not be parsed.
fn parse_timestamp(date: &str, time: &str) -> anyhow::Result<NaiveDateTime> {
    let parsed_date = NaiveDate::parse_from_str(date, "%Y-%m-%d")
        .with_context(|| format!("Failed to parse date '{}'", date))?;
    let parsed_time = NaiveTime::parse_from_str(time, "%H:%M:%S")
        .with_context(|| format!("Failed to parse time '{}'", time))?;

    Ok(parsed_date.and_time(parsed_time))
}
pub fn from_json_row(row: Vec<Value>) -> anyhow::Result<MonzoRow> {
let date = DateTime::parse_from_rfc3339(row[headings::DATE].as_str().context("No date")?) let date = DateTime::parse_from_rfc3339(row[headings::DATE].as_str().context("No date")?)
.context("Failed to parse date")?; .context("Failed to parse date")?;
@ -73,54 +143,21 @@ pub fn from_json_row(row: Vec<serde_json::Value>) -> anyhow::Result<Insertion> {
let timestamp = date.date_naive().and_time(time); let timestamp = date.date_naive().and_time(time);
let title = row[headings::NAME]
.as_str()
.context("No title")?
.to_string();
let monzo_transaction_type = row[headings::TYPE]
.as_str()
.context("No transaction type")?
.to_string();
let description = json_opt(&row[headings::DESCRIPTION]);
let emoji = json_opt(&row[headings::EMOJI]);
let notes = json_opt(&row[headings::NOTES_AND_TAGS]);
let receipt = json_opt(&row[headings::RECEIPT]);
let total_amount = Decimal::from_f64(row[headings::AMOUNT].as_f64().context("No amount")?) let total_amount = Decimal::from_f64(row[headings::AMOUNT].as_f64().context("No amount")?)
.context("Failed to parse date")?; .context("Failed to parse date")?;
let expenditures: Vec<_> = match row.get(headings::CATEGORY_SPLIT) { Ok(MonzoRow {
Some(Value::String(split)) if !split.is_empty() => split transaction_id: json_required_str(&row[headings::TRANSACTION_ID], "Transaction ID")?,
.split(',') title: json_required_str(&row[headings::NAME], "Title")?,
.map(|section| parse_section(&monzo_transaction_id, section)) transaction_type: json_required_str(&row[headings::TYPE], "Transaction type")?,
.collect::<Result<Vec<_>, anyhow::Error>>()?, description: json_opt(&row[headings::DESCRIPTION]),
emoji: json_opt(&row[headings::EMOJI]),
_ => vec![entity::expenditure::Model { notes: json_opt(&row[headings::NOTES_AND_TAGS]),
category: row[headings::CATEGORY] receipt: json_opt(&row[headings::RECEIPT]),
.as_str() primary_category: json_required_str(&row[headings::CATEGORY], "Primary Category")?,
.context("No context")? category_split: json_opt(&row[headings::CATEGORY_SPLIT]),
.to_string(), total_amount,
amount: total_amount, timestamp,
transaction_id: monzo_transaction_id.clone(),
}
.into_active_model()],
};
Ok(Insertion {
transaction: transaction::ActiveModel {
id: Set(monzo_transaction_id),
transaction_type: Set(monzo_transaction_type),
timestamp: Set(timestamp),
title: Set(title),
emoji: Set(emoji),
notes: Set(notes),
receipt: Set(receipt),
total_amount: Set(total_amount),
description: Set(description),
},
contained_expenditures: expenditures,
}) })
} }
@ -131,9 +168,7 @@ fn csv_opt(s: &str) -> Option<String> {
} }
} }
pub fn from_csv_row(row: StringRecord) -> anyhow::Result<Insertion> { pub fn from_csv_row(row: StringRecord) -> anyhow::Result<MonzoRow> {
let monzo_transaction_id = row[headings::TRANSACTION_ID].to_string();
let date = NaiveDate::parse_from_str(&row[headings::DATE], "%d/%m/%Y") let date = NaiveDate::parse_from_str(&row[headings::DATE], "%d/%m/%Y")
.context("Failed to parse date from csv")?; .context("Failed to parse date from csv")?;
@ -142,41 +177,17 @@ pub fn from_csv_row(row: StringRecord) -> anyhow::Result<Insertion> {
let timestamp = NaiveDateTime::new(date, time); let timestamp = NaiveDateTime::new(date, time);
let title = row[headings::NAME].to_string(); Ok(MonzoRow {
let monzo_transaction_type = row[headings::TYPE].to_string(); timestamp,
transaction_id: row[headings::TRANSACTION_ID].to_string(),
let description = csv_opt(&row[headings::DESCRIPTION]); title: row[headings::NAME].to_string(),
let emoji = csv_opt(&row[headings::EMOJI]); transaction_type: row[headings::TYPE].to_string(),
let notes = csv_opt(&row[headings::NOTES_AND_TAGS]); description: csv_opt(&row[headings::DESCRIPTION]),
let receipt = csv_opt(&row[headings::RECEIPT]); emoji: csv_opt(&row[headings::EMOJI]),
let total_amount = row[headings::AMOUNT].parse::<Decimal>()?; notes: csv_opt(&row[headings::NOTES_AND_TAGS]),
receipt: csv_opt(&row[headings::RECEIPT]),
let expenditures: Vec<_> = match row.get(headings::CATEGORY_SPLIT) { total_amount: row[headings::AMOUNT].parse::<Decimal>()?,
Some(split) if !split.is_empty() => split category_split: csv_opt(&row[headings::CATEGORY_SPLIT]),
.split(',') primary_category: row[headings::CATEGORY].to_string(),
.map(|section| parse_section(&monzo_transaction_id, section))
.collect::<Result<Vec<_>, anyhow::Error>>()?,
_ => vec![entity::expenditure::Model {
transaction_id: monzo_transaction_id.clone(),
category: row[headings::CATEGORY].to_string(),
amount: total_amount,
}
.into_active_model()],
};
Ok(Insertion {
transaction: transaction::ActiveModel {
id: Set(monzo_transaction_id),
transaction_type: Set(monzo_transaction_type),
timestamp: Set(timestamp),
title: Set(title),
emoji: Set(emoji),
notes: Set(notes),
receipt: Set(receipt),
total_amount: Set(total_amount),
description: Set(description),
},
contained_expenditures: expenditures,
}) })
} }