Skip to content

Commit

Permalink
[common/tracing] feature: report tracing stat to jaeger
Browse files Browse the repository at this point in the history
Start a local jaeger server and report tracing data and view it:

```
docker run -d -p6831:6831/udp -p6832:6832/udp -p16686:16686 jaegertracing/all-in-one:latest
FUSE_JAEGER=on RUST_LOG=trace cargo test
open http://localhost:16686/
```

-   Add: report tracing data to an opentelemetry server (jaeger) when the
    env var `FUSE_JAEGER=on` is set.

-   Add tracing::instrument to store flight server and meta server.

-   Inject the tracing span into the tonic request on the client side
    and extract it on the server side to form a complete trace.

-   Add tracing to SledTree
  • Loading branch information
drmingdrmer committed Jul 29, 2021
1 parent 424d396 commit 0424aa1
Show file tree
Hide file tree
Showing 14 changed files with 425 additions and 207 deletions.
48 changes: 48 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions common/flights/src/impls/kv_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub use common_store_api::kv_api::PrefixListReply;
pub use common_store_api::kv_api::UpsertKVActionResult;
pub use common_store_api::GetKVActionResult;
use common_store_api::KVApi;
use common_tracing::tracing;

use crate::action_declare;
use crate::RequestFor;
Expand All @@ -18,6 +19,7 @@ use crate::StoreDoAction;

