Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 64 additions & 1 deletion inc/Abilities/Engine/PipelineBatchScheduler.php
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,10 @@ public function processChunk( int $parent_job_id ): void {
$batch_data = get_transient( $transient_key );

if ( ! $batch_data ) {
$this->failParentIfStillProcessing( $parent_job_id, 'batch_state_missing' );
do_action(
'datamachine_log',
'warning',
'error',
'Pipeline batch: transient expired or missing',
array( 'parent_job_id' => $parent_job_id )
);
Expand Down Expand Up @@ -217,6 +218,26 @@ public function processChunk( int $parent_job_id ): void {
// All items scheduled — clean up transient.
delete_transient( $transient_key );

$child_count = $this->countChildren( $parent_job_id );
if ( $child_count < 1 ) {
$this->db_jobs->complete_job(
$parent_job_id,
JobStatus::failed( 'batch_no_children_scheduled' )->toString()
);

do_action(
'datamachine_log',
'error',
'Pipeline batch: no child jobs were scheduled; parent marked failed',
array(
'parent_job_id' => $parent_job_id,
'total' => $total,
)
);

return;
}

do_action(
'datamachine_log',
'info',
Expand Down Expand Up @@ -414,4 +435,46 @@ public static function onChildComplete( int $job_id, string $status ): void {
)
);
}

/**
* Mark parent as failed when batch state is missing.
*
* @param int $parent_job_id Parent job ID.
* @param string $reason Failure reason suffix.
*/
private function failParentIfStillProcessing( int $parent_job_id, string $reason ): void {
$job = $this->db_jobs->get_job( $parent_job_id );

if ( ! $job ) {
return;
}

$current_status = $job['status'] ?? '';
if ( JobStatus::PROCESSING !== $current_status ) {
return;
}

$this->db_jobs->complete_job( $parent_job_id, JobStatus::failed( $reason )->toString() );
}

/**
* Count child jobs for a parent job.
*
* @param int $parent_job_id Parent job ID.
* @return int
*/
private function countChildren( int $parent_job_id ): int {
global $wpdb;
$table = $wpdb->prefix . 'datamachine_jobs';

// phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.NoCaching
$count = (int) $wpdb->get_var(
$wpdb->prepare(
"SELECT COUNT(*) FROM {$table} WHERE parent_job_id = %d",
$parent_job_id
)
);

return $count;
}
}
45 changes: 45 additions & 0 deletions tests/Unit/Engine/PipelineBatchSchedulerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,51 @@ public function test_process_chunk_respects_cancellation(): void {
$this->assertStringContainsString( 'batch cancelled', $parent_job['status'] );
}

public function test_process_chunk_marks_parent_failed_when_batch_state_is_missing(): void {
$parent_id = $this->create_parent_job();

$scheduler = new PipelineBatchScheduler();
$scheduler->processChunk( $parent_id );

$parent_job = $this->jobs_db->get_job( $parent_id );
$this->assertStringContainsString( 'failed', $parent_job['status'] );
$this->assertStringContainsString( 'batch_state_missing', $parent_job['status'] );
}

public function test_process_chunk_marks_parent_failed_when_zero_children_scheduled(): void {
$parent_id = $this->create_parent_job();
$engine = $this->make_engine_snapshot( $parent_id );

datamachine_merge_engine_data( $parent_id, array(
'batch' => true,
'batch_total' => 0,
'batch_scheduled' => 0,
'batch_chunk_size' => PipelineBatchScheduler::CHUNK_SIZE,
'next_flow_step_id' => 'step_empty',
'started_at' => current_time( 'mysql' ),
) );

set_transient(
'dm_pipeline_batch_' . $parent_id,
array(
'parent_job_id' => $parent_id,
'next_flow_step_id' => 'step_empty',
'engine_snapshot' => $engine,
'data_packets' => array(),
'total' => 0,
'offset' => 0,
),
4 * HOUR_IN_SECONDS
);

$scheduler = new PipelineBatchScheduler();
$scheduler->processChunk( $parent_id );

$parent_job = $this->jobs_db->get_job( $parent_id );
$this->assertStringContainsString( 'failed', $parent_job['status'] );
$this->assertStringContainsString( 'batch_no_children_scheduled', $parent_job['status'] );
}

public function test_child_labels_use_packet_titles(): void {
$parent_id = $this->create_parent_job();
$engine = $this->make_engine_snapshot( $parent_id );
Expand Down
Loading