Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
b782bfa
fix(LogFailedJobsInterceptor): Insert nulls when no exception informa…
elpete Feb 12, 2026
f61250d
fix(ColdBoxAsyncProvider): Compose the `marshalJob` future with the d…
elpete Feb 12, 2026
a489802
test: reproduce missing batch finally dispatch on terminal failure
elpete Feb 12, 2026
9038762
fix: complete batches correctly when jobs end in failure
elpete Feb 12, 2026
3d01383
fix: make batch name optional and nullable
elpete Feb 12, 2026
96ed25c
test: load lib jars in test app and require time UUID generator
elpete Feb 12, 2026
cc73131
breaking: require successfulJobs and add batch count coverage
elpete Feb 12, 2026
772d034
Apply cfformat changes
elpete Feb 12, 2026
95aa831
Do not change `failedJobIds` except for incrementing failed jobs
elpete Feb 12, 2026
0c03791
v6.0.0-beta.1
elpete Feb 12, 2026
19a871e
fix: handle complex stackTrace objects in LogFailedJobsInterceptor
elpete Apr 6, 2026
d776aa0
fix: use availableDate instead of reservedDate for timeout watcher
elpete Apr 6, 2026
8b96166
chore: add interceptors to cfformat scripts
elpete Apr 6, 2026
4641261
v6.0.0-beta.2
elpete Apr 6, 2026
d57e907
test: verify timeout watcher respects job-specific timeout over pool …
elpete Apr 6, 2026
e1af7a8
fix: set job attempt count in ColdBoxAsyncProvider and tighten tryToL…
elpete Apr 6, 2026
96450f2
6.0.0-beta.3
elpete Apr 6, 2026
0f504c6
fix: remove skip locked from DB timeout watcher query
elpete Apr 6, 2026
a57d7c4
chore: upgrade CI to MySQL 8 and re-enable skip locked
elpete Apr 6, 2026
6d30e6a
fix: remove invalid mysql docker flag in workflow services
elpete Apr 6, 2026
f51594f
fix: set mysql8 test user auth plugin via init script
elpete Apr 6, 2026
375ee0a
fix: configure mysql8 auth plugin in workflow step
elpete Apr 6, 2026
5c12a1a
fix: protect finally job dispatch from then/catch job failures
elpete Apr 7, 2026
970b00c
fix: correct excpetion typo in SyncProvider onFailure invocation
elpete Apr 7, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions .github/workflows/cron.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ jobs:
experimental: true
services:
mysql:
image: mysql:5.7
image: mysql:8.0
env:
MYSQL_RANDOM_ROOT_PASSWORD: yes
MYSQL_ROOT_PASSWORD: root
MYSQL_USER: cbq
MYSQL_PASSWORD: cbq
MYSQL_DATABASE: cbq
Expand Down Expand Up @@ -56,6 +56,10 @@ jobs:
box install ${{ matrix.coldbox }} --noSave
box package list

- name: Configure MySQL user auth plugin
run: |
mysql -h 127.0.0.1 -P ${{ job.services.mysql.ports[3306] }} -uroot -proot -e "ALTER USER 'cbq'@'%' IDENTIFIED WITH mysql_native_password BY 'cbq'; FLUSH PRIVILEGES;"

- name: Start server
env:
DB_HOST: localhost
Expand All @@ -78,4 +82,4 @@ jobs:
DB_PASSWORD: cbq
DB_DATABASE: cbq
DB_SCHEMA: cbq
run: box testbox run
run: box testbox run
10 changes: 7 additions & 3 deletions .github/workflows/pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ jobs:
cfengine: ["lucee@5", "lucee@6", "adobe@2021", "adobe@2023", "adobe@2025"]
services:
mysql:
image: mysql:5.7
image: mysql:8.0
env:
MYSQL_RANDOM_ROOT_PASSWORD: yes
MYSQL_ROOT_PASSWORD: root
MYSQL_USER: cbq
MYSQL_PASSWORD: cbq
MYSQL_DATABASE: cbq
Expand Down Expand Up @@ -52,6 +52,10 @@ jobs:
box config set modules.commandbox-dotenv.checkEnvPreServerStart=false
box package list

- name: Configure MySQL user auth plugin
run: |
mysql -h 127.0.0.1 -P ${{ job.services.mysql.ports[3306] }} -uroot -proot -e "ALTER USER 'cbq'@'%' IDENTIFIED WITH mysql_native_password BY 'cbq'; FLUSH PRIVILEGES;"

