diff --git a/src/error.rs b/src/error.rs index af1787d..1293f8f 100644 --- a/src/error.rs +++ b/src/error.rs @@ -12,7 +12,7 @@ pub enum AppError { #[error("Invalid request {0}")] BadRequest(anyhow::Error), - /// Catch all for error we dont care to expose publicly. + /// Catch all for error we don't care to expose publicly. #[error("Internal error")] Anyhow(#[from] anyhow::Error), } diff --git a/src/ingestion/db.rs b/src/ingestion/db.rs index b592692..9ea51cc 100644 --- a/src/ingestion/db.rs +++ b/src/ingestion/db.rs @@ -1,8 +1,10 @@ +use anyhow::anyhow; use entity::{expenditure, transaction}; use sea_orm::sea_query::OnConflict; use sea_orm::{ConnectionTrait, DatabaseBackend, QueryFilter, QueryTrait, Statement}; -use sea_orm::{ColumnTrait, DatabaseConnection, DbErr, EntityTrait, Iterable, TransactionTrait}; +use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, Iterable, TransactionTrait}; use migration::PostgresQueryBuilder; +use crate::error::AppError; pub struct Insertion { pub transaction: transaction::ActiveModel, @@ -12,7 +14,7 @@ pub struct Insertion { // Note while this is more efficient in db calls, it does bind together the entire group. // We employ a batching process for now to try balance speed and failure rate, but it is worth // trying to move failures earlier and improve reporting. -pub async fn insert(db: &DatabaseConnection, insertions: Vec) -> Result, DbErr> { +pub async fn insert(db: &DatabaseConnection, insertions: Vec) -> Result, AppError> { let mut new_transaction_ids = Vec::new(); for insertions in insertions.chunks(400) { @@ -37,10 +39,8 @@ pub async fn insert(db: &DatabaseConnection, insertions: Vec) -> Resu .map(|r| r.try_get_by("id")) .collect::, _>>()?; - new_transaction_ids.extend(inserted_transaction_ids); - - // Expenditures can change as we recagegorise them, so we delete all the old ones and insert - // an entirely new set to ensure we don't end up leaving old ones around. + // Expenditures can change as we re-categorise them, so we delete all the old ones and + // insert an entirely new set to ensure we don't end up leaving old ones around. expenditure::Entity::delete_many() .filter( expenditure::Column::TransactionId @@ -67,7 +67,19 @@ pub async fn insert(db: &DatabaseConnection, insertions: Vec) -> Resu .await?; tx.commit().await?; + new_transaction_ids.extend(inserted_transaction_ids); } + let payload = serde_json::to_string(&new_transaction_ids) + .map_err(|e| anyhow!(e))?; + + db.execute( + Statement::from_sql_and_values( + DatabaseBackend::Postgres, + "NOTIFY monzo_new_transactions, $1", + vec![sea_orm::Value::from(payload)], + ) + ).await?; + Ok(new_transaction_ids) }