iT邦幫忙

2022 iThome 鐵人賽

DAY 13
4
DevOps

淺談DevOps與Observability系列 第 13

淺談OpenTelemetry Specification - Trace Exporter

  • 分享至 

  • xImage
  •  

OTel Trace Exporter


上圖是Exporter與其他組件和服務的關係.

Exporter指定了怎麼把span資料透過怎樣的目標發過去, 並且處理好協議格式問題. 目的是想最大程度地減少實現匯出到vendor的實作負擔.

一樣先看interface程式碼連結

// SpanExporter handles the delivery of spans to external receivers. This is
// the final component in the trace export pipeline.
type SpanExporter interface {
	ExportSpans(ctx context.Context, spans []ReadOnlySpan) error
	Shutdown(ctx context.Context) error
}

type ExportResult struct {
    Code         ExportResultCode
    WrappedError error
}

type ExportResultCode int

const (
    Success ExportResultCode = iota
    Failure
)

ExportSpans是平常Processor匯出時呼叫的.

Shutdown則是當exporter要被停止了(可能是程序關閉, 或者人為喊停), 我們就需要exporeter清理還沒處理完的span, 且這操作可以說是同步的,
該操作也要遵守timeout與cancel, 不然本來要盡快關閉程序的, 被阻塞在這等它清理完成.
該方法只能被呼叫一次, 如果被呼叫過再被調用的話, 要回傳一個Failure結果.
有點像熔斷器(CircuitBreaker)的感覺, 熔斷Open了就立刻返回Failure.

其實規範裡還有一個ForceFlush()
給client自己決定調用時機, 用途是盡量把exporter內的span都導出完成.
可以做成同步或非同步透過callback或event notify通知調用者, export完成的結果.
只是這三個介面方法, 官方有說可以讓SDK實作者稍微的偏離這些(就是可以不用三個都宣告啦), 像OTel-Go, 只宣告了前兩個.

Trace Exporter Implementations

Stdout Trace Exporter

往下看能發現, Stdout trace exporter, 確實有實做SDK trace exporter介面.程式碼連結
兩個方法一開始都有上Lock, 來防止一些Race Condition問題.

var zeroTime time.Time

// Exporter is an implementation of trace.SpanSyncer that writes spans to stdout.
type Exporter struct {
	encoder    *json.Encoder
	encoderMu  sync.Mutex
	timestamps bool

	stoppedMu sync.RWMutex
	stopped   bool
}

// ExportSpans writes spans in json format to stdout.
func (e *Exporter) ExportSpans(ctx context.Context, spans []trace.ReadOnlySpan) error {
	e.stoppedMu.RLock()
	stopped := e.stopped
	e.stoppedMu.RUnlock()
	if stopped {
		return nil
	}

	if len(spans) == 0 {
		return nil
	}

	stubs := tracetest.SpanStubsFromReadOnlySpans(spans)

	e.encoderMu.Lock()
	defer e.encoderMu.Unlock()
	for i := range stubs {
		stub := &stubs[i]
		// Remove timestamps
		if !e.timestamps {
			stub.StartTime = zeroTime
			stub.EndTime = zeroTime
			for j := range stub.Events {
				ev := &stub.Events[j]
				ev.Time = zeroTime
			}
		}

		// Encode span stubs, one by one
		if err := e.encoder.Encode(stub); err != nil {
			return err
		}
	}
	return nil
}

// Shutdown is called to stop the exporter, it preforms no action.
func (e *Exporter) Shutdown(ctx context.Context) error {
	e.stoppedMu.Lock()
	e.stopped = true
	e.stoppedMu.Unlock()

	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}
	return nil
}

Jaeger Trace Exporter

程式碼連結
Jaeger trace exporter, 支援了batch的部份
Jaeger因為走得是thrift協議, 所以實作內也有實現了協議轉換的部份.

// Exporter exports OpenTelemetry spans to a Jaeger agent or collector.
type Exporter struct {
	uploader           batchUploader
	stopOnce           sync.Once
	stopCh             chan struct{}
	defaultServiceName string
}

