⚠ This page is served via a proxy. Original site: https://github.com
This service does not collect credentials or authentication data.
Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 176 additions & 2 deletions lightning-persister/src/fs_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,22 @@
use crate::utils::{check_namespace_key_validity, is_valid_kvstore_str};

use lightning::types::string::PrintableString;
use lightning::util::persist::{KVStoreSync, MigratableKVStore};
use lightning::util::persist::{
KVStoreSync, MigratableKVStore, PageToken, PaginatedKVStoreSync, PaginatedListResponse,
};

use std::collections::HashMap;
use std::fs;
use std::io::{Read, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::time::SystemTime;

#[cfg(feature = "tokio")]
use core::future::Future;
#[cfg(feature = "tokio")]
use lightning::util::persist::KVStore;
use lightning::util::persist::{KVStore, PaginatedKVStore};

#[cfg(target_os = "windows")]
use {std::ffi::OsStr, std::os::windows::ffi::OsStrExt};
Expand All @@ -39,6 +42,9 @@ fn path_to_windows_str<T: AsRef<OsStr>>(path: &T) -> Vec<u16> {
// a consistent view and error out.
const LIST_DIR_CONSISTENCY_RETRIES: usize = 10;

// The default page size for paginated list operations, i.e., the maximum number of keys
// handed back by a single `list_paginated` call on this store.
const PAGINATED_LIST_DEFAULT_PAGE_SIZE: usize = 50;

struct FilesystemStoreInner {
data_dir: PathBuf,
tmp_file_counter: AtomicUsize,
Expand Down Expand Up @@ -148,6 +154,22 @@ impl KVStoreSync for FilesystemStore {
}
}

impl PaginatedKVStoreSync for FilesystemStore {
fn list_paginated(
&self, primary_namespace: &str, secondary_namespace: &str, page_token: Option<PageToken>,
) -> Result<PaginatedListResponse, lightning::io::Error> {
let path = self.inner.get_checked_dest_file_path(
primary_namespace,
secondary_namespace,
None,
"list_paginated",
)?;
// Extract the last key from the page token for internal use
let last_key = page_token.map(|t| t.0);
self.inner.list_paginated(path, last_key, PAGINATED_LIST_DEFAULT_PAGE_SIZE)
}
}

impl FilesystemStoreInner {
fn get_inner_lock_ref(&self, path: PathBuf) -> Arc<RwLock<u64>> {
let mut outer_lock = self.locks.lock().unwrap();
Expand Down Expand Up @@ -456,6 +478,77 @@ impl FilesystemStoreInner {

Ok(keys)
}

fn list_paginated(
&self, prefixed_dest: PathBuf, last_key: Option<String>, page_size: usize,
) -> lightning::io::Result<PaginatedListResponse> {
if !Path::new(&prefixed_dest).exists() {
return Ok(PaginatedListResponse { keys: Vec::new(), next_page_token: None });
}

let mut entries: Vec<(String, SystemTime)> = Vec::with_capacity(page_size);
let mut retries = LIST_DIR_CONSISTENCY_RETRIES;

'retry_list: loop {
entries.clear();
'skip_entry: for entry in fs::read_dir(&prefixed_dest)? {
let entry = entry?;
let p = entry.path();

let res = dir_entry_is_key(&entry);
match res {
Ok(true) => {
let key = get_key_from_dir_entry_path(&p, &prefixed_dest)?;
// Get file creation time, falling back to modified time if unavailable.
let metadata = entry.metadata()?;
let created_time = metadata
.created()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe in most filesystems the created time is actually gonna be the last-update time because we overwrite-on-update. Thus, I don't think its possible to implement the pagination for the filesystem store (without including additional metadata in the filenames).

Sorry I had earlier assumed this would work but I dont think so.

.or_else(|_| metadata.modified())
.unwrap_or(SystemTime::UNIX_EPOCH);
entries.push((key, created_time));
},
Ok(false) => {
continue 'skip_entry;
},
Err(e) => {
if e.kind() == lightning::io::ErrorKind::NotFound && retries > 0 {
retries -= 1;
continue 'retry_list;
} else {
return Err(e.into());
}
},
}
}
break 'retry_list;
}

if entries.is_empty() {
return Ok(PaginatedListResponse { keys: Vec::new(), next_page_token: None });
}

// Sort by creation time descending (newest first), then by key name for stability.
entries.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));

// Apply pagination: find the first entry AFTER the given key in sort order.
let start_idx = if let Some(ref key) = last_key {
// Find the position of this key and start after it
entries.iter().position(|(k, _)| k == key).map(|pos| pos + 1).unwrap_or(0)
} else {
0
};

let page_entries: Vec<String> =
entries.into_iter().skip(start_idx).take(page_size).map(|(k, _)| k).collect();

let next_page_token = if page_entries.len() == page_size {
page_entries.last().cloned().map(PageToken)
} else {
None
};

Ok(PaginatedListResponse { keys: page_entries, next_page_token })
}
}

