ES + ClickHouse 日志双写实践

Weijie ZhaoDec 17, 202512 min read
记录下最近用 Elasticsearch + ClickHouse 做日志分析和监控的一些折腾。

缘起

之前一直想试试 Elasticsearch 和 ClickHouse 的日志分析,正好手上有些业务系统日志量还挺大,就边学边做把想法落地了。

日志里其实藏着不少有用的信息——时间戳、耗时、交易号之类的。如果能把这些非结构化的文本变成可查询、可统计的数据,那就不只是出问题时才翻的"黑匣子"了。而且即便有 APM,日志监控也有它的价值——APM 偏技术视角(调用链、方法耗时),日志更多承载业务语义(交易码、各环节的状态等),两者是互补的。

下面记录下这次折腾的过程。

先看整体的技术架构。之所以没用 Logstash,主要是日志格式比较杂,Grok 表达式太多的话性能跟不上,消费延迟也会很大。

日志采集

采集这块没什么特别的,用的还是 Filebeat + Kafka 这套经典组合:

  1. Filebeat 部署在各应用服务器,负责收集日志文件
  2. 日志发送到 Kafka 的不同 Topic,按应用区分
  3. 自己写的消费端从 Kafka 拉取消息处理

之所以加一层 Kafka,主要是为了:

  • 削峰填谷:高峰期不会打垮后端存储
  • 解耦:采集和处理可以独立扩展
  • 可靠性:消息持久化,出问题还能重放

字段解析与提取

日志原本就是一堆非结构化文本,要变成可分析的数据,得用正则把关键字段扒出来。

这里定义了一个处理器接口:

go
type Processor interface {
Process(msg *sarama.ConsumerMessage) *Result
Name() string
}
type Result struct {
Success bool
Document *ProcessedDocument
Error error
Retryable bool // 是否可重试
SkipReason string // 跳过原因
}

日志头部解析

大多数 Java 应用的日志长这样:

text
2024-01-01 12:00:00,123 INFO [com.example.Service] 业务处理完成

用正则提取时间戳、日志级别和 Logger:

go
headerRegex := regexp.MustCompile(
`^(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2},\d{3})\s+` +
`(DEBUG|INFO|WARN|ERROR)\s+\[([^\]]+)\]\s+(.+)$`)
func (p *Processor) parseHeader(data map[string]interface{}, message string) {
matches := p.headerRegex.FindStringSubmatch(message)
if len(matches) == 5 {
loc, _ := time.LoadLocation("Asia/Shanghai")
ts, err := time.ParseInLocation("2006-01-02 15:04:05,000", matches[1], loc)
if err == nil {
data["@timestamp"] = ts.Format(time.RFC3339Nano)
}
data["log.level"] = strings.ToLower(matches[2])
data["log.logger"] = matches[3]
data["message"] = matches[4]
}
}

业务字段提取

从日志内容里提取交易流水号、用户 ID、耗时这些关键信息:

go
traceIDRegex := regexp.MustCompile(`(?:会话ID|交易流水)[::]?\s*([^,,\s]+)`)
userIDRegex := regexp.MustCompile(`用户ID[::]?\s*([A-Za-z0-9._-]+)`)
fullMSRegex := regexp.MustCompile(`整个交易消耗时间为ms[::]?(\d+)`)
serverMSRegex := regexp.MustCompile(`当前服务器执行耗时\(ms\)[::]?(\d+)`)
func (p *Processor) extractCostMetrics(data map[string]interface{}, message string) {
var fullMS, serverMS int64
if matches := p.fullMSRegex.FindStringSubmatch(message); len(matches) > 1 {
fullMS, _ = strconv.ParseInt(matches[1], 10, 64)
}
if matches := p.serverMSRegex.FindStringSubmatch(message); len(matches) > 1 {
serverMS, _ = strconv.ParseInt(matches[1], 10, 64)
}
if fullMS > 0 {
data["cost_ms"] = fullMS
data["metrics.cost.source"] = "full"
} else if serverMS > 0 {
data["cost_ms"] = serverMS
data["metrics.cost.source"] = "server"
}
}

添加元数据

每条日志还会带上 Kafka 来源信息,排查问题时能用得上:

go
func (p *Processor) addMetadata(data map[string]interface{}, msg *sarama.ConsumerMessage) {
if _, ok := data["@timestamp"]; !ok {
data["@timestamp"] = msg.Timestamp.Format(time.RFC3339Nano)
}
kafkaMeta := map[string]interface{}{
"topic": msg.Topic,
"partition": msg.Partition,
"offset": msg.Offset,
}
data["kafka"] = kafkaMeta
}

双写存储架构

处理后的数据同时写 Elasticsearch 和 ClickHouse,各有各的用处:

