上圖是Exporter與其他組件和服務的關係.
Exporter指定了怎麼把span資料透過怎樣的目標發過去, 並且處理好協議格式問題. 目的是想最大程度地減少實現匯出到vendor的實作負擔.
一樣先看interface程式碼連結
// SpanExporter handles the delivery of spans to external receivers. This is
// the final component in the trace export pipeline.
type SpanExporter interface {
ExportSpans(ctx context.Context, spans []ReadOnlySpan) error
Shutdown(ctx context.Context) error
}
type ExportResult struct {
Code ExportResultCode
WrappedError error
}
type ExportResultCode int
const (
Success ExportResultCode = iota
Failure
)
ExportSpans是平常Processor匯出時呼叫的.
Shutdown則是當exporter要被停止了(可能是程序關閉, 或者人為喊停), 我們就需要exporeter清理還沒處理完的span, 且這操作可以說是同步的,
該操作也要遵守timeout與cancel, 不然本來要盡快關閉程序的, 被阻塞在這等它清理完成.
該方法只能被呼叫一次, 如果被呼叫過再被調用的話, 要回傳一個Failure結果.
有點像熔斷器(CircuitBreaker)的感覺, 熔斷Open了就立刻返回Failure.
其實規範裡還有一個ForceFlush()
給client自己決定調用時機, 用途是盡量把exporter內的span都導出完成.
可以做成同步或非同步透過callback或event notify通知調用者, export完成的結果.
只是這三個介面方法, 官方有說可以讓SDK實作者稍微的偏離這些(就是可以不用三個都宣告啦), 像OTel-Go, 只宣告了前兩個.
往下看能發現, Stdout trace exporter, 確實有實做SDK trace exporter介面.程式碼連結
兩個方法一開始都有上Lock, 來防止一些Race Condition問題.
var zeroTime time.Time
// Exporter is an implementation of trace.SpanSyncer that writes spans to stdout.
type Exporter struct {
encoder *json.Encoder
encoderMu sync.Mutex
timestamps bool
stoppedMu sync.RWMutex
stopped bool
}
// ExportSpans writes spans in json format to stdout.
func (e *Exporter) ExportSpans(ctx context.Context, spans []trace.ReadOnlySpan) error {
e.stoppedMu.RLock()
stopped := e.stopped
e.stoppedMu.RUnlock()
if stopped {
return nil
}
if len(spans) == 0 {
return nil
}
stubs := tracetest.SpanStubsFromReadOnlySpans(spans)
e.encoderMu.Lock()
defer e.encoderMu.Unlock()
for i := range stubs {
stub := &stubs[i]
// Remove timestamps
if !e.timestamps {
stub.StartTime = zeroTime
stub.EndTime = zeroTime
for j := range stub.Events {
ev := &stub.Events[j]
ev.Time = zeroTime
}
}
// Encode span stubs, one by one
if err := e.encoder.Encode(stub); err != nil {
return err
}
}
return nil
}
// Shutdown is called to stop the exporter, it preforms no action.
func (e *Exporter) Shutdown(ctx context.Context) error {
e.stoppedMu.Lock()
e.stopped = true
e.stoppedMu.Unlock()
select {
case <-ctx.Done():
return ctx.Err()
default:
}
return nil
}
程式碼連結
Jaeger trace exporter, 支援了batch的部份
Jaeger因為走得是thrift協議, 所以實作內也有實現了協議轉換的部份.
// Exporter exports OpenTelemetry spans to a Jaeger agent or collector.
type Exporter struct {
uploader batchUploader
stopOnce sync.Once
stopCh chan struct{}
defaultServiceName string
}
var _ sdktrace.SpanExporter = (*Exporter)(nil)
// ExportSpans transforms and exports OpenTelemetry spans to Jaeger.
func (e *Exporter) ExportSpans(ctx context.Context, spans []sdktrace.ReadOnlySpan) error {
// Return fast if context is already canceled or Exporter shutdown.
select {
case <-ctx.Done():
return ctx.Err()
case <-e.stopCh:
return nil
default:
}
// Cancel export if Exporter is shutdown.
var cancel context.CancelFunc
ctx, cancel = context.WithCancel(ctx)
defer cancel()
go func(ctx context.Context, cancel context.CancelFunc) {
select {
case <-ctx.Done():
case <-e.stopCh:
cancel()
}
}(ctx, cancel)
for _, batch := range jaegerBatchList(spans, e.defaultServiceName) {
if err := e.uploader.upload(ctx, batch); err != nil {
return err
}
}
return nil
}
// Shutdown stops the Exporter. This will close all connections and release
// all resources held by the Exporter.
func (e *Exporter) Shutdown(ctx context.Context) error {
// Stop any active and subsequent exports.
e.stopOnce.Do(func() { close(e.stopCh) })
select {
case <-ctx.Done():
return ctx.Err()
default:
}
return e.uploader.shutdown(ctx)
}
// jaegerBatchList transforms a slice of spans into a slice of jaeger Batch.
func jaegerBatchList(ssl []sdktrace.ReadOnlySpan, defaultServiceName string) []*gen.Batch {
//ignore
batch.Spans = append(batch.Spans, spanToThrift(ss))
//ignore
}
func spanToThrift(ss sdktrace.ReadOnlySpan) *gen.Span {
// ignore
}
這裡Jaeger在Shutdown關閉一些channel時, 有用到一些概念, 能參考小弟以前的文章Channel, goroutine之間的溝通橋樑-The Channel Closing Principle
上面的圖跟之前有提到, OTel架構有一個Collector服務,
也是能讓exporter把span資料匯出到collector上, 但它走得主要是OTLP協議.
type Exporter struct {
client Client
mu sync.RWMutex
started bool
startOnce sync.Once
stopOnce sync.Once
}
// ExportSpans exports a batch of spans.
func (e *Exporter) ExportSpans(ctx context.Context, ss []tracesdk.ReadOnlySpan) error {
protoSpans := tracetransform.Spans(ss)
if len(protoSpans) == 0 {
return nil
}
return e.client.UploadTraces(ctx, protoSpans)
}
// Shutdown flushes all exports and closes all connections to the receiving endpoint.
func (e *Exporter) Shutdown(ctx context.Context) error {
e.mu.RLock()
started := e.started
e.mu.RUnlock()
if !started {
return nil
}
var err error
e.stopOnce.Do(func() {
err = e.client.Stop(ctx)
e.mu.Lock()
e.started = false
e.mu.Unlock()
})
return err
}
最後用Stdout trace exporter跑看看結果
exp, err := stdouttrace.New(stdouttrace.WithPrettyPrint())
if err != nil {
log.Fatalf("error: %s", err.Error())
}
tp := trace.NewTracerProvider(
trace.WithSampler(getSampler()),
trace.WithSpanProcessor(trace.NewBatchSpanProcessor(exp)),
trace.WithSpanProcessor(trace.NewSimpleSpanProcessor(exp)),
trace.WithResource(newResource(ctx)),
)
otel.SetTracerProvider(tp)
可以看到有很都requect context與span context資料
OTel-Go trace exporter這裡官方提供了有Jaeger, Stdout, Zipkin, OTLP的部份連結
開發者也是能客製化自己的exporter,只要實做該介面就好.
重要的還是一些race condition與同步/非同步的方面給設計好.
Metrics部份官方也有提供Prometheus的exporter能直接使用連結