Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 150 additions & 0 deletions examples/work-item-filters/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

/**
* This example demonstrates Work Item Filters for Durable Task workers.
*
* Work Item Filters allow a worker to tell the sidecar which orchestrations,
* activities, and entities it is configured to handle. The sidecar then only
* dispatches matching work items to that worker, enabling efficient routing.
*
* Key concepts demonstrated:
* - Auto-generated filters from the worker's registry (default behavior)
* - Explicit filters via useWorkItemFilters()
*
* This example runs against:
* DTS Emulator:
* docker run --name dts-emulator -i -p 8080:8080 -d --rm mcr.microsoft.com/dts/dts-emulator:latest
* Then:
* npx ts-node --swc examples/work-item-filters/index.ts
*/

import {
ActivityContext,
OrchestrationContext,
TOrchestrator,
WorkItemFilters,
} from "@microsoft/durabletask-js";
import {
DurableTaskAzureManagedClientBuilder,
DurableTaskAzureManagedWorkerBuilder,
} from "@microsoft/durabletask-js-azuremanaged";

const endpoint = process.env.ENDPOINT || "localhost:8080";
const taskHub = process.env.TASKHUB || "default";

// ============================================================================
// Step 1: Define activities and orchestrators
// ============================================================================

const greet = async (_: ActivityContext, name: string): Promise<string> => {
return `Hello, ${name}!`;
};

const add = async (_: ActivityContext, input: { a: number; b: number }): Promise<number> => {
return input.a + input.b;
};

const greetingOrchestrator: TOrchestrator = async function* (
ctx: OrchestrationContext,
name: string,
): Promise<any> {
const result = yield ctx.callActivity(greet, name);
return result;
};

const mathOrchestrator: TOrchestrator = async function* (
ctx: OrchestrationContext,
input: { a: number; b: number },
): Promise<any> {
const result = yield ctx.callActivity(add, input);
return result;
};

// ============================================================================
// Step 2: Demonstrate different work item filter configurations
// ============================================================================

async function runWithAutoGeneratedFilters() {
console.log("\n=== Scenario 1: Auto-Generated Filters (Default) ===");
console.log("The worker auto-generates filters from its registered orchestrators and activities.");
console.log("Only matching work items will be dispatched to this worker.\n");

const client = new DurableTaskAzureManagedClientBuilder()
.endpoint(endpoint, taskHub, null)
.build();

// No explicit filters — they are auto-generated from addOrchestrator/addActivity
const worker = new DurableTaskAzureManagedWorkerBuilder()
.endpoint(endpoint, taskHub, null)
.addOrchestrator(greetingOrchestrator)
.addActivity(greet)
.build();

await worker.start();
console.log("Worker started with auto-generated filters for: greetingOrchestrator, greet");

const id = await client.scheduleNewOrchestration(greetingOrchestrator, "Auto-Filters");
console.log(`Scheduled orchestration: ${id}`);

const state = await client.waitForOrchestrationCompletion(id, undefined, 30);
console.log(`Result: ${state?.serializedOutput}`);

await worker.stop();
await client.stop();
}

async function runWithExplicitFilters() {
console.log("\n=== Scenario 2: Explicit Filters ===");
console.log("The worker uses explicitly provided filters instead of auto-generating them.");
console.log("This is useful when you want fine-grained control over which work items to accept.\n");

const client = new DurableTaskAzureManagedClientBuilder()
.endpoint(endpoint, taskHub, null)
.build();

// Provide explicit filters — these override auto-generation
const filters: WorkItemFilters = {
orchestrations: [{ name: "mathOrchestrator" }],
activities: [{ name: "add" }],
};

const worker = new DurableTaskAzureManagedWorkerBuilder()
.endpoint(endpoint, taskHub, null)
.addOrchestrator(mathOrchestrator)
.addActivity(add)
.useWorkItemFilters(filters)
.build();

await worker.start();
console.log("Worker started with explicit filters for: mathOrchestrator, add");

const id = await client.scheduleNewOrchestration(mathOrchestrator, { a: 17, b: 25 });
console.log(`Scheduled orchestration: ${id}`);

const state = await client.waitForOrchestrationCompletion(id, undefined, 30);
console.log(`Result: ${state?.serializedOutput}`);

await worker.stop();
await client.stop();
}

// ============================================================================
// Step 3: Run all scenarios
// ============================================================================

