🖋️ 开发散文:使用 Python 以 UDP 协议发送 Prometheus 指标数据
date
Nov 11, 2022
slug
python-udp-prom-aggregation-gateway
status
Published
tags
python
tech
prometheus
udp
summary
形散神不散,记录开发思路
type
Post
开发散文:以开发为主题的散文。基本完整复现了从遇到问题、分析问题、找到解决思路、编码实现思路的全流程,顺带着记录一些浅薄的知识点。
UDP 包是多大?如何在 Python 计算数据大小?getsizeof 的疑惑简单点, len()如何对 Prometheus 指标数据切片?开码!首先定义模型递归寻找切分点切!收尾效果尾声进一步优化?参考
先简述一下需求背景:为了实现可观测性,产品中的一些指标数据需要推送到 Prometheus 聚合网关,出于不想对主体服务有更多干扰的原则,项目选择使用 UDP 协议发送数据(也对聚合网关做了简单的 UDP 数据接收改造)。但是在运行过程中,发现上报数据量偏少,经过日志排查,在数据发送端瞧出了点端倪:
error: [Errno 90] Message too long
问题非常简单明晰:单次请求发送的数据包大小超过了网络数据包的上限。
那么就有了第一个问题:UDP 协议规定的包大小究竟是多少呢?
UDP 包是多大?
先来看一下 UDP 协议 的包长什么样子:
如图所见,UDP 可能算是最简洁的传输层协议之一了,包的组成非常简单易懂:
- Source Port:包来源端口信息。
- Destination Port:包目的端口信息。
- UDP Length:UDP 头信息+ Payload 的总长度。
- UDP Checksum:包的校验和,避免在数据在传输中被污染。
以上四个部分称之为协议控制信息 PCI,又称协议头,每个部分 2 bytes,一共占 8 bytes。
同时由于网络层的 IP 头也需要占据一部分空间,所以在计算 Payload 大小时需要先来看看 IP 层的情况。首先来看 IPv4:
# IPv4
0xffff - (sizeof(IP Header) + sizeof(UDP Header)) = 65535-(20+8) = 65507
总的来说符合预期,在算上 IP 头的情况下,IPv4 UDP 包 Payload 最大不能超过 65507 bytes。
# IPv6
0xffff - sizeof(UDP Header) = 65535-8 = 65527
也就是在你的基建完全支持 IPv6 并且能够设置超大 MTU 时,IPv6 UDP 包 Payload 包最大不能超过 65527 bytes。
注意:不同操作系统限值也可能会不同,例如 macOS 下最大包默认值为 9216 bytes。本文仅从协议角度讨论。
以上分析的都是理论最大值,而在真实传输中,如果想尽可能保证 UDP 包的安全,不得不考虑 MTU 在其中的影响:当数据包大小比 MTU 值大的越多,传输时被切分的段数就越多,由于 UDP 本身协议的不可靠性,数据包的安全性就越低。
而 MTU 的最小值为 576 bytes,在 IPv4 的情况下,减去上面提到的 IP Header 和 UDP Header ,还剩下 548 bytes 。这也是为什么很多公网服务都会限制 UDP 包的最大不能超过 512 bytes (例如 DNS)。同时在 IP Options 存在的情况,IP Header 可能会占据 60 bytes。所以,如果你的 UDP 包想要穿越复杂的公网,最安全的最大值是 508 bytes。
考虑当前面对的网络环境——接发双方都处于同一个容器集群内,属于没有太多干扰的私有网络——可以酌情适当加大限制,只要小于 65507 bytes 即可。
搞清楚了最大能发送多少数据,自然会想将超限的数据进行切片处理,那么迎来了第二个问题:如何在 Python 计算数据大小?
如何在 Python 计算数据大小?
我的第一反应是使用
getsizeof
,虽然稍有经验的 Python 程序员会想到,它对于容器类的对象是没法精准统计的,好在当前场景里只是用来统计 bytes,并没有这个困扰。但简单测试了一下,却发现了新的疑惑。
getsizeof
的疑惑
from sys import getsizeof
getsizeof(b"") 👉 33
getsizeof(b"abcd") 👉 37
有意思的是,就算空 bytes,为什么也有 33 bytes 的空间占用?稍有经验的程序员就能立马反应过来,这就是对象本身的大小,一窥 Python 内置数据类型的大小(64-bit Python 3.6):
Empty
Bytes type scaling notes
28 int +4 bytes about every 30 powers of 2
37 bytes +1 byte per additional byte
49 str +1-4 per additional character (depending on max width)
48 tuple +8 per additional item
64 list +8 for each additional
224 set 5th increases to 736; 21nd, 2272; 85th, 8416; 341, 32992
240 dict 6th increases to 368; 22nd, 1184; 43rd, 2280; 86th, 4704; 171st, 9320
136 func def does not include default args and other attrs
1056 class def no slots
56 class inst has a __dict__ attr, same scaling as dict above
888 class def with slots
16 __slots__ seems to store in mutable tuple-like structure
first slot grows to 48, and so on.
虽然我们在代码中传入的是对象,但实际上传输的数据需要刨除语言附加的存储空间,所以不能通过
getsizeof
拿到的数据本身的大小,这算是一条走错的思路 xD。简单点, len()
让我们回到问题本身,对于 bytes 数据,最简单统计大小的方式就是直接使用
len()
。big_bytes: bytes
size = len(big_bytes)
确定了如何计算数据大小,下一步的思路就是:如何对 Prometheus 指标数据切片?
如何对 Prometheus 指标数据切片?
正如标题提到的, UDP 传输的是 Prometheus 的指标数据,它是由一个有字面值的字符串转换而来的二进制,它还是字符串的时候一般长这样:
# HELP sample_metric_foo_count 示例解释
# TYPE sample_metric_foo_count gauge
sample_metric_foo_count{foo="aa",bar="bb"} 14.0
sample_metric_foo_count{foo="cc",bar="dd"} 27.0
sample_metric_foo_count{foo="ee",bar="ff"} 12.0
# HELP sample_metric_bar_count 示例解释
# TYPE sample_metric_bar_count gauge
sample_metric_bar_count{baz="gg"} 660.0
sample_metric_bar_count{baz="hh"} 660.0
它本身需要符合一定的格式,
#
开头的两行作为指标信息的元信息,# HELP sample_metric_foo_count 示例解释
# TYPE sample_metric_foo_count gauge
可以先看看 Prometheus 是如何解析它的:
// 以下为摘抄内容
func (p *TextParser) startOfLine() stateFn {
...
switch p.currentByte {
case '#':
return p.startComment
case '\n':
return p.startOfLine // Empty line, start the next one.
}
return p.readingMetricName
}
func (p *TextParser) startComment() stateFn {
...
keyword := p.currentToken.String()
if keyword != "HELP" && keyword != "TYPE" {
// Generic comment, ignore by fast forwarding to end of line.
for p.currentByte != '\n' {
if p.currentByte, p.err = p.buf.ReadByte(); p.err != nil {
return nil // Unexpected end of input.
}
}
return p.startOfLine
}
...
switch keyword {
case "HELP":
return p.readingHelp
case "TYPE":
return p.readingType
}
panic(fmt.Sprintf("code error: unexpected keyword %q", keyword))
}
可以看到:如果直接粗暴地切分会导致接收端无法解析从而丢弃整个请求数据。所以需要针对数据的开头和结尾特征值来决定具体的切分点。
------ # HELP 作为开头,尽量作为数据包的开头 ------
# HELP sample_metric_foo_count 示例解释
# TYPE sample_metric_foo_count gauge
sample_metric_foo_count{foo="aa",bar="bb"} 14.0
sample_metric_foo_count{foo="cc",bar="dd"} 27.0
sample_metric_foo_count{foo="ee",bar="ff"} 12.0
# HELP sample_metric_bar_count 示例解释
# TYPE sample_metric_bar_count gauge
sample_metric_bar_count{baz="gg"} 660.0\n
------ \n 作为单行数据结尾,能够保证包数据被正常解析,最差情况作为数据包的结尾 ------
sample_metric_bar_count{baz="hh"} 660.0 由于缺少 HELP/TYPE 元信息,将无法被聚合处理
开码!
既然准备好了思路,那么就开始正式编码吧。
首先定义模型
好的模型能够清楚地展示编码思路,为了更方便切分定义了如下模型:
@dataclass
class SlicedIndex:
"""数据切分索引"""
start: int
end: int
# 如果不是以 # HELP 开头,会导致 metric 无法被识别合并
valid_start: bool = True
# 如果不是以 metric 内容完结,会让下一个分片包没有有效开头
valid_end: bool = True
def to_tuple(self) -> tuple:
return self.start, self.end
def __len__(self):
return self.end - self.start
@dataclass
class SlicedIndexList:
indexes: List[SlicedIndex] = field(default_factory=list)
def append(self, start: int, end: int, valid_end: bool = True):
valid_start = True
if self.indexes and not self.indexes[-1].valid_end:
valid_start = False
self.indexes.append(SlicedIndex(start, end, valid_start, valid_end))
def __iter__(self):
for elem in self.indexes:
yield elem
def __getitem__(self, ii):
"""Get a list item"""
return self.indexes[ii]
由于网络环境较为良好,这里更倾向于单次尽可能提交足够多的数据而减少网络发送次数。所以尽量贴近最大限制做数据切分,由于每次查找逻辑类同,采用递归会更简单。
递归寻找切分点
def find_sliced_indexes(data: bytes, start: int, udp_package_max_size: int, sliced_index_list: SlicedIndexList):
"""递归找寻指标开头标志"""
if start + udp_package_max_size >= len(data):
sliced_index_list.append(start, len(data))
return
valid_start_index = data.rfind(b"# HELP", start, start + udp_package_max_size)
if valid_start_index == -1 or valid_start_index == start:
# 当某个 metrics 数据大小大于 UDP 协议最大包限制时,原则上我们无法通过 UDP 发送该数据
# 但这里我们尝试以最大限制发送该数据,以一个超过最大限制的数据为例
# +-----------------+ 协议最大长度
# +-----------------+ +-------------------+ +-------------------+
# 有效 有效 (缺失元信息部分丢弃) 有效 有效 有效
# 可以看到中间段数据会被丢弃,因为没有有效的 metrics 开头,但是我们尽可能保证了后续的包是有效开头和有效结尾
# 所以当前的做法更利于我们单 metrics 数据比 65535 稍大的场景,这样丢弃的内容少,保留的内容多
# 反之,当单个 metrics 的数据比最大限制大的越多,丢弃的越多
# sample 肯定会转行,只要有转行就能保证起码前一个包的内容格式有效
valid_end_index = data.rfind(b"\n", start, start + udp_package_max_size)
new_start = valid_end_index + 1
sliced_index_list.append(start, new_start, valid_end=False)
else:
sliced_index_list.append(start, valid_start_index)
new_start = valid_start_index
# 尝试继续向后查找
find_sliced_indexes(data, new_start, udp_package_max_size, sliced_index_list)
return
切!
当拿到了想要切入点后,就可以对原数据进行切分了。这里可以使用 memoryview,节省一次大数据拷贝,切片的速度也会更快一点:
def slice_metrics_udp_data(data: bytes, sliced_indexes: SlicedIndexList) -> Generator[memoryview, None, None]:
"""拆分 metrics UDP data"""
# Q: 为什么不直接切分?
# A: 直接按照大小切分会降 bytes 中的字面值切断,让服务端对于指标无法理解,所以要按照字面值的内容做切分
# memoryview 无需额外拷贝
mview = memoryview(data)
for index in sliced_indexes:
if not index.valid_start:
logger.warning("data<len:%s> has no valid start, may not be parsed", len(index))
yield mview[index.start : index.end]
收尾
针对一些特殊场景做最后的善后处理
def find_udp_data_sliced_indexes(data: bytes, udp_package_max_size: int = 65507, mtu: int = 1500) -> SlicedIndexList:
"""对 UDP 发送数据进行切片处理,保证每次发送成功
:param data: 预发送数据
:param udp_package_max_size: 当前系统支持的最大 UDP 发送包大小,以 bytes 计算,默认为 65535 (在 macOS 下默认为 9126)
:param mtu: Maximum Transmission Unit
udp_package_max_size = 0xffff - (sizeof(IP Header) + sizeof(UDP Header)) = 65535-(20+8) = 65507
ref to: https://en.wikipedia.org/wiki/User_Datagram_Protocol
"""
length = len(data)
if length > mtu:
# TODO: 当前我们暂不考虑处理 MTU 的问题,先解决 UDP 包过大的情况
logger.debug("UDP packages is larger than MTU, not safe for single push.")
if length <= udp_package_max_size:
return SlicedIndexList(indexes=[SlicedIndex(0, length)])
sliced_index_list = SlicedIndexList()
try:
find_sliced_indexes(data, 0, udp_package_max_size, sliced_index_list)
except RecursionError:
logger.warning("data has no valid format, drop it...")
return sliced_index_list
效果
# 原来的 UDP 直接发送
# 将抛出 error: [Errno 90] Message too long
udp_socket.sendto(data, (address, port))
# 循环调用 UDP 请求
for sliced_data in slice_metrics_udp_data(data, find_udp_data_sliced_indexes(data)):
# 所有 UDP 包都可以发送出去
# 同时取决于数据特性,绝大多数的 UDP 数据都能在服务端得到解析
udp_socket.sendto(sliced_data, (address, port))
尾声
进一步优化?
正如我们上面实现的逻辑,仍旧无法保证所有数据都能被解析成功,主要的原因就是我们没有对缺失元信息的指标进行处理,而我们在解析其中内容的时候是可以通过
bytes.index()
找到元信息位置的,可以将元信息暂存起来,在 valid_start
时直接补充上去,这样就能保证所有数据都有合适的格式。Prometheus UDP 数据切分是一个在开发中普通的不能再普通的小问题。在这里写这么多并不是为了展现该问题本身的解决方案,而是用来描述一种完备的开发思路。