Create & queue tasks on a pool of workers.
npm i ts-task-queue
In this example, we'll create a queue that will add two numbers.
Create a file for the queue, src/add-queue.ts. In this file, we'll create an interface that will be used for our task.
Our add task will accept two inputs, a and b, which must both be numbers:
src/add-queue.ts
export interface AddTask {
  a: number;
  b: number;
}

Our task takes an AddTask as input and outputs a number (the sum), so the queue is created as new Queue<AddTask, number>():
src/add-queue.ts
export interface AddTask {
  a: number;
  b: number;
}

export const addQueue = new Queue<AddTask, number>({
});

At a minimum, our queue needs a name and a callback. The name must be a unique string to identify the queue, and the callback is a function that will perform the work that this queue does. The callback must be an async function!
src/add-queue.ts
export interface AddTask {
  a: number;
  b: number;
}

export const addQueue = new Queue<AddTask, number>({
  name: 'add-queue',
  callback: async (task: AddTask) => task.a + task.b,
});

We can use addQueue.await() to push tasks onto the queue and get a Promise back. You will most likely want to wrap your application startup in an isMainThread() check, so that code meant only for the main thread does not also run on the workers:
src/index.ts
import { AddTask, addQueue } from './add-queue';

if (addQueue.isMainThread()) {
  const sum = await addQueue.await({
    data: { a: 4, b: 8 }
  });
  console.log('Sum is', sum);
  // Sum is 12
}

If you plan to run this through ts-node, you will also need to create a JavaScript entry point for the workers:
./index.js
/**
 * This file boots the worker in dev when the project is run through ts-node.
 * - This file is not included in the build.
 */
if (!process.execArgv.includes('ts-node/register')) {
  require('ts-node').register();
}
const path = require('path');
require(path.resolve(__dirname, './src/index.ts'));

To run different tasks on multiple queues, repeat the steps above to create additional queues. Just make sure each name is unique!
src/subtract-queue.ts
export interface SubtractTask {
  a: number;
  b: number;
}

export const subtractQueue = new Queue<SubtractTask, number>({
  name: 'subtract-queue',
  callback: async (task: SubtractTask) => task.a - task.b,
});

src/index.ts
import { AddTask, addQueue } from './add-queue';
import { SubtractTask, subtractQueue } from './subtract-queue';

if (addQueue.isMainThread()) {
  const sum = await addQueue.await({
    data: { a: 4, b: 8 }
  });
  console.log('Sum is', sum);
  // Sum is 12
}

if (subtractQueue.isMainThread()) {
  const diff = await subtractQueue.await({
    data: { a: 6, b: 4 }
  });
  console.log('Difference is', diff);
  // Difference is 2
}

Use the following options to customize the queue:
Specify a different file to load the workers. Default is index.js or the main file from package.json
Number of workers pooled for the queue. Default is 4
Run a function on worker startup, e.g. establish database connection
Specify an error-handler function. Default logs to stderr
Specify a fatal error-handler function. Default logs to stderr and exits
Maximum number of attempts (maxAttempts) for a task before it is considered failed. Default is 1, meaning no retries. A failing task is retried until this many attempts have been made.
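To make the attempt limit concrete, here is a minimal sketch of "try up to N times" logic. It is not the library's implementation: runWithAttempts is a hypothetical helper written for illustration, and it assumes that any rejected promise counts as a failed attempt and that the last error is the one reported.

```typescript
// Hypothetical helper, not part of ts-task-queue.
// Runs `work` until it succeeds or `maxAttempts` attempts have been made.
export async function runWithAttempts<T>(
  work: () => Promise<T>,
  maxAttempts = 1,
): Promise<T> {
  // Assumption: a limit below 1 is treated as a single attempt.
  const limit = Math.max(1, maxAttempts);
  let lastError: unknown;
  for (let attempt = 1; attempt <= limit; attempt++) {
    try {
      return await work();
    } catch (err) {
      // Remember the failure; the loop retries while attempts remain.
      lastError = err;
    }
  }
  // Every attempt failed: surface the most recent error.
  throw lastError;
}
```

With a limit of 3, a task that fails twice and then succeeds still resolves, while a task that always fails rejects after the third attempt. In the library itself, the limit is set through the queue options: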
export const addQueue = new Queue<AddTask, number>({
  name: 'add-queue',
  callback: async (task: AddTask) => task.a + task.b,
  maxAttempts: 3,
});

When pushing tasks to the queue, you can specify additional options:
Schedule a task to run at a specific time in the future (scheduledAt), with an optional expiration (expiresAt). The task will not run before the scheduled time, and it is discarded if it expires.
src/add-queue.ts
export interface AddTask {
  a: number;
  b: number;
}

export const addQueue = new Queue<AddTask, number>({
  name: 'add-queue',
  callback: async (task: AddTask) => task.a + task.b,
});

src/index.ts
import { AddTask, addQueue } from './add-queue';

if (addQueue.isMainThread()) {
  // Schedule a task to run in 5 minutes
  const sum = await addQueue.await({
    data: { a: 4, b: 8 },
    schedule: {
      scheduledAt: new Date(Date.now() + 5 * 60 * 1000),
    }
  });
  console.log('Sum is', sum);
  // Sum is 12

  // Schedule a task to run in 5 minutes, but expire after 10 minutes
  const sum2 = await addQueue.await({
    data: { a: 10, b: 20 },
    schedule: {
      scheduledAt: new Date(Date.now() + 5 * 60 * 1000),
      expiresAt: new Date(Date.now() + 10 * 60 * 1000),
    }
  });
  console.log('Sum2 is', sum2);
  // Sum2 is 30
}
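
The interaction between scheduledAt and expiresAt can be pictured with a small sketch. This is only one plausible reading of the behaviour described above, not the library's code: the Schedule type, the TaskState names, and the taskState function are hypothetical, and the sketch assumes that expiry takes precedence once expiresAt has been reached.

```typescript
// Hypothetical shape mirroring the `schedule` option used above.
interface Schedule {
  scheduledAt: Date;
  expiresAt?: Date;
}

// Hypothetical state names used only in this sketch.
type TaskState = 'waiting' | 'ready' | 'expired';

// Classifies a scheduled task at time `now`.
// Assumption: a task whose expiresAt has been reached is discarded,
// even if its scheduledAt has also passed.
export function taskState(schedule: Schedule, now: Date = new Date()): TaskState {
  if (schedule.expiresAt && now.getTime() >= schedule.expiresAt.getTime()) {
    return 'expired';
  }
  return now.getTime() >= schedule.scheduledAt.getTime() ? 'ready' : 'waiting';
}
```

Under these assumptions, a task scheduled 5 minutes ahead with a 10 minute expiry is waiting now, ready after 5 minutes, and expired from 10 minutes onward. How the library handles the exact boundary instants may differ.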