From 11fa106cf54bd3b9ba5732278ff75aae1c6c6ad1 Mon Sep 17 00:00:00 2001 From: Joshua Coles Date: Tue, 28 May 2024 15:08:10 +0100 Subject: [PATCH] Return newly inserted transaction ids --- src/ingestion/db.rs | 31 +++++++++++++++++++++++-------- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/src/ingestion/db.rs b/src/ingestion/db.rs index 9a4f94f..b592692 100644 --- a/src/ingestion/db.rs +++ b/src/ingestion/db.rs @@ -1,7 +1,8 @@ use entity::{expenditure, transaction}; use sea_orm::sea_query::OnConflict; -use sea_orm::QueryFilter; +use sea_orm::{ConnectionTrait, DatabaseBackend, QueryFilter, QueryTrait, Statement}; use sea_orm::{ColumnTrait, DatabaseConnection, DbErr, EntityTrait, Iterable, TransactionTrait}; +use migration::PostgresQueryBuilder; pub struct Insertion { pub transaction: transaction::ActiveModel, @@ -9,20 +10,34 @@ pub struct Insertion { } // Note while this is more efficient in db calls, it does bind together the entire group. -// We employ a batching process for now to try balance speed and failure rate but it is worth trying -// to move failures earlier and improve reporting. -pub async fn insert(db: &DatabaseConnection, insertions: Vec) -> Result<(), DbErr> { +// We employ a batching process for now to try balance speed and failure rate, but it is worth +// trying to move failures earlier and improve reporting. +pub async fn insert(db: &DatabaseConnection, insertions: Vec) -> Result, DbErr> { + let mut new_transaction_ids = Vec::new(); + for insertions in insertions.chunks(400) { let tx = db.begin().await?; - transaction::Entity::insert_many(insertions.iter().map(|i| &i.transaction).cloned()) + let insert = transaction::Entity::insert_many(insertions.iter().map(|i| &i.transaction).cloned()) .on_conflict( OnConflict::column(transaction::Column::Id) .update_columns(transaction::Column::iter()) .to_owned(), ) - .exec(&tx) - .await?; + .into_query() + .returning_col(transaction::Column::Id) + .build(PostgresQueryBuilder); + + let inserted_transaction_ids = tx.query_all(Statement::from_sql_and_values( + DatabaseBackend::Postgres, + insert.0, + insert.1, + )).await? + .iter() + .map(|r| r.try_get_by("id")) + .collect::, _>>()?; + + new_transaction_ids.extend(inserted_transaction_ids); // Expenditures can change as we recagegorise them, so we delete all the old ones and insert // an entirely new set to ensure we don't end up leaving old ones around. @@ -54,5 +69,5 @@ pub async fn insert(db: &DatabaseConnection, insertions: Vec) -> Resu tx.commit().await?; } - Ok(()) + Ok(new_transaction_ids) }