Building Enterprise Observability Platform: Comprehensive Monitoring, Logging, and Tracing Solutions
Modern distributed systems require comprehensive observability to ensure reliability, performance, and operational excellence. This guide explores building an enterprise-grade observability platform that provides deep insights into system behavior through metrics, logs, and traces. We’ll cover the implementation of a complete observability stack using industry-standard tools and best practices.
Observability Architecture Overview
Three Pillars of Observability
graph TB
subgraph "Applications & Infrastructure"
A[Microservices]
B[Kubernetes Cluster]
C[Databases]
D[Load Balancers]
E[Message Queues]
end
subgraph "Data Collection"
F[Prometheus Exporters]
G[Fluentd/Fluent Bit]
H[OpenTelemetry Collectors]
I[Custom Agents]
end
subgraph "Storage & Processing"
J[Prometheus TSDB]
K[Elasticsearch]
L[Jaeger Storage]
M[ClickHouse]
end
subgraph "Visualization & Alerting"
N[Grafana Dashboards]
O[Kibana]
P[Jaeger UI]
Q[AlertManager]
end
A --> F
B --> F
C --> F
D --> F
E --> F
A --> G
B --> G
C --> G
A --> H
B --> H
F --> J
G --> K
H --> L
J --> N
K --> O
L --> P
J --> Q
Technology Stack Selection
# observability-stack.yml
observability_stack:
metrics:
collection:
- prometheus
- node_exporter
- kube-state-metrics
- custom_exporters
storage:
- prometheus_tsdb
- thanos (long-term storage)
- victoria_metrics (alternative)
visualization:
- grafana
- prometheus_ui
alerting:
- alertmanager
- grafana_alerts
logging:
collection:
- fluentd
- fluent_bit
- filebeat
- vector
processing:
- logstash
- fluentd_filters
storage:
- elasticsearch
- opensearch
- loki
visualization:
- kibana
- grafana_logs
tracing:
collection:
- opentelemetry_collector
- jaeger_agent
- zipkin
storage:
- jaeger_backend
- tempo
- zipkin_storage
visualization:
- jaeger_ui
- grafana_tempo
infrastructure:
orchestration: kubernetes
service_mesh: istio
ingress: nginx_ingress
storage: persistent_volumes
networking: calico
Metrics Collection and Monitoring
Prometheus Configuration
# prometheus/prometheus.yml
global:
scrape_interval: 15s
evaluation_interval: 15s
external_labels:
cluster: 'production'
region: 'us-west-2'
rule_files:
- "/etc/prometheus/rules/*.yml"
alerting:
alertmanagers:
- static_configs:
- targets:
- alertmanager:9093
path_prefix: /alertmanager
scheme: http
scrape_configs:
# Prometheus itself
- job_name: 'prometheus'
static_configs:
- targets: ['localhost:9090']
scrape_interval: 30s
metrics_path: /metrics
# Kubernetes API Server
- job_name: 'kubernetes-apiservers'
kubernetes_sd_configs:
- role: endpoints
namespaces:
names:
- default
scheme: https
tls_config:
ca_file: /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
insecure_skip_verify: true
bearer_token_file: /var/run/secrets/kubernetes.io/serviceaccount/token
relabel_configs:
- source_labels: [__meta_kubernetes_namespace, __meta_kubernetes_service_name, __meta_kubernetes_endpoint_port_name]
action: keep
regex: default;kubernetes;https
# Kubernetes Nodes
- job_name: 'kubernetes-nodes'
kubernetes_sd_configs:
- role: node
scheme: https
tls_config:
ca_file: /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
insecure_skip_verify: true
bearer_token_file: /var/run/secrets/kubernetes.io/serviceaccount/token
relabel_configs:
- action: labelmap
regex: __meta_kubernetes_node_label_(.+)
- target_label: __address__
replacement: kubernetes.default.svc:443
- source_labels: [__meta_kubernetes_node_name]
regex: (.+)
target_label: __metrics_path__
replacement: /api/v1/nodes/${1}/proxy/metrics
# Kubernetes Pods
- job_name: 'kubernetes-pods'
kubernetes_sd_configs:
- role: pod
relabel_configs:
- source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]
action: keep
regex: true
- source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_path]
action: replace
target_label: __metrics_path__
regex: (.+)
- source_labels: [__address__, __meta_kubernetes_pod_annotation_prometheus_io_port]
action: replace
regex: ([^:]+)(?::\d+)?;(\d+)
replacement: $1:$2
target_label: __address__
- action: labelmap
regex: __meta_kubernetes_pod_label_(.+)
- source_labels: [__meta_kubernetes_namespace]
action: replace
target_label: kubernetes_namespace
- source_labels: [__meta_kubernetes_pod_name]
action: replace
target_label: kubernetes_pod_name
# Node Exporter
- job_name: 'node-exporter'
kubernetes_sd_configs:
- role: endpoints
relabel_configs:
- source_labels: [__meta_kubernetes_endpoints_name]
regex: 'node-exporter'
action: keep
- source_labels: [__meta_kubernetes_endpoint_port_name]
regex: 'metrics'
action: keep
- source_labels: [__meta_kubernetes_endpoint_address_target_name]
target_label: node
- action: labelmap
regex: __meta_kubernetes_node_label_(.+)
# kube-state-metrics
- job_name: 'kube-state-metrics'
static_configs:
- targets: ['kube-state-metrics:8080']
scrape_interval: 30s
# Application metrics
- job_name: 'application-metrics'
kubernetes_sd_configs:
- role: endpoints
relabel_configs:
- source_labels: [__meta_kubernetes_service_annotation_prometheus_io_scrape]
action: keep
regex: true
- source_labels: [__meta_kubernetes_service_annotation_prometheus_io_scheme]
action: replace
target_label: __scheme__
regex: (https?)
- source_labels: [__meta_kubernetes_service_annotation_prometheus_io_path]
action: replace
target_label: __metrics_path__
regex: (.+)
- source_labels: [__address__, __meta_kubernetes_service_annotation_prometheus_io_port]
action: replace
target_label: __address__
regex: ([^:]+)(?::\d+)?;(\d+)
replacement: $1:$2
- action: labelmap
regex: __meta_kubernetes_service_label_(.+)
- source_labels: [__meta_kubernetes_namespace]
action: replace
target_label: kubernetes_namespace
- source_labels: [__meta_kubernetes_service_name]
action: replace
target_label: kubernetes_name
# Custom exporters
- job_name: 'custom-exporters'
file_sd_configs:
- files:
- '/etc/prometheus/targets/*.json'
refresh_interval: 30s
# Remote write configuration for long-term storage
remote_write:
- url: "http://thanos-receive:19291/api/v1/receive"
queue_config:
max_samples_per_send: 1000
max_shards: 200
capacity: 2500
metadata_config:
send: true
send_interval: 30s
write_relabel_configs:
- source_labels: [__name__]
regex: 'go_.*'
action: drop
# Remote read configuration
remote_read:
- url: "http://thanos-query:9090/api/v1/query"
read_recent: true
Custom Application Metrics
// metrics/application_metrics.go
package metrics
import (
	"log"
	"net/http"
	"strconv"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)
