diff --git a/.cz.toml b/.cz.toml index 29117b5..ab20c79 100644 --- a/.cz.toml +++ b/.cz.toml @@ -2,5 +2,5 @@ name = "cz_conventional_commits" tag_format = "v$version" version_scheme = "semver" -version = "2.0.0" +version = "2.0.1" update_changelog_on_bump = true diff --git a/docs/snapshots-patching-and-generalization.md b/docs/snapshots-patching-and-generalization.md index 98768c7..39197a7 100644 --- a/docs/snapshots-patching-and-generalization.md +++ b/docs/snapshots-patching-and-generalization.md @@ -348,25 +348,61 @@ Changed paths include: ## Processor Transaction Flow -`DocumentProcessingRuntime.applyPatch(...)` now works roughly like this: +`DocumentProcessingRuntime.applyPatches(...)` now works roughly like this: ```text -rollback = current mutable materialized view baseSnapshot = current snapshot or snapshotManager.fromDocument(...) -canonicalPlan = ImmutablePatchPlanner(baseSnapshot.canonical).plan(...) -resolvedPlan = ImmutablePatchPlanner(baseSnapshot.resolved).plan(...) -conformancePlan = ConformanceEngine.planGeneralization(...) -if generalized: - commit generalized snapshot -else: - commit normal canonical patch snapshot +for each patch: + canonicalPlan = ImmutablePatchPlanner(working canonical root).plan(...) + resolvedPlan = ImmutablePatchPlanner(working resolved root).plan(...) + remember frozen before/after update metadata +conformancePlan = ConformanceEngine.planGeneralization(..., changedPaths) +commit final canonical/resolved roots once on failure: - restore rollback and previous snapshot + restore previous snapshot if one was active ``` +Batch conformance selects changed paths whose final resolved path has typed +metadata at the changed node or one of its ancestors up to the origin scope. +This catches typed descendants below an otherwise untyped root, including list +`itemType` and dictionary `valueType` paths, while leaving unrelated untyped +processor-managed writes out of the conformance planner. + The mutable `Node` view is now a compatibility adapter generated from the canonical snapshot. Snapshot state is authoritative. +## Batch Patch Application + +`ProcessorExecutionContext.applyPatches(List)` applies a changeset +atomically. + +Semantics: + +- patches are applied in order +- duplicate paths are preserved +- if any patch fails, the full batch rolls back +- the mutable materialized root is not deep-copied before a batch; planning runs + on frozen roots and the materialized view changes only at commit +- conformance/generalization is planned over the final working roots +- the runtime commits once +- document update events are returned and routed in patch order after the batch + commit +- update `before` values describe the value at the patch path immediately before + that patch entry was applied +- update `after` values normally describe the committed post-conformance value + at the patch path; if a later patch in the same batch overlaps that path, the + earlier update keeps its patch-time intermediate `after` value so duplicate + and add/remove patch-entry order remains observable +- update before/after values stay frozen-backed and materialize to `Node` only + when a matching `DocumentUpdateChannel` needs an event or a caller explicitly + reads `before()` / `after()` +- batch timing and update materialization counters are exposed package-privately + for tests and performance investigation +- `applyPatch` delegates to `applyPatches(singletonList(...))` + +This is the preferred path for workflow steps such as `Conversation/Update +Document` that apply a computed changeset. + ## Gas And Caching Resolved snapshot/type caches affect CPU and provider fetches, not gas. @@ -405,4 +441,6 @@ Still missing: - `ConformanceEngineTest` - `DocumentProcessorSnapshotTransactionTest` - `DocumentProcessorGeneralizationTest` +- `DocumentProcessingRuntimeBatchPatchTest` +- `DocumentProcessorBatchPatchTest` - `DocumentProcessorGasTest` diff --git a/src/main/java/blue/language/Blue.java b/src/main/java/blue/language/Blue.java index 7b72c7c..1d41a1d 100644 --- a/src/main/java/blue/language/Blue.java +++ b/src/main/java/blue/language/Blue.java @@ -26,6 +26,7 @@ import blue.language.snapshot.ResolvedSnapshot; import blue.language.utils.*; import blue.language.utils.limits.CompositeLimits; +import blue.language.utils.limits.ExcludedPathLimits; import blue.language.utils.limits.Limits; import java.util.ArrayList; @@ -40,6 +41,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.function.Predicate; import static blue.language.utils.UncheckedObjectMapper.JSON_MAPPER; import static blue.language.utils.UncheckedObjectMapper.YAML_MAPPER; @@ -109,6 +111,52 @@ public Node resolve(Node node, Limits limits) { return merger.resolve(node, effectiveLimits); } + public Node resolvePreservingPaths(Node node, Collection preservedPaths) { + return resolvePreservingPaths(node, NO_LIMITS, preservedPaths); + } + + public Node resolvePreservingPaths(Node node, Limits limits, Collection preservedPaths) { + if (node == null) { + throw new IllegalArgumentException("node must not be null"); + } + Set canonicalPreservedPaths = canonicalPreservedPaths(preservedPaths); + if (canonicalPreservedPaths.isEmpty()) { + return resolve(node.clone(), limits); + } + if (canonicalPreservedPaths.contains("/")) { + return node.clone(); + } + + Limits preservingLimits = limits == NO_LIMITS + ? ExcludedPathLimits.excluding(canonicalPreservedPaths) + : new CompositeLimits(limits, ExcludedPathLimits.excluding(canonicalPreservedPaths)); + Node resolved = resolve(node.clone(), preservingLimits); + for (String path : canonicalPreservedPaths) { + Node preserved = NodePathEditor.getOrNull(node, path); + if (preserved != null) { + NodePathEditor.put(resolved, path, preserved.clone()); + } + } + return resolved; + } + + public List selectPaths(Node node, Collection pathPatterns, Predicate predicate) { + return NodePathSelector.select(node, pathPatterns, predicate); + } + + public Node resolvePreservingMatchingPaths(Node node, + Collection pathPatterns, + Predicate predicate) { + return resolvePreservingMatchingPaths(node, NO_LIMITS, pathPatterns, predicate); + } + + public Node resolvePreservingMatchingPaths(Node node, + Limits limits, + Collection pathPatterns, + Predicate predicate) { + return resolvePreservingPaths(node, limits, selectPaths(node, pathPatterns, predicate)); + } + public Node reverse(Node node) { return new MergeReverser().reverse(node); } @@ -383,11 +431,23 @@ public Blue registerContractProcessor(String blueId, ContractProcessor canonicalPreservedPaths(Collection preservedPaths) { + if (preservedPaths == null || preservedPaths.isEmpty()) { + return Collections.emptySet(); + } + Set canonicalPaths = new HashSet<>(); + for (String preservedPath : preservedPaths) { + canonicalPaths.add(JsonPointer.canonicalize(preservedPath)); + } + return canonicalPaths; + } + private NodeProvider processorSnapshotNodeProvider() { Set processorTypeBlueIds = new HashSet<>(PROCESSOR_MANAGED_TYPE_BLUE_IDS); if (documentProcessor != null) { diff --git a/src/main/java/blue/language/conformance/ConformanceEngine.java b/src/main/java/blue/language/conformance/ConformanceEngine.java index 0148d84..18a5146 100644 --- a/src/main/java/blue/language/conformance/ConformanceEngine.java +++ b/src/main/java/blue/language/conformance/ConformanceEngine.java @@ -9,6 +9,8 @@ import blue.language.utils.NodeProviderWrapper; import blue.language.utils.limits.Limits; +import java.util.ArrayList; +import java.util.List; import java.util.Objects; public final class ConformanceEngine { @@ -60,4 +62,38 @@ public ConformancePlan planGeneralization(FrozenNode canonicalRoot, FrozenNode r return new FrozenConformancePlanner(nodeProvider, mergingProcessor, resolvedReferenceCache) .plan(canonicalRoot, resolvedRoot, changedPath); } + + public ConformancePlan planGeneralization(FrozenNode canonicalRoot, + FrozenNode resolvedRoot, + List changedPaths) { + if (changedPaths == null || changedPaths.isEmpty()) { + return ConformancePlan.unchanged(canonicalRoot, resolvedRoot); + } + FrozenNode nextCanonical = canonicalRoot; + FrozenNode nextResolved = resolvedRoot; + boolean generalized = false; + List canonicalPatches = new ArrayList<>(); + List allChangedPaths = new ArrayList<>(); + FrozenConformancePlanner planner = new FrozenConformancePlanner(nodeProvider, + mergingProcessor, + resolvedReferenceCache); + for (String changedPath : changedPaths) { + ConformancePlan plan = planner.plan(nextCanonical, nextResolved, changedPath); + nextCanonical = plan.canonicalRoot() != null ? plan.canonicalRoot() : nextCanonical; + nextResolved = plan.root(); + if (plan.generalized()) { + generalized = true; + canonicalPatches.addAll(plan.canonicalPatches()); + allChangedPaths.addAll(plan.changedPaths()); + } + } + if (!generalized) { + return ConformancePlan.unchanged(nextCanonical, nextResolved); + } + return ConformancePlan.generalized(nextCanonical, + nextResolved, + canonicalPatches, + allChangedPaths, + nextCanonical != null); + } } diff --git a/src/main/java/blue/language/conformance/ConformancePlan.java b/src/main/java/blue/language/conformance/ConformancePlan.java index c217035..f87e584 100644 --- a/src/main/java/blue/language/conformance/ConformancePlan.java +++ b/src/main/java/blue/language/conformance/ConformancePlan.java @@ -50,6 +50,19 @@ public static ConformancePlan unchanged(FrozenNode canonicalRoot, FrozenNode roo canonicalRoot != null); } + public static ConformancePlan generalized(FrozenNode canonicalRoot, + FrozenNode root, + List canonicalPatches, + List changedPaths, + boolean fullSnapshotRebuildAvoidable) { + return new ConformancePlan(canonicalRoot, + root, + true, + canonicalPatches, + changedPaths, + fullSnapshotRebuildAvoidable); + } + public FrozenNode canonicalRoot() { return canonicalRoot; } diff --git a/src/main/java/blue/language/merge/Merger.java b/src/main/java/blue/language/merge/Merger.java index 371a80c..7040434 100644 --- a/src/main/java/blue/language/merge/Merger.java +++ b/src/main/java/blue/language/merge/Merger.java @@ -115,8 +115,11 @@ private void mergeObject(Node target, Node source, Limits limits) { properties.forEach((key, value) -> { if (limits.shouldMergePathSegment(key, value)) { limits.enterPathSegment(key, value); - mergeProperty(target, key, value, limits); - limits.exitPathSegment(); + try { + mergeProperty(target, key, value, limits); + } finally { + limits.exitPathSegment(); + } } }); } @@ -137,17 +140,17 @@ private void mergeChildren(Node target, List sourceChildren, Limits limits if (targetChildren == null) { if (startsWithPrevious(sourceChildren)) { - targetChildren = resolvePreviousAnchor(sourceChildren.get(0), limits); + targetChildren = resolvePreviousAnchor(sourceChildren.get(0), limits, target.getItemType()); target.items(targetChildren); validatePreviousAnchor(targetChildren, sourceChildren.get(0)); if (LIST_MERGE_POLICY_APPEND_ONLY.equals(mergePolicy)) { - mergeAppendOnlyChildren(targetChildren, sourceChildren, limits); + mergeAppendOnlyChildren(targetChildren, sourceChildren, limits, target.getItemType()); } else { - mergePositionalChildren(targetChildren, sourceChildren, limits); + mergePositionalChildren(targetChildren, sourceChildren, limits, target.getItemType()); } return; } - targetChildren = resolveInitialChildren(sourceChildren, limits); + targetChildren = resolveInitialChildren(sourceChildren, limits, target.getItemType()); target.items(targetChildren); return; } @@ -157,13 +160,13 @@ private void mergeChildren(Node target, List sourceChildren, Limits limits } if (LIST_MERGE_POLICY_APPEND_ONLY.equals(mergePolicy)) { - mergeAppendOnlyChildren(targetChildren, sourceChildren, limits); + mergeAppendOnlyChildren(targetChildren, sourceChildren, limits, target.getItemType()); } else { - mergePositionalChildren(targetChildren, sourceChildren, limits); + mergePositionalChildren(targetChildren, sourceChildren, limits, target.getItemType()); } } - private List resolveInitialChildren(List sourceChildren, Limits limits) { + private List resolveInitialChildren(List sourceChildren, Limits limits, Node itemType) { List result = new ArrayList<>(); int start = startsWithPrevious(sourceChildren) ? 1 : 0; for (int i = start; i < sourceChildren.size(); i++) { @@ -175,7 +178,7 @@ private List resolveInitialChildren(List sourceChildren, Limits limi } child = withoutPosition(child); } - Node resolvedChild = resolveListChild(child, limits, String.valueOf(result.size())); + Node resolvedChild = resolveListChild(child, limits, String.valueOf(result.size()), itemType); if (resolvedChild != null) { result.add(resolvedChild); } @@ -183,9 +186,9 @@ private List resolveInitialChildren(List sourceChildren, Limits limi return result; } - private void mergeAppendOnlyChildren(List targetChildren, List sourceChildren, Limits limits) { + private void mergeAppendOnlyChildren(List targetChildren, List sourceChildren, Limits limits, Node itemType) { if (startsWithPrevious(sourceChildren)) { - appendChildren(targetChildren, sourceChildren, 1, limits); + appendChildren(targetChildren, sourceChildren, 1, limits, itemType); return; } @@ -197,13 +200,13 @@ private void mergeAppendOnlyChildren(List targetChildren, List sourc for (int i = 0; i < sourceChildren.size(); i++) { if (i >= targetChildren.size()) { - Node resolvedChild = resolveListChild(sourceChildren.get(i), limits, String.valueOf(i)); + Node resolvedChild = resolveListChild(sourceChildren.get(i), limits, String.valueOf(i), itemType); if (resolvedChild != null) { targetChildren.add(resolvedChild); } continue; } - Node sourceChild = resolveListChild(sourceChildren.get(i), limits, String.valueOf(i)); + Node sourceChild = resolveListChild(sourceChildren.get(i), limits, String.valueOf(i), itemType); if (sourceChild == null) { continue; } @@ -217,16 +220,16 @@ private void mergeAppendOnlyChildren(List targetChildren, List sourc } } - private void mergePositionalChildren(List targetChildren, List sourceChildren, Limits limits) { + private void mergePositionalChildren(List targetChildren, List sourceChildren, Limits limits, Node itemType) { boolean hasPositionControls = sourceChildren.stream().anyMatch(child -> child.getPosition() != null); int start = startsWithPrevious(sourceChildren) ? 1 : 0; if (!hasPositionControls) { if (startsWithPrevious(sourceChildren)) { - appendChildren(targetChildren, sourceChildren, start, limits); + appendChildren(targetChildren, sourceChildren, start, limits, itemType); return; } - mergeLegacyPositionalChildren(targetChildren, sourceChildren, start, limits); + mergeLegacyPositionalChildren(targetChildren, sourceChildren, start, limits, itemType); return; } @@ -241,9 +244,9 @@ private void mergePositionalChildren(List targetChildren, List sourc if (!positions.add(position)) { throw new IllegalArgumentException("Duplicate \"$pos\" value in list: " + position); } - mergeOrReplacePosition(targetChildren, position, withoutPosition(sourceChild), limits); + mergeOrReplacePosition(targetChildren, position, withoutPosition(sourceChild), limits, itemType); } else { - Node resolvedChild = resolveListChild(sourceChild, limits, String.valueOf(targetChildren.size())); + Node resolvedChild = resolveListChild(sourceChild, limits, String.valueOf(targetChildren.size()), itemType); if (resolvedChild != null) { targetChildren.add(resolvedChild); } @@ -251,7 +254,7 @@ private void mergePositionalChildren(List targetChildren, List sourc } } - private void mergeLegacyPositionalChildren(List targetChildren, List sourceChildren, int start, Limits limits) { + private void mergeLegacyPositionalChildren(List targetChildren, List sourceChildren, int start, Limits limits, Node itemType) { int sourceLength = sourceChildren.size() - start; if (sourceLength < targetChildren.size()) { throw new IllegalArgumentException(String.format( @@ -263,7 +266,7 @@ private void mergeLegacyPositionalChildren(List targetChildren, List for (int i = 0; i < sourceLength; i++) { Node sourceChild = sourceChildren.get(start + i); if (i >= targetChildren.size()) { - Node resolvedChild = resolveListChild(sourceChild, limits, String.valueOf(i)); + Node resolvedChild = resolveListChild(sourceChild, limits, String.valueOf(i), itemType); if (resolvedChild != null) { targetChildren.add(resolvedChild); } @@ -273,16 +276,19 @@ private void mergeLegacyPositionalChildren(List targetChildren, List } } - private void mergeOrReplacePosition(List targetChildren, int position, Node overlay, Limits limits) { + private void mergeOrReplacePosition(List targetChildren, int position, Node overlay, Limits limits, Node itemType) { + Node effectiveItemType = targetChildren.get(position).getType() != null + ? targetChildren.get(position).getType() + : itemType; if (isEmptyPlaceholder(targetChildren.get(position)) || overlay.getValue() != null || overlay.getItems() != null) { - Node resolvedChild = resolveListChild(overlay, limits, String.valueOf(position)); + Node resolvedChild = resolveListChild(overlay, limits, String.valueOf(position), effectiveItemType); if (resolvedChild != null) { targetChildren.set(position, resolvedChild); } return; } if (overlay.getType() != null) { - Node resolvedOverlay = resolveListChild(overlay, limits, String.valueOf(position)); + Node resolvedOverlay = resolveListChild(overlay, limits, String.valueOf(position), effectiveItemType); if (resolvedOverlay != null) { mergeObject(targetChildren.get(position), resolvedOverlay, limits); } @@ -291,16 +297,16 @@ private void mergeOrReplacePosition(List targetChildren, int position, Nod merge(targetChildren.get(position), overlay, limits); } - private void appendChildren(List targetChildren, List sourceChildren, int start, Limits limits) { + private void appendChildren(List targetChildren, List sourceChildren, int start, Limits limits, Node itemType) { for (int i = start; i < sourceChildren.size(); i++) { - Node resolvedChild = resolveListChild(sourceChildren.get(i), limits, String.valueOf(targetChildren.size())); + Node resolvedChild = resolveListChild(sourceChildren.get(i), limits, String.valueOf(targetChildren.size()), itemType); if (resolvedChild != null) { targetChildren.add(resolvedChild); } } } - private List resolvePreviousAnchor(Node previousAnchor, Limits limits) { + private List resolvePreviousAnchor(Node previousAnchor, Limits limits, Node itemType) { List fetched = nodeProvider.fetchByBlueId(previousAnchor.getPreviousBlueId()); if (fetched == null || fetched.isEmpty()) { throw new IllegalArgumentException("No content found for $previous blueId: " + previousAnchor.getPreviousBlueId()); @@ -311,7 +317,7 @@ private List resolvePreviousAnchor(Node previousAnchor, Limits limits) { : fetched; List resolved = new ArrayList<>(); for (int i = 0; i < previousChildren.size(); i++) { - Node resolvedChild = resolveListChild(previousChildren.get(i), limits, String.valueOf(i)); + Node resolvedChild = resolveListChild(previousChildren.get(i), limits, String.valueOf(i), itemType); if (resolvedChild != null) { resolved.add(resolvedChild); } @@ -342,7 +348,7 @@ private boolean isEmptyPlaceholder(Node node) { && node.getValueType() == null; } - private Node resolveListChild(Node child, Limits limits, String segment) { + private Node resolveListChild(Node child, Limits limits, String segment, Node itemType) { if (child.getPreviousBlueId() != null || child.getPosition() != null) { throw new IllegalArgumentException("List control items must be consumed before resolving list children."); } @@ -351,12 +357,26 @@ private Node resolveListChild(Node child, Limits limits, String segment) { } limits.enterPathSegment(segment, child); try { - return resolve(child, limits); + return resolve(applyItemType(child, itemType), limits); } finally { limits.exitPathSegment(); } } + private Node applyItemType(Node child, Node itemType) { + if (child.getType() != null || child.getBlueId() != null || itemType == null) { + return child; + } + return child.clone().type(itemTypeReference(itemType)); + } + + private Node itemTypeReference(Node itemType) { + if (itemType.getBlueId() != null) { + return new Node().blueId(itemType.getBlueId()); + } + return itemType.clone(); + } + private Node withoutPosition(Node node) { Node clone = node.clone(); clone.position(null); diff --git a/src/main/java/blue/language/model/Node.java b/src/main/java/blue/language/model/Node.java index 5e4cb3b..d5a539c 100644 --- a/src/main/java/blue/language/model/Node.java +++ b/src/main/java/blue/language/model/Node.java @@ -72,6 +72,10 @@ public Object getValue() { return value; } + public Object getRawValue() { + return value; + } + public List getItems() { return items; } @@ -324,6 +328,10 @@ public Node getAsNode(String path) { return (Node) get(path); } + public Node getNode(String path) { + return NodePathAccessor.getNode(this, path); + } + public String getAsText(String path) { return (String) get(path); } diff --git a/src/main/java/blue/language/processor/BatchPatchRecord.java b/src/main/java/blue/language/processor/BatchPatchRecord.java new file mode 100644 index 0000000..aaea9f3 --- /dev/null +++ b/src/main/java/blue/language/processor/BatchPatchRecord.java @@ -0,0 +1,68 @@ +package blue.language.processor; + +import blue.language.processor.model.JsonPatch; +import blue.language.snapshot.FrozenNode; + +import java.util.List; + +final class BatchPatchRecord { + + private final JsonPatch patch; + private final ImmutablePatchPlanner.PatchPlan canonicalPlan; + private final ImmutablePatchPlanner.PatchPlan resolvedPlan; + private final FrozenNode beforeAtPatchTime; + private final FrozenNode afterAtPatchTime; + private final boolean processorManagedConformanceBypass; + + BatchPatchRecord(JsonPatch patch, + ImmutablePatchPlanner.PatchPlan canonicalPlan, + ImmutablePatchPlanner.PatchPlan resolvedPlan, + boolean processorManagedConformanceBypass) { + this.patch = patch; + this.canonicalPlan = canonicalPlan; + this.resolvedPlan = resolvedPlan; + this.beforeAtPatchTime = resolvedPlan.before(); + this.afterAtPatchTime = resolvedPlan.after(); + this.processorManagedConformanceBypass = processorManagedConformanceBypass; + } + + JsonPatch patch() { + return patch; + } + + ImmutablePatchPlanner.PatchPlan canonicalPlan() { + return canonicalPlan; + } + + ImmutablePatchPlanner.PatchPlan resolvedPlan() { + return resolvedPlan; + } + + String path() { + return canonicalPlan.path(); + } + + JsonPatch.Op op() { + return canonicalPlan.op(); + } + + String originScope() { + return canonicalPlan.originScope(); + } + + List cascadeScopes() { + return canonicalPlan.cascadeScopes(); + } + + FrozenNode beforeAtPatchTime() { + return beforeAtPatchTime; + } + + FrozenNode afterAtPatchTime() { + return afterAtPatchTime; + } + + boolean processorManagedConformanceBypass() { + return processorManagedConformanceBypass; + } +} diff --git a/src/main/java/blue/language/processor/BatchPatchResult.java b/src/main/java/blue/language/processor/BatchPatchResult.java new file mode 100644 index 0000000..479786d --- /dev/null +++ b/src/main/java/blue/language/processor/BatchPatchResult.java @@ -0,0 +1,63 @@ +package blue.language.processor; + +import blue.language.snapshot.FrozenNode; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +final class BatchPatchResult { + + private final FrozenNode canonicalRoot; + private final FrozenNode resolvedRoot; + private final List updates; + private final long patchPlanningNanos; + private final long conformanceNanos; + private final long buildUpdatesNanos; + + BatchPatchResult(FrozenNode canonicalRoot, + FrozenNode resolvedRoot, + List updates) { + this(canonicalRoot, resolvedRoot, updates, 0L, 0L, 0L); + } + + BatchPatchResult(FrozenNode canonicalRoot, + FrozenNode resolvedRoot, + List updates, + long patchPlanningNanos, + long conformanceNanos, + long buildUpdatesNanos) { + this.canonicalRoot = Objects.requireNonNull(canonicalRoot, "canonicalRoot"); + this.resolvedRoot = Objects.requireNonNull(resolvedRoot, "resolvedRoot"); + this.updates = Collections.unmodifiableList(new ArrayList<>( + Objects.requireNonNull(updates, "updates"))); + this.patchPlanningNanos = patchPlanningNanos; + this.conformanceNanos = conformanceNanos; + this.buildUpdatesNanos = buildUpdatesNanos; + } + + FrozenNode canonicalRoot() { + return canonicalRoot; + } + + FrozenNode resolvedRoot() { + return resolvedRoot; + } + + List updates() { + return updates; + } + + long patchPlanningNanos() { + return patchPlanningNanos; + } + + long conformanceNanos() { + return conformanceNanos; + } + + long buildUpdatesNanos() { + return buildUpdatesNanos; + } +} diff --git a/src/main/java/blue/language/processor/BatchPatchTransaction.java b/src/main/java/blue/language/processor/BatchPatchTransaction.java new file mode 100644 index 0000000..f9bc9ad --- /dev/null +++ b/src/main/java/blue/language/processor/BatchPatchTransaction.java @@ -0,0 +1,181 @@ +package blue.language.processor; + +import blue.language.conformance.ConformanceEngine; +import blue.language.conformance.ConformancePlan; +import blue.language.processor.model.JsonPatch; +import blue.language.processor.util.PointerUtils; +import blue.language.processor.util.ProcessorPointerConstants; +import blue.language.snapshot.FrozenNode; +import blue.language.utils.JsonPointer; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +final class BatchPatchTransaction { + + private final String originScopePath; + private final List patches; + private final DocumentProcessingRuntime.PlanningContext planning; + private final ConformanceEngine conformanceEngine; + private final DocumentProcessingRuntime.UpdateMaterializationMetrics materializationMetrics; + + BatchPatchTransaction(String originScopePath, + List patches, + DocumentProcessingRuntime.PlanningContext planning, + ConformanceEngine conformanceEngine, + DocumentProcessingRuntime.UpdateMaterializationMetrics materializationMetrics) { + this.originScopePath = originScopePath; + this.patches = Collections.unmodifiableList(new ArrayList<>(patches)); + this.planning = planning; + this.conformanceEngine = conformanceEngine; + this.materializationMetrics = materializationMetrics; + } + + BatchPatchResult apply() { + long planningStart = System.nanoTime(); + FrozenNode workingCanonical = planning.baseSnapshot() != null + ? planning.baseSnapshot().frozenCanonicalRoot() + : planning.canonicalPlanner().root(); + FrozenNode workingResolved = planning.baseSnapshot() != null + ? planning.baseSnapshot().frozenResolvedRoot() + : planning.resolvedPlanner().root(); + List records = new ArrayList<>(); + for (JsonPatch patch : patches) { + ImmutablePatchPlanner.PatchPlan canonicalPlan = + ImmutablePatchPlanner.forFrozen(workingCanonical).plan(originScopePath, patch); + ImmutablePatchPlanner.PatchPlan resolvedPlan = + ImmutablePatchPlanner.forFrozen(workingResolved).plan(originScopePath, patch); + BatchPatchRecord record = new BatchPatchRecord(patch, + canonicalPlan, + resolvedPlan, + isProcessorManagedConformanceBypass(canonicalPlan)); + records.add(record); + workingCanonical = canonicalPlan.root(); + workingResolved = resolvedPlan.root(); + } + long patchPlanningNanos = System.nanoTime() - planningStart; + + long conformanceStart = System.nanoTime(); + ConformancePlan conformancePlan = planBatchConformance(workingCanonical, workingResolved, records); + long conformanceNanos = System.nanoTime() - conformanceStart; + FrozenNode finalCanonical = conformancePlan.canonicalRoot() != null + ? conformancePlan.canonicalRoot() + : workingCanonical; + FrozenNode finalResolved = conformancePlan.root(); + + long buildUpdatesStart = System.nanoTime(); + List updates = buildUpdates(records, finalResolved); + long buildUpdatesNanos = System.nanoTime() - buildUpdatesStart; + return new BatchPatchResult(finalCanonical, + finalResolved, + updates, + patchPlanningNanos, + conformanceNanos, + buildUpdatesNanos); + } + + private ConformancePlan planBatchConformance(FrozenNode canonicalRoot, + FrozenNode resolvedRoot, + List records) { + if (conformanceEngine == null) { + return ConformancePlan.unchanged(canonicalRoot, resolvedRoot); + } + List changedPaths = new ArrayList<>(); + for (BatchPatchRecord record : records) { + if (record.processorManagedConformanceBypass()) { + continue; + } + if (hasTypedNodeBetweenOriginAndPath(resolvedRoot, record.originScope(), record.path())) { + changedPaths.add(record.path()); + } + } + if (changedPaths.isEmpty()) { + return ConformancePlan.unchanged(canonicalRoot, resolvedRoot); + } + return conformanceEngine.planGeneralization(canonicalRoot, resolvedRoot, changedPaths); + } + + private boolean hasTypedNodeBetweenOriginAndPath(FrozenNode resolvedRoot, String originScope, String changedPath) { + ImmutablePatchPlanner planner = ImmutablePatchPlanner.forFrozen(resolvedRoot); + String normalizedOrigin = PointerUtils.normalizeScope(originScope); + String current = PointerUtils.normalizePointer(changedPath); + while (true) { + FrozenNode node = planner.read(current); + if (hasTypeMetadata(node)) { + return true; + } + if (current.equals(normalizedOrigin) || "/".equals(current)) { + return false; + } + current = parentPointer(current); + } + } + + private boolean hasTypeMetadata(FrozenNode node) { + return node != null + && (node.getType() != null + || node.getItemType() != null + || node.getKeyType() != null + || node.getValueType() != null); + } + + private String parentPointer(String pointer) { + List segments = JsonPointer.split(pointer); + if (segments.isEmpty()) { + return "/"; + } + return JsonPointer.toPointer(segments.subList(0, segments.size() - 1)); + } + + private List buildUpdates(List records, + FrozenNode finalResolvedRoot) { + List updates = new ArrayList<>(); + ImmutablePatchPlanner finalResolvedPlanner = ImmutablePatchPlanner.forFrozen(finalResolvedRoot); + for (BatchPatchRecord record : records) { + FrozenNode after = null; + if (record.op() != JsonPatch.Op.REMOVE) { + after = hasLaterOverlappingPatch(records, record) + ? record.afterAtPatchTime() + : finalResolvedPlanner.read(record.path()); + } + updates.add(new DocumentProcessingRuntime.DocumentUpdateData(record.path(), + record.beforeAtPatchTime(), + after, + record.op(), + record.originScope(), + record.cascadeScopes(), + materializationMetrics)); + } + return updates; + } + + private boolean hasLaterOverlappingPatch(List records, BatchPatchRecord current) { + int currentIndex = records.indexOf(current); + for (int i = currentIndex + 1; i < records.size(); i++) { + if (pathsOverlap(current.path(), records.get(i).path())) { + return true; + } + } + return false; + } + + private boolean pathsOverlap(String first, String second) { + return first.equals(second) + || isAncestorPath(first, second) + || isAncestorPath(second, first); + } + + private boolean isAncestorPath(String ancestor, String descendant) { + if ("/".equals(ancestor)) { + return !"/".equals(descendant); + } + return descendant.startsWith(ancestor + "/"); + } + + private boolean isProcessorManagedConformanceBypass(ImmutablePatchPlanner.PatchPlan result) { + String relativePath = PointerUtils.relativizePointer(result.originScope(), result.path()); + String initialized = ProcessorPointerConstants.RELATIVE_INITIALIZED; + return relativePath.equals(initialized) || relativePath.startsWith(initialized + "/"); + } +} diff --git a/src/main/java/blue/language/processor/ChannelRunner.java b/src/main/java/blue/language/processor/ChannelRunner.java index 3901a61..211975a 100644 --- a/src/main/java/blue/language/processor/ChannelRunner.java +++ b/src/main/java/blue/language/processor/ChannelRunner.java @@ -38,7 +38,15 @@ void runExternalChannel(String scopePath, } runtime.chargeChannelMatchAttempt(); ChannelContract contract = channel.contract(); - ProcessorEngine.ChannelMatch match = ProcessorEngine.evaluateChannel(owner, channel, bundle, scopePath, event); + ProcessingMetricsSink metrics = owner.metricsSink(); + metrics.incrementChannelEvaluations(); + long channelMatchStart = System.nanoTime(); + ProcessorEngine.ChannelMatch match; + try { + match = ProcessorEngine.evaluateChannel(owner, channel, bundle, scopePath, event); + } finally { + metrics.addChannelMatchNanos(System.nanoTime() - channelMatchStart); + } if (!match.matches) { return; } @@ -48,12 +56,14 @@ void runExternalChannel(String scopePath, } Node eventForHandlers = match.eventNode() != null ? match.eventNode() : event; Node checkpointEvent = event != null ? event.clone() : null; + long checkpointStart = System.nanoTime(); checkpointManager.ensureCheckpointMarker(scopePath, bundle); CheckpointManager.CheckpointRecord checkpoint = checkpointManager.findCheckpoint(bundle, channel.key()); String eventSignature = match.eventId != null ? match.eventId : ProcessorEngine.canonicalSignature(checkpointEvent); if (checkpointManager.isDuplicate(checkpoint, eventSignature)) { + metrics.addCheckpointUpdateNanos(System.nanoTime() - checkpointStart); return; } ChannelCheckpointContext checkpointContext = new ChannelCheckpointContext(scopePath, @@ -64,13 +74,17 @@ void runExternalChannel(String scopePath, checkpoint != null ? checkpoint.lastEventSignature : null, bundle.markers()); if (!match.processor.isNewerEvent(contract, checkpointContext)) { + metrics.addCheckpointUpdateNanos(System.nanoTime() - checkpointStart); return; } + metrics.addCheckpointUpdateNanos(System.nanoTime() - checkpointStart); runHandlers(scopePath, bundle, channel.key(), eventForHandlers, false); if (execution.isScopeInactive(scopePath)) { return; } + long checkpointPersistStart = System.nanoTime(); checkpointManager.persist(scopePath, bundle, checkpoint, eventSignature, checkpointEvent); + metrics.addCheckpointUpdateNanos(System.nanoTime() - checkpointPersistStart); } private void runDeliveries(String scopePath, @@ -78,8 +92,11 @@ private void runDeliveries(String scopePath, ContractBundle.ChannelBinding channel, Node checkpointEvent, ProcessorEngine.ChannelMatch match) { + ProcessingMetricsSink metrics = owner.metricsSink(); + long checkpointEnsureStart = System.nanoTime(); checkpointManager.ensureCheckpointMarker(scopePath, bundle); String fallbackSignature = ProcessorEngine.canonicalSignature(checkpointEvent); + metrics.addCheckpointUpdateNanos(System.nanoTime() - checkpointEnsureStart); for (ChannelDelivery delivery : match.deliveries()) { if (execution.isScopeInactive(scopePath)) { return; @@ -87,9 +104,11 @@ private void runDeliveries(String scopePath, String checkpointKey = delivery.checkpointKey() != null ? delivery.checkpointKey() : channel.key(); + long checkpointStart = System.nanoTime(); CheckpointManager.CheckpointRecord checkpoint = checkpointManager.findCheckpoint(bundle, checkpointKey); String eventSignature = delivery.eventId() != null ? delivery.eventId() : fallbackSignature; if (checkpointManager.isDuplicate(checkpoint, eventSignature)) { + metrics.addCheckpointUpdateNanos(System.nanoTime() - checkpointStart); continue; } Boolean shouldProcess = delivery.shouldProcess(); @@ -105,9 +124,11 @@ private void runDeliveries(String scopePath, checkpoint != null ? checkpoint.lastEventSignature : null, bundle.markers()); if (!match.processor.isNewerEvent(channel.contract(), checkpointContext)) { + metrics.addCheckpointUpdateNanos(System.nanoTime() - checkpointStart); continue; } } + metrics.addCheckpointUpdateNanos(System.nanoTime() - checkpointStart); Node eventForHandlers = delivery.eventForDelivery(); if (eventForHandlers == null) { continue; @@ -116,7 +137,9 @@ private void runDeliveries(String scopePath, if (execution.isScopeInactive(scopePath)) { return; } + long checkpointPersistStart = System.nanoTime(); checkpointManager.persist(scopePath, bundle, checkpoint, eventSignature, checkpointEvent); + metrics.addCheckpointUpdateNanos(System.nanoTime() - checkpointPersistStart); } } @@ -125,7 +148,10 @@ void runHandlers(String scopePath, String channelKey, Node event, boolean allowTerminatedWork) { + ProcessingMetricsSink metrics = owner.metricsSink(); + long discoveryStart = System.nanoTime(); List handlers = bundle.handlersFor(channelKey); + metrics.addHandlerDiscoveryNanos(System.nanoTime() - discoveryStart); if (handlers.isEmpty()) { return; } @@ -137,7 +163,15 @@ void runHandlers(String scopePath, event, bundle.markers(), owner.matchingService()); - if (!ProcessorEngine.matchesHandler(owner, handler.contract(), matchContext)) { + metrics.incrementHandlerMatchAttempts(); + long matchStart = System.nanoTime(); + boolean matches; + try { + matches = ProcessorEngine.matchesHandler(owner, handler.contract(), matchContext); + } finally { + metrics.addHandlerMatchNanos(System.nanoTime() - matchStart); + } + if (!matches) { continue; } runtime.chargeHandlerOverhead(); @@ -148,7 +182,13 @@ void runHandlers(String scopePath, handler.node(), allowTerminatedWork, false); - ProcessorEngine.executeHandler(owner, handler.contract(), context); + metrics.incrementHandlersExecuted(); + long executionStart = System.nanoTime(); + try { + ProcessorEngine.executeHandler(owner, handler.contract(), context); + } finally { + metrics.addHandlerExecutionNanos(System.nanoTime() - executionStart); + } if (execution.isScopeInactive(scopePath) && !allowTerminatedWork) { break; } diff --git a/src/main/java/blue/language/processor/ContractBundle.java b/src/main/java/blue/language/processor/ContractBundle.java index 7a7ba35..0593089 100644 --- a/src/main/java/blue/language/processor/ContractBundle.java +++ b/src/main/java/blue/language/processor/ContractBundle.java @@ -139,6 +139,33 @@ public List channelsOfType(Class type return result; } + ContractBundle copyWithRuntimeMarkers(Map runtimeMarkers, + Map runtimeMarkerNodes, + boolean runtimeCheckpointDeclared) { + Map> handlersCopy = new LinkedHashMap<>(); + for (Map.Entry> entry : handlersByChannel.entrySet()) { + handlersCopy.put(entry.getKey(), new ArrayList<>(entry.getValue())); + } + Map nodesCopy = new LinkedHashMap<>(contractNodes); + for (String key : markers.keySet()) { + nodesCopy.remove(key); + } + if (runtimeMarkerNodes != null) { + nodesCopy.putAll(runtimeMarkerNodes); + } + return new ContractBundle(new LinkedHashMap<>(channels), + new LinkedHashMap<>(channelNodes), + handlersCopy, + runtimeMarkers != null ? new LinkedHashMap<>(runtimeMarkers) : new LinkedHashMap<>(), + nodesCopy, + new ArrayList<>(embeddedPaths), + runtimeCheckpointDeclared); + } + + boolean hasStaticCheckpointDeclaration() { + return checkpointDeclared; + } + public static final class ChannelBinding { private final String key; private final ChannelContract contract; diff --git a/src/main/java/blue/language/processor/ContractLoader.java b/src/main/java/blue/language/processor/ContractLoader.java index 9006229..95f563a 100644 --- a/src/main/java/blue/language/processor/ContractLoader.java +++ b/src/main/java/blue/language/processor/ContractLoader.java @@ -1,7 +1,9 @@ package blue.language.processor; import blue.language.mapping.NodeToObjectConverter; +import blue.language.model.Node; import blue.language.processor.model.ChannelContract; +import blue.language.processor.model.ChannelEventCheckpoint; import blue.language.processor.model.Contract; import blue.language.processor.model.HandlerContract; import blue.language.processor.model.MarkerContract; @@ -15,6 +17,8 @@ import java.util.LinkedHashMap; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; /** * Parses contracts under a scope and produces a {@link ContractBundle}. @@ -24,6 +28,7 @@ final class ContractLoader { private final ContractProcessorRegistry registry; private final NodeToObjectConverter converter; private final TypeClassResolver typeResolver; + private final ConcurrentMap bundleCache = new ConcurrentHashMap<>(); ContractLoader(ContractProcessorRegistry registry, NodeToObjectConverter converter, @@ -39,6 +44,50 @@ ContractBundle load(ResolvedSnapshot snapshot, String scopePath) { } ContractBundle load(FrozenNode scopeNode, String scopePath) { + return load(scopeNode, scopePath, ProcessingMetricsSink.NOOP); + } + + ContractBundle load(FrozenNode scopeNode, String scopePath, ProcessingMetricsSink metricsSink) { + ProcessingMetricsSink metrics = metricsSink != null ? metricsSink : ProcessingMetricsSink.NOOP; + long keyStart = System.nanoTime(); + BundleCacheKey key; + try { + key = cacheKey(scopeNode, scopePath); + } finally { + metrics.addBundleLoadCacheKeyBuildNanos(System.nanoTime() - keyStart); + } + ContractBundle cached = bundleCache.get(key); + if (cached != null) { + metrics.incrementBundleLoadCacheHits(); + long reuseStart = System.nanoTime(); + try { + RuntimeMarkers runtimeMarkers = runtimeMarkers(scopeNode); + metrics.incrementBundlesReused(); + return cached.copyWithRuntimeMarkers(runtimeMarkers.markers, + runtimeMarkers.nodes, + runtimeMarkers.checkpointDeclared); + } finally { + metrics.addBundleLoadReuseNanos(System.nanoTime() - reuseStart); + } + } + + metrics.incrementBundleLoadCacheMisses(); + long buildStart = System.nanoTime(); + ContractBundle built; + try { + built = build(scopeNode, scopePath); + } finally { + metrics.addBundleLoadActualBuildNanos(System.nanoTime() - buildStart); + } + bundleCache.putIfAbsent(key, built); + metrics.incrementBundlesBuilt(); + RuntimeMarkers runtimeMarkers = runtimeMarkers(scopeNode); + return built.copyWithRuntimeMarkers(runtimeMarkers.markers, + runtimeMarkers.nodes, + runtimeMarkers.checkpointDeclared); + } + + private ContractBundle build(FrozenNode scopeNode, String scopePath) { ContractBundle.Builder builder = ContractBundle.builder(); if (scopeNode == null) { return builder.build(); @@ -118,6 +167,104 @@ ContractBundle load(FrozenNode scopeNode, String scopePath) { return builder.build(); } + private BundleCacheKey cacheKey(FrozenNode scopeNode, String scopePath) { + FrozenNode contractsNode = property(scopeNode, "contracts"); + FrozenNode channelBindingsNode = property(scopeNode, "channelBindings"); + return new BundleCacheKey(scopePath != null ? scopePath : "/", + registry.version(), + contractsSignature(contractsNode), + nodeSignature(channelBindingsNode)); + } + + private String contractsSignature(FrozenNode contractsNode) { + if (contractsNode == null) { + return ""; + } + Map properties = contractsNode.getProperties(); + if (properties == null || !properties.containsKey(ProcessorContractConstants.KEY_CHECKPOINT)) { + return nodeSignature(contractsNode); + } + StringBuilder builder = new StringBuilder(); + builder.append("contracts{"); + for (Map.Entry entry : properties.entrySet()) { + builder.append(entry.getKey()).append('='); + if (ProcessorContractConstants.KEY_CHECKPOINT.equals(entry.getKey())) { + builder.append(checkpointStaticSignature(entry.getValue())); + } else { + builder.append(nodeSignature(entry.getValue())); + } + builder.append(';'); + } + builder.append('}'); + return builder.toString(); + } + + private String checkpointStaticSignature(FrozenNode checkpointNode) { + if (checkpointNode == null) { + return ""; + } + Node node = checkpointNode.toNode(); + if (node.getProperties() != null) { + node.getProperties().remove("lastEvents"); + node.getProperties().remove("lastSignatures"); + } + return FrozenNode.fromResolvedNode(node).blueId(); + } + + private String nodeSignature(FrozenNode node) { + return node != null ? node.blueId() : ""; + } + + private FrozenNode property(FrozenNode node, String key) { + return node != null && node.getProperties() != null ? node.getProperties().get(key) : null; + } + + private RuntimeMarkers runtimeMarkers(FrozenNode scopeNode) { + Map markers = new LinkedHashMap<>(); + Map markerNodes = new LinkedHashMap<>(); + boolean checkpointDeclared = false; + FrozenNode contractsNode = property(scopeNode, "contracts"); + if (contractsNode == null || contractsNode.getProperties() == null) { + return new RuntimeMarkers(markers, markerNodes, false); + } + for (Map.Entry entry : contractsNode.getProperties().entrySet()) { + String key = entry.getKey(); + FrozenNode node = entry.getValue(); + String typeBlueId = typeBlueId(node); + if (typeBlueId == null) { + continue; + } + Class contractClass = typeResolver.resolveClass(typeBlueId); + if (contractClass == null || !MarkerContract.class.isAssignableFrom(contractClass)) { + continue; + } + Contract contract = converter.convertWithType(node.toNode(), Contract.class, false); + if (!(contract instanceof MarkerContract) || contract instanceof ProcessEmbedded) { + continue; + } + MarkerContract marker = (MarkerContract) contract; + marker.setKey(key); + marker.setTypeBlueId(typeBlueId); + if (ProcessorContractConstants.KEY_CHECKPOINT.equals(key) && !(marker instanceof ChannelEventCheckpoint)) { + throw new IllegalStateException( + "Reserved key 'checkpoint' must contain a Channel Event Checkpoint"); + } + if (marker instanceof ChannelEventCheckpoint) { + if (!ProcessorContractConstants.KEY_CHECKPOINT.equals(key)) { + throw new IllegalStateException( + "Channel Event Checkpoint must use reserved key 'checkpoint' at key '" + key + "'"); + } + if (checkpointDeclared) { + throw new IllegalStateException("Duplicate Channel Event Checkpoint markers detected in same contracts map"); + } + checkpointDeclared = true; + } + markers.put(key, marker); + markerNodes.put(key, node); + } + return new RuntimeMarkers(markers, markerNodes, checkpointDeclared); + } + @SuppressWarnings("unchecked") private String resolveHandlerChannel(String scopePath, String handlerKey, @@ -191,4 +338,55 @@ private String typeBlueId(FrozenNode node) { FrozenNode type = node.getType(); return type.getReferenceBlueId() != null ? type.getReferenceBlueId() : type.blueId(); } + + private static final class RuntimeMarkers { + final Map markers; + final Map nodes; + final boolean checkpointDeclared; + + RuntimeMarkers(Map markers, + Map nodes, + boolean checkpointDeclared) { + this.markers = markers; + this.nodes = nodes; + this.checkpointDeclared = checkpointDeclared; + } + } + + private static final class BundleCacheKey { + private final String scopePath; + private final long registryVersion; + private final String contractsSignature; + private final String channelBindingsSignature; + + BundleCacheKey(String scopePath, + long registryVersion, + String contractsSignature, + String channelBindingsSignature) { + this.scopePath = scopePath; + this.registryVersion = registryVersion; + this.contractsSignature = contractsSignature; + this.channelBindingsSignature = channelBindingsSignature; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (!(other instanceof BundleCacheKey)) { + return false; + } + BundleCacheKey that = (BundleCacheKey) other; + return registryVersion == that.registryVersion + && Objects.equals(scopePath, that.scopePath) + && Objects.equals(contractsSignature, that.contractsSignature) + && Objects.equals(channelBindingsSignature, that.channelBindingsSignature); + } + + @Override + public int hashCode() { + return Objects.hash(scopePath, registryVersion, contractsSignature, channelBindingsSignature); + } + } } diff --git a/src/main/java/blue/language/processor/ContractProcessorRegistry.java b/src/main/java/blue/language/processor/ContractProcessorRegistry.java index 50cb44c..e44bb5d 100644 --- a/src/main/java/blue/language/processor/ContractProcessorRegistry.java +++ b/src/main/java/blue/language/processor/ContractProcessorRegistry.java @@ -24,6 +24,7 @@ public class ContractProcessorRegistry { private final Map> handlerProcessorsByBlueId = new LinkedHashMap<>(); private final Map> channelProcessorsByBlueId = new LinkedHashMap<>(); private final Map> markerProcessorsByBlueId = new LinkedHashMap<>(); + private long version; public void registerHandler(HandlerProcessor processor) { Objects.requireNonNull(processor, "processor"); @@ -129,6 +130,10 @@ public Map> processors() { return Collections.unmodifiableMap(processorsByBlueId); } + long version() { + return version; + } + private void registerBlueIds(Class contractType, ContractProcessor processor) { Objects.requireNonNull(contractType, "contractType"); @@ -152,6 +157,7 @@ private void registerBlueIds(Class contractType, Contrac private void registerBlueId(String blueId, ContractProcessor processor) { processorsByBlueId.put(blueId, processor); + version++; if (processor instanceof HandlerProcessor) { @SuppressWarnings("unchecked") HandlerProcessor handler = (HandlerProcessor) processor; diff --git a/src/main/java/blue/language/processor/DocumentProcessingRuntime.java b/src/main/java/blue/language/processor/DocumentProcessingRuntime.java index a92ba82..6706f19 100644 --- a/src/main/java/blue/language/processor/DocumentProcessingRuntime.java +++ b/src/main/java/blue/language/processor/DocumentProcessingRuntime.java @@ -1,13 +1,13 @@ package blue.language.processor; import blue.language.conformance.ConformanceEngine; -import blue.language.conformance.ConformancePlan; import blue.language.model.Node; import blue.language.processor.model.JsonPatch; import blue.language.processor.util.PointerUtils; import blue.language.processor.util.ProcessorPointerConstants; import blue.language.snapshot.FrozenNode; import blue.language.snapshot.ResolvedSnapshot; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; @@ -22,8 +22,18 @@ public final class DocumentProcessingRuntime { private final GasMeter gasMeter; private final ConformanceEngine conformanceEngine; private final ProcessingSnapshotManager snapshotManager; + private final ProcessingMetricsSink metrics; private ResolvedSnapshot snapshot; private boolean runTerminated; + private long batchPatchCalls; + private long batchPatchEntries; + private long batchPatchPlanningNanos; + private long batchPatchConformanceNanos; + private long batchPatchBuildUpdatesNanos; + private long batchPatchCommitNanos; + private long batchPatchRollbackCopies; + private long documentUpdateBeforeNodeMaterializations; + private long documentUpdateAfterNodeMaterializations; public DocumentProcessingRuntime(Node document) { this(document, null, null); @@ -36,16 +46,31 @@ public DocumentProcessingRuntime(Node document, ConformanceEngine conformanceEng public DocumentProcessingRuntime(Node document, ConformanceEngine conformanceEngine, ProcessingSnapshotManager snapshotManager) { + this(document, conformanceEngine, snapshotManager, null); + } + + public DocumentProcessingRuntime(Node document, + ConformanceEngine conformanceEngine, + ProcessingSnapshotManager snapshotManager, + ProcessingMetricsSink metrics) { this.materializedView = new MaterializedDocumentView(Objects.requireNonNull(document, "document")); this.emissionRegistry = new EmissionRegistry(); this.gasMeter = new GasMeter(); this.conformanceEngine = conformanceEngine; this.snapshotManager = snapshotManager; + this.metrics = metrics != null ? metrics : ProcessingMetricsSink.NOOP; } public DocumentProcessingRuntime(ResolvedSnapshot snapshot, ConformanceEngine conformanceEngine, ProcessingSnapshotManager snapshotManager) { + this(snapshot, conformanceEngine, snapshotManager, null); + } + + public DocumentProcessingRuntime(ResolvedSnapshot snapshot, + ConformanceEngine conformanceEngine, + ProcessingSnapshotManager snapshotManager, + ProcessingMetricsSink metrics) { Objects.requireNonNull(snapshot, "snapshot"); this.materializedView = new MaterializedDocumentView(snapshot.canonicalRoot()); this.emissionRegistry = new EmissionRegistry(); @@ -53,6 +78,7 @@ public DocumentProcessingRuntime(ResolvedSnapshot snapshot, this.conformanceEngine = conformanceEngine; this.snapshotManager = snapshotManager; this.snapshot = snapshot; + this.metrics = metrics != null ? metrics : ProcessingMetricsSink.NOOP; } public Node document() { @@ -253,56 +279,63 @@ public void directWrite(String path, Node value) { } public DocumentUpdateData applyPatch(String originScopePath, JsonPatch patch) { - Node rollback = materializedView.copyRoot(); + if (patch == null) { + return null; + } + List updates = applyPatches(originScopePath, Collections.singletonList(patch)); + return updates.isEmpty() ? null : updates.get(0); + } + + public List applyPatches(String originScopePath, List patches) { + if (patches == null || patches.isEmpty()) { + return Collections.emptyList(); + } ResolvedSnapshot snapshotRollback = snapshot; - ImmutablePatchPlanner.PatchPlan result; - Node before; + batchPatchCalls++; + batchPatchEntries += patches.size(); try { - PlanningContext planning = planningContext(rollback); - result = planning.canonicalPlanner.plan(originScopePath, patch); - ImmutablePatchPlanner.PatchPlan resolvedPlan = planning.resolvedPlanner.plan(originScopePath, patch); - before = updateBefore(planning.baseSnapshot, result); - ConformancePlan conformancePlan = planConformanceFromPatch(result, resolvedPlan.root()); - if (conformancePlan.generalized()) { - commitGeneralization(conformancePlan); - } else { - SnapshotPatchPlan snapshotPatchPlan = prepareSnapshotPatch(planning.baseSnapshot, patch); - commitSnapshotPatch(snapshotPatchPlan, conformancePlan.root()); + PlanningContext planning = planningContext(materializedView.root()); + BatchPatchTransaction transaction = new BatchPatchTransaction(originScopePath, + patches, + planning, + conformanceEngine, + new UpdateMaterializationMetrics() { + @Override + public void recordBeforeNodeMaterialization() { + documentUpdateBeforeNodeMaterializations++; + metrics.incrementDocumentUpdateBeforeMaterializations(); + } + + @Override + public void recordAfterNodeMaterialization() { + documentUpdateAfterNodeMaterializations++; + metrics.incrementDocumentUpdateAfterMaterializations(); + } + }); + BatchPatchResult result = transaction.apply(); + batchPatchPlanningNanos += result.patchPlanningNanos(); + batchPatchConformanceNanos += result.conformanceNanos(); + batchPatchBuildUpdatesNanos += result.buildUpdatesNanos(); + metrics.addBatchPatchPlanningNanos(result.patchPlanningNanos()); + metrics.addBatchPatchConformanceNanos(result.conformanceNanos()); + metrics.addBatchPatchBuildUpdatesNanos(result.buildUpdatesNanos()); + long commitStart = System.nanoTime(); + try { + commitBatchPatchResult(result); + } finally { + long commitNanos = System.nanoTime() - commitStart; + batchPatchCommitNanos += commitNanos; + metrics.addBatchPatchCommitNanos(commitNanos); + metrics.addSnapshotCommitNanos(commitNanos); } + return result.updates(); } catch (RuntimeException ex) { - materializedView.replaceWith(rollback); snapshot = snapshotRollback; + if (snapshotRollback != null) { + materializedView.replaceWithSnapshot(snapshotRollback); + } throw ex; } - Node after = result.op() == JsonPatch.Op.REMOVE - ? null - : updateAfter(result); - return new DocumentUpdateData(result.path(), - before, - after, - result.op(), - result.originScope(), - result.cascadeScopes()); - } - - private ConformancePlan planConformanceFromPatch(ImmutablePatchPlanner.PatchPlan result, FrozenNode resolvedRoot) { - if (conformanceEngine == null) { - return ConformancePlan.unchanged(result.root(), resolvedRoot); - } - if (isProcessorManagedConformanceBypass(result)) { - return ConformancePlan.unchanged(result.root(), resolvedRoot); - } - FrozenNode originScope = ImmutablePatchPlanner.forFrozen(resolvedRoot).read(result.originScope()); - if (originScope == null || originScope.getType() == null) { - return ConformancePlan.unchanged(result.root(), resolvedRoot); - } - return conformanceEngine.planGeneralization(result.root(), resolvedRoot, result.path()); - } - - private boolean isProcessorManagedConformanceBypass(ImmutablePatchPlanner.PatchPlan result) { - String relativePath = PointerUtils.relativizePointer(result.originScope(), result.path()); - String initialized = ProcessorPointerConstants.RELATIVE_INITIALIZED; - return relativePath.equals(initialized) || relativePath.startsWith(initialized + "/"); } private JsonPatch directWritePatch(String path, Node before, Node value) { @@ -336,26 +369,6 @@ private SnapshotPatchPlan prepareSnapshotPatch(ResolvedSnapshot base, JsonPatch } } - private Node updateBefore(ResolvedSnapshot base, ImmutablePatchPlanner.PatchPlan plan) { - if (base != null) { - FrozenNode before = ImmutablePatchPlanner.readBefore(base, plan.path(), true); - if (before != null) { - return before.toNode(); - } - } - return plan.beforeNode(); - } - - private Node updateAfter(ImmutablePatchPlanner.PatchPlan plan) { - if (snapshot != null) { - FrozenNode after = ImmutablePatchPlanner.readAfter(snapshot, plan.path(), true); - if (after != null) { - return after.toNode(); - } - } - return materializedView.nodeAt(plan.path()); - } - private void commitSnapshotPatch(SnapshotPatchPlan plan, FrozenNode fallbackRoot) { if (snapshotManager == null || plan == null) { materializedView.replaceWith(fallbackRoot.toNode()); @@ -370,35 +383,73 @@ private void commitSnapshotPatch(SnapshotPatchPlan plan, FrozenNode fallbackRoot } } - private void commitGeneralization(ConformancePlan plan) { - if (snapshotManager != null - && plan.fullSnapshotRebuildAvoidable() - && plan.canonicalRoot() != null) { - snapshot = snapshotManager.cacheSnapshot(new ResolvedSnapshot(plan.canonicalRoot(), - plan.root(), - plan.canonicalRoot().blueId())); - materializedView.replaceWithSnapshot(snapshot); + private void commitBatchPatchResult(BatchPatchResult result) { + if (snapshotManager == null) { + Node next = result.resolvedRoot().toNode(); + materializedView.replaceWith(next); + snapshot = null; return; } - commitGeneralizedRoot(plan.root()); + ResolvedSnapshot next = new ResolvedSnapshot(result.canonicalRoot(), + result.resolvedRoot(), + result.canonicalRoot().blueId()); + ResolvedSnapshot cached = snapshotManager.cacheSnapshot(next); + materializedView.replaceWithSnapshot(cached); + snapshot = cached; } - private void commitGeneralizedRoot(FrozenNode generalizedRoot) { - if (snapshotManager == null) { - materializedView.replaceWith(generalizedRoot.toNode()); - return; - } - snapshot = snapshotManager.fromDocument(generalizedRoot.toNode()); - materializedView.replaceWithSnapshot(snapshot); + long batchPatchCallsForTest() { + return batchPatchCalls; + } + + long batchPatchEntriesForTest() { + return batchPatchEntries; + } + + long batchPatchPlanningNanosForTest() { + return batchPatchPlanningNanos; + } + + long batchPatchConformanceNanosForTest() { + return batchPatchConformanceNanos; + } + + long batchPatchBuildUpdatesNanosForTest() { + return batchPatchBuildUpdatesNanos; + } + + long batchPatchCommitNanosForTest() { + return batchPatchCommitNanos; + } + + long batchPatchRollbackCopiesForTest() { + return batchPatchRollbackCopies; + } + + long documentUpdateBeforeNodeMaterializationsForTest() { + return documentUpdateBeforeNodeMaterializations; + } + + long documentUpdateAfterNodeMaterializationsForTest() { + return documentUpdateAfterNodeMaterializations; + } + + interface UpdateMaterializationMetrics { + void recordBeforeNodeMaterialization(); + + void recordAfterNodeMaterialization(); } static final class DocumentUpdateData { private final String path; - private final Node before; - private final Node after; + private final FrozenNode beforeFrozen; + private final FrozenNode afterFrozen; + private Node before; + private Node after; private final JsonPatch.Op op; private final String originScope; private final List cascadeScopes; + private final UpdateMaterializationMetrics materializationMetrics; DocumentUpdateData(String path, Node before, @@ -407,11 +458,30 @@ static final class DocumentUpdateData { String originScope, List cascadeScopes) { this.path = path; + this.beforeFrozen = null; + this.afterFrozen = null; this.before = before; this.after = after; this.op = op; this.originScope = originScope; this.cascadeScopes = cascadeScopes; + this.materializationMetrics = null; + } + + DocumentUpdateData(String path, + FrozenNode beforeFrozen, + FrozenNode afterFrozen, + JsonPatch.Op op, + String originScope, + List cascadeScopes, + UpdateMaterializationMetrics materializationMetrics) { + this.path = path; + this.beforeFrozen = beforeFrozen; + this.afterFrozen = afterFrozen; + this.op = op; + this.originScope = originScope; + this.cascadeScopes = cascadeScopes; + this.materializationMetrics = materializationMetrics; } String path() { @@ -419,10 +489,25 @@ String path() { } Node before() { + if (before == null && beforeFrozen != null) { + before = beforeFrozen.toNode(); + if (materializationMetrics != null) { + materializationMetrics.recordBeforeNodeMaterialization(); + } + } return before; } Node after() { + if (op == JsonPatch.Op.REMOVE) { + return null; + } + if (after == null && afterFrozen != null) { + after = afterFrozen.toNode(); + if (materializationMetrics != null) { + materializationMetrics.recordAfterNodeMaterialization(); + } + } return after; } @@ -439,7 +524,7 @@ List cascadeScopes() { } } - private static final class PlanningContext { + static final class PlanningContext { private final ResolvedSnapshot baseSnapshot; private final ImmutablePatchPlanner canonicalPlanner; private final ImmutablePatchPlanner resolvedPlanner; @@ -451,6 +536,18 @@ private PlanningContext(ResolvedSnapshot baseSnapshot, this.canonicalPlanner = canonicalPlanner; this.resolvedPlanner = resolvedPlanner; } + + ResolvedSnapshot baseSnapshot() { + return baseSnapshot; + } + + ImmutablePatchPlanner canonicalPlanner() { + return canonicalPlanner; + } + + ImmutablePatchPlanner resolvedPlanner() { + return resolvedPlanner; + } } private static final class SnapshotPatchPlan { diff --git a/src/main/java/blue/language/processor/DocumentProcessor.java b/src/main/java/blue/language/processor/DocumentProcessor.java index b67ac53..e1acfd2 100644 --- a/src/main/java/blue/language/processor/DocumentProcessor.java +++ b/src/main/java/blue/language/processor/DocumentProcessor.java @@ -24,6 +24,7 @@ public class DocumentProcessor { private final ConformanceEngine conformanceEngine; private final ProcessingSnapshotManager snapshotManager; private final ContractMatchingService matchingService; + private ProcessingMetricsSink metricsSink; public DocumentProcessor() { this(ContractProcessorRegistryBuilder.create().registerDefaults().build()); @@ -63,6 +64,15 @@ public DocumentProcessor(ContractProcessorRegistry registry, ConformanceEngine conformanceEngine, ProcessingSnapshotManager snapshotManager, ContractMatchingService matchingService) { + this(registry, contractTypeResolver, conformanceEngine, snapshotManager, matchingService, null); + } + + public DocumentProcessor(ContractProcessorRegistry registry, + TypeClassResolver contractTypeResolver, + ConformanceEngine conformanceEngine, + ProcessingSnapshotManager snapshotManager, + ContractMatchingService matchingService, + ProcessingMetricsSink metricsSink) { this.contractRegistry = Objects.requireNonNull(registry, "registry"); this.contractTypeResolver = Objects.requireNonNull(contractTypeResolver, "contractTypeResolver"); this.contractConverter = new NodeToObjectConverter(this.contractTypeResolver); @@ -70,6 +80,7 @@ public DocumentProcessor(ContractProcessorRegistry registry, this.conformanceEngine = conformanceEngine; this.snapshotManager = snapshotManager; this.matchingService = Objects.requireNonNull(matchingService, "matchingService"); + this.metricsSink = metricsSink != null ? metricsSink : ProcessingMetricsSink.NOOP; } private DocumentProcessor(Builder builder) { @@ -77,7 +88,8 @@ private DocumentProcessor(Builder builder) { builder.contractTypeResolver, builder.conformanceEngine, builder.snapshotManager, - builder.matchingService); + builder.matchingService, + builder.metricsSink); } public DocumentProcessingResult initializeDocument(Node document) { @@ -152,6 +164,19 @@ ContractMatchingService matchingService() { return matchingService; } + ProcessingMetricsSink metricsSink() { + return metricsSink != null ? metricsSink : ProcessingMetricsSink.NOOP; + } + + public ProcessingMetricsSink processingMetricsSink() { + return metricsSink(); + } + + public DocumentProcessor processingMetricsSink(ProcessingMetricsSink metricsSink) { + this.metricsSink = metricsSink != null ? metricsSink : ProcessingMetricsSink.NOOP; + return this; + } + public Map markersFor(Node scopeNode, String scopePath) { ContractBundle bundle = contractLoader.load(FrozenNode.fromResolvedNode(scopeNode), scopePath); return bundle.markers(); @@ -183,6 +208,7 @@ public static final class Builder { private ConformanceEngine conformanceEngine; private ProcessingSnapshotManager snapshotManager; private ContractMatchingService matchingService = new ContractMatchingService(); + private ProcessingMetricsSink metricsSink = ProcessingMetricsSink.NOOP; public Builder withRegistry(ContractProcessorRegistry registry) { this.contractRegistry = Objects.requireNonNull(registry, "registry"); @@ -236,6 +262,11 @@ public Builder withMatchingService(ContractMatchingService matchingService) { return this; } + public Builder withProcessingMetricsSink(ProcessingMetricsSink metricsSink) { + this.metricsSink = metricsSink != null ? metricsSink : ProcessingMetricsSink.NOOP; + return this; + } + public DocumentProcessor build() { return new DocumentProcessor(this); } diff --git a/src/main/java/blue/language/processor/ImmutablePatchPlanner.java b/src/main/java/blue/language/processor/ImmutablePatchPlanner.java index f47d5d7..16df60e 100644 --- a/src/main/java/blue/language/processor/ImmutablePatchPlanner.java +++ b/src/main/java/blue/language/processor/ImmutablePatchPlanner.java @@ -43,6 +43,10 @@ static ImmutablePatchPlanner forMaterialized(Node root) { return new ImmutablePatchPlanner(FrozenNode.fromResolvedNode(root)); } + FrozenNode root() { + return root; + } + PatchPlan plan(String originScopePath, JsonPatch patch) { Objects.requireNonNull(originScopePath, "originScopePath"); Objects.requireNonNull(patch, "patch"); @@ -169,6 +173,14 @@ FrozenNode root() { return root; } + FrozenNode before() { + return before; + } + + FrozenNode after() { + return after; + } + Node rootNode() { return root.toNode(); } diff --git a/src/main/java/blue/language/processor/ProcessingMetricsSink.java b/src/main/java/blue/language/processor/ProcessingMetricsSink.java new file mode 100644 index 0000000..445f3bc --- /dev/null +++ b/src/main/java/blue/language/processor/ProcessingMetricsSink.java @@ -0,0 +1,123 @@ +package blue.language.processor; + +/** + * Optional metrics hook for document-processing instrumentation. + * + *

