Compare commits


1 commit

Commit e2ffd900b4: Stash attempt at tracing exporting (2024-09-08 15:19:43 +01:00)
23 changed files with 1699 additions and 1891 deletions

.dockerignore

@@ -28,7 +28,6 @@
**/values.dev.yaml
/bin
/target
!/target/**/monzo-ingestion
/.idea
LICENSE
README.md

.env

@@ -1 +1 @@
DATABASE_URL=postgres://postgres@localhost/monzo_development
DATABASE_URL=postgres://postgres@localhost/logos_neu

GitHub Actions workflow

@@ -78,9 +78,9 @@ jobs:
target/release/${{ env.RUST_BINARY_NAME }}
target/aarch64-unknown-linux-musl/release/${{ env.RUST_BINARY_NAME }}
# # The target directory is kept in the .dockerignore file, so to allow these to be copied in, we need to move them to a
# # new directory.
# - run: mv target docker-binaries
# The target directory is kept in the .dockerignore file, so to allow these to be copied in, we need to move them to a
# new directory.
- run: mv target docker-binaries
- name: Build and push multi-arch Docker image
uses: docker/build-push-action@v4

Cargo.lock (generated; 2908 lines changed)

File diff suppressed because it is too large

Cargo.toml

@@ -7,9 +7,9 @@ edition = "2021"
entity = { path = "entity" }
migration = { path = "migration" }
axum = { version = "0.8.8", features = ["multipart"] }
tokio = { version = "1.48.0", features = ["full"] }
sea-orm = { version = "1.1.0", features = [
axum = { version = "0.7.5", features = ["multipart"] }
tokio = { version = "1.37.0", features = ["full"] }
sea-orm = { version = "1.0.0", features = [
"sqlx-postgres",
"runtime-tokio-rustls",
"macros"
@@ -20,18 +20,26 @@ serde_json = "1.0"
tracing-subscriber = "0.3.18"
tracing = "0.1.40"
anyhow = { version = "1.0", features = ["backtrace"] }
thiserror = "2.0"
thiserror = "1.0"
http = "1.1"
chrono = { version = "0.4", features = ["serde"] }
num-traits = "0.2"
csv = "1.3.0"
clap = "4.5"
testcontainers = "0.26"
testcontainers-modules = { version = "0.14", features = ["postgres"] }
sqlx = { version = "0.8", features = ["postgres"] }
tower-http = { version = "0.6", features = ["trace"] }
testcontainers = "0.21"
testcontainers-modules = { version = "0.9", features = ["postgres"] }
sqlx = { version = "0.7", features = ["postgres"] }
tower-http = { version = "0.5", features = ["trace"] }
bytes = "1.7"
once_cell = "1.19"
tracing-opentelemetry = "0.25.0"
opentelemetry = "0.24.0"
opentelemetry_sdk = { version = "0.24.1", features = ["rt-tokio"] }
opentelemetry-http = { version = "0.13.0", features = ["reqwest"] }
opentelemetry-otlp = { version = "0.17.0", features = ["grpc-tonic", "http-json", "tokio"] }
opentelemetry-semantic-conventions = "0.16.0"
reqwest = "0.12.7"
[workspace]
members = [".", "migration", "entity"]

Dockerfile

@@ -1,14 +1,16 @@
FROM busybox AS platform_determiner
FROM --platform=$BUILDPLATFORM debian:bullseye-slim AS builder
ARG TARGETPLATFORM
ARG BINARY_NAME
COPY /target /target
WORKDIR /app
COPY . .
RUN case "$TARGETPLATFORM" in \
"linux/amd64") BINARY_PATH="/target/release/${BINARY_NAME}" ;; \
"linux/arm64") BINARY_PATH="/target/aarch64-unknown-linux-musl/release/${BINARY_NAME}" ;; \
"linux/amd64") BINARY_PATH="target/release/${BINARY_NAME}" ;; \
"linux/arm64") BINARY_PATH="target/aarch64-unknown-linux-gnu/release/${BINARY_NAME}" ;; \
*) exit 1 ;; \
esac && mv "$BINARY_PATH" "/usr/bin/monzo-ingestion" && chmod +x "/usr/bin/monzo-ingestion"
esac && \
mv "$BINARY_PATH" /usr/local/bin/${BINARY_NAME}
FROM --platform=$TARGETPLATFORM debian:bookworm-slim
COPY --from=platform_determiner /usr/bin/monzo-ingestion /usr/local/bin/monzo-ingestion
ENTRYPOINT ["/usr/local/bin/monzo-ingestion"]
FROM --platform=$TARGETPLATFORM debian:bullseye-slim
ARG BINARY_NAME
COPY --from=builder /usr/local/bin/${BINARY_NAME} /usr/local/bin/
CMD ["${BINARY_NAME}"]

entity/Cargo.toml

@@ -9,5 +9,5 @@ name = "entity"
path = "src/lib.rs"
[dependencies]
sea-orm = { version = "1.1.0" }
sea-orm = { version = "1.0.0-rc.4" }
serde = { version = "1.0.203", features = ["derive"] }

entity/src/account.rs (deleted)

@@ -1,26 +0,0 @@
//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.0
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "account")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: i32,
pub name: String,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(has_many = "super::transaction::Entity")]
Transaction,
}
impl Related<super::transaction::Entity> for Entity {
fn to() -> RelationDef {
Relation::Transaction.def()
}
}
impl ActiveModelBehavior for ActiveModel {}

entity/src/expenditure.rs

@@ -1,4 +1,4 @@
//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.0
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
@@ -14,21 +14,6 @@ pub struct Model {
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(
belongs_to = "super::transaction::Entity",
from = "Column::TransactionId",
to = "super::transaction::Column::Id",
on_update = "NoAction",
on_delete = "Cascade"
)]
Transaction,
}
impl Related<super::transaction::Entity> for Entity {
fn to() -> RelationDef {
Relation::Transaction.def()
}
}
pub enum Relation {}
impl ActiveModelBehavior for ActiveModel {}

entity/src/lib.rs

@@ -1,7 +1,6 @@
//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.0
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2
pub mod prelude;
pub mod account;
pub mod expenditure;
pub mod transaction;

entity/src/prelude.rs

@@ -1,5 +1,4 @@
//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.0
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2
pub use super::account::Entity as Account;
pub use super::expenditure::Entity as Expenditure;
pub use super::transaction::Entity as Transaction;

entity/src/transaction.rs

@@ -1,4 +1,4 @@
//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.0
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
@@ -18,33 +18,9 @@ pub struct Model {
pub description: Option<String>,
#[sea_orm(unique)]
pub identity_hash: Option<i64>,
pub account_id: Option<i32>,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(
belongs_to = "super::account::Entity",
from = "Column::AccountId",
to = "super::account::Column::Id",
on_update = "NoAction",
on_delete = "Cascade"
)]
Account,
#[sea_orm(has_many = "super::expenditure::Entity")]
Expenditure,
}
impl Related<super::account::Entity> for Entity {
fn to() -> RelationDef {
Relation::Account.def()
}
}
impl Related<super::expenditure::Entity> for Entity {
fn to() -> RelationDef {
Relation::Expenditure.def()
}
}
pub enum Relation {}
impl ActiveModelBehavior for ActiveModel {}

migration/Cargo.toml

@@ -12,7 +12,7 @@ path = "src/lib.rs"
async-std = { version = "1", features = ["attributes", "tokio1"] }
[dependencies.sea-orm-migration]
version = "1.1.0"
version = "1.0.0-rc.4"
features = [
"runtime-tokio-rustls", # `ASYNC_RUNTIME` feature
"sqlx-postgres", # `DATABASE_DRIVER` feature

migration/src/lib.rs

@@ -3,9 +3,6 @@ pub use sea_orm_migration::prelude::*;
pub mod m20230904_141851_create_monzo_tables;
mod m20240529_195030_add_transaction_identity_hash;
mod m20240603_162500_make_title_optional;
mod m20241015_195220_add_account_to_transactions;
mod m20241015_200222_add_expenditure_transaction_fk;
mod m20241015_200652_add_accounts;
pub struct Migrator;
@@ -20,9 +17,6 @@ impl MigratorTrait for Migrator {
Box::new(m20230904_141851_create_monzo_tables::Migration),
Box::new(m20240529_195030_add_transaction_identity_hash::Migration),
Box::new(m20240603_162500_make_title_optional::Migration),
Box::new(m20241015_195220_add_account_to_transactions::Migration),
Box::new(m20241015_200222_add_expenditure_transaction_fk::Migration),
Box::new(m20241015_200652_add_accounts::Migration),
]
}
}

migration/src/m20241015_195220_add_account_to_transactions.rs (deleted)

@@ -1,76 +0,0 @@
use sea_orm_migration::{prelude::*, schema::*};
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.create_table(
Table::create()
.table(Account::Table)
.if_not_exists()
.col(
ColumnDef::new(Account::Id)
.integer()
.auto_increment()
.not_null()
.primary_key(),
)
.col(
ColumnDef::new(Account::Name)
.string()
.not_null(),
)
.to_owned(),
)
.await?;
manager.alter_table(
TableAlterStatement::new()
.table(Transaction::Table)
.add_column(ColumnDef::new(Transaction::AccountId).integer())
.to_owned(),
).await?;
manager
.create_foreign_key(
ForeignKey::create()
.name("fk_transaction_account_id")
.from(Transaction::Table, Transaction::AccountId)
.to(Account::Table, Account::Id)
.on_delete(ForeignKeyAction::Cascade)
.to_owned(),
)
.await?;
Ok(())
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager.alter_table(
TableAlterStatement::new()
.table(Transaction::Table)
.drop_column(Transaction::AccountId)
.to_owned(),
).await?;
manager.drop_table(Table::drop().table(Account::Table).to_owned()).await?;
Ok(())
}
}
#[derive(DeriveIden)]
enum Account {
Table,
Id,
Name,
}
#[derive(DeriveIden)]
enum Transaction {
Table,
AccountId,
}

migration/src/m20241015_200222_add_expenditure_transaction_fk.rs (deleted)

@@ -1,40 +0,0 @@
use sea_orm_migration::{prelude::*, schema::*};
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.create_foreign_key(
ForeignKey::create()
.name("fk_expenditure_transaction_id")
.from(Expenditure::Table, Expenditure::TransactionId)
.to(Transaction::Table, Transaction::Id)
.on_delete(ForeignKeyAction::Cascade)
.to_owned(),
)
.await?;
Ok(())
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager.drop_foreign_key(ForeignKey::drop().name("fk_expenditure_transaction_id").to_owned()).await?;
Ok(())
}
}
#[derive(DeriveIden)]
enum Transaction {
Table,
Id,
}
#[derive(DeriveIden)]
enum Expenditure {
Table,
TransactionId,
}

migration/src/m20241015_200652_add_accounts.rs (deleted)

@@ -1,53 +0,0 @@
use sea_orm_migration::{prelude::*, schema::*};
use crate::sea_orm::sqlx;
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager.exec_stmt(
InsertStatement::new()
.into_table(Account::Table)
.columns(vec![Account::Name])
.values_panic(["Monzo".into()])
.values_panic(["Flex".into()])
.to_owned()
).await?;
let id: i32 = manager.get_connection().query_one(
sea_orm::Statement::from_string(
manager.get_database_backend(),
"SELECT id FROM account WHERE name = 'Monzo'",
)
).await?.expect("Monzo account not found").try_get_by_index(0)?;
manager.exec_stmt(
UpdateStatement::new()
.table(Transaction::Table)
.values([(Transaction::AccountId, id.into())])
.to_owned()
).await?;
Ok(())
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
// This is a data creation migration, so we do not roll back the migration
Ok(())
}
}
#[derive(DeriveIden)]
enum Account {
Table,
Id,
Name,
}
#[derive(DeriveIden)]
enum Transaction {
Table,
AccountId,
}

src/ingestion/db.rs

@@ -20,15 +20,14 @@ pub struct Insertion {
pub async fn insert(
db: &DatabaseConnection,
monzo_rows: Vec<MonzoRow>,
account_id: i32,
) -> Result<Vec<String>, AppError> {
let mut new_transaction_ids = Vec::new();
let insertions = monzo_rows
.into_iter()
.map(|row| MonzoRow::into_insertion(row, account_id))
.map(MonzoRow::into_insertion)
.collect::<Result<Vec<_>, _>>()?;
for insertions in insertions.chunks(200) {
for insertions in insertions.chunks(400) {
let (new_or_updated_insertions, inserted_transaction_ids) =
whittle_insertions(insertions, db).await?;
@@ -74,16 +73,16 @@ async fn update_expenditures(
.flat_map(|i| &i.contained_expenditures)
.cloned(),
)
.on_conflict(
OnConflict::columns(vec![
expenditure::Column::TransactionId,
expenditure::Column::Category,
])
.update_columns(expenditure::Column::iter())
.to_owned(),
)
.exec(tx)
.await?;
.on_conflict(
OnConflict::columns(vec![
expenditure::Column::TransactionId,
expenditure::Column::Category,
])
.update_columns(expenditure::Column::iter())
.to_owned(),
)
.exec(tx)
.await?;
Ok(())
}
@@ -119,7 +118,7 @@ async fn whittle_insertions<'a>(
.select_only()
.columns([transaction::Column::IdentityHash])
.filter(transaction::Column::IdentityHash.is_not_null())
.into_tuple::<(i64,)>()
.into_tuple::<(i64, )>()
.all(tx)
.await?;
@@ -133,7 +132,7 @@ async fn whittle_insertions<'a>(
let hash = i.identity_hash;
!existing_hashes
.iter()
.any(|(existing_hash,)| *existing_hash == hash)
.any(|(existing_hash, )| *existing_hash == hash)
})
.collect::<Vec<_>>();
@@ -159,17 +158,16 @@ async fn notify_new_transactions(
mod tests {
use super::{insert, notify_new_transactions, update_expenditures, update_transactions};
use crate::ingestion::ingestion_logic::from_json_row;
use anyhow::Error;
use entity::account;
use tokio::sync::OnceCell;
use migration::MigratorTrait;
use sea_orm::{ActiveModelTrait, DatabaseConnection, TransactionTrait};
use sea_orm::{DatabaseConnection, TransactionTrait};
use serde_json::Value;
use sqlx::postgres::PgListener;
use sqlx::{Executor, PgPool};
use sqlx::PgPool;
use testcontainers::runners::AsyncRunner;
use testcontainers::ContainerAsync;
use tokio::sync::OnceCell;
use crate::ingestion::ingestion_logic::from_json_row;
#[derive(Debug)]
struct DatabaseInstance {
@@ -204,19 +202,13 @@ mod tests {
Ok(instance)
}
async fn get_or_initialize_db_instance() -> Result<&'static DatabaseInstance, Error> {
Ok(INSTANCE
.get_or_init(|| async { initialise_db().await.unwrap() })
.await)
}
async fn create_test_account(db: &DatabaseConnection) -> Result<i32, Error> {
let new_account = account::ActiveModel {
id: sea_orm::ActiveValue::NotSet,
name: sea_orm::ActiveValue::Set("Test Account".to_string()),
};
let inserted = new_account.insert(db).await?;
Ok(inserted.id)
async fn get_or_initialize_db_instance() -> Result<
&'static DatabaseInstance,
Error,
> {
Ok(INSTANCE.get_or_init(|| async {
initialise_db().await.unwrap()
}).await)
}
#[tokio::test]
@@ -260,25 +252,24 @@
#[tokio::test]
async fn test_notify_on_insert() -> Result<(), Error> {
let dbi = get_or_initialize_db_instance().await?;
let account_id = create_test_account(&dbi.db).await?;
let mut listener = PgListener::connect_with(&dbi.pool).await?;
listener.listen("monzo_new_transactions").await?;
let json = include_str!("../../fixtures/transactions.json");
let json: Vec<Vec<Value>> = serde_json::from_str(json).unwrap();
let json: Vec<Vec<Value>> = serde_json::from_str(json)?;
let data = json
.iter()
.map(|row| from_json_row(row))
.collect::<Result<Vec<_>, anyhow::Error>>()?;
.map(|row| from_json_row(row.clone()))
.collect::<Result<Vec<_>, anyhow::Error>>()
.unwrap();
insert(&dbi.db, data.clone(), account_id).await?;
insert(&dbi.db, data.clone()).await?;
let notification = listener.recv().await?;
let payload = notification.payload();
let mut payload = serde_json::from_str::<Vec<String>>(&payload)?;
payload.sort();
let mut ids = data
.iter()
let mut ids = data.iter()
.map(|row| row.transaction_id.clone())
.collect::<Vec<_>>();
@@ -286,51 +277,23 @@
assert_eq!(payload, ids, "Inserted IDs do not match");
insert(&dbi.db, data.clone(), account_id).await?;
insert(&dbi.db, data.clone()).await?;
let notification = listener.recv().await?;
let payload = notification.payload();
let payload = serde_json::from_str::<Vec<String>>(&payload)?;
assert_eq!(
payload,
Vec::<String>::new(),
"Re-inserting identical rows triggered double notification"
);
assert_eq!(payload, Vec::<String>::new(), "Re-inserting identical rows triggered double notification");
let mut altered_data = data.clone();
altered_data[0].description = Some("New description".to_string());
assert_ne!(
altered_data[0].compute_hash(),
data[0].compute_hash(),
"Alterations have the same hash"
);
assert_ne!(altered_data[0].compute_hash(), data[0].compute_hash(), "Alterations have the same hash");
insert(&dbi.db, altered_data.clone(), account_id).await?;
insert(&dbi.db, altered_data.clone()).await?;
let notification = listener.recv().await?;
let payload = notification.payload();
let payload = serde_json::from_str::<Vec<String>>(&payload)?;
assert_eq!(
payload,
vec![altered_data[0].transaction_id.clone()],
"Re-inserting altered row failed to re-trigger notification"
);
assert_eq!(payload, vec![altered_data[0].transaction_id.clone()], "Re-inserting altered row failed to re-trigger notification");
Ok(())
}
}
pub(crate) async fn get_account_id(
p0: &DatabaseConnection,
p1: Option<String>,
) -> Result<i32, AppError> {
let p1 = p1.unwrap_or("Monzo".to_string());
entity::prelude::Account::find()
.filter(entity::account::Column::Name.eq(p1))
.select_only()
.column(entity::account::Column::Id)
.into_tuple::<i32>()
.one(p0)
.await?
.ok_or(AppError::BadRequest(anyhow!("Account not found")))
}
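
The test_notify_on_insert test in this file drives Postgres LISTEN/NOTIFY end to end. As a minimal sketch of just the listening half, using the same sqlx PgListener calls and the monzo_new_transactions channel the test subscribes to (pool construction and error handling are assumed):

use sqlx::postgres::PgListener;

// Subscribe to the channel the ingestion code notifies on, then block until a
// payload arrives; the payload is a JSON array of newly inserted transaction IDs.
async fn wait_for_new_transactions(pool: &sqlx::PgPool) -> anyhow::Result<Vec<String>> {
    let mut listener = PgListener::connect_with(pool).await?;
    listener.listen("monzo_new_transactions").await?;
    let notification = listener.recv().await?;
    Ok(serde_json::from_str(notification.payload())?)
}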

src/ingestion/flex.rs

@@ -1,9 +1,9 @@
#[allow(dead_code)]
pub mod headings {
mod headings {
#[allow(unused_imports)]
pub use super::super::ingestion_logic::headings::*;
// Additional Flex headings
// Additional FLex headings
pub const MONEY_OUT: usize = 16;
pub const MONEY_IN: usize = 17;
}

src/ingestion/ingestion_logic.rs

@@ -9,7 +9,6 @@ use sea_orm::prelude::Decimal;
use sea_orm::IntoActiveModel;
use serde_json::Value;
use std::hash::Hash;
use crate::ingestion::flex;
#[allow(dead_code)]
pub(crate) mod headings {
@@ -78,7 +77,7 @@ impl MonzoRow {
hasher.finish() as i64
}
pub fn into_insertion(self, account_id: i32) -> Result<Insertion, anyhow::Error> {
pub fn into_insertion(self) -> Result<Insertion, anyhow::Error> {
let identity_hash = self.compute_hash();
let expenditures: Vec<_> = match &self.category_split {
@@ -107,7 +106,6 @@ impl MonzoRow {
total_amount: self.total_amount,
description: self.description,
identity_hash: Some(identity_hash),
account_id: Some(account_id),
}
.into_active_model(),
@@ -140,7 +138,7 @@ fn parse_timestamp(date: &str, time: &str) -> anyhow::Result<NaiveDateTime> {
Ok(date.and_time(time))
}
pub fn from_json_row(row: &[Value]) -> anyhow::Result<MonzoRow> {
pub fn from_json_row(row: Vec<Value>) -> anyhow::Result<MonzoRow> {
let date = DateTime::parse_from_rfc3339(row[headings::DATE].as_str().context("No date")?)
.context("Failed to parse date")?;
@@ -178,7 +176,7 @@ fn test_json() {
let json_rows = json
.iter()
.map(|row| from_json_row(&row))
.map(|row| from_json_row(row.clone()))
.collect::<Result<Vec<_>, anyhow::Error>>()
.unwrap();
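
The compute_hash call above ends in hasher.finish() as i64: the row is fed through a std hasher and the u64 digest is truncated to i64 so it fits the BIGINT identity_hash column used for deduplication. A minimal sketch of that idea, with an illustrative subset of fields rather than the full MonzoRow:

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Hash the row's stable fields and truncate the u64 digest to i64 so it can be
// stored in, and compared against, the unique identity_hash column.
fn identity_hash(transaction_id: &str, description: Option<&str>) -> i64 {
    let mut hasher = DefaultHasher::new();
    transaction_id.hash(&mut hasher);
    description.hash(&mut hasher);
    hasher.finish() as i64
}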

src/ingestion/routes.rs

@@ -9,116 +9,46 @@ use sea_orm::DatabaseConnection;
use serde_json::Value;
use std::io::Cursor;
#[derive(serde::Deserialize, Debug)]
#[serde(untagged)]
pub enum MonzoBatchedJsonInput {
Legacy(Vec<Vec<Value>>),
New {
account_id: Option<u8>,
rows: Vec<Vec<Value>>,
},
}
impl MonzoBatchedJsonInput {
fn account_id(&self) -> Option<u8> {
match self {
MonzoBatchedJsonInput::Legacy(_) => None,
MonzoBatchedJsonInput::New { account_id, .. } => *account_id,
}
}
fn rows(&self) -> &[Vec<Value>] {
match self {
MonzoBatchedJsonInput::Legacy(rows) => rows,
MonzoBatchedJsonInput::New { rows, .. } => rows,
}
}
}
pub async fn monzo_batched_json(
Extension(db): Extension<DatabaseConnection>,
Json(data): Json<MonzoBatchedJsonInput>,
Json(data): Json<Vec<Vec<Value>>>,
) -> Result<&'static str, AppError> {
let rows = data
.rows()
.iter()
.skip_while(|row| row[0] == Value::String("Transaction ID".to_string()))
.map(|row| from_json_row(row.as_ref()))
.collect::<Result<_, _>>()?;
// We default to the main account for JSON ingestion for now.
db::insert(&db, rows, data.account_id().unwrap_or(1) as i32).await?;
Ok("Ok")
}
async fn extract_csv_and_account_name(
mut multipart: Multipart,
) -> Result<(Option<Bytes>, Option<String>), MultipartError> {
let mut csv = None;
let mut account_name = None;
while let Some(field) = multipart.next_field().await? {
match field.name() {
Some("csv") => {
csv = Some(field.bytes().await?);
}
Some("account_id") => {
account_name = Some(field.text().await?);
}
_ => {}
}
if csv.is_some() && account_name.is_some() {
break;
}
}
Ok((csv, account_name))
}
#[derive(serde::Deserialize, Debug, Clone)]
pub struct ShortcutBody {
pub body: String,
pub account_name: String,
}
pub async fn shortcuts_csv(
Extension(db): Extension<DatabaseConnection>,
Json(shortcut_body): Json<ShortcutBody>,
) -> Result<&'static str, AppError> {
let account_id = db::get_account_id(&db, Some(shortcut_body.account_name)).await?;
let csv = Cursor::new(shortcut_body.body.as_bytes());
let mut csv = csv::Reader::from_reader(csv);
let data = csv.records();
let data = data
.filter_map(|f| f.ok())
.map(from_csv_row)
.into_iter()
.skip(1) // Skip the header row.
.map(from_json_row)
.collect::<Result<_, _>>()?;
db::insert(&db, data, account_id).await?;
db::insert(&db, data).await?;
Ok("Ok")
}
async fn extract_csv(mut multipart: Multipart) -> Result<Option<Bytes>, MultipartError> {
let csv = loop {
match multipart.next_field().await? {
Some(field) if field.name() == Some("csv") => {
break Some(field.bytes().await?);
}
Some(_) => {}
None => break None,
}
};
Ok(csv)
}
pub async fn monzo_batched_csv(
Extension(db): Extension<DatabaseConnection>,
multipart: Multipart,
) -> Result<&'static str, AppError> {
static CSV_MISSING_ERR_MSG: &str = "No CSV file provided. Expected a multipart request with a `csv` field containing the contents of the CSV.";
let (csv, account_name) = extract_csv_and_account_name(multipart)
let csv = extract_csv(multipart)
.await
.map_err(|e| AppError::BadRequest(anyhow!(e)))?;
let Some(csv) = csv else {
return Err(AppError::BadRequest(anyhow!(CSV_MISSING_ERR_MSG)));
};
let account_id = db::get_account_id(&db, account_name).await?;
.map_err(|e| AppError::BadRequest(anyhow!(e)))
.and_then(|csv| csv.ok_or(AppError::BadRequest(anyhow!(CSV_MISSING_ERR_MSG))))?;
let csv = Cursor::new(csv);
let mut csv = csv::Reader::from_reader(csv);
@@ -128,7 +58,7 @@ pub async fn monzo_batched_csv(
.map(from_csv_row)
.collect::<Result<_, _>>()?;
db::insert(&db, data, account_id).await?;
db::insert(&db, data).await?;
Ok("Ok")
}
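
The removed MonzoBatchedJsonInput enum relied on #[serde(untagged)], which makes serde try each variant in declaration order, so the endpoint accepted either a bare array of rows (Legacy) or an object carrying an optional account_id (New). A sketch of the two payload shapes, with illustrative values:

use serde_json::{json, Value};

// Two request bodies the untagged enum would have matched: a bare array of row
// arrays (Legacy) and an object with an account_id plus the same rows (New).
fn example_payloads() -> (Value, Value) {
    let legacy = json!([["tx_0001", "2024-09-08T15:19:43Z"]]);
    let new = json!({ "account_id": 2, "rows": [["tx_0001", "2024-09-08T15:19:43Z"]] });
    (legacy, new)
}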

src/main.rs

@@ -4,7 +4,7 @@ mod ingestion;
use crate::error::AppError;
use crate::ingestion::db;
use crate::ingestion::ingestion_logic::from_csv_row;
use crate::ingestion::routes::{monzo_batched_csv, monzo_batched_json, shortcuts_csv};
use crate::ingestion::routes::{monzo_batched_csv, monzo_batched_json};
use axum::routing::{get, post};
use axum::{Extension, Router};
use clap::{Parser, Subcommand};
@@ -16,6 +16,16 @@ use std::path::PathBuf;
use tower_http::trace::TraceLayer;
use tracing::log::LevelFilter;
use opentelemetry::trace::TracerProvider as _;
use opentelemetry_sdk::trace::TracerProvider;
use opentelemetry_otlp;
use tracing::{error, span};
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::Registry;
use tracin_support::init_tracing_subscriber;
mod tracin_support;
#[derive(Debug, Subcommand)]
enum Commands {
/// Manually run database migrations.
@@ -44,10 +54,6 @@
Csv {
/// The path of the CSV file to ingest.
csv_file: PathBuf,
/// The name of the account to ingest the CSV for.
#[clap(long, short)]
account: String,
},
}
@@ -72,7 +78,7 @@
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let _guard = init_tracing_subscriber();
let cli: Cli = Cli::parse();
let connection = sea_orm::ConnectOptions::new(&cli.database_url)
@@ -98,7 +104,7 @@ async fn main() -> anyhow::Result<()> {
serve_web(addr, connection).await?;
}
Commands::Csv { csv_file, account: account_name } => {
Commands::Csv { csv_file } => {
let mut csv = csv::Reader::from_reader(File::open(csv_file)?);
let data = csv.records();
let data = data
@@ -106,8 +112,7 @@
.map(from_csv_row)
.collect::<Result<_, _>>()?;
let account_id = db::get_account_id(&connection, Some(account_name)).await?;
db::insert(&connection, data, account_id).await?;
db::insert(&connection, data).await?;
}
}
@@ -119,7 +124,6 @@ async fn serve_web(address: SocketAddr, connection: DatabaseConnection) -> anyhow::Result<()> {
.route("/health", get(health_check))
.route("/monzo-batch-export", post(monzo_batched_json))
.route("/monzo-csv-ingestion", post(monzo_batched_csv))
.route("/shortcuts-csv-import", post(shortcuts_csv))
.layer(Extension(connection.clone()))
.layer(TraceLayer::new_for_http());

src/tracin_support.rs (new file; 108 lines)

@@ -0,0 +1,108 @@
use opentelemetry::{global, trace::TracerProvider, Key, KeyValue};
use opentelemetry_sdk::{
metrics::{
reader::{DefaultAggregationSelector, DefaultTemporalitySelector},
Aggregation, Instrument, MeterProviderBuilder, PeriodicReader, SdkMeterProvider, Stream,
},
runtime,
trace::{BatchConfig, RandomIdGenerator, Sampler, Tracer},
Resource,
};
use opentelemetry_semantic_conventions::{
resource::{DEPLOYMENT_ENVIRONMENT, SERVICE_NAME, SERVICE_VERSION},
SCHEMA_URL,
};
use tracing::Level;
use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
// Create a Resource that captures information about the entity for which telemetry is recorded.
fn resource() -> Resource {
Resource::from_schema_url(
[
KeyValue::new(SERVICE_NAME, env!("CARGO_PKG_NAME")),
KeyValue::new(SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
KeyValue::new(DEPLOYMENT_ENVIRONMENT, "develop"),
],
SCHEMA_URL,
)
}
// Construct MeterProvider for MetricsLayer
fn init_meter_provider() -> SdkMeterProvider {
let exporter = opentelemetry_otlp::new_exporter()
.http()
.with_http_client(reqwest::Client::new())
.build_metrics_exporter(
Box::new(DefaultAggregationSelector::new()),
Box::new(DefaultTemporalitySelector::new()),
)
.unwrap();
let reader = PeriodicReader::builder(exporter, runtime::Tokio)
.with_interval(std::time::Duration::from_secs(30))
.build();
let meter_provider = MeterProviderBuilder::default()
.with_resource(resource())
.with_reader(reader)
.build();
global::set_meter_provider(meter_provider.clone());
meter_provider
}
// Construct Tracer for OpenTelemetryLayer
fn init_tracer() -> Tracer {
let provider = opentelemetry_otlp::new_pipeline()
.tracing()
.with_trace_config(
opentelemetry_sdk::trace::Config::default()
// Customize sampling strategy
.with_sampler(Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased(
1.0,
))))
// If exporting traces to AWS X-Ray, you can use XrayIdGenerator
.with_id_generator(RandomIdGenerator::default())
.with_resource(resource()),
)
.with_batch_config(BatchConfig::default())
.with_exporter(opentelemetry_otlp::new_exporter().tonic())
.install_batch(runtime::Tokio)
.unwrap();
global::set_tracer_provider(provider.clone());
provider.tracer("tracing-otel-subscriber")
}
// Initialize tracing-subscriber and return OtelGuard for opentelemetry-related termination processing
pub(crate) fn init_tracing_subscriber() -> OtelGuard {
let meter_provider = init_meter_provider();
let tracer = init_tracer();
tracing_subscriber::registry()
.with(tracing_subscriber::filter::LevelFilter::from_level(
Level::DEBUG,
))
.with(tracing_subscriber::fmt::layer())
.with(MetricsLayer::new(meter_provider.clone()))
.with(OpenTelemetryLayer::new(tracer))
.init();
OtelGuard { meter_provider }
}
pub(crate) struct OtelGuard {
meter_provider: SdkMeterProvider,
}
impl Drop for OtelGuard {
fn drop(&mut self) {
if let Err(err) = self.meter_provider.shutdown() {
eprintln!("{err:?}");
}
global::shutdown_tracer_provider();
}
}
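
As the main.rs hunk above shows, the returned OtelGuard is bound to _guard for the life of the process; when it drops, the meter provider is shut down and the global tracer provider is torn down, flushing buffered telemetry. A minimal usage sketch:

// Hold the guard until main returns; OtelGuard::drop flushes pending exports.
#[tokio::main]
async fn main() {
    let _guard = init_tracing_subscriber();
    // ... run the application ...
}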