Compare commits

..

No commits in common. "ca0df97e739b608d48bec691c1f665bd9ab65fae" and "35fd2b90d229f9a9132e5df776dd77d0704d88cb" have entirely different histories.

19 changed files with 802 additions and 716 deletions

2
.env
View File

@ -1 +1 @@
DATABASE_URL=postgres://postgres@localhost/monzo_development DATABASE_URL=postgres://postgres@localhost/logos_neu

1166
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -9,7 +9,7 @@ migration = { path = "migration" }
axum = { version = "0.7.5", features = ["multipart"] } axum = { version = "0.7.5", features = ["multipart"] }
tokio = { version = "1.37.0", features = ["full"] } tokio = { version = "1.37.0", features = ["full"] }
sea-orm = { version = "1.1.0", features = [ sea-orm = { version = "1.0.0", features = [
"sqlx-postgres", "sqlx-postgres",
"runtime-tokio-rustls", "runtime-tokio-rustls",
"macros" "macros"
@ -28,8 +28,8 @@ csv = "1.3.0"
clap = "4.5" clap = "4.5"
testcontainers = "0.21" testcontainers = "0.21"
testcontainers-modules = { version = "0.9", features = ["postgres"] } testcontainers-modules = { version = "0.9", features = ["postgres"] }
sqlx = { version = "0.8", features = ["postgres"] } sqlx = { version = "0.7", features = ["postgres"] }
tower-http = { version = "0.6", features = ["trace"] } tower-http = { version = "0.5", features = ["trace"] }
bytes = "1.7" bytes = "1.7"
once_cell = "1.19" once_cell = "1.19"

View File

@ -9,5 +9,5 @@ name = "entity"
path = "src/lib.rs" path = "src/lib.rs"
[dependencies] [dependencies]
sea-orm = { version = "1.1.0" } sea-orm = { version = "1.0.0-rc.4" }
serde = { version = "1.0.203", features = ["derive"] } serde = { version = "1.0.203", features = ["derive"] }

View File

@ -1,26 +0,0 @@
//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.0
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
/// Row model for the `account` table.
///
/// Each row carries an integer primary key and a name.
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "account")]
pub struct Model {
// Primary key of the account row.
#[sea_orm(primary_key)]
pub id: i32,
// Name of the account.
pub name: String,
}
/// Relations declared for the account entity.
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
// One account has many transactions.
#[sea_orm(has_many = "super::transaction::Entity")]
Transaction,
}
/// Links the account entity to the transaction entity.
impl Related<super::transaction::Entity> for Entity {
// The relation definition is the one generated for `Relation::Transaction`.
fn to() -> RelationDef {
Relation::Transaction.def()
}
}
// Empty impl: no hooks are overridden, so the trait's default behaviour applies.
impl ActiveModelBehavior for ActiveModel {}

View File

@ -1,4 +1,4 @@
//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.0 //! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2
use sea_orm::entity::prelude::*; use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -14,21 +14,6 @@ pub struct Model {
} }
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation { pub enum Relation {}
#[sea_orm(
belongs_to = "super::transaction::Entity",
from = "Column::TransactionId",
to = "super::transaction::Column::Id",
on_update = "NoAction",
on_delete = "Cascade"
)]
Transaction,
}
impl Related<super::transaction::Entity> for Entity {
fn to() -> RelationDef {
Relation::Transaction.def()
}
}
impl ActiveModelBehavior for ActiveModel {} impl ActiveModelBehavior for ActiveModel {}

View File

@ -1,7 +1,6 @@
//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.0 //! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2
pub mod prelude; pub mod prelude;
pub mod account;
pub mod expenditure; pub mod expenditure;
pub mod transaction; pub mod transaction;

View File

@ -1,5 +1,4 @@
//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.0 //! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2
pub use super::account::Entity as Account;
pub use super::expenditure::Entity as Expenditure; pub use super::expenditure::Entity as Expenditure;
pub use super::transaction::Entity as Transaction; pub use super::transaction::Entity as Transaction;

View File

