Compare commits
No commits in common. "d11e4fd0c4c3f90d52380a82915ece257a2fafae" and "bc4aa8242cfb3f64890c362a32c3e8394050edee" have entirely different histories.
d11e4fd0c4
...
bc4aa8242c
2201
Cargo.lock
generated
2201
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
10
Cargo.toml
10
Cargo.toml
@ -7,8 +7,8 @@ edition = "2021"
|
|||||||
entity = { path = "entity" }
|
entity = { path = "entity" }
|
||||||
migration = { path = "migration" }
|
migration = { path = "migration" }
|
||||||
|
|
||||||
axum = { version = "0.8.8", features = ["multipart"] }
|
axum = { version = "0.7.5", features = ["multipart"] }
|
||||||
tokio = { version = "1.48.0", features = ["full"] }
|
tokio = { version = "1.37.0", features = ["full"] }
|
||||||
sea-orm = { version = "1.1.0", features = [
|
sea-orm = { version = "1.1.0", features = [
|
||||||
"sqlx-postgres",
|
"sqlx-postgres",
|
||||||
"runtime-tokio-rustls",
|
"runtime-tokio-rustls",
|
||||||
@ -20,14 +20,14 @@ serde_json = "1.0"
|
|||||||
tracing-subscriber = "0.3.18"
|
tracing-subscriber = "0.3.18"
|
||||||
tracing = "0.1.40"
|
tracing = "0.1.40"
|
||||||
anyhow = { version = "1.0", features = ["backtrace"] }
|
anyhow = { version = "1.0", features = ["backtrace"] }
|
||||||
thiserror = "2.0"
|
thiserror = "1.0"
|
||||||
http = "1.1"
|
http = "1.1"
|
||||||
chrono = { version = "0.4", features = ["serde"] }
|
chrono = { version = "0.4", features = ["serde"] }
|
||||||
num-traits = "0.2"
|
num-traits = "0.2"
|
||||||
csv = "1.3.0"
|
csv = "1.3.0"
|
||||||
clap = "4.5"
|
clap = "4.5"
|
||||||
testcontainers = "0.26"
|
testcontainers = "0.21"
|
||||||
testcontainers-modules = { version = "0.14", features = ["postgres"] }
|
testcontainers-modules = { version = "0.9", features = ["postgres"] }
|
||||||
sqlx = { version = "0.8", features = ["postgres"] }
|
sqlx = { version = "0.8", features = ["postgres"] }
|
||||||
tower-http = { version = "0.6", features = ["trace"] }
|
tower-http = { version = "0.6", features = ["trace"] }
|
||||||
bytes = "1.7"
|
bytes = "1.7"
|
||||||
|
|||||||
@ -119,7 +119,7 @@ async fn whittle_insertions<'a>(
|
|||||||
.select_only()
|
.select_only()
|
||||||
.columns([transaction::Column::IdentityHash])
|
.columns([transaction::Column::IdentityHash])
|
||||||
.filter(transaction::Column::IdentityHash.is_not_null())
|
.filter(transaction::Column::IdentityHash.is_not_null())
|
||||||
.into_tuple::<(i64,)>()
|
.into_tuple::<(i64, )>()
|
||||||
.all(tx)
|
.all(tx)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
@ -133,7 +133,7 @@ async fn whittle_insertions<'a>(
|
|||||||
let hash = i.identity_hash;
|
let hash = i.identity_hash;
|
||||||
!existing_hashes
|
!existing_hashes
|
||||||
.iter()
|
.iter()
|
||||||
.any(|(existing_hash,)| *existing_hash == hash)
|
.any(|(existing_hash, )| *existing_hash == hash)
|
||||||
})
|
})
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
@ -159,17 +159,16 @@ async fn notify_new_transactions(
|
|||||||
|
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::{insert, notify_new_transactions, update_expenditures, update_transactions};
|
use super::{insert, notify_new_transactions, update_expenditures, update_transactions};
|
||||||
use crate::ingestion::ingestion_logic::from_json_row;
|
|
||||||
use anyhow::Error;
|
use anyhow::Error;
|
||||||
use entity::account;
|
use tokio::sync::OnceCell;
|
||||||
use migration::MigratorTrait;
|
use migration::MigratorTrait;
|
||||||
use sea_orm::{ActiveModelTrait, DatabaseConnection, TransactionTrait};
|
use sea_orm::{DatabaseConnection, TransactionTrait};
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
use sqlx::postgres::PgListener;
|
use sqlx::postgres::PgListener;
|
||||||
use sqlx::{Executor, PgPool};
|
use sqlx::PgPool;
|
||||||
use testcontainers::runners::AsyncRunner;
|
use testcontainers::runners::AsyncRunner;
|
||||||
use testcontainers::ContainerAsync;
|
use testcontainers::ContainerAsync;
|
||||||
use tokio::sync::OnceCell;
|
use crate::ingestion::ingestion_logic::from_json_row;
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
struct DatabaseInstance {
|
struct DatabaseInstance {
|
||||||
@ -204,19 +203,13 @@ mod tests {
|
|||||||
Ok(instance)
|
Ok(instance)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_or_initialize_db_instance() -> Result<&'static DatabaseInstance, Error> {
|
async fn get_or_initialize_db_instance() -> Result<
|
||||||
Ok(INSTANCE
|
&'static DatabaseInstance,
|
||||||
.get_or_init(|| async { initialise_db().await.unwrap() })
|
Error,
|
||||||
.await)
|
> {
|
||||||
}
|
Ok(INSTANCE.get_or_init(|| async {
|
||||||
|
initialise_db().await.unwrap()
|
||||||
async fn create_test_account(db: &DatabaseConnection) -> Result<i32, Error> {
|
}).await)
|
||||||
let new_account = account::ActiveModel {
|
|
||||||
id: sea_orm::ActiveValue::NotSet,
|
|
||||||
name: sea_orm::ActiveValue::Set("Test Account".to_string()),
|
|
||||||
};
|
|
||||||
let inserted = new_account.insert(db).await?;
|
|
||||||
Ok(inserted.id)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
@ -260,7 +253,6 @@ mod tests {
|
|||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_notify_on_insert() -> Result<(), Error> {
|
async fn test_notify_on_insert() -> Result<(), Error> {
|
||||||
let dbi = get_or_initialize_db_instance().await?;
|
let dbi = get_or_initialize_db_instance().await?;
|
||||||
let account_id = create_test_account(&dbi.db).await?;
|
|
||||||
let mut listener = PgListener::connect_with(&dbi.pool).await?;
|
let mut listener = PgListener::connect_with(&dbi.pool).await?;
|
||||||
listener.listen("monzo_new_transactions").await?;
|
listener.listen("monzo_new_transactions").await?;
|
||||||
|
|
||||||
@ -268,17 +260,17 @@ mod tests {
|
|||||||
let json: Vec<Vec<Value>> = serde_json::from_str(json).unwrap();
|
let json: Vec<Vec<Value>> = serde_json::from_str(json).unwrap();
|
||||||
let data = json
|
let data = json
|
||||||
.iter()
|
.iter()
|
||||||
.map(|row| from_json_row(row))
|
.map(|row| from_json_row(row.clone()))
|
||||||
.collect::<Result<Vec<_>, anyhow::Error>>()?;
|
.collect::<Result<Vec<_>, anyhow::Error>>()
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
insert(&dbi.db, data.clone(), account_id).await?;
|
insert(&dbi.db, data.clone(), ).await?;
|
||||||
let notification = listener.recv().await?;
|
let notification = listener.recv().await?;
|
||||||
let payload = notification.payload();
|
let payload = notification.payload();
|
||||||
let mut payload = serde_json::from_str::<Vec<String>>(&payload)?;
|
let mut payload = serde_json::from_str::<Vec<String>>(&payload)?;
|
||||||
payload.sort();
|
payload.sort();
|
||||||
|
|
||||||
let mut ids = data
|
let mut ids = data.iter()
|
||||||
.iter()
|
|
||||||
.map(|row| row.transaction_id.clone())
|
.map(|row| row.transaction_id.clone())
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
@ -286,43 +278,28 @@ mod tests {
|
|||||||
|
|
||||||
assert_eq!(payload, ids, "Inserted IDs do not match");
|
assert_eq!(payload, ids, "Inserted IDs do not match");
|
||||||
|
|
||||||
insert(&dbi.db, data.clone(), account_id).await?;
|
insert(&dbi.db, data.clone(), ).await?;
|
||||||
let notification = listener.recv().await?;
|
let notification = listener.recv().await?;
|
||||||
let payload = notification.payload();
|
let payload = notification.payload();
|
||||||
let payload = serde_json::from_str::<Vec<String>>(&payload)?;
|
let payload = serde_json::from_str::<Vec<String>>(&payload)?;
|
||||||
assert_eq!(
|
assert_eq!(payload, Vec::<String>::new(), "Re-inserting identical rows triggered double notification");
|
||||||
payload,
|
|
||||||
Vec::<String>::new(),
|
|
||||||
"Re-inserting identical rows triggered double notification"
|
|
||||||
);
|
|
||||||
|
|
||||||
let mut altered_data = data.clone();
|
let mut altered_data = data.clone();
|
||||||
altered_data[0].description = Some("New description".to_string());
|
altered_data[0].description = Some("New description".to_string());
|
||||||
|
|
||||||
assert_ne!(
|
assert_ne!(altered_data[0].compute_hash(), data[0].compute_hash(), "Alterations have the same hash");
|
||||||
altered_data[0].compute_hash(),
|
|
||||||
data[0].compute_hash(),
|
|
||||||
"Alterations have the same hash"
|
|
||||||
);
|
|
||||||
|
|
||||||
insert(&dbi.db, altered_data.clone(), account_id).await?;
|
insert(&dbi.db, altered_data.clone(), ).await?;
|
||||||
let notification = listener.recv().await?;
|
let notification = listener.recv().await?;
|
||||||
let payload = notification.payload();
|
let payload = notification.payload();
|
||||||
let payload = serde_json::from_str::<Vec<String>>(&payload)?;
|
let payload = serde_json::from_str::<Vec<String>>(&payload)?;
|
||||||
assert_eq!(
|
assert_eq!(payload, vec![altered_data[0].transaction_id.clone()], "Re-inserting altered row failed to re-trigger notification");
|
||||||
payload,
|
|
||||||
vec![altered_data[0].transaction_id.clone()],
|
|
||||||
"Re-inserting altered row failed to re-trigger notification"
|
|
||||||
);
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn get_account_id(
|
pub(crate) async fn get_account_id(p0: &DatabaseConnection, p1: Option<String>) -> Result<i32, AppError> {
|
||||||
p0: &DatabaseConnection,
|
|
||||||
p1: Option<String>,
|
|
||||||
) -> Result<i32, AppError> {
|
|
||||||
let p1 = p1.unwrap_or("Monzo".to_string());
|
let p1 = p1.unwrap_or("Monzo".to_string());
|
||||||
|
|
||||||
entity::prelude::Account::find()
|
entity::prelude::Account::find()
|
||||||
@ -330,7 +307,6 @@ pub(crate) async fn get_account_id(
|
|||||||
.select_only()
|
.select_only()
|
||||||
.column(entity::account::Column::Id)
|
.column(entity::account::Column::Id)
|
||||||
.into_tuple::<i32>()
|
.into_tuple::<i32>()
|
||||||
.one(p0)
|
.one(p0).await?
|
||||||
.await?
|
|
||||||
.ok_or(AppError::BadRequest(anyhow!("Account not found")))
|
.ok_or(AppError::BadRequest(anyhow!("Account not found")))
|
||||||
}
|
}
|
||||||
@ -140,7 +140,7 @@ fn parse_timestamp(date: &str, time: &str) -> anyhow::Result<NaiveDateTime> {
|
|||||||
Ok(date.and_time(time))
|
Ok(date.and_time(time))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn from_json_row(row: &[Value]) -> anyhow::Result<MonzoRow> {
|
pub fn from_json_row(row: Vec<Value>) -> anyhow::Result<MonzoRow> {
|
||||||
let date = DateTime::parse_from_rfc3339(row[headings::DATE].as_str().context("No date")?)
|
let date = DateTime::parse_from_rfc3339(row[headings::DATE].as_str().context("No date")?)
|
||||||
.context("Failed to parse date")?;
|
.context("Failed to parse date")?;
|
||||||
|
|
||||||
@ -178,7 +178,7 @@ fn test_json() {
|
|||||||
|
|
||||||
let json_rows = json
|
let json_rows = json
|
||||||
.iter()
|
.iter()
|
||||||
.map(|row| from_json_row(&row))
|
.map(|row| from_json_row(row.clone()))
|
||||||
.collect::<Result<Vec<_>, anyhow::Error>>()
|
.collect::<Result<Vec<_>, anyhow::Error>>()
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
|||||||
@ -9,53 +9,24 @@ use sea_orm::DatabaseConnection;
|
|||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
use std::io::Cursor;
|
use std::io::Cursor;
|
||||||
|
|
||||||
#[derive(serde::Deserialize, Debug)]
|
|
||||||
#[serde(untagged)]
|
|
||||||
pub enum MonzoBatchedJsonInput {
|
|
||||||
Legacy(Vec<Vec<Value>>),
|
|
||||||
New {
|
|
||||||
account_id: Option<u8>,
|
|
||||||
rows: Vec<Vec<Value>>,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
impl MonzoBatchedJsonInput {
|
|
||||||
fn account_id(&self) -> Option<u8> {
|
|
||||||
match self {
|
|
||||||
MonzoBatchedJsonInput::Legacy(_) => None,
|
|
||||||
MonzoBatchedJsonInput::New { account_id, .. } => *account_id,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn rows(&self) -> &[Vec<Value>] {
|
|
||||||
match self {
|
|
||||||
MonzoBatchedJsonInput::Legacy(rows) => rows,
|
|
||||||
MonzoBatchedJsonInput::New { rows, .. } => rows,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn monzo_batched_json(
|
pub async fn monzo_batched_json(
|
||||||
Extension(db): Extension<DatabaseConnection>,
|
Extension(db): Extension<DatabaseConnection>,
|
||||||
Json(data): Json<MonzoBatchedJsonInput>,
|
Json(data): Json<Vec<Vec<Value>>>,
|
||||||
) -> Result<&'static str, AppError> {
|
) -> Result<&'static str, AppError> {
|
||||||
let rows = data
|
let data = data
|
||||||
.rows()
|
.into_iter()
|
||||||
.iter()
|
.skip(1) // Skip the header row.
|
||||||
.skip_while(|row| row[0] == Value::String("Transaction ID".to_string()))
|
.map(from_json_row)
|
||||||
.map(|row| from_json_row(row.as_ref()))
|
|
||||||
.collect::<Result<_, _>>()?;
|
.collect::<Result<_, _>>()?;
|
||||||
|
|
||||||
// We default to the main account for JSON ingestion for now.
|
// We default to the main account for JSON ingestion for now.
|
||||||
let account_id = db::get_account_id(&db, data.account_id().map(|id| id.to_string())).await?;
|
let account_id = db::get_account_id(&db, None).await?;
|
||||||
db::insert(&db, rows, account_id).await?;
|
db::insert(&db, data, account_id).await?;
|
||||||
|
|
||||||
Ok("Ok")
|
Ok("Ok")
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn extract_csv_and_account_name(
|
async fn extract_csv_and_account_name(mut multipart: Multipart) -> Result<(Option<Bytes>, Option<String>), MultipartError> {
|
||||||
mut multipart: Multipart,
|
|
||||||
) -> Result<(Option<Bytes>, Option<String>), MultipartError> {
|
|
||||||
let mut csv = None;
|
let mut csv = None;
|
||||||
let mut account_name = None;
|
let mut account_name = None;
|
||||||
|
|
||||||
@ -88,7 +59,7 @@ pub struct ShortcutBody {
|
|||||||
|
|
||||||
pub async fn shortcuts_csv(
|
pub async fn shortcuts_csv(
|
||||||
Extension(db): Extension<DatabaseConnection>,
|
Extension(db): Extension<DatabaseConnection>,
|
||||||
Json(shortcut_body): Json<ShortcutBody>,
|
Json(shortcut_body): Json<ShortcutBody>
|
||||||
) -> Result<&'static str, AppError> {
|
) -> Result<&'static str, AppError> {
|
||||||
let account_id = db::get_account_id(&db, Some(shortcut_body.account_name)).await?;
|
let account_id = db::get_account_id(&db, Some(shortcut_body.account_name)).await?;
|
||||||
|
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user