问题

上周因为 OOM 问题,某个集群内的 Filebeat 被迫重启后,观测了许久,仍不见事件流恢复,查看 Filebeat 输出日志,发现只有其自监控的日志:

1
2021-05-28T03:19:41.061Z	INFO	[monitoring]	log/log.go:145	Non-zero metrics in the last 30s	{"monitoring": {"metrics": {"beat":{"cpu":{"system":{"ticks":6249680,"time":{"ms":3024}},"total":{"ticks":13659880,"time":{"ms":6612},"value":13659880},"user":{"ticks":7410200,"time":{"ms":3588}}},"handles":{"limit":{"hard":1048576,"soft":1048576},"open":46},"info":{"ephemeral_id":"ca641ad8-e10a-496f-a087-6924e456aaea","uptime":{"ms":60180037}},"memstats":{"gc_next":42518272,"memory_alloc":31026880,"memory_total":1715304668248,"rss":-1552384},"runtime":{"goroutines":206}},"filebeat":{"events":{"added":139,"done":139},**"harvester":{"open_files":0,"running":0}}**,"libbeat":{"config":{"module":{"running":0}},"pipeline":{"clients":1,"events":{"active":1,"filtered":139,"total":139},"registrar":{"states":{"current":55084,"update":13},"writes":{"success":139,"total":139}},"system":{"load":{"1":9.21,"15":10.97,"5":10.87,"norm":{"1":0.3838,"15":0.4571,"5":0.4529}}}}}}

"harvester":{"open_files":0,"running":0}} 我们可以判断出 harvester 尚未启动采集,这让我非常疑惑,Filebeat 究竟在做什么?

接着观察日志,发现除开自监控,最后输出一条日志的内容是:

1
2021-05-28T02:46:39.019Z	INFO	beater/crawler.go:73	Loading Inputs: 1

Loading Inputs 看起来也是符合逻辑的,短时间没有太多的头绪,放下忙其他工作了。

随着时间推移,当我再次观测 Filebeat 时,发现它已经在正常工作了,但是日志内依旧没有错误输出,找到恢复时间点的最早日志:

1
2
3
2021-05-28T05:55:17.822Z	INFO	log/input.go:152	Configured paths: [/data/logs/*/*.log* /data/v3logs/*/*/*.log*]
2021-05-28T05:55:17.822Z INFO input/input.go:114 Starting input of type: log; ID: 3062577341473220485
2021-05-28T05:55:17.822Z INFO beater/crawler.go:105 Loading and starting Inputs completed. Enabled inputs: 1

好家伙,从 Loading InputsLoading and starting Inputs completed 花费了3个多小时,这更让我疑惑了,Filebeat 究竟为什么一直装死?

原因

根据日志打印翻阅了 Filebeat 源码


https://github.com/elastic/beats/blob/23e4403ae093fcc8f7905345cad2c7ad256976d8/filebeat/beater/crawler.go#L71


https://github.com/elastic/beats/blob/2ee21d95aef89af7f7e7aef8d07f679a24d690b4/filebeat/registrar/registrar.go#L172

Filebeat 使用 registry file 作为采集的状态存储,实际上就是一个纯文本的 JSON 文件。每次启动时都会检查 JSON 文件中的 states 是否需要更新(新增或者删除文件),而当任何一个 state 需要更新, registry file 将会全量序列化(读) → 持久化(写),随着 states 越来越多,JSON 文件会越来越大(接近20MB),每次全量读写都会越来越慢,并且 CPU、内存的使用量都会暴涨。

总的来说,在当前的数据存储选型下,Filebeat 无法应对过多的文件数据数量,启动时的数据核验时间过长(几小时→几天不等,视数据量而定),就会产生了“假死”的现象。

解决方案

临时的解决方案

暂停 Filebeat 进程,删除 registry file ,重启 Filebeat 进程。这时候由于 JSON 文件是比较小的,所有 state 均处于增量状态,数据同步是比较快的。但是所有已经发送过的事件将难以**避免地重复发送一次,**所以这种做法只能应急,不能长久处理。

长久的权宜之计

Filebeat 的纯文本的 JSON 存储选型天生就是存在问题的,社区内也曾做过一些小改进的尝试,最终并没有被合并到柱分枝。所以 Filebeat 无法应用过多的日志文件,这是一个短期内无法改变的事实。

而在当前选择的依赖背压的采集方案 中,我们并不倾向将日志文件留在采集管道中,而是将日志留在原处——机器的磁盘上,然后尽量保证管道的通畅,将日志实时采集到 ES 中。

这样做的好处,就是避免因为过多的管道传输导致日志丢失(例如 Redis 写满后崩溃)。

同时也会引入另一个问题:如果采集链路阻塞,同时过多的日志(采集条目每日2亿+)大于机器的磁盘承载能力时,日志丢失的风险依旧存在。

所以我们需要定期清理过期的日志,但问题也没那么简单:

  • 直接删除日志文件 → 写日志的应用进程无法感知,将向无效文件句柄写日志 → 导致日志丢失
  • 清空文件日志内容( echo '' > {}) → 导致文件数量不会减少
  • 如果因为硬盘容量限制,删除日志的周期小于产品许诺的日志保存时长,当链路出现堵塞又未能及时处理 → 导致日志丢失

所以我写了一个 删除脚本,在保证清理过期日志的同时,会判断日志文件的句柄使用情况,跳过那些仍在被写入的文件,保证日志不丢失。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# -*- coding: utf-8 -*-
import datetime
import getopt
import glob
import logging
import subprocess
import sys
from collections import namedtuple
from pathlib import Path
from typing import Generator, Dict

logging.basicConfig(level=logging.DEBUG)

Process = namedtuple("Process", "command,pid,user")

def get_processes_open(file_regex: str) -> Dict[str, Process]:
"""Find processes with open handles for the specified file(s)."""

open_file_process_map = {}

try:
# maybe not safe
output = subprocess.getoutput(f"echo {file_regex} | xargs lsof")
except Exception:
logging.exception("exec error")
return open_file_process_map

lines = output.split("\n")[1:]
for line in lines:
parts = [x for x in line.split(" ") if x]
open_file_process_map[parts[-1]] = Process(*parts[:3])

return open_file_process_map

def try_to_delete_files(file_regex: str, minutes: int):
"""Try to delete files"""
skipped_count = deleted_count = 0

def filter_files_by_expire_minutes() -> Generator[Path, None, None]:
"""Get all expired files"""
now = datetime.datetime.now()
for name in glob.glob(file_regex, recursive=True):
file_obj = Path(name)
updated = datetime.datetime.fromtimestamp(file_obj.stat().st_mtime)
if updated < now - datetime.timedelta(minutes=minutes):
yield file_obj

open_file_process_map = get_processes_open(file_regex)
for f in filter_files_by_expire_minutes():
if str(f) not in open_file_process_map:
logging.info(f"deleting {f}")
try:
f.unlink()
deleted_count += 1
except Exception as e:
skipped_count += 1
logging.warning("failed to delete file: %s, for: %s", str(f), e)
continue
else:
skipped_count += 1
logging.info(f"skipping {f}, for process: {open_file_process_map[str(f)]}")

print(f"Skipped: {skipped_count}, deleted: {deleted_count}")

if __name__ == "__main__":
try:
opts, args = getopt.getopt(sys.argv[1:], "hf:m:", ["files=", "minutes="])
except getopt.GetoptError:
print("Example: python delete_files.py -f /data/*/*.log* -m 1440")
sys.exit(2)

arg_files = ""
arg_minutes = 1440
for opt, arg in opts:
if opt == "-h":
print("Usage: python delete_files.py -f /data/*/*.log* -m 1440")
sys.exit(2)
elif opt in ("-f", "--files"):
arg_files = arg
elif opt in ("-m", "--minutes"):
arg_minutes = int(arg)

started = datetime.datetime.now()
print(f"Started: {started}\n>>>>>>>>>>>>")
try_to_delete_files(arg_files, arg_minutes)
finished = datetime.datetime.now()
print(
f">>>>>>>>>>>>\nFinished: {finished}, total cost: {(finished - started).total_seconds()} seconds \n"
)

当然你也可以根据需求用 Bash 实现,这里就不展开了。(其实就是我不会 Bash)

然后我们需要将它跑在集群中的每一个节点上,定期执行清理工作:

首先定义镜像

1
2
3
4
FROM python:3
RUN apt-get update && apt-get install -y lsof
ADD delete_files.py /
CMD [ "python", "./delete_files.py", "-f", "./*.no", "-m", "1440" ]

Kubernetes DaemonSet 定义,在每一个节点上都尝试清理日志:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: log-cleaner
namespace: kube-system
labels:
k8s-app: log-cleaner
spec:
selector:
matchLabels:
k8s-app: log-cleaner
template:
metadata:
name: log-cleaner
labels:
k8s-app: log-cleaner
spec:
hostPID: true
containers:
- name: batch-delete-files
image: some-registry/batch-delete-files:v1.0.0
imagePullPolicy: Always
command: ["bash"]
args: ["-c", "while true; do python delete_files.py -f /some-path/*/*/*.log* -m 1440; sleep 3600; done;"]
resources:
limits:
cpu: 2560m
memory: 256Mi
requests:
cpu: 25m
memory: 32Mi
volumeMounts:
- mountPath: /some-path/
name: some-volume
volumes:
- name: some-volume
hostPath:
path: /some-path/

值得注意的是,由于我们需要在容器内使用 lsof 来看查看母机文件的 fd 使用情况,所以这里需要额外添加 hostPID: true 来保证能够读取到母机的进程信息。

由于我们需要定时执行,所以通过 while true; do ... sleep 3600; done; 来要额外控制执行周期。

结语

由于 Filebeat 存在天生的存储缺陷,我们需要通过额外的脚本较为精确的控制 Filebeat 的输入文件数量,当前的方案断然达不到完善,仍需要我们继续探索。

参考: