Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .cz.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
name = "cz_conventional_commits"
tag_format = "v$version"
version_scheme = "semver"
version = "2.0.0"
version = "2.0.1"
update_changelog_on_bump = true
58 changes: 48 additions & 10 deletions docs/snapshots-patching-and-generalization.md
Original file line number Diff line number Diff line change
Expand Up @@ -348,25 +348,61 @@ Changed paths include:

## Processor Transaction Flow

`DocumentProcessingRuntime.applyPatch(...)` now works roughly like this:
`DocumentProcessingRuntime.applyPatches(...)` now works roughly like this:

```text
rollback = current mutable materialized view
baseSnapshot = current snapshot or snapshotManager.fromDocument(...)
canonicalPlan = ImmutablePatchPlanner(baseSnapshot.canonical).plan(...)
resolvedPlan = ImmutablePatchPlanner(baseSnapshot.resolved).plan(...)
conformancePlan = ConformanceEngine.planGeneralization(...)
if generalized:
commit generalized snapshot
else:
commit normal canonical patch snapshot
for each patch:
canonicalPlan = ImmutablePatchPlanner(working canonical root).plan(...)
resolvedPlan = ImmutablePatchPlanner(working resolved root).plan(...)
remember frozen before/after update metadata
conformancePlan = ConformanceEngine.planGeneralization(..., changedPaths)
commit final canonical/resolved roots once
on failure:
restore rollback and previous snapshot
restore previous snapshot if one was active
```

Batch conformance selects changed paths whose final resolved path has typed
metadata at the changed node or one of its ancestors up to the origin scope.
This catches typed descendants below an otherwise untyped root, including list
`itemType` and dictionary `valueType` paths, while leaving unrelated untyped
processor-managed writes out of the conformance planner.

The mutable `Node` view is now a compatibility adapter generated from the
canonical snapshot. Snapshot state is authoritative.

## Batch Patch Application

`ProcessorExecutionContext.applyPatches(List<JsonPatch>)` applies a changeset
atomically.

Semantics:

- patches are applied in order
- duplicate paths are preserved
- if any patch fails, the full batch rolls back
- the mutable materialized root is not deep-copied before a batch; planning runs
on frozen roots and the materialized view changes only at commit
- conformance/generalization is planned over the final working roots
- the runtime commits once
- document update events are returned and routed in patch order after the batch
commit
- update `before` values describe the value at the patch path immediately before
that patch entry was applied
- update `after` values normally describe the committed post-conformance value
at the patch path; if a later patch in the same batch overlaps that path, the
earlier update keeps its patch-time intermediate `after` value so duplicate
and add/remove patch-entry order remains observable
- update before/after values stay frozen-backed and materialize to `Node` only
when a matching `DocumentUpdateChannel` needs an event or a caller explicitly
reads `before()` / `after()`
- batch timing and update materialization counters are exposed package-privately
for tests and performance investigation
- `applyPatch` delegates to `applyPatches(singletonList(...))`

This is the preferred path for workflow steps such as `Conversation/Update
Document` that apply a computed changeset.

## Gas And Caching

Resolved snapshot/type caches affect CPU and provider fetches, not gas.
Expand Down Expand Up @@ -405,4 +441,6 @@ Still missing:
- `ConformanceEngineTest`
- `DocumentProcessorSnapshotTransactionTest`
- `DocumentProcessorGeneralizationTest`
- `DocumentProcessingRuntimeBatchPatchTest`
- `DocumentProcessorBatchPatchTest`
- `DocumentProcessorGasTest`
92 changes: 86 additions & 6 deletions src/main/java/blue/language/Blue.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import blue.language.snapshot.ResolvedSnapshot;
import blue.language.utils.*;
import blue.language.utils.limits.CompositeLimits;
import blue.language.utils.limits.ExcludedPathLimits;
import blue.language.utils.limits.Limits;

import java.util.ArrayList;
Expand All @@ -40,6 +41,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Predicate;

import static blue.language.utils.UncheckedObjectMapper.JSON_MAPPER;
import static blue.language.utils.UncheckedObjectMapper.YAML_MAPPER;
Expand Down Expand Up @@ -109,6 +111,52 @@ public Node resolve(Node node, Limits limits) {
return merger.resolve(node, effectiveLimits);
}

