在 PHP 中处理 10 亿行数据!
你可能听说过“十亿行挑战” (1brc),如果你没有听说过,去看看 Gunnar Morlings 的 1brc 仓库。
第一种天真的方法
我克隆了存储库,并在 measurements.txt
中创建了十亿行数据集。在那之后,我开始构建我第一个可以解决这一挑战的朴素实现:
<?php
$stations = [];
$fp = fopen('measurements.txt', 'r');
while ($data = fgetcsv($fp, null, ';')) {
if (!isset($stations[$data[0]])) {
$stations[$data[0]] = [
$data[1],
$data[1],
$data[1],
1
];
} else {
$stations[$data[0]][3] ++;
$stations[$data[0]][2] += $data[1];
if ($data[1] < $stations[$data[0]][0]) {
$stations[$data[0]][0] = $data[1];
}
if ($data[1] > $stations[$data[0]][1]) {
$stations[$data[0]][1] = $data[1];
}
}
}
ksort($stations);
echo '{';
foreach($stations as $k=>&$station) {
$station[2] = $station[2]/$station[3];
echo $k, '=', $station[0], '/', $station[2], '/', $station[1], ', ';
}
echo '}';
这里没有什么疯狂的东西,只是打开文件,使用 fgetcsv()
读取数据。如果尚未找到该 station,则创建它,否则递增计数器,对温度求和,查看当前温度是否低于或高于最小值或最大值,并相应更新。
一旦我把所有东西都放在一起,我就使用 ksort()
将 $stations
数组按顺序排列,然后 echo 打印出列表并计算平均温度(求和/计数)。
在我的笔记本电脑上运行这个简单的代码需要 25 分钟🤯
是时候优化并查看分析器了:
时间线可视化让我看到,这显然是 CPU 极限的,脚本开头的文件编译可以忽略不计,也没有垃圾回收事件。
Flame graph(火焰图)视图也有助于显示在 fgetcsv()
上消耗了 46% 的 CPU 时间。
使用 fgets()
而不是 fgetcsv()
第一个优化是使用 fgets()
获取一行并使用 ;
字符手动拆分,而不是依赖 fgetcsv()
。这是因为 fgetcsv()
所做的比我需要的多得多。
// ...
while ($data = fgets($fp, 999)) {
$pos = strpos($data, ';');
$city = substr($data, 0, $pos);
$temp = substr($data, $pos+1, -1);
// ...
此外,我将 $data[0]
重构为 $city
,将 $data[1]
重构为 $temp
。
仅通过这一更改再次运行脚本,运行时间已降至 19 分 49 秒。从绝对数字来看,这仍然是一个很大的数字,但也是:下降了 21%!
火焰图反映了变化,切换到按行显示 CPU 时间也揭示了 root 帧中发生的事情:
18 行和 23 行消耗了 ~38% CPU 时间:
18 | $stations[$city][3] ++;
| // ...
23 | if ($temp > $stations[$city][1]) {
第 18 行是循环中 $stations
数组的第一个入口,否则它只是一个自增量,第 23 行是一个比较,乍一看没有什么耗费性能的东西,但让我们再做一些优化,你会看到这里花了多少时间。
在可能的情况下使用引用
$station = &$stations[$city];
$station[3] ++;
$station[2] += $temp;
// instead of
$stations[$city][3] ++;
$stations[$city][2] += $data[1];
这应该有助于 PHP 在每次访问数组时不在 $stations
数组中搜索数组键(key),而是将其视为访问数组中“当前” station 的缓存。
这实际上很有帮助,运行这个只需要 17 分 48 秒,又下降了 10%!
只有一个比较
在查看代码时,我偶然发现了这段代码:
if ($temp < $station[0]) {
$station[0] = $temp;
}
if ($temp > $station[1]) {
$station[1] = $temp;
}
如果温度低于最低温度,它就不能再高于最高温度,所以我把它做成了一个 elseif
,也许可以节省一些 CPU 周期。
顺便说一句:我对 measurements.txt
中的温度顺序一无所知,但根据顺序,如果我先检查其中一个,可能会有所不同。
新版本需要 17 分 30 秒,又下降了 ~2%。比纯粹抖动好,但不会差太多。
添加强制类型转换
PHP 被称为动态语言,当我刚开始编写软件时,我非常重视它,少了一个需要关心的问题。但另一方面,知道类型有助于引擎在运行代码时做出更好的决策。
$temp = (float)substr($data, $pos+1, -1);
你猜怎么着?这个简单的类型转换使脚本运行时间变为 13分 32秒,性能提高了 21%!
18 | $station = &$stations[$city];
| // ...
23 | } elseif ($temp > $station[1]) {
第 18 行仍然显示了 11% 的 CPU 时间,这是对数组的访问(在哈希映射中查找键,这是 PHP 中用于关联数组的底层数据结构)。
第 23 行的 CPU 时间从约 32% 下降到约 15%。这是由于 PHP 不再进行类型篡改。在类型转换之前,$temp
/$station[0]
/ $station[1]
是字符串,因此 PHP 必须将它们转换为浮点数,以便在每次比较时进行比较。
那么 JIT 呢?
PHP 中的 OPCache 在 CLI 中默认禁用,需要将 opache.enable_cli
设置设置为 on
。JIT(作为 OPCache 的一部分)默认启用,但由于缓冲区大小设置为 0,因此实际上被禁用,所以我将 opcache.jit-buffer-size
设置为某个值,我只使用了 10M
。应用这些更改后,我使用 JIT 重新运行了脚本,并看到它完成情况为:
7分 19秒 🚀
这就少消耗了 45.9% 时间!!
What more?
我已经将运行时间从开始时的 25 分钟缩短到了大约 7 分钟。有一件事让我感到非常惊讶,那就是 fgets()
为读取 13GB 的文件分配了约 56 GiB/m 的 RAM。似乎不太对劲,所以我检查了 fgets()
的实现,看起来我可以通过省略 fget()
的 len
参数来节省很多这些分配:
while ($data = fgets($fp)) {
// instead of
while ($data = fgets($fp, 999)) {
比较更改前后的分析,可以得出以下结果:
你可能会认为这会带来很大的性能提升,但这只有 1% 左右。这是因为这些是 ZendMM 可以在 bins 处理的小分配,而且速度非常快。
还能更快吗?
当然!到目前为止,我的方法是单线程的,这是大多数 PHP 软件的本质,但 PHP 确实通过 Parallel 并行扩展支持用户端的线程。
正如分析器清楚地显示的那样,在 PHP 中读取数据是一个瓶颈。从 fgetcsv()
切换到 fgets()
并手动拆分会有所帮助,但这仍然需要很多时间,因此让我们使用线程并行读取和处理数据,然后再组合工作线程的中间结果。
<?php
$file = 'measurements.txt';
$threads_cnt = 16;
/**
* Get the chunks that each thread needs to process with start and end position.
* These positions are aligned to \n chars because we use `fgets()` to read
* which itself reads till a \n character.
*
* @return array<int, array{0: int, 1: int}>
*/
function get_file_chunks(string $file, int $cpu_count): array {
$size = filesize($file);
if ($cpu_count == 1) {
$chunk_size = $size;
} else {
$chunk_size = (int) ($size / $cpu_count);
}
$fp = fopen($file, 'rb');
$chunks = [];
$chunk_start = 0;
while ($chunk_start < $size) {
$chunk_end = min($size, $chunk_start + $chunk_size);
if ($chunk_end < $size) {
fseek($fp, $chunk_end);
fgets($fp); // moves fp to next \n char
$chunk_end = ftell($fp);
}
$chunks[] = [
$chunk_start,
$chunk_end
];
$chunk_start = $chunk_end;
}
fclose($fp);
return $chunks;
}
/**
* This function will open the file passed in `$file` and read and process the
* data from `$chunk_start` to `$chunk_end`.
*
* The returned array has the name of the city as the key and an array as the
* value, containing the min temp in key 0, the max temp in key 1, the sum of
* all temperatures in key 2 and count of temperatures in key 3.
*
* @return array<string, array{0: float, 1: float, 2: float, 3: int}>
*/
$process_chunk = function (string $file, int $chunk_start, int $chunk_end): array {
$stations = [];
$fp = fopen($file, 'rb');
fseek($fp, $chunk_start);
while ($data = fgets($fp)) {
$chunk_start += strlen($data);
if ($chunk_start > $chunk_end) {
break;
}
$pos2 = strpos($data, ';');
$city = substr($data, 0, $pos2);
$temp = (float)substr($data, $pos2+1, -1);
if (isset($stations[$city])) {
$station = &$stations[$city];
$station[3] ++;
$station[2] += $temp;
if ($temp < $station[0]) {
$station[0] = $temp;
} elseif ($temp > $station[1]) {
$station[1] = $temp;
}
} else {
$stations[$city] = [
$temp,
$temp,
$temp,
1
];
}
}
return $stations;
};
$chunks = get_file_chunks($file, $threads_cnt);
$futures = [];
for ($i = 0; $i < $threads_cnt; $i++) {
$runtime = new \parallel\Runtime();
$futures[$i] = $runtime->run(
$process_chunk,
[
$file,
$chunks[$i][0],
$chunks[$i][1]
]
);
}
$results = [];
for ($i = 0; $i < $threads_cnt; $i++) {
// `value()` blocks until a result is available, so the main thread waits
// for the thread to finish
$chunk_result = $futures[$i]->value();
foreach ($chunk_result as $city => $measurement) {
if (isset($results[$city])) {
$result = &$results[$city];
$result[2] += $measurement[2];
$result[3] += $measurement[3];
if ($measurement[0] < $result[0]) {
$result[0] = $measurement[0];
}
if ($measurement[1] < $result[1]) {
$result[1] = $measurement[1];
}
} else {
$results[$city] = $measurement;
}
}
}
ksort($results);
echo '{', PHP_EOL;
foreach($results as $k=>&$station) {
echo "\t", $k, '=', $station[0], '/', ($station[2]/$station[3]), '/', $station[1], ',', PHP_EOL;
}
echo '}', PHP_EOL;
这段代码做了一些事,首先扫描文件并将其拆分为以 \n
对齐的块(因为我稍后将使用 fgets()
)。准备好块后,启动 $threads_cnt
工作线程,然后所有这些线程都打开同一个文件,寻找分配给它们的块开始,读取并处理数据,直到块结束,返回一个中间结果,然后在主线程中进行组合、排序和打印。
这个多线程方案的完成时间只耗费了:
1 分 35秒 🚀
这就是最后结果了吗?
当然不是。该方案至少还有两个问题:
- 我在苹果 Silicon 硬件上的 MacOS 上运行这段代码,在 ZTS 版本的 PHP 中使用 JIT 时,它崩溃了,所以 1 分 35秒的结果是没有使用 JIT,如果我能使用它,可能会更快
- 我意识到,由于我日常工作的需要,我运行的是一个使用
CFLAGS=“-g-O0…”
编译的PHP版本
我本应在一开始就检查一下这个的,所以我使用 CFLAGS=“-Os…”
重新编译了 PHP 8.3,我的最终数字(有 16 个线程)是:
🚀 27.7 秒 🚀
这个数字与你在原始挑战的排行榜上看到的数字根本无法相提并论,这是因为我在完全不同的硬件上运行了这段代码。
这是一个包含 10 个线程的时间线视图:
底部的线程是主线程,等待工作线程的结果。一旦这些 worker 返回了他们的中间结果,就可以看到主线程正在对所有内容进行组合和排序。我们也可以清楚地看到,主线程绝不是瓶颈。如果你想进一步优化这一点,请专注于工作线程。
这一路来学到了什么?
每个抽象层只是用可用性/集成换取 CPU 周期或内存。fgetcsv()
非常易于使用,隐藏了很多东西,但这是有代价的。甚至 fgets()
也对我们隐藏了一些东西,但使读取数据变得超级方便。
在代码中添加类型将有助于语言优化执行,或者停止类型篡改,这是你看不到的,但仍然需要用 CPU 周期来支付。
JIT 非常棒,尤其是在处理 CPU 密集型问题时!
这绝对不是大多数 PHP 软件的本质,但由于并行化(使用 ext-parallel
),我们可以显著降低数字。haiyou
还有性能提升空间吗
删除 isset()
我们或许不需要检查 isset()
,可以只创建 station 的引用,而当该 station 不存在时,它的值为 NULL
。也就是说,如果城市(city)确实存在(这是大多数情况),则保留一个数组访问。
# before
if (isset($stations[$city])) {
$station = &$stations[$city];
// ..
# after
$station = &$stations[$city];
if ($station !== NULL) {
// ..
这节省了大约 2.5% 的时间!
不检查 fgets()
返回值
当前读取文件的主循环如下:
while ($data = fgets($fp)) {
$chunk_start += strlen($data);
if ($chunk_start > $chunk_end) {
break;
}
// ..
移到并行版本后,对 $chunk_start > $chunk_end
进行了额外的检查,因为每个线程只处理文件的一部分。不需要再检查 fgets()
返回值,因为只要我们在 $chunk_start
和 $chunk_end
之间,它就会始终返回一个字符串,这意味着我们可以将其作为循环中的表达式,不加检查地读取。
while ($chunk_start < $chunk_end) {
$data = fgets($fp);
$chunk_start += strlen($data);
// ..
此更改将从循环中删除一个分支,并导致时间再次下降约 2.7%!
fgets()
vs. stream_get_line()
在 $city
和 $temp
中的实际读取和存储如下:
$data = fgets($fp);
$chunk_start += strlen($data);
$pos2 = strpos($data, ';');
$city = substr($data, 0, $pos2);
$temp = (float)substr($data, $pos2+1, -1);
我以前从未听说过 stream_get_line()
,它的行为与 fgets()
几乎完全相同,不同的是它允许指定行尾分隔符!
$city = stream_get_line($fp, 99, ';');
$temp = stream_get_line($fp, 99, "\n");
$chunk_start += strlen($city) + strlen($temp) + 2;
$temp = (float)$temp;
这一变化又节省了约 15% 的时间!
为什么会这样?fgets()
和 stream_get_line()
的实现非常接近,两者都使用 PHP 流层。主要的不同是,我们不再需要使用 substr()
将字符串(来自 fgets()
)拆分为子字符串。额外的 strlen()
调用可以忽略不计,因为 PHP 中的变量是 zval 的底层变量,它保存了字符串的长度。
那么,这个 PHP 脚本的等待时间在是多少呢?
在 Hetzner 的 AX161 上运行的基准测试。
最后结果是:12.76 秒 🎉
结束了吗?
我不知道,也许这里还有一些需要优化的地方,但在设法将 83% 的时间花在 stream_get_line()
函数上之后,看起来我们已经实现了 PHP 流层所允许的。
要么我们找到另一个绕过 PHP 流层并提供对文件系统更直接访问的函数,要么我们尝试优化该层本身。