This documentation page is also available as an interactive notebook, which you can launch in Kaggle or Colab or download for use with an IDE or a local Jupyter installation.
An iterator in Pixeltable is a function that expands a single input row into multiple output rows. Built-in Pixeltable iterators include frame_iterator, which iterates over the frames of a video; tile_iterator, which iterates over tiles of an image; and document_splitter, which iterates over chunks (such as sentences or pages) of a document. These and other examples are discussed in the Iterators platform tutorial. As with UDFs, Pixeltable provides a way for users to define their own iterators from arbitrary Python code. Recall that custom UDFs are created by decorating a Python function with the @pxt.udf decorator. Similarly, custom iterators are created by decorating a Python generator function with @pxt.iterator.
Custom iterators are a relatively advanced Pixeltable feature. This guide will make the most sense if you’re already familiar with Pixeltable’s built-in iterators, as well as the pxt.udf decorator. If you haven’t encountered those concepts yet, it’s recommended to first read the Iterators and UDFs tutorial sections.
%pip install -qU pixeltable
import pixeltable as pxt

pxt.create_dir('iterators_demo', if_exists='replace_force')
Connected to Pixeltable database at: postgresql+psycopg://postgres:@/pixeltable?host=/Users/asiegel/.pixeltable/pgdata
Created directory ‘iterators_demo’.
<pixeltable.catalog.dir.Dir at 0x14739e080>
In this tutorial, we’ll create an iterator that takes an image as input and produces multiple images as output: variations of the input rendered in grayscale at different brightness levels. To start, we’ll create a base table to store our source images.
t = pxt.create_table('iterators_demo/images', {'image': pxt.Image})
images = [
    'https://raw.githubusercontent.com/pixeltable/pixeltable/release/docs/resources/images/000000000108.jpg',
    'https://raw.githubusercontent.com/pixeltable/pixeltable/release/docs/resources/images/000000000632.jpg',
]
t.insert({'image': image} for image in images)
t.head()
Now let’s define a custom iterator. Our iterator is going to turn each image into n different grayscale images of varying brightness. Creating a functioning iterator is as simple as defining a Python generator function (a function that yields its output) and then decorating it with @pxt.iterator.
from PIL.Image import Image
from PIL.ImageEnhance import Brightness
from typing import Iterator, TypedDict


class GrayscaleOutput(TypedDict):
    brightness: float
    grayscale_image: Image


@pxt.iterator
def grayscale_iterator(
    image: Image, *, n: int
) -> Iterator[GrayscaleOutput]:
    grayscale_image = image.convert('L')
    enhancer = Brightness(grayscale_image)
    for brightness in [0.5 * (i + 1) for i in range(n)]:
        enhanced_image = enhancer.enhance(brightness)
        yield {
            'grayscale_image': enhanced_image,
            'brightness': brightness,
        }
Notice that before defining our iterator, we first introduced a TypedDict class describing the content of the iterator’s output. Unlike a UDF, which returns a single value, an iterator yields multiple rows, and each row can (and usually does) contain multiple output columns. Iterators therefore always yield dictionaries, and you must annotate the return type with a suitable TypedDict: its keys become the names of the iterator’s output columns, and its value types tell Pixeltable what types to assign to those columns.
Defining a TypedDict for your iterator is not optional. Remember that Pixeltable is a database system, and everything must be typed!
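As a minimal sketch of the idea, assuming nothing about Pixeltable’s internals, the standard typing module is enough to recover column names and types from a generator’s return annotation. output_schema is a hypothetical helper, and a TypedDict with float and str fields stands in for one containing images, so the example runs without PIL:

```python
from typing import Iterator, TypedDict, get_args, get_type_hints


class LevelOutput(TypedDict):
    brightness: float
    label: str


def brightness_levels(n: int) -> Iterator[LevelOutput]:
    # Same brightness schedule as grayscale_iterator: 0.5, 1.0, 1.5, ...
    for i in range(n):
        brightness = 0.5 * (i + 1)
        yield {'brightness': brightness, 'label': f'level-{i}'}


def output_schema(fn) -> dict[str, type]:
    """Hypothetical helper: derive output column names/types from fn's annotation."""
    return_type = get_type_hints(fn)['return']  # Iterator[LevelOutput]
    (row_type,) = get_args(return_type)  # LevelOutput
    return get_type_hints(row_type)  # {'brightness': float, 'label': str}


schema = output_schema(brightness_levels)
rows = list(brightness_levels(3))
```

Every yielded dictionary has exactly the keys declared in the TypedDict, which is what makes a fixed, typed set of output columns possible.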
Now let’s see our iterator in action! We’ll create a view on top of the images table and collect the results.
v = pxt.create_view(
    'iterators_demo/grayscale',
    t,
    iterator=grayscale_iterator(t.image, n=3),
)
v.head()
The iterator view has the columns brightness and grayscale_image, which were defined in GrayscaleOutput. In addition, Pixeltable added a third column, pos. Every iterator automatically outputs a pos column, regardless of what appears in the iterator’s TypedDict. The pos column indicates the zero-based position of each row in the iteration order over its input. Displaying the view’s schema confirms that pos has type Int.
v
In addition, a column for the original input image is included for reference. (Of course, the input image is not copied n times; Pixeltable materializes it in the view by joining against the base table.)
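The following toy model, in plain Python, shows how one input row fans out into several view rows, each tagged with its pos, while the input column is recovered by a join rather than copied. It is an illustration under simplifying assumptions, not Pixeltable’s implementation: expand_view, base_id, and the string stand-ins for images are all hypothetical.

```python
from typing import Iterator, TypedDict


class LevelOutput(TypedDict):
    brightness: float
    variant: str


def brightness_levels(image: str, *, n: int) -> Iterator[LevelOutput]:
    for i in range(n):
        brightness = 0.5 * (i + 1)
        yield {'brightness': brightness, 'variant': f'{image}@{brightness}'}


def expand_view(base_rows: list[dict], n: int) -> list[dict]:
    """Toy model of an iterator view: one view row per yielded dict."""
    view_rows = []
    for base_id, base_row in enumerate(base_rows):
        # enumerate() supplies pos: the position within this input's iteration.
        for pos, component in enumerate(brightness_levels(base_row['image'], n=n)):
            # Keep a reference to the base row instead of copying its image.
            view_rows.append({'base_id': base_id, 'pos': pos, **component})
    return view_rows


base = [{'image': 'img-108'}, {'image': 'img-632'}]
view = expand_view(base, n=3)
# "Join" against the base table to recover the input column.
joined = [{**base[row['base_id']], **row} for row in view]
```

With two inputs and n=3 the view has six rows, and pos restarts at 0 for each input row.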

Parameterizing Iterators

Iterators often contain complex functionality; document_splitter, for example, has 10 optional parameters to tune its behavior. Like UDFs, iterators can take any number of parameters. To illustrate this, let’s add an optional colorize parameter to our iterator.
from PIL import ImageOps


@pxt.iterator
def grayscale_iterator(
    image: Image, *, n: int, colorize: str | None = None
) -> Iterator[GrayscaleOutput]:
    grayscale_image = image.convert('L')
    if colorize is not None:
        grayscale_image = ImageOps.colorize(
            grayscale_image, black='black', white=colorize
        )
    enhancer = Brightness(grayscale_image)
    for brightness in [0.5 * (i + 1) for i in range(n)]:
        enhanced_image = enhancer.enhance(brightness)
        yield {
            'grayscale_image': enhanced_image,
            'brightness': brightness,
        }
v = pxt.create_view(
    'iterators_demo/grayscale',
    t,
    iterator=grayscale_iterator(t.image, n=3, colorize='red'),
    if_exists='replace',
)
v.head()
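Both versions of grayscale_iterator place n and colorize after a bare *, which makes them keyword-only; that is why the calls above pass them as n=3 and colorize='red'. The standard-library inspect module can confirm this on a stand-in function with the same parameter list (grayscale_levels is hypothetical and needs no PIL):

```python
import inspect
from typing import Iterator, Optional


def grayscale_levels(
    image: object, *, n: int, colorize: Optional[str] = None
) -> Iterator[dict]:
    """Stand-in with the same parameter list as grayscale_iterator."""
    for i in range(n):
        yield {'brightness': 0.5 * (i + 1), 'colorize': colorize}


sig = inspect.signature(grayscale_levels)
kinds = {name: param.kind.name for name, param in sig.parameters.items()}
defaults = {
    name: param.default
    for name, param in sig.parameters.items()
    if param.default is not inspect.Parameter.empty
}

# Parameters after the bare * can only be passed by keyword.
try:
    sig.bind('img', 3)
    positional_n_accepted = True
except TypeError:
    positional_n_accepted = False
```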

Validation

Often it’s desirable to validate an iterator’s inputs as a sanity check. Suppose we want to check that the colorize input is a valid PIL color name. In a sense, that check already happens: when ImageOps.colorize is called in our iterator code, it raises an exception if the color name is not valid. The problem is that the iterator code isn’t executed until our workflow actually runs, so nothing stops us from creating instances of grayscale_iterator with invalid inputs. To see this distinction, let’s set up an empty table and define an invalid iterator view on it.
t = pxt.create_table(
    'iterators_demo/images',
    {'image': pxt.Image},
    if_exists='replace_force',
)
Created table ‘images’.
v = pxt.create_view(
    'iterators_demo/grayscale',
    t,
    iterator=grayscale_iterator(
        t.image, n=3, colorize='invalid_color_name'
    ),
)
The view gets created without any errors, because nothing has actually run yet! Only when we go to insert data do we see an exception.
t.insert({'image': image} for image in images)
ValueError: unknown color specifier: ‘invalid_color_name’
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
Cell In[10], line 1
----> 1 t.insert({'image': image} for image in images)
(…)
File ~/Dropbox/workspace/pixeltable/pixeltable/pixeltable/exec/component_iteration_node.py:56, in ComponentIterationNode.__aiter__(self)
     54 if self.__non_nullable_args_specified(iterator_args):
     55     iterator = self.view.get().iterator_call.eval(iterator_args)
---> 56     for pos, component_dict in enumerate(iterator):
     57         output_row = self.row_builder.make_row()
     58         input_row.copy(output_row)
Cell In[6], line 10, in grayscale_iterator(image, n, colorize)
      8 grayscale_image = image.convert('L')
      9 if colorize is not None:
---> 10     grayscale_image = ImageOps.colorize(
     11         grayscale_image, black='black', white=colorize
     12     )
     13 enhancer = Brightness(grayscale_image)
     14 for brightness in [0.5 * (i + 1) for i in range(n)]:
(…)
File /opt/miniconda3/envs/pxt/lib/python3.10/site-packages/PIL/ImageColor.py:125, in getrgb(color)
    123     return int(m.group(1)), int(m.group(2)), int(m.group(3)), int(m.group(4))
    124 msg = f"unknown color specifier: {repr(color)}"
--> 125 raise ValueError(msg)
ValueError: unknown color specifier: 'invalid_color_name'
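Plain Python generators behave analogously: calling a generator function only creates a generator object, and none of its body, including any argument checks inside it, runs until the first value is requested. The sketch below (with a hypothetical checked_levels function and a hard-coded color set standing in for PIL’s color table) shows this deferral; it is an analogy for the behavior above, not a description of how Pixeltable schedules iterator evaluation.

```python
from typing import Iterator

KNOWN_COLORS = {'black', 'white', 'red', 'green', 'blue'}  # hypothetical subset


def checked_levels(n: int, colorize: str) -> Iterator[float]:
    # This check lives inside the generator body, so it only runs on the first next().
    if colorize not in KNOWN_COLORS:
        raise ValueError(f'unknown color specifier: {colorize!r}')
    for i in range(n):
        yield 0.5 * (i + 1)


gen = checked_levels(3, 'invalid_color_name')  # no error yet: the body hasn't run

try:
    next(gen)  # the body starts executing here, and the check fails
    deferred_error = None
except ValueError as exc:
    deferred_error = str(exc)
```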
It’s more useful to validate fail-fast, checking the arguments at the moment the iterator is first instantiated. In Pixeltable, this is done by registering a validation function with the iterator’s validate decorator (here, @grayscale_iterator.validate).
from PIL import ImageColor


@grayscale_iterator.validate
def _(bound_args: dict):
    color = bound_args.get('colorize')
    if color is not None:
        try:
            ImageColor.getrgb(color)
        except ValueError as exc:
            raise ValueError(f'Invalid color name: {color}') from exc
Now if we try to create an invalid instance, we get an error right away.
t = pxt.create_table(
    'iterators_demo/images',
    {'input': pxt.Image},
    if_exists='replace_force',
)
Created table ‘images’.
v = pxt.create_view(
    'iterators_demo/grayscale',
    t,
    iterator=grayscale_iterator(
        t.input, n=3, colorize='invalid_color_name'
    ),
)
ValueError: Invalid color name: invalid_color_name
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
Cell In[11], line 9, in _(bound_args)
      8 try:
----> 9     ImageColor.getrgb(color)
     10 except ValueError as exc:
File /opt/miniconda3/envs/pxt/lib/python3.10/site-packages/PIL/ImageColor.py:125, in getrgb(color)
    124 msg = f"unknown color specifier: {repr(color)}"
--> 125 raise ValueError(msg)
ValueError: unknown color specifier: 'invalid_color_name'
The above exception was the direct cause of the following exception:
ValueError                                Traceback (most recent call last)
Cell In[13], line 4
      1 v = pxt.create_view(
      2     'iterators_demo/grayscale',
      3     t,
----> 4     iterator=grayscale_iterator(
      5         t.input, n=3, colorize='invalid_color_name'
      6     ),
      7 )
File ~/Dropbox/workspace/pixeltable/pixeltable/pixeltable/func/iterator.py:233, in GeneratingFunction.__call__(self, *args, **kwargs)
    231 # Run custom iterator validation on whatever args are bound to literals at this stage
    232 if self._validate is not None:
--> 233     self._validate(literal_args)
    235 output_schema = self.call_output_schema(literal_args)
    237 outputs = {
    238     name: IteratorOutput(orig_name=name, is_stored=(name not in self.unstored_cols), col_type=col_type)
    239     for name, col_type in output_schema.items()
    240 }
Cell In[11], line 11, in _(bound_args)
      9     ImageColor.getrgb(color)
     10 except ValueError as exc:
---> 11     raise ValueError(f'Invalid color name: {color}') from exc
ValueError: Invalid color name: invalid_color_name
The input to validate(), bound_args, is a dictionary that contains all constant arguments for a particular instance of the iterator. In the above example, it contains n and colorize (because they are equal to the constant values 3 and 'invalid_color_name'), but not image (which depends dynamically on the data in the t.input column). validate() is actually called at two stages: once when the iterator is instantiated, with just the constant arguments present in bound_args; and again each time the iterator is evaluated on a row, this time with all arguments present.
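To make the first stage concrete, here is a toy model of validator registration written with only the standard library. SketchIterator, ColumnRef, and KNOWN_COLORS are hypothetical names, and the model covers only instantiation, not the per-row call; it loosely mirrors the traceback above, where the validator is invoked on the literal arguments during the iterator call, but it is not Pixeltable’s code.

```python
import inspect
from typing import Callable, Optional


class ColumnRef:
    """Hypothetical stand-in for a column expression such as t.input."""


class SketchIterator:
    """Toy model of @pxt.iterator plus .validate; not Pixeltable's implementation."""

    def __init__(self, fn: Callable):
        self.fn = fn
        self.sig = inspect.signature(fn)
        self._validate: Optional[Callable[[dict], None]] = None

    def validate(self, validator: Callable[[dict], None]) -> Callable[[dict], None]:
        # Lets a validator be registered with @my_iterator.validate.
        self._validate = validator
        return validator

    def __call__(self, *args, **kwargs) -> inspect.BoundArguments:
        bound = self.sig.bind(*args, **kwargs)
        # At instantiation time, only constant (literal) arguments are known.
        literal_args = {
            name: value
            for name, value in bound.arguments.items()
            if not isinstance(value, ColumnRef)
        }
        if self._validate is not None:
            self._validate(literal_args)  # fail fast, before any rows are processed
        return bound  # stands in for a deferred iterator call


@SketchIterator
def grayscale_levels(image, *, n: int, colorize: Optional[str] = None):
    for i in range(n):
        yield {'brightness': 0.5 * (i + 1)}


KNOWN_COLORS = {'black', 'white', 'red', 'green', 'blue'}  # hypothetical subset
seen_args: dict = {}


@grayscale_levels.validate
def _(bound_args: dict) -> None:
    seen_args.update(bound_args)  # record what the validator received, for illustration
    color = bound_args.get('colorize')
    if color is not None and color not in KNOWN_COLORS:
        raise ValueError(f'Invalid color name: {color}')


try:
    grayscale_levels(ColumnRef(), n=3, colorize='invalid_color_name')
    error = None
except ValueError as exc:
    error = str(exc)
```

The validator sees n and colorize but not image, because image is bound to a column reference rather than a constant.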

Class-Based Iterators

For complex iterators that need to maintain a lot of state or provide fine-grained control over their iteration mechanism, it can be convenient to define a class rather than a generator function. This can be done by writing a subclass of PxtIterator and decorating the class, rather than decorating a function. Here’s what grayscale_iterator looks like if written as a class; it is functionally identical to the earlier implementation.
@pxt.iterator
class grayscale_iterator(pxt.PxtIterator[GrayscaleOutput]):
    # The parameters of __init__() determine the iterator arguments
    def __init__(
        self, image: Image, *, n: int, colorize: str | None = None
    ):
        self.image = image
        self.n = n
        self.colorize = colorize
        self.idx = 0

        grayscale_image = self.image.convert('L')
        if self.colorize is not None:
            grayscale_image = ImageOps.colorize(
                grayscale_image, black='black', white=self.colorize
            )
        self.enhancer = Brightness(grayscale_image)

    # Every class-based iterator *must* implement a __next__() method
    # whose return type is a `TypedDict`.
    def __next__(self) -> GrayscaleOutput:
        if self.idx >= self.n:
            raise StopIteration

        brightness = 0.5 * (self.idx + 1)
        enhanced_image = self.enhancer.enhance(brightness)
        self.idx += 1
        return {
            'grayscale_image': enhanced_image,
            'brightness': brightness,
        }

    # When defining a class-based iterator, validate() can optionally be specified
    # as a @classmethod rather than a standalone decorated function.
    @classmethod
    def validate(cls, bound_args: dict):
        color = bound_args.get('colorize')
        if color is not None:
            try:
                ImageColor.getrgb(color)
            except ValueError as exc:
                raise ValueError(f'Invalid color name: {color}') from exc
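A quick plain-Python check that the class-based style and the generator style describe the same iteration. LevelsIter and levels_gen are hypothetical stand-ins that drop the image work so no PIL is needed. One assumption to flag: the tutorial’s class defines only __next__(), so __iter__() presumably comes from the pxt.PxtIterator base class; a bare Python class has to define it itself, as below.

```python
from typing import Iterator, TypedDict


class LevelOutput(TypedDict):
    brightness: float


def levels_gen(n: int) -> Iterator[LevelOutput]:
    """Generator style: iteration state lives in local variables."""
    for i in range(n):
        yield {'brightness': 0.5 * (i + 1)}


class LevelsIter:
    """Class style: iteration state lives in attributes; one row per __next__() call."""

    def __init__(self, n: int):
        self.n = n
        self.idx = 0

    def __iter__(self) -> 'LevelsIter':
        # Plain classes need this; the tutorial's class presumably inherits it (assumption).
        return self

    def __next__(self) -> LevelOutput:
        if self.idx >= self.n:
            raise StopIteration
        brightness = 0.5 * (self.idx + 1)
        self.idx += 1
        return {'brightness': brightness}


same = list(levels_gen(4)) == list(LevelsIter(4))
```

The class version trades a few more lines for explicit state (self.idx), which is what later makes it possible to jump to an arbitrary position.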

Unstored Columns

That’s all you need to know to implement fully functional iterators. But depending on the nature of the outputs, a little extra work can make them more performant. In our example, every input image gets turned into n output images, and recreating those output images doesn’t involve much computation: it’s just a grayscale conversion (with optional colorization) followed by a brightness adjustment. If we store every output image as a separate file, then when n is large we’ll use a lot of storage without much benefit. Even at n=3, the outputs will consume roughly three times as much storage as the inputs (perhaps a little less, since they’re now monochrome, but you get the idea).
Just as with computed columns, Pixeltable provides an option for iterator outputs to be unstored: the outputs won’t be saved to disk, and will instead be regenerated dynamically each time a client queries them. Unstored columns don’t provide much benefit for scalar columns (integers or strings, say), whose storage footprint is small, or for expensive computations (such as generative model outputs), whose results we actually do want to persist. But for simple image operations, they can be a lifesaver.
In the Pixeltable library, frame_iterator and tile_iterator both use an unstored column for their output images. For frame_iterator, storing the output could be enormously expensive: video data is highly compressed, so the same frames stored as individual images can take up far more space than the source video.
To mark an iterator output as unstored, use the unstored_cols decorator parameter. There are two important requirements:
  • If you use unstored columns, you must implement your iterator as a class-based iterator; and
  • You must implement a seek() method in your class, as in the example below.
These requirements give Pixeltable efficient random access to the iterator outputs, which facilitates downstream queries against the iterator view.
# Mark `grayscale_image` as an unstored column.
@pxt.iterator(unstored_cols=['grayscale_image'])
class grayscale_iterator(pxt.PxtIterator[GrayscaleOutput]):
    def __init__(
        self, image: Image, *, n: int, colorize: str | None = None
    ):
        self.image = image
        self.n = n
        self.colorize = colorize
        self.idx = 0

        grayscale_image = self.image.convert('L')
        if self.colorize is not None:
            grayscale_image = ImageOps.colorize(
                grayscale_image, black='black', white=self.colorize
            )
        self.enhancer = Brightness(grayscale_image)

    def __next__(self) -> GrayscaleOutput:
        if self.idx >= self.n:
            raise StopIteration

        brightness = 0.5 * (self.idx + 1)
        enhanced_image = self.enhancer.enhance(brightness)
        self.idx += 1
        return {
            'grayscale_image': enhanced_image,
            'brightness': brightness,
        }

    # seek() will always receive the `pos` of the row being sought. It
    # will also receive the previously stored values of any *stored*
    # output columns in the target row, as keyword arguments.
    def seek(self, pos: int, **kwargs):
        assert 0 <= pos < self.n
        # 'brightness' is a stored column, so it should always be
        # present. We don't need it to implement seek(), but for
        # purposes of illustration let's check that it's here.
        assert 'brightness' in kwargs

        self.idx = pos  # Reset the iterator to the sought position.

    # When defining a class-based iterator, validate() can optionally
    # be a @classmethod rather than a standalone decorated function.
    @classmethod
    def validate(cls, bound_args: dict):
        color = bound_args.get('colorize')
        if color is not None:
            try:
                ImageColor.getrgb(color)
            except ValueError as exc:
                raise ValueError(f'Invalid color name: {color}') from exc
There it is: a complete, performant implementation of grayscale_iterator. Let’s check one more time that it all works as expected.
t = pxt.create_table(
    'iterators_demo/images',
    {'image': pxt.Image},
    if_exists='replace_force',
)
t.insert({'image': image} for image in images)
Inserted 2 rows with 0 errors in 0.03 s (75.79 rows/s)
2 rows inserted.
v = pxt.create_view(
    'iterators_demo/grayscale',
    t,
    iterator=grayscale_iterator(t.image, n=3),
)
v.head()
# Check that we have random access to arbitrary rows in the view.
v.where(v.pos == 2).collect()
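To make the role of seek() concrete, here is a plain-Python model of a read path that regenerates a single unstored row. It is only a sketch under stated assumptions: LevelsWithSeek and fetch_row are hypothetical names, a list of integers stands in for an image, and scaling-and-clamping stands in for Brightness.enhance(). We don’t know exactly how Pixeltable drives seek() internally; the sketch assumes only what the comments in the class above state, namely that seek() receives pos plus the stored column values as keyword arguments.

```python
from typing import TypedDict


class LevelOutput(TypedDict):
    brightness: float
    pixels: list[int]


class LevelsWithSeek:
    """Toy unstored iterator: pixels is regenerated; brightness plays the stored column."""

    def __init__(self, source: list[int], *, n: int):
        self.source = source
        self.n = n
        self.idx = 0
        self.computations = 0  # how many outputs were actually generated

    def __iter__(self) -> 'LevelsWithSeek':
        return self

    def __next__(self) -> LevelOutput:
        if self.idx >= self.n:
            raise StopIteration
        brightness = 0.5 * (self.idx + 1)
        self.idx += 1
        self.computations += 1
        # Stand-in for Brightness(...).enhance(): scale each value, clamp to 255.
        pixels = [min(255, round(p * brightness)) for p in self.source]
        return {'brightness': brightness, 'pixels': pixels}

    def seek(self, pos: int, **kwargs) -> None:
        assert 0 <= pos < self.n
        self.idx = pos  # jump directly to the requested position


def fetch_row(source: list[int], n: int, pos: int, stored_brightness: float):
    """Hypothetical read path: rebuild the iterator, seek to pos, generate one row."""
    it = LevelsWithSeek(source, n=n)
    it.seek(pos, brightness=stored_brightness)  # stored columns arrive as kwargs
    return next(it), it.computations


row, cost = fetch_row([100, 200], n=3, pos=2, stored_brightness=1.5)

# Without seek(), reaching pos=2 means generating every earlier output first.
naive = LevelsWithSeek([100, 200], n=3)
naive_row = list(naive)[2]
```

Seeking produces the same row after a single computation, whereas replaying the iterator from the start costs pos + 1 computations; that difference grows with n, which is why unstored outputs need seek().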
Last modified on April 11, 2026