Compare commits
No commits in common. "d11e4fd0c4c3f90d52380a82915ece257a2fafae" and "bc4aa8242cfb3f64890c362a32c3e8394050edee" have entirely different histories.
d11e4fd0c4 ... bc4aa8242c

Cargo.lock (generated): 2201 changed lines
File diff suppressed because it is too large

Cargo.toml: 10 changed lines
@@ -7,8 +7,8 @@ edition = "2021"
 entity = { path = "entity" }
 migration = { path = "migration" }
 
-axum = { version = "0.8.8", features = ["multipart"] }
-tokio = { version = "1.48.0", features = ["full"] }
+axum = { version = "0.7.5", features = ["multipart"] }
+tokio = { version = "1.37.0", features = ["full"] }
 sea-orm = { version = "1.1.0", features = [
     "sqlx-postgres",
     "runtime-tokio-rustls",
@@ -20,14 +20,14 @@ serde_json = "1.0"
 tracing-subscriber = "0.3.18"
 tracing = "0.1.40"
 anyhow = { version = "1.0", features = ["backtrace"] }
-thiserror = "2.0"
+thiserror = "1.0"
 http = "1.1"
 chrono = { version = "0.4", features = ["serde"] }
 num-traits = "0.2"
 csv = "1.3.0"
 clap = "4.5"
-testcontainers = "0.26"
-testcontainers-modules = { version = "0.14", features = ["postgres"] }
+testcontainers = "0.21"
+testcontainers-modules = { version = "0.9", features = ["postgres"] }
 sqlx = { version = "0.8", features = ["postgres"] }
 tower-http = { version = "0.6", features = ["trace"] }
 bytes = "1.7"
@@ -74,16 +74,16 @@ async fn update_expenditures(
             .flat_map(|i| &i.contained_expenditures)
             .cloned(),
     )
-    .on_conflict(
-        OnConflict::columns(vec![
-            expenditure::Column::TransactionId,
-            expenditure::Column::Category,
-        ])
-        .update_columns(expenditure::Column::iter())
-        .to_owned(),
-    )
-    .exec(tx)
-    .await?;
+    .on_conflict(
+        OnConflict::columns(vec![
+            expenditure::Column::TransactionId,
+            expenditure::Column::Category,
+        ])
+        .update_columns(expenditure::Column::iter())
+        .to_owned(),
+    )
+    .exec(tx)
+    .await?;
 
     Ok(())
 }
@@ -119,7 +119,7 @@ async fn whittle_insertions<'a>(
         .select_only()
         .columns([transaction::Column::IdentityHash])
         .filter(transaction::Column::IdentityHash.is_not_null())
-        .into_tuple::<(i64,)>()
+        .into_tuple::<(i64, )>()
         .all(tx)
         .await?;
 
@@ -133,7 +133,7 @@ async fn whittle_insertions<'a>(
             let hash = i.identity_hash;
             !existing_hashes
                 .iter()
-                .any(|(existing_hash,)| *existing_hash == hash)
+                .any(|(existing_hash, )| *existing_hash == hash)
         })
         .collect::<Vec<_>>();
 