存储干嘛用
Elasticsearch日志检索、全文搜索、实时告警
ClickHouse聚合统计、趋势分析、做报表

Elasticsearch 批量写入

用 Bulk API 批量写入,效率高不少:

go
type Writer struct {
client *elastic.Client
processor *elastic.BulkProcessor
logger *zap.Logger
}
func (w *Writer) createBulkProcessor(cfg *ElasticsearchConfig) (*elastic.BulkProcessor, error) {
return w.client.BulkProcessor().
Name("log-bulk-processor").
Workers(cfg.Workers).
BulkActions(cfg.BulkActions).
BulkSize(cfg.BulkSize << 20).
FlushInterval(cfg.FlushInterval).
Before(w.beforeBulk).
After(w.afterBulk).
Do(context.Background())
}
func (w *Writer) Add(doc Document) error {
req := elastic.NewBulkIndexRequest().
Index(doc.Index()).
Doc(doc.Body())
w.processor.Add(req)
return nil
}

ClickHouse 通过 Kafka 中间层写入

ClickHouse 原生支持 Kafka 引擎,这点挺好用的,直接消费 Kafka 数据就行:

go
type Producer struct {
producer sarama.SyncProducer
topic string
enabled bool
}
// 异步发送,不阻塞主流程
func (p *Producer) SendAsync(doc *ProcessedDocument) {
if !p.enabled {
return
}
go func() {
data, _ := json.Marshal(doc.Body())
msg := &sarama.ProducerMessage{
Topic: p.topic,
Value: sarama.ByteEncoder(data),
}
p.producer.SendMessage(msg)
}()
}

ClickHouse 表结构

分布式表 + 物化视图,数据自动入库:

sql
-- 本地存储表
CREATE TABLE logs_local ON CLUSTER 'cluster_name' (
timestamp DateTime64(3) DEFAULT now64(3),
index_day Date MATERIALIZED toDate(timestamp),
message String,
log_level LowCardinality(String) DEFAULT '',
logger String DEFAULT '',
service_name LowCardinality(String) DEFAULT '',
trace_id String DEFAULT '',
transaction_id String DEFAULT '',
user_id String DEFAULT '',
cost_ms Int64 DEFAULT 0,
cost_source LowCardinality(String) DEFAULT '',
kafka_topic LowCardinality(String) DEFAULT '',
kafka_partition UInt32 DEFAULT 0,
kafka_offset UInt64 DEFAULT 0,
raw_json String
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (service_name, timestamp, trace_id)
TTL toDateTime(timestamp) + INTERVAL 30 DAY;
-- Distributed 表
CREATE TABLE logs ON CLUSTER 'cluster_name'
AS logs_local
ENGINE = Distributed('cluster_name', 'db', 'logs_local', rand());
-- Kafka 消费表
CREATE TABLE kafka_logs (
raw String
) ENGINE = Kafka
SETTINGS
kafka_broker_list = 'kafka:9092',
kafka_topic_list = 'processed-logs',
kafka_group_name = 'clickhouse-consumer',
kafka_format = 'RawBLOB',
kafka_num_consumers = 5;
-- 物化视图自动解析
CREATE MATERIALIZED VIEW logs_mv TO logs AS
SELECT
parseDateTime64BestEffortOrZero(
JSONExtractString(raw, '@timestamp'), 3, 'Asia/Shanghai'
) AS timestamp,
JSONExtractString(raw, 'message') AS message,
lower(JSONExtractString(raw, 'log', 'level')) AS log_level,
JSONExtractString(raw, 'service', 'name') AS service_name,
JSONExtractString(raw, 'trace.id') AS trace_id,
JSONExtractInt(raw, 'cost_ms') AS cost_ms,
JSONExtractString(raw, 'metrics', 'cost', 'source') AS cost_source,
raw AS raw_json
FROM kafka_logs;

性能监控与预警

有了结构化的日志数据,就可以搞性能监控和预警了。

Prometheus 指标

消费端暴露了一些关键指标给 Prometheus:

go
var (
KafkaMessagesTotal = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "log_kafka_messages_total",
Help: "Total number of messages consumed from Kafka",
},
[]string{"topic", "consumer"},
)
KafkaLag = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "log_kafka_lag",
Help: "Kafka consumer lag",
},
[]string{"topic", "partition", "consumer"},
)
ProcessingDuration = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "log_processing_duration_seconds",
Help: "Duration of message processing",
Buckets: []float64{.001, .005, .01, .025, .05, .1, .25, .5, 1},
},
[]string{"processor"},
)
)

基于 ES 的告警

利用 ES 的聚合能力检测异常,比如错误率激增:

