编程

使用 GoLang 协程处理 Laravel 队列任务

1572 2023-08-08 13:12:00

Laravel 包含一个优秀的队列组件,它允许我们将耗时的任务委派给后台进程。它使我们能够更快地响应请求,从而处理更多的请求,从而扩展我们的 web 服务。

队列进程,无论优化得多么好,一次只能处理一个作业。这意味着我们需要更多的进程来同时处理更多的工作。

每个工作进程都是一个 PHP 进程,每个进程都通过拥有自己的内存空间而与其他进程隔离。如果我们启动过多的进程,它们可能会消耗我们机器的所有内存,导致一切都变慢。此外,管理过多的进程会给操作系统和CPU带来压力。

基于 web 的应用程序中的大多数工作负载都是 I/O 密集型的,这意味着进程在等待 I/O 事件的大部分时间都处于空闲状态。即使进程处于空闲状态,它也会继续占用内存空间和操作系统管理工作负载的一部分。

如果我们能利用进程空闲的时间做更多的工作,那不是很好吗?我们不需要以这种方式启动那么多进程,因为单个进程可以在等待 I/O 事件时处理其他任务。

这就是所谓的并发。单个进程将在多个任务之间切换,并同时处理所有任务。当任务处于等待状态时,进程会搜索要处理的其他任务。

可以使用多线程或协程来实现这种并发性。PHP不支持多线程,但它通过Swoole PHP扩展支持协程。

在我的一个 Laravel 项目中,我有一类任务,每天排队5000万到6000万次。这项工作很简单:它从S3中读取一个文件,对其进行处理,并将输出发送到外部端点。

处理此任务的 PHP 进程几乎总是空闲的,等待来自 S3 的数据和来自外部端点的响应。这是一个使用并发可能有利的理想任务。

尽管 Swoole 提供了协程并发,但我选择利用我所了解的 Go 来创建一个作为守护进程运行并同时处理所有这些作业的二进制文件。二进制文件没有依赖项,是轻量级的,并且可以通过简单的 scp 命令部署到服务器。

任务

以下是我们需要执行的任务的伪代码:

$messages = $sqs->getMessages();

foreach ($messages as $message){
    $key = $message->extractKey();

    $object = $s3->getObject($key);

    $requestBody = someSimpleProcessing($object);

    Http::post('https://...', $requestBody);

    $sqs->deleteMessage($message['ReceiptHandle']);
}

我们将消息从 SQS 队列中出列,然后遍历每条消息,提取 S3 对象密钥,从 S3 中提取对象,生成请求体,将请求发送到端点,最后删除消息。

以下是发生的等待列表:

  • 正在等待来自 SQS 的响应(将消息出列)。
  • 正在等待 S3 的响应。
  • 正在等待来自终结点的响应。
  • 正在等待SQS的响应(删除消息)。

这就是大部分进程时间所消耗的地方。正在等待响应。

在 Go 中编写 worker

我们将从创建一个结构开始:

import (
    "github.com/aws/aws-sdk-go-v2/service/s3"
    "github.com/aws/aws-sdk-go-v2/service/sqs"
)

type Worker struct {
    sqsClient *sqs.Client
    s3Client  *s3.Client
}

结构是字段的集合。这里的结构有两个类型指针字段。第一个字段指向表示 SQS 客户端的结构的内存地址,而第二个字段指向 S3 客户端的内存地址。

现在,让我们向结构中添加一个方法:

func (worker Worker) Work() {

}

Work 方法有一个 Worker 结构作为其接收器。换句话说,通过此方法发送的消息由 Worker 接收。这是 Go 的面向对象编程方式。

在方法内部,我们将启动一个无限循环,并调用 sqsClient 的 ReceiveMessage 方法来接收来自队列的消息:

func (worker Worker) Work() {
    for {
        results, err := worker.sqsClient.ReceiveMessage(
            context.Background(),
            &sqs.ReceiveMessageInput{
                QueueUrl: ptr.String("QUEUE_URL_HERE"),
                MaxNumberOfMessages: 10,
            },
        )
    }
}

AWS SDK 中的每个方法都需要一个 context 和一个 struct,用来表示将传递给底层 HTTP 客户端的输入。

上下文(context)是一种对有关代码执行环境的信息进行分组的方式。当程序作为同时接收和处理多个请求的 web 服务器运行时,上下文会变得很有用。在这种情况下,请求处理代码需要知道它正在处理哪个请求。这就是我们将请求上下文传递给代码的时候,这样它就可以提取请求参数并监听与特定请求相关的web服务器信号。

如果你看一下上面的代码,你可以看到 ReceiveMessage 有多个返回值;其中一个包含 SDK 发出的 HTTP 请求的结果,另一个包含错误(如果有)。

将其视为一个返回结果并可能引发异常的 PHP 方法。Go 中不存在异常。错误的返回方式与其他返回值的返回方式相同。

要捕获错误,我们需要将返回的错误分配给一个变量并检查该变量:

if err != nil {
    log.Printf("error: %v", err)
}

如果将其转成 PHP 代码,像这样:

try {
    $results = $client->receiveMessage();
} catch (Throwable $err) {
    log($err);
}

回到 Go,一旦我们从该 SDK 方法中获得一些结果,我们将遍历消息并处理:

for _, message := range results.Messages {
    // TODO: Extract S3 key from Laravel's payload

    // TODO: Get the object from S3

    // TODO: Process the object

    // TOODO: Send the results to the endpoint
}

要从该消息中提取 S3 对象密钥,我使用简单的正则表达式去匹配 Laravel 创建的消息 payload。

