Queues and Jobs
This comprehensive guide covers Glueful's enterprise-grade queue system, featuring multiple drivers, auto-scaling, batch processing, and advanced monitoring capabilities for high-performance background job processing.
Table of Contents
- Overview
- Queue Drivers
- Queue Manager
- Job System
- Process Management
- Auto-Scaling
- Batch Processing
- Failed Job Handling
- Monitoring and Statistics
- CLI Commands
- Configuration
- Production Deployment
Overview
Glueful's queue system provides a powerful, scalable solution for background job processing with support for multiple storage backends, automatic scaling, and comprehensive monitoring.
Key Features
- Multiple Drivers: Database and Redis queue drivers with atomic operations
- Auto-Scaling: Intelligent worker scaling based on queue load and metrics
- Process Management: Advanced worker process management with health monitoring
- Batch Processing: Efficient bulk job processing with atomic operations
- Priority Queues: Priority-based job ordering for critical tasks
- Delayed Jobs: Schedule jobs for future execution
- Failed Job Recovery: Comprehensive failed job handling and retry mechanisms
- Real-time Monitoring: Queue statistics, worker metrics, and health monitoring
- Plugin System: Extensible architecture with driver discovery and validation
Architecture Components
- QueueManager: Central management for connections and operations
- Queue Drivers: Database and Redis implementations
- ProcessManager: Worker process lifecycle management
- AutoScaler: Intelligent scaling based on load metrics
- Job Classes: Base job system with serialization and error handling
- Monitoring System: Real-time metrics and health monitoring
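For orientation, a typical end-to-end flow combines the pieces described in the sections below: application code pushes a job through the QueueManager, and one or more CLI workers process it. The APIs used here are covered in detail later in this guide.
use Glueful\Queue\QueueManager;
// Push a job from application code (see Queue Manager and Job System below)
$queueManager = QueueManager::createDefault();
$queueManager->push(ProcessEmailJob::class, ['to' => '[email protected]'], 'emails');
// Then process the queue with one or more workers from the CLI:
//   php glueful queue:work --queue=emails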
Queue Drivers
Database Queue Driver
The database driver provides ACID-compliant job storage with transaction support and efficient indexing.
Features
- ACID Compliance: Full transaction support for job operations
- Atomic Job Reservation: Row-level locking for job picking
- Priority Queues: Priority-based job ordering with compound indexes
- Delayed Jobs: Timestamp-based job scheduling
- Failed Job Isolation: Separate table for failed job tracking
- Batch Operations: Efficient bulk insertions
Usage
use Glueful\Queue\QueueManager;
// Initialize queue manager with database driver
$queueManager = new QueueManager([
'default' => 'database',
'connections' => [
'database' => [
'driver' => 'database',
'table' => 'queue_jobs',
'failed_table' => 'queue_failed_jobs',
'retry_after' => 90
]
]
]);
// Push immediate job
$jobUuid = $queueManager->push('ProcessEmail', [
'to' => '[email protected]',
'template' => 'welcome'
]);
// Push delayed job (5 minutes)
$delayedUuid = $queueManager->later(300, 'SendReminder', [
'user_id' => 123,
'type' => 'subscription_expiry'
]);
// Push high priority job
$priorityUuid = $queueManager->push('ProcessPayment', [
'order_id' => 456,
'amount' => 99.99,
'priority' => 10
]);
Database Schema
-- Queue jobs table
CREATE TABLE queue_jobs (
id INTEGER PRIMARY KEY AUTO_INCREMENT,
uuid CHAR(21) NOT NULL UNIQUE,
queue VARCHAR(100) NOT NULL,
payload LONGTEXT NOT NULL,
attempts INTEGER DEFAULT 0,
reserved_at INTEGER NULL,
available_at INTEGER NOT NULL,
created_at INTEGER NOT NULL,
priority INTEGER DEFAULT 0,
batch_uuid CHAR(21) NULL,
INDEX idx_queue_processing (queue, reserved_at, available_at, priority),
INDEX idx_batch_uuid (batch_uuid)
);
-- Failed jobs table
CREATE TABLE queue_failed_jobs (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
uuid CHAR(12) NOT NULL UNIQUE,
connection VARCHAR(255) NOT NULL,
queue VARCHAR(255) NOT NULL,
payload TEXT NOT NULL,
exception TEXT NOT NULL,
batch_uuid CHAR(12) NULL,
failed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
INDEX idx_connection_queue (connection, queue),
INDEX idx_failed_at (failed_at)
);
Redis Queue Driver
The Redis driver provides high-performance job processing with atomic operations and memory efficiency.
Features
- High Performance: Memory-based storage with atomic operations
- Atomic Operations: Redis transactions for job consistency
- Priority Queues: Sorted sets for priority-based processing
- Delayed Jobs: Redis ZADD for timestamp-based scheduling
- Memory Efficient: Optimized data structures and job expiration
- Connection Pooling: Persistent connections with failover support
Redis Data Structures
# Queue structures
queue:{name} # List for immediate jobs
queue:{name}:delayed # Sorted set for delayed jobs (score = timestamp)
queue:{name}:reserved # Sorted set for reserved jobs (score = timeout)
queue:{name}:failed # List for failed jobs
# Job data
job:{uuid} # Hash containing job data
# Queue registry
queues # Set of all queue names
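To illustrate how these structures interact: a delayed job sits in the :delayed sorted set with its scheduled timestamp as the score and becomes eligible once that score is in the past. The sketch below shows the idea using the phpredis extension; it is illustrative only, not the driver's actual implementation:
$redis = new \Redis();
$redis->connect('127.0.0.1', 6379);

$now = time();

// Jobs whose scheduled timestamp (the sorted-set score) has passed are due
$due = $redis->zRangeByScore('queue:emails:delayed', '-inf', (string) $now);

foreach ($due as $jobUuid) {
    // Move each due job from the delayed set onto the immediate list atomically
    $redis->multi()
        ->zRem('queue:emails:delayed', $jobUuid)
        ->rPush('queue:emails', $jobUuid)
        ->exec();
}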
Usage
// Redis queue configuration
$queueManager = new QueueManager([
'default' => 'redis',
'connections' => [
'redis' => [
'driver' => 'redis',
'host' => '127.0.0.1',
'port' => 6379,
'database' => 0,
'prefix' => 'glueful:queue:',
'retry_after' => 90,
'job_expiration' => 3600
]
]
]);
// The Redis driver supports the same operations as the database driver
$queueManager->push('ProcessImages', ['images' => [1, 2, 3]]);
$queueManager->later(600, 'CleanupTemp', ['older_than' => '1 hour']);
Queue Manager
Central Queue Management
The QueueManager provides a unified interface for all queue operations across multiple drivers and connections.
use Glueful\Queue\QueueManager;
// Create from configuration
$queueManager = QueueManager::createDefault();
// Or from custom config file
$queueManager = QueueManager::fromConfigFile('/path/to/queue.php');
// Get specific connection
$redisQueue = $queueManager->connection('redis');
$dbQueue = $queueManager->connection('database');
// Push to specific connection and queue
$uuid = $queueManager->push(
'ProcessOrder',
['order_id' => 123],
'orders', // queue name
'redis' // connection name
);
Bulk Operations
// Bulk job creation for high throughput
$jobs = [
['job' => 'ProcessEmail', 'data' => ['to' => '[email protected]']],
['job' => 'ProcessEmail', 'data' => ['to' => '[email protected]']],
['job' => 'ProcessEmail', 'data' => ['to' => '[email protected]'], 'delay' => 300],
['job' => 'ProcessSMS', 'data' => ['to' => '+1234567890'], 'priority' => 5]
];
$uuids = $queueManager->bulk($jobs, 'notifications');
// Returns: ['uuid1', 'uuid2', 'uuid3', 'uuid4']
Queue Statistics and Health
// Get queue statistics
$stats = $queueManager->getStats('orders');
/*
[
'total' => 150,
'pending' => 120,
'reserved' => 25,
'delayed' => 5,
'failed' => 3,
'queues' => ['orders', 'emails', 'images']
]
*/
// Test connection health
$health = $queueManager->testConnection('redis');
/*
[
'connection' => 'redis',
'healthy' => true,
'message' => 'Redis connection is healthy',
'metrics' => [
'redis_version' => '6.2.6',
'total_queues' => 5,
'total_jobs' => 1250
],
'response_time' => 2.5
]
*/
// Get available drivers and their capabilities
$drivers = $queueManager->getAvailableDrivers();
/*
[
[
'name' => 'database',
'version' => '1.0.0',
'description' => 'Database-backed queue driver with transaction support',
'features' => ['delayed_jobs', 'priority_queues', 'atomic_operations'],
'dependencies' => []
],
[
'name' => 'redis',
'version' => '1.0.0',
'description' => 'High-performance Redis-backed queue driver',
'features' => ['delayed_jobs', 'priority_queues', 'high_throughput'],
'dependencies' => ['redis']
]
]
*/
Job System
Tasks vs Jobs Architecture
Starting with Glueful 1.2.0, the framework implements a clear separation between business logic (Tasks) and queue execution (Jobs):
- Tasks (src/Tasks/): Contain pure business logic that can be executed directly or queued
- Jobs (src/Queue/Jobs/): Lightweight wrappers that handle queue-specific concerns like retries, logging, and error handling
This architecture provides several benefits:
- Testability: Business logic can be tested independently of queue infrastructure
- Flexibility: Tasks can be executed directly without queuing when needed
- Maintainability: Clear separation of concerns between business logic and execution context
- Reliability: Queue-specific error handling and retry logic is centralized in Job classes
Example: Cache Maintenance
// Task: Pure business logic
namespace Glueful\Tasks;

class CacheMaintenanceTask
{
public function handle(array $options = []): array
{
$operation = $options['operation'] ?? 'clearExpiredKeys';
$verbose = $options['verbose'] ?? false;
return match ($operation) {
'clearExpiredKeys' => $this->clearExpiredKeys($verbose),
'optimizeCache' => $this->optimizeCache($verbose),
'fullCleanup' => $this->fullCleanup($options),
default => throw new \InvalidArgumentException("Unknown operation: {$operation}")
};
}
private function clearExpiredKeys(bool $verbose): array
{
// Business logic for clearing expired cache keys
$cleared = 0;
// ... implementation
return ['operation' => 'clearExpiredKeys', 'cleared' => $cleared];
}
}
// Job: Queue wrapper with error handling
namespace Glueful\Queue\Jobs;

use Glueful\Queue\Job;
use Glueful\Tasks\CacheMaintenanceTask;

class CacheMaintenanceJob extends Job
{
public function handle(): void
{
$data = $this->getData();
$operation = $data['operation'] ?? 'clearExpiredKeys';
$options = $data['options'] ?? [];
try {
$task = new CacheMaintenanceTask();
$result = $task->handle($options);
// Log successful completion
app(LogManager::class)->info('Cache maintenance completed successfully', [
'operation' => $operation,
'result' => $result,
'job_uuid' => $this->getUuid()
]);
} catch (\Exception $e) {
app(LogManager::class)->error('Cache maintenance failed', [
'operation' => $operation,
'error' => $e->getMessage(),
'job_uuid' => $this->getUuid()
]);
throw $e;
}
}
public function failed(\Exception $exception): void
{
$data = $this->getData();
$operation = $data['operation'] ?? 'clearExpiredKeys';
$options = $data['options'] ?? [];
app(LogManager::class)->error('Cache maintenance job failed', [
'operation' => $operation,
'options' => $options,
'error' => $exception->getMessage()
]);
}
}
Using Tasks and Jobs
// Direct execution (testing, CLI commands)
$task = new CacheMaintenanceTask();
$result = $task->handle(['operation' => 'clearExpiredKeys', 'verbose' => true]);
// Queued execution
$queueManager = app(QueueManager::class);
$jobId = $queueManager->push(CacheMaintenanceJob::class, [
'operation' => 'clearExpiredKeys',
'options' => ['verbose' => true]
], 'maintenance');
// Scheduled execution
$jobId = $queueManager->later(3600, CacheMaintenanceJob::class, [
'operation' => 'optimizeCache',
'options' => ['retention_days' => 30]
], 'maintenance');
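Because Tasks are plain classes with no queue dependencies, their business logic can be unit tested without any queue infrastructure. A minimal PHPUnit sketch for the task above (test names and assertions are illustrative):
use PHPUnit\Framework\TestCase;
use Glueful\Tasks\CacheMaintenanceTask;

class CacheMaintenanceTaskTest extends TestCase
{
    public function testClearExpiredKeysReturnsResultArray(): void
    {
        $task = new CacheMaintenanceTask();
        $result = $task->handle(['operation' => 'clearExpiredKeys']);

        // The task reports which operation ran and how many keys it cleared
        $this->assertSame('clearExpiredKeys', $result['operation']);
        $this->assertArrayHasKey('cleared', $result);
    }

    public function testUnknownOperationThrows(): void
    {
        $this->expectException(\InvalidArgumentException::class);
        (new CacheMaintenanceTask())->handle(['operation' => 'doesNotExist']);
    }
}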
Available Framework Tasks
The framework includes several built-in tasks for common maintenance operations:
// Cache maintenance
use Glueful\Tasks\CacheMaintenanceTask;
$task = new CacheMaintenanceTask();
$result = $task->handle(['operation' => 'fullCleanup', 'retention_days' => 30]);
// Database backup
use Glueful\Tasks\DatabaseBackupTask;
$task = new DatabaseBackupTask();
$result = $task->handle(['retention_days' => 7, 'compress' => true]);
// Log cleanup
use Glueful\Tasks\LogCleanupTask;
$task = new LogCleanupTask();
$result = $task->handle(['retention_days' => 14, 'cleanup_type' => 'filesystem']);
// Session cleanup
use Glueful\Tasks\SessionCleanupTask;
$task = new SessionCleanupTask();
$result = $task->handle(['max_lifetime' => 1440]); // 1440 minutes (24 hours)
// Notification retry processing
use Glueful\Tasks\NotificationRetryTask;
$task = new NotificationRetryTask();
$result = $task->handle(['max_retries' => 3, 'retry_delay' => 300]);
Creating Job Classes
use Glueful\Queue\Job;
class ProcessEmailJob extends Job
{
/**
* Execute the job
*/
public function handle(): void
{
$data = $this->getData();
// Send email
$emailService = container()->get(EmailService::class);
$emailService->send(
$data['to'],
$data['subject'],
$data['template'],
$data['variables'] ?? []
);
// Log success
logger()->info('Email sent successfully', [
'job_uuid' => $this->getUuid(),
'recipient' => $data['to'],
'template' => $data['template']
]);
}
/**
* Handle job failure
*/
public function failed(\Exception $exception): void
{
$data = $this->getData();
// Log failure
logger()->error('Email job failed', [
'job_uuid' => $this->getUuid(),
'recipient' => $data['to'],
'error' => $exception->getMessage(),
'attempts' => $this->getAttempts()
]);
// Notify admin for critical emails
if ($data['critical'] ?? false) {
$notificationService = container()->get(NotificationService::class);
$notificationService->alertAdmin('Critical email job failed', [
'job_uuid' => $this->getUuid(),
'error' => $exception->getMessage()
]);
}
}
/**
* Configure job settings
*/
public function getMaxAttempts(): int
{
return 5; // Allow up to 5 attempts
}
public function getTimeout(): int
{
return 120; // 2 minute timeout
}
}
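Dispatching the job uses the same push()/later() calls shown earlier; whatever array you pass becomes the payload returned by getData() inside handle():
$queueManager->push(ProcessEmailJob::class, [
    'to' => '[email protected]',
    'subject' => 'Welcome to Glueful',
    'template' => 'welcome',
    'variables' => ['name' => 'New Customer'],
    'critical' => false
], 'emails');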
Advanced Job Features
class ProcessImagesJob extends Job
{
public function handle(): void
{
$data = $this->getData();
$imageIds = $data['image_ids'];
foreach ($imageIds as $imageId) {
try {
$this->processImage($imageId);
} catch (\Exception $e) {
// Log individual image failure but continue processing
logger()->warning('Image processing failed', [
'image_id' => $imageId,
'error' => $e->getMessage()
]);
// Create separate retry job for failed image
$queueManager = container()->get(QueueManager::class);
$queueManager->later(300, ProcessSingleImageJob::class, [
'image_id' => $imageId,
'retry_count' => ($data['retry_count'] ?? 0) + 1
]);
}
}
}
/**
* Check if job should be retried based on custom logic
*/
public function shouldRetry(): bool
{
$data = $this->getData();
// Don't retry if all images were processed individually
if ($data['processed_individually'] ?? false) {
return false;
}
return parent::shouldRetry();
}
/**
* Release job with exponential backoff
*/
protected function releaseWithBackoff(): void
{
$attempts = $this->getAttempts();
$delay = min(300, pow(2, $attempts) * 10); // Exponential backoff, max 5 minutes
$this->release($delay);
}
}
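With releaseWithBackoff(), the computed delays for attempts 1 through 5 are 20, 40, 80, 160, and 300 seconds (2^5 * 10 = 320 is capped at the 300-second maximum), so retries back off quickly but never wait more than five minutes.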
Job Serialization and Batch Support
// Job serialization for persistent storage
$job = new ProcessEmailJob(['to' => '[email protected]']);
$serialized = $job->serialize();
$unserialized = ProcessEmailJob::unserialize($serialized);
// Create job from array (useful for API endpoints)
$jobData = [
'job' => ProcessEmailJob::class,
'data' => ['to' => '[email protected]'],
'priority' => 5,
'batchUuid' => 'batch_123'
];
$job = Job::fromArray($jobData);
// Convert job to array for logging/debugging
$jobArray = $job->toArray();
$jobJson = $job->toJson();
Process Management
Worker Process Management
The ProcessManager handles the lifecycle of worker processes with automatic scaling, health monitoring, and graceful shutdown.
use Glueful\Queue\Process\ProcessManager;
use Glueful\Queue\Process\ProcessFactory;
use Glueful\Queue\Monitoring\WorkerMonitor;
use Glueful\Queue\WorkerOptions;
// Initialize process manager
$processManager = new ProcessManager(
factory: new ProcessFactory(),
monitor: new WorkerMonitor(),
logger: $logger,
config: [
'max_workers' => 20,
'restart_delay' => 5,
'health_check_interval' => 30
]
);
// Spawn workers for different queues
$emailWorkerOptions = new WorkerOptions(
memory: 256, // MB
timeout: 120, // seconds
maxJobs: 500, // jobs per worker
maxAttempts: 3 // retry attempts
);
$imageWorkerOptions = new WorkerOptions(
memory: 512, // Higher memory for image processing
timeout: 300, // Longer timeout
maxJobs: 100 // Fewer jobs due to resource intensity
);
// Spawn workers
$emailWorker = $processManager->spawn('emails', $emailWorkerOptions);
$imageWorker = $processManager->spawn('images', $imageWorkerOptions);
// Scale workers based on load
$processManager->scale(5, 'emails', $emailWorkerOptions); // Scale to 5 email workers
$processManager->scale(2, 'images', $imageWorkerOptions); // Scale to 2 image workers
Worker Status and Monitoring
// Get status of all workers
$status = $processManager->getStatus();
/*
[
[
'id' => 'worker_abc123',
'queue' => 'emails',
'pid' => 12345,
'status' => 'running',
'memory_usage' => 145.2,
'cpu_usage' => 15.6,
'jobs_processed' => 250,
'started_at' => '2024-07-02 10:15:30',
'last_heartbeat' => '2024-07-02 12:45:22'
],
// ... more workers
]
*/
// Monitor worker health
$processManager->monitorHealth(); // Automatically restarts unhealthy workers
// Manual worker restart
$processManager->restart('worker_abc123');
// Graceful shutdown of all workers
$processManager->stopAll(timeout: 60);
Auto-Scaling
Intelligent Worker Scaling
The AutoScaler automatically adjusts worker count based on queue load, processing rates, and resource utilization.
use Glueful\Queue\Process\AutoScaler;
// Initialize auto-scaler
$autoScaler = new AutoScaler(
processManager: $processManager,
queueManager: $queueManager,
logger: $logger,
config: [
'enabled' => true,
'limits' => [
'max_workers_per_queue' => 15
],
'auto_scale' => [
'scale_up_threshold' => 100, // Queue size threshold
'scale_down_threshold' => 10, // Queue size threshold
'scale_up_step' => 2, // Workers to add
'scale_down_step' => 1, // Workers to remove
'cooldown_period' => 300 // Seconds between scaling
],
'queues' => [
'emails' => [
'auto_scale' => true,
'min_workers' => 2,
'max_workers' => 10,
'scale_up_threshold' => 50,
'max_wait_time' => 30
],
'images' => [
'auto_scale' => true,
'min_workers' => 1,
'max_workers' => 5,
'scale_up_threshold' => 20,
'max_wait_time' => 120
]
]
]
);
// Perform auto-scaling check
$scalingActions = $autoScaler->scale();
/*
[
[
'queue' => 'emails',
'action' => 'scale_up',
'from' => 3,
'to' => 5,
'reason' => 'Queue size (75) > threshold (50), High worker utilization (92%)',
'metrics' => [
'queue_size' => 75,
'avg_worker_utilization' => 92,
'processing_rate' => 25.5,
'incoming_rate' => 35.2
]
]
]
*/
// Force scaling (bypasses cooldown)
$autoScaler->forceScale('images', 3, 'Manual scale for image batch processing');
// Get scaling history
$history = $autoScaler->getScalingHistory('emails');
/*
[
[
'queue' => 'emails',
'timestamp' => 1720035123,
'from_workers' => 3,
'to_workers' => 5,
'reason' => 'Queue size (75) > threshold (50)'
],
// ... more history entries
]
*/
Scaling Metrics and Decision Logic
// The auto-scaler considers multiple metrics for scaling decisions:
// Scale-up conditions:
// - Queue size > scale_up_threshold
// - Incoming rate > processing rate * 1.5
// - Average worker utilization > 85%
// - Average wait time > max_wait_time
// Scale-down conditions:
// - Queue size < scale_down_threshold
// - Average worker utilization < 30% AND wait time < 10 seconds
// - Processing rate > incoming rate * 2 AND queue size < 5
// Custom scaling logic can be implemented by extending AutoScaler
class CustomAutoScaler extends AutoScaler
{
protected function shouldScaleUp(array $metrics, int $currentWorkers, array $queueConfig): bool
{
// Add custom business logic
$isBusinessHours = (date('H') >= 9 && date('H') <= 17);
$isWeekday = !in_array(date('w'), [0, 6]);
if ($isBusinessHours && $isWeekday) {
// More aggressive scaling during business hours
return $metrics['queue_size'] > 25 || parent::shouldScaleUp($metrics, $currentWorkers, $queueConfig);
}
return parent::shouldScaleUp($metrics, $currentWorkers, $queueConfig);
}
}
Batch Processing
Creating and Managing Batches
use Glueful\Helpers\Utils;
// Create a batch of related jobs
$batchUuid = Utils::generateNanoID();
$jobs = [
[
'job' => ProcessEmailJob::class,
'data' => ['to' => '[email protected]', 'template' => 'newsletter'],
'batch_uuid' => $batchUuid
],
[
'job' => ProcessEmailJob::class,
'data' => ['to' => '[email protected]', 'template' => 'newsletter'],
'batch_uuid' => $batchUuid
],
[
'job' => ProcessEmailJob::class,
'data' => ['to' => '[email protected]', 'template' => 'newsletter'],
'batch_uuid' => $batchUuid
]
];
// Push batch jobs
$uuids = $queueManager->bulk($jobs, 'emails');
// Track batch progress
class BatchProgressTracker
{
private QueueManager $queueManager;
public function getBatchProgress(string $batchUuid): array
{
// Get batch job statistics from database
$db = container()->get(DatabaseInterface::class);
$completed = $db->count('queue_jobs_completed', ['batch_uuid' => $batchUuid]);
$failed = $db->count('queue_failed_jobs', ['batch_uuid' => $batchUuid]);
$pending = $db->count('queue_jobs', ['batch_uuid' => $batchUuid]);
$total = $completed + $failed + $pending;
return [
'batch_uuid' => $batchUuid,
'total' => $total,
'completed' => $completed,
'failed' => $failed,
'pending' => $pending,
'progress_percentage' => $total > 0 ? ($completed / $total) * 100 : 0,
'success_rate' => ($completed + $failed) > 0 ? ($completed / ($completed + $failed)) * 100 : 0
];
}
}
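Polling the tracker for the batch created above:
$tracker = new BatchProgressTracker();
$progress = $tracker->getBatchProgress($batchUuid);

printf(
    "Batch %s: %d/%d completed (%.1f%% done, %.1f%% success rate)\n",
    $progress['batch_uuid'],
    $progress['completed'],
    $progress['total'],
    $progress['progress_percentage'],
    $progress['success_rate']
);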
Batch Completion Callbacks
class NewsletterBatchJob extends Job
{
public function handle(): void
{
// Process individual newsletter email
$this->sendNewsletterEmail();
// Check if this is the last job in the batch
if ($this->isBatchComplete()) {
$this->handleBatchCompletion();
}
}
private function isBatchComplete(): bool
{
$batchUuid = $this->getBatchUuid();
if (!$batchUuid) {
return true; // Not part of batch
}
$db = container()->get(DatabaseInterface::class);
$remaining = $db->count('queue_jobs', ['batch_uuid' => $batchUuid]);
return $remaining <= 1; // This job is the last one
}
private function handleBatchCompletion(): void
{
$batchUuid = $this->getBatchUuid();
// Create completion job
$queueManager = container()->get(QueueManager::class);
$queueManager->push(NewsletterBatchCompletedJob::class, [
'batch_uuid' => $batchUuid,
'completed_at' => date('Y-m-d H:i:s')
]);
}
}
class NewsletterBatchCompletedJob extends Job
{
public function handle(): void
{
$data = $this->getData();
$batchUuid = $data['batch_uuid'];
// Generate batch report
$tracker = new BatchProgressTracker();
$progress = $tracker->getBatchProgress($batchUuid);
// Send completion notification
$notificationService = container()->get(NotificationService::class);
$notificationService->send(
'[email protected]',
'Newsletter Batch Completed',
'batch-completion',
[
'batch_uuid' => $batchUuid,
'total_emails' => $progress['total'],
'success_rate' => $progress['success_rate'],
'completed_at' => $data['completed_at']
]
);
}
}
Failed Job Handling
Failed Job Recovery and Analysis
use Glueful\Queue\Failed\FailedJobProvider;
// Get failed job provider
$failedJobProvider = new FailedJobProvider($database);
// Get failed jobs
$failedJobs = $failedJobProvider->all();
foreach ($failedJobs as $failedJob) {
echo "Failed Job: {$failedJob['uuid']}\n";
echo "Queue: {$failedJob['queue']}\n";
echo "Failed At: {$failedJob['failed_at']}\n";
echo "Exception: {$failedJob['exception']}\n";
echo "---\n";
}
// Get failed jobs for specific queue
$emailFailures = $failedJobProvider->getByQueue('emails');
// Retry specific failed job
$failedJobProvider->retry('failed_job_uuid_123');
// Retry all failed jobs for a queue
$retryCount = $failedJobProvider->retryQueue('emails');
// Delete old failed jobs
$deletedCount = $failedJobProvider->prune(days: 30);
Custom Failed Job Handlers
class CustomFailedJobHandler
{
public function handle(JobInterface $job, \Exception $exception): void
{
$jobData = $job->getData();
// Categorize failure
$failureCategory = $this->categorizeFailure($exception);
switch ($failureCategory) {
case 'network_error':
// Retry with exponential backoff
$delay = min(3600, pow(2, $job->getAttempts()) * 60);
$job->release($delay);
break;
case 'validation_error':
// Don't retry validation errors, log for investigation
$this->logValidationError($job, $exception);
break;
case 'resource_exhaustion':
// Retry after longer delay when resources recover
$job->release(1800); // 30 minutes
break;
default:
// Standard retry logic
if ($job->shouldRetry()) {
$job->release(300); // 5 minutes
} else {
$this->moveToFailedJobs($job, $exception);
}
}
}
private function categorizeFailure(\Exception $exception): string
{
$message = strtolower($exception->getMessage());
if (strpos($message, 'network') !== false || strpos($message, 'timeout') !== false) {
return 'network_error';
}
if (strpos($message, 'validation') !== false || strpos($message, 'invalid') !== false) {
return 'validation_error';
}
if (strpos($message, 'memory') !== false || strpos($message, 'disk') !== false) {
return 'resource_exhaustion';
}
return 'unknown';
}
}
Monitoring and Statistics
Real-time Queue Monitoring
class QueueMonitoringService
{
public function __construct(
private QueueManager $queueManager,
private ProcessManager $processManager
) {
}
public function getSystemOverview(): array
{
$overview = [
'timestamp' => time(),
'queues' => [],
'workers' => [],
'system_health' => 'healthy'
];
// Get queue statistics
$connections = $this->queueManager->getAvailableConnections();
foreach ($connections as $connection) {
$stats = $this->queueManager->getStats(null, $connection);
$health = $this->queueManager->testConnection($connection);
$overview['queues'][$connection] = [
'stats' => $stats,
'health' => $health,
'throughput' => $this->calculateThroughput($connection)
];
if (!$health['healthy']) {
$overview['system_health'] = 'degraded';
}
}
// Get worker statistics
$overview['workers'] = [
'total' => $this->processManager->getWorkerCount(),
'by_queue' => $this->getWorkersByQueue(),
'resource_usage' => $this->getWorkerResourceUsage()
];
return $overview;
}
public function getQueueMetrics(string $queueName, int $timeWindow = 3600): array
{
// Get metrics for the last hour by default
$endTime = time();
$startTime = $endTime - $timeWindow;
// This would typically query a metrics database
return [
'queue' => $queueName,
'time_window' => $timeWindow,
'metrics' => [
'jobs_processed' => $this->getJobsProcessed($queueName, $startTime, $endTime),
'average_processing_time' => $this->getAverageProcessingTime($queueName, $startTime, $endTime),
'failure_rate' => $this->getFailureRate($queueName, $startTime, $endTime),
'throughput_per_minute' => $this->getThroughputPerMinute($queueName, $startTime, $endTime),
'peak_queue_size' => $this->getPeakQueueSize($queueName, $startTime, $endTime)
]
];
}
public function getAlerts(): array
{
$alerts = [];
// Check for high failure rates
$stats = $this->queueManager->getStats();
if ($stats['failed'] > 0) {
$failureRate = ($stats['failed'] / ($stats['total'] + $stats['failed'])) * 100;
if ($failureRate > 10) { // > 10% failure rate
$alerts[] = [
'type' => 'high_failure_rate',
'severity' => 'warning',
'message' => "Queue failure rate is {$failureRate}%",
'value' => $failureRate,
'threshold' => 10
];
}
}
// Check for large queue sizes
foreach ($this->queueManager->getAvailableConnections() as $connection) {
$stats = $this->queueManager->getStats(null, $connection);
if ($stats['pending'] > 1000) {
$alerts[] = [
'type' => 'large_queue_size',
'severity' => 'warning',
'message' => "Queue {$connection} has {$stats['pending']} pending jobs",
'value' => $stats['pending'],
'threshold' => 1000
];
}
}
// Check for stalled workers
$stalledWorkers = $this->getStalledWorkers();
if (!empty($stalledWorkers)) {
$alerts[] = [
'type' => 'stalled_workers',
'severity' => 'error',
'message' => count($stalledWorkers) . " workers appear to be stalled",
'workers' => $stalledWorkers
];
}
return $alerts;
}
}
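A periodic check (for example from a cron entry or a health-check endpoint) can then surface the overview and alerts. A sketch, assuming the constructor dependencies shown above are resolved from the container and that the logger() helper returns a PSR-3 logger:
$monitor = new QueueMonitoringService(
    container()->get(QueueManager::class),
    container()->get(ProcessManager::class)
);

$overview = $monitor->getSystemOverview();
if ($overview['system_health'] !== 'healthy') {
    logger()->warning('Queue system degraded', $overview);
}

foreach ($monitor->getAlerts() as $alert) {
    // Alert severities ('warning', 'error') map directly onto PSR-3 log levels
    logger()->log($alert['severity'], $alert['message'], $alert);
}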
Performance Analytics
class QueuePerformanceAnalyzer
{
public function analyzeQueuePerformance(string $queueName, array $timeRange): array
{
return [
'queue' => $queueName,
'analysis_period' => $timeRange,
'performance_metrics' => [
'average_throughput' => $this->calculateAverageThroughput($queueName, $timeRange),
'peak_throughput' => $this->calculatePeakThroughput($queueName, $timeRange),
'average_latency' => $this->calculateAverageLatency($queueName, $timeRange),
'p95_latency' => $this->calculatePercentileLatency($queueName, $timeRange, 95),
'p99_latency' => $this->calculatePercentileLatency($queueName, $timeRange, 99)
],
'bottlenecks' => $this->identifyBottlenecks($queueName, $timeRange),
'recommendations' => $this->generateRecommendations($queueName, $timeRange)
];
}
private function identifyBottlenecks(string $queueName, array $timeRange): array
{
$bottlenecks = [];
// Check for worker utilization issues
$workerUtilization = $this->getWorkerUtilization($queueName, $timeRange);
if ($workerUtilization > 90) {
$bottlenecks[] = [
'type' => 'worker_saturation',
'description' => 'Worker utilization is consistently above 90%',
'impact' => 'high',
'suggestion' => 'Consider increasing the number of workers'
];
}
// Check for memory pressure
$avgMemoryUsage = $this->getAverageMemoryUsage($queueName, $timeRange);
if ($avgMemoryUsage > 80) { // > 80% of limit
$bottlenecks[] = [
'type' => 'memory_pressure',
'description' => 'Workers are consistently using high memory',
'impact' => 'medium',
'suggestion' => 'Optimize job memory usage or increase worker memory limits'
];
}
// Check for database/Redis performance
$avgResponseTime = $this->getAverageStorageResponseTime($queueName, $timeRange);
if ($avgResponseTime > 100) { // > 100ms
$bottlenecks[] = [
'type' => 'storage_latency',
'description' => 'Storage operations are slower than optimal',
'impact' => 'medium',
'suggestion' => 'Consider optimizing database queries or Redis configuration'
];
}
return $bottlenecks;
}
}
CLI Commands
Task-Specific Commands
# Cache maintenance commands
php glueful cache:maintenance --operation=clearExpiredKeys --verbose
php glueful cache:maintenance --operation=optimizeCache
php glueful cache:maintenance --operation=fullCleanup --retention-days=30
# These commands can execute directly or queue the operation
php glueful cache:maintenance --operation=clearExpiredKeys --queue
Work and Manage Workers
# Start queue workers (multi-worker by default)
php glueful queue:work
# Process a specific queue
php glueful queue:work --queue=emails
# Tune worker options
php glueful queue:work --workers=4 --memory=512 --timeout=300 --max-jobs=100
# Show worker status (optionally auto-refresh)
php glueful queue:work status --json --watch=5
# Scale workers for a queue
php glueful queue:work scale --queue=emails --count=5
# Restart or stop workers
php glueful queue:work restart --all
php glueful queue:work stop --worker-id=worker_abc123
# Health summary
php glueful queue:work health
Auto-Scaling
# Start auto-scaling daemon
php glueful queue:autoscale
# Auto-scaler status and configuration
php glueful queue:autoscale status --detailed
php glueful queue:autoscale config --show
# View schedules and resources
php glueful queue:autoscale schedule --list
php glueful queue:autoscale resources --show
# Real-time streaming monitor
php glueful queue:autoscale stream
Scheduler
# Run due scheduled jobs once
php glueful queue:scheduler run --dry-run
# Run the scheduler continuously in worker mode
php glueful queue:scheduler work --interval=60
# List and manage scheduled jobs
php glueful queue:scheduler list
php glueful queue:scheduler status
Configuration
Main Queue Configuration
// config/queue.php
return [
'default' => env('QUEUE_CONNECTION', 'database'),
'connections' => [
'database' => [
'driver' => 'database',
'table' => env('QUEUE_TABLE', 'queue_jobs'),
'failed_table' => env('QUEUE_FAILED_TABLE', 'queue_failed_jobs'),
'retry_after' => env('QUEUE_RETRY_AFTER', 90),
'batch_size' => env('QUEUE_BATCH_SIZE', 100)
],
'redis' => [
'driver' => 'redis',
'host' => env('REDIS_HOST', '127.0.0.1'),
'port' => env('REDIS_PORT', 6379),
'password' => env('REDIS_PASSWORD', null),
'database' => env('REDIS_QUEUE_DB', 0),
'prefix' => env('REDIS_QUEUE_PREFIX', 'glueful:queue:'),
'retry_after' => env('QUEUE_RETRY_AFTER', 90),
'job_expiration' => env('QUEUE_JOB_EXPIRATION', 3600),
'persistent' => env('REDIS_PERSISTENT', false)
]
],
'worker_options' => [
'memory' => env('QUEUE_WORKER_MEMORY', 128), // MB
'timeout' => env('QUEUE_WORKER_TIMEOUT', 60), // seconds
'max_jobs' => env('QUEUE_WORKER_MAX_JOBS', 1000), // jobs per worker
'max_attempts' => env('QUEUE_MAX_ATTEMPTS', 3), // retry attempts
'sleep' => env('QUEUE_WORKER_SLEEP', 3), // seconds between checks
'rest' => env('QUEUE_WORKER_REST', 0) // seconds to rest after jobs
],
'auto_scaling' => [
'enabled' => env('QUEUE_AUTO_SCALE', false),
'limits' => [
'max_workers_per_queue' => env('QUEUE_MAX_WORKERS_PER_QUEUE', 10),
'max_total_workers' => env('QUEUE_MAX_TOTAL_WORKERS', 50)
],
'thresholds' => [
'scale_up_threshold' => env('QUEUE_SCALE_UP_THRESHOLD', 100),
'scale_down_threshold' => env('QUEUE_SCALE_DOWN_THRESHOLD', 10),
'scale_up_step' => env('QUEUE_SCALE_UP_STEP', 2),
'scale_down_step' => env('QUEUE_SCALE_DOWN_STEP', 1),
'cooldown_period' => env('QUEUE_COOLDOWN_PERIOD', 300)
]
],
'monitoring' => [
'enabled' => env('QUEUE_MONITORING', true),
'metrics_retention' => env('QUEUE_METRICS_RETENTION', 7), // days
'health_check_interval' => env('QUEUE_HEALTH_CHECK_INTERVAL', 60), // seconds
'alert_thresholds' => [
'failure_rate' => env('QUEUE_ALERT_FAILURE_RATE', 10), // percentage
'queue_size' => env('QUEUE_ALERT_QUEUE_SIZE', 1000), // number of jobs
'worker_memory' => env('QUEUE_ALERT_WORKER_MEMORY', 90) // percentage
]
]
];
Queue-Specific Configuration
// config/queue_settings.php
return [
'queues' => [
'emails' => [
'auto_scale' => true,
'min_workers' => 2,
'max_workers' => 8,
'scale_up_threshold' => 50,
'scale_down_threshold' => 5,
'worker_options' => [
'memory' => 256,
'timeout' => 120,
'max_jobs' => 500
]
],
'images' => [
'auto_scale' => true,
'min_workers' => 1,
'max_workers' => 4,
'scale_up_threshold' => 20,
'scale_down_threshold' => 2,
'worker_options' => [
'memory' => 512,
'timeout' => 300,
'max_jobs' => 50
]
],
'reports' => [
'auto_scale' => false,
'workers' => 1,
'worker_options' => [
'memory' => 1024,
'timeout' => 1800,
'max_jobs' => 10
]
]
]
];
Environment Variables
# Queue Configuration
QUEUE_CONNECTION=redis
QUEUE_TABLE=queue_jobs
QUEUE_FAILED_TABLE=queue_failed_jobs
QUEUE_RETRY_AFTER=90
QUEUE_BATCH_SIZE=100
# Redis Configuration
REDIS_HOST=127.0.0.1
REDIS_PORT=6379
REDIS_PASSWORD=
REDIS_QUEUE_DB=0
REDIS_QUEUE_PREFIX=glueful:queue:
REDIS_PERSISTENT=false
# Worker Configuration
QUEUE_WORKER_MEMORY=128
QUEUE_WORKER_TIMEOUT=60
QUEUE_WORKER_MAX_JOBS=1000
QUEUE_MAX_ATTEMPTS=3
QUEUE_WORKER_SLEEP=3
# Auto-Scaling
QUEUE_AUTO_SCALE=false
QUEUE_MAX_WORKERS_PER_QUEUE=10
QUEUE_SCALE_UP_THRESHOLD=100
QUEUE_SCALE_DOWN_THRESHOLD=10
QUEUE_COOLDOWN_PERIOD=300
# Monitoring
QUEUE_MONITORING=true
QUEUE_HEALTH_CHECK_INTERVAL=60
QUEUE_ALERT_FAILURE_RATE=10
QUEUE_ALERT_QUEUE_SIZE=1000
Production Deployment
High-Availability Setup
# Use a process manager like Supervisor for production
# /etc/supervisor/conf.d/glueful-workers.conf
[program:glueful-worker-emails]
command=php /path/to/glueful/glueful queue:work --queue=emails --memory=256 --timeout=120
process_name=%(program_name)s_%(process_num)02d
numprocs=4
autostart=true
autorestart=true
user=www-data
redirect_stderr=true
stdout_logfile=/var/log/glueful-queue-emails.log
[program:glueful-worker-images]
command=php /path/to/glueful/glueful queue:work --queue=images --memory=512 --timeout=300
process_name=%(program_name)s_%(process_num)02d
numprocs=2
autostart=true
autorestart=true
user=www-data
redirect_stderr=true
stdout_logfile=/var/log/glueful-queue-images.log
[program:glueful-autoscaler]
command=php /path/to/glueful/glueful queue:autoscale
autostart=true
autorestart=true
user=www-data
redirect_stderr=true
stdout_logfile=/var/log/glueful-autoscaler.log
Performance Optimization
// Production optimization tips:
// 1. Use Redis for high-throughput workloads
$config['default'] = 'redis';
// 2. Tune worker settings for your workload
$config['worker_options'] = [
'memory' => 512, // Increase for memory-intensive jobs
'timeout' => 120, // Increase for long-running jobs
'max_jobs' => 500, // Balance between efficiency and memory usage
'sleep' => 1 // Reduce for high-frequency workloads
];
// 3. Enable auto-scaling for variable workloads
$config['auto_scaling']['enabled'] = true;
// 4. Use connection pooling for database drivers
$config['connections']['database']['pool_size'] = 20;
// 5. Configure appropriate retry settings
$config['connections']['redis']['retry_after'] = 60; // Faster retry for Redis
// 6. Enable monitoring for production insights
$config['monitoring']['enabled'] = true;
Monitoring and Alerting Setup
// Set up monitoring webhook for queue alerts
class QueueAlertingService
{
public function checkAndAlert(): void
{
$monitor = new QueueMonitoringService();
$alerts = $monitor->getAlerts();
foreach ($alerts as $alert) {
match ($alert['severity']) {
'error' => $this->sendCriticalAlert($alert),
'warning' => $this->sendWarningAlert($alert),
default => $this->logAlert($alert)
};
}
}
private function sendCriticalAlert(array $alert): void
{
// Send to PagerDuty, Slack, email, etc.
$this->notificationService->send([
'channels' => ['pagerduty', 'slack'],
'message' => "CRITICAL: Queue Alert - {$alert['message']}",
'data' => $alert
]);
}
}
// Run alerting check every 5 minutes via cron
// */5 * * * * php /path/to/glueful/glueful queue:check-alerts
Summary
Glueful's queue system provides enterprise-grade background job processing with:
- Multi-Driver Support: Database and Redis drivers with ACID compliance and high performance
- Auto-Scaling: Intelligent worker scaling based on real-time metrics and load patterns
- Process Management: Advanced worker lifecycle management with health monitoring
- Batch Processing: Efficient bulk operations with progress tracking and completion callbacks
- Failed Job Handling: Comprehensive retry mechanisms and failure analysis
- Real-time Monitoring: Queue statistics, worker metrics, and performance analytics
- Production Ready: High-availability setup, performance optimization, and comprehensive alerting
The system is designed to scale from simple background tasks to high-throughput, mission-critical job processing while providing detailed insights and automatic optimization capabilities.