Compare commits
1 Commits
main
...
tracing-ex
| Author | SHA1 | Date | |
|---|---|---|---|
| e2ffd900b4 |
@ -28,7 +28,6 @@
|
|||||||
**/values.dev.yaml
|
**/values.dev.yaml
|
||||||
/bin
|
/bin
|
||||||
/target
|
/target
|
||||||
!/target/**/monzo-ingestion
|
|
||||||
/.idea
|
/.idea
|
||||||
LICENSE
|
LICENSE
|
||||||
README.md
|
README.md
|
||||||
|
|||||||
2
.env
2
.env
@ -1 +1 @@
|
|||||||
DATABASE_URL=postgres://postgres@localhost/monzo_development
|
DATABASE_URL=postgres://postgres@localhost/logos_neu
|
||||||
|
|||||||
6
.github/workflows/build.yml
vendored
6
.github/workflows/build.yml
vendored
@ -78,9 +78,9 @@ jobs:
|
|||||||
target/release/${{ env.RUST_BINARY_NAME }}
|
target/release/${{ env.RUST_BINARY_NAME }}
|
||||||
target/aarch64-unknown-linux-musl/release/${{ env.RUST_BINARY_NAME }}
|
target/aarch64-unknown-linux-musl/release/${{ env.RUST_BINARY_NAME }}
|
||||||
|
|
||||||
# # The target directory is kept in the .dockerignore file, so allow these to be copied in we need to move them to a
|
# The target directory is kept in the .dockerignore file, so allow these to be copied in we need to move them to a
|
||||||
# # new directory.
|
# new directory.
|
||||||
# - run: mv target docker-binaries
|
- run: mv target docker-binaries
|
||||||
|
|
||||||
- name: Build and push multi-arch Docker image
|
- name: Build and push multi-arch Docker image
|
||||||
uses: docker/build-push-action@v4
|
uses: docker/build-push-action@v4
|
||||||
|
|||||||
1569
Cargo.lock
generated
1569
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
14
Cargo.toml
14
Cargo.toml
@ -9,7 +9,7 @@ migration = { path = "migration" }
|
|||||||
|
|
||||||
axum = { version = "0.7.5", features = ["multipart"] }
|
axum = { version = "0.7.5", features = ["multipart"] }
|
||||||
tokio = { version = "1.37.0", features = ["full"] }
|
tokio = { version = "1.37.0", features = ["full"] }
|
||||||
sea-orm = { version = "1.1.0", features = [
|
sea-orm = { version = "1.0.0", features = [
|
||||||
"sqlx-postgres",
|
"sqlx-postgres",
|
||||||
"runtime-tokio-rustls",
|
"runtime-tokio-rustls",
|
||||||
"macros"
|
"macros"
|
||||||
@ -28,10 +28,18 @@ csv = "1.3.0"
|
|||||||
clap = "4.5"
|
clap = "4.5"
|
||||||
testcontainers = "0.21"
|
testcontainers = "0.21"
|
||||||
testcontainers-modules = { version = "0.9", features = ["postgres"] }
|
testcontainers-modules = { version = "0.9", features = ["postgres"] }
|
||||||
sqlx = { version = "0.8", features = ["postgres"] }
|
sqlx = { version = "0.7", features = ["postgres"] }
|
||||||
tower-http = { version = "0.6", features = ["trace"] }
|
tower-http = { version = "0.5", features = ["trace"] }
|
||||||
bytes = "1.7"
|
bytes = "1.7"
|
||||||
once_cell = "1.19"
|
once_cell = "1.19"
|
||||||
|
|
||||||
|
tracing-opentelemetry = "0.25.0"
|
||||||
|
opentelemetry = "0.24.0"
|
||||||
|
opentelemetry_sdk = { version = "0.24.1", features = ["rt-tokio"] }
|
||||||
|
opentelemetry-http = { version = "0.13.0", features = ["reqwest"] }
|
||||||
|
opentelemetry-otlp = { version = "0.17.0", features = ["grpc-tonic", "http-json", "tokio"] }
|
||||||
|
opentelemetry-semantic-conventions = "0.16.0"
|
||||||
|
reqwest = "0.12.7"
|
||||||
|
|
||||||
[workspace]
|
[workspace]
|
||||||
members = [".", "migration", "entity"]
|
members = [".", "migration", "entity"]
|
||||||
|
|||||||
20
Dockerfile
20
Dockerfile
@ -1,14 +1,16 @@
|
|||||||
FROM busybox AS platform_determiner
|
FROM --platform=$BUILDPLATFORM debian:bullseye-slim AS builder
|
||||||
ARG TARGETPLATFORM
|
ARG TARGETPLATFORM
|
||||||
ARG BINARY_NAME
|
ARG BINARY_NAME
|
||||||
|
WORKDIR /app
|
||||||
COPY /target /target
|
COPY . .
|
||||||
RUN case "$TARGETPLATFORM" in \
|
RUN case "$TARGETPLATFORM" in \
|
||||||
"linux/amd64") BINARY_PATH="/target/release/${BINARY_NAME}" ;; \
|
"linux/amd64") BINARY_PATH="target/release/${BINARY_NAME}" ;; \
|
||||||
"linux/arm64") BINARY_PATH="/target/aarch64-unknown-linux-musl/release/${BINARY_NAME}" ;; \
|
"linux/arm64") BINARY_PATH="target/aarch64-unknown-linux-gnu/release/${BINARY_NAME}" ;; \
|
||||||
*) exit 1 ;; \
|
*) exit 1 ;; \
|
||||||
esac && mv "$BINARY_PATH" "/usr/bin/monzo-ingestion" && chmod +x "/usr/bin/monzo-ingestion"
|
esac && \
|
||||||
|
mv "$BINARY_PATH" /usr/local/bin/${BINARY_NAME}
|
||||||
|
|
||||||
FROM --platform=$TARGETPLATFORM debian:bookworm-slim
|
FROM --platform=$TARGETPLATFORM debian:bullseye-slim
|
||||||
COPY --from=platform_determiner /usr/bin/monzo-ingestion /usr/local/bin/monzo-ingestion
|
ARG BINARY_NAME
|
||||||
ENTRYPOINT ["/usr/local/bin/monzo-ingestion"]
|
COPY --from=builder /usr/local/bin/${BINARY_NAME} /usr/local/bin/
|
||||||
|
CMD ["${BINARY_NAME}"]
|
||||||
|
|||||||
@ -9,5 +9,5 @@ name = "entity"
|
|||||||
path = "src/lib.rs"
|
path = "src/lib.rs"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
sea-orm = { version = "1.1.0" }
|
sea-orm = { version = "1.0.0-rc.4" }
|
||||||
serde = { version = "1.0.203", features = ["derive"] }
|
serde = { version = "1.0.203", features = ["derive"] }
|
||||||
|
|||||||
@ -1,26 +0,0 @@
|
|||||||
//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.0
|
|
||||||
|
|
||||||
use sea_orm::entity::prelude::*;
|
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
|
|
||||||
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
|
|
||||||
#[sea_orm(table_name = "account")]
|
|
||||||
pub struct Model {
|
|
||||||
#[sea_orm(primary_key)]
|
|
||||||
pub id: i32,
|
|
||||||
pub name: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
|
|
||||||
pub enum Relation {
|
|
||||||
#[sea_orm(has_many = "super::transaction::Entity")]
|
|
||||||
Transaction,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Related<super::transaction::Entity> for Entity {
|
|
||||||
fn to() -> RelationDef {
|
|
||||||
Relation::Transaction.def()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ActiveModelBehavior for ActiveModel {}
|
|
||||||
@ -1,4 +1,4 @@
|
|||||||
//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.0
|
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2
|
||||||
|
|
||||||
use sea_orm::entity::prelude::*;
|
use sea_orm::entity::prelude::*;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
@ -14,21 +14,6 @@ pub struct Model {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
|
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
|
||||||
pub enum Relation {
|
pub enum Relation {}
|
||||||
#[sea_orm(
|
|
||||||
belongs_to = "super::transaction::Entity",
|
|
||||||
from = "Column::TransactionId",
|
|
||||||
to = "super::transaction::Column::Id",
|
|
||||||
on_update = "NoAction",
|
|
||||||
on_delete = "Cascade"
|
|
||||||
)]
|
|
||||||
Transaction,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Related<super::transaction::Entity> for Entity {
|
|
||||||
fn to() -> RelationDef {
|
|
||||||
Relation::Transaction.def()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ActiveModelBehavior for ActiveModel {}
|
impl ActiveModelBehavior for ActiveModel {}
|
||||||
|
|||||||
@ -1,7 +1,6 @@
|
|||||||
//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.0
|
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2
|
||||||
|
|
||||||
pub mod prelude;
|
pub mod prelude;
|
||||||
|
|
||||||
pub mod account;
|
|
||||||
pub mod expenditure;
|
pub mod expenditure;
|
||||||
pub mod transaction;
|
pub mod transaction;
|
||||||
|
|||||||
@ -1,5 +1,4 @@
|
|||||||
//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.0
|
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2
|
||||||
|
|
||||||
pub use super::account::Entity as Account;
|
|
||||||
pub use super::expenditure::Entity as Expenditure;
|
pub use super::expenditure::Entity as Expenditure;
|
||||||
pub use super::transaction::Entity as Transaction;
|
pub use super::transaction::Entity as Transaction;
|
||||||
|
|||||||
@ -1,4 +1,4 @@
|
|||||||
//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.0
|
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2
|
||||||
|
|
||||||
use sea_orm::entity::prelude::*;
|
use sea_orm::entity::prelude::*;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
@ -18,33 +18,9 @@ pub struct Model {
|
|||||||
pub description: Option<String>,
|
pub description: Option<String>,
|
||||||
#[sea_orm(unique)]
|
#[sea_orm(unique)]
|
||||||
pub identity_hash: Option<i64>,
|
pub identity_hash: Option<i64>,
|
||||||
pub account_id: Option<i32>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
|
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
|
||||||
pub enum Relation {
|
pub enum Relation {}
|
||||||
#[sea_orm(
|
|
||||||
belongs_to = "super::account::Entity",
|
|
||||||
from = "Column::AccountId",
|
|
||||||
to = "super::account::Column::Id",
|
|
||||||
on_update = "NoAction",
|
|
||||||
on_delete = "Cascade"
|
|
||||||
)]
|
|
||||||
Account,
|
|
||||||
#[sea_orm(has_many = "super::expenditure::Entity")]
|
|
||||||
Expenditure,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Related<super::account::Entity> for Entity {
|
|
||||||
fn to() -> RelationDef {
|
|
||||||
Relation::Account.def()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Related<super::expenditure::Entity> for Entity {
|
|
||||||
fn to() -> RelationDef {
|
|
||||||
Relation::Expenditure.def()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ActiveModelBehavior for ActiveModel {}
|
impl ActiveModelBehavior for ActiveModel {}
|
||||||
|
|||||||
@ -12,7 +12,7 @@ path = "src/lib.rs"
|
|||||||
async-std = { version = "1", features = ["attributes", "tokio1"] }
|
async-std = { version = "1", features = ["attributes", "tokio1"] }
|
||||||
|
|
||||||
[dependencies.sea-orm-migration]
|
[dependencies.sea-orm-migration]
|
||||||
version = "1.1.0"
|
version = "1.0.0-rc.4"
|
||||||
features = [
|
features = [
|
||||||
"runtime-tokio-rustls", # `ASYNC_RUNTIME` feature
|
"runtime-tokio-rustls", # `ASYNC_RUNTIME` feature
|
||||||
"sqlx-postgres", # `DATABASE_DRIVER` feature
|
"sqlx-postgres", # `DATABASE_DRIVER` feature
|
||||||
|
|||||||
@ -3,9 +3,6 @@ pub use sea_orm_migration::prelude::*;
|
|||||||
pub mod m20230904_141851_create_monzo_tables;
|
pub mod m20230904_141851_create_monzo_tables;
|
||||||
mod m20240529_195030_add_transaction_identity_hash;
|
mod m20240529_195030_add_transaction_identity_hash;
|
||||||
mod m20240603_162500_make_title_optional;
|
mod m20240603_162500_make_title_optional;
|
||||||
mod m20241015_195220_add_account_to_transactions;
|
|
||||||
mod m20241015_200222_add_expenditure_transaction_fk;
|
|
||||||
mod m20241015_200652_add_accounts;
|
|
||||||
|
|
||||||
pub struct Migrator;
|
pub struct Migrator;
|
||||||
|
|
||||||
@ -20,9 +17,6 @@ impl MigratorTrait for Migrator {
|
|||||||
Box::new(m20230904_141851_create_monzo_tables::Migration),
|
Box::new(m20230904_141851_create_monzo_tables::Migration),
|
||||||
Box::new(m20240529_195030_add_transaction_identity_hash::Migration),
|
Box::new(m20240529_195030_add_transaction_identity_hash::Migration),
|
||||||
Box::new(m20240603_162500_make_title_optional::Migration),
|
Box::new(m20240603_162500_make_title_optional::Migration),
|
||||||
Box::new(m20241015_195220_add_account_to_transactions::Migration),
|
|
||||||
Box::new(m20241015_200222_add_expenditure_transaction_fk::Migration),
|
|
||||||
Box::new(m20241015_200652_add_accounts::Migration),
|
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,76 +0,0 @@
|
|||||||
use sea_orm_migration::{prelude::*, schema::*};
|
|
||||||
|
|
||||||
#[derive(DeriveMigrationName)]
|
|
||||||
pub struct Migration;
|
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
|
||||||
impl MigrationTrait for Migration {
|
|
||||||
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
|
||||||
manager
|
|
||||||
.create_table(
|
|
||||||
Table::create()
|
|
||||||
.table(Account::Table)
|
|
||||||
.if_not_exists()
|
|
||||||
.col(
|
|
||||||
ColumnDef::new(Account::Id)
|
|
||||||
.integer()
|
|
||||||
.auto_increment()
|
|
||||||
.not_null()
|
|
||||||
.primary_key(),
|
|
||||||
)
|
|
||||||
.col(
|
|
||||||
ColumnDef::new(Account::Name)
|
|
||||||
.string()
|
|
||||||
.not_null(),
|
|
||||||
)
|
|
||||||
.to_owned(),
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
manager.alter_table(
|
|
||||||
TableAlterStatement::new()
|
|
||||||
.table(Transaction::Table)
|
|
||||||
.add_column(ColumnDef::new(Transaction::AccountId).integer())
|
|
||||||
.to_owned(),
|
|
||||||
).await?;
|
|
||||||
|
|
||||||
manager
|
|
||||||
.create_foreign_key(
|
|
||||||
ForeignKey::create()
|
|
||||||
.name("fk_transaction_account_id")
|
|
||||||
.from(Transaction::Table, Transaction::AccountId)
|
|
||||||
.to(Account::Table, Account::Id)
|
|
||||||
.on_delete(ForeignKeyAction::Cascade)
|
|
||||||
.to_owned(),
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
|
||||||
manager.alter_table(
|
|
||||||
TableAlterStatement::new()
|
|
||||||
.table(Transaction::Table)
|
|
||||||
.drop_column(Transaction::AccountId)
|
|
||||||
.to_owned(),
|
|
||||||
).await?;
|
|
||||||
|
|
||||||
manager.drop_table(Table::drop().table(Account::Table).to_owned()).await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(DeriveIden)]
|
|
||||||
enum Account {
|
|
||||||
Table,
|
|
||||||
Id,
|
|
||||||
Name,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(DeriveIden)]
|
|
||||||
enum Transaction {
|
|
||||||
Table,
|
|
||||||
AccountId,
|
|
||||||
}
|
|
||||||
@ -1,40 +0,0 @@
|
|||||||
use sea_orm_migration::{prelude::*, schema::*};
|
|
||||||
|
|
||||||
#[derive(DeriveMigrationName)]
|
|
||||||
pub struct Migration;
|
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
|
||||||
impl MigrationTrait for Migration {
|
|
||||||
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
|
||||||
manager
|
|
||||||
.create_foreign_key(
|
|
||||||
ForeignKey::create()
|
|
||||||
.name("fk_expenditure_transaction_id")
|
|
||||||
.from(Expenditure::Table, Expenditure::TransactionId)
|
|
||||||
.to(Transaction::Table, Transaction::Id)
|
|
||||||
.on_delete(ForeignKeyAction::Cascade)
|
|
||||||
.to_owned(),
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
|
||||||
manager.drop_foreign_key(ForeignKey::drop().name("fk_expenditure_transaction_id").to_owned()).await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(DeriveIden)]
|
|
||||||
enum Transaction {
|
|
||||||
Table,
|
|
||||||
Id,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(DeriveIden)]
|
|
||||||
enum Expenditure {
|
|
||||||
Table,
|
|
||||||
TransactionId,
|
|
||||||
}
|
|
||||||
@ -1,53 +0,0 @@
|
|||||||
use sea_orm_migration::{prelude::*, schema::*};
|
|
||||||
use crate::sea_orm::sqlx;
|
|
||||||
|
|
||||||
#[derive(DeriveMigrationName)]
|
|
||||||
pub struct Migration;
|
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
|
||||||
impl MigrationTrait for Migration {
|
|
||||||
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
|
||||||
manager.exec_stmt(
|
|
||||||
InsertStatement::new()
|
|
||||||
.into_table(Account::Table)
|
|
||||||
.columns(vec![Account::Name])
|
|
||||||
.values_panic(["Monzo".into()])
|
|
||||||
.values_panic(["Flex".into()])
|
|
||||||
.to_owned()
|
|
||||||
).await?;
|
|
||||||
|
|
||||||
let id: i32 = manager.get_connection().query_one(
|
|
||||||
sea_orm::Statement::from_string(
|
|
||||||
manager.get_database_backend(),
|
|
||||||
"SELECT id FROM account WHERE name = 'Monzo'",
|
|
||||||
)
|
|
||||||
).await?.expect("Monzo account not found").try_get_by_index(0)?;
|
|
||||||
|
|
||||||
manager.exec_stmt(
|
|
||||||
UpdateStatement::new()
|
|
||||||
.table(Transaction::Table)
|
|
||||||
.values([(Transaction::AccountId, id.into())])
|
|
||||||
.to_owned()
|
|
||||||
).await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
|
||||||
// This is a data creation migration, so we do not roll back the migration
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(DeriveIden)]
|
|
||||||
enum Account {
|
|
||||||
Table,
|
|
||||||
Id,
|
|
||||||
Name,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(DeriveIden)]
|
|
||||||
enum Transaction {
|
|
||||||
Table,
|
|
||||||
AccountId,
|
|
||||||
}
|
|
||||||
@ -20,15 +20,14 @@ pub struct Insertion {
|
|||||||
pub async fn insert(
|
pub async fn insert(
|
||||||
db: &DatabaseConnection,
|
db: &DatabaseConnection,
|
||||||
monzo_rows: Vec<MonzoRow>,
|
monzo_rows: Vec<MonzoRow>,
|
||||||
account_id: i32,
|
|
||||||
) -> Result<Vec<String>, AppError> {
|
) -> Result<Vec<String>, AppError> {
|
||||||
let mut new_transaction_ids = Vec::new();
|
let mut new_transaction_ids = Vec::new();
|
||||||
let insertions = monzo_rows
|
let insertions = monzo_rows
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|row| MonzoRow::into_insertion(row, account_id))
|
.map(MonzoRow::into_insertion)
|
||||||
.collect::<Result<Vec<_>, _>>()?;
|
.collect::<Result<Vec<_>, _>>()?;
|
||||||
|
|
||||||
for insertions in insertions.chunks(200) {
|
for insertions in insertions.chunks(400) {
|
||||||
let (new_or_updated_insertions, inserted_transaction_ids) =
|
let (new_or_updated_insertions, inserted_transaction_ids) =
|
||||||
whittle_insertions(insertions, db).await?;
|
whittle_insertions(insertions, db).await?;
|
||||||
|
|
||||||
@ -257,14 +256,14 @@ mod tests {
|
|||||||
listener.listen("monzo_new_transactions").await?;
|
listener.listen("monzo_new_transactions").await?;
|
||||||
|
|
||||||
let json = include_str!("../../fixtures/transactions.json");
|
let json = include_str!("../../fixtures/transactions.json");
|
||||||
let json: Vec<Vec<Value>> = serde_json::from_str(json).unwrap();
|
let json: Vec<Vec<Value>> = serde_json::from_str(json)?;
|
||||||
let data = json
|
let data = json
|
||||||
.iter()
|
.iter()
|
||||||
.map(|row| from_json_row(row.clone()))
|
.map(|row| from_json_row(row.clone()))
|
||||||
.collect::<Result<Vec<_>, anyhow::Error>>()
|
.collect::<Result<Vec<_>, anyhow::Error>>()
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
insert(&dbi.db, data.clone(), ).await?;
|
insert(&dbi.db, data.clone()).await?;
|
||||||
let notification = listener.recv().await?;
|
let notification = listener.recv().await?;
|
||||||
let payload = notification.payload();
|
let payload = notification.payload();
|
||||||
let mut payload = serde_json::from_str::<Vec<String>>(&payload)?;
|
let mut payload = serde_json::from_str::<Vec<String>>(&payload)?;
|
||||||
@ -278,7 +277,7 @@ mod tests {
|
|||||||
|
|
||||||
assert_eq!(payload, ids, "Inserted IDs do not match");
|
assert_eq!(payload, ids, "Inserted IDs do not match");
|
||||||
|
|
||||||
insert(&dbi.db, data.clone(), ).await?;
|
insert(&dbi.db, data.clone()).await?;
|
||||||
let notification = listener.recv().await?;
|
let notification = listener.recv().await?;
|
||||||
let payload = notification.payload();
|
let payload = notification.payload();
|
||||||
let payload = serde_json::from_str::<Vec<String>>(&payload)?;
|
let payload = serde_json::from_str::<Vec<String>>(&payload)?;
|
||||||
@ -289,7 +288,7 @@ mod tests {
|
|||||||
|
|
||||||
assert_ne!(altered_data[0].compute_hash(), data[0].compute_hash(), "Alterations have the same hash");
|
assert_ne!(altered_data[0].compute_hash(), data[0].compute_hash(), "Alterations have the same hash");
|
||||||
|
|
||||||
insert(&dbi.db, altered_data.clone(), ).await?;
|
insert(&dbi.db, altered_data.clone()).await?;
|
||||||
let notification = listener.recv().await?;
|
let notification = listener.recv().await?;
|
||||||
let payload = notification.payload();
|
let payload = notification.payload();
|
||||||
let payload = serde_json::from_str::<Vec<String>>(&payload)?;
|
let payload = serde_json::from_str::<Vec<String>>(&payload)?;
|
||||||
@ -298,15 +297,3 @@ mod tests {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn get_account_id(p0: &DatabaseConnection, p1: Option<String>) -> Result<i32, AppError> {
|
|
||||||
let p1 = p1.unwrap_or("Monzo".to_string());
|
|
||||||
|
|
||||||
entity::prelude::Account::find()
|
|
||||||
.filter(entity::account::Column::Name.eq(p1))
|
|
||||||
.select_only()
|
|
||||||
.column(entity::account::Column::Id)
|
|
||||||
.into_tuple::<i32>()
|
|
||||||
.one(p0).await?
|
|
||||||
.ok_or(AppError::BadRequest(anyhow!("Account not found")))
|
|
||||||
}
|
|
||||||
@ -1,9 +1,9 @@
|
|||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
pub mod headings {
|
mod headings {
|
||||||
#[allow(unused_imports)]
|
#[allow(unused_imports)]
|
||||||
pub use super::super::ingestion_logic::headings::*;
|
pub use super::super::ingestion_logic::headings::*;
|
||||||
|
|
||||||
// Additional Flex headings
|
// Additional FLex headings
|
||||||
pub const MONEY_OUT: usize = 16;
|
pub const MONEY_OUT: usize = 16;
|
||||||
pub const MONEY_IN: usize = 17;
|
pub const MONEY_IN: usize = 17;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -9,7 +9,6 @@ use sea_orm::prelude::Decimal;
|
|||||||
use sea_orm::IntoActiveModel;
|
use sea_orm::IntoActiveModel;
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
use std::hash::Hash;
|
use std::hash::Hash;
|
||||||
use crate::ingestion::flex;
|
|
||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
pub(crate) mod headings {
|
pub(crate) mod headings {
|
||||||
@ -78,7 +77,7 @@ impl MonzoRow {
|
|||||||
hasher.finish() as i64
|
hasher.finish() as i64
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn into_insertion(self, account_id: i32) -> Result<Insertion, anyhow::Error> {
|
pub fn into_insertion(self) -> Result<Insertion, anyhow::Error> {
|
||||||
let identity_hash = self.compute_hash();
|
let identity_hash = self.compute_hash();
|
||||||
|
|
||||||
let expenditures: Vec<_> = match &self.category_split {
|
let expenditures: Vec<_> = match &self.category_split {
|
||||||
@ -107,7 +106,6 @@ impl MonzoRow {
|
|||||||
total_amount: self.total_amount,
|
total_amount: self.total_amount,
|
||||||
description: self.description,
|
description: self.description,
|
||||||
identity_hash: Some(identity_hash),
|
identity_hash: Some(identity_hash),
|
||||||
account_id: Some(account_id),
|
|
||||||
}
|
}
|
||||||
.into_active_model(),
|
.into_active_model(),
|
||||||
|
|
||||||
|
|||||||
@ -19,61 +19,24 @@ pub async fn monzo_batched_json(
|
|||||||
.map(from_json_row)
|
.map(from_json_row)
|
||||||
.collect::<Result<_, _>>()?;
|
.collect::<Result<_, _>>()?;
|
||||||
|
|
||||||
// We default to the main account for JSON ingestion for now.
|
db::insert(&db, data).await?;
|
||||||
let account_id = db::get_account_id(&db, None).await?;
|
|
||||||
db::insert(&db, data, account_id).await?;
|
|
||||||
|
|
||||||
Ok("Ok")
|
Ok("Ok")
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn extract_csv_and_account_name(mut multipart: Multipart) -> Result<(Option<Bytes>, Option<String>), MultipartError> {
|
async fn extract_csv(mut multipart: Multipart) -> Result<Option<Bytes>, MultipartError> {
|
||||||
let mut csv = None;
|
let csv = loop {
|
||||||
let mut account_name = None;
|
match multipart.next_field().await? {
|
||||||
|
Some(field) if field.name() == Some("csv") => {
|
||||||
while let Some(field) = multipart.next_field().await? {
|
break Some(field.bytes().await?);
|
||||||
match field.name() {
|
|
||||||
Some("csv") => {
|
|
||||||
csv = Some(field.bytes().await?);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Some("account_id") => {
|
Some(_) => {}
|
||||||
account_name = Some(field.text().await?);
|
None => break None,
|
||||||
}
|
|
||||||
|
|
||||||
_ => {}
|
|
||||||
}
|
}
|
||||||
|
};
|
||||||
|
|
||||||
if csv.is_some() && account_name.is_some() {
|
Ok(csv)
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok((csv, account_name))
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(serde::Deserialize, Debug, Clone)]
|
|
||||||
pub struct ShortcutBody {
|
|
||||||
pub body: String,
|
|
||||||
pub account_name: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn shortcuts_csv(
|
|
||||||
Extension(db): Extension<DatabaseConnection>,
|
|
||||||
Json(shortcut_body): Json<ShortcutBody>
|
|
||||||
) -> Result<&'static str, AppError> {
|
|
||||||
let account_id = db::get_account_id(&db, Some(shortcut_body.account_name)).await?;
|
|
||||||
|
|
||||||
let csv = Cursor::new(shortcut_body.body.as_bytes());
|
|
||||||
let mut csv = csv::Reader::from_reader(csv);
|
|
||||||
let data = csv.records();
|
|
||||||
let data = data
|
|
||||||
.filter_map(|f| f.ok())
|
|
||||||
.map(from_csv_row)
|
|
||||||
.collect::<Result<_, _>>()?;
|
|
||||||
|
|
||||||
db::insert(&db, data, account_id).await?;
|
|
||||||
|
|
||||||
Ok("Ok")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn monzo_batched_csv(
|
pub async fn monzo_batched_csv(
|
||||||
@ -82,15 +45,10 @@ pub async fn monzo_batched_csv(
|
|||||||
) -> Result<&'static str, AppError> {
|
) -> Result<&'static str, AppError> {
|
||||||
static CSV_MISSING_ERR_MSG: &str = "No CSV file provided. Expected a multipart request with a `csv` field containing the contents of the CSV.";
|
static CSV_MISSING_ERR_MSG: &str = "No CSV file provided. Expected a multipart request with a `csv` field containing the contents of the CSV.";
|
||||||
|
|
||||||
let (csv, account_name) = extract_csv_and_account_name(multipart)
|
let csv = extract_csv(multipart)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| AppError::BadRequest(anyhow!(e)))?;
|
.map_err(|e| AppError::BadRequest(anyhow!(e)))
|
||||||
|
.and_then(|csv| csv.ok_or(AppError::BadRequest(anyhow!(CSV_MISSING_ERR_MSG))))?;
|
||||||
let Some(csv) = csv else {
|
|
||||||
return Err(AppError::BadRequest(anyhow!(CSV_MISSING_ERR_MSG)));
|
|
||||||
};
|
|
||||||
|
|
||||||
let account_id = db::get_account_id(&db, account_name).await?;
|
|
||||||
|
|
||||||
let csv = Cursor::new(csv);
|
let csv = Cursor::new(csv);
|
||||||
let mut csv = csv::Reader::from_reader(csv);
|
let mut csv = csv::Reader::from_reader(csv);
|
||||||
@ -100,7 +58,7 @@ pub async fn monzo_batched_csv(
|
|||||||
.map(from_csv_row)
|
.map(from_csv_row)
|
||||||
.collect::<Result<_, _>>()?;
|
.collect::<Result<_, _>>()?;
|
||||||
|
|
||||||
db::insert(&db, data, account_id).await?;
|
db::insert(&db, data).await?;
|
||||||
|
|
||||||
Ok("Ok")
|
Ok("Ok")
|
||||||
}
|
}
|
||||||
|
|||||||
24
src/main.rs
24
src/main.rs
@ -4,7 +4,7 @@ mod ingestion;
|
|||||||
use crate::error::AppError;
|
use crate::error::AppError;
|
||||||
use crate::ingestion::db;
|
use crate::ingestion::db;
|
||||||
use crate::ingestion::ingestion_logic::from_csv_row;
|
use crate::ingestion::ingestion_logic::from_csv_row;
|
||||||
use crate::ingestion::routes::{monzo_batched_csv, monzo_batched_json, shortcuts_csv};
|
use crate::ingestion::routes::{monzo_batched_csv, monzo_batched_json};
|
||||||
use axum::routing::{get, post};
|
use axum::routing::{get, post};
|
||||||
use axum::{Extension, Router};
|
use axum::{Extension, Router};
|
||||||
use clap::{Parser, Subcommand};
|
use clap::{Parser, Subcommand};
|
||||||
@ -16,6 +16,16 @@ use std::path::PathBuf;
|
|||||||
use tower_http::trace::TraceLayer;
|
use tower_http::trace::TraceLayer;
|
||||||
use tracing::log::LevelFilter;
|
use tracing::log::LevelFilter;
|
||||||
|
|
||||||
|
use opentelemetry::trace::TracerProvider as _;
|
||||||
|
use opentelemetry_sdk::trace::TracerProvider;
|
||||||
|
use opentelemetry_otlp;
|
||||||
|
use tracing::{error, span};
|
||||||
|
use tracing_subscriber::layer::SubscriberExt;
|
||||||
|
use tracing_subscriber::Registry;
|
||||||
|
use tracin_support::init_tracing_subscriber;
|
||||||
|
|
||||||
|
mod tracin_support;
|
||||||
|
|
||||||
#[derive(Debug, Subcommand)]
|
#[derive(Debug, Subcommand)]
|
||||||
enum Commands {
|
enum Commands {
|
||||||
/// Manually run database migrations.
|
/// Manually run database migrations.
|
||||||
@ -44,10 +54,6 @@ enum Commands {
|
|||||||
Csv {
|
Csv {
|
||||||
/// The path of the CSV file to ingest.
|
/// The path of the CSV file to ingest.
|
||||||
csv_file: PathBuf,
|
csv_file: PathBuf,
|
||||||
|
|
||||||
/// The name of the account to ingest the CSV for.
|
|
||||||
#[clap(long, short)]
|
|
||||||
account: String,
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -72,7 +78,7 @@ async fn health_check(
|
|||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> anyhow::Result<()> {
|
async fn main() -> anyhow::Result<()> {
|
||||||
tracing_subscriber::fmt::init();
|
let _guard = init_tracing_subscriber();
|
||||||
|
|
||||||
let cli: Cli = Cli::parse();
|
let cli: Cli = Cli::parse();
|
||||||
let connection = sea_orm::ConnectOptions::new(&cli.database_url)
|
let connection = sea_orm::ConnectOptions::new(&cli.database_url)
|
||||||
@ -98,7 +104,7 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
serve_web(addr, connection).await?;
|
serve_web(addr, connection).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
Commands::Csv { csv_file, account: account_name } => {
|
Commands::Csv { csv_file } => {
|
||||||
let mut csv = csv::Reader::from_reader(File::open(csv_file)?);
|
let mut csv = csv::Reader::from_reader(File::open(csv_file)?);
|
||||||
let data = csv.records();
|
let data = csv.records();
|
||||||
let data = data
|
let data = data
|
||||||
@ -106,8 +112,7 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
.map(from_csv_row)
|
.map(from_csv_row)
|
||||||
.collect::<Result<_, _>>()?;
|
.collect::<Result<_, _>>()?;
|
||||||
|
|
||||||
let account_id = db::get_account_id(&connection, Some(account_name)).await?;
|
db::insert(&connection, data).await?;
|
||||||
db::insert(&connection, data, account_id).await?;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -119,7 +124,6 @@ async fn serve_web(address: SocketAddr, connection: DatabaseConnection) -> anyho
|
|||||||
.route("/health", get(health_check))
|
.route("/health", get(health_check))
|
||||||
.route("/monzo-batch-export", post(monzo_batched_json))
|
.route("/monzo-batch-export", post(monzo_batched_json))
|
||||||
.route("/monzo-csv-ingestion", post(monzo_batched_csv))
|
.route("/monzo-csv-ingestion", post(monzo_batched_csv))
|
||||||
.route("/shortcuts-csv-import", post(shortcuts_csv))
|
|
||||||
.layer(Extension(connection.clone()))
|
.layer(Extension(connection.clone()))
|
||||||
.layer(TraceLayer::new_for_http());
|
.layer(TraceLayer::new_for_http());
|
||||||
|
|
||||||
|
|||||||
108
src/tracin_support.rs
Normal file
108
src/tracin_support.rs
Normal file
@ -0,0 +1,108 @@
|
|||||||
|
use opentelemetry::{global, trace::TracerProvider, Key, KeyValue};
|
||||||
|
use opentelemetry_sdk::{
|
||||||
|
metrics::{
|
||||||
|
reader::{DefaultAggregationSelector, DefaultTemporalitySelector},
|
||||||
|
Aggregation, Instrument, MeterProviderBuilder, PeriodicReader, SdkMeterProvider, Stream,
|
||||||
|
},
|
||||||
|
runtime,
|
||||||
|
trace::{BatchConfig, RandomIdGenerator, Sampler, Tracer},
|
||||||
|
Resource,
|
||||||
|
};
|
||||||
|
use opentelemetry_semantic_conventions::{
|
||||||
|
resource::{DEPLOYMENT_ENVIRONMENT, SERVICE_NAME, SERVICE_VERSION},
|
||||||
|
SCHEMA_URL,
|
||||||
|
};
|
||||||
|
use tracing::Level;
|
||||||
|
use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer};
|
||||||
|
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
|
||||||
|
|
||||||
|
// Create a Resource that captures information about the entity for which telemetry is recorded.
|
||||||
|
fn resource() -> Resource {
|
||||||
|
Resource::from_schema_url(
|
||||||
|
[
|
||||||
|
KeyValue::new(SERVICE_NAME, env!("CARGO_PKG_NAME")),
|
||||||
|
KeyValue::new(SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
|
||||||
|
KeyValue::new(DEPLOYMENT_ENVIRONMENT, "develop"),
|
||||||
|
],
|
||||||
|
SCHEMA_URL,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Construct MeterProvider for MetricsLayer
|
||||||
|
fn init_meter_provider() -> SdkMeterProvider {
|
||||||
|
let exporter = opentelemetry_otlp::new_exporter()
|
||||||
|
.http()
|
||||||
|
.with_http_client(reqwest::Client::new())
|
||||||
|
.build_metrics_exporter(
|
||||||
|
Box::new(DefaultAggregationSelector::new()),
|
||||||
|
Box::new(DefaultTemporalitySelector::new()),
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let reader = PeriodicReader::builder(exporter, runtime::Tokio)
|
||||||
|
.with_interval(std::time::Duration::from_secs(30))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
let meter_provider = MeterProviderBuilder::default()
|
||||||
|
.with_resource(resource())
|
||||||
|
.with_reader(reader)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
global::set_meter_provider(meter_provider.clone());
|
||||||
|
|
||||||
|
meter_provider
|
||||||
|
}
|
||||||
|
|
||||||
|
// Construct Tracer for OpenTelemetryLayer
|
||||||
|
fn init_tracer() -> Tracer {
|
||||||
|
let provider = opentelemetry_otlp::new_pipeline()
|
||||||
|
.tracing()
|
||||||
|
.with_trace_config(
|
||||||
|
opentelemetry_sdk::trace::Config::default()
|
||||||
|
// Customize sampling strategy
|
||||||
|
.with_sampler(Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased(
|
||||||
|
1.0,
|
||||||
|
))))
|
||||||
|
// If export trace to AWS X-Ray, you can use XrayIdGenerator
|
||||||
|
.with_id_generator(RandomIdGenerator::default())
|
||||||
|
.with_resource(resource()),
|
||||||
|
)
|
||||||
|
.with_batch_config(BatchConfig::default())
|
||||||
|
.with_exporter(opentelemetry_otlp::new_exporter().tonic())
|
||||||
|
.install_batch(runtime::Tokio)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
global::set_tracer_provider(provider.clone());
|
||||||
|
provider.tracer("tracing-otel-subscriber")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Initialize tracing-subscriber and return OtelGuard for opentelemetry-related termination processing
|
||||||
|
pub(crate) fn init_tracing_subscriber() -> OtelGuard {
|
||||||
|
let meter_provider = init_meter_provider();
|
||||||
|
let tracer = init_tracer();
|
||||||
|
|
||||||
|
tracing_subscriber::registry()
|
||||||
|
.with(tracing_subscriber::filter::LevelFilter::from_level(
|
||||||
|
Level::DEBUG,
|
||||||
|
))
|
||||||
|
.with(tracing_subscriber::fmt::layer())
|
||||||
|
.with(MetricsLayer::new(meter_provider.clone()))
|
||||||
|
.with(OpenTelemetryLayer::new(tracer))
|
||||||
|
.init();
|
||||||
|
|
||||||
|
OtelGuard { meter_provider }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) struct OtelGuard {
|
||||||
|
meter_provider: SdkMeterProvider,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for OtelGuard {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
if let Err(err) = self.meter_provider.shutdown() {
|
||||||
|
eprintln!("{err:?}");
|
||||||
|
}
|
||||||
|
|
||||||
|
global::shutdown_tracer_provider();
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue
Block a user