diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java b/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java index 896fdc0762..3307d4c2f6 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java @@ -103,7 +103,9 @@ public WorkflowStartOutput start(WorkflowStartInput input) { } } if (CurrentNexusOperationContext.isNexusContext()) { - CurrentNexusOperationContext.get().setStartWorkflowResponseLink(response.getLink()); + // Auto-capture the start-workflow backlink so the task handler drains it onto the + // StartOperationResponse, the same path used for signal/signalWithStart responses. + CurrentNexusOperationContext.get().addBacklink(response.getLink()); } return new WorkflowStartOutput(execution); } @@ -120,6 +122,13 @@ public WorkflowSignalOutput signal(WorkflowSignalInput input) { .setRequestId(UUID.randomUUID().toString()) .setHeader(HeaderUtils.toHeaderGrpc(input.getHeader(), null)); + // If this signal is being issued from inside a Nexus operation handler, forward the inbound + // Nexus task links so the SignalWorkflowExecution history event links back to the caller. + boolean inNexusContext = CurrentNexusOperationContext.isNexusContext(); + if (inNexusContext) { + request.addAllLinks(CurrentNexusOperationContext.get().getNexusOperationLinks()); + } + DataConverter dataConverterWitSignalContext = clientOptions .getDataConverter() @@ -129,7 +138,12 @@ public WorkflowSignalOutput signal(WorkflowSignalInput input) { Optional inputArgs = dataConverterWitSignalContext.toPayloads(input.getArguments()); inputArgs.ifPresent(request::setInput); - genericClient.signal(request.build()); + SignalWorkflowExecutionResponse response = genericClient.signal(request.build()); + // Server >=1.31 with EnableCHASMSignalBacklinks returns a backlink pointing at the signal + // event; older servers leave it unset. Propagate when present. + if (inNexusContext && response.hasLink()) { + CurrentNexusOperationContext.get().addBacklink(response.getLink()); + } return new WorkflowSignalOutput(); } @@ -148,17 +162,31 @@ public WorkflowSignalWithStartOutput signalWithStart(WorkflowSignalWithStartInpu Optional signalInput = dataConverterWithWorkflowContext.toPayloads(input.getSignalArguments()); - SignalWithStartWorkflowExecutionRequest request = - requestsHelper - .newSignalWithStartWorkflowExecutionRequest( - startRequest, input.getSignalName(), signalInput.orElse(null)) - .build(); + SignalWithStartWorkflowExecutionRequest.Builder requestBuilder = + requestsHelper.newSignalWithStartWorkflowExecutionRequest( + startRequest, input.getSignalName(), signalInput.orElse(null)); + // If this signalWithStart is being issued from inside a Nexus operation handler, forward + // the inbound Nexus task links so both the WorkflowExecutionStarted and + // WorkflowExecutionSignaled events on the callee link back to the caller. + boolean inNexusContext = CurrentNexusOperationContext.isNexusContext(); + if (inNexusContext) { + requestBuilder.addAllLinks(CurrentNexusOperationContext.get().getNexusOperationLinks()); + } else { + log.debug( + "signalWithStart RPC issued outside a Nexus operation context; no link propagation"); + } + SignalWithStartWorkflowExecutionRequest request = requestBuilder.build(); SignalWithStartWorkflowExecutionResponse response = genericClient.signalWithStart(request); WorkflowExecution execution = WorkflowExecution.newBuilder() .setRunId(response.getRunId()) .setWorkflowId(request.getWorkflowId()) .build(); + // Server >=1.31 with EnableCHASMSignalBacklinks returns a backlink pointing at the signal + // event; older servers leave it unset. Propagate when present. + if (inNexusContext && response.hasSignalLink()) { + CurrentNexusOperationContext.get().addBacklink(response.getSignalLink()); + } // TODO currently SignalWithStartWorkflowExecutionResponse doesn't have eagerWorkflowTask. // We should wire it when it's implemented server-side. return new WorkflowSignalWithStartOutput(new WorkflowStartOutput(execution)); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/external/GenericWorkflowClient.java b/temporal-sdk/src/main/java/io/temporal/internal/client/external/GenericWorkflowClient.java index 317c2300b9..e4e313f578 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/client/external/GenericWorkflowClient.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/external/GenericWorkflowClient.java @@ -10,7 +10,7 @@ public interface GenericWorkflowClient { StartWorkflowExecutionResponse start(StartWorkflowExecutionRequest request); - void signal(SignalWorkflowExecutionRequest request); + SignalWorkflowExecutionResponse signal(SignalWorkflowExecutionRequest request); SignalWithStartWorkflowExecutionResponse signalWithStart( SignalWithStartWorkflowExecutionRequest request); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/external/GenericWorkflowClientImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/client/external/GenericWorkflowClientImpl.java index 58ad1e8f12..2b8e31c314 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/client/external/GenericWorkflowClientImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/external/GenericWorkflowClientImpl.java @@ -61,13 +61,13 @@ private static Map tagsForStartWorkflow(StartWorkflowExecutionRe } @Override - public void signal(SignalWorkflowExecutionRequest request) { + public SignalWorkflowExecutionResponse signal(SignalWorkflowExecutionRequest request) { Map tags = new ImmutableMap.Builder(1) .put(MetricsTag.SIGNAL_NAME, request.getSignalName()) .build(); Scope scope = metricsScope.tagged(tags); - grpcRetryer.retry( + return grpcRetryer.retryWithResult( () -> service .blockingStub() diff --git a/temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java b/temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java index 4c5ec49b12..3e8e4bd2fb 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java @@ -22,14 +22,11 @@ import io.temporal.internal.nexus.OperationTokenUtil; import java.util.*; import java.util.stream.Collectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** Utility functions shared by the implementation code. */ public final class InternalUtils { public static String TEMPORAL_RESERVED_PREFIX = "__temporal_"; - private static final Logger log = LoggerFactory.getLogger(InternalUtils.class); private static String QUERY_TYPE_STACK_TRACE = "__stack_trace"; private static String ENHANCED_QUERY_TYPE_STACK_TRACE = "__enhanced_stack_trace"; @@ -94,19 +91,12 @@ public static NexusWorkflowStarter createNexusBoundStub( : request.getLinks().stream() .map( (link) -> { - if (io.temporal.api.common.v1.Link.WorkflowEvent.getDescriptor() - .getFullName() - .equals(link.getType())) { - io.temporal.api.nexus.v1.Link nexusLink = - io.temporal.api.nexus.v1.Link.newBuilder() - .setType(link.getType()) - .setUrl(link.getUri().toString()) - .build(); - return LinkConverter.nexusLinkToWorkflowEvent(nexusLink); - } else { - log.warn("ignoring unsupported link data type: {}", link.getType()); - return null; - } + io.temporal.api.nexus.v1.Link nexusLink = + io.temporal.api.nexus.v1.Link.newBuilder() + .setType(link.getType()) + .setUrl(link.getUri().toString()) + .build(); + return LinkConverter.nexusLinkToCommonLink(nexusLink); }) .filter(Objects::nonNull) .collect(Collectors.toList()); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/common/LinkConverter.java b/temporal-sdk/src/main/java/io/temporal/internal/common/LinkConverter.java index 6d270eec63..e785c46ec6 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/common/LinkConverter.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/common/LinkConverter.java @@ -6,6 +6,7 @@ import io.temporal.api.enums.v1.EventType; import java.io.UnsupportedEncodingException; import java.net.URI; +import java.net.URISyntaxException; import java.net.URLDecoder; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; @@ -20,6 +21,8 @@ public class LinkConverter { private static final Logger log = LoggerFactory.getLogger(LinkConverter.class); private static final String linkPathFormat = "temporal:///namespaces/%s/workflows/%s/%s/history"; + private static final String linkPathNexusOperationFormat = + "temporal:///namespaces/%s/nexus-operations/%s/%s/details"; private static final String linkReferenceTypeKey = "referenceType"; private static final String linkEventIDKey = "eventID"; private static final String linkEventTypeKey = "eventType"; @@ -30,6 +33,12 @@ public class LinkConverter { private static final String requestIDReferenceType = Link.WorkflowEvent.RequestIdReference.getDescriptor().getName(); + // Fully-qualified proto descriptor names used as the `type` field on nexus.v1.Link. Match the + // server's Nexus link converter so links round-trip cleanly across SDKs. + private static final String workflowEventType = Link.WorkflowEvent.getDescriptor().getFullName(); + private static final String nexusOperationType = + Link.NexusOperation.getDescriptor().getFullName(); + public static io.temporal.api.nexus.v1.Link workflowEventToNexusLink(Link.WorkflowEvent we) { try { @@ -160,6 +169,142 @@ public static Link nexusLinkToWorkflowEvent(io.temporal.api.nexus.v1.Link nexusL return link.build(); } + /** + * Encode a {@link Link.NexusOperation} (a link to a standalone Nexus operation) into the (url, + * type) form used on the Nexus wire. URL format matches the canonical server implementation: + * {@code temporal:///namespaces/{ns}/nexus-operations/{op_id}/{run_id}/details}. + */ + public static io.temporal.api.nexus.v1.Link nexusOperationToNexusLink(Link.NexusOperation no) { + try { + String url = + String.format( + linkPathNexusOperationFormat, + URLEncoder.encode(no.getNamespace(), StandardCharsets.UTF_8.toString()), + URLEncoder.encode(no.getOperationId(), StandardCharsets.UTF_8.toString()) + .replace("+", "%20"), + URLEncoder.encode(no.getRunId(), StandardCharsets.UTF_8.toString())); + return io.temporal.api.nexus.v1.Link.newBuilder() + .setUrl(url) + .setType(nexusOperationType) + .build(); + } catch (UnsupportedEncodingException e) { + log.error("Failed to encode NexusOperation Nexus link URL", e); + } + return null; + } + + /** + * Decode a {@code nexus.v1.Link} whose {@code type} is {@code Link.NexusOperation} into a {@code + * common.v1.Link} carrying a {@link Link.NexusOperation} variant. The URL must match the format + * produced by {@link #nexusOperationToNexusLink}. + */ + public static Link nexusLinkToNexusOperation(io.temporal.api.nexus.v1.Link nexusLink) { + try { + + // Lots of if statements, but this way we double-check the validity of the link + // passed in. + URI uri = new URI(nexusLink.getUrl()); + if (!"temporal".equals(uri.getScheme())) { + log.error( + "Failed to parse NexusOperation Nexus link URL: invalid scheme: {}", uri.getScheme()); + return null; + } + + StringTokenizer st = new StringTokenizer(uri.getRawPath(), "/"); + if (!st.hasMoreTokens() || !"namespaces".equals(st.nextToken())) { + log.error( + "Failed to parse NexusOperation Nexus link URL: invalid path: {}", uri.getRawPath()); + return null; + } + if (!st.hasMoreTokens()) { + log.error( + "Failed to parse NexusOperation Nexus link URL: invalid path: {}", uri.getRawPath()); + return null; + } + String namespace = URLDecoder.decode(st.nextToken(), StandardCharsets.UTF_8.toString()); + if (!st.hasMoreTokens() || !"nexus-operations".equals(st.nextToken())) { + log.error( + "Failed to parse NexusOperation Nexus link URL: invalid path: {}", uri.getRawPath()); + return null; + } + if (!st.hasMoreTokens()) { + log.error( + "Failed to parse NexusOperation Nexus link URL: invalid path: {}", uri.getRawPath()); + return null; + } + String operationId = URLDecoder.decode(st.nextToken(), StandardCharsets.UTF_8.toString()); + if (!st.hasMoreTokens()) { + log.error( + "Failed to parse NexusOperation Nexus link URL: invalid path: {}", uri.getRawPath()); + return null; + } + String runId = URLDecoder.decode(st.nextToken(), StandardCharsets.UTF_8.toString()); + if (!st.hasMoreTokens() || !"details".equals(st.nextToken())) { + log.error( + "Failed to parse NexusOperation Nexus link URL: invalid path: {}", uri.getRawPath()); + return null; + } + if (st.hasMoreTokens()) { + log.error( + "Failed to parse NexusOperation Nexus link URL: extra tokens after 'details': {}", + uri.getRawPath()); + return null; + } + + return Link.newBuilder() + .setNexusOperation( + Link.NexusOperation.newBuilder() + .setNamespace(namespace) + .setOperationId(operationId) + .setRunId(runId)) + .build(); + } catch (URISyntaxException | UnsupportedEncodingException e) { + log.error("Failed to parse NexusOperation Nexus link URL", e); + return null; + } + } + + /** + * Encode a {@code common.v1.Link} into the Nexus-wire {@code nexus.v1.Link} (url, type) form, + * dispatching on the link's variant. Returns {@code null} (with a warn log) for variants the SDK + * does not yet know how to encode ({@code Activity}, {@code BatchJob}, unset) — match the + * server's link-converter behavior so we stay in lockstep. + */ + public static io.temporal.api.nexus.v1.Link commonLinkToNexusLink(Link commonLink) { + switch (commonLink.getVariantCase()) { + case WORKFLOW_EVENT: + return workflowEventToNexusLink(commonLink.getWorkflowEvent()); + case NEXUS_OPERATION: + return nexusOperationToNexusLink(commonLink.getNexusOperation()); + default: + log.warn( + "Cannot encode common.v1.Link variant {} as nexus.v1.Link: no encoder implemented", + commonLink.getVariantCase()); + return null; + } + } + + /** + * Decode a Nexus-wire {@code nexus.v1.Link} (url, type) into a {@code common.v1.Link}, + * dispatching on the link's {@code type} field. Returns {@code null} (with a warn log) for types + * the SDK does not yet know how to decode. + */ + public static Link nexusLinkToCommonLink(io.temporal.api.nexus.v1.Link nexusLink) { + String type = nexusLink.getType(); + if (workflowEventType.equals(type)) { + return nexusLinkToWorkflowEvent(nexusLink); + } + if (nexusOperationType.equals(type)) { + return nexusLinkToNexusOperation(nexusLink); + } + log.warn( + "Cannot decode nexus.v1.Link of type '{}' to common.v1.Link:" + + " no decoder implemented (url='{}')", + type, + nexusLink.getUrl()); + return null; + } + private static Map parseQueryParams(URI uri) throws UnsupportedEncodingException { final String query = uri.getQuery(); if (query == null || query.isEmpty()) { diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/InternalNexusOperationContext.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/InternalNexusOperationContext.java index d7306ea968..105117fb19 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/nexus/InternalNexusOperationContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/InternalNexusOperationContext.java @@ -6,6 +6,9 @@ import io.temporal.common.interceptors.NexusOperationOutboundCallsInterceptor; import io.temporal.nexus.NexusOperationContext; import io.temporal.nexus.NexusOperationInfo; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; public class InternalNexusOperationContext { private final String namespace; @@ -14,7 +17,23 @@ public class InternalNexusOperationContext { private final Scope metricScope; private final WorkflowClient client; NexusOperationOutboundCallsInterceptor outboundCalls; - Link startWorkflowResponseLink; + // Links extracted from the inbound Nexus task. Stored once at the task-handler boundary so the + // workflow client (signal, signalWithStart) can attach them to outgoing requests via + // SignalWorkflowExecutionRequest.links. + private List nexusOperationLinks = Collections.emptyList(); + // Backlinks returned by outbound RPCs the operation handler issues (currently + // SignalWorkflowExecutionResponse.link and SignalWithStartWorkflowExecutionResponse.signal_link). + // One entry per outbound RPC that returned a link. Drained by the task handler when building + // StartOperationResponse so each RPC the handler issued gets a corresponding link on the caller + // workflow's history event. + // + // This context is only safe for use from the single thread that runs the operation handler (the + // Nexus task executor's thread). The mutators below assert this contract; a stray cross-thread + // call fails fast rather than silently corrupting the ArrayList. + private final List responseBacklinks = new ArrayList<>(); + // Captured at construction (on the Nexus task executor's thread) and used to fail fast on any + // cross-thread mutation. See note on responseBacklinks. + private final Thread ownerThread; public InternalNexusOperationContext( String namespace, @@ -27,6 +46,7 @@ public InternalNexusOperationContext( this.endpoint = endpoint; this.metricScope = metricScope; this.client = client; + this.ownerThread = Thread.currentThread(); } public Scope getMetricsScope() { @@ -60,12 +80,36 @@ public NexusOperationContext getUserFacingContext() { return new NexusOperationContextImpl(); } - public void setStartWorkflowResponseLink(Link link) { - this.startWorkflowResponseLink = link; + /** + * Set the {@code common.v1.Link}s extracted from the inbound Nexus task so they can be attached + * to RPCs issued by the operation handler. + */ + public void setNexusOperationLinks(List links) { + this.nexusOperationLinks = links == null ? Collections.emptyList() : links; } - public Link getStartWorkflowResponseLink() { - return startWorkflowResponseLink; + /** Links from the inbound Nexus task; empty if none. Never null. */ + public List getNexusOperationLinks() { + return nexusOperationLinks; + } + + /** + * Append a backlink returned by an outbound RPC the operation handler issued (signal or + * signalWithStart). The task handler drains the list when building the operation's + * StartOperationResponse. + */ + public void addBacklink(Link link) { + if (link != null) { + this.responseBacklinks.add(link); + } + } + + /** + * Backlinks from every outbound RPC the handler issued. Never null; may be empty. Returned as an + * unmodifiable view; callers must not attempt to mutate. + */ + public List getBacklinks() { + return Collections.unmodifiableList(responseBacklinks); } private class NexusOperationContextImpl implements NexusOperationContext { diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java index 7f10ba8c62..3f3799ff52 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java @@ -20,6 +20,7 @@ import io.temporal.failure.CanceledFailure; import io.temporal.failure.TemporalFailure; import io.temporal.internal.common.InternalUtils; +import io.temporal.internal.common.LinkConverter; import io.temporal.internal.common.NexusUtil; import io.temporal.internal.worker.NexusTask; import io.temporal.internal.worker.NexusTaskHandler; @@ -284,6 +285,10 @@ private StartOperationResponse handleStartOperation( .setCallbackUrl(task.getCallback()) .setRequestId(task.getRequestId()); task.getCallbackHeaderMap().forEach(operationStartDetails::putCallbackHeader); + // Stash the inbound links in common.v1.Link form on the operation context so that signal + // RPCs issued by the handler (e.g. SignalWithStartWorkflow on the callee) can attach them + // to SignalWorkflowExecutionRequest.links. + List inboundCommonLinks = new ArrayList<>(); task.getLinksList() .forEach( link -> { @@ -296,7 +301,18 @@ private StartOperationResponse handleStartOperation( "Invalid link URL: " + link.getUrl(), e); } + // Convert each inbound nexus.v1.Link to common.v1.Link, dispatching on the link's + // type field (WorkflowEvent, NexusOperation, etc.). LinkConverter logs the warn for + // any unknown type and returns null. We don't throw as we know the URI is valid + // (If it wasn't, then nexusProtoLinkToLink would have already thrown) + // so this might just be a new link type and we don't want to + // break forward compatibility. + io.temporal.api.common.v1.Link commonLink = LinkConverter.nexusLinkToCommonLink(link); + if (commonLink != null) { + inboundCommonLinks.add(commonLink); + } }); + CurrentNexusOperationContext.get().setNexusOperationLinks(inboundCommonLinks); HandlerInputContent.Builder input = HandlerInputContent.newBuilder().setDataStream(task.getPayload().toByteString().newInput()); @@ -307,10 +323,32 @@ private StartOperationResponse handleStartOperation( try { OperationStartResult result = startOperation(context, operationStartDetails.build(), input.build()); + // If signal or signalWithStart RPCs the handler issued returned backlinks, propagate them + // to the caller so the caller workflow's history event links to each event on the callee. + // Same set of backlinks applies to both sync and async response variants. + List backlinks = new ArrayList<>(); + for (io.temporal.api.common.v1.Link backlink : + CurrentNexusOperationContext.get().getBacklinks()) { + io.temporal.api.nexus.v1.Link converted = LinkConverter.commonLinkToNexusLink(backlink); + if (converted != null) { + backlinks.add(converted); + } else { + // The SDK stashed this backlink itself in RootWorkflowClientInvoker; failing to + // re-encode it now means a LinkConverter regression or a malformed link from the + // server. Either is an SDK invariant violation worth shouting about (warn is too + // quiet — the caller's history event will be missing a link with no other diagnostic). + log.error( + "SDK-stashed backlink failed to re-encode as nexus.v1.Link; caller history will be" + + " missing a link. backlink={}", + backlink); + } + } + if (result.isSync()) { startResponseBuilder.setSyncSuccess( StartOperationResponse.Sync.newBuilder() .setPayload(Payload.parseFrom(result.getSyncResult().getDataBytes())) + .addAllLinks(backlinks) .build()); } else { startResponseBuilder.setAsyncSuccess( @@ -326,6 +364,7 @@ private StartOperationResponse handleStartOperation( .setUrl(link.getUri().toString()) .build()) .collect(Collectors.toList())) + .addAllLinks(backlinks) .build()); } } catch (OperationException e) { @@ -349,6 +388,8 @@ private StartOperationResponse handleStartOperation( new RuntimeException("Unknown operation state: " + e.getState())); } startResponseBuilder.setFailure(dataConverter.exceptionToFailure(temporalFailure)); + } catch (Throwable failure) { + convertKnownFailures(failure); } return startResponseBuilder.build(); } diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowRunOperationImpl.java b/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowRunOperationImpl.java index 537235ce53..4146de8eb7 100644 --- a/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowRunOperationImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowRunOperationImpl.java @@ -1,20 +1,13 @@ package io.temporal.nexus; -import static io.temporal.internal.common.LinkConverter.workflowEventToNexusLink; -import static io.temporal.internal.common.NexusUtil.nexusProtoLinkToLink; - import io.nexusrpc.handler.*; import io.nexusrpc.handler.OperationHandler; -import io.temporal.api.common.v1.Link; -import io.temporal.api.common.v1.WorkflowExecution; -import io.temporal.api.enums.v1.EventType; import io.temporal.client.WorkflowClient; import io.temporal.internal.client.NexusStartWorkflowRequest; import io.temporal.internal.client.NexusStartWorkflowResponse; import io.temporal.internal.nexus.CurrentNexusOperationContext; import io.temporal.internal.nexus.InternalNexusOperationContext; import io.temporal.internal.nexus.OperationTokenUtil; -import java.net.URISyntaxException; class WorkflowRunOperationImpl implements OperationHandler { private final WorkflowHandleFactory handleFactory; @@ -40,38 +33,9 @@ public OperationStartResult start( NexusStartWorkflowResponse nexusStartWorkflowResponse = handle.getInvoker().invoke(nexusRequest); - WorkflowExecution workflowExec = nexusStartWorkflowResponse.getWorkflowExecution(); - // If the start workflow response returned a link use it, otherwise - // create the link information about the new workflow and return to the caller. - Link.WorkflowEvent workflowEventLink = - nexusCtx.getStartWorkflowResponseLink().hasWorkflowEvent() - ? nexusCtx.getStartWorkflowResponseLink().getWorkflowEvent() - : null; - if (workflowEventLink == null) { - workflowEventLink = - Link.WorkflowEvent.newBuilder() - .setNamespace(nexusCtx.getNamespace()) - .setWorkflowId(workflowExec.getWorkflowId()) - .setRunId(workflowExec.getRunId()) - .setEventRef( - Link.WorkflowEvent.EventReference.newBuilder() - .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED)) - .build(); - } - io.temporal.api.nexus.v1.Link nexusLink = workflowEventToNexusLink(workflowEventLink); - // Attach the link to the operation result. - OperationStartResult.Builder result = - OperationStartResult.newAsyncBuilder(nexusStartWorkflowResponse.getOperationToken()); - if (nexusLink != null) { - try { - ctx.addLinks(nexusProtoLinkToLink(nexusLink)); - } catch (URISyntaxException e) { - // Not expected as the link is constructed by the SDK. - throw new HandlerException(HandlerException.ErrorType.INTERNAL, "failed to parse URI", e); - } - } - return result.build(); + return OperationStartResult.newAsyncBuilder(nexusStartWorkflowResponse.getOperationToken()) + .build(); } @Override diff --git a/temporal-sdk/src/test/java/io/temporal/internal/client/RootWorkflowClientInvokerLinkPropagationTest.java b/temporal-sdk/src/test/java/io/temporal/internal/client/RootWorkflowClientInvokerLinkPropagationTest.java new file mode 100644 index 0000000000..f7593755d1 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/internal/client/RootWorkflowClientInvokerLinkPropagationTest.java @@ -0,0 +1,265 @@ +package io.temporal.internal.client; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.uber.m3.tally.RootScopeBuilder; +import com.uber.m3.tally.Scope; +import io.temporal.api.common.v1.Link; +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.api.enums.v1.EventType; +import io.temporal.api.workflowservice.v1.SignalWithStartWorkflowExecutionRequest; +import io.temporal.api.workflowservice.v1.SignalWithStartWorkflowExecutionResponse; +import io.temporal.api.workflowservice.v1.SignalWorkflowExecutionRequest; +import io.temporal.api.workflowservice.v1.SignalWorkflowExecutionResponse; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowClientOptions; +import io.temporal.client.WorkflowOptions; +import io.temporal.common.interceptors.Header; +import io.temporal.common.interceptors.WorkflowClientCallsInterceptor.WorkflowSignalInput; +import io.temporal.common.interceptors.WorkflowClientCallsInterceptor.WorkflowSignalWithStartInput; +import io.temporal.common.interceptors.WorkflowClientCallsInterceptor.WorkflowStartInput; +import io.temporal.internal.client.external.GenericWorkflowClient; +import io.temporal.internal.nexus.CurrentNexusOperationContext; +import io.temporal.internal.nexus.InternalNexusOperationContext; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +/** + * Unit tests for {@link RootWorkflowClientInvoker#signal} link propagation in and out of the Nexus + * operation context. These run against mocked dependencies and exercise the code paths that the + * integration tests in {@code SignalOperationLinkingTest} can only cover when a real flag-enabled + * server is available. + */ +public class RootWorkflowClientInvokerLinkPropagationTest { + + private static final String NAMESPACE = "test-namespace"; + private static final String WORKFLOW_ID = "wf-target"; + + private GenericWorkflowClient genericClient; + private RootWorkflowClientInvoker invoker; + private InternalNexusOperationContext nexusCtx; + + @Before + public void setUp() { + genericClient = mock(GenericWorkflowClient.class); + invoker = + new RootWorkflowClientInvoker( + genericClient, + WorkflowClientOptions.newBuilder() + .setNamespace(NAMESPACE) + .validateAndBuildWithDefaults(), + new WorkerFactoryRegistry()); + Scope metricsScope = new RootScopeBuilder().reportEvery(com.uber.m3.util.Duration.ofMillis(10)); + nexusCtx = + new InternalNexusOperationContext( + NAMESPACE, "tq", "endpoint", metricsScope, mock(WorkflowClient.class)); + CurrentNexusOperationContext.set(nexusCtx); + } + + @After + public void tearDown() { + CurrentNexusOperationContext.unset(); + } + + /** + * Happy path against a flag-enabled server: inbound nexus links are forwarded onto the + * SignalWorkflowExecutionRequest, and the response's backlink is captured back onto the operation + * context. + */ + @Test + public void signalForwardsInboundLinksAndCapturesResponseBacklink() { + Link inboundLink = + workflowEventLink( + "caller-wf", "caller-run", EventType.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED); + nexusCtx.setNexusOperationLinks(Collections.singletonList(inboundLink)); + + Link responseLink = + workflowEventLink( + WORKFLOW_ID, "target-run", EventType.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED); + SignalWorkflowExecutionResponse response = + SignalWorkflowExecutionResponse.newBuilder().setLink(responseLink).build(); + when(genericClient.signal(any(SignalWorkflowExecutionRequest.class))).thenReturn(response); + + invoker.signal(newSignalInput()); + + // Forward direction: the request the SDK sent carries the inbound link. + ArgumentCaptor captor = + ArgumentCaptor.forClass(SignalWorkflowExecutionRequest.class); + org.mockito.Mockito.verify(genericClient).signal(captor.capture()); + SignalWorkflowExecutionRequest sent = captor.getValue(); + Assert.assertEquals("request should carry the single inbound link", 1, sent.getLinksCount()); + Assert.assertEquals(inboundLink, sent.getLinks(0)); + + // Backward direction: the response's link is now on the context for the task handler to read. + List captured = nexusCtx.getBacklinks(); + Assert.assertEquals("expected one captured backlink", 1, captured.size()); + Assert.assertEquals(responseLink, captured.get(0)); + } + + /** + * Older-server compatibility: the server returns a response without {@code link} set. The SDK + * must not crash and must leave the operation context's backlink list empty. + */ + @Test + public void signalAgainstOlderServerCapturesNoBacklink() { + Link inboundLink = + workflowEventLink( + "caller-wf", "caller-run", EventType.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED); + nexusCtx.setNexusOperationLinks(Collections.singletonList(inboundLink)); + + // Pre-1.31 server / flag-off server: response has no link. + SignalWorkflowExecutionResponse response = SignalWorkflowExecutionResponse.getDefaultInstance(); + when(genericClient.signal(any(SignalWorkflowExecutionRequest.class))).thenReturn(response); + + invoker.signal(newSignalInput()); + + // Forward direction still works regardless of server version. + ArgumentCaptor captor = + ArgumentCaptor.forClass(SignalWorkflowExecutionRequest.class); + org.mockito.Mockito.verify(genericClient).signal(captor.capture()); + Assert.assertEquals(1, captor.getValue().getLinksCount()); + + // Backward direction: no backlink captured because the server didn't send one. + Assert.assertTrue( + "expected no captured backlink when server returned no link", + nexusCtx.getBacklinks().isEmpty()); + } + + /** + * Multi-signal: two signal RPCs in a row each contribute a backlink; both must be captured in + * order on the context, ready for the task handler to drain into the operation response. + */ + @Test + public void multipleSignalsAccumulateAllBacklinks() { + Link firstResponseLink = + workflowEventLink("callee-a", "run-a", EventType.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED); + Link secondResponseLink = + workflowEventLink("callee-b", "run-b", EventType.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED); + when(genericClient.signal(any(SignalWorkflowExecutionRequest.class))) + .thenReturn(SignalWorkflowExecutionResponse.newBuilder().setLink(firstResponseLink).build()) + .thenReturn( + SignalWorkflowExecutionResponse.newBuilder().setLink(secondResponseLink).build()); + + invoker.signal(newSignalInput()); + invoker.signal(newSignalInput()); + + List captured = nexusCtx.getBacklinks(); + Assert.assertEquals( + "expected one backlink per signal call", + Arrays.asList(firstResponseLink, secondResponseLink), + captured); + } + + /** + * Happy-path mirror of {@link #signalForwardsInboundLinksAndCapturesResponseBacklink} but for + * {@code signalWithStart}. The forward direction must attach inbound links to {@link + * SignalWithStartWorkflowExecutionRequest#getLinksList}, and the backward direction must capture + * {@code response.signal_link} via the same backlink path. Different proto field name ({@code + * signal_link} vs {@code link}) and different code path inside {@link + * io.temporal.internal.client.RootWorkflowClientInvoker#signalWithStart} — a regression in only + * one branch would otherwise pass the plain-signal tests. + */ + @Test + public void signalWithStartForwardsInboundLinksAndCapturesResponseBacklink() { + Link inboundLink = + workflowEventLink( + "caller-wf", "caller-run", EventType.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED); + nexusCtx.setNexusOperationLinks(Collections.singletonList(inboundLink)); + + Link responseLink = + workflowEventLink( + WORKFLOW_ID, "target-run", EventType.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED); + SignalWithStartWorkflowExecutionResponse response = + SignalWithStartWorkflowExecutionResponse.newBuilder() + .setRunId("target-run") + .setSignalLink(responseLink) + .build(); + when(genericClient.signalWithStart(any(SignalWithStartWorkflowExecutionRequest.class))) + .thenReturn(response); + + invoker.signalWithStart(newSignalWithStartInput()); + + // Forward direction: the SignalWithStartWorkflowExecutionRequest carries the inbound link. + ArgumentCaptor captor = + ArgumentCaptor.forClass(SignalWithStartWorkflowExecutionRequest.class); + org.mockito.Mockito.verify(genericClient).signalWithStart(captor.capture()); + SignalWithStartWorkflowExecutionRequest sent = captor.getValue(); + Assert.assertEquals("request should carry the single inbound link", 1, sent.getLinksCount()); + Assert.assertEquals(inboundLink, sent.getLinks(0)); + + // Backward direction: response.signal_link is on the context for the task handler to read. + List captured = nexusCtx.getBacklinks(); + Assert.assertEquals("expected one captured backlink", 1, captured.size()); + Assert.assertEquals(responseLink, captured.get(0)); + } + + /** + * Mixed-RPC accumulation: a handler that issues one signal and one signalWithStart against the + * same context must end up with both backlinks captured, in call order. Guards against + * regressions where one of the two code paths stops appending to the same list. + */ + @Test + public void mixedSignalAndSignalWithStartAccumulateAllBacklinks() { + Link signalResponseLink = + workflowEventLink("callee-s", "run-s", EventType.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED); + Link signalWithStartResponseLink = + workflowEventLink( + "callee-sws", "run-sws", EventType.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED); + when(genericClient.signal(any(SignalWorkflowExecutionRequest.class))) + .thenReturn( + SignalWorkflowExecutionResponse.newBuilder().setLink(signalResponseLink).build()); + when(genericClient.signalWithStart(any(SignalWithStartWorkflowExecutionRequest.class))) + .thenReturn( + SignalWithStartWorkflowExecutionResponse.newBuilder() + .setRunId("run-sws") + .setSignalLink(signalWithStartResponseLink) + .build()); + + invoker.signal(newSignalInput()); + invoker.signalWithStart(newSignalWithStartInput()); + + Assert.assertEquals( + "expected one backlink each from signal and signalWithStart, in call order", + Arrays.asList(signalResponseLink, signalWithStartResponseLink), + nexusCtx.getBacklinks()); + } + + // ── helpers ────────────────────────────────────────────────────────────────────────────── + + private static WorkflowSignalInput newSignalInput() { + return new WorkflowSignalInput( + WorkflowExecution.newBuilder().setWorkflowId(WORKFLOW_ID).build(), + "test-signal", + Header.empty(), + new Object[] {"payload"}); + } + + private static WorkflowSignalWithStartInput newSignalWithStartInput() { + WorkflowOptions options = WorkflowOptions.newBuilder().setTaskQueue("tq").build(); + WorkflowStartInput startInput = + new WorkflowStartInput( + WORKFLOW_ID, "TestWorkflow", Header.empty(), new Object[] {}, options); + return new WorkflowSignalWithStartInput( + startInput, "test-signal", new Object[] {"signal-payload"}); + } + + private static Link workflowEventLink(String workflowId, String runId, EventType eventType) { + return Link.newBuilder() + .setWorkflowEvent( + Link.WorkflowEvent.newBuilder() + .setNamespace(NAMESPACE) + .setWorkflowId(workflowId) + .setRunId(runId) + .setEventRef( + Link.WorkflowEvent.EventReference.newBuilder().setEventType(eventType))) + .build(); + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/internal/common/InternalUtilsTest.java b/temporal-sdk/src/test/java/io/temporal/internal/common/InternalUtilsTest.java new file mode 100644 index 0000000000..a82a63c2b5 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/internal/common/InternalUtilsTest.java @@ -0,0 +1,103 @@ +package io.temporal.internal.common; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.uber.m3.tally.RootScopeBuilder; +import com.uber.m3.tally.Scope; +import io.nexusrpc.Link; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowOptions; +import io.temporal.client.WorkflowStub; +import io.temporal.internal.client.NexusStartWorkflowRequest; +import io.temporal.internal.nexus.CurrentNexusOperationContext; +import io.temporal.internal.nexus.InternalNexusOperationContext; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Optional; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +public class InternalUtilsTest { + private static final String NAMESPACE = "test-namespace"; + + @Before + public void setUp() { + Scope metricsScope = new RootScopeBuilder().reportEvery(com.uber.m3.util.Duration.ofMillis(10)); + CurrentNexusOperationContext.set( + new InternalNexusOperationContext( + NAMESPACE, "tq", "endpoint", metricsScope, mock(WorkflowClient.class))); + } + + @After + public void tearDown() { + CurrentNexusOperationContext.unset(); + } + + /** + * Regression guard for the {@code LinkConverter} generalization: inbound nexus links of every + * known type (WorkflowEvent + NexusOperation) must flow through {@code createNexusBoundStub} to + * the resulting workflow options. Unknown types must still be filtered out. + * + *

Before this PR, only WorkflowEvent-typed links were forwarded; NexusOperation links were + * silently dropped at the type guard in {@code InternalUtils.createNexusBoundStub}. + */ + @Test + public void createNexusBoundStub_dispatchesAllKnownLinkTypes() throws URISyntaxException { + WorkflowStub stub = mock(WorkflowStub.class); + WorkflowOptions stubOptions = WorkflowOptions.newBuilder().setWorkflowId("wf-id").build(); + when(stub.getOptions()).thenReturn(Optional.of(stubOptions)); + when(stub.newInstance(any(WorkflowOptions.class))).thenReturn(mock(WorkflowStub.class)); + + Link workflowEventInbound = + Link.newBuilder() + .setType("temporal.api.common.v1.Link.WorkflowEvent") + .setUri( + new URI( + "temporal:///namespaces/ns/workflows/caller-wf/caller-run/history?eventID=1&eventType=NexusOperationScheduled&referenceType=EventReference")) + .build(); + Link nexusOpInbound = + Link.newBuilder() + .setType("temporal.api.common.v1.Link.NexusOperation") + .setUri(new URI("temporal:///namespaces/ns/nexus-operations/op-id/run-id/details")) + .build(); + Link unknownInbound = + Link.newBuilder() + .setType("not.a.real.Type") + .setUri(new URI("temporal:///some/unknown/path")) + .build(); + + NexusStartWorkflowRequest request = + new NexusStartWorkflowRequest( + "req-id", + "", // no callback URL — exercises the link path without the callback-headers branch + Collections.emptyMap(), + "tq", + Arrays.asList(workflowEventInbound, nexusOpInbound, unknownInbound)); + + InternalUtils.createNexusBoundStub(stub, request); + + ArgumentCaptor captor = ArgumentCaptor.forClass(WorkflowOptions.class); + org.mockito.Mockito.verify(stub).newInstance(captor.capture()); + WorkflowOptions captured = captor.getValue(); + assertNotNull("expected links to be set on bound options", captured.getLinks()); + assertEquals( + "expected WorkflowEvent + NexusOperation to flow through; unknown filtered out", + 2, + captured.getLinks().size()); + assertTrue( + "expected one WorkflowEvent variant", + captured.getLinks().stream().anyMatch(l -> l.hasWorkflowEvent())); + assertTrue( + "expected one NexusOperation variant", + captured.getLinks().stream().anyMatch(l -> l.hasNexusOperation())); + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/internal/common/LinkConverterTest.java b/temporal-sdk/src/test/java/io/temporal/internal/common/LinkConverterTest.java index 60b67b1b81..25e9e00f65 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/common/LinkConverterTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/common/LinkConverterTest.java @@ -1,6 +1,10 @@ package io.temporal.internal.common; +import static io.temporal.internal.common.LinkConverter.commonLinkToNexusLink; +import static io.temporal.internal.common.LinkConverter.nexusLinkToCommonLink; +import static io.temporal.internal.common.LinkConverter.nexusLinkToNexusOperation; import static io.temporal.internal.common.LinkConverter.nexusLinkToWorkflowEvent; +import static io.temporal.internal.common.LinkConverter.nexusOperationToNexusLink; import static io.temporal.internal.common.LinkConverter.workflowEventToNexusLink; import static org.junit.Assert.*; @@ -352,4 +356,200 @@ public void testConvertNexusToWorkflowEvent_InvalidEventType() { assertNull(nexusLinkToWorkflowEvent(input)); } + + // ── NexusOperation encode / decode ─────────────────────────────────────────────────────── + + @Test + public void testConvertNexusOperationToNexus_Valid() { + Link.NexusOperation input = + Link.NexusOperation.newBuilder() + .setNamespace("ns") + .setOperationId("op-id") + .setRunId("run-id") + .build(); + + io.temporal.api.nexus.v1.Link expected = + io.temporal.api.nexus.v1.Link.newBuilder() + .setUrl("temporal:///namespaces/ns/nexus-operations/op-id/run-id/details") + .setType("temporal.api.common.v1.Link.NexusOperation") + .build(); + + assertEquals(expected, nexusOperationToNexusLink(input)); + } + + @Test + public void testConvertNexusToNexusOperation_Valid() { + io.temporal.api.nexus.v1.Link input = + io.temporal.api.nexus.v1.Link.newBuilder() + .setUrl("temporal:///namespaces/ns/nexus-operations/op-id/run-id/details") + .setType("temporal.api.common.v1.Link.NexusOperation") + .build(); + + Link expected = + Link.newBuilder() + .setNexusOperation( + Link.NexusOperation.newBuilder() + .setNamespace("ns") + .setOperationId("op-id") + .setRunId("run-id")) + .build(); + + assertEquals(expected, nexusLinkToNexusOperation(input)); + } + + @Test + public void testNexusOperationRoundTrip() { + Link original = + Link.newBuilder() + .setNexusOperation( + Link.NexusOperation.newBuilder() + .setNamespace("ns") + .setOperationId("op-id") + .setRunId("run-id")) + .build(); + + io.temporal.api.nexus.v1.Link encoded = commonLinkToNexusLink(original); + Link decoded = nexusLinkToCommonLink(encoded); + assertEquals(original, decoded); + } + + @Test + public void testNexusOperationRoundTrip_OperationIdWithSpecialChars() { + // operationId is the field the encoder applies the `+ → %20` workaround to (matching the + // workflowId precedent in WorkflowEvent). Verify spaces, '+', and '/' round-trip cleanly. + Link original = + Link.newBuilder() + .setNexusOperation( + Link.NexusOperation.newBuilder() + .setNamespace("ns") + .setOperationId("op with+special/chars") + .setRunId("run-id")) + .build(); + + io.temporal.api.nexus.v1.Link encoded = commonLinkToNexusLink(original); + Link decoded = nexusLinkToCommonLink(encoded); + assertEquals(original, decoded); + } + + @Test + public void testConvertNexusToNexusOperation_ExtraTokensAfterDetails() { + // Defends against silently lossy parsing if the server later extends the path beyond + // /details/... (e.g. /details/{event_id}). Older SDKs should reject rather than drop tokens. + io.temporal.api.nexus.v1.Link input = + io.temporal.api.nexus.v1.Link.newBuilder() + .setUrl("temporal:///namespaces/ns/nexus-operations/op-id/run-id/details/extra/junk") + .setType("temporal.api.common.v1.Link.NexusOperation") + .build(); + + assertNull(nexusLinkToNexusOperation(input)); + } + + @Test + public void testConvertNexusToNexusOperation_InvalidScheme() { + io.temporal.api.nexus.v1.Link input = + io.temporal.api.nexus.v1.Link.newBuilder() + .setUrl("http:///namespaces/ns/nexus-operations/op-id/run-id/details") + .setType("temporal.api.common.v1.Link.NexusOperation") + .build(); + + assertNull(nexusLinkToNexusOperation(input)); + } + + @Test + public void testConvertNexusToNexusOperation_InvalidPath() { + io.temporal.api.nexus.v1.Link input = + io.temporal.api.nexus.v1.Link.newBuilder() + .setUrl("temporal:///namespaces/ns/workflows/wf-id/run-id/history") + .setType("temporal.api.common.v1.Link.NexusOperation") + .build(); + + assertNull(nexusLinkToNexusOperation(input)); + } + + // ── Dispatchers ────────────────────────────────────────────────────────────────────────── + + @Test + public void testNexusLinkToCommonLink_DispatchesOnType() { + io.temporal.api.nexus.v1.Link workflowEventLink = + io.temporal.api.nexus.v1.Link.newBuilder() + .setUrl( + "temporal:///namespaces/ns/workflows/wf-id/run-id/history?eventID=1&eventType=WorkflowExecutionStarted&referenceType=EventReference") + .setType("temporal.api.common.v1.Link.WorkflowEvent") + .build(); + Link decodedWorkflowEvent = nexusLinkToCommonLink(workflowEventLink); + assertNotNull(decodedWorkflowEvent); + assertTrue(decodedWorkflowEvent.hasWorkflowEvent()); + + io.temporal.api.nexus.v1.Link nexusOpLink = + io.temporal.api.nexus.v1.Link.newBuilder() + .setUrl("temporal:///namespaces/ns/nexus-operations/op-id/run-id/details") + .setType("temporal.api.common.v1.Link.NexusOperation") + .build(); + Link decodedNexusOp = nexusLinkToCommonLink(nexusOpLink); + assertNotNull(decodedNexusOp); + assertTrue(decodedNexusOp.hasNexusOperation()); + } + + @Test + public void testNexusLinkToCommonLink_UnknownTypeReturnsNull() { + io.temporal.api.nexus.v1.Link input = + io.temporal.api.nexus.v1.Link.newBuilder() + .setUrl("temporal:///some/other/path") + .setType("not.a.real.Type") + .build(); + + assertNull(nexusLinkToCommonLink(input)); + } + + @Test + public void testCommonLinkToNexusLink_DispatchesOnVariant() { + Link workflowEvent = + Link.newBuilder() + .setWorkflowEvent( + Link.WorkflowEvent.newBuilder() + .setNamespace("ns") + .setWorkflowId("wf-id") + .setRunId("run-id") + .setEventRef( + Link.WorkflowEvent.EventReference.newBuilder() + .setEventId(1) + .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED))) + .build(); + io.temporal.api.nexus.v1.Link encodedWorkflowEvent = commonLinkToNexusLink(workflowEvent); + assertNotNull(encodedWorkflowEvent); + assertEquals("temporal.api.common.v1.Link.WorkflowEvent", encodedWorkflowEvent.getType()); + + Link nexusOp = + Link.newBuilder() + .setNexusOperation( + Link.NexusOperation.newBuilder() + .setNamespace("ns") + .setOperationId("op-id") + .setRunId("run-id")) + .build(); + io.temporal.api.nexus.v1.Link encodedNexusOp = commonLinkToNexusLink(nexusOp); + assertNotNull(encodedNexusOp); + assertEquals("temporal.api.common.v1.Link.NexusOperation", encodedNexusOp.getType()); + } + + @Test + public void testCommonLinkToNexusLink_UnsupportedVariantReturnsNull() { + // Activity and BatchJob variants are not yet encoded by either the SDK or the canonical + // server implementation; the dispatcher logs and returns null until they are. + Link activity = + Link.newBuilder() + .setActivity( + Link.Activity.newBuilder() + .setNamespace("ns") + .setActivityId("act-id") + .setRunId("run-id")) + .build(); + assertNull(commonLinkToNexusLink(activity)); + + Link batchJob = + Link.newBuilder().setBatchJob(Link.BatchJob.newBuilder().setJobId("job-id")).build(); + assertNull(commonLinkToNexusLink(batchJob)); + + assertNull(commonLinkToNexusLink(Link.getDefaultInstance())); + } } diff --git a/temporal-sdk/src/test/java/io/temporal/internal/nexus/NexusTaskHandlerImplTest.java b/temporal-sdk/src/test/java/io/temporal/internal/nexus/NexusTaskHandlerImplTest.java index ad7c628e98..f9308c2071 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/nexus/NexusTaskHandlerImplTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/nexus/NexusTaskHandlerImplTest.java @@ -8,9 +8,12 @@ import com.uber.m3.util.Duration; import io.nexusrpc.Header; import io.nexusrpc.handler.*; +import io.temporal.api.common.v1.Link; import io.temporal.api.common.v1.Payload; +import io.temporal.api.enums.v1.EventType; import io.temporal.api.nexus.v1.Request; import io.temporal.api.nexus.v1.StartOperationRequest; +import io.temporal.api.nexus.v1.StartOperationResponse; import io.temporal.api.workflowservice.v1.PollNexusTaskQueueResponse; import io.temporal.client.WorkflowClient; import io.temporal.common.converter.DataConverter; @@ -157,6 +160,223 @@ public void startAsyncSyncOperation() throws TimeoutException { "test id", result.getResponse().getStartOperation().getAsyncSuccess().getOperationToken()); } + /** + * Verify that signal-response backlinks stashed on the {@link InternalNexusOperationContext} + * during a handler invocation are merged into the resulting {@code StartOperationResponse.Async} + * via {@link io.temporal.internal.common.LinkConverter}. No server required. + */ + @Test + public void asyncResponseIncludesSignalBacklinks() throws TimeoutException { + WorkflowClient client = mock(WorkflowClient.class); + NexusTaskHandlerImpl nexusTaskHandlerImpl = + new NexusTaskHandlerImpl( + client, NAMESPACE, TASK_QUEUE, dataConverter, new WorkerInterceptor[] {}); + nexusTaskHandlerImpl.registerNexusServiceImplementations( + new Object[] {new BacklinkStashingAsyncServiceImpl()}); + nexusTaskHandlerImpl.start(); + + PollNexusTaskQueueResponse.Builder task = + PollNexusTaskQueueResponse.newBuilder() + .setRequest( + Request.newBuilder() + .setStartOperation( + StartOperationRequest.newBuilder() + .setOperation("operation") + .setService("TestNexusService1") + .setPayload(dataConverter.toPayload("op-token").get()) + .build())); + + NexusTaskHandler.Result result = + nexusTaskHandlerImpl.handle(new NexusTask(task, null, null), metricsScope); + + Assert.assertNull(result.getHandlerException()); + StartOperationResponse.Async async = result.getResponse().getStartOperation().getAsyncSuccess(); + Assert.assertEquals("op-token", async.getOperationToken()); + Assert.assertEquals( + "expected one signal backlink on the async response", 1, async.getLinksCount()); + // The backlink was stashed as a WorkflowEvent for callee workflowId "callee-wf"; the response + // should contain a temporal:// URL referencing that workflow. + Assert.assertTrue( + "expected backlink URL to reference the callee workflow, got: " + + async.getLinks(0).getUrl(), + async.getLinks(0).getUrl().contains("callee-wf")); + } + + /** + * Same as {@link #asyncResponseIncludesSignalBacklinks} but the handler returns sync. Guards + * against the sync and async builders drifting (both must call {@code addAllLinks(backlinks)}). + */ + @Test + public void syncResponseIncludesSignalBacklinks() throws TimeoutException { + WorkflowClient client = mock(WorkflowClient.class); + NexusTaskHandlerImpl nexusTaskHandlerImpl = + new NexusTaskHandlerImpl( + client, NAMESPACE, TASK_QUEUE, dataConverter, new WorkerInterceptor[] {}); + nexusTaskHandlerImpl.registerNexusServiceImplementations( + new Object[] {new BacklinkStashingSyncServiceImpl()}); + nexusTaskHandlerImpl.start(); + + PollNexusTaskQueueResponse.Builder task = + PollNexusTaskQueueResponse.newBuilder() + .setRequest( + Request.newBuilder() + .setStartOperation( + StartOperationRequest.newBuilder() + .setOperation("operation") + .setService("TestNexusService1") + .setPayload(dataConverter.toPayload("input").get()) + .build())); + + NexusTaskHandler.Result result = + nexusTaskHandlerImpl.handle(new NexusTask(task, null, null), metricsScope); + + Assert.assertNull(result.getHandlerException()); + StartOperationResponse.Sync sync = result.getResponse().getStartOperation().getSyncSuccess(); + Assert.assertEquals( + "expected one signal backlink on the sync response", 1, sync.getLinksCount()); + Assert.assertTrue( + "expected backlink URL to reference the callee workflow, got: " + sync.getLinks(0).getUrl(), + sync.getLinks(0).getUrl().contains("callee-wf")); + } + + /** + * Mixed inbound link list: one valid {@code WorkflowEvent}-typed link plus one unknown-type link. + * The handler must observe only the valid one on {@code getNexusOperationLinks()} — unknown types + * are logged + dropped by {@link io.temporal.internal.common.LinkConverter}. + */ + @Test + public void inboundLinkListFiltersUnknownTypes() throws TimeoutException { + InboundLinkCapturingServiceImpl.capturedInboundLinks = null; + WorkflowClient client = mock(WorkflowClient.class); + NexusTaskHandlerImpl nexusTaskHandlerImpl = + new NexusTaskHandlerImpl( + client, NAMESPACE, TASK_QUEUE, dataConverter, new WorkerInterceptor[] {}); + nexusTaskHandlerImpl.registerNexusServiceImplementations( + new Object[] {new InboundLinkCapturingServiceImpl()}); + nexusTaskHandlerImpl.start(); + + io.temporal.api.nexus.v1.Link validLink = + io.temporal.api.nexus.v1.Link.newBuilder() + .setUrl( + "temporal:///namespaces/ns/workflows/caller-wf/caller-run/history?eventID=1&eventType=NexusOperationScheduled&referenceType=EventReference") + .setType("temporal.api.common.v1.Link.WorkflowEvent") + .build(); + io.temporal.api.nexus.v1.Link unknownLink = + io.temporal.api.nexus.v1.Link.newBuilder() + .setUrl("temporal:///some/unknown/path") + .setType("not.a.real.Type") + .build(); + + PollNexusTaskQueueResponse.Builder task = + PollNexusTaskQueueResponse.newBuilder() + .setRequest( + Request.newBuilder() + .setStartOperation( + StartOperationRequest.newBuilder() + .setOperation("operation") + .setService("TestNexusService1") + .setPayload(dataConverter.toPayload("input").get()) + .addLinks(validLink) + .addLinks(unknownLink) + .build())); + + NexusTaskHandler.Result result = + nexusTaskHandlerImpl.handle(new NexusTask(task, null, null), metricsScope); + + Assert.assertNull(result.getHandlerException()); + Assert.assertNotNull( + "handler should have captured the inbound links", + InboundLinkCapturingServiceImpl.capturedInboundLinks); + Assert.assertEquals( + "expected only the valid WorkflowEvent link to be forwarded; the unknown-type link must" + + " be dropped by LinkConverter", + 1, + InboundLinkCapturingServiceImpl.capturedInboundLinks.size()); + Assert.assertTrue( + "expected the surviving link to be the WorkflowEvent variant", + InboundLinkCapturingServiceImpl.capturedInboundLinks.get(0).hasWorkflowEvent()); + } + + /** + * Handler that simulates what a real Nexus operation would do after issuing a signal: stash a + * backlink on the operation context, then return an async result. Lets us exercise the + * async-response link merge in {@link NexusTaskHandlerImpl} without standing up a real signal + * RPC. + */ + @ServiceImpl(service = TestNexusServices.TestNexusService1.class) + public class BacklinkStashingAsyncServiceImpl { + @OperationImpl + public OperationHandler operation() { + return new OperationHandler() { + @Override + public OperationStartResult start( + OperationContext ctx, OperationStartDetails details, @Nullable String token) { + Link backlink = + Link.newBuilder() + .setWorkflowEvent( + Link.WorkflowEvent.newBuilder() + .setNamespace(NAMESPACE) + .setWorkflowId("callee-wf") + .setRunId("callee-run-id") + .setEventRef( + Link.WorkflowEvent.EventReference.newBuilder() + .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED))) + .build(); + CurrentNexusOperationContext.get().addBacklink(backlink); + return OperationStartResult.async(token); + } + + @Override + public void cancel(OperationContext ctx, OperationCancelDetails details) {} + }; + } + } + + /** Sync mirror of {@link BacklinkStashingAsyncServiceImpl}. */ + @ServiceImpl(service = TestNexusServices.TestNexusService1.class) + public class BacklinkStashingSyncServiceImpl { + @OperationImpl + public OperationHandler operation() { + return OperationHandler.sync( + (ctx, details, input) -> { + Link backlink = + Link.newBuilder() + .setWorkflowEvent( + Link.WorkflowEvent.newBuilder() + .setNamespace(NAMESPACE) + .setWorkflowId("callee-wf") + .setRunId("callee-run-id") + .setEventRef( + Link.WorkflowEvent.EventReference.newBuilder() + .setEventType( + EventType.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED))) + .build(); + CurrentNexusOperationContext.get().addBacklink(backlink); + return "result"; + }); + } + } + + /** + * Records the inbound common.v1.Link list the handler observes on its operation context, so a + * test can assert that LinkConverter filtered out unknown-type links before stashing. + */ + @ServiceImpl(service = TestNexusServices.TestNexusService1.class) + public static class InboundLinkCapturingServiceImpl { + static volatile java.util.List capturedInboundLinks; + + @OperationImpl + public OperationHandler operation() { + return OperationHandler.sync( + (ctx, details, input) -> { + capturedInboundLinks = + new java.util.ArrayList<>( + CurrentNexusOperationContext.get().getNexusOperationLinks()); + return "ok"; + }); + } + } + @ServiceImpl(service = TestNexusServices.TestNexusService1.class) public class TestNexusServiceImpl { @OperationImpl diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SignalOperationLinkingTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SignalOperationLinkingTest.java new file mode 100644 index 0000000000..67fecd851e --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SignalOperationLinkingTest.java @@ -0,0 +1,427 @@ +package io.temporal.workflow.nexus; + +import static org.junit.Assume.assumeTrue; + +import io.nexusrpc.handler.OperationCancelDetails; +import io.nexusrpc.handler.OperationContext; +import io.nexusrpc.handler.OperationHandler; +import io.nexusrpc.handler.OperationImpl; +import io.nexusrpc.handler.OperationStartDetails; +import io.nexusrpc.handler.OperationStartResult; +import io.nexusrpc.handler.ServiceImpl; +import io.temporal.api.enums.v1.EventType; +import io.temporal.api.history.v1.History; +import io.temporal.api.history.v1.HistoryEvent; +import io.temporal.client.BatchRequest; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowOptions; +import io.temporal.client.WorkflowStub; +import io.temporal.nexus.Nexus; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.NexusOperationHandle; +import io.temporal.workflow.NexusOperationOptions; +import io.temporal.workflow.NexusServiceOptions; +import io.temporal.workflow.SignalMethod; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; +import io.temporal.workflow.shared.TestNexusServices; +import io.temporal.workflow.shared.TestWorkflows; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; +import javax.annotation.Nullable; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +/** + * Verifies link propagation in both directions when a Nexus operation handler interacts with a + * workflow via signal. Covers three scenarios: + * + *

    + *
  • {@link #testSignalOperationLinks()} — sync handler, two signals (signalWithStart + plain + * signal). + *
  • {@link #testMultiSignalOperationLinks()} — one Nexus operation signals three different + * callees; verifies all three backlinks land on the caller's single {@code + * NexusOperationCompleted} event. + *
  • {@link #testAsyncSignalOperationLinks()} — handler returns an async result after signaling; + * verifies the backlink lands on {@code NexusOperationStarted} (the async response path in + * {@link io.temporal.internal.nexus.NexusTaskHandlerImpl}). + *
+ * + *

All tests require Temporal server ≥ 1.31 with {@code EnableCHASMSignalBacklinks=true}; the + * in-memory test server does not implement this path so the class is skipped unless a real server + * is in use. + */ +public class SignalOperationLinkingTest extends BaseNexusTest { + + private static final String MODE_SIGNAL_WITH_START = "signalWithStart"; + private static final String MODE_SIGNAL = "signal"; + private static final String MODE_MULTI_SIGNAL_WITH_START = "multi"; + private static final String MODE_ASYNC_SIGNAL_WITH_START = "asyncSignalWithStart"; + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes(SignalCallerWorkflow.class, SignalCalleeWorkflowImpl.class) + .setNexusServiceImplementation(new SignalingNexusServiceImpl()) + .build(); + + @BeforeClass + public static void requireExternalService() { + // The server-side backlink implementation (temporalio/temporal#9897) is gated by + // EnableCHASMSignalBacklinks and is only present in real servers. + assumeTrue( + "signal backlinks require a real server with EnableCHASMSignalBacklinks=true", + SDKTestWorkflowRule.useExternalService); + } + + @Override + protected SDKTestWorkflowRule getTestWorkflowRule() { + return testWorkflowRule; + } + + // ── Tests ──────────────────────────────────────────────────────────────────────────────── + + @Test + public void testSignalOperationLinks() { + runTwoSignalScenario(); + } + + /** + * One Nexus operation signals three different callees. The handler's three signal-class RPCs each + * contribute a backlink and all three end up on the caller's single {@code + * NexusOperationCompleted} event. + */ + @Test + public void testMultiSignalOperationLinks() { + WorkflowClient client = testWorkflowRule.getWorkflowClient(); + List calleeIds = + Arrays.asList( + "multi-callee-a-" + UUID.randomUUID(), + "multi-callee-b-" + UUID.randomUUID(), + "multi-callee-c-" + UUID.randomUUID()); + + TestWorkflows.TestWorkflow1 callerStub = + testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestWorkflow1.class, "caller"); + String result = + callerStub.execute(MODE_MULTI_SIGNAL_WITH_START + ":" + String.join(",", calleeIds)); + Assert.assertEquals("ok:multi:" + String.join(",", calleeIds), result); + + // Each callee gets one signal and completes. + for (String calleeId : calleeIds) { + String calleeResult = client.newUntypedWorkflowStub(calleeId).getResult(String.class); + Assert.assertEquals("multi-signal", calleeResult); + } + + String callerWorkflowId = WorkflowStub.fromTyped(callerStub).getExecution().getWorkflowId(); + History callerHistory = client.fetchHistory(callerWorkflowId).getHistory(); + + // Caller → each callee: forward links on every callee's WorkflowExecutionSignaled event. + for (String calleeId : calleeIds) { + History calleeHistory = client.fetchHistory(calleeId).getHistory(); + assertForwardLinks(calleeHistory, callerWorkflowId, /* expectedCount= */ 1); + } + + // Callee → caller: the single NexusOperationCompleted carries one backlink per callee. + List completedEvents = + getAllEventsOfType(callerHistory, EventType.EVENT_TYPE_NEXUS_OPERATION_COMPLETED); + Assert.assertEquals( + "expected exactly one NexusOperationCompleted event", 1, completedEvents.size()); + HistoryEvent completed = completedEvents.get(0); + Assert.assertEquals( + "expected one backlink per signaled callee", calleeIds.size(), completed.getLinksCount()); + List backlinkWorkflowIds = new ArrayList<>(); + for (int i = 0; i < completed.getLinksCount(); i++) { + io.temporal.api.common.v1.Link.WorkflowEvent backlink = + completed.getLinks(i).getWorkflowEvent(); + backlinkWorkflowIds.add(backlink.getWorkflowId()); + EventType backlinkEventType = + backlink.hasRequestIdRef() + ? backlink.getRequestIdRef().getEventType() + : backlink.getEventRef().getEventType(); + Assert.assertEquals(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED, backlinkEventType); + } + Assert.assertTrue( + "expected backlinks to reference all three callees: " + backlinkWorkflowIds, + backlinkWorkflowIds.containsAll(calleeIds)); + } + + /** + * Async response path: handler signals the callee then returns an async result. Verifies that the + * backlink lands on {@code NexusOperationStarted} (the async branch in NexusTaskHandlerImpl). + */ + @Test + public void testAsyncSignalOperationLinks() { + WorkflowClient client = testWorkflowRule.getWorkflowClient(); + String calleeWorkflowId = "async-callee-" + UUID.randomUUID(); + + TestWorkflows.TestWorkflow1 callerStub = + testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestWorkflow1.class, "caller"); + String result = callerStub.execute(MODE_ASYNC_SIGNAL_WITH_START + ":" + calleeWorkflowId); + Assert.assertEquals("async-started", result); + + String calleeResult = client.newUntypedWorkflowStub(calleeWorkflowId).getResult(String.class); + Assert.assertEquals("async-signal", calleeResult); + + String callerWorkflowId = WorkflowStub.fromTyped(callerStub).getExecution().getWorkflowId(); + History callerHistory = client.fetchHistory(callerWorkflowId).getHistory(); + History calleeHistory = client.fetchHistory(calleeWorkflowId).getHistory(); + + assertForwardLinks(calleeHistory, callerWorkflowId, /* expectedCount= */ 1); + + // Backward direction lands on NexusOperationStarted for the async response path. + List startedEvents = + getAllEventsOfType(callerHistory, EventType.EVENT_TYPE_NEXUS_OPERATION_STARTED); + Assert.assertEquals( + "expected exactly one NexusOperationStarted event for the async op", + 1, + startedEvents.size()); + assertBacklink(startedEvents.get(0), calleeWorkflowId); + } + + // ── Shared scenario + assertion helpers ────────────────────────────────────────────────── + + /** Drive the two-signal flow (signalWithStart + plain signal) and assert link propagation. */ + private void runTwoSignalScenario() { + WorkflowClient client = testWorkflowRule.getWorkflowClient(); + String calleeWorkflowId = "signal-callee-" + UUID.randomUUID(); + + TestWorkflows.TestWorkflow1 callerStub = + testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestWorkflow1.class, "caller"); + String result = callerStub.execute("twoSync:" + calleeWorkflowId); + Assert.assertEquals("ok:signalWithStart|ok:signal", result); + + String calleeResult = client.newUntypedWorkflowStub(calleeWorkflowId).getResult(String.class); + Assert.assertEquals("first,second", calleeResult); + + String callerWorkflowId = WorkflowStub.fromTyped(callerStub).getExecution().getWorkflowId(); + History callerHistory = client.fetchHistory(callerWorkflowId).getHistory(); + History calleeHistory = client.fetchHistory(calleeWorkflowId).getHistory(); + + assertForwardLinks(calleeHistory, callerWorkflowId, /* expectedCount= */ 2); + + List completedEvents = + getAllEventsOfType(callerHistory, EventType.EVENT_TYPE_NEXUS_OPERATION_COMPLETED); + Assert.assertEquals( + "expected two NexusOperationCompleted events on the caller", 2, completedEvents.size()); + for (HistoryEvent completed : completedEvents) { + assertBacklink(completed, calleeWorkflowId); + } + } + + /** + * Assert that the callee history has {@code expectedCount} {@code WorkflowExecutionSignaled} + * events, each linked back to the caller's {@code NexusOperationScheduled} event. + */ + private static void assertForwardLinks( + History calleeHistory, String callerWorkflowId, int expectedCount) { + List signaledEvents = + getAllEventsOfType(calleeHistory, EventType.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED); + Assert.assertEquals( + "expected " + expectedCount + " WorkflowExecutionSignaled events on the callee", + expectedCount, + signaledEvents.size()); + for (HistoryEvent signaled : signaledEvents) { + Assert.assertTrue( + "expected at least one link on each WorkflowExecutionSignaled event", + signaled.getLinksCount() >= 1); + Assert.assertEquals( + "signaled-event link should reference the caller workflow", + callerWorkflowId, + signaled.getLinks(0).getWorkflowEvent().getWorkflowId()); + Assert.assertEquals( + EventType.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED, + signaled.getLinks(0).getWorkflowEvent().getEventRef().getEventType()); + } + } + + /** + * Assert that a single caller-side event ({@code NexusOperationCompleted} or {@code + * NexusOperationStarted}) carries a backlink to the callee's {@code WorkflowExecutionSignaled} + * event. Server PR #9897 keys these via {@code RequestIdReference} rather than {@code + * EventReference}, so we accept either oneof variant. + */ + private static void assertBacklink(HistoryEvent event, String calleeWorkflowId) { + Assert.assertTrue( + "expected a signal-event backlink on " + event.getEventType().name(), + event.getLinksCount() >= 1); + io.temporal.api.common.v1.Link.WorkflowEvent backlink = event.getLinks(0).getWorkflowEvent(); + Assert.assertEquals(calleeWorkflowId, backlink.getWorkflowId()); + EventType backlinkEventType = + backlink.hasRequestIdRef() + ? backlink.getRequestIdRef().getEventType() + : backlink.getEventRef().getEventType(); + Assert.assertEquals(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED, backlinkEventType); + } + + /** Find all history events of a given type, in order. */ + private static List getAllEventsOfType(History history, EventType type) { + List out = new ArrayList<>(); + for (HistoryEvent e : history.getEventsList()) { + if (e.getEventType() == type) { + out.add(e); + } + } + return out; + } + + // ── Workflows ──────────────────────────────────────────────────────────────────────────── + + /** + * Caller workflow. Branches on a mode prefix in the input: + * + *

    + *
  • {@code twoSync:} — invoke the nexus op twice synchronously (signalWithStart, + * then signal). + *
  • {@code multi:,,} — invoke the nexus op once synchronously; handler + * signalWithStart's each id. + *
  • {@code asyncSignalWithStart:} — invoke the nexus op asynchronously via {@code + * Workflow.startNexusOperation}; wait for execution start and return without waiting for + * the operation result. + *
+ */ + public static class SignalCallerWorkflow implements TestWorkflows.TestWorkflow1 { + @Override + public String execute(String input) { + String[] parts = input.split(":", 2); + String mode = parts[0]; + String rest = parts[1]; + + TestNexusServices.TestNexusService1 stub = + Workflow.newNexusServiceStub( + TestNexusServices.TestNexusService1.class, + NexusServiceOptions.newBuilder() + .setEndpoint(getEndpointName()) + .setOperationOptions( + NexusOperationOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofSeconds(30)) + .build()) + .build()); + + switch (mode) { + case "twoSync": + { + String r1 = stub.operation(MODE_SIGNAL_WITH_START + ":" + rest); + String r2 = stub.operation(MODE_SIGNAL + ":" + rest); + return r1 + "|" + r2; + } + case MODE_MULTI_SIGNAL_WITH_START: + return stub.operation(MODE_MULTI_SIGNAL_WITH_START + ":" + rest); + case MODE_ASYNC_SIGNAL_WITH_START: + { + NexusOperationHandle h = + Workflow.startNexusOperation( + stub::operation, MODE_ASYNC_SIGNAL_WITH_START + ":" + rest); + // Wait for the async op to be Started (the event that carries the backlink) but + // not for its eventual result — the async op completes outside this workflow. + h.getExecution().get(); + return "async-started"; + } + default: + throw new IllegalArgumentException("unknown mode: " + mode); + } + } + } + + /** Callee workflow. Awaits {@code expectedSignals} signals then returns their joined payloads. */ + @WorkflowInterface + public interface SignalCalleeWorkflow { + @WorkflowMethod + String execute(int expectedSignals); + + @SignalMethod + void ping(String msg); + } + + public static class SignalCalleeWorkflowImpl implements SignalCalleeWorkflow { + private final List received = new ArrayList<>(); + + @Override + public String execute(int expectedSignals) { + Workflow.await(() -> received.size() >= expectedSignals); + return String.join(",", received); + } + + @Override + public void ping(String msg) { + received.add(msg); + } + } + + // ── Nexus service ──────────────────────────────────────────────────────────────────────── + + /** + * Single Nexus operation that dispatches based on a mode prefix in its input. Supports sync and + * async return shapes. + */ + @ServiceImpl(service = TestNexusServices.TestNexusService1.class) + public static class SignalingNexusServiceImpl { + + @OperationImpl + public OperationHandler operation() { + return new OperationHandler() { + @Override + public OperationStartResult start( + OperationContext ctx, OperationStartDetails details, @Nullable String input) { + String[] parts = input.split(":", 2); + String mode = parts[0]; + String rest = parts[1]; + + io.temporal.nexus.NexusOperationContext opCtx = Nexus.getOperationContext(); + WorkflowClient client = opCtx.getWorkflowClient(); + String taskQueue = opCtx.getInfo().getTaskQueue(); + + switch (mode) { + case MODE_SIGNAL_WITH_START: + signalWithStart(client, rest, taskQueue, /* expectedSignals= */ 2, "first"); + return OperationStartResult.sync("ok:" + MODE_SIGNAL_WITH_START); + case MODE_SIGNAL: + client.newWorkflowStub(SignalCalleeWorkflow.class, rest).ping("second"); + return OperationStartResult.sync("ok:" + MODE_SIGNAL); + case MODE_MULTI_SIGNAL_WITH_START: + for (String id : rest.split(",")) { + signalWithStart(client, id, taskQueue, /* expectedSignals= */ 1, "multi-signal"); + } + return OperationStartResult.sync("ok:multi:" + rest); + case MODE_ASYNC_SIGNAL_WITH_START: + signalWithStart(client, rest, taskQueue, /* expectedSignals= */ 1, "async-signal"); + // Async branch in NexusTaskHandlerImpl. The caller never waits for completion, so + // the token is opaque. + return OperationStartResult.async("async-op-" + UUID.randomUUID()); + default: + throw new IllegalArgumentException("unknown mode: " + mode); + } + } + + @Override + public void cancel(OperationContext ctx, OperationCancelDetails details) { + // Not exercised in these tests. + } + }; + } + + private static void signalWithStart( + WorkflowClient client, + String calleeWorkflowId, + String taskQueue, + int expectedSignals, + String signalPayload) { + SignalCalleeWorkflow startStub = + client.newWorkflowStub( + SignalCalleeWorkflow.class, + WorkflowOptions.newBuilder() + .setWorkflowId(calleeWorkflowId) + .setTaskQueue(taskQueue) + .build()); + BatchRequest batch = client.newSignalWithStartRequest(); + batch.add(startStub::execute, expectedSignals); + batch.add(startStub::ping, signalPayload); + client.signalWithStart(batch); + } + } +} diff --git a/temporal-serviceclient/src/main/proto b/temporal-serviceclient/src/main/proto index d2fc34ab84..803029f9cf 160000 --- a/temporal-serviceclient/src/main/proto +++ b/temporal-serviceclient/src/main/proto @@ -1 +1 @@ -Subproject commit d2fc34ab844603f50e41365f46c7fb82bdedffe6 +Subproject commit 803029f9cfb905e23341d470f02aec8e7ef373d0