(async () => {
console.log(`Connecting to DTS emulator at ${endpoint}, taskHub: ${taskHub}`);

try {
await runWithAutoGeneratedFilters();
await runWithExplicitFilters();

console.log("\n=== All scenarios completed successfully! ===");
} catch (error) {
console.error("Error:", error);
process.exit(1);
}

process.exit(0);
})();
17 changes: 17 additions & 0 deletions examples/work-item-filters/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"name": "work-item-filters-example",
"version": "1.0.0",
"description": "Example demonstrating work item filters for Durable Task workers",
"private": true,
"scripts": {
"start": "ts-node --swc index.ts",
"start:emulator": "ENDPOINT=localhost:8080 TASKHUB=default ts-node --swc index.ts"
},
"dependencies": {
"@microsoft/durabletask-js": "workspace:*",
"@microsoft/durabletask-js-azuremanaged": "workspace:*"
},
"engines": {
"node": ">=22.0.0"
}
}
2 changes: 1 addition & 1 deletion internal/protocol/SOURCE_COMMIT
Original file line number Diff line number Diff line change
@@ -1 +1 @@
026329c53fe6363985655857b9ca848ec7238bd2
1caadbd7ecfdf5f2309acbeac28a3e36d16aa156
21 changes: 21 additions & 0 deletions internal/protocol/protos/orchestrator_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,7 @@ message GetWorkItemsRequest {
int32 maxConcurrentEntityWorkItems = 3;

repeated WorkerCapability capabilities = 10;
WorkItemFilters workItemFilters = 11;
}

