From e9a28210229e80f12c9ed33769179eadd423e91d Mon Sep 17 00:00:00 2001 From: fjtirado Date: Mon, 25 May 2026 13:15:22 +0200 Subject: [PATCH 1/2] [Fix #1395] Refining approach There were some implicit constraints in the map to be returned by retrieveEvents. I think it is better to pass the Map already populated with the expected registrations associated to an empty modifiable array and let the implementor just add the cloud events to every array. Also, the event can be stored at the end, reducing the likeness of using it for calculations in a different cluster Signed-off-by: fjtirado --- .../scheduler/ScheduledEventConsumer.java | 9 +++--- .../AbstractAllStrategyCorrelationInfo.java | 32 +++++++++++++++---- .../persistence/CorrelationOperations.java | 4 +-- .../bigmap/BigMapInstanceTransaction.java | 29 +++++++++-------- 4 files changed, 45 insertions(+), 29 deletions(-) diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledEventConsumer.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledEventConsumer.java index 09c0931b2..bc70833ae 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledEventConsumer.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledEventConsumer.java @@ -60,11 +60,10 @@ public ScheduledEventConsumer( builderInfo.registrations().registrations(); allStrategyCorrelationInfo.init(registrationBuilders, this::start); registrationBuilders.forEach( - reg -> { - registrations.add( - eventConsumer.register( - reg, ce -> allStrategyCorrelationInfo.correlate(reg, (CloudEvent) ce))); - }); + reg -> + registrations.add( + eventConsumer.register( + reg, ce -> allStrategyCorrelationInfo.correlate(reg, (CloudEvent) ce)))); } else { builderInfo .registrations() diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractAllStrategyCorrelationInfo.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractAllStrategyCorrelationInfo.java index 8e915313c..18c9e2939 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractAllStrategyCorrelationInfo.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractAllStrategyCorrelationInfo.java @@ -90,19 +90,33 @@ private Collection> eventAdded( CorrelationOperations operations, String reg, CloudEvent event) { logger.debug( "Received event {} for definition {} and registration {}", event, definition.id(), reg); + Map> events = initMap(); + operations.retrieveEvents(events); + events.get(reg).add(event); + Collection> result = checkCorrelation(events); operations.storeEvent(reg, event); - return checkCorrelation(operations); + markProcessed(operations, result); + return result; + } + + private Map> initMap() { + return id2RegMapping.keySet().stream() + .collect(Collectors.toMap(k -> k, k -> new ArrayList<>())); } private Collection> startupCheck( CorrelationOperations operations) { + logger.debug("Checking cloud events for definition {}", definition.id()); operations.clearProcessed(); - return checkCorrelation(operations); + Map> events = initMap(); + operations.retrieveEvents(events); + Collection> result = checkCorrelation(events); + markProcessed(operations, result); + return result; } private final Collection> checkCorrelation( - CorrelationOperations operations) { - Map> events = operations.retrieveEvents(id2RegMapping.keySet()); + Map> events) { logger.debug("Stored CloudEvents for definition {} are {}", definition.id(), events); if (events.isEmpty()) { return List.of(); @@ -117,13 +131,18 @@ private final Collection> checkCorrela notDone = false; break; } - CloudEvent retrieved = list.remove(0); - row.put(id2RegMapping.get(item.getKey()), retrieved); + row.put(id2RegMapping.get(item.getKey()), list.remove(0)); } if (notDone) { result.add(row); } } + return result; + } + + private void markProcessed( + CorrelationOperations operations, + Collection> result) { if (!result.isEmpty()) { Map> processed = new HashMap<>(); for (Map item : result) { @@ -135,7 +154,6 @@ private final Collection> checkCorrela } operations.markAsProcessed(processed); } - return result; } public void addMetadata( diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/CorrelationOperations.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/CorrelationOperations.java index 7aeb7a8f9..03b19da86 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/CorrelationOperations.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/CorrelationOperations.java @@ -22,9 +22,7 @@ interface CorrelationOperations { - default Map> retrieveEvents(Collection targetRegIds) { - return Map.of(); - } + default void retrieveEvents(Map> reg2EventsMap) {} default void storeEvent(String regId, CloudEvent event) {} diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceTransaction.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceTransaction.java index e76acaa9b..7e214ddfa 100644 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceTransaction.java +++ b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceTransaction.java @@ -27,9 +27,7 @@ import io.serverlessworkflow.impl.persistence.PersistenceInstanceTransaction; import io.serverlessworkflow.impl.persistence.PersistenceTaskInfo; import io.serverlessworkflow.impl.persistence.PersistenceWorkflowInfo; -import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -113,26 +111,28 @@ public void clearStatus(WorkflowContextData workflowContext) { clearStatus(workflowContext.definition(), key(workflowContext)); } - public Map> retrieveEvents(Collection targetRegIds) { - Map> result = new HashMap<>(); - targetRegIds.forEach( - regId -> { - Map processedCes = processedCloudEvents(regId); - Map ces = cloudEvents(regId); - result.put( - regId, - ces.values().stream() + @Override + public void retrieveEvents(Map> events) { + events + .entrySet() + .forEach( + e -> { + String regId = e.getKey(); + List cloudEvents = e.getValue(); + Map processedCes = processedCloudEvents(regId); + cloudEvents(regId).values().stream() .map(this::unmarshallCloudEvent) .filter(ce -> !processedCes.containsKey(ce.getId())) - .collect(Collectors.toCollection(ArrayList::new))); - }); - return result; + .forEach(cloudEvents::add); + }); } + @Override public void storeEvent(String regId, CloudEvent event) { cloudEvents(regId).put(event.getId(), marshallCloudEvent(event)); } + @Override public void markAsProcessed(Map> regCeIds) { regCeIds.forEach( (k, v) -> { @@ -141,6 +141,7 @@ public void markAsProcessed(Map> regCeIds) { }); } + @Override public void clearProcessed() { deleteAllProcessedMaps(); } From b213e67b945a74c26d2230f7d65b390429df2cbd Mon Sep 17 00:00:00 2001 From: fjtirado Date: Mon, 25 May 2026 13:48:51 +0200 Subject: [PATCH 2/2] [Fix #1395] Handling duplicated id Signed-off-by: fjtirado --- .../impl/marshaller/AbstractInputBuffer.java | 6 +-- .../impl/marshaller/AbstractOutputBuffer.java | 3 +- .../impl/marshaller/MarshallingUtils.java | 19 +++++++++ .../impl/marshaller/WorkflowInputBuffer.java | 11 ++++++ .../impl/marshaller/WorkflowOutputBuffer.java | 13 +++++++ .../InMemoryAllStrategyCorrelationInfo.java | 18 +++++---- .../AbstractAllStrategyCorrelationInfo.java | 39 ++++++++++++------- .../persistence/CorrelationOperations.java | 5 +-- .../OperationAllStrategyCorrelationInfo.java | 2 +- .../StoreAllStrategyCorrelationInfo.java | 2 +- .../bigmap/BigMapInstanceTransaction.java | 7 ++-- .../bigmap/BytesMapInstanceTransaction.java | 7 +++- 12 files changed, 93 insertions(+), 39 deletions(-) diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/AbstractInputBuffer.java b/impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/AbstractInputBuffer.java index c91914edd..d3379b2f3 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/AbstractInputBuffer.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/AbstractInputBuffer.java @@ -17,8 +17,6 @@ import java.net.URI; import java.time.Instant; -import java.time.OffsetDateTime; -import java.time.ZoneOffset; import java.util.ArrayList; import java.util.Collection; import java.util.LinkedHashMap; @@ -116,10 +114,10 @@ public Object readObject() { return readCustomObject(); case URI: - return URI.create(readString()); + return readURI(); case OFFSET_DATE_TIME: - return OffsetDateTime.ofInstant(readInstant(), ZoneOffset.of(readString())); + return readOffsetDateTime(); default: throw new IllegalStateException("Unsupported type " + type); diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/AbstractOutputBuffer.java b/impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/AbstractOutputBuffer.java index 75fae0341..1b943d876 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/AbstractOutputBuffer.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/AbstractOutputBuffer.java @@ -104,8 +104,7 @@ public WorkflowOutputBuffer writeObject(Object object) { writeInstant(value); } else if (object instanceof OffsetDateTime value) { writeType(Type.OFFSET_DATE_TIME); - writeInstant(value.toInstant()); - writeString(value.getOffset().toString()); + writeOffsetDateTime(value); } else if (object instanceof URI value) { writeType(Type.URI); writeString(value.toString()); diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/MarshallingUtils.java b/impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/MarshallingUtils.java index 347d682f8..7444e7726 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/MarshallingUtils.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/MarshallingUtils.java @@ -78,6 +78,14 @@ public static byte[] writeString(WorkflowBufferFactory factory, String value) { return writeValue(factory, value, (b, v) -> b.writeString(v)); } + public static byte[] writeOffsetDateTime(WorkflowBufferFactory factory, OffsetDateTime value) { + return writeValue(factory, value, (b, v) -> b.writeOffsetDateTime(v)); + } + + public static byte[] writeURI(WorkflowBufferFactory factory, URI value) { + return writeValue(factory, value, (b, v) -> b.writeURI(v)); + } + public static byte[] writeCloudEventExtensions(WorkflowBufferFactory factory, CloudEvent event) { try (ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); WorkflowOutputBuffer out = factory.output(bytesOut)) { @@ -99,6 +107,9 @@ public static void writeCloudEventExtensions(WorkflowOutputBuffer out, CloudEven public static CloudEventBuilder readCloudEventExtensions( WorkflowBufferFactory factory, byte[] value, CloudEventBuilder builder) { + if (value == null) { + return builder; + } try (ByteArrayInputStream bytesInt = new ByteArrayInputStream(value); WorkflowInputBuffer in = factory.input(bytesInt)) { return readCloudEventExtenstions(in, value, builder); @@ -162,6 +173,14 @@ public static Instant readInstant(WorkflowBufferFactory factory, byte[] value) { return readValue(factory, value, WorkflowInputBuffer::readInstant); } + public static OffsetDateTime readOffsetDateTime(WorkflowBufferFactory factory, byte[] value) { + return readValue(factory, value, WorkflowInputBuffer::readOffsetDateTime); + } + + public static URI readURI(WorkflowBufferFactory factory, byte[] value) { + return readValue(factory, value, WorkflowInputBuffer::readURI); + } + public static > T readEnum( WorkflowBufferFactory factory, byte[] value, Class enumClass) { return readValue(factory, value, b -> b.readEnum(enumClass)); diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/WorkflowInputBuffer.java b/impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/WorkflowInputBuffer.java index f45567dfa..53a600e95 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/WorkflowInputBuffer.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/WorkflowInputBuffer.java @@ -16,7 +16,10 @@ package io.serverlessworkflow.impl.marshaller; import java.io.Closeable; +import java.net.URI; import java.time.Instant; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; import java.util.Collection; import java.util.Map; @@ -40,6 +43,14 @@ public interface WorkflowInputBuffer extends Closeable { byte[] readBytes(); + default OffsetDateTime readOffsetDateTime() { + return OffsetDateTime.ofInstant(readInstant(), ZoneOffset.of(readString())); + } + + default URI readURI() { + return URI.create(readString()); + } + > T readEnum(Class enumClass); Instant readInstant(); diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/WorkflowOutputBuffer.java b/impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/WorkflowOutputBuffer.java index f7ec25cf5..59d22ef4f 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/WorkflowOutputBuffer.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/WorkflowOutputBuffer.java @@ -15,7 +15,9 @@ */ package io.serverlessworkflow.impl.marshaller; +import java.net.URI; import java.time.Instant; +import java.time.OffsetDateTime; import java.util.Collection; import java.util.Map; @@ -41,6 +43,17 @@ public interface WorkflowOutputBuffer extends AutoCloseable { WorkflowOutputBuffer writeInstant(Instant instant); + default WorkflowOutputBuffer writeOffsetDateTime(OffsetDateTime time) { + writeInstant(time.toInstant()); + writeString(time.getOffset().toString()); + return this; + } + + default WorkflowOutputBuffer writeURI(URI uri) { + writeString(uri.toString()); + return this; + } + WorkflowOutputBuffer writeMap(Map map); WorkflowOutputBuffer writeCollection(Collection col); diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/InMemoryAllStrategyCorrelationInfo.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/InMemoryAllStrategyCorrelationInfo.java index 540a94814..2ff3b2ae1 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/InMemoryAllStrategyCorrelationInfo.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/InMemoryAllStrategyCorrelationInfo.java @@ -17,10 +17,10 @@ import io.cloudevents.CloudEvent; import io.serverlessworkflow.impl.events.EventRegistrationBuilder; -import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; -import java.util.List; +import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.Map; import java.util.function.Consumer; @@ -37,7 +37,7 @@ public static AllStrategyCorrelationInfo instance() { private InMemoryAllStrategyCorrelationInfo() {} - private Map> correlatedEvents; + private Map> correlatedEvents; private Consumer> starter; @Override @@ -48,9 +48,11 @@ public void correlate(EventRegistrationBuilder reg, CloudEvent event) { synchronized (correlatedEvents) { correlatedEvents.get(reg).add(event); if (satisfyCondition(correlatedEvents)) { - for (java.util.Map.Entry> values : + for (java.util.Map.Entry> values : correlatedEvents.entrySet()) { - result.put(values.getKey(), values.getValue().remove(0)); + Iterator iter = values.getValue().iterator(); + result.put(values.getKey(), iter.next()); + iter.remove(); } } } @@ -65,11 +67,11 @@ public void init( Consumer> starter) { correlatedEvents = new HashMap<>(); this.starter = starter; - regs.forEach(reg -> correlatedEvents.put(reg, new ArrayList())); + regs.forEach(reg -> correlatedEvents.put(reg, new LinkedHashSet())); } - private boolean satisfyCondition(Map> events) { - for (List values : events.values()) { + private boolean satisfyCondition(Map> events) { + for (Collection values : events.values()) { if (values.isEmpty()) { return false; } diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractAllStrategyCorrelationInfo.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractAllStrategyCorrelationInfo.java index 18c9e2939..166030138 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractAllStrategyCorrelationInfo.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractAllStrategyCorrelationInfo.java @@ -23,6 +23,8 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -80,8 +82,16 @@ private void queueCorrelation( Consumer> starter) { synchronized (this) { this.completableFuture = - completableFuture.thenCompose( - v -> executor.execute(() -> doTransaction(function), definition)); + completableFuture + .thenCompose(v -> executor.execute(() -> doTransaction(function), definition)) + .exceptionally( + ex -> { + logger.error( + "Exception processing correlation task for definition {}", + definition.id(), + ex); + return List.of(); + }); completableFuture.thenAccept(events -> events.forEach(starter)); } } @@ -90,7 +100,7 @@ private Collection> eventAdded( CorrelationOperations operations, String reg, CloudEvent event) { logger.debug( "Received event {} for definition {} and registration {}", event, definition.id(), reg); - Map> events = initMap(); + Map> events = initMap(); operations.retrieveEvents(events); events.get(reg).add(event); Collection> result = checkCorrelation(events); @@ -99,16 +109,16 @@ private Collection> eventAdded( return result; } - private Map> initMap() { + private Map> initMap() { return id2RegMapping.keySet().stream() - .collect(Collectors.toMap(k -> k, k -> new ArrayList<>())); + .collect(Collectors.toMap(k -> k, k -> new LinkedHashSet<>())); } private Collection> startupCheck( CorrelationOperations operations) { logger.debug("Checking cloud events for definition {}", definition.id()); operations.clearProcessed(); - Map> events = initMap(); + Map> events = initMap(); operations.retrieveEvents(events); Collection> result = checkCorrelation(events); markProcessed(operations, result); @@ -116,22 +126,23 @@ private Collection> startupCheck( } private final Collection> checkCorrelation( - Map> events) { + Map> events) { logger.debug("Stored CloudEvents for definition {} are {}", definition.id(), events); - if (events.isEmpty()) { - return List.of(); - } Collection> result = new ArrayList<>(); + Map> iteratingEvents = + events.entrySet().stream() + .collect(Collectors.toMap(Entry::getKey, e -> e.getValue().iterator())); boolean notDone = true; while (notDone) { Map row = new HashMap<>(); - for (Entry> item : events.entrySet()) { - List list = item.getValue(); - if (list.isEmpty()) { + for (Entry> item : iteratingEvents.entrySet()) { + Iterator iter = item.getValue(); + if (!iter.hasNext()) { notDone = false; break; } - row.put(id2RegMapping.get(item.getKey()), list.remove(0)); + row.put(id2RegMapping.get(item.getKey()), iter.next()); + iter.remove(); } if (notDone) { result.add(row); diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/CorrelationOperations.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/CorrelationOperations.java index 03b19da86..dacaa8655 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/CorrelationOperations.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/CorrelationOperations.java @@ -17,12 +17,11 @@ import io.cloudevents.CloudEvent; import java.util.Collection; -import java.util.List; import java.util.Map; -interface CorrelationOperations { +public interface CorrelationOperations { - default void retrieveEvents(Map> reg2EventsMap) {} + default void retrieveEvents(Map> reg2EventsMap) {} default void storeEvent(String regId, CloudEvent event) {} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/OperationAllStrategyCorrelationInfo.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/OperationAllStrategyCorrelationInfo.java index 1b5a77b5d..3dbba48a9 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/OperationAllStrategyCorrelationInfo.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/OperationAllStrategyCorrelationInfo.java @@ -22,7 +22,7 @@ import java.util.Map; import java.util.function.Function; -class OperationAllStrategyCorrelationInfo extends AbstractAllStrategyCorrelationInfo { +public class OperationAllStrategyCorrelationInfo extends AbstractAllStrategyCorrelationInfo { private final PersistenceInstanceOperations operations; diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/StoreAllStrategyCorrelationInfo.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/StoreAllStrategyCorrelationInfo.java index 3a148a249..17d4b9cde 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/StoreAllStrategyCorrelationInfo.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/StoreAllStrategyCorrelationInfo.java @@ -24,7 +24,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class StoreAllStrategyCorrelationInfo extends AbstractAllStrategyCorrelationInfo { +public class StoreAllStrategyCorrelationInfo extends AbstractAllStrategyCorrelationInfo { private static final Logger logger = LoggerFactory.getLogger(StoreAllStrategyCorrelationInfo.class); diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceTransaction.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceTransaction.java index 7e214ddfa..92a2d808f 100644 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceTransaction.java +++ b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceTransaction.java @@ -28,7 +28,6 @@ import io.serverlessworkflow.impl.persistence.PersistenceTaskInfo; import io.serverlessworkflow.impl.persistence.PersistenceWorkflowInfo; import java.util.Collection; -import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Optional; @@ -112,18 +111,18 @@ public void clearStatus(WorkflowContextData workflowContext) { } @Override - public void retrieveEvents(Map> events) { + public void retrieveEvents(Map> events) { events .entrySet() .forEach( e -> { String regId = e.getKey(); - List cloudEvents = e.getValue(); + Collection ces = e.getValue(); Map processedCes = processedCloudEvents(regId); cloudEvents(regId).values().stream() .map(this::unmarshallCloudEvent) .filter(ce -> !processedCes.containsKey(ce.getId())) - .forEach(cloudEvents::add); + .forEach(ces::add); }); } diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceTransaction.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceTransaction.java index ae8be0a41..a0fa994e3 100644 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceTransaction.java +++ b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceTransaction.java @@ -39,6 +39,7 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.net.URI; +import java.time.OffsetDateTime; public abstract class BytesMapInstanceTransaction extends BigMapInstanceTransaction { @@ -205,7 +206,8 @@ protected byte[] marshallCloudEvent(CloudEvent event) { writer.writeEnum(event.getSpecVersion()); writer.writeString(event.getId()); writer.writeString(event.getType()); - writer.writeString(event.getSource().toString()); + writer.writeURI(event.getSource()); + writer.writeObject(event.getTime()); writer.writeObject(event.getSubject()); writer.writeObject(event.getDataSchema()); writer.writeObject(event.getDataContentType()); @@ -224,7 +226,8 @@ protected CloudEvent unmarshallCloudEvent(byte[] eventData) { CloudEventBuilder.fromSpecVersion(reader.readEnum(SpecVersion.class)); builder.withId(reader.readString()); builder.withType(reader.readString()); - builder.withSource(URI.create(reader.readString())); + builder.withSource(reader.readURI()); + builder.withTime((OffsetDateTime) reader.readObject()); builder.withSubject((String) reader.readObject()); builder.withDataSchema((URI) reader.readObject()); builder.withDataContentType((String) reader.readObject());