Compare commits
No commits in common. "848da0f5f3a17f416120c32c3295d2c67037188e" and "89fccc8d85b4e0007db3a9fdeebd93ad2225b299" have entirely different histories.
848da0f5f3
...
89fccc8d85
1
.gitignore
vendored
1
.gitignore
vendored
@ -1,3 +1,2 @@
|
|||||||
/target
|
/target
|
||||||
/.idea
|
/.idea
|
||||||
client_secret.json
|
|
||||||
|
|||||||
851
Cargo.lock
generated
851
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
22
Cargo.toml
22
Cargo.toml
@ -7,25 +7,25 @@ edition = "2021"
|
|||||||
entity = { path = "entity" }
|
entity = { path = "entity" }
|
||||||
migration = { path = "migration" }
|
migration = { path = "migration" }
|
||||||
|
|
||||||
axum = { version = "0.7.5", features = ["multipart"] }
|
axum = { version = "0.7.4", features = ["multipart"] }
|
||||||
tokio = { version = "1.37.0", features = ["full"] }
|
tokio = { version = "1.36.0", features = ["full"] }
|
||||||
sea-orm = { version = "1.0.0-rc.4", features = [
|
sea-orm = { version = "1.0.0-rc.1", features = [
|
||||||
"sqlx-postgres",
|
"sqlx-postgres",
|
||||||
"runtime-tokio-rustls",
|
"runtime-tokio-rustls",
|
||||||
"macros"
|
"macros"
|
||||||
] }
|
] }
|
||||||
|
|
||||||
serde = { version = "1.0.203", features = ["derive"] }
|
serde = { version = "1.0.197", features = ["derive"] }
|
||||||
serde_json = "1.0.117"
|
serde_json = "1.0.114"
|
||||||
tracing-subscriber = "0.3.18"
|
tracing-subscriber = "0.3.18"
|
||||||
tracing = "0.1.40"
|
tracing = "0.1.40"
|
||||||
anyhow = "1.0.86"
|
anyhow = "1.0.80"
|
||||||
thiserror = "1.0.61"
|
thiserror = "1.0.57"
|
||||||
http = "1.1.0"
|
http = "1.0.0"
|
||||||
chrono = { version = "0.4.38", features = ["serde"] }
|
chrono = { version = "0.4.34", features = ["serde"] }
|
||||||
num-traits = "0.2.19"
|
num-traits = "0.2.18"
|
||||||
csv = "1.3.0"
|
csv = "1.3.0"
|
||||||
clap = "4.5.4"
|
clap = "4.5.1"
|
||||||
|
|
||||||
[workspace]
|
[workspace]
|
||||||
members = [".", "migration", "entity"]
|
members = [".", "migration", "entity"]
|
||||||
|
|||||||
@ -9,5 +9,5 @@ name = "entity"
|
|||||||
path = "src/lib.rs"
|
path = "src/lib.rs"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
sea-orm = { version = "1.0.0-rc.4" }
|
sea-orm = { version = "1.0.0-rc.1" }
|
||||||
serde = { version = "1.0.203", features = ["derive"] }
|
serde = { version = "1.0.197", features = ["derive"] }
|
||||||
|
|||||||
@ -12,7 +12,7 @@ path = "src/lib.rs"
|
|||||||
async-std = { version = "1", features = ["attributes", "tokio1"] }
|
async-std = { version = "1", features = ["attributes", "tokio1"] }
|
||||||
|
|
||||||
[dependencies.sea-orm-migration]
|
[dependencies.sea-orm-migration]
|
||||||
version = "1.0.0-rc.4"
|
version = "1.0.0-rc.1"
|
||||||
features = [
|
features = [
|
||||||
"runtime-tokio-rustls", # `ASYNC_RUNTIME` feature
|
"runtime-tokio-rustls", # `ASYNC_RUNTIME` feature
|
||||||
"sqlx-postgres", # `DATABASE_DRIVER` feature
|
"sqlx-postgres", # `DATABASE_DRIVER` feature
|
||||||
|
|||||||
@ -6,8 +6,7 @@ pub struct Migration;
|
|||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
impl MigrationTrait for Migration {
|
impl MigrationTrait for Migration {
|
||||||
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||||
manager
|
manager.create_table(
|
||||||
.create_table(
|
|
||||||
Table::create()
|
Table::create()
|
||||||
.table(Transaction::Table)
|
.table(Transaction::Table)
|
||||||
.if_not_exists()
|
.if_not_exists()
|
||||||
@ -32,14 +31,29 @@ impl MigrationTrait for Migration {
|
|||||||
.timestamp()
|
.timestamp()
|
||||||
.not_null(),
|
.not_null(),
|
||||||
)
|
)
|
||||||
.col(ColumnDef::new(Transaction::Title).string().not_null())
|
.col(
|
||||||
.col(ColumnDef::new(Transaction::Emoji).string())
|
ColumnDef::new(Transaction::Title)
|
||||||
.col(ColumnDef::new(Transaction::Notes).string())
|
.string()
|
||||||
.col(ColumnDef::new(Transaction::Receipt).string())
|
.not_null(),
|
||||||
.col(ColumnDef::new(Transaction::Description).string())
|
|
||||||
.to_owned(),
|
|
||||||
)
|
)
|
||||||
.await?;
|
.col(
|
||||||
|
ColumnDef::new(Transaction::Emoji)
|
||||||
|
.string(),
|
||||||
|
)
|
||||||
|
.col(
|
||||||
|
ColumnDef::new(Transaction::Notes)
|
||||||
|
.string(),
|
||||||
|
)
|
||||||
|
.col(
|
||||||
|
ColumnDef::new(Transaction::Receipt)
|
||||||
|
.string(),
|
||||||
|
)
|
||||||
|
.col(
|
||||||
|
ColumnDef::new(Transaction::Description)
|
||||||
|
.string(),
|
||||||
|
)
|
||||||
|
.to_owned()
|
||||||
|
).await?;
|
||||||
|
|
||||||
manager
|
manager
|
||||||
.create_table(
|
.create_table(
|
||||||
@ -59,8 +73,7 @@ impl MigrationTrait for Migration {
|
|||||||
)
|
)
|
||||||
.col(ColumnDef::new(Expenditure::Amount).decimal().not_null())
|
.col(ColumnDef::new(Expenditure::Amount).decimal().not_null())
|
||||||
.to_owned(),
|
.to_owned(),
|
||||||
)
|
).await?;
|
||||||
.await?;
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@ -12,7 +12,7 @@ pub enum AppError {
|
|||||||
#[error("Invalid request {0}")]
|
#[error("Invalid request {0}")]
|
||||||
BadRequest(anyhow::Error),
|
BadRequest(anyhow::Error),
|
||||||
|
|
||||||
/// Catch all for error we don't care to expose publicly.
|
/// Catch all for error we dont care to expose publicly.
|
||||||
#[error("Internal error")]
|
#[error("Internal error")]
|
||||||
Anyhow(#[from] anyhow::Error),
|
Anyhow(#[from] anyhow::Error),
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,10 +1,7 @@
|
|||||||
use anyhow::anyhow;
|
|
||||||
use entity::{expenditure, transaction};
|
use entity::{expenditure, transaction};
|
||||||
use sea_orm::sea_query::OnConflict;
|
use sea_orm::sea_query::OnConflict;
|
||||||
use sea_orm::{ConnectionTrait, DatabaseBackend, QueryFilter, QueryTrait, Statement};
|
use sea_orm::QueryFilter;
|
||||||
use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, Iterable, TransactionTrait};
|
use sea_orm::{ColumnTrait, DatabaseConnection, DbErr, EntityTrait, Iterable, TransactionTrait};
|
||||||
use migration::PostgresQueryBuilder;
|
|
||||||
use crate::error::AppError;
|
|
||||||
|
|
||||||
pub struct Insertion {
|
pub struct Insertion {
|
||||||
pub transaction: transaction::ActiveModel,
|
pub transaction: transaction::ActiveModel,
|
||||||
@ -12,43 +9,35 @@ pub struct Insertion {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Note while this is more efficient in db calls, it does bind together the entire group.
|
// Note while this is more efficient in db calls, it does bind together the entire group.
|
||||||
// We employ a batching process for now to try balance speed and failure rate, but it is worth
|
// We employ a batching process for now to try balance speed and failure rate but it is worth trying
|
||||||
// trying to move failures earlier and improve reporting.
|
// to move failures earlier and improve reporting.
|
||||||
pub async fn insert(db: &DatabaseConnection, insertions: Vec<Insertion>) -> Result<Vec<String>, AppError> {
|
pub async fn insert(db: &DatabaseConnection, insertions: Vec<Insertion>) -> Result<(), DbErr> {
|
||||||
let mut new_transaction_ids = Vec::new();
|
|
||||||
|
|
||||||
for insertions in insertions.chunks(400) {
|
for insertions in insertions.chunks(400) {
|
||||||
let tx = db.begin().await?;
|
let tx = db.begin().await?;
|
||||||
|
|
||||||
let insert = transaction::Entity::insert_many(insertions.iter().map(|i| &i.transaction).cloned())
|
transaction::Entity::insert_many(
|
||||||
|
insertions.iter().map(|i| &i.transaction).cloned(),
|
||||||
|
)
|
||||||
.on_conflict(
|
.on_conflict(
|
||||||
OnConflict::column(transaction::Column::Id)
|
OnConflict::column(transaction::Column::Id)
|
||||||
.update_columns(transaction::Column::iter())
|
.update_columns(transaction::Column::iter())
|
||||||
.to_owned(),
|
.to_owned(),
|
||||||
)
|
)
|
||||||
.into_query()
|
|
||||||
.returning_col(transaction::Column::Id)
|
|
||||||
.build(PostgresQueryBuilder);
|
|
||||||
|
|
||||||
let inserted_transaction_ids = tx.query_all(Statement::from_sql_and_values(
|
|
||||||
DatabaseBackend::Postgres,
|
|
||||||
insert.0,
|
|
||||||
insert.1,
|
|
||||||
)).await?
|
|
||||||
.iter()
|
|
||||||
.map(|r| r.try_get_by("id"))
|
|
||||||
.collect::<Result<Vec<String>, _>>()?;
|
|
||||||
|
|
||||||
// Expenditures can change as we re-categorise them, so we delete all the old ones and
|
|
||||||
// insert an entirely new set to ensure we don't end up leaving old ones around.
|
|
||||||
expenditure::Entity::delete_many()
|
|
||||||
.filter(
|
|
||||||
expenditure::Column::TransactionId
|
|
||||||
.is_in(insertions.iter().map(|i| i.transaction.id.as_ref())),
|
|
||||||
)
|
|
||||||
.exec(&tx)
|
.exec(&tx)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
// Expenditures can change as we recagegorise them, so we delete all the old ones and insert
|
||||||
|
// an entirely new set to ensure we don't end up leaving old ones around.
|
||||||
|
expenditure::Entity::delete_many()
|
||||||
|
.filter(
|
||||||
|
expenditure::Column::TransactionId.is_in(
|
||||||
|
insertions
|
||||||
|
.iter()
|
||||||
|
.map(|i| i.transaction.id.as_ref()),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
.exec(&tx).await?;
|
||||||
|
|
||||||
expenditure::Entity::insert_many(
|
expenditure::Entity::insert_many(
|
||||||
insertions
|
insertions
|
||||||
.iter()
|
.iter()
|
||||||
@ -67,19 +56,7 @@ pub async fn insert(db: &DatabaseConnection, insertions: Vec<Insertion>) -> Resu
|
|||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
tx.commit().await?;
|
tx.commit().await?;
|
||||||
new_transaction_ids.extend(inserted_transaction_ids);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let payload = serde_json::to_string(&new_transaction_ids)
|
Ok(())
|
||||||
.map_err(|e| anyhow!(e))?;
|
|
||||||
|
|
||||||
db.execute(
|
|
||||||
Statement::from_sql_and_values(
|
|
||||||
DatabaseBackend::Postgres,
|
|
||||||
"NOTIFY monzo_new_transactions, $1",
|
|
||||||
vec![sea_orm::Value::from(payload)],
|
|
||||||
)
|
|
||||||
).await?;
|
|
||||||
|
|
||||||
Ok(new_transaction_ids)
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,13 +1,13 @@
|
|||||||
use crate::ingestion::db::Insertion;
|
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime};
|
use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime};
|
||||||
use csv::StringRecord;
|
|
||||||
use entity::expenditure::ActiveModel;
|
use entity::expenditure::ActiveModel;
|
||||||
use entity::transaction;
|
use entity::transaction;
|
||||||
use num_traits::FromPrimitive;
|
use num_traits::FromPrimitive;
|
||||||
use sea_orm::prelude::Decimal;
|
use sea_orm::prelude::Decimal;
|
||||||
use sea_orm::ActiveValue::*;
|
use sea_orm::ActiveValue::*;
|
||||||
use sea_orm::IntoActiveModel;
|
use sea_orm::IntoActiveModel;
|
||||||
|
use crate::ingestion::db::Insertion;
|
||||||
|
use csv::StringRecord;
|
||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
mod headings {
|
mod headings {
|
||||||
|
|||||||
@ -1,3 +1,3 @@
|
|||||||
pub mod db;
|
pub mod db;
|
||||||
pub mod ingestion_logic;
|
|
||||||
pub mod routes;
|
pub mod routes;
|
||||||
|
pub mod ingestion_logic;
|
||||||
|
|||||||
@ -1,11 +1,11 @@
|
|||||||
use crate::error::AppError;
|
|
||||||
use crate::ingestion::db;
|
|
||||||
use crate::ingestion::ingestion_logic::{from_csv_row, from_json_row};
|
|
||||||
use anyhow::anyhow;
|
use anyhow::anyhow;
|
||||||
use axum::extract::{Extension, Json, Multipart};
|
use axum::extract::{Extension, Json, Multipart};
|
||||||
use sea_orm::DatabaseConnection;
|
use sea_orm::DatabaseConnection;
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
use std::io::Cursor;
|
use std::io::Cursor;
|
||||||
|
use crate::error::AppError;
|
||||||
|
use crate::ingestion::db;
|
||||||
|
use crate::ingestion::ingestion_logic::{from_csv_row, from_json_row};
|
||||||
|
|
||||||
pub async fn monzo_updated(
|
pub async fn monzo_updated(
|
||||||
Extension(db): Extension<DatabaseConnection>,
|
Extension(db): Extension<DatabaseConnection>,
|
||||||
|
|||||||
17
src/main.rs
17
src/main.rs
@ -1,14 +1,14 @@
|
|||||||
mod error;
|
|
||||||
mod ingestion;
|
mod ingestion;
|
||||||
|
mod error;
|
||||||
|
|
||||||
|
use axum::{Extension, Router};
|
||||||
|
use std::net::SocketAddr;
|
||||||
|
use axum::routing::{get, post};
|
||||||
|
use clap::Parser;
|
||||||
|
use sea_orm::{ConnectionTrait, DatabaseConnection};
|
||||||
|
use migration::{Migrator, MigratorTrait};
|
||||||
use crate::error::AppError;
|
use crate::error::AppError;
|
||||||
use crate::ingestion::routes::{monzo_batched_csv, monzo_batched_json, monzo_updated};
|
use crate::ingestion::routes::{monzo_batched_csv, monzo_batched_json, monzo_updated};
|
||||||
use axum::routing::{get, post};
|
|
||||||
use axum::{Extension, Router};
|
|
||||||
use clap::Parser;
|
|
||||||
use migration::{Migrator, MigratorTrait};
|
|
||||||
use sea_orm::{ConnectionTrait, DatabaseConnection};
|
|
||||||
use std::net::SocketAddr;
|
|
||||||
|
|
||||||
#[derive(Debug, clap::Parser)]
|
#[derive(Debug, clap::Parser)]
|
||||||
struct Config {
|
struct Config {
|
||||||
@ -28,7 +28,8 @@ struct Config {
|
|||||||
async fn health_check(
|
async fn health_check(
|
||||||
Extension(db): Extension<DatabaseConnection>,
|
Extension(db): Extension<DatabaseConnection>,
|
||||||
) -> Result<&'static str, AppError> {
|
) -> Result<&'static str, AppError> {
|
||||||
db.execute_unprepared("SELECT 1").await?;
|
db.execute_unprepared("SELECT 1")
|
||||||
|
.await?;
|
||||||
|
|
||||||
Ok("Ok")
|
Ok("Ok")
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user