#[cfg(feature = "tokio")]
Expand Down Expand Up @@ -544,6 +637,38 @@ impl KVStore for FilesystemStore {
}
}

#[cfg(feature = "tokio")]
impl PaginatedKVStore for FilesystemStore {
    /// Asynchronous counterpart of the synchronous paginated listing.
    ///
    /// The destination directory is resolved via `get_checked_dest_file_path` before the
    /// future is created; a failure there is surfaced once the future is awaited. The listing
    /// itself runs through `tokio::task::spawn_blocking`, using
    /// `PAGINATED_LIST_DEFAULT_PAGE_SIZE` as the page size. If the spawned task cannot be
    /// joined, the join error is wrapped in an `ErrorKind::Other` I/O error.
    fn list_paginated(
        &self, primary_namespace: &str, secondary_namespace: &str, page_token: Option<PageToken>,
    ) -> impl Future<Output = Result<PaginatedListResponse, lightning::io::Error>> + 'static + Send
    {
        let inner = Arc::clone(&self.inner);

        let checked_dest = inner.get_checked_dest_file_path(
            primary_namespace,
            secondary_namespace,
            None,
            "list_paginated",
        );

        // Unwrap the token into the plain key string the inner implementation works with.
        let resume_after = page_token.map(|PageToken(key)| key);

        async move {
            let dest = match checked_dest {
                Err(err) => return Err(err),
                Ok(dest) => dest,
            };

            let joined = tokio::task::spawn_blocking(move || {
                inner.list_paginated(dest, resume_after, PAGINATED_LIST_DEFAULT_PAGE_SIZE)
            })
            .await;

            match joined {
                Ok(listing) => listing,
                Err(join_err) => {
                    Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, join_err))
                },
            }
        }
    }
}

fn dir_entry_is_key(dir_entry: &fs::DirEntry) -> Result<bool, lightning::io::Error> {
let p = dir_entry.path();
if let Some(ext) = p.extension() {
Expand Down Expand Up @@ -792,6 +917,55 @@ mod tests {
assert_eq!(listed_keys.len(), 0);
}

/// Exercises `PaginatedKVStoreSync::list_paginated` on a `FilesystemStore`: full listing
/// order, resuming from a cursor, and listing a namespace that does not exist.
#[test]
fn test_list_paginated() {
    let mut temp_path = std::env::temp_dir();
    temp_path.push("test_list_paginated");
    let fs_store = FilesystemStore::new(temp_path);

    let primary = "testspace";
    let secondary = "testsubspace";

    // Write multiple keys with small delays to ensure different creation times.
    let keys = ["key_a", "key_b", "key_c", "key_d", "key_e"];
    for key in &keys {
        KVStoreSync::write(&fs_store, primary, secondary, key, vec![42u8]).unwrap();
        // Small delay to ensure different creation times.
        std::thread::sleep(std::time::Duration::from_millis(10));
    }

    // Without a cursor all keys are returned. Pin the complete order (creation time
    // descending, i.e. the reverse of the write order) rather than only the first and last
    // element, so a mis-ordered middle cannot slip through.
    let response =
        PaginatedKVStoreSync::list_paginated(&fs_store, primary, secondary, None).unwrap();
    assert_eq!(response.keys, ["key_e", "key_d", "key_c", "key_b", "key_a"]);
    // No more pages since we have less than page_size (50).
    assert!(response.next_page_token.is_none());

    // Resume from a key in the middle of the listing; the order asserted above guarantees
    // that index 2 is "key_c".
    let cursor = PageToken(response.keys[2].clone());
    let response2 =
        PaginatedKVStoreSync::list_paginated(&fs_store, primary, secondary, Some(cursor))
            .unwrap();
    // Only the keys after "key_c" in the sorted order are returned.
    assert_eq!(response2.keys, ["key_b", "key_a"]);

    // Test with non-existent namespace returns empty.
    let response =
        PaginatedKVStoreSync::list_paginated(&fs_store, "nonexistent", "", None).unwrap();
    assert!(response.keys.is_empty());
    assert!(response.next_page_token.is_none());
}

#[test]
fn test_data_migration() {
let mut source_temp_path = std::env::temp_dir();
Expand Down
Loading