Compare commits
5 Commits
35fd2b90d2
...
ca0df97e73
| Author | SHA1 | Date | |
|---|---|---|---|
| ca0df97e73 | |||
| 5c3f734bbc | |||
| e3ed72c9b0 | |||
| 3c3b6dc4e6 | |||
| c3796720b7 |
2
.env
2
.env
@ -1 +1 @@
|
||||
DATABASE_URL=postgres://postgres@localhost/logos_neu
|
||||
DATABASE_URL=postgres://postgres@localhost/monzo_development
|
||||
|
||||
1158
Cargo.lock
generated
1158
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@ -9,7 +9,7 @@ migration = { path = "migration" }
|
||||
|
||||
axum = { version = "0.7.5", features = ["multipart"] }
|
||||
tokio = { version = "1.37.0", features = ["full"] }
|
||||
sea-orm = { version = "1.0.0", features = [
|
||||
sea-orm = { version = "1.1.0", features = [
|
||||
"sqlx-postgres",
|
||||
"runtime-tokio-rustls",
|
||||
"macros"
|
||||
@ -28,8 +28,8 @@ csv = "1.3.0"
|
||||
clap = "4.5"
|
||||
testcontainers = "0.21"
|
||||
testcontainers-modules = { version = "0.9", features = ["postgres"] }
|
||||
sqlx = { version = "0.7", features = ["postgres"] }
|
||||
tower-http = { version = "0.5", features = ["trace"] }
|
||||
sqlx = { version = "0.8", features = ["postgres"] }
|
||||
tower-http = { version = "0.6", features = ["trace"] }
|
||||
bytes = "1.7"
|
||||
once_cell = "1.19"
|
||||
|
||||
|
||||
@ -9,5 +9,5 @@ name = "entity"
|
||||
path = "src/lib.rs"
|
||||
|
||||
[dependencies]
|
||||
sea-orm = { version = "1.0.0-rc.4" }
|
||||
sea-orm = { version = "1.1.0" }
|
||||
serde = { version = "1.0.203", features = ["derive"] }
|
||||
|
||||
26
entity/src/account.rs
Normal file
26
entity/src/account.rs
Normal file
@ -0,0 +1,26 @@
|
||||
//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.0
|
||||
|
||||
use sea_orm::entity::prelude::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
|
||||
#[sea_orm(table_name = "account")]
|
||||
pub struct Model {
|
||||
#[sea_orm(primary_key)]
|
||||
pub id: i32,
|
||||
pub name: String,
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
|
||||
pub enum Relation {
|
||||
#[sea_orm(has_many = "super::transaction::Entity")]
|
||||
Transaction,
|
||||
}
|
||||
|
||||
impl Related<super::transaction::Entity> for Entity {
|
||||
fn to() -> RelationDef {
|
||||
Relation::Transaction.def()
|
||||
}
|
||||
}
|
||||
|
||||
impl ActiveModelBehavior for ActiveModel {}
|
||||
@ -1,4 +1,4 @@
|
||||
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2
|
||||
//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.0
|
||||
|
||||
use sea_orm::entity::prelude::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@ -14,6 +14,21 @@ pub struct Model {
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
|
||||
pub enum Relation {}
|
||||
pub enum Relation {
|
||||
#[sea_orm(
|
||||
belongs_to = "super::transaction::Entity",
|
||||
from = "Column::TransactionId",
|
||||
to = "super::transaction::Column::Id",
|
||||
on_update = "NoAction",
|
||||
on_delete = "Cascade"
|
||||
)]
|
||||
Transaction,
|
||||
}
|
||||
|
||||
impl Related<super::transaction::Entity> for Entity {
|
||||
fn to() -> RelationDef {
|
||||
Relation::Transaction.def()
|
||||
}
|
||||
}
|
||||
|
||||
impl ActiveModelBehavior for ActiveModel {}
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2
|
||||
//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.0
|
||||
|
||||
pub mod prelude;
|
||||
|
||||
pub mod account;
|
||||
pub mod expenditure;
|
||||
pub mod transaction;
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2
|
||||
//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.0
|
||||
|
||||
pub use super::account::Entity as Account;
|
||||
pub use super::expenditure::Entity as Expenditure;
|
||||
pub use super::transaction::Entity as Transaction;
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2
|
||||
//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.0
|
||||
|
||||
use sea_orm::entity::prelude::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@ -18,9 +18,33 @@ pub struct Model {
|
||||
pub description: Option<String>,
|
||||
#[sea_orm(unique)]
|
||||
pub identity_hash: Option<i64>,
|
||||
pub account_id: Option<i32>,
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
|
||||
pub enum Relation {}
|
||||
pub enum Relation {
|
||||
#[sea_orm(
|
||||
belongs_to = "super::account::Entity",
|
||||
from = "Column::AccountId",
|
||||
to = "super::account::Column::Id",
|
||||
on_update = "NoAction",
|
||||
on_delete = "Cascade"
|
||||
)]
|
||||
Account,
|
||||
#[sea_orm(has_many = "super::expenditure::Entity")]
|
||||
Expenditure,
|
||||
}
|
||||
|
||||
impl Related<super::account::Entity> for Entity {
|
||||
fn to() -> RelationDef {
|
||||
Relation::Account.def()
|
||||
}
|
||||
}
|
||||
|
||||
impl Related<super::expenditure::Entity> for Entity {
|
||||
fn to() -> RelationDef {
|
||||
Relation::Expenditure.def()
|
||||
}
|
||||
}
|
||||
|
||||
impl ActiveModelBehavior for ActiveModel {}
|
||||
|
||||
@ -12,7 +12,7 @@ path = "src/lib.rs"
|
||||
async-std = { version = "1", features = ["attributes", "tokio1"] }
|
||||
|
||||
[dependencies.sea-orm-migration]
|
||||
version = "1.0.0-rc.4"
|
||||
version = "1.1.0"
|
||||
features = [
|
||||
"runtime-tokio-rustls", # `ASYNC_RUNTIME` feature
|
||||
"sqlx-postgres", # `DATABASE_DRIVER` feature
|
||||
|
||||
@ -3,6 +3,9 @@ pub use sea_orm_migration::prelude::*;
|
||||
pub mod m20230904_141851_create_monzo_tables;
|
||||
mod m20240529_195030_add_transaction_identity_hash;
|
||||
mod m20240603_162500_make_title_optional;
|
||||
mod m20241015_195220_add_account_to_transactions;
|
||||
mod m20241015_200222_add_expenditure_transaction_fk;
|
||||
mod m20241015_200652_add_accounts;
|
||||
|
||||
pub struct Migrator;
|
||||
|
||||
@ -17,6 +20,9 @@ impl MigratorTrait for Migrator {
|
||||
Box::new(m20230904_141851_create_monzo_tables::Migration),
|
||||
Box::new(m20240529_195030_add_transaction_identity_hash::Migration),
|
||||
Box::new(m20240603_162500_make_title_optional::Migration),
|
||||
Box::new(m20241015_195220_add_account_to_transactions::Migration),
|
||||
Box::new(m20241015_200222_add_expenditure_transaction_fk::Migration),
|
||||
Box::new(m20241015_200652_add_accounts::Migration),
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,76 @@
|
||||
use sea_orm_migration::{prelude::*, schema::*};
|
||||
|
||||
#[derive(DeriveMigrationName)]
|
||||
pub struct Migration;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl MigrationTrait for Migration {
|
||||
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||
manager
|
||||
.create_table(
|
||||
Table::create()
|
||||
.table(Account::Table)
|
||||
.if_not_exists()
|
||||
.col(
|
||||
ColumnDef::new(Account::Id)
|
||||
.integer()
|
||||
.auto_increment()
|
||||
.not_null()
|
||||
.primary_key(),
|
||||
)
|
||||
.col(
|
||||
ColumnDef::new(Account::Name)
|
||||
.string()
|
||||
.not_null(),
|
||||
)
|
||||
.to_owned(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
manager.alter_table(
|
||||
TableAlterStatement::new()
|
||||
.table(Transaction::Table)
|
||||
.add_column(ColumnDef::new(Transaction::AccountId).integer())
|
||||
.to_owned(),
|
||||
).await?;
|
||||
|
||||
manager
|
||||
.create_foreign_key(
|
||||
ForeignKey::create()
|
||||
.name("fk_transaction_account_id")
|
||||
.from(Transaction::Table, Transaction::AccountId)
|
||||
.to(Account::Table, Account::Id)
|
||||
.on_delete(ForeignKeyAction::Cascade)
|
||||
.to_owned(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||
manager.alter_table(
|
||||
TableAlterStatement::new()
|
||||
.table(Transaction::Table)
|
||||
.drop_column(Transaction::AccountId)
|
||||
.to_owned(),
|
||||
).await?;
|
||||
|
||||
manager.drop_table(Table::drop().table(Account::Table).to_owned()).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(DeriveIden)]
|
||||
enum Account {
|
||||
Table,
|
||||
Id,
|
||||
Name,
|
||||
}
|
||||
|
||||
#[derive(DeriveIden)]
|
||||
enum Transaction {
|
||||
Table,
|
||||
AccountId,
|
||||
}
|
||||
@ -0,0 +1,40 @@
|
||||
use sea_orm_migration::{prelude::*, schema::*};
|
||||
|
||||
#[derive(DeriveMigrationName)]
|
||||
pub struct Migration;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl MigrationTrait for Migration {
|
||||
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||
manager
|
||||
.create_foreign_key(
|
||||
ForeignKey::create()
|
||||
.name("fk_expenditure_transaction_id")
|
||||
.from(Expenditure::Table, Expenditure::TransactionId)
|
||||
.to(Transaction::Table, Transaction::Id)
|
||||
.on_delete(ForeignKeyAction::Cascade)
|
||||
.to_owned(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||
manager.drop_foreign_key(ForeignKey::drop().name("fk_expenditure_transaction_id").to_owned()).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(DeriveIden)]
|
||||
enum Transaction {
|
||||
Table,
|
||||
Id,
|
||||
}
|
||||
|
||||
#[derive(DeriveIden)]
|
||||
enum Expenditure {
|
||||
Table,
|
||||
TransactionId,
|
||||
}
|
||||
53
migration/src/m20241015_200652_add_accounts.rs
Normal file
53
migration/src/m20241015_200652_add_accounts.rs
Normal file
@ -0,0 +1,53 @@
|
||||
use sea_orm_migration::{prelude::*, schema::*};
|
||||
use crate::sea_orm::sqlx;
|
||||
|
||||
#[derive(DeriveMigrationName)]
|
||||
pub struct Migration;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl MigrationTrait for Migration {
|
||||
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||
manager.exec_stmt(
|
||||
InsertStatement::new()
|
||||
.into_table(Account::Table)
|
||||
.columns(vec![Account::Name])
|
||||
.values_panic(["Monzo".into()])
|
||||
.values_panic(["Flex".into()])
|
||||
.to_owned()
|
||||
).await?;
|
||||
|
||||
let id: i32 = manager.get_connection().query_one(
|
||||
sea_orm::Statement::from_string(
|
||||
manager.get_database_backend(),
|
||||
"SELECT id FROM account WHERE name = 'Monzo'",
|
||||
)
|
||||
).await?.expect("Monzo account not found").try_get_by_index(0)?;
|
||||
|
||||
manager.exec_stmt(
|
||||
UpdateStatement::new()
|
||||
.table(Transaction::Table)
|
||||
.values([(Transaction::AccountId, id.into())])
|
||||
.to_owned()
|
||||
).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||
// This is a data creation migration, so we do not roll back the migration
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(DeriveIden)]
|
||||
enum Account {
|
||||
Table,
|
||||
Id,
|
||||
Name,
|
||||
}
|
||||
|
||||
#[derive(DeriveIden)]
|
||||
enum Transaction {
|
||||
Table,
|
||||
AccountId,
|
||||
}
|
||||
@ -20,14 +20,15 @@ pub struct Insertion {
|
||||
pub async fn insert(
|
||||
db: &DatabaseConnection,
|
||||
monzo_rows: Vec<MonzoRow>,
|
||||
account_id: i32,
|
||||
) -> Result<Vec<String>, AppError> {
|
||||
let mut new_transaction_ids = Vec::new();
|
||||
let insertions = monzo_rows
|
||||
.into_iter()
|
||||
.map(MonzoRow::into_insertion)
|
||||
.map(|row| MonzoRow::into_insertion(row, account_id))
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
|
||||
for insertions in insertions.chunks(400) {
|
||||
for insertions in insertions.chunks(200) {
|
||||
let (new_or_updated_insertions, inserted_transaction_ids) =
|
||||
whittle_insertions(insertions, db).await?;
|
||||
|
||||
@ -263,7 +264,7 @@ mod tests {
|
||||
.collect::<Result<Vec<_>, anyhow::Error>>()
|
||||
.unwrap();
|
||||
|
||||
insert(&dbi.db, data.clone()).await?;
|
||||
insert(&dbi.db, data.clone(), ).await?;
|
||||
let notification = listener.recv().await?;
|
||||
let payload = notification.payload();
|
||||
let mut payload = serde_json::from_str::<Vec<String>>(&payload)?;
|
||||
@ -277,7 +278,7 @@ mod tests {
|
||||
|
||||
assert_eq!(payload, ids, "Inserted IDs do not match");
|
||||
|
||||
insert(&dbi.db, data.clone()).await?;
|
||||
insert(&dbi.db, data.clone(), ).await?;
|
||||
let notification = listener.recv().await?;
|
||||
let payload = notification.payload();
|
||||
let payload = serde_json::from_str::<Vec<String>>(&payload)?;
|
||||
@ -288,7 +289,7 @@ mod tests {
|
||||
|
||||
assert_ne!(altered_data[0].compute_hash(), data[0].compute_hash(), "Alterations have the same hash");
|
||||
|
||||
insert(&dbi.db, altered_data.clone()).await?;
|
||||
insert(&dbi.db, altered_data.clone(), ).await?;
|
||||
let notification = listener.recv().await?;
|
||||
let payload = notification.payload();
|
||||
let payload = serde_json::from_str::<Vec<String>>(&payload)?;
|
||||
@ -297,3 +298,15 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn get_account_id(p0: &DatabaseConnection, p1: Option<String>) -> Result<i32, AppError> {
|
||||
let p1 = p1.unwrap_or("Monzo".to_string());
|
||||
|
||||
entity::prelude::Account::find()
|
||||
.filter(entity::account::Column::Name.eq(p1))
|
||||
.select_only()
|
||||
.column(entity::account::Column::Id)
|
||||
.into_tuple::<i32>()
|
||||
.one(p0).await?
|
||||
.ok_or(AppError::BadRequest(anyhow!("Account not found")))
|
||||
}
|
||||
@ -1,9 +1,9 @@
|
||||
#[allow(dead_code)]
|
||||
mod headings {
|
||||
pub mod headings {
|
||||
#[allow(unused_imports)]
|
||||
pub use super::super::ingestion_logic::headings::*;
|
||||
|
||||
// Additional FLex headings
|
||||
// Additional Flex headings
|
||||
pub const MONEY_OUT: usize = 16;
|
||||
pub const MONEY_IN: usize = 17;
|
||||
}
|
||||
|
||||
@ -9,6 +9,7 @@ use sea_orm::prelude::Decimal;
|
||||
use sea_orm::IntoActiveModel;
|
||||
use serde_json::Value;
|
||||
use std::hash::Hash;
|
||||
use crate::ingestion::flex;
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub(crate) mod headings {
|
||||
@ -77,7 +78,7 @@ impl MonzoRow {
|
||||
hasher.finish() as i64
|
||||
}
|
||||
|
||||
pub fn into_insertion(self) -> Result<Insertion, anyhow::Error> {
|
||||
pub fn into_insertion(self, account_id: i32) -> Result<Insertion, anyhow::Error> {
|
||||
let identity_hash = self.compute_hash();
|
||||
|
||||
let expenditures: Vec<_> = match &self.category_split {
|
||||
@ -106,6 +107,7 @@ impl MonzoRow {
|
||||
total_amount: self.total_amount,
|
||||
description: self.description,
|
||||
identity_hash: Some(identity_hash),
|
||||
account_id: Some(account_id),
|
||||
}
|
||||
.into_active_model(),
|
||||
|
||||
|
||||
@ -19,36 +19,52 @@ pub async fn monzo_batched_json(
|
||||
.map(from_json_row)
|
||||
.collect::<Result<_, _>>()?;
|
||||
|
||||
db::insert(&db, data).await?;
|
||||
// We default to the main account for JSON ingestion for now.
|
||||
let account_id = db::get_account_id(&db, None).await?;
|
||||
db::insert(&db, data, account_id).await?;
|
||||
|
||||
Ok("Ok")
|
||||
}
|
||||
|
||||
async fn extract_csv(mut multipart: Multipart) -> Result<Option<Bytes>, MultipartError> {
|
||||
let csv = loop {
|
||||
match multipart.next_field().await? {
|
||||
Some(field) if field.name() == Some("csv") => {
|
||||
break Some(field.bytes().await?);
|
||||
async fn extract_csv_and_account_name(mut multipart: Multipart) -> Result<(Option<Bytes>, Option<String>), MultipartError> {
|
||||
let mut csv = None;
|
||||
let mut account_name = None;
|
||||
|
||||
while let Some(field) = multipart.next_field().await? {
|
||||
match field.name() {
|
||||
Some("csv") => {
|
||||
csv = Some(field.bytes().await?);
|
||||
}
|
||||
|
||||
Some(_) => {}
|
||||
None => break None,
|
||||
Some("account_id") => {
|
||||
account_name = Some(field.text().await?);
|
||||
}
|
||||
|
||||
_ => {}
|
||||
}
|
||||
};
|
||||
|
||||
Ok(csv)
|
||||
if csv.is_some() && account_name.is_some() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Ok((csv, account_name))
|
||||
}
|
||||
|
||||
pub async fn monzo_batched_csv(
|
||||
Extension(db): Extension<DatabaseConnection>,
|
||||
multipart: Multipart,
|
||||
) -> Result<&'static str, AppError> {
|
||||
static CSV_MISSING_ERR_MSG: &str = "No CSV file provided. Expected a multipart request with a `csv` field containing the contents of the CSV.";
|
||||
|
||||
let csv = extract_csv(multipart)
|
||||
let (csv, account_name) = extract_csv_and_account_name(multipart)
|
||||
.await
|
||||
.map_err(|e| AppError::BadRequest(anyhow!(e)))
|
||||
.and_then(|csv| csv.ok_or(AppError::BadRequest(anyhow!(CSV_MISSING_ERR_MSG))))?;
|
||||
.map_err(|e| AppError::BadRequest(anyhow!(e)))?;
|
||||
|
||||
let Some(csv) = csv else {
|
||||
return Err(AppError::BadRequest(anyhow!(CSV_MISSING_ERR_MSG)));
|
||||
};
|
||||
|
||||
let account_id = db::get_account_id(&db, account_name).await?;
|
||||
|
||||
let csv = Cursor::new(csv);
|
||||
let mut csv = csv::Reader::from_reader(csv);
|
||||
@ -58,7 +74,7 @@ pub async fn monzo_batched_csv(
|
||||
.map(from_csv_row)
|
||||
.collect::<Result<_, _>>()?;
|
||||
|
||||
db::insert(&db, data).await?;
|
||||
db::insert(&db, data, account_id).await?;
|
||||
|
||||
Ok("Ok")
|
||||
}
|
||||
|
||||
@ -44,6 +44,10 @@ enum Commands {
|
||||
Csv {
|
||||
/// The path of the CSV file to ingest.
|
||||
csv_file: PathBuf,
|
||||
|
||||
/// The name of the account to ingest the CSV for.
|
||||
#[clap(long, short)]
|
||||
account: String,
|
||||
},
|
||||
}
|
||||
|
||||
@ -94,7 +98,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
serve_web(addr, connection).await?;
|
||||
}
|
||||
|
||||
Commands::Csv { csv_file } => {
|
||||
Commands::Csv { csv_file, account: account_name } => {
|
||||
let mut csv = csv::Reader::from_reader(File::open(csv_file)?);
|
||||
let data = csv.records();
|
||||
let data = data
|
||||
@ -102,7 +106,8 @@ async fn main() -> anyhow::Result<()> {
|
||||
.map(from_csv_row)
|
||||
.collect::<Result<_, _>>()?;
|
||||
|
||||
db::insert(&connection, data).await?;
|
||||
let account_id = db::get_account_id(&connection, Some(account_name)).await?;
|
||||
db::insert(&connection, data, account_id).await?;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user