public Node resolvePreservingPaths(Node node, Collection<String> preservedPaths) {
return resolvePreservingPaths(node, NO_LIMITS, preservedPaths);
}

public Node resolvePreservingPaths(Node node, Limits limits, Collection<String> preservedPaths) {
if (node == null) {
throw new IllegalArgumentException("node must not be null");
}
Set<String> canonicalPreservedPaths = canonicalPreservedPaths(preservedPaths);
if (canonicalPreservedPaths.isEmpty()) {
return resolve(node.clone(), limits);
}
if (canonicalPreservedPaths.contains("/")) {
return node.clone();
}

Limits preservingLimits = limits == NO_LIMITS
? ExcludedPathLimits.excluding(canonicalPreservedPaths)
: new CompositeLimits(limits, ExcludedPathLimits.excluding(canonicalPreservedPaths));
Node resolved = resolve(node.clone(), preservingLimits);
for (String path : canonicalPreservedPaths) {
Node preserved = NodePathEditor.getOrNull(node, path);
if (preserved != null) {
NodePathEditor.put(resolved, path, preserved.clone());
}
}
return resolved;
}

public List<String> selectPaths(Node node, Collection<String> pathPatterns, Predicate<Node> predicate) {
return NodePathSelector.select(node, pathPatterns, predicate);
}

public Node resolvePreservingMatchingPaths(Node node,
Collection<String> pathPatterns,
Predicate<Node> predicate) {
return resolvePreservingMatchingPaths(node, NO_LIMITS, pathPatterns, predicate);
}

public Node resolvePreservingMatchingPaths(Node node,
Limits limits,
Collection<String> pathPatterns,
Predicate<Node> predicate) {
return resolvePreservingPaths(node, limits, selectPaths(node, pathPatterns, predicate));
}

public Node reverse(Node node) {
return new MergeReverser().reverse(node);
}
Expand Down Expand Up @@ -383,11 +431,23 @@ public Blue registerContractProcessor(String blueId, ContractProcessor<? extends
}

public DocumentProcessingResult processDocument(Node document, Node event) {
return attachProcessingSnapshot(ensureDocumentProcessor().processDocument(document, event));
DocumentProcessor processor = ensureDocumentProcessor();
long start = System.nanoTime();
try {
return attachProcessingSnapshot(processor, processor.processDocument(document, event));
} finally {
processor.processingMetricsSink().addBlueProcessDocumentNanos(System.nanoTime() - start);
}
}

public DocumentProcessingResult processDocument(ResolvedSnapshot snapshot, Node event) {
return ensureDocumentProcessor().processDocument(snapshot, event);
DocumentProcessor processor = ensureDocumentProcessor();
long start = System.nanoTime();
try {
return processor.processDocument(snapshot, event);
} finally {
processor.processingMetricsSink().addBlueProcessDocumentNanos(System.nanoTime() - start);
}
}