#[async_trait::async_trait]
impl KVApi for StoreClient {
#[tracing::instrument(level = "debug", skip(self, value))]
async fn upsert_kv(
&mut self,
key: &str,
Expand All @@ -35,6 +37,7 @@ impl KVApi for StoreClient {
/// Delete a kv record that matches key and seq.
/// Returns the (seq, value) that is deleted.
/// I.e., if key not found or seq does not match, it returns None.
#[tracing::instrument(level = "debug", skip(self))]
async fn delete_kv(&mut self, key: &str, seq: Option<u64>) -> Result<Option<SeqValue>> {
let res = self
.do_action(DeleteKVReq {
Expand All @@ -52,19 +55,22 @@ impl KVApi for StoreClient {
}
}

/// Look up `key` by issuing a `GetKVAction` through `do_action`.
///
/// The key is copied into the action; the reply (or error) produced by
/// `do_action` is returned unchanged.
#[tracing::instrument(level = "debug", skip(self))]
async fn get_kv(&mut self, key: &str) -> Result<GetKVActionResult> {
    let action = GetKVAction {
        key: key.to_owned(),
    };
    self.do_action(action).await
}

/// Request several keys at once by issuing a single `MGetKVAction`
/// through `do_action`.
///
/// `keys` is excluded from the tracing span fields (see `skip`).
#[tracing::instrument(level = "debug", skip(self, keys))]
async fn mget_kv(&mut self, keys: &[String]) -> common_exception::Result<MGetKVActionResult> {
    // The action owns its key list, so the borrowed slice is copied.
    let action = MGetKVAction {
        keys: keys.to_vec(),
    };
    self.do_action(action).await
}

/// Issue a `PrefixListReq` carrying `prefix` through `do_action`.
///
/// NOTE(review): presumably the reply lists the records whose key starts
/// with `prefix` — confirm against the server-side handler.
#[tracing::instrument(level = "debug", skip(self))]
async fn prefix_list_kv(&mut self, prefix: &str) -> common_exception::Result<PrefixListReply> {
    let req = PrefixListReq(prefix.to_owned());
    self.do_action(req).await
}
Expand Down
8 changes: 7 additions & 1 deletion common/flights/src/store_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use common_arrow::arrow_flight::Action;
use common_arrow::arrow_flight::BasicAuth;
use common_arrow::arrow_flight::HandshakeRequest;
use common_exception::ErrorCode;
use common_tracing::tracing;
use futures::stream;
use futures::StreamExt;
use log::info;
Expand All @@ -34,6 +35,7 @@ pub struct StoreClient {
static AUTH_TOKEN_KEY: &str = "auth-token-bin";

impl StoreClient {
#[tracing::instrument(level = "debug", skip(password))]
pub async fn try_create(addr: &str, username: &str, password: &str) -> anyhow::Result<Self> {
// TODO configuration
let timeout = Duration::from_secs(60);
Expand Down Expand Up @@ -65,6 +67,7 @@ impl StoreClient {
}

/// Handshake.
#[tracing::instrument(level = "debug", skip(client, password))]
async fn handshake(
client: &mut FlightServiceClient<Channel>,
timeout: Duration,
Expand Down Expand Up @@ -94,14 +97,17 @@ impl StoreClient {
Ok(token)
}

#[tracing::instrument(level = "debug", skip(self, v))]
pub(crate) async fn do_action<T, R>(&mut self, v: T) -> common_exception::Result<R>
where
T: RequestFor<Reply = R>,
T: Into<StoreDoAction>,
R: DeserializeOwned,
{
let act: StoreDoAction = v.into();
let mut req: Request<Action> = (&act).try_into()?;
let req: Request<Action> = (&act).try_into()?;
let mut req = common_tracing::inject_span_to_tonic_request(req);

req.set_timeout(self.timeout);

let mut stream = self.client.do_action(req).await?.into_inner();
Expand Down
6 changes: 6 additions & 0 deletions common/tracing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,14 @@ publish = false
edition = "2018"

[dependencies] # In alphabetical order
common-runtime = {path = "../runtime"}

opentelemetry = { version = "0.15", default-features = false, features = ["trace"] }
opentelemetry-jaeger = "0.14"
tonic = "0.4.3"
tracing = "0.1.26"
tracing-appender = "0.1.2"
tracing-bunyan-formatter = "0.2"
tracing-futures = { version = "0.2.5", default-features = false }
tracing-opentelemetry = "0.14.0"
tracing-subscriber = "0.2.19"
3 changes: 3 additions & 0 deletions common/tracing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
// SPDX-License-Identifier: Apache-2.0.

mod logging;
mod tracing_to_jaeger;

pub use logging::init_default_tracing;
pub use logging::init_tracing_with_file;
pub use tracing;
pub use tracing_to_jaeger::extract_remote_span_as_parent;
pub use tracing_to_jaeger::inject_span_to_tonic_request;
28 changes: 27 additions & 1 deletion common/tracing/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@
//
// SPDX-License-Identifier: Apache-2.0.

use std::env;
use std::sync::Once;

use opentelemetry::global;
use opentelemetry::sdk::propagation::TraceContextPropagator;
use tracing_appender::non_blocking::WorkerGuard;
use tracing_appender::rolling::RollingFileAppender;
use tracing_appender::rolling::Rotation;
Expand Down Expand Up @@ -32,9 +35,32 @@ fn init_tracing_stdout() {
.with_ansi(true)
.with_span_events(fmt::format::FmtSpan::FULL);

// Enable reporting tracing data to jaeger if FUSE_JAEGER is non-empty.
// Start a local jaeger server and report tracing data and view it:
// docker run -d -p6831:6831/udp -p6832:6832/udp -p16686:16686 jaegertracing/all-in-one:latest
// FUSE_JAEGER=on RUST_LOG=trace cargo test
// open http://localhost:16686/

// TODO(xp): use FUSE_JAEGER to assign jaeger server address.
let fuse_jaeger = env::var("FUSE_JAEGER").unwrap_or_else(|_| "".to_string());
let ot_layer = if !fuse_jaeger.is_empty() {
global::set_text_map_propagator(TraceContextPropagator::new());

let tracer = opentelemetry_jaeger::new_pipeline()
.with_service_name("fuse-store")
.install_simple()
.expect("install");

let ot_layer = tracing_opentelemetry::layer().with_tracer(tracer);
Some(ot_layer)
} else {
None
};

let subscriber = Registry::default()
.with(EnvFilter::from_default_env())
.with(fmt_layer);
.with(fmt_layer)
.with(ot_layer);

tracing::subscriber::set_global_default(subscriber)
.expect("error setting global tracing subscriber");
Expand Down
79 changes: 79 additions & 0 deletions common/tracing/src/tracing_to_jaeger.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright 2020-2021 The Datafuse Authors.
//
// SPDX-License-Identifier: Apache-2.0.

use opentelemetry::global;
use opentelemetry::propagation::Extractor;
use opentelemetry::propagation::Injector;
use tracing_opentelemetry::OpenTelemetrySpanExt;

/// Write-side adapter: lets an opentelemetry propagator store tracing info
/// in the metadata of a tonic request.
struct MetadataMapInjector<'a>(&'a mut tonic::metadata::MetadataMap);

impl<'a> Injector for MetadataMapInjector<'a> {
    /// Insert `key`/`value` into the wrapped `MetadataMap`.
    ///
    /// This is best-effort: when either the key or the value cannot be
    /// converted to its metadata form, the pair is silently dropped.
    fn set(&mut self, key: &str, value: String) {
        let meta_key = match tonic::metadata::MetadataKey::from_bytes(key.as_bytes()) {
            Ok(k) => k,
            // Not a valid metadata key: nothing to insert.
            Err(_) => return,
        };

        let meta_val = match tonic::metadata::MetadataValue::from_str(&value) {
            Ok(v) => v,
            // Not a valid metadata value: nothing to insert.
            Err(_) => return,
        };

        self.0.insert(meta_key, meta_val);
    }
}

/// Read-side adapter: lets an opentelemetry propagator read tracing info
/// from the metadata of a tonic request.
struct MetadataMapExtractor<'a>(&'a tonic::metadata::MetadataMap);

impl<'a> Extractor for MetadataMapExtractor<'a> {
    /// Return the value stored under `key` in the wrapped `MetadataMap`.
    ///
    /// Yields `None` when the key is absent or when its value cannot be
    /// converted to `&str`.
    fn get(&self, key: &str) -> Option<&str> {
        let raw = self.0.get(key)?;
        raw.to_str().ok()
    }

    /// Return the name of every key in the wrapped `MetadataMap`,
    /// covering both the ascii and the binary key variants.
    fn keys(&self) -> Vec<&str> {
        let mut names = Vec::new();
        for entry in self.0.keys() {
            let name = match entry {
                tonic::metadata::KeyRef::Ascii(k) => k.as_str(),
                tonic::metadata::KeyRef::Binary(k) => k.as_str(),
            };
            names.push(name);
        }
        names
    }
}

/// Convert `mes` into a tonic request and copy the context of the current
/// `tracing::Span` into the request metadata, so that the receiving tonic
/// server can chain its spans into one distributed trace.
///
/// A tonic client should call this right before sending out the request.
///
/// The global text-map propagator must be installed beforehand, e.g. with
/// `opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new())`.
pub fn inject_span_to_tonic_request<T>(mes: impl tonic::IntoRequest<T>) -> tonic::Request<T> {
    // Capture the caller's span context before building the request.
    let span_cx = tracing::Span::current().context();

    let mut req = mes.into_request();

    global::get_text_map_propagator(|prop| {
        let mut injector = MetadataMapInjector(req.metadata_mut());
        prop.inject_context(&span_cx, &mut injector)
    });

    req
}

/// Read the tracing context carried in the metadata of `request` and make
/// it the parent of the current `tracing::Span`, chaining the client-side
/// span with the current server-side span.
///
/// A tonic request handler should call this before doing anything else.
///
/// The global text-map propagator must be installed beforehand, e.g. with
/// `opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new())`.
pub fn extract_remote_span_as_parent<T>(request: &tonic::Request<T>) {
    let remote_cx = global::get_text_map_propagator(|prop| {
        let extractor = MetadataMapExtractor(request.metadata());
        prop.extract(&extractor)
    });

    tracing::Span::current().set_parent(remote_cx);
}
5 changes: 5 additions & 0 deletions fusestore/store/src/api/rpc/flight_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use common_flights::StoreDoGet;
use common_runtime::tokio;
use common_runtime::tokio::sync::mpsc::Receiver;
use common_runtime::tokio::sync::mpsc::Sender;
use common_tracing::tracing;
use futures::Stream;
use futures::StreamExt;
use log::info;
Expand Down Expand Up @@ -214,13 +215,17 @@ impl FlightService for StoreFlightImpl {
}

type DoActionStream = FlightStream<arrow_flight::Result>;

#[tracing::instrument(level = "debug", skip(self, request))]
async fn do_action(
&self,
request: Request<Action>,
) -> Result<Response<Self::DoActionStream>, Status> {
// Check token.
let _claim = self.check_token(request.metadata())?;

common_tracing::extract_remote_span_as_parent(&request);

let action: StoreDoAction = request.try_into()?;
info!("Receive do_action: {:?}", action);

Expand Down
Loading

0 comments on commit 0424aa1

Please sign in to comment.