编程

在 PHP 中处理 10 亿行数据!

101 2024-11-15 15:36:00

你可能听说过“十亿行挑战” (1brc),如果你没有听说过,去看看 Gunnar Morlings 的 1brc 仓库

第一种天真的方法

我克隆了存储库,并在 measurements.txt 中创建了十亿行数据集。在那之后,我开始构建我第一个可以解决这一挑战的朴素实现:

<?php

$stations = [];

$fp = fopen('measurements.txt', 'r');

while ($data = fgetcsv($fp, null, ';')) {
    if (!isset($stations[$data[0]])) {
        $stations[$data[0]] = [
            $data[1],
            $data[1],
            $data[1],
            1
        ];
    } else {
        $stations[$data[0]][3] ++;
        $stations[$data[0]][2] += $data[1];
        if ($data[1] < $stations[$data[0]][0]) {
            $stations[$data[0]][0] = $data[1];
        }
        if ($data[1] > $stations[$data[0]][1]) {
            $stations[$data[0]][1] = $data[1];
        }
    }
}

ksort($stations);

echo '{';
foreach($stations as $k=>&$station) {
    $station[2] = $station[2]/$station[3];
    echo $k, '=', $station[0], '/', $station[2], '/', $station[1], ', ';
}
echo '}';

这里没有什么疯狂的东西,只是打开文件,使用 fgetcsv() 读取数据。如果尚未找到该 station,则创建它,否则递增计数器,对温度求和,查看当前温度是否低于或高于最小值或最大值,并相应更新。

一旦我把所有东西都放在一起,我就使用 ksort()$stations 数组按顺序排列,然后 echo 打印出列表并计算平均温度(求和/计数)。

在我的笔记本电脑上运行这个简单的代码需要 25 分钟🤯

是时候优化并查看分析器了:

时间线可视化让我看到,这显然是 CPU 极限的,脚本开头的文件编译可以忽略不计,也没有垃圾回收事件。

Flame graph(火焰图)视图也有助于显示在 fgetcsv() 上消耗了 46% 的 CPU 时间。

使用 fgets() 而不是 fgetcsv()

第一个优化是使用 fgets() 获取一行并使用 ; 字符手动拆分,而不是依赖 fgetcsv()。这是因为 fgetcsv() 所做的比我需要的多得多。

