[FLINK-39401][formats] Port raw line-delimiter option to release-1.15 #28048

Open
featzhang wants to merge 3 commits into apache:release-1.15 from featzhang:feature/FLINK-39401-raw-line-delimiter

@featzhang
Member

What is the purpose of the change

Port PR #27897 (FLINK-39401) from master to release-1.15. Extends the raw format with an optional raw.line-delimiter configuration that lets each Kafka/file message encode multiple records separated by a delimiter.

Note: I'm aware release-1.15 is past its community maintenance window. Opening this PR for visibility in case anyone else on 1.15 needs the same backport; feel free to close if out of scope.

Brief change log

  • RawFormatOptions: add LINE_DELIMITER ConfigOption (no default, supports Java escape sequences like \n, \r\n).
  • RawFormatFactory: read the option, register it in optionalOptions(), and pass it to the (de)serialization schemas.
  • RawFormatDeserializationSchema:
    • new 5-arg constructor accepting @Nullable String lineDelimiter; the previous 4-arg constructor delegates with null for backward compatibility.
    • pre-compiled Pattern for splitting; new deserialize(byte[], Collector<RowData>) override emits one RowData per segment.
    • null message with delimiter → zero rows; trailing delimiter stripped so round-trip produces one row, not two.
  • RawFormatSerializationSchema:
    • new 4-arg constructor accepting @Nullable String lineDelimiter; old 3-arg constructor delegates with null.
    • pre-computed delimiterBytes; serialize() appends them after the value bytes. null row still returns null.

Verifying this change

Added tests:

  • RawFormatFactoryTest.testLineDelimiterOption — verifies the factory wires the option through correctly.
  • RawFormatLineDelimiterTest (new, 11 tests) — covers:
    • deserialize without / with \n / with multi-char / with GBK charset delimiters
    • null message, trailing delimiter, round-trip
    • serialize without / with \n / with custom delimiter, null row

Run:

mvn test -pl flink-table/flink-table-runtime -Dtest='RawFormat*'

Result: Tests run: 51, Failures: 0, Errors: 0, Skipped: 0 (11 new + 7 factory + 33 existing SerDe).

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no (new option is additive, behavior unchanged when unset)
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): yes — raw format deserialize/serialize. When the option is unset, the behavior and allocations are unchanged; when set, a pre-compiled Pattern and pre-computed byte[] avoid per-record allocation.
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no
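The pre-computed byte[] mentioned in the performance item above could look roughly like the sketch below. It is an illustration under stated assumptions, not the PR's implementation: `RawLineAppendSketch` is a hypothetical class, values are modelled as `String` instead of `RowData`, and the delimiter is assumed to be encoded with the same charset as the value.

```java
import java.nio.charset.Charset;

/** Hypothetical stand-in for the serialization side; not the Flink class itself. */
final class RawLineAppendSketch {
    private final Charset charset;
    private final byte[] delimiterBytes; // encoded once in the constructor; null when unset

    RawLineAppendSketch(Charset charset, String delimiter) {
        this.charset = charset;
        // Assumption: the delimiter uses the same charset as the value.
        this.delimiterBytes = delimiter == null ? null : delimiter.getBytes(charset);
    }

    /** Returns the value bytes followed by the delimiter bytes, or null for a null value. */
    byte[] serialize(String value) {
        if (value == null) {
            return null; // a null row still serializes to null
        }
        byte[] valueBytes = value.getBytes(charset);
        if (delimiterBytes == null) {
            return valueBytes; // option unset: output is unchanged
        }
        byte[] out = new byte[valueBytes.length + delimiterBytes.length];
        System.arraycopy(valueBytes, 0, out, 0, valueBytes.length);
        System.arraycopy(delimiterBytes, 0, out, valueBytes.length, delimiterBytes.length);
        return out;
    }
}
```

In this sketch only the delimiter encoding is hoisted out of the per-record path; the output array that holds value plus delimiter is still allocated for each record.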

Documentation

Port PR apache#27897 from master to release-1.15. Extends the raw format to
support an optional 'raw.line-delimiter' configuration option:

- Deserialization: splits each incoming message by the delimiter using
  a pre-compiled Pattern and emits one RowData per segment.
  Null messages with delimiter produce zero rows. Trailing delimiter is
  stripped to ensure round-trip compatibility.
- Serialization: appends delimiter bytes (pre-computed) after each
  serialized value.
- Backward compatible: all existing behavior preserved when
  raw.line-delimiter is not set.

Changes:
- RawFormatOptions: add LINE_DELIMITER ConfigOption (no default value)
- RawFormatFactory: read option, pass to schema builders, register in
  optionalOptions()
- RawFormatDeserializationSchema: add lineDelimiter + lineDelimiterPattern
  fields, new 5-arg constructor, override deserialize(byte[], Collector)
- RawFormatSerializationSchema: add lineDelimiter + delimiterBytes fields,
  new 4-arg constructor, append delimiter in serialize()
- RawFormatFactoryTest: add testLineDelimiterOption()
- RawFormatLineDelimiterTest: new test class with 9 tests (JUnit 4)
@flinkbot
Collaborator

flinkbot commented Apr 27, 2026

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

@spuru9
Contributor

spuru9 commented Apr 27, 2026

@featzhang flinkbot is acting weirdly. Try pushing an empty commit: git commit -m "trigger" --allow-empty

@github-actions bot added the community-reviewed label Apr 28, 2026
CI failures on previous build are unrelated to this PR:
- Azure agent pool image label missing for release-1.15 pipeline
- Pre-existing flaky WikipediaEditsSourceTest (external IRC dependency)

Empty commit to re-trigger Azure CI.

Labels: community-reviewed, target:release-1.15
