DRY up batched KVStore reads with semaphore-based BatchingStore #876
tnull wants to merge 1 commit into lightningdevkit:main

Conversation
Introduce `BatchingStore`, a `KVStore` wrapper that limits concurrent async I/O via a `tokio::sync::Semaphore`. During initialization the builder wraps the store in `BatchingStore` so all parallel reads share a single concurrency cap, rather than each reader maintaining its own `JoinSet`-based batch queue. Replace the duplicated ~75-line batching loops in `read_payments` and `read_pending_payments` with a generic `read_all_objects<T: Readable>` helper that spawns all reads into a `JoinSet` (relying on the store wrapper for throttling) and collects deserialized results. Both functions become thin one-line delegations.

Co-Authored-By: HAL 9000
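The mechanism described above can be sketched as follows. This is a hypothetical, std-only analog: the actual PR is async and wraps reads with a `tokio::sync::Semaphore`, whereas here a blocking counting semaphore caps how many reads run concurrently; the `BatchingStore` fields and the dummy payload are illustrative only.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

// A blocking counting semaphore built from Mutex + Condvar.
struct Semaphore {
    permits: Mutex<usize>,
    cv: Condvar,
}

impl Semaphore {
    fn new(n: usize) -> Self {
        Semaphore { permits: Mutex::new(n), cv: Condvar::new() }
    }
    fn acquire(&self) {
        let mut p = self.permits.lock().unwrap();
        while *p == 0 {
            p = self.cv.wait(p).unwrap();
        }
        *p -= 1;
    }
    fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.cv.notify_one();
    }
}

// Sketch of a `BatchingStore`-style wrapper: every read first takes a
// permit, so at most `n` reads are in flight no matter how many
// callers issue reads in parallel.
struct BatchingStore {
    max_in_flight: Semaphore,
}

impl BatchingStore {
    fn read(&self, key: &str) -> Vec<u8> {
        self.max_in_flight.acquire();
        // Stand-in for the wrapped store's actual read.
        let result = key.as_bytes().to_vec();
        self.max_in_flight.release();
        result
    }
}

fn run() -> usize {
    let store = Arc::new(BatchingStore { max_in_flight: Semaphore::new(4) });
    // 16 parallel readers all share the single concurrency cap of 4.
    let handles: Vec<_> = (0..16)
        .map(|i| {
            let s = Arc::clone(&store);
            thread::spawn(move || s.read(&format!("key_{}", i)).len())
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).sum()
}

fn main() {
    println!("{}", run()); // total bytes read across 16 parallel reads
}
```

The point of putting the semaphore inside the store wrapper is that callers like `read_all_objects` can spawn reads freely without each maintaining its own batching loop.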
👋 Thanks for assigning @joostjager as a reviewer!

🔔 1st Reminder Hey @joostjager! This PR has been waiting for your review.

🔔 2nd Reminder Hey @joostjager! This PR has been waiting for your review.

LGTM

This now needs a rebase, there's a merge conflict.

🔔 3rd Reminder Hey @joostjager! This PR has been waiting for your review.
joostjager
left a comment
I am not sure a generic client-side read throttle is the right abstraction here.
I think the motivation is mainly VSS-specific? For local stores such as SQLite, this mostly limits task fan-out rather than real backend concurrency. Perhaps it should be implemented only in the VSS client. And perhaps longer term, a multi-key read would be helpful in the VSS protocol?
> ));
>
> let peer_storage_key = keys_manager.get_peer_storage_key();
> let monitor_reader = Arc::new(AsyncPersister::new(
Doesn't this one need to go through the batch store too?
> debug_assert!(set.len() <= BATCH_SIZE);
> }
> for key in keys {
>     set.spawn(KVStore::read(kv_store, primary_namespace, secondary_namespace, &key));
Is it possible that this now creates a large number of (blocked) tasks?
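One way to address this concern is to acquire the permit *before* spawning, so at most `CAP` workers exist at any moment instead of one blocked task per key. A hypothetical std-only sketch (the PR itself spawns tokio tasks that block on the semaphore inside the store wrapper; `CAP` and all names here are illustrative):

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

const CAP: usize = 4;

// Returns the peak number of concurrently live workers observed.
fn run(keys: usize) -> usize {
    // (in-flight counter, wakeup for the spawner)
    let in_flight = Arc::new((Mutex::new(0usize), Condvar::new()));
    let peak = Arc::new(Mutex::new(0usize));
    let mut handles = Vec::new();
    for _ in 0..keys {
        {
            // Block *here*, before spawning: no task exists until a
            // permit is free, so live workers never exceed CAP.
            let (lock, cv) = &*in_flight;
            let mut n = lock.lock().unwrap();
            while *n == CAP {
                n = cv.wait(n).unwrap();
            }
            *n += 1;
            let mut pk = peak.lock().unwrap();
            if *n > *pk {
                *pk = *n;
            }
        }
        let worker_sem = Arc::clone(&in_flight);
        handles.push(thread::spawn(move || {
            // ... the actual read would happen here ...
            let (lock, cv) = &*worker_sem;
            *lock.lock().unwrap() -= 1;
            cv.notify_one();
        }));
    }
    for h in handles {
        h.join().unwrap();
    }
    let p = *peak.lock().unwrap();
    p
}

fn main() {
    // Peak observed concurrency stays within the cap.
    assert!(run(32) <= CAP);
    println!("ok");
}
```

The trade-off: spawn-then-block (as in the PR) keeps the helper simple, while acquire-then-spawn bounds the number of live tasks at the cost of serializing the spawn loop on the semaphore.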