记录下最近用 Elasticsearch + ClickHouse 做日志分析和监控的一些折腾。
缘起
之前一直想试试 Elasticsearch 和 ClickHouse 的日志分析,正好手上有些业务系统日志量还挺大,就边学边做把想法落地了。
日志里其实藏着不少有用的信息——时间戳、耗时、交易号之类的。如果能把这些非结构化的文本变成可查询、可统计的数据,那就不只是出问题时才翻的"黑匣子"了。而且即便有 APM,日志监控也有它的价值——APM 偏技术视角(调用链、方法耗时),日志更多承载业务语义(交易码、各环节的状态等),两者是互补的。
下面记录下这次折腾的过程。
先看整体的技术架构。之所以没用 Logstash,主要是日志格式比较杂,Grok 表达式太多的话性能跟不上,消费延迟也会很大。
日志采集
采集这块没什么特别的,用的还是 Filebeat + Kafka 这套经典组合:
- Filebeat 部署在各应用服务器,负责收集日志文件
- 日志发送到 Kafka 的不同 Topic,按应用区分
- 自己写的消费端从 Kafka 拉取消息处理
之所以加一层 Kafka,主要是为了:
- 削峰填谷:高峰期不会打垮后端存储
- 解耦:采集和处理可以独立扩展
- 可靠性:消息持久化,出问题还能重放
字段解析与提取
日志原本就是一堆非结构化文本,要变成可分析的数据,得用正则把关键字段扒出来。
这里定义了一个处理器接口:
type Processor interface {Process(msg *sarama.ConsumerMessage) *ResultName() string}type Result struct {Success boolDocument *ProcessedDocumentError errorRetryable bool // 是否可重试SkipReason string // 跳过原因}
日志头部解析
大多数 Java 应用的日志长这样:
2024-01-01 12:00:00,123 INFO [com.example.Service] 业务处理完成
用正则提取时间戳、日志级别和 Logger:
headerRegex := regexp.MustCompile(`^(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2},\d{3})\s+` +`(DEBUG|INFO|WARN|ERROR)\s+\[([^\]]+)\]\s+(.+)$`)func (p *Processor) parseHeader(data map[string]interface{}, message string) {matches := p.headerRegex.FindStringSubmatch(message)if len(matches) == 5 {loc, _ := time.LoadLocation("Asia/Shanghai")ts, err := time.ParseInLocation("2006-01-02 15:04:05,000", matches[1], loc)if err == nil {data["@timestamp"] = ts.Format(time.RFC3339Nano)}data["log.level"] = strings.ToLower(matches[2])data["log.logger"] = matches[3]data["message"] = matches[4]}}
业务字段提取
从日志内容里提取交易流水号、用户 ID、耗时这些关键信息:
traceIDRegex := regexp.MustCompile(`(?:会话ID|交易流水)[::]?\s*([^,,\s]+)`)userIDRegex := regexp.MustCompile(`用户ID[::]?\s*([A-Za-z0-9._-]+)`)fullMSRegex := regexp.MustCompile(`整个交易消耗时间为ms[::]?(\d+)`)serverMSRegex := regexp.MustCompile(`当前服务器执行耗时\(ms\)[::]?(\d+)`)func (p *Processor) extractCostMetrics(data map[string]interface{}, message string) {var fullMS, serverMS int64if matches := p.fullMSRegex.FindStringSubmatch(message); len(matches) > 1 {fullMS, _ = strconv.ParseInt(matches[1], 10, 64)}if matches := p.serverMSRegex.FindStringSubmatch(message); len(matches) > 1 {serverMS, _ = strconv.ParseInt(matches[1], 10, 64)}if fullMS > 0 {data["cost_ms"] = fullMSdata["metrics.cost.source"] = "full"} else if serverMS > 0 {data["cost_ms"] = serverMSdata["metrics.cost.source"] = "server"}}
添加元数据
每条日志还会带上 Kafka 来源信息,排查问题时能用得上:
func (p *Processor) addMetadata(data map[string]interface{}, msg *sarama.ConsumerMessage) {if _, ok := data["@timestamp"]; !ok {data["@timestamp"] = msg.Timestamp.Format(time.RFC3339Nano)}kafkaMeta := map[string]interface{}{"topic": msg.Topic,"partition": msg.Partition,"offset": msg.Offset,}data["kafka"] = kafkaMeta}
双写存储架构
处理后的数据同时写 Elasticsearch 和 ClickHouse,各有各的用处:
| 存储 | 干嘛用 |
|---|---|
| Elasticsearch | 日志检索、全文搜索、实时告警 |
| ClickHouse | 聚合统计、趋势分析、做报表 |
Elasticsearch 批量写入
用 Bulk API 批量写入,效率高不少:
type Writer struct {client *elastic.Clientprocessor *elastic.BulkProcessorlogger *zap.Logger}func (w *Writer) createBulkProcessor(cfg *ElasticsearchConfig) (*elastic.BulkProcessor, error) {return w.client.BulkProcessor().Name("log-bulk-processor").Workers(cfg.Workers).BulkActions(cfg.BulkActions).BulkSize(cfg.BulkSize << 20).FlushInterval(cfg.FlushInterval).Before(w.beforeBulk).After(w.afterBulk).Do(context.Background())}func (w *Writer) Add(doc Document) error {req := elastic.NewBulkIndexRequest().Index(doc.Index()).Doc(doc.Body())w.processor.Add(req)return nil}
ClickHouse 通过 Kafka 中间层写入
ClickHouse 原生支持 Kafka 引擎,这点挺好用的,直接消费 Kafka 数据就行:
type Producer struct {producer sarama.SyncProducertopic stringenabled bool}// 异步发送,不阻塞主流程func (p *Producer) SendAsync(doc *ProcessedDocument) {if !p.enabled {return}go func() {data, _ := json.Marshal(doc.Body())msg := &sarama.ProducerMessage{Topic: p.topic,Value: sarama.ByteEncoder(data),}p.producer.SendMessage(msg)}()}
ClickHouse 表结构
分布式表 + 物化视图,数据自动入库:
-- 本地存储表CREATE TABLE logs_local ON CLUSTER 'cluster_name' (timestamp DateTime64(3) DEFAULT now64(3),index_day Date MATERIALIZED toDate(timestamp),message String,log_level LowCardinality(String) DEFAULT '',logger String DEFAULT '',service_name LowCardinality(String) DEFAULT '',trace_id String DEFAULT '',transaction_id String DEFAULT '',user_id String DEFAULT '',cost_ms Int64 DEFAULT 0,cost_source LowCardinality(String) DEFAULT '',kafka_topic LowCardinality(String) DEFAULT '',kafka_partition UInt32 DEFAULT 0,kafka_offset UInt64 DEFAULT 0,raw_json String) ENGINE = MergeTree()PARTITION BY toYYYYMM(timestamp)ORDER BY (service_name, timestamp, trace_id)TTL toDateTime(timestamp) + INTERVAL 30 DAY;-- Distributed 表CREATE TABLE logs ON CLUSTER 'cluster_name'AS logs_localENGINE = Distributed('cluster_name', 'db', 'logs_local', rand());-- Kafka 消费表CREATE TABLE kafka_logs (raw String) ENGINE = KafkaSETTINGSkafka_broker_list = 'kafka:9092',kafka_topic_list = 'processed-logs',kafka_group_name = 'clickhouse-consumer',kafka_format = 'RawBLOB',kafka_num_consumers = 5;-- 物化视图自动解析CREATE MATERIALIZED VIEW logs_mv TO logs ASSELECTparseDateTime64BestEffortOrZero(JSONExtractString(raw, '@timestamp'), 3, 'Asia/Shanghai') AS timestamp,JSONExtractString(raw, 'message') AS message,lower(JSONExtractString(raw, 'log', 'level')) AS log_level,JSONExtractString(raw, 'service', 'name') AS service_name,JSONExtractString(raw, 'trace.id') AS trace_id,JSONExtractInt(raw, 'cost_ms') AS cost_ms,JSONExtractString(raw, 'metrics', 'cost', 'source') AS cost_source,raw AS raw_jsonFROM kafka_logs;
性能监控与预警
有了结构化的日志数据,就可以搞性能监控和预警了。
Prometheus 指标
消费端暴露了一些关键指标给 Prometheus:
var (KafkaMessagesTotal = promauto.NewCounterVec(prometheus.CounterOpts{Name: "log_kafka_messages_total",Help: "Total number of messages consumed from Kafka",},[]string{"topic", "consumer"},)KafkaLag = promauto.NewGaugeVec(prometheus.GaugeOpts{Name: "log_kafka_lag",Help: "Kafka consumer lag",},[]string{"topic", "partition", "consumer"},)ProcessingDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{Name: "log_processing_duration_seconds",Help: "Duration of message processing",Buckets: []float64{.001, .005, .01, .025, .05, .1, .25, .5, 1},},[]string{"processor"},))
基于 ES 的告警
利用 ES 的聚合能力检测异常,比如错误率激增:
{"query": {"bool": {"filter": [{ "range": { "@timestamp": { "gte": "now-5m" } } },{ "term": { "log.level": "error" } }]}},"aggs": {"error_count_per_minute": {"date_histogram": {"field": "@timestamp","calendar_interval": "1m"}}}}
或者检测响应时间过长:
{"query": {"bool": {"filter": [{ "range": { "@timestamp": { "gte": "now-5m" } } },{ "range": { "cost_ms": { "gte": 5000 } } }]}},"aggs": {"slow_requests_by_transaction": {"terms": { "field": "transaction.code", "size": 20 },"aggs": {"avg_cost": { "avg": { "field": "cost_ms" } }}}}}
基于 ClickHouse 的性能分析
ClickHouse 的聚合性能确实强,跑这种统计查询很爽:
接口性能 TOP N
SELECTservice_name,transaction_id AS endpoint,count() AS request_count,avg(cost_ms) AS avg_cost_ms,quantile(0.99)(cost_ms) AS p99_cost_ms,max(cost_ms) AS max_cost_msFROM logsWHERE timestamp >= now() - INTERVAL 1 HOURAND cost_ms > 0GROUP BY service_name, transaction_idORDER BY avg_cost_ms DESCLIMIT 20;
时间趋势统计
SELECTtoStartOfMinute(timestamp) AS time_bucket,service_name,count() AS total_requests,countIf(log_level = 'error') AS error_count,avg(cost_ms) AS avg_cost_ms,quantile(0.95)(cost_ms) AS p95_cost_msFROM logsWHERE timestamp >= now() - INTERVAL 30 MINUTEGROUP BY time_bucket, service_nameORDER BY time_bucket, service_name;
慢请求分布
SELECTmultiIf(cost_ms < 100, '<100ms',cost_ms < 500, '100-500ms',cost_ms < 1000, '500ms-1s',cost_ms < 5000, '1-5s','>5s') AS cost_bucket,count() AS request_count,round(count() * 100.0 / sum(count()) OVER (), 2) AS percentageFROM logsWHERE timestamp >= now() - INTERVAL 1 HOURAND cost_ms > 0GROUP BY cost_bucketORDER BYCASE cost_bucketWHEN '<100ms' THEN 1WHEN '100-500ms' THEN 2WHEN '500ms-1s' THEN 3WHEN '1-5s' THEN 4ELSE 5END;
实际效果
跑了一段时间,效果还不错:
| 能力 | 说明 |
|---|---|
| 实时监控 | Kafka 延迟在秒级,基本能实时感知系统状态 |
| 性能分析 | 基于日志里的耗时字段,能分析出接口性能分布 |
| 错误追踪 | 通过 trace_id 串联一个请求的完整日志链路 |
| 趋势分析 | ClickHouse 跑大时间跨度的统计也很快 |
| 告警预警 | 基于 ES 查询触发告警 |
日志消费性能
目前部署了 4 个消费节点,跑了一段时间后的实际数据:
| 指标 | 数值 |
|---|---|
| 总吞吐量(峰值) | ~30K docs/s |
| ES 写入成功率 | 接近 100% |
| 消息处理延迟 P95 | 20-40ms |
| 搜索引擎入库吞吐 | 10K-20K docs/s |
| 队列积压 | 正常情况下接近 0 |
从监控看,凌晨时段业务量低,吞吐量降到几千;白天高峰期能到 30K/s,系统也没什么压力。
ClickHouse 性能分析数据
通过 ClickHouse 对日志做统计分析,可以拿到这样的性能概览(某次统计窗口内的数据):
| 指标 | 数值 |
|---|---|
| 总请求量 | 14.6 Million |
| 平均响应时间 | 90.9 ms |
| P95 响应时间 | 299 ms |
| P99 响应时间 | 1.12 s |
| 慢接口数 | 1.75 K |
| 错误率 | 0.03% |
还能按接口维度拆分,比如某个接口的请求量、平均耗时、P95/P99 延迟一目了然。日志级别的分布趋势(info/warn/error)也能直观地看到波动。
这些数据放以前只能靠人肉翻日志,现在几秒钟就出结果。
一点想法
整个过程其实就是把非结构化的日志变成结构化的数据,然后利用 ES 和 ClickHouse 各自的优势来做不同的事情。核心就几点:
- 日志格式要规范:越规范越好解析
- 双写架构:ES 负责检索告警,ClickHouse 负责统计分析,各司其职
- Kafka 缓冲:解耦采集与处理,也增加了弹性
- 指标体系:用 Prometheus 监控整个链路的健康状态
当然这套方案也有局限性,比如日志格式变了就得跟着改解析规则。不过整体来说,边学边做还是挺有收获的。
如果你也在做类似的事情,或者有更好的思路,欢迎交流探讨。