diff --git a/src/ingestion/db.rs b/src/ingestion/db.rs index e08008d..762658e 100644 --- a/src/ingestion/db.rs +++ b/src/ingestion/db.rs @@ -4,7 +4,7 @@ use anyhow::anyhow; use entity::{expenditure, transaction}; use sea_orm::sea_query::OnConflict; use sea_orm::{ - ColumnTrait, DatabaseConnection, EntityTrait, Iterable, QuerySelect, TransactionTrait, + ColumnTrait, DatabaseConnection, DbErr, EntityTrait, Iterable, QuerySelect, TransactionTrait, }; use sea_orm::{ConnectionTrait, DatabaseTransaction, QueryFilter}; @@ -28,10 +28,14 @@ pub async fn insert( .collect::, _>>()?; for insertions in insertions.chunks(400) { - let tx = db.begin().await?; let (new_or_updated_insertions, inserted_transaction_ids) = - whittle_insertions(insertions, &tx).await?; + whittle_insertions(insertions, &db).await?; + if new_or_updated_insertions.is_empty() { + continue; + } + + let tx = db.begin().await?; update_transactions(&tx, &new_or_updated_insertions).await?; update_expenditures(&tx, &new_or_updated_insertions, &inserted_transaction_ids).await?; tx.commit().await?; @@ -52,6 +56,10 @@ async fn update_expenditures( new_or_updated_insertions: &[&Insertion], inserted_transaction_ids: &[String], ) -> Result<(), AppError> { + if new_or_updated_insertions.is_empty() { + return Ok(()); + } + // Expenditures can change as we re-categorise them, so we delete all the old ones and // insert an entirely new set to ensure we don't end up leaving old ones around. expenditure::Entity::delete_many() @@ -81,7 +89,11 @@ async fn update_expenditures( async fn update_transactions( tx: &DatabaseTransaction, new_or_updated_insertions: &[&Insertion], -) -> Result<(), AppError> { +) -> Result<(), DbErr> { + if new_or_updated_insertions.is_empty() { + return Ok(()); + } + let transactions = new_or_updated_insertions .iter() .map(|i| &i.transaction) @@ -100,7 +112,7 @@ async fn update_transactions( async fn whittle_insertions<'a>( insertions: &'a [Insertion], - tx: &DatabaseTransaction, + tx: &DatabaseConnection, ) -> Result<(Vec<&'a Insertion>, Vec), AppError> { let existing_hashes = transaction::Entity::find() .select_only() @@ -143,10 +155,10 @@ async fn notify_new_transactions( } mod tests { - use super::notify_new_transactions; + use super::{notify_new_transactions, update_expenditures, update_transactions}; use anyhow::Error; use migration::MigratorTrait; - use sea_orm::DatabaseConnection; + use sea_orm::{DatabaseConnection, TransactionTrait}; use sqlx::postgres::PgListener; use sqlx::PgPool; use testcontainers::runners::AsyncRunner; @@ -178,6 +190,18 @@ mod tests { Ok((container, db, pool)) } + #[tokio::test] + async fn test_no_new_insertions() -> Result<(), Error> { + let (_container, db, _pool) = initialise().await?; + let insertions = vec![]; + let tx = db.begin().await?; + update_transactions(&tx, &insertions).await?; + update_expenditures(&tx, &insertions, &vec![]).await?; + tx.commit().await?; + + Ok(()) + } + #[tokio::test] async fn test_notify() -> Result<(), Error> { let (_container, db, pool) = initialise().await?;