Compare commits

..

No commits in common. "8ce60ce2788675cc69a9b1f27eea61da1e2bcf66" and "848da0f5f3a17f416120c32c3295d2c67037188e" have entirely different histories.

View File

@@ -1,85 +1,26 @@
use anyhow::anyhow;
use entity::{expenditure, transaction};
use migration::PostgresQueryBuilder;
use sea_orm::sea_query::OnConflict;
use sea_orm::{
    ColumnTrait, ConnectionTrait, DatabaseBackend, DatabaseConnection, DatabaseTransaction, DbErr,
    EntityTrait, Iterable, QueryFilter, QueryTrait, Statement, TransactionTrait,
};

use crate::error::AppError;
pub struct Insertion { pub struct Insertion {
pub transaction: transaction::ActiveModel, pub transaction: transaction::ActiveModel,
pub contained_expenditures: Vec<expenditure::ActiveModel>, pub contained_expenditures: Vec<expenditure::ActiveModel>,
} }
/// Replace the stored expenditures for every transaction in `insertions`,
/// inside the caller's database transaction `tx`.
///
/// Expenditures can change as we re-categorise transactions, so we delete all
/// the old rows for these transactions and insert an entirely new set to
/// ensure we don't end up leaving stale rows around.
async fn update_expenditures(
    tx: &DatabaseTransaction,
    insertions: &[Insertion],
) -> Result<(), DbErr> {
    // Drop every existing expenditure row belonging to the transactions we are
    // about to (re-)insert.
    expenditure::Entity::delete_many()
        .filter(
            expenditure::Column::TransactionId
                .is_in(insertions.iter().map(|i| i.transaction.id.as_ref())),
        )
        .exec(tx)
        .await?;
    let expenditures: Vec<_> = insertions
        .iter()
        .flat_map(|i| &i.contained_expenditures)
        .cloned()
        .collect();
    // `insert_many` with zero rows is an error in sea-orm rather than a no-op,
    // so bail out early when this batch carries no expenditures at all.
    if expenditures.is_empty() {
        return Ok(());
    }
    // Rows colliding on (transaction_id, category) — e.g. re-runs within the
    // same batch — fall back to an update of all columns.
    expenditure::Entity::insert_many(expenditures)
        .on_conflict(
            OnConflict::columns(vec![
                expenditure::Column::TransactionId,
                expenditure::Column::Category,
            ])
            .update_columns(expenditure::Column::iter())
            .to_owned(),
        )
        .exec(tx)
        .await?;
    Ok(())
}
// Note while this is more efficient in db calls, it does bind together the entire group. // Note while this is more efficient in db calls, it does bind together the entire group.
// We employ a batching process for now to try balance speed and failure rate, but it is worth // We employ a batching process for now to try balance speed and failure rate, but it is worth
// trying to move failures earlier and improve reporting. // trying to move failures earlier and improve reporting.
pub async fn insert( pub async fn insert(db: &DatabaseConnection, insertions: Vec<Insertion>) -> Result<Vec<String>, AppError> {
db: &DatabaseConnection,
insertions: Vec<Insertion>,
) -> Result<Vec<String>, AppError> {
let mut new_transaction_ids = Vec::new(); let mut new_transaction_ids = Vec::new();
for insertions in insertions.chunks(400) { for insertions in insertions.chunks(400) {
let tx = db.begin().await?; let tx = db.begin().await?;
let inserted_transaction_ids = update_transactions(insertions, &tx).await?;
update_expenditures(&tx, &insertions).await?;
tx.commit().await?;
// We wait until the transaction is committed before adding the new transaction ids to the let insert = transaction::Entity::insert_many(insertions.iter().map(|i| &i.transaction).cloned())
// list to avoid issues with the transaction being rolled back.
new_transaction_ids.extend(inserted_transaction_ids);
}
// Notify the new transactions once everything is committed.
notify_new_transactions(db, &new_transaction_ids).await?;
Ok(new_transaction_ids)
}
async fn update_transactions(
insertions: &[Insertion],
tx: &DatabaseTransaction,
) -> Result<Vec<String>, AppError> {
let insert =
transaction::Entity::insert_many(insertions.iter().map(|i| &i.transaction).cloned())
.on_conflict( .on_conflict(
OnConflict::column(transaction::Column::Id) OnConflict::column(transaction::Column::Id)
.update_columns(transaction::Column::iter()) .update_columns(transaction::Column::iter())
@ -89,27 +30,56 @@ async fn update_transactions(
.returning_col(transaction::Column::Id) .returning_col(transaction::Column::Id)
.build(PostgresQueryBuilder); .build(PostgresQueryBuilder);
let inserted_transaction_ids = tx let inserted_transaction_ids = tx.query_all(Statement::from_sql_and_values(
.query_all(Statement::from_sql_and_values(
DatabaseBackend::Postgres, DatabaseBackend::Postgres,
insert.0, insert.0,
insert.1, insert.1,
)) )).await?
.await? .iter()
.iter() .map(|r| r.try_get_by("id"))
.map(|r| r.try_get_by("id")) .collect::<Result<Vec<String>, _>>()?;
.collect::<Result<Vec<String>, _>>()?;
Ok(inserted_transaction_ids)
}
async fn notify_new_transactions( // Expenditures can change as we re-categorise them, so we delete all the old ones and
db: &DatabaseConnection, // insert an entirely new set to ensure we don't end up leaving old ones around.
new_transaction_ids: &[String], expenditure::Entity::delete_many()
) -> Result<(), AppError> { .filter(
let payload = serde_json::to_string(&new_transaction_ids).map_err(|e| anyhow!(e))?; expenditure::Column::TransactionId
.is_in(insertions.iter().map(|i| i.transaction.id.as_ref())),
)
.exec(&tx)
.await?;
db.execute_unprepared(&format!(r#"NOTIFY monzo_new_transactions, {payload}"#,)) expenditure::Entity::insert_many(
insertions
.iter()
.flat_map(|i| &i.contained_expenditures)
.cloned(),
)
.on_conflict(
OnConflict::columns(vec![
expenditure::Column::TransactionId,
expenditure::Column::Category,
])
.update_columns(expenditure::Column::iter())
.to_owned(),
)
.exec(&tx)
.await?; .await?;
Ok(()) tx.commit().await?;
new_transaction_ids.extend(inserted_transaction_ids);
}
let payload = serde_json::to_string(&new_transaction_ids)
.map_err(|e| anyhow!(e))?;
db.execute(
Statement::from_sql_and_values(
DatabaseBackend::Postgres,
"NOTIFY monzo_new_transactions, $1",
vec![sea_orm::Value::from(payload)],
)
).await?;
Ok(new_transaction_ids)
} }