If you’ve been wrangling multimodal data with custom Python scripts, DVC for versioning, Airflow for scheduling, and manual processing loops — this guide shows how Pixeltable replaces that plumbing with declarative tables.
Related use case: Data Wrangling for ML

Concept Mapping

| Your DIY Stack | Pixeltable Equivalent |
| --- | --- |
| S3 buckets for media files | `pxt.Image`, `pxt.Video`, `pxt.Audio` columns — can still read from S3 |
| DVC for data versioning | Built-in `history()`, `revert()`, `create_snapshot()` |
| Airflow / cron for scheduling | Computed columns — run automatically on insert |
| Custom scripts with OpenCV / PIL | `@pxt.udf` functions as computed columns |
| `cv2.VideoCapture()` + frame loops | `FrameIterator` via `create_view()` |
| Manual retry logic (tenacity) | Automatic retries with result caching |
| Embeddings as numpy / Parquet | `add_embedding_index()` with HNSW search |
| `torch.utils.data.Dataset` boilerplate | `to_pytorch_dataset()` — one line |
| Re-run pipeline when data changes | Incremental — only new rows are processed |

Side by Side: Image Processing Pipeline

The task: generate thumbnails, caption each image with an LLM, embed the captions for search, and version everything. First, the DIY version:
```python
import pandas as pd
import numpy as np
from PIL import Image
from openai import OpenAI
from pathlib import Path
import base64, time

client = OpenAI()

# Load metadata
image_dir = Path('dataset/images/')
df = pd.DataFrame([
    {'filename': f.name, 'path': str(f), 'category': 'unknown'}
    for f in image_dir.glob('*.jpg')
])

# Generate thumbnails (manual loop)
thumb_dir = Path('dataset/thumbnails/')
thumb_dir.mkdir(exist_ok=True)
for idx, row in df.iterrows():
    img = Image.open(row['path'])
    img.thumbnail((256, 256))
    img.save(thumb_dir / row['filename'])
    df.at[idx, 'thumbnail'] = str(thumb_dir / row['filename'])

# Caption images (manual retry, one at a time)
def caption_image(path, max_retries=3):
    with open(path, 'rb') as f:
        b64 = base64.b64encode(f.read()).decode()
    for attempt in range(max_retries):
        try:
            resp = client.chat.completions.create(
                model='gpt-4o-mini',
                messages=[{'role': 'user', 'content': [
                    {'type': 'text', 'text': 'Describe this image in one sentence.'},
                    {'type': 'image_url', 'image_url': {
                        'url': f'data:image/jpeg;base64,{b64}'}}
                ]}],
            )
            return resp.choices[0].message.content
        except Exception:
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # exponential backoff
            else:
                return None

df['caption'] = [caption_image(row['path']) for _, row in df.iterrows()]

# Generate embeddings (batch manually, store as numpy)
# NOTE: rows with failed captions are dropped here, so the saved array
# no longer aligns with df row indices
valid = df.dropna(subset=['caption'])
resp = client.embeddings.create(
    input=valid['caption'].tolist(), model='text-embedding-3-small')
np.save('dataset/embeddings.npy', [e.embedding for e in resp.data])

# Persist and version
df.to_csv('dataset/metadata.csv', index=False)
# Then: dvc add dataset/ && dvc push && git add dataset.dvc && git commit
```
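The Pixeltable side replaces all of the above with one table and a handful of declarative column definitions. A sketch, assuming Pixeltable's OpenAI integration (`pixeltable.functions.openai`) and an `OPENAI_API_KEY` in the environment; check the exact signatures against your installed version:

```python
import pixeltable as pxt
from pixeltable.functions import openai
from pathlib import Path

# One table holds the media, metadata, and every derived column
images = pxt.create_table('images', {'img': pxt.Image, 'category': pxt.String})

# Thumbnails: a computed column, run automatically for every new row
images.add_computed_column(thumbnail=images.img.resize([256, 256]))

# Captions: the vision call as a computed column; retries and
# caching of successful results are handled by Pixeltable
images.add_computed_column(caption=openai.vision(
    prompt='Describe this image in one sentence.',
    image=images.img, model='gpt-4o-mini'))

# Embedding index over the captions (HNSW under the hood)
images.add_embedding_index('caption',
    string_embed=openai.embeddings.using(model='text-embedding-3-small'))

# Insert rows — every computed column above runs incrementally
images.insert({'img': str(f), 'category': 'unknown'}
              for f in Path('dataset/images/').glob('*.jpg'))

# Semantic search over captions
sim = images.caption.similarity('a dog on a beach')
images.order_by(sim, asc=False).limit(5).select(images.img, images.caption).collect()

# Version it — no dvc/git ceremony
pxt.create_snapshot('images_v1', images)
```

No scheduler, no retry wrappers, no `.npy` sidecar files: inserting a new image triggers exactly the downstream work for that row.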

What Changes

| | Custom Scripts | Pixeltable |
| --- | --- | --- |
| New images | Re-run the entire pipeline | `images.insert([...])` — everything downstream runs |
| Change model | Re-run everything; DVC tracks snapshots, not transforms | Drop and re-add the column — only that column recomputes |
| Versioning | `dvc add` + `git commit` ceremony | Automatic — `images.history()`, `pxt.create_snapshot()` |
| Scheduling | Airflow, cron, or manual re-runs | Not needed — computed columns run on insert |
| Retries | try/except with backoff in every function | Built-in; successful results are cached |
| Search | Brute-force numpy, or set up a vector DB | `add_embedding_index()` with HNSW |
| PyTorch export | Custom `Dataset` class | `images.to_pytorch_dataset()` |

Common Patterns

Video frame extraction

```python
import cv2
from PIL import Image

cap = cv2.VideoCapture('demo.mp4')
fps = cap.get(cv2.CAP_PROP_FPS)
frames, idx = [], 0
while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break
    if idx % int(fps) == 0:  # keep roughly one frame per second
        frames.append(Image.fromarray(
            cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)))
    idx += 1
cap.release()
```
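The Pixeltable equivalent is a view over a video table. A sketch using `FrameIterator`; `fps=1` matches the one-frame-per-second sampling of the loop above:

```python
import pixeltable as pxt
from pixeltable.iterators import FrameIterator

videos = pxt.create_table('videos', {'video': pxt.Video})
videos.insert([{'video': 'demo.mp4'}])

# One row per extracted frame; newly inserted videos are
# decoded incrementally, with nothing re-processed
frames = pxt.create_view('frames', videos,
    iterator=FrameIterator.create(video=videos.video, fps=1))

frames.select(frames.frame).collect()
```

Any computed column you add to `frames` (detection, captioning, embedding) then runs per frame, automatically.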

Data versioning

```shell
dvc add dataset/
git add dataset.dvc .gitignore
git commit -m "update dataset v3"
dvc push

# Revert
git checkout HEAD~1 -- dataset.dvc
dvc checkout
```
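In Pixeltable, every insert, update, and schema change is versioned as it happens. A sketch, assuming a table named `images` already exists:

```python
import pixeltable as pxt

images = pxt.get_table('images')

images.history()                          # list of versions with what changed
pxt.create_snapshot('images_v3', images)  # named, immutable snapshot
images.revert()                           # undo the most recent change
```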

PyTorch export

```python
from PIL import Image
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

class ImageDataset(Dataset):
    def __init__(self, df, transform=None):
        self.df = df.reset_index(drop=True)
        self.transform = transform

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        img = Image.open(self.df.at[idx, 'path'])
        if self.transform:
            img = self.transform(img)
        return img, self.df.at[idx, 'category']

loader = DataLoader(ImageDataset(df, transforms.Compose([
    transforms.Resize((224, 224)), transforms.ToTensor()])),
    batch_size=32)
```
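In Pixeltable the export is one call on a query. A sketch, assuming an existing `images` table; the `image_format='pt'` argument (tensors rather than PIL images) is my reading of the export API, so verify it against your version's docs:

```python
import pixeltable as pxt
from torch.utils.data import DataLoader

images = pxt.get_table('images')

# Select the columns you want as training samples and export;
# image columns come back as tensors with image_format='pt'
ds = images.select(images.img, images.category).to_pytorch_dataset(image_format='pt')
loader = DataLoader(ds, batch_size=32)
```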

Last modified on March 3, 2026