@@ -159,17 +159,16 @@ async fn notify_new_transactions(
 
 mod tests {
     use super::{insert, notify_new_transactions, update_expenditures, update_transactions};
-    use crate::ingestion::ingestion_logic::from_json_row;
     use anyhow::Error;
-    use entity::account;
-    use tokio::sync::OnceCell;
     use migration::MigratorTrait;
-    use sea_orm::{ActiveModelTrait, DatabaseConnection, TransactionTrait};
+    use sea_orm::{DatabaseConnection, TransactionTrait};
     use serde_json::Value;
     use sqlx::postgres::PgListener;
-    use sqlx::{Executor, PgPool};
+    use sqlx::PgPool;
     use testcontainers::runners::AsyncRunner;
     use testcontainers::ContainerAsync;
+    use tokio::sync::OnceCell;
+    use crate::ingestion::ingestion_logic::from_json_row;
 
     #[derive(Debug)]
     struct DatabaseInstance {
@@ -204,19 +203,13 @@ mod tests {
         Ok(instance)
     }
 
-    async fn get_or_initialize_db_instance() -> Result<&'static DatabaseInstance, Error> {
-        Ok(INSTANCE
-            .get_or_init(|| async { initialise_db().await.unwrap() })
-            .await)
-    }
-
-    async fn create_test_account(db: &DatabaseConnection) -> Result<i32, Error> {
-        let new_account = account::ActiveModel {
-            id: sea_orm::ActiveValue::NotSet,
-            name: sea_orm::ActiveValue::Set("Test Account".to_string()),
-        };
-        let inserted = new_account.insert(db).await?;
-        Ok(inserted.id)
+    async fn get_or_initialize_db_instance() -> Result<
+        &'static DatabaseInstance,
+        Error,
+    > {
+        Ok(INSTANCE.get_or_init(|| async {
+            initialise_db().await.unwrap()
+        }).await)
     }
 
     #[tokio::test]
@@ -260,7 +253,6 @@ mod tests {
     #[tokio::test]
     async fn test_notify_on_insert() -> Result<(), Error> {
         let dbi = get_or_initialize_db_instance().await?;
-        let account_id = create_test_account(&dbi.db).await?;
         let mut listener = PgListener::connect_with(&dbi.pool).await?;
         listener.listen("monzo_new_transactions").await?;
 
@@ -268,17 +260,17 @@ mod tests {
         let json: Vec<Vec<Value>> = serde_json::from_str(json).unwrap();
         let data = json
             .iter()
-            .map(|row| from_json_row(row))
-            .collect::<Result<Vec<_>, anyhow::Error>>()?;
+            .map(|row| from_json_row(row.clone()))
+            .collect::<Result<Vec<_>, anyhow::Error>>()
+            .unwrap();
 
-        insert(&dbi.db, data.clone(), account_id).await?;
+        insert(&dbi.db, data.clone(), ).await?;
         let notification = listener.recv().await?;
         let payload = notification.payload();
         let mut payload = serde_json::from_str::<Vec<String>>(&payload)?;
         payload.sort();
 
-        let mut ids = data
-            .iter()
+        let mut ids = data.iter()
             .map(|row| row.transaction_id.clone())
             .collect::<Vec<_>>();
 
@@ -286,43 +278,28 @@ mod tests {
 
         assert_eq!(payload, ids, "Inserted IDs do not match");
 
-        insert(&dbi.db, data.clone(), account_id).await?;
+        insert(&dbi.db, data.clone(), ).await?;
         let notification = listener.recv().await?;
         let payload = notification.payload();
         let payload = serde_json::from_str::<Vec<String>>(&payload)?;
-        assert_eq!(
-            payload,
-            Vec::<String>::new(),
-            "Re-inserting identical rows triggered double notification"
-        );
+        assert_eq!(payload, Vec::<String>::new(), "Re-inserting identical rows triggered double notification");
 
         let mut altered_data = data.clone();
         altered_data[0].description = Some("New description".to_string());
 
-        assert_ne!(
-            altered_data[0].compute_hash(),
-            data[0].compute_hash(),
-            "Alterations have the same hash"
-        );
+        assert_ne!(altered_data[0].compute_hash(), data[0].compute_hash(), "Alterations have the same hash");
 
-        insert(&dbi.db, altered_data.clone(), account_id).await?;
+        insert(&dbi.db, altered_data.clone(), ).await?;
         let notification = listener.recv().await?;
         let payload = notification.payload();
         let payload = serde_json::from_str::<Vec<String>>(&payload)?;
-        assert_eq!(
-            payload,
-            vec![altered_data[0].transaction_id.clone()],
-            "Re-inserting altered row failed to re-trigger notification"
-        );
+        assert_eq!(payload, vec![altered_data[0].transaction_id.clone()], "Re-inserting altered row failed to re-trigger notification");
 
         Ok(())
     }
 }
 
-pub(crate) async fn get_account_id(
-    p0: &DatabaseConnection,
-    p1: Option<String>,
-) -> Result<i32, AppError> {
+pub(crate) async fn get_account_id(p0: &DatabaseConnection, p1: Option<String>) -> Result<i32, AppError> {
     let p1 = p1.unwrap_or("Monzo".to_string());
 
     entity::prelude::Account::find()
@@ -330,7 +307,6 @@ pub(crate) async fn get_account_id(
         .select_only()
         .column(entity::account::Column::Id)
         .into_tuple::<i32>()
-        .one(p0)
-        .await?
+        .one(p0).await?
         .ok_or(AppError::BadRequest(anyhow!("Account not found")))
 }
@@ -140,7 +140,7 @@ fn parse_timestamp(date: &str, time: &str) -> anyhow::Result<NaiveDateTime> {
     Ok(date.and_time(time))
 }
 
-pub fn from_json_row(row: &[Value]) -> anyhow::Result<MonzoRow> {
+pub fn from_json_row(row: Vec<Value>) -> anyhow::Result<MonzoRow> {
     let date = DateTime::parse_from_rfc3339(row[headings::DATE].as_str().context("No date")?)
         .context("Failed to parse date")?;
 
@@ -178,7 +178,7 @@ fn test_json() {
 
     let json_rows = json
         .iter()
-        .map(|row| from_json_row(&row))
+        .map(|row| from_json_row(row.clone()))
        .collect::<Result<Vec<_>, anyhow::Error>>()
         .unwrap();
 
@@ -9,53 +9,24 @@ use sea_orm::DatabaseConnection;
 use serde_json::Value;
 use std::io::Cursor;
 
-#[derive(serde::Deserialize, Debug)]
-#[serde(untagged)]
-pub enum MonzoBatchedJsonInput {
-    Legacy(Vec<Vec<Value>>),
-    New {
-        account_id: Option<u8>,
-        rows: Vec<Vec<Value>>,
-    },
-}
-
-impl MonzoBatchedJsonInput {
-    fn account_id(&self) -> Option<u8> {
-        match self {
-            MonzoBatchedJsonInput::Legacy(_) => None,
-            MonzoBatchedJsonInput::New { account_id, .. } => *account_id,
-        }
-    }
-
-    fn rows(&self) -> &[Vec<Value>] {
-        match self {
-            MonzoBatchedJsonInput::Legacy(rows) => rows,
-            MonzoBatchedJsonInput::New { rows, .. } => rows,
-        }
-    }
-}
-
 pub async fn monzo_batched_json(
     Extension(db): Extension<DatabaseConnection>,
-    Json(data): Json<MonzoBatchedJsonInput>,
+    Json(data): Json<Vec<Vec<Value>>>,
 ) -> Result<&'static str, AppError> {
-    let rows = data
-        .rows()
-        .iter()
-        .skip_while(|row| row[0] == Value::String("Transaction ID".to_string()))
-        .map(|row| from_json_row(row.as_ref()))
+    let data = data
+        .into_iter()
+        .skip(1) // Skip the header row.
+        .map(from_json_row)
         .collect::<Result<_, _>>()?;
 
-    // We default to the main account for JSON ingestion for now.
-    let account_id = db::get_account_id(&db, data.account_id().map(|id| id.to_string())).await?;
-    db::insert(&db, rows, account_id).await?;
+    let account_id = db::get_account_id(&db, None).await?;
+    db::insert(&db, data, account_id).await?;
 
     Ok("Ok")
 }
 
-async fn extract_csv_and_account_name(
-    mut multipart: Multipart,
-) -> Result<(Option<Bytes>, Option<String>), MultipartError> {
+async fn extract_csv_and_account_name(mut multipart: Multipart) -> Result<(Option<Bytes>, Option<String>), MultipartError> {
     let mut csv = None;
     let mut account_name = None;
 
@@ -88,7 +59,7 @@ pub struct ShortcutBody {
 
 pub async fn shortcuts_csv(
     Extension(db): Extension<DatabaseConnection>,
-    Json(shortcut_body): Json<ShortcutBody>,
+    Json(shortcut_body): Json<ShortcutBody>
 ) -> Result<&'static str, AppError> {
     let account_id = db::get_account_id(&db, Some(shortcut_body.account_name)).await?;
 