public DocumentProcessor getDocumentProcessor() {
Expand All @@ -403,7 +463,8 @@ public Blue documentProcessor(DocumentProcessor documentProcessor) {
}

public DocumentProcessingResult initializeDocument(Node document) {
return attachProcessingSnapshot(ensureDocumentProcessor().initializeDocument(document));
DocumentProcessor processor = ensureDocumentProcessor();
return attachProcessingSnapshot(processor, processor.initializeDocument(document));
}

public DocumentProcessingResult initializeDocument(ResolvedSnapshot snapshot) {
Expand Down Expand Up @@ -510,11 +571,18 @@ private DocumentProcessor createDefaultDocumentProcessor() {
.build();
}

private DocumentProcessingResult attachProcessingSnapshot(DocumentProcessingResult result) {
private DocumentProcessingResult attachProcessingSnapshot(DocumentProcessor processor, DocumentProcessingResult result) {
if (result == null || result.capabilityFailure() || result.snapshot() != null) {
return result;
}
return result.withSnapshot(resolveProcessingSnapshot(result.document()));
long start = System.nanoTime();
try {
return result.withSnapshot(resolveProcessingSnapshot(result.document()));
} finally {
long nanos = System.nanoTime() - start;
processor.processingMetricsSink().addResultSnapshotAttachNanos(nanos);
processor.processingMetricsSink().addBlueIdCalculationNanos(nanos);
}
}

private void refreshDocumentProcessorConformanceEngine() {
Expand All @@ -523,7 +591,8 @@ private void refreshDocumentProcessorConformanceEngine() {
documentProcessor.getContractTypeResolver(),
conformanceEngine(),
processingSnapshotManager(),
new ContractMatchingService(this));
new ContractMatchingService(this),
documentProcessor.processingMetricsSink());
}
}

Expand Down Expand Up @@ -612,6 +681,17 @@ private boolean canMinimizePatchedOverride(JsonPatch patch) {
return true;
}

private Set<String> canonicalPreservedPaths(Collection<String> preservedPaths) {
if (preservedPaths == null || preservedPaths.isEmpty()) {
return Collections.emptySet();
}
Set<String> canonicalPaths = new HashSet<>();
for (String preservedPath : preservedPaths) {
canonicalPaths.add(JsonPointer.canonicalize(preservedPath));
}
return canonicalPaths;
}

private NodeProvider processorSnapshotNodeProvider() {
Set<String> processorTypeBlueIds = new HashSet<>(PROCESSOR_MANAGED_TYPE_BLUE_IDS);
if (documentProcessor != null) {
Expand Down
36 changes: 36 additions & 0 deletions src/main/java/blue/language/conformance/ConformanceEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import blue.language.utils.NodeProviderWrapper;
import blue.language.utils.limits.Limits;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public final class ConformanceEngine {
Expand Down Expand Up @@ -60,4 +62,38 @@ public ConformancePlan planGeneralization(FrozenNode canonicalRoot, FrozenNode r
return new FrozenConformancePlanner(nodeProvider, mergingProcessor, resolvedReferenceCache)
.plan(canonicalRoot, resolvedRoot, changedPath);
}

public ConformancePlan planGeneralization(FrozenNode canonicalRoot,
FrozenNode resolvedRoot,
List<String> changedPaths) {
if (changedPaths == null || changedPaths.isEmpty()) {
return ConformancePlan.unchanged(canonicalRoot, resolvedRoot);
}
FrozenNode nextCanonical = canonicalRoot;
FrozenNode nextResolved = resolvedRoot;
boolean generalized = false;
List<CanonicalGeneralizationPatch> canonicalPatches = new ArrayList<>();
List<String> allChangedPaths = new ArrayList<>();
FrozenConformancePlanner planner = new FrozenConformancePlanner(nodeProvider,
mergingProcessor,
resolvedReferenceCache);
for (String changedPath : changedPaths) {
ConformancePlan plan = planner.plan(nextCanonical, nextResolved, changedPath);
nextCanonical = plan.canonicalRoot() != null ? plan.canonicalRoot() : nextCanonical;
nextResolved = plan.root();
if (plan.generalized()) {
generalized = true;
canonicalPatches.addAll(plan.canonicalPatches());
allChangedPaths.addAll(plan.changedPaths());
}
}
if (!generalized) {
return ConformancePlan.unchanged(nextCanonical, nextResolved);
}
return ConformancePlan.generalized(nextCanonical,
nextResolved,
canonicalPatches,
allChangedPaths,
nextCanonical != null);
}
}
13 changes: 13 additions & 0 deletions src/main/java/blue/language/conformance/ConformancePlan.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,19 @@ public static ConformancePlan unchanged(FrozenNode canonicalRoot, FrozenNode roo
canonicalRoot != null);
}

public static ConformancePlan generalized(FrozenNode canonicalRoot,
FrozenNode root,
List<CanonicalGeneralizationPatch> canonicalPatches,
List<String> changedPaths,
boolean fullSnapshotRebuildAvoidable) {
return new ConformancePlan(canonicalRoot,
root,
true,
canonicalPatches,
changedPaths,
fullSnapshotRebuildAvoidable);
}

public FrozenNode canonicalRoot() {
return canonicalRoot;
}
Expand Down
Loading
Loading