From 8ce60ce2788675cc69a9b1f27eea61da1e2bcf66 Mon Sep 17 00:00:00 2001 From: Joshua Coles Date: Tue, 28 May 2024 17:38:45 +0100 Subject: [PATCH] `rustfmt` --- src/ingestion/db.rs | 85 ++++++++++++++++++++++++++------------------- 1 file changed, 49 insertions(+), 36 deletions(-) diff --git a/src/ingestion/db.rs b/src/ingestion/db.rs index dc69812..720b40b 100644 --- a/src/ingestion/db.rs +++ b/src/ingestion/db.rs @@ -1,10 +1,13 @@ +use crate::error::AppError; use anyhow::anyhow; use entity::{expenditure, transaction}; -use sea_orm::sea_query::OnConflict; -use sea_orm::{ConnectionTrait, DatabaseBackend, DatabaseTransaction, DbErr, QueryFilter, QueryTrait, Statement}; -use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, Iterable, TransactionTrait}; use migration::PostgresQueryBuilder; -use crate::error::AppError; +use sea_orm::sea_query::OnConflict; +use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, Iterable, TransactionTrait}; +use sea_orm::{ + ConnectionTrait, DatabaseBackend, DatabaseTransaction, DbErr, QueryFilter, QueryTrait, + Statement, +}; pub struct Insertion { pub transaction: transaction::ActiveModel, @@ -31,16 +34,16 @@ async fn update_expenditures( .flat_map(|i| &i.contained_expenditures) .cloned(), ) - .on_conflict( - OnConflict::columns(vec![ - expenditure::Column::TransactionId, - expenditure::Column::Category, - ]) - .update_columns(expenditure::Column::iter()) - .to_owned(), - ) - .exec(tx) - .await?; + .on_conflict( + OnConflict::columns(vec![ + expenditure::Column::TransactionId, + expenditure::Column::Category, + ]) + .update_columns(expenditure::Column::iter()) + .to_owned(), + ) + .exec(tx) + .await?; Ok(()) } @@ -48,7 +51,10 @@ async fn update_expenditures( // Note while this is more efficient in db calls, it does bind together the entire group. // We employ a batching process for now to try balance speed and failure rate, but it is worth // trying to move failures earlier and improve reporting. -pub async fn insert(db: &DatabaseConnection, insertions: Vec) -> Result, AppError> { +pub async fn insert( + db: &DatabaseConnection, + insertions: Vec, +) -> Result, AppError> { let mut new_transaction_ids = Vec::new(); for insertions in insertions.chunks(400) { @@ -68,35 +74,42 @@ pub async fn insert(db: &DatabaseConnection, insertions: Vec) -> Resu Ok(new_transaction_ids) } -async fn update_transactions(insertions: &[Insertion], tx: &DatabaseTransaction) -> Result, AppError> { - let insert = transaction::Entity::insert_many(insertions.iter().map(|i| &i.transaction).cloned()) - .on_conflict( - OnConflict::column(transaction::Column::Id) - .update_columns(transaction::Column::iter()) - .to_owned(), - ) - .into_query() - .returning_col(transaction::Column::Id) - .build(PostgresQueryBuilder); +async fn update_transactions( + insertions: &[Insertion], + tx: &DatabaseTransaction, +) -> Result, AppError> { + let insert = + transaction::Entity::insert_many(insertions.iter().map(|i| &i.transaction).cloned()) + .on_conflict( + OnConflict::column(transaction::Column::Id) + .update_columns(transaction::Column::iter()) + .to_owned(), + ) + .into_query() + .returning_col(transaction::Column::Id) + .build(PostgresQueryBuilder); - let inserted_transaction_ids = tx.query_all(Statement::from_sql_and_values( - DatabaseBackend::Postgres, - insert.0, - insert.1, - )).await? + let inserted_transaction_ids = tx + .query_all(Statement::from_sql_and_values( + DatabaseBackend::Postgres, + insert.0, + insert.1, + )) + .await? .iter() .map(|r| r.try_get_by("id")) .collect::, _>>()?; Ok(inserted_transaction_ids) } -async fn notify_new_transactions(db: &DatabaseConnection, new_transaction_ids: &[String]) -> Result<(), AppError> { - let payload = serde_json::to_string(&new_transaction_ids) - .map_err(|e| anyhow!(e))?; +async fn notify_new_transactions( + db: &DatabaseConnection, + new_transaction_ids: &[String], +) -> Result<(), AppError> { + let payload = serde_json::to_string(&new_transaction_ids).map_err(|e| anyhow!(e))?; - db.execute_unprepared( - &format!(r#"NOTIFY monzo_new_transactions, {payload}"#, ), - ).await?; + db.execute_unprepared(&format!(r#"NOTIFY monzo_new_transactions, {payload}"#,)) + .await?; Ok(()) }