From e2ffd900b415c26455f0710b1b405f454288e510 Mon Sep 17 00:00:00 2001 From: Joshua Coles Date: Sun, 8 Sep 2024 15:19:43 +0100 Subject: [PATCH] Stash attempt at tracing exporting --- Cargo.lock | 413 ++++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 8 + src/ingestion/db.rs | 2 +- src/main.rs | 12 +- src/tracin_support.rs | 108 +++++++++++ 5 files changed, 541 insertions(+), 2 deletions(-) create mode 100644 src/tracin_support.rs diff --git a/Cargo.lock b/Cargo.lock index 4e10501..e7b36e7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1131,6 +1131,21 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.2.1" @@ -1325,6 +1340,25 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "h2" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "524e8ac6999421f49a846c2d4411f337e53497d8ec55d67753beffa43c5d9205" +dependencies = [ + "atomic-waker", + "bytes", + "fnv", + "futures-core", + "futures-sink", + "http", + "indexmap 2.3.0", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -1468,6 +1502,7 @@ dependencies = [ "bytes", "futures-channel", "futures-util", + "h2", "http", "http-body", "httparse", @@ -1513,6 +1548,35 @@ dependencies = [ "tower-service", ] +[[package]] +name = "hyper-timeout" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3203a961e5c83b6f5498933e78b6b263e208c197b63e9c6c53cc82ffd3f63793" +dependencies = [ + "hyper", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", +] + +[[package]] +name = "hyper-tls" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" +dependencies = [ + "bytes", + "http-body-util", + "hyper", + "hyper-util", + "native-tls", + "tokio", + "tokio-native-tls", + "tower-service", +] + [[package]] name = "hyper-util" version = "0.1.7" @@ -1640,6 +1704,12 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "ipnet" +version = "2.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "187674a687eed5fe42285b40c6291f9a01517d415fad1c3cbc6a9f778af7fcd4" + [[package]] name = "is_terminal_polyfill" version = "1.70.1" @@ -1870,6 +1940,12 @@ dependencies = [ "migration", "num-traits", "once_cell", + "opentelemetry", + "opentelemetry-http", + "opentelemetry-otlp", + "opentelemetry-semantic-conventions", + "opentelemetry_sdk", + "reqwest", "sea-orm", "serde", "serde_json", @@ -1880,6 +1956,7 @@ dependencies = [ "tokio", "tower-http", "tracing", + "tracing-opentelemetry", "tracing-subscriber", ] @@ -1900,6 +1977,23 @@ dependencies = [ "version_check", ] +[[package]] +name = "native-tls" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8614eb2c83d59d1c8cc974dd3f920198647674a0a035e1af1fa58707e317466" +dependencies = [ + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "nom" version = "7.1.3" @@ -1998,12 +2092,138 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +[[package]] +name = "openssl" +version = "0.10.66" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9529f4786b70a3e8c61e11179af17ab6188ad8d0ded78c5529441ed39d4bd9c1" +dependencies = [ + "bitflags 2.6.0", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.72", +] + [[package]] name = "openssl-probe" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "openssl-sys" +version = "0.9.103" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f9e8deee91df40a943c71b917e5874b951d32a802526c85721ce3b776c929d6" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + +[[package]] +name = "opentelemetry" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c365a63eec4f55b7efeceb724f1336f26a9cf3427b70e59e2cd2a5b947fba96" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "once_cell", + "pin-project-lite", + "thiserror", +] + +[[package]] +name = "opentelemetry-http" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad31e9de44ee3538fb9d64fe3376c1362f406162434609e79aea2a41a0af78ab" +dependencies = [ + "async-trait", + "bytes", + "http", + "opentelemetry", + "reqwest", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b925a602ffb916fb7421276b86756027b37ee708f9dce2dbdcc51739f07e727" +dependencies = [ + "async-trait", + "futures-core", + "http", + "opentelemetry", + "opentelemetry-http", + "opentelemetry-proto", + "opentelemetry_sdk", + "prost", + "serde_json", + "thiserror", + "tokio", + "tonic", +] + +[[package]] +name = "opentelemetry-proto" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30ee9f20bff9c984511a02f082dc8ede839e4a9bf15cc2487c8d6fea5ad850d9" +dependencies = [ + "hex", + "opentelemetry", + "opentelemetry_sdk", + "prost", + "serde", + "tonic", +] + +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cefe0543875379e47eb5f1e68ff83f45cc41366a92dfd0d073d513bf68e9a05" + +[[package]] +name = "opentelemetry_sdk" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "692eac490ec80f24a17828d49b40b60f5aeaccdfe6a503f939713afd22bc28df" +dependencies = [ + "async-trait", + "futures-channel", + "futures-executor", + "futures-util", + "glob", + "once_cell", + "opentelemetry", + "percent-encoding", + "rand", + "serde_json", + "thiserror", + "tokio", + "tokio-stream", +] + [[package]] name = "option-ext" version = "0.2.0" @@ -2292,6 +2512,29 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b2ecbe40f08db5c006b5764a2645f7f3f141ce756412ac9e1dd6087e6d32995" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acf0c195eebb4af52c752bec4f52f645da98b6e92077a04110c7f349477ae5ac" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn 2.0.72", +] + [[package]] name = "ptr_meta" version = "0.1.4" @@ -2439,6 +2682,50 @@ dependencies = [ "bytecheck", ] +[[package]] +name = "reqwest" +version = "0.12.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8f4955649ef5c38cc7f9e8aa41761d48fb9677197daea9984dc54f56aad5e63" +dependencies = [ + "base64 0.22.1", + "bytes", + "encoding_rs", + "futures-channel", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-rustls", + "hyper-tls", + "hyper-util", + "ipnet", + "js-sys", + "log", + "mime", + "native-tls", + "once_cell", + "percent-encoding", + "pin-project-lite", + "rustls-pemfile 2.1.3", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper 1.0.1", + "system-configuration", + "tokio", + "tokio-native-tls", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "windows-registry", +] + [[package]] name = "ring" version = "0.17.8" @@ -3410,6 +3697,30 @@ name = "sync_wrapper" version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394" +dependencies = [ + "futures-core", +] + +[[package]] +name = "system-configuration" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c879d448e9d986b661742763247d3693ed13609438cf3d006f51f5368a5ba6b" +dependencies = [ + "bitflags 2.6.0", + "core-foundation", + "system-configuration-sys", +] + +[[package]] +name = "system-configuration-sys" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e1d1b10ced5ca923a1fcb8d03e96b8d3268065d724548c0211415ff6ac6bac4" +dependencies = [ + "core-foundation-sys", + "libc", +] [[package]] name = "tap" @@ -3572,6 +3883,16 @@ dependencies = [ "syn 2.0.72", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.26.0" @@ -3624,6 +3945,36 @@ dependencies = [ "winnow", ] +[[package]] +name = "tonic" +version = "0.12.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6f6ba989e4b2c58ae83d862d3a3e27690b6e3ae630d0deb59f3697f32aa88ad" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64 0.22.1", + "bytes", + "h2", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-timeout", + "hyper-util", + "percent-encoding", + "pin-project", + "prost", + "socket2 0.5.7", + "tokio", + "tokio-stream", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower" version = "0.4.13" @@ -3632,9 +3983,13 @@ checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" dependencies = [ "futures-core", "futures-util", + "indexmap 1.9.3", "pin-project", "pin-project-lite", + "rand", + "slab", "tokio", + "tokio-util", "tower-layer", "tower-service", "tracing", @@ -3713,6 +4068,24 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-opentelemetry" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9784ed4da7d921bc8df6963f8c80a0e4ce34ba6ba76668acadd3edbd985ff3b" +dependencies = [ + "js-sys", + "once_cell", + "opentelemetry", + "opentelemetry_sdk", + "smallvec", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber", + "web-time", +] + [[package]] name = "tracing-subscriber" version = "0.3.18" @@ -3948,6 +4321,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "webpki-roots" version = "0.25.4" @@ -4007,6 +4390,36 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-registry" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e400001bb720a623c1c69032f8e3e4cf09984deec740f007dd2b03ec864804b0" +dependencies = [ + "windows-result", + "windows-strings", + "windows-targets 0.52.6", +] + +[[package]] +name = "windows-result" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d1043d8214f791817bab27572aaa8af63732e11bf84aa21a45a78d6c317ae0e" +dependencies = [ + "windows-targets 0.52.6", +] + +[[package]] +name = "windows-strings" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cd9b125c486025df0eabcb585e62173c6c9eddcec5d117d3b6e8c30e2ee4d10" +dependencies = [ + "windows-result", + "windows-targets 0.52.6", +] + [[package]] name = "windows-sys" version = "0.48.0" diff --git a/Cargo.toml b/Cargo.toml index 7091831..addd79b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,5 +33,13 @@ tower-http = { version = "0.5", features = ["trace"] } bytes = "1.7" once_cell = "1.19" +tracing-opentelemetry = "0.25.0" +opentelemetry = "0.24.0" +opentelemetry_sdk = { version = "0.24.1", features = ["rt-tokio"] } +opentelemetry-http = { version = "0.13.0", features = ["reqwest"] } +opentelemetry-otlp = { version = "0.17.0", features = ["grpc-tonic", "http-json", "tokio"] } +opentelemetry-semantic-conventions = "0.16.0" +reqwest = "0.12.7" + [workspace] members = [".", "migration", "entity"] diff --git a/src/ingestion/db.rs b/src/ingestion/db.rs index 4b7950e..c891bd2 100644 --- a/src/ingestion/db.rs +++ b/src/ingestion/db.rs @@ -256,7 +256,7 @@ mod tests { listener.listen("monzo_new_transactions").await?; let json = include_str!("../../fixtures/transactions.json"); - let json: Vec> = serde_json::from_str(json).unwrap(); + let json: Vec> = serde_json::from_str(json)?; let data = json .iter() .map(|row| from_json_row(row.clone())) diff --git a/src/main.rs b/src/main.rs index 8c3c14b..1d65397 100644 --- a/src/main.rs +++ b/src/main.rs @@ -16,6 +16,16 @@ use std::path::PathBuf; use tower_http::trace::TraceLayer; use tracing::log::LevelFilter; +use opentelemetry::trace::TracerProvider as _; +use opentelemetry_sdk::trace::TracerProvider; +use opentelemetry_otlp; +use tracing::{error, span}; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::Registry; +use tracin_support::init_tracing_subscriber; + +mod tracin_support; + #[derive(Debug, Subcommand)] enum Commands { /// Manually run database migrations. @@ -68,7 +78,7 @@ async fn health_check( #[tokio::main] async fn main() -> anyhow::Result<()> { - tracing_subscriber::fmt::init(); + let _guard = init_tracing_subscriber(); let cli: Cli = Cli::parse(); let connection = sea_orm::ConnectOptions::new(&cli.database_url) diff --git a/src/tracin_support.rs b/src/tracin_support.rs new file mode 100644 index 0000000..a06b5a4 --- /dev/null +++ b/src/tracin_support.rs @@ -0,0 +1,108 @@ +use opentelemetry::{global, trace::TracerProvider, Key, KeyValue}; +use opentelemetry_sdk::{ + metrics::{ + reader::{DefaultAggregationSelector, DefaultTemporalitySelector}, + Aggregation, Instrument, MeterProviderBuilder, PeriodicReader, SdkMeterProvider, Stream, + }, + runtime, + trace::{BatchConfig, RandomIdGenerator, Sampler, Tracer}, + Resource, +}; +use opentelemetry_semantic_conventions::{ + resource::{DEPLOYMENT_ENVIRONMENT, SERVICE_NAME, SERVICE_VERSION}, + SCHEMA_URL, +}; +use tracing::Level; +use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer}; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; + +// Create a Resource that captures information about the entity for which telemetry is recorded. +fn resource() -> Resource { + Resource::from_schema_url( + [ + KeyValue::new(SERVICE_NAME, env!("CARGO_PKG_NAME")), + KeyValue::new(SERVICE_VERSION, env!("CARGO_PKG_VERSION")), + KeyValue::new(DEPLOYMENT_ENVIRONMENT, "develop"), + ], + SCHEMA_URL, + ) +} + +// Construct MeterProvider for MetricsLayer +fn init_meter_provider() -> SdkMeterProvider { + let exporter = opentelemetry_otlp::new_exporter() + .http() + .with_http_client(reqwest::Client::new()) + .build_metrics_exporter( + Box::new(DefaultAggregationSelector::new()), + Box::new(DefaultTemporalitySelector::new()), + ) + .unwrap(); + + let reader = PeriodicReader::builder(exporter, runtime::Tokio) + .with_interval(std::time::Duration::from_secs(30)) + .build(); + + let meter_provider = MeterProviderBuilder::default() + .with_resource(resource()) + .with_reader(reader) + .build(); + + global::set_meter_provider(meter_provider.clone()); + + meter_provider +} + +// Construct Tracer for OpenTelemetryLayer +fn init_tracer() -> Tracer { + let provider = opentelemetry_otlp::new_pipeline() + .tracing() + .with_trace_config( + opentelemetry_sdk::trace::Config::default() + // Customize sampling strategy + .with_sampler(Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased( + 1.0, + )))) + // If export trace to AWS X-Ray, you can use XrayIdGenerator + .with_id_generator(RandomIdGenerator::default()) + .with_resource(resource()), + ) + .with_batch_config(BatchConfig::default()) + .with_exporter(opentelemetry_otlp::new_exporter().tonic()) + .install_batch(runtime::Tokio) + .unwrap(); + + global::set_tracer_provider(provider.clone()); + provider.tracer("tracing-otel-subscriber") +} + +// Initialize tracing-subscriber and return OtelGuard for opentelemetry-related termination processing +pub(crate) fn init_tracing_subscriber() -> OtelGuard { + let meter_provider = init_meter_provider(); + let tracer = init_tracer(); + + tracing_subscriber::registry() + .with(tracing_subscriber::filter::LevelFilter::from_level( + Level::DEBUG, + )) + .with(tracing_subscriber::fmt::layer()) + .with(MetricsLayer::new(meter_provider.clone())) + .with(OpenTelemetryLayer::new(tracer)) + .init(); + + OtelGuard { meter_provider } +} + +pub(crate) struct OtelGuard { + meter_provider: SdkMeterProvider, +} + +impl Drop for OtelGuard { + fn drop(&mut self) { + if let Err(err) = self.meter_provider.shutdown() { + eprintln!("{err:?}"); + } + + global::shutdown_tracer_provider(); + } +}