@ -1,4 +1,4 @@
//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.0 //! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2
use sea_orm::entity::prelude::*; use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -18,33 +18,9 @@ pub struct Model {
pub description: Option<String>, pub description: Option<String>,
#[sea_orm(unique)] #[sea_orm(unique)]
pub identity_hash: Option<i64>, pub identity_hash: Option<i64>,
pub account_id: Option<i32>,
} }
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation { pub enum Relation {}
#[sea_orm(
belongs_to = "super::account::Entity",
from = "Column::AccountId",
to = "super::account::Column::Id",
on_update = "NoAction",
on_delete = "Cascade"
)]
Account,
#[sea_orm(has_many = "super::expenditure::Entity")]
Expenditure,
}
impl Related<super::account::Entity> for Entity {
fn to() -> RelationDef {
Relation::Account.def()
}
}
impl Related<super::expenditure::Entity> for Entity {
fn to() -> RelationDef {
Relation::Expenditure.def()
}
}
impl ActiveModelBehavior for ActiveModel {} impl ActiveModelBehavior for ActiveModel {}

View File

@ -12,7 +12,7 @@ path = "src/lib.rs"
async-std = { version = "1", features = ["attributes", "tokio1"] } async-std = { version = "1", features = ["attributes", "tokio1"] }
[dependencies.sea-orm-migration] [dependencies.sea-orm-migration]
version = "1.1.0" version = "1.0.0-rc.4"
features = [ features = [
"runtime-tokio-rustls", # `ASYNC_RUNTIME` feature "runtime-tokio-rustls", # `ASYNC_RUNTIME` feature
"sqlx-postgres", # `DATABASE_DRIVER` feature "sqlx-postgres", # `DATABASE_DRIVER` feature

View File

@ -3,9 +3,6 @@ pub use sea_orm_migration::prelude::*;
pub mod m20230904_141851_create_monzo_tables; pub mod m20230904_141851_create_monzo_tables;
mod m20240529_195030_add_transaction_identity_hash; mod m20240529_195030_add_transaction_identity_hash;
mod m20240603_162500_make_title_optional; mod m20240603_162500_make_title_optional;
mod m20241015_195220_add_account_to_transactions;
mod m20241015_200222_add_expenditure_transaction_fk;
mod m20241015_200652_add_accounts;
pub struct Migrator; pub struct Migrator;
@ -20,9 +17,6 @@ impl MigratorTrait for Migrator {
Box::new(m20230904_141851_create_monzo_tables::Migration), Box::new(m20230904_141851_create_monzo_tables::Migration),
Box::new(m20240529_195030_add_transaction_identity_hash::Migration), Box::new(m20240529_195030_add_transaction_identity_hash::Migration),
Box::new(m20240603_162500_make_title_optional::Migration), Box::new(m20240603_162500_make_title_optional::Migration),
Box::new(m20241015_195220_add_account_to_transactions::Migration),
Box::new(m20241015_200222_add_expenditure_transaction_fk::Migration),
Box::new(m20241015_200652_add_accounts::Migration),
] ]
} }
} }

View File

@ -1,76 +0,0 @@
use sea_orm_migration::{prelude::*, schema::*};
/// Migration that creates the account table and links transactions to it
/// through a new `AccountId` column and foreign key.
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
    /// Applies the schema change in three steps: create the account table,
    /// add the account column to transactions, then link the two with a
    /// cascading foreign key.
    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        // Step 1: the account table (auto-incrementing integer key plus a required name).
        let create_account_table = Table::create()
            .table(Account::Table)
            .if_not_exists()
            .col(
                ColumnDef::new(Account::Id)
                    .integer()
                    .auto_increment()
                    .not_null()
                    .primary_key(),
            )
            .col(ColumnDef::new(Account::Name).string().not_null())
            .to_owned();
        manager.create_table(create_account_table).await?;

        // Step 2: an integer account column on transactions (no NOT NULL constraint is set).
        let add_account_column = Table::alter()
            .table(Transaction::Table)
            .add_column(ColumnDef::new(Transaction::AccountId).integer())
            .to_owned();
        manager.alter_table(add_account_column).await?;

        // Step 3: foreign key from the new column to the account key, cascading on delete.
        let account_fk = ForeignKey::create()
            .name("fk_transaction_account_id")
            .from(Transaction::Table, Transaction::AccountId)
            .to(Account::Table, Account::Id)
            .on_delete(ForeignKeyAction::Cascade)
            .to_owned();
        manager.create_foreign_key(account_fk).await?;

        Ok(())
    }

    /// Reverts the change: removes the account column from transactions, then
    /// drops the account table.
    ///
    /// NOTE(review): the foreign key is not dropped explicitly; this presumably
    /// relies on the database removing it together with the column — confirm.
    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        let drop_account_column = Table::alter()
            .table(Transaction::Table)
            .drop_column(Transaction::AccountId)
            .to_owned();
        manager.alter_table(drop_account_column).await?;

        let drop_account_table = Table::drop().table(Account::Table).to_owned();
        manager.drop_table(drop_account_table).await?;

        Ok(())
    }
}
/// Identifiers for the account table and the columns this migration creates.
#[derive(DeriveIden)]
enum Account {
// The table itself.
Table,
// Integer primary-key column.
Id,
// Required string column holding the account name.
Name,
}
/// Identifiers for the transaction table and the column this migration adds.
#[derive(DeriveIden)]
enum Transaction {
// The table itself.
Table,
// Integer column referencing the account primary key.
AccountId,
}