enum WorkerCapability {
Expand All @@ -844,6 +845,26 @@ enum WorkerCapability {
WORKER_CAPABILITY_LARGE_PAYLOADS = 3;
}

message WorkItemFilters {
repeated OrchestrationFilter orchestrations = 1;
repeated ActivityFilter activities = 2;
repeated EntityFilter entities = 3;
}

message OrchestrationFilter {
string name = 1;
repeated string versions = 2;
}

message ActivityFilter {
string name = 1;
repeated string versions = 2;
}

message EntityFilter {
string name = 1;
}

message WorkItem {
oneof request {
OrchestratorRequest orchestratorRequest = 1;
Expand Down
18 changes: 18 additions & 0 deletions packages/durabletask-js-azuremanaged/src/worker-builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
Logger,
ConsoleLogger,
VersioningOptions,
WorkItemFilters,
} from "@microsoft/durabletask-js";

/**
Expand All @@ -27,6 +28,7 @@ export class DurableTaskAzureManagedWorkerBuilder {
private _logger: Logger = new ConsoleLogger();
private _shutdownTimeoutMs?: number;
private _versioning?: VersioningOptions;
private _workItemFilters?: WorkItemFilters | null;

/**
* Creates a new instance of DurableTaskAzureManagedWorkerBuilder.
Expand Down Expand Up @@ -220,6 +222,21 @@ export class DurableTaskAzureManagedWorkerBuilder {
return this;
}

/**
* Sets work item filters for the worker.
* When provided, the sidecar will only send work items matching these filters.
* Pass null to explicitly disable filtering and receive all work items.
* When not called, filters are auto-generated from the registered orchestrations,
* activities, and entities.
*
* @param filters The work item filters, or null to disable filtering.
* @returns This builder instance.
*/
useWorkItemFilters(filters: WorkItemFilters | null): DurableTaskAzureManagedWorkerBuilder {
this._workItemFilters = filters;
return this;
}

/**
* Builds and returns a configured TaskHubGrpcWorker.
*
Expand Down Expand Up @@ -251,6 +268,7 @@ export class DurableTaskAzureManagedWorkerBuilder {
logger: this._logger,
shutdownTimeoutMs: this._shutdownTimeoutMs,
versioning: this._versioning,
workItemFilters: this._workItemFilters,
});

// Register all orchestrators
Expand Down
8 changes: 8 additions & 0 deletions packages/durabletask-js/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@
export { TaskHubGrpcClient, TaskHubGrpcClientOptions, MetadataGenerator } from "./client/client";
export { TaskHubGrpcWorker, TaskHubGrpcWorkerOptions } from "./worker/task-hub-grpc-worker";
export { VersioningOptions, VersionMatchStrategy, VersionFailureStrategy } from "./worker/versioning-options";
export {
WorkItemFilters,
OrchestrationWorkItemFilter,
ActivityWorkItemFilter,
EntityWorkItemFilter,
generateWorkItemFiltersFromRegistry,
toGrpcWorkItemFilters,
} from "./worker/work-item-filters";

// Contexts
export { OrchestrationContext } from "./task/context/orchestration-context";
Expand Down
108 changes: 108 additions & 0 deletions packages/durabletask-js/src/proto/orchestrator_service_pb.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3661,6 +3661,11 @@ export class GetWorkItemsRequest extends jspb.Message {
setCapabilitiesList(value: Array<WorkerCapability>): GetWorkItemsRequest;
addCapabilities(value: WorkerCapability, index?: number): WorkerCapability;

hasWorkitemfilters(): boolean;
clearWorkitemfilters(): void;
getWorkitemfilters(): WorkItemFilters | undefined;
setWorkitemfilters(value?: WorkItemFilters): GetWorkItemsRequest;

serializeBinary(): Uint8Array;
toObject(includeInstance?: boolean): GetWorkItemsRequest.AsObject;
static toObject(includeInstance: boolean, msg: GetWorkItemsRequest): GetWorkItemsRequest.AsObject;
Expand All @@ -3677,6 +3682,109 @@ export namespace GetWorkItemsRequest {
maxconcurrentactivityworkitems: number,
maxconcurrententityworkitems: number,
capabilitiesList: Array<WorkerCapability>,
workitemfilters?: WorkItemFilters.AsObject,
}
}

export class WorkItemFilters extends jspb.Message {
clearOrchestrationsList(): void;
getOrchestrationsList(): Array<OrchestrationFilter>;
setOrchestrationsList(value: Array<OrchestrationFilter>): WorkItemFilters;
addOrchestrations(value?: OrchestrationFilter, index?: number): OrchestrationFilter;
clearActivitiesList(): void;
getActivitiesList(): Array<ActivityFilter>;
setActivitiesList(value: Array<ActivityFilter>): WorkItemFilters;
addActivities(value?: ActivityFilter, index?: number): ActivityFilter;
clearEntitiesList(): void;
getEntitiesList(): Array<EntityFilter>;
setEntitiesList(value: Array<EntityFilter>): WorkItemFilters;
addEntities(value?: EntityFilter, index?: number): EntityFilter;

serializeBinary(): Uint8Array;
toObject(includeInstance?: boolean): WorkItemFilters.AsObject;
static toObject(includeInstance: boolean, msg: WorkItemFilters): WorkItemFilters.AsObject;
static extensions: {[key: number]: jspb.ExtensionFieldInfo<jspb.Message>};
static extensionsBinary: {[key: number]: jspb.ExtensionFieldBinaryInfo<jspb.Message>};
static serializeBinaryToWriter(message: WorkItemFilters, writer: jspb.BinaryWriter): void;
static deserializeBinary(bytes: Uint8Array): WorkItemFilters;
static deserializeBinaryFromReader(message: WorkItemFilters, reader: jspb.BinaryReader): WorkItemFilters;
}

export namespace WorkItemFilters {
export type AsObject = {
orchestrationsList: Array<OrchestrationFilter.AsObject>,
activitiesList: Array<ActivityFilter.AsObject>,
entitiesList: Array<EntityFilter.AsObject>,
}
}

export class OrchestrationFilter extends jspb.Message {
getName(): string;
setName(value: string): OrchestrationFilter;
clearVersionsList(): void;
getVersionsList(): Array<string>;
setVersionsList(value: Array<string>): OrchestrationFilter;
addVersions(value: string, index?: number): string;

serializeBinary(): Uint8Array;
toObject(includeInstance?: boolean): OrchestrationFilter.AsObject;
static toObject(includeInstance: boolean, msg: OrchestrationFilter): OrchestrationFilter.AsObject;
static extensions: {[key: number]: jspb.ExtensionFieldInfo<jspb.Message>};
static extensionsBinary: {[key: number]: jspb.ExtensionFieldBinaryInfo<jspb.Message>};
static serializeBinaryToWriter(message: OrchestrationFilter, writer: jspb.BinaryWriter): void;
static deserializeBinary(bytes: Uint8Array): OrchestrationFilter;
static deserializeBinaryFromReader(message: OrchestrationFilter, reader: jspb.BinaryReader): OrchestrationFilter;
}

export namespace OrchestrationFilter {
export type AsObject = {
name: string,
versionsList: Array<string>,
}
}

export class ActivityFilter extends jspb.Message {
getName(): string;
setName(value: string): ActivityFilter;
clearVersionsList(): void;
getVersionsList(): Array<string>;
setVersionsList(value: Array<string>): ActivityFilter;
addVersions(value: string, index?: number): string;

serializeBinary(): Uint8Array;
toObject(includeInstance?: boolean): ActivityFilter.AsObject;
static toObject(includeInstance: boolean, msg: ActivityFilter): ActivityFilter.AsObject;
static extensions: {[key: number]: jspb.ExtensionFieldInfo<jspb.Message>};
static extensionsBinary: {[key: number]: jspb.ExtensionFieldBinaryInfo<jspb.Message>};
static serializeBinaryToWriter(message: ActivityFilter, writer: jspb.BinaryWriter): void;
static deserializeBinary(bytes: Uint8Array): ActivityFilter;
static deserializeBinaryFromReader(message: ActivityFilter, reader: jspb.BinaryReader): ActivityFilter;
}

export namespace ActivityFilter {
export type AsObject = {
name: string,
versionsList: Array<string>,
}
}

export class EntityFilter extends jspb.Message {
getName(): string;
setName(value: string): EntityFilter;

serializeBinary(): Uint8Array;
toObject(includeInstance?: boolean): EntityFilter.AsObject;
static toObject(includeInstance: boolean, msg: EntityFilter): EntityFilter.AsObject;
static extensions: {[key: number]: jspb.ExtensionFieldInfo<jspb.Message>};
static extensionsBinary: {[key: number]: jspb.ExtensionFieldBinaryInfo<jspb.Message>};
static serializeBinaryToWriter(message: EntityFilter, writer: jspb.BinaryWriter): void;
static deserializeBinary(bytes: Uint8Array): EntityFilter;
static deserializeBinaryFromReader(message: EntityFilter, reader: jspb.BinaryReader): EntityFilter;
}

export namespace EntityFilter {
export type AsObject = {
name: string,
}
}

Expand Down
Loading
Loading