diff --git a/interceptors/LogFailedJobsInterceptor.cfc b/interceptors/LogFailedJobsInterceptor.cfc index 3cbcb9c..26f1057 100644 --- a/interceptors/LogFailedJobsInterceptor.cfc +++ b/interceptors/LogFailedJobsInterceptor.cfc @@ -44,7 +44,12 @@ component { "null" : ( arguments.data.exception.type ?: "" ) == "", "nulls" : ( arguments.data.exception.type ?: "" ) == "" }, - "exceptionMessage" : arguments.data.exception.message, + "exceptionMessage" : { + "value": arguments.data.exception.message ?: "", + "cfsqltype" : "CF_SQL_VARCHAR", + "null" : ( arguments.data.exception.message ?: "" ) == "", + "nulls" : ( arguments.data.exception.message ?: "" ) == "" + }, "exceptionDetail" : { "value" : arguments.data.exception.detail ?: "", "cfsqltype" : "CF_SQL_VARCHAR", @@ -57,8 +62,13 @@ component { "null" : ( arguments.data.exception.extendedInfo ?: "" ) == "", "nulls" : ( arguments.data.exception.extendedInfo ?: "" ) == "" }, - "exceptionStackTrace" : arguments.data.exception.stackTrace, - "exception" : serializeJSON( arguments.data.exception ), + "exceptionStackTrace" : { + "value": arguments.data.exception.stackTrace ?: "", + "cfsqltype" : "CF_SQL_VARCHAR", + "null" : ( arguments.data.exception.stackTrace ?: "" ) == "", + "nulls" : ( arguments.data.exception.stackTrace ?: "" ) == "" + }, + "exception" : isNull( arguments.data.exception ) ? javacast( "null", "" ) : serializeJSON( arguments.data.exception ), "failedDate" : { "value": getCurrentUnixTimestamp(), "cfsqltype": "CF_SQL_BIGINT" }, "originalId" : { "value": arguments.data.job.getId(), "cfsqltype": "CF_SQL_VARCHAR" } }; diff --git a/models/Jobs/Batch.cfc b/models/Jobs/Batch.cfc index e12d82f..b0ec34d 100644 --- a/models/Jobs/Batch.cfc +++ b/models/Jobs/Batch.cfc @@ -10,6 +10,7 @@ component accessors="true" { property name="totalJobs" type="numeric"; property name="pendingJobs" type="numeric"; property name="failedJobs" type="numeric"; + property name="successfulJobs" type="numeric"; property name="failedJobIds" type="array"; property name="options" type="struct"; property name="createdDate" type="numeric"; diff --git a/models/Jobs/DBBatchRepository.cfc b/models/Jobs/DBBatchRepository.cfc index 98f5282..b7ffee1 100644 --- a/models/Jobs/DBBatchRepository.cfc +++ b/models/Jobs/DBBatchRepository.cfc @@ -52,14 +52,19 @@ component singleton accessors="true" { public Batch function store( required PendingBatch batch ) { var id = variables.timeBasedUUIDGenerator.generate().toString(); + var batchName = arguments.batch.getName(); + if ( isNull( batchName ) || !isSimpleValue( batchName ) ) { + batchName = ""; + } qb.table( variables.batchTableName ) .insert( values = { "id" : id, - "name" : arguments.batch.getName(), + "name" : batchName, "totalJobs" : 0, "pendingJobs" : 0, + "successfulJobs" : 0, "failedJobs" : 0, "failedJobIds" : "[]", "options" : serializeJSON( arguments.batch.getOptions() ), @@ -102,23 +107,20 @@ component singleton accessors="true" { throw( type = "cbq.BatchNotFound", message = "No batch found for id [#arguments.batchId#]" ); } + var updatedValues = { + "pendingJobs" : data.pendingJobs - 1, + "successfulJobs" : data.successfulJobs + 1, + "failedJobs" : data.failedJobs + }; + qb.table( variables.batchTableName ) .where( "id", arguments.batchId ) - .update( - values = { - "pendingJobs" : data.pendingJobs - 1, - "failedJobs" : data.failedJobs, - "failedJobIds" : serializeJSON( - deserializeJSON( data.failedJobIds ).filter( ( failedJobId ) => failedJobId != jobId ) - ) - }, - options = variables.defaultQueryOptions - ); + .update( values = updatedValues, options = variables.defaultQueryOptions ); return { "pendingJobs" : data.pendingJobs - 1, "failedJobs" : data.failedJobs, - "allJobsHaveRanExactlyOnce" : ( data.pendingJobs - 1 ) - data.failedJobs == 0 + "allJobsHaveRanExactlyOnce" : ( data.pendingJobs - 1 ) == 0 }; } } @@ -135,21 +137,20 @@ component singleton accessors="true" { throw( type = "cbq.BatchNotFound", message = "No batch found for id [#arguments.batchId#]" ); } + var updatedValues = { + "pendingJobs" : data.pendingJobs - 1, + "failedJobs" : data.failedJobs + 1, + "failedJobIds" : serializeJSON( deserializeJSON( data.failedJobIds ).append( arguments.jobId ) ) + }; + qb.table( variables.batchTableName ) .where( "id", arguments.batchId ) - .update( - values = { - "pendingJobs" : data.pendingJobs, - "failedJobs" : data.failedJobs + 1, - "failedJobIds" : serializeJSON( deserializeJSON( data.failedJobIds ).append( arguments.jobId ) ) - }, - options = variables.defaultQueryOptions - ); + .update( values = updatedValues, options = variables.defaultQueryOptions ); return { - "pendingJobs" : data.pendingJobs, + "pendingJobs" : data.pendingJobs - 1, "failedJobs" : data.failedJobs + 1, - "allJobsHaveRanExactlyOnce" : data.pendingJobs - ( data.failedJobs + 1 ) == 0 + "allJobsHaveRanExactlyOnce" : ( data.pendingJobs - 1 ) == 0 }; } } @@ -191,6 +192,7 @@ component singleton accessors="true" { batch.setTotalJobs( data.totalJobs ); batch.setPendingJobs( data.pendingJobs ); batch.setFailedJobs( data.failedJobs ); + batch.setSuccessfulJobs( data.successfulJobs ); batch.setFailedJobIds( deserializeJSON( data.failedJobIds ) ); batch.setOptions( deserializeJSON( data.options ) ); batch.setCreatedDate( data.createdDate ); diff --git a/models/Providers/ColdBoxAsyncProvider.cfc b/models/Providers/ColdBoxAsyncProvider.cfc index 2e82802..e511ea0 100644 --- a/models/Providers/ColdBoxAsyncProvider.cfc +++ b/models/Providers/ColdBoxAsyncProvider.cfc @@ -23,7 +23,7 @@ component accessors="true" extends="AbstractQueueProvider" { sleep( delay * 1000 ); return true; }, workerPool.getExecutor() ) - .then( function() { + .thenCompose( function() { job.setId( createUUID() ); if ( !isNull( arguments.currentAttempt ) ) { job.setCurrentAttempt( attempts ); diff --git a/resources/database/migrations/2000_01_01_000008_add_successfulJobs_to_cbq_batches_table.cfc b/resources/database/migrations/2000_01_01_000008_add_successfulJobs_to_cbq_batches_table.cfc new file mode 100644 index 0000000..17d0e9f --- /dev/null +++ b/resources/database/migrations/2000_01_01_000008_add_successfulJobs_to_cbq_batches_table.cfc @@ -0,0 +1,15 @@ +component { + + function up( schema ) { + schema.alter( "cbq_batches", ( t ) => { + t.addColumn( t.unsignedInteger( "successfulJobs" ).default( 0 ) ); + } ); + } + + function down( schema ) { + schema.alter( "cbq_batches", ( t ) => { + t.dropColumn( "successfulJobs" ); + } ); + } + +} diff --git a/resources/database/migrations/2000_01_01_000009_make_cbq_batches_name_nullable.cfc b/resources/database/migrations/2000_01_01_000009_make_cbq_batches_name_nullable.cfc new file mode 100644 index 0000000..7a2101f --- /dev/null +++ b/resources/database/migrations/2000_01_01_000009_make_cbq_batches_name_nullable.cfc @@ -0,0 +1,15 @@ +component { + + function up( schema, qb ) { + schema.alter( "cbq_batches", ( t ) => { + t.modifyColumn( "name", t.string( "name" ).nullable() ); + } ); + } + + function down( schema, qb ) { + schema.alter( "cbq_batches", ( t ) => { + t.modifyColumn( "name", t.string( "name" ) ); + } ); + } + +} diff --git a/tests/Application.cfc b/tests/Application.cfc index 63187e5..c8b5042 100644 --- a/tests/Application.cfc +++ b/tests/Application.cfc @@ -21,6 +21,10 @@ component { this.mappings[ "/testbox" ] = rootPath & "/testbox"; this.datasource = "cbq"; + this.javaSettings = { + "loadPaths" : [ rootPath & "/lib" ], + "reloadOnChange" : false + }; function onRequestStart() { createObject( "java", "java.lang.System" ).setProperty( "ENVIRONMENT", "testing" ); diff --git a/tests/resources/app/models/Jobs/AlwaysErrorJob.cfc b/tests/resources/app/models/Jobs/AlwaysErrorJob.cfc new file mode 100644 index 0000000..4bfcaf5 --- /dev/null +++ b/tests/resources/app/models/Jobs/AlwaysErrorJob.cfc @@ -0,0 +1,10 @@ +component extends="cbq.models.Jobs.AbstractJob" { + + function handle() { + throw( + type = "cbq.tests.AlwaysErrorJob", + message = "This job always errors for testing." + ); + } + +} diff --git a/tests/resources/app/models/Jobs/RequestScopeBeforeAndAfterJob.cfc b/tests/resources/app/models/Jobs/RequestScopeBeforeAndAfterJob.cfc new file mode 100644 index 0000000..b2b714d --- /dev/null +++ b/tests/resources/app/models/Jobs/RequestScopeBeforeAndAfterJob.cfc @@ -0,0 +1,15 @@ +component extends="cbq.models.Jobs.AbstractJob" { + + function handle() { + // do nothing + } + + function before() { + request.jobBeforeCalled = true; + } + + function after() { + request.jobAfterCalled = true; + } + +} diff --git a/tests/specs/integration/BatchFinallySpec.cfc b/tests/specs/integration/BatchFinallySpec.cfc new file mode 100644 index 0000000..9463d1a --- /dev/null +++ b/tests/specs/integration/BatchFinallySpec.cfc @@ -0,0 +1,72 @@ +component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" { + + function run() { + describe( "batch finally dispatching", function() { + beforeEach( function() { + structDelete( request, "jobBeforeCalled" ); + structDelete( request, "jobAfterCalled" ); + + param request.jobBeforeCalled = false; + param request.jobAfterCalled = false; + } ); + + it( "dispatches the finally job when the last job fails", function() { + var cbq = getWireBox().getInstance( "@cbq" ); + registerSyncConnectionAndWorkerPool(); + + var successJob = cbq.job( "SendWelcomeEmailJob" ); + var failingJob = cbq.job( job = "ReleaseTestJob", maxAttempts = 1 ); + + var pendingBatch = cbq + .batch( [ successJob, failingJob ] ) + .onConnection( "syncBatch" ) + .onComplete( job = "RequestScopeBeforeAndAfterJob", connection = "syncBatch" ); + + try { + pendingBatch.dispatch(); + } catch ( cbq.MaxAttemptsReached e ) { + // The sync provider rethrows the terminal failure. + } + + expect( request.jobAfterCalled ).toBeTrue( "The `finally` job should dispatch even when the last job fails." ); + } ); + + it( "dispatches the finally job when all jobs succeed", function() { + var cbq = getWireBox().getInstance( "@cbq" ); + registerSyncConnectionAndWorkerPool(); + + var pendingBatch = cbq + .batch( [ + cbq.job( "SendWelcomeEmailJob" ), + cbq.job( "SendWelcomeEmailJob" ) + ] ) + .onConnection( "syncBatch" ) + .onComplete( job = "RequestScopeBeforeAndAfterJob", connection = "syncBatch" ); + + pendingBatch.dispatch(); + + expect( request.jobAfterCalled ).toBeTrue( "The `finally` job should dispatch when all batch jobs succeed." ); + } ); + } ); + } + + private void function registerSyncConnectionAndWorkerPool() { + var config = getWireBox().getInstance( "Config@cbq" ); + + if ( !config.getConnections().keyExists( "syncBatch" ) ) { + config.registerConnection( + name = "syncBatch", + provider = getWireBox().getInstance( "SyncProvider@cbq" ).setProperties( {} ) + ); + } + + if ( !config.getWorkerPools().keyExists( "syncBatch" ) ) { + config.registerWorkerPool( + name = "syncBatch", + connectionName = "syncBatch", + maxAttempts = 1 + ); + } + } + +} diff --git a/tests/specs/integration/DBBatchRepositoryCountsSpec.cfc b/tests/specs/integration/DBBatchRepositoryCountsSpec.cfc new file mode 100644 index 0000000..7505483 --- /dev/null +++ b/tests/specs/integration/DBBatchRepositoryCountsSpec.cfc @@ -0,0 +1,124 @@ +component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" { + + function run() { + describe( "DBBatchRepository counts", function() { + it( "initializes successfulJobs for newly stored batches", function() { + var repository = getWireBox().getInstance( "DBBatchRepository@cbq" ); + var batch = repository.store( + getWireBox() + .getInstance( "@cbq" ) + .batch( [] ) + .allowFailures() + ); + + expect( batch.getSuccessfulJobs() ).toBe( 0 ); + } ); + + it( "successful jobs increment successfulJobs and decrement pendingJobs", function() { + var repository = getWireBox().getInstance( "DBBatchRepository@cbq" ); + var config = registerSyncConnectionAndWorkerPool(); + var batch = createTrackedBatch( repository, 1 ); + var provider = config.getConnection( "syncBatchCounts" ).getProvider(); + var pool = config.getWorkerPool( "syncBatchCounts" ); + + var job = getWireBox() + .getInstance( "@cbq" ) + .job( "SendWelcomeEmailJob" ) + .setId( createUUID() ) + .withBatchId( batch.getId() ); + + provider.marshalJob( job, pool ); + + var updatedBatch = repository.find( batch.getId() ); + + expect( updatedBatch.getPendingJobs() ).toBe( 0 ); + expect( updatedBatch.getFailedJobs() ).toBe( 0 ); + expect( updatedBatch.getSuccessfulJobs() ).toBe( 1 ); + } ); + + it( "retryable errors do not change pending, successful, or failed counts", function() { + var repository = getWireBox().getInstance( "DBBatchRepository@cbq" ); + var config = registerSyncConnectionAndWorkerPool(); + var batch = createTrackedBatch( repository, 1 ); + var provider = config.getConnection( "syncBatchCounts" ).getProvider(); + var pool = config.getWorkerPool( "syncBatchCounts" ); + + var job = getWireBox() + .getInstance( "@cbq" ) + .job( "AlwaysErrorJob" ) + .setId( createUUID() ) + .withBatchId( batch.getId() ) + .setCurrentAttempt( 1 ) + .setMaxAttempts( 2 ); + + expect( () => provider.marshalJob( job, pool ) ).toThrow( "cbq.SyncProviderJobFailed" ); + + var updatedBatch = repository.find( batch.getId() ); + + expect( updatedBatch.getPendingJobs() ).toBe( 1 ); + expect( updatedBatch.getSuccessfulJobs() ).toBe( 0 ); + expect( updatedBatch.getFailedJobs() ).toBe( 0 ); + expect( updatedBatch.getFailedJobIds() ).toBeEmpty(); + } ); + + it( "failed jobs increment failedJobs, append failedJobIds, and decrement pendingJobs", function() { + var repository = getWireBox().getInstance( "DBBatchRepository@cbq" ); + var config = registerSyncConnectionAndWorkerPool(); + var batch = createTrackedBatch( repository, 1 ); + var provider = config.getConnection( "syncBatchCounts" ).getProvider(); + var pool = config.getWorkerPool( "syncBatchCounts" ); + var failedJobId = createUUID(); + + var job = getWireBox() + .getInstance( "@cbq" ) + .job( "AlwaysErrorJob" ) + .setId( failedJobId ) + .withBatchId( batch.getId() ) + .setCurrentAttempt( 1 ) + .setMaxAttempts( 1 ); + + expect( () => provider.marshalJob( job, pool ) ).toThrow(); + + var updatedBatch = repository.find( batch.getId() ); + + expect( updatedBatch.getPendingJobs() ).toBe( 0 ); + expect( updatedBatch.getSuccessfulJobs() ).toBe( 0 ); + expect( updatedBatch.getFailedJobs() ).toBe( 1 ); + expect( updatedBatch.getFailedJobIds() ).toHaveLength( 1 ); + expect( updatedBatch.getFailedJobIds()[ 1 ] ).toBe( failedJobId ); + } ); + } ); + } + + private any function registerSyncConnectionAndWorkerPool() { + var config = getWireBox().getInstance( "Config@cbq" ); + + if ( !config.getConnections().keyExists( "syncBatchCounts" ) ) { + config.registerConnection( + name = "syncBatchCounts", + provider = getWireBox().getInstance( "SyncProvider@cbq" ).setProperties( {} ) + ); + } + + if ( !config.getWorkerPools().keyExists( "syncBatchCounts" ) ) { + config.registerWorkerPool( + name = "syncBatchCounts", + connectionName = "syncBatchCounts", + maxAttempts = 2 + ); + } + + return config; + } + + private any function createTrackedBatch( required any repository, required numeric totalJobs ) { + var pendingBatch = getWireBox() + .getInstance( "@cbq" ) + .batch( [] ) + .allowFailures(); + var batch = arguments.repository.store( pendingBatch ); + arguments.repository.incrementTotalJobs( batch.getId(), arguments.totalJobs ); + return arguments.repository.find( batch.getId() ); + } + +}