[Fix #1395] Refining All strategy correlation persistence approach #1398

Merged
fjtirado merged 2 commits into serverlessworkflow:main from fjtirado:Fix_#1395a
May 26, 2026

Conversation

Collaborator

@fjtirado fjtirado commented May 25, 2026

The map returned by the retrieveEvents method carried some implicit constraints, so it is better to pass in a Map already populated with the expected registrations, each associated with an empty modifiable list. This way implementors only have to add the cloud events to each registration's list (they are no longer responsible for creating the map or the lists).

Also, the event should be stored after the correlation check, reducing the likelihood of it being used for calculations in a different cluster, even when no suitable DB locking strategy exists.
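
The contract change described above can be sketched roughly as follows. This is a minimal, self-contained illustration and not the SDK's actual code: `Event`, `EventStore`, `InMemoryEventStore` and `CallerSide` are hypothetical names, and `Event` stands in for `CloudEvent`. The caller builds the map with one empty, modifiable list per expected registration, and the implementor only appends to those lists.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for CloudEvent, used to keep the sketch self-contained.
record Event(String id) {}

// Hypothetical, simplified version of the persistence contract.
interface EventStore {
  // Implementors only append to the lists already present in the map.
  void retrieveEvents(Map<String, List<Event>> eventsByRegistration);
}

// Toy in-memory implementor (assumption: a real one would read from a database).
class InMemoryEventStore implements EventStore {
  private final Map<String, List<Event>> stored = new LinkedHashMap<>();

  void storeEvent(String regId, Event event) {
    stored.computeIfAbsent(regId, k -> new ArrayList<>()).add(event);
  }

  @Override
  public void retrieveEvents(Map<String, List<Event>> eventsByRegistration) {
    // Only registrations requested by the caller are filled; no map or list is created here.
    eventsByRegistration.forEach(
        (regId, events) -> events.addAll(stored.getOrDefault(regId, List.of())));
  }
}

class CallerSide {
  // The caller owns map creation: one empty, modifiable list per expected registration.
  static Map<String, List<Event>> initMap(Iterable<String> registrationIds) {
    Map<String, List<Event>> map = new LinkedHashMap<>();
    for (String id : registrationIds) {
      map.put(id, new ArrayList<>());
    }
    return map;
  }
}
```

With this shape, events stored under a registration the caller did not ask for never appear in the result, which is one of the constraints that was previously only implicit.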

Copilot AI review requested due to automatic review settings May 25, 2026 11:17
@fjtirado fjtirado changed the title [Fix #1395] Refining approach [Fix #1395] Refining All strategy correlation persistence approach May 25, 2026
@fjtirado fjtirado requested a review from ricardozanini May 25, 2026 11:18
Contributor

Copilot AI left a comment

Pull request overview

Refines the persistence-backed “all-strategy” event correlation flow by changing retrieveEvents from returning a newly-built map to instead populating a caller-provided map, and by adjusting correlation to work off a pre-initialized regId→events structure.

Changes:

  • Updated CorrelationOperations.retrieveEvents to a side-effecting void retrieveEvents(Map<String, List<CloudEvent>> ...) API.
  • Refactored AbstractAllStrategyCorrelationInfo to initialize the regId→events map up-front, load persisted events into it, then run correlation and mark processed.
  • Updated the BigMap persistence implementation to match the new retrieveEvents signature (plus minor formatting changes in ScheduledEventConsumer).

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.

| File | Description |
| --- | --- |
| impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceTransaction.java | Updates BigMap persistence implementation to the new side-effecting retrieveEvents API. |
| impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/CorrelationOperations.java | Changes retrieveEvents contract from returning a map to mutating a provided one. |
| impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractAllStrategyCorrelationInfo.java | Refactors correlation flow to use an initialized regId→events map and new retrieveEvents contract. |
| impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledEventConsumer.java | Minor lambda formatting adjustment (no functional change). |

There were some implicit constraints in the map to be returned by
retrieveEvents.
I think it is better to pass the Map already populated with the expected
registrations associated to an empty modifiable array and let the
implementor just add the cloud events to every array.

Also, the event can be stored at the end, reducing the likelihood of using
it for calculations in a different cluster

Signed-off-by: fjtirado <ftirados@redhat.com>
Copilot AI review requested due to automatic review settings May 25, 2026 11:48
Comment on lines +127 to +129
Map<String, Iterator<CloudEvent>> iteratingEvents =
    events.entrySet().stream()
        .collect(Collectors.toMap(Entry::getKey, e -> e.getValue().iterator()));
Collaborator Author

@fjtirado fjtirado May 25, 2026

Ideally I would like to use SequencedCollection rather than just Collection here, to avoid the map of iterators (and just call removeFirst on the populated collection), but that interface was added in Java 21 and we need to remain compatible with JDK 17.
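
For readers unfamiliar with the pattern, here is a minimal sketch of how a map of iterators can consume one element per registration on JDK 17, where `removeFirst` on a sequenced collection is not available. The `IteratorCorrelation` class, the `correlate` method and the use of plain `String` events are assumptions made for illustration; the real `checkCorrelation` logic is not shown in this conversation and may differ.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;

class IteratorCorrelation {
  // Builds complete tuples (one event per registration) until any registration runs out of events.
  static List<Map<String, String>> correlate(Map<String, Collection<String>> events) {
    // One iterator per registration plays the role that removeFirst would play on Java 21.
    Map<String, Iterator<String>> iterating =
        events.entrySet().stream()
            .collect(Collectors.toMap(Entry::getKey, e -> e.getValue().iterator()));
    List<Map<String, String>> result = new ArrayList<>();
    // The isEmpty guard matters: allMatch on an empty stream is true and would loop forever.
    while (!iterating.isEmpty() && iterating.values().stream().allMatch(Iterator::hasNext)) {
      Map<String, String> tuple = new LinkedHashMap<>();
      iterating.forEach((regId, it) -> tuple.put(regId, it.next()));
      result.add(tuple);
    }
    return result;
  }
}
```

The iterators advance in lockstep, so leftover events in a registration that has more entries than the others simply stay unconsumed.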


private Map<String, Collection<CloudEvent>> initMap() {
  return id2RegMapping.keySet().stream()
      .collect(Collectors.toMap(k -> k, k -> new LinkedHashSet<>()));
Collaborator Author

@fjtirado fjtirado May 25, 2026

The AI was right: although it is unlikely, we should prevent duplicate IDs in case the implementor messed up, hence the LinkedHashSet (see my comment about SequencedCollection).
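
A small sketch of the effect of that choice: a `LinkedHashSet` silently drops repeated elements while keeping insertion order. `StoredEvent` and `DuplicateGuard` are hypothetical names, and the record's equality is based on its `id` field only. Whether a given `CloudEvent` implementation compares equal for the same id is an assumption this sketch does not verify.

```java
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;

// Hypothetical event type; records derive equals/hashCode from their components (here, id).
record StoredEvent(String id) {}

class DuplicateGuard {
  // Simulates an implementor adding events, possibly with repeats, to the caller's collection.
  static Collection<StoredEvent> collect(List<StoredEvent> fromStore) {
    Collection<StoredEvent> events = new LinkedHashSet<>();
    events.addAll(fromStore); // repeated elements are ignored, first-seen order is preserved
    return events;
  }
}
```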

Contributor

Copilot AI left a comment

Pull request overview

Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.

Contributor

Copilot AI left a comment

Pull request overview

Copilot reviewed 4 out of 4 changed files in this pull request and generated 1 comment.

Comments suppressed due to low confidence (1)

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractAllStrategyCorrelationInfo.java:100

  • eventAdded now adds the incoming event to the correlation set unconditionally after retrieveEvents(...). If the same CloudEvent id was already marked as processed, it will have been filtered out of retrieveEvents, but this code re-introduces it and can correlate/start a workflow again for an already-processed event id (a regression vs the previous flow where the event was stored first and then filtered by processedCes). Consider checking the processed state for (regId,eventId) before adding/processing the incoming event (e.g., add an isProcessed(regId, eventId) operation, or have retrieveEvents/another API return processed ids so duplicates can be dropped).
    Map<String, Collection<CloudEvent>> events = initMap();
    operations.retrieveEvents(events);
    events.get(reg).add(event);
    Collection<Map<EventRegistrationBuilder, CloudEvent>> result = checkCorrelation(events);
    operations.storeEvent(reg, event);
    markProcessed(operations, result);
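
One way the suggested guard could look is sketched below. `ProcessedAwareOperations`, `isProcessed` and `ProcessedGuard` are hypothetical: they illustrate the reviewer's proposal and are not an existing part of `CorrelationOperations`. Events are reduced to their ids to keep the sketch short.

```java
import java.util.Collection;
import java.util.Map;

// Hypothetical operations interface; isProcessed is the proposed addition, not an existing method.
interface ProcessedAwareOperations {
  boolean isProcessed(String regId, String eventId);
}

class ProcessedGuard {
  // Adds the incoming event id to the correlation set only if it was not already processed
  // for that registration. Returns true when the event was actually added.
  static boolean addIfNew(
      ProcessedAwareOperations ops,
      Map<String, Collection<String>> events,
      String regId,
      String eventId) {
    if (ops.isProcessed(regId, eventId)) {
      return false; // drop the duplicate before it can take part in correlation again
    }
    return events.get(regId).add(eventId);
  }
}
```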

Contributor

Copilot AI left a comment

Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

Copilot AI review requested due to automatic review settings May 25, 2026 14:40
Contributor

Copilot AI left a comment

Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

@fjtirado fjtirado marked this pull request as draft May 25, 2026 16:32
@fjtirado fjtirado marked this pull request as ready for review May 25, 2026 17:12
Copilot AI review requested due to automatic review settings May 25, 2026 17:12
Contributor

Copilot AI left a comment

Pull request overview

Copilot reviewed 13 out of 13 changed files in this pull request and generated 3 comments.

Copilot AI review requested due to automatic review settings May 25, 2026 17:25
Contributor

Copilot AI left a comment

Pull request overview

Copilot reviewed 13 out of 13 changed files in this pull request and generated 2 comments.

Member

@ricardozanini ricardozanini left a comment

Are you planning to add tests to validate this change? I think we can spam the same workflow on different threads and check for correlation.

Collaborator Author

fjtirado commented May 26, 2026

> Are you planning to add tests to validate this change? I think we can spam the same workflow on different threads and check for correlation.

Good point.
The events are already sent in different threads, and that's why there is a synchronized block here: https://github.com/serverlessworkflow/sdk-java/blob/main/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractAllStrategyCorrelationInfo.java#L81-L86. So a test using different workflows would be redundant, in the sense that the multithreaded scenario (within the same JVM) is already covered.
However, I think upstream (testing the DB implementation layer) we need to add more tests with different JVMs to ensure that the locking policy across JVMs (which is not handled here) is consistent.
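
To illustrate the in-JVM guarantee mentioned above, here is a minimal sketch of a correlator whose add-and-check step runs inside a single synchronized block, driven from several threads. `SynchronizedCorrelator`, its two fixed registrations "A" and "B", and the pairing rule are assumptions invented for this example; they are not the code at the linked lines. As noted in the comment, this says nothing about coordination across JVMs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SynchronizedCorrelator {
  private final List<String> pendingA = new ArrayList<>();
  private final List<String> pendingB = new ArrayList<>();
  private int correlated;

  // Adding the event and checking correlation happen under one monitor,
  // so threads in the same JVM cannot interleave between the two steps.
  void eventAdded(String regId, String eventId) {
    synchronized (this) {
      ("A".equals(regId) ? pendingA : pendingB).add(eventId);
      if (!pendingA.isEmpty() && !pendingB.isEmpty()) {
        pendingA.remove(0);
        pendingB.remove(0);
        correlated++;
      }
    }
  }

  synchronized int correlatedPairs() {
    return correlated;
  }

  // Sends `pairs` events to each registration from a thread pool and returns the pair count.
  static int run(int pairs) {
    SynchronizedCorrelator correlator = new SynchronizedCorrelator();
    ExecutorService pool = Executors.newFixedThreadPool(8);
    for (int i = 0; i < pairs; i++) {
      String id = String.valueOf(i);
      pool.submit(() -> correlator.eventAdded("A", id));
      pool.submit(() -> correlator.eventAdded("B", id));
    }
    pool.shutdown();
    try {
      if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
        throw new IllegalStateException("timed out waiting for events");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return correlator.correlatedPairs();
  }
}
```

Because every event is added and matched atomically, the number of correlated pairs equals the number of events sent per registration regardless of thread scheduling.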

Contributor

Copilot AI left a comment

Pull request overview

Copilot reviewed 13 out of 13 changed files in this pull request and generated 1 comment.

Copilot AI review requested due to automatic review settings May 26, 2026 09:48
Contributor

Copilot AI left a comment

Pull request overview

Copilot reviewed 13 out of 13 changed files in this pull request and generated 2 comments.

Signed-off-by: fjtirado <ftirados@redhat.com>
@fjtirado fjtirado marked this pull request as draft May 26, 2026 11:17
@fjtirado fjtirado marked this pull request as ready for review May 26, 2026 11:47
Copilot AI review requested due to automatic review settings May 26, 2026 11:47
Contributor

Copilot AI left a comment

Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

@fjtirado fjtirado merged commit 07e5243 into serverlessworkflow:main May 26, 2026
3 checks passed

3 participants