Compare commits
2 Commits
61de38b9bf
...
046ce44d23
| Author | SHA1 | Date | |
|---|---|---|---|
| 046ce44d23 | |||
| f70d844ff3 |
@ -6,37 +6,51 @@ pub struct Migration;
|
|||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
impl MigrationTrait for Migration {
|
impl MigrationTrait for Migration {
|
||||||
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||||
manager.alter_table(
|
manager
|
||||||
TableAlterStatement::new()
|
.alter_table(
|
||||||
.table(Transaction::Table)
|
TableAlterStatement::new()
|
||||||
.modify_column(ColumnDef::new(Transaction::Title).string().null())
|
.table(Transaction::Table)
|
||||||
.to_owned()
|
.modify_column(ColumnDef::new(Transaction::Title).string().null())
|
||||||
).await?;
|
.to_owned(),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
// Set all empty string titles to null
|
// Set all empty string titles to null
|
||||||
manager.get_connection().execute_unprepared(r#"
|
manager
|
||||||
|
.get_connection()
|
||||||
|
.execute_unprepared(
|
||||||
|
r#"
|
||||||
update transaction
|
update transaction
|
||||||
set title = null
|
set title = null
|
||||||
where title = ''
|
where title = ''
|
||||||
"#).await?;
|
"#,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||||
// Set all null titles to empty string when reverting
|
// Set all null titles to empty string when reverting
|
||||||
manager.get_connection().execute_unprepared(r#"
|
manager
|
||||||
|
.get_connection()
|
||||||
|
.execute_unprepared(
|
||||||
|
r#"
|
||||||
update transaction
|
update transaction
|
||||||
set title = ''
|
set title = ''
|
||||||
where title is null
|
where title is null
|
||||||
"#).await?;
|
"#,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
manager.alter_table(
|
manager
|
||||||
TableAlterStatement::new()
|
.alter_table(
|
||||||
.table(Transaction::Table)
|
TableAlterStatement::new()
|
||||||
.modify_column(ColumnDef::new(Transaction::Title).string().not_null())
|
.table(Transaction::Table)
|
||||||
.to_owned()
|
.modify_column(ColumnDef::new(Transaction::Title).string().not_null())
|
||||||
).await
|
.to_owned(),
|
||||||
|
)
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -4,7 +4,7 @@ use anyhow::anyhow;
|
|||||||
use entity::{expenditure, transaction};
|
use entity::{expenditure, transaction};
|
||||||
use sea_orm::sea_query::OnConflict;
|
use sea_orm::sea_query::OnConflict;
|
||||||
use sea_orm::{
|
use sea_orm::{
|
||||||
ColumnTrait, DatabaseConnection, EntityTrait, Iterable, QuerySelect, TransactionTrait,
|
ColumnTrait, DatabaseConnection, DbErr, EntityTrait, Iterable, QuerySelect, TransactionTrait,
|
||||||
};
|
};
|
||||||
use sea_orm::{ConnectionTrait, DatabaseTransaction, QueryFilter};
|
use sea_orm::{ConnectionTrait, DatabaseTransaction, QueryFilter};
|
||||||
|
|
||||||
@ -28,10 +28,14 @@ pub async fn insert(
|
|||||||
.collect::<Result<Vec<_>, _>>()?;
|
.collect::<Result<Vec<_>, _>>()?;
|
||||||
|
|
||||||
for insertions in insertions.chunks(400) {
|
for insertions in insertions.chunks(400) {
|
||||||
let tx = db.begin().await?;
|
|
||||||
let (new_or_updated_insertions, inserted_transaction_ids) =
|
let (new_or_updated_insertions, inserted_transaction_ids) =
|
||||||
whittle_insertions(insertions, &tx).await?;
|
whittle_insertions(insertions, &db).await?;
|
||||||
|
|
||||||
|
if new_or_updated_insertions.is_empty() {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let tx = db.begin().await?;
|
||||||
update_transactions(&tx, &new_or_updated_insertions).await?;
|
update_transactions(&tx, &new_or_updated_insertions).await?;
|
||||||
update_expenditures(&tx, &new_or_updated_insertions, &inserted_transaction_ids).await?;
|
update_expenditures(&tx, &new_or_updated_insertions, &inserted_transaction_ids).await?;
|
||||||
tx.commit().await?;
|
tx.commit().await?;
|
||||||
@ -52,6 +56,10 @@ async fn update_expenditures(
|
|||||||
new_or_updated_insertions: &[&Insertion],
|
new_or_updated_insertions: &[&Insertion],
|
||||||
inserted_transaction_ids: &[String],
|
inserted_transaction_ids: &[String],
|
||||||
) -> Result<(), AppError> {
|
) -> Result<(), AppError> {
|
||||||
|
if new_or_updated_insertions.is_empty() {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
// Expenditures can change as we re-categorise them, so we delete all the old ones and
|
// Expenditures can change as we re-categorise them, so we delete all the old ones and
|
||||||
// insert an entirely new set to ensure we don't end up leaving old ones around.
|
// insert an entirely new set to ensure we don't end up leaving old ones around.
|
||||||
expenditure::Entity::delete_many()
|
expenditure::Entity::delete_many()
|
||||||
@ -81,7 +89,11 @@ async fn update_expenditures(
|
|||||||
async fn update_transactions(
|
async fn update_transactions(
|
||||||
tx: &DatabaseTransaction,
|
tx: &DatabaseTransaction,
|
||||||
new_or_updated_insertions: &[&Insertion],
|
new_or_updated_insertions: &[&Insertion],
|
||||||
) -> Result<(), AppError> {
|
) -> Result<(), DbErr> {
|
||||||
|
if new_or_updated_insertions.is_empty() {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
let transactions = new_or_updated_insertions
|
let transactions = new_or_updated_insertions
|
||||||
.iter()
|
.iter()
|
||||||
.map(|i| &i.transaction)
|
.map(|i| &i.transaction)
|
||||||
@ -100,7 +112,7 @@ async fn update_transactions(
|
|||||||
|
|
||||||
async fn whittle_insertions<'a>(
|
async fn whittle_insertions<'a>(
|
||||||
insertions: &'a [Insertion],
|
insertions: &'a [Insertion],
|
||||||
tx: &DatabaseTransaction,
|
tx: &DatabaseConnection,
|
||||||
) -> Result<(Vec<&'a Insertion>, Vec<String>), AppError> {
|
) -> Result<(Vec<&'a Insertion>, Vec<String>), AppError> {
|
||||||
let existing_hashes = transaction::Entity::find()
|
let existing_hashes = transaction::Entity::find()
|
||||||
.select_only()
|
.select_only()
|
||||||
@ -143,10 +155,10 @@ async fn notify_new_transactions(
|
|||||||
}
|
}
|
||||||
|
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::notify_new_transactions;
|
use super::{notify_new_transactions, update_expenditures, update_transactions};
|
||||||
use anyhow::Error;
|
use anyhow::Error;
|
||||||
use migration::MigratorTrait;
|
use migration::MigratorTrait;
|
||||||
use sea_orm::DatabaseConnection;
|
use sea_orm::{DatabaseConnection, TransactionTrait};
|
||||||
use sqlx::postgres::PgListener;
|
use sqlx::postgres::PgListener;
|
||||||
use sqlx::PgPool;
|
use sqlx::PgPool;
|
||||||
use testcontainers::runners::AsyncRunner;
|
use testcontainers::runners::AsyncRunner;
|
||||||
@ -178,6 +190,18 @@ mod tests {
|
|||||||
Ok((container, db, pool))
|
Ok((container, db, pool))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_no_new_insertions() -> Result<(), Error> {
|
||||||
|
let (_container, db, _pool) = initialise().await?;
|
||||||
|
let insertions = vec![];
|
||||||
|
let tx = db.begin().await?;
|
||||||
|
update_transactions(&tx, &insertions).await?;
|
||||||
|
update_expenditures(&tx, &insertions, &vec![]).await?;
|
||||||
|
tx.commit().await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_notify() -> Result<(), Error> {
|
async fn test_notify() -> Result<(), Error> {
|
||||||
let (_container, db, pool) = initialise().await?;
|
let (_container, db, pool) = initialise().await?;
|
||||||
|
|||||||
@ -174,12 +174,14 @@ fn test_json() {
|
|||||||
let json: Vec<Vec<Value>> = serde_json::from_str(json).unwrap();
|
let json: Vec<Vec<Value>> = serde_json::from_str(json).unwrap();
|
||||||
let mut csv_reader = csv::Reader::from_reader(csv.as_bytes());
|
let mut csv_reader = csv::Reader::from_reader(csv.as_bytes());
|
||||||
|
|
||||||
let json_rows = json.iter()
|
let json_rows = json
|
||||||
|
.iter()
|
||||||
.map(|row| from_json_row(row.clone()))
|
.map(|row| from_json_row(row.clone()))
|
||||||
.collect::<Result<Vec<_>, anyhow::Error>>()
|
.collect::<Result<Vec<_>, anyhow::Error>>()
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let csv_rows = csv_reader.records()
|
let csv_rows = csv_reader
|
||||||
|
.records()
|
||||||
.map(|record| from_csv_row(record.unwrap()))
|
.map(|record| from_csv_row(record.unwrap()))
|
||||||
.collect::<Result<Vec<_>, anyhow::Error>>()
|
.collect::<Result<Vec<_>, anyhow::Error>>()
|
||||||
.unwrap();
|
.unwrap();
|
||||||
@ -188,7 +190,12 @@ fn test_json() {
|
|||||||
|
|
||||||
for (i, (json_row, csv_row)) in json_rows.iter().zip(csv_rows.iter()).enumerate() {
|
for (i, (json_row, csv_row)) in json_rows.iter().zip(csv_rows.iter()).enumerate() {
|
||||||
assert_eq!(json_row, csv_row, "Row {} is different", i);
|
assert_eq!(json_row, csv_row, "Row {} is different", i);
|
||||||
assert_eq!(json_row.compute_hash(), csv_row.compute_hash(), "Row {} hash are different", i);
|
assert_eq!(
|
||||||
|
json_row.compute_hash(),
|
||||||
|
csv_row.compute_hash(),
|
||||||
|
"Row {} hash are different",
|
||||||
|
i
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user