monzo-ingestion/src/ingestion/routes.rs
2024-10-15 21:35:55 +01:00

81 lines
2.3 KiB
Rust

use crate::error::AppError;
use crate::ingestion::db;
use crate::ingestion::ingestion_logic::{from_csv_row, from_json_row};
use anyhow::anyhow;
use axum::extract::multipart::MultipartError;
use axum::extract::{Extension, Json, Multipart};
use bytes::Bytes;
use sea_orm::DatabaseConnection;
use serde_json::Value;
use std::io::Cursor;
pub async fn monzo_batched_json(
Extension(db): Extension<DatabaseConnection>,
Json(data): Json<Vec<Vec<Value>>>,
) -> Result<&'static str, AppError> {
let data = data
.into_iter()
.skip(1) // Skip the header row.
.map(from_json_row)
.collect::<Result<_, _>>()?;
// We default to the main account for JSON ingestion for now.
let account_id = db::get_account_id(&db, None).await?;
db::insert(&db, data, account_id).await?;
Ok("Ok")
}
async fn extract_csv_and_account_name(mut multipart: Multipart) -> Result<(Option<Bytes>, Option<String>), MultipartError> {
let mut csv = None;
let mut account_name = None;
while let Some(field) = multipart.next_field().await? {
match field.name() {
Some("csv") => {
csv = Some(field.bytes().await?);
}
Some("account_id") => {
account_name = Some(field.text().await?);
}
_ => {}
}
if csv.is_some() && account_name.is_some() {
break;
}
}
Ok((csv, account_name))
}
pub async fn monzo_batched_csv(
Extension(db): Extension<DatabaseConnection>,
multipart: Multipart,
) -> Result<&'static str, AppError> {
static CSV_MISSING_ERR_MSG: &str = "No CSV file provided. Expected a multipart request with a `csv` field containing the contents of the CSV.";
let (csv, account_name) = extract_csv_and_account_name(multipart)
.await
.map_err(|e| AppError::BadRequest(anyhow!(e)))?;
let Some(csv) = csv else {
return Err(AppError::BadRequest(anyhow!(CSV_MISSING_ERR_MSG)));
};
let account_id = db::get_account_id(&db, account_name).await?;
let csv = Cursor::new(csv);
let mut csv = csv::Reader::from_reader(csv);
let data = csv.records();
let data = data
.filter_map(|f| f.ok())
.map(from_csv_row)
.collect::<Result<_, _>>()?;
db::insert(&db, data, account_id).await?;
Ok("Ok")
}