Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 110 additions & 28 deletions CRM/Paymentprocessingcore/BAO/PaymentWebhook.php
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ public static function findByEventId($eventId) {
* being processed by another worker, callers should skip it.
*
* Stuck webhooks (in 'processing' too long) are handled separately by
* resetStuckWebhooks() which resets their status to 'new', after which
* this method will return FALSE.
* the stuck recovery process in WebhookQueueRunnerService which resets
* their status to 'new', after which this method will return FALSE.
*
* @param string $eventId Processor event ID
* @return bool TRUE if event is processed or being processed, FALSE otherwise
Expand Down Expand Up @@ -368,65 +368,147 @@ public static function batchUpdateStatus(array $ids, string $status): void {
}

/**
* Maximum number of stuck webhooks to reset per run.
* Maximum number of stuck webhooks to process per run.
*
* Prevents unbounded loop when many webhooks are stuck.
*/
public const MAX_STUCK_RESET_LIMIT = 100;

/**
* Reset stuck webhooks that have been processing for too long.
* Default timeout for stuck webhook detection (1 day in minutes).
*/
public const DEFAULT_STUCK_TIMEOUT_MINUTES = 1440;

/**
* Find webhooks stuck in 'processing' state beyond the timeout.
*
* Webhooks stuck in 'processing' for more than 30 minutes are reset to 'new'.
* This handles cases where the processor crashed mid-processing.
* Returns webhook records for the service layer to decide
* whether to retry or mark as permanent error.
*
* Limited to MAX_STUCK_RESET_LIMIT per run to prevent unbounded loops.
*
* @param int $timeoutMinutes Minutes after which to consider a webhook stuck
*
* @return int Number of webhooks reset
* @return array List of stuck webhook records with id, attempts, processor_type
* @phpstan-return array<array{id: int, attempts: int, processor_type: string}>
*/
public static function resetStuckWebhooks(int $timeoutMinutes = 30): int {
public static function getStuckWebhooks(int $timeoutMinutes = self::DEFAULT_STUCK_TIMEOUT_MINUTES): array {
$cutoff = date('Y-m-d H:i:s', strtotime("-{$timeoutMinutes} minutes"));

$stuckWebhooks = \Civi\Api4\PaymentWebhook::get(FALSE)
->addSelect('id')
$webhooks = [];
foreach (\Civi\Api4\PaymentWebhook::get(FALSE)
->addSelect('id', 'attempts', 'processor_type')
->addWhere('status', '=', 'processing')
->addWhere('processing_started_at', 'IS NOT NULL')
->addWhere('processing_started_at', '<', $cutoff)
->setLimit(self::MAX_STUCK_RESET_LIMIT)
->execute();
->execute() as $webhook) {
$webhooks[] = $webhook;
}
return $webhooks;
}

$webhookIds = array_column($stuckWebhooks->getArrayCopy(), 'id');
/**
 * Fetch 'new' webhooks that look like leftovers of an interrupted stuck recovery.
 *
 * Stuck recovery resets a webhook to 'new' and then re-queues it. If the
 * process dies between those two steps the row stays in 'new' with a
 * non-zero attempt count and nothing in the queue pointing at it.
 *
 * A row is returned only when all of these hold:
 * - processor_type equals $processorType
 * - status is 'new'
 * - attempts > 0
 * - next_retry_at IS NULL (intended to exclude retry-flow webhooks, which
 *   set next_retry_at)
 * - processing_started_at IS NOT NULL (intended to match rows that were
 *   previously in 'processing')
 *
 * @param string $processorType Processor type filter
 * @param int $limit Maximum number of webhooks to return
 *
 * @return array List of orphaned webhook records
 * @phpstan-return array<array{id: int, processor_type: string}>
 */
public static function getOrphanedNewWebhooks(string $processorType, int $limit = 50): array {
  // FALSE = run the API call without permission checks.
  $query = \Civi\Api4\PaymentWebhook::get(FALSE)
    ->addSelect('id', 'processor_type')
    ->addWhere('processor_type', '=', $processorType)
    ->addWhere('status', '=', 'new')
    ->addWhere('attempts', '>', 0)
    ->addWhere('next_retry_at', 'IS NULL')
    ->addWhere('processing_started_at', 'IS NOT NULL')
    ->setLimit($limit);

  // Copy the API result into a plain, sequentially indexed array.
  $orphans = [];
  foreach ($query->execute() as $row) {
    $orphans[] = $row;
  }

  return $orphans;
}

if (empty($webhookIds)) {
/**
* Batch reset stuck webhooks to 'new' with incremented attempt counts.
*
* Uses a single SQL UPDATE for performance. Only updates webhooks
* that are still in 'processing' status (optimistic locking).
*
* @param array $ids Webhook IDs to reset
* @phpstan-param array<int> $ids
* @param string $errorLog Error message to record
*
* @return int Number of rows actually updated
*/
public static function batchResetStuckToNew(array $ids, string $errorLog): int {
if (empty($ids)) {
return 0;
}

// Batch update all stuck webhooks at once (avoids N+1)
$idList = implode(',', array_map('intval', $webhookIds));
$errorLog = 'Reset from stuck processing state after ' . $timeoutMinutes . ' minutes';
$idList = implode(',', array_map('intval', $ids));

$sql = "UPDATE civicrm_payment_webhook
SET status = 'new', error_log = %1
WHERE id IN ({$idList})";
SET status = 'new',
attempts = attempts + 1,
error_log = %1
WHERE id IN ({$idList})
AND status = 'processing'";

\CRM_Core_DAO::executeQuery($sql, [
$dao = \CRM_Core_DAO::executeQuery($sql, [
1 => [$errorLog, 'String'],
]);

$count = count($webhookIds);
return $dao->affectedRows();
}

if ($count > 0) {
\Civi::log()->warning('Reset stuck webhooks', [
'count' => $count,
'timeout_minutes' => $timeoutMinutes,
'webhook_ids' => $webhookIds,
'limit_applied' => $count >= self::MAX_STUCK_RESET_LIMIT,
]);
/**
* Batch mark stuck webhooks as permanent error with incremented attempts.
*
* Uses a single SQL UPDATE for performance. Only updates webhooks
* that are still in 'processing' status (optimistic locking).
*
* @param array $ids Webhook IDs to mark as permanent error
* @phpstan-param array<int> $ids
* @param string $errorLog Error message to record
*
* @return int Number of rows actually updated
*/
public static function batchMarkStuckAsPermanentError(array $ids, string $errorLog): int {
if (empty($ids)) {
return 0;
}

return $count;
$idList = implode(',', array_map('intval', $ids));

$sql = "UPDATE civicrm_payment_webhook
SET status = 'permanent_error',
result = 'error',
attempts = attempts + 1,
error_log = %1,
processed_at = %2
WHERE id IN ({$idList})
AND status = 'processing'";

$dao = \CRM_Core_DAO::executeQuery($sql, [
1 => [$errorLog, 'String'],
2 => [date('Y-m-d H:i:s'), 'String'],
]);

return $dao->affectedRows();
}

}
148 changes: 139 additions & 9 deletions Civi/Paymentprocessingcore/Service/WebhookQueueRunnerService.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ class WebhookQueueRunnerService {
*/
private WebhookHandlerRegistry $registry;

/**
* Whether stuck recovery has already run in this invocation.
*
* @var bool
*/
private bool $stuckRecoveryDone = FALSE;

/**
* WebhookQueueRunnerService constructor.
*
Expand Down Expand Up @@ -62,21 +69,18 @@ public function __construct(WebhookHandlerRegistry $registry) {
public function runAllQueues(int $batchSize = self::DEFAULT_BATCH_SIZE): array {
$processorTypes = $this->registry->getRegisteredProcessorTypes();

// First, reset any stuck webhooks across all processors
$stuckReset = \CRM_Paymentprocessingcore_BAO_PaymentWebhook::resetStuckWebhooks();
// Recover stuck webhooks (crashed workers) across all processors
$stuckResult = $this->recoverStuckWebhooks();

$results = [
'_meta' => [
'stuck_webhooks_reset' => $stuckReset,
'stuck_webhooks_reset' => $stuckResult['reset_count'],
'stuck_webhooks_permanent_error' => $stuckResult['permanent_error_count'],
'batch_size' => $batchSize,
],
];

foreach ($processorTypes as $processorType) {
// First, re-queue any webhooks ready for retry
$this->requeueRetryableWebhooks($processorType);

// Then process the queue
$results[$processorType] = $this->runQueue($processorType, $batchSize);
}

Expand All @@ -99,12 +103,15 @@ public function runAllQueues(int $batchSize = self::DEFAULT_BATCH_SIZE): array {
* items_failed, items_remaining
*/
public function runQueue(string $processorType, int $batchSize = self::DEFAULT_BATCH_SIZE): array {
// Reset any stuck webhooks before processing
\CRM_Paymentprocessingcore_BAO_PaymentWebhook::resetStuckWebhooks();
// Recover stuck webhooks once per invocation (skipped if already done)
$this->recoverStuckWebhooks();

// Re-queue any webhooks ready for retry
$this->requeueRetryableWebhooks($processorType);

// Re-queue orphaned 'new' webhooks from previous stuck recovery
$this->requeueOrphanedNewWebhooks($processorType);

/** @var \Civi\Paymentprocessingcore\Service\WebhookQueueService $queueService */
$queueService = \Civi::service('paymentprocessingcore.webhook_queue');
$queue = $queueService->getQueue($processorType);
Expand Down Expand Up @@ -198,6 +205,91 @@ public function runQueue(string $processorType, int $batchSize = self::DEFAULT_B
];
}

/**
 * Recover webhooks left in 'processing' state by a crashed worker.
 *
 * Runs at most once per service instance: the first call flips
 * $stuckRecoveryDone, and every later call returns zero counts without
 * touching the database.
 *
 * Each stuck webhook (as reported by getStuckWebhooks() with its default
 * timeout) is classified by the attempt count it will have after this
 * recovery (current attempts + 1):
 * - below MAX_RETRY_ATTEMPTS: reset to 'new' and added back to the queue
 * - at or above MAX_RETRY_ATTEMPTS: marked as permanent_error
 *
 * Both status changes are done as batch updates (one call per group).
 *
 * NOTE(review): the batch reset only updates rows that are still in
 * 'processing', but the re-queue loop below adds a task for every
 * retryable webhook regardless of how many rows were actually updated.
 * Confirm that the task handler tolerates a webhook whose status changed
 * between the SELECT and the UPDATE.
 *
 * @return array Summary with 'reset_count' and 'permanent_error_count'
 * @phpstan-return array{reset_count: int, permanent_error_count: int}
 */
private function recoverStuckWebhooks(): array {
  $summary = ['reset_count' => 0, 'permanent_error_count' => 0];

  // Guard: recovery already ran for this service instance.
  if ($this->stuckRecoveryDone) {
    return $summary;
  }
  $this->stuckRecoveryDone = TRUE;

  $stuck = \CRM_Paymentprocessingcore_BAO_PaymentWebhook::getStuckWebhooks();
  if (!$stuck) {
    return $summary;
  }

  $attemptCeiling = \CRM_Paymentprocessingcore_BAO_PaymentWebhook::MAX_RETRY_ATTEMPTS;
  $resetMessage = sprintf(
    'Reset from stuck processing state after %d minutes',
    \CRM_Paymentprocessingcore_BAO_PaymentWebhook::DEFAULT_STUCK_TIMEOUT_MINUTES
  );
  $giveUpMessage = sprintf('Max retries exceeded after stuck recovery. %s', $resetMessage);

  // Split by the attempt count each webhook will have once incremented.
  $idsToReset = [];
  $idsToFail = [];
  $recordsToRequeue = [];

  foreach ($stuck as $record) {
    $id = (int) $record['id'];

    if ($record['attempts'] + 1 >= $attemptCeiling) {
      $idsToFail[] = $id;
      continue;
    }

    $idsToReset[] = $id;
    $recordsToRequeue[] = $record;
  }

  // Retryable webhooks go back to 'new' with attempts + 1.
  $resetCount = \CRM_Paymentprocessingcore_BAO_PaymentWebhook::batchResetStuckToNew(
    $idsToReset,
    $resetMessage
  );

  // Exhausted webhooks become 'permanent_error' with attempts + 1.
  $permanentErrorCount = \CRM_Paymentprocessingcore_BAO_PaymentWebhook::batchMarkStuckAsPermanentError(
    $idsToFail,
    $giveUpMessage
  );

  // Put the retryable webhooks back on their processor's queue.
  if (!empty($recordsToRequeue)) {
    /** @var \Civi\Paymentprocessingcore\Service\WebhookQueueService $queueService */
    $queueService = \Civi::service('paymentprocessingcore.webhook_queue');

    foreach ($recordsToRequeue as $record) {
      $queueService->addTask($record['processor_type'], (int) $record['id'], []);
    }
  }

  // Only log when at least one row was actually changed.
  if ($resetCount > 0 || $permanentErrorCount > 0) {
    \Civi::log()->warning('Recovered stuck webhooks', [
      'reset_to_new' => $resetCount,
      'permanent_error' => $permanentErrorCount,
      'reset_ids' => $idsToReset,
      'permanent_error_ids' => $idsToFail,
    ]);
  }

  $summary['reset_count'] = $resetCount;
  $summary['permanent_error_count'] = $permanentErrorCount;

  return $summary;
}

/**
* Re-queue webhooks that are ready for retry.
*
Expand Down Expand Up @@ -250,6 +342,44 @@ private function requeueRetryableWebhooks(string $processorType): int {
return $count;
}

/**
 * Put orphaned 'new' webhooks for one processor back on the queue.
 *
 * Orphans are whatever getOrphanedNewWebhooks() returns for the given
 * processor type (using its default limit): webhooks that a previous
 * stuck recovery reset to 'new' but, because the process crashed, never
 * re-queued. A queue task is added for each one, and an info-level log
 * entry records how many were added.
 *
 * @param string $processorType The processor type
 *
 * @return int Number of webhooks re-queued
 */
private function requeueOrphanedNewWebhooks(string $processorType): int {
  $orphans = \CRM_Paymentprocessingcore_BAO_PaymentWebhook::getOrphanedNewWebhooks(
    $processorType
  );
  $count = count($orphans);

  // Nothing orphaned: skip the service lookup and the log entry.
  if ($count === 0) {
    return 0;
  }

  /** @var \Civi\Paymentprocessingcore\Service\WebhookQueueService $queueService */
  $queueService = \Civi::service('paymentprocessingcore.webhook_queue');

  foreach ($orphans as $orphan) {
    $queueService->addTask($processorType, (int) $orphan['id'], []);
  }

  \Civi::log()->info('Re-queued orphaned new webhooks', [
    'processor_type' => $processorType,
    'count' => $count,
  ]);

  return $count;
}

/**
* Process a single webhook task.
*
Expand Down
2 changes: 1 addition & 1 deletion api/v3/PaymentWebhookRunner/Run.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* Features:
* - Batch size limiting to prevent job timeouts (default: 250 per processor)
* - Automatic retry of failed webhooks with exponential backoff
* - Recovery of stuck webhooks (processing > 30 minutes)
* - Recovery of stuck webhooks (processing > 1 day) with attempt tracking
*
* @param array $params
* API parameters:
Expand Down
Loading