Return newly inserted transaction ids

This commit is contained in:
Joshua Coles 2024-05-28 15:08:10 +01:00
parent e375e7d9dc
commit 11fa106cf5

View File

@ -1,7 +1,8 @@
use entity::{expenditure, transaction}; use entity::{expenditure, transaction};
use sea_orm::sea_query::OnConflict; use sea_orm::sea_query::OnConflict;
use sea_orm::QueryFilter; use sea_orm::{ConnectionTrait, DatabaseBackend, QueryFilter, QueryTrait, Statement};
use sea_orm::{ColumnTrait, DatabaseConnection, DbErr, EntityTrait, Iterable, TransactionTrait}; use sea_orm::{ColumnTrait, DatabaseConnection, DbErr, EntityTrait, Iterable, TransactionTrait};
use migration::PostgresQueryBuilder;
pub struct Insertion { pub struct Insertion {
pub transaction: transaction::ActiveModel, pub transaction: transaction::ActiveModel,
@ -9,20 +10,34 @@ pub struct Insertion {
} }
// Note while this is more efficient in db calls, it does bind together the entire group. // Note while this is more efficient in db calls, it does bind together the entire group.
// We employ a batching process for now to try balance speed and failure rate but it is worth trying // We employ a batching process for now to try balance speed and failure rate, but it is worth
// to move failures earlier and improve reporting. // trying to move failures earlier and improve reporting.
pub async fn insert(db: &DatabaseConnection, insertions: Vec<Insertion>) -> Result<(), DbErr> { pub async fn insert(db: &DatabaseConnection, insertions: Vec<Insertion>) -> Result<Vec<String>, DbErr> {
let mut new_transaction_ids = Vec::new();
for insertions in insertions.chunks(400) { for insertions in insertions.chunks(400) {
let tx = db.begin().await?; let tx = db.begin().await?;
transaction::Entity::insert_many(insertions.iter().map(|i| &i.transaction).cloned()) let insert = transaction::Entity::insert_many(insertions.iter().map(|i| &i.transaction).cloned())
.on_conflict( .on_conflict(
OnConflict::column(transaction::Column::Id) OnConflict::column(transaction::Column::Id)
.update_columns(transaction::Column::iter()) .update_columns(transaction::Column::iter())
.to_owned(), .to_owned(),
) )
.exec(&tx) .into_query()
.await?; .returning_col(transaction::Column::Id)
.build(PostgresQueryBuilder);
let inserted_transaction_ids = tx.query_all(Statement::from_sql_and_values(
DatabaseBackend::Postgres,
insert.0,
insert.1,
)).await?
.iter()
.map(|r| r.try_get_by("id"))
.collect::<Result<Vec<String>, _>>()?;
new_transaction_ids.extend(inserted_transaction_ids);
// Expenditures can change as we recagegorise them, so we delete all the old ones and insert // Expenditures can change as we recagegorise them, so we delete all the old ones and insert
// an entirely new set to ensure we don't end up leaving old ones around. // an entirely new set to ensure we don't end up leaving old ones around.
@ -54,5 +69,5 @@ pub async fn insert(db: &DatabaseConnection, insertions: Vec<Insertion>) -> Resu
tx.commit().await?; tx.commit().await?;
} }
Ok(()) Ok(new_transaction_ids)
} }