Some checks failed
Build and Publish Docker Container / build (push) Has been cancelled
65 lines
1.8 KiB
Rust
65 lines
1.8 KiB
Rust
use crate::error::AppError;
|
|
use crate::ingestion::db;
|
|
use crate::ingestion::ingestion_logic::{from_csv_row, from_json_row};
|
|
use anyhow::anyhow;
|
|
use axum::extract::multipart::MultipartError;
|
|
use axum::extract::{Extension, Json, Multipart};
|
|
use bytes::Bytes;
|
|
use sea_orm::DatabaseConnection;
|
|
use serde_json::Value;
|
|
use std::io::Cursor;
|
|
|
|
pub async fn monzo_batched_json(
|
|
Extension(db): Extension<DatabaseConnection>,
|
|
Json(data): Json<Vec<Vec<Value>>>,
|
|
) -> Result<&'static str, AppError> {
|
|
let data = data
|
|
.into_iter()
|
|
.skip(1) // Skip the header row.
|
|
.map(from_json_row)
|
|
.collect::<Result<_, _>>()?;
|
|
|
|
db::insert(&db, data).await?;
|
|
|
|
Ok("Ok")
|
|
}
|
|
|
|
async fn extract_csv(mut multipart: Multipart) -> Result<Option<Bytes>, MultipartError> {
|
|
let csv = loop {
|
|
match multipart.next_field().await? {
|
|
Some(field) if field.name() == Some("csv") => {
|
|
break Some(field.bytes().await?);
|
|
}
|
|
|
|
Some(_) => {}
|
|
None => break None,
|
|
}
|
|
};
|
|
|
|
Ok(csv)
|
|
}
|
|
|
|
pub async fn monzo_batched_csv(
|
|
Extension(db): Extension<DatabaseConnection>,
|
|
multipart: Multipart,
|
|
) -> Result<&'static str, AppError> {
|
|
static CSV_MISSING_ERR_MSG: &'static str = "No CSV file provided. Expected a multipart request with a `csv` field containing the contents of the CSV.";
|
|
|
|
let csv = extract_csv(multipart)
|
|
.await
|
|
.map_err(|e| AppError::BadRequest(anyhow!(e)))
|
|
.and_then(|csv| csv.ok_or(AppError::BadRequest(anyhow!(CSV_MISSING_ERR_MSG))))?;
|
|
|
|
let csv = Cursor::new(csv);
|
|
let mut csv = csv::Reader::from_reader(csv);
|
|
let data = csv.records();
|
|
let data = data
|
|
.filter_map(|f| f.ok())
|
|
.map(from_csv_row)
|
|
.collect::<Result<_, _>>()?;
|
|
|
|
db::insert(&db, data).await?;
|
|
|
|
Ok("Ok")
|
|
}
|