diff --git a/Cargo.lock b/Cargo.lock index ac5b27f8..50f04dc6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -746,6 +746,15 @@ dependencies = [ "rustc_version", ] +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -975,6 +984,12 @@ version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" +[[package]] +name = "endian-type" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d" + [[package]] name = "equivalent" version = "1.0.2" @@ -1104,6 +1119,8 @@ dependencies = [ "jiff", "linked-hash-map", "log", + "metrics", + "metrics-util", "opendal", "ordered-float", "parking_lot", @@ -1403,6 +1420,9 @@ name = "hashbrown" version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" +dependencies = [ + "foldhash 0.2.0", +] [[package]] name = "heck" @@ -2054,6 +2074,36 @@ dependencies = [ "autocfg", ] +[[package]] +name = "metrics" +version = "0.24.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d5312e9ba3771cfa961b585728215e3d972c950a3eed9252aa093d6301277e8" +dependencies = [ + "ahash", + "portable-atomic", +] + +[[package]] +name = "metrics-util" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdfb1365fea27e6dd9dc1dbc19f570198bc86914533ad639dae939635f096be4" +dependencies = [ + "aho-corasick", + "crossbeam-epoch", + "crossbeam-utils", + "hashbrown 0.16.1", + "indexmap 2.13.1", + "metrics", + "ordered-float", + "quanta", + "radix_trie", + "rand 0.9.2", + "rand_xoshiro", + "sketches-ddsketch", +] + [[package]] name = "mime" version = "0.3.17" @@ -2077,6 +2127,15 @@ version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084" +[[package]] +name = "nibble_vec" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a5d83df9f36fe23f0c3648c6bbb8b0298bb5f1939c8f2704431371f4b84d43" +dependencies = [ + "smallvec", +] + [[package]] name = "num" version = "0.4.3" @@ -2546,6 +2605,21 @@ dependencies = [ "cc", ] +[[package]] +name = "quanta" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3ab5a9d756f0d97bdc89019bd2e4ea098cf9cde50ee7564dde6b81ccc8f06c7" +dependencies = [ + "crossbeam-utils", + "libc", + "once_cell", + "raw-cpuid", + "wasi", + "web-sys", + "winapi", +] + [[package]] name = "quick-xml" version = "0.37.5" @@ -2648,6 +2722,16 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc33ff2d4973d518d823d61aa239014831e521c75da58e3df4840d3f47749d09" +[[package]] +name = "radix_trie" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c069c179fcdc6a2fe24d8d18305cf085fdbd4f922c041943e203685d6a1c58fd" +dependencies = [ + "endian-type", + "nibble_vec", +] + [[package]] name = "rand" version = "0.8.5" @@ -2709,6 +2793,24 @@ dependencies = [ "getrandom 0.3.4", ] +[[package]] +name = "rand_xoshiro" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f703f4665700daf5512dcca5f43afa6af89f09db47fb56be587f80636bda2d41" +dependencies = [ + "rand_core 0.9.5", +] + +[[package]] +name = "raw-cpuid" +version = "11.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "498cd0dc59d73224351ee52a95fee0f1a617a2eae0e7d9d720cc622c73a54186" +dependencies = [ + "bitflags", +] + [[package]] name = "redox_syscall" version = "0.5.18" @@ -3214,6 +3316,12 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3a9fe34e3e7a50316060351f37187a3f546bce95496156754b601a5fa71b76e" +[[package]] +name = "sketches-ddsketch" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c6f73aeb92d671e0cc4dca167e59b2deb6387c375391bc99ee743f326994a2b" + [[package]] name = "slab" version = "0.4.12" diff --git a/crates/fluss/src/client/table/batch_scanner.rs b/crates/fluss/src/client/table/batch_scanner.rs new file mode 100644 index 00000000..a2c4c2fe --- /dev/null +++ b/crates/fluss/src/client/table/batch_scanner.rs @@ -0,0 +1,254 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! One-shot bounded scanner backed by a single `LimitScanRequest` RPC. +//! +//! Unlike [`crate::client::table::LogScanner`], a `BatchScanner` does not +//! subscribe to bucket offsets or stream from the server. It performs a single +//! eager request for up to `limit` rows from one `TableBucket` and exposes the +//! result as a single Arrow [`RecordBatch`] on the first call to +//! [`BatchScanner::poll_batch`]; subsequent calls return `None`. + +use crate::client::metadata::Metadata; +use crate::error::{ApiError, Error, FlussError, Result}; +use crate::metadata::{TableBucket, TableInfo}; +use crate::proto::ErrorResponse; +use crate::record::kv::{KvRecordBatch, KvRecordReadContext, ReadContext as KvReadContext, SchemaGetter}; +use crate::record::{LogRecordsBatches, ReadContext as ArrowReadContext, ScanBatch, RowAppendRecordBatchBuilder, to_arrow_schema}; +use crate::rpc::RpcClient; +use crate::rpc::message::LimitScanRequest; +use arrow::array::RecordBatch; +use arrow_schema::SchemaRef; +use bytes::Bytes; +use std::sync::Arc; + +/// Adapter over a [`TableInfo`] that satisfies [`SchemaGetter`] for a single +/// table. KV lookups always carry the same schema id, so we just hand back +/// the embedded schema. +struct TableInfoSchemaGetter { + schema: Arc, +} + +impl SchemaGetter for TableInfoSchemaGetter { + fn get_schema(&self, _schema_id: i16) -> Result> { + Ok(Arc::clone(&self.schema)) + } +} + +/// One-shot bounded scanner. +/// +/// The scanner sends a single `LimitScanRequest` on construction and caches +/// the resulting Arrow `RecordBatch`. The first `poll_batch()` returns the +/// batch (wrapped in a [`ScanBatch`]); the second returns `None`. +pub struct BatchScanner { + bucket: TableBucket, + /// Pre-fetched batch, taken out on the first `poll_batch` call. + batch: Option, +} + +impl BatchScanner { + pub(super) async fn new( + rpc_client: Arc, + metadata: Arc, + table_info: TableInfo, + projected_fields: Option>, + bucket: TableBucket, + limit: i32, + ) -> Result { + // Resolve leader for the target bucket (mirrors Lookuper's pattern). + let leader = metadata + .leader_for(&table_info.table_path, &bucket) + .await? + .ok_or_else(|| { + Error::leader_not_available(format!( + "No leader found for table bucket: {bucket}" + )) + })?; + let connection = rpc_client.get_connection(&leader).await?; + + // Fire the single LimitScanRequest RPC. + let request = LimitScanRequest::new( + table_info.table_id, + bucket.partition_id(), + bucket.bucket_id(), + limit, + ); + let response = connection.request(request).await?; + + // Surface server-side errors using the same shape as Lookuper. + if let Some(error_code) = response.error_code + && error_code != FlussError::None.code() + { + let err: ApiError = ErrorResponse { + error_code, + error_message: response.error_message.clone(), + } + .into(); + return Err(Error::FlussAPIError { api_error: err }); + } + + let is_log_table = response.is_log_table.unwrap_or(false); + let raw = response.records.unwrap_or_default(); + + let batch = if is_log_table { + decode_log_batch(&table_info, projected_fields.as_deref(), raw)? + } else { + decode_kv_batch(&table_info, projected_fields.as_deref(), raw)? + }; + + Ok(Self { + bucket, + batch: Some(batch), + }) + } + + /// Returns the pre-fetched batch on the first call, then `None`. + pub async fn poll_batch(&mut self) -> Result> { + Ok(self + .batch + .take() + .map(|b| ScanBatch::new(self.bucket.clone(), b, 0))) + } + + /// The bucket scanned by this `BatchScanner`. + pub fn bucket(&self) -> &TableBucket { + &self.bucket + } +} + +/// Decode an Arrow-IPC encoded `LogRecordBatch` payload into a single Arrow +/// `RecordBatch`. Multiple inner batches (rare for a `LimitScanRequest`) are +/// concatenated. +fn decode_log_batch( + table_info: &TableInfo, + projected_fields: Option<&[usize]>, + raw: Vec, +) -> Result { + let full_schema = to_arrow_schema(table_info.get_row_type())?; + let read_context = match projected_fields { + None => ArrowReadContext::new(full_schema.clone(), false), + Some(fields) => { + ArrowReadContext::with_projection_pushdown(full_schema.clone(), fields.to_vec(), false)? + } + }; + + let target_schema: SchemaRef = match projected_fields { + None => full_schema, + Some(fields) => ArrowReadContext::project_schema( + to_arrow_schema(table_info.get_row_type())?, + fields, + )?, + }; + + if raw.is_empty() { + return Ok(RecordBatch::new_empty(target_schema)); + } + + let mut batches: Vec = Vec::new(); + for log_batch in LogRecordsBatches::new(raw) { + let log_batch = log_batch?; + let rb = log_batch.record_batch(&read_context)?; + batches.push(rb); + } + + if batches.is_empty() { + return Ok(RecordBatch::new_empty(target_schema)); + } + if batches.len() == 1 { + return Ok(batches.into_iter().next().unwrap()); + } + arrow::compute::concat_batches(&target_schema, batches.iter()).map_err(|e| { + Error::UnexpectedError { + message: format!("Failed to concatenate log record batches: {e}"), + source: None, + } + }) +} + +/// Decode a KV-format payload into a single Arrow `RecordBatch`. Each +/// `CompactedRow` is appended through [`RowAppendRecordBatchBuilder`]; deletion +/// records (no value) are skipped because primary key tables don't return +/// tombstones from a limit scan. +fn decode_kv_batch( + table_info: &TableInfo, + projected_fields: Option<&[usize]>, + raw: Vec, +) -> Result { + let row_type = table_info.get_row_type(); + let full_arrow_schema = to_arrow_schema(row_type)?; + + if raw.is_empty() { + let schema: SchemaRef = match projected_fields { + None => full_arrow_schema, + Some(fields) => ArrowReadContext::project_schema(full_arrow_schema, fields)?, + }; + return Ok(RecordBatch::new_empty(schema)); + } + + let kv_format = table_info.table_config.get_kv_format()?; + let schema_getter = Arc::new(TableInfoSchemaGetter { + schema: Arc::new(table_info.get_schema().clone()), + }); + let read_context = KvRecordReadContext::new(kv_format, schema_getter); + + // The KV records payload may be a single batch or a sequence of batches. + // The server-side `LimitScanResponse` returns one batch in practice, but + // we walk the buffer defensively. + let bytes = Bytes::from(raw); + let mut builder = RowAppendRecordBatchBuilder::new(row_type)?; + let mut position = 0usize; + + while position < bytes.len() { + let kv_batch = KvRecordBatch::new(bytes.clone(), position); + let size = kv_batch.size_in_bytes().map_err(|e| Error::UnexpectedError { + message: format!("Invalid KvRecordBatch length: {e}"), + source: None, + })?; + + let records = kv_batch.records_unchecked(&read_context as &dyn KvReadContext)?; + let decoder = records.decoder_arc(); + for record in records { + let record = record.map_err(|e| Error::UnexpectedError { + message: format!("Failed to read KV record: {e}"), + source: None, + })?; + if let Some(row) = record.row(&*decoder) { + builder.append(&row)?; + } + } + + position = position.checked_add(size).ok_or_else(|| Error::UnexpectedError { + message: "KvRecordBatch position overflow".to_string(), + source: None, + })?; + } + + let full_batch = Arc::unwrap_or_clone(builder.build_arrow_record_batch()?); + + match projected_fields { + None => Ok(full_batch), + Some(fields) => { + let projected_schema = + ArrowReadContext::project_schema(full_arrow_schema, fields)?; + let columns: Vec<_> = fields + .iter() + .map(|&idx| full_batch.column(idx).clone()) + .collect(); + Ok(RecordBatch::try_new(projected_schema, columns)?) + } + } +} diff --git a/crates/fluss/src/client/table/mod.rs b/crates/fluss/src/client/table/mod.rs index 2d3d0171..95587f4f 100644 --- a/crates/fluss/src/client/table/mod.rs +++ b/crates/fluss/src/client/table/mod.rs @@ -24,6 +24,7 @@ use std::sync::Arc; pub const EARLIEST_OFFSET: i64 = -2; mod append; +mod batch_scanner; mod lookup; mod log_fetch_buffer; @@ -33,6 +34,7 @@ mod scanner; mod upsert; pub use append::{AppendWriter, TableAppend}; +pub use batch_scanner::BatchScanner; pub use lookup::{LookupResult, Lookuper, PrefixKeyLookuper, TableLookup, TablePrefixLookup}; pub use remote_log::{ DEFAULT_REMOTE_FILE_DOWNLOAD_THREAD_NUM, DEFAULT_SCANNER_REMOTE_LOG_PREFETCH_NUM, diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index 00c5b238..027c69fc 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -16,6 +16,7 @@ // under the License. use crate::client::connection::FlussConnection; +use crate::client::table::batch_scanner::BatchScanner; use crate::client::credentials::SecurityTokenManager; use crate::client::metadata::Metadata; use crate::client::table::log_fetch_buffer::{ @@ -50,6 +51,8 @@ pub struct TableScan<'a> { metadata: Arc, /// Column indices to project. None means all columns, Some(vec) means only the specified columns (non-empty). projected_fields: Option>, + /// Optional row limit. When set, callers may construct a [`BatchScanner`] for a one-shot bounded scan. + limit: Option, } impl<'a> TableScan<'a> { @@ -59,9 +62,57 @@ impl<'a> TableScan<'a> { table_info, metadata, projected_fields: None, + limit: None, } } + /// Sets a row limit for the scan, enabling [`Self::create_batch_scanner`]. + /// + /// The limit must be positive. Callers configure a limit prior to + /// constructing a `BatchScanner` for a one-shot bounded read. + pub fn limit(mut self, n: i32) -> Result { + if n <= 0 { + return Err(Error::IllegalArgument { + message: format!("Scan limit must be positive, got {n}"), + }); + } + self.limit = Some(n); + Ok(self) + } + + /// Creates a `BatchScanner` that performs a single bounded scan of `table_bucket`. + /// + /// Requires a previously-configured limit via [`Self::limit`]. The scanner sends + /// a `LimitScanRequest` eagerly and exposes the resulting batch through + /// [`BatchScanner::poll_batch`]. + pub async fn create_batch_scanner( + self, + table_bucket: TableBucket, + ) -> Result { + let limit = self.limit.ok_or_else(|| Error::IllegalArgument { + message: "create_batch_scanner requires a limit configured via .limit(n)" + .to_string(), + })?; + if table_bucket.table_id() != self.table_info.table_id { + return Err(Error::IllegalArgument { + message: format!( + "Bucket table_id {} does not match scan table_id {}", + table_bucket.table_id(), + self.table_info.table_id + ), + }); + } + BatchScanner::new( + self.conn.get_connections(), + self.metadata.clone(), + self.table_info, + self.projected_fields, + table_bucket, + limit, + ) + .await + } + /// Projects the scan to only include specified columns by their indices. /// /// # Arguments