Compare commits

...

2 Commits

Author SHA1 Message Date
046ce44d23 rustfmt
All checks were successful
Build and Publish Docker Container / build (push) Successful in 10m2s
2024-06-03 18:41:32 +01:00
f70d844ff3 Fix ORM failing to handle empty lists... 2024-06-03 18:41:21 +01:00
3 changed files with 71 additions and 26 deletions

View File

@ -6,37 +6,51 @@ pub struct Migration;
#[async_trait::async_trait] #[async_trait::async_trait]
impl MigrationTrait for Migration { impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager.alter_table( manager
.alter_table(
TableAlterStatement::new() TableAlterStatement::new()
.table(Transaction::Table) .table(Transaction::Table)
.modify_column(ColumnDef::new(Transaction::Title).string().null()) .modify_column(ColumnDef::new(Transaction::Title).string().null())
.to_owned() .to_owned(),
).await?; )
.await?;
// Set all empty string titles to null // Set all empty string titles to null
manager.get_connection().execute_unprepared(r#" manager
.get_connection()
.execute_unprepared(
r#"
update transaction update transaction
set title = null set title = null
where title = '' where title = ''
"#).await?; "#,
)
.await?;
Ok(()) Ok(())
} }
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
// Set all null titles to empty string when reverting // Set all null titles to empty string when reverting
manager.get_connection().execute_unprepared(r#" manager
.get_connection()
.execute_unprepared(
r#"
update transaction update transaction
set title = '' set title = ''
where title is null where title is null
"#).await?; "#,
)
.await?;
manager.alter_table( manager
.alter_table(
TableAlterStatement::new() TableAlterStatement::new()
.table(Transaction::Table) .table(Transaction::Table)
.modify_column(ColumnDef::new(Transaction::Title).string().not_null()) .modify_column(ColumnDef::new(Transaction::Title).string().not_null())
.to_owned() .to_owned(),
).await )
.await
} }
} }

View File

