From b782bfa538246af3fcfa9fbc06c1b8351c37bfff Mon Sep 17 00:00:00 2001 From: Eric Peterson Date: Thu, 12 Feb 2026 10:26:37 -0700 Subject: [PATCH 1/9] fix(LogFailedJobsInterceptor): Insert nulls when no exception information --- interceptors/LogFailedJobsInterceptor.cfc | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/interceptors/LogFailedJobsInterceptor.cfc b/interceptors/LogFailedJobsInterceptor.cfc index 3cbcb9c..26f1057 100644 --- a/interceptors/LogFailedJobsInterceptor.cfc +++ b/interceptors/LogFailedJobsInterceptor.cfc @@ -44,7 +44,12 @@ component { "null" : ( arguments.data.exception.type ?: "" ) == "", "nulls" : ( arguments.data.exception.type ?: "" ) == "" }, - "exceptionMessage" : arguments.data.exception.message, + "exceptionMessage" : { + "value": arguments.data.exception.message ?: "", + "cfsqltype" : "CF_SQL_VARCHAR", + "null" : ( arguments.data.exception.message ?: "" ) == "", + "nulls" : ( arguments.data.exception.message ?: "" ) == "" + }, "exceptionDetail" : { "value" : arguments.data.exception.detail ?: "", "cfsqltype" : "CF_SQL_VARCHAR", @@ -57,8 +62,13 @@ component { "null" : ( arguments.data.exception.extendedInfo ?: "" ) == "", "nulls" : ( arguments.data.exception.extendedInfo ?: "" ) == "" }, - "exceptionStackTrace" : arguments.data.exception.stackTrace, - "exception" : serializeJSON( arguments.data.exception ), + "exceptionStackTrace" : { + "value": arguments.data.exception.stackTrace ?: "", + "cfsqltype" : "CF_SQL_VARCHAR", + "null" : ( arguments.data.exception.stackTrace ?: "" ) == "", + "nulls" : ( arguments.data.exception.stackTrace ?: "" ) == "" + }, + "exception" : isNull( arguments.data.exception ) ? javacast( "null", "" ) : serializeJSON( arguments.data.exception ), "failedDate" : { "value": getCurrentUnixTimestamp(), "cfsqltype": "CF_SQL_BIGINT" }, "originalId" : { "value": arguments.data.job.getId(), "cfsqltype": "CF_SQL_VARCHAR" } }; From f61250da9e44b5e216e714d9a0ccd461b433456a Mon Sep 17 00:00:00 2001 From: Eric Peterson Date: Thu, 12 Feb 2026 10:27:11 -0700 Subject: [PATCH 2/9] fix(ColdBoxAsyncProvider): Compose the `marshalJob` future with the delay future --- models/Providers/ColdBoxAsyncProvider.cfc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/models/Providers/ColdBoxAsyncProvider.cfc b/models/Providers/ColdBoxAsyncProvider.cfc index 2e82802..e511ea0 100644 --- a/models/Providers/ColdBoxAsyncProvider.cfc +++ b/models/Providers/ColdBoxAsyncProvider.cfc @@ -23,7 +23,7 @@ component accessors="true" extends="AbstractQueueProvider" { sleep( delay * 1000 ); return true; }, workerPool.getExecutor() ) - .then( function() { + .thenCompose( function() { job.setId( createUUID() ); if ( !isNull( arguments.currentAttempt ) ) { job.setCurrentAttempt( attempts ); From a48980234c82fe3f4785ebd362aa73ca3f4269dc Mon Sep 17 00:00:00 2001 From: Eric Peterson Date: Thu, 12 Feb 2026 10:55:13 -0700 Subject: [PATCH 3/9] test: reproduce missing batch finally dispatch on terminal failure --- tests/specs/integration/BatchFinallySpec.cfc | 62 ++++++++++++++++++++ 1 file changed, 62 insertions(+) create mode 100644 tests/specs/integration/BatchFinallySpec.cfc diff --git a/tests/specs/integration/BatchFinallySpec.cfc b/tests/specs/integration/BatchFinallySpec.cfc new file mode 100644 index 0000000..4d93093 --- /dev/null +++ b/tests/specs/integration/BatchFinallySpec.cfc @@ -0,0 +1,62 @@ +component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" { + + function run() { + describe( "batch finally dispatching", function() { + beforeEach( function() { + structDelete( application, "jobBeforeCalled" ); + structDelete( application, "jobAfterCalled" ); + + param application.jobBeforeCalled = false; + param application.jobAfterCalled = false; + } ); + + it( "dispatches the finally job when the last job fails", function() { + var cbq = getWireBox().getInstance( "@cbq" ); + registerSyncConnectionAndWorkerPool(); + + var successJob = cbq.job( "SendWelcomeEmailJob" ); + var failingJob = cbq.job( + job = "ReleaseTestJob", + maxAttempts = 1 + ); + + var pendingBatch = cbq + .batch( [ successJob, failingJob ] ) + .onConnection( "syncBatch" ) + .onComplete( + job = "BeforeAndAfterJob", + connection = "syncBatch" + ); + + try { + pendingBatch.dispatch(); + } catch ( any e ) { + // The sync provider rethrows the terminal failure. + } + + expect( application.jobAfterCalled ) + .toBeTrue( "The `finally` job should dispatch even when the last job fails." ); + } ); + } ); + } + + private void function registerSyncConnectionAndWorkerPool() { + var config = getWireBox().getInstance( "Config@cbq" ); + + if ( !config.getConnections().keyExists( "syncBatch" ) ) { + config.registerConnection( + name = "syncBatch", + provider = getWireBox().getInstance( "SyncProvider@cbq" ).setProperties( {} ) + ); + } + + if ( !config.getWorkerPools().keyExists( "syncBatch" ) ) { + config.registerWorkerPool( + name = "syncBatch", + connectionName = "syncBatch", + maxAttempts = 1 + ); + } + } + +} From 9038762a97121c9989ff4acc391eae75df820915 Mon Sep 17 00:00:00 2001 From: Eric Peterson Date: Thu, 12 Feb 2026 10:59:22 -0700 Subject: [PATCH 4/9] fix: complete batches correctly when jobs end in failure --- models/Jobs/Batch.cfc | 1 + models/Jobs/DBBatchRepository.cfc | 49 ++++++++++++------- ...dd_successfulJobs_to_cbq_batches_table.cfc | 15 ++++++ tests/specs/integration/BatchFinallySpec.cfc | 22 ++++++++- 4 files changed, 69 insertions(+), 18 deletions(-) create mode 100644 resources/database/migrations/2000_01_01_000008_add_successfulJobs_to_cbq_batches_table.cfc diff --git a/models/Jobs/Batch.cfc b/models/Jobs/Batch.cfc index e12d82f..b0ec34d 100644 --- a/models/Jobs/Batch.cfc +++ b/models/Jobs/Batch.cfc @@ -10,6 +10,7 @@ component accessors="true" { property name="totalJobs" type="numeric"; property name="pendingJobs" type="numeric"; property name="failedJobs" type="numeric"; + property name="successfulJobs" type="numeric"; property name="failedJobIds" type="array"; property name="options" type="struct"; property name="createdDate" type="numeric"; diff --git a/models/Jobs/DBBatchRepository.cfc b/models/Jobs/DBBatchRepository.cfc index 98f5282..f112c6c 100644 --- a/models/Jobs/DBBatchRepository.cfc +++ b/models/Jobs/DBBatchRepository.cfc @@ -11,7 +11,11 @@ component singleton accessors="true" { property name="batchTableName" default="cbq_batches"; public DBBatchRepository function init() { - variables.timeBasedUUIDGenerator = createObject( "java", "com.fasterxml.uuid.Generators" ).timeBasedGenerator(); + try { + variables.timeBasedUUIDGenerator = createObject( "java", "com.fasterxml.uuid.Generators" ).timeBasedGenerator(); + } catch ( any e ) { + variables.timeBasedUUIDGenerator = javacast( "null", "" ); + } variables.defaultQueryOptions = {}; return this; } @@ -51,7 +55,9 @@ component singleton accessors="true" { } public Batch function store( required PendingBatch batch ) { - var id = variables.timeBasedUUIDGenerator.generate().toString(); + var id = isNull( variables.timeBasedUUIDGenerator ) ? createUUID() : variables.timeBasedUUIDGenerator + .generate() + .toString(); qb.table( variables.batchTableName ) .insert( @@ -102,23 +108,29 @@ component singleton accessors="true" { throw( type = "cbq.BatchNotFound", message = "No batch found for id [#arguments.batchId#]" ); } + var updatedValues = { + "pendingJobs" : data.pendingJobs - 1, + "failedJobs" : data.failedJobs, + "failedJobIds" : serializeJSON( + deserializeJSON( data.failedJobIds ).filter( ( failedJobId ) => failedJobId != jobId ) + ) + }; + + if ( data.keyExists( "successfulJobs" ) ) { + updatedValues[ "successfulJobs" ] = data.successfulJobs + 1; + } + qb.table( variables.batchTableName ) .where( "id", arguments.batchId ) .update( - values = { - "pendingJobs" : data.pendingJobs - 1, - "failedJobs" : data.failedJobs, - "failedJobIds" : serializeJSON( - deserializeJSON( data.failedJobIds ).filter( ( failedJobId ) => failedJobId != jobId ) - ) - }, + values = updatedValues, options = variables.defaultQueryOptions ); return { "pendingJobs" : data.pendingJobs - 1, "failedJobs" : data.failedJobs, - "allJobsHaveRanExactlyOnce" : ( data.pendingJobs - 1 ) - data.failedJobs == 0 + "allJobsHaveRanExactlyOnce" : ( data.pendingJobs - 1 ) == 0 }; } } @@ -135,21 +147,23 @@ component singleton accessors="true" { throw( type = "cbq.BatchNotFound", message = "No batch found for id [#arguments.batchId#]" ); } + var updatedValues = { + "pendingJobs" : data.pendingJobs - 1, + "failedJobs" : data.failedJobs + 1, + "failedJobIds" : serializeJSON( deserializeJSON( data.failedJobIds ).append( arguments.jobId ) ) + }; + qb.table( variables.batchTableName ) .where( "id", arguments.batchId ) .update( - values = { - "pendingJobs" : data.pendingJobs, - "failedJobs" : data.failedJobs + 1, - "failedJobIds" : serializeJSON( deserializeJSON( data.failedJobIds ).append( arguments.jobId ) ) - }, + values = updatedValues, options = variables.defaultQueryOptions ); return { - "pendingJobs" : data.pendingJobs, + "pendingJobs" : data.pendingJobs - 1, "failedJobs" : data.failedJobs + 1, - "allJobsHaveRanExactlyOnce" : data.pendingJobs - ( data.failedJobs + 1 ) == 0 + "allJobsHaveRanExactlyOnce" : ( data.pendingJobs - 1 ) == 0 }; } } @@ -191,6 +205,7 @@ component singleton accessors="true" { batch.setTotalJobs( data.totalJobs ); batch.setPendingJobs( data.pendingJobs ); batch.setFailedJobs( data.failedJobs ); + batch.setSuccessfulJobs( data.keyExists( "successfulJobs" ) ? data.successfulJobs : 0 ); batch.setFailedJobIds( deserializeJSON( data.failedJobIds ) ); batch.setOptions( deserializeJSON( data.options ) ); batch.setCreatedDate( data.createdDate ); diff --git a/resources/database/migrations/2000_01_01_000008_add_successfulJobs_to_cbq_batches_table.cfc b/resources/database/migrations/2000_01_01_000008_add_successfulJobs_to_cbq_batches_table.cfc new file mode 100644 index 0000000..7f3cef7 --- /dev/null +++ b/resources/database/migrations/2000_01_01_000008_add_successfulJobs_to_cbq_batches_table.cfc @@ -0,0 +1,15 @@ +component { + + function up( schema ) { + schema.alter( "cbq_batches", ( t ) => { + t.unsignedInteger( "successfulJobs" ).default( 0 ); + } ); + } + + function down( schema ) { + schema.alter( "cbq_batches", ( t ) => { + t.dropColumn( "successfulJobs" ); + } ); + } + +} diff --git a/tests/specs/integration/BatchFinallySpec.cfc b/tests/specs/integration/BatchFinallySpec.cfc index 4d93093..7612cee 100644 --- a/tests/specs/integration/BatchFinallySpec.cfc +++ b/tests/specs/integration/BatchFinallySpec.cfc @@ -27,16 +27,36 @@ component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" { job = "BeforeAndAfterJob", connection = "syncBatch" ); + pendingBatch.setName( "sync-failing-finally" ); try { pendingBatch.dispatch(); - } catch ( any e ) { + } catch ( cbq.MaxAttemptsReached e ) { // The sync provider rethrows the terminal failure. } expect( application.jobAfterCalled ) .toBeTrue( "The `finally` job should dispatch even when the last job fails." ); } ); + + it( "dispatches the finally job when all jobs succeed", function() { + var cbq = getWireBox().getInstance( "@cbq" ); + registerSyncConnectionAndWorkerPool(); + + var pendingBatch = cbq + .batch( [ cbq.job( "SendWelcomeEmailJob" ), cbq.job( "SendWelcomeEmailJob" ) ] ) + .onConnection( "syncBatch" ) + .onComplete( + job = "BeforeAndAfterJob", + connection = "syncBatch" + ); + pendingBatch.setName( "sync-success-finally" ); + + pendingBatch.dispatch(); + + expect( application.jobAfterCalled ) + .toBeTrue( "The `finally` job should dispatch when all batch jobs succeed." ); + } ); } ); } From 3d013832419523c8472ac9788f304fef6d8400e7 Mon Sep 17 00:00:00 2001 From: Eric Peterson Date: Thu, 12 Feb 2026 11:09:48 -0700 Subject: [PATCH 5/9] fix: make batch name optional and nullable --- models/Jobs/DBBatchRepository.cfc | 6 +++++- ...1_01_000009_make_cbq_batches_name_nullable.cfc | 15 +++++++++++++++ tests/specs/integration/BatchFinallySpec.cfc | 2 -- 3 files changed, 20 insertions(+), 3 deletions(-) create mode 100644 resources/database/migrations/2000_01_01_000009_make_cbq_batches_name_nullable.cfc diff --git a/models/Jobs/DBBatchRepository.cfc b/models/Jobs/DBBatchRepository.cfc index f112c6c..eea433e 100644 --- a/models/Jobs/DBBatchRepository.cfc +++ b/models/Jobs/DBBatchRepository.cfc @@ -58,12 +58,16 @@ component singleton accessors="true" { var id = isNull( variables.timeBasedUUIDGenerator ) ? createUUID() : variables.timeBasedUUIDGenerator .generate() .toString(); + local.batchName = arguments.batch.getName(); + if ( isNull( local.batchName ) || !isSimpleValue( local.batchName ) ) { + local.batchName = ""; + } qb.table( variables.batchTableName ) .insert( values = { "id" : id, - "name" : arguments.batch.getName(), + "name" : local.batchName, "totalJobs" : 0, "pendingJobs" : 0, "failedJobs" : 0, diff --git a/resources/database/migrations/2000_01_01_000009_make_cbq_batches_name_nullable.cfc b/resources/database/migrations/2000_01_01_000009_make_cbq_batches_name_nullable.cfc new file mode 100644 index 0000000..7a2101f --- /dev/null +++ b/resources/database/migrations/2000_01_01_000009_make_cbq_batches_name_nullable.cfc @@ -0,0 +1,15 @@ +component { + + function up( schema, qb ) { + schema.alter( "cbq_batches", ( t ) => { + t.modifyColumn( "name", t.string( "name" ).nullable() ); + } ); + } + + function down( schema, qb ) { + schema.alter( "cbq_batches", ( t ) => { + t.modifyColumn( "name", t.string( "name" ) ); + } ); + } + +} diff --git a/tests/specs/integration/BatchFinallySpec.cfc b/tests/specs/integration/BatchFinallySpec.cfc index 7612cee..cd20e25 100644 --- a/tests/specs/integration/BatchFinallySpec.cfc +++ b/tests/specs/integration/BatchFinallySpec.cfc @@ -27,7 +27,6 @@ component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" { job = "BeforeAndAfterJob", connection = "syncBatch" ); - pendingBatch.setName( "sync-failing-finally" ); try { pendingBatch.dispatch(); @@ -50,7 +49,6 @@ component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" { job = "BeforeAndAfterJob", connection = "syncBatch" ); - pendingBatch.setName( "sync-success-finally" ); pendingBatch.dispatch(); From 96ed25ce58535e8bc0ea55e269b0dd306df04514 Mon Sep 17 00:00:00 2001 From: Eric Peterson Date: Thu, 12 Feb 2026 11:16:55 -0700 Subject: [PATCH 6/9] test: load lib jars in test app and require time UUID generator --- models/Jobs/DBBatchRepository.cfc | 18 ++++++------------ tests/Application.cfc | 4 ++++ 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/models/Jobs/DBBatchRepository.cfc b/models/Jobs/DBBatchRepository.cfc index eea433e..da50225 100644 --- a/models/Jobs/DBBatchRepository.cfc +++ b/models/Jobs/DBBatchRepository.cfc @@ -11,11 +11,7 @@ component singleton accessors="true" { property name="batchTableName" default="cbq_batches"; public DBBatchRepository function init() { - try { - variables.timeBasedUUIDGenerator = createObject( "java", "com.fasterxml.uuid.Generators" ).timeBasedGenerator(); - } catch ( any e ) { - variables.timeBasedUUIDGenerator = javacast( "null", "" ); - } + variables.timeBasedUUIDGenerator = createObject( "java", "com.fasterxml.uuid.Generators" ).timeBasedGenerator(); variables.defaultQueryOptions = {}; return this; } @@ -55,19 +51,17 @@ component singleton accessors="true" { } public Batch function store( required PendingBatch batch ) { - var id = isNull( variables.timeBasedUUIDGenerator ) ? createUUID() : variables.timeBasedUUIDGenerator - .generate() - .toString(); - local.batchName = arguments.batch.getName(); - if ( isNull( local.batchName ) || !isSimpleValue( local.batchName ) ) { - local.batchName = ""; + var id = variables.timeBasedUUIDGenerator.generate().toString(); + var batchName = arguments.batch.getName(); + if ( isNull( batchName ) || !isSimpleValue( batchName ) ) { + batchName = ""; } qb.table( variables.batchTableName ) .insert( values = { "id" : id, - "name" : local.batchName, + "name" : batchName, "totalJobs" : 0, "pendingJobs" : 0, "failedJobs" : 0, diff --git a/tests/Application.cfc b/tests/Application.cfc index 63187e5..c8b5042 100644 --- a/tests/Application.cfc +++ b/tests/Application.cfc @@ -21,6 +21,10 @@ component { this.mappings[ "/testbox" ] = rootPath & "/testbox"; this.datasource = "cbq"; + this.javaSettings = { + "loadPaths" : [ rootPath & "/lib" ], + "reloadOnChange" : false + }; function onRequestStart() { createObject( "java", "java.lang.System" ).setProperty( "ENVIRONMENT", "testing" ); From cc7313197028eca2ae5ffd5b5cdfce433e5b9a73 Mon Sep 17 00:00:00 2001 From: Eric Peterson Date: Thu, 12 Feb 2026 11:46:02 -0700 Subject: [PATCH 7/9] breaking: require successfulJobs and add batch count coverage --- models/Jobs/DBBatchRepository.cfc | 8 +- ...dd_successfulJobs_to_cbq_batches_table.cfc | 2 +- .../app/models/Jobs/AlwaysErrorJob.cfc | 10 ++ .../Jobs/RequestScopeBeforeAndAfterJob.cfc | 15 +++ tests/specs/integration/BatchFinallySpec.cfc | 16 +-- .../DBBatchRepositoryCountsSpec.cfc | 124 ++++++++++++++++++ 6 files changed, 161 insertions(+), 14 deletions(-) create mode 100644 tests/resources/app/models/Jobs/AlwaysErrorJob.cfc create mode 100644 tests/resources/app/models/Jobs/RequestScopeBeforeAndAfterJob.cfc create mode 100644 tests/specs/integration/DBBatchRepositoryCountsSpec.cfc diff --git a/models/Jobs/DBBatchRepository.cfc b/models/Jobs/DBBatchRepository.cfc index da50225..6ea7336 100644 --- a/models/Jobs/DBBatchRepository.cfc +++ b/models/Jobs/DBBatchRepository.cfc @@ -64,6 +64,7 @@ component singleton accessors="true" { "name" : batchName, "totalJobs" : 0, "pendingJobs" : 0, + "successfulJobs" : 0, "failedJobs" : 0, "failedJobIds" : "[]", "options" : serializeJSON( arguments.batch.getOptions() ), @@ -108,16 +109,13 @@ component singleton accessors="true" { var updatedValues = { "pendingJobs" : data.pendingJobs - 1, + "successfulJobs" : data.successfulJobs + 1, "failedJobs" : data.failedJobs, "failedJobIds" : serializeJSON( deserializeJSON( data.failedJobIds ).filter( ( failedJobId ) => failedJobId != jobId ) ) }; - if ( data.keyExists( "successfulJobs" ) ) { - updatedValues[ "successfulJobs" ] = data.successfulJobs + 1; - } - qb.table( variables.batchTableName ) .where( "id", arguments.batchId ) .update( @@ -203,7 +201,7 @@ component singleton accessors="true" { batch.setTotalJobs( data.totalJobs ); batch.setPendingJobs( data.pendingJobs ); batch.setFailedJobs( data.failedJobs ); - batch.setSuccessfulJobs( data.keyExists( "successfulJobs" ) ? data.successfulJobs : 0 ); + batch.setSuccessfulJobs( data.successfulJobs ); batch.setFailedJobIds( deserializeJSON( data.failedJobIds ) ); batch.setOptions( deserializeJSON( data.options ) ); batch.setCreatedDate( data.createdDate ); diff --git a/resources/database/migrations/2000_01_01_000008_add_successfulJobs_to_cbq_batches_table.cfc b/resources/database/migrations/2000_01_01_000008_add_successfulJobs_to_cbq_batches_table.cfc index 7f3cef7..17d0e9f 100644 --- a/resources/database/migrations/2000_01_01_000008_add_successfulJobs_to_cbq_batches_table.cfc +++ b/resources/database/migrations/2000_01_01_000008_add_successfulJobs_to_cbq_batches_table.cfc @@ -2,7 +2,7 @@ component { function up( schema ) { schema.alter( "cbq_batches", ( t ) => { - t.unsignedInteger( "successfulJobs" ).default( 0 ); + t.addColumn( t.unsignedInteger( "successfulJobs" ).default( 0 ) ); } ); } diff --git a/tests/resources/app/models/Jobs/AlwaysErrorJob.cfc b/tests/resources/app/models/Jobs/AlwaysErrorJob.cfc new file mode 100644 index 0000000..4bfcaf5 --- /dev/null +++ b/tests/resources/app/models/Jobs/AlwaysErrorJob.cfc @@ -0,0 +1,10 @@ +component extends="cbq.models.Jobs.AbstractJob" { + + function handle() { + throw( + type = "cbq.tests.AlwaysErrorJob", + message = "This job always errors for testing." + ); + } + +} diff --git a/tests/resources/app/models/Jobs/RequestScopeBeforeAndAfterJob.cfc b/tests/resources/app/models/Jobs/RequestScopeBeforeAndAfterJob.cfc new file mode 100644 index 0000000..b2b714d --- /dev/null +++ b/tests/resources/app/models/Jobs/RequestScopeBeforeAndAfterJob.cfc @@ -0,0 +1,15 @@ +component extends="cbq.models.Jobs.AbstractJob" { + + function handle() { + // do nothing + } + + function before() { + request.jobBeforeCalled = true; + } + + function after() { + request.jobAfterCalled = true; + } + +} diff --git a/tests/specs/integration/BatchFinallySpec.cfc b/tests/specs/integration/BatchFinallySpec.cfc index cd20e25..773d2b1 100644 --- a/tests/specs/integration/BatchFinallySpec.cfc +++ b/tests/specs/integration/BatchFinallySpec.cfc @@ -3,11 +3,11 @@ component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" { function run() { describe( "batch finally dispatching", function() { beforeEach( function() { - structDelete( application, "jobBeforeCalled" ); - structDelete( application, "jobAfterCalled" ); + structDelete( request, "jobBeforeCalled" ); + structDelete( request, "jobAfterCalled" ); - param application.jobBeforeCalled = false; - param application.jobAfterCalled = false; + param request.jobBeforeCalled = false; + param request.jobAfterCalled = false; } ); it( "dispatches the finally job when the last job fails", function() { @@ -24,7 +24,7 @@ component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" { .batch( [ successJob, failingJob ] ) .onConnection( "syncBatch" ) .onComplete( - job = "BeforeAndAfterJob", + job = "RequestScopeBeforeAndAfterJob", connection = "syncBatch" ); @@ -34,7 +34,7 @@ component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" { // The sync provider rethrows the terminal failure. } - expect( application.jobAfterCalled ) + expect( request.jobAfterCalled ) .toBeTrue( "The `finally` job should dispatch even when the last job fails." ); } ); @@ -46,13 +46,13 @@ component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" { .batch( [ cbq.job( "SendWelcomeEmailJob" ), cbq.job( "SendWelcomeEmailJob" ) ] ) .onConnection( "syncBatch" ) .onComplete( - job = "BeforeAndAfterJob", + job = "RequestScopeBeforeAndAfterJob", connection = "syncBatch" ); pendingBatch.dispatch(); - expect( application.jobAfterCalled ) + expect( request.jobAfterCalled ) .toBeTrue( "The `finally` job should dispatch when all batch jobs succeed." ); } ); } ); diff --git a/tests/specs/integration/DBBatchRepositoryCountsSpec.cfc b/tests/specs/integration/DBBatchRepositoryCountsSpec.cfc new file mode 100644 index 0000000..7505483 --- /dev/null +++ b/tests/specs/integration/DBBatchRepositoryCountsSpec.cfc @@ -0,0 +1,124 @@ +component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" { + + function run() { + describe( "DBBatchRepository counts", function() { + it( "initializes successfulJobs for newly stored batches", function() { + var repository = getWireBox().getInstance( "DBBatchRepository@cbq" ); + var batch = repository.store( + getWireBox() + .getInstance( "@cbq" ) + .batch( [] ) + .allowFailures() + ); + + expect( batch.getSuccessfulJobs() ).toBe( 0 ); + } ); + + it( "successful jobs increment successfulJobs and decrement pendingJobs", function() { + var repository = getWireBox().getInstance( "DBBatchRepository@cbq" ); + var config = registerSyncConnectionAndWorkerPool(); + var batch = createTrackedBatch( repository, 1 ); + var provider = config.getConnection( "syncBatchCounts" ).getProvider(); + var pool = config.getWorkerPool( "syncBatchCounts" ); + + var job = getWireBox() + .getInstance( "@cbq" ) + .job( "SendWelcomeEmailJob" ) + .setId( createUUID() ) + .withBatchId( batch.getId() ); + + provider.marshalJob( job, pool ); + + var updatedBatch = repository.find( batch.getId() ); + + expect( updatedBatch.getPendingJobs() ).toBe( 0 ); + expect( updatedBatch.getFailedJobs() ).toBe( 0 ); + expect( updatedBatch.getSuccessfulJobs() ).toBe( 1 ); + } ); + + it( "retryable errors do not change pending, successful, or failed counts", function() { + var repository = getWireBox().getInstance( "DBBatchRepository@cbq" ); + var config = registerSyncConnectionAndWorkerPool(); + var batch = createTrackedBatch( repository, 1 ); + var provider = config.getConnection( "syncBatchCounts" ).getProvider(); + var pool = config.getWorkerPool( "syncBatchCounts" ); + + var job = getWireBox() + .getInstance( "@cbq" ) + .job( "AlwaysErrorJob" ) + .setId( createUUID() ) + .withBatchId( batch.getId() ) + .setCurrentAttempt( 1 ) + .setMaxAttempts( 2 ); + + expect( () => provider.marshalJob( job, pool ) ).toThrow( "cbq.SyncProviderJobFailed" ); + + var updatedBatch = repository.find( batch.getId() ); + + expect( updatedBatch.getPendingJobs() ).toBe( 1 ); + expect( updatedBatch.getSuccessfulJobs() ).toBe( 0 ); + expect( updatedBatch.getFailedJobs() ).toBe( 0 ); + expect( updatedBatch.getFailedJobIds() ).toBeEmpty(); + } ); + + it( "failed jobs increment failedJobs, append failedJobIds, and decrement pendingJobs", function() { + var repository = getWireBox().getInstance( "DBBatchRepository@cbq" ); + var config = registerSyncConnectionAndWorkerPool(); + var batch = createTrackedBatch( repository, 1 ); + var provider = config.getConnection( "syncBatchCounts" ).getProvider(); + var pool = config.getWorkerPool( "syncBatchCounts" ); + var failedJobId = createUUID(); + + var job = getWireBox() + .getInstance( "@cbq" ) + .job( "AlwaysErrorJob" ) + .setId( failedJobId ) + .withBatchId( batch.getId() ) + .setCurrentAttempt( 1 ) + .setMaxAttempts( 1 ); + + expect( () => provider.marshalJob( job, pool ) ).toThrow(); + + var updatedBatch = repository.find( batch.getId() ); + + expect( updatedBatch.getPendingJobs() ).toBe( 0 ); + expect( updatedBatch.getSuccessfulJobs() ).toBe( 0 ); + expect( updatedBatch.getFailedJobs() ).toBe( 1 ); + expect( updatedBatch.getFailedJobIds() ).toHaveLength( 1 ); + expect( updatedBatch.getFailedJobIds()[ 1 ] ).toBe( failedJobId ); + } ); + } ); + } + + private any function registerSyncConnectionAndWorkerPool() { + var config = getWireBox().getInstance( "Config@cbq" ); + + if ( !config.getConnections().keyExists( "syncBatchCounts" ) ) { + config.registerConnection( + name = "syncBatchCounts", + provider = getWireBox().getInstance( "SyncProvider@cbq" ).setProperties( {} ) + ); + } + + if ( !config.getWorkerPools().keyExists( "syncBatchCounts" ) ) { + config.registerWorkerPool( + name = "syncBatchCounts", + connectionName = "syncBatchCounts", + maxAttempts = 2 + ); + } + + return config; + } + + private any function createTrackedBatch( required any repository, required numeric totalJobs ) { + var pendingBatch = getWireBox() + .getInstance( "@cbq" ) + .batch( [] ) + .allowFailures(); + var batch = arguments.repository.store( pendingBatch ); + arguments.repository.incrementTotalJobs( batch.getId(), arguments.totalJobs ); + return arguments.repository.find( batch.getId() ); + } + +} From 772d034d6cc4c8ac162c5ea386417753a15b837c Mon Sep 17 00:00:00 2001 From: elpete <2583646+elpete@users.noreply.github.com> Date: Thu, 12 Feb 2026 18:48:14 +0000 Subject: [PATCH 8/9] Apply cfformat changes --- models/Jobs/DBBatchRepository.cfc | 10 ++------ tests/specs/integration/BatchFinallySpec.cfc | 26 +++++++------------- 2 files changed, 11 insertions(+), 25 deletions(-) diff --git a/models/Jobs/DBBatchRepository.cfc b/models/Jobs/DBBatchRepository.cfc index 6ea7336..4a6b193 100644 --- a/models/Jobs/DBBatchRepository.cfc +++ b/models/Jobs/DBBatchRepository.cfc @@ -118,10 +118,7 @@ component singleton accessors="true" { qb.table( variables.batchTableName ) .where( "id", arguments.batchId ) - .update( - values = updatedValues, - options = variables.defaultQueryOptions - ); + .update( values = updatedValues, options = variables.defaultQueryOptions ); return { "pendingJobs" : data.pendingJobs - 1, @@ -151,10 +148,7 @@ component singleton accessors="true" { qb.table( variables.batchTableName ) .where( "id", arguments.batchId ) - .update( - values = updatedValues, - options = variables.defaultQueryOptions - ); + .update( values = updatedValues, options = variables.defaultQueryOptions ); return { "pendingJobs" : data.pendingJobs - 1, diff --git a/tests/specs/integration/BatchFinallySpec.cfc b/tests/specs/integration/BatchFinallySpec.cfc index 773d2b1..9463d1a 100644 --- a/tests/specs/integration/BatchFinallySpec.cfc +++ b/tests/specs/integration/BatchFinallySpec.cfc @@ -15,18 +15,12 @@ component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" { registerSyncConnectionAndWorkerPool(); var successJob = cbq.job( "SendWelcomeEmailJob" ); - var failingJob = cbq.job( - job = "ReleaseTestJob", - maxAttempts = 1 - ); + var failingJob = cbq.job( job = "ReleaseTestJob", maxAttempts = 1 ); var pendingBatch = cbq .batch( [ successJob, failingJob ] ) .onConnection( "syncBatch" ) - .onComplete( - job = "RequestScopeBeforeAndAfterJob", - connection = "syncBatch" - ); + .onComplete( job = "RequestScopeBeforeAndAfterJob", connection = "syncBatch" ); try { pendingBatch.dispatch(); @@ -34,8 +28,7 @@ component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" { // The sync provider rethrows the terminal failure. } - expect( request.jobAfterCalled ) - .toBeTrue( "The `finally` job should dispatch even when the last job fails." ); + expect( request.jobAfterCalled ).toBeTrue( "The `finally` job should dispatch even when the last job fails." ); } ); it( "dispatches the finally job when all jobs succeed", function() { @@ -43,17 +36,16 @@ component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" { registerSyncConnectionAndWorkerPool(); var pendingBatch = cbq - .batch( [ cbq.job( "SendWelcomeEmailJob" ), cbq.job( "SendWelcomeEmailJob" ) ] ) + .batch( [ + cbq.job( "SendWelcomeEmailJob" ), + cbq.job( "SendWelcomeEmailJob" ) + ] ) .onConnection( "syncBatch" ) - .onComplete( - job = "RequestScopeBeforeAndAfterJob", - connection = "syncBatch" - ); + .onComplete( job = "RequestScopeBeforeAndAfterJob", connection = "syncBatch" ); pendingBatch.dispatch(); - expect( request.jobAfterCalled ) - .toBeTrue( "The `finally` job should dispatch when all batch jobs succeed." ); + expect( request.jobAfterCalled ).toBeTrue( "The `finally` job should dispatch when all batch jobs succeed." ); } ); } ); } From 95aa831644fd8123e1d1f7bb1f575d67eeb1ed6a Mon Sep 17 00:00:00 2001 From: Eric Peterson Date: Thu, 12 Feb 2026 12:25:35 -0700 Subject: [PATCH 9/9] Do not change `failedJobIds` except for incrementing failed jobs --- models/Jobs/DBBatchRepository.cfc | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/models/Jobs/DBBatchRepository.cfc b/models/Jobs/DBBatchRepository.cfc index 4a6b193..b7ffee1 100644 --- a/models/Jobs/DBBatchRepository.cfc +++ b/models/Jobs/DBBatchRepository.cfc @@ -110,10 +110,7 @@ component singleton accessors="true" { var updatedValues = { "pendingJobs" : data.pendingJobs - 1, "successfulJobs" : data.successfulJobs + 1, - "failedJobs" : data.failedJobs, - "failedJobIds" : serializeJSON( - deserializeJSON( data.failedJobIds ).filter( ( failedJobId ) => failedJobId != jobId ) - ) + "failedJobs" : data.failedJobs }; qb.table( variables.batchTableName )