
Building an Enterprise Observability Platform: Comprehensive Monitoring, Logging, and Tracing Solutions


Modern distributed systems require comprehensive observability to ensure reliability, performance, and operational excellence. This guide explores building an enterprise-grade observability platform that provides deep insights into system behavior through metrics, logs, and traces. We’ll cover the implementation of a complete observability stack using industry-standard tools and best practices.

Observability Architecture Overview

Three Pillars of Observability

graph TB
    subgraph "Applications & Infrastructure"
        A[Microservices]
        B[Kubernetes Cluster]
        C[Databases]
        D[Load Balancers]
        E[Message Queues]
    end
    
    subgraph "Data Collection"
        F[Prometheus Exporters]
        G[Fluentd/Fluent Bit]
        H[OpenTelemetry Collectors]
        I[Custom Agents]
    end
    
    subgraph "Storage & Processing"
        J[Prometheus TSDB]
        K[Elasticsearch]
        L[Jaeger Storage]
        M[ClickHouse]
    end
    
    subgraph "Visualization & Alerting"
        N[Grafana Dashboards]
        O[Kibana]
        P[Jaeger UI]
        Q[AlertManager]
    end
    
    A --> F
    B --> F
    C --> F
    D --> F
    E --> F
    
    A --> G
    B --> G
    C --> G
    
    A --> H
    B --> H
    
    F --> J
    G --> K
    H --> L
    
    J --> N
    K --> O
    L --> P
    J --> Q

Technology Stack Selection

# observability-stack.yml
observability_stack:
  metrics:
    collection:
      - prometheus
      - node_exporter
      - kube-state-metrics
      - custom_exporters
    storage:
      - prometheus_tsdb
      - thanos (long-term storage)
      - victoria_metrics (alternative)
    visualization:
      - grafana
      - prometheus_ui
    alerting:
      - alertmanager
      - grafana_alerts
  
  logging:
    collection:
      - fluentd
      - fluent_bit
      - filebeat
      - vector
    processing:
      - logstash
      - fluentd_filters
    storage:
      - elasticsearch
      - opensearch
      - loki
    visualization:
      - kibana
      - grafana_logs
  
  tracing:
    collection:
      - opentelemetry_collector
      - jaeger_agent
      - zipkin
    storage:
      - jaeger_backend
      - tempo
      - zipkin_storage
    visualization:
      - jaeger_ui
      - grafana_tempo
  
  infrastructure:
    orchestration: kubernetes
    service_mesh: istio
    ingress: nginx_ingress
    storage: persistent_volumes
    networking: calico

Metrics Collection and Monitoring

Prometheus Configuration

# prometheus/prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s
  external_labels:
    cluster: 'production'
    region: 'us-west-2'

rule_files:
  - "/etc/prometheus/rules/*.yml"

alerting:
  alertmanagers:
    - static_configs:
        - targets:
          - alertmanager:9093
      path_prefix: /alertmanager
      scheme: http

scrape_configs:
  # Prometheus itself
  - job_name: 'prometheus'
    static_configs:
      - targets: ['localhost:9090']
    scrape_interval: 30s
    metrics_path: /metrics

  # Kubernetes API Server
  - job_name: 'kubernetes-apiservers'
    kubernetes_sd_configs:
      - role: endpoints
        namespaces:
          names:
            - default
    scheme: https
    tls_config:
      ca_file: /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
      insecure_skip_verify: false
    bearer_token_file: /var/run/secrets/kubernetes.io/serviceaccount/token
    relabel_configs:
      - source_labels: [__meta_kubernetes_namespace, __meta_kubernetes_service_name, __meta_kubernetes_endpoint_port_name]
        action: keep
        regex: default;kubernetes;https

  # Kubernetes Nodes
  - job_name: 'kubernetes-nodes'
    kubernetes_sd_configs:
      - role: node
    scheme: https
    tls_config:
      ca_file: /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
      insecure_skip_verify: false
    bearer_token_file: /var/run/secrets/kubernetes.io/serviceaccount/token
    relabel_configs:
      - action: labelmap
        regex: __meta_kubernetes_node_label_(.+)
      - target_label: __address__
        replacement: kubernetes.default.svc:443
      - source_labels: [__meta_kubernetes_node_name]
        regex: (.+)
        target_label: __metrics_path__
        replacement: /api/v1/nodes/${1}/proxy/metrics

  # Kubernetes Pods
  - job_name: 'kubernetes-pods'
    kubernetes_sd_configs:
      - role: pod
    relabel_configs:
      - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]
        action: keep
        regex: true
      - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_path]
        action: replace
        target_label: __metrics_path__
        regex: (.+)
      - source_labels: [__address__, __meta_kubernetes_pod_annotation_prometheus_io_port]
        action: replace
        regex: ([^:]+)(?::\d+)?;(\d+)
        replacement: $1:$2
        target_label: __address__
      - action: labelmap
        regex: __meta_kubernetes_pod_label_(.+)
      - source_labels: [__meta_kubernetes_namespace]
        action: replace
        target_label: kubernetes_namespace
      - source_labels: [__meta_kubernetes_pod_name]
        action: replace
        target_label: kubernetes_pod_name

  # Node Exporter
  - job_name: 'node-exporter'
    kubernetes_sd_configs:
      - role: endpoints
    relabel_configs:
      - source_labels: [__meta_kubernetes_endpoints_name]
        regex: 'node-exporter'
        action: keep
      - source_labels: [__meta_kubernetes_endpoint_port_name]
        regex: 'metrics'
        action: keep
      - source_labels: [__meta_kubernetes_endpoint_address_target_name]
        target_label: node
      - action: labelmap
        regex: __meta_kubernetes_node_label_(.+)

  # kube-state-metrics
  - job_name: 'kube-state-metrics'
    static_configs:
      - targets: ['kube-state-metrics:8080']
    scrape_interval: 30s

  # Application metrics
  - job_name: 'application-metrics'
    kubernetes_sd_configs:
      - role: endpoints
    relabel_configs:
      - source_labels: [__meta_kubernetes_service_annotation_prometheus_io_scrape]
        action: keep
        regex: true
      - source_labels: [__meta_kubernetes_service_annotation_prometheus_io_scheme]
        action: replace
        target_label: __scheme__
        regex: (https?)
      - source_labels: [__meta_kubernetes_service_annotation_prometheus_io_path]
        action: replace
        target_label: __metrics_path__
        regex: (.+)
      - source_labels: [__address__, __meta_kubernetes_service_annotation_prometheus_io_port]
        action: replace
        target_label: __address__
        regex: ([^:]+)(?::\d+)?;(\d+)
        replacement: $1:$2
      - action: labelmap
        regex: __meta_kubernetes_service_label_(.+)
      - source_labels: [__meta_kubernetes_namespace]
        action: replace
        target_label: kubernetes_namespace
      - source_labels: [__meta_kubernetes_service_name]
        action: replace
        target_label: kubernetes_name

  # Custom exporters
  - job_name: 'custom-exporters'
    file_sd_configs:
      - files:
          - '/etc/prometheus/targets/*.json'
        refresh_interval: 30s

# Remote write configuration for long-term storage
remote_write:
  - url: "http://thanos-receive:19291/api/v1/receive"
    queue_config:
      max_samples_per_send: 1000
      max_shards: 200
      capacity: 2500
    metadata_config:
      send: true
      send_interval: 30s
    write_relabel_configs:
      - source_labels: [__name__]
        regex: 'go_.*'
        action: drop

# Remote read configuration
remote_read:
  - url: "http://thanos-query:9090/api/v1/query"
    read_recent: true
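The kubernetes-pods job above only scrapes pods that opt in through prometheus.io/* annotations. A minimal sketch of a workload that those relabel rules would pick up is shown below; the application name, image, and port are placeholders, not values from the original configuration.

# example-deployment.yaml (illustrative; names and ports are placeholders)
apiVersion: apps/v1
kind: Deployment
metadata:
  name: example-api
spec:
  replicas: 1
  selector:
    matchLabels:
      app: example-api
  template:
    metadata:
      labels:
        app: example-api
      annotations:
        prometheus.io/scrape: "true"    # kept by the __meta_kubernetes_pod_annotation_prometheus_io_scrape rule
        prometheus.io/path: "/metrics"  # copied into __metrics_path__
        prometheus.io/port: "8080"      # rewritten into __address__
    spec:
      containers:
        - name: example-api
          image: example-api:latest
          ports:
            - containerPort: 8080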

Custom Application Metrics

// metrics/application_metrics.go
package metrics

import (
    "log"
    "net/http"
    "strconv"
    "time"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
    "github.com/prometheus/client_golang/prometheus/promhttp"
)

// Application metrics
var (
    // HTTP request metrics
    httpRequestsTotal = promauto.NewCounterVec(
        prometheus.CounterOpts{
            Name: "http_requests_total",
            Help: "Total number of HTTP requests",
        },
        []string{"method", "endpoint", "status_code"},
    )
    
    httpRequestDuration = promauto.NewHistogramVec(
        prometheus.HistogramOpts{
            Name:    "http_request_duration_seconds",
            Help:    "HTTP request duration in seconds",
            Buckets: prometheus.DefBuckets,
        },
        []string{"method", "endpoint"},
    )
    
    // Business metrics
    activeUsers = promauto.NewGauge(
        prometheus.GaugeOpts{
            Name: "active_users_total",
            Help: "Number of currently active users",
        },
    )
    
    orderProcessingTime = promauto.NewHistogramVec(
        prometheus.HistogramOpts{
            Name: "order_processing_duration_seconds",
            Help: "Time taken to process orders",
            Buckets: []float64{0.1, 0.5, 1.0, 2.5, 5.0, 10.0},
        },
        []string{"order_type", "payment_method"},
    )
    
    databaseConnections = promauto.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: "database_connections_active",
            Help: "Number of active database connections",
        },
        []string{"database", "pool"},
    )
    
    cacheHitRatio = promauto.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: "cache_hit_ratio",
            Help: "Cache hit ratio",
        },
        []string{"cache_type"},
    )
    
    // Infrastructure metrics
    cpuUsage = promauto.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: "cpu_usage_percent",
            Help: "CPU usage percentage",
        },
        []string{"core"},
    )
    
    memoryUsage = promauto.NewGauge(
        prometheus.GaugeOpts{
            Name: "memory_usage_bytes",
            Help: "Memory usage in bytes",
        },
    )
    
    diskUsage = promauto.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: "disk_usage_percent",
            Help: "Disk usage percentage",
        },
        []string{"mount_point"},
    )
)

// MetricsMiddleware wraps HTTP handlers to collect metrics
func MetricsMiddleware(next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        start := time.Now()
        
        // Wrap the ResponseWriter to capture status code
        wrapped := &responseWriter{ResponseWriter: w, statusCode: http.StatusOK}
        
        // Call the next handler
        next.ServeHTTP(wrapped, r)
        
        // Record metrics
        duration := time.Since(start).Seconds()
        httpRequestDuration.WithLabelValues(r.Method, r.URL.Path).Observe(duration)
        httpRequestsTotal.WithLabelValues(
            r.Method,
            r.URL.Path,
            strconv.Itoa(wrapped.statusCode), // numeric code so alert rules matching status_code=~"5.." work
        ).Inc()
    })
}

type responseWriter struct {
    http.ResponseWriter
    statusCode int
}

func (rw *responseWriter) WriteHeader(code int) {
    rw.statusCode = code
    rw.ResponseWriter.WriteHeader(code)
}

// Business metric functions
func IncrementActiveUsers() {
    activeUsers.Inc()
}

func DecrementActiveUsers() {
    activeUsers.Dec()
}

func RecordOrderProcessingTime(orderType, paymentMethod string, duration time.Duration) {
    orderProcessingTime.WithLabelValues(orderType, paymentMethod).Observe(duration.Seconds())
}

func UpdateDatabaseConnections(database, pool string, count float64) {
    databaseConnections.WithLabelValues(database, pool).Set(count)
}

func UpdateCacheHitRatio(cacheType string, ratio float64) {
    cacheHitRatio.WithLabelValues(cacheType).Set(ratio)
}

// Infrastructure metric functions
func UpdateCPUUsage(core string, usage float64) {
    cpuUsage.WithLabelValues(core).Set(usage)
}

func UpdateMemoryUsage(usage float64) {
    memoryUsage.Set(usage)
}

func UpdateDiskUsage(mountPoint string, usage float64) {
    diskUsage.WithLabelValues(mountPoint).Set(usage)
}

// Custom collector for complex metrics
type CustomCollector struct {
    // Add fields for external data sources
}

func NewCustomCollector() *CustomCollector {
    return &CustomCollector{}
}

func (c *CustomCollector) Describe(ch chan<- *prometheus.Desc) {
    // Describe custom metrics
}

func (c *CustomCollector) Collect(ch chan<- prometheus.Metric) {
    // Collect custom metrics from external sources
    // Example: database queries, API calls, etc.
}

// Initialize metrics server
func StartMetricsServer(port string) {
    // Register custom collector
    prometheus.MustRegister(NewCustomCollector())
    
    // Create metrics endpoint
    http.Handle("/metrics", promhttp.Handler())
    
    // Health check endpoint
    http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
        w.WriteHeader(http.StatusOK)
        w.Write([]byte("OK"))
    })
    
    // Start server and surface startup errors instead of silently dropping them
    if err := http.ListenAndServe(":"+port, nil); err != nil {
        log.Fatalf("metrics server error: %v", err)
    }
}
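A minimal sketch of how a service could wire this package together; the module path yourmodule/metrics, the route, and the ports are placeholders, not part of the original code.

// main.go (illustrative usage sketch; import path, route, and ports are placeholders)
package main

import (
    "log"
    "net/http"

    "yourmodule/metrics" // hypothetical module path for the metrics package above
)

func main() {
    // Serve /metrics and /health on a dedicated port for Prometheus to scrape.
    go metrics.StartMetricsServer("9091")

    // Wrap application routes with the metrics middleware.
    mux := http.NewServeMux()
    mux.HandleFunc("/api/orders", func(w http.ResponseWriter, r *http.Request) {
        w.Write([]byte("ok"))
    })

    log.Fatal(http.ListenAndServe(":8080", metrics.MetricsMiddleware(mux)))
}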

Alerting Rules

# prometheus/rules/application-alerts.yml
groups:
  - name: application.rules
    rules:
      # High error rate
      - alert: HighErrorRate
        expr: |
          (
            sum by (endpoint) (rate(http_requests_total{status_code=~"5.."}[5m]))
            /
            sum by (endpoint) (rate(http_requests_total[5m]))
          ) > 0.05
        for: 5m
        labels:
          severity: critical
          team: platform
        annotations:
          summary: "High error rate detected"
          description: "Error rate is {{ $value | humanizePercentage }} for {{ $labels.endpoint }}"
          runbook_url: "https://runbooks.company.com/high-error-rate"

      # High response time
      - alert: HighResponseTime
        expr: |
          histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m])) > 2
        for: 10m
        labels:
          severity: warning
          team: platform
        annotations:
          summary: "High response time detected"
          description: "95th percentile response time is {{ $value }}s for {{ $labels.endpoint }}"

      # Low cache hit ratio
      - alert: LowCacheHitRatio
        expr: cache_hit_ratio < 0.8
        for: 15m
        labels:
          severity: warning
          team: platform
        annotations:
          summary: "Low cache hit ratio"
          description: "Cache hit ratio is {{ $value | humanizePercentage }} for {{ $labels.cache_type }}"

      # Database connection pool exhaustion
      - alert: DatabaseConnectionPoolExhaustion
        expr: database_connections_active / database_connections_max > 0.9
        for: 5m
        labels:
          severity: critical
          team: database
        annotations:
          summary: "Database connection pool near exhaustion"
          description: "Database {{ $labels.database }} connection pool is {{ $value | humanizePercentage }} full"

  - name: infrastructure.rules
    rules:
      # High CPU usage
      - alert: HighCPUUsage
        expr: cpu_usage_percent > 80
        for: 10m
        labels:
          severity: warning
          team: infrastructure
        annotations:
          summary: "High CPU usage"
          description: "CPU usage is {{ $value }}% on {{ $labels.instance }}"

      # High memory usage
      - alert: HighMemoryUsage
        expr: (memory_usage_bytes / memory_total_bytes) > 0.9
        for: 5m
        labels:
          severity: critical
          team: infrastructure
        annotations:
          summary: "High memory usage"
          description: "Memory usage is {{ $value | humanizePercentage }} on {{ $labels.instance }}"

      # High disk usage
      - alert: HighDiskUsage
        expr: disk_usage_percent > 85
        for: 5m
        labels:
          severity: warning
          team: infrastructure
        annotations:
          summary: "High disk usage"
          description: "Disk usage is {{ $value }}% on {{ $labels.mount_point }}"

      # Pod restart frequency
      - alert: PodRestartingFrequently
        expr: rate(kube_pod_container_status_restarts_total[1h]) > 0.1
        for: 15m
        labels:
          severity: warning
          team: platform
        annotations:
          summary: "Pod restarting frequently"
          description: "Pod {{ $labels.pod }} in namespace {{ $labels.namespace }} is restarting frequently"

  - name: kubernetes.rules
    rules:
      # Node not ready
      - alert: NodeNotReady
        expr: kube_node_status_condition{condition="Ready",status="true"} == 0
        for: 5m
        labels:
          severity: critical
          team: infrastructure
        annotations:
          summary: "Node not ready"
          description: "Node {{ $labels.node }} is not ready"

      # Pod not ready
      - alert: PodNotReady
        expr: kube_pod_status_ready{condition="true"} == 0
        for: 10m
        labels:
          severity: warning
          team: platform
        annotations:
          summary: "Pod not ready"
          description: "Pod {{ $labels.pod }} in namespace {{ $labels.namespace }} is not ready"

      # Deployment replica mismatch
      - alert: DeploymentReplicaMismatch
        expr: |
          kube_deployment_spec_replicas != kube_deployment_status_replicas_available
        for: 15m
        labels:
          severity: warning
          team: platform
        annotations:
          summary: "Deployment replica mismatch"
          description: "Deployment {{ $labels.deployment }} has {{ $value }} available replicas, expected {{ $labels.spec_replicas }}"

Logging Infrastructure

Fluentd Configuration

# fluentd/fluent.conf
<system>
  log_level info
  workers 4
</system>

# Input sources
<source>
  @type tail
  @id kubernetes_logs
  path /var/log/containers/*.log
  pos_file /var/log/fluentd-containers.log.pos
  tag kubernetes.*
  read_from_head true
  <parse>
    @type json
    time_format %Y-%m-%dT%H:%M:%S.%NZ
    time_key time
    keep_time_key true
  </parse>
</source>

<source>
  @type tail
  @id system_logs
  path /var/log/syslog
  pos_file /var/log/fluentd-syslog.log.pos
  tag system.syslog
  <parse>
    @type syslog
  </parse>
</source>

<source>
  @type tail
  @id nginx_access_logs
  path /var/log/nginx/access.log
  pos_file /var/log/fluentd-nginx-access.log.pos
  tag nginx.access
  <parse>
    @type nginx
  </parse>
</source>

<source>
  @type tail
  @id application_logs
  path /var/log/application/*.log
  pos_file /var/log/fluentd-application.log.pos
  tag application.*
  <parse>
    @type json
    time_format %Y-%m-%d %H:%M:%S
    time_key timestamp
  </parse>
</source>

# Filters for log processing
<filter kubernetes.**>
  @type kubernetes_metadata
  @id kubernetes_metadata
  kubernetes_url "https://#{ENV['KUBERNETES_SERVICE_HOST']}:#{ENV['KUBERNETES_SERVICE_PORT_HTTPS']}"
  verify_ssl "#{ENV['KUBERNETES_VERIFY_SSL'] || true}"
  ca_file "#{ENV['KUBERNETES_CA_FILE']}"
  skip_labels false
  skip_container_metadata false
  skip_master_url false
  skip_namespace_metadata false
</filter>

<filter kubernetes.**>
  @type parser
  @id kubernetes_parser
  key_name log
  reserve_data true
  remove_key_name_field true
  <parse>
    @type multi_format
    <pattern>
      format json
      time_key timestamp
      time_format %Y-%m-%dT%H:%M:%S.%NZ
    </pattern>
    <pattern>
      format regexp
      expression /^(?<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \[(?<level>\w+)\] (?<message>.*)/
      time_key timestamp
      time_format %Y-%m-%d %H:%M:%S
    </pattern>
    <pattern>
      format none
    </pattern>
  </parse>
</filter>

<filter **>
  @type record_transformer
  @id add_metadata
  <record>
    hostname "#{Socket.gethostname}"
    environment "#{ENV['ENVIRONMENT'] || 'unknown'}"
    cluster "#{ENV['CLUSTER_NAME'] || 'unknown'}"
    region "#{ENV['AWS_REGION'] || 'unknown'}"
  </record>
</filter>

# Log enrichment and parsing
<filter application.**>
  @type parser
  @id application_parser
  key_name message
  reserve_data true
  <parse>
    @type json
  </parse>
</filter>

<filter nginx.**>
  @type parser
  @id nginx_parser
  key_name message
  reserve_data true
  <parse>
    @type regexp
    expression /^(?<remote_addr>\S+) - (?<remote_user>\S+) \[(?<time_local>[^\]]+)\] "(?<method>\S+) (?<path>\S+) (?<protocol>\S+)" (?<status>\d+) (?<body_bytes_sent>\d+) "(?<http_referer>[^"]*)" "(?<http_user_agent>[^"]*)" "(?<http_x_forwarded_for>[^"]*)" (?<request_time>\S+) (?<upstream_response_time>\S+)/
    time_key time_local
    time_format %d/%b/%Y:%H:%M:%S %z
  </parse>
</filter>

# Error detection: keep only records whose level is ERROR/FATAL/CRITICAL
# or whose HTTP status is 4xx/5xx (conditions inside <or> are OR'ed)
<filter **>
  @type grep
  @id error_detection
  <or>
    <regexp>
      key level
      pattern /^(ERROR|FATAL|CRITICAL)$/
    </regexp>
    <regexp>
      key status
      pattern /^[45]\d{2}$/
    </regexp>
  </or>
</filter>

# Sampling for high-volume logs (uses the fluent-plugin-sampling-filter plugin)
<filter kubernetes.var.log.containers.high-volume-app-**>
  @type sampling_filter
  @id high_volume_sampling
  interval 10        # keep roughly 1 in 10 records
  sample_unit tag
</filter>

# Output configurations
<match kubernetes.**>
  @type elasticsearch
  @id elasticsearch_kubernetes
  host "#{ENV['ELASTICSEARCH_HOST'] || 'elasticsearch'}"
  port "#{ENV['ELASTICSEARCH_PORT'] || '9200'}"
  scheme "#{ENV['ELASTICSEARCH_SCHEME'] || 'http'}"
  user "#{ENV['ELASTICSEARCH_USER']}"
  password "#{ENV['ELASTICSEARCH_PASSWORD']}"
  
  index_name kubernetes-logs
  type_name _doc
  
  # Index lifecycle management
  template_name kubernetes-logs
  template_file /fluentd/etc/kubernetes-template.json
  
  # Buffer configuration
  <buffer>
    @type file
    path /var/log/fluentd-buffers/kubernetes.buffer
    flush_mode interval
    retry_type exponential_backoff
    flush_thread_count 2
    flush_interval 5s
    retry_forever
    retry_max_interval 30
    chunk_limit_size 2M
    queue_limit_length 8
    overflow_action block
  </buffer>
  
  # Request configuration
  request_timeout 60s
  reload_connections false
  reconnect_on_error true
  reload_on_failure true
  
  # Slow log threshold
  slow_flush_log_threshold 40.0
</match>

<match application.**>
  @type elasticsearch
  @id elasticsearch_application
  host "#{ENV['ELASTICSEARCH_HOST'] || 'elasticsearch'}"
  port "#{ENV['ELASTICSEARCH_PORT'] || '9200'}"
  scheme "#{ENV['ELASTICSEARCH_SCHEME'] || 'http'}"
  user "#{ENV['ELASTICSEARCH_USER']}"
  password "#{ENV['ELASTICSEARCH_PASSWORD']}"
  
  index_name application-logs-%Y.%m.%d
  type_name _doc
  
  <buffer time>
    @type file
    path /var/log/fluentd-buffers/application.buffer
    timekey 1d
    timekey_wait 10m
    timekey_use_utc true
    flush_mode interval
    flush_interval 10s
    chunk_limit_size 5M
  </buffer>
</match>

<match nginx.**>
  @type elasticsearch
  @id elasticsearch_nginx
  host "#{ENV['ELASTICSEARCH_HOST'] || 'elasticsearch'}"
  port "#{ENV['ELASTICSEARCH_PORT'] || '9200'}"
  scheme "#{ENV['ELASTICSEARCH_SCHEME'] || 'http'}"
  user "#{ENV['ELASTICSEARCH_USER']}"
  password "#{ENV['ELASTICSEARCH_PASSWORD']}"
  
  index_name nginx-logs-%Y.%m.%d
  type_name _doc
  
  <buffer time>
    @type file
    path /var/log/fluentd-buffers/nginx.buffer
    timekey 1d
    timekey_wait 10m
    flush_mode interval
    flush_interval 5s
  </buffer>
</match>

# Backup to S3 for long-term storage
<match **>
  @type copy
  <store>
    @type s3
    @id s3_backup
    aws_key_id "#{ENV['AWS_ACCESS_KEY_ID']}"
    aws_sec_key "#{ENV['AWS_SECRET_ACCESS_KEY']}"
    s3_bucket "#{ENV['S3_BACKUP_BUCKET']}"
    s3_region "#{ENV['AWS_REGION']}"
    path logs/%Y/%m/%d/
    s3_object_key_format %{path}%{time_slice}_%{index}.%{file_extension}
    
    <buffer time>
      @type file
      path /var/log/fluentd-buffers/s3.buffer
      timekey 3600
      timekey_wait 10m
      chunk_limit_size 256m
    </buffer>
    
    <format>
      @type json
    </format>
  </store>
</match>
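The application.* pipeline tails /var/log/application/*.log and expects JSON lines with a timestamp in %Y-%m-%d %H:%M:%S format plus level and message fields that the error-detection filter can match. Below is a minimal Go sketch that emits compatible lines; the package layout and field set beyond timestamp/level/message are assumptions, not part of the original stack.

// logging/jsonlog.go (illustrative sketch matching the Fluentd application.* parser)
package logging

import (
    "encoding/json"
    "io"
    "time"
)

// Entry mirrors the fields the Fluentd filters look for: timestamp, level, message.
type Entry struct {
    Timestamp string            `json:"timestamp"`
    Level     string            `json:"level"`
    Message   string            `json:"message"`
    Fields    map[string]string `json:"fields,omitempty"`
}

// Write emits one JSON object per line so the tail source and json parser can consume it.
func Write(w io.Writer, level, message string, fields map[string]string) error {
    return json.NewEncoder(w).Encode(Entry{
        Timestamp: time.Now().Format("2006-01-02 15:04:05"), // matches time_format %Y-%m-%d %H:%M:%S
        Level:     level,
        Message:   message,
        Fields:    fields,
    })
}

An application would open a log file under /var/log/application/ (the path assumed by the tail source) and call Write with a level such as "ERROR" so the grep filter's level pattern matches error records.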

Elasticsearch Index Templates

{
  "index_patterns": ["kubernetes-logs-*"],
  "template": {
    "settings": {
      "number_of_shards": 3,
      "number_of_replicas": 1,
      "index.refresh_interval": "30s",
      "index.codec": "best_compression",
      "index.lifecycle.name": "kubernetes-logs-policy",
      "index.lifecycle.rollover_alias": "kubernetes-logs"
    },
    "mappings": {
      "properties": {
        "@timestamp": {
          "type": "date"
        },
        "kubernetes": {
          "properties": {
            "namespace_name": {
              "type": "keyword"
            },
            "pod_name": {
              "type": "keyword"
            },
            "container_name": {
              "type": "keyword"
            },
            "labels": {
              "type": "object",
              "dynamic": true
            }
          }
        },
        "level": {
          "type": "keyword"
        },
        "message": {
          "type": "text",
          "analyzer": "standard"
        },
        "hostname": {
          "type": "keyword"
        },
        "environment": {
          "type": "keyword"
        },
        "cluster": {
          "type": "keyword"
        },
        "region": {
          "type": "keyword"
        }
      }
    }
  }
}
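The template above references index.lifecycle.name: kubernetes-logs-policy and a rollover alias but does not define the policy itself. A sketch of a matching ILM policy is shown below; the rollover thresholds and retention periods are illustrative assumptions, not values from the original setup.

PUT _ilm/policy/kubernetes-logs-policy
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": { "max_size": "50gb", "max_age": "1d" }
        }
      },
      "warm": {
        "min_age": "3d",
        "actions": {
          "shrink": { "number_of_shards": 1 },
          "forcemerge": { "max_num_segments": 1 }
        }
      },
      "delete": {
        "min_age": "30d",
        "actions": { "delete": {} }
      }
    }
  }
}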

Distributed Tracing

OpenTelemetry Configuration

# otel-collector/config.yml
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
      http:
        endpoint: 0.0.0.0:4318
  
  jaeger:
    protocols:
      grpc:
        endpoint: 0.0.0.0:14250
      thrift_http:
        endpoint: 0.0.0.0:14268
      thrift_compact:
        endpoint: 0.0.0.0:6831
      thrift_binary:
        endpoint: 0.0.0.0:6832
  
  zipkin:
    endpoint: 0.0.0.0:9411
  
  prometheus:
    config:
      scrape_configs:
        - job_name: 'otel-collector'
          scrape_interval: 10s
          static_configs:
            - targets: ['0.0.0.0:8888']

processors:
  batch:
    timeout: 1s
    send_batch_size: 1024
    send_batch_max_size: 2048
  
  memory_limiter:
    limit_mib: 512
    spike_limit_mib: 128
    check_interval: 5s
  
  resource:
    attributes:
      - key: environment
        value: production
        action: upsert
      - key: cluster
        value: main-cluster
        action: upsert
  
  attributes:
    actions:
      - key: http.user_agent
        action: delete
      - key: http.request.header.authorization
        action: delete
  
  span:
    name:
      to_attributes:
        rules:
          - ^\/api\/v1\/users\/(?P<user_id>\d+)$
      from_attributes: ["http.method", "http.route"]
  
  probabilistic_sampler:
    hash_seed: 22
    sampling_percentage: 15.3

exporters:
  jaeger:
    endpoint: jaeger-collector:14250
    tls:
      insecure: true
  
  zipkin:
    endpoint: "http://zipkin:9411/api/v2/spans"
    format: proto
  
  prometheus:
    endpoint: "0.0.0.0:8889"
    const_labels:
      cluster: main-cluster
  
  logging:
    loglevel: debug
  
  elasticsearch/traces:
    endpoints: ["http://elasticsearch:9200"]
    index: traces-%Y.%m.%d
    mapping:
      mode: ecs

extensions:
  health_check:
    endpoint: 0.0.0.0:13133
  
  pprof:
    endpoint: 0.0.0.0:1777
  
  zpages:
    endpoint: 0.0.0.0:55679

service:
  extensions: [health_check, pprof, zpages]
  pipelines:
    traces:
      receivers: [otlp, jaeger, zipkin]
      processors: [memory_limiter, resource, attributes, span, probabilistic_sampler, batch]
      exporters: [jaeger, elasticsearch/traces, logging]
    
    metrics:
      receivers: [otlp, prometheus]
      processors: [memory_limiter, resource, batch]
      exporters: [prometheus, logging]
    
    logs:
      receivers: [otlp]
      processors: [memory_limiter, resource, batch]
      exporters: [logging]
  
  telemetry:
    logs:
      level: "debug"
    metrics:
      address: 0.0.0.0:8888
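Instrumented services typically discover the collector through the standard OpenTelemetry SDK environment variables rather than hard-coded endpoints. A container spec fragment along these lines is one common way to wire it up; the service name and version are placeholders, and the address assumes an otel-collector Service exposing the gRPC OTLP port configured above.

# deployment fragment: pointing an instrumented service at the collector (values are placeholders)
env:
  - name: OTEL_EXPORTER_OTLP_ENDPOINT
    value: "http://otel-collector:4317"   # gRPC OTLP receiver from the collector config
  - name: OTEL_SERVICE_NAME
    value: "orders-service"
  - name: OTEL_RESOURCE_ATTRIBUTES
    value: "deployment.environment=production,service.version=1.4.2"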

Application Tracing Implementation

// tracing/tracer.go
package tracing

import (
    "context"
    "fmt"
    "net/http"
    "time"

    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/codes"
    "go.opentelemetry.io/otel/exporters/jaeger"
    "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
    "go.opentelemetry.io/otel/propagation"
    "go.opentelemetry.io/otel/sdk/resource"
    "go.opentelemetry.io/otel/sdk/trace"
    semconv "go.opentelemetry.io/otel/semconv/v1.12.0"
    oteltrace "go.opentelemetry.io/otel/trace"
)

// TracingConfig holds tracing configuration
type TracingConfig struct {
    ServiceName     string
    ServiceVersion  string
    Environment     string
    JaegerEndpoint  string
    OTLPEndpoint    string
    SamplingRatio   float64
}

// InitTracing initializes OpenTelemetry tracing
func InitTracing(config TracingConfig) (func(), error) {
    // Create resource
    res, err := resource.New(context.Background(),
        resource.WithAttributes(
            semconv.ServiceNameKey.String(config.ServiceName),
            semconv.ServiceVersionKey.String(config.ServiceVersion),
            semconv.DeploymentEnvironmentKey.String(config.Environment),
        ),
    )
    if err != nil {
        return nil, fmt.Errorf("failed to create resource: %w", err)
    }
    
    // Create exporters
    var exporters []trace.SpanExporter
    
    // Jaeger exporter
    if config.JaegerEndpoint != "" {
        jaegerExporter, err := jaeger.New(
            jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(config.JaegerEndpoint)),
        )
        if err != nil {
            return nil, fmt.Errorf("failed to create Jaeger exporter: %w", err)
        }
        exporters = append(exporters, jaegerExporter)
    }
    
    // OTLP exporter
    if config.OTLPEndpoint != "" {
        otlpExporter, err := otlptracehttp.New(context.Background(),
            otlptracehttp.WithEndpoint(config.OTLPEndpoint),
            otlptracehttp.WithInsecure(),
        )
        if err != nil {
            return nil, fmt.Errorf("failed to create OTLP exporter: %w", err)
        }
        exporters = append(exporters, otlpExporter)
    }
    
    // Create span processor
    var spanProcessors []trace.SpanProcessor
    for _, exporter := range exporters {
        spanProcessors = append(spanProcessors, trace.NewBatchSpanProcessor(exporter))
    }
    
    // Create tracer provider
    tp := trace.NewTracerProvider(
        trace.WithResource(res),
        trace.WithSampler(trace.TraceIDRatioBased(config.SamplingRatio)),
    )
    
    // Add span processors
    for _, processor := range spanProcessors {
        tp.RegisterSpanProcessor(processor)
    }
    
    // Set global tracer provider
    otel.SetTracerProvider(tp)
    
    // Set global propagator
    otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
        propagation.TraceContext{},
        propagation.Baggage{},
    ))
    
    // Return cleanup function
    return func() {
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()
        tp.Shutdown(ctx)
    }, nil
}

// TracingMiddleware creates HTTP middleware for tracing
func TracingMiddleware(serviceName string) func(http.Handler) http.Handler {
    tracer := otel.Tracer(serviceName)
    
    return func(next http.Handler) http.Handler {
        return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            // Extract context from headers
            ctx := otel.GetTextMapPropagator().Extract(r.Context(), propagation.HeaderCarrier(r.Header))
            
            // Start span
            ctx, span := tracer.Start(ctx, fmt.Sprintf("%s %s", r.Method, r.URL.Path),
                oteltrace.WithAttributes(
                    semconv.HTTPMethodKey.String(r.Method),
                    semconv.HTTPURLKey.String(r.URL.String()),
                    semconv.HTTPSchemeKey.String(r.URL.Scheme),
                    semconv.HTTPHostKey.String(r.Host),
                    semconv.HTTPTargetKey.String(r.URL.Path),
                    semconv.HTTPUserAgentKey.String(r.UserAgent()),
                    semconv.HTTPClientIPKey.String(getClientIP(r)),
                ),
                oteltrace.WithSpanKind(oteltrace.SpanKindServer),
            )
            defer span.End()
            
            // Wrap response writer to capture status code
            wrapped := &tracingResponseWriter{ResponseWriter: w, statusCode: http.StatusOK}
            
            // Add span context to request
            r = r.WithContext(ctx)
            
            // Call next handler
            next.ServeHTTP(wrapped, r)
            
            // Set span attributes
            span.SetAttributes(
                semconv.HTTPStatusCodeKey.Int(wrapped.statusCode),
                semconv.HTTPResponseContentLengthKey.Int64(wrapped.bytesWritten),
            )
            
            // Set span status
            if wrapped.statusCode >= 400 {
                span.SetAttributes(attribute.Bool("error", true))
                if wrapped.statusCode >= 500 {
                    span.SetStatus(codes.Error, http.StatusText(wrapped.statusCode))
                }
            }
        })
    }
}

type tracingResponseWriter struct {
    http.ResponseWriter
    statusCode   int
    bytesWritten int64
}

func (w *tracingResponseWriter) WriteHeader(statusCode int) {
    w.statusCode = statusCode
    w.ResponseWriter.WriteHeader(statusCode)
}

func (w *tracingResponseWriter) Write(data []byte) (int, error) {
    n, err := w.ResponseWriter.Write(data)
    w.bytesWritten += int64(n)
    return n, err
}

func getClientIP(r *http.Request) string {
    // Check X-Forwarded-For header
    if xff := r.Header.Get("X-Forwarded-For"); xff != "" {
        return xff
    }
    
    // Check X-Real-IP header
    if xri := r.Header.Get("X-Real-IP"); xri != "" {
        return xri
    }
    
    // Fall back to remote address
    return r.RemoteAddr
}

// Database tracing helper
func TraceDBOperation(ctx context.Context, operation, query string) (context.Context, oteltrace.Span) {
    tracer := otel.Tracer("database")
    return tracer.Start(ctx, operation,
        oteltrace.WithAttributes(
            semconv.DBStatementKey.String(query),
            semconv.DBSystemKey.String("postgresql"),
        ),
        oteltrace.WithSpanKind(oteltrace.SpanKindClient),
    )
}

// HTTP client tracing helper
func TraceHTTPClient(ctx context.Context, method, url string) (context.Context, oteltrace.Span) {
    tracer := otel.Tracer("http-client")
    return tracer.Start(ctx, fmt.Sprintf("%s %s", method, url),
        oteltrace.WithAttributes(
            semconv.HTTPMethodKey.String(method),
            semconv.HTTPURLKey.String(url),
        ),
        oteltrace.WithSpanKind(oteltrace.SpanKindClient),
    )
}

// Custom span helper
func StartSpan(ctx context.Context, name string, attrs ...attribute.KeyValue) (context.Context, oteltrace.Span) {
    tracer := otel.Tracer("application")
    return tracer.Start(ctx, name, oteltrace.WithAttributes(attrs...))
}

// Add custom attributes to current span
func AddSpanAttributes(ctx context.Context, attrs ...attribute.KeyValue) {
    span := oteltrace.SpanFromContext(ctx)
    span.SetAttributes(attrs...)
}

// Add span event
func AddSpanEvent(ctx context.Context, name string, attrs ...attribute.KeyValue) {
    span := oteltrace.SpanFromContext(ctx)
    span.AddEvent(name, oteltrace.WithAttributes(attrs...))
}

// Record span error
func RecordSpanError(ctx context.Context, err error) {
    span := oteltrace.SpanFromContext(ctx)
    span.RecordError(err)
    span.SetStatus(codes.Error, err.Error())
}
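A sketch of how a service might wire this package together, assuming it is imported as tracing; the module path, service name, route, and ports are placeholders, and the OTLP endpoint assumes the HTTP receiver from the collector configuration above.

// main.go (illustrative usage sketch; import path, names, and endpoints are placeholders)
package main

import (
    "log"
    "net/http"

    "yourmodule/tracing" // hypothetical module path for the tracing package above
)

func main() {
    shutdown, err := tracing.InitTracing(tracing.TracingConfig{
        ServiceName:    "orders-service",
        ServiceVersion: "1.4.2",
        Environment:    "production",
        OTLPEndpoint:   "otel-collector:4318", // HTTP OTLP receiver from the collector config
        SamplingRatio:  0.15,
    })
    if err != nil {
        log.Fatalf("failed to initialize tracing: %v", err)
    }
    defer shutdown()

    mux := http.NewServeMux()
    mux.HandleFunc("/api/orders", func(w http.ResponseWriter, r *http.Request) {
        // Child span for application logic.
        ctx, span := tracing.StartSpan(r.Context(), "list-orders")
        defer span.End()

        // Child span for a database call.
        _, dbSpan := tracing.TraceDBOperation(ctx, "SELECT orders", "SELECT * FROM orders LIMIT 10")
        dbSpan.End()

        w.Write([]byte("ok"))
    })

    handler := tracing.TracingMiddleware("orders-service")(mux)
    log.Fatal(http.ListenAndServe(":8080", handler))
}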
