开发散文:以开发为主题的散文。基本完整复现了从遇到问题、分析问题、找到解决思路、编码实现思路的全流程,顺带着记录一些浅薄的知识点。

先简述一下需求背景:为了实现可观测性,产品中的一些指标数据需要推送到 Prometheus 聚合网关,出于不想对主体服务有更多干扰的原则,项目选择使用 UDP 协议发送数据(也对聚合网关做了简单的 UDP 数据接收改造)。但是在运行过程中,发现上报数据量偏少,经过日志排查,在数据发送端瞧出了点端倪:

1
error: [Errno 90] Message too long

问题非常简单明晰:单次请求发送的数据包大小超过了网络数据包的上限

那么就有了第一个问题:UDP 协议规定的包大小究竟是多少呢?

UDP 包是多大?

先来看一下 UDP 协议 的包长什么样子:


The UDP protocol header consists of 8 bytes of Protocol Control Information (PCI)

如图所见,UDP 可能算是最简洁的传输层协议之一了,包的组成非常简单易懂:

  • **Source Port:**包来源端口信息。
  • **Destination Port:**包目的端口信息。
  • **UDP Length:**UDP 头信息+ Payload 的总长度。
  • **UDP Checksum:**包的校验和,避免在数据在传输中被污染。

以上四个部分称之为协议控制信息 PCI,又称协议头,每个部分 2 bytes,一共占 8 bytes。

同时由于网络层的 IP 头也需要占据一部分空间,所以在计算 Payload 大小时需要先来看看 IP 层的情况。首先来看 IPv4:

1
2
# IPv4
**0xffff - (sizeof(IP Header) + sizeof(UDP Header)) = 65535-(20+8) = 65507**

总的来说符合预期,在算上 IP 头的情况下,IPv4 UDP 包 Payload 最大不能超过 65507 bytes。

但是在 IPv6 的情况有了一点点复杂:由于 Jumbogram(超大包) 的存在,超过 65535 bytes 的包是可以在调整过 MTU 的节点中传输的,所以理论上的最大体积为:

1
2
# IPv6
**0xffff - sizeof(UDP Header) = 65535-8 = 65527**

也就是在你的基建完全支持 IPv6 并且能够设置超大 MTU 时,IPv6 UDP 包 Payload 包最大不能超过 65527 bytes。

以上分析的都是理论最大值,而在真实传输中,如果想尽可能保证 UDP 包的安全,不得不考虑 MTU 在其中的影响:当数据包大小比 MTU 值大的越多,传输时被切分的段数就越多,由于 UDP 本身协议的不可靠性,数据包的安全性就越低。

而 MTU 的最小值为 576 bytes,在 IPv4 的情况下,减去上面提到的 IP Header 和 UDP Header ,还剩下 548 bytes 。这也是为什么很多公网服务都会限制 UDP 包的最大不能超过 512 bytes (例如 DNS)。同时在 IP Options 存在的情况,IP Header 可能会占据 60 bytes。所以,如果你的 UDP 包想要穿越复杂的公网,最安全的最大值是 508 bytes

考虑当前面对的网络环境——接发双方都处于同一个容器集群内,属于没有太多干扰的私有网络——可以酌情适当加大限制,只要小于 65507 bytes 即可

搞清楚了最大能发送多少数据,自然会想**将超限的数据进行切片处理,**那么迎来了第二个问题:如何在 Python 计算数据大小?

如何在 Python 计算数据大小?

我的第一反应是使用 getsizeof ,虽然稍有经验的 Python 程序员会想到,它对于容器类的对象是没法精准统计的,好在当前场景里只是用来统计 bytes,并没有这个困扰。

但简单测试了一下,却发现了新的疑惑。

getsizeof 的疑惑

1
2
3
4
from sys import getsizeof

getsizeof(b"") 👉 33
getsizeof(b"abcd") 👉 37

有意思的是,就算空 bytes,为什么也有 33 bytes 的空间占用?稍有经验的程序员就能立马反应过来,这就是对象本身的大小,一窥 Python 内置数据类型的大小(64-bit Python 3.6):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Empty
Bytes type scaling notes
28 int +4 bytes about every 30 powers of 2
37 bytes +1 byte per additional byte
49 str +1-4 per additional character (depending on max width)
48 tuple +8 per additional item
64 list +8 for each additional
224 set 5th increases to 736; 21nd, 2272; 85th, 8416; 341, 32992
240 dict 6th increases to 368; 22nd, 1184; 43rd, 2280; 86th, 4704; 171st, 9320
136 func def does not include default args and other attrs
1056 class def no slots
56 class inst has a __dict__ attr, same scaling as dict above
888 class def with slots
16 __slots__ seems to store in mutable tuple-like structure
first slot grows to 48, and so on.