// ...
while ($data = fgets($fp, 999)) {
    $pos = strpos($data, ';');
    $city = substr($data, 0, $pos);
    $temp = substr($data, $pos+1, -1);
// ...

此外,我将 $data[0] 重构为 $city,将 $data[1] 重构为 $temp

仅通过这一更改再次运行脚本,运行时间已降至 19 分 49 秒。从绝对数字来看,这仍然是一个很大的数字,但也是:下降了 21%

火焰图反映了变化,切换到按行显示 CPU 时间也揭示了 root 帧中发生的事情:

18 行和 23 行消耗了 ~38% CPU 时间:

18 | $stations[$city][3] ++;
   | // ...
23 | if ($temp > $stations[$city][1]) {

第 18 行是循环中 $stations 数组的第一个入口,否则它只是一个自增量,第 23 行是一个比较,乍一看没有什么耗费性能的东西,但让我们再做一些优化,你会看到这里花了多少时间。

在可能的情况下使用引用

$station = &$stations[$city];
$station[3] ++;
$station[2] += $temp;
// instead of
$stations[$city][3] ++;
$stations[$city][2] += $data[1];

这应该有助于 PHP 在每次访问数组时不在 $stations 数组中搜索数组键(key),而是将其视为访问数组中“当前” station 的缓存。
这实际上很有帮助,运行这个只需要 17 分 48 秒,又下降了 10%

只有一个比较

在查看代码时,我偶然发现了这段代码:

if ($temp < $station[0]) {
    $station[0] = $temp;
}
if ($temp > $station[1]) {
    $station[1] = $temp;
}

如果温度低于最低温度,它就不能再高于最高温度,所以我把它做成了一个 elseif,也许可以节省一些 CPU 周期。

顺便说一句:我对 measurements.txt 中的温度顺序一无所知,但根据顺序,如果我先检查其中一个,可能会有所不同。

新版本需要 17 分 30 秒,又下降了 ~2%。比纯粹抖动好,但不会差太多。

添加强制类型转换

PHP 被称为动态语言,当我刚开始编写软件时,我非常重视它,少了一个需要关心的问题。但另一方面,知道类型有助于引擎在运行代码时做出更好的决策。

$temp = (float)substr($data, $pos+1, -1);

你猜怎么着?这个简单的类型转换使脚本运行时间变为 13分 32秒性能提高了 21%

18 | $station = &$stations[$city];
   | // ...
23 | } elseif ($temp > $station[1]) {

第 18 行仍然显示了 11% 的 CPU 时间,这是对数组的访问(在哈希映射中查找键,这是 PHP 中用于关联数组的底层数据结构)。

第 23 行的 CPU 时间从约 32% 下降到约 15%。这是由于 PHP 不再进行类型篡改。在类型转换之前,$temp /$station[0] / $station[1]是字符串,因此 PHP 必须将它们转换为浮点数,以便在每次比较时进行比较。

那么 JIT 呢?

PHP 中的 OPCache 在 CLI 中默认禁用,需要将 opache.enable_cli 设置设置为 on。JIT(作为 OPCache 的一部分)默认启用,但由于缓冲区大小设置为 0,因此实际上被禁用,所以我将 opcache.jit-buffer-size设置为某个值,我只使用了 10M。应用这些更改后,我使用 JIT 重新运行了脚本,并看到它完成情况为:

7分 19秒 🚀

这就少消耗了 45.9% 时间!!

What more?

我已经将运行时间从开始时的 25 分钟缩短到了大约 7 分钟。有一件事让我感到非常惊讶,那就是 fgets() 为读取 13GB 的文件分配了约 56 GiB/m 的 RAM。似乎不太对劲,所以我检查了 fgets() 的实现,看起来我可以通过省略 fget()len 参数来节省很多这些分配:

while ($data = fgets($fp)) {
// instead of
while ($data = fgets($fp, 999)) {

比较更改前后的分析,可以得出以下结果:

你可能会认为这会带来很大的性能提升,但这只有 1% 左右。这是因为这些是 ZendMM 可以在 bins 处理的小分配,而且速度非常快。

还能更快吗?

当然!到目前为止,我的方法是单线程的,这是大多数 PHP 软件的本质,但 PHP 确实通过 Parallel 并行扩展支持用户端的线程。

正如分析器清楚地显示的那样,在 PHP 中读取数据是一个瓶颈。从 fgetcsv() 切换到 fgets() 并手动拆分会有所帮助,但这仍然需要很多时间,因此让我们使用线程并行读取和处理数据,然后再组合工作线程的中间结果。

<?php

$file = 'measurements.txt';

$threads_cnt = 16;

/**
 * Get the chunks that each thread needs to process with start and end position.
 * These positions are aligned to \n chars because we use `fgets()` to read
 * which itself reads till a \n character.
 *
 * @return array<int, array{0: int, 1: int}>
 */
function get_file_chunks(string $file, int $cpu_count): array {
    $size = filesize($file);

    if ($cpu_count == 1) {
        $chunk_size = $size;
    } else {
        $chunk_size = (int) ($size / $cpu_count);
    }

    $fp = fopen($file, 'rb');

    $chunks = [];
    $chunk_start = 0;
    while ($chunk_start < $size) {
        $chunk_end = min($size, $chunk_start + $chunk_size);

        if ($chunk_end < $size) {
            fseek($fp, $chunk_end);
            fgets($fp); // moves fp to next \n char
            $chunk_end = ftell($fp);
        }

        $chunks[] = [
            $chunk_start,
            $chunk_end
        ];

        $chunk_start = $chunk_end;
    }

    fclose($fp);
    return $chunks;
}

/**
 * This function will open the file passed in `$file` and read and process the
 * data from `$chunk_start` to `$chunk_end`.
 *
 * The returned array has the name of the city as the key and an array as the
 * value, containing the min temp in key 0, the max temp in key 1, the sum of
 * all temperatures in key 2 and count of temperatures in key 3.
 *
 * @return array<string, array{0: float, 1: float, 2: float, 3: int}>
 */ 
$process_chunk = function (string $file, int $chunk_start, int $chunk_end): array {
    $stations = [];
    $fp = fopen($file, 'rb');
    fseek($fp, $chunk_start);
    while ($data = fgets($fp)) {
        $chunk_start += strlen($data);
        if ($chunk_start > $chunk_end) {
            break;
        }
        $pos2 = strpos($data, ';');
        $city = substr($data, 0, $pos2);
        $temp = (float)substr($data, $pos2+1, -1);
        if (isset($stations[$city])) {
            $station = &$stations[$city];
            $station[3] ++;
            $station[2] += $temp;
            if ($temp < $station[0]) {
                $station[0] = $temp;
            } elseif ($temp > $station[1]) {
                $station[1] = $temp;
            }
        } else {
            $stations[$city] = [
                $temp,
                $temp,
                $temp,
                1
            ];
        }
    }
    return $stations;
};

$chunks = get_file_chunks($file, $threads_cnt);

$futures = [];

for ($i = 0; $i < $threads_cnt; $i++) {
    $runtime = new \parallel\Runtime();
    $futures[$i] = $runtime->run(
        $process_chunk,
        [
            $file,
            $chunks[$i][0],
            $chunks[$i][1]
        ]
    );
}

$results = [];

for ($i = 0; $i < $threads_cnt; $i++) {
    // `value()` blocks until a result is available, so the main thread waits
    // for the thread to finish
    $chunk_result = $futures[$i]->value();
    foreach ($chunk_result as $city => $measurement) {
        if (isset($results[$city])) {
            $result = &$results[$city];
            $result[2] += $measurement[2];
            $result[3] += $measurement[3];
            if ($measurement[0] < $result[0]) {
                $result[0] = $measurement[0];
            }
            if ($measurement[1] < $result[1]) {
                $result[1] = $measurement[1];
            }
        } else {
            $results[$city] = $measurement;
        }
    }
}

ksort($results);

echo '{', PHP_EOL;
foreach($results as $k=>&$station) {
    echo "\t", $k, '=', $station[0], '/', ($station[2]/$station[3]), '/', $station[1], ',', PHP_EOL;
}
echo '}', PHP_EOL;

这段代码做了一些事,首先扫描文件并将其拆分为以 \n 对齐的块(因为我稍后将使用 fgets() )。准备好块后,启动 $threads_cnt 工作线程,然后所有这些线程都打开同一个文件,寻找分配给它们的块开始,读取并处理数据,直到块结束,返回一个中间结果,然后在主线程中进行组合、排序和打印。

这个多线程方案的完成时间只耗费了:

1 分 35秒 🚀

这就是最后结果了吗?

当然不是。该方案至少还有两个问题:

  1. 我在苹果 Silicon 硬件上的 MacOS 上运行这段代码,在 ZTS 版本的 PHP 中使用 JIT 时,它崩溃了,所以 1 分 35秒的结果是没有使用 JIT,如果我能使用它,可能会更快
  2. 我意识到,由于我日常工作的需要,我运行的是一个使用 CFLAGS=“-g-O0…” 编译的PHP版本

我本应在一开始就检查一下这个的,所以我使用 CFLAGS=“-Os…” 重新编译了 PHP 8.3,我的最终数字(有 16 个线程)是:

🚀 27.7 秒 🚀

这个数字与你在原始挑战的排行榜上看到的数字根本无法相提并论,这是因为我在完全不同的硬件上运行了这段代码。

这是一个包含 10 个线程的时间线视图:

底部的线程是主线程,等待工作线程的结果。一旦这些 worker 返回了他们的中间结果,就可以看到主线程正在对所有内容进行组合和排序。我们也可以清楚地看到,主线程绝不是瓶颈。如果你想进一步优化这一点,请专注于工作线程。

这一路来学到了什么?

每个抽象层只是用可用性/集成换取 CPU 周期或内存。fgetcsv() 非常易于使用,隐藏了很多东西,但这是有代价的。甚至 fgets() 也对我们隐藏了一些东西,但使读取数据变得超级方便。

在代码中添加类型将有助于语言优化执行,或者停止类型篡改,这是你看不到的,但仍然需要用 CPU 周期来支付。

JIT 非常棒,尤其是在处理 CPU 密集型问题时!

这绝对不是大多数 PHP 软件的本质,但由于并行化(使用 ext-parallel),我们可以显著降低数字。haiyou 

还有性能提升空间吗

删除 isset()

我们或许不需要检查 isset(),可以只创建 station 的引用,而当该 station 不存在时,它的值为 NULL。也就是说,如果城市(city)确实存在(这是大多数情况),则保留一个数组访问。

# before
if (isset($stations[$city])) {
    $station = &$stations[$city];
// ..

# after
$station = &$stations[$city];
if ($station !== NULL) {
// ..

这节省了大约 2.5% 的时间!

不检查 fgets() 返回值

当前读取文件的主循环如下:

while ($data = fgets($fp)) {
    $chunk_start += strlen($data);
    if ($chunk_start > $chunk_end) {
        break;
    }
// ..

移到并行版本后,对 $chunk_start > $chunk_end 进行了额外的检查,因为每个线程只处理文件的一部分。不需要再检查 fgets() 返回值,因为只要我们在 $chunk_start$chunk_end 之间,它就会始终返回一个字符串,这意味着我们可以将其作为循环中的表达式,不加检查地读取。

while ($chunk_start < $chunk_end) {
    $data = fgets($fp);
    $chunk_start += strlen($data);
// ..

此更改将从循环中删除一个分支,并导致时间再次下降约 2.7%!

fgets() vs. stream_get_line()

$city$temp 中的实际读取和存储如下:

$data = fgets($fp);
$chunk_start += strlen($data);
$pos2 = strpos($data, ';');
$city = substr($data, 0, $pos2);
$temp = (float)substr($data, $pos2+1, -1);

我以前从未听说过 stream_get_line(),它的行为与 fgets() 几乎完全相同,不同的是它允许指定行尾分隔符!

$city = stream_get_line($fp, 99, ';');
$temp = stream_get_line($fp, 99, "\n");
$chunk_start += strlen($city) + strlen($temp) + 2;
$temp = (float)$temp;

这一变化又节省了约 15% 的时间!

为什么会这样?fgets()stream_get_line() 的实现非常接近,两者都使用 PHP 流层。主要的不同是,我们不再需要使用 substr() 将字符串(来自 fgets() )拆分为子字符串。额外的 strlen() 调用可以忽略不计,因为 PHP 中的变量是 zval 的底层变量,它保存了字符串的长度。

那么,这个 PHP 脚本的等待时间在是多少呢?
在 Hetzner 的 AX161 上运行的基准测试。

最后结果是:12.76 秒 🎉

结束了吗?

我不知道,也许这里还有一些需要优化的地方,但在设法将 83% 的时间花在 stream_get_line() 函数上之后,看起来我们已经实现了 PHP 流层所允许的。

要么我们找到另一个绕过 PHP 流层并提供对文件系统更直接访问的函数,要么我们尝试优化该层本身。

 

PHP