// Application metrics
var (
// HTTP request metrics
httpRequestsTotal = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "http_requests_total",
Help: "Total number of HTTP requests",
},
[]string{"method", "endpoint", "status_code"},
)
httpRequestDuration = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "http_request_duration_seconds",
Help: "HTTP request duration in seconds",
Buckets: prometheus.DefBuckets,
},
[]string{"method", "endpoint"},
)
// Business metrics
activeUsers = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "active_users_total",
Help: "Number of currently active users",
},
)
orderProcessingTime = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "order_processing_duration_seconds",
Help: "Time taken to process orders",
Buckets: []float64{0.1, 0.5, 1.0, 2.5, 5.0, 10.0},
},
[]string{"order_type", "payment_method"},
)
databaseConnections = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "database_connections_active",
Help: "Number of active database connections",
},
[]string{"database", "pool"},
)
cacheHitRatio = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "cache_hit_ratio",
Help: "Cache hit ratio",
},
[]string{"cache_type"},
)
// Infrastructure metrics
cpuUsage = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "cpu_usage_percent",
Help: "CPU usage percentage",
},
[]string{"core"},
)
memoryUsage = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "memory_usage_bytes",
Help: "Memory usage in bytes",
},
)
diskUsage = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "disk_usage_percent",
Help: "Disk usage percentage",
},
[]string{"mount_point"},
)
)
// MetricsMiddleware wraps HTTP handlers to collect metrics
func MetricsMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
start := time.Now()
// Wrap the ResponseWriter to capture status code
wrapped := &responseWriter{ResponseWriter: w, statusCode: http.StatusOK}
// Call the next handler
next.ServeHTTP(wrapped, r)
// Record metrics
duration := time.Since(start).Seconds()
httpRequestDuration.WithLabelValues(r.Method, r.URL.Path).Observe(duration)
httpRequestsTotal.WithLabelValues(
r.Method,
r.URL.Path,
http.StatusText(wrapped.statusCode),
).Inc()
})
}
type responseWriter struct {
http.ResponseWriter
statusCode int
}
func (rw *responseWriter) WriteHeader(code int) {
rw.statusCode = code
rw.ResponseWriter.WriteHeader(code)
}
// Business metric functions
func IncrementActiveUsers() {
activeUsers.Inc()
}
func DecrementActiveUsers() {
activeUsers.Dec()
}
func RecordOrderProcessingTime(orderType, paymentMethod string, duration time.Duration) {
orderProcessingTime.WithLabelValues(orderType, paymentMethod).Observe(duration.Seconds())
}
func UpdateDatabaseConnections(database, pool string, count float64) {
databaseConnections.WithLabelValues(database, pool).Set(count)
}
func UpdateCacheHitRatio(cacheType string, ratio float64) {
cacheHitRatio.WithLabelValues(cacheType).Set(ratio)
}
// Infrastructure metric functions
func UpdateCPUUsage(core string, usage float64) {
cpuUsage.WithLabelValues(core).Set(usage)
}
func UpdateMemoryUsage(usage float64) {
memoryUsage.Set(usage)
}
func UpdateDiskUsage(mountPoint string, usage float64) {
diskUsage.WithLabelValues(mountPoint).Set(usage)
}
// Custom collector for complex metrics
type CustomCollector struct {
// Add fields for external data sources
}
func NewCustomCollector() *CustomCollector {
return &CustomCollector{}
}
func (c *CustomCollector) Describe(ch chan<- *prometheus.Desc) {
// Describe custom metrics
}
func (c *CustomCollector) Collect(ch chan<- prometheus.Metric) {
// Collect custom metrics from external sources
// Example: database queries, API calls, etc.
}
// Initialize metrics server
func StartMetricsServer(port string) {
// Register custom collector
prometheus.MustRegister(NewCustomCollector())
// Create metrics endpoint
http.Handle("/metrics", promhttp.Handler())
// Health check endpoint
http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("OK"))
})
// Start server
http.ListenAndServe(":"+port, nil)
}
Alerting Rules
# prometheus/rules/application-alerts.yml
groups:
- name: application.rules
rules:
# High error rate
- alert: HighErrorRate
expr: |
(
rate(http_requests_total{status_code=~"5.."}[5m]) /
rate(http_requests_total[5m])
) > 0.05
for: 5m
labels:
severity: critical
team: platform
annotations:
summary: "High error rate detected"
description: "Error rate is {{ $value | humanizePercentage }} for {{ $labels.endpoint }}"
runbook_url: "https://runbooks.company.com/high-error-rate"
# High response time
- alert: HighResponseTime
expr: |
histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m])) > 2
for: 10m
labels:
severity: warning
team: platform
annotations:
summary: "High response time detected"
description: "95th percentile response time is {{ $value }}s for {{ $labels.endpoint }}"
# Low cache hit ratio
- alert: LowCacheHitRatio
expr: cache_hit_ratio < 0.8
for: 15m
labels:
severity: warning
team: platform
annotations:
summary: "Low cache hit ratio"
description: "Cache hit ratio is {{ $value | humanizePercentage }} for {{ $labels.cache_type }}"
# Database connection pool exhaustion
- alert: DatabaseConnectionPoolExhaustion
expr: database_connections_active / database_connections_max > 0.9
for: 5m
labels:
severity: critical
team: database
annotations:
summary: "Database connection pool near exhaustion"
description: "Database {{ $labels.database }} connection pool is {{ $value | humanizePercentage }} full"
- name: infrastructure.rules
rules:
# High CPU usage
- alert: HighCPUUsage
expr: cpu_usage_percent > 80
for: 10m
labels:
severity: warning
team: infrastructure
annotations:
summary: "High CPU usage"
description: "CPU usage is {{ $value }}% on {{ $labels.instance }}"
# High memory usage
- alert: HighMemoryUsage
expr: (memory_usage_bytes / memory_total_bytes) > 0.9
for: 5m
labels:
severity: critical
team: infrastructure
annotations:
summary: "High memory usage"
description: "Memory usage is {{ $value | humanizePercentage }} on {{ $labels.instance }}"
# High disk usage
- alert: HighDiskUsage
expr: disk_usage_percent > 85
for: 5m
labels:
severity: warning
team: infrastructure
annotations:
summary: "High disk usage"
description: "Disk usage is {{ $value }}% on {{ $labels.mount_point }}"
# Pod restart frequency
- alert: PodRestartingFrequently
expr: rate(kube_pod_container_status_restarts_total[1h]) > 0.1
for: 15m
labels:
severity: warning
team: platform
annotations:
summary: "Pod restarting frequently"
description: "Pod {{ $labels.pod }} in namespace {{ $labels.namespace }} is restarting frequently"
- name: kubernetes.rules
rules:
# Node not ready
- alert: NodeNotReady
expr: kube_node_status_condition{condition="Ready",status="true"} == 0
for: 5m
labels:
severity: critical
team: infrastructure
annotations:
summary: "Node not ready"
description: "Node {{ $labels.node }} is not ready"
# Pod not ready
- alert: PodNotReady
expr: kube_pod_status_ready{condition="true"} == 0
for: 10m
labels:
severity: warning
team: platform
annotations:
summary: "Pod not ready"
description: "Pod {{ $labels.pod }} in namespace {{ $labels.namespace }} is not ready"
# Deployment replica mismatch
- alert: DeploymentReplicaMismatch
expr: |
kube_deployment_spec_replicas != kube_deployment_status_available_replicas
for: 15m
labels:
severity: warning
team: platform
annotations:
summary: "Deployment replica mismatch"
description: "Deployment {{ $labels.deployment }} has {{ $value }} available replicas, expected {{ $labels.spec_replicas }}"
Logging Infrastructure
Fluentd Configuration
# fluentd/fluent.conf
<system>
log_level info
workers 4
</system>
# Input sources
<source>
@type tail
@id kubernetes_logs
path /var/log/containers/*.log
pos_file /var/log/fluentd-containers.log.pos
tag kubernetes.*
read_from_head true
<parse>
@type json
time_format %Y-%m-%dT%H:%M:%S.%NZ
time_key time
keep_time_key true
</parse>
</source>
<source>
@type tail
@id system_logs
path /var/log/syslog
pos_file /var/log/fluentd-syslog.log.pos
tag system.syslog
<parse>
@type syslog
</parse>
</source>
<source>
@type tail
@id nginx_access_logs
path /var/log/nginx/access.log
pos_file /var/log/fluentd-nginx-access.log.pos
tag nginx.access
<parse>
@type nginx
</parse>
</source>
<source>
@type tail
@id application_logs
path /var/log/application/*.log
pos_file /var/log/fluentd-application.log.pos
tag application.*
<parse>
@type json
time_format %Y-%m-%d %H:%M:%S
time_key timestamp
</parse>
</source>
# Filters for log processing
<filter kubernetes.**>
@type kubernetes_metadata
@id kubernetes_metadata
kubernetes_url "#{ENV['KUBERNETES_SERVICE_HOST']}:#{ENV['KUBERNETES_SERVICE_PORT_HTTPS']}"
verify_ssl "#{ENV['KUBERNETES_VERIFY_SSL'] || true}"
ca_file "#{ENV['KUBERNETES_CA_FILE']}"
skip_labels false
skip_container_metadata false
skip_master_url false
skip_namespace_metadata false
</filter>
<filter kubernetes.**>
@type parser
@id kubernetes_parser
key_name log
reserve_data true
remove_key_name_field true
<parse>
@type multi_format
<pattern>
format json
time_key timestamp
time_format %Y-%m-%dT%H:%M:%S.%NZ
</pattern>
<pattern>
format regexp
expression /^(?<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \[(?<level>\w+)\] (?<message>.*)/
time_key timestamp
time_format %Y-%m-%d %H:%M:%S
</pattern>
<pattern>
format none
</pattern>
</parse>
</filter>
<filter **>
@type record_transformer
@id add_metadata
<record>
hostname "#{Socket.gethostname}"
environment "#{ENV['ENVIRONMENT'] || 'unknown'}"
cluster "#{ENV['CLUSTER_NAME'] || 'unknown'}"
region "#{ENV['AWS_REGION'] || 'unknown'}"
</record>
</filter>
# Log enrichment and parsing
<filter application.**>
@type parser
@id application_parser
key_name message
reserve_data true
<parse>
@type json
</parse>
</filter>
<filter nginx.**>
@type parser
@id nginx_parser
key_name message
reserve_data true
<parse>
@type regexp
expression /^(?<remote_addr>\S+) - (?<remote_user>\S+) \[(?<time_local>[^\]]+)\] "(?<method>\S+) (?<path>\S+) (?<protocol>\S+)" (?<status>\d+) (?<body_bytes_sent>\d+) "(?<http_referer>[^"]*)" "(?<http_user_agent>[^"]*)" "(?<http_x_forwarded_for>[^"]*)" (?<request_time>\S+) (?<upstream_response_time>\S+)/
time_key time_local
time_format %d/%b/%Y:%H:%M:%S %z
</parse>
</filter>
# Error detection and alerting
<filter **>
@type grep
@id error_detection
<regexp>
key level
pattern ^(ERROR|FATAL|CRITICAL)$
</regexp>
<or>
<regexp>
key status
pattern ^[45]\d{2}$
</regexp>
</or>
</filter>
# Sampling for high-volume logs
<filter kubernetes.var.log.containers.high-volume-app-**>
@type sampling
@id high_volume_sampling
sampling_rate 10
sample_unit tag
</filter>
# Output configurations
<match kubernetes.**>
@type elasticsearch
@id elasticsearch_kubernetes
host "#{ENV['ELASTICSEARCH_HOST'] || 'elasticsearch'}"
port "#{ENV['ELASTICSEARCH_PORT'] || '9200'}"
scheme "#{ENV['ELASTICSEARCH_SCHEME'] || 'http'}"
user "#{ENV['ELASTICSEARCH_USER']}"
password "#{ENV['ELASTICSEARCH_PASSWORD']}"
index_name kubernetes-logs
type_name _doc
# Index lifecycle management
template_name kubernetes-logs
template_file /fluentd/etc/kubernetes-template.json
# Buffer configuration
<buffer>
@type file
path /var/log/fluentd-buffers/kubernetes.buffer
flush_mode interval
retry_type exponential_backoff
flush_thread_count 2
flush_interval 5s
retry_forever
retry_max_interval 30
chunk_limit_size 2M
queue_limit_length 8
overflow_action block
</buffer>
# Request configuration
request_timeout 60s
reload_connections false
reconnect_on_error true
reload_on_failure true
# Slow log threshold
slow_flush_log_threshold 40.0
</match>
<match application.**>
@type elasticsearch
@id elasticsearch_application
host "#{ENV['ELASTICSEARCH_HOST'] || 'elasticsearch'}"
port "#{ENV['ELASTICSEARCH_PORT'] || '9200'}"
scheme "#{ENV['ELASTICSEARCH_SCHEME'] || 'http'}"
user "#{ENV['ELASTICSEARCH_USER']}"
password "#{ENV['ELASTICSEARCH_PASSWORD']}"
index_name application-logs-%Y.%m.%d
type_name _doc
<buffer time>
@type file
path /var/log/fluentd-buffers/application.buffer
timekey 1d
timekey_wait 10m
timekey_use_utc true
flush_mode interval
flush_interval 10s
chunk_limit_size 5M
</buffer>
</match>
<match nginx.**>
@type elasticsearch
@id elasticsearch_nginx
host "#{ENV['ELASTICSEARCH_HOST'] || 'elasticsearch'}"
port "#{ENV['ELASTICSEARCH_PORT'] || '9200'}"
scheme "#{ENV['ELASTICSEARCH_SCHEME'] || 'http'}"
user "#{ENV['ELASTICSEARCH_USER']}"
password "#{ENV['ELASTICSEARCH_PASSWORD']}"
index_name nginx-logs-%Y.%m.%d
type_name _doc
<buffer time>
@type file
path /var/log/fluentd-buffers/nginx.buffer
timekey 1d
timekey_wait 10m
flush_mode interval
flush_interval 5s
</buffer>
</match>
# Backup to S3 for long-term storage
<match **>
@type copy
<store>
@type s3
@id s3_backup
aws_key_id "#{ENV['AWS_ACCESS_KEY_ID']}"
aws_sec_key "#{ENV['AWS_SECRET_ACCESS_KEY']}"
s3_bucket "#{ENV['S3_BACKUP_BUCKET']}"
s3_region "#{ENV['AWS_REGION']}"
path logs/%Y/%m/%d/
s3_object_key_format %{path}%{time_slice}_%{index}.%{file_extension}
<buffer time>
@type file
path /var/log/fluentd-buffers/s3.buffer
timekey 3600
timekey_wait 10m
chunk_limit_size 256m
</buffer>
<format>
@type json
</format>
</store>
</match>
Elasticsearch Index Templates
{
"index_patterns": ["kubernetes-logs-*"],
"template": {
"settings": {
"number_of_shards": 3,
"number_of_replicas": 1,
"index.refresh_interval": "30s",
"index.codec": "best_compression",
"index.lifecycle.name": "kubernetes-logs-policy",
"index.lifecycle.rollover_alias": "kubernetes-logs"
},
"mappings": {
"properties": {
"@timestamp": {
"type": "date"
},
"kubernetes": {
"properties": {
"namespace_name": {
"type": "keyword"
},
"pod_name": {
"type": "keyword"
},
"container_name": {
"type": "keyword"
},
"labels": {
"type": "object",
"dynamic": true
}
}
},
"level": {
"type": "keyword"
},
"message": {
"type": "text",
"analyzer": "standard"
},
"hostname": {
"type": "keyword"
},
"environment": {
"type": "keyword"
},
"cluster": {
"type": "keyword"
},
"region": {
"type": "keyword"
}
}
}
}
}
Distributed Tracing
OpenTelemetry Configuration
# otel-collector/config.yml
receivers:
otlp:
protocols:
grpc:
endpoint: 0.0.0.0:4317
http:
endpoint: 0.0.0.0:4318
jaeger:
protocols:
grpc:
endpoint: 0.0.0.0:14250
thrift_http:
endpoint: 0.0.0.0:14268
thrift_compact:
endpoint: 0.0.0.0:6831
thrift_binary:
endpoint: 0.0.0.0:6832
zipkin:
endpoint: 0.0.0.0:9411
prometheus:
config:
scrape_configs:
- job_name: 'otel-collector'
scrape_interval: 10s
static_configs:
- targets: ['0.0.0.0:8888']
processors:
batch:
timeout: 1s
send_batch_size: 1024
send_batch_max_size: 2048
memory_limiter:
limit_mib: 512
spike_limit_mib: 128
check_interval: 5s
resource:
attributes:
- key: environment
value: production
action: upsert
- key: cluster
value: main-cluster
action: upsert
attributes:
actions:
- key: http.user_agent
action: delete
- key: http.request.header.authorization
action: delete
span:
name:
to_attributes:
rules:
- ^\/api\/v1\/users\/(?P<user_id>\d+)$
from_attributes: ["http.method", "http.route"]
probabilistic_sampler:
hash_seed: 22
sampling_percentage: 15.3
exporters:
jaeger:
endpoint: jaeger-collector:14250
tls:
insecure: true
zipkin:
endpoint: "http://zipkin:9411/api/v2/spans"
format: proto
prometheus:
endpoint: "0.0.0.0:8889"
const_labels:
cluster: main-cluster
logging:
loglevel: debug
elasticsearch/traces:
endpoints: ["http://elasticsearch:9200"]
index: traces-%Y.%m.%d
mapping:
mode: ecs
extensions:
health_check:
endpoint: 0.0.0.0:13133
pprof:
endpoint: 0.0.0.0:1777
zpages:
endpoint: 0.0.0.0:55679
service:
extensions: [health_check, pprof, zpages]
pipelines:
traces:
receivers: [otlp, jaeger, zipkin]
processors: [memory_limiter, resource, attributes, span, probabilistic_sampler, batch]
exporters: [jaeger, elasticsearch/traces, logging]
metrics:
receivers: [otlp, prometheus]
processors: [memory_limiter, resource, batch]
exporters: [prometheus, logging]
logs:
receivers: [otlp]
processors: [memory_limiter, resource, batch]
exporters: [logging]
telemetry:
logs:
level: "debug"
metrics:
address: 0.0.0.0:8888
Application Tracing Implementation
// tracing/tracer.go
package tracing
import (
	"context"
	"fmt"
	"net"
	"net/http"
	"strings"
	"time"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/codes"
	"go.opentelemetry.io/otel/exporters/jaeger"
	"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
	"go.opentelemetry.io/otel/propagation"
	"go.opentelemetry.io/otel/sdk/resource"
	"go.opentelemetry.io/otel/sdk/trace"
	semconv "go.opentelemetry.io/otel/semconv/v1.12.0"
	oteltrace "go.opentelemetry.io/otel/trace"
)
// TracingConfig holds tracing configuration
type TracingConfig struct {
ServiceName string
ServiceVersion string
Environment string
JaegerEndpoint string
OTLPEndpoint string
SamplingRatio float64
}
// InitTracing initializes OpenTelemetry tracing
func InitTracing(config TracingConfig) (func(), error) {
// Create resource
res, err := resource.New(context.Background(),
resource.WithAttributes(
semconv.ServiceNameKey.String(config.ServiceName),
semconv.ServiceVersionKey.String(config.ServiceVersion),
semconv.DeploymentEnvironmentKey.String(config.Environment),
),
)
if err != nil {
return nil, fmt.Errorf("failed to create resource: %w", err)
}
// Create exporters
var exporters []trace.SpanExporter
// Jaeger exporter
if config.JaegerEndpoint != "" {
jaegerExporter, err := jaeger.New(
jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(config.JaegerEndpoint)),
)
if err != nil {
return nil, fmt.Errorf("failed to create Jaeger exporter: %w", err)
}
exporters = append(exporters, jaegerExporter)
}
// OTLP exporter
if config.OTLPEndpoint != "" {
otlpExporter, err := otlptracehttp.New(context.Background(),
otlptracehttp.WithEndpoint(config.OTLPEndpoint),
otlptracehttp.WithInsecure(),
)
if err != nil {
return nil, fmt.Errorf("failed to create OTLP exporter: %w", err)
}
exporters = append(exporters, otlpExporter)
}
// Create span processor
var spanProcessors []trace.SpanProcessor
for _, exporter := range exporters {
spanProcessors = append(spanProcessors, trace.NewBatchSpanProcessor(exporter))
}
// Create tracer provider
tp := trace.NewTracerProvider(
trace.WithResource(res),
trace.WithSampler(trace.TraceIDRatioBased(config.SamplingRatio)),
)
// Add span processors
for _, processor := range spanProcessors {
tp.RegisterSpanProcessor(processor)
}
// Set global tracer provider
otel.SetTracerProvider(tp)
// Set global propagator
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{},
propagation.Baggage{},
))
// Return cleanup function
return func() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
tp.Shutdown(ctx)
}, nil
}
// TracingMiddleware creates HTTP middleware for tracing
func TracingMiddleware(serviceName string) func(http.Handler) http.Handler {
tracer := otel.Tracer(serviceName)
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Extract context from headers
ctx := otel.GetTextMapPropagator().Extract(r.Context(), propagation.HeaderCarrier(r.Header))
// Start span
ctx, span := tracer.Start(ctx, fmt.Sprintf("%s %s", r.Method, r.URL.Path),
oteltrace.WithAttributes(
semconv.HTTPMethodKey.String(r.Method),
semconv.HTTPURLKey.String(r.URL.String()),
semconv.HTTPSchemeKey.String(r.URL.Scheme),
semconv.HTTPHostKey.String(r.Host),
semconv.HTTPTargetKey.String(r.URL.Path),
semconv.HTTPUserAgentKey.String(r.UserAgent()),
semconv.HTTPClientIPKey.String(getClientIP(r)),
),
oteltrace.WithSpanKind(oteltrace.SpanKindServer),
)
defer span.End()
// Wrap response writer to capture status code
wrapped := &tracingResponseWriter{ResponseWriter: w, statusCode: http.StatusOK}
// Add span context to request
r = r.WithContext(ctx)
// Call next handler
next.ServeHTTP(wrapped, r)
// Set span attributes
span.SetAttributes(
semconv.HTTPStatusCodeKey.Int(wrapped.statusCode),
semconv.HTTPResponseSizeKey.Int64(wrapped.bytesWritten),
)
// Set span status
if wrapped.statusCode >= 400 {
span.SetAttributes(attribute.Bool("error", true))
if wrapped.statusCode >= 500 {
span.SetStatus(oteltrace.StatusError, http.StatusText(wrapped.statusCode))
}
}
})
}
}
type tracingResponseWriter struct {
http.ResponseWriter
statusCode int
bytesWritten int64
}
func (w *tracingResponseWriter) WriteHeader(statusCode int) {
w.statusCode = statusCode
w.ResponseWriter.WriteHeader(statusCode)
}
func (w *tracingResponseWriter) Write(data []byte) (int, error) {
n, err := w.ResponseWriter.Write(data)
w.bytesWritten += int64(n)
return n, err
}
func getClientIP(r *http.Request) string {
// Check X-Forwarded-For header
if xff := r.Header.Get("X-Forwarded-For"); xff != "" {
return xff
}
// Check X-Real-IP header
if xri := r.Header.Get("X-Real-IP"); xri != "" {
return xri
}
// Fall back to remote address
return r.RemoteAddr
}
// Database tracing helper
func TraceDBOperation(ctx context.Context, operation, query string) (context.Context, oteltrace.Span) {
tracer := otel.Tracer("database")
return tracer.Start(ctx, operation,
oteltrace.WithAttributes(
semconv.DBStatementKey.String(query),
semconv.DBSystemKey.String("postgresql"),
),
oteltrace.WithSpanKind(oteltrace.SpanKindClient),
)
}
// HTTP client tracing helper
func TraceHTTPClient(ctx context.Context, method, url string) (context.Context, oteltrace.Span) {
tracer := otel.Tracer("http-client")
return tracer.Start(ctx, fmt.Sprintf("%s %s", method, url),
oteltrace.WithAttributes(
semconv.HTTPMethodKey.String(method),
semconv.HTTPURLKey.String(url),
),
oteltrace.WithSpanKind(oteltrace.SpanKindClient),
)
}
// Custom span helper
func StartSpan(ctx context.Context, name string, attrs ...attribute.KeyValue) (context.Context, oteltrace.Span) {
tracer := otel.Tracer("application")
return tracer.Start(ctx, name, oteltrace.WithAttributes(attrs...))
}
// Add custom attributes to current span
func AddSpanAttributes(ctx context.Context, attrs ...attribute.KeyValue) {
span := oteltrace.SpanFromContext(ctx)
span.SetAttributes(attrs...)
}
// Add span event
func AddSpanEvent(ctx context.Context, name string, attrs ...attribute.KeyValue) {
span := oteltrace.SpanFromContext(ctx)
span.AddEvent(name, oteltrace.WithAttributes(attrs...))
}
// Record span error
func RecordSpanError(ctx context.Context, err error) {
span := oteltrace.SpanFromContext(ctx)
span.RecordError(err)
span.SetStatus(oteltrace.StatusError, err.Error())
}
Conclusion
A complete observability platform brings the three pillars covered here together: Prometheus-based metrics with actionable alerting rules, a Fluentd-to-Elasticsearch logging pipeline with lifecycle management, and OpenTelemetry-driven distributed tracing. Roll each layer out incrementally, validate collection end to end, and keep refining dashboards, sampling rates, and alert thresholds as real traffic patterns emerge.