Skip to content

Commit 4c39801

Browse files
Fix time skipping for dotnet SDK
1 parent 20fb852 commit 4c39801

5 files changed

Lines changed: 415 additions & 309 deletions

File tree

temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableState.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,8 @@ void startNexusOperation(
107107

108108
void cancelNexusOperationRequestAcknowledge(NexusOperationRef ref);
109109

110+
/**
* Records that a cancel request for the given nexus operation failed.
*
* @param ref reference to the nexus operation; its scheduled event ID identifies the operation
* @param failure the failure to attach to the recorded cancel-request-failed event
*/
void failNexusOperationCancelRequest(NexusOperationRef ref, Failure failure);
111+
110112
void completeNexusOperation(NexusOperationRef ref, Payload result);
111113

112114
void completeAsyncNexusOperation(

temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java

Lines changed: 79 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,10 @@ private interface UpdateProcedure {
105105
private final Map<String, Long> activityById = new HashMap<>();
106106
private final Map<Long, StateMachine<ChildWorkflowData>> childWorkflows = new HashMap<>();
107107
private final Map<Long, StateMachine<NexusOperationData>> nexusOperations = new HashMap<>();
108+
// Tracks cancelRequestedEventId by scheduledEventId, persists after operation removal.
109+
private final Map<Long, Long> nexusCancelRequestedEventIds = new HashMap<>();
110+
// Tracks scheduledEventIds of nexus cancel requests that have not yet received a response.
111+
private final Set<Long> unresolvedNexusCancelRequests = new HashSet<>();
108112
private final Map<String, StateMachine<TimerData>> timers = new HashMap<>();
109113
private final Map<String, StateMachine<SignalExternalData>> externalSignals = new HashMap<>();
110114
private final Map<String, StateMachine<CancelExternalData>> externalCancellations =
@@ -486,10 +490,13 @@ public void completeWorkflowTask(
486490
.asRuntimeException();
487491
}
488492

489-
if (unhandledCommand(request) || unhandledMessages(request)) {
493+
if (unhandledCommand(request)
494+
|| unhandledMessages(request)
495+
|| hasUnresolvedNexusCancelWithCompletion(request)) {
490496
// Fail the workflow task if there are new events or messages and a command tries to
491-
// complete the workflow. Record the failure in history, then throw an error to the
492-
// caller (matching real server behavior).
497+
// complete the workflow, or if there are unresolved nexus cancel requests. Record the
498+
// failure in history, then throw an error to the caller (matching real server
499+
// behavior).
493500
failWorkflowTaskWithAReason(
494501
WorkflowTaskFailedCause.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_COMMAND,
495502
null,
@@ -673,6 +680,12 @@ private boolean unhandledMessages(RespondWorkflowTaskCompletedRequest request) {
673680
&& hasCompletionCommand(request.getCommandsList()));
674681
}
675682

683+
/**
* Reports whether this workflow task completion tries to finish the workflow while at least one
* nexus cancel request is still tracked as unresolved.
*
* @param request the workflow task completion whose command list is inspected
* @return {@code true} only when {@code unresolvedNexusCancelRequests} is non-empty and {@code
*     hasCompletionCommand} accepts the request's commands
*/
private boolean hasUnresolvedNexusCancelWithCompletion(
684+
RespondWorkflowTaskCompletedRequest request) {
685+
// Short-circuits: the command list is only scanned when a cancel request is still pending.
return !unresolvedNexusCancelRequests.isEmpty()
686+
&& hasCompletionCommand(request.getCommandsList());
687+
}
688+
676689
private boolean hasCompletionCommand(List<Command> commands) {
677690
for (Command command : commands) {
678691
if (WorkflowExecutionUtils.isWorkflowExecutionCompleteCommand(command)) {
@@ -899,6 +912,12 @@ private void processRequestCancelNexusOperation(
899912
ctx.setNeedWorkflowTask(true);
900913
} else {
901914
operation.action(Action.REQUEST_CANCELLATION, ctx, null, workflowTaskCompletedId);
915+
ctx.onCommit(
916+
historySize -> {
917+
nexusCancelRequestedEventIds.put(
918+
scheduleEventId, operation.getData().cancelRequestedEventId);
919+
unresolvedNexusCancelRequests.add(scheduleEventId);
920+
});
902921
ctx.addTimer(
903922
ProtobufTimeUtils.toJavaDuration(operation.getData().requestTimeout),
904923
() ->
@@ -2339,6 +2358,10 @@ public void startNexusOperation(
23392358
update(
23402359
ctx -> {
23412360
StateMachine<NexusOperationData> operation = getPendingNexusOperation(scheduledEventId);
2361+
if (operation.getState() == State.STARTED) {
2362+
// Operation was already started (e.g. from a previous attempt before retry).
2363+
return;
2364+
}
23422365
operation.action(StateMachines.Action.START, ctx, resp, 0);
23432366
operation.getData().identity = clientIdentity;
23442367

@@ -2378,13 +2401,30 @@ public void cancelNexusOperation(NexusOperationRef ref, Failure failure) {
23782401
});
23792402
}
23802403

2404+
/**
2405+
* Resolves the cancelRequestedEventId for a nexus operation, checking both the active operations
2406+
* map and the persisted cancel request IDs (for operations that have already completed/removed).
2407+
*
* <p>The active operations map takes precedence: when the operation is still present, the value
* held in its data is returned as-is and the persisted map is not consulted.
*
* @param scheduledEventId scheduled event ID identifying the nexus operation
* @return the operation's {@code cancelRequestedEventId} when the operation is still in {@code
*     nexusOperations}; otherwise the ID stored in {@code nexusCancelRequestedEventIds}, or 0
*     when nothing is stored (callers treat 0 as "no cancel request recorded")
*/
2408+
private long resolveCancelRequestedEventId(long scheduledEventId) {
2409+
StateMachine<NexusOperationData> operation = nexusOperations.get(scheduledEventId);
2410+
if (operation != null) {
2411+
return operation.getData().cancelRequestedEventId;
2412+
}
2413+
// Operation is no longer tracked; fall back to the ID remembered per scheduledEventId.
Long stored = nexusCancelRequestedEventIds.get(scheduledEventId);
2414+
return stored != null ? stored : 0;
2415+
}
2416+
23812417
@Override
23822418
public void cancelNexusOperationRequestAcknowledge(NexusOperationRef ref) {
23832419
update(
23842420
ctx -> {
23852421
StateMachine<NexusOperationData> operation =
2386-
getPendingNexusOperation(ref.getScheduledEventId());
2387-
if (!operationInFlight(operation.getState())) {
2422+
nexusOperations.get(ref.getScheduledEventId());
2423+
if (operation != null && !operationInFlight(operation.getState())) {
2424+
return;
2425+
}
2426+
long cancelRequestedEventId = resolveCancelRequestedEventId(ref.getScheduledEventId());
2427+
if (cancelRequestedEventId == 0) {
23882428
return;
23892429
}
23902430
ctx.addEvent(
@@ -2393,12 +2433,39 @@ public void cancelNexusOperationRequestAcknowledge(NexusOperationRef ref) {
23932433
.setNexusOperationCancelRequestCompletedEventAttributes(
23942434
NexusOperationCancelRequestCompletedEventAttributes.newBuilder()
23952435
.setScheduledEventId(ref.getScheduledEventId())
2396-
.setRequestedEventId(operation.getData().cancelRequestedEventId))
2436+
.setRequestedEventId(cancelRequestedEventId))
23972437
.build());
2438+
ctx.onCommit(
2439+
historySize -> unresolvedNexusCancelRequests.remove(ref.getScheduledEventId()));
2440+
scheduleWorkflowTask(ctx);
23982441
ctx.unlockTimer("cancelNexusOperationRequestAcknowledge");
23992442
});
24002443
}
24012444

2445+
/**
* Records an {@code EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUEST_FAILED} history event for the nexus
* operation identified by {@code ref}, then schedules a workflow task.
*
* <p>The cancel-requested event ID is looked up through {@code resolveCancelRequestedEventId}; if
* it resolves to 0 the method returns without adding an event. This method does not inspect the
* operation's state.
*
* <p>NOTE(review): no check is visible here that the cancel request is still unresolved, so a
* repeated call appears to append another failed event — confirm whether callers can deliver
* duplicates.
*
* @param ref reference to the nexus operation; only its scheduled event ID is read here
* @param failure failure stored in the event's attributes
*/
@Override
2446+
public void failNexusOperationCancelRequest(NexusOperationRef ref, Failure failure) {
2447+
update(
2448+
ctx -> {
2449+
long cancelRequestedEventId = resolveCancelRequestedEventId(ref.getScheduledEventId());
2450+
// 0 means no cancel request ID could be resolved, so there is nothing to mark as failed.
if (cancelRequestedEventId == 0) {
2451+
return;
2452+
}
2453+
ctx.addEvent(
2454+
HistoryEvent.newBuilder()
2455+
.setEventType(EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUEST_FAILED)
2456+
.setNexusOperationCancelRequestFailedEventAttributes(
2457+
NexusOperationCancelRequestFailedEventAttributes.newBuilder()
2458+
.setScheduledEventId(ref.getScheduledEventId())
2459+
.setRequestedEventId(cancelRequestedEventId)
2460+
.setFailure(failure))
2461+
.build());
2462+
// Removal from the unresolved set is registered as an onCommit callback rather than done
// inline (presumably so it only takes effect once the update commits — confirm).
ctx.onCommit(
2463+
historySize -> unresolvedNexusCancelRequests.remove(ref.getScheduledEventId()));
2464+
scheduleWorkflowTask(ctx);
2465+
// The string argument is this method's name, passed as the caller label for the unlock.
ctx.unlockTimer("failNexusOperationCancelRequest");
2466+
});
2467+
}
2468+
24022469
@Override
24032470
public void completeNexusOperation(NexusOperationRef ref, Payload result) {
24042471
update(
@@ -2471,6 +2538,8 @@ private void timeoutNexusOperation(
24712538
}
24722539
operation.action(StateMachines.Action.TIME_OUT, ctx, timeoutType, 0);
24732540
nexusOperations.remove(scheduledEventId);
2541+
// The cancel response won't matter after a timeout, so clear the unresolved cancel.
2542+
unresolvedNexusCancelRequests.remove(scheduledEventId);
24742543
scheduleWorkflowTask(ctx);
24752544
});
24762545
} catch (StatusRuntimeException e) {
@@ -2496,7 +2565,8 @@ private void timeoutNexusRequest(long scheduledEventId, String requestMethod, in
24962565
ctx -> {
24972566
StateMachine<NexusOperationData> operation = getPendingNexusOperation(scheduledEventId);
24982567
if (attempt != operation.getData().getAttempt()
2499-
|| isTerminalState(operation.getState())) {
2568+
|| isTerminalState(operation.getState())
2569+
|| operation.getState() == State.STARTED) {
25002570
throw Status.NOT_FOUND.withDescription("Timer fired earlier").asRuntimeException();
25012571
}
25022572

@@ -2510,6 +2580,8 @@ private void timeoutNexusRequest(long scheduledEventId, String requestMethod, in
25102580

25112581
if (isTerminalState(operation.getState())) {
25122582
nexusOperations.remove(scheduledEventId);
2583+
// Cancel response won't arrive after terminal state, unblock workflow completion.
2584+
unresolvedNexusCancelRequests.remove(scheduledEventId);
25132585
scheduleWorkflowTask(ctx);
25142586
} else {
25152587
retryNexusTask(ctx, operation);

temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1048,9 +1048,30 @@ public void respondNexusTaskCompleted(
10481048
Failure.Builder b = Failure.newBuilder().setMessage(opError.getFailure().getMessage());
10491049

10501050
if (startResp.getOperationError().getOperationState().equals("canceled")) {
1051-
b.setCanceledFailureInfo(
1052-
CanceledFailureInfo.newBuilder()
1053-
.setDetails(nexusFailureMetadataToPayloads(opError.getFailure())));
1051+
b.setCanceledFailureInfo(CanceledFailureInfo.getDefaultInstance());
1052+
// Parse the nexus failure details as a Failure proto for the cause chain,
1053+
// without overwriting the message (unlike nexusFailureToAPIFailure).
1054+
io.temporal.api.nexus.v1.Failure nexusFailure = opError.getFailure();
1055+
if (nexusFailure.getMetadataMap().containsKey("type")
1056+
&& nexusFailure.getMetadataMap().get("type").equals(FAILURE_TYPE_STRING)) {
1057+
try {
1058+
Failure.Builder causeBuilder = Failure.newBuilder();
1059+
JSON_PARSER.merge(nexusFailure.getDetails().toString(UTF_8), causeBuilder);
1060+
Failure parsedCause = causeBuilder.build();
1061+
// If the parsed failure itself has CanceledFailureInfo, unwrap it
1062+
// since the outer failure already has CanceledFailureInfo set.
1063+
if (parsedCause.hasCanceledFailureInfo()) {
1064+
b.setCanceledFailureInfo(parsedCause.getCanceledFailureInfo());
1065+
if (parsedCause.hasCause()) {
1066+
b.setCause(parsedCause.getCause());
1067+
}
1068+
} else {
1069+
b.setCause(parsedCause);
1070+
}
1071+
} catch (InvalidProtocolBufferException e) {
1072+
throw new RuntimeException(e);
1073+
}
1074+
}
10541075
mutableState.cancelNexusOperation(tt.getOperationRef(), b.build());
10551076
} else {
10561077
mutableState.failNexusOperation(
@@ -1096,8 +1117,12 @@ public void respondNexusTaskFailed(
10961117
NexusTaskToken tt = NexusTaskToken.fromBytes(request.getTaskToken());
10971118
TestWorkflowMutableState mutableState =
10981119
getMutableState(tt.getOperationRef().getExecutionId());
1099-
if (mutableState.validateOperationTaskToken(tt)) {
1100-
Failure failure = handlerErrorToFailure(request.getError());
1120+
Failure failure = handlerErrorToFailure(request.getError());
1121+
if (tt.isCancel()) {
1122+
// For cancel failures, the operation may already be completed/removed,
1123+
// so skip token validation and record the event directly.
1124+
mutableState.failNexusOperationCancelRequest(tt.getOperationRef(), failure);
1125+
} else if (mutableState.validateOperationTaskToken(tt)) {
11011126
mutableState.failNexusOperation(tt.getOperationRef(), failure);
11021127
}
11031128
responseObserver.onNext(RespondNexusTaskFailedResponse.getDefaultInstance());
@@ -1171,6 +1196,7 @@ public void completeNexusOperation(
11711196

11721197
private static Failure handlerErrorToFailure(HandlerError err) {
11731198
return Failure.newBuilder()
1199+
.setMessage(err.getFailure().getMessage())
11741200
.setNexusHandlerFailureInfo(
11751201
NexusHandlerFailureInfo.newBuilder()
11761202
.setType(err.getErrorType())

0 commit comments

Comments
 (0)