# Simple Job Queue

Simple Job Queue is a library for processing jobs asynchronously. It works with beanstalkd, MySQL/MariaDB, SQLite, and PostgreSQL.
## Install

```bash
composer require n0nag0n/simple-job-queue
```
## Usage

For this to work, you need a way to add jobs to the queue and a way to process them (a worker). Below are examples of adding a job to the queue and processing it.
### Adding to Flight

Adding this to Flight is simple and is done using the `register()` method. Below is an example:
```php
<?php
require 'vendor/autoload.php';

// Change ['mysql'] to ['beanstalkd'] if you want to use beanstalkd
Flight::register('queue', n0nag0n\Job_Queue::class, ['mysql'], function($Job_Queue) {
	// if you already have a PDO connection on Flight::db()
	$Job_Queue->addQueueConnection(Flight::db());

	// or if you're using beanstalkd/Pheanstalk
	$pheanstalk = Pheanstalk\Pheanstalk::create('127.0.0.1');
	$Job_Queue->addQueueConnection($pheanstalk);
});
```
### Adding a new job

When you add a job, you need to specify a pipeline (queue). This is comparable to a channel in RabbitMQ or a tube in beanstalkd.
```php
<?php
Flight::queue()->selectPipeline('send_important_emails');
Flight::queue()->addJob(json_encode([ 'something' => 'that', 'ends' => 'up', 'a' => 'string' ]));
```
### Running a worker

Here is an example of a worker file:
```php
<?php

require 'vendor/autoload.php';

$Job_Queue = new n0nag0n\Job_Queue('mysql');

// PDO connection
$PDO = new PDO('mysql:dbname=testdb;host=127.0.0.1', 'user', 'pass');
$Job_Queue->addQueueConnection($PDO);

// or if you're using beanstalkd/Pheanstalk
$pheanstalk = Pheanstalk\Pheanstalk::create('127.0.0.1');
$Job_Queue->addQueueConnection($pheanstalk);

$Job_Queue->watchPipeline('send_important_emails');
while(true) {
	$job = $Job_Queue->getNextJobAndReserve();

	// adjust the sleep to whatever makes you sleep better at night (database queues only; beanstalkd does not need this check)
	if(empty($job)) {
		usleep(500000);
		continue;
	}

	echo "Processing {$job['id']}\n";
	$payload = json_decode($job['payload'], true);

	try {
		$result = doSomethingThatDoesSomething($payload);

		if($result === true) {
			$Job_Queue->deleteJob($job);
		} else {
			// this takes the job out of the ready queue and puts it in another queue where it can be picked up and "kicked" later
			$Job_Queue->buryJob($job);
		}
	} catch(Exception $e) {
		$Job_Queue->buryJob($job);
	}
}
```
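A buried job stays out of the ready queue until something kicks it back. With the beanstalkd backend you can do this directly through Pheanstalk; the sketch below assumes Pheanstalk v4's string-based `useTube()`/`kick()` API and a beanstalkd server on localhost (it is not part of Simple Job Queue itself):

```php
<?php
require 'vendor/autoload.php';

// Connect to the same beanstalkd server the worker uses.
$pheanstalk = Pheanstalk\Pheanstalk::create('127.0.0.1');

// kick() operates on the currently used tube, so select the pipeline first.
$pheanstalk->useTube('send_important_emails');

// Move up to 10 buried jobs back into the ready queue for reprocessing.
$kicked = $pheanstalk->kick(10);

echo "Kicked {$kicked} job(s)\n";
```

For database-backed queues, consult the library for the equivalent way to release buried rows; the mechanism above is beanstalkd-specific.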
## Handling Long Processes with Supervisord

Supervisord is a process control system that keeps your worker processes running continuously. Here is a more complete guide to setting it up with your Simple Job Queue worker.
### Installing Supervisord

```bash
# On Ubuntu/Debian
sudo apt-get install supervisor

# On CentOS/RHEL
sudo yum install supervisor

# On macOS with Homebrew
brew install supervisor
```
### Creating a Worker Script

First, save your worker code to a dedicated PHP file:
```php
<?php

require 'vendor/autoload.php';

$Job_Queue = new n0nag0n\Job_Queue('mysql');

// PDO connection
$PDO = new PDO('mysql:dbname=your_database;host=127.0.0.1', 'username', 'password');
$Job_Queue->addQueueConnection($PDO);

// Set the pipeline to watch
$Job_Queue->watchPipeline('send_important_emails');

// Log start of worker
echo date('Y-m-d H:i:s') . " - Worker started\n";

while(true) {
	$job = $Job_Queue->getNextJobAndReserve();

	if(empty($job)) {
		usleep(500000); // Sleep for 0.5 seconds
		continue;
	}

	echo date('Y-m-d H:i:s') . " - Processing job {$job['id']}\n";
	$payload = json_decode($job['payload'], true);

	try {
		$result = doSomethingThatDoesSomething($payload);

		if($result === true) {
			$Job_Queue->deleteJob($job);
			echo date('Y-m-d H:i:s') . " - Job {$job['id']} completed successfully\n";
		} else {
			$Job_Queue->buryJob($job);
			echo date('Y-m-d H:i:s') . " - Job {$job['id']} failed, buried\n";
		}
	} catch(Exception $e) {
		$Job_Queue->buryJob($job);
		echo date('Y-m-d H:i:s') . " - Exception processing job {$job['id']}: {$e->getMessage()}\n";
	}
}
```
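The worker scripts above call `doSomethingThatDoesSomething()`, which is a placeholder, not part of the library — you supply your own job logic that returns `true` on success. A hypothetical handler for the email payload might look like this (the function name and payload keys are illustrative):

```php
<?php

// Hypothetical job handler: returns true so the worker deletes the job,
// or false so the worker buries it for later inspection.
function doSomethingThatDoesSomething(array $payload): bool {
	// Validate the payload before doing any real work.
	if (empty($payload['to']) || filter_var($payload['to'], FILTER_VALIDATE_EMAIL) === false) {
		return false; // bad payload: the worker will bury this job
	}

	// ... actually send the email here (mail(), an SMTP client, etc.) ...

	return true; // success: the worker will delete this job
}
```

Throwing an exception from the handler also buries the job, since the worker loop wraps the call in a `try`/`catch`.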
### Configuring Supervisord

Create a configuration file for your worker (on Debian/Ubuntu this typically lives under `/etc/supervisor/conf.d/`):

```ini
[program:email_worker]
command=php /path/to/worker.php
directory=/path/to/project
autostart=true
autorestart=true
startretries=3
stderr_logfile=/var/log/simple_job_queue_err.log
stdout_logfile=/var/log/simple_job_queue.log
user=www-data
numprocs=2
process_name=%(program_name)s_%(process_num)02d
```
**Key Configuration Options:**

- `command`: The command to run your worker
- `directory`: Working directory for the worker
- `autostart`: Start automatically when supervisord starts
- `autorestart`: Restart automatically if the process exits
- `startretries`: Number of times to retry starting if it fails
- `stderr_logfile`/`stdout_logfile`: Log file locations
- `user`: System user to run the process as
- `numprocs`: Number of worker instances to run
- `process_name`: Naming format for multiple worker processes
### Managing Workers with Supervisorctl

After creating or modifying the configuration:

```bash
# Reload supervisor configuration
sudo supervisorctl reread
sudo supervisorctl update

# Control specific worker processes
sudo supervisorctl start email_worker:*
sudo supervisorctl stop email_worker:*
sudo supervisorctl restart email_worker:*
sudo supervisorctl status email_worker:*
```
### Running Multiple Pipelines

For multiple pipelines, create separate worker files and configurations:

```ini
[program:email_worker]
command=php /path/to/email_worker.php
# ... other configs ...

[program:notification_worker]
command=php /path/to/notification_worker.php
# ... other configs ...
```
### Monitoring and Logs

Check the logs to monitor worker activity:

```bash
# View logs
sudo tail -f /var/log/simple_job_queue.log

# Check status
sudo supervisorctl status
```
This setup ensures your job workers continue running even after crashes, server reboots, or other issues, making your queue system reliable for production environments.
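One detail worth knowing: by default, supervisord stops a program by sending it SIGTERM, but the `while(true)` worker loop above only exits when the process is killed mid-iteration. If you want the worker to finish its current job before exiting, you can handle the signal cooperatively. This is a minimal sketch using PHP's `pcntl`/`posix` extensions (not part of Simple Job Queue; the self-sent signal exists only so the demo terminates):

```php
<?php

$running = true;

// supervisord sends SIGTERM on "supervisorctl stop"; flip a flag instead of dying.
pcntl_signal(SIGTERM, function () use (&$running) {
	$running = false;
});

// Demo only: send ourselves a SIGTERM so the loop below exits.
posix_kill(posix_getpid(), SIGTERM);

while ($running) {
	pcntl_signal_dispatch(); // deliver any pending signals

	// ... getNextJobAndReserve() / process / delete-or-bury, as in the worker above ...

	usleep(100000);
}

echo "Worker shut down cleanly\n";
```

In a real worker you would call `pcntl_signal_dispatch()` once per loop iteration, so a stop request is honored between jobs rather than in the middle of one.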