From 2418242d01c05e40fe600ce949d4a1b30a2e38f7 Mon Sep 17 00:00:00 2001 From: mscherer Date: Wed, 18 Mar 2026 18:03:46 +0100 Subject: [PATCH 1/2] Add multi-connection support for multiple databases Add support for running queue workers and admin interface across multiple database connections (e.g., primary and secondary databases). Features: - Configure connections via Queue.connections array in config - Connection switcher dropdown in admin navigation - --connection option for queue run and queue job commands - Whitelist validation for security - Backwards compatible - only activates with 2+ connections Configuration example: $config['Queue']['connections'] = ['default', 'acme']; The first connection in the array is used as the default. Workers must be run separately for each connection to process its jobs. --- docs/sections/multi_connection.md | 167 ++++++++++++++++++ src/Command/JobCommand.php | 54 +++++- src/Command/RunCommand.php | 11 +- src/Controller/Admin/QueueAppController.php | 82 +++++++++ src/Controller/Admin/QueueController.php | 18 ++ .../Admin/QueueProcessesController.php | 17 ++ src/Controller/Admin/QueuedJobsController.php | 10 ++ src/Queue/Processor.php | 49 ++++- .../element/Queue/connection_switcher.php | 45 +++++ templates/element/Queue/mobile_nav.php | 28 +++ templates/layout/queue.php | 1 + 11 files changed, 476 insertions(+), 6 deletions(-) create mode 100644 docs/sections/multi_connection.md create mode 100644 templates/element/Queue/connection_switcher.php diff --git a/docs/sections/multi_connection.md b/docs/sections/multi_connection.md new file mode 100644 index 00000000..b6d4221e --- /dev/null +++ b/docs/sections/multi_connection.md @@ -0,0 +1,167 @@ +# Multi-Connection Support + +If your application uses multiple database connections (e.g., primary database and a secondary "acme" database), you can configure the Queue plugin to manage jobs across all of them. + +## Configuration + +Enable multi-connection mode by defining an array of connections in your config: + +```php +// In app_queue.php or your app config +$config['Queue']['connections'] = ['default', 'acme']; +``` + +The first connection in the array becomes the default when no specific connection is selected. + +**Important:** +- Multi-connection mode is only enabled when 2 or more connections are configured +- Only connections in this whitelist can be used (security feature) +- Without this config, the plugin operates in backwards-compatible single-connection mode + +## Admin Dashboard + +When multi-connection mode is enabled, a connection switcher dropdown appears in the admin navigation bar. + +- The dropdown shows all configured connections +- Select a connection to view jobs/workers for that specific database +- The current connection is highlighted +- The connection selection persists via URL query parameter (`?connection=acme`) + +All admin pages (Dashboard, Jobs, Workers, Processes) will show data for the selected connection. + +## CLI Usage + +### Running Workers + +Use the `--connection` option to start a worker for a specific connection: + +```bash +# Run worker for default connection +bin/cake queue run + +# Run worker for 'acme' connection +bin/cake queue run --connection acme +``` + +### Managing Jobs + +The `queue job` command also supports the `--connection` option: + +```bash +# View jobs on default connection +bin/cake queue job view 123 + +# View jobs on 'acme' connection +bin/cake queue job view 123 --connection acme + +# Reset failed jobs on 'acme' connection +bin/cake queue job reset all --connection acme +``` + +## Creating Jobs for Specific Connections + +Jobs are stored in the database they were created against. To create a job for a specific connection, you need to get the table with that connection: + +```php +use Cake\Datasource\ConnectionManager; + +// Get the QueuedJobs table +$queuedJobsTable = $this->fetchTable('Queue.QueuedJobs'); + +// For a non-default connection, set it explicitly +$connection = ConnectionManager::get('acme'); +$queuedJobsTable->setConnection($connection); + +// Create the job - it will be stored in the 'acme' database +$queuedJobsTable->createJob('MyTask', $data); +``` + +Or using the table locator directly: + +```php +use Cake\Datasource\ConnectionManager; +use Cake\ORM\TableRegistry; + +$queuedJobsTable = TableRegistry::getTableLocator()->get('Queue.QueuedJobs'); +$queuedJobsTable->setConnection(ConnectionManager::get('acme')); + +$queuedJobsTable->createJob('MyTask', ['key' => 'value']); +``` + +### Helper Method Pattern + +For cleaner code, consider creating a helper method or service: + +```php +// In a utility class or base controller +protected function getQueueTable(string $connection = 'default'): QueuedJobsTable { + $table = $this->fetchTable('Queue.QueuedJobs'); + + if ($connection !== 'default') { + $table->setConnection(ConnectionManager::get($connection)); + } + + return $table; +} + +// Usage +$this->getQueueTable('acme')->createJob('MyTask', $data); +``` + +## Running Workers in Production + +For production environments with multiple connections, you'll need separate workers for each connection: + +### Cron Setup + +```bash +# Worker for default connection +* * * * * cd /var/www/app && bin/cake queue run + +# Worker for acme connection +* * * * * cd /var/www/app && bin/cake queue run --connection acme +``` + +### Supervisor Configuration + +```ini +[program:queue-default] +command=bin/cake queue run +directory=/var/www/app +autostart=true +autorestart=true + +[program:queue-acme] +command=bin/cake queue run --connection acme +directory=/var/www/app +autostart=true +autorestart=true +``` + +## Database Setup + +Each connection needs the queue tables (queued_jobs, queue_processes). Run migrations for each connection: + +```bash +# Default connection +bin/cake migrations migrate --plugin Queue + +# Acme connection +bin/cake migrations migrate --plugin Queue -c acme +``` + +## Troubleshooting + +### Jobs not being picked up + +Ensure you're running a worker for each configured connection. A worker only processes jobs from its assigned connection. + +### Invalid connection error + +If you see "Invalid connection: xyz", verify the connection is: +1. Listed in `Queue.connections` config +2. Properly configured in your database config + +### Jobs created in wrong database + +When creating jobs, always explicitly set the connection if targeting a non-default database. The table remembers its connection until explicitly changed. diff --git a/src/Command/JobCommand.php b/src/Command/JobCommand.php index db29b1ec..ef07dbbf 100644 --- a/src/Command/JobCommand.php +++ b/src/Command/JobCommand.php @@ -8,6 +8,7 @@ use Cake\Console\ConsoleIo; use Cake\Console\ConsoleOptionParser; use Cake\Core\Configure; +use Cake\Datasource\ConnectionManager; use Cake\Error\Debugger; use Cake\I18n\DateTime; use Queue\Model\Entity\QueuedJob; @@ -33,11 +34,9 @@ public static function getDescription(): string { protected ?string $defaultTable = 'Queue.QueuedJobs'; /** - * @return void + * @var string */ - public function initialize(): void { - $this->QueuedJobs = $this->fetchTable('Queue.QueuedJobs'); - } + protected string $connection = 'default'; /** * @inheritDoc @@ -60,6 +59,10 @@ public function getOptionParser(): ConsoleOptionParser { 'help' => 'ID of job record, or "all" for all', 'required' => false, ]); + $parser->addOption('connection', [ + 'help' => 'Database connection to use (must be in Queue.connections whitelist if multi-connection mode is enabled)', + 'default' => null, + ]); $parser->setDescription( 'Display, rerun, reset or remove pending jobs.', @@ -75,6 +78,20 @@ public function getOptionParser(): ConsoleOptionParser { * @return int|null|void */ public function execute(Arguments $args, ConsoleIo $io) { + $connection = $args->getOption('connection'); + if (!is_string($connection)) { + $connection = null; + } + $this->connection = $this->resolveConnection($connection); + $this->QueuedJobs = $this->fetchTable('Queue.QueuedJobs'); + + // Set connection for multi-connection support + if ($this->connection !== 'default') { + /** @var \Cake\Database\Connection $connectionObject */ + $connectionObject = ConnectionManager::get($this->connection); + $this->QueuedJobs->setConnection($connectionObject); + } + $action = $args->getArgument('action'); if (!$action) { $io->out('Please use with [action] [ID] added.'); @@ -314,4 +331,33 @@ protected function clean(ConsoleIo $io): int { return static::CODE_SUCCESS; } + /** + * Resolve and validate the connection name. + * + * @param string|null $connection Requested connection + * + * @return string + */ + protected function resolveConnection(?string $connection): string { + $connections = Configure::read('Queue.connections'); + + // Single connection mode (backwards compatible) + if (!$connections || !is_array($connections) || count($connections) < 2) { + return $connection ?: 'default'; + } + + // Multi-connection mode + if ($connection === null) { + // Use first connection as default + return $connections[0]; + } + + // Validate against whitelist + if (!in_array($connection, $connections, true)) { + throw new \RuntimeException(__d('queue', 'Invalid connection: {0}. Must be one of: {1}', $connection, implode(', ', $connections))); + } + + return $connection; + } + } diff --git a/src/Command/RunCommand.php b/src/Command/RunCommand.php index 990bb764..157ce45a 100644 --- a/src/Command/RunCommand.php +++ b/src/Command/RunCommand.php @@ -56,6 +56,10 @@ public function getOptionParser(): ConsoleOptionParser { 'help' => 'Name of a queue config to use', 'short' => 'c', ]); + $parser->addOption('connection', [ + 'help' => 'Database connection to use (must be in Queue.connections whitelist if multi-connection mode is enabled)', + 'default' => null, + ]); $parser->addOption('logger', [ 'help' => 'Name of a configured logger', 'default' => 'stdout', @@ -114,7 +118,12 @@ protected function getLogger(Arguments $args): LoggerInterface { public function execute(Arguments $args, ConsoleIo $io): int { $logger = $this->getLogger($args); $io = new Io($io); - $processor = new Processor($io, $logger, $this->container); + + $connection = $args->getOption('connection'); + if (!is_string($connection)) { + $connection = null; + } + $processor = new Processor($io, $logger, $this->container, $connection); return $processor->run($args->getOptions()); } diff --git a/src/Controller/Admin/QueueAppController.php b/src/Controller/Admin/QueueAppController.php index 23416c58..ada3bd3b 100644 --- a/src/Controller/Admin/QueueAppController.php +++ b/src/Controller/Admin/QueueAppController.php @@ -6,6 +6,9 @@ use App\Controller\AppController; use Cake\Controller\Controller; use Cake\Core\Configure; +use Cake\Database\Connection; +use Cake\Datasource\ConnectionManager; +use Cake\Http\Exception\NotFoundException; /** * QueueAppController @@ -19,6 +22,13 @@ class QueueAppController extends AppController { use LoadHelperTrait; + /** + * Current active connection name. + * + * @var string + */ + protected string $activeConnection = 'default'; + /** * @return void */ @@ -42,6 +52,78 @@ public function initialize(): void { if ($layout !== false) { $this->viewBuilder()->setLayout($layout ?: 'Queue.queue'); } + + // Multi-connection support + $this->activeConnection = $this->resolveConnection(); + $this->set('queueConnections', $this->getConnections()); + $this->set('queueActiveConnection', $this->activeConnection); + } + + /** + * Get configured connections. + * + * Returns null if multi-connection mode is not enabled. + * + * @return array|null + */ + protected function getConnections(): ?array { + $connections = Configure::read('Queue.connections'); + if (!$connections || !is_array($connections) || count($connections) < 2) { + return null; + } + + return $connections; + } + + /** + * Resolve the active connection from request or config. + * + * @throws \Cake\Http\Exception\NotFoundException If connection is not in whitelist + * @return string + */ + protected function resolveConnection(): string { + $connections = Configure::read('Queue.connections'); + + // Single connection mode (backwards compatible) + if (!$connections || !is_array($connections) || count($connections) < 2) { + return 'default'; + } + + // Multi-connection mode + $requested = $this->request->getQuery('connection'); + + if ($requested === null) { + // Use first connection as default + return $connections[0]; + } + + // Validate against whitelist + if (!in_array($requested, $connections, true)) { + throw new NotFoundException(__d('queue', 'Invalid connection: {0}', $requested)); + } + + return $requested; + } + + /** + * Get the active connection name. + * + * @return string + */ + protected function getActiveConnection(): string { + return $this->activeConnection; + } + + /** + * Get the active connection object. + * + * @return \Cake\Database\Connection + */ + protected function getActiveConnectionObject(): Connection { + /** @var \Cake\Database\Connection $connection */ + $connection = ConnectionManager::get($this->activeConnection); + + return $connection; } } diff --git a/src/Controller/Admin/QueueController.php b/src/Controller/Admin/QueueController.php index d00a839f..c4f317aa 100644 --- a/src/Controller/Admin/QueueController.php +++ b/src/Controller/Admin/QueueController.php @@ -21,6 +21,18 @@ class QueueController extends QueueAppController { */ protected ?string $defaultTable = 'Queue.QueuedJobs'; + /** + * @return void + */ + public function initialize(): void { + parent::initialize(); + + // Set connection for multi-connection support + if ($this->activeConnection !== 'default') { + $this->QueuedJobs->setConnection($this->getActiveConnectionObject()); + } + } + /** * Admin center. * Manage queues from admin backend (without the need to open ssh console window). @@ -29,6 +41,9 @@ class QueueController extends QueueAppController { */ public function index() { $QueueProcesses = $this->fetchTable('Queue.QueueProcesses'); + if ($this->activeConnection !== 'default') { + $QueueProcesses->setConnection($this->getActiveConnectionObject()); + } $status = $QueueProcesses->status(); $current = $this->QueuedJobs->getLength(); @@ -162,6 +177,9 @@ public function removeJob(?int $id = null) { */ public function processes() { $QueueProcesses = $this->fetchTable('Queue.QueueProcesses'); + if ($this->activeConnection !== 'default') { + $QueueProcesses->setConnection($this->getActiveConnectionObject()); + } if ($this->request->is('post') && $this->request->getQuery('end')) { $pid = (string)$this->request->getQuery('end'); diff --git a/src/Controller/Admin/QueueProcessesController.php b/src/Controller/Admin/QueueProcessesController.php index 9c2d3cc5..6f803c18 100644 --- a/src/Controller/Admin/QueueProcessesController.php +++ b/src/Controller/Admin/QueueProcessesController.php @@ -14,6 +14,11 @@ */ class QueueProcessesController extends QueueAppController { + /** + * @var string|null + */ + protected ?string $defaultTable = 'Queue.QueueProcesses'; + /** * @var array */ @@ -23,6 +28,18 @@ class QueueProcessesController extends QueueAppController { ], ]; + /** + * @return void + */ + public function initialize(): void { + parent::initialize(); + + // Set connection for multi-connection support + if ($this->activeConnection !== 'default') { + $this->QueueProcesses->setConnection($this->getActiveConnectionObject()); + } + } + /** * Index method * diff --git a/src/Controller/Admin/QueuedJobsController.php b/src/Controller/Admin/QueuedJobsController.php index 717f461d..d0b341f1 100644 --- a/src/Controller/Admin/QueuedJobsController.php +++ b/src/Controller/Admin/QueuedJobsController.php @@ -18,6 +18,11 @@ */ class QueuedJobsController extends QueueAppController { + /** + * @var string|null + */ + protected ?string $defaultTable = 'Queue.QueuedJobs'; + /** * @var array */ @@ -33,6 +38,11 @@ class QueuedJobsController extends QueueAppController { public function initialize(): void { parent::initialize(); + // Set connection for multi-connection support + if ($this->activeConnection !== 'default') { + $this->QueuedJobs->setConnection($this->getActiveConnectionObject()); + } + $this->enableSearch(); } diff --git a/src/Queue/Processor.php b/src/Queue/Processor.php index f4e84def..ea022074 100644 --- a/src/Queue/Processor.php +++ b/src/Queue/Processor.php @@ -6,6 +6,7 @@ use Cake\Console\CommandInterface; use Cake\Core\Configure; use Cake\Core\ContainerInterface; +use Cake\Datasource\ConnectionManager; use Cake\Datasource\Exception\RecordNotFoundException; use Cake\Event\Event; use Cake\Event\EventManager; @@ -91,19 +92,65 @@ class Processor { */ protected ?bool $captureOutput = null; + /** + * @var string + */ + protected string $connection = 'default'; + /** * @param \Queue\Console\Io $io * @param \Psr\Log\LoggerInterface $logger * @param \Cake\Core\ContainerInterface|null $container + * @param string|null $connection Database connection to use */ - public function __construct(Io $io, LoggerInterface $logger, ?ContainerInterface $container = null) { + public function __construct(Io $io, LoggerInterface $logger, ?ContainerInterface $container = null, ?string $connection = null) { $this->io = $io; $this->logger = $logger; $this->container = $container; + $this->connection = $this->resolveConnection($connection); $tableLocator = $this->getTableLocator(); $this->QueuedJobs = $tableLocator->get('Queue.QueuedJobs'); $this->QueueProcesses = $tableLocator->get('Queue.QueueProcesses'); + + // Set connection for multi-connection support + if ($this->connection !== 'default') { + /** @var \Cake\Database\Connection $connectionObject */ + $connectionObject = ConnectionManager::get($this->connection); + $this->QueuedJobs->setConnection($connectionObject); + $this->QueueProcesses->setConnection($connectionObject); + } + } + + /** + * Resolve and validate the connection name. + * + * @param string|null $connection Requested connection + * + * @throws \RuntimeException If connection is not in whitelist + * + * @return string + */ + protected function resolveConnection(?string $connection): string { + $connections = Configure::read('Queue.connections'); + + // Single connection mode (backwards compatible) + if (!$connections || !is_array($connections) || count($connections) < 2) { + return $connection ?: 'default'; + } + + // Multi-connection mode + if ($connection === null) { + // Use first connection as default + return $connections[0]; + } + + // Validate against whitelist + if (!in_array($connection, $connections, true)) { + throw new RuntimeException(__d('queue', 'Invalid connection: {0}. Must be one of: {1}', $connection, implode(', ', $connections))); + } + + return $connection; } /** diff --git a/templates/element/Queue/connection_switcher.php b/templates/element/Queue/connection_switcher.php new file mode 100644 index 00000000..c10f87a3 --- /dev/null +++ b/templates/element/Queue/connection_switcher.php @@ -0,0 +1,45 @@ +|null $queueConnections Available connections (null if single connection mode) + * @var string $queueActiveConnection Currently active connection + */ + +// Don't render if multi-connection mode is not enabled +if (empty($queueConnections)) { + return; +} +?> + diff --git a/templates/element/Queue/mobile_nav.php b/templates/element/Queue/mobile_nav.php index 3dd6f24a..01a69749 100644 --- a/templates/element/Queue/mobile_nav.php +++ b/templates/element/Queue/mobile_nav.php @@ -3,6 +3,8 @@ * Queue Admin Mobile Navigation * * @var \Cake\View\View $this + * @var array|null $queueConnections Available connections (null if single connection mode) + * @var string $queueActiveConnection Currently active connection */ $controller = $this->getRequest()->getParam('controller'); @@ -20,6 +22,32 @@ }; ?>
+ + +
+
+ +
+ +
diff --git a/templates/layout/queue.php b/templates/layout/queue.php index 7878d215..56d68f45 100644 --- a/templates/layout/queue.php +++ b/templates/layout/queue.php @@ -362,6 +362,7 @@ + element('Queue.Queue/connection_switcher') ?>
  • diff --git a/templates/element/Queue/mobile_nav.php b/templates/element/Queue/mobile_nav.php index 01a69749..8b076587 100644 --- a/templates/element/Queue/mobile_nav.php +++ b/templates/element/Queue/mobile_nav.php @@ -30,11 +30,13 @@ Url->build([ - '?' => array_merge( - $this->getRequest()->getQueryParams(), - ['connection' => $connection] - ), + 'plugin' => 'Queue', + 'prefix' => 'Admin', + 'controller' => 'Queue', + 'action' => 'index', + '?' => ['connection' => $connection], ]); ?>