Compare commits
2 Commits
89d0d12e26
...
3df05b2d9c
| Author | SHA1 | Date | |
|---|---|---|---|
| 3df05b2d9c | |||
| f19f861297 |
@ -16,6 +16,8 @@ pub struct Model {
|
|||||||
pub notes: Option<String>,
|
pub notes: Option<String>,
|
||||||
pub receipt: Option<String>,
|
pub receipt: Option<String>,
|
||||||
pub description: Option<String>,
|
pub description: Option<String>,
|
||||||
|
#[sea_orm(unique)]
|
||||||
|
pub identity_hash: Option<i64>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
|
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
|
||||||
|
|||||||
@ -1,6 +1,7 @@
|
|||||||
pub use sea_orm_migration::prelude::*;
|
pub use sea_orm_migration::prelude::*;
|
||||||
|
|
||||||
pub mod m20230904_141851_create_monzo_tables;
|
pub mod m20230904_141851_create_monzo_tables;
|
||||||
|
mod m20240529_195030_add_transaction_identity_hash;
|
||||||
|
|
||||||
pub struct Migrator;
|
pub struct Migrator;
|
||||||
|
|
||||||
@ -11,6 +12,9 @@ impl MigratorTrait for Migrator {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn migrations() -> Vec<Box<dyn MigrationTrait>> {
|
fn migrations() -> Vec<Box<dyn MigrationTrait>> {
|
||||||
vec![Box::new(m20230904_141851_create_monzo_tables::Migration)]
|
vec![
|
||||||
|
Box::new(m20230904_141851_create_monzo_tables::Migration),
|
||||||
|
Box::new(m20240529_195030_add_transaction_identity_hash::Migration),
|
||||||
|
]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -0,0 +1,34 @@
|
|||||||
|
use sea_orm_migration::prelude::*;
|
||||||
|
|
||||||
|
#[derive(DeriveMigrationName)]
|
||||||
|
pub struct Migration;
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl MigrationTrait for Migration {
|
||||||
|
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||||
|
manager.alter_table(
|
||||||
|
TableAlterStatement::new()
|
||||||
|
.table(Transaction::Table)
|
||||||
|
.add_column(
|
||||||
|
ColumnDef::new(Transaction::IdentityHash)
|
||||||
|
.big_integer()
|
||||||
|
.unique_key(),
|
||||||
|
).to_owned()
|
||||||
|
).await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||||
|
manager.alter_table(
|
||||||
|
TableAlterStatement::new()
|
||||||
|
.table(Transaction::Table)
|
||||||
|
.drop_column(Transaction::IdentityHash)
|
||||||
|
.to_owned()
|
||||||
|
).await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(DeriveIden)]
|
||||||
|
enum Transaction {
|
||||||
|
Table,
|
||||||
|
IdentityHash,
|
||||||
|
}
|
||||||
@ -7,6 +7,7 @@ use sea_orm::{
|
|||||||
ConnectionTrait, DatabaseBackend, DatabaseTransaction, DbErr, QueryFilter, QueryTrait,
|
ConnectionTrait, DatabaseBackend, DatabaseTransaction, DbErr, QueryFilter, QueryTrait,
|
||||||
Statement,
|
Statement,
|
||||||
};
|
};
|
||||||
|
use crate::ingestion::ingestion_logic::MonzoRow;
|
||||||
|
|
||||||
pub struct Insertion {
|
pub struct Insertion {
|
||||||
pub transaction: transaction::ActiveModel,
|
pub transaction: transaction::ActiveModel,
|
||||||
@ -52,9 +53,13 @@ async fn update_expenditures(
|
|||||||
// trying to move failures earlier and improve reporting.
|
// trying to move failures earlier and improve reporting.
|
||||||
pub async fn insert(
|
pub async fn insert(
|
||||||
db: &DatabaseConnection,
|
db: &DatabaseConnection,
|
||||||
insertions: Vec<Insertion>,
|
monzo_rows: Vec<MonzoRow>,
|
||||||
) -> Result<Vec<String>, AppError> {
|
) -> Result<Vec<String>, AppError> {
|
||||||
let mut new_transaction_ids = Vec::new();
|
let mut new_transaction_ids = Vec::new();
|
||||||
|
let insertions = monzo_rows
|
||||||
|
.into_iter()
|
||||||
|
.map(MonzoRow::into_insertion)
|
||||||
|
.collect::<Result<Vec<_>, _>>()?;
|
||||||
|
|
||||||
for insertions in insertions.chunks(400) {
|
for insertions in insertions.chunks(400) {
|
||||||
let tx = db.begin().await?;
|
let tx = db.begin().await?;
|
||||||
@ -77,10 +82,12 @@ async fn update_transactions(
|
|||||||
insertions: &[Insertion],
|
insertions: &[Insertion],
|
||||||
tx: &DatabaseTransaction,
|
tx: &DatabaseTransaction,
|
||||||
) -> Result<Vec<String>, AppError> {
|
) -> Result<Vec<String>, AppError> {
|
||||||
|
|
||||||
|
|
||||||
let insert =
|
let insert =
|
||||||
transaction::Entity::insert_many(insertions.iter().map(|i| &i.transaction).cloned())
|
transaction::Entity::insert_many(insertions.iter().map(|i| &i.transaction).cloned())
|
||||||
.on_conflict(
|
.on_conflict(
|
||||||
OnConflict::column(transaction::Column::Id)
|
OnConflict::columns([transaction::Column::Id, transaction::Column::IdentityHash])
|
||||||
.update_columns(transaction::Column::iter())
|
.update_columns(transaction::Column::iter())
|
||||||
.to_owned(),
|
.to_owned(),
|
||||||
)
|
)
|
||||||
|
|||||||
@ -1,13 +1,14 @@
|
|||||||
|
use std::hash::Hash;
|
||||||
use crate::ingestion::db::Insertion;
|
use crate::ingestion::db::Insertion;
|
||||||
use anyhow::Context;
|
use anyhow::{anyhow, Context};
|
||||||
use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime};
|
use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime};
|
||||||
use csv::StringRecord;
|
use csv::StringRecord;
|
||||||
use entity::expenditure::ActiveModel;
|
use entity::expenditure::ActiveModel;
|
||||||
use entity::transaction;
|
use entity::transaction;
|
||||||
use num_traits::FromPrimitive;
|
use num_traits::FromPrimitive;
|
||||||
use sea_orm::prelude::Decimal;
|
use sea_orm::prelude::Decimal;
|
||||||
use sea_orm::ActiveValue::*;
|
|
||||||
use sea_orm::IntoActiveModel;
|
use sea_orm::IntoActiveModel;
|
||||||
|
use serde_json::Value;
|
||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
mod headings {
|
mod headings {
|
||||||
@ -29,7 +30,23 @@ mod headings {
|
|||||||
pub const CATEGORY_SPLIT: usize = 15;
|
pub const CATEGORY_SPLIT: usize = 15;
|
||||||
}
|
}
|
||||||
|
|
||||||
fn parse_section(monzo_transaction_id: &str, section: &str) -> anyhow::Result<ActiveModel> {
|
#[derive(Debug, Eq, PartialEq, Hash)]
|
||||||
|
pub struct MonzoRow {
|
||||||
|
category_split: Option<String>,
|
||||||
|
primary_category: String,
|
||||||
|
total_amount: Decimal,
|
||||||
|
receipt: Option<String>,
|
||||||
|
notes: Option<String>,
|
||||||
|
emoji: Option<String>,
|
||||||
|
description: Option<String>,
|
||||||
|
transaction_type: String,
|
||||||
|
title: String,
|
||||||
|
timestamp: NaiveDateTime,
|
||||||
|
transaction_id: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MonzoRow {
|
||||||
|
fn parse_section(monzo_transaction_id: &str, section: &str) -> anyhow::Result<ActiveModel> {
|
||||||
let mut components = section.split(':');
|
let mut components = section.split(':');
|
||||||
let category: String = components
|
let category: String = components
|
||||||
.next()
|
.next()
|
||||||
@ -45,25 +62,78 @@ fn parse_section(monzo_transaction_id: &str, section: &str) -> anyhow::Result<Ac
|
|||||||
transaction_id: monzo_transaction_id.to_string(),
|
transaction_id: monzo_transaction_id.to_string(),
|
||||||
category,
|
category,
|
||||||
amount,
|
amount,
|
||||||
|
}.into_active_model())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Compute a hash of this row, returning the number as an i64 to be used as a unique constraint
|
||||||
|
/// in the database.
|
||||||
|
pub fn compute_hash(&self) -> i64 {
|
||||||
|
use std::collections::hash_map::DefaultHasher;
|
||||||
|
use std::hash::Hasher;
|
||||||
|
|
||||||
|
let mut hasher = DefaultHasher::new();
|
||||||
|
self.hash(&mut hasher);
|
||||||
|
hasher.finish() as i64
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn into_insertion(self) -> Result<Insertion, anyhow::Error> {
|
||||||
|
let expenditures: Vec<_> = match self.category_split {
|
||||||
|
Some(split) if !split.is_empty() => split
|
||||||
|
.split(',')
|
||||||
|
.map(|section| Self::parse_section(&self.transaction_id, section))
|
||||||
|
.collect::<Result<Vec<_>, anyhow::Error>>()?,
|
||||||
|
|
||||||
|
_ => vec![entity::expenditure::Model {
|
||||||
|
category: self.primary_category.clone(),
|
||||||
|
amount: self.total_amount,
|
||||||
|
transaction_id: self.transaction_id.clone(),
|
||||||
|
}
|
||||||
|
.into_active_model()],
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(Insertion {
|
||||||
|
transaction: transaction::Model {
|
||||||
|
id: self.transaction_id,
|
||||||
|
transaction_type: self.transaction_type,
|
||||||
|
timestamp: self.timestamp,
|
||||||
|
title: self.title,
|
||||||
|
emoji: self.emoji,
|
||||||
|
notes: self.notes,
|
||||||
|
receipt: self.receipt,
|
||||||
|
total_amount: self.total_amount,
|
||||||
|
description: self.description,
|
||||||
|
identity_hash: Some(self.compute_hash()),
|
||||||
|
}.into_active_model(),
|
||||||
|
|
||||||
|
contained_expenditures: expenditures,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
.into_active_model())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn json_opt(value: &serde_json::Value) -> Option<String> {
|
fn json_opt(value: &Value) -> Option<String> {
|
||||||
match value {
|
match value {
|
||||||
serde_json::Value::String(string) if string.is_empty() => None,
|
Value::String(string) if string.is_empty() => None,
|
||||||
serde_json::Value::String(string) => Some(string.to_string()),
|
Value::String(string) => Some(string.to_string()),
|
||||||
_ => None,
|
_ => None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn from_json_row(row: Vec<serde_json::Value>) -> anyhow::Result<Insertion> {
|
fn json_required_str(value: &Value, label: &str) -> anyhow::Result<String> {
|
||||||
use serde_json::Value;
|
match value {
|
||||||
let monzo_transaction_id = row[headings::TRANSACTION_ID]
|
Value::String(string) if string.is_empty() => Err(anyhow!("{} is empty", label)),
|
||||||
.as_str()
|
Value::String(string) => Ok(string.to_string()),
|
||||||
.context("No transaction id")?
|
_ => Err(anyhow!("{} is not a string", label)),
|
||||||
.to_string();
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn parse_timestamp(date: &str, time: &str) -> anyhow::Result<NaiveDateTime> {
|
||||||
|
let date = NaiveDate::parse_from_str(date, "%Y-%m-%d")?;
|
||||||
|
let time = NaiveTime::parse_from_str(time, "%H:%M:%S")?;
|
||||||
|
|
||||||
|
Ok(date.and_time(time))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn from_json_row(row: Vec<Value>) -> anyhow::Result<MonzoRow> {
|
||||||
let date = DateTime::parse_from_rfc3339(row[headings::DATE].as_str().context("No date")?)
|
let date = DateTime::parse_from_rfc3339(row[headings::DATE].as_str().context("No date")?)
|
||||||
.context("Failed to parse date")?;
|
.context("Failed to parse date")?;
|
||||||
|
|
||||||
@ -73,54 +143,21 @@ pub fn from_json_row(row: Vec<serde_json::Value>) -> anyhow::Result<Insertion> {
|
|||||||
|
|
||||||
let timestamp = date.date_naive().and_time(time);
|
let timestamp = date.date_naive().and_time(time);
|
||||||
|
|
||||||
let title = row[headings::NAME]
|
|
||||||
.as_str()
|
|
||||||
.context("No title")?
|
|
||||||
.to_string();
|
|
||||||
|
|
||||||
let monzo_transaction_type = row[headings::TYPE]
|
|
||||||
.as_str()
|
|
||||||
.context("No transaction type")?
|
|
||||||
.to_string();
|
|
||||||
|
|
||||||
let description = json_opt(&row[headings::DESCRIPTION]);
|
|
||||||
let emoji = json_opt(&row[headings::EMOJI]);
|
|
||||||
let notes = json_opt(&row[headings::NOTES_AND_TAGS]);
|
|
||||||
let receipt = json_opt(&row[headings::RECEIPT]);
|
|
||||||
let total_amount = Decimal::from_f64(row[headings::AMOUNT].as_f64().context("No amount")?)
|
let total_amount = Decimal::from_f64(row[headings::AMOUNT].as_f64().context("No amount")?)
|
||||||
.context("Failed to parse date")?;
|
.context("Failed to parse date")?;
|
||||||
|
|
||||||
let expenditures: Vec<_> = match row.get(headings::CATEGORY_SPLIT) {
|
Ok(MonzoRow {
|
||||||
Some(Value::String(split)) if !split.is_empty() => split
|
transaction_id: json_required_str(&row[headings::TRANSACTION_ID], "Transaction ID")?,
|
||||||
.split(',')
|
title: json_required_str(&row[headings::NAME], "Title")?,
|
||||||
.map(|section| parse_section(&monzo_transaction_id, section))
|
transaction_type: json_required_str(&row[headings::TYPE], "Transaction type")?,
|
||||||
.collect::<Result<Vec<_>, anyhow::Error>>()?,
|
description: json_opt(&row[headings::DESCRIPTION]),
|
||||||
|
emoji: json_opt(&row[headings::EMOJI]),
|
||||||
_ => vec![entity::expenditure::Model {
|
notes: json_opt(&row[headings::NOTES_AND_TAGS]),
|
||||||
category: row[headings::CATEGORY]
|
receipt: json_opt(&row[headings::RECEIPT]),
|
||||||
.as_str()
|
primary_category: json_required_str(&row[headings::CATEGORY], "Primary Category")?,
|
||||||
.context("No context")?
|
category_split: json_opt(&row[headings::CATEGORY_SPLIT]),
|
||||||
.to_string(),
|
total_amount,
|
||||||
amount: total_amount,
|
timestamp,
|
||||||
transaction_id: monzo_transaction_id.clone(),
|
|
||||||
}
|
|
||||||
.into_active_model()],
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(Insertion {
|
|
||||||
transaction: transaction::ActiveModel {
|
|
||||||
id: Set(monzo_transaction_id),
|
|
||||||
transaction_type: Set(monzo_transaction_type),
|
|
||||||
timestamp: Set(timestamp),
|
|
||||||
title: Set(title),
|
|
||||||
emoji: Set(emoji),
|
|
||||||
notes: Set(notes),
|
|
||||||
receipt: Set(receipt),
|
|
||||||
total_amount: Set(total_amount),
|
|
||||||
description: Set(description),
|
|
||||||
},
|
|
||||||
|
|
||||||
contained_expenditures: expenditures,
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -131,9 +168,7 @@ fn csv_opt(s: &str) -> Option<String> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn from_csv_row(row: StringRecord) -> anyhow::Result<Insertion> {
|
pub fn from_csv_row(row: StringRecord) -> anyhow::Result<MonzoRow> {
|
||||||
let monzo_transaction_id = row[headings::TRANSACTION_ID].to_string();
|
|
||||||
|
|
||||||
let date = NaiveDate::parse_from_str(&row[headings::DATE], "%d/%m/%Y")
|
let date = NaiveDate::parse_from_str(&row[headings::DATE], "%d/%m/%Y")
|
||||||
.context("Failed to parse date from csv")?;
|
.context("Failed to parse date from csv")?;
|
||||||
|
|
||||||
@ -142,41 +177,17 @@ pub fn from_csv_row(row: StringRecord) -> anyhow::Result<Insertion> {
|
|||||||
|
|
||||||
let timestamp = NaiveDateTime::new(date, time);
|
let timestamp = NaiveDateTime::new(date, time);
|
||||||
|
|
||||||
let title = row[headings::NAME].to_string();
|
Ok(MonzoRow {
|
||||||
let monzo_transaction_type = row[headings::TYPE].to_string();
|
timestamp,
|
||||||
|
transaction_id: row[headings::TRANSACTION_ID].to_string(),
|
||||||
let description = csv_opt(&row[headings::DESCRIPTION]);
|
title: row[headings::NAME].to_string(),
|
||||||
let emoji = csv_opt(&row[headings::EMOJI]);
|
transaction_type: row[headings::TYPE].to_string(),
|
||||||
let notes = csv_opt(&row[headings::NOTES_AND_TAGS]);
|
description: csv_opt(&row[headings::DESCRIPTION]),
|
||||||
let receipt = csv_opt(&row[headings::RECEIPT]);
|
emoji: csv_opt(&row[headings::EMOJI]),
|
||||||
let total_amount = row[headings::AMOUNT].parse::<Decimal>()?;
|
notes: csv_opt(&row[headings::NOTES_AND_TAGS]),
|
||||||
|
receipt: csv_opt(&row[headings::RECEIPT]),
|
||||||
let expenditures: Vec<_> = match row.get(headings::CATEGORY_SPLIT) {
|
total_amount: row[headings::AMOUNT].parse::<Decimal>()?,
|
||||||
Some(split) if !split.is_empty() => split
|
category_split: csv_opt(&row[headings::CATEGORY_SPLIT]),
|
||||||
.split(',')
|
primary_category: row[headings::CATEGORY].to_string(),
|
||||||
.map(|section| parse_section(&monzo_transaction_id, section))
|
|
||||||
.collect::<Result<Vec<_>, anyhow::Error>>()?,
|
|
||||||
|
|
||||||
_ => vec![entity::expenditure::Model {
|
|
||||||
transaction_id: monzo_transaction_id.clone(),
|
|
||||||
category: row[headings::CATEGORY].to_string(),
|
|
||||||
amount: total_amount,
|
|
||||||
}
|
|
||||||
.into_active_model()],
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(Insertion {
|
|
||||||
transaction: transaction::ActiveModel {
|
|
||||||
id: Set(monzo_transaction_id),
|
|
||||||
transaction_type: Set(monzo_transaction_type),
|
|
||||||
timestamp: Set(timestamp),
|
|
||||||
title: Set(title),
|
|
||||||
emoji: Set(emoji),
|
|
||||||
notes: Set(notes),
|
|
||||||
receipt: Set(receipt),
|
|
||||||
total_amount: Set(total_amount),
|
|
||||||
description: Set(description),
|
|
||||||
},
|
|
||||||
contained_expenditures: expenditures,
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user