Skip to main content
Open in Kaggle  Open in Colab  Download Notebook
This documentation page is also available as an interactive notebook. You can launch the notebook in Kaggle or Colab, or download it for use with an IDE or local Jupyter installation, by clicking one of the above links.
Build reusable aggregation logic for group-by queries and analytics.

Problem

You need aggregations beyond the built-in sum, count, mean, min, and max — such as collecting values into a list, concatenating strings, or computing custom statistics.

Solution

What’s in this recipe:
  • Define a UDA (User-Defined Aggregate) with the @pxt.uda decorator
  • Use UDAs in group_by queries
  • Create UDAs with multiple inputs

Setup

# Install/upgrade Pixeltable quietly. `%pip` is an IPython magic: this line only
# works inside a notebook/IPython session, not in a plain .py script.
%pip install -qU pixeltable
import pixeltable as pxt

# Start from a clean 'uda_demo' directory on every run: drop whatever a previous
# run left behind, then recreate it empty.
# NOTE(review): force=True is presumably what lets the drop succeed when the
# directory still contains tables — verify against the pxt.drop_dir docs.
pxt.drop_dir('uda_demo', force=True)
pxt.create_dir('uda_demo')
Connected to Pixeltable database at: postgresql+psycopg://postgres:@/pixeltable?host=/Users/pjlb/.pixeltable/pgdata
Created directory ‘uda_demo’.
<pixeltable.catalog.dir.Dir at 0x16a80d480>

Create sample data

# Sample sales table inside the 'uda_demo' directory; columns: region and product
# (strings), amount (float), quantity (int).
sales = pxt.create_table(
    'uda_demo.sales',
    {'region': pxt.String, 'product': pxt.String, 'amount': pxt.Float, 'quantity': pxt.Int}
)

# Six rows: North has three sales (two of them Widget), South two, East one.
sales.insert([
    {'region': 'North', 'product': 'Widget', 'amount': 100.0, 'quantity': 5},
    {'region': 'North', 'product': 'Gadget', 'amount': 250.0, 'quantity': 2},
    {'region': 'North', 'product': 'Widget', 'amount': 150.0, 'quantity': 8},
    {'region': 'South', 'product': 'Widget', 'amount': 200.0, 'quantity': 10},
    {'region': 'South', 'product': 'Gadget', 'amount': 175.0, 'quantity': 3},
    {'region': 'East', 'product': 'Widget', 'amount': 125.0, 'quantity': 6},
])

# Show every row (the cell's last expression is what the notebook displays).
sales.collect()
Created table ‘sales’.
Inserting rows into `sales`: 0 rows [00:00, ? rows/s]
Inserting rows into `sales`: 6 rows [00:00, 609.56 rows/s]
Inserted 6 rows with 0 errors.

Variance UDA (not built-in)

# A UDA is a pxt.Aggregator subclass registered with @pxt.uda.
# Required methods: __init__ (set up state), update (consume one row), value (produce the result).

@pxt.uda
class variance(pxt.Aggregator):
    """Population variance of a group's non-None inputs, in a single pass.

    Uses Welford's online update: a running mean plus ``m2``, the running sum
    of squared deviations from that mean. A group with no non-None values
    yields 0.0.
    """

    def __init__(self):
        self.count = 0
        self.mean = 0.0
        self.m2 = 0.0  # running sum of squared deviations from the mean

    def update(self, val: float) -> None:
        # Missing values are ignored entirely.
        if val is None:
            return
        self.count += 1
        old_mean = self.mean
        self.mean = old_mean + (val - old_mean) / self.count
        # Welford: multiply the deviation from the old mean by the deviation from the new mean.
        self.m2 += (val - old_mean) * (val - self.mean)

    def value(self) -> float:
        # Divide by N (population variance), not N - 1.
        return self.m2 / self.count if self.count >= 1 else 0.0
# Use like any built-in aggregate: with no group_by, variance() aggregates over
# all rows of `sales`.
sales.select(variance(sales.amount)).collect()
# Use in group_by queries: one result row per region; the keyword argument
# names the result column `amount_variance`.
sales.group_by(sales.region).select(
    sales.region,
    amount_variance=variance(sales.amount)
).collect()

String concatenation UDA