- name: Start server
env:
DB_HOST: localhost
Expand Down Expand Up @@ -101,4 +105,4 @@ jobs:
- name: Commit Format Changes
uses: stefanzweifel/git-auto-commit-action@v5.2.0
with:
commit_message: Apply cfformat changes
commit_message: Apply cfformat changes
8 changes: 6 additions & 2 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ jobs:
cfengine: ["lucee@5", "lucee@6", "adobe@2021", "adobe@2023", "adobe@2025"]
services:
mysql:
image: mysql:5.7
image: mysql:8.0
env:
MYSQL_RANDOM_ROOT_PASSWORD: yes
MYSQL_ROOT_PASSWORD: root
MYSQL_USER: cbq
MYSQL_PASSWORD: cbq
MYSQL_DATABASE: cbq
Expand Down Expand Up @@ -47,6 +47,10 @@ jobs:
box config set modules.commandbox-dotenv.checkEnvPreServerStart=false
box package list

- name: Configure MySQL user auth plugin
run: |
mysql -h 127.0.0.1 -P ${{ job.services.mysql.ports[3306] }} -uroot -proot -e "ALTER USER 'cbq'@'%' IDENTIFIED WITH mysql_native_password BY 'cbq'; FLUSH PRIVILEGES;"

- name: Start server
env:
DB_HOST: localhost
Expand Down
8 changes: 4 additions & 4 deletions box.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name":"cbq",
"version":"5.0.7",
"version":"6.0.0-beta.3",
"author":"Eric Peterson <eric@elpete.com>",
"location":"forgeboxStorage",
"homepage":"https://github.com/coldbox-modules/cbq",
Expand Down Expand Up @@ -31,9 +31,9 @@
"cfmigrations":"modules/cfmigrations/"
},
"scripts":{
"format":"cfformat run ModuleConfig.cfc,models/**/*.cfc,tests/specs/**/*.cfc,tests/resources/app/handlers/**/*.cfc,tests/resources/app/config/**/*.cfc --overwrite",
"format:check":"cfformat check ModuleConfig.cfc,models/**/*.cfc,tests/specs/**/*.cfc,tests/resources/app/handlers/**/*.cfc,tests/resources/app/config/**/*.cfc --verbose",
"format:watch":"cfformat watch ModuleConfig.cfc,models/**/*.cfc,tests/specs/**/*.cfc,tests/resources/app/handlers/**/*.cfc,tests/resources/app/config/**/*.cfc",
"format":"cfformat run ModuleConfig.cfc,models/**/*.cfc,interceptors/**/*.cfc,tests/specs/**/*.cfc,tests/resources/app/handlers/**/*.cfc,tests/resources/app/config/**/*.cfc --overwrite",
"format:check":"cfformat check ModuleConfig.cfc,models/**/*.cfc,interceptors/**/*.cfc,tests/specs/**/*.cfc,tests/resources/app/handlers/**/*.cfc,tests/resources/app/config/**/*.cfc --verbose",
"format:watch":"cfformat watch ModuleConfig.cfc,models/**/*.cfc,interceptors/**/*.cfc,tests/specs/**/*.cfc,tests/resources/app/handlers/**/*.cfc,tests/resources/app/config/**/*.cfc",
"install:2021":"cfpm install document,feed,zip"
},
"ignore":[
Expand Down
46 changes: 33 additions & 13 deletions interceptors/LogFailedJobsInterceptor.cfc
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,12 @@ component {
"null" : ( arguments.data.exception.type ?: "" ) == "",
"nulls" : ( arguments.data.exception.type ?: "" ) == ""
},
"exceptionMessage" : arguments.data.exception.message,
"exceptionMessage" : {
"value" : arguments.data.exception.message ?: "",
"cfsqltype" : "CF_SQL_VARCHAR",
"null" : ( arguments.data.exception.message ?: "" ) == "",
"nulls" : ( arguments.data.exception.message ?: "" ) == ""
},
"exceptionDetail" : {
"value" : arguments.data.exception.detail ?: "",
"cfsqltype" : "CF_SQL_VARCHAR",
Expand All @@ -57,23 +62,38 @@ component {
"null" : ( arguments.data.exception.extendedInfo ?: "" ) == "",
"nulls" : ( arguments.data.exception.extendedInfo ?: "" ) == ""
},
"exceptionStackTrace" : arguments.data.exception.stackTrace,
"exception" : serializeJSON( arguments.data.exception ),
"failedDate" : { "value": getCurrentUnixTimestamp(), "cfsqltype": "CF_SQL_BIGINT" },
"originalId" : { "value": arguments.data.job.getId(), "cfsqltype": "CF_SQL_VARCHAR" }
"exceptionStackTrace" : {
"value" : isNull( arguments.data.exception.stackTrace ) ? "" : (
isSimpleValue( arguments.data.exception.stackTrace ) ? arguments.data.exception.stackTrace : serializeJSON(
arguments.data.exception.stackTrace
)
),
"cfsqltype" : "CF_SQL_VARCHAR",
"null" : isNull( arguments.data.exception.stackTrace ) || (
isSimpleValue( arguments.data.exception.stackTrace ) && arguments.data.exception.stackTrace == ""
),
"nulls" : isNull( arguments.data.exception.stackTrace ) || (
isSimpleValue( arguments.data.exception.stackTrace ) && arguments.data.exception.stackTrace == ""
)
},
"exception" : isNull( arguments.data.exception ) ? javacast( "null", "" ) : serializeJSON(
arguments.data.exception
),
"failedDate" : {
"value" : getCurrentUnixTimestamp(),
"cfsqltype" : "CF_SQL_BIGINT"
},
"originalId" : {
"value" : arguments.data.job.getId(),
"cfsqltype" : "CF_SQL_VARCHAR"
}
};

try {
qb.table( variables.settings.logFailedJobsProperties.tableName )
.insert(
values = logData,
options = options
);
.insert( values = logData, options = options );
} catch ( any e ) {
log.error( "Failed to log failed job: #e.message#", {
"log": logData,
"exception": e
} );
log.error( "Failed to log failed job: #e.message#", { "log" : logData, "exception" : e } );
rethrow;
}
}
Expand Down
15 changes: 13 additions & 2 deletions models/Jobs/Batch.cfc
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ component accessors="true" {
property name="dispatcher" inject="provider:Dispatcher@cbq";
property name="cbq" inject="provider:@cbq";
property name="wirebox" inject="wirebox";
property name="log" inject="logbox:logger:{this}";
property name="repository";

property name="id" type="string";
property name="name" type="string";
property name="totalJobs" type="numeric";
property name="pendingJobs" type="numeric";
property name="failedJobs" type="numeric";
property name="successfulJobs" type="numeric";
property name="failedJobIds" type="array";
property name="options" type="struct";
property name="createdDate" type="numeric";
Expand Down Expand Up @@ -49,7 +51,12 @@ component accessors="true" {
}

getRepository().markAsFinished( variables.id );
dispatchThenJobIfNeeded();

try {
dispatchThenJobIfNeeded();
} catch ( any e ) {
log.error( "Failed to dispatch then job for batch [#variables.id#]: #e.message#", e );
}

if ( counts.allJobsHaveRanExactlyOnce ) {
dispatchFinallyJobIfNeeded();
Expand All @@ -64,7 +71,11 @@ component accessors="true" {
cancel();
}

dispatchCatchJobIfNeeded( arguments.error );
try {
dispatchCatchJobIfNeeded( arguments.error );
} catch ( any e ) {
log.error( "Failed to dispatch catch job for batch [#variables.id#]: #e.message#", e );
}
}

if ( counts.allJobsHaveRanExactlyOnce ) {
Expand Down
46 changes: 24 additions & 22 deletions models/Jobs/DBBatchRepository.cfc
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,19 @@ component singleton accessors="true" {

public Batch function store( required PendingBatch batch ) {
var id = variables.timeBasedUUIDGenerator.generate().toString();
var batchName = arguments.batch.getName();
if ( isNull( batchName ) || !isSimpleValue( batchName ) ) {
batchName = "";
}

qb.table( variables.batchTableName )
.insert(
values = {
"id" : id,
"name" : arguments.batch.getName(),
"name" : batchName,
"totalJobs" : 0,
"pendingJobs" : 0,
"successfulJobs" : 0,
"failedJobs" : 0,
"failedJobIds" : "[]",
"options" : serializeJSON( arguments.batch.getOptions() ),
Expand Down Expand Up @@ -102,23 +107,20 @@ component singleton accessors="true" {
throw( type = "cbq.BatchNotFound", message = "No batch found for id [#arguments.batchId#]" );
}

var updatedValues = {
"pendingJobs" : data.pendingJobs - 1,
"successfulJobs" : data.successfulJobs + 1,
"failedJobs" : data.failedJobs
};

qb.table( variables.batchTableName )
.where( "id", arguments.batchId )
.update(
values = {
"pendingJobs" : data.pendingJobs - 1,
"failedJobs" : data.failedJobs,
"failedJobIds" : serializeJSON(
deserializeJSON( data.failedJobIds ).filter( ( failedJobId ) => failedJobId != jobId )
)
},
options = variables.defaultQueryOptions
);
.update( values = updatedValues, options = variables.defaultQueryOptions );

return {
"pendingJobs" : data.pendingJobs - 1,
"failedJobs" : data.failedJobs,
"allJobsHaveRanExactlyOnce" : ( data.pendingJobs - 1 ) - data.failedJobs == 0
"allJobsHaveRanExactlyOnce" : ( data.pendingJobs - 1 ) == 0
};
}
}
Expand All @@ -135,21 +137,20 @@ component singleton accessors="true" {
throw( type = "cbq.BatchNotFound", message = "No batch found for id [#arguments.batchId#]" );
}

var updatedValues = {
"pendingJobs" : data.pendingJobs - 1,
"failedJobs" : data.failedJobs + 1,
"failedJobIds" : serializeJSON( deserializeJSON( data.failedJobIds ).append( arguments.jobId ) )
};

qb.table( variables.batchTableName )
.where( "id", arguments.batchId )
.update(
values = {
"pendingJobs" : data.pendingJobs,
"failedJobs" : data.failedJobs + 1,
"failedJobIds" : serializeJSON( deserializeJSON( data.failedJobIds ).append( arguments.jobId ) )
},
options = variables.defaultQueryOptions
);
.update( values = updatedValues, options = variables.defaultQueryOptions );

return {
"pendingJobs" : data.pendingJobs,
"pendingJobs" : data.pendingJobs - 1,
"failedJobs" : data.failedJobs + 1,
"allJobsHaveRanExactlyOnce" : data.pendingJobs - ( data.failedJobs + 1 ) == 0
"allJobsHaveRanExactlyOnce" : ( data.pendingJobs - 1 ) == 0
};
}
}
Expand Down Expand Up @@ -191,6 +192,7 @@ component singleton accessors="true" {
batch.setTotalJobs( data.totalJobs );
batch.setPendingJobs( data.pendingJobs );
batch.setFailedJobs( data.failedJobs );
batch.setSuccessfulJobs( data.successfulJobs );
batch.setFailedJobIds( deserializeJSON( data.failedJobIds ) );
batch.setOptions( deserializeJSON( data.options ) );
batch.setCreatedDate( data.createdDate );
Expand Down
4 changes: 2 additions & 2 deletions models/Providers/ColdBoxAsyncProvider.cfc
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ component accessors="true" extends="AbstractQueueProvider" {
sleep( delay * 1000 );
return true;
}, workerPool.getExecutor() )
.then( function() {
.thenCompose( function() {
job.setId( createUUID() );
if ( !isNull( arguments.currentAttempt ) ) {
if ( attempts > 0 ) {
job.setCurrentAttempt( attempts );
}
return marshalJob( job, workerPool );
Expand Down
28 changes: 17 additions & 11 deletions models/Providers/DBProvider.cfc
Original file line number Diff line number Diff line change
Expand Up @@ -394,12 +394,15 @@ component accessors="true" extends="AbstractQueueProvider" {
variables.getCurrentUnixTimestamp()
);
} );
// past the reserved date
q1.orWhere(
"reservedDate",
"<=",
variables.getCurrentUnixTimestamp() - pool.getTimeout()
);
// past the job's own timeout (availableDate was set to now + jobTimeout at reservation time)
q1.orWhere( ( q2 ) => {
q2.whereNotNull( "reservedDate" )
.where(
"availableDate",
"<=",
variables.getCurrentUnixTimestamp()
);
} );
// reserved by a worker but never released
q1.orWhere( ( q3 ) => {
q3.whereNull( "reservedDate" )
Expand Down Expand Up @@ -448,11 +451,14 @@ component accessors="true" extends="AbstractQueueProvider" {
q2.whereNull( "reservedBy" );
q2.whereNull( "reservedDate" );
} )
.orWhere(
"reservedDate",
"<=",
variables.getCurrentUnixTimestamp() - pool.getTimeout()
);
.orWhere( ( q2 ) => {
q2.whereNotNull( "reservedDate" )
.where(
"availableDate",
"<=",
variables.getCurrentUnixTimestamp()
);
} );
} )
.update(
values = {
Expand Down
2 changes: 1 addition & 1 deletion models/Providers/SyncProvider.cfc
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ component accessors="true" extends="AbstractQueueProvider" {
invoke(
job,
"onFailure",
{ "excpetion" : e }
{ "exception" : e }
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
component {

function up( schema ) {
schema.alter( "cbq_batches", ( t ) => {
t.addColumn( t.unsignedInteger( "successfulJobs" ).default( 0 ) );
} );
}

function down( schema ) {
schema.alter( "cbq_batches", ( t ) => {
t.dropColumn( "successfulJobs" );
} );
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
component {

function up( schema, qb ) {
schema.alter( "cbq_batches", ( t ) => {
t.modifyColumn( "name", t.string( "name" ).nullable() );
} );
}

function down( schema, qb ) {
schema.alter( "cbq_batches", ( t ) => {
t.modifyColumn( "name", t.string( "name" ) );
} );
}

}
Loading
Loading