From cab0b9bace9ea93d896379020291a90a53955e02 Mon Sep 17 00:00:00 2001 From: benthecarman Date: Mon, 26 Jan 2026 08:31:17 -0600 Subject: [PATCH 1/2] Add PaginatedKVStore traits upstreamed from ldk-server Allows for a paginated KV store for more efficient listing of keys so you don't need to fetch all at once. Uses monotonic counter or timestamp to track the order of keys and allow for pagination. The traits are largely just copy-pasted from ldk-server. Adds some basic tests that were generated using claude code. --- lightning/src/util/persist.rs | 275 ++++++++++++++++++++++++++++++- lightning/src/util/test_utils.rs | 119 ++++++++++++- 2 files changed, 392 insertions(+), 2 deletions(-) diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index 2e1e8805d0a..ad3fc169094 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -347,6 +347,205 @@ pub trait KVStore { ) -> impl Future, io::Error>> + 'static + MaybeSend; } +/// An opaque token used for paginated listing operations. +/// +/// This token should be treated as an opaque value by callers. Pass the token returned from +/// one `list_paginated` call to the next call to continue pagination. The internal format +/// is implementation-defined and may change between versions. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct PageToken(pub String); + +/// Represents the response from a paginated `list` operation. +/// +/// Contains the list of keys and a token for retrieving the next page of results. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct PaginatedListResponse { + /// A vector of keys, ordered from most recently created to least recently created. + pub keys: Vec, + + /// A token that can be passed to the next call to continue pagination. + /// + /// Is `None` if there are no more pages to retrieve. + pub next_page_token: Option, +} + +/// Provides an interface that allows storage and retrieval of persisted values that are associated +/// with given keys, with support for pagination. +/// +/// In order to avoid collisions, the key space is segmented based on the given `primary_namespace`s +/// and `secondary_namespace`s. Implementations of this trait are free to handle them in different +/// ways, as long as per-namespace key uniqueness is asserted. +/// +/// Keys and namespaces are required to be valid ASCII strings in the range of +/// [`KVSTORE_NAMESPACE_KEY_ALPHABET`] and no longer than [`KVSTORE_NAMESPACE_KEY_MAX_LEN`]. Empty +/// primary namespaces and secondary namespaces (`""`) are considered valid; however, if +/// `primary_namespace` is empty, `secondary_namespace` must also be empty. This means that concerns +/// should always be separated by primary namespace first, before secondary namespaces are used. +/// While the number of primary namespaces will be relatively small and determined at compile time, +/// there may be many secondary namespaces per primary namespace. Note that per-namespace uniqueness +/// needs to also hold for keys *and* namespaces in any given namespace, i.e., conflicts between keys +/// and equally named primary or secondary namespaces must be avoided. +/// +/// **Note:** This trait extends the functionality of [`KVStoreSync`] by adding support for +/// paginated listing of keys in creation order. This is useful when dealing with a large number +/// of keys that cannot be efficiently retrieved all at once. +/// +/// For an asynchronous version of this trait, see [`PaginatedKVStore`]. +pub trait PaginatedKVStoreSync: KVStoreSync { + /// Returns a paginated list of keys that are stored under the given `secondary_namespace` in + /// `primary_namespace`, ordered from most recently created to least recently created. + /// + /// Implementations must return keys in reverse creation order (newest first). How creation + /// order is tracked is implementation-defined (e.g., storing creation timestamps, using an + /// incrementing ID, or another mechanism). Creation order (not last-updated order) is used + /// to prevent race conditions during pagination: if keys were ordered by update time, a key + /// updated mid-pagination could shift position, causing it to be skipped or returned twice + /// across pages. + /// + /// If `page_token` is provided, listing continues from where the previous page left off. + /// If `None`, listing starts from the most recently created entry. The `next_page_token` + /// in the returned [`PaginatedListResponse`] can be passed to subsequent calls to fetch + /// the next page. + /// + /// Implementations must generate a [`PageToken`] that encodes enough information to resume + /// listing from the correct position. The token should encode the creation timestamp (or + /// sequence number) and key name of the last returned entry. Tokens must remain valid across + /// multiple calls within a reasonable timeframe. If the entry referenced by a token has been + /// deleted, implementations should resume from the next valid position rather than failing. + /// Tokens are scoped to a specific `(primary_namespace, secondary_namespace)` pair and should + /// not be used across different namespace pairs. + /// + /// Returns an empty list if `primary_namespace` or `secondary_namespace` is unknown or if + /// there are no more keys to return. + fn list_paginated( + &self, primary_namespace: &str, secondary_namespace: &str, page_token: Option, + ) -> Result; +} + +/// A wrapper around a [`PaginatedKVStoreSync`] that implements the [`PaginatedKVStore`] trait. +/// It is not necessary to use this type directly. +#[derive(Clone)] +pub struct PaginatedKVStoreSyncWrapper(pub K) +where + K::Target: PaginatedKVStoreSync; + +impl Deref for PaginatedKVStoreSyncWrapper +where + K::Target: PaginatedKVStoreSync, +{ + type Target = Self; + fn deref(&self) -> &Self::Target { + self + } +} + +/// This is not exported to bindings users as async is only supported in Rust. +impl KVStore for PaginatedKVStoreSyncWrapper +where + K::Target: PaginatedKVStoreSync, +{ + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> impl Future, io::Error>> + 'static + MaybeSend { + let res = self.0.read(primary_namespace, secondary_namespace, key); + + async move { res } + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> impl Future> + 'static + MaybeSend { + let res = self.0.write(primary_namespace, secondary_namespace, key, buf); + + async move { res } + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> impl Future> + 'static + MaybeSend { + let res = self.0.remove(primary_namespace, secondary_namespace, key, lazy); + + async move { res } + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> impl Future, io::Error>> + 'static + MaybeSend { + let res = self.0.list(primary_namespace, secondary_namespace); + + async move { res } + } +} + +/// This is not exported to bindings users as async is only supported in Rust. +impl PaginatedKVStore for PaginatedKVStoreSyncWrapper +where + K::Target: PaginatedKVStoreSync, +{ + fn list_paginated( + &self, primary_namespace: &str, secondary_namespace: &str, page_token: Option, + ) -> impl Future> + 'static + MaybeSend { + let res = self.0.list_paginated(primary_namespace, secondary_namespace, page_token); + + async move { res } + } +} + +/// Provides an interface that allows storage and retrieval of persisted values that are associated +/// with given keys, with support for pagination. +/// +/// In order to avoid collisions, the key space is segmented based on the given `primary_namespace`s +/// and `secondary_namespace`s. Implementations of this trait are free to handle them in different +/// ways, as long as per-namespace key uniqueness is asserted. +/// +/// Keys and namespaces are required to be valid ASCII strings in the range of +/// [`KVSTORE_NAMESPACE_KEY_ALPHABET`] and no longer than [`KVSTORE_NAMESPACE_KEY_MAX_LEN`]. Empty +/// primary namespaces and secondary namespaces (`""`) are considered valid; however, if +/// `primary_namespace` is empty, `secondary_namespace` must also be empty. This means that concerns +/// should always be separated by primary namespace first, before secondary namespaces are used. +/// While the number of primary namespaces will be relatively small and determined at compile time, +/// there may be many secondary namespaces per primary namespace. Note that per-namespace uniqueness +/// needs to also hold for keys *and* namespaces in any given namespace, i.e., conflicts between keys +/// and equally named primary or secondary namespaces must be avoided. +/// +/// **Note:** This trait extends the functionality of [`KVStore`] by adding support for +/// paginated listing of keys in creation order. This is useful when dealing with a large number +/// of keys that cannot be efficiently retrieved all at once. +/// +/// For a synchronous version of this trait, see [`PaginatedKVStoreSync`]. +/// +/// This is not exported to bindings users as async is only supported in Rust. +pub trait PaginatedKVStore: KVStore { + /// Returns a paginated list of keys that are stored under the given `secondary_namespace` in + /// `primary_namespace`, ordered from most recently created to least recently created. + /// + /// Implementations must return keys in reverse creation order (newest first). How creation + /// order is tracked is implementation-defined (e.g., storing creation timestamps, using an + /// incrementing ID, or another mechanism). Creation order (not last-updated order) is used + /// to prevent race conditions during pagination: if keys were ordered by update time, a key + /// updated mid-pagination could shift position, causing it to be skipped or returned twice + /// across pages. + /// + /// If `page_token` is provided, listing continues from where the previous page left off. + /// If `None`, listing starts from the most recently created entry. The `next_page_token` + /// in the returned [`PaginatedListResponse`] can be passed to subsequent calls to fetch + /// the next page. + /// + /// Implementations must generate a [`PageToken`] that encodes enough information to resume + /// listing from the correct position. The token should encode the creation timestamp (or + /// sequence number) and key name of the last returned entry. Tokens must remain valid across + /// multiple calls within a reasonable timeframe. If the entry referenced by a token has been + /// deleted, implementations should resume from the next valid position rather than failing. + /// Tokens are scoped to a specific `(primary_namespace, secondary_namespace)` pair and should + /// not be used across different namespace pairs. + /// + /// Returns an empty list if `primary_namespace` or `secondary_namespace` is unknown or if + /// there are no more keys to return. + fn list_paginated( + &self, primary_namespace: &str, secondary_namespace: &str, page_token: Option, + ) -> impl Future> + 'static + MaybeSend; +} + /// Provides additional interface methods that are required for [`KVStore`]-to-[`KVStore`] /// data migration. pub trait MigratableKVStore: KVStoreSync { @@ -1565,7 +1764,7 @@ mod tests { use crate::ln::msgs::BaseMessageHandler; use crate::sync::Arc; use crate::util::test_channel_signer::TestChannelSigner; - use crate::util::test_utils::{self, TestStore}; + use crate::util::test_utils::{self, TestPaginatedStore, TestStore}; use bitcoin::hashes::hex::FromHex; use core::cmp; @@ -1975,4 +2174,78 @@ mod tests { let store: Arc = Arc::new(TestStore::new(false)); assert!(persist_fn::<_, TestChannelSigner>(Arc::clone(&store))); } + + #[test] + fn paginated_store_basic_operations() { + let store = TestPaginatedStore::new(10); + + // Write some data + store.write("ns1", "ns2", "key1", vec![1, 2, 3]).unwrap(); + store.write("ns1", "ns2", "key2", vec![4, 5, 6]).unwrap(); + + // Read it back + assert_eq!(KVStoreSync::read(&store, "ns1", "ns2", "key1").unwrap(), vec![1, 2, 3]); + assert_eq!(KVStoreSync::read(&store, "ns1", "ns2", "key2").unwrap(), vec![4, 5, 6]); + + // List should return keys in descending order + let response = store.list_paginated("ns1", "ns2", None).unwrap(); + assert_eq!(response.keys, vec!["key2", "key1"]); + assert!(response.next_page_token.is_none()); + + // Remove a key + KVStoreSync::remove(&store, "ns1", "ns2", "key1", false).unwrap(); + assert!(KVStoreSync::read(&store, "ns1", "ns2", "key1").is_err()); + } + + #[test] + fn paginated_store_pagination() { + let store = TestPaginatedStore::new(2); + + // Write 5 items with different order values + for i in 0..5i64 { + store.write("ns", "", &format!("key{i}"), vec![i as u8]).unwrap(); + } + + // First page should have 2 items (most recently created first: key4, key3) + let page1 = store.list_paginated("ns", "", None).unwrap(); + assert_eq!(page1.keys.len(), 2); + assert_eq!(page1.keys, vec!["key4", "key3"]); + assert!(page1.next_page_token.is_some()); + + // Second page + let page2 = store.list_paginated("ns", "", page1.next_page_token).unwrap(); + assert_eq!(page2.keys.len(), 2); + assert_eq!(page2.keys, vec!["key2", "key1"]); + assert!(page2.next_page_token.is_some()); + + // Third page (last item) + let page3 = store.list_paginated("ns", "", page2.next_page_token).unwrap(); + assert_eq!(page3.keys.len(), 1); + assert_eq!(page3.keys, vec!["key0"]); + assert!(page3.next_page_token.is_none()); + } + + #[test] + fn paginated_store_update_preserves_order() { + let store = TestPaginatedStore::new(10); + + // Write items with specific order values + store.write("ns", "", "key1", vec![1]).unwrap(); + store.write("ns", "", "key2", vec![2]).unwrap(); + store.write("ns", "", "key3", vec![3]).unwrap(); + + // Verify initial order (newest first) + let response = store.list_paginated("ns", "", None).unwrap(); + assert_eq!(response.keys, vec!["key3", "key2", "key1"]); + + // Update key1 with a new order value that would put it first if used + store.write("ns", "", "key1", vec![1, 1]).unwrap(); + + // Verify data was updated + assert_eq!(KVStoreSync::read(&store, "ns", "", "key1").unwrap(), vec![1, 1]); + + // Verify order is unchanged - creation order should have been preserved + let response = store.list_paginated("ns", "", None).unwrap(); + assert_eq!(response.keys, vec!["key3", "key2", "key1"]); + } } diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index 34f5d5fe36e..de04054861f 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -51,6 +51,7 @@ use crate::sign::{ChannelSigner, PeerStorageKey}; use crate::sync::RwLock; use crate::types::features::{ChannelFeatures, InitFeatures, NodeFeatures}; use crate::util::async_poll::MaybeSend; +use crate::util::atomic_counter::AtomicCounter; use crate::util::config::UserConfig; use crate::util::dyn_signer::{ DynKeysInterface, DynKeysInterfaceTrait, DynPhantomKeysInterface, DynSigner, @@ -58,7 +59,7 @@ use crate::util::dyn_signer::{ use crate::util::logger::{Logger, Record}; #[cfg(feature = "std")] use crate::util::mut_global::MutGlobal; -use crate::util::persist::{KVStore, KVStoreSync, MonitorName}; +use crate::util::persist::{KVStore, KVStoreSync, MonitorName, PageToken, PaginatedListResponse}; use crate::util::ser::{Readable, ReadableArgs, Writeable, Writer}; use crate::util::test_channel_signer::{EnforcementState, TestChannelSigner}; use crate::util::wakers::Notifier; @@ -1124,6 +1125,122 @@ impl KVStoreSync for TestStore { unsafe impl Sync for TestStore {} unsafe impl Send for TestStore {} +/// A simple in-memory implementation of [`PaginatedKVStoreSync`] for testing. +/// +/// [`PaginatedKVStoreSync`]: crate::util::persist::PaginatedKVStoreSync +pub struct TestPaginatedStore { + data: Mutex)>>, + page_size: usize, + time_counter: AtomicCounter, +} + +impl TestPaginatedStore { + /// Creates a new `TestPaginatedStore` with the given page size. + pub fn new(page_size: usize) -> Self { + Self { data: Mutex::new(new_hash_map()), page_size, time_counter: AtomicCounter::new() } + } +} + +impl KVStoreSync for TestPaginatedStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> Result, io::Error> { + let data = self.data.lock().unwrap(); + data.get(&(primary_namespace.to_string(), secondary_namespace.to_string(), key.to_string())) + .map(|(_, v)| v.clone()) + .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "key not found")) + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> Result<(), io::Error> { + let mut data = self.data.lock().unwrap(); + let order = self.time_counter.next() as i64; + let key_tuple = + (primary_namespace.to_string(), secondary_namespace.to_string(), key.to_string()); + // Only use order for new entries; preserve existing order on updates + let order_to_use = + data.get(&key_tuple).map(|(existing_order, _)| *existing_order).unwrap_or(order); + data.insert(key_tuple, (order_to_use, buf)); + Ok(()) + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool, + ) -> Result<(), io::Error> { + let mut data = self.data.lock().unwrap(); + data.remove(&( + primary_namespace.to_string(), + secondary_namespace.to_string(), + key.to_string(), + )); + Ok(()) + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> Result, io::Error> { + let mut all_keys = Vec::new(); + let mut page_token = None; + loop { + let response = crate::util::persist::PaginatedKVStoreSync::list_paginated( + self, + primary_namespace, + secondary_namespace, + page_token, + )?; + all_keys.extend(response.keys); + match response.next_page_token { + Some(token) => page_token = Some(token), + None => break, + } + } + Ok(all_keys) + } +} + +impl crate::util::persist::PaginatedKVStoreSync for TestPaginatedStore { + fn list_paginated( + &self, primary_namespace: &str, secondary_namespace: &str, page_token: Option, + ) -> Result { + let data = self.data.lock().unwrap(); + let mut entries: Vec<_> = data + .iter() + .filter(|((pn, sn, _), _)| pn == primary_namespace && sn == secondary_namespace) + .map(|((_, _, k), (t, _))| (k.clone(), *t)) + .collect(); + + // Sort by time descending, then by key + entries.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0))); + + // Apply pagination: find the first entry AFTER the given key in sort order. + // This implementation uses the last key as the page token. + let start_idx = if let Some(PageToken(ref last_key)) = page_token { + // Find the position of this key and start after it + entries.iter().position(|(k, _)| k == last_key).map(|pos| pos + 1).unwrap_or(0) + } else { + 0 + }; + + let page_entries: Vec<_> = + entries.into_iter().skip(start_idx).take(self.page_size).collect(); + + let next_page_token = if page_entries.len() == self.page_size { + page_entries.last().map(|(k, _)| PageToken(k.clone())) + } else { + None + }; + + Ok(PaginatedListResponse { + keys: page_entries.into_iter().map(|(k, _)| k).collect(), + next_page_token, + }) + } +} + +unsafe impl Sync for TestPaginatedStore {} +unsafe impl Send for TestPaginatedStore {} + pub struct TestBroadcaster { pub txn_broadcasted: Mutex>, pub blocks: Arc>>, From 3a6058882e825628f558406aa4fc07350da60e69 Mon Sep 17 00:00:00 2001 From: benthecarman Date: Wed, 28 Jan 2026 13:04:21 -0600 Subject: [PATCH 2/2] Implement PaginatedKVStore for FilesystemStore Test created with claude code --- lightning-persister/src/fs_store.rs | 178 +++++++++++++++++++++++++++- 1 file changed, 176 insertions(+), 2 deletions(-) diff --git a/lightning-persister/src/fs_store.rs b/lightning-persister/src/fs_store.rs index 73c24dc6fc0..e9b7cc8d446 100644 --- a/lightning-persister/src/fs_store.rs +++ b/lightning-persister/src/fs_store.rs @@ -2,7 +2,9 @@ use crate::utils::{check_namespace_key_validity, is_valid_kvstore_str}; use lightning::types::string::PrintableString; -use lightning::util::persist::{KVStoreSync, MigratableKVStore}; +use lightning::util::persist::{ + KVStoreSync, MigratableKVStore, PageToken, PaginatedKVStoreSync, PaginatedListResponse, +}; use std::collections::HashMap; use std::fs; @@ -10,11 +12,12 @@ use std::io::{Read, Write}; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; use std::sync::{Arc, Mutex, RwLock}; +use std::time::SystemTime; #[cfg(feature = "tokio")] use core::future::Future; #[cfg(feature = "tokio")] -use lightning::util::persist::KVStore; +use lightning::util::persist::{KVStore, PaginatedKVStore}; #[cfg(target_os = "windows")] use {std::ffi::OsStr, std::os::windows::ffi::OsStrExt}; @@ -39,6 +42,9 @@ fn path_to_windows_str>(path: &T) -> Vec { // a consistent view and error out. const LIST_DIR_CONSISTENCY_RETRIES: usize = 10; +// The default page size for paginated list operations. +const PAGINATED_LIST_DEFAULT_PAGE_SIZE: usize = 50; + struct FilesystemStoreInner { data_dir: PathBuf, tmp_file_counter: AtomicUsize, @@ -148,6 +154,22 @@ impl KVStoreSync for FilesystemStore { } } +impl PaginatedKVStoreSync for FilesystemStore { + fn list_paginated( + &self, primary_namespace: &str, secondary_namespace: &str, page_token: Option, + ) -> Result { + let path = self.inner.get_checked_dest_file_path( + primary_namespace, + secondary_namespace, + None, + "list_paginated", + )?; + // Extract the last key from the page token for internal use + let last_key = page_token.map(|t| t.0); + self.inner.list_paginated(path, last_key, PAGINATED_LIST_DEFAULT_PAGE_SIZE) + } +} + impl FilesystemStoreInner { fn get_inner_lock_ref(&self, path: PathBuf) -> Arc> { let mut outer_lock = self.locks.lock().unwrap(); @@ -456,6 +478,77 @@ impl FilesystemStoreInner { Ok(keys) } + + fn list_paginated( + &self, prefixed_dest: PathBuf, last_key: Option, page_size: usize, + ) -> lightning::io::Result { + if !Path::new(&prefixed_dest).exists() { + return Ok(PaginatedListResponse { keys: Vec::new(), next_page_token: None }); + } + + let mut entries: Vec<(String, SystemTime)> = Vec::with_capacity(page_size); + let mut retries = LIST_DIR_CONSISTENCY_RETRIES; + + 'retry_list: loop { + entries.clear(); + 'skip_entry: for entry in fs::read_dir(&prefixed_dest)? { + let entry = entry?; + let p = entry.path(); + + let res = dir_entry_is_key(&entry); + match res { + Ok(true) => { + let key = get_key_from_dir_entry_path(&p, &prefixed_dest)?; + // Get file creation time, falling back to modified time if unavailable. + let metadata = entry.metadata()?; + let created_time = metadata + .created() + .or_else(|_| metadata.modified()) + .unwrap_or(SystemTime::UNIX_EPOCH); + entries.push((key, created_time)); + }, + Ok(false) => { + continue 'skip_entry; + }, + Err(e) => { + if e.kind() == lightning::io::ErrorKind::NotFound && retries > 0 { + retries -= 1; + continue 'retry_list; + } else { + return Err(e.into()); + } + }, + } + } + break 'retry_list; + } + + if entries.is_empty() { + return Ok(PaginatedListResponse { keys: Vec::new(), next_page_token: None }); + } + + // Sort by creation time descending (newest first), then by key name for stability. + entries.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0))); + + // Apply pagination: find the first entry AFTER the given key in sort order. + let start_idx = if let Some(ref key) = last_key { + // Find the position of this key and start after it + entries.iter().position(|(k, _)| k == key).map(|pos| pos + 1).unwrap_or(0) + } else { + 0 + }; + + let page_entries: Vec = + entries.into_iter().skip(start_idx).take(page_size).map(|(k, _)| k).collect(); + + let next_page_token = if page_entries.len() == page_size { + page_entries.last().cloned().map(PageToken) + } else { + None + }; + + Ok(PaginatedListResponse { keys: page_entries, next_page_token }) + } } #[cfg(feature = "tokio")] @@ -544,6 +637,38 @@ impl KVStore for FilesystemStore { } } +#[cfg(feature = "tokio")] +impl PaginatedKVStore for FilesystemStore { + fn list_paginated( + &self, primary_namespace: &str, secondary_namespace: &str, page_token: Option, + ) -> impl Future> + 'static + Send + { + let this = Arc::clone(&self.inner); + + let path = this.get_checked_dest_file_path( + primary_namespace, + secondary_namespace, + None, + "list_paginated", + ); + + // Extract the last key from the page token for internal use + let last_key = page_token.map(|t| t.0); + + async move { + let path = match path { + Ok(path) => path, + Err(e) => return Err(e), + }; + tokio::task::spawn_blocking(move || { + this.list_paginated(path, last_key, PAGINATED_LIST_DEFAULT_PAGE_SIZE) + }) + .await + .unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))) + } + } +} + fn dir_entry_is_key(dir_entry: &fs::DirEntry) -> Result { let p = dir_entry.path(); if let Some(ext) = p.extension() { @@ -792,6 +917,55 @@ mod tests { assert_eq!(listed_keys.len(), 0); } + #[test] + fn test_list_paginated() { + let mut temp_path = std::env::temp_dir(); + temp_path.push("test_list_paginated"); + let fs_store = FilesystemStore::new(temp_path); + + let primary = "testspace"; + let secondary = "testsubspace"; + + // Write multiple keys with small delays to ensure different creation times + let keys = ["key_a", "key_b", "key_c", "key_d", "key_e"]; + for key in &keys { + KVStoreSync::write(&fs_store, primary, secondary, key, vec![42u8]).unwrap(); + // Small delay to ensure different creation times + std::thread::sleep(std::time::Duration::from_millis(10)); + } + + // Test that all keys are returned (no pagination cursor) + let response = + PaginatedKVStoreSync::list_paginated(&fs_store, primary, secondary, None).unwrap(); + assert_eq!(response.keys.len(), 5); + // Keys should be ordered by creation time descending (newest first) + // The last written key should be first + assert_eq!(response.keys[0], "key_e"); + assert_eq!(response.keys[4], "key_a"); + // No more pages since we have less than page_size (50) + assert!(response.next_page_token.is_none()); + + // Test pagination with a cursor + // First, get the first page starting from the beginning + let response = + PaginatedKVStoreSync::list_paginated(&fs_store, primary, secondary, None).unwrap(); + // Use one of the middle keys as a cursor to get remaining keys + let cursor = PageToken(response.keys[2].clone()); // Should be "key_c" + let response2 = + PaginatedKVStoreSync::list_paginated(&fs_store, primary, secondary, Some(cursor)) + .unwrap(); + // Should return the keys after "key_c" in the sorted order + assert_eq!(response2.keys.len(), 2); + assert_eq!(response2.keys[0], "key_b"); + assert_eq!(response2.keys[1], "key_a"); + + // Test with non-existent namespace returns empty + let response = + PaginatedKVStoreSync::list_paginated(&fs_store, "nonexistent", "", None).unwrap(); + assert!(response.keys.is_empty()); + assert!(response.next_page_token.is_none()); + } + #[test] fn test_data_migration() { let mut source_temp_path = std::env::temp_dir();