This documentation page is also available as an interactive notebook. You can launch the notebook in
Kaggle or Colab, or download it for use with an IDE or a local Jupyter installation, by clicking one of the
links above.
Build reusable aggregation logic for group-by queries and analytics.
Problem
You need aggregations beyond the built-in `sum`, `count`, `mean`, `min`, and `max` — such as
collecting values into a list, concatenating strings, or computing custom statistics.
Solution
What’s in this recipe:
- Define a UDA (User-Defined Aggregate) with the `@pxt.uda` decorator
- Use UDAs in `group_by` queries
- Create UDAs with multiple inputs
Setup
%pip install -qU pixeltable
import pixeltable as pxt
# Reset the demo directory: drop 'uda_demo' (with force=True), then create it
# again. create_dir() is the last expression so the notebook displays its result.
demo_dir = 'uda_demo'
pxt.drop_dir(demo_dir, force=True)
pxt.create_dir(demo_dir)
Connected to Pixeltable database at: postgresql+psycopg://postgres:@/pixeltable?host=/Users/pjlb/.pixeltable/pgdata
Created directory ‘uda_demo’.
<pixeltable.catalog.dir.Dir at 0x16a80d480>
Create sample data
# Demo table: two string columns (region, product) and two numeric columns
# (amount as Float, quantity as Int).
sales_schema = {
    'region': pxt.String,
    'product': pxt.String,
    'amount': pxt.Float,
    'quantity': pxt.Int,
}
sales = pxt.create_table('uda_demo.sales', sales_schema)

# Sample rows written as (region, product, amount, quantity) tuples and zipped
# with the column names into the list of dicts that insert() receives.
column_names = ('region', 'product', 'amount', 'quantity')
sample_rows = [
    ('North', 'Widget', 100.0, 5),
    ('North', 'Gadget', 250.0, 2),
    ('North', 'Widget', 150.0, 8),
    ('South', 'Widget', 200.0, 10),
    ('South', 'Gadget', 175.0, 3),
    ('East', 'Widget', 125.0, 6),
]
sales.insert([dict(zip(column_names, row)) for row in sample_rows])
sales.collect()
Created table ‘sales’.
Inserting rows into `sales`: 0 rows [00:00, ? rows/s]
Inserting rows into `sales`: 6 rows [00:00, 609.56 rows/s]
Inserted 6 rows with 0 errors.
Variance UDA (not built-in)
# A UDA is a class that inherits from pxt.Aggregator
# It must implement: __init__, update, and value
@pxt.uda
class variance(pxt.Aggregator):
    """Population variance of a numeric column, computed in one pass.

    Applies Welford's online recurrence, so individual values are never
    stored; only a count, a running mean and a running sum of squared
    deviations are kept. None inputs are skipped, and if no non-None value
    was seen the result is 0.0.
    """

    def __init__(self):
        self.n = 0          # how many non-None values have been folded in
        self.avg = 0.0      # running mean of those values
        self.sq_dev = 0.0   # running sum of squared deviations from the mean

    def update(self, val: float) -> None:
        if val is None:
            return
        self.n += 1
        diff_before = val - self.avg
        self.avg += diff_before / self.n
        # Welford: deviation from the old mean times deviation from the new mean.
        self.sq_dev += diff_before * (val - self.avg)

    def value(self) -> float:
        # Divide by n (not n - 1): this is the population variance.
        return self.sq_dev / self.n if self.n >= 1 else 0.0
# A UDA is called exactly like a built-in aggregate; without group_by it
# aggregates over the whole table.
whole_table = sales.select(variance(sales.amount))
whole_table.collect()
# Inside a group_by query, the UDA produces one variance per region.
per_region = sales.group_by(sales.region)
per_region.select(
    sales.region,
    amount_variance=variance(sales.amount),
).collect()
String concatenation UDA
@pxt.uda
class string_agg(pxt.Aggregator):
    """Join a group's strings into one, separated by ", " (comma and space).

    None inputs are skipped. Strings appear in the order update() received
    them, and a group with no non-None strings yields the empty string.
    """

    def __init__(self):
        self.parts = []  # non-None strings collected so far, in arrival order

    def update(self, val: str) -> None:
        if val is None:
            return
        self.parts.append(val)

    def value(self) -> str:
        separator = ', '
        return separator.join(self.parts)
# One comma-separated listing of the products sold in each region.
(
    sales
    .group_by(sales.region)
    .select(sales.region, products=string_agg(sales.product))
    .collect()
)
Collect values into a list
@pxt.uda
class collect_list(pxt.Aggregator):
    """Gather a group's non-None values into a list, in the order received.

    value() hands back the accumulated list object itself, not a copy.
    """

    def __init__(self):
        self.collected = []

    def update(self, val: float) -> None:
        # Nulls are dropped so the list holds only real values.
        if val is None:
            return
        self.collected.append(val)

    def value(self) -> list[float]:
        return self.collected
# Every non-None amount recorded for each region, gathered into one list.
regions = sales.group_by(sales.region)
regions.select(sales.region, amounts=collect_list(sales.amount)).collect()
Weighted average UDA
@pxt.uda
class weighted_avg(pxt.Aggregator):
    """Weighted mean of a group: sum(value * weight) / sum(weight).

    A row contributes only when both its value and its weight are non-None.
    When the accumulated weight is zero (no usable rows, or weights that
    cancel out), 0.0 is returned instead of dividing by zero.
    """

    def __init__(self):
        self.numerator = 0.0    # running sum of value * weight
        self.denominator = 0.0  # running sum of weight

    def update(self, value: float, weight: float) -> None:
        if value is None or weight is None:
            return
        self.numerator += value * weight
        self.denominator += weight

    def value(self) -> float:
        if not self.denominator:
            return 0.0
        return self.numerator / self.denominator
# Per-region average of `amount`, with each sale weighted by its `quantity`.
weighted_query = sales.group_by(sales.region).select(
    sales.region,
    avg_price=weighted_avg(sales.amount, sales.quantity),
)
weighted_query.collect()
Mode UDA (most frequent value)
from collections import Counter
from typing import Optional


@pxt.uda
class mode(pxt.Aggregator):
    """Find the most frequent non-None value in a group.

    Ties go to the value that was seen first, because Counter.most_common
    orders equal counts by first insertion. A group with no non-None values
    yields None. The return type is therefore declared Optional[str], since
    the annotation on value() determines the output column type.
    """

    def __init__(self):
        self.counts = Counter()  # value -> number of occurrences

    def update(self, val: str) -> None:
        if val is not None:
            self.counts[val] += 1

    def value(self) -> Optional[str]:
        if not self.counts:
            return None
        return self.counts.most_common(1)[0][0]
# The product that appears most often in each region.
top_products = sales.group_by(sales.region).select(
    sales.region, top_product=mode(sales.product)
)
top_products.collect()
Explanation
UDA structure:
@pxt.uda
class my_aggregate(pxt.Aggregator):
    """Template for a UDA.

    initial_value, InputType and OutputType are placeholders: replace them
    with a real starting state and real type annotations before use.
    """

    def __init__(self):  # Initialize state
        self.state = initial_value

    def update(self, val: InputType) -> None:  # Called for each row
        # Update internal state with val
        ...  # placeholder body; a comment alone is not a valid function body

    def value(self) -> OutputType:  # Called at the end
        return self.state
Key points:
- Always handle `None` values in `update()`.
- Multiple parameters in `update()` enable multi-column aggregations (like `weighted_avg`).
- The return type annotation on `value()` determines the output column type.
See also