@dayesouza
Contributor

This pull request introduces a new streaming table abstraction to the storage layer, enabling efficient row-by-row access and transformation for both CSV and Parquet backends. The changes add a unified Table interface, implement CSV and Parquet streaming table classes, and update the provider modules to support these new interfaces and transformers. The overall design improves memory efficiency and extensibility for large dataset processing.

Table Abstraction and Streaming Interface

  • Added a new abstract base class Table in table.py to provide a unified streaming row-by-row interface with async iteration, write, and close methods. Supports row transformation and async context management.
  • Introduced the RowTransformer type for flexible row transformation, allowing both callable functions and classes (e.g., Pydantic models) to be used for row processing.
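The interface described above could be sketched roughly as follows. This is a minimal illustration, not the code in table.py: the method signatures, the RowTransformer alias shape, and the InMemoryTable helper are all assumptions made for the example, and only plain callables are covered as transformers.

```python
import asyncio
from abc import ABC, abstractmethod
from collections.abc import AsyncIterator, Callable
from typing import Any

# Assumed shape: a callable that takes a raw row dict and returns a transformed row.
RowTransformer = Callable[[dict[str, Any]], Any]


class Table(ABC):
    """Hypothetical streaming table interface; signatures are assumptions."""

    @abstractmethod
    def __aiter__(self) -> AsyncIterator[Any]:
        """Return an async iterator that yields one row at a time."""

    @abstractmethod
    async def write(self, row: dict[str, Any]) -> None:
        """Write a single row."""

    @abstractmethod
    async def close(self) -> None:
        """Release any underlying resources."""

    async def __aenter__(self) -> "Table":
        return self

    async def __aexit__(self, *exc_info: object) -> None:
        # Leaving the `async with` block always closes the table.
        await self.close()


class InMemoryTable(Table):
    """Illustrative list-backed implementation (not part of the PR)."""

    def __init__(self, rows=None, transformer=None):
        self._rows = list(rows or [])
        self._transformer = transformer
        self.closed = False

    async def __aiter__(self):
        for row in self._rows:
            # Apply the optional transformer to each row as it is yielded.
            yield self._transformer(row) if self._transformer else row

    async def write(self, row):
        self._rows.append(row)

    async def close(self):
        self.closed = True


async def demo():
    def to_int_id(row):
        return {**row, "id": int(row["id"])}

    async with InMemoryTable([{"id": "1"}], transformer=to_int_id) as table:
        await table.write({"id": "2"})
        rows = [row async for row in table]
    return rows, table.closed


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Keeping the transformer on the table, rather than in each consumer, means callers iterate over already-typed rows without loading the whole dataset.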

CSV Table Streaming Implementation

  • Added CSVTable in csv_table.py for streaming access to CSV tables, supporting async iteration, row transformation, row existence checks, and writing rows with automatic header management.
  • Updated CSVTableProvider to require a FileStorage backend, import the new streaming classes, and expose an open method for streaming table access with optional row transformation.
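As a rough illustration of row-by-row CSV access with a header written on the first write, here is a minimal sketch built on the standard csv module. SimpleCSVTable is a hypothetical stand-in, not the CSVTable class from csv_table.py; it uses blocking file I/O inside async methods and opens the file in "w" mode, both of which are assumptions made for brevity.

```python
import asyncio
import csv
import tempfile
from pathlib import Path
from typing import Any, Callable, Optional


class SimpleCSVTable:
    """Hypothetical CSV streaming table; not the PR's implementation."""

    def __init__(self, path: Path, transformer: Optional[Callable[[dict], Any]] = None):
        self._path = path
        self._transformer = transformer
        self._file = None
        self._writer = None

    async def __aiter__(self):
        if not self._path.exists():
            return
        with self._path.open("r", encoding="utf-8", newline="") as f:
            # DictReader reads lazily, so only one row is held at a time.
            for row in csv.DictReader(f):
                yield self._transformer(row) if self._transformer else row

    async def write(self, row: dict[str, Any]) -> None:
        if self._writer is None:
            # First write: create the file and emit the header once.
            self._path.parent.mkdir(parents=True, exist_ok=True)
            self._file = self._path.open("w", encoding="utf-8", newline="")
            self._writer = csv.DictWriter(self._file, fieldnames=list(row.keys()))
            self._writer.writeheader()
        self._writer.writerow(row)

    async def close(self) -> None:
        if self._file is not None:
            self._file.close()
            self._file = None
            self._writer = None


async def demo(path: Path):
    table = SimpleCSVTable(path)
    await table.write({"id": "1", "text": "hello"})
    await table.write({"id": "2", "text": "world"})
    await table.close()

    # Re-open for reading, converting the id column to int on the fly.
    reader = SimpleCSVTable(path, transformer=lambda r: {**r, "id": int(r["id"])})
    return [row async for row in reader]


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        print(asyncio.run(demo(Path(tmp) / "out" / "rows.csv")))
```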

Parquet Table Streaming Implementation

  • Added ParquetTable in parquet_table.py to simulate streaming for Parquet tables by loading DataFrames and yielding rows, with support for row transformation and batch writing.
  • Updated ParquetTableProvider to import the new streaming classes and expose an open method for streaming table access with optional row transformation.

Provider Interface Unification

  • Updated the TableProvider base class to include the new open method, standardizing streaming table access across storage backends.
  • Updated module exports in tables/__init__.py to include the new Table class.
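One way to picture the unified provider interface is a base class whose open method returns a streaming table, so calling code does not care which backend sits behind it. The sketch below is an assumption-heavy illustration: the open signature (a table name plus an optional transformer, called synchronously), ListTable, and InMemoryTableProvider are all hypothetical and may differ from the PR.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, Callable, Optional

RowTransformer = Callable[[dict], Any]  # assumed alias shape


class ListTable:
    """Hypothetical list-backed table used only for this example."""

    def __init__(self, rows: list, transformer: Optional[RowTransformer] = None):
        self._rows = rows
        self._transformer = transformer

    async def __aiter__(self):
        for row in self._rows:
            yield self._transformer(row) if self._transformer else row


class TableProvider(ABC):
    """Assumed provider interface: every backend exposes the same open()."""

    @abstractmethod
    def open(self, table_name: str, transformer: Optional[RowTransformer] = None) -> Any:
        """Return a streaming table for table_name."""


class InMemoryTableProvider(TableProvider):
    def __init__(self, tables: dict):
        self._tables = tables

    def open(self, table_name, transformer=None):
        return ListTable(self._tables.setdefault(table_name, []), transformer)


async def collect(provider: TableProvider, table_name: str, transformer=None) -> list:
    # Works against any provider, since it relies only on open() and async iteration.
    return [row async for row in provider.open(table_name, transformer)]


if __name__ == "__main__":
    provider = InMemoryTableProvider({"docs": [{"id": "1"}, {"id": "2"}]})
    print(asyncio.run(collect(provider, "docs", lambda r: int(r["id"]))))
```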

Supporting Changes

  • Added a get_path method to FileStorage for easier file path access needed by streaming table implementations.
  • Updated imports in workflow modules to use the new Table abstraction.

if isinstance(self._storage, FileStorage) and self._write_file is None:
    file_path = self._storage.get_path(self._file_key)
    file_path.parent.mkdir(parents=True, exist_ok=True)
    self._write_file = Path.open(file_path, "w", encoding="utf-8", newline="")
Collaborator

should this be "a" for append mode?
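For context on the question above, the difference between the two modes can be checked with a small standalone script. It is independent of the PR's code; write_line and demo are hypothetical helpers used only here.

```python
import tempfile
from pathlib import Path


def write_line(path: Path, mode: str, line: str) -> None:
    # Same open() arguments as the snippet under review, with the mode as a parameter.
    with path.open(mode, encoding="utf-8", newline="") as f:
        f.write(line + "\n")


def demo(mode: str) -> str:
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "rows.csv"
        path.write_text("id\n1\n", encoding="utf-8")  # pre-existing file content
        write_line(path, mode, "2")
        return path.read_text(encoding="utf-8")


if __name__ == "__main__":
    print(repr(demo("w")))  # "w" truncates the existing file before writing
    print(repr(demo("a")))  # "a" keeps existing rows and adds to the end
```

With "w", any rows already in the file are discarded when the handle is first opened; with "a", a header would need to be written only when the file is new or empty.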


  logger.info("Workflow completed: create_base_text_units")
- return WorkflowFunctionOutput(result=output)
+ return WorkflowFunctionOutput(result=None)
Collaborator

Ooh, this is gonna be challenging. Don't worry about it for this PR, but we may want to return a sample set of rows (like 5)
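A possible shape for the "sample set of rows" idea is a helper that takes the first few rows from an async stream and stops early. This is only a sketch: take and fake_rows are hypothetical names, and how a sample would be wired into WorkflowFunctionOutput is left open.

```python
import asyncio
from collections.abc import AsyncIterable
from typing import Any


async def take(rows: AsyncIterable[Any], n: int = 5) -> list[Any]:
    """Collect at most n rows from an async stream, then stop reading."""
    sample: list[Any] = []
    if n <= 0:
        return sample
    async for row in rows:
        sample.append(row)
        if len(sample) >= n:
            break
    return sample


async def fake_rows(count: int):
    # Stand-in for a streaming table; yields simple dict rows.
    for i in range(count):
        yield {"id": i}


if __name__ == "__main__":
    print(asyncio.run(take(fake_rows(100), 5)))
```

Note that sampling from the stream consumes those rows, so in practice the sample would likely be captured while rows are being written rather than by re-reading the output.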

2 participants