虽然我们在代码中传入的是对象,但实际上传输的数据需要刨除语言附加的存储空间,所以不能通过 getsizeof 拿到的数据本身的大小,这算是一条走错的思路 xD。

简单点, len()

让我们回到问题本身,对于 bytes 数据,最简单统计大小的方式就是直接使用 len()

1
2
big_bytes: bytes
size = len(big_bytes)

确定了如何计算数据大小,下一步的思路就是:如何对 Prometheus 指标数据切片?

如何对 Prometheus 指标数据切片?

正如标题提到的, UDP 传输的是 Prometheus 的指标数据,它是由一个有字面值的字符串转换而来的二进制,它还是字符串的时候一般长这样:

1
2
3
4
5
6
7
8
9
10
# HELP sample_metric_foo_count 示例解释
# TYPE sample_metric_foo_count gauge
sample_metric_foo_count{foo="aa",bar="bb"} 14.0
sample_metric_foo_count{foo="cc",bar="dd"} 27.0
sample_metric_foo_count{foo="ee",bar="ff"} 12.0

# HELP sample_metric_bar_count 示例解释
# TYPE sample_metric_bar_count gauge
sample_metric_bar_count{baz="gg"} 660.0
sample_metric_bar_count{baz="hh"} 660.0

它本身需要符合一定的格式# 开头的两行作为指标信息的元信息,

1
2
# HELP sample_metric_foo_count 示例解释
# TYPE sample_metric_foo_count gauge

可以先看看 Prometheus 是如何解析它的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 以下为摘抄内容
func (p *TextParser) startOfLine() stateFn {
...
switch p.currentByte {
case '#':
return p.startComment
case '\n':
return p.startOfLine // Empty line, start the next one.
}
return p.readingMetricName
}

func (p *TextParser) startComment() stateFn {
...
keyword := p.currentToken.String()
if keyword != "HELP" && keyword != "TYPE" {
// Generic comment, ignore by fast forwarding to end of line.
for p.currentByte != '\n' {
if p.currentByte, p.err = p.buf.ReadByte(); p.err != nil {
return nil // Unexpected end of input.
}
}
return p.startOfLine
}
...
switch keyword {
case "HELP":
return p.readingHelp
case "TYPE":
return p.readingType
}
panic(fmt.Sprintf("code error: unexpected keyword %q", keyword))
}

可以看到:如果直接粗暴地切分会导致接收端无法解析从而丢弃整个请求数据。所以需要针对数据的开头和结尾特征值来决定具体的切分点。

1
2
3
4
5
6
7
8
9
10
11
12
**------ # HELP 作为开头,尽量作为数据包的开头 ------**
**# HELP** sample_metric_foo_count 示例解释
# TYPE sample_metric_foo_count gauge
sample_metric_foo_count{foo="aa",bar="bb"} 14.0
sample_metric_foo_count{foo="cc",bar="dd"} 27.0
sample_metric_foo_count{foo="ee",bar="ff"} 12.0

# HELP sample_metric_bar_count 示例解释
# TYPE sample_metric_bar_count gauge
sample_metric_bar_count{baz="gg"} 660.0**\n**
**------ \n 作为单行数据结尾,能够保证包数据被正常解析,最差情况作为数据包的结尾 ------**
sample_metric_bar_count{baz="hh"} 660.0 **由于缺少 HELP/TYPE 元信息,将无法被聚合处理**

开码!

既然准备好了思路,那么就开始正式编码吧。

首先定义模型

好的模型能够清楚地展示编码思路,为了更方便切分定义了如下模型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@dataclass
class SlicedIndex:
"""数据切分索引"""
start: int
end: int
# 如果不是以 # HELP 开头,会导致 metric 无法被识别合并
valid_start: bool = True
# 如果不是以 metric 内容完结,会让下一个分片包没有有效开头
valid_end: bool = True

def to_tuple(self) -> tuple:
return self.start, self.end

def __len__(self):
return self.end - self.start

@dataclass
class SlicedIndexList:
indexes: List[SlicedIndex] = field(default_factory=list)

def append(self, start: int, end: int, valid_end: bool = True):
valid_start = True
if self.indexes and not self.indexes[-1].valid_end:
valid_start = False

self.indexes.append(SlicedIndex(start, end, valid_start, valid_end))

def __iter__(self):
for elem in self.indexes:
yield elem

def __getitem__(self, ii):
"""Get a list item"""
return self.indexes[ii]

由于网络环境较为良好,这里更倾向于单次尽可能提交足够多的数据而减少网络发送次数。所以尽量贴近最大限制做数据切分,由于每次查找逻辑类同,采用递归会更简单。