json
{
"query": {
"bool": {
"filter": [
{ "range": { "@timestamp": { "gte": "now-5m" } } },
{ "term": { "log.level": "error" } }
]
}
},
"aggs": {
"error_count_per_minute": {
"date_histogram": {
"field": "@timestamp",
"calendar_interval": "1m"
}
}
}
}

或者检测响应时间过长:

json
{
"query": {
"bool": {
"filter": [
{ "range": { "@timestamp": { "gte": "now-5m" } } },
{ "range": { "cost_ms": { "gte": 5000 } } }
]
}
},
"aggs": {
"slow_requests_by_transaction": {
"terms": { "field": "transaction.code", "size": 20 },
"aggs": {
"avg_cost": { "avg": { "field": "cost_ms" } }
}
}
}
}

基于 ClickHouse 的性能分析

ClickHouse 的聚合性能确实强,跑这种统计查询很爽:

接口性能 TOP N

sql
SELECT
service_name,
transaction_id AS endpoint,
count() AS request_count,
avg(cost_ms) AS avg_cost_ms,
quantile(0.99)(cost_ms) AS p99_cost_ms,
max(cost_ms) AS max_cost_ms
FROM logs
WHERE timestamp >= now() - INTERVAL 1 HOUR
AND cost_ms > 0
GROUP BY service_name, transaction_id
ORDER BY avg_cost_ms DESC
LIMIT 20;

时间趋势统计

sql
SELECT
toStartOfMinute(timestamp) AS time_bucket,
service_name,
count() AS total_requests,
countIf(log_level = 'error') AS error_count,
avg(cost_ms) AS avg_cost_ms,
quantile(0.95)(cost_ms) AS p95_cost_ms
FROM logs
WHERE timestamp >= now() - INTERVAL 30 MINUTE
GROUP BY time_bucket, service_name
ORDER BY time_bucket, service_name;

慢请求分布

sql
SELECT
multiIf(
cost_ms < 100, '<100ms',
cost_ms < 500, '100-500ms',
cost_ms < 1000, '500ms-1s',
cost_ms < 5000, '1-5s',
'>5s'
) AS cost_bucket,
count() AS request_count,
round(count() * 100.0 / sum(count()) OVER (), 2) AS percentage
FROM logs
WHERE timestamp >= now() - INTERVAL 1 HOUR
AND cost_ms > 0
GROUP BY cost_bucket
ORDER BY
CASE cost_bucket
WHEN '<100ms' THEN 1
WHEN '100-500ms' THEN 2
WHEN '500ms-1s' THEN 3
WHEN '1-5s' THEN 4
ELSE 5
END;

实际效果

跑了一段时间,效果还不错:

能力说明
实时监控Kafka 延迟在秒级,基本能实时感知系统状态
性能分析基于日志里的耗时字段,能分析出接口性能分布
错误追踪通过 trace_id 串联一个请求的完整日志链路
趋势分析ClickHouse 跑大时间跨度的统计也很快
告警预警基于 ES 查询触发告警

日志消费性能

目前部署了 4 个消费节点,跑了一段时间后的实际数据:

指标数值
总吞吐量(峰值)~30K docs/s
ES 写入成功率接近 100%
消息处理延迟 P9520-40ms
搜索引擎入库吞吐10K-20K docs/s
队列积压正常情况下接近 0

从监控看,凌晨时段业务量低,吞吐量降到几千;白天高峰期能到 30K/s,系统也没什么压力。

ClickHouse 性能分析数据

通过 ClickHouse 对日志做统计分析,可以拿到这样的性能概览(某次统计窗口内的数据):

指标数值
总请求量14.6 Million
平均响应时间90.9 ms
P95 响应时间299 ms
P99 响应时间1.12 s
慢接口数1.75 K
错误率0.03%

还能按接口维度拆分,比如某个接口的请求量、平均耗时、P95/P99 延迟一目了然。日志级别的分布趋势(info/warn/error)也能直观地看到波动。

这些数据放以前只能靠人肉翻日志,现在几秒钟就出结果。

一点想法

整个过程其实就是把非结构化的日志变成结构化的数据,然后利用 ES 和 ClickHouse 各自的优势来做不同的事情。核心就几点:

  1. 日志格式要规范:越规范越好解析
  2. 双写架构:ES 负责检索告警,ClickHouse 负责统计分析,各司其职
  3. Kafka 缓冲:解耦采集与处理,也增加了弹性
  4. 指标体系:用 Prometheus 监控整个链路的健康状态

当然这套方案也有局限性,比如日志格式变了就得跟着改解析规则。不过整体来说,边学边做还是挺有收获的。

如果你也在做类似的事情,或者有更好的思路,欢迎交流探讨。