var _ sdktrace.SpanExporter = (*Exporter)(nil)

// ExportSpans transforms and exports OpenTelemetry spans to Jaeger.
func (e *Exporter) ExportSpans(ctx context.Context, spans []sdktrace.ReadOnlySpan) error {
	// Return fast if context is already canceled or Exporter shutdown.
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-e.stopCh:
		return nil
	default:
	}

	// Cancel export if Exporter is shutdown.
	var cancel context.CancelFunc
	ctx, cancel = context.WithCancel(ctx)
	defer cancel()
	go func(ctx context.Context, cancel context.CancelFunc) {
		select {
		case <-ctx.Done():
		case <-e.stopCh:
			cancel()
		}
	}(ctx, cancel)

	for _, batch := range jaegerBatchList(spans, e.defaultServiceName) {
		if err := e.uploader.upload(ctx, batch); err != nil {
			return err
		}
	}

	return nil
}

// Shutdown stops the Exporter. This will close all connections and release
// all resources held by the Exporter.
func (e *Exporter) Shutdown(ctx context.Context) error {
	// Stop any active and subsequent exports.
	e.stopOnce.Do(func() { close(e.stopCh) })
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}
	return e.uploader.shutdown(ctx)
}

// jaegerBatchList transforms a slice of spans into a slice of jaeger Batch.
func jaegerBatchList(ssl []sdktrace.ReadOnlySpan, defaultServiceName string) []*gen.Batch {
    //ignore
    batch.Spans = append(batch.Spans, spanToThrift(ss))
    //ignore
}

func spanToThrift(ss sdktrace.ReadOnlySpan) *gen.Span { 
    // ignore
}

這裡Jaeger在Shutdown關閉一些channel時, 有用到一些概念, 能參考小弟以前的文章Channel, goroutine之間的溝通橋樑-The Channel Closing Principle

OTLP trace exporter

上面的圖跟之前有提到, OTel架構有一個Collector服務,
也是能讓exporter把span資料匯出到collector上, 但它走得主要是OTLP協議.

type Exporter struct {
	client Client

	mu      sync.RWMutex
	started bool

	startOnce sync.Once
	stopOnce  sync.Once
}

// ExportSpans exports a batch of spans.
func (e *Exporter) ExportSpans(ctx context.Context, ss []tracesdk.ReadOnlySpan) error {
	protoSpans := tracetransform.Spans(ss)
	if len(protoSpans) == 0 {
		return nil
	}

	return e.client.UploadTraces(ctx, protoSpans)
}

// Shutdown flushes all exports and closes all connections to the receiving endpoint.
func (e *Exporter) Shutdown(ctx context.Context) error {
	e.mu.RLock()
	started := e.started
	e.mu.RUnlock()

	if !started {
		return nil
	}

	var err error

	e.stopOnce.Do(func() {
		err = e.client.Stop(ctx)
		e.mu.Lock()
		e.started = false
		e.mu.Unlock()
	})

	return err
}

最後用Stdout trace exporter跑看看結果

exp, err := stdouttrace.New(stdouttrace.WithPrettyPrint())
	if err != nil {
		log.Fatalf("error: %s", err.Error())
	}

	tp := trace.NewTracerProvider(
		trace.WithSampler(getSampler()),
		trace.WithSpanProcessor(trace.NewBatchSpanProcessor(exp)),
		trace.WithSpanProcessor(trace.NewSimpleSpanProcessor(exp)),
		trace.WithResource(newResource(ctx)),
	)
	otel.SetTracerProvider(tp)


可以看到有很都requect context與span context資料

今日小心得

OTel-Go trace exporter這裡官方提供了有Jaeger, Stdout, Zipkin, OTLP的部份連結
開發者也是能客製化自己的exporter,只要實做該介面就好.
重要的還是一些race condition與同步/非同步的方面給設計好.

Metrics部份官方也有提供Prometheus的exporter能直接使用連結

參考資料

OTel trace exporter


上一篇
淺談OpenTelemetry Specification - Trace Processor
下一篇
淺談OpenTelemetry - Collector
系列文
淺談DevOps與Observability36
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言