Add a little more CLI structure for local runs
This commit is contained in:
parent
7fd85550ea
commit
b563bbd02c
92
src/main.rs
92
src/main.rs
@ -2,29 +2,55 @@ mod error;
|
|||||||
mod ingestion;
|
mod ingestion;
|
||||||
|
|
||||||
use crate::error::AppError;
|
use crate::error::AppError;
|
||||||
|
use crate::ingestion::db;
|
||||||
|
use crate::ingestion::ingestion_logic::from_csv_row;
|
||||||
use crate::ingestion::routes::{monzo_batched_csv, monzo_batched_json};
|
use crate::ingestion::routes::{monzo_batched_csv, monzo_batched_json};
|
||||||
use axum::routing::{get, post};
|
use axum::routing::{get, post};
|
||||||
use axum::{Extension, Router};
|
use axum::{Extension, Router};
|
||||||
use clap::Parser;
|
use clap::{Parser, Subcommand};
|
||||||
use migration::{Migrator, MigratorTrait};
|
use migration::{Migrator, MigratorTrait};
|
||||||
use sea_orm::{ConnectionTrait, DatabaseConnection};
|
use sea_orm::{ConnectionTrait, DatabaseConnection};
|
||||||
|
use std::fs::File;
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
|
use std::path::PathBuf;
|
||||||
use tower_http::trace::TraceLayer;
|
use tower_http::trace::TraceLayer;
|
||||||
use tracing::log::LevelFilter;
|
use tracing::log::LevelFilter;
|
||||||
|
|
||||||
|
#[derive(Debug, Subcommand)]
|
||||||
|
enum Commands {
|
||||||
|
Migrate {
|
||||||
|
/// Number of migration steps to perform. If not provided, all migrations will be run.
|
||||||
|
#[arg(long)]
|
||||||
|
steps: Option<u32>,
|
||||||
|
|
||||||
|
/// If we should perform migration down.
|
||||||
|
#[arg(long)]
|
||||||
|
down: bool,
|
||||||
|
},
|
||||||
|
Run {
|
||||||
|
/// If we should perform migration on startup.
|
||||||
|
#[clap(short, long, env, default_value_t = true)]
|
||||||
|
migrate: bool,
|
||||||
|
|
||||||
|
/// The server address to bind to.
|
||||||
|
#[clap(short, long, env, default_value = "0.0.0.0:3000")]
|
||||||
|
addr: SocketAddr,
|
||||||
|
},
|
||||||
|
|
||||||
|
Csv {
|
||||||
|
/// The path to the CSV file to ingest.
|
||||||
|
csv_file: PathBuf,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, clap::Parser)]
|
#[derive(Debug, clap::Parser)]
|
||||||
struct Config {
|
struct Cli {
|
||||||
/// If we should perform migration on startup.
|
|
||||||
#[clap(short, long, env, default_value_t = true)]
|
|
||||||
migrate: bool,
|
|
||||||
|
|
||||||
/// The server address to bind to.
|
|
||||||
#[clap(short, long, env, default_value = "0.0.0.0:3000")]
|
|
||||||
addr: SocketAddr,
|
|
||||||
|
|
||||||
/// URL to PostgreSQL database.
|
/// URL to PostgreSQL database.
|
||||||
#[clap(short, long = "db", env)]
|
#[clap(short, long = "db", env)]
|
||||||
database_url: String,
|
database_url: String,
|
||||||
|
|
||||||
|
#[command(subcommand)]
|
||||||
|
command: Commands,
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn health_check(
|
async fn health_check(
|
||||||
@ -37,18 +63,48 @@ async fn health_check(
|
|||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> anyhow::Result<()> {
|
async fn main() -> anyhow::Result<()> {
|
||||||
let config: Config = Config::parse();
|
tracing_subscriber::fmt::init();
|
||||||
let connection = sea_orm::ConnectOptions::new(&config.database_url)
|
|
||||||
|
let cli: Cli = Cli::parse();
|
||||||
|
let connection = sea_orm::ConnectOptions::new(&cli.database_url)
|
||||||
.sqlx_logging_level(LevelFilter::Debug)
|
.sqlx_logging_level(LevelFilter::Debug)
|
||||||
.to_owned();
|
.to_owned();
|
||||||
|
|
||||||
let connection = sea_orm::Database::connect(connection).await?;
|
let connection = sea_orm::Database::connect(connection).await?;
|
||||||
|
|
||||||
if config.migrate {
|
match cli.command {
|
||||||
Migrator::up(&connection, None).await?;
|
Commands::Migrate { steps, down } => {
|
||||||
|
if down {
|
||||||
|
Migrator::down(&connection, steps).await?;
|
||||||
|
} else {
|
||||||
|
Migrator::up(&connection, steps).await?
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Commands::Run { migrate, addr } => {
|
||||||
|
if migrate {
|
||||||
|
Migrator::up(&connection, None).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
serve_web(addr, connection).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Commands::Csv { csv_file } => {
|
||||||
|
let mut csv = csv::Reader::from_reader(File::open(csv_file)?);
|
||||||
|
let data = csv.records();
|
||||||
|
let data = data
|
||||||
|
.filter_map(|f| f.ok())
|
||||||
|
.map(from_csv_row)
|
||||||
|
.collect::<Result<_, _>>()?;
|
||||||
|
|
||||||
|
db::insert(&connection, data).await?;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
tracing_subscriber::fmt::init();
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn serve_web(address: SocketAddr, connection: DatabaseConnection) -> anyhow::Result<()> {
|
||||||
let app = Router::new()
|
let app = Router::new()
|
||||||
.route("/health", get(health_check))
|
.route("/health", get(health_check))
|
||||||
.route("/monzo-batch-export", post(monzo_batched_json))
|
.route("/monzo-batch-export", post(monzo_batched_json))
|
||||||
@ -56,9 +112,9 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
.layer(Extension(connection.clone()))
|
.layer(Extension(connection.clone()))
|
||||||
.layer(TraceLayer::new_for_http());
|
.layer(TraceLayer::new_for_http());
|
||||||
|
|
||||||
tracing::debug!("listening on {}", &config.addr);
|
tracing::info!("listening on {}", &address);
|
||||||
let listener = tokio::net::TcpListener::bind(&config.addr).await.unwrap();
|
let listener = tokio::net::TcpListener::bind(&address).await?;
|
||||||
axum::serve(listener, app).await.unwrap();
|
axum::serve(listener, app).await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user