@pxt.uda
class string_agg(pxt.Aggregator):
    """Join a group's non-None strings into one comma-separated string.

    Strings appear in the order ``update`` receives them; an empty group
    yields the empty string.
    """

    def __init__(self):
        self.values = []  # non-None strings seen so far

    def update(self, val: str) -> None:
        # Skip missing values so they do not show up as separators.
        if val is None:
            return
        self.values.append(val)

    def value(self) -> str:
        separator = ', '
        return separator.join(self.values)
# List the products sold in each region, one entry per sale row: a product sold
# more than once in a region (e.g. North's two Widget rows) appears more than
# once in `products`.
sales.group_by(sales.region).select(
    sales.region,
    products=string_agg(sales.product)
).collect()

Collect values into a list

@pxt.uda
class collect_list(pxt.Aggregator):
    """Collect a group's non-None input values into a list.

    Values keep the order in which ``update`` receives them; an empty group
    yields ``[]``.
    """

    def __init__(self):
        self.items = []  # non-None values seen so far

    def update(self, val: float) -> None:
        if val is not None:
            self.items.append(val)

    def value(self) -> list[float]:
        # Return a copy rather than the accumulator itself. Otherwise the caller's
        # result and the aggregator's internal state are the same list, so a later
        # update() (or a mutation by the caller) would silently change the other.
        return list(self.items)
# Get all amounts per region as a list (one element per non-None amount in
# that region).
sales.group_by(sales.region).select(
    sales.region,
    amounts=collect_list(sales.amount)
).collect()

Weighted average UDA

@pxt.uda
class weighted_avg(pxt.Aggregator):
    """Weighted mean over a group: sum(value * weight) / sum(weight).

    A row contributes only if both its value and its weight are non-None.
    Returns 0.0 when the accumulated weights sum to zero, including for an
    empty group.
    """

    def __init__(self):
        self.weighted_sum = 0.0  # running sum of value * weight
        self.weight_sum = 0.0    # running sum of weight

    def update(self, value: float, weight: float) -> None:
        if value is None or weight is None:
            return
        self.weighted_sum += value * weight
        self.weight_sum += weight

    def value(self) -> float:
        total_weight = self.weight_sum
        # Guard against division by zero.
        return 0.0 if total_weight == 0 else self.weighted_sum / total_weight
# Compute the quantity-weighted average of `amount` per region:
# sum(amount * quantity) / sum(quantity).
# NOTE(review): `amount` may be a per-sale total rather than a unit price —
# confirm that `avg_price` is the intended label.
sales.group_by(sales.region).select(
    sales.region,
    avg_price=weighted_avg(sales.amount, sales.quantity)
).collect()

Mode UDA (most frequent value)

from collections import Counter
from typing import Optional

@pxt.uda
class mode(pxt.Aggregator):
    """Return the most frequent non-None value in a group.

    Ties go to the value counted first: ``Counter.most_common`` orders
    equal counts by first occurrence. A group with no non-None values
    yields None.
    """

    def __init__(self):
        self.counts = Counter()  # value -> number of occurrences

    def update(self, val: str) -> None:
        if val is not None:
            self.counts[val] += 1

    def value(self) -> Optional[str]:
        # The output type is taken from this annotation. Optional[...] makes it
        # admit the None returned for an empty group; a bare `str` would not.
        if not self.counts:
            return None
        return self.counts.most_common(1)[0][0]
# Find the most common product per region.
# NOTE(review): South has one Widget row and one Gadget row, so its result is a
# tie. mode() then returns whichever product it counted first, which depends
# on the order rows reach the aggregator. Confirm whether a deterministic
# tie-break matters here.
sales.group_by(sales.region).select(
    sales.region,
    top_product=mode(sales.product)
).collect()

Explanation

UDA structure:
@pxt.uda
class my_aggregate(pxt.Aggregator):
    """Template: replace initial_value, InputType and OutputType with real choices."""

    def __init__(self):  # Initialize state
        self.state = initial_value

    def update(self, val: InputType) -> None:  # Called for each row
        # Update internal state with val (skip val if it is None)
        ...

    def value(self) -> OutputType:  # Called at the end
        return self.state
Key points:
  • Always handle None values in update(): rows with missing (null) column values pass None, as the examples above guard against
  • Multiple parameters in update() enable multi-column aggregations (like weighted_avg)
  • Return type annotation on value() determines output column type

See also