regex, err := regexp.Compile("report-(.*).json")

for _, message := range results.Messages {
    s3Key := fmt.Sprintf(
        "report-%s.json",
        regex.FindStringSubmatch(*message.Body)[1],
    )
}

既然我们有了密钥,我们就可以使用 S3 客户端并调用 GetObject 方法来检索对象 body:

body, err := worker.s3Client.GetObject(s3Key, "BUCKET-NAME")

然后我们进行处理,生成请求主体,并发送请求:

// TODO: Process the object

payload, _ := json.Marshal(map[string]string{
   "KEY":  "VALUE",
})

response, err := http.Post(
    "https://...",
    "application/json",
    bytes.NewBuffer(payload),
)

我们以缓冲区的形式将有效载荷传递给 Post 方法。底层 HTTP 客户端继续从该缓冲区读取并通过打开的 HTTP 连接发送其内容,直到所有内容都已发送。

最后一步是从 SQS 队列中删除消息:

err = worker.sqsClient.DeleteMessage("QUEUE_URL_HERE", *message.ReceiptHandle)

message.ReceiptHandle 之前的 astrisk 用于取消引用,即将实际值存储在指针指向的内存空间中。在我们的例子中,ReceiptHandle 字段包含一个字符串指针。通过取消引用,我们从内存中复制该字符串,并将其传递给 DeleteMessage 方法。

全部代码

以下是完整代码的样子:

 

func (worker Worker) Work() {
    for {
        results, err := worker.sqsClient.ReceiveMessage(
            context.Background(),
            &sqs.ReceiveMessageInput{
                QueueUrl: ptr.String("QUEUE_URL_HERE"),
                MaxNumberOfMessages: 10,
            },
        )

        regex, err := regexp.Compile("report-(.*).json")

        for _, message := range results.Messages {
            s3Key := fmt.Sprintf(
                "report-%s.json",
                regex.FindStringSubmatch(*message.Body)[1],
            )

            body, err := worker.s3Client.GetObject(s3Key, "BUCKET-NAME")

            // TODO: Process the object

            payload, err := json.Marshal(map[string]string{
               "KEY":  "VALUE",
            })

            response, err := http.Post(
                "https://...",
                "application/json",
                bytes.NewBuffer(payload),
            )

            err = worker.sqsClient.DeleteMessage(
                "QUEUE_URL_HERE",
                *message.ReceiptHandle,
            )
        }
    }
}

为了让代码更便于阅读,我移除了 error handling 及其他细节。

运行进程

为了运行 worker,我们需要为 SQS 和 S3 创建 AWS 客户端:

awsConfig, err := config.LoadDefaultConfig(context.Background())

sqsClient = sqs.NewFromConfig(*awsConfig)

s3Client = s3.NewFromConfig(*awsConfig)

然后,创建 worker 结构并调用 Work 方法:

worker := Worker{
    sqsClient: sqsClient,
    s3Client: s3Client,
}

worker.Work()

如果我们编译并运行该程序,该 worker 会开启一个无限循环,该循环会排队等候 10 个 job 并一个个处理。

使用并发

到目前为止,我们开始运行可执行文件的进程一次处理一个作业。让我们通过在协程中运行多个进程来使用 Go 的并发。

waitGroup := sync.WaitGroup{}

waitGroup.Add(700)

for i := 1; i <= 700; i++ {
    go daemon(sqsClient, s3Client, &waitGroup)
}

waitGroup.Wait()

我们创建了一个 wait group,并让它等待 700 次协程完成。然后我们创建一个循环,其中 700 个协程运行一个 daemon 函数。

最后,我们通过调用 waitGroup.Wait() 开始等待。现在,在 700 个协程完成之前,我们的程序不会退出。

daemon 函数如下所示:

func daemon(sqsClient *sqs.Client, s3Client *sqs.Client, waitGroup *sync.WaitGroup) {
    defer waitGroup.Done()

    worker := Worker{
        sqsClient: sqsClient,
        s3Client: s3Client,
    }

    worker.Work()
}

我们首先 defer 对waitGroup().Done() 的调用,这将一个协程标记为已完成。通过 defer 延迟调用,Go 将只在 daemon() 函数即将返回时运行该方法。

然后,我们创建一个 worker 并调用 Work 方法。请记住,这个worker.Work() 方法启动了一个无限循环,只有当我们告诉它时才会返回。这就是为什么我们必须处理Work方法中返回的所有错误,并决定worker是否应该进行另一次迭代、休眠或返回(如果错误不可恢复)。

执行模型

有了我们所拥有的,可执行文件将启动 700 个协程,使作业出列并进行处理。Go 调度程序在等待 I/O 事件时切换到另一个需要 CPU 时间的协程。它还将在协程之间划分 CPU 时间,这样就不会有一个协程消耗所有的 CPU 时间并阻塞所有其他协程。

当我们监控机器的 CPU 利用率时,我们将能够看到可用的 CPU 内核是否被充分利用。如果它们总是被充分利用,这可能表明我们有太多的协程在争夺 CPU 时间。如果它们的运行率低于100%,这意味着还有一些等待,如果必要的话,我们可以添加更多的协程。

其他需要考虑的指标是内存利用率、队列大小和 SQS 的空接收。

如果内存利用率在增加,那么代码中的某个地方就会出现内存泄漏。如果队列清空得不够快,就意味着我们需要增加更多的工作人员 woker。如果有太多的空接收,则意味着在 SQS 队列为空时有太多 worker 在调用 ReceiveMessage。

我在这篇文章中还没有涉及到其他细节,因为我试图保持简短。