-
Notifications
You must be signed in to change notification settings - Fork 56
[Fix #1395] Refining All strategy correlation persistence approach #1398
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,6 +23,8 @@ | |
| import java.util.ArrayList; | ||
| import java.util.Collection; | ||
| import java.util.HashMap; | ||
| import java.util.Iterator; | ||
| import java.util.LinkedHashSet; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Map.Entry; | ||
|
|
@@ -80,8 +82,16 @@ private void queueCorrelation( | |
| Consumer<Map<EventRegistrationBuilder, CloudEvent>> starter) { | ||
| synchronized (this) { | ||
| this.completableFuture = | ||
| completableFuture.thenCompose( | ||
| v -> executor.execute(() -> doTransaction(function), definition)); | ||
| completableFuture | ||
| .thenCompose(v -> executor.execute(() -> doTransaction(function), definition)) | ||
| .exceptionally( | ||
| ex -> { | ||
| logger.error( | ||
| "Exception processing correlation task for definition {}", | ||
| definition.id(), | ||
| ex); | ||
| return List.of(); | ||
| }); | ||
| completableFuture.thenAccept(events -> events.forEach(starter)); | ||
| } | ||
| } | ||
|
|
@@ -90,40 +100,60 @@ private Collection<Map<EventRegistrationBuilder, CloudEvent>> eventAdded( | |
| CorrelationOperations operations, String reg, CloudEvent event) { | ||
| logger.debug( | ||
| "Received event {} for definition {} and registration {}", event, definition.id(), reg); | ||
| Map<String, Collection<CloudEvent>> events = initMap(); | ||
| operations.retrieveEvents(events); | ||
| events.get(reg).add(event); | ||
| Collection<Map<EventRegistrationBuilder, CloudEvent>> result = checkCorrelation(events); | ||
| operations.storeEvent(reg, event); | ||
| return checkCorrelation(operations); | ||
| markProcessed(operations, result); | ||
| return result; | ||
|
fjtirado marked this conversation as resolved.
|
||
| } | ||
|
|
||
| private Map<String, Collection<CloudEvent>> initMap() { | ||
| return id2RegMapping.keySet().stream() | ||
| .collect(Collectors.toMap(k -> k, k -> new LinkedHashSet<>())); | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The IA was right, although unlikely, we should prevent duplicate Id in case the implementor messed up, so using a LinkedHashSet (see my comment about SequenceCollection)
fjtirado marked this conversation as resolved.
|
||
| } | ||
|
|
||
| private Collection<Map<EventRegistrationBuilder, CloudEvent>> startupCheck( | ||
| CorrelationOperations operations) { | ||
| logger.debug("Checking cloud events for definition {}", definition.id()); | ||
| operations.clearProcessed(); | ||
| return checkCorrelation(operations); | ||
| Map<String, Collection<CloudEvent>> events = initMap(); | ||
| operations.retrieveEvents(events); | ||
| Collection<Map<EventRegistrationBuilder, CloudEvent>> result = checkCorrelation(events); | ||
| markProcessed(operations, result); | ||
| return result; | ||
| } | ||
|
|
||
| private final Collection<Map<EventRegistrationBuilder, CloudEvent>> checkCorrelation( | ||
| CorrelationOperations operations) { | ||
| Map<String, List<CloudEvent>> events = operations.retrieveEvents(id2RegMapping.keySet()); | ||
| Map<String, Collection<CloudEvent>> events) { | ||
| logger.debug("Stored CloudEvents for definition {} are {}", definition.id(), events); | ||
| if (events.isEmpty()) { | ||
| return List.of(); | ||
| } | ||
| Collection<Map<EventRegistrationBuilder, CloudEvent>> result = new ArrayList<>(); | ||
| Map<String, Iterator<CloudEvent>> iteratingEvents = | ||
| events.entrySet().stream() | ||
| .collect(Collectors.toMap(Entry::getKey, e -> e.getValue().iterator())); | ||
|
Comment on lines
+132
to
+134
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ideally I would like to use SequenceCollection rather than just Collection here to avoid the map of iterators (and just call removeFirst over the populated collection), but this interface was added in Java 21 and we need to be compliant with JDK 17.
fjtirado marked this conversation as resolved.
|
||
| boolean notDone = true; | ||
| while (notDone) { | ||
| Map<EventRegistrationBuilder, CloudEvent> row = new HashMap<>(); | ||
| for (Entry<String, List<CloudEvent>> item : events.entrySet()) { | ||
| List<CloudEvent> list = item.getValue(); | ||
| if (list.isEmpty()) { | ||
| for (Entry<String, Iterator<CloudEvent>> item : iteratingEvents.entrySet()) { | ||
| Iterator<CloudEvent> iter = item.getValue(); | ||
| if (!iter.hasNext()) { | ||
| notDone = false; | ||
| break; | ||
| } | ||
| CloudEvent retrieved = list.remove(0); | ||
| row.put(id2RegMapping.get(item.getKey()), retrieved); | ||
| row.put(id2RegMapping.get(item.getKey()), iter.next()); | ||
| iter.remove(); | ||
| } | ||
| if (notDone) { | ||
| result.add(row); | ||
| } | ||
| } | ||
| return result; | ||
| } | ||
|
|
||
| private void markProcessed( | ||
| CorrelationOperations operations, | ||
| Collection<Map<EventRegistrationBuilder, CloudEvent>> result) { | ||
| if (!result.isEmpty()) { | ||
| Map<String, Collection<String>> processed = new HashMap<>(); | ||
| for (Map<EventRegistrationBuilder, CloudEvent> item : result) { | ||
|
|
@@ -135,7 +165,6 @@ private final Collection<Map<EventRegistrationBuilder, CloudEvent>> checkCorrela | |
| } | ||
| operations.markAsProcessed(processed); | ||
| } | ||
| return result; | ||
| } | ||
|
|
||
| public void addMetadata( | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.