[FLINK-39558][table] LogicalUnnestRule: use Calcite Uncollect rowType instead of LogicalType round-trip #28053

Draft
jnh5y wants to merge 2 commits into apache:master from jnh5y:flink-39558-uncollect-rowtype
jnh5y commented Apr 27, 2026

What is the purpose of the change

Fixes a class of internal errors in LogicalUnnestRule in which the TableFunctionScan rowType diverges from what Calcite derives for the original Correlate(Uncollect) tree. The divergence makes RelOptUtil.verifyTypeEquivalence fail for LEFT JOIN UNNEST shapes that don't fit the FLINK-33217 patch path.

Repro (fails on release-2.0 / master, passes with this PR):

CREATE TABLE nested_not_null (
  business_data ARRAY<STRING NOT NULL>,
  nested ROW<`data` ARRAY<STRING NOT NULL>>,
  nested_array ARRAY<ROW<`data` ARRAY<STRING NOT NULL>> NOT NULL>
);

-- Bare Uncollect under LEFT correlate
SELECT * FROM nested_not_null
  LEFT JOIN UNNEST(nested_not_null.business_data) AS exploded_bd ON TRUE;

-- ON-predicate adds a Filter between Correlate and Uncollect
SELECT * FROM nested_not_null
  LEFT JOIN UNNEST(nested_not_null.business_data) AS exploded_bd
  ON exploded_bd <> 'debug';

Both crash with java.lang.AssertionError: Cannot add expression of different type to set.
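For readers unfamiliar with this assertion, here is a toy model of the kind of check that fails. It is a sketch, not Calcite's code: the class `TypeEquivalenceSketch` and its `Field` type are hypothetical, and it assumes the check compares field types and nullability by position while ignoring field names.

```java
import java.util.List;

/** Toy model of a row-type equivalence check; hypothetical, not Calcite's implementation. */
final class TypeEquivalenceSketch {

    /** One field of a row type: a name, a type name, and a nullability flag. */
    static final class Field {
        final String name;
        final String type;
        final boolean nullable;

        Field(String name, String type, boolean nullable) {
            this.name = name;
            this.type = type;
            this.nullable = nullable;
        }
    }

    /** Assumption: types and nullability are compared by position; names are ignored. */
    static boolean equivalent(List<Field> expected, List<Field> actual) {
        if (expected.size() != actual.size()) {
            return false;
        }
        for (int i = 0; i < expected.size(); i++) {
            Field e = expected.get(i);
            Field a = actual.get(i);
            if (!e.type.equals(a.type) || e.nullable != a.nullable) {
                return false;
            }
        }
        return true;
    }

    /** Throws with the message seen in the repro when the two row types differ. */
    static void verify(List<Field> expected, List<Field> actual) {
        if (!equivalent(expected, actual)) {
            throw new AssertionError("Cannot add expression of different type to set");
        }
    }
}
```

Under that assumption, two row types that differ only in a field name pass the check, while a single field that is nullable on one side and NOT NULL on the other is enough to trigger the error.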

Jira: FLINK-39558

Brief change log

  • Replace UnnestRowsFunctionBase.getUnnestedType(...) round-trip in LogicalUnnestRule with Calcite's uncollect.getRowType(). This makes the rewritten Correlate's derived rowType match the original byte-for-byte.
  • Remove the now-dead getLogicalProjectWithAdjustedNullability and createNullableType helpers (FLINK-33217's CAST-to-nullable patchwork is no longer needed because the divergence at the source is gone).
  • Add reproducers for the two LEFT-JOIN-UNNEST shapes that were not covered by FLINK-33217 (bare Uncollect, Filter(Uncollect)).
  • Re-record plan fixtures in UnnestTest.xml (batch + stream), LogicalUnnestRuleTest.xml, MultiJoinTest.xml, and JavaCatalogTableTest.xml to reflect Calcite-derived field naming.

Field naming change

Calcite's Uncollect derives field names from the source array, so plan output for UNNEST columns changes:

  • ARRAY<T> / MULTISET<T>: the unnested column is named after the source array column, with a numeric suffix to disambiguate (e.g., tags0 instead of synthetic f0 / EXPR$0).
  • MAP<K,V>: key/value columns are named KEY and VALUE instead of f0 / f1.
  • WITH ORDINALITY: ordinality column is named ORDINALITY (unchanged).

Multiple unnests in the same query are auto-disambiguated by Calcite's outer Correlate (e.g. two MAP unnests produce KEY, VALUE, KEY0, VALUE0).
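The suffixing pattern in these examples can be illustrated with a toy uniquifier. This is a sketch that reproduces the names quoted above, not Calcite's actual naming code; the class `FieldNameSketch` and its `uniquify` method are hypothetical, and it assumes suffixes start at 0 and that the first use of a name is kept unchanged.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of suffix-based field-name disambiguation; hypothetical, not Calcite's code. */
final class FieldNameSketch {

    /**
     * Keeps the first use of each name. A later duplicate gets the smallest
     * numeric suffix (starting at 0) that yields an unused name.
     */
    static List<String> uniquify(List<String> names) {
        Set<String> used = new HashSet<>();
        List<String> result = new ArrayList<>();
        for (String name : names) {
            String candidate = name;
            int suffix = 0;
            // Set.add returns false when the candidate is already taken.
            while (!used.add(candidate)) {
                candidate = name + suffix++;
            }
            result.add(candidate);
        }
        return result;
    }

    public static void main(String[] args) {
        // Two MAP unnests, each contributing KEY and VALUE.
        System.out.println(uniquify(List.of("KEY", "VALUE", "KEY", "VALUE")));
        // prints [KEY, VALUE, KEY0, VALUE0]
    }
}
```

The same rule turns a second `tags` column into `tags0`, which matches the ARRAY example above if the unnested column starts out with the source column's name.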

The runtime INTERNAL_UNNEST_ROWS function is positional, so persisted CompiledPlan instances continue to restore correctly (verified by CorrelateRestoreTest).

Verifying this change

This change adds 2 reproducer tests in commit 1. They fail at HEAD~1 in RelOptUtil.verifyTypeEquivalence and pass with the fix in commit 2:

  • UnnestTestBase.testNullMismatchLeftJoinNoAliasList
  • UnnestTestBase.testNullMismatchLeftJoinOnPredicate

Full flink-table-planner suite passes (10691 tests, 0 failures, 41 skipped). CorrelateRestoreTest passes, demonstrating that persisted CompiledPlans containing UNNEST continue to restore correctly.

Does this pull request potentially affect one of the following parts?

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? n/a

jnh5y added 2 commits April 27, 2026 16:35
Adds two tests against the existing nested_not_null.business_data column,
covering LEFT-JOIN UNNEST shapes that hit RelOptUtil.verifyTypeEquivalence
("Cannot add expression of different type to set"):

  - testNullMismatchLeftJoinNoAliasList: bare Uncollect under a LEFT
    correlate (without a column-list alias, no Project is inserted).
  - testNullMismatchLeftJoinOnPredicate: ON-clause predicate adds a
    LogicalFilter between the LEFT correlate and the Uncollect.

Generated-by: claude-opus-4-7

Replace the LogicalType round-trip in LogicalUnnestRule with Calcite's
native Uncollect.deriveRowType output. The previous code derived the
table-function-scan rowType via:

    Calcite RelDataType -> Flink LogicalType ->
    UnnestRowsFunctionBase.getUnnestedType(...) ->
    Calcite RelDataType (via createFieldTypeFromLogicalType)

This round-trip dropped per-field nullability information that Calcite's
upstream Correlate had derived for the LEFT-JOIN case, causing
RelOptUtil.verifyTypeEquivalence to fail with "Cannot add expression of
different type to set" whenever the array element was NOT NULL but the
LEFT join made the right-side fields nullable. The previous fix
(getLogicalProjectWithAdjustedNullability) patched up that divergence by
inserting CAST-to-nullable wrappers, but only for shapes where a
LogicalProject sat between the Correlate and the Uncollect; bare
Uncollect, Filter(Uncollect), and Filter(Project(Uncollect)) shapes were
not covered.

Using uncollect.getRowType() makes the rewritten Correlate's derived
rowType match the original byte-for-byte, eliminating the divergence at
the source. The patchwork helper and its dependent imports are removed.

Field naming follows Calcite's Uncollect convention now:
* ARRAY/MULTISET<T>: element column is named after the source array
  column with a numeric suffix to disambiguate (e.g. "tags0" instead of
  the synthetic "f0" / "EXPR$0").
* MAP<K,V>: key/value columns are named "KEY"/"VALUE" instead of
  "f0"/"f1".
* WITH ORDINALITY: ordinality column is named "ORDINALITY" (unchanged).

Recorded plan fixtures in UnnestTest.xml, LogicalUnnestRuleTest.xml, and
MultiJoinTest.xml are updated to reflect the new naming. The runtime
INTERNAL_UNNEST_ROWS function is positional, so existing CompiledPlan
instances continue to restore correctly (verified by
CorrelateRestoreTest).

Generated-by: claude-opus-4-7
flinkbot commented Apr 27, 2026

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build
