From fbf473b3b4e4e17525efd833e3524be494c9b716 Mon Sep 17 00:00:00 2001 From: Joshua Coles Date: Tue, 28 May 2024 17:38:25 +0100 Subject: [PATCH] Split up the db methods and move to unprepared statements for the `notify` call --- src/ingestion/db.rs | 133 +++++++++++++++++++++++++------------------- 1 file changed, 75 insertions(+), 58 deletions(-) diff --git a/src/ingestion/db.rs b/src/ingestion/db.rs index 9ea51cc..dc69812 100644 --- a/src/ingestion/db.rs +++ b/src/ingestion/db.rs @@ -1,7 +1,7 @@ use anyhow::anyhow; use entity::{expenditure, transaction}; use sea_orm::sea_query::OnConflict; -use sea_orm::{ConnectionTrait, DatabaseBackend, QueryFilter, QueryTrait, Statement}; +use sea_orm::{ConnectionTrait, DatabaseBackend, DatabaseTransaction, DbErr, QueryFilter, QueryTrait, Statement}; use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, Iterable, TransactionTrait}; use migration::PostgresQueryBuilder; use crate::error::AppError; @@ -11,6 +11,40 @@ pub struct Insertion { pub contained_expenditures: Vec, } +async fn update_expenditures( + tx: &DatabaseTransaction, + insertions: &[Insertion], +) -> Result<(), DbErr> { + // Expenditures can change as we re-categorise them, so we delete all the old ones and + // insert an entirely new set to ensure we don't end up leaving old ones around. + expenditure::Entity::delete_many() + .filter( + expenditure::Column::TransactionId + .is_in(insertions.iter().map(|i| i.transaction.id.as_ref())), + ) + .exec(tx) + .await?; + + expenditure::Entity::insert_many( + insertions + .iter() + .flat_map(|i| &i.contained_expenditures) + .cloned(), + ) + .on_conflict( + OnConflict::columns(vec![ + expenditure::Column::TransactionId, + expenditure::Column::Category, + ]) + .update_columns(expenditure::Column::iter()) + .to_owned(), + ) + .exec(tx) + .await?; + + Ok(()) +} + // Note while this is more efficient in db calls, it does bind together the entire group. // We employ a batching process for now to try balance speed and failure rate, but it is worth // trying to move failures earlier and improve reporting. @@ -19,67 +53,50 @@ pub async fn insert(db: &DatabaseConnection, insertions: Vec) -> Resu for insertions in insertions.chunks(400) { let tx = db.begin().await?; - - let insert = transaction::Entity::insert_many(insertions.iter().map(|i| &i.transaction).cloned()) - .on_conflict( - OnConflict::column(transaction::Column::Id) - .update_columns(transaction::Column::iter()) - .to_owned(), - ) - .into_query() - .returning_col(transaction::Column::Id) - .build(PostgresQueryBuilder); - - let inserted_transaction_ids = tx.query_all(Statement::from_sql_and_values( - DatabaseBackend::Postgres, - insert.0, - insert.1, - )).await? - .iter() - .map(|r| r.try_get_by("id")) - .collect::, _>>()?; - - // Expenditures can change as we re-categorise them, so we delete all the old ones and - // insert an entirely new set to ensure we don't end up leaving old ones around. - expenditure::Entity::delete_many() - .filter( - expenditure::Column::TransactionId - .is_in(insertions.iter().map(|i| i.transaction.id.as_ref())), - ) - .exec(&tx) - .await?; - - expenditure::Entity::insert_many( - insertions - .iter() - .flat_map(|i| &i.contained_expenditures) - .cloned(), - ) - .on_conflict( - OnConflict::columns(vec![ - expenditure::Column::TransactionId, - expenditure::Column::Category, - ]) - .update_columns(expenditure::Column::iter()) - .to_owned(), - ) - .exec(&tx) - .await?; - + let inserted_transaction_ids = update_transactions(insertions, &tx).await?; + update_expenditures(&tx, &insertions).await?; tx.commit().await?; + + // We wait until the transaction is committed before adding the new transaction ids to the + // list to avoid issues with the transaction being rolled back. new_transaction_ids.extend(inserted_transaction_ids); } - let payload = serde_json::to_string(&new_transaction_ids) - .map_err(|e| anyhow!(e))?; - - db.execute( - Statement::from_sql_and_values( - DatabaseBackend::Postgres, - "NOTIFY monzo_new_transactions, $1", - vec![sea_orm::Value::from(payload)], - ) - ).await?; + // Notify the new transactions once everything is committed. + notify_new_transactions(db, &new_transaction_ids).await?; Ok(new_transaction_ids) } + +async fn update_transactions(insertions: &[Insertion], tx: &DatabaseTransaction) -> Result, AppError> { + let insert = transaction::Entity::insert_many(insertions.iter().map(|i| &i.transaction).cloned()) + .on_conflict( + OnConflict::column(transaction::Column::Id) + .update_columns(transaction::Column::iter()) + .to_owned(), + ) + .into_query() + .returning_col(transaction::Column::Id) + .build(PostgresQueryBuilder); + + let inserted_transaction_ids = tx.query_all(Statement::from_sql_and_values( + DatabaseBackend::Postgres, + insert.0, + insert.1, + )).await? + .iter() + .map(|r| r.try_get_by("id")) + .collect::, _>>()?; + Ok(inserted_transaction_ids) +} + +async fn notify_new_transactions(db: &DatabaseConnection, new_transaction_ids: &[String]) -> Result<(), AppError> { + let payload = serde_json::to_string(&new_transaction_ids) + .map_err(|e| anyhow!(e))?; + + db.execute_unprepared( + &format!(r#"NOTIFY monzo_new_transactions, {payload}"#, ), + ).await?; + + Ok(()) +}