シンプルジョブキュー
シンプルジョブキューは、非同期でジョブを処理するために使用できるライブラリです。beanstalkd、MySQL/MariaDB、SQLite、およびPostgreSQLで使用できます。
インストール
composer require n0nag0n/simple-job-queue
使用法
これを機能させるには、キューにジョブを追加する方法と、ジョブを処理する方法(ワーカー)が必要です。以下は、ジョブをキューに追加する方法と、そのジョブを処理する方法の例です。
Flightへの追加
これをFlightに追加するのは簡単で、register()
メソッドを使用して行います。以下は、これをFlightに追加する方法の例です。
<?php
require 'vendor/autoload.php';
// beanstalkdを使用する場合は、['mysql']を['beanstalkd']に変更してください
Flight::register('queue', n0nag0n\Job_Queue::class, ['mysql'], function($Job_Queue) {
// Flight::db()で既にPDO接続がある場合
$Job_Queue->addQueueConnection(Flight::db());
// または、beanstalkd/Pheanstalkを使用している場合
$pheanstalk = Pheanstalk\Pheanstalk::create('127.0.0.1');
$Job_Queue->addQueueConnection($pheanstalk);
});
新しいジョブの追加
ジョブを追加する場合、パイプライン(キュー)を指定する必要があります。これは、RabbitMQのチャネルやbeanstalkdのチューブに相当します。
<?php
Flight::queue()->selectPipeline('send_important_emails');
Flight::queue()->addJob(json_encode([ 'something' => 'that', 'ends' => 'up', 'a' => 'string' ]));
ワーカーの実行
ここにワーカーを実行する方法のサンプルファイルがあります。
<?php
require 'vendor/autoload.php';
$Job_Queue = new n0nag0n\Job_Queue('mysql');
// PDO接続
$PDO = new PDO('mysql:dbname=testdb;host=127.0.0.1', 'user', 'pass');
$Job_Queue->addQueueConnection($PDO);
// または、beanstalkd/Pheanstalkを使用している場合
$pheanstalk = Pheanstalk\Pheanstalk::create('127.0.0.1');
$Job_Queue->addQueueConnection($pheanstalk);
$Job_Queue->watchPipeline('send_important_emails');
while(true) {
$job = $Job_Queue->getNextJobAndReserve();
// あなたが夜に良く眠れるように調整してください(データベースキューのみ、beanstalkdではこのif文は必要ありません)
if(empty($job)) {
usleep(500000);
continue;
}
echo "処理中 {$job['id']}\n";
$payload = json_decode($job['payload'], true);
try {
$result = doSomethingThatDoesSomething($payload);
if($result === true) {
$Job_Queue->deleteJob($job);
} else {
// これはレディキューから取り出し、後で拾って「キック」できる別のキューに入れます。
$Job_Queue->buryJob($job);
}
} catch(Exception $e) {
$Job_Queue->buryJob($job);
}
}
Supervisordを使用した長いプロセスの処理
Supervisordは、ワーカープロセスが継続的に実行されることを保証するプロセス制御システムです。シンプルジョブキューワーカーの設定に関するより完全なガイドは次のとおりです。
Supervisordのインストール
# Ubuntu/Debian上
sudo apt-get install supervisor
# CentOS/RHEL上
sudo yum install supervisor
# Homebrewを使用したmacOS上
brew install supervisor
ワーカースクリプトの作成
最初に、ワーカーコードを専用のPHPファイルに保存します。
<?php
require 'vendor/autoload.php';
$Job_Queue = new n0nag0n\Job_Queue('mysql');
// PDO接続
$PDO = new PDO('mysql:dbname=your_database;host=127.0.0.1', 'username', 'password');
$Job_Queue->addQueueConnection($PDO);
// 監視するパイプラインを設定
$Job_Queue->watchPipeline('send_important_emails');
// ワーカーの開始をログに記録
echo date('Y-m-d H:i:s') . " - ワーカーが開始されました\n";
while(true) {
$job = $Job_Queue->getNextJobAndReserve();
if(empty($job)) {
usleep(500000); // 0.5秒間スリープ
continue;
}
echo date('Y-m-d H:i:s') . " - ジョブ {$job['id']}を処理中\n";
$payload = json_decode($job['payload'], true);
try {
$result = doSomethingThatDoesSomething($payload);
if($result === true) {
$Job_Queue->deleteJob($job);
echo date('Y-m-d H:i:s') . " - ジョブ {$job['id']}は正常に完了しました\n";
} else {
$Job_Queue->buryJob($job);
echo date('Y-m-d H:i:s') . " - ジョブ {$job['id']}が失敗し、埋められました\n";
}
} catch(Exception $e) {
$Job_Queue->buryJob($job);
echo date('Y-m-d H:i:s') . " - ジョブ {$job['id']}の処理中に例外が発生しました: {$e->getMessage()}\n";
}
}
Supervisordの設定
ワーカーのための設定ファイルを作成します。
[program:email_worker]
command=php /path/to/worker.php
directory=/path/to/project
autostart=true
autorestart=true
startretries=3
stderr_logfile=/var/log/simple_job_queue_err.log
stdout_logfile=/var/log/simple_job_queue.log
user=www-data
numprocs=2
process_name=%(program_name)s_%(process_num)02d
主要な設定オプション:
command
: ワーカーを実行するコマンドdirectory
: ワーカーの作業ディレクトリautostart
: supervisordが起動するときに自動的に開始autorestart
: プロセスが終了した場合に自動的に再起動startretries
: 失敗した場合に再起動を試みる回数stderr_logfile
/stdout_logfile
: ログファイルの場所user
: プロセスを実行するシステムユーザーnumprocs
: 実行するワーカーインスタンスの数process_name
: 複数のワーカープロセスの命名形式
Supervisorctlによるワーカーの管理
設定を作成または変更した後:
# supervisor設定を再読み込み
sudo supervisorctl reread
sudo supervisorctl update
# 特定のワーカープロセスを制御
sudo supervisorctl start email_worker:*
sudo supervisorctl stop email_worker:*
sudo supervisorctl restart email_worker:*
sudo supervisorctl status email_worker:*