Skip to main content
Open in Kaggle  Open in Colab  Download Notebook
This documentation page is also available as an interactive notebook. You can launch the notebook in Kaggle or Colab, or download it for use with an IDE or local Jupyter installation, by clicking one of the above links.
Build your own iterators to split documents, media, or any data into rows.

Problem

You need to split data in a way that the built-in iterators (DocumentSplitter, FrameIterator, AudioSplitter) don’t support — like splitting by custom delimiters, extracting specific patterns, or processing proprietary formats.

Solution

What’s in this recipe:
  • Create a custom iterator by subclassing ComponentIterator
  • Define input/output schemas with type annotations
  • Use the iterator with pxt.create_view()

Setup

%pip install -qU pixeltable
import pixeltable as pxt
from pixeltable.iterators import ComponentIterator
import pixeltable.type_system as ts
from typing import Any

# Start from a clean slate: remove any previous demo directory, then recreate it.
demo_dir = 'iterator_demo'
pxt.drop_dir(demo_dir, force=True)
pxt.create_dir(demo_dir)
Connected to Pixeltable database at: postgresql+psycopg://postgres:@/pixeltable?host=/Users/pjlb/.pixeltable/pgdata
Created directory ‘iterator_demo’.
<pixeltable.catalog.dir.Dir at 0x17f219990>

Example: sentence splitter iterator

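This iterator splits text into sentences at runs of sentence-ending punctuation (., !, ?). The punctuation itself is dropped from the output, and empty or whitespace-only pieces are skipped.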
import re

class SentenceSplitter(ComponentIterator):
    """Iterator that emits one output row per sentence of the input text.

    Sentences are delimited by runs of ``.``, ``!`` or ``?``. The delimiters
    are discarded, and pieces that are empty after trimming are skipped.
    """

    def __init__(self, text: str):
        # Break the text at sentence-ending punctuation, trim every piece and
        # keep only the non-empty ones.
        trimmed = (piece.strip() for piece in re.split(r'[.!?]+', text))
        self.sentences = [piece for piece in trimmed if piece]
        # Index of the next sentence to hand out.
        self.pos = 0

    @classmethod
    def input_schema(cls) -> dict[str, ts.ColumnType]:
        """Return the input parameters and their Pixeltable types."""
        schema = {'text': ts.StringType(nullable=False)}
        return schema

    @classmethod
    def output_schema(cls, *args: Any, **kwargs: Any) -> tuple[dict[str, ts.ColumnType], list[str]]:
        """Return the output columns plus the names of the unstored ones."""
        columns = {
            'sentence_idx': ts.IntType(),
            'sentence': ts.StringType(),
        }
        unstored: list[str] = []  # every output column is stored
        return columns, unstored

    def __next__(self) -> dict[str, Any]:
        """Return the next sentence row, or raise StopIteration when exhausted."""
        if len(self.sentences) <= self.pos:
            raise StopIteration
        idx = self.pos
        row = {'sentence_idx': idx, 'sentence': self.sentences[idx]}
        self.pos = idx + 1
        return row

    def close(self) -> None:
        """Nothing to release: the iterator only holds an in-memory list."""

    def set_pos(self, pos: int) -> None:
        """Move the cursor to ``pos`` so iteration can resume from there."""
        self.pos = pos

Create sample data

# Two string columns: a short title and the article body to be split.
article_schema = {'title': pxt.String, 'content': pxt.String}
articles = pxt.create_table('iterator_demo.articles', article_schema)

# Sample articles used by the iterator examples.
sample_rows = [
    {
        'title': 'AI News',
        'content': 'AI is transforming industries. Machine learning models are getting better. The future looks bright!',
    },
    {
        'title': 'Tech Update',
        'content': 'New frameworks released today. Developers are excited? Performance improvements are significant.',
    },
]
articles.insert(sample_rows)

articles.collect()
Created table ‘articles’.
Inserting rows into `articles`: 2 rows [00:00, 250.91 rows/s]
Inserted 2 rows with 0 errors.

Use the iterator with create_view

# Expand every article into one view row per sentence by running
# SentenceSplitter over the `content` column.
sentence_iterator = SentenceSplitter.create(text=articles.content)
sentences_view = pxt.create_view(
    'iterator_demo.sentences', articles, iterator=sentence_iterator
)

sentences_view.collect()
Inserting rows into `sentences`: 6 rows [00:00, 4370.58 rows/s]
# Base-table columns (title) are available on the view next to the iterator outputs
selected_columns = [
    sentences_view.title,
    sentences_view.sentence_idx,
    sentences_view.sentence,
]
sentences_view.select(*selected_columns).collect()

Example: sliding window iterator with parameters

class SlidingWindowSplitter(ComponentIterator):
    """Split text into overlapping windows of whitespace-separated words.

    Only full windows are produced: window starts advance by ``step`` words,
    and trailing words that cannot fill a complete window are dropped. A text
    with fewer than ``window_size`` words yields no rows.
    """

    def __init__(self, text: str, window_size: int = 5, step: int = 2):
        """Precompute all windows for ``text``.

        Args:
            text: The text to split; words are separated on whitespace.
            window_size: Number of words per window; must be at least 1.
            step: Number of words to advance between window starts; must be
                at least 1.

        Raises:
            ValueError: If ``window_size`` or ``step`` is not positive.
        """
        # Reject degenerate parameters up front. Without these checks a
        # non-positive window_size yields empty or nonsensical windows, a
        # negative step silently yields nothing, and step == 0 fails with an
        # opaque range() error.
        if window_size < 1:
            raise ValueError(f'window_size must be a positive integer, got {window_size}')
        if step < 1:
            raise ValueError(f'step must be a positive integer, got {step}')

        words = text.split()
        # Last index at which a full window still fits (negative if none fits).
        last_start = len(words) - window_size
        self.windows = [
            ' '.join(words[start:start + window_size])
            for start in range(0, last_start + 1, step)
        ]
        # Index of the next window to hand out.
        self.pos = 0

    @classmethod
    def input_schema(cls) -> dict[str, ts.ColumnType]:
        """Return the input parameters and their Pixeltable types."""
        return {
            'text': ts.StringType(nullable=False),
            'window_size': ts.IntType(),  # Optional parameter
            'step': ts.IntType(),          # Optional parameter
        }

    @classmethod
    def output_schema(cls, *args: Any, **kwargs: Any) -> tuple[dict[str, ts.ColumnType], list[str]]:
        """Return the output columns plus the (empty) list of unstored ones."""
        return {
            'window_idx': ts.IntType(),
            'window_text': ts.StringType(),
        }, []

    def __next__(self) -> dict[str, Any]:
        """Return the next window row, or raise StopIteration when exhausted."""
        if self.pos >= len(self.windows):
            raise StopIteration
        result = {'window_idx': self.pos, 'window_text': self.windows[self.pos]}
        self.pos += 1
        return result

    def close(self) -> None:
        """Nothing to release: the iterator only holds an in-memory list."""

    def set_pos(self, pos: int) -> None:
        """Move the cursor to ``pos`` so iteration can resume from there."""
        self.pos = pos
# Override the default window_size/step when building the iterator
window_iterator = SlidingWindowSplitter.create(
    text=articles.content,
    window_size=4,
    step=2,
)
windows_view = pxt.create_view(
    'iterator_demo.windows', articles, iterator=window_iterator
)

windows_view.select(
    windows_view.title, windows_view.window_idx, windows_view.window_text
).collect()
Inserting rows into `windows`: 10 rows [00:00, 7230.31 rows/s]

Explanation

Iterator structure:
class MyIterator(ComponentIterator):
    """Skeleton of a custom iterator; replace each `...` with real logic."""

    # `default` is a placeholder for whatever default value your parameter needs.
    def __init__(self, input_col, param=default):
        # Initialize state, prepare data to iterate
        ...

    @classmethod
    def input_schema(cls) -> dict[str, ts.ColumnType]:
        # Map parameter names to Pixeltable types
        return {'input_col': ts.StringType(nullable=False), 'param': ts.IntType()}

    @classmethod
    def output_schema(cls, *args, **kwargs) -> tuple[dict, list]:
        # Return (output columns dict, list of unstored column names)
        return {'col1': ts.IntType(), 'col2': ts.StringType()}, []

    def __next__(self) -> dict:
        # Return next row as dict, or raise StopIteration
        ...

    def close(self) -> None:
        # Release resources (file handles, etc.)
        ...

    def set_pos(self, pos: int) -> None:
        # Support resumable iteration
        ...
Key points:
  • Use ts.StringType(), ts.IntType(), ts.FloatType(), ts.ImageType(), etc. for schemas
  • Add nullable=False for required inputs
  • Unstored columns (second return value of output_schema) are not persisted
  • Pass MyIterator.create(param=table.column) as the iterator argument of pxt.create_view()

See also