monzo-ingestion/src/ingestion/db.rs
2024-05-28 15:48:55 +01:00

59 lines
2.0 KiB
Rust

use entity::{expenditure, transaction};
use sea_orm::sea_query::OnConflict;
use sea_orm::QueryFilter;
use sea_orm::{ColumnTrait, DatabaseConnection, DbErr, EntityTrait, Iterable, TransactionTrait};
/// A transaction together with the complete set of expenditures it should own.
///
/// When passed to [`insert`], any expenditures previously stored for `transaction` are
/// deleted and replaced with `contained_expenditures`.
#[derive(Debug, Clone)]
pub struct Insertion {
    /// The transaction row to upsert; conflicts are resolved on its id column.
    pub transaction: transaction::ActiveModel,
    /// Every expenditure that should belong to `transaction` after the write.
    pub contained_expenditures: Vec<expenditure::ActiveModel>,
}
/// Upserts transactions and replaces their expenditures, in batches.
///
/// `insertions` is processed in chunks of up to 400 entries. Each chunk runs inside its own
/// database transaction, which:
/// 1. upserts every chunk transaction, keyed on `transaction::Column::Id` and overwriting all
///    columns on conflict;
/// 2. deletes every stored expenditure whose `TransactionId` belongs to the chunk;
/// 3. inserts the chunk's `contained_expenditures` (skipped when the chunk has none).
///
/// While this is more efficient in db calls, it binds each chunk together: one bad row fails
/// the whole chunk. Chunks are committed independently, so an error part-way through leaves
/// earlier chunks committed and later chunks unattempted. It is worth trying to move failures
/// earlier and improve reporting.
///
/// # Errors
///
/// Returns the first [`DbErr`] raised while beginning, writing or committing a chunk. The
/// failing chunk's transaction is dropped without being committed.
///
/// # Panics
///
/// NOTE(review): `ActiveValue::as_ref` is used on each `transaction.id`. It presumably panics
/// if the id is `NotSet`, so callers must always set it. Confirm against the sea-orm version.
pub async fn insert(db: &DatabaseConnection, insertions: Vec<Insertion>) -> Result<(), DbErr> {
    // Balances round trips against how much work a single bad row throws away.
    const BATCH_SIZE: usize = 400;

    for batch in insertions.chunks(BATCH_SIZE) {
        let tx = db.begin().await?;

        transaction::Entity::insert_many(batch.iter().map(|i| &i.transaction).cloned())
            .on_conflict(
                OnConflict::column(transaction::Column::Id)
                    .update_columns(transaction::Column::iter())
                    .to_owned(),
            )
            .exec(&tx)
            .await?;

        // Expenditures can change as we recategorise them, so we delete all the old ones and
        // insert an entirely new set to ensure we don't end up leaving old ones around.
        expenditure::Entity::delete_many()
            .filter(
                expenditure::Column::TransactionId
                    .is_in(batch.iter().map(|i| i.transaction.id.as_ref())),
            )
            .exec(&tx)
            .await?;

        // `insert_many` with zero models does not produce a valid INSERT and sea-orm reports
        // an error. Without this guard, a chunk whose transactions carry no expenditures would
        // fail even though there is nothing to write.
        if batch.iter().any(|i| !i.contained_expenditures.is_empty()) {
            expenditure::Entity::insert_many(
                batch
                    .iter()
                    .flat_map(|i| &i.contained_expenditures)
                    .cloned(),
            )
            .on_conflict(
                OnConflict::columns(vec![
                    expenditure::Column::TransactionId,
                    expenditure::Column::Category,
                ])
                .update_columns(expenditure::Column::iter())
                .to_owned(),
            )
            .exec(&tx)
            .await?;
        }

        tx.commit().await?;
    }
    Ok(())
}