diff --git a/.github/workflows/cron.yml b/.github/workflows/cron.yml index 04d03e7..da360e6 100644 --- a/.github/workflows/cron.yml +++ b/.github/workflows/cron.yml @@ -25,9 +25,9 @@ jobs: experimental: true services: mysql: - image: mysql:5.7 + image: mysql:8.0 env: - MYSQL_RANDOM_ROOT_PASSWORD: yes + MYSQL_ROOT_PASSWORD: root MYSQL_USER: cbq MYSQL_PASSWORD: cbq MYSQL_DATABASE: cbq @@ -56,6 +56,10 @@ jobs: box install ${{ matrix.coldbox }} --noSave box package list + - name: Configure MySQL user auth plugin + run: | + mysql -h 127.0.0.1 -P ${{ job.services.mysql.ports[3306] }} -uroot -proot -e "ALTER USER 'cbq'@'%' IDENTIFIED WITH mysql_native_password BY 'cbq'; FLUSH PRIVILEGES;" + - name: Start server env: DB_HOST: localhost @@ -78,4 +82,4 @@ jobs: DB_PASSWORD: cbq DB_DATABASE: cbq DB_SCHEMA: cbq - run: box testbox run \ No newline at end of file + run: box testbox run diff --git a/.github/workflows/pr.yml b/.github/workflows/pr.yml index b66507b..af546f5 100644 --- a/.github/workflows/pr.yml +++ b/.github/workflows/pr.yml @@ -22,9 +22,9 @@ jobs: cfengine: ["lucee@5", "lucee@6", "adobe@2021", "adobe@2023", "adobe@2025"] services: mysql: - image: mysql:5.7 + image: mysql:8.0 env: - MYSQL_RANDOM_ROOT_PASSWORD: yes + MYSQL_ROOT_PASSWORD: root MYSQL_USER: cbq MYSQL_PASSWORD: cbq MYSQL_DATABASE: cbq @@ -52,6 +52,10 @@ jobs: box config set modules.commandbox-dotenv.checkEnvPreServerStart=false box package list + - name: Configure MySQL user auth plugin + run: | + mysql -h 127.0.0.1 -P ${{ job.services.mysql.ports[3306] }} -uroot -proot -e "ALTER USER 'cbq'@'%' IDENTIFIED WITH mysql_native_password BY 'cbq'; FLUSH PRIVILEGES;" + - name: Start server env: DB_HOST: localhost @@ -101,4 +105,4 @@ jobs: - name: Commit Format Changes uses: stefanzweifel/git-auto-commit-action@v5.2.0 with: - commit_message: Apply cfformat changes \ No newline at end of file + commit_message: Apply cfformat changes diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 15b250f..c2cc1e6 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -17,9 +17,9 @@ jobs: cfengine: ["lucee@5", "lucee@6", "adobe@2021", "adobe@2023", "adobe@2025"] services: mysql: - image: mysql:5.7 + image: mysql:8.0 env: - MYSQL_RANDOM_ROOT_PASSWORD: yes + MYSQL_ROOT_PASSWORD: root MYSQL_USER: cbq MYSQL_PASSWORD: cbq MYSQL_DATABASE: cbq @@ -47,6 +47,10 @@ jobs: box config set modules.commandbox-dotenv.checkEnvPreServerStart=false box package list + - name: Configure MySQL user auth plugin + run: | + mysql -h 127.0.0.1 -P ${{ job.services.mysql.ports[3306] }} -uroot -proot -e "ALTER USER 'cbq'@'%' IDENTIFIED WITH mysql_native_password BY 'cbq'; FLUSH PRIVILEGES;" + - name: Start server env: DB_HOST: localhost diff --git a/box.json b/box.json index 6aeff2f..67b69f5 100644 --- a/box.json +++ b/box.json @@ -1,6 +1,6 @@ { "name":"cbq", - "version":"5.0.7", + "version":"6.0.0-beta.3", "author":"Eric Peterson ", "location":"forgeboxStorage", "homepage":"https://github.com/coldbox-modules/cbq", @@ -31,9 +31,9 @@ "cfmigrations":"modules/cfmigrations/" }, "scripts":{ - "format":"cfformat run ModuleConfig.cfc,models/**/*.cfc,tests/specs/**/*.cfc,tests/resources/app/handlers/**/*.cfc,tests/resources/app/config/**/*.cfc --overwrite", - "format:check":"cfformat check ModuleConfig.cfc,models/**/*.cfc,tests/specs/**/*.cfc,tests/resources/app/handlers/**/*.cfc,tests/resources/app/config/**/*.cfc --verbose", - "format:watch":"cfformat watch ModuleConfig.cfc,models/**/*.cfc,tests/specs/**/*.cfc,tests/resources/app/handlers/**/*.cfc,tests/resources/app/config/**/*.cfc", + "format":"cfformat run ModuleConfig.cfc,models/**/*.cfc,interceptors/**/*.cfc,tests/specs/**/*.cfc,tests/resources/app/handlers/**/*.cfc,tests/resources/app/config/**/*.cfc --overwrite", + "format:check":"cfformat check ModuleConfig.cfc,models/**/*.cfc,interceptors/**/*.cfc,tests/specs/**/*.cfc,tests/resources/app/handlers/**/*.cfc,tests/resources/app/config/**/*.cfc --verbose", + "format:watch":"cfformat watch ModuleConfig.cfc,models/**/*.cfc,interceptors/**/*.cfc,tests/specs/**/*.cfc,tests/resources/app/handlers/**/*.cfc,tests/resources/app/config/**/*.cfc", "install:2021":"cfpm install document,feed,zip" }, "ignore":[ diff --git a/interceptors/LogFailedJobsInterceptor.cfc b/interceptors/LogFailedJobsInterceptor.cfc index 3cbcb9c..fa6386f 100644 --- a/interceptors/LogFailedJobsInterceptor.cfc +++ b/interceptors/LogFailedJobsInterceptor.cfc @@ -44,7 +44,12 @@ component { "null" : ( arguments.data.exception.type ?: "" ) == "", "nulls" : ( arguments.data.exception.type ?: "" ) == "" }, - "exceptionMessage" : arguments.data.exception.message, + "exceptionMessage" : { + "value" : arguments.data.exception.message ?: "", + "cfsqltype" : "CF_SQL_VARCHAR", + "null" : ( arguments.data.exception.message ?: "" ) == "", + "nulls" : ( arguments.data.exception.message ?: "" ) == "" + }, "exceptionDetail" : { "value" : arguments.data.exception.detail ?: "", "cfsqltype" : "CF_SQL_VARCHAR", @@ -57,23 +62,38 @@ component { "null" : ( arguments.data.exception.extendedInfo ?: "" ) == "", "nulls" : ( arguments.data.exception.extendedInfo ?: "" ) == "" }, - "exceptionStackTrace" : arguments.data.exception.stackTrace, - "exception" : serializeJSON( arguments.data.exception ), - "failedDate" : { "value": getCurrentUnixTimestamp(), "cfsqltype": "CF_SQL_BIGINT" }, - "originalId" : { "value": arguments.data.job.getId(), "cfsqltype": "CF_SQL_VARCHAR" } + "exceptionStackTrace" : { + "value" : isNull( arguments.data.exception.stackTrace ) ? "" : ( + isSimpleValue( arguments.data.exception.stackTrace ) ? arguments.data.exception.stackTrace : serializeJSON( + arguments.data.exception.stackTrace + ) + ), + "cfsqltype" : "CF_SQL_VARCHAR", + "null" : isNull( arguments.data.exception.stackTrace ) || ( + isSimpleValue( arguments.data.exception.stackTrace ) && arguments.data.exception.stackTrace == "" + ), + "nulls" : isNull( arguments.data.exception.stackTrace ) || ( + isSimpleValue( arguments.data.exception.stackTrace ) && arguments.data.exception.stackTrace == "" + ) + }, + "exception" : isNull( arguments.data.exception ) ? javacast( "null", "" ) : serializeJSON( + arguments.data.exception + ), + "failedDate" : { + "value" : getCurrentUnixTimestamp(), + "cfsqltype" : "CF_SQL_BIGINT" + }, + "originalId" : { + "value" : arguments.data.job.getId(), + "cfsqltype" : "CF_SQL_VARCHAR" + } }; try { qb.table( variables.settings.logFailedJobsProperties.tableName ) - .insert( - values = logData, - options = options - ); + .insert( values = logData, options = options ); } catch ( any e ) { - log.error( "Failed to log failed job: #e.message#", { - "log": logData, - "exception": e - } ); + log.error( "Failed to log failed job: #e.message#", { "log" : logData, "exception" : e } ); rethrow; } } diff --git a/models/Jobs/Batch.cfc b/models/Jobs/Batch.cfc index e12d82f..52db039 100644 --- a/models/Jobs/Batch.cfc +++ b/models/Jobs/Batch.cfc @@ -3,6 +3,7 @@ component accessors="true" { property name="dispatcher" inject="provider:Dispatcher@cbq"; property name="cbq" inject="provider:@cbq"; property name="wirebox" inject="wirebox"; + property name="log" inject="logbox:logger:{this}"; property name="repository"; property name="id" type="string"; @@ -10,6 +11,7 @@ component accessors="true" { property name="totalJobs" type="numeric"; property name="pendingJobs" type="numeric"; property name="failedJobs" type="numeric"; + property name="successfulJobs" type="numeric"; property name="failedJobIds" type="array"; property name="options" type="struct"; property name="createdDate" type="numeric"; @@ -49,7 +51,12 @@ component accessors="true" { } getRepository().markAsFinished( variables.id ); - dispatchThenJobIfNeeded(); + + try { + dispatchThenJobIfNeeded(); + } catch ( any e ) { + log.error( "Failed to dispatch then job for batch [#variables.id#]: #e.message#", e ); + } if ( counts.allJobsHaveRanExactlyOnce ) { dispatchFinallyJobIfNeeded(); @@ -64,7 +71,11 @@ component accessors="true" { cancel(); } - dispatchCatchJobIfNeeded( arguments.error ); + try { + dispatchCatchJobIfNeeded( arguments.error ); + } catch ( any e ) { + log.error( "Failed to dispatch catch job for batch [#variables.id#]: #e.message#", e ); + } } if ( counts.allJobsHaveRanExactlyOnce ) { diff --git a/models/Jobs/DBBatchRepository.cfc b/models/Jobs/DBBatchRepository.cfc index 98f5282..b7ffee1 100644 --- a/models/Jobs/DBBatchRepository.cfc +++ b/models/Jobs/DBBatchRepository.cfc @@ -52,14 +52,19 @@ component singleton accessors="true" { public Batch function store( required PendingBatch batch ) { var id = variables.timeBasedUUIDGenerator.generate().toString(); + var batchName = arguments.batch.getName(); + if ( isNull( batchName ) || !isSimpleValue( batchName ) ) { + batchName = ""; + } qb.table( variables.batchTableName ) .insert( values = { "id" : id, - "name" : arguments.batch.getName(), + "name" : batchName, "totalJobs" : 0, "pendingJobs" : 0, + "successfulJobs" : 0, "failedJobs" : 0, "failedJobIds" : "[]", "options" : serializeJSON( arguments.batch.getOptions() ), @@ -102,23 +107,20 @@ component singleton accessors="true" { throw( type = "cbq.BatchNotFound", message = "No batch found for id [#arguments.batchId#]" ); } + var updatedValues = { + "pendingJobs" : data.pendingJobs - 1, + "successfulJobs" : data.successfulJobs + 1, + "failedJobs" : data.failedJobs + }; + qb.table( variables.batchTableName ) .where( "id", arguments.batchId ) - .update( - values = { - "pendingJobs" : data.pendingJobs - 1, - "failedJobs" : data.failedJobs, - "failedJobIds" : serializeJSON( - deserializeJSON( data.failedJobIds ).filter( ( failedJobId ) => failedJobId != jobId ) - ) - }, - options = variables.defaultQueryOptions - ); + .update( values = updatedValues, options = variables.defaultQueryOptions ); return { "pendingJobs" : data.pendingJobs - 1, "failedJobs" : data.failedJobs, - "allJobsHaveRanExactlyOnce" : ( data.pendingJobs - 1 ) - data.failedJobs == 0 + "allJobsHaveRanExactlyOnce" : ( data.pendingJobs - 1 ) == 0 }; } } @@ -135,21 +137,20 @@ component singleton accessors="true" { throw( type = "cbq.BatchNotFound", message = "No batch found for id [#arguments.batchId#]" ); } + var updatedValues = { + "pendingJobs" : data.pendingJobs - 1, + "failedJobs" : data.failedJobs + 1, + "failedJobIds" : serializeJSON( deserializeJSON( data.failedJobIds ).append( arguments.jobId ) ) + }; + qb.table( variables.batchTableName ) .where( "id", arguments.batchId ) - .update( - values = { - "pendingJobs" : data.pendingJobs, - "failedJobs" : data.failedJobs + 1, - "failedJobIds" : serializeJSON( deserializeJSON( data.failedJobIds ).append( arguments.jobId ) ) - }, - options = variables.defaultQueryOptions - ); + .update( values = updatedValues, options = variables.defaultQueryOptions ); return { - "pendingJobs" : data.pendingJobs, + "pendingJobs" : data.pendingJobs - 1, "failedJobs" : data.failedJobs + 1, - "allJobsHaveRanExactlyOnce" : data.pendingJobs - ( data.failedJobs + 1 ) == 0 + "allJobsHaveRanExactlyOnce" : ( data.pendingJobs - 1 ) == 0 }; } } @@ -191,6 +192,7 @@ component singleton accessors="true" { batch.setTotalJobs( data.totalJobs ); batch.setPendingJobs( data.pendingJobs ); batch.setFailedJobs( data.failedJobs ); + batch.setSuccessfulJobs( data.successfulJobs ); batch.setFailedJobIds( deserializeJSON( data.failedJobIds ) ); batch.setOptions( deserializeJSON( data.options ) ); batch.setCreatedDate( data.createdDate ); diff --git a/models/Providers/ColdBoxAsyncProvider.cfc b/models/Providers/ColdBoxAsyncProvider.cfc index 2e82802..f8ea080 100644 --- a/models/Providers/ColdBoxAsyncProvider.cfc +++ b/models/Providers/ColdBoxAsyncProvider.cfc @@ -23,9 +23,9 @@ component accessors="true" extends="AbstractQueueProvider" { sleep( delay * 1000 ); return true; }, workerPool.getExecutor() ) - .then( function() { + .thenCompose( function() { job.setId( createUUID() ); - if ( !isNull( arguments.currentAttempt ) ) { + if ( attempts > 0 ) { job.setCurrentAttempt( attempts ); } return marshalJob( job, workerPool ); diff --git a/models/Providers/DBProvider.cfc b/models/Providers/DBProvider.cfc index 0b47a03..c32cb42 100644 --- a/models/Providers/DBProvider.cfc +++ b/models/Providers/DBProvider.cfc @@ -394,12 +394,15 @@ component accessors="true" extends="AbstractQueueProvider" { variables.getCurrentUnixTimestamp() ); } ); - // past the reserved date - q1.orWhere( - "reservedDate", - "<=", - variables.getCurrentUnixTimestamp() - pool.getTimeout() - ); + // past the job's own timeout (availableDate was set to now + jobTimeout at reservation time) + q1.orWhere( ( q2 ) => { + q2.whereNotNull( "reservedDate" ) + .where( + "availableDate", + "<=", + variables.getCurrentUnixTimestamp() + ); + } ); // reserved by a worker but never released q1.orWhere( ( q3 ) => { q3.whereNull( "reservedDate" ) @@ -448,11 +451,14 @@ component accessors="true" extends="AbstractQueueProvider" { q2.whereNull( "reservedBy" ); q2.whereNull( "reservedDate" ); } ) - .orWhere( - "reservedDate", - "<=", - variables.getCurrentUnixTimestamp() - pool.getTimeout() - ); + .orWhere( ( q2 ) => { + q2.whereNotNull( "reservedDate" ) + .where( + "availableDate", + "<=", + variables.getCurrentUnixTimestamp() + ); + } ); } ) .update( values = { diff --git a/models/Providers/SyncProvider.cfc b/models/Providers/SyncProvider.cfc index 9e58fd5..c7adaf9 100644 --- a/models/Providers/SyncProvider.cfc +++ b/models/Providers/SyncProvider.cfc @@ -139,7 +139,7 @@ component accessors="true" extends="AbstractQueueProvider" { invoke( job, "onFailure", - { "excpetion" : e } + { "exception" : e } ); } diff --git a/resources/database/migrations/2000_01_01_000008_add_successfulJobs_to_cbq_batches_table.cfc b/resources/database/migrations/2000_01_01_000008_add_successfulJobs_to_cbq_batches_table.cfc new file mode 100644 index 0000000..17d0e9f --- /dev/null +++ b/resources/database/migrations/2000_01_01_000008_add_successfulJobs_to_cbq_batches_table.cfc @@ -0,0 +1,15 @@ +component { + + function up( schema ) { + schema.alter( "cbq_batches", ( t ) => { + t.addColumn( t.unsignedInteger( "successfulJobs" ).default( 0 ) ); + } ); + } + + function down( schema ) { + schema.alter( "cbq_batches", ( t ) => { + t.dropColumn( "successfulJobs" ); + } ); + } + +} diff --git a/resources/database/migrations/2000_01_01_000009_make_cbq_batches_name_nullable.cfc b/resources/database/migrations/2000_01_01_000009_make_cbq_batches_name_nullable.cfc new file mode 100644 index 0000000..7a2101f --- /dev/null +++ b/resources/database/migrations/2000_01_01_000009_make_cbq_batches_name_nullable.cfc @@ -0,0 +1,15 @@ +component { + + function up( schema, qb ) { + schema.alter( "cbq_batches", ( t ) => { + t.modifyColumn( "name", t.string( "name" ).nullable() ); + } ); + } + + function down( schema, qb ) { + schema.alter( "cbq_batches", ( t ) => { + t.modifyColumn( "name", t.string( "name" ) ); + } ); + } + +} diff --git a/tests/Application.cfc b/tests/Application.cfc index 63187e5..c8b5042 100644 --- a/tests/Application.cfc +++ b/tests/Application.cfc @@ -21,6 +21,10 @@ component { this.mappings[ "/testbox" ] = rootPath & "/testbox"; this.datasource = "cbq"; + this.javaSettings = { + "loadPaths" : [ rootPath & "/lib" ], + "reloadOnChange" : false + }; function onRequestStart() { createObject( "java", "java.lang.System" ).setProperty( "ENVIRONMENT", "testing" ); diff --git a/tests/resources/app/models/Jobs/AlwaysErrorJob.cfc b/tests/resources/app/models/Jobs/AlwaysErrorJob.cfc new file mode 100644 index 0000000..4bfcaf5 --- /dev/null +++ b/tests/resources/app/models/Jobs/AlwaysErrorJob.cfc @@ -0,0 +1,10 @@ +component extends="cbq.models.Jobs.AbstractJob" { + + function handle() { + throw( + type = "cbq.tests.AlwaysErrorJob", + message = "This job always errors for testing." + ); + } + +} diff --git a/tests/resources/app/models/Jobs/OnFailureCapturingJob.cfc b/tests/resources/app/models/Jobs/OnFailureCapturingJob.cfc new file mode 100644 index 0000000..8f25325 --- /dev/null +++ b/tests/resources/app/models/Jobs/OnFailureCapturingJob.cfc @@ -0,0 +1,15 @@ +component extends="cbq.models.Jobs.AbstractJob" { + + function handle() { + throw( + type = "cbq.tests.OnFailureCapturingJob", + message = "This job always errors to test the onFailure exception argument." + ); + } + + function onFailure() { + application.onFailureExceptionReceived = !isNull( arguments.exception ); + application.onFailureExceptionIsExpcetion = structKeyExists( arguments, "excpetion" ); + } + +} diff --git a/tests/resources/app/models/Jobs/RequestScopeBeforeAndAfterJob.cfc b/tests/resources/app/models/Jobs/RequestScopeBeforeAndAfterJob.cfc new file mode 100644 index 0000000..b2b714d --- /dev/null +++ b/tests/resources/app/models/Jobs/RequestScopeBeforeAndAfterJob.cfc @@ -0,0 +1,15 @@ +component extends="cbq.models.Jobs.AbstractJob" { + + function handle() { + // do nothing + } + + function before() { + request.jobBeforeCalled = true; + } + + function after() { + request.jobAfterCalled = true; + } + +} diff --git a/tests/specs/integration/BatchFinallySpec.cfc b/tests/specs/integration/BatchFinallySpec.cfc new file mode 100644 index 0000000..9463d1a --- /dev/null +++ b/tests/specs/integration/BatchFinallySpec.cfc @@ -0,0 +1,72 @@ +component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" { + + function run() { + describe( "batch finally dispatching", function() { + beforeEach( function() { + structDelete( request, "jobBeforeCalled" ); + structDelete( request, "jobAfterCalled" ); + + param request.jobBeforeCalled = false; + param request.jobAfterCalled = false; + } ); + + it( "dispatches the finally job when the last job fails", function() { + var cbq = getWireBox().getInstance( "@cbq" ); + registerSyncConnectionAndWorkerPool(); + + var successJob = cbq.job( "SendWelcomeEmailJob" ); + var failingJob = cbq.job( job = "ReleaseTestJob", maxAttempts = 1 ); + + var pendingBatch = cbq + .batch( [ successJob, failingJob ] ) + .onConnection( "syncBatch" ) + .onComplete( job = "RequestScopeBeforeAndAfterJob", connection = "syncBatch" ); + + try { + pendingBatch.dispatch(); + } catch ( cbq.MaxAttemptsReached e ) { + // The sync provider rethrows the terminal failure. + } + + expect( request.jobAfterCalled ).toBeTrue( "The `finally` job should dispatch even when the last job fails." ); + } ); + + it( "dispatches the finally job when all jobs succeed", function() { + var cbq = getWireBox().getInstance( "@cbq" ); + registerSyncConnectionAndWorkerPool(); + + var pendingBatch = cbq + .batch( [ + cbq.job( "SendWelcomeEmailJob" ), + cbq.job( "SendWelcomeEmailJob" ) + ] ) + .onConnection( "syncBatch" ) + .onComplete( job = "RequestScopeBeforeAndAfterJob", connection = "syncBatch" ); + + pendingBatch.dispatch(); + + expect( request.jobAfterCalled ).toBeTrue( "The `finally` job should dispatch when all batch jobs succeed." ); + } ); + } ); + } + + private void function registerSyncConnectionAndWorkerPool() { + var config = getWireBox().getInstance( "Config@cbq" ); + + if ( !config.getConnections().keyExists( "syncBatch" ) ) { + config.registerConnection( + name = "syncBatch", + provider = getWireBox().getInstance( "SyncProvider@cbq" ).setProperties( {} ) + ); + } + + if ( !config.getWorkerPools().keyExists( "syncBatch" ) ) { + config.registerWorkerPool( + name = "syncBatch", + connectionName = "syncBatch", + maxAttempts = 1 + ); + } + } + +} diff --git a/tests/specs/integration/DBBatchRepositoryCountsSpec.cfc b/tests/specs/integration/DBBatchRepositoryCountsSpec.cfc new file mode 100644 index 0000000..7505483 --- /dev/null +++ b/tests/specs/integration/DBBatchRepositoryCountsSpec.cfc @@ -0,0 +1,124 @@ +component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" { + + function run() { + describe( "DBBatchRepository counts", function() { + it( "initializes successfulJobs for newly stored batches", function() { + var repository = getWireBox().getInstance( "DBBatchRepository@cbq" ); + var batch = repository.store( + getWireBox() + .getInstance( "@cbq" ) + .batch( [] ) + .allowFailures() + ); + + expect( batch.getSuccessfulJobs() ).toBe( 0 ); + } ); + + it( "successful jobs increment successfulJobs and decrement pendingJobs", function() { + var repository = getWireBox().getInstance( "DBBatchRepository@cbq" ); + var config = registerSyncConnectionAndWorkerPool(); + var batch = createTrackedBatch( repository, 1 ); + var provider = config.getConnection( "syncBatchCounts" ).getProvider(); + var pool = config.getWorkerPool( "syncBatchCounts" ); + + var job = getWireBox() + .getInstance( "@cbq" ) + .job( "SendWelcomeEmailJob" ) + .setId( createUUID() ) + .withBatchId( batch.getId() ); + + provider.marshalJob( job, pool ); + + var updatedBatch = repository.find( batch.getId() ); + + expect( updatedBatch.getPendingJobs() ).toBe( 0 ); + expect( updatedBatch.getFailedJobs() ).toBe( 0 ); + expect( updatedBatch.getSuccessfulJobs() ).toBe( 1 ); + } ); + + it( "retryable errors do not change pending, successful, or failed counts", function() { + var repository = getWireBox().getInstance( "DBBatchRepository@cbq" ); + var config = registerSyncConnectionAndWorkerPool(); + var batch = createTrackedBatch( repository, 1 ); + var provider = config.getConnection( "syncBatchCounts" ).getProvider(); + var pool = config.getWorkerPool( "syncBatchCounts" ); + + var job = getWireBox() + .getInstance( "@cbq" ) + .job( "AlwaysErrorJob" ) + .setId( createUUID() ) + .withBatchId( batch.getId() ) + .setCurrentAttempt( 1 ) + .setMaxAttempts( 2 ); + + expect( () => provider.marshalJob( job, pool ) ).toThrow( "cbq.SyncProviderJobFailed" ); + + var updatedBatch = repository.find( batch.getId() ); + + expect( updatedBatch.getPendingJobs() ).toBe( 1 ); + expect( updatedBatch.getSuccessfulJobs() ).toBe( 0 ); + expect( updatedBatch.getFailedJobs() ).toBe( 0 ); + expect( updatedBatch.getFailedJobIds() ).toBeEmpty(); + } ); + + it( "failed jobs increment failedJobs, append failedJobIds, and decrement pendingJobs", function() { + var repository = getWireBox().getInstance( "DBBatchRepository@cbq" ); + var config = registerSyncConnectionAndWorkerPool(); + var batch = createTrackedBatch( repository, 1 ); + var provider = config.getConnection( "syncBatchCounts" ).getProvider(); + var pool = config.getWorkerPool( "syncBatchCounts" ); + var failedJobId = createUUID(); + + var job = getWireBox() + .getInstance( "@cbq" ) + .job( "AlwaysErrorJob" ) + .setId( failedJobId ) + .withBatchId( batch.getId() ) + .setCurrentAttempt( 1 ) + .setMaxAttempts( 1 ); + + expect( () => provider.marshalJob( job, pool ) ).toThrow(); + + var updatedBatch = repository.find( batch.getId() ); + + expect( updatedBatch.getPendingJobs() ).toBe( 0 ); + expect( updatedBatch.getSuccessfulJobs() ).toBe( 0 ); + expect( updatedBatch.getFailedJobs() ).toBe( 1 ); + expect( updatedBatch.getFailedJobIds() ).toHaveLength( 1 ); + expect( updatedBatch.getFailedJobIds()[ 1 ] ).toBe( failedJobId ); + } ); + } ); + } + + private any function registerSyncConnectionAndWorkerPool() { + var config = getWireBox().getInstance( "Config@cbq" ); + + if ( !config.getConnections().keyExists( "syncBatchCounts" ) ) { + config.registerConnection( + name = "syncBatchCounts", + provider = getWireBox().getInstance( "SyncProvider@cbq" ).setProperties( {} ) + ); + } + + if ( !config.getWorkerPools().keyExists( "syncBatchCounts" ) ) { + config.registerWorkerPool( + name = "syncBatchCounts", + connectionName = "syncBatchCounts", + maxAttempts = 2 + ); + } + + return config; + } + + private any function createTrackedBatch( required any repository, required numeric totalJobs ) { + var pendingBatch = getWireBox() + .getInstance( "@cbq" ) + .batch( [] ) + .allowFailures(); + var batch = arguments.repository.store( pendingBatch ); + arguments.repository.incrementTotalJobs( batch.getId(), arguments.totalJobs ); + return arguments.repository.find( batch.getId() ); + } + +} diff --git a/tests/specs/integration/Providers/DBProviderTimeoutWatcherSpec.cfc b/tests/specs/integration/Providers/DBProviderTimeoutWatcherSpec.cfc new file mode 100644 index 0000000..84a7d8a --- /dev/null +++ b/tests/specs/integration/Providers/DBProviderTimeoutWatcherSpec.cfc @@ -0,0 +1,107 @@ +component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" { + + function run() { + describe( "DBProvider timeout watcher", function() { + beforeEach( function() { + variables.provider = getWireBox().getInstance( "DBProvider@cbq" ).setProperties( {} ); + makePublic( variables.provider, "fetchPotentiallyOpenRecords" ); + variables.pool = makeWorkerPool( variables.provider ); + // clean up any leftover test records + variables.provider + .newQuery() + .table( "cbq_jobs" ) + .delete(); + } ); + + afterEach( function() { + variables.provider + .newQuery() + .table( "cbq_jobs" ) + .delete(); + } ); + + it( "does not re-grab a reserved job that is still within its job-specific timeout", function() { + var job = getWireBox().getInstance( "SendWelcomeEmailJob" ); + variables.provider.push( "default", job ); + + // Simulate the job being reserved by another worker with a 300s job timeout + var otherWorkerUUID = createUUID(); + var now = javacast( "long", getTickCount() / 1000 ); + variables.provider + .newQuery() + .table( "cbq_jobs" ) + .update( { + "reservedBy" : otherWorkerUUID, + "reservedDate" : now, + "availableDate" : now + 300 + } ); + + var ids = variables.provider.fetchPotentiallyOpenRecords( capacity = 10, pool = variables.pool ); + + expect( ids ).toBeEmpty( "A reserved job still within its job-specific timeout should not be re-grabbed" ); + } ); + + it( "re-grabs a reserved job whose job-specific timeout has expired", function() { + var job = getWireBox().getInstance( "SendWelcomeEmailJob" ); + variables.provider.push( "default", job ); + + // Simulate the job being reserved 310s ago with a 300s job timeout (availableDate now in the past) + var otherWorkerUUID = createUUID(); + var now = javacast( "long", getTickCount() / 1000 ); + variables.provider + .newQuery() + .table( "cbq_jobs" ) + .update( { + "reservedBy" : otherWorkerUUID, + "reservedDate" : now - 310, + "availableDate" : now - 10 + } ); + + var ids = variables.provider.fetchPotentiallyOpenRecords( capacity = 10, pool = variables.pool ); + + expect( ids ).toHaveLength( + 1, + "A reserved job whose job-specific timeout has expired should be re-grabbed" + ); + } ); + + it( "does not re-grab a job using the pool timeout when the job-specific timeout is longer", function() { + // Core bug fix: pool timeout is 60s, job timeout is 300s. + // After 65s (past pool timeout, within job timeout), job should NOT be re-grabbed. + var job = getWireBox().getInstance( "SendWelcomeEmailJob" ); + variables.provider.push( "default", job ); + + var otherWorkerUUID = createUUID(); + var now = javacast( "long", getTickCount() / 1000 ); + // Reserved 65s ago, job timeout is 300s, so availableDate is still 235s in the future + variables.provider + .newQuery() + .table( "cbq_jobs" ) + .update( { + "reservedBy" : otherWorkerUUID, + "reservedDate" : now - 65, + "availableDate" : now + 235 + } ); + + var ids = variables.provider.fetchPotentiallyOpenRecords( capacity = 10, pool = variables.pool ); + + expect( ids ).toBeEmpty( + "A job past the pool timeout but still within its job-specific timeout should not be re-grabbed" + ); + } ); + } ); + } + + private any function makeWorkerPool( required any provider ) { + var connection = getInstance( "QueueConnection@cbq" ) + .setName( "TestTimeoutWatcherConnection" ) + .setProvider( arguments.provider ); + + return getInstance( "WorkerPool@cbq" ) + .setName( "TestTimeoutWatcherPool" ) + .setConnection( connection ) + .setConnectionName( connection.getName() ) + .startWorkers(); + } + +} diff --git a/tests/specs/integration/Providers/SyncProviderOnFailureSpec.cfc b/tests/specs/integration/Providers/SyncProviderOnFailureSpec.cfc new file mode 100644 index 0000000..62dc748 --- /dev/null +++ b/tests/specs/integration/Providers/SyncProviderOnFailureSpec.cfc @@ -0,0 +1,45 @@ +component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" { + + function run() { + describe( "SyncProvider onFailure exception argument", function() { + beforeEach( function() { + structDelete( application, "onFailureExceptionReceived" ); + structDelete( application, "onFailureExceptionIsExpcetion" ); + } ); + + it( "passes the exception as 'exception' (not 'excpetion') to the onFailure handler", function() { + var provider = getWireBox().getInstance( "SyncProvider@cbq" ).setProperties( {} ); + var pool = makeWorkerPool( provider ); + var job = getInstance( "OnFailureCapturingJob" ) + .setId( createUUID() ) + .setCurrentAttempt( 1 ) + .setMaxAttempts( 1 ); + + param application.onFailureExceptionReceived = false; + param application.onFailureExceptionIsExpcetion = true; + + expect( () => provider.marshalJob( job, pool ) ).toThrow(); + + expect( application.onFailureExceptionReceived ).toBeTrue( + "onFailure should receive the exception under the key 'exception'" + ); + expect( application.onFailureExceptionIsExpcetion ).toBeFalse( + "onFailure should NOT receive the exception under the misspelled key 'excpetion'" + ); + } ); + } ); + } + + private any function makeWorkerPool( required any provider ) { + var connection = getInstance( "QueueConnection@cbq" ) + .setName( "TestOnFailureConnection" ) + .setProvider( arguments.provider ); + + return getInstance( "WorkerPool@cbq" ) + .setName( "TestOnFailurePool" ) + .setConnection( connection ) + .setConnectionName( connection.getName() ) + .startWorkers(); + } + +}