Pipeline 配置
Pipeline 是 GreptimeDB 中对 log 数据进行解析和转换的一种机制, 由一个唯一的名称和一组配置规则组成,这些规则定义了如何对日志数据进行格式化、拆分和转换。目前我们支持 JSON(application/json)和纯文本(text/plain)格式的日志数据作为输入。
这些配置以 YAML 格式提供,使得 Pipeline 能够在日志写入过程中,根据设定的规则对数据进行处理,并将处理后的数据存储到数据库中,便于后续的结构化查询。
整体结构
Pipeline 由两部分组成:Processors 和 Transform,这两部分均为数组形式。一个 Pipeline 配置可以包含多个 Processor 和多个 Transform。Transform 所描述的数据类型会决定日志数据保存到数据库时的表结构。
- Processor 用于对 log 数据进行预处理,例如解析时间字段,替换字段等。
- Transform 用于对数据进行格式转换,例如将字符串类型转换为数字类型。
一个包含 Processor 和 Transform 的简单配置示例如下:
processors:
- urlencoding:
fields:
- string_field_a
- string_field_b
method: decode
ignore_missing: true
transform:
- fields:
- string_field_a
- string_field_b
type: string
# 写入的数据必须包含 timestamp 字段
- fields:
- reqTimeSec, req_time_sec
# epoch 是特殊字段类型,必须指定精度
type: epoch, ms
index: timestamp
Processor
Processor 用于对 log 数据进行预处理,其配置位于 YAML 文件中的 processors 字段下。
Pipeline 会按照多个 Processor 的顺序依次加工数据,每个 Processor 都依赖于上一个 Processor 处理的结果。
Processor 由一个 name 和多个配置组成,不同类型的 Processor 配置有不同的字段。
我们目前内置了以下几种 Processor:
date: 解析格式化的时间字符串字段,例如2024-07-12T16:18:53.048。epoch: 解析数字时间戳字段,例如1720772378893。dissect: 对 log 数据字段进行拆分。gsub: 对 log 数据字段进行替换。join: 对 log 中的 array 类型字段进行合并。letter: 对 log 数据字段进行字母转换。regex: 对 log 数据字段进行正则匹配。urlencoding: 对 log 数据字段进行 URL 编解码。csv: 对 log 数据字段进行 CSV 解析。
date
date Processor 用于解析时间字段。示例配置如下:
processors:
- date:
fields:
- time
formats:
- '%Y-%m-%d %H:%M:%S%.3f'
ignore_missing: true
timezone: 'Asia/Shanghai'
如上所示,date Processor 的配置包含以下字段:
fields: 需要解析的时间字段名列表。formats: 时间格式化字符串,支持多个时间格式化字符串。按照提供的顺序尝试解析,直到解析成功。你可以在这里找到格式化的语法说明。ignore_missing: 忽略字段不存在的情况。默认为false。如果字段不存在,并且此配置为 false,则会抛出异常。timezone: 时区。使用tz_database 中的时区标识符来指定时区。默认为UTC。
epoch
epoch Processor 用于解析时间戳字段,示例配置如下:
processors:
- epoch:
fields:
- reqTimeSec
resolution: millisecond
ignore_missing: true
如上所示,epoch Processor 的配置包含以下字段:
fields: 需要解析的时间戳字段名列表。resolution: 时间戳精度,支持s,sec,second,ms,millisecond,milli,us,microsecond,micro,ns,nanosecond,nano。默认为ms。ignore_missing: 忽略字段不存在的情况。默认为false。如果字段不存在,并且此配置为 false,则会抛出异常。
dissect
dissect Processor 用于对 log 数据字段进行拆分,示例配置如下:
processors:
- dissect:
fields:
- message
patterns:
- '%{key1} %{key2}'
ignore_missing: true
append_separator: '-'
如上所示,dissect Processor 的配置包含以下字段:
fields: 需要拆分的字段名列表。patterns: 拆分的 dissect 模式。ignore_missing: 忽略字段不存在的情况。默认为false。如果字段不存在,并且此配置为 false,则会抛出异常。append_separator: 对于多个追加到一起的字段,指定连接符。默认是一个空字符串。
Dissect 模式
和 Logstash 的 Dissect 模式类似,Dissect 模式由 %{key} 组成,其中 %{key} 为一个字段名。例如:
"%{key1} %{key2} %{+key3} %{+key4/2} %{key5->} %{?key6}"
Dissect 修饰符
Dissect 模式支持以下修饰符:
| 修饰符 | 说明 | 示例 |
|---|---|---|
+ | 将两个或多个字段追加到一起 | %{+key} %{+key} |
+ 和 /n | 按照指定的顺序将两个或多个字段追加到一起 | %{+key/2} %{+key/1} |
-> | 忽略右侧的任何重复字符 | %{key1->} %{key2->} |
? | 忽略匹配的值 | %{?key} |
dissect 示例
例如,对于以下 log 数据:
"key1 key2 key3 key4 key5 key6"
使用以下 Dissect 模式:
"%{key1} %{key2} %{+key3} %{+key3/2} %{key5->} %{?key6}"
将得到以下结果:
{
"key1": "key1",
"key2": "key2",
"key3": "key3 key4",
"key5": "key5"
}
gsub
gsub Processor 用于对 log 数据字段进行替换,示例配置如下:
processors:
- gsub:
fields:
- message
pattern: 'old'
replacement: 'new'
ignore_missing: true
如上所示,gsub Processor 的配置包含以下字段:
fields: 需要替换的字段名列表。pattern: 需要替换的字符串。支持正则表达式。replacement: 替换后的字符串。ignore_missing: 忽略字段不存在的情况。默认为false。如果字段不存在,并且此配置为 false,则会抛出异常。
join
join Processor 用于对 log 中的 Array 类型字段进行合并,示例配置如下:
processors:
- join:
fields:
- message
separator: ','
ignore_missing: true