递归寻找切分点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def find_sliced_indexes(data: bytes, start: int, udp_package_max_size: int, sliced_index_list: SlicedIndexList):
"""递归找寻指标开头标志"""
if start + udp_package_max_size >= len(data):
sliced_index_list.append(start, len(data))
return

valid_start_index = data.rfind(b"# HELP", start, start + udp_package_max_size)
if valid_start_index == -1 or valid_start_index == start:
# 当某个 metrics 数据大小大于 UDP 协议最大包限制时,原则上我们无法通过 UDP 发送该数据
# 但这里我们尝试以最大限制发送该数据,以一个超过最大限制的数据为例
# +-----------------+ 协议最大长度
# +-----------------+ +-------------------+ +-------------------+
# 有效 有效 (缺失元信息部分丢弃) 有效 有效 有效
# 可以看到中间段数据会被丢弃,因为没有有效的 metrics 开头,但是我们尽可能保证了后续的包是有效开头和有效结尾
# 所以当前的做法更利于我们单 metrics 数据比 65535 稍大的场景,这样丢弃的内容少,保留的内容多
# 反之,当单个 metrics 的数据比最大限制大的越多,丢弃的越多

# sample 肯定会转行,只要有转行就能保证起码前一个包的内容格式有效
valid_end_index = data.rfind(b"\n", start, start + udp_package_max_size)
new_start = valid_end_index + 1
sliced_index_list.append(start, new_start, valid_end=False)
else:
sliced_index_list.append(start, valid_start_index)
new_start = valid_start_index

# 尝试继续向后查找
find_sliced_indexes(data, new_start, udp_package_max_size, sliced_index_list)
return

切!

当拿到了想要切入点后,就可以对原数据进行切分了。这里可以使用 memoryview,节省一次大数据拷贝,切片的速度也会更快一点:

1
2
3
4
5
6
7
8
9
10
11
def slice_metrics_udp_data(data: bytes, sliced_indexes: SlicedIndexList) -> Generator[memoryview, None, None]:
"""拆分 metrics UDP data"""
# Q: 为什么不直接切分?
# A: 直接按照大小切分会降 bytes 中的字面值切断,让服务端对于指标无法理解,所以要按照字面值的内容做切分

# memoryview 无需额外拷贝
mview = memoryview(data)
for index in sliced_indexes:
if not index.valid_start:
logger.warning("data<len:%s> has no valid start, may not be parsed", len(index))
yield mview[index.start : index.end]

收尾

针对一些特殊场景做最后的善后处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def find_udp_data_sliced_indexes(data: bytes, udp_package_max_size: int = 65507, mtu: int = 1500) -> SlicedIndexList:
"""对 UDP 发送数据进行切片处理,保证每次发送成功
:param data: 预发送数据
:param udp_package_max_size: 当前系统支持的最大 UDP 发送包大小,以 bytes 计算,默认为 65535 (在 macOS 下默认为 9126)
:param mtu: Maximum Transmission Unit

udp_package_max_size = 0xffff - (sizeof(IP Header) + sizeof(UDP Header)) = 65535-(20+8) = 65507
ref to: https://en.wikipedia.org/wiki/User_Datagram_Protocol
"""
length = len(data)
if length > mtu:
# TODO: 当前我们暂不考虑处理 MTU 的问题,先解决 UDP 包过大的情况
logger.debug("UDP packages is larger than MTU, not safe for single push.")

if length <= udp_package_max_size:
return SlicedIndexList(indexes=[SlicedIndex(0, length)])

sliced_index_list = SlicedIndexList()
try:
find_sliced_indexes(data, 0, udp_package_max_size, sliced_index_list)
except RecursionError:
logger.warning("data has no valid format, drop it...")

return sliced_index_list

效果

1
2
3
4
5
6
7
8
9
# 原来的 UDP 直接发送
# 将抛出 error: [Errno 90] Message too long
udp_socket.sendto(data, (address, port))

# 循环调用 UDP 请求
for sliced_data in slice_metrics_udp_data(data, find_udp_data_sliced_indexes(data)):
# 所有 UDP 包都可以发送出去
# 同时取决于数据特性,**绝大多数**的 UDP 数据都能在服务端得到解析
udp_socket.sendto(sliced_data, (address, port))

尾声

进一步优化?

正如我们上面实现的逻辑,仍旧无法保证所有数据都能被解析成功,主要的原因就是我们没有对缺失元信息的指标进行处理,而我们在解析其中内容的时候是可以通过 bytes.index() 找到元信息位置的,可以将元信息暂存起来,在 valid_start 时直接补充上去,这样就能保证所有数据都有合适的格式。

Prometheus UDP 数据切分是一个在开发中普通的不能再普通的小问题。在这里写这么多并不是为了展现该问题本身的解决方案,而是用来描述一种完备的开发思路。

参考