This documentation page is also available as an interactive notebook. You can launch the notebook in
Kaggle or Colab, or download it for use with an IDE or local Jupyter installation, by clicking one of the
above links.
Build your own iterators to split documents, media, or any data into
rows.
Problem
You need to split data in a way that the built-in iterators
(DocumentSplitter, FrameIterator, AudioSplitter) don’t support —
like splitting by custom delimiters, extracting specific patterns, or
processing proprietary formats.
Solution
What’s in this recipe:
- Create a custom iterator by subclassing `ComponentIterator`
- Define input/output schemas with type annotations
- Use the iterator with `pxt.create_view()`
Setup
%pip install -qU pixeltable
import pixeltable as pxt
from pixeltable.iterators import ComponentIterator
import pixeltable.type_system as ts
from typing import Any
# Drop and recreate the 'iterator_demo' directory so the recipe can be re-run from the top.
# NOTE(review): force=True presumably also removes anything already inside the
# directory and tolerates a missing directory - confirm against the drop_dir docs.
pxt.drop_dir('iterator_demo', force=True)
pxt.create_dir('iterator_demo')
Connected to Pixeltable database at: postgresql+psycopg://postgres:@/pixeltable?host=/Users/pjlb/.pixeltable/pgdata
Created directory ‘iterator_demo’.
<pixeltable.catalog.dir.Dir at 0x17f219990>
Example: sentence splitter iterator
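This iterator splits text into sentences at the punctuation marks `.`, `!`, and `?`. The punctuation itself is dropped, and surrounding whitespace is stripped from each sentence.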
import re
class SentenceSplitter(ComponentIterator):
    """Component iterator that emits one output row per sentence of the input text.

    A sentence is any non-empty, whitespace-stripped fragment found between
    runs of '.', '!' or '?'. The punctuation itself is not part of the output.
    """

    def __init__(self, text: str):
        # Cut the text at every run of sentence-ending punctuation, then trim
        # each fragment and discard the ones that end up empty.
        fragments = (piece.strip() for piece in re.split(r'[.!?]+', text))
        self.sentences = [fragment for fragment in fragments if fragment]
        # Index of the sentence the next __next__() call will return.
        self.pos = 0

    @classmethod
    def input_schema(cls) -> dict[str, ts.ColumnType]:
        """Return the iterator's parameters mapped to their Pixeltable types."""
        # A single required (non-nullable) string parameter.
        inputs = {'text': ts.StringType(nullable=False)}
        return inputs

    @classmethod
    def output_schema(cls, *args: Any, **kwargs: Any) -> tuple[dict[str, ts.ColumnType], list[str]]:
        """Return the output columns and the names of the unstored columns."""
        columns = {
            'sentence_idx': ts.IntType(),
            'sentence': ts.StringType(),
        }
        # Every output column is stored, so the unstored list stays empty.
        unstored: list[str] = []
        return columns, unstored

    def __next__(self) -> dict[str, Any]:
        """Return the next sentence row; raise StopIteration once all are consumed."""
        idx = self.pos
        if idx >= len(self.sentences):
            raise StopIteration
        row = {'sentence_idx': idx, 'sentence': self.sentences[idx]}
        # Advance only after the row has been built successfully.
        self.pos = idx + 1
        return row

    def close(self) -> None:
        """Nothing to clean up: the iterator only holds an in-memory list."""

    def set_pos(self, pos: int) -> None:
        """Move the cursor to `pos` so iteration can resume from that sentence."""
        self.pos = pos
Create sample data
# Base table for the recipe: one row per article, with two string columns.
articles = pxt.create_table(
    'iterator_demo.articles',
    {'title': pxt.String, 'content': pxt.String}
)
# Two sample articles of three sentences each; between them the sentences
# end in '.', '!' and '?', the three delimiters SentenceSplitter splits on.
articles.insert([
    {
        'title': 'AI News',
        'content': 'AI is transforming industries. Machine learning models are getting better. The future looks bright!'
    },
    {
        'title': 'Tech Update',
        'content': 'New frameworks released today. Developers are excited? Performance improvements are significant.'
    },
])
# Display the rows that were just inserted.
articles.collect()
Created table ‘articles’.
Inserting rows into `articles`: 2 rows [00:00, 250.91 rows/s]
Inserted 2 rows with 0 errors.
Use the iterator with create_view
# Create a view that expands each article into sentences.
# Each row's `content` value is bound to the iterator's `text` parameter.
sentences_view = pxt.create_view(
    'iterator_demo.sentences',
    articles,
    iterator=SentenceSplitter.create(text=articles.content)
)
# Display the resulting view rows.
sentences_view.collect()
Inserting rows into `sentences`: 6 rows [00:00, 4370.58 rows/s]
# The view inherits columns from the base table plus iterator outputs:
# `title` comes from `articles`; `sentence_idx` and `sentence` come from the iterator.
sentences_view.select(
    sentences_view.title,
    sentences_view.sentence_idx,
    sentences_view.sentence
).collect()
Example: sliding window iterator with parameters
class SlidingWindowSplitter(ComponentIterator):
    """Split text into overlapping windows of whitespace-separated words.

    Only complete windows are produced: a text with fewer than `window_size`
    words yields no rows, and trailing words that do not fill a whole window
    are not emitted.

    Args:
        text: The text to split.
        window_size: Number of words in each window; must be >= 1.
        step: Number of words to advance between consecutive windows; must be >= 1.

    Raises:
        ValueError: If `window_size` or `step` is less than 1.
    """

    def __init__(self, text: str, window_size: int = 5, step: int = 2):
        # Reject sizes that would otherwise produce empty-string windows
        # (window_size <= 0), a cryptic range() error (step == 0), or
        # silently no output at all (step < 0).
        if window_size < 1:
            raise ValueError(f'window_size must be a positive integer, got {window_size}')
        if step < 1:
            raise ValueError(f'step must be a positive integer, got {step}')

        words = text.split()
        # Last start index at which a full window still fits; negative when
        # the text is shorter than one window, which makes the range empty.
        last_start = len(words) - window_size
        self.windows: list[str] = [
            ' '.join(words[start:start + window_size])
            for start in range(0, last_start + 1, step)
        ]
        # Index of the window the next __next__() call will return.
        self.pos = 0

    @classmethod
    def input_schema(cls) -> dict[str, ts.ColumnType]:
        """Return the iterator's parameters mapped to their Pixeltable types."""
        return {
            'text': ts.StringType(nullable=False),
            'window_size': ts.IntType(),  # Optional parameter
            'step': ts.IntType(),  # Optional parameter
        }

    @classmethod
    def output_schema(cls, *args: Any, **kwargs: Any) -> tuple[dict[str, ts.ColumnType], list[str]]:
        """Return the output columns and the (empty) list of unstored columns."""
        return {
            'window_idx': ts.IntType(),
            'window_text': ts.StringType(),
        }, []

    def __next__(self) -> dict[str, Any]:
        """Return the next window row; raise StopIteration once all are consumed."""
        if self.pos >= len(self.windows):
            raise StopIteration
        result = {'window_idx': self.pos, 'window_text': self.windows[self.pos]}
        self.pos += 1
        return result

    def close(self) -> None:
        """Nothing to clean up: the iterator only holds an in-memory list."""

    def set_pos(self, pos: int) -> None:
        """Move the cursor to `pos` so iteration can resume from that window."""
        self.pos = pos
# Use with custom parameters: `text` is bound to the `content` column, while
# `window_size` and `step` are passed as constants that override the defaults (5 and 2).
windows_view = pxt.create_view(
    'iterator_demo.windows',
    articles,
    iterator=SlidingWindowSplitter.create(
        text=articles.content,
        window_size=4,
        step=2
    )
)
# Show each window next to the title of the article it was taken from.
windows_view.select(
    windows_view.title,
    windows_view.window_idx,
    windows_view.window_text
).collect()
Inserting rows into `windows`: 10 rows [00:00, 7230.31 rows/s]
Explanation
Iterator structure:
class MyIterator(ComponentIterator):
    """Template for a custom iterator; replace each `...` with your own logic."""

    def __init__(self, input_col, param=default):
        # Initialize state, prepare data to iterate
        ...

    @classmethod
    def input_schema(cls) -> dict[str, ts.ColumnType]:
        # Map parameter names to Pixeltable types
        return {'input_col': ts.StringType(nullable=False), 'param': ts.IntType()}

    @classmethod
    def output_schema(cls, *args, **kwargs) -> tuple[dict, list]:
        # Return (output columns dict, list of unstored column names)
        return {'col1': ts.IntType(), 'col2': ts.StringType()}, []

    def __next__(self) -> dict:
        # Return next row as dict, or raise StopIteration
        ...

    def close(self) -> None:
        # Release resources (file handles, etc.)
        ...

    def set_pos(self, pos: int) -> None:
        # Support resumable iteration
        ...
Key points:
- Use `ts.StringType()`, `ts.IntType()`, `ts.FloatType()`, `ts.ImageType()`, etc. for schemas
- Add `nullable=False` for required inputs
- Unstored columns (second return value of `output_schema`) are not persisted
- Call with `Iterator.create(param=table.column)` in `create_view()`
See also