Implementations should be cheap and thread-safe. All methods are no-ops + * by default so callers can record fine-grained timings without branching.

+ */ +public interface ProcessingMetricsSink { + ProcessingMetricsSink NOOP = new ProcessingMetricsSink() { + }; + + default void addProcessDocumentNanos(long nanos) { + } + + default void addBlueProcessDocumentNanos(long nanos) { + } + + default void addEventPreprocessNanos(long nanos) { + } + + default void addResultSnapshotAttachNanos(long nanos) { + } + + default void addBlueIdCalculationNanos(long nanos) { + } + + default void addBundleLoadNanos(long nanos) { + } + + default void addBundleLoadCacheKeyBuildNanos(long nanos) { + } + + default void addBundleLoadActualBuildNanos(long nanos) { + } + + default void addBundleLoadReuseNanos(long nanos) { + } + + default void incrementBundleLoadCacheHits() { + } + + default void incrementBundleLoadCacheMisses() { + } + + default void incrementBundlesBuilt() { + } + + default void incrementBundlesReused() { + } + + default void addChannelDiscoveryNanos(long nanos) { + } + + default void addChannelMatchNanos(long nanos) { + } + + default void incrementChannelEvaluations() { + } + + default void addHandlerDiscoveryNanos(long nanos) { + } + + default void addHandlerMatchNanos(long nanos) { + } + + default void incrementHandlerMatchAttempts() { + } + + default void addHandlerExecutionNanos(long nanos) { + } + + default void incrementHandlersExecuted() { + } + + default void addTriggeredEventRoutingNanos(long nanos) { + } + + default void incrementTriggeredEventsRouted() { + } + + default void addCheckpointUpdateNanos(long nanos) { + } + + default void addSnapshotCommitNanos(long nanos) { + } + + default void addPostProcessingNanos(long nanos) { + } + + default void addPatchBoundaryNanos(long nanos) { + } + + default void addPatchGasNanos(long nanos) { + } + + default void addDocumentUpdateRoutingNanos(long nanos) { + } + + default void incrementDocumentUpdateEventsBuilt() { + } + + default void incrementDocumentUpdateEventsSkippedNoChannel() { + } + + default void addBatchPatchPlanningNanos(long nanos) { + } + + default void addBatchPatchConformanceNanos(long nanos) { + } + + default void addBatchPatchBuildUpdatesNanos(long nanos) { + } + + default void addBatchPatchCommitNanos(long nanos) { + } + + default void incrementDocumentUpdateBeforeMaterializations() { + } + + default void incrementDocumentUpdateAfterMaterializations() { + } +} diff --git a/src/main/java/blue/language/processor/ProcessorEngine.java b/src/main/java/blue/language/processor/ProcessorEngine.java index f49e7f2..4bf2f2a 100644 --- a/src/main/java/blue/language/processor/ProcessorEngine.java +++ b/src/main/java/blue/language/processor/ProcessorEngine.java @@ -64,38 +64,66 @@ static DocumentProcessingResult initializeDocument(DocumentProcessor owner, Reso static DocumentProcessingResult processDocument(DocumentProcessor owner, Node document, Node event) { Objects.requireNonNull(document, "document"); Objects.requireNonNull(event, "event"); - if (!isInitialized(owner, document)) { - throw new IllegalStateException("Document not initialized"); - } - Node cloned = document.clone(); - Execution execution = new Execution(owner, cloned); + ProcessingMetricsSink metrics = owner.metricsSink(); + long processStart = System.nanoTime(); + long preprocessStart = System.nanoTime(); + Execution execution = null; try { + if (!isInitialized(owner, document)) { + throw new IllegalStateException("Document not initialized"); + } + Node cloned = document.clone(); + execution = new Execution(owner, cloned); + metrics.addEventPreprocessNanos(System.nanoTime() - preprocessStart); + long bundleStart = System.nanoTime(); execution.loadBundles("/"); + metrics.addBundleLoadNanos(System.nanoTime() - bundleStart); execution.processExternalEvent("/", event); } catch (RunTerminationException ignored) { // Processing terminated early; result still returned. } catch (MustUnderstandFailureException ex) { + metrics.addProcessDocumentNanos(System.nanoTime() - processStart); return DocumentProcessingResult.capabilityFailure(document.clone(), ex.getMessage()); } - return execution.result(); + long postStart = System.nanoTime(); + try { + return execution.result(); + } finally { + metrics.addPostProcessingNanos(System.nanoTime() - postStart); + metrics.addProcessDocumentNanos(System.nanoTime() - processStart); + } } static DocumentProcessingResult processDocument(DocumentProcessor owner, ResolvedSnapshot snapshot, Node event) { Objects.requireNonNull(snapshot, "snapshot"); Objects.requireNonNull(event, "event"); - if (!isInitialized(owner, snapshot)) { - throw new IllegalStateException("Document not initialized"); - } - Execution execution = new Execution(owner, snapshot); + ProcessingMetricsSink metrics = owner.metricsSink(); + long processStart = System.nanoTime(); + long preprocessStart = System.nanoTime(); + Execution execution = null; try { + if (!isInitialized(owner, snapshot)) { + throw new IllegalStateException("Document not initialized"); + } + execution = new Execution(owner, snapshot); + metrics.addEventPreprocessNanos(System.nanoTime() - preprocessStart); + long bundleStart = System.nanoTime(); execution.loadBundles("/"); + metrics.addBundleLoadNanos(System.nanoTime() - bundleStart); execution.processExternalEvent("/", event); } catch (RunTerminationException ignored) { // Processing terminated early; result still returned. } catch (MustUnderstandFailureException ex) { + metrics.addProcessDocumentNanos(System.nanoTime() - processStart); return DocumentProcessingResult.capabilityFailure(snapshot.canonicalRoot(), ex.getMessage()); } - return execution.result(); + long postStart = System.nanoTime(); + try { + return execution.result(); + } finally { + metrics.addPostProcessingNanos(System.nanoTime() - postStart); + metrics.addProcessDocumentNanos(System.nanoTime() - processStart); + } } static boolean isInitialized(DocumentProcessor owner, Node document) { @@ -292,7 +320,10 @@ static final class Execution { Execution(DocumentProcessor owner, Node document) { this.owner = owner; - this.runtime = new DocumentProcessingRuntime(document, owner.conformanceEngine(), owner.snapshotManager()); + this.runtime = new DocumentProcessingRuntime(document, + owner.conformanceEngine(), + owner.snapshotManager(), + owner.metricsSink()); this.checkpointManager = new CheckpointManager(runtime, ProcessorEngine::canonicalSignature); this.terminationService = new TerminationService(runtime); this.channelRunner = new ChannelRunner(owner, this, runtime, checkpointManager); @@ -301,7 +332,10 @@ static final class Execution { Execution(DocumentProcessor owner, ResolvedSnapshot snapshot) { this.owner = owner; - this.runtime = new DocumentProcessingRuntime(snapshot, owner.conformanceEngine(), owner.snapshotManager()); + this.runtime = new DocumentProcessingRuntime(snapshot, + owner.conformanceEngine(), + owner.snapshotManager(), + owner.metricsSink()); this.checkpointManager = new CheckpointManager(runtime, ProcessorEngine::canonicalSignature); this.terminationService = new TerminationService(runtime); this.channelRunner = new ChannelRunner(owner, this, runtime, checkpointManager); @@ -324,7 +358,20 @@ void handlePatch(String scopePath, ContractBundle bundle, JsonPatch patch, boolean allowReservedMutation) { - scopeExecutor.handlePatch(scopePath, bundle, patch, allowReservedMutation); + if (patch == null) { + return; + } + handlePatches(scopePath, + bundle, + Collections.singletonList(patch), + allowReservedMutation); + } + + void handlePatches(String scopePath, + ContractBundle bundle, + List patches, + boolean allowReservedMutation) { + scopeExecutor.handlePatches(scopePath, bundle, patches, allowReservedMutation); } ProcessorExecutionContext createContext(String scopePath, diff --git a/src/main/java/blue/language/processor/ProcessorExecutionContext.java b/src/main/java/blue/language/processor/ProcessorExecutionContext.java index 9552caa..260e937 100644 --- a/src/main/java/blue/language/processor/ProcessorExecutionContext.java +++ b/src/main/java/blue/language/processor/ProcessorExecutionContext.java @@ -4,6 +4,8 @@ import blue.language.processor.model.JsonPatch; import blue.language.snapshot.FrozenNode; +import java.util.Collections; +import java.util.List; import java.util.Objects; /** @@ -42,6 +44,10 @@ public String contractKey() { return contractKey; } + public String scopePath() { + return scopePath; + } + public Node contractNode() { return contractNode != null ? contractNode.toNode() : null; } @@ -55,10 +61,20 @@ public Node event() { } public void applyPatch(JsonPatch patch) { + if (patch == null) { + return; + } + applyPatches(Collections.singletonList(patch)); + } + + public void applyPatches(List patches) { if (!allowTerminatedWork && execution.isScopeInactive(scopePath)) { return; } - execution.handlePatch(scopePath, bundle, patch, allowReservedMutation); + if (patches == null || patches.isEmpty()) { + return; + } + execution.handlePatches(scopePath, bundle, patches, allowReservedMutation); } public void emitEvent(Node emission) { @@ -99,6 +115,20 @@ public Node documentAt(String absolutePointer) { return runtime().nodeAt(absolutePointer); } + public FrozenNode canonicalFrozenAt(String absolutePointer) { + if (absolutePointer == null || absolutePointer.isEmpty()) { + return null; + } + return runtime().canonicalFrozenAt(absolutePointer); + } + + public FrozenNode resolvedFrozenAt(String absolutePointer) { + if (absolutePointer == null || absolutePointer.isEmpty()) { + return null; + } + return runtime().resolvedFrozenAt(absolutePointer); + } + public boolean documentContains(String absolutePointer) { if (absolutePointer == null || absolutePointer.isEmpty()) { return false; diff --git a/src/main/java/blue/language/processor/ScopeExecutor.java b/src/main/java/blue/language/processor/ScopeExecutor.java index 2ccb64e..a798e97 100644 --- a/src/main/java/blue/language/processor/ScopeExecutor.java +++ b/src/main/java/blue/language/processor/ScopeExecutor.java @@ -12,6 +12,7 @@ import blue.language.snapshot.FrozenNode; import blue.language.utils.BlueIdCalculator; +import java.util.Collections; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -66,7 +67,7 @@ void initializeScope(String scopePath, boolean chargeScopeEntry) { preInitSnapshot = (canonicalScopeNode != null ? canonicalScopeNode : scopeNode).toNode(); } - bundle = owner.contractLoader().load(scopeNode, normalizedScope); + bundle = owner.contractLoader().load(scopeNode, normalizedScope, owner.metricsSink()); bundles.put(normalizedScope, bundle); String nextEmbedded = null; @@ -117,7 +118,7 @@ void loadBundles(String scopePath) { } FrozenNode scopeNode = runtime.resolvedFrozenAt(normalizedScope); ContractBundle bundle = scopeNode != null - ? owner.contractLoader().load(scopeNode, normalizedScope) + ? owner.contractLoader().load(scopeNode, normalizedScope, owner.metricsSink()) : ContractBundle.empty(); bundles.put(normalizedScope, bundle); for (String embeddedPointer : bundle.embeddedPaths()) { @@ -136,7 +137,9 @@ void processExternalEvent(String scopePath, Node event) { if (bundle == null) { return; } + long channelDiscoveryStart = System.nanoTime(); List channels = bundle.channelsOfType(ChannelContract.class); + owner.metricsSink().addChannelDiscoveryNanos(System.nanoTime() - channelDiscoveryStart); if (channels.isEmpty()) { finalizeScope(normalizedScope, bundle); return; @@ -157,32 +160,73 @@ void handlePatch(String scopePath, ContractBundle bundle, JsonPatch patch, boolean allowReservedMutation) { + if (patch == null) { + return; + } + handlePatches(scopePath, + bundle, + Collections.singletonList(patch), + allowReservedMutation); + } + + void handlePatches(String scopePath, + ContractBundle bundle, + List patches, + boolean allowReservedMutation) { if (execution.isScopeInactive(scopePath)) { return; } + if (patches == null || patches.isEmpty()) { + return; + } runtime.chargeBoundaryCheck(); try { - validatePatchBoundary(scopePath, bundle, patch); - enforceReservedKeyWriteProtection(scopePath, patch, allowReservedMutation); + long boundaryStart = System.nanoTime(); + for (JsonPatch patch : patches) { + validatePatchBoundary(scopePath, bundle, patch); + enforceReservedKeyWriteProtection(scopePath, patch, allowReservedMutation); + } + owner.metricsSink().addPatchBoundaryNanos(System.nanoTime() - boundaryStart); } catch (ProcessorEngine.BoundaryViolationException ex) { execution.enterFatalTermination(scopePath, bundle, execution.fatalReason(ex, "Boundary violation")); return; } try { - switch (patch.getOp()) { - case ADD: - case REPLACE: - runtime.chargePatchAddOrReplace(patch.getVal()); - break; - case REMOVE: - runtime.chargePatchRemove(); - break; - default: - break; + long gasStart = System.nanoTime(); + for (JsonPatch patch : patches) { + switch (patch.getOp()) { + case ADD: + case REPLACE: + runtime.chargePatchAddOrReplace(patch.getVal()); + break; + case REMOVE: + runtime.chargePatchRemove(); + break; + default: + break; + } } - DocumentProcessingRuntime.DocumentUpdateData data = runtime.applyPatch(scopePath, patch); + owner.metricsSink().addPatchGasNanos(System.nanoTime() - gasStart); + List updates = runtime.applyPatches(scopePath, patches); + long routingStart = System.nanoTime(); + routeDocumentUpdatesAfterBatch(scopePath, bundle, updates); + owner.metricsSink().addDocumentUpdateRoutingNanos(System.nanoTime() - routingStart); + } catch (ProcessorEngine.BoundaryViolationException ex) { + execution.enterFatalTermination(scopePath, bundle, execution.fatalReason(ex, "Boundary violation")); + } catch (IllegalArgumentException | IllegalStateException ex) { + execution.enterFatalTermination(scopePath, bundle, execution.fatalReason(ex, "Runtime fatal")); + } + } + + private void routeDocumentUpdatesAfterBatch(String scopePath, + ContractBundle bundle, + List updates) { + if (updates == null || updates.isEmpty()) { + return; + } + for (DocumentProcessingRuntime.DocumentUpdateData data : updates) { if (data == null) { - return; + continue; } markCutOffChildrenIfNeeded(scopePath, bundle, data); runtime.chargeCascadeRouting(data.cascadeScopes().size()); @@ -194,22 +238,24 @@ void handlePatch(String scopePath, if (execution.isScopeInactive(cascadeScope)) { continue; } - Node updateEvent = ProcessorEngine.createDocumentUpdateEvent(data, cascadeScope); - for (ContractBundle.ChannelBinding channel : targetBundle.channelsOfType(DocumentUpdateChannel.class)) { + List channels = targetBundle.channelsOfType(DocumentUpdateChannel.class); + if (channels.isEmpty()) { + owner.metricsSink().incrementDocumentUpdateEventsSkippedNoChannel(); + continue; + } + for (ContractBundle.ChannelBinding channel : channels) { DocumentUpdateChannel duc = (DocumentUpdateChannel) channel.contract(); if (!ProcessorEngine.matchesDocumentUpdate(cascadeScope, duc.getPath(), data.path())) { continue; } + Node updateEvent = ProcessorEngine.createDocumentUpdateEvent(data, cascadeScope); + owner.metricsSink().incrementDocumentUpdateEventsBuilt(); channelRunner.runHandlers(cascadeScope, targetBundle, channel.key(), updateEvent, false); if (execution.isScopeInactive(cascadeScope)) { break; } } } - } catch (ProcessorEngine.BoundaryViolationException ex) { - execution.enterFatalTermination(scopePath, bundle, execution.fatalReason(ex, "Boundary violation")); - } catch (IllegalArgumentException | IllegalStateException ex) { - execution.enterFatalTermination(scopePath, bundle, execution.fatalReason(ex, "Runtime fatal")); } } @@ -269,7 +315,7 @@ private ContractBundle refreshBundle(String scopePath) { bundles.remove(normalizedScope); return null; } - ContractBundle refreshed = owner.contractLoader().load(scopeNode, normalizedScope); + ContractBundle refreshed = owner.contractLoader().load(scopeNode, normalizedScope, owner.metricsSink()); bundles.put(normalizedScope, refreshed); return refreshed; } @@ -343,32 +389,38 @@ private void bridgeEmbeddedEmissions(String scopePath, ContractBundle bundle) { } private void drainTriggeredQueue(String scopePath, ContractBundle bundle) { - if (execution.isScopeInactive(scopePath)) { - return; - } - ScopeRuntimeContext context = runtime.scope(scopePath); - if (context.triggeredQueue().isEmpty()) { - return; - } - List triggeredChannels = bundle.channelsOfType(TriggeredEventChannel.class); - if (triggeredChannels.isEmpty()) { - context.triggeredQueue().clear(); - return; - } - while (!context.triggeredQueue().isEmpty()) { - Node next = context.triggeredQueue().pollFirst(); - runtime.chargeDrainEvent(); - for (ContractBundle.ChannelBinding channel : triggeredChannels) { - if (execution.isScopeInactive(scopePath)) { - context.triggeredQueue().clear(); - return; - } - channelRunner.runHandlers(scopePath, bundle, channel.key(), next.clone(), false); - if (execution.isScopeInactive(scopePath)) { - context.triggeredQueue().clear(); - return; + long routingStart = System.nanoTime(); + try { + if (execution.isScopeInactive(scopePath)) { + return; + } + ScopeRuntimeContext context = runtime.scope(scopePath); + if (context.triggeredQueue().isEmpty()) { + return; + } + List triggeredChannels = bundle.channelsOfType(TriggeredEventChannel.class); + if (triggeredChannels.isEmpty()) { + context.triggeredQueue().clear(); + return; + } + while (!context.triggeredQueue().isEmpty()) { + Node next = context.triggeredQueue().pollFirst(); + owner.metricsSink().incrementTriggeredEventsRouted(); + runtime.chargeDrainEvent(); + for (ContractBundle.ChannelBinding channel : triggeredChannels) { + if (execution.isScopeInactive(scopePath)) { + context.triggeredQueue().clear(); + return; + } + channelRunner.runHandlers(scopePath, bundle, channel.key(), next.clone(), false); + if (execution.isScopeInactive(scopePath)) { + context.triggeredQueue().clear(); + return; + } } } + } finally { + owner.metricsSink().addTriggeredEventRoutingNanos(System.nanoTime() - routingStart); } } diff --git a/src/main/java/blue/language/snapshot/FrozenNode.java b/src/main/java/blue/language/snapshot/FrozenNode.java index 95e6de6..8de24e4 100644 --- a/src/main/java/blue/language/snapshot/FrozenNode.java +++ b/src/main/java/blue/language/snapshot/FrozenNode.java @@ -263,6 +263,11 @@ public FrozenNode item(int index) { public FrozenNode at(String pointer) { List segments = JsonPointer.split(pointer); + return at(segments); + } + + public FrozenNode at(List pointerSegments) { + List segments = pointerSegments != null ? pointerSegments : Collections.emptyList(); if (segments.isEmpty()) { return this; } diff --git a/src/main/java/blue/language/utils/NodePathAccessor.java b/src/main/java/blue/language/utils/NodePathAccessor.java index bc08627..9ffe627 100644 --- a/src/main/java/blue/language/utils/NodePathAccessor.java +++ b/src/main/java/blue/language/utils/NodePathAccessor.java @@ -29,6 +29,21 @@ public static Object get(Node node, String path, Function linkingPro return getRecursive(node, segments, 0, linkingProvider, resolveFinalLink); } + public static Node getNode(Node node, String path) { + if (path == null || !path.startsWith("/")) { + throw new IllegalArgumentException("Invalid path: " + path); + } + if (path.equals("/")) { + return node; + } + + Node current = node; + for (String segment : JsonPointer.split(path)) { + current = getStructuralNodeForSegment(current, segment); + } + return current; + } + private static Object getRecursive(Node node, List segments, int index, Function linkingProvider, boolean resolveFinalLink) { if (index == segments.size() - 1 && !resolveFinalLink) { // Return the node itself for the last segment if we're not resolving the final link @@ -84,6 +99,42 @@ private static Node getNodeForSegment(Node node, String segment, Function items = node.getItems(); + if (items == null || itemIndex >= items.size()) { + throw new IllegalArgumentException("Invalid item index: " + itemIndex); + } + return items.get(itemIndex); + } + + Map properties = node.getProperties(); + if (properties == null || !properties.containsKey(segment)) { + throw new IllegalArgumentException("Property not found: " + segment); + } + return properties.get(segment); + } + private static Node link(Node node, Function linkingProvider) { Node linked = linkingProvider.apply(node); return linked == null ? node : linked; diff --git a/src/main/java/blue/language/utils/NodePathEditor.java b/src/main/java/blue/language/utils/NodePathEditor.java new file mode 100644 index 0000000..e5168b4 --- /dev/null +++ b/src/main/java/blue/language/utils/NodePathEditor.java @@ -0,0 +1,114 @@ +package blue.language.utils; + +import blue.language.model.Node; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public final class NodePathEditor { + + private NodePathEditor() { + } + + public static Node getOrNull(Node node, String pointer) { + Node current = node; + for (String segment : JsonPointer.split(pointer)) { + if (current == null) { + return null; + } + current = childAtOrNull(current, segment); + } + return current; + } + + public static void put(Node root, String pointer, Node value) { + List segments = JsonPointer.split(pointer); + if (segments.isEmpty()) { + root.replaceWith(value); + return; + } + + Node parent = root; + for (int i = 0; i < segments.size() - 1; i++) { + parent = childAtOrCreate(parent, segments.get(i)); + } + setChild(parent, segments.get(segments.size() - 1), value); + } + + private static Node childAtOrNull(Node node, String segment) { + if ("type".equals(segment)) { + return node.getType(); + } + if ("itemType".equals(segment)) { + return node.getItemType(); + } + if ("keyType".equals(segment)) { + return node.getKeyType(); + } + if ("valueType".equals(segment)) { + return node.getValueType(); + } + if ("blue".equals(segment)) { + return node.getBlue(); + } + if (JsonPointer.isArrayIndexSegment(segment) && node.getItems() != null && !"-".equals(segment)) { + int index = Integer.parseInt(segment); + return index < node.getItems().size() ? node.getItems().get(index) : null; + } + return node.getProperties() != null ? node.getProperties().get(segment) : null; + } + + private static Node childAtOrCreate(Node node, String segment) { + Node child = childAtOrNull(node, segment); + if (child != null) { + return child; + } + child = new Node(); + setChild(node, segment, child); + return child; + } + + private static void setChild(Node node, String segment, Node value) { + if ("type".equals(segment)) { + node.type(value); + return; + } + if ("itemType".equals(segment)) { + node.itemType(value); + return; + } + if ("keyType".equals(segment)) { + node.keyType(value); + return; + } + if ("valueType".equals(segment)) { + node.valueType(value); + return; + } + if ("blue".equals(segment)) { + node.blue(value); + return; + } + if (JsonPointer.isArrayIndexSegment(segment) && !"-".equals(segment)) { + int index = Integer.parseInt(segment); + List items = node.getItems(); + if (items == null) { + items = new ArrayList<>(); + node.items(items); + } + while (items.size() <= index) { + items.add(new Node()); + } + items.set(index, value); + return; + } + Map properties = node.getProperties(); + if (properties == null) { + node.properties(new HashMap<>()); + properties = node.getProperties(); + } + properties.put(segment, value); + } +} diff --git a/src/main/java/blue/language/utils/NodePathSelector.java b/src/main/java/blue/language/utils/NodePathSelector.java new file mode 100644 index 0000000..4efc585 --- /dev/null +++ b/src/main/java/blue/language/utils/NodePathSelector.java @@ -0,0 +1,136 @@ +package blue.language.utils; + +import blue.language.model.Node; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Predicate; + +/** + * Selects concrete JSON Pointer paths from a node using simple path patterns. + * + *

Patterns are JSON Pointer-like paths. {@code *} matches any property key + * or list index at one level. {@code -} matches every list item at one level, + * which is useful for contract masks such as {@code /products/-/ean}.

+ */ +public final class NodePathSelector { + + private NodePathSelector() { + } + + public static List select(Node root, Collection patterns, Predicate predicate) { + if (root == null || patterns == null || patterns.isEmpty()) { + return new ArrayList<>(); + } + if (predicate == null) { + throw new IllegalArgumentException("predicate must not be null"); + } + + Set selected = new LinkedHashSet<>(); + for (String pattern : patterns) { + select(root, JsonPointer.split(pattern), 0, new ArrayList<>(), predicate, selected); + } + return new ArrayList<>(selected); + } + + private static void select(Node current, + List pattern, + int index, + List currentPath, + Predicate predicate, + Set selected) { + if (current == null) { + return; + } + if (index == pattern.size()) { + if (predicate.test(current)) { + selected.add(JsonPointer.toPointer(currentPath)); + } + return; + } + + String segment = pattern.get(index); + if ("*".equals(segment)) { + traverseAllChildren(current, pattern, index, currentPath, predicate, selected); + return; + } + if ("-".equals(segment)) { + traverseListItems(current, pattern, index, currentPath, predicate, selected); + return; + } + + Node child = childAtOrNull(current, segment); + if (child != null) { + currentPath.add(segment); + select(child, pattern, index + 1, currentPath, predicate, selected); + currentPath.remove(currentPath.size() - 1); + } + } + + private static void traverseAllChildren(Node current, + List pattern, + int index, + List currentPath, + Predicate predicate, + Set selected) { + if (current.getItems() != null) { + traverseListItems(current, pattern, index, currentPath, predicate, selected); + } + if (current.getProperties() != null) { + for (Map.Entry entry : current.getProperties().entrySet()) { + currentPath.add(entry.getKey()); + select(entry.getValue(), pattern, index + 1, currentPath, predicate, selected); + currentPath.remove(currentPath.size() - 1); + } + } + } + + private static void traverseListItems(Node current, + List pattern, + int index, + List currentPath, + Predicate predicate, + Set selected) { + if (current.getItems() == null) { + return; + } + for (int i = 0; i < current.getItems().size(); i++) { + currentPath.add(String.valueOf(i)); + select(current.getItems().get(i), pattern, index + 1, currentPath, predicate, selected); + currentPath.remove(currentPath.size() - 1); + } + } + + private static Node childAtOrNull(Node node, String segment) { + if ("type".equals(segment)) { + return node.getType(); + } + if ("itemType".equals(segment)) { + return node.getItemType(); + } + if ("keyType".equals(segment)) { + return node.getKeyType(); + } + if ("valueType".equals(segment)) { + return node.getValueType(); + } + if ("blue".equals(segment)) { + return node.getBlue(); + } + if (node.getItems() != null && isListIndex(segment)) { + int index = Integer.parseInt(segment); + return index < node.getItems().size() ? node.getItems().get(index) : null; + } + return node.getProperties() != null ? node.getProperties().get(segment) : null; + } + + private static boolean isListIndex(String segment) { + return segment != null + && !segment.isEmpty() + && segment.chars().allMatch(Character::isDigit); + } +} diff --git a/src/main/java/blue/language/utils/limits/ExcludedPathLimits.java b/src/main/java/blue/language/utils/limits/ExcludedPathLimits.java new file mode 100644 index 0000000..dc691c9 --- /dev/null +++ b/src/main/java/blue/language/utils/limits/ExcludedPathLimits.java @@ -0,0 +1,91 @@ +package blue.language.utils.limits; + +import blue.language.model.Node; +import blue.language.utils.JsonPointer; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.Stack; +import java.util.stream.Collectors; + +/** + * Prevents merge/extension work at specific JSON Pointer paths. + * + *

This is intentionally contract-agnostic. Callers decide which authored + * subtrees need to be preserved for later runtime processing; the language + * resolver only skips those paths.

+ */ +public class ExcludedPathLimits implements Limits { + private final Set excludedPaths; + private final Stack currentPath = new Stack<>(); + private final Stack enteredPathSegment = new Stack<>(); + + public ExcludedPathLimits(Collection excludedPaths) { + this.excludedPaths = excludedPaths == null + ? new HashSet<>() + : excludedPaths.stream() + .map(JsonPointer::canonicalize) + .collect(Collectors.toSet()); + } + + public static ExcludedPathLimits excluding(Collection excludedPaths) { + return new ExcludedPathLimits(excludedPaths); + } + + @Override + public boolean shouldExtendPathSegment(String pathSegment, Node currentNode) { + return !isExcluded(potentialPath(pathSegment)); + } + + @Override + public boolean shouldMergePathSegment(String pathSegment, Node currentNode) { + return !isExcluded(potentialPath(pathSegment)); + } + + @Override + public void enterPathSegment(String pathSegment, Node currentNode) { + boolean realSegment = pathSegment != null && !pathSegment.isEmpty(); + enteredPathSegment.push(realSegment); + if (realSegment) { + currentPath.push(pathSegment); + } + } + + @Override + public void exitPathSegment() { + if (enteredPathSegment.isEmpty()) { + return; + } + if (enteredPathSegment.pop() && !currentPath.isEmpty()) { + currentPath.pop(); + } + } + + private List potentialPath(String pathSegment) { + List potentialPath = new ArrayList<>(currentPath); + if (pathSegment != null && !pathSegment.isEmpty()) { + potentialPath.add(pathSegment); + } + return potentialPath; + } + + private boolean isExcluded(List path) { + String pointer = JsonPointer.toPointer(path); + for (String excludedPath : excludedPaths) { + if (pointer.equals(excludedPath) || isDescendantOf(pointer, excludedPath)) { + return true; + } + } + return false; + } + + private boolean isDescendantOf(String pointer, String ancestor) { + if ("/".equals(ancestor)) { + return true; + } + return pointer.startsWith(ancestor + "/"); + } +} diff --git a/src/test/java/blue/language/MaskedResolutionTest.java b/src/test/java/blue/language/MaskedResolutionTest.java new file mode 100644 index 0000000..5b0a6df --- /dev/null +++ b/src/test/java/blue/language/MaskedResolutionTest.java @@ -0,0 +1,274 @@ +package blue.language; + +import blue.language.model.Node; +import blue.language.provider.BasicNodeProvider; +import blue.language.utils.limits.PathLimits; +import org.junit.jupiter.api.Test; + +import java.math.BigInteger; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static blue.language.utils.Properties.INTEGER_TYPE_BLUE_ID; +import static blue.language.utils.Properties.LIST_TYPE_BLUE_ID; +import static blue.language.utils.Properties.TEXT_TYPE_BLUE_ID; +import static blue.language.utils.UncheckedObjectMapper.YAML_MAPPER; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class MaskedResolutionTest { + + @Test + void normalResolutionStillRejectsAuthoredScalarWhereTypeRequiresList() { + ContractTypes types = contractTypes(); + Blue blue = new Blue(types.provider); + + Node document = blue.yamlToNode( + "contracts:\n" + + " apply:\n" + + " type:\n" + + " blueId: " + types.maskedContractId + "\n" + + " payload: \"${steps.Prepare.payload}\""); + + assertThrows(IllegalArgumentException.class, () -> blue.resolve(document)); + } + + @Test + void preservedPathKeepsExpressionValueWithoutMergingDeclaredListType() { + ContractTypes types = contractTypes(); + Blue blue = new Blue(types.provider); + + Node document = blue.yamlToNode( + "contracts:\n" + + " apply:\n" + + " type:\n" + + " blueId: " + types.maskedContractId + "\n" + + " payload: \"${steps.Prepare.payload}\""); + + Node resolved = blue.resolvePreservingPaths(document, + Collections.singleton("/contracts/apply/payload")); + Node apply = resolved.getAsNode("/contracts/apply"); + Node payload = apply.getProperties().get("payload"); + + assertEquals("${steps.Prepare.payload}", payload.getValue()); + assertEquals(TEXT_TYPE_BLUE_ID, payload.getType().getBlueId()); + assertNull(payload.getItemType()); + assertNull(payload.getItems()); + assertEquals("inherited", apply.getProperties().get("label").getValue()); + } + + @Test + void preservedPathsUseJsonPointerEscaping() { + BasicNodeProvider provider = new BasicNodeProvider(); + provider.addSingleDocs( + "name: Escaped Contract\n" + + "\"a/b\":\n" + + " type:\n" + + " blueId: " + LIST_TYPE_BLUE_ID + "\n" + + "regular: inherited"); + String typeId = provider.getBlueIdByName("Escaped Contract"); + Blue blue = new Blue(provider); + + Node document = blue.yamlToNode( + "type:\n" + + " blueId: " + typeId + "\n" + + "\"a/b\": \"${deferred.list}\""); + + assertThrows(IllegalArgumentException.class, () -> blue.resolve(document.clone())); + + Node resolved = blue.resolvePreservingPaths(document, Collections.singleton("/a~1b")); + + assertEquals("${deferred.list}", resolved.getProperties().get("a/b").getValue()); + assertEquals("inherited", resolved.getProperties().get("regular").getValue()); + } + + @Test + void preservedResolutionCanCombineWithNormalPathLimits() { + ContractTypes types = contractTypes(); + Blue blue = new Blue(types.provider); + + Node document = node( + "contracts:\n" + + " apply:\n" + + " type:\n" + + " blueId: " + types.maskedContractId + "\n" + + " payload: \"${steps.Prepare.payload}\"\n" + + " untouched:\n" + + " type:\n" + + " blueId: " + types.maskedContractId + "\n" + + " payload:\n" + + " - amount: 1\n" + + " memo: ok"); + + Node resolved = blue.resolvePreservingPaths( + document, + PathLimits.withSinglePath("/contracts/apply"), + Collections.singleton("/contracts/apply/payload")); + + Node apply = resolved.getAsNode("/contracts/apply"); + assertEquals("${steps.Prepare.payload}", apply.getProperties().get("payload").getValue()); + assertFalse(resolved.getAsNode("/contracts").getProperties().containsKey("untouched")); + } + + @Test + void matchingPathPatternsPreserveOnlyExpressionLeavesInsideAList() { + ProductTypes types = productTypes(); + Blue blue = new Blue(types.provider); + List patterns = Arrays.asList("/products", "/products/-/ean"); + + Node document = blue.yamlToNode( + "type:\n" + + " blueId: " + types.inventoryId + "\n" + + "products:\n" + + " - name: product 1\n" + + " ean: \"${event.ean}\""); + + assertEquals(Collections.singletonList("/products/0/ean"), + blue.selectPaths(document, patterns, this::isExpressionText)); + + Node resolved = blue.resolvePreservingMatchingPaths(document, patterns, this::isExpressionText); + Node products = resolved.getProperties().get("products"); + Node product = products.getItems().get(0); + Node ean = product.getProperties().get("ean"); + + assertEquals(LIST_TYPE_BLUE_ID, products.getType().getBlueId()); + assertEquals(types.productId, products.getItemType().getBlueId()); + assertEquals(types.productId, product.getType().getBlueId()); + assertEquals("${event.ean}", ean.getValue()); + assertEquals(TEXT_TYPE_BLUE_ID, ean.getType().getBlueId()); + } + + @Test + void matchingPathPatternsKeepLiteralListFullyValidatedWhenNoNodesMatchPredicate() { + ProductTypes types = productTypes(); + Blue blue = new Blue(types.provider); + List patterns = Arrays.asList("/products", "/products/-/ean"); + + Node document = blue.yamlToNode( + "type:\n" + + " blueId: " + types.inventoryId + "\n" + + "products:\n" + + " - name: product 1\n" + + " ean: 1234"); + + assertTrue(blue.selectPaths(document, patterns, this::isExpressionText).isEmpty()); + + Node resolved = blue.resolvePreservingMatchingPaths(document, patterns, this::isExpressionText); + Node product = resolved.getAsNode("/products").getItems().get(0); + Node ean = product.getProperties().get("ean"); + + assertEquals(types.productId, product.getType().getBlueId()); + assertEquals(INTEGER_TYPE_BLUE_ID, ean.getType().getBlueId()); + assertEquals(new BigInteger("1234"), ean.getValue()); + } + + @Test + void matchingPathPatternsDoNotPreserveInvalidNonExpressionLeaf() { + ProductTypes types = productTypes(); + Blue blue = new Blue(types.provider); + List patterns = Arrays.asList("/products", "/products/-/ean"); + + Node document = blue.yamlToNode( + "type:\n" + + " blueId: " + types.inventoryId + "\n" + + "products:\n" + + " - name: product 1\n" + + " ean: not-a-number"); + + assertTrue(blue.selectPaths(document, patterns, this::isExpressionText).isEmpty()); + assertThrows(IllegalArgumentException.class, + () -> blue.resolvePreservingMatchingPaths(document, patterns, this::isExpressionText)); + } + + private Node node(String yaml) { + return YAML_MAPPER.readValue(yaml, Node.class); + } + + private boolean isExpressionText(Node node) { + Object value = node.getRawValue(); + if (!(value instanceof String)) { + return false; + } + String text = ((String) value).trim(); + return text.startsWith("${") && text.endsWith("}") && text.length() > 3; + } + + private ContractTypes contractTypes() { + BasicNodeProvider provider = new BasicNodeProvider(); + + provider.addSingleDocs( + "name: Patch Entry\n" + + "amount:\n" + + " type:\n" + + " blueId: " + INTEGER_TYPE_BLUE_ID + "\n" + + "memo:\n" + + " type: Text"); + + String patchEntryId = provider.getBlueIdByName("Patch Entry"); + + provider.addSingleDocs( + "name: Masked Contract\n" + + "payload:\n" + + " type:\n" + + " blueId: " + LIST_TYPE_BLUE_ID + "\n" + + " itemType:\n" + + " blueId: " + patchEntryId + "\n" + + "label: inherited"); + + return new ContractTypes( + provider, + provider.getBlueIdByName("Masked Contract")); + } + + private ProductTypes productTypes() { + BasicNodeProvider provider = new BasicNodeProvider(); + + provider.addSingleDocs( + "name: Product\n" + + "ean:\n" + + " type:\n" + + " blueId: " + INTEGER_TYPE_BLUE_ID); + + String productId = provider.getBlueIdByName("Product"); + + provider.addSingleDocs( + "name: Product Inventory\n" + + "products:\n" + + " type:\n" + + " blueId: " + LIST_TYPE_BLUE_ID + "\n" + + " itemType:\n" + + " blueId: " + productId + "\n" + + "status: open"); + + return new ProductTypes( + provider, + productId, + provider.getBlueIdByName("Product Inventory")); + } + + private static final class ContractTypes { + private final BasicNodeProvider provider; + private final String maskedContractId; + + private ContractTypes(BasicNodeProvider provider, String maskedContractId) { + this.provider = provider; + this.maskedContractId = maskedContractId; + } + } + + private static final class ProductTypes { + private final BasicNodeProvider provider; + private final String productId; + private final String inventoryId; + + private ProductTypes(BasicNodeProvider provider, String productId, String inventoryId) { + this.provider = provider; + this.productId = productId; + this.inventoryId = inventoryId; + } + } +} diff --git a/src/test/java/blue/language/processor/ContractBundleCacheTest.java b/src/test/java/blue/language/processor/ContractBundleCacheTest.java new file mode 100644 index 0000000..1658121 --- /dev/null +++ b/src/test/java/blue/language/processor/ContractBundleCacheTest.java @@ -0,0 +1,175 @@ +package blue.language.processor; + +import blue.language.Blue; +import blue.language.model.Node; +import blue.language.processor.contracts.IncrementPropertyContractProcessor; +import blue.language.processor.contracts.SetPropertyContractProcessor; +import blue.language.processor.contracts.TestEventChannelProcessor; +import blue.language.processor.model.TestEvent; +import org.junit.jupiter.api.Test; + +import java.math.BigInteger; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class ContractBundleCacheTest { + + @Test + void processingStateChangesReuseBundleAndRefreshCheckpointMarkers() { + RecordingMetrics metrics = new RecordingMetrics(); + Blue blue = configuredBlue(metrics); + Node initialized = blue.initializeDocument(blue.yamlToNode( + "count: 0\n" + + "contracts:\n" + + " testChannel:\n" + + " type:\n" + + " blueId: TestEventChannel\n" + + " increment:\n" + + " type:\n" + + " blueId: IncrementProperty\n" + + " channel: testChannel\n" + + " propertyKey: /count\n")).document(); + + DocumentProcessingResult first = blue.processDocument(initialized, event(blue, "evt-1")); + long missesAfterFirst = metrics.bundleLoadCacheMisses; + DocumentProcessingResult second = blue.processDocument(first.document(), event(blue, "evt-2")); + long hitsAfterSecond = metrics.bundleLoadCacheHits; + DocumentProcessingResult duplicate = blue.processDocument(second.document(), event(blue, "evt-2")); + + assertEquals(new BigInteger("2"), duplicate.document().get("/count")); + assertTrue(metrics.bundleLoadCacheMisses > missesAfterFirst, + "first checkpoint-bearing run should build the checkpoint-shaped bundle once"); + assertTrue(hitsAfterSecond > 0, "second run should reuse at least one cached bundle"); + assertTrue(metrics.bundlesReused > 0, "bundle reuse metric should be incremented"); + } + + @Test + void changingContractsInvalidatesBundleCache() { + RecordingMetrics metrics = new RecordingMetrics(); + Blue blue = configuredBlue(metrics); + Node initialized = blue.initializeDocument(blue.yamlToNode( + "orders: {}\n" + + "contracts:\n" + + " testChannel:\n" + + " type:\n" + + " blueId: TestEventChannel\n" + + " set:\n" + + " type:\n" + + " blueId: SetProperty\n" + + " channel: testChannel\n" + + " path: /orders\n" + + " propertyKey: count\n" + + " propertyValue: 1\n")).document(); + + DocumentProcessingResult first = blue.processDocument(initialized, event(blue, "evt-1")); + long missesBeforeContractChange = metrics.bundleLoadCacheMisses; + Node changedContracts = first.document().clone(); + changedContracts.getAsNode("/contracts/set") + .properties("propertyValue", new Node().value(2)); + DocumentProcessingResult second = blue.processDocument(changedContracts, event(blue, "evt-2")); + + assertEquals(new BigInteger("2"), second.document().get("/orders/count")); + assertTrue(metrics.bundleLoadCacheMisses > missesBeforeContractChange, + "changing /contracts should force a new bundle build"); + } + + @Test + void changingChannelBindingsInvalidatesBundleCacheKey() { + RecordingMetrics metrics = new RecordingMetrics(); + Blue blue = configuredBlue(metrics); + Node initialized = blue.initializeDocument(blue.yamlToNode( + "orders: {}\n" + + "channelBindings:\n" + + " owner:\n" + + " timelineId: one\n" + + "contracts:\n" + + " testChannel:\n" + + " type:\n" + + " blueId: TestEventChannel\n" + + " set:\n" + + " type:\n" + + " blueId: SetProperty\n" + + " channel: testChannel\n" + + " path: /orders\n" + + " propertyKey: count\n" + + " propertyValue: 1\n")).document(); + + DocumentProcessingResult first = blue.processDocument(initialized, event(blue, "evt-1")); + long missesBeforeBindingChange = metrics.bundleLoadCacheMisses; + Node changedBindings = first.document().clone(); + changedBindings.getAsNode("/channelBindings/owner") + .properties("timelineId", new Node().value("two")); + blue.processDocument(changedBindings, event(blue, "evt-2")); + + assertTrue(metrics.bundleLoadCacheMisses > missesBeforeBindingChange, + "changing /channelBindings should force a new bundle build"); + } + + @Test + void embeddedScopesCacheIndependently() { + RecordingMetrics metrics = new RecordingMetrics(); + Blue blue = configuredBlue(metrics); + Node initialized = blue.initializeDocument(blue.yamlToNode( + "child:\n" + + " count: 0\n" + + " contracts:\n" + + " testChannel:\n" + + " type:\n" + + " blueId: TestEventChannel\n" + + " increment:\n" + + " type:\n" + + " blueId: IncrementProperty\n" + + " channel: testChannel\n" + + " propertyKey: /count\n" + + "contracts:\n" + + " embedded:\n" + + " type:\n" + + " blueId: ProcessEmbedded\n" + + " paths:\n" + + " - /child\n")).document(); + + DocumentProcessingResult first = blue.processDocument(initialized, event(blue, "evt-1")); + long hitsBeforeSecond = metrics.bundleLoadCacheHits; + DocumentProcessingResult second = blue.processDocument(first.document(), event(blue, "evt-2")); + + assertEquals(new BigInteger("2"), second.document().get("/child/count")); + assertTrue(metrics.bundleLoadCacheHits - hitsBeforeSecond >= 2, + "root and embedded child scopes should be independently reusable"); + } + + private Blue configuredBlue(RecordingMetrics metrics) { + DocumentProcessor processor = DocumentProcessor.builder() + .withProcessingMetricsSink(metrics) + .registerContractProcessor(new TestEventChannelProcessor()) + .registerContractProcessor(new IncrementPropertyContractProcessor()) + .registerContractProcessor(new SetPropertyContractProcessor()) + .build(); + return new Blue().documentProcessor(processor); + } + + private Node event(Blue blue, String eventId) { + return blue.objectToNode(new TestEvent().eventId(eventId)); + } + + private static final class RecordingMetrics implements ProcessingMetricsSink { + long bundleLoadCacheHits; + long bundleLoadCacheMisses; + long bundlesReused; + + @Override + public void incrementBundleLoadCacheHits() { + bundleLoadCacheHits++; + } + + @Override + public void incrementBundleLoadCacheMisses() { + bundleLoadCacheMisses++; + } + + @Override + public void incrementBundlesReused() { + bundlesReused++; + } + } +} diff --git a/src/test/java/blue/language/processor/DocumentProcessingRuntimeBatchPatchTest.java b/src/test/java/blue/language/processor/DocumentProcessingRuntimeBatchPatchTest.java new file mode 100644 index 0000000..9c6741f --- /dev/null +++ b/src/test/java/blue/language/processor/DocumentProcessingRuntimeBatchPatchTest.java @@ -0,0 +1,347 @@ +package blue.language.processor; + +import blue.language.Blue; +import blue.language.model.Node; +import blue.language.processor.model.JsonPatch; +import blue.language.provider.BasicNodeProvider; +import blue.language.snapshot.CanonicalPatchResult; +import blue.language.snapshot.FrozenNode; +import blue.language.snapshot.ResolvedSnapshot; +import org.junit.jupiter.api.Test; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static blue.language.utils.UncheckedObjectMapper.YAML_MAPPER; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class DocumentProcessingRuntimeBatchPatchTest { + + @Test + void applyPatchesAppliesMultipleObjectPatchesAndCommitsOnce() { + Node document = new Node(); + CountingSnapshotManager manager = new CountingSnapshotManager(); + DocumentProcessingRuntime runtime = new DocumentProcessingRuntime(document, null, manager); + List patches = Arrays.asList( + JsonPatch.add("/a", new Node().value("one")), + JsonPatch.add("/b", new Node().value("two")), + JsonPatch.replace("/a", new Node().value("three")) + ); + + List updates = runtime.applyPatches("/", patches); + + assertEquals(3, updates.size()); + assertEquals("three", document.getAsText("/a")); + assertEquals("two", document.getAsText("/b")); + assertEquals("one", updates.get(0).after().getValue()); + assertEquals("two", updates.get(1).after().getValue()); + assertEquals("three", updates.get(2).after().getValue()); + assertEquals(1, manager.fromDocumentCalls); + assertEquals(0, manager.applyPatchCalls); + assertEquals(1, manager.cacheSnapshotCalls); + assertEquals(1, runtime.batchPatchCallsForTest()); + assertEquals(3, runtime.batchPatchEntriesForTest()); + assertEquals(0, runtime.batchPatchRollbackCopiesForTest()); + } + + @Test + void duplicatePatchPathsPreserveUpdateOrder() { + Node document = new Node().properties("status", new Node().value("idle")); + DocumentProcessingRuntime runtime = new DocumentProcessingRuntime(document); + + List updates = runtime.applyPatches("/", Arrays.asList( + JsonPatch.replace("/status", new Node().value("first")), + JsonPatch.replace("/status", new Node().value("second")) + )); + + assertEquals("second", document.getAsText("/status")); + assertEquals("idle", updates.get(0).before().getValue()); + assertEquals("first", updates.get(0).after().getValue()); + assertEquals("first", updates.get(1).before().getValue()); + assertEquals("second", updates.get(1).after().getValue()); + } + + @Test + void batchRollsBackWhenLaterPatchFails() { + Node document = new Node().properties("status", new Node().value("idle")); + DocumentProcessingRuntime runtime = new DocumentProcessingRuntime(document); + + assertThrows(IllegalStateException.class, () -> runtime.applyPatches("/", Arrays.asList( + JsonPatch.replace("/status", new Node().value("active")), + JsonPatch.remove("/missing") + ))); + + assertEquals("idle", document.getAsText("/status")); + assertNull(document.getProperties().get("missing")); + assertEquals(0, runtime.batchPatchRollbackCopiesForTest()); + } + + @Test + void batchFailureDuringCommitLeavesDocumentUnchanged() { + Node document = new Node().properties("status", new Node().value("idle")); + CountingSnapshotManager manager = new CountingSnapshotManager(); + manager.failCacheSnapshot = true; + DocumentProcessingRuntime runtime = new DocumentProcessingRuntime(document, null, manager); + + assertThrows(IllegalStateException.class, () -> runtime.applyPatches("/", Collections.singletonList( + JsonPatch.replace("/status", new Node().value("active")) + ))); + + assertEquals("idle", document.getAsText("/status")); + assertEquals(1, manager.fromDocumentCalls); + assertEquals(1, manager.cacheSnapshotCalls); + assertEquals(0, runtime.batchPatchRollbackCopiesForTest()); + } + + @Test + void batchArrayPatchesMatchSequentialArrayPatches() { + Node batchDoc = arrayDocument("values", 1, 2, 3); + Node sequentialDoc = arrayDocument("values", 1, 2, 3); + List patches = Arrays.asList( + JsonPatch.add("/values/1", new Node().value(99)), + JsonPatch.replace("/values/2", new Node().value(100)), + JsonPatch.remove("/values/0") + ); + + new DocumentProcessingRuntime(batchDoc).applyPatches("/", patches); + DocumentProcessingRuntime sequential = new DocumentProcessingRuntime(sequentialDoc); + for (JsonPatch patch : patches) { + sequential.applyPatch("/", patch); + } + + assertEquals(Arrays.asList(99, 100, 3), integerValues(batchDoc, "/values")); + assertEquals(integerValues(sequentialDoc, "/values"), integerValues(batchDoc, "/values")); + } + + @Test + void applyPatchDelegatesToApplyPatchesSemantics() { + Node one = new Node(); + Node two = new Node(); + + new DocumentProcessingRuntime(one).applyPatch("/", JsonPatch.add("/x", new Node().value(1))); + new DocumentProcessingRuntime(two).applyPatches("/", Collections.singletonList( + JsonPatch.add("/x", new Node().value(1)) + )); + + assertEquals(one.getAsInteger("/x"), two.getAsInteger("/x")); + } + + @Test + void addRemoveAndRemoveAddSamePathPreserveOrderedUpdates() { + Node document = new Node().properties("temp", new Node().value("old")); + DocumentProcessingRuntime runtime = new DocumentProcessingRuntime(document); + + List updates = runtime.applyPatches("/", Arrays.asList( + JsonPatch.remove("/temp"), + JsonPatch.add("/temp", new Node().value("new")), + JsonPatch.add("/scratch", new Node().value("value")), + JsonPatch.remove("/scratch") + )); + + assertEquals("new", document.getAsText("/temp")); + assertEquals("old", updates.get(0).before().getValue()); + assertNull(updates.get(0).after()); + assertNull(updates.get(1).before()); + assertEquals("new", updates.get(1).after().getValue()); + assertNull(updates.get(2).before()); + assertEquals("value", updates.get(2).after().getValue()); + assertEquals("value", updates.get(3).before().getValue()); + assertNull(updates.get(3).after()); + assertThrows(IllegalArgumentException.class, () -> document.getAsNode("/scratch")); + } + + @Test + void updateDataMaterializesBeforeAndAfterLazily() { + Node document = new Node().properties("status", new Node().value("idle")); + DocumentProcessingRuntime runtime = new DocumentProcessingRuntime(document); + + List updates = runtime.applyPatches("/", Collections.singletonList( + JsonPatch.replace("/status", new Node().value("active")) + )); + + assertEquals(0, runtime.documentUpdateBeforeNodeMaterializationsForTest()); + assertEquals(0, runtime.documentUpdateAfterNodeMaterializationsForTest()); + + assertEquals("idle", updates.get(0).before().getValue()); + assertEquals("active", updates.get(0).after().getValue()); + assertEquals("idle", updates.get(0).before().getValue()); + assertEquals("active", updates.get(0).after().getValue()); + + assertEquals(1, runtime.documentUpdateBeforeNodeMaterializationsForTest()); + assertEquals(1, runtime.documentUpdateAfterNodeMaterializationsForTest()); + } + + @Test + void inheritedParentThenChildPatchDoesNotMinimizeMidBatch() { + BasicNodeProvider provider = new BasicNodeProvider(); + provider.addSingleDocs( + "name: Has Inherited List\n" + + "a:\n" + + " - inherited"); + Blue blue = new Blue(provider); + Node canonical = YAML_MAPPER.readValue( + "name: Instance\n" + + "type:\n" + + " blueId: " + provider.getBlueIdByName("Has Inherited List") + "\n", Node.class); + ResolvedSnapshot snapshot = blue.resolveToSnapshot(canonical); + DocumentProcessingRuntime runtime = new DocumentProcessingRuntime(snapshot, null, new PassthroughSnapshotManager()); + + Node inheritedList = new Node().items(Collections.singletonList(new Node().value("inherited"))); + runtime.applyPatches("/", Arrays.asList( + JsonPatch.replace("/a", inheritedList), + JsonPatch.add("/a/-", new Node().value("custom")) + )); + + assertEquals("inherited", runtime.snapshot().canonicalRoot().getAsText("/a/0")); + assertEquals("custom", runtime.snapshot().canonicalRoot().getAsText("/a/1")); + } + + @Test + void sameInheritedPathCanBeChangedAgainInSameBatch() { + BasicNodeProvider provider = new BasicNodeProvider(); + provider.addSingleDocs( + "name: Has Inherited Status\n" + + "status: idle"); + Blue blue = new Blue(provider); + Node canonical = YAML_MAPPER.readValue( + "name: Instance\n" + + "type:\n" + + " blueId: " + provider.getBlueIdByName("Has Inherited Status") + "\n", Node.class); + ResolvedSnapshot snapshot = blue.resolveToSnapshot(canonical); + DocumentProcessingRuntime runtime = new DocumentProcessingRuntime(snapshot, null, new PassthroughSnapshotManager()); + + runtime.applyPatches("/", Arrays.asList( + JsonPatch.replace("/status", new Node().value("idle")), + JsonPatch.replace("/status", new Node().value("custom")) + )); + + assertEquals("custom", runtime.snapshot().canonicalRoot().getAsText("/status")); + } + + @Test + void escapedPointerKeysWorkInBatch() { + Node document = new Node(); + DocumentProcessingRuntime runtime = new DocumentProcessingRuntime(document); + + runtime.applyPatches("/", Arrays.asList( + JsonPatch.add("/tilde/a~1b", new Node().value("slash")), + JsonPatch.add("/tilde/a~0b", new Node().value("tilde")), + JsonPatch.add("/tilde/~01key", new Node().value("literal")) + )); + + assertEquals("slash", document.getAsText("/tilde/a~1b")); + assertEquals("tilde", document.getAsText("/tilde/a~0b")); + assertEquals("literal", document.getAsText("/tilde/~01key")); + } + + @Test + void batchPatchAvoidsRepeatedSnapshotCommitCost() { + Node document = new Node(); + DocumentProcessingRuntime runtime = new DocumentProcessingRuntime(document); + List patches = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + patches.add(JsonPatch.add("/values/k" + i, new Node().value(i))); + } + + long start = System.nanoTime(); + runtime.applyPatches("/", patches); + long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + + assertEquals(100, document.getAsNode("/values").getProperties().size()); + assertTrue(elapsedMs < 1000, "Batch patching should not be catastrophically slow; elapsedMs=" + elapsedMs); + assertEquals(1, runtime.batchPatchCallsForTest()); + assertEquals(100, runtime.batchPatchEntriesForTest()); + assertEquals(0, runtime.batchPatchRollbackCopiesForTest()); + assertTrue(runtime.batchPatchPlanningNanosForTest() > 0); + assertTrue(runtime.batchPatchBuildUpdatesNanosForTest() > 0); + assertTrue(runtime.batchPatchCommitNanosForTest() > 0); + System.out.printf("batchPatchEntries=%d planningMs=%d conformanceMs=%d buildUpdatesMs=%d commitMs=%d beforeAfterMaterializations=%d/%d%n", + runtime.batchPatchEntriesForTest(), + TimeUnit.NANOSECONDS.toMillis(runtime.batchPatchPlanningNanosForTest()), + TimeUnit.NANOSECONDS.toMillis(runtime.batchPatchConformanceNanosForTest()), + TimeUnit.NANOSECONDS.toMillis(runtime.batchPatchBuildUpdatesNanosForTest()), + TimeUnit.NANOSECONDS.toMillis(runtime.batchPatchCommitNanosForTest()), + runtime.documentUpdateBeforeNodeMaterializationsForTest(), + runtime.documentUpdateAfterNodeMaterializationsForTest()); + } + + private List integerValues(Node document, String path) { + List values = new ArrayList<>(); + for (Node item : document.getAsNode(path).getItems()) { + Object value = item.getValue(); + assertTrue(value instanceof BigInteger); + values.add(((BigInteger) value).intValue()); + } + return values; + } + + private Node arrayDocument(String key, Object... entries) { + List items = new ArrayList<>(); + for (Object entry : entries) { + items.add(new Node().value(entry)); + } + Node arrayNode = new Node().items(items); + return new Node().properties(key, arrayNode); + } + + private static final class CountingSnapshotManager implements ProcessingSnapshotManager { + private int fromDocumentCalls; + private int applyPatchCalls; + private int cacheSnapshotCalls; + private boolean failCacheSnapshot; + + @Override + public ResolvedSnapshot fromDocument(Node document) { + fromDocumentCalls++; + FrozenNode root = FrozenNode.fromNode(document.clone()); + return new ResolvedSnapshot(root, FrozenNode.fromResolvedNode(document.clone()), root.blueId()); + } + + @Override + public ResolvedSnapshot applyPatch(ResolvedSnapshot snapshot, JsonPatch patch) { + applyPatchCalls++; + CanonicalPatchResult patched = snapshot.applyCanonicalPatch(patch); + return new ResolvedSnapshot(patched.root(), + FrozenNode.fromResolvedNode(patched.root().toNode()), + patched.blueId()); + } + + @Override + public ResolvedSnapshot cacheSnapshot(ResolvedSnapshot snapshot) { + cacheSnapshotCalls++; + if (failCacheSnapshot) { + throw new IllegalStateException("snapshot cache failed"); + } + return snapshot; + } + } + + private static final class PassthroughSnapshotManager implements ProcessingSnapshotManager { + @Override + public ResolvedSnapshot fromDocument(Node document) { + FrozenNode root = FrozenNode.fromNode(document.clone()); + return new ResolvedSnapshot(root, FrozenNode.fromResolvedNode(document.clone()), root.blueId()); + } + + @Override + public ResolvedSnapshot applyPatch(ResolvedSnapshot snapshot, JsonPatch patch) { + CanonicalPatchResult patched = snapshot.applyCanonicalPatch(patch); + return new ResolvedSnapshot(patched.root(), + FrozenNode.fromResolvedNode(patched.root().toNode()), + patched.blueId()); + } + + @Override + public ResolvedSnapshot cacheSnapshot(ResolvedSnapshot snapshot) { + return snapshot; + } + } +} diff --git a/src/test/java/blue/language/processor/DocumentProcessorBatchPatchTest.java b/src/test/java/blue/language/processor/DocumentProcessorBatchPatchTest.java new file mode 100644 index 0000000..5b407ab --- /dev/null +++ b/src/test/java/blue/language/processor/DocumentProcessorBatchPatchTest.java @@ -0,0 +1,162 @@ +package blue.language.processor; + +import blue.language.Blue; +import blue.language.model.Node; +import blue.language.processor.contracts.ApplyBatchPatchContractProcessor; +import blue.language.processor.contracts.RecordDocumentUpdateContractProcessor; +import blue.language.processor.model.JsonPatch; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class DocumentProcessorBatchPatchTest { + + @Test + void processorExecutionContextApplyPatchesWorksInsideHandler() { + Blue blue = new Blue(); + blue.registerContractProcessor(new ApplyBatchPatchContractProcessor()); + Node original = blue.yamlToNode( + "name: Batch Handler Doc\n" + + "contracts:\n" + + " lifecycle:\n" + + " type:\n" + + " blueId: LifecycleChannel\n" + + " apply:\n" + + " channel: lifecycle\n" + + " type:\n" + + " blueId: ApplyBatchPatch\n"); + + DocumentProcessingResult result = blue.initializeDocument(original); + + assertEquals("one", result.document().getAsText("/a")); + assertEquals("two", result.document().getAsText("/b")); + } + + @Test + void boundaryViolationInSecondPatchRollsBackEarlierPatch() { + Node document = new Node(); + ProcessorEngine.Execution execution = new ProcessorEngine.Execution(new DocumentProcessor(), document); + ContractBundle bundle = ContractBundle.builder().build(); + + execution.handlePatches("/foo", bundle, Arrays.asList( + JsonPatch.add("/foo/a", new Node().value("applied-first")), + JsonPatch.add("/bar", new Node().value("outside")) + ), false); + + Node resultDoc = execution.result().document(); + Node foo = resultDoc.getAsNode("/foo"); + assertFalse(hasProperty(foo, "a")); + assertTrue(execution.runtime().isScopeTerminated("/foo")); + } + + @Test + void reservedKeyViolationInSecondPatchRollsBackEarlierPatch() { + Node document = new Node().properties("foo", new Node()); + ProcessorEngine.Execution execution = new ProcessorEngine.Execution(new DocumentProcessor(), document); + ContractBundle bundle = ContractBundle.builder().build(); + + execution.handlePatches("/foo", bundle, Arrays.asList( + JsonPatch.add("/foo/a", new Node().value("applied-first")), + JsonPatch.add("/foo/contracts/initialized", new Node().value("reserved")) + ), false); + + Node resultDoc = execution.result().document(); + Node foo = resultDoc.getAsNode("/foo"); + assertFalse(hasProperty(foo, "a")); + assertTrue(execution.runtime().isScopeTerminated("/foo")); + } + + @Test + void documentUpdateChannelsReceiveBatchUpdatesInPatchOrder() { + RecordDocumentUpdateContractProcessor recorder = new RecordDocumentUpdateContractProcessor(); + Blue blue = new Blue(); + blue.registerContractProcessor(new ApplyBatchPatchContractProcessor()); + blue.registerContractProcessor(recorder); + Node original = blue.yamlToNode( + "name: Batch Update Doc\n" + + "contracts:\n" + + " lifecycle:\n" + + " type:\n" + + " blueId: LifecycleChannel\n" + + " watchA:\n" + + " type:\n" + + " blueId: DocumentUpdateChannel\n" + + " path: /a\n" + + " watchB:\n" + + " type:\n" + + " blueId: DocumentUpdateChannel\n" + + " path: /b\n" + + " apply:\n" + + " channel: lifecycle\n" + + " type:\n" + + " blueId: ApplyBatchPatch\n" + + " recordA:\n" + + " channel: watchA\n" + + " type:\n" + + " blueId: RecordDocumentUpdate\n" + + " recordB:\n" + + " channel: watchB\n" + + " type:\n" + + " blueId: RecordDocumentUpdate\n"); + + blue.initializeDocument(original); + + assertEquals(Arrays.asList("/a", "/b"), recorder.paths()); + } + + @Test + void unmatchedDocumentUpdateChannelDoesNotMaterializeUpdateNodes() { + Blue blue = new Blue(); + Node document = blue.yamlToNode( + "name: Lazy Update Doc\n" + + "contracts:\n" + + " watchOther:\n" + + " type:\n" + + " blueId: DocumentUpdateChannel\n" + + " path: /other\n"); + ProcessorEngine.Execution execution = new ProcessorEngine.Execution(new DocumentProcessor(), document); + execution.loadBundles("/"); + + execution.handlePatches("/", execution.bundleForScope("/"), Collections.singletonList( + JsonPatch.add("/a", new Node().value("one")) + ), false); + + assertEquals(0, execution.runtime().documentUpdateBeforeNodeMaterializationsForTest()); + assertEquals(0, execution.runtime().documentUpdateAfterNodeMaterializationsForTest()); + } + + @Test + void matchingDocumentUpdateChannelMaterializesUpdateNodes() { + Blue blue = new Blue(); + Node document = blue.yamlToNode( + "name: Lazy Update Doc\n" + + "a: old\n" + + "contracts:\n" + + " watchA:\n" + + " type:\n" + + " blueId: DocumentUpdateChannel\n" + + " path: /a\n"); + ProcessorEngine.Execution execution = new ProcessorEngine.Execution(new DocumentProcessor(), document); + execution.loadBundles("/"); + + execution.handlePatches("/", execution.bundleForScope("/"), Collections.singletonList( + JsonPatch.replace("/a", new Node().value("new")) + ), false); + + assertEquals(1, execution.runtime().documentUpdateBeforeNodeMaterializationsForTest()); + assertEquals(1, execution.runtime().documentUpdateAfterNodeMaterializationsForTest()); + } + + private boolean hasProperty(Node node, String key) { + assertNotNull(node); + Map properties = node.getProperties(); + return properties != null && properties.containsKey(key); + } +} diff --git a/src/test/java/blue/language/processor/DocumentProcessorGeneralizationTest.java b/src/test/java/blue/language/processor/DocumentProcessorGeneralizationTest.java index 9ca088a..9942413 100644 --- a/src/test/java/blue/language/processor/DocumentProcessorGeneralizationTest.java +++ b/src/test/java/blue/language/processor/DocumentProcessorGeneralizationTest.java @@ -1,16 +1,24 @@ package blue.language.processor; import blue.language.Blue; +import blue.language.conformance.ConformanceEngine; import blue.language.conformance.ConformanceEngineTest; import blue.language.model.Node; import blue.language.processor.model.JsonPatch; import blue.language.provider.BasicNodeProvider; +import blue.language.utils.BlueIdCalculator; +import blue.language.utils.Properties; import org.junit.jupiter.api.Test; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + import static blue.language.utils.UncheckedObjectMapper.YAML_MAPPER; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; class DocumentProcessorGeneralizationTest { @@ -68,4 +76,528 @@ void untypedRootPatchesAreNotConformanceEnforced() { assertNotNull(document.getAsNode("/contracts/initialized")); assertEquals("InitializationMarker", document.getAsNode("/contracts/initialized/type").getBlueId()); } + + @Test + void batchPatchGeneralizesChangedNodeAndAncestorOnce() { + BasicNodeProvider nodeProvider = ConformanceEngineTest.priceProvider(); + Blue blue = new Blue(nodeProvider); + Node document = blue.resolve(YAML_MAPPER.readValue( + "name: Shoes\n" + + "type:\n" + + " blueId: " + nodeProvider.getBlueIdByName("European Product") + "\n" + + "price:\n" + + " amount: 150\n" + + " currency: EUR\n" + + "stock: 5", Node.class)); + DocumentProcessingRuntime runtime = new DocumentProcessingRuntime(document, blue.conformanceEngine()); + + List updates = runtime.applyPatches("/", Arrays.asList( + JsonPatch.replace("/price/currency", new Node().value("USD")), + JsonPatch.replace("/stock", new Node().value(6)) + )); + + assertEquals(2, updates.size()); + assertEquals("USD", document.getAsText("/price/currency")); + assertEquals(6, document.getAsInteger("/stock")); + assertEquals("Price", document.getAsNode("/price/type").getName()); + assertEquals("Global Product", document.getType().getName()); + } + + @Test + void nonGeneralizableBatchRollsBackAllPatches() { + BasicNodeProvider nodeProvider = new BasicNodeProvider(); + nodeProvider.addSingleDocs( + "name: Fixed One\n" + + "x: 1"); + Blue blue = new Blue(nodeProvider); + Node document = blue.resolve(YAML_MAPPER.readValue( + "name: Instance\n" + + "type:\n" + + " blueId: " + nodeProvider.getBlueIdByName("Fixed One") + "\n" + + "x: 1\n" + + "y: old", Node.class)); + DocumentProcessingRuntime runtime = new DocumentProcessingRuntime(document, blue.conformanceEngine()); + + assertThrows(IllegalArgumentException.class, () -> runtime.applyPatches("/", Arrays.asList( + JsonPatch.replace("/y", new Node().value("new")), + JsonPatch.replace("/x", new Node().value(2)) + ))); + + assertEquals(1, document.getAsInteger("/x")); + assertEquals("old", document.getAsText("/y")); + assertEquals("Fixed One", document.getType().getName()); + } + + @Test + void processorManagedInitializedMarkerBypassWorksInBatch() { + Blue blue = new Blue(); + Node document = new Node(); + DocumentProcessingRuntime runtime = new DocumentProcessingRuntime(document, blue.conformanceEngine()); + + runtime.applyPatches("/", Arrays.asList( + JsonPatch.add("/contracts/initialized", + new Node().type(new Node().blueId("InitializationMarker"))), + JsonPatch.add("/status", new Node().value("active")) + )); + + assertNotNull(document.getAsNode("/contracts/initialized")); + assertEquals("active", document.getAsText("/status")); + } + + @Test + void batchParentThenChildPatchGeneralizesAndPreservesChildValue() { + BasicNodeProvider nodeProvider = ConformanceEngineTest.priceProvider(); + Blue blue = new Blue(nodeProvider); + Node document = blue.resolve(YAML_MAPPER.readValue( + "name: Shoes\n" + + "type:\n" + + " blueId: " + nodeProvider.getBlueIdByName("European Product") + "\n" + + "price:\n" + + " amount: 150\n" + + " currency: EUR", Node.class)); + DocumentProcessingRuntime runtime = new DocumentProcessingRuntime(document, blue.conformanceEngine()); + + runtime.applyPatches("/", Arrays.asList( + JsonPatch.replace("/price", YAML_MAPPER.readValue( + "amount: 175\n" + + "currency: EUR", Node.class)), + JsonPatch.replace("/price/currency", new Node().value("USD")) + )); + + assertEquals(175, document.getAsInteger("/price/amount")); + assertEquals("USD", document.getAsText("/price/currency")); + assertEquals("Price", document.getAsNode("/price/type").getName()); + assertEquals("Global Product", document.getType().getName()); + } + + @Test + void batchChildThenSiblingPatchGeneralizesOnceAndPreservesBothChanges() { + BasicNodeProvider nodeProvider = ConformanceEngineTest.priceProvider(); + Blue blue = new Blue(nodeProvider); + Node document = blue.resolve(YAML_MAPPER.readValue( + "name: Shoes\n" + + "type:\n" + + " blueId: " + nodeProvider.getBlueIdByName("European Product") + "\n" + + "price:\n" + + " amount: 150\n" + + " currency: EUR", Node.class)); + DocumentProcessingRuntime runtime = new DocumentProcessingRuntime(document, blue.conformanceEngine()); + + runtime.applyPatches("/", Arrays.asList( + JsonPatch.replace("/price/currency", new Node().value("USD")), + JsonPatch.replace("/price/amount", new Node().value(200)) + )); + + assertEquals(200, document.getAsInteger("/price/amount")); + assertEquals("USD", document.getAsText("/price/currency")); + assertEquals("Price", document.getAsNode("/price/type").getName()); + assertEquals("Global Product", document.getType().getName()); + } + + @Test + void batchSiblingPatchesRequiringAncestorGeneralizationPreserveBothChanges() { + BasicNodeProvider nodeProvider = productWithAvailabilityProvider(); + Blue blue = new Blue(nodeProvider); + Node document = blue.resolve(YAML_MAPPER.readValue( + "name: Shoes\n" + + "type:\n" + + " blueId: " + nodeProvider.getBlueIdByName("European Listed Product") + "\n" + + "price:\n" + + " amount: 150\n" + + " currency: EUR\n" + + "availability:\n" + + " region: EU", Node.class)); + DocumentProcessingRuntime runtime = new DocumentProcessingRuntime(document, blue.conformanceEngine()); + + runtime.applyPatches("/", Arrays.asList( + JsonPatch.replace("/price/currency", new Node().value("USD")), + JsonPatch.replace("/availability/region", new Node().value("US")) + )); + + assertEquals("USD", document.getAsText("/price/currency")); + assertEquals("US", document.getAsText("/availability/region")); + assertEquals("Price", document.getAsNode("/price/type").getName()); + assertEquals("Availability", document.getAsNode("/availability/type").getName()); + assertEquals("Global Listed Product", document.getType().getName()); + } + + @Test + void batchDictionaryValueTypePatchesPreserveValuesAndDictionaryType() { + BasicNodeProvider nodeProvider = orderBookProvider(); + Blue blue = new Blue(nodeProvider); + Node document = blue.resolve(YAML_MAPPER.readValue( + "name: Book\n" + + "type:\n" + + " blueId: " + nodeProvider.getBlueIdByName("Open Order Book") + "\n" + + "orders:\n" + + " order-a:\n" + + " status: open\n" + + " order-b:\n" + + " status: open", Node.class)); + DocumentProcessingRuntime runtime = new DocumentProcessingRuntime(document, blue.conformanceEngine()); + + runtime.applyPatches("/", Arrays.asList( + JsonPatch.replace("/orders/order-a/status", new Node().value("closed")), + JsonPatch.replace("/orders/order-b/status", new Node().value("closed")) + )); + + assertEquals("closed", document.getAsText("/orders/order-a/status")); + assertEquals("closed", document.getAsText("/orders/order-b/status")); + assertEquals("Order", document.getAsNode("/orders/valueType").getName()); + } + + @Test + void batchListItemTypePatchesMatchSequentialBehavior() { + BasicNodeProvider nodeProvider = itemListProvider(); + Blue blue = new Blue(nodeProvider); + Node batchDocument = blue.resolve(YAML_MAPPER.readValue( + "name: Batch List\n" + + "type:\n" + + " blueId: " + nodeProvider.getBlueIdByName("Open Item List") + "\n" + + "entries:\n" + + " - status: open\n" + + " - status: open", Node.class)); + Node sequentialDocument = batchDocument.clone(); + List patches = Arrays.asList( + JsonPatch.replace("/entries/0/status", new Node().value("closed")), + JsonPatch.add("/entries/-", YAML_MAPPER.readValue("status: closed", Node.class)), + JsonPatch.remove("/entries/1") + ); + + new DocumentProcessingRuntime(batchDocument, blue.conformanceEngine()).applyPatches("/", patches); + DocumentProcessingRuntime sequential = new DocumentProcessingRuntime(sequentialDocument, blue.conformanceEngine()); + for (JsonPatch patch : patches) { + sequential.applyPatch("/", patch); + } + + assertEquals(sequentialDocument.getAsText("/entries/0/status"), batchDocument.getAsText("/entries/0/status")); + assertEquals(sequentialDocument.getAsText("/entries/1/status"), batchDocument.getAsText("/entries/1/status")); + assertEquals(sequentialDocument.getAsNode("/entries/itemType").getName(), + batchDocument.getAsNode("/entries/itemType").getName()); + } + + @Test + void batchGeneralizesTypedChildUnderUntypedRoot() { + BasicNodeProvider nodeProvider = ConformanceEngineTest.priceProvider(); + Blue blue = new Blue(nodeProvider); + Node batchDocument = blue.resolve(YAML_MAPPER.readValue( + "name: Untyped Container\n" + + "child:\n" + + " type:\n" + + " blueId: " + nodeProvider.getBlueIdByName("Price in EUR") + "\n" + + " amount: 100\n" + + " currency: EUR", Node.class)); + Node sequentialDocument = batchDocument.clone(); + List patches = Arrays.asList( + JsonPatch.replace("/child/currency", new Node().value("USD")) + ); + + new DocumentProcessingRuntime(batchDocument, blue.conformanceEngine()).applyPatches("/", patches); + applySequential(sequentialDocument, blue.conformanceEngine(), patches); + + assertEquivalentDocuments(sequentialDocument, batchDocument, "typed child under untyped root"); + assertEquals("USD", batchDocument.getAsText("/child/currency")); + assertEquals("Price", batchDocument.getAsNode("/child/type").getName()); + } + + @Test + void batchGeneralizesDictionaryValueTypeUnderUntypedRoot() { + BasicNodeProvider nodeProvider = orderBookProvider(); + Blue blue = new Blue(nodeProvider); + Node batchDocument = blue.resolve(YAML_MAPPER.readValue( + "name: Untyped Book\n" + + "orders:\n" + + " type:\n" + + " blueId: " + Properties.DICTIONARY_TYPE_BLUE_ID + "\n" + + " keyType:\n" + + " blueId: " + Properties.TEXT_TYPE_BLUE_ID + "\n" + + " valueType:\n" + + " blueId: " + nodeProvider.getBlueIdByName("Open Order") + "\n" + + " order-a:\n" + + " type:\n" + + " blueId: " + nodeProvider.getBlueIdByName("Open Order") + "\n" + + " status: open\n" + + " order-b:\n" + + " type:\n" + + " blueId: " + nodeProvider.getBlueIdByName("Open Order") + "\n" + + " status: open", Node.class)); + Node sequentialDocument = batchDocument.clone(); + List patches = Arrays.asList( + JsonPatch.replace("/orders/order-a/status", new Node().value("closed")) + ); + + new DocumentProcessingRuntime(batchDocument, blue.conformanceEngine()).applyPatches("/", patches); + applySequential(sequentialDocument, blue.conformanceEngine(), patches); + + assertEquivalentDocuments(sequentialDocument, batchDocument, "dictionary valueType under untyped root"); + assertEquals("closed", batchDocument.getAsText("/orders/order-a/status")); + assertEquals("Order", batchDocument.getAsNode("/orders/valueType").getName()); + } + + @Test + void batchGeneralizesListItemTypeUnderUntypedRoot() { + BasicNodeProvider nodeProvider = itemListProvider(); + Blue blue = new Blue(nodeProvider); + Node batchDocument = blue.resolve(YAML_MAPPER.readValue( + "name: Untyped List\n" + + "entries:\n" + + " type:\n" + + " blueId: " + Properties.LIST_TYPE_BLUE_ID + "\n" + + " itemType:\n" + + " blueId: " + nodeProvider.getBlueIdByName("Open Item") + "\n" + + " items:\n" + + " - type:\n" + + " blueId: " + nodeProvider.getBlueIdByName("Open Item") + "\n" + + " status: open\n" + + " - type:\n" + + " blueId: " + nodeProvider.getBlueIdByName("Open Item") + "\n" + + " status: open", Node.class)); + Node sequentialDocument = batchDocument.clone(); + List patches = Arrays.asList( + JsonPatch.replace("/entries/0/status", new Node().value("closed")) + ); + + new DocumentProcessingRuntime(batchDocument, blue.conformanceEngine()).applyPatches("/", patches); + applySequential(sequentialDocument, blue.conformanceEngine(), patches); + + assertEquivalentDocuments(sequentialDocument, batchDocument, "list itemType under untyped root"); + assertEquals("closed", batchDocument.getAsText("/entries/0/status")); + assertEquals("Item", batchDocument.getAsNode("/entries/itemType").getName()); + } + + @Test + void conformanceAffectedUpdateAfterReflectsCommittedResolvedValue() { + BasicNodeProvider nodeProvider = ConformanceEngineTest.priceProvider(); + Blue blue = new Blue(nodeProvider); + Node document = blue.resolve(YAML_MAPPER.readValue( + "name: Shoes\n" + + "type:\n" + + " blueId: " + nodeProvider.getBlueIdByName("European Product") + "\n" + + "price:\n" + + " amount: 150\n" + + " currency: EUR", Node.class)); + DocumentProcessingRuntime runtime = new DocumentProcessingRuntime(document, blue.conformanceEngine()); + + List updates = runtime.applyPatches("/", Arrays.asList( + JsonPatch.replace("/price", YAML_MAPPER.readValue( + "amount: 150\n" + + "currency: USD", Node.class)) + )); + + assertEquals(1, updates.size()); + assertEquals("USD", updates.get(0).after().getAsText("/currency")); + assertEquals("Price", updates.get(0).after().getType().getName()); + assertEquals("Price", document.getAsNode("/price/type").getName()); + assertEquals("Global Product", document.getType().getName()); + } + + @Test + void batchAndSequentialRuntimeProduceEquivalentDocumentsAcrossPatchLists() { + assertBatchMatchesSequential(new Node().properties("a", new Node().value("one"), + "b", new Node().value("two")), + null, + Arrays.asList( + JsonPatch.replace("/a", new Node().value("three")), + JsonPatch.replace("/b", new Node().value("four"))), + "multiple object replacements"); + + assertBatchMatchesSequential(new Node().properties("status", new Node().value("idle")), + null, + Arrays.asList( + JsonPatch.replace("/status", new Node().value("first")), + JsonPatch.replace("/status", new Node().value("second"))), + "duplicate paths"); + + assertBatchMatchesSequential(new Node(), + null, + Arrays.asList( + JsonPatch.add("/temp", new Node().value("value")), + JsonPatch.remove("/temp")), + "add then remove same path"); + + assertBatchMatchesSequential(new Node().properties("temp", new Node().value("old")), + null, + Arrays.asList( + JsonPatch.remove("/temp"), + JsonPatch.add("/temp", new Node().value("new"))), + "remove then add same path"); + + assertBatchMatchesSequential(listDocument(), + null, + Arrays.asList( + JsonPatch.add("/values/1", new Node().value(99)), + JsonPatch.replace("/values/2", new Node().value(100)), + JsonPatch.remove("/values/0")), + "list add replace remove"); + + BasicNodeProvider priceProvider = ConformanceEngineTest.priceProvider(); + Blue priceBlue = new Blue(priceProvider); + assertBatchMatchesSequential(priceBlue.resolve(YAML_MAPPER.readValue( + "name: Untyped Container\n" + + "child:\n" + + " type:\n" + + " blueId: " + priceProvider.getBlueIdByName("Price in EUR") + "\n" + + " amount: 100\n" + + " currency: EUR", Node.class)), + priceBlue.conformanceEngine(), + Arrays.asList(JsonPatch.replace("/child/currency", new Node().value("USD"))), + "typed child generalization"); + + BasicNodeProvider orderProvider = orderBookProvider(); + Blue orderBlue = new Blue(orderProvider); + assertBatchMatchesSequential(orderBlue.resolve(YAML_MAPPER.readValue( + "name: Untyped Book\n" + + "orders:\n" + + " type:\n" + + " blueId: " + Properties.DICTIONARY_TYPE_BLUE_ID + "\n" + + " keyType:\n" + + " blueId: " + Properties.TEXT_TYPE_BLUE_ID + "\n" + + " valueType:\n" + + " blueId: " + orderProvider.getBlueIdByName("Open Order") + "\n" + + " order-a:\n" + + " type:\n" + + " blueId: " + orderProvider.getBlueIdByName("Open Order") + "\n" + + " status: open", Node.class)), + orderBlue.conformanceEngine(), + Arrays.asList(JsonPatch.replace("/orders/order-a/status", new Node().value("closed"))), + "dictionary valueType update"); + + BasicNodeProvider itemProvider = itemListProvider(); + Blue itemBlue = new Blue(itemProvider); + assertBatchMatchesSequential(itemBlue.resolve(YAML_MAPPER.readValue( + "name: Untyped List\n" + + "entries:\n" + + " type:\n" + + " blueId: " + Properties.LIST_TYPE_BLUE_ID + "\n" + + " itemType:\n" + + " blueId: " + itemProvider.getBlueIdByName("Open Item") + "\n" + + " items:\n" + + " - type:\n" + + " blueId: " + itemProvider.getBlueIdByName("Open Item") + "\n" + + " status: open", Node.class)), + itemBlue.conformanceEngine(), + Arrays.asList(JsonPatch.replace("/entries/0/status", new Node().value("closed"))), + "list itemType update"); + } + + private BasicNodeProvider productWithAvailabilityProvider() { + BasicNodeProvider nodeProvider = ConformanceEngineTest.priceProvider(); + nodeProvider.addSingleDocs( + "name: Availability\n" + + "region:\n" + + " type: Text"); + nodeProvider.addSingleDocs( + "name: EU Availability\n" + + "type:\n" + + " blueId: " + nodeProvider.getBlueIdByName("Availability") + "\n" + + "region: EU"); + nodeProvider.addSingleDocs( + "name: Global Listed Product\n" + + "price:\n" + + " type:\n" + + " blueId: " + nodeProvider.getBlueIdByName("Price") + "\n" + + "availability:\n" + + " type:\n" + + " blueId: " + nodeProvider.getBlueIdByName("Availability")); + nodeProvider.addSingleDocs( + "name: European Listed Product\n" + + "type:\n" + + " blueId: " + nodeProvider.getBlueIdByName("Global Listed Product") + "\n" + + "price:\n" + + " type:\n" + + " blueId: " + nodeProvider.getBlueIdByName("Price in EUR") + "\n" + + "availability:\n" + + " type:\n" + + " blueId: " + nodeProvider.getBlueIdByName("EU Availability")); + return nodeProvider; + } + + private void assertBatchMatchesSequential(Node initial, + ConformanceEngine conformanceEngine, + List patches, + String label) { + Node batchDocument = initial.clone(); + Node sequentialDocument = initial.clone(); + List batchUpdates = + new DocumentProcessingRuntime(batchDocument, conformanceEngine).applyPatches("/", patches); + List sequentialUpdates = + applySequential(sequentialDocument, conformanceEngine, patches); + + assertEquivalentDocuments(sequentialDocument, batchDocument, label); + assertEquals(updatePaths(sequentialUpdates), updatePaths(batchUpdates), label + " update paths"); + } + + private List applySequential(Node document, + ConformanceEngine conformanceEngine, + List patches) { + DocumentProcessingRuntime sequential = new DocumentProcessingRuntime(document, conformanceEngine); + List updates = new ArrayList<>(); + for (JsonPatch patch : patches) { + updates.add(sequential.applyPatch("/", patch)); + } + return updates; + } + + private void assertEquivalentDocuments(Node expected, Node actual, String label) { + assertEquals(BlueIdCalculator.calculateBlueId(expected), + BlueIdCalculator.calculateBlueId(actual), + label); + } + + private List updatePaths(List updates) { + List paths = new ArrayList<>(); + for (DocumentProcessingRuntime.DocumentUpdateData update : updates) { + assertNotNull(update); + paths.add(update.path()); + } + return paths; + } + + private Node listDocument() { + return new Node().properties("values", new Node().items(Arrays.asList( + new Node().value(1), + new Node().value(2), + new Node().value(3)))); + } + + private BasicNodeProvider orderBookProvider() { + BasicNodeProvider nodeProvider = new BasicNodeProvider(); + nodeProvider.addSingleDocs( + "name: Order\n" + + "status:\n" + + " type: Text"); + nodeProvider.addSingleDocs( + "name: Open Order\n" + + "type:\n" + + " blueId: " + nodeProvider.getBlueIdByName("Order") + "\n" + + "status: open"); + nodeProvider.addSingleDocs( + "name: Open Order Book\n" + + "orders:\n" + + " type: Dictionary\n" + + " keyType: Text\n" + + " valueType:\n" + + " blueId: " + nodeProvider.getBlueIdByName("Order")); + return nodeProvider; + } + + private BasicNodeProvider itemListProvider() { + BasicNodeProvider nodeProvider = new BasicNodeProvider(); + nodeProvider.addSingleDocs( + "name: Item\n" + + "status:\n" + + " type: Text"); + nodeProvider.addSingleDocs( + "name: Open Item\n" + + "type:\n" + + " blueId: " + nodeProvider.getBlueIdByName("Item") + "\n" + + "status: open"); + nodeProvider.addSingleDocs( + "name: Open Item List\n" + + "entries:\n" + + " type: List\n" + + " itemType:\n" + + " blueId: " + nodeProvider.getBlueIdByName("Item")); + return nodeProvider; + } } diff --git a/src/test/java/blue/language/processor/DocumentProcessorSnapshotTransactionTest.java b/src/test/java/blue/language/processor/DocumentProcessorSnapshotTransactionTest.java index 4fee423..255eabe 100644 --- a/src/test/java/blue/language/processor/DocumentProcessorSnapshotTransactionTest.java +++ b/src/test/java/blue/language/processor/DocumentProcessorSnapshotTransactionTest.java @@ -35,7 +35,8 @@ void runtimePatchUsesCanonicalOverlaySnapshotWhenNoGeneralizationIsNeeded() { assertEquals(2, document.getAsInteger("/x")); assertEquals(1, manager.fromDocumentCalls); - assertEquals(1, manager.applyPatchCalls); + assertEquals(0, manager.applyPatchCalls); + assertEquals(1, manager.cacheSnapshotCalls); assertEquals(2, runtime.snapshot().canonicalRoot().getAsInteger("/x")); assertEquals("keep", runtime.snapshot().canonicalRoot().getAsText("/other")); assertSnapshotConsistent(runtime.snapshot()); @@ -77,10 +78,11 @@ void snapshotPlanIsAuthoritativeAfterImmutablePlanning() { runtime.applyPatch("/", JsonPatch.replace("/x", new Node().value(2))); - assertEquals(1, document.getAsInteger("/x")); - assertEquals(1, runtime.snapshot().resolvedRoot().getAsInteger("/x")); + assertEquals(2, document.getAsInteger("/x")); + assertEquals(2, runtime.snapshot().resolvedRoot().getAsInteger("/x")); assertEquals(1, manager.fromDocumentCalls); - assertEquals(1, manager.applyPatchCalls); + assertEquals(0, manager.applyPatchCalls); + assertEquals(1, manager.cacheSnapshotCalls); assertSnapshotConsistent(runtime.snapshot()); } @@ -107,7 +109,8 @@ void runtimeSnapshotTracksMixedAddReplaceRemoveAndArrayAppendPatches() { assertEquals("new", canonical.getAsText("/tags/1")); assertMissing(canonical, "/obsolete"); assertEquals(1, manager.fromDocumentCalls); - assertEquals(4, manager.applyPatchCalls); + assertEquals(0, manager.applyPatchCalls); + assertEquals(4, manager.cacheSnapshotCalls); assertSnapshotConsistent(runtime.snapshot()); } @@ -236,7 +239,7 @@ void directWriteKeepsCanonicalSnapshotInTheSameRuntimeTransaction() { } @Test - void canonicalOverlayFailureFallsBackToFullSnapshotRebuildWithoutChangingGasPath() { + void runtimePatchCommitsBatchSnapshotWithoutSnapshotPatchManagerFallback() { CountingSnapshotManager manager = new CountingSnapshotManager(); manager.failApplyPatch = true; Node document = YAML_MAPPER.readValue("x: 1", Node.class); @@ -245,17 +248,17 @@ void canonicalOverlayFailureFallsBackToFullSnapshotRebuildWithoutChangingGasPath runtime.applyPatch("/", JsonPatch.replace("/x", new Node().value(2))); assertEquals(2, document.getAsInteger("/x")); - assertEquals(2, manager.fromDocumentCalls); - assertEquals(1, manager.applyPatchCalls); + assertEquals(1, manager.fromDocumentCalls); + assertEquals(0, manager.applyPatchCalls); + assertEquals(1, manager.cacheSnapshotCalls); assertEquals(2, runtime.snapshot().canonicalRoot().getAsInteger("/x")); assertSnapshotConsistent(runtime.snapshot()); } @Test - void snapshotRebuildFailureRollsBackDocumentAndSnapshotTogether() { + void batchSnapshotCacheFailureRollsBackDocumentAndSnapshotTogether() { CountingSnapshotManager manager = new CountingSnapshotManager(); - manager.failApplyPatch = true; - manager.failFromDocumentOnCall = 2; + manager.failCacheSnapshot = true; Node document = YAML_MAPPER.readValue("x: 1", Node.class); DocumentProcessingRuntime runtime = new DocumentProcessingRuntime(document, null, manager); @@ -263,8 +266,9 @@ void snapshotRebuildFailureRollsBackDocumentAndSnapshotTogether() { () -> runtime.applyPatch("/", JsonPatch.replace("/x", new Node().value(2)))); assertEquals(1, document.getAsInteger("/x")); - assertEquals(2, manager.fromDocumentCalls); - assertEquals(1, manager.applyPatchCalls); + assertEquals(1, manager.fromDocumentCalls); + assertEquals(0, manager.applyPatchCalls); + assertEquals(1, manager.cacheSnapshotCalls); } @Test @@ -470,7 +474,7 @@ void executionContextReadsUseResolvedSnapshotIndexWhenSnapshotIsAvailable() { } @Test - void processorPatchToInheritedValueIsMinimizedOutOfCanonicalSnapshot() { + void processorPatchToInheritedValueKeepsCanonicalOverrideWhileBatchMinimizationIsDisabled() { BasicNodeProvider provider = new BasicNodeProvider(); provider.addSingleDocs( "name: Money\n" + @@ -501,7 +505,7 @@ void processorPatchToInheritedValueIsMinimizedOutOfCanonicalSnapshot() { blue.objectToNode(new TestEvent().eventId("evt-inherited"))); assertEquals(0, processed.resolvedDocument().getAsInteger("/balance/cents")); - assertMissing(processed.canonicalDocument(), "/balance/cents"); + assertEquals(0, processed.canonicalDocument().getAsInteger("/balance/cents")); assertSnapshotConsistent(processed.snapshot()); } @@ -554,6 +558,7 @@ private static final class CountingSnapshotManager implements ProcessingSnapshot private int applyPatchCalls; private int cacheSnapshotCalls; private boolean failApplyPatch; + private boolean failCacheSnapshot; private boolean returnCurrentSnapshotOnApplyPatch; private int failFromDocumentOnCall; @@ -611,6 +616,9 @@ public ResolvedSnapshot applyPatch(ResolvedSnapshot snapshot, JsonPatch patch) { @Override public ResolvedSnapshot cacheSnapshot(ResolvedSnapshot snapshot) { cacheSnapshotCalls++; + if (failCacheSnapshot) { + throw new IllegalStateException("snapshot cache failed"); + } return snapshot; } } diff --git a/src/test/java/blue/language/processor/contracts/ApplyBatchPatchContractProcessor.java b/src/test/java/blue/language/processor/contracts/ApplyBatchPatchContractProcessor.java new file mode 100644 index 0000000..69ba2ca --- /dev/null +++ b/src/test/java/blue/language/processor/contracts/ApplyBatchPatchContractProcessor.java @@ -0,0 +1,25 @@ +package blue.language.processor.contracts; + +import blue.language.model.Node; +import blue.language.processor.HandlerProcessor; +import blue.language.processor.ProcessorExecutionContext; +import blue.language.processor.model.ApplyBatchPatch; +import blue.language.processor.model.JsonPatch; + +import java.util.Arrays; + +public class ApplyBatchPatchContractProcessor implements HandlerProcessor { + + @Override + public Class contractType() { + return ApplyBatchPatch.class; + } + + @Override + public void execute(ApplyBatchPatch contract, ProcessorExecutionContext context) { + context.applyPatches(Arrays.asList( + JsonPatch.replace("/a", new Node().value("one")), + JsonPatch.replace("/b", new Node().value("two")) + )); + } +} diff --git a/src/test/java/blue/language/processor/contracts/RecordDocumentUpdateContractProcessor.java b/src/test/java/blue/language/processor/contracts/RecordDocumentUpdateContractProcessor.java new file mode 100644 index 0000000..f37902a --- /dev/null +++ b/src/test/java/blue/language/processor/contracts/RecordDocumentUpdateContractProcessor.java @@ -0,0 +1,30 @@ +package blue.language.processor.contracts; + +import blue.language.model.Node; +import blue.language.processor.HandlerProcessor; +import blue.language.processor.ProcessorExecutionContext; +import blue.language.processor.model.RecordDocumentUpdate; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class RecordDocumentUpdateContractProcessor implements HandlerProcessor { + + private final List paths = new ArrayList<>(); + + @Override + public Class contractType() { + return RecordDocumentUpdate.class; + } + + @Override + public void execute(RecordDocumentUpdate contract, ProcessorExecutionContext context) { + Node path = context.event().getProperties().get("path"); + paths.add(path != null ? String.valueOf(path.getValue()) : null); + } + + public List paths() { + return Collections.unmodifiableList(paths); + } +} diff --git a/src/test/java/blue/language/processor/model/ApplyBatchPatch.java b/src/test/java/blue/language/processor/model/ApplyBatchPatch.java new file mode 100644 index 0000000..2ac3337 --- /dev/null +++ b/src/test/java/blue/language/processor/model/ApplyBatchPatch.java @@ -0,0 +1,7 @@ +package blue.language.processor.model; + +import blue.language.model.TypeBlueId; + +@TypeBlueId("ApplyBatchPatch") +public class ApplyBatchPatch extends HandlerContract { +} diff --git a/src/test/java/blue/language/processor/model/RecordDocumentUpdate.java b/src/test/java/blue/language/processor/model/RecordDocumentUpdate.java new file mode 100644 index 0000000..3905963 --- /dev/null +++ b/src/test/java/blue/language/processor/model/RecordDocumentUpdate.java @@ -0,0 +1,7 @@ +package blue.language.processor.model; + +import blue.language.model.TypeBlueId; + +@TypeBlueId("RecordDocumentUpdate") +public class RecordDocumentUpdate extends HandlerContract { +}