🧟‍♂️ FileBeat 启动假死问题

date
Jun 1, 2021
slug
filebeat-hangs-problem.html
status
Published
tags
best-practice
tech
ELK
k8s
summary
日常工作记录
type
Post

问题

上周因为 OOM 问题,某个集群内的 Filebeat 被迫重启后,观测了许久,仍不见事件流恢复,查看 Filebeat 输出日志,发现只有其自监控的日志:
2021-05-28T03:19:41.061Z	INFO	[monitoring]	log/log.go:145	Non-zero metrics in the last 30s	{"monitoring": {"metrics": {"beat":{"cpu":{"system":{"ticks":6249680,"time":{"ms":3024}},"total":{"ticks":13659880,"time":{"ms":6612},"value":13659880},"user":{"ticks":7410200,"time":{"ms":3588}}},"handles":{"limit":{"hard":1048576,"soft":1048576},"open":46},"info":{"ephemeral_id":"ca641ad8-e10a-496f-a087-6924e456aaea","uptime":{"ms":60180037}},"memstats":{"gc_next":42518272,"memory_alloc":31026880,"memory_total":1715304668248,"rss":-1552384},"runtime":{"goroutines":206}},"filebeat":{"events":{"added":139,"done":139},"harvester":{"open_files":0,"running":0}},"libbeat":{"config":{"module":{"running":0}},"pipeline":{"clients":1,"events":{"active":1,"filtered":139,"total":139},"registrar":{"states":{"current":55084,"update":13},"writes":{"success":139,"total":139}},"system":{"load":{"1":9.21,"15":10.97,"5":10.87,"norm":{"1":0.3838,"15":0.4571,"5":0.4529}}}}}}
"harvester":{"open_files":0,"running":0}} 我们可以判断出 harvester 尚未启动采集,这让我非常疑惑,Filebeat 究竟在做什么?
接着观察日志,发现除开自监控,最后输出一条日志的内容是:
2021-05-28T02:46:39.019Z	INFO	beater/crawler.go:73	Loading Inputs: 1
Loading Inputs 看起来也是符合逻辑的,短时间没有太多的头绪,放下忙其他工作了。
随着时间推移,当我再次观测 Filebeat 时,发现它已经在正常工作了,但是日志内依旧没有错误输出,找到恢复时间点的最早日志:
2021-05-28T05:55:17.822Z	INFO	log/input.go:152	Configured paths: [/data/logs/*/*.log* /data/v3logs/*/*/*.log*]
2021-05-28T05:55:17.822Z	INFO	input/input.go:114	Starting input of type: log; ID: 3062577341473220485
2021-05-28T05:55:17.822Z	INFO	beater/crawler.go:105	Loading and starting Inputs completed. Enabled inputs: 1
好家伙,从 Loading InputsLoading and starting Inputs completed 花费了3个多小时,这更让我疑惑了,Filebeat 究竟为什么一直装死?

原因

根据日志打印翻阅了 Filebeat 源码
https://github.com/elastic/beats/blob/23e4403ae093fcc8f7905345cad2c7ad256976d8/filebeat/beater/crawler.go#L71
https://github.com/elastic/beats/blob/2ee21d95aef89af7f7e7aef8d07f679a24d690b4/filebeat/registrar/registrar.go#L172
Filebeat 使用 registry file 作为采集的状态存储,实际上就是一个纯文本的 JSON 文件。每次启动时都会检查 JSON 文件中的 states 是否需要更新(新增或者删除文件),而当任何一个 state 需要更新, registry file 将会全量序列化(读) → 持久化(写),随着 states 越来越多,JSON 文件会越来越大(接近20MB),每次全量读写都会越来越慢,并且 CPU、内存的使用量都会暴涨。
notion image
 
 
总的来说,在当前的数据存储选型下,Filebeat 无法应对过多的文件数据数量,启动时的数据核验时间过长(几小时→几天不等,视数据量而定),就会产生了“假死”的现象。

解决方案

临时的解决方案

暂停 Filebeat 进程,删除 registry file ,重启 Filebeat 进程。这时候由于 JSON 文件是比较小的,所有 state 均处于增量状态,数据同步是比较快的。但是所有已经发送过的事件将难以避免地重复发送一次,所以这种做法只能应急,不能长久处理。
 

长久的权宜之计

Filebeat 的纯文本的 JSON 存储选型天生就是存在问题的,社区内也曾做过一些小改进的尝试,最终并没有被合并到柱分枝。所以 Filebeat 无法应用过多的日志文件,这是一个短期内无法改变的事实。
 
而在当前选择的依赖背压的采集方案 中,我们并不倾向将日志文件留在采集管道中,而是将日志留在原处——机器的磁盘上,然后尽量保证管道的通畅,将日志实时采集到 ES 中。
这样做的好处,就是避免因为过多的管道传输导致日志丢失(例如 Redis 写满后崩溃)。
同时也会引入另一个问题:如果采集链路阻塞,同时过多的日志(采集条目每日2亿+)大于机器的磁盘承载能力时,日志丢失的风险依旧存在。
所以我们需要定期清理过期的日志,但问题也没那么简单:
  • 清空文件日志内容( echo '' > {}) → 导致文件数量不会减少
  • 如果因为硬盘容量限制,删除日志的周期小于产品许诺的日志保存时长,当链路出现堵塞又未能及时处理 → 导致日志丢失
 
所以我写了一个 删除脚本,在保证清理过期日志的同时,会判断日志文件的句柄使用情况,跳过那些仍在被写入的文件,保证日志不丢失。
# -*- coding: utf-8 -*-
import datetime
import getopt
import glob
import logging
import subprocess
import sys
from collections import namedtuple
from pathlib import Path
from typing import Generator, Dict

logging.basicConfig(level=logging.DEBUG)


Process = namedtuple("Process", "command,pid,user")


def get_processes_open(file_regex: str) -> Dict[str, Process]:
    """Find processes with open handles for the specified file(s)."""

    open_file_process_map = {}

    try:
        # maybe not safe
        output = subprocess.getoutput(f"echo {file_regex} | xargs lsof")
    except Exception:
        logging.exception("exec error")
        return open_file_process_map

    lines = output.split("\n")[1:]
    for line in lines:
        parts = [x for x in line.split(" ") if x]
        open_file_process_map[parts[-1]] = Process(*parts[:3])

    return open_file_process_map


def try_to_delete_files(file_regex: str, minutes: int):
    """Try to delete files"""
    skipped_count = deleted_count = 0

    def filter_files_by_expire_minutes() -> Generator[Path, None, None]:
        """Get all expired files"""
        now = datetime.datetime.now()
        for name in glob.glob(file_regex, recursive=True):
            file_obj = Path(name)
            updated = datetime.datetime.fromtimestamp(file_obj.stat().st_mtime)
            if updated < now - datetime.timedelta(minutes=minutes):
                yield file_obj

    open_file_process_map = get_processes_open(file_regex)
    for f in filter_files_by_expire_minutes():
        if str(f) not in open_file_process_map:
            logging.info(f"deleting {f}")
            try:
                f.unlink()
                deleted_count += 1
            except Exception as e:
                skipped_count += 1
                logging.warning("failed to delete file: %s, for: %s", str(f), e)
                continue
        else:
            skipped_count += 1
            logging.info(f"skipping {f}, for process: {open_file_process_map[str(f)]}")

    print(f"Skipped: {skipped_count}, deleted: {deleted_count}")


if __name__ == "__main__":
    try:
        opts, args = getopt.getopt(sys.argv[1:], "hf:m:", ["files=", "minutes="])
    except getopt.GetoptError:
        print("Example: python delete_files.py -f /data/*/*.log* -m 1440")
        sys.exit(2)

    arg_files = ""
    arg_minutes = 1440
    for opt, arg in opts:
        if opt == "-h":
            print("Usage: python delete_files.py -f /data/*/*.log* -m 1440")
            sys.exit(2)
        elif opt in ("-f", "--files"):
            arg_files = arg
        elif opt in ("-m", "--minutes"):
            arg_minutes = int(arg)

    started = datetime.datetime.now()
    print(f"Started: {started}\n>>>>>>>>>>>>")
    try_to_delete_files(arg_files, arg_minutes)
    finished = datetime.datetime.now()
    print(
        f">>>>>>>>>>>>\nFinished: {finished}, total cost: {(finished - started).total_seconds()} seconds \n"
    )
delete_files.py
当然你也可以根据需求用 Bash 实现,这里就不展开了。(其实就是我不会 Bash)
 
然后我们需要将它跑在集群中的每一个节点上,定期执行清理工作:
首先定义镜像
FROM python:3
RUN apt-get update && apt-get install -y lsof
ADD delete_files.py /
CMD [ "python", "./delete_files.py", "-f", "./*.no", "-m", "1440" ]
 
Kubernetes DaemonSet 定义,在每一个节点上都尝试清理日志:
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: log-cleaner
  namespace: kube-system
  labels:
    k8s-app: log-cleaner
spec:
  selector:
    matchLabels:
      k8s-app: log-cleaner
  template:
    metadata:
      name: log-cleaner
      labels:
        k8s-app: log-cleaner
    spec:
      hostPID: true
      containers:
      - name: batch-delete-files
        image: some-registry/batch-delete-files:v1.0.0
        imagePullPolicy: Always
        command: ["bash"]
        args: ["-c", "while true; do python delete_files.py -f /some-path/*/*/*.log* -m 1440; sleep 3600; done;"]
        resources:
          limits:
            cpu: 2560m
            memory: 256Mi
          requests:
            cpu: 25m
            memory: 32Mi
        volumeMounts:
        - mountPath: /some-path/
          name: some-volume
      volumes:
      - name: some-volume
        hostPath:
          path: /some-path/
值得注意的是,由于我们需要在容器内使用 lsof 来看查看母机文件的 fd 使用情况,所以这里需要额外添加 hostPID: true 来保证能够读取到母机的进程信息。
由于我们需要定时执行,所以通过 while true; do ... sleep 3600; done; 来要额外控制执行周期。
 

结语

由于 Filebeat 存在天生的存储缺陷,我们需要通过额外的脚本较为精确的控制 Filebeat 的输入文件数量,当前的方案断然达不到完善,仍需要我们继续探索。
 
参考:
 

© bluesyu 2019 - 2024

powered by nobelium