User-Defined Functions (UDFs) in Pixeltable allow you to extend the platform with custom Python code. They bridge the gap between Pixeltable’s built-in operations and your specific data processing needs, enabling you to create reusable components for transformations, analysis, and AI workflows.

Pixeltable UDFs offer several key advantages:
Reusability: Define a function once and use it across multiple tables and operations
Type Safety: Strong typing ensures data compatibility throughout your workflows
Performance: Batch processing and caching capabilities optimize execution
Integration: Seamlessly combine custom code with Pixeltable’s query system
Flexibility: Process any data type including text, images, videos, and embeddings
UDFs can be as simple as a basic transformation or as complex as a multi-stage ML workflow. Pixeltable offers three types of custom functions to handle different scenarios:
```python
import pixeltable as pxt

# Basic UDF for text transformation
@pxt.udf
def clean_text(text: str) -> str:
    """Clean and normalize text data."""
    return text.lower().strip()

# Use in a computed column
documents = pxt.get_table('my_documents')
documents.add_computed_column(
    clean_content=clean_text(documents.content)
)
```
```python
# Defined directly in your code
@pxt.udf
def extract_year(date_str: str) -> int:
    return int(date_str.split('-')[0])

# Used immediately
table.add_computed_column(
    year=extract_year(table.date)
)
```
Local UDFs are serialized together with the columns that use them, so later changes to the UDF only affect newly added columns.
```python
from pixeltable.func import Batch

@pxt.udf(batch_size=16)
def embed_texts(texts: Batch[str]) -> Batch[pxt.Array]:
    # Process multiple texts at once
    return model.encode(texts)
```
Caching
```python
@pxt.udf
def expensive_operation(text: str) -> str:
    # Cache the model instance on the function object
    if not hasattr(expensive_operation, 'model'):
        expensive_operation.model = load_model()
    return expensive_operation.model(text)
```
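The caching trick above is plain Python (state stored on the function object), so it can be exercised outside Pixeltable. A minimal sketch with a hypothetical stand-in `load_model` that just counts how often it runs:

```python
load_calls = 0

def load_model():
    # Stand-in for an expensive model load
    global load_calls
    load_calls += 1
    return str.upper  # pretend "model": uppercases its input

def expensive_operation(text: str) -> str:
    # Cache the model on the function object: loaded once, reused after
    if not hasattr(expensive_operation, 'model'):
        expensive_operation.model = load_model()
    return expensive_operation.model(text)

print(expensive_operation('hi'))   # HI
print(expensive_operation('bye'))  # BYE
print(load_calls)                  # 1 -- the model was loaded only once
```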
Async Support
```python
from typing import Optional

@pxt.udf
async def chat_completions(
    messages: list,
    *,
    model: str,
    model_kwargs: Optional[dict] = None,
) -> dict:
    # Issue the API request asynchronously
    result = await openai_client.chat.completions.create(
        messages=messages,
        model=model,
        **(model_kwargs or {}),
    )
    # Return the response as a plain dict
    return result.model_dump()

# Example usage in a computed column
table.add_computed_column(
    response=chat_completions(
        [
            {'role': 'system', 'content': 'You are a helpful assistant.'},
            {'role': 'user', 'content': table.prompt}
        ],
        model='gpt-4o-mini'
    )
)
```
Async UDFs are specifically designed for external I/O such as LLM calls, database queries, or web service requests; they should not be used for CPU-bound computation or general data processing. By letting slow operations wait in the background, they keep your Pixeltable workflows responsive.
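To see why async matters for I/O-bound work, here is a plain `asyncio` sketch (no Pixeltable involved; `fake_api_call` is a hypothetical stand-in for an external request). Three 0.05-second calls run concurrently, so the total wall time is close to one call, not three:

```python
import asyncio
import time

async def fake_api_call(prompt: str) -> str:
    # Stand-in for an external request; the await yields control so
    # other pending calls can run while this one waits
    await asyncio.sleep(0.05)
    return f'response to {prompt}'

async def run_all(prompts: list) -> tuple:
    start = time.monotonic()
    results = await asyncio.gather(*(fake_api_call(p) for p in prompts))
    return results, time.monotonic() - start

results, elapsed = asyncio.run(run_all(['a', 'b', 'c']))
print(results)
print(f'{elapsed:.2f}s')  # close to 0.05s, not 0.15s: the waits overlap
```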
```python
@pxt.udf
def validate_score(score: float) -> float:
    if not 0 <= score <= 100:
        raise ValueError("Score must be between 0 and 100")
    return score
```
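Stripped of the decorator, the validator is an ordinary Python function, which makes it easy to sanity-check directly before wiring it into a table:

```python
def validate_score(score: float) -> float:
    if not 0 <= score <= 100:
        raise ValueError("Score must be between 0 and 100")
    return score

print(validate_score(87.5))  # 87.5 -- valid scores pass through unchanged

try:
    validate_score(120.0)
except ValueError as e:
    print(e)  # Score must be between 0 and 100
```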
Performance
Use batching for GPU operations
Cache expensive resources
Process data in chunks when possible
```python
@pxt.udf(batch_size=32)
def process_chunk(items: Batch[str]) -> Batch[str]:
    # Cache the expensive model on the function object
    if not hasattr(process_chunk, 'model'):
        process_chunk.model = load_expensive_model()
    return process_chunk.model.process_batch(items)
```
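Conceptually, `batch_size=32` means Pixeltable collects up to 32 input rows before invoking the UDF once per group. The exact scheduling is Pixeltable's, but the grouping itself can be sketched in plain Python:

```python
def batched(items: list, batch_size: int) -> list:
    # Split items into consecutive groups of at most batch_size
    return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]

rows = [f'row-{i}' for i in range(70)]
batches = batched(rows, 32)
print([len(b) for b in batches])  # [32, 32, 6] -- one UDF call per batch
```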
Organization
Keep related UDFs in modules
Use clear, descriptive names
Document complex operations
```python
import string

@pxt.udf
def normalize_text(
    text: str,
    lowercase: bool = True,
    remove_punctuation: bool = True
) -> str:
    """Normalize text by optionally lowercasing and removing punctuation."""
    if lowercase:
        text = text.lower()
    if remove_punctuation:
        text = text.translate(str.maketrans("", "", string.punctuation))
    return text
```
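As a quick check of the normalization logic itself (run as plain Python, so without the decorator):

```python
import string

def normalize_text(text: str, lowercase: bool = True,
                   remove_punctuation: bool = True) -> str:
    if lowercase:
        text = text.lower()
    if remove_punctuation:
        text = text.translate(str.maketrans("", "", string.punctuation))
    return text

print(normalize_text("Hello, World!"))                            # hello world
print(normalize_text("Hello, World!", remove_punctuation=False))  # hello, world!
```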
Table UDFs
Define clear input and output columns for your table UDFs
Implement cleanup routines for tables that grow large
Balance between too many small tables and monolithic tables
Use clear naming conventions for tables and their UDFs
Document the purpose and expected inputs for each table UDF
```python
# Create a table with your workflow
finance_agent = pxt.create_table(
    'directory.financial_analyst', {'prompt': pxt.String}
)

# Add computed columns for processing
finance_agent.add_computed_column(...)
```
You can create a workflow of table UDFs to handle complex multi-stage processing:
```python
# Create a chain of specialized agents
research_agent = pxt.udf(research_table, return_value=research_table.findings)
analysis_agent = pxt.udf(analysis_table, return_value=analysis_table.insights)
report_agent = pxt.udf(report_table, return_value=report_table.document)

# Use them in sequence
workflow.add_computed_column(research=research_agent(workflow.query))
workflow.add_computed_column(analysis=analysis_agent(workflow.research))
workflow.add_computed_column(report=report_agent(workflow.analysis))
```
Parallel Processing with Table UDFs
Execute multiple table UDFs in parallel and combine their results:
```python
# Define specialized agents for different tasks
stock_agent = pxt.udf(stock_table, return_value=stock_table.analysis)
news_agent = pxt.udf(news_table, return_value=news_table.summary)
sentiment_agent = pxt.udf(sentiment_table, return_value=sentiment_table.score)

# Process in parallel
portfolio.add_computed_column(stock_data=stock_agent(portfolio.ticker))
portfolio.add_computed_column(news_data=news_agent(portfolio.ticker))
portfolio.add_computed_column(sentiment=sentiment_agent(portfolio.ticker))

# Combine results
portfolio.add_computed_column(report=combine_insights(
    portfolio.stock_data,
    portfolio.news_data,
    portfolio.sentiment
))
```