Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

import java.net.URI;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
Expand Down Expand Up @@ -116,10 +114,10 @@ public Object readObject() {
return readCustomObject();

case URI:
return URI.create(readString());
return readURI();

case OFFSET_DATE_TIME:
return OffsetDateTime.ofInstant(readInstant(), ZoneOffset.of(readString()));
return readOffsetDateTime();

default:
throw new IllegalStateException("Unsupported type " + type);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,7 @@ public WorkflowOutputBuffer writeObject(Object object) {
writeInstant(value);
} else if (object instanceof OffsetDateTime value) {
writeType(Type.OFFSET_DATE_TIME);
writeInstant(value.toInstant());
writeString(value.getOffset().toString());
writeOffsetDateTime(value);
} else if (object instanceof URI value) {
writeType(Type.URI);
writeString(value.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ public static byte[] writeString(WorkflowBufferFactory factory, String value) {
return writeValue(factory, value, (b, v) -> b.writeString(v));
}

public static byte[] writeOffsetDateTime(WorkflowBufferFactory factory, OffsetDateTime value) {
return writeValue(factory, value, (b, v) -> b.writeOffsetDateTime(v));
}

public static byte[] writeURI(WorkflowBufferFactory factory, URI value) {
return writeValue(factory, value, (b, v) -> b.writeURI(v));
}
Comment thread
fjtirado marked this conversation as resolved.

public static byte[] writeCloudEventExtensions(WorkflowBufferFactory factory, CloudEvent event) {
try (ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
WorkflowOutputBuffer out = factory.output(bytesOut)) {
Expand All @@ -99,6 +107,9 @@ public static void writeCloudEventExtensions(WorkflowOutputBuffer out, CloudEven

public static CloudEventBuilder readCloudEventExtensions(
WorkflowBufferFactory factory, byte[] value, CloudEventBuilder builder) {
if (value == null) {
return builder;
}
try (ByteArrayInputStream bytesInt = new ByteArrayInputStream(value);
WorkflowInputBuffer in = factory.input(bytesInt)) {
return readCloudEventExtenstions(in, value, builder);
Expand Down Expand Up @@ -162,6 +173,14 @@ public static Instant readInstant(WorkflowBufferFactory factory, byte[] value) {
return readValue(factory, value, WorkflowInputBuffer::readInstant);
}

public static OffsetDateTime readOffsetDateTime(WorkflowBufferFactory factory, byte[] value) {
return readValue(factory, value, WorkflowInputBuffer::readOffsetDateTime);
}

public static URI readURI(WorkflowBufferFactory factory, byte[] value) {
return readValue(factory, value, WorkflowInputBuffer::readURI);
}

public static <T extends Enum<T>> T readEnum(
WorkflowBufferFactory factory, byte[] value, Class<T> enumClass) {
return readValue(factory, value, b -> b.readEnum(enumClass));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
package io.serverlessworkflow.impl.marshaller;

import java.io.Closeable;
import java.net.URI;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.Collection;
import java.util.Map;

Expand All @@ -40,6 +43,14 @@ public interface WorkflowInputBuffer extends Closeable {

byte[] readBytes();

default OffsetDateTime readOffsetDateTime() {
return OffsetDateTime.ofInstant(readInstant(), ZoneOffset.of(readString()));
}

default URI readURI() {
return URI.create(readString());
}

<T extends Enum<T>> T readEnum(Class<T> enumClass);

Instant readInstant();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
*/
package io.serverlessworkflow.impl.marshaller;

import java.net.URI;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.util.Collection;
import java.util.Map;

Expand All @@ -41,6 +43,17 @@ public interface WorkflowOutputBuffer extends AutoCloseable {

WorkflowOutputBuffer writeInstant(Instant instant);

default WorkflowOutputBuffer writeOffsetDateTime(OffsetDateTime time) {
writeInstant(time.toInstant());
writeString(time.getOffset().toString());
return this;
}

default WorkflowOutputBuffer writeURI(URI uri) {
writeString(uri.toString());
return this;
}

WorkflowOutputBuffer writeMap(Map<String, Object> map);

WorkflowOutputBuffer writeCollection(Collection<Object> col);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@

import io.cloudevents.CloudEvent;
import io.serverlessworkflow.impl.events.EventRegistrationBuilder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.function.Consumer;

Expand All @@ -37,7 +37,7 @@ public static AllStrategyCorrelationInfo instance() {

private InMemoryAllStrategyCorrelationInfo() {}

private Map<EventRegistrationBuilder, List<CloudEvent>> correlatedEvents;
private Map<EventRegistrationBuilder, Collection<CloudEvent>> correlatedEvents;
private Consumer<Map<EventRegistrationBuilder, CloudEvent>> starter;

@Override
Expand All @@ -48,9 +48,11 @@ public void correlate(EventRegistrationBuilder reg, CloudEvent event) {
synchronized (correlatedEvents) {
correlatedEvents.get(reg).add(event);
if (satisfyCondition(correlatedEvents)) {
for (java.util.Map.Entry<EventRegistrationBuilder, List<CloudEvent>> values :
for (java.util.Map.Entry<EventRegistrationBuilder, Collection<CloudEvent>> values :
correlatedEvents.entrySet()) {
result.put(values.getKey(), values.getValue().remove(0));
Iterator<CloudEvent> iter = values.getValue().iterator();
result.put(values.getKey(), iter.next());
iter.remove();
}
}
}
Expand All @@ -65,11 +67,11 @@ public void init(
Consumer<Map<EventRegistrationBuilder, CloudEvent>> starter) {
correlatedEvents = new HashMap<>();
this.starter = starter;
regs.forEach(reg -> correlatedEvents.put(reg, new ArrayList<CloudEvent>()));
regs.forEach(reg -> correlatedEvents.put(reg, new LinkedHashSet<CloudEvent>()));
}

private boolean satisfyCondition(Map<EventRegistrationBuilder, List<CloudEvent>> events) {
for (List<CloudEvent> values : events.values()) {
private boolean satisfyCondition(Map<EventRegistrationBuilder, Collection<CloudEvent>> events) {
for (Collection<CloudEvent> values : events.values()) {
if (values.isEmpty()) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,10 @@ public ScheduledEventConsumer(
builderInfo.registrations().registrations();
allStrategyCorrelationInfo.init(registrationBuilders, this::start);
registrationBuilders.forEach(
reg -> {
registrations.add(
eventConsumer.register(
reg, ce -> allStrategyCorrelationInfo.correlate(reg, (CloudEvent) ce)));
});
reg ->
registrations.add(
eventConsumer.register(
reg, ce -> allStrategyCorrelationInfo.correlate(reg, (CloudEvent) ce))));
} else {
builderInfo
.registrations()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -80,8 +82,16 @@ private void queueCorrelation(
Consumer<Map<EventRegistrationBuilder, CloudEvent>> starter) {
synchronized (this) {
this.completableFuture =
completableFuture.thenCompose(
v -> executor.execute(() -> doTransaction(function), definition));
completableFuture
.thenCompose(v -> executor.execute(() -> doTransaction(function), definition))
.exceptionally(
ex -> {
logger.error(
"Exception processing correlation task for definition {}",
definition.id(),
ex);
return List.of();
});
completableFuture.thenAccept(events -> events.forEach(starter));
}
}
Expand All @@ -90,40 +100,60 @@ private Collection<Map<EventRegistrationBuilder, CloudEvent>> eventAdded(
CorrelationOperations operations, String reg, CloudEvent event) {
logger.debug(
"Received event {} for definition {} and registration {}", event, definition.id(), reg);
Map<String, Collection<CloudEvent>> events = initMap();
operations.retrieveEvents(events);
events.get(reg).add(event);
Collection<Map<EventRegistrationBuilder, CloudEvent>> result = checkCorrelation(events);
operations.storeEvent(reg, event);
return checkCorrelation(operations);
markProcessed(operations, result);
return result;
Comment thread
fjtirado marked this conversation as resolved.
}

private Map<String, Collection<CloudEvent>> initMap() {
return id2RegMapping.keySet().stream()
.collect(Collectors.toMap(k -> k, k -> new LinkedHashSet<>()));
Copy link
Copy Markdown
Collaborator Author

@fjtirado fjtirado May 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The IA was right, although unlikely, we should prevent duplicate Id in case the implementor messed up, so using a LinkedHashSet (see my comment about SequenceCollection)

Comment thread
fjtirado marked this conversation as resolved.
}

private Collection<Map<EventRegistrationBuilder, CloudEvent>> startupCheck(
CorrelationOperations operations) {
logger.debug("Checking cloud events for definition {}", definition.id());
operations.clearProcessed();
return checkCorrelation(operations);
Map<String, Collection<CloudEvent>> events = initMap();
operations.retrieveEvents(events);
Collection<Map<EventRegistrationBuilder, CloudEvent>> result = checkCorrelation(events);
markProcessed(operations, result);
return result;
}

private final Collection<Map<EventRegistrationBuilder, CloudEvent>> checkCorrelation(
CorrelationOperations operations) {
Map<String, List<CloudEvent>> events = operations.retrieveEvents(id2RegMapping.keySet());
Map<String, Collection<CloudEvent>> events) {
logger.debug("Stored CloudEvents for definition {} are {}", definition.id(), events);
if (events.isEmpty()) {
return List.of();
}
Collection<Map<EventRegistrationBuilder, CloudEvent>> result = new ArrayList<>();
Map<String, Iterator<CloudEvent>> iteratingEvents =
events.entrySet().stream()
.collect(Collectors.toMap(Entry::getKey, e -> e.getValue().iterator()));
Comment on lines +132 to +134
Copy link
Copy Markdown
Collaborator Author

@fjtirado fjtirado May 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally I would like to use SequenceCollection rather than just Collection here to avoid the map of iterators (and just call removeFirst over the populated collection), but this interface was added in Java 21 and we need to be compliant with JDK 17.

Comment thread
fjtirado marked this conversation as resolved.
boolean notDone = true;
while (notDone) {
Map<EventRegistrationBuilder, CloudEvent> row = new HashMap<>();
for (Entry<String, List<CloudEvent>> item : events.entrySet()) {
List<CloudEvent> list = item.getValue();
if (list.isEmpty()) {
for (Entry<String, Iterator<CloudEvent>> item : iteratingEvents.entrySet()) {
Iterator<CloudEvent> iter = item.getValue();
if (!iter.hasNext()) {
notDone = false;
break;
}
CloudEvent retrieved = list.remove(0);
row.put(id2RegMapping.get(item.getKey()), retrieved);
row.put(id2RegMapping.get(item.getKey()), iter.next());
iter.remove();
}
if (notDone) {
result.add(row);
}
}
return result;
}

private void markProcessed(
CorrelationOperations operations,
Collection<Map<EventRegistrationBuilder, CloudEvent>> result) {
if (!result.isEmpty()) {
Map<String, Collection<String>> processed = new HashMap<>();
for (Map<EventRegistrationBuilder, CloudEvent> item : result) {
Expand All @@ -135,7 +165,6 @@ private final Collection<Map<EventRegistrationBuilder, CloudEvent>> checkCorrela
}
operations.markAsProcessed(processed);
}
return result;
}

public void addMetadata(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,11 @@

import io.cloudevents.CloudEvent;
import java.util.Collection;
import java.util.List;
import java.util.Map;

interface CorrelationOperations {
public interface CorrelationOperations {

default Map<String, List<CloudEvent>> retrieveEvents(Collection<String> targetRegIds) {
return Map.of();
}
default void retrieveEvents(Map<String, Collection<CloudEvent>> reg2EventsMap) {}

default void storeEvent(String regId, CloudEvent event) {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.util.Map;
import java.util.function.Function;

class OperationAllStrategyCorrelationInfo extends AbstractAllStrategyCorrelationInfo {
public class OperationAllStrategyCorrelationInfo extends AbstractAllStrategyCorrelationInfo {

private final PersistenceInstanceOperations operations;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class StoreAllStrategyCorrelationInfo extends AbstractAllStrategyCorrelationInfo {
public class StoreAllStrategyCorrelationInfo extends AbstractAllStrategyCorrelationInfo {

private static final Logger logger =
LoggerFactory.getLogger(StoreAllStrategyCorrelationInfo.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,7 @@
import io.serverlessworkflow.impl.persistence.PersistenceInstanceTransaction;
import io.serverlessworkflow.impl.persistence.PersistenceTaskInfo;
import io.serverlessworkflow.impl.persistence.PersistenceWorkflowInfo;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
Expand Down Expand Up @@ -113,26 +110,28 @@ public void clearStatus(WorkflowContextData workflowContext) {
clearStatus(workflowContext.definition(), key(workflowContext));
}

public Map<String, List<CloudEvent>> retrieveEvents(Collection<String> targetRegIds) {
Map<String, List<CloudEvent>> result = new HashMap<>();
targetRegIds.forEach(
regId -> {
Map<String, P> processedCes = processedCloudEvents(regId);
Map<String, C> ces = cloudEvents(regId);
result.put(
regId,
ces.values().stream()
@Override
public void retrieveEvents(Map<String, Collection<CloudEvent>> events) {
events
.entrySet()
.forEach(
e -> {
String regId = e.getKey();
Collection<CloudEvent> ces = e.getValue();
Map<String, P> processedCes = processedCloudEvents(regId);
cloudEvents(regId).values().stream()
.map(this::unmarshallCloudEvent)
Comment thread
fjtirado marked this conversation as resolved.
.filter(ce -> !processedCes.containsKey(ce.getId()))
.collect(Collectors.toCollection(ArrayList::new)));
});
return result;
.forEach(ces::add);
});
}

@Override
public void storeEvent(String regId, CloudEvent event) {
cloudEvents(regId).put(event.getId(), marshallCloudEvent(event));
}

@Override
public void markAsProcessed(Map<String, Collection<String>> regCeIds) {
regCeIds.forEach(
(k, v) -> {
Expand All @@ -141,6 +140,7 @@ public void markAsProcessed(Map<String, Collection<String>> regCeIds) {
});
}

@Override
public void clearProcessed() {
deleteAllProcessedMaps();
}
Expand Down
Loading
Loading