【可观测性】分布式追踪与监控:构建完整的系统可观测体系

发布时间:2026/5/28 5:28:10

# 【可观测性】分布式追踪与监控:构建完整的系统可观测体系

## 一、分布式追踪概述

### 1.1 为什么需要分布式追踪

在微服务架构中,请求会经过多个服务。分布式追踪可以:

- 定位性能瓶颈:找出慢服务和瓶颈点
- 理解调用链路:可视化服务间的调用关系
- 故障排查:快速定位问题根源
- 容量规划:基于实际调用模式进行资源规划

### 1.2 分布式追踪架构

```
┌─────────────────────────────────────────────────────────────┐
│                       分布式追踪架构                        │
├─────────────────────────────────────────────────────────────┤
│       Client                                                │
│         │                                                   │
│         ▼                                                   │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐   │
│  │ Service A    │───▶│ Service B    │───▶│ Service C    │   │
│  │   [Span]     │    │   [Span]     │    │   [Span]     │   │
│  │ TraceID: abc │    │ TraceID: abc │    │ TraceID: abc │   │
│  │ SpanID: 1    │    │ SpanID: 2    │    │ SpanID: 3    │   │
│  └──────────────┘    └──────────────┘    └──────────────┘   │
│         │                   │                   │           │
│         ▼                   ▼                   ▼           │
│  ┌──────────────────────────────────────────────────────┐   │
│  │                      Collector                       │   │
│  │                  (Jaeger Collector)                  │   │
│  └──────────────────────────────────────────────────────┘   │
│                             │                               │
│                             ▼                               │
│  ┌──────────────────────────────────────────────────────┐   │
│  │                       Storage                        │   │
│  │              (Elasticsearch/Cassandra)               │   │
│  └──────────────────────────────────────────────────────┘   │
│                             │                               │
│                             ▼                               │
│  ┌──────────────────────────────────────────────────────┐   │
│  │                          UI                          │   │
│  │                (Jaeger UI / Grafana)                 │   │
│  └──────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────┘
```

### 1.3 核心概念

| 概念 | 描述 |
| --- | --- |
| Trace | 完整的请求链路,包含多个Span |
| Span | 单个服务的调用单元 |
| Span Context | Span的元数据(TraceID、SpanID、ParentSpanID) |
| Tag | 键值对,附加到Span的元数据 |
| Log | Span执行过程中的日志记录 |

## 二、Jaeger分布式追踪

### 2.1 Jaeger安装部署

```yaml
# docker-compose.yml
version: '3.8'
services:
  jaeger:
    image: jaegertracing/all-in-one:latest
    ports:
      - "5775:5775/udp"
      - "6831:6831/udp"
      - "6832:6832/udp"
      - "5778:5778"
      - "16686:16686"
      - "14268:14268"
      - "9411:9411"
    environment:
      - COLLECTOR_ZIPKIN_HOST_PORT=:9411
```

### 2.2 Python客户端集成

```python
# Jaeger客户端配置
from jaeger_client import Config
from opentracing import global_tracer

def init_jaeger(service_name):
    config = Config(
        config={
            'sampler': {
                'type': 'const',
                'param': 1
            },
            'logging': True,
            'local_agent': {
                'reporting_host': 'jaeger',
                'reporting_port': 6831
            }
        },
        service_name=service_name
    )
    return config.initialize_tracer()

tracer = init_jaeger('my-service')
```

### 2.3 追踪代码实现

```python
# 追踪装饰器
from opentracing import tags

def traced(func):
    def wrapper(*args, **kwargs):
        with tracer.start_active_span(func.__name__) as scope:
            span = scope.span
            span.set_tag(tags.COMPONENT, 'my-service')
            span.set_tag(tags.SPAN_KIND, tags.SPAN_KIND_RPC_SERVER)
            try:
                result = func(*args, **kwargs)
                span.set_tag(tags.HTTP_STATUS_CODE, 200)
                return result
            except Exception as e:
                span.set_tag(tags.ERROR, True)
                span.log_kv({'error': str(e)})
                raise
    return wrapper

@traced
def process_request(request):
    # 业务逻辑
    pass
```

## 三、OpenTelemetry集成

