User-Defined Functions (UDFs) in Pixeltable allow you to extend the platform with custom Python code. They bridge the gap between Pixeltable's built-in operations and your specific data processing needs, enabling you to create reusable components for transformations, analysis, and AI workflows.

Pixeltable UDFs offer several key advantages:
Reusability: Define a function once and use it across multiple tables and operations
Type Safety: Strong typing ensures data compatibility throughout your workflows
Performance: Batch processing and caching capabilities optimize execution
Integration: Seamlessly combine custom code with Pixeltable’s query system
Flexibility: Process any data type including text, images, videos, and embeddings
UDFs can be as simple as a basic transformation or as complex as a multi-stage ML workflow. Pixeltable offers three types of custom functions to handle different scenarios:
```python
import pixeltable as pxt

# Basic UDF for text transformation
@pxt.udf
def clean_text(text: str) -> str:
    """Clean and normalize text data."""
    return text.lower().strip()

# Use in a computed column
documents = pxt.get_table('my_documents')
documents.add_computed_column(
    clean_content=clean_text(documents.content)
)
```
```python
# Defined directly in your code
@pxt.udf
def extract_year(date_str: str) -> int:
    return int(date_str.split('-')[0])

# Used immediately
table.add_computed_column(
    year=extract_year(table.date)
)
```
Local UDFs are serialized along with the computed columns that use them, so later changes to the UDF affect only newly created columns.
```python
# In my_functions.py
@pxt.udf
def clean_text(text: str) -> str:
    return text.strip().lower()

# In your application
from my_functions import clean_text

table.add_computed_column(
    clean_content=clean_text(table.content)
)
```
Module UDFs are referenced by their import path, so changes to the UDF affect all uses once the module is reloaded.
```python
from pixeltable.func import Batch

@pxt.udf(batch_size=32)
def process_batch(items: Batch[str]) -> Batch[str]:
    results = []
    for item in items:
        results.append(item.upper())
    return results

# Used like a regular UDF
table.add_computed_column(
    processed=process_batch(table.text)
)
```
Batched UDFs process multiple rows at once for better performance.
```python
@pxt.udf(batch_size=16)
def embed_texts(
    texts: Batch[str]
) -> Batch[pxt.Array]:
    # Process multiple texts at once
    return model.encode(texts)
```
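To see why batching helps, the row grouping can be sketched in plain Python. This is an illustration of the batching concept only, not Pixeltable internals; `chunked` and `shout_batch` are hypothetical helpers:

```python
def chunked(items, batch_size):
    """Yield consecutive slices of at most batch_size items."""
    for i in range(0, len(items), batch_size):
        yield items[i:i + batch_size]

def shout_batch(batch):
    # Stand-in for a batched UDF body: one call handles many rows,
    # amortizing per-call overhead (model invocation, GPU transfer, etc.)
    return [item.upper() for item in batch]

rows = ['a', 'b', 'c', 'd', 'e']
results = []
for batch in chunked(rows, batch_size=3):
    results.extend(shout_batch(batch))
# Five rows are processed in two calls instead of five
```

With `batch_size=3`, five rows trigger two function invocations rather than five, which is where the savings come from for GPU-bound work.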
Caching
```python
@pxt.udf
def expensive_operation(text: str) -> str:
    # Cache model instance
    if not hasattr(expensive_operation, 'model'):
        expensive_operation.model = load_model()
    return expensive_operation.model(text)
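The function-attribute caching pattern above can be verified in plain Python. In this sketch, `load_model` is a dummy stand-in (an assumption for illustration) so we can count how often the expensive load actually runs:

```python
calls = {'loads': 0}

def load_model():
    # Stand-in for an expensive model load; counts its invocations
    calls['loads'] += 1
    return str.upper  # dummy "model" that uppercases text

def expensive_operation(text: str) -> str:
    # Cache the model on the function object after the first call
    if not hasattr(expensive_operation, 'model'):
        expensive_operation.model = load_model()
    return expensive_operation.model(text)

expensive_operation('hello')
expensive_operation('world')
# load_model ran exactly once despite two invocations
```

Because the model is stored as an attribute of the function object, it survives across calls within the same process.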
Async Support
```python
from typing import Literal, Union, Any
import json

@pxt.udf
async def chat_completions(
    messages: list,
    *,
    model: str,
    model_kwargs: dict | None = None,
) -> dict:
    # Set up the API request with proper context management
    result = await openai_client.chat_completions(
        messages=messages,
        model=model,
        model_kwargs=model_kwargs
    )
    # Process response
    return json.loads(result.text)

# Example usage in a computed column
table.add_computed_column(
    response=chat_completions(
        [
            {'role': 'system', 'content': 'You are a helpful assistant.'},
            {'role': 'user', 'content': table.prompt}
        ],
        model='gpt-4o-mini'
    )
)
```
Async UDFs are designed for operations that wait on external services, such as LLM calls, database queries, or web service requests. They keep your Pixeltable workflows responsive by letting time-consuming calls run in the background. They should not be used for general computation or CPU-bound data processing.
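The payoff comes from overlapping waits: while one call is pending, others can proceed. A self-contained `asyncio` sketch with simulated I/O (no real API involved) makes this concrete:

```python
import asyncio
import time

async def fake_api_call(prompt: str) -> str:
    # Simulate a slow external request (e.g., an LLM call)
    await asyncio.sleep(0.1)
    return f'response to {prompt}'

async def main() -> tuple[list[str], float]:
    start = time.perf_counter()
    # All three calls run concurrently, so total wall time
    # is roughly 0.1s rather than 0.3s
    results = await asyncio.gather(
        *(fake_api_call(p) for p in ['a', 'b', 'c'])
    )
    return list(results), time.perf_counter() - start

results, elapsed = asyncio.run(main())
```

Sequential awaits would take the sum of the individual latencies; `asyncio.gather` reduces it to roughly the maximum.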
```python
@pxt.udf
def validate_score(score: float) -> float:
    if not 0 <= score <= 100:
        raise ValueError("Score must be between 0 and 100")
    return score
```
Performance
Use batching for GPU operations
Cache expensive resources
Process data in chunks when possible
```python
@pxt.udf(batch_size=32)
def process_chunk(items: Batch[str]) -> Batch[str]:
    if not hasattr(process_chunk, 'model'):
        process_chunk.model = load_expensive_model()
    return process_chunk.model.process_batch(items)
```
Organization
Keep related UDFs in modules
Use clear, descriptive names
Document complex operations
```python
import string

@pxt.udf
def normalize_text(
    text: str,
    lowercase: bool = True,
    remove_punctuation: bool = True
) -> str:
    """Normalize text by optionally lowercasing and removing punctuation."""
    if lowercase:
        text = text.lower()
    if remove_punctuation:
        text = text.translate(str.maketrans("", "", string.punctuation))
    return text
```
Table UDFs
Define clear input and output columns for your table UDFs
Implement cleanup routines for tables that grow large
Balance between too many small tables and monolithic tables
Use clear naming conventions for tables and their UDFs
Document the purpose and expected inputs for each table UDF
```python
# Create a table with your workflow
finance_agent = pxt.create_table('directory.financial_analyst', {'prompt': pxt.String})

# Add computed columns for processing
finance_agent.add_computed_column(...)  # column definitions elided
```
```python
# Use like any other UDF
result_table.add_computed_column(
    result=finance_agent_udf(result_table.prompt)
)
```
Setup and Tools
```python
import time

import yfinance as yf

import pixeltable as pxt
from pixeltable.functions.openai import chat_completions, invoke_tools

DIRECTORY = 'agent'
OPENAI_MODEL = 'gpt-4o-mini'

# Create fresh directory
pxt.drop_dir(DIRECTORY, force=True)
pxt.create_dir(DIRECTORY, if_exists='ignore')

# yfinance tool for getting stock information
@pxt.udf
def stock_info(ticker: str) -> dict | None:
    """Get stock info for a given ticker symbol."""
    stock = yf.Ticker(ticker)
    return stock.info

# Helper UDF to create a prompt with tool outputs
@pxt.udf
def create_prompt(question: str, tool_outputs: list[dict]) -> str:
    return f"""
    QUESTION: {question}

    RESULTS: {tool_outputs}
    """
```
Step 1: Create Agent Table
```python
# Create Financial Analyst Agent Table
finance_agent = pxt.create_table(
    f'{DIRECTORY}.financial_analyst',
    {'prompt': pxt.String},
    if_exists='ignore'
)

# Prepare initial messages for the LLM
messages = [{'role': 'user', 'content': finance_agent.prompt}]

# Define available tools
tools = pxt.tools(stock_info)

# Get initial response with tool calls
finance_agent.add_computed_column(
    initial_response=chat_completions(
        model=OPENAI_MODEL,
        messages=messages,
        tools=tools,
        tool_choice=tools.choice(required=True)
    )
)

# Execute the requested tools
finance_agent.add_computed_column(
    tool_output=invoke_tools(tools, finance_agent.initial_response)
)

# Create prompt with tool results
finance_agent.add_computed_column(
    stock_response_prompt=create_prompt(
        finance_agent.prompt,
        finance_agent.tool_output
    )
)

# Generate final response using tool results
final_messages = [
    {'role': 'system', 'content': "Answer the user's question based on the results."},
    {'role': 'user', 'content': finance_agent.stock_response_prompt},
]
finance_agent.add_computed_column(
    final_response=chat_completions(
        model=OPENAI_MODEL,
        messages=final_messages
    )
)

# Extract answer text
finance_agent.add_computed_column(
    answer=finance_agent.final_response.choices[0].message.content
)
```
Step 2: Convert to UDF
```python
# Convert the finance_agent table to a UDF
finance_agent_udf = pxt.udf(
    finance_agent,
    return_value=finance_agent.answer
)
```
Step 3: Create Consumer Table
```python
# Create a Portfolio Manager table that uses the finance agent
portfolio_manager = pxt.create_table(
    f'{DIRECTORY}.portfolio_manager',
    {'prompt': pxt.String},
    if_exists='ignore'
)

# Add the finance agent UDF as a computed column
portfolio_manager.add_computed_column(
    result=finance_agent_udf(portfolio_manager.prompt)
)
```
Step 4: Test the Workflow
```python
# Get the portfolio manager table
portfolio_manager = pxt.get_table(f'{DIRECTORY}.portfolio_manager')

# Insert a test query
portfolio_manager.insert([
    {'prompt': 'What is the price of NVIDIA?'}
])

# View results
result = portfolio_manager.select(portfolio_manager.result).collect()
print(result)
```
You can create a workflow of table UDFs to handle complex multi-stage processing:
```python
# Create a chain of specialized agents
research_agent = pxt.udf(research_table, return_value=research_table.findings)
analysis_agent = pxt.udf(analysis_table, return_value=analysis_table.insights)
report_agent = pxt.udf(report_table, return_value=report_table.document)

# Use them in sequence
workflow.add_computed_column(research=research_agent(workflow.query))
workflow.add_computed_column(analysis=analysis_agent(workflow.research))
workflow.add_computed_column(report=report_agent(workflow.analysis))
```
Parallel Processing with Table UDFs
Execute multiple table UDFs in parallel and combine their results:
```python
# Define specialized agents for different tasks
stock_agent = pxt.udf(stock_table, return_value=stock_table.analysis)
news_agent = pxt.udf(news_table, return_value=news_table.summary)
sentiment_agent = pxt.udf(sentiment_table, return_value=sentiment_table.score)

# Process in parallel
portfolio.add_computed_column(stock_data=stock_agent(portfolio.ticker))
portfolio.add_computed_column(news_data=news_agent(portfolio.ticker))
portfolio.add_computed_column(sentiment=sentiment_agent(portfolio.ticker))

# Combine results
portfolio.add_computed_column(report=combine_insights(
    portfolio.stock_data,
    portfolio.news_data,
    portfolio.sentiment
))
```
A retrieval UDF queries a table based on input parameters and returns matching rows:
```python
import pixeltable as pxt

# Create a knowledge base table
kb = pxt.create_table('app.knowledge_base', {
    'topic': pxt.String,
    'category': pxt.String,
    'content': pxt.String
})
kb.insert([
    {'topic': 'python', 'category': 'programming', 'content': 'Python is a high-level language...'},
    {'topic': 'rust', 'category': 'programming', 'content': 'Rust is a systems language...'},
    {'topic': 'photosynthesis', 'category': 'biology', 'content': 'Plants convert sunlight...'}
])

# Create a retrieval UDF from the table
lookup_kb = pxt.retrieval_udf(
    kb,
    name='search_knowledge_base',
    description='Search the knowledge base by topic and category',
    parameters=['topic', 'category'],  # Columns to use as search parameters
    limit=5  # Max results to return
)

# Use it as a function
results = lookup_kb(topic='python', category='programming')
# Returns: [{'topic': 'python', 'category': 'programming', 'content': 'Python is...'}]
```
`parameters`: Columns to use as query parameters (defaults to all non-computed columns)
`limit`: Maximum number of rows to return
Retrieval UDFs are perfect for building knowledge bases that LLM agents can query. Combine them with embedding indexes for semantic search capabilities.
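Conceptually, a retrieval UDF is an exact-match filter over the table's rows with a result limit. The plain-Python sketch below illustrates those semantics only; the in-memory `rows` list and `search_knowledge_base` helper are stand-ins, not Pixeltable API:

```python
rows = [
    {'topic': 'python', 'category': 'programming', 'content': 'Python is a high-level language...'},
    {'topic': 'rust', 'category': 'programming', 'content': 'Rust is a systems language...'},
    {'topic': 'photosynthesis', 'category': 'biology', 'content': 'Plants convert sunlight...'},
]

def search_knowledge_base(limit=5, **params):
    # Keep rows whose columns match every supplied parameter, up to `limit`
    matches = [r for r in rows if all(r.get(k) == v for k, v in params.items())]
    return matches[:limit]

results = search_knowledge_base(topic='python', category='programming')
# One matching row: the 'python' entry
```

Each keyword argument narrows the result set, and the limit caps how many rows an agent sees per query, which keeps tool outputs small enough to embed in an LLM prompt.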