View File

@ -1,40 +0,0 @@
use sea_orm_migration::{prelude::*, schema::*};
/// Migration that adds the `fk_expenditure_transaction_id` foreign key from
/// expenditures to transactions.
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
    /// Adds the `fk_expenditure_transaction_id` foreign key from the
    /// expenditure transaction column to the transaction key, cascading on delete.
    ///
    /// # Errors
    /// Propagates any `DbErr` reported while creating the constraint.
    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        manager
            .create_foreign_key(
                ForeignKey::create()
                    .name("fk_expenditure_transaction_id")
                    .from(Expenditure::Table, Expenditure::TransactionId)
                    .to(Transaction::Table, Transaction::Id)
                    .on_delete(ForeignKeyAction::Cascade)
                    .to_owned(),
            )
            .await?;

        Ok(())
    }

    /// Removes the `fk_expenditure_transaction_id` foreign key again.
    ///
    /// # Errors
    /// Propagates any `DbErr` reported while dropping the constraint.
    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        manager
            .drop_foreign_key(
                ForeignKey::drop()
                    .name("fk_expenditure_transaction_id")
                    // The drop statement is rendered as `ALTER TABLE <table> DROP CONSTRAINT ...`,
                    // so the owning table must be named or the generated SQL is incomplete.
                    .table(Expenditure::Table)
                    .to_owned(),
            )
            .await?;

        Ok(())
    }
}
/// Identifiers for the transaction table and the column the foreign key points to.
#[derive(DeriveIden)]
enum Transaction {
// The table itself.
Table,
// Column referenced by the foreign key.
Id,
}
/// Identifiers for the expenditure table and the column the foreign key starts from.
#[derive(DeriveIden)]
enum Expenditure {
// The table itself.
Table,
// Column holding the referenced transaction's id.
TransactionId,
}

View File

@ -1,53 +0,0 @@
use sea_orm_migration::{prelude::*, schema::*};
use crate::sea_orm::sqlx;
/// Data migration that inserts the "Monzo" and "Flex" accounts and assigns the
/// existing transactions to the "Monzo" account.
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager.exec_stmt(
InsertStatement::new()
.into_table(Account::Table)
.columns(vec![Account::Name])
.values_panic(["Monzo".into()])
.values_panic(["Flex".into()])
.to_owned()
).await?;
let id: i32 = manager.get_connection().query_one(
sea_orm::Statement::from_string(
manager.get_database_backend(),
"SELECT id FROM account WHERE name = 'Monzo'",
)
).await?.expect("Monzo account not found").try_get_by_index(0)?;
manager.exec_stmt(
UpdateStatement::new()
.table(Transaction::Table)
.values([(Transaction::AccountId, id.into())])
.to_owned()
).await?;
Ok(())
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
// This is a data creation migration, so we do not roll back the migration
Ok(())
}
}
/// Identifiers for the account table and its columns.
#[derive(DeriveIden)]
enum Account {
// The table itself.
Table,
// Primary-key column; not referenced through this enum in the migration,
// which reads the id with a raw SQL query instead.
Id,
// Name column populated by the seed insert.
Name,
}
/// Identifiers for the transaction table and the column this migration backfills.
#[derive(DeriveIden)]
enum Transaction {
// The table itself.
Table,
// Column that receives the id of the "Monzo" account.
AccountId,
}

View File

