Expose more information in logging and improve error handling
All checks were successful
Build and Publish Docker Container / build (push) Successful in 9m28s
All checks were successful
Build and Publish Docker Container / build (push) Successful in 9m28s
This commit is contained in:
parent
046ce44d23
commit
97f57803e5
4
Cargo.lock
generated
4
Cargo.lock
generated
@ -131,6 +131,9 @@ name = "anyhow"
|
|||||||
version = "1.0.86"
|
version = "1.0.86"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da"
|
checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da"
|
||||||
|
dependencies = [
|
||||||
|
"backtrace",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "arrayvec"
|
name = "arrayvec"
|
||||||
@ -1746,6 +1749,7 @@ version = "0.1.0"
|
|||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"axum",
|
"axum",
|
||||||
|
"bytes",
|
||||||
"chrono",
|
"chrono",
|
||||||
"clap",
|
"clap",
|
||||||
"csv",
|
"csv",
|
||||||
|
|||||||
@ -19,7 +19,7 @@ serde = { version = "1.0.203", features = ["derive"] }
|
|||||||
serde_json = "1.0.117"
|
serde_json = "1.0.117"
|
||||||
tracing-subscriber = "0.3.18"
|
tracing-subscriber = "0.3.18"
|
||||||
tracing = "0.1.40"
|
tracing = "0.1.40"
|
||||||
anyhow = "1.0.86"
|
anyhow = { version = "1.0.86", features = ["backtrace"] }
|
||||||
thiserror = "1.0.61"
|
thiserror = "1.0.61"
|
||||||
http = "1.1.0"
|
http = "1.1.0"
|
||||||
chrono = { version = "0.4.38", features = ["serde"] }
|
chrono = { version = "0.4.38", features = ["serde"] }
|
||||||
@ -30,6 +30,7 @@ testcontainers = "0.17.0"
|
|||||||
testcontainers-modules = { version = "0.5.0", features = ["postgres"] }
|
testcontainers-modules = { version = "0.5.0", features = ["postgres"] }
|
||||||
sqlx = { version = "0.7.4", features = ["postgres"] }
|
sqlx = { version = "0.7.4", features = ["postgres"] }
|
||||||
tower-http = { version = "0.5.2", features = ["trace"] }
|
tower-http = { version = "0.5.2", features = ["trace"] }
|
||||||
|
bytes = "1.6.0"
|
||||||
|
|
||||||
[workspace]
|
[workspace]
|
||||||
members = [".", "migration", "entity"]
|
members = [".", "migration", "entity"]
|
||||||
|
|||||||
17
src/error.rs
17
src/error.rs
@ -6,25 +6,32 @@ use tracing::log::error;
|
|||||||
#[derive(thiserror::Error, Debug)]
|
#[derive(thiserror::Error, Debug)]
|
||||||
pub enum AppError {
|
pub enum AppError {
|
||||||
/// SeaORM error, separated for ease of use allowing us to `?` db operations.
|
/// SeaORM error, separated for ease of use allowing us to `?` db operations.
|
||||||
#[error("Internal error")]
|
#[error("Database error: {0}")]
|
||||||
DbError(#[from] DbErr),
|
DbError(#[from] DbErr),
|
||||||
|
|
||||||
#[error("Invalid request {0}")]
|
#[error("Invalid request {0}")]
|
||||||
BadRequest(anyhow::Error),
|
BadRequest(anyhow::Error),
|
||||||
|
|
||||||
/// Catch all for error we don't care to expose publicly.
|
/// Catch all for error we don't care to expose publicly.
|
||||||
#[error("Internal error")]
|
#[error("An error occurred: {0}")]
|
||||||
Anyhow(#[from] anyhow::Error),
|
Anyhow(#[from] anyhow::Error),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl AppError {
|
||||||
|
fn to_response_string(&self) -> String {
|
||||||
|
match self {
|
||||||
|
AppError::BadRequest(e) => e.to_string(),
|
||||||
|
_ => "Internal server error".to_string(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl IntoResponse for AppError {
|
impl IntoResponse for AppError {
|
||||||
fn into_response(self) -> Response {
|
fn into_response(self) -> Response {
|
||||||
error!("Internal server error: {self:?}");
|
|
||||||
|
|
||||||
let status_code = match self {
|
let status_code = match self {
|
||||||
_ => StatusCode::INTERNAL_SERVER_ERROR,
|
_ => StatusCode::INTERNAL_SERVER_ERROR,
|
||||||
};
|
};
|
||||||
|
|
||||||
(status_code, self.to_string()).into_response()
|
(status_code, self.to_response_string()).into_response()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -2,43 +2,33 @@ use crate::error::AppError;
|
|||||||
use crate::ingestion::db;
|
use crate::ingestion::db;
|
||||||
use crate::ingestion::ingestion_logic::{from_csv_row, from_json_row};
|
use crate::ingestion::ingestion_logic::{from_csv_row, from_json_row};
|
||||||
use anyhow::anyhow;
|
use anyhow::anyhow;
|
||||||
|
use axum::extract::multipart::MultipartError;
|
||||||
use axum::extract::{Extension, Json, Multipart};
|
use axum::extract::{Extension, Json, Multipart};
|
||||||
|
use bytes::Bytes;
|
||||||
use sea_orm::DatabaseConnection;
|
use sea_orm::DatabaseConnection;
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
use std::io::Cursor;
|
use std::io::Cursor;
|
||||||
|
|
||||||
pub async fn monzo_updated(
|
|
||||||
Extension(db): Extension<DatabaseConnection>,
|
|
||||||
Json(row): Json<Vec<Value>>,
|
|
||||||
) -> Result<&'static str, AppError> {
|
|
||||||
db::insert(&db, vec![from_json_row(row)?]).await.unwrap();
|
|
||||||
|
|
||||||
Ok("Ok")
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn monzo_batched_json(
|
pub async fn monzo_batched_json(
|
||||||
Extension(db): Extension<DatabaseConnection>,
|
Extension(db): Extension<DatabaseConnection>,
|
||||||
Json(data): Json<Vec<Vec<Value>>>,
|
Json(data): Json<Vec<Vec<Value>>>,
|
||||||
) -> Result<&'static str, AppError> {
|
) -> Result<&'static str, AppError> {
|
||||||
let insertions = data
|
let data = data
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.skip(1)
|
.skip(1) // Skip the header row.
|
||||||
.map(|row| from_json_row(row))
|
.map(from_json_row)
|
||||||
.collect::<Result<_, _>>()?;
|
.collect::<Result<_, _>>()?;
|
||||||
|
|
||||||
db::insert(&db, insertions).await.unwrap();
|
db::insert(&db, data).await?;
|
||||||
|
|
||||||
Ok("Ok")
|
Ok("Ok")
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn monzo_batched_csv(
|
async fn extract_csv(mut multipart: Multipart) -> Result<Option<Bytes>, MultipartError> {
|
||||||
Extension(db): Extension<DatabaseConnection>,
|
|
||||||
mut multipart: Multipart,
|
|
||||||
) -> Result<&'static str, AppError> {
|
|
||||||
let csv = loop {
|
let csv = loop {
|
||||||
match multipart.next_field().await.unwrap() {
|
match multipart.next_field().await? {
|
||||||
Some(field) if field.name() == Some("csv") => {
|
Some(field) if field.name() == Some("csv") => {
|
||||||
break Some(field.bytes().await.unwrap());
|
break Some(field.bytes().await?);
|
||||||
}
|
}
|
||||||
|
|
||||||
Some(_) => {}
|
Some(_) => {}
|
||||||
@ -46,22 +36,27 @@ pub async fn monzo_batched_csv(
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let Some(csv) = csv else {
|
Ok(csv)
|
||||||
return Err(AppError::BadRequest(anyhow!("No CSV file provided")));
|
}
|
||||||
};
|
|
||||||
|
pub async fn monzo_batched_csv(
|
||||||
|
Extension(db): Extension<DatabaseConnection>,
|
||||||
|
multipart: Multipart,
|
||||||
|
) -> Result<&'static str, AppError> {
|
||||||
|
let csv = extract_csv(multipart)
|
||||||
|
.await
|
||||||
|
.map_err(|e| AppError::BadRequest(anyhow!(e)))
|
||||||
|
.and_then(|csv| csv.ok_or(AppError::BadRequest(anyhow!("No CSV file provided"))))?;
|
||||||
|
|
||||||
let csv = Cursor::new(csv);
|
let csv = Cursor::new(csv);
|
||||||
let mut csv = csv::Reader::from_reader(csv);
|
let mut csv = csv::Reader::from_reader(csv);
|
||||||
let data = csv.records();
|
let data = csv.records();
|
||||||
|
let data = data
|
||||||
db::insert(
|
.filter_map(|f| f.ok())
|
||||||
&db,
|
|
||||||
data.filter_map(|f| f.ok())
|
|
||||||
.map(from_csv_row)
|
.map(from_csv_row)
|
||||||
.collect::<Result<_, _>>()?,
|
.collect::<Result<_, _>>()?;
|
||||||
)
|
|
||||||
.await
|
db::insert(&db, data).await?;
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
Ok("Ok")
|
Ok("Ok")
|
||||||
}
|
}
|
||||||
|
|||||||
@ -2,7 +2,7 @@ mod error;
|
|||||||
mod ingestion;
|
mod ingestion;
|
||||||
|
|
||||||
use crate::error::AppError;
|
use crate::error::AppError;
|
||||||
use crate::ingestion::routes::{monzo_batched_csv, monzo_batched_json, monzo_updated};
|
use crate::ingestion::routes::{monzo_batched_csv, monzo_batched_json};
|
||||||
use axum::routing::{get, post};
|
use axum::routing::{get, post};
|
||||||
use axum::{Extension, Router};
|
use axum::{Extension, Router};
|
||||||
use clap::Parser;
|
use clap::Parser;
|
||||||
@ -51,7 +51,6 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
tracing_subscriber::fmt::init();
|
tracing_subscriber::fmt::init();
|
||||||
let app = Router::new()
|
let app = Router::new()
|
||||||
.route("/health", get(health_check))
|
.route("/health", get(health_check))
|
||||||
.route("/monzo-updated", post(monzo_updated))
|
|
||||||
.route("/monzo-batch-export", post(monzo_batched_json))
|
.route("/monzo-batch-export", post(monzo_batched_json))
|
||||||
.route("/monzo-csv-ingestion", post(monzo_batched_csv))
|
.route("/monzo-csv-ingestion", post(monzo_batched_csv))
|
||||||
.layer(Extension(connection.clone()))
|
.layer(Extension(connection.clone()))
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user