Notify monzo_new_transactions with a list of IDs when we insert new transactions
All checks were successful
Build and Publish Docker Container / build (push) Successful in 7m10s

This commit is contained in:
Joshua Coles 2024-05-28 15:48:34 +01:00
parent 11fa106cf5
commit 848da0f5f3
2 changed files with 19 additions and 7 deletions

View File

@ -12,7 +12,7 @@ pub enum AppError {
#[error("Invalid request {0}")] #[error("Invalid request {0}")]
BadRequest(anyhow::Error), BadRequest(anyhow::Error),
/// Catch all for error we dont care to expose publicly. /// Catch all for error we don't care to expose publicly.
#[error("Internal error")] #[error("Internal error")]
Anyhow(#[from] anyhow::Error), Anyhow(#[from] anyhow::Error),
} }

View File

@ -1,8 +1,10 @@
use anyhow::anyhow;
use entity::{expenditure, transaction}; use entity::{expenditure, transaction};
use sea_orm::sea_query::OnConflict; use sea_orm::sea_query::OnConflict;
use sea_orm::{ConnectionTrait, DatabaseBackend, QueryFilter, QueryTrait, Statement}; use sea_orm::{ConnectionTrait, DatabaseBackend, QueryFilter, QueryTrait, Statement};
use sea_orm::{ColumnTrait, DatabaseConnection, DbErr, EntityTrait, Iterable, TransactionTrait}; use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, Iterable, TransactionTrait};
use migration::PostgresQueryBuilder; use migration::PostgresQueryBuilder;
use crate::error::AppError;
pub struct Insertion { pub struct Insertion {
pub transaction: transaction::ActiveModel, pub transaction: transaction::ActiveModel,
@ -12,7 +14,7 @@ pub struct Insertion {
// Note while this is more efficient in db calls, it does bind together the entire group. // Note while this is more efficient in db calls, it does bind together the entire group.
// We employ a batching process for now to try balance speed and failure rate, but it is worth // We employ a batching process for now to try balance speed and failure rate, but it is worth
// trying to move failures earlier and improve reporting. // trying to move failures earlier and improve reporting.
pub async fn insert(db: &DatabaseConnection, insertions: Vec<Insertion>) -> Result<Vec<String>, DbErr> { pub async fn insert(db: &DatabaseConnection, insertions: Vec<Insertion>) -> Result<Vec<String>, AppError> {
let mut new_transaction_ids = Vec::new(); let mut new_transaction_ids = Vec::new();
for insertions in insertions.chunks(400) { for insertions in insertions.chunks(400) {
@ -37,10 +39,8 @@ pub async fn insert(db: &DatabaseConnection, insertions: Vec<Insertion>) -> Resu
.map(|r| r.try_get_by("id")) .map(|r| r.try_get_by("id"))
.collect::<Result<Vec<String>, _>>()?; .collect::<Result<Vec<String>, _>>()?;
new_transaction_ids.extend(inserted_transaction_ids); // Expenditures can change as we re-categorise them, so we delete all the old ones and
// insert an entirely new set to ensure we don't end up leaving old ones around.
// Expenditures can change as we recagegorise them, so we delete all the old ones and insert
// an entirely new set to ensure we don't end up leaving old ones around.
expenditure::Entity::delete_many() expenditure::Entity::delete_many()
.filter( .filter(
expenditure::Column::TransactionId expenditure::Column::TransactionId
@ -67,7 +67,19 @@ pub async fn insert(db: &DatabaseConnection, insertions: Vec<Insertion>) -> Resu
.await?; .await?;
tx.commit().await?; tx.commit().await?;
new_transaction_ids.extend(inserted_transaction_ids);
} }
let payload = serde_json::to_string(&new_transaction_ids)
.map_err(|e| anyhow!(e))?;
db.execute(
Statement::from_sql_and_values(
DatabaseBackend::Postgres,
"NOTIFY monzo_new_transactions, $1",
vec![sea_orm::Value::from(payload)],
)
).await?;
Ok(new_transaction_ids) Ok(new_transaction_ids)
} }