Compare commits
4 Commits
89fccc8d85
...
848da0f5f3
| Author | SHA1 | Date | |
|---|---|---|---|
| 848da0f5f3 | |||
| 11fa106cf5 | |||
| e375e7d9dc | |||
| 660b7b8676 |
1
.gitignore
vendored
1
.gitignore
vendored
@ -1,2 +1,3 @@
|
||||
/target
|
||||
/.idea
|
||||
client_secret.json
|
||||
|
||||
855
Cargo.lock
generated
855
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
22
Cargo.toml
22
Cargo.toml
@ -7,25 +7,25 @@ edition = "2021"
|
||||
entity = { path = "entity" }
|
||||
migration = { path = "migration" }
|
||||
|
||||
axum = { version = "0.7.4", features = ["multipart"] }
|
||||
tokio = { version = "1.36.0", features = ["full"] }
|
||||
sea-orm = { version = "1.0.0-rc.1", features = [
|
||||
axum = { version = "0.7.5", features = ["multipart"] }
|
||||
tokio = { version = "1.37.0", features = ["full"] }
|
||||
sea-orm = { version = "1.0.0-rc.4", features = [
|
||||
"sqlx-postgres",
|
||||
"runtime-tokio-rustls",
|
||||
"macros"
|
||||
] }
|
||||
|
||||
serde = { version = "1.0.197", features = ["derive"] }
|
||||
serde_json = "1.0.114"
|
||||
serde = { version = "1.0.203", features = ["derive"] }
|
||||
serde_json = "1.0.117"
|
||||
tracing-subscriber = "0.3.18"
|
||||
tracing = "0.1.40"
|
||||
anyhow = "1.0.80"
|
||||
thiserror = "1.0.57"
|
||||
http = "1.0.0"
|
||||
chrono = { version = "0.4.34", features = ["serde"] }
|
||||
num-traits = "0.2.18"
|
||||
anyhow = "1.0.86"
|
||||
thiserror = "1.0.61"
|
||||
http = "1.1.0"
|
||||
chrono = { version = "0.4.38", features = ["serde"] }
|
||||
num-traits = "0.2.19"
|
||||
csv = "1.3.0"
|
||||
clap = "4.5.1"
|
||||
clap = "4.5.4"
|
||||
|
||||
[workspace]
|
||||
members = [".", "migration", "entity"]
|
||||
|
||||
@ -9,5 +9,5 @@ name = "entity"
|
||||
path = "src/lib.rs"
|
||||
|
||||
[dependencies]
|
||||
sea-orm = { version = "1.0.0-rc.1" }
|
||||
serde = { version = "1.0.197", features = ["derive"] }
|
||||
sea-orm = { version = "1.0.0-rc.4" }
|
||||
serde = { version = "1.0.203", features = ["derive"] }
|
||||
|
||||
@ -12,7 +12,7 @@ path = "src/lib.rs"
|
||||
async-std = { version = "1", features = ["attributes", "tokio1"] }
|
||||
|
||||
[dependencies.sea-orm-migration]
|
||||
version = "1.0.0-rc.1"
|
||||
version = "1.0.0-rc.4"
|
||||
features = [
|
||||
"runtime-tokio-rustls", # `ASYNC_RUNTIME` feature
|
||||
"sqlx-postgres", # `DATABASE_DRIVER` feature
|
||||
|
||||
@ -6,54 +6,40 @@ pub struct Migration;
|
||||
#[async_trait::async_trait]
|
||||
impl MigrationTrait for Migration {
|
||||
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||
manager.create_table(
|
||||
Table::create()
|
||||
.table(Transaction::Table)
|
||||
.if_not_exists()
|
||||
.col(
|
||||
ColumnDef::new(Transaction::Id)
|
||||
.string()
|
||||
.not_null()
|
||||
.primary_key(),
|
||||
)
|
||||
.col(
|
||||
ColumnDef::new(Transaction::TransactionType)
|
||||
.string()
|
||||
.not_null(),
|
||||
)
|
||||
.col(
|
||||
ColumnDef::new(Transaction::TotalAmount)
|
||||
.decimal()
|
||||
.not_null(),
|
||||
)
|
||||
.col(
|
||||
ColumnDef::new(Transaction::Timestamp)
|
||||
.timestamp()
|
||||
.not_null(),
|
||||
)
|
||||
.col(
|
||||
ColumnDef::new(Transaction::Title)
|
||||
.string()
|
||||
.not_null(),
|
||||
)
|
||||
.col(
|
||||
ColumnDef::new(Transaction::Emoji)
|
||||
.string(),
|
||||
)
|
||||
.col(
|
||||
ColumnDef::new(Transaction::Notes)
|
||||
.string(),
|
||||
)
|
||||
.col(
|
||||
ColumnDef::new(Transaction::Receipt)
|
||||
.string(),
|
||||
)
|
||||
.col(
|
||||
ColumnDef::new(Transaction::Description)
|
||||
.string(),
|
||||
)
|
||||
.to_owned()
|
||||
).await?;
|
||||
manager
|
||||
.create_table(
|
||||
Table::create()
|
||||
.table(Transaction::Table)
|
||||
.if_not_exists()
|
||||
.col(
|
||||
ColumnDef::new(Transaction::Id)
|
||||
.string()
|
||||
.not_null()
|
||||
.primary_key(),
|
||||
)
|
||||
.col(
|
||||
ColumnDef::new(Transaction::TransactionType)
|
||||
.string()
|
||||
.not_null(),
|
||||
)
|
||||
.col(
|
||||
ColumnDef::new(Transaction::TotalAmount)
|
||||
.decimal()
|
||||
.not_null(),
|
||||
)
|
||||
.col(
|
||||
ColumnDef::new(Transaction::Timestamp)
|
||||
.timestamp()
|
||||
.not_null(),
|
||||
)
|
||||
.col(ColumnDef::new(Transaction::Title).string().not_null())
|
||||
.col(ColumnDef::new(Transaction::Emoji).string())
|
||||
.col(ColumnDef::new(Transaction::Notes).string())
|
||||
.col(ColumnDef::new(Transaction::Receipt).string())
|
||||
.col(ColumnDef::new(Transaction::Description).string())
|
||||
.to_owned(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
manager
|
||||
.create_table(
|
||||
@ -73,7 +59,8 @@ impl MigrationTrait for Migration {
|
||||
)
|
||||
.col(ColumnDef::new(Expenditure::Amount).decimal().not_null())
|
||||
.to_owned(),
|
||||
).await?;
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -12,7 +12,7 @@ pub enum AppError {
|
||||
#[error("Invalid request {0}")]
|
||||
BadRequest(anyhow::Error),
|
||||
|
||||
/// Catch all for error we dont care to expose publicly.
|
||||
/// Catch all for error we don't care to expose publicly.
|
||||
#[error("Internal error")]
|
||||
Anyhow(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
@ -1,7 +1,10 @@
|
||||
use anyhow::anyhow;
|
||||
use entity::{expenditure, transaction};
|
||||
use sea_orm::sea_query::OnConflict;
|
||||
use sea_orm::QueryFilter;
|
||||
use sea_orm::{ColumnTrait, DatabaseConnection, DbErr, EntityTrait, Iterable, TransactionTrait};
|
||||
use sea_orm::{ConnectionTrait, DatabaseBackend, QueryFilter, QueryTrait, Statement};
|
||||
use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, Iterable, TransactionTrait};
|
||||
use migration::PostgresQueryBuilder;
|
||||
use crate::error::AppError;
|
||||
|
||||
pub struct Insertion {
|
||||
pub transaction: transaction::ActiveModel,
|
||||
@ -9,34 +12,42 @@ pub struct Insertion {
|
||||
}
|
||||
|
||||
// Note while this is more efficient in db calls, it does bind together the entire group.
|
||||
// We employ a batching process for now to try balance speed and failure rate but it is worth trying
|
||||
// to move failures earlier and improve reporting.
|
||||
pub async fn insert(db: &DatabaseConnection, insertions: Vec<Insertion>) -> Result<(), DbErr> {
|
||||
// We employ a batching process for now to try balance speed and failure rate, but it is worth
|
||||
// trying to move failures earlier and improve reporting.
|
||||
pub async fn insert(db: &DatabaseConnection, insertions: Vec<Insertion>) -> Result<Vec<String>, AppError> {
|
||||
let mut new_transaction_ids = Vec::new();
|
||||
|
||||
for insertions in insertions.chunks(400) {
|
||||
let tx = db.begin().await?;
|
||||
|
||||
transaction::Entity::insert_many(
|
||||
insertions.iter().map(|i| &i.transaction).cloned(),
|
||||
)
|
||||
.on_conflict(
|
||||
OnConflict::column(transaction::Column::Id)
|
||||
.update_columns(transaction::Column::iter())
|
||||
.to_owned(),
|
||||
)
|
||||
.exec(&tx)
|
||||
.await?;
|
||||
let insert = transaction::Entity::insert_many(insertions.iter().map(|i| &i.transaction).cloned())
|
||||
.on_conflict(
|
||||
OnConflict::column(transaction::Column::Id)
|
||||
.update_columns(transaction::Column::iter())
|
||||
.to_owned(),
|
||||
)
|
||||
.into_query()
|
||||
.returning_col(transaction::Column::Id)
|
||||
.build(PostgresQueryBuilder);
|
||||
|
||||
// Expenditures can change as we recagegorise them, so we delete all the old ones and insert
|
||||
// an entirely new set to ensure we don't end up leaving old ones around.
|
||||
let inserted_transaction_ids = tx.query_all(Statement::from_sql_and_values(
|
||||
DatabaseBackend::Postgres,
|
||||
insert.0,
|
||||
insert.1,
|
||||
)).await?
|
||||
.iter()
|
||||
.map(|r| r.try_get_by("id"))
|
||||
.collect::<Result<Vec<String>, _>>()?;
|
||||
|
||||
// Expenditures can change as we re-categorise them, so we delete all the old ones and
|
||||
// insert an entirely new set to ensure we don't end up leaving old ones around.
|
||||
expenditure::Entity::delete_many()
|
||||
.filter(
|
||||
expenditure::Column::TransactionId.is_in(
|
||||
insertions
|
||||
.iter()
|
||||
.map(|i| i.transaction.id.as_ref()),
|
||||
),
|
||||
expenditure::Column::TransactionId
|
||||
.is_in(insertions.iter().map(|i| i.transaction.id.as_ref())),
|
||||
)
|
||||
.exec(&tx).await?;
|
||||
.exec(&tx)
|
||||
.await?;
|
||||
|
||||
expenditure::Entity::insert_many(
|
||||
insertions
|
||||
@ -56,7 +67,19 @@ pub async fn insert(db: &DatabaseConnection, insertions: Vec<Insertion>) -> Resu
|
||||
.await?;
|
||||
|
||||
tx.commit().await?;
|
||||
new_transaction_ids.extend(inserted_transaction_ids);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
let payload = serde_json::to_string(&new_transaction_ids)
|
||||
.map_err(|e| anyhow!(e))?;
|
||||
|
||||
db.execute(
|
||||
Statement::from_sql_and_values(
|
||||
DatabaseBackend::Postgres,
|
||||
"NOTIFY monzo_new_transactions, $1",
|
||||
vec![sea_orm::Value::from(payload)],
|
||||
)
|
||||
).await?;
|
||||
|
||||
Ok(new_transaction_ids)
|
||||
}
|
||||
|
||||
@ -1,13 +1,13 @@
|
||||
use crate::ingestion::db::Insertion;
|
||||
use anyhow::Context;
|
||||
use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime};
|
||||
use csv::StringRecord;
|
||||
use entity::expenditure::ActiveModel;
|
||||
use entity::transaction;
|
||||
use num_traits::FromPrimitive;
|
||||
use sea_orm::prelude::Decimal;
|
||||
use sea_orm::ActiveValue::*;
|
||||
use sea_orm::IntoActiveModel;
|
||||
use crate::ingestion::db::Insertion;
|
||||
use csv::StringRecord;
|
||||
|
||||
#[allow(dead_code)]
|
||||
mod headings {
|
||||
|
||||
@ -1,3 +1,3 @@
|
||||
pub mod db;
|
||||
pub mod routes;
|
||||
pub mod ingestion_logic;
|
||||
pub mod routes;
|
||||
|
||||
@ -1,11 +1,11 @@
|
||||
use crate::error::AppError;
|
||||
use crate::ingestion::db;
|
||||
use crate::ingestion::ingestion_logic::{from_csv_row, from_json_row};
|
||||
use anyhow::anyhow;
|
||||
use axum::extract::{Extension, Json, Multipart};
|
||||
use sea_orm::DatabaseConnection;
|
||||
use serde_json::Value;
|
||||
use std::io::Cursor;
|
||||
use crate::error::AppError;
|
||||
use crate::ingestion::db;
|
||||
use crate::ingestion::ingestion_logic::{from_csv_row, from_json_row};
|
||||
|
||||
pub async fn monzo_updated(
|
||||
Extension(db): Extension<DatabaseConnection>,
|
||||
|
||||
17
src/main.rs
17
src/main.rs
@ -1,14 +1,14 @@
|
||||
mod ingestion;
|
||||
mod error;
|
||||
mod ingestion;
|
||||
|
||||
use axum::{Extension, Router};
|
||||
use std::net::SocketAddr;
|
||||
use axum::routing::{get, post};
|
||||
use clap::Parser;
|
||||
use sea_orm::{ConnectionTrait, DatabaseConnection};
|
||||
use migration::{Migrator, MigratorTrait};
|
||||
use crate::error::AppError;
|
||||
use crate::ingestion::routes::{monzo_batched_csv, monzo_batched_json, monzo_updated};
|
||||
use axum::routing::{get, post};
|
||||
use axum::{Extension, Router};
|
||||
use clap::Parser;
|
||||
use migration::{Migrator, MigratorTrait};
|
||||
use sea_orm::{ConnectionTrait, DatabaseConnection};
|
||||
use std::net::SocketAddr;
|
||||
|
||||
#[derive(Debug, clap::Parser)]
|
||||
struct Config {
|
||||
@ -28,8 +28,7 @@ struct Config {
|
||||
async fn health_check(
|
||||
Extension(db): Extension<DatabaseConnection>,
|
||||
) -> Result<&'static str, AppError> {
|
||||
db.execute_unprepared("SELECT 1")
|
||||
.await?;
|
||||
db.execute_unprepared("SELECT 1").await?;
|
||||
|
||||
Ok("Ok")
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user