@ -20,15 +20,14 @@ pub struct Insertion {
pub async fn insert( pub async fn insert(
db: &DatabaseConnection, db: &DatabaseConnection,
monzo_rows: Vec<MonzoRow>, monzo_rows: Vec<MonzoRow>,
account_id: i32,
) -> Result<Vec<String>, AppError> { ) -> Result<Vec<String>, AppError> {
let mut new_transaction_ids = Vec::new(); let mut new_transaction_ids = Vec::new();
let insertions = monzo_rows let insertions = monzo_rows
.into_iter() .into_iter()
.map(|row| MonzoRow::into_insertion(row, account_id)) .map(MonzoRow::into_insertion)
.collect::<Result<Vec<_>, _>>()?; .collect::<Result<Vec<_>, _>>()?;
for insertions in insertions.chunks(200) { for insertions in insertions.chunks(400) {
let (new_or_updated_insertions, inserted_transaction_ids) = let (new_or_updated_insertions, inserted_transaction_ids) =
whittle_insertions(insertions, db).await?; whittle_insertions(insertions, db).await?;
@ -264,7 +263,7 @@ mod tests {
.collect::<Result<Vec<_>, anyhow::Error>>() .collect::<Result<Vec<_>, anyhow::Error>>()
.unwrap(); .unwrap();
insert(&dbi.db, data.clone(), ).await?; insert(&dbi.db, data.clone()).await?;
let notification = listener.recv().await?; let notification = listener.recv().await?;
let payload = notification.payload(); let payload = notification.payload();
let mut payload = serde_json::from_str::<Vec<String>>(&payload)?; let mut payload = serde_json::from_str::<Vec<String>>(&payload)?;
@ -278,7 +277,7 @@ mod tests {
assert_eq!(payload, ids, "Inserted IDs do not match"); assert_eq!(payload, ids, "Inserted IDs do not match");
insert(&dbi.db, data.clone(), ).await?; insert(&dbi.db, data.clone()).await?;
let notification = listener.recv().await?; let notification = listener.recv().await?;
let payload = notification.payload(); let payload = notification.payload();
let payload = serde_json::from_str::<Vec<String>>(&payload)?; let payload = serde_json::from_str::<Vec<String>>(&payload)?;
@ -289,7 +288,7 @@ mod tests {
assert_ne!(altered_data[0].compute_hash(), data[0].compute_hash(), "Alterations have the same hash"); assert_ne!(altered_data[0].compute_hash(), data[0].compute_hash(), "Alterations have the same hash");
insert(&dbi.db, altered_data.clone(), ).await?; insert(&dbi.db, altered_data.clone()).await?;
let notification = listener.recv().await?; let notification = listener.recv().await?;
let payload = notification.payload(); let payload = notification.payload();
let payload = serde_json::from_str::<Vec<String>>(&payload)?; let payload = serde_json::from_str::<Vec<String>>(&payload)?;
@ -298,15 +297,3 @@ mod tests {
Ok(()) Ok(())
} }
} }
/// Looks up the primary key of the account with the given name.
///
/// When `account_name` is `None`, the lookup falls back to the `"Monzo"` account.
/// Only the `id` column is selected.
///
/// # Errors
/// Returns `AppError::BadRequest` when no account with that name exists.
/// Errors from the database query are propagated through `?`.
pub(crate) async fn get_account_id(
    db: &DatabaseConnection,
    account_name: Option<String>,
) -> Result<i32, AppError> {
    // Build the default name only when the caller did not supply one.
    let account_name = account_name.unwrap_or_else(|| "Monzo".to_string());

    entity::prelude::Account::find()
        .filter(entity::account::Column::Name.eq(account_name))
        .select_only()
        .column(entity::account::Column::Id)
        .into_tuple::<i32>()
        .one(db)
        .await?
        // Construct the error lazily so the success path does not allocate it.
        .ok_or_else(|| AppError::BadRequest(anyhow!("Account not found")))
}

View File

@ -1,9 +1,9 @@
#[allow(dead_code)] #[allow(dead_code)]
pub mod headings { mod headings {
#[allow(unused_imports)] #[allow(unused_imports)]
pub use super::super::ingestion_logic::headings::*; pub use super::super::ingestion_logic::headings::*;
// Additional Flex headings // Additional FLex headings
pub const MONEY_OUT: usize = 16; pub const MONEY_OUT: usize = 16;
pub const MONEY_IN: usize = 17; pub const MONEY_IN: usize = 17;
} }

View File

@ -9,7 +9,6 @@ use sea_orm::prelude::Decimal;
use sea_orm::IntoActiveModel; use sea_orm::IntoActiveModel;
use serde_json::Value; use serde_json::Value;
use std::hash::Hash; use std::hash::Hash;
use crate::ingestion::flex;
#[allow(dead_code)] #[allow(dead_code)]
pub(crate) mod headings { pub(crate) mod headings {
@ -78,7 +77,7 @@ impl MonzoRow {
hasher.finish() as i64 hasher.finish() as i64
} }
pub fn into_insertion(self, account_id: i32) -> Result<Insertion, anyhow::Error> { pub fn into_insertion(self) -> Result<Insertion, anyhow::Error> {
let identity_hash = self.compute_hash(); let identity_hash = self.compute_hash();
let expenditures: Vec<_> = match &self.category_split { let expenditures: Vec<_> = match &self.category_split {
@ -107,7 +106,6 @@ impl MonzoRow {
total_amount: self.total_amount, total_amount: self.total_amount,
description: self.description, description: self.description,
identity_hash: Some(identity_hash), identity_hash: Some(identity_hash),
account_id: Some(account_id),
} }
.into_active_model(), .into_active_model(),

View File

@ -19,52 +19,36 @@ pub async fn monzo_batched_json(
.map(from_json_row) .map(from_json_row)
.collect::<Result<_, _>>()?; .collect::<Result<_, _>>()?;
// We default to the main account for JSON ingestion for now. db::insert(&db, data).await?;
let account_id = db::get_account_id(&db, None).await?;
db::insert(&db, data, account_id).await?;
Ok("Ok") Ok("Ok")
} }
async fn extract_csv_and_account_name(mut multipart: Multipart) -> Result<(Option<Bytes>, Option<String>), MultipartError> { async fn extract_csv(mut multipart: Multipart) -> Result<Option<Bytes>, MultipartError> {
let mut csv = None; let csv = loop {
let mut account_name = None; match multipart.next_field().await? {
Some(field) if field.name() == Some("csv") => {
while let Some(field) = multipart.next_field().await? { break Some(field.bytes().await?);
match field.name() {
Some("csv") => {
csv = Some(field.bytes().await?);
} }
Some("account_id") => { Some(_) => {}
account_name = Some(field.text().await?); None => break None,
} }
};
_ => {} Ok(csv)
}
if csv.is_some() && account_name.is_some() {
break;
}
}
Ok((csv, account_name))
} }
pub async fn monzo_batched_csv( pub async fn monzo_batched_csv(
Extension(db): Extension<DatabaseConnection>, Extension(db): Extension<DatabaseConnection>,
multipart: Multipart, multipart: Multipart,
) -> Result<&'static str, AppError> { ) -> Result<&'static str, AppError> {
static CSV_MISSING_ERR_MSG: &str = "No CSV file provided. Expected a multipart request with a `csv` field containing the contents of the CSV."; static CSV_MISSING_ERR_MSG: &str = "No CSV file provided. Expected a multipart request with a `csv` field containing the contents of the CSV.";
let (csv, account_name) = extract_csv_and_account_name(multipart) let csv = extract_csv(multipart)
.await .await
.map_err(|e| AppError::BadRequest(anyhow!(e)))?; .map_err(|e| AppError::BadRequest(anyhow!(e)))
.and_then(|csv| csv.ok_or(AppError::BadRequest(anyhow!(CSV_MISSING_ERR_MSG))))?;
let Some(csv) = csv else {
return Err(AppError::BadRequest(anyhow!(CSV_MISSING_ERR_MSG)));
};
let account_id = db::get_account_id(&db, account_name).await?;
let csv = Cursor::new(csv); let csv = Cursor::new(csv);
let mut csv = csv::Reader::from_reader(csv); let mut csv = csv::Reader::from_reader(csv);
@ -74,7 +58,7 @@ pub async fn monzo_batched_csv(
.map(from_csv_row) .map(from_csv_row)
.collect::<Result<_, _>>()?; .collect::<Result<_, _>>()?;
db::insert(&db, data, account_id).await?; db::insert(&db, data).await?;
Ok("Ok") Ok("Ok")
} }

View File

@ -44,10 +44,6 @@ enum Commands {
Csv { Csv {
/// The path of the CSV file to ingest. /// The path of the CSV file to ingest.
csv_file: PathBuf, csv_file: PathBuf,
/// The name of the account to ingest the CSV for.
#[clap(long, short)]
account: String,
}, },
} }
@ -98,7 +94,7 @@ async fn main() -> anyhow::Result<()> {
serve_web(addr, connection).await?; serve_web(addr, connection).await?;
} }
Commands::Csv { csv_file, account: account_name } => { Commands::Csv { csv_file } => {
let mut csv = csv::Reader::from_reader(File::open(csv_file)?); let mut csv = csv::Reader::from_reader(File::open(csv_file)?);
let data = csv.records(); let data = csv.records();
let data = data let data = data
@ -106,8 +102,7 @@ async fn main() -> anyhow::Result<()> {
.map(from_csv_row) .map(from_csv_row)
.collect::<Result<_, _>>()?; .collect::<Result<_, _>>()?;
let account_id = db::get_account_id(&connection, Some(account_name)).await?; db::insert(&connection, data).await?;
db::insert(&connection, data, account_id).await?;
} }
} }