diff --git a/Cargo.lock b/Cargo.lock index 09ced4e..8a87891 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -131,6 +131,9 @@ name = "anyhow" version = "1.0.86" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da" +dependencies = [ + "backtrace", +] [[package]] name = "arrayvec" @@ -1746,6 +1749,7 @@ version = "0.1.0" dependencies = [ "anyhow", "axum", + "bytes", "chrono", "clap", "csv", diff --git a/Cargo.toml b/Cargo.toml index 6cd87bf..a03576e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,7 @@ serde = { version = "1.0.203", features = ["derive"] } serde_json = "1.0.117" tracing-subscriber = "0.3.18" tracing = "0.1.40" -anyhow = "1.0.86" +anyhow = { version = "1.0.86", features = ["backtrace"] } thiserror = "1.0.61" http = "1.1.0" chrono = { version = "0.4.38", features = ["serde"] } @@ -30,6 +30,7 @@ testcontainers = "0.17.0" testcontainers-modules = { version = "0.5.0", features = ["postgres"] } sqlx = { version = "0.7.4", features = ["postgres"] } tower-http = { version = "0.5.2", features = ["trace"] } +bytes = "1.6.0" [workspace] members = [".", "migration", "entity"] diff --git a/src/error.rs b/src/error.rs index 1293f8f..4be9cca 100644 --- a/src/error.rs +++ b/src/error.rs @@ -6,25 +6,32 @@ use tracing::log::error; #[derive(thiserror::Error, Debug)] pub enum AppError { /// SeaORM error, separated for ease of use allowing us to `?` db operations. - #[error("Internal error")] + #[error("Database error: {0}")] DbError(#[from] DbErr), #[error("Invalid request {0}")] BadRequest(anyhow::Error), /// Catch all for error we don't care to expose publicly. - #[error("Internal error")] + #[error("An error occurred: {0}")] Anyhow(#[from] anyhow::Error), } +impl AppError { + fn to_response_string(&self) -> String { + match self { + AppError::BadRequest(e) => e.to_string(), + _ => "Internal server error".to_string(), + } + } +} + impl IntoResponse for AppError { fn into_response(self) -> Response { - error!("Internal server error: {self:?}"); - let status_code = match self { _ => StatusCode::INTERNAL_SERVER_ERROR, }; - (status_code, self.to_string()).into_response() + (status_code, self.to_response_string()).into_response() } } diff --git a/src/ingestion/routes.rs b/src/ingestion/routes.rs index 2d8d6c0..4c583d1 100644 --- a/src/ingestion/routes.rs +++ b/src/ingestion/routes.rs @@ -2,43 +2,33 @@ use crate::error::AppError; use crate::ingestion::db; use crate::ingestion::ingestion_logic::{from_csv_row, from_json_row}; use anyhow::anyhow; +use axum::extract::multipart::MultipartError; use axum::extract::{Extension, Json, Multipart}; +use bytes::Bytes; use sea_orm::DatabaseConnection; use serde_json::Value; use std::io::Cursor; -pub async fn monzo_updated( - Extension(db): Extension, - Json(row): Json>, -) -> Result<&'static str, AppError> { - db::insert(&db, vec![from_json_row(row)?]).await.unwrap(); - - Ok("Ok") -} - pub async fn monzo_batched_json( Extension(db): Extension, Json(data): Json>>, ) -> Result<&'static str, AppError> { - let insertions = data + let data = data .into_iter() - .skip(1) - .map(|row| from_json_row(row)) + .skip(1) // Skip the header row. + .map(from_json_row) .collect::>()?; - db::insert(&db, insertions).await.unwrap(); + db::insert(&db, data).await?; Ok("Ok") } -pub async fn monzo_batched_csv( - Extension(db): Extension, - mut multipart: Multipart, -) -> Result<&'static str, AppError> { +async fn extract_csv(mut multipart: Multipart) -> Result, MultipartError> { let csv = loop { - match multipart.next_field().await.unwrap() { + match multipart.next_field().await? { Some(field) if field.name() == Some("csv") => { - break Some(field.bytes().await.unwrap()); + break Some(field.bytes().await?); } Some(_) => {} @@ -46,22 +36,27 @@ pub async fn monzo_batched_csv( } }; - let Some(csv) = csv else { - return Err(AppError::BadRequest(anyhow!("No CSV file provided"))); - }; + Ok(csv) +} + +pub async fn monzo_batched_csv( + Extension(db): Extension, + multipart: Multipart, +) -> Result<&'static str, AppError> { + let csv = extract_csv(multipart) + .await + .map_err(|e| AppError::BadRequest(anyhow!(e))) + .and_then(|csv| csv.ok_or(AppError::BadRequest(anyhow!("No CSV file provided"))))?; let csv = Cursor::new(csv); let mut csv = csv::Reader::from_reader(csv); let data = csv.records(); + let data = data + .filter_map(|f| f.ok()) + .map(from_csv_row) + .collect::>()?; - db::insert( - &db, - data.filter_map(|f| f.ok()) - .map(from_csv_row) - .collect::>()?, - ) - .await - .unwrap(); + db::insert(&db, data).await?; Ok("Ok") } diff --git a/src/main.rs b/src/main.rs index 843ebe4..408f981 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,7 +2,7 @@ mod error; mod ingestion; use crate::error::AppError; -use crate::ingestion::routes::{monzo_batched_csv, monzo_batched_json, monzo_updated}; +use crate::ingestion::routes::{monzo_batched_csv, monzo_batched_json}; use axum::routing::{get, post}; use axum::{Extension, Router}; use clap::Parser; @@ -51,7 +51,6 @@ async fn main() -> anyhow::Result<()> { tracing_subscriber::fmt::init(); let app = Router::new() .route("/health", get(health_check)) - .route("/monzo-updated", post(monzo_updated)) .route("/monzo-batch-export", post(monzo_batched_json)) .route("/monzo-csv-ingestion", post(monzo_batched_csv)) .layer(Extension(connection.clone()))