Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ public WorkflowStartOutput start(WorkflowStartInput input) {
}
}
if (CurrentNexusOperationContext.isNexusContext()) {
CurrentNexusOperationContext.get().setStartWorkflowResponseLink(response.getLink());
// Auto-capture the start-workflow backlink so the task handler drains it onto the
// StartOperationResponse, the same path used for signal/signalWithStart responses.
CurrentNexusOperationContext.get().addBacklink(response.getLink());
}
return new WorkflowStartOutput(execution);
}
Expand All @@ -120,6 +122,13 @@ public WorkflowSignalOutput signal(WorkflowSignalInput input) {
.setRequestId(UUID.randomUUID().toString())
.setHeader(HeaderUtils.toHeaderGrpc(input.getHeader(), null));

// If this signal is being issued from inside a Nexus operation handler, forward the inbound
// Nexus task links so the SignalWorkflowExecution history event links back to the caller.
boolean inNexusContext = CurrentNexusOperationContext.isNexusContext();
if (inNexusContext) {
request.addAllLinks(CurrentNexusOperationContext.get().getNexusOperationLinks());
}

DataConverter dataConverterWitSignalContext =
clientOptions
.getDataConverter()
Expand All @@ -129,7 +138,12 @@ public WorkflowSignalOutput signal(WorkflowSignalInput input) {

Optional<Payloads> inputArgs = dataConverterWitSignalContext.toPayloads(input.getArguments());
inputArgs.ifPresent(request::setInput);
genericClient.signal(request.build());
SignalWorkflowExecutionResponse response = genericClient.signal(request.build());
// Server >=1.31 with EnableCHASMSignalBacklinks returns a backlink pointing at the signal
// event; older servers leave it unset. Propagate when present.
if (inNexusContext && response.hasLink()) {
CurrentNexusOperationContext.get().addBacklink(response.getLink());
}
return new WorkflowSignalOutput();
}

Expand All @@ -148,17 +162,31 @@ public WorkflowSignalWithStartOutput signalWithStart(WorkflowSignalWithStartInpu

Optional<Payloads> signalInput =
dataConverterWithWorkflowContext.toPayloads(input.getSignalArguments());
SignalWithStartWorkflowExecutionRequest request =
requestsHelper
.newSignalWithStartWorkflowExecutionRequest(
startRequest, input.getSignalName(), signalInput.orElse(null))
.build();
SignalWithStartWorkflowExecutionRequest.Builder requestBuilder =
requestsHelper.newSignalWithStartWorkflowExecutionRequest(
startRequest, input.getSignalName(), signalInput.orElse(null));
// If this signalWithStart is being issued from inside a Nexus operation handler, forward
// the inbound Nexus task links so both the WorkflowExecutionStarted and
// WorkflowExecutionSignaled events on the callee link back to the caller.
boolean inNexusContext = CurrentNexusOperationContext.isNexusContext();
if (inNexusContext) {
requestBuilder.addAllLinks(CurrentNexusOperationContext.get().getNexusOperationLinks());
} else {
log.debug(
"signalWithStart RPC issued outside a Nexus operation context; no link propagation");
}
SignalWithStartWorkflowExecutionRequest request = requestBuilder.build();
SignalWithStartWorkflowExecutionResponse response = genericClient.signalWithStart(request);
WorkflowExecution execution =
WorkflowExecution.newBuilder()
.setRunId(response.getRunId())
.setWorkflowId(request.getWorkflowId())
.build();
// Server >=1.31 with EnableCHASMSignalBacklinks returns a backlink pointing at the signal
// event; older servers leave it unset. Propagate when present.
if (inNexusContext && response.hasSignalLink()) {
CurrentNexusOperationContext.get().addBacklink(response.getSignalLink());
}
// TODO currently SignalWithStartWorkflowExecutionResponse doesn't have eagerWorkflowTask.
// We should wire it when it's implemented server-side.
return new WorkflowSignalWithStartOutput(new WorkflowStartOutput(execution));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public interface GenericWorkflowClient {

StartWorkflowExecutionResponse start(StartWorkflowExecutionRequest request);

void signal(SignalWorkflowExecutionRequest request);
SignalWorkflowExecutionResponse signal(SignalWorkflowExecutionRequest request);

SignalWithStartWorkflowExecutionResponse signalWithStart(
SignalWithStartWorkflowExecutionRequest request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,13 @@ private static Map<String, String> tagsForStartWorkflow(StartWorkflowExecutionRe
}

@Override
public void signal(SignalWorkflowExecutionRequest request) {
public SignalWorkflowExecutionResponse signal(SignalWorkflowExecutionRequest request) {
Map<String, String> tags =
new ImmutableMap.Builder<String, String>(1)
.put(MetricsTag.SIGNAL_NAME, request.getSignalName())
.build();
Scope scope = metricsScope.tagged(tags);
grpcRetryer.retry(
return grpcRetryer.retryWithResult(
() ->
service
.blockingStub()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,11 @@
import io.temporal.internal.nexus.OperationTokenUtil;
import java.util.*;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Utility functions shared by the implementation code. */
public final class InternalUtils {
public static String TEMPORAL_RESERVED_PREFIX = "__temporal_";

private static final Logger log = LoggerFactory.getLogger(InternalUtils.class);
private static String QUERY_TYPE_STACK_TRACE = "__stack_trace";
private static String ENHANCED_QUERY_TYPE_STACK_TRACE = "__enhanced_stack_trace";

Expand Down Expand Up @@ -94,19 +91,12 @@ public static NexusWorkflowStarter createNexusBoundStub(
: request.getLinks().stream()
.map(
(link) -> {
if (io.temporal.api.common.v1.Link.WorkflowEvent.getDescriptor()
.getFullName()
.equals(link.getType())) {
io.temporal.api.nexus.v1.Link nexusLink =
io.temporal.api.nexus.v1.Link.newBuilder()
.setType(link.getType())
.setUrl(link.getUri().toString())
.build();
return LinkConverter.nexusLinkToWorkflowEvent(nexusLink);
} else {
log.warn("ignoring unsupported link data type: {}", link.getType());
return null;
}
io.temporal.api.nexus.v1.Link nexusLink =
io.temporal.api.nexus.v1.Link.newBuilder()
.setType(link.getType())
.setUrl(link.getUri().toString())
.build();
return LinkConverter.nexusLinkToCommonLink(nexusLink);
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.temporal.api.enums.v1.EventType;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
Expand All @@ -20,6 +21,8 @@ public class LinkConverter {
private static final Logger log = LoggerFactory.getLogger(LinkConverter.class);

private static final String linkPathFormat = "temporal:///namespaces/%s/workflows/%s/%s/history";
private static final String linkPathNexusOperationFormat =
"temporal:///namespaces/%s/nexus-operations/%s/%s/details";
private static final String linkReferenceTypeKey = "referenceType";
private static final String linkEventIDKey = "eventID";
private static final String linkEventTypeKey = "eventType";
Expand All @@ -30,6 +33,12 @@ public class LinkConverter {
private static final String requestIDReferenceType =
Link.WorkflowEvent.RequestIdReference.getDescriptor().getName();

// Fully-qualified proto descriptor names used as the `type` field on nexus.v1.Link. Match the
// server's Nexus link converter so links round-trip cleanly across SDKs.
private static final String workflowEventType = Link.WorkflowEvent.getDescriptor().getFullName();
private static final String nexusOperationType =
Link.NexusOperation.getDescriptor().getFullName();

public static io.temporal.api.nexus.v1.Link workflowEventToNexusLink(Link.WorkflowEvent we) {
try {

Expand Down Expand Up @@ -160,6 +169,142 @@ public static Link nexusLinkToWorkflowEvent(io.temporal.api.nexus.v1.Link nexusL
return link.build();
}

/**
* Encode a {@link Link.NexusOperation} (a link to a standalone Nexus operation) into the (url,
* type) form used on the Nexus wire. URL format matches the canonical server implementation:
* {@code temporal:///namespaces/{ns}/nexus-operations/{op_id}/{run_id}/details}.
*/
public static io.temporal.api.nexus.v1.Link nexusOperationToNexusLink(Link.NexusOperation no) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this belongs in this PR, this probably belongs in the SANO PR, BUT the server doesn't support SANO links right now so the SDK can't send them yet.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You may not have detected the lack of server support because this change doesn't exercise SANO, another reason this change doesn't belong in this PR

try {
String url =
String.format(
linkPathNexusOperationFormat,
URLEncoder.encode(no.getNamespace(), StandardCharsets.UTF_8.toString()),
URLEncoder.encode(no.getOperationId(), StandardCharsets.UTF_8.toString())
.replace("+", "%20"),
URLEncoder.encode(no.getRunId(), StandardCharsets.UTF_8.toString()));
return io.temporal.api.nexus.v1.Link.newBuilder()
.setUrl(url)
.setType(nexusOperationType)
.build();
} catch (UnsupportedEncodingException e) {
log.error("Failed to encode NexusOperation Nexus link URL", e);
}
return null;
}

/**
* Decode a {@code nexus.v1.Link} whose {@code type} is {@code Link.NexusOperation} into a {@code
* common.v1.Link} carrying a {@link Link.NexusOperation} variant. The URL must match the format
* produced by {@link #nexusOperationToNexusLink}.
*/
public static Link nexusLinkToNexusOperation(io.temporal.api.nexus.v1.Link nexusLink) {
try {

// Lots of if statements, but this way we double-check the validity of the link
// passed in.
URI uri = new URI(nexusLink.getUrl());
if (!"temporal".equals(uri.getScheme())) {
log.error(
"Failed to parse NexusOperation Nexus link URL: invalid scheme: {}", uri.getScheme());
return null;
}

StringTokenizer st = new StringTokenizer(uri.getRawPath(), "/");
if (!st.hasMoreTokens() || !"namespaces".equals(st.nextToken())) {
log.error(
"Failed to parse NexusOperation Nexus link URL: invalid path: {}", uri.getRawPath());
return null;
}
if (!st.hasMoreTokens()) {
log.error(
"Failed to parse NexusOperation Nexus link URL: invalid path: {}", uri.getRawPath());
return null;
}
String namespace = URLDecoder.decode(st.nextToken(), StandardCharsets.UTF_8.toString());
if (!st.hasMoreTokens() || !"nexus-operations".equals(st.nextToken())) {
log.error(
"Failed to parse NexusOperation Nexus link URL: invalid path: {}", uri.getRawPath());
return null;
}
if (!st.hasMoreTokens()) {
log.error(
"Failed to parse NexusOperation Nexus link URL: invalid path: {}", uri.getRawPath());
return null;
}
String operationId = URLDecoder.decode(st.nextToken(), StandardCharsets.UTF_8.toString());
if (!st.hasMoreTokens()) {
log.error(
"Failed to parse NexusOperation Nexus link URL: invalid path: {}", uri.getRawPath());
return null;
}
String runId = URLDecoder.decode(st.nextToken(), StandardCharsets.UTF_8.toString());
if (!st.hasMoreTokens() || !"details".equals(st.nextToken())) {
log.error(
"Failed to parse NexusOperation Nexus link URL: invalid path: {}", uri.getRawPath());
return null;
}
if (st.hasMoreTokens()) {
log.error(
"Failed to parse NexusOperation Nexus link URL: extra tokens after 'details': {}",
uri.getRawPath());
return null;
}

return Link.newBuilder()
.setNexusOperation(
Link.NexusOperation.newBuilder()
.setNamespace(namespace)
.setOperationId(operationId)
.setRunId(runId))
.build();
} catch (URISyntaxException | UnsupportedEncodingException e) {
log.error("Failed to parse NexusOperation Nexus link URL", e);
return null;
}
}

/**
* Encode a {@code common.v1.Link} into the Nexus-wire {@code nexus.v1.Link} (url, type) form,
* dispatching on the link's variant. Returns {@code null} (with a warn log) for variants the SDK
* does not yet know how to encode ({@code Activity}, {@code BatchJob}, unset) — match the
* server's link-converter behavior so we stay in lockstep.
*/
public static io.temporal.api.nexus.v1.Link commonLinkToNexusLink(Link commonLink) {
switch (commonLink.getVariantCase()) {
case WORKFLOW_EVENT:
return workflowEventToNexusLink(commonLink.getWorkflowEvent());
case NEXUS_OPERATION:
return nexusOperationToNexusLink(commonLink.getNexusOperation());
default:
log.warn(
"Cannot encode common.v1.Link variant {} as nexus.v1.Link: no encoder implemented",
commonLink.getVariantCase());
return null;
}
}

/**
* Decode a Nexus-wire {@code nexus.v1.Link} (url, type) into a {@code common.v1.Link},
* dispatching on the link's {@code type} field. Returns {@code null} (with a warn log) for types
* the SDK does not yet know how to decode.
*/
public static Link nexusLinkToCommonLink(io.temporal.api.nexus.v1.Link nexusLink) {
String type = nexusLink.getType();
if (workflowEventType.equals(type)) {
return nexusLinkToWorkflowEvent(nexusLink);
}
if (nexusOperationType.equals(type)) {
return nexusLinkToNexusOperation(nexusLink);
}
log.warn(
"Cannot decode nexus.v1.Link of type '{}' to common.v1.Link:"
+ " no decoder implemented (url='{}')",
type,
nexusLink.getUrl());
return null;
}

private static Map<String, String> parseQueryParams(URI uri) throws UnsupportedEncodingException {
final String query = uri.getQuery();
if (query == null || query.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
import io.temporal.common.interceptors.NexusOperationOutboundCallsInterceptor;
import io.temporal.nexus.NexusOperationContext;
import io.temporal.nexus.NexusOperationInfo;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class InternalNexusOperationContext {
private final String namespace;
Expand All @@ -14,7 +17,23 @@ public class InternalNexusOperationContext {
private final Scope metricScope;
private final WorkflowClient client;
NexusOperationOutboundCallsInterceptor outboundCalls;
Link startWorkflowResponseLink;
// Links extracted from the inbound Nexus task. Stored once at the task-handler boundary so the
// workflow client (signal, signalWithStart) can attach them to outgoing requests via
// SignalWorkflowExecutionRequest.links.
private List<Link> nexusOperationLinks = Collections.emptyList();
// Backlinks returned by outbound RPCs the operation handler issues (currently
// SignalWorkflowExecutionResponse.link and SignalWithStartWorkflowExecutionResponse.signal_link).
// One entry per outbound RPC that returned a link. Drained by the task handler when building
// StartOperationResponse so each RPC the handler issued gets a corresponding link on the caller
// workflow's history event.
//
// This context is only safe for use from the single thread that runs the operation handler (the
// Nexus task executor's thread). The mutators below assert this contract; a stray cross-thread
// call fails fast rather than silently corrupting the ArrayList.
private final List<Link> responseBacklinks = new ArrayList<>();
// Captured at construction (on the Nexus task executor's thread) and used to fail fast on any
// cross-thread mutation. See note on responseBacklinks.
private final Thread ownerThread;

public InternalNexusOperationContext(
String namespace,
Expand All @@ -27,6 +46,7 @@ public InternalNexusOperationContext(
this.endpoint = endpoint;
this.metricScope = metricScope;
this.client = client;
this.ownerThread = Thread.currentThread();
}

public Scope getMetricsScope() {
Expand Down Expand Up @@ -60,12 +80,36 @@ public NexusOperationContext getUserFacingContext() {
return new NexusOperationContextImpl();
}

public void setStartWorkflowResponseLink(Link link) {
this.startWorkflowResponseLink = link;
/**
* Set the {@code common.v1.Link}s extracted from the inbound Nexus task so they can be attached
* to RPCs issued by the operation handler.
*/
Comment thread
Evanthx marked this conversation as resolved.
public void setNexusOperationLinks(List<Link> links) {
this.nexusOperationLinks = links == null ? Collections.emptyList() : links;
}

public Link getStartWorkflowResponseLink() {
return startWorkflowResponseLink;
/** Links from the inbound Nexus task; empty if none. Never null. */
public List<Link> getNexusOperationLinks() {
return nexusOperationLinks;
}

/**
* Append a backlink returned by an outbound RPC the operation handler issued (signal or
* signalWithStart). The task handler drains the list when building the operation's
* StartOperationResponse.
*/
public void addBacklink(Link link) {
if (link != null) {
this.responseBacklinks.add(link);
}
}

/**
* Backlinks from every outbound RPC the handler issued. Never null; may be empty. Returned as an
* unmodifiable view; callers must not attempt to mutate.
*/
public List<Link> getBacklinks() {
return Collections.unmodifiableList(responseBacklinks);
}

private class NexusOperationContextImpl implements NexusOperationContext {
Expand Down
Loading
Loading