@ -4,7 +4,7 @@ use anyhow::anyhow;
use entity::{expenditure, transaction}; use entity::{expenditure, transaction};
use sea_orm::sea_query::OnConflict; use sea_orm::sea_query::OnConflict;
use sea_orm::{ use sea_orm::{
ColumnTrait, DatabaseConnection, EntityTrait, Iterable, QuerySelect, TransactionTrait, ColumnTrait, DatabaseConnection, DbErr, EntityTrait, Iterable, QuerySelect, TransactionTrait,
}; };
use sea_orm::{ConnectionTrait, DatabaseTransaction, QueryFilter}; use sea_orm::{ConnectionTrait, DatabaseTransaction, QueryFilter};
@ -28,10 +28,14 @@ pub async fn insert(
.collect::<Result<Vec<_>, _>>()?; .collect::<Result<Vec<_>, _>>()?;
for insertions in insertions.chunks(400) { for insertions in insertions.chunks(400) {
let tx = db.begin().await?;
let (new_or_updated_insertions, inserted_transaction_ids) = let (new_or_updated_insertions, inserted_transaction_ids) =
whittle_insertions(insertions, &tx).await?; whittle_insertions(insertions, &db).await?;
if new_or_updated_insertions.is_empty() {
continue;
}
let tx = db.begin().await?;
update_transactions(&tx, &new_or_updated_insertions).await?; update_transactions(&tx, &new_or_updated_insertions).await?;
update_expenditures(&tx, &new_or_updated_insertions, &inserted_transaction_ids).await?; update_expenditures(&tx, &new_or_updated_insertions, &inserted_transaction_ids).await?;
tx.commit().await?; tx.commit().await?;
@ -52,6 +56,10 @@ async fn update_expenditures(
new_or_updated_insertions: &[&Insertion], new_or_updated_insertions: &[&Insertion],
inserted_transaction_ids: &[String], inserted_transaction_ids: &[String],
) -> Result<(), AppError> { ) -> Result<(), AppError> {
if new_or_updated_insertions.is_empty() {
return Ok(());
}
// Expenditures can change as we re-categorise them, so we delete all the old ones and // Expenditures can change as we re-categorise them, so we delete all the old ones and
// insert an entirely new set to ensure we don't end up leaving old ones around. // insert an entirely new set to ensure we don't end up leaving old ones around.
expenditure::Entity::delete_many() expenditure::Entity::delete_many()
@ -81,7 +89,11 @@ async fn update_expenditures(
async fn update_transactions( async fn update_transactions(
tx: &DatabaseTransaction, tx: &DatabaseTransaction,
new_or_updated_insertions: &[&Insertion], new_or_updated_insertions: &[&Insertion],
) -> Result<(), AppError> { ) -> Result<(), DbErr> {
if new_or_updated_insertions.is_empty() {
return Ok(());
}
let transactions = new_or_updated_insertions let transactions = new_or_updated_insertions
.iter() .iter()
.map(|i| &i.transaction) .map(|i| &i.transaction)
@ -100,7 +112,7 @@ async fn update_transactions(
async fn whittle_insertions<'a>( async fn whittle_insertions<'a>(
insertions: &'a [Insertion], insertions: &'a [Insertion],
tx: &DatabaseTransaction, tx: &DatabaseConnection,
) -> Result<(Vec<&'a Insertion>, Vec<String>), AppError> { ) -> Result<(Vec<&'a Insertion>, Vec<String>), AppError> {
let existing_hashes = transaction::Entity::find() let existing_hashes = transaction::Entity::find()
.select_only() .select_only()
@ -143,10 +155,10 @@ async fn notify_new_transactions(
} }
mod tests { mod tests {
use super::notify_new_transactions; use super::{notify_new_transactions, update_expenditures, update_transactions};
use anyhow::Error; use anyhow::Error;
use migration::MigratorTrait; use migration::MigratorTrait;
use sea_orm::DatabaseConnection; use sea_orm::{DatabaseConnection, TransactionTrait};
use sqlx::postgres::PgListener; use sqlx::postgres::PgListener;
use sqlx::PgPool; use sqlx::PgPool;
use testcontainers::runners::AsyncRunner; use testcontainers::runners::AsyncRunner;
@ -178,6 +190,18 @@ mod tests {
Ok((container, db, pool)) Ok((container, db, pool))
} }
#[tokio::test]
async fn test_no_new_insertions() -> Result<(), Error> {
let (_container, db, _pool) = initialise().await?;
let insertions = vec![];
let tx = db.begin().await?;
update_transactions(&tx, &insertions).await?;
update_expenditures(&tx, &insertions, &vec![]).await?;
tx.commit().await?;
Ok(())
}
#[tokio::test] #[tokio::test]
async fn test_notify() -> Result<(), Error> { async fn test_notify() -> Result<(), Error> {
let (_container, db, pool) = initialise().await?; let (_container, db, pool) = initialise().await?;

View File

@ -174,12 +174,14 @@ fn test_json() {
let json: Vec<Vec<Value>> = serde_json::from_str(json).unwrap(); let json: Vec<Vec<Value>> = serde_json::from_str(json).unwrap();
let mut csv_reader = csv::Reader::from_reader(csv.as_bytes()); let mut csv_reader = csv::Reader::from_reader(csv.as_bytes());
let json_rows = json.iter() let json_rows = json
.iter()
.map(|row| from_json_row(row.clone())) .map(|row| from_json_row(row.clone()))
.collect::<Result<Vec<_>, anyhow::Error>>() .collect::<Result<Vec<_>, anyhow::Error>>()
.unwrap(); .unwrap();
let csv_rows = csv_reader.records() let csv_rows = csv_reader
.records()
.map(|record| from_csv_row(record.unwrap())) .map(|record| from_csv_row(record.unwrap()))
.collect::<Result<Vec<_>, anyhow::Error>>() .collect::<Result<Vec<_>, anyhow::Error>>()
.unwrap(); .unwrap();
@ -188,7 +190,12 @@ fn test_json() {
for (i, (json_row, csv_row)) in json_rows.iter().zip(csv_rows.iter()).enumerate() { for (i, (json_row, csv_row)) in json_rows.iter().zip(csv_rows.iter()).enumerate() {
assert_eq!(json_row, csv_row, "Row {} is different", i); assert_eq!(json_row, csv_row, "Row {} is different", i);
assert_eq!(json_row.compute_hash(), csv_row.compute_hash(), "Row {} hash are different", i); assert_eq!(
json_row.compute_hash(),
csv_row.compute_hash(),
"Row {} hash are different",
i
);
} }
} }