### 3.1 OpenTelemetry配置

```python
# OpenTelemetry配置
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter

def setup_opentelemetry(service_name):
    trace.set_tracer_provider(TracerProvider())
    jaeger_exporter = JaegerExporter(
        collector_endpoint='http://jaeger:14268/api/traces'
    )
    trace.get_tracer_provider().add_span_processor(
        BatchSpanProcessor(jaeger_exporter)
    )
    return trace.get_tracer(service_name)

tracer = setup_opentelemetry('my-service')
```

### 3.2 自动仪器化

```python
# FastAPI自动仪器化
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor

app = FastAPI()

# 自动追踪所有端点
FastAPIInstrumentor.instrument_app(app)

@app.get('/users/{user_id}')
async def get_user(user_id: int):
    return {'user_id': user_id, 'name': 'John'}
```

## 四、监控指标收集

### 4.1 Prometheus指标

```python
# Prometheus指标定义
from prometheus_client import Counter, Histogram, Gauge

# 请求计数
REQUEST_COUNT = Counter(
    'http_requests_total',
    'Total HTTP requests',
    ['endpoint', 'method', 'status_code']
)

# 请求延迟
REQUEST_LATENCY = Histogram(
    'http_request_duration_seconds',
    'HTTP request duration',
    ['endpoint']
)

# 活跃连接数
ACTIVE_CONNECTIONS = Gauge(
    'http_active_connections',
    'Number of active connections'
)

@app.middleware('http')
async def metrics_middleware(request: Request, call_next):
    start_time = time.time()
    endpoint = request.url.path
    method = request.method

    try:
        response = await call_next(request)
        status_code = response.status_code
    except Exception as e:
        status_code = 500
        raise
    finally:
        REQUEST_COUNT.labels(endpoint, method, status_code).inc()
        REQUEST_LATENCY.labels(endpoint).observe(time.time() - start_time)

    return response
```

### 4.2 自定义指标

```python
# 业务指标
from prometheus_client import Summary

# 订单处理时间
ORDER_PROCESSING_TIME = Summary(
    'order_processing_seconds',
    'Time taken to process an order'
)

class OrderService:
    @ORDER_PROCESSING_TIME.time()
    def process_order(self, order_data):
        # 处理订单逻辑
        time.sleep(0.5)
        return {'status': 'success'}
```

## 五、日志聚合

### 5.1 结构化日志

```python
# 结构化日志配置
import logging
from pythonjsonlogger import jsonlogger

def setup_logging(service_name):
    logger = logging.getLogger(service_name)
    logger.setLevel(logging.INFO)

    handler = logging.StreamHandler()
    formatter = jsonlogger.JsonFormatter(
        '%(asctime)s %(levelname)s %(service)s %(trace_id)s %(message)s'
    )
    handler.setFormatter(formatter)
    logger.addHandler(handler)

    return logger

logger = setup_logging('order-service')

# 使用示例
logger.info('Order created', extra={
    'service': 'order-service',
    'trace_id': 'abc123',
    'order_id': 'order-001'
})
```

### 5.2 ELK Stack配置

```yaml
# filebeat配置
filebeat.inputs:
  - type: log
    paths:
      - /var/log/myapp/*.log
    json.keys_under_root: true
    json.add_error_key: true

output.elasticsearch:
  hosts: ["elasticsearch:9200"]

setup.kibana:
  host: "kibana:5601"
```

## 六、可视化仪表盘

### 6.1 Grafana配置

```python
# Grafana仪表板生成
class GrafanaDashboardGenerator:
    def __init__(self):
        self.dashboard = {
            'title': 'Service Metrics',
            'panels': []
        }

    def add_graph_panel(self, title, query, y_label):
        panel = {
            'title': title,
            'type': 'graph',
            'targets': [{'expr': query}],
            'yaxes': [{'label': y_label}]
        }
        self.dashboard['panels'].append(panel)

    def add_stat_panel(self, title, query):
        panel = {
            'title': title,
            'type': 'stat',
            'targets': [{'expr': query}]
        }
        self.dashboard['panels'].append(panel)

    def generate(self):
        return self.dashboard

# 创建仪表盘
generator = GrafanaDashboardGenerator()
generator.add_graph_panel('Requests per Second', 'rate(http_requests_total[5m])', 'Requests/s')
generator.add_graph_panel('Request Latency', 'avg(http_request_duration_seconds)', 'Seconds')
generator.add_stat_panel('Active Connections', 'http_active_connections')
dashboard = generator.generate()
```

## 七、告警与通知

### 7.1 Prometheus告警规则

```yaml
# prometheus-rules.yml
groups:
  - name: service-alerts
    rules:
      - alert: HighErrorRate
        expr: rate(http_requests_total{status_code="500"}[5m]) > 0.1
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "High error rate detected"
          description: "Error rate is {{ $value }}/s"

      - alert: HighLatency
        expr: avg(http_request_duration_seconds) > 2
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "High request latency"
          description: "Average latency is {{ $value }}s"
```

### 7.2 告警通知

```python
# 告警通知处理器
class AlertManager:
    def __init__(self):
        self.notifiers = []

    def add_notifier(self, notifier):
        self.notifiers.append(notifier)

    def send_alert(self, alert):
        for notifier in self.notifiers:
            notifier.notify(alert)

class SlackNotifier:
    def __init__(self, webhook_url):
        self.webhook_url = webhook_url

    def notify(self, alert):
        payload = {
            'text': f"Alert: {alert['summary']}\n{alert['description']}"
        }
        requests.post(self.webhook_url, json=payload)

# 使用示例
alert_manager = AlertManager()
alert_manager.add_notifier(SlackNotifier('https://hooks.slack.com/...'))
alert_manager.send_alert({
    'summary': 'High Error Rate',
    'description': 'Error rate exceeded threshold'
})
```

## 八、实战案例:完整可观测体系

### 8.1 可观测性集成

```python
class ObservabilitySystem:
    def __init__(self, service_name):
        # 初始化追踪
        self.tracer = setup_opentelemetry(service_name)

        # 初始化日志
        self.logger = setup_logging(service_name)

        # 初始化告警
        self.alert_manager = AlertManager()
        self.alert_manager.add_notifier(SlackNotifier('https://hooks.slack.com/...'))

    def trace(self, span_name):
        return self.tracer.start_as_current_span(span_name)

    def log(self, level, message, **kwargs):
        extra = {'service': 'my-service', **kwargs}
        if 'trace_id' in kwargs:
            extra['trace_id'] = kwargs['trace_id']
        getattr(self.logger, level)(message, extra=extra)

    def check_alert(self, condition, alert):
        if condition:
            self.alert_manager.send_alert(alert)
```

### 8.2 端到端追踪示例

```python
# 端到端追踪
observability = ObservabilitySystem('order-service')

def process_order(request):
    with observability.trace('process_order') as span:
        observability.log('info', 'Processing order', order_id=request['order_id'])

        try:
            # 调用用户服务
            user = call_user_service(request['user_id'])
            span.set_attribute('user_id', user['id'])

            # 调用库存服务
            inventory = call_inventory_service(request['items'])

            # 调用支付服务
            payment = call_payment_service(request['amount'])

            observability.log('info', 'Order processed successfully')
            return {'status': 'success'}

        except Exception as e:
            observability.log('error', f'Order processing failed: {e}')
            observability.check_alert(
                True,
                {'summary': 'Order Processing Failed', 'description': str(e)}
            )
            raise
```

## 九、总结与最佳实践

### 9.1 关键要点

- 全链路追踪:覆盖所有关键服务和组件
- 统一日志格式:使用结构化日志便于查询
- 多维指标:收集业务和技术指标
- 智能告警:设置合理的告警阈值

### 9.2 常见误区

- 过度追踪:追踪过多细节影响性能
- 缺少上下文:日志中缺少trace_id难以关联
- 告警过多:告警疲劳导致忽略重要问题
- 存储不足:追踪数据存储周期过短

### 9.3 未来趋势

- AI辅助排查:利用AI自动分析追踪数据
- 智能采样:基于重要性动态调整采样率
- 统一可观测性平台:整合追踪、日志、指标

## 参考资料

- Jaeger官方文档
- OpenTelemetry官方文档
- Prometheus官方文档
- Grafana官方文档

相关新闻