Split up the db methods and move to unprepared statements for the notify call
				
					
				
			This commit is contained in:
		
							parent
							
								
									848da0f5f3
								
							
						
					
					
						commit
						fbf473b3b4
					
				| @ -1,7 +1,7 @@ | |||||||
| use anyhow::anyhow; | use anyhow::anyhow; | ||||||
| use entity::{expenditure, transaction}; | use entity::{expenditure, transaction}; | ||||||
| use sea_orm::sea_query::OnConflict; | use sea_orm::sea_query::OnConflict; | ||||||
| use sea_orm::{ConnectionTrait, DatabaseBackend, QueryFilter, QueryTrait, Statement}; | use sea_orm::{ConnectionTrait, DatabaseBackend, DatabaseTransaction, DbErr, QueryFilter, QueryTrait, Statement}; | ||||||
| use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, Iterable, TransactionTrait}; | use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, Iterable, TransactionTrait}; | ||||||
| use migration::PostgresQueryBuilder; | use migration::PostgresQueryBuilder; | ||||||
| use crate::error::AppError; | use crate::error::AppError; | ||||||
| @ -11,6 +11,40 @@ pub struct Insertion { | |||||||
|     pub contained_expenditures: Vec<expenditure::ActiveModel>, |     pub contained_expenditures: Vec<expenditure::ActiveModel>, | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | async fn update_expenditures( | ||||||
|  |     tx: &DatabaseTransaction, | ||||||
|  |     insertions: &[Insertion], | ||||||
|  | ) -> Result<(), DbErr> { | ||||||
|  |     // Expenditures can change as we re-categorise them, so we delete all the old ones and
 | ||||||
|  |     // insert an entirely new set to ensure we don't end up leaving old ones around.
 | ||||||
|  |     expenditure::Entity::delete_many() | ||||||
|  |         .filter( | ||||||
|  |             expenditure::Column::TransactionId | ||||||
|  |                 .is_in(insertions.iter().map(|i| i.transaction.id.as_ref())), | ||||||
|  |         ) | ||||||
|  |         .exec(tx) | ||||||
|  |         .await?; | ||||||
|  | 
 | ||||||
|  |     expenditure::Entity::insert_many( | ||||||
|  |         insertions | ||||||
|  |             .iter() | ||||||
|  |             .flat_map(|i| &i.contained_expenditures) | ||||||
|  |             .cloned(), | ||||||
|  |     ) | ||||||
|  |         .on_conflict( | ||||||
|  |             OnConflict::columns(vec![ | ||||||
|  |                 expenditure::Column::TransactionId, | ||||||
|  |                 expenditure::Column::Category, | ||||||
|  |             ]) | ||||||
|  |                 .update_columns(expenditure::Column::iter()) | ||||||
|  |                 .to_owned(), | ||||||
|  |         ) | ||||||
|  |         .exec(tx) | ||||||
|  |         .await?; | ||||||
|  | 
 | ||||||
|  |     Ok(()) | ||||||
|  | } | ||||||
|  | 
 | ||||||
| // Note while this is more efficient in db calls, it does bind together the entire group.
 | // Note while this is more efficient in db calls, it does bind together the entire group.
 | ||||||
| // We employ a batching process for now to try balance speed and failure rate, but it is worth
 | // We employ a batching process for now to try balance speed and failure rate, but it is worth
 | ||||||
| // trying to move failures earlier and improve reporting.
 | // trying to move failures earlier and improve reporting.
 | ||||||
| @ -19,7 +53,22 @@ pub async fn insert(db: &DatabaseConnection, insertions: Vec<Insertion>) -> Resu | |||||||
| 
 | 
 | ||||||
|     for insertions in insertions.chunks(400) { |     for insertions in insertions.chunks(400) { | ||||||
|         let tx = db.begin().await?; |         let tx = db.begin().await?; | ||||||
|  |         let inserted_transaction_ids = update_transactions(insertions, &tx).await?; | ||||||
|  |         update_expenditures(&tx, &insertions).await?; | ||||||
|  |         tx.commit().await?; | ||||||
| 
 | 
 | ||||||
|  |         // We wait until the transaction is committed before adding the new transaction ids to the
 | ||||||
|  |         // list to avoid issues with the transaction being rolled back.
 | ||||||
|  |         new_transaction_ids.extend(inserted_transaction_ids); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     // Notify the new transactions once everything is committed.
 | ||||||
|  |     notify_new_transactions(db, &new_transaction_ids).await?; | ||||||
|  | 
 | ||||||
|  |     Ok(new_transaction_ids) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | async fn update_transactions(insertions: &[Insertion], tx: &DatabaseTransaction) -> Result<Vec<String>, AppError> { | ||||||
|     let insert = transaction::Entity::insert_many(insertions.iter().map(|i| &i.transaction).cloned()) |     let insert = transaction::Entity::insert_many(insertions.iter().map(|i| &i.transaction).cloned()) | ||||||
|         .on_conflict( |         .on_conflict( | ||||||
|             OnConflict::column(transaction::Column::Id) |             OnConflict::column(transaction::Column::Id) | ||||||
| @ -38,48 +87,16 @@ pub async fn insert(db: &DatabaseConnection, insertions: Vec<Insertion>) -> Resu | |||||||
|         .iter() |         .iter() | ||||||
|         .map(|r| r.try_get_by("id")) |         .map(|r| r.try_get_by("id")) | ||||||
|         .collect::<Result<Vec<String>, _>>()?; |         .collect::<Result<Vec<String>, _>>()?; | ||||||
| 
 |     Ok(inserted_transaction_ids) | ||||||
|         // Expenditures can change as we re-categorise them, so we delete all the old ones and
 |  | ||||||
|         // insert an entirely new set to ensure we don't end up leaving old ones around.
 |  | ||||||
|         expenditure::Entity::delete_many() |  | ||||||
|             .filter( |  | ||||||
|                 expenditure::Column::TransactionId |  | ||||||
|                     .is_in(insertions.iter().map(|i| i.transaction.id.as_ref())), |  | ||||||
|             ) |  | ||||||
|             .exec(&tx) |  | ||||||
|             .await?; |  | ||||||
| 
 |  | ||||||
|         expenditure::Entity::insert_many( |  | ||||||
|             insertions |  | ||||||
|                 .iter() |  | ||||||
|                 .flat_map(|i| &i.contained_expenditures) |  | ||||||
|                 .cloned(), |  | ||||||
|         ) |  | ||||||
|         .on_conflict( |  | ||||||
|             OnConflict::columns(vec![ |  | ||||||
|                 expenditure::Column::TransactionId, |  | ||||||
|                 expenditure::Column::Category, |  | ||||||
|             ]) |  | ||||||
|             .update_columns(expenditure::Column::iter()) |  | ||||||
|             .to_owned(), |  | ||||||
|         ) |  | ||||||
|         .exec(&tx) |  | ||||||
|         .await?; |  | ||||||
| 
 |  | ||||||
|         tx.commit().await?; |  | ||||||
|         new_transaction_ids.extend(inserted_transaction_ids); |  | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | async fn notify_new_transactions(db: &DatabaseConnection, new_transaction_ids: &[String]) -> Result<(), AppError> { | ||||||
|     let payload = serde_json::to_string(&new_transaction_ids) |     let payload = serde_json::to_string(&new_transaction_ids) | ||||||
|         .map_err(|e| anyhow!(e))?; |         .map_err(|e| anyhow!(e))?; | ||||||
| 
 | 
 | ||||||
|     db.execute( |     db.execute_unprepared( | ||||||
|         Statement::from_sql_and_values( |         &format!(r#"NOTIFY monzo_new_transactions, {payload}"#, ), | ||||||
|             DatabaseBackend::Postgres, |  | ||||||
|             "NOTIFY monzo_new_transactions, $1", |  | ||||||
|             vec![sea_orm::Value::from(payload)], |  | ||||||
|         ) |  | ||||||
|     ).await?; |     ).await?; | ||||||
| 
 | 
 | ||||||
|     Ok(new_transaction_ids) |     Ok(()) | ||||||
| } | } | ||||||
|  | |||||||
		Loading…
	
		Reference in New Issue
	
	Block a user