User-Defined Functions (UDFs) in Pixeltable allow you to extend the platform with custom Python code. They bridge the gap between Pixeltable’s built-in operations and your specific data processing needs, enabling you to create reusable components for transformations, analysis, and AI workflows.
Pixeltable UDFs offer several key advantages:
- Reusability: Define a function once and use it across multiple tables and operations
- Type Safety: Strong typing ensures data compatibility throughout your workflows
- Performance: Batch processing and caching capabilities optimize execution
- Integration: Seamlessly combine custom code with Pixeltable's query system
- Flexibility: Process any data type, including text, images, videos, and embeddings
UDFs can be as simple as a basic transformation or as complex as a multi-stage ML workflow. Pixeltable offers three types of custom functions to handle different scenarios:
```python
import pixeltable as pxt

# Basic UDF for text transformation
@pxt.udf
def clean_text(text: str) -> str:
    """Clean and normalize text data."""
    return text.lower().strip()

# Use in a computed column
documents = pxt.get_table('my_documents')
documents.add_computed_column(
    clean_content=clean_text(documents.content)
)
```
```python
# Defined directly in your code
@pxt.udf
def extract_year(date_str: str) -> int:
    return int(date_str.split('-')[0])

# Used immediately
table.add_computed_column(
    year=extract_year(table.date)
)
```
Local UDFs are serialized with their columns. Changes to the UDF only affect new columns.
```python
# In my_functions.py
@pxt.udf
def clean_text(text: str) -> str:
    return text.strip().lower()

# In your application
from my_functions import clean_text

table.add_computed_column(
    clean_content=clean_text(table.content)
)
```
Module UDFs are referenced by path. Changes to the UDF affect all uses after reload.
```python
from pixeltable.func import Batch

@pxt.udf(batch_size=32)
def process_batch(items: Batch[str]) -> Batch[str]:
    results = []
    for item in items:
        results.append(item.upper())
    return results

# Used like a regular UDF
table.add_computed_column(
    processed=process_batch(table.text)
)
```
Batched UDFs process multiple rows at once for better performance.
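Conceptually, batching groups incoming rows into chunks of at most `batch_size` and calls the function once per chunk. A minimal pure-Python sketch of that mechanic (an illustration only, not Pixeltable's actual execution engine):

```python
from typing import Callable, Iterator

def batched(items: list, batch_size: int) -> Iterator[list]:
    """Yield successive chunks of at most batch_size items."""
    for i in range(0, len(items), batch_size):
        yield items[i:i + batch_size]

def apply_batched(fn: Callable[[list], list], rows: list, batch_size: int) -> list:
    """Call fn once per chunk and flatten the results back into row order."""
    out: list = []
    for chunk in batched(rows, batch_size):
        out.extend(fn(chunk))
    return out

# Same logic as the process_batch example above, minus the decorator
def upper_batch(items: list) -> list:
    return [s.upper() for s in items]

print(apply_batched(upper_batch, ['a', 'b', 'c'], batch_size=2))  # ['A', 'B', 'C']
```

The per-chunk call is where the payoff comes from: a model that vectorizes over a batch does its expensive work once per chunk instead of once per row.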
```python
from pixeltable.func import Batch

@pxt.udf(batch_size=16)
def embed_texts(
    texts: Batch[str]
) -> Batch[pxt.Array]:
    # Process multiple texts at once
    # (model is assumed to be a preloaded embedding model)
    return model.encode(texts)
```
Caching
```python
@pxt.udf
def expensive_operation(text: str) -> str:
    # Cache model instance
    if not hasattr(expensive_operation, 'model'):
        expensive_operation.model = load_model()
    return expensive_operation.model(text)
```
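The attribute-caching pattern above runs the expensive load only once per process. A self-contained sketch of the same pattern, with a trivial stand-in for `load_model` that counts how many times it runs:

```python
def load_model():
    # Count invocations so we can verify the cache works;
    # str.upper stands in for a real model
    load_model.calls = getattr(load_model, 'calls', 0) + 1
    return str.upper

def expensive_operation(text: str) -> str:
    # Lazily create the "model" on first call, then reuse it
    if not hasattr(expensive_operation, 'model'):
        expensive_operation.model = load_model()
    return expensive_operation.model(text)

print(expensive_operation('a'), expensive_operation('b'))  # A B
print(load_model.calls)  # 1 -- loaded only once
```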
Async Support
```python
import json
from typing import Optional

@pxt.udf
async def chat_completions(
    messages: list,
    *,
    model: str,
    model_kwargs: Optional[dict] = None,
) -> dict:
    # Issue the API request asynchronously
    result = await openai_client.chat_completions(
        messages=messages,
        model=model,
        model_kwargs=model_kwargs
    )
    # Process response
    return json.loads(result.text)

# Example usage in a computed column
table.add_computed_column(
    response=chat_completions(
        [
            {'role': 'system', 'content': 'You are a helpful assistant.'},
            {'role': 'user', 'content': table.prompt}
        ],
        model='gpt-4o-mini'
    )
)
```
Async UDFs are designed for I/O-bound work such as external API calls: LLM requests, database queries, or web-service interactions. They keep your Pixeltable workflows responsive by letting time-consuming operations run in the background. Don't use them for general computation or CPU-bound data processing.
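To see why this matters, here is a plain-`asyncio` sketch (no Pixeltable involved) in which three simulated API calls overlap instead of running back to back; `fake_api_call` is a made-up stand-in for a real network client:

```python
import asyncio
import time

async def fake_api_call(prompt: str) -> str:
    # Stand-in for a network-bound request (e.g. an LLM call)
    await asyncio.sleep(0.1)
    return f'response to {prompt}'

async def main() -> list[str]:
    # The three awaits overlap, so total wall time is ~0.1s, not ~0.3s
    return await asyncio.gather(*(fake_api_call(p) for p in ('a', 'b', 'c')))

start = time.perf_counter()
responses = asyncio.run(main())
elapsed = time.perf_counter() - start
print(responses)
print(elapsed < 0.25)  # True: the calls ran concurrently
```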
```python
@pxt.udf
def validate_score(score: float) -> float:
    if not 0 <= score <= 100:
        raise ValueError("Score must be between 0 and 100")
    return score
```
Performance
- Use batching for GPU operations
- Cache expensive resources
- Process data in chunks when possible
```python
@pxt.udf(batch_size=32)
def process_chunk(items: Batch[str]) -> Batch[str]:
    if not hasattr(process_chunk, 'model'):
        process_chunk.model = load_expensive_model()
    return process_chunk.model.process_batch(items)
```
Organization
- Keep related UDFs in modules
- Use clear, descriptive names
- Document complex operations
```python
import string

@pxt.udf
def normalize_text(
    text: str,
    lowercase: bool = True,
    remove_punctuation: bool = True
) -> str:
    """Normalize text by optionally lowercasing and removing punctuation."""
    if lowercase:
        text = text.lower()
    if remove_punctuation:
        text = text.translate(str.maketrans("", "", string.punctuation))
    return text
```
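For quick testing outside Pixeltable, the same logic works as plain Python (just drop the `@pxt.udf` decorator):

```python
import string

def normalize_text(text: str, lowercase: bool = True, remove_punctuation: bool = True) -> str:
    """Normalize text by optionally lowercasing and removing punctuation."""
    if lowercase:
        text = text.lower()
    if remove_punctuation:
        text = text.translate(str.maketrans("", "", string.punctuation))
    return text

print(normalize_text("Hello, World!"))                            # hello world
print(normalize_text("Hello, World!", remove_punctuation=False))  # hello, world!
```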
Table UDFs
- Define clear input and output columns for your table UDFs
- Implement cleanup routines for tables that grow large
- Strike a balance between many small tables and monolithic ones
- Use clear naming conventions for tables and their UDFs
- Document the purpose and expected inputs of each table UDF
```python
# Create a table with your workflow
finance_agent = pxt.create_table('directory.financial_analyst', {'prompt': pxt.String})

# Add computed columns for processing
finance_agent.add_computed_column(...)  # processing steps elided
```
```python
# Use like any other UDF
result_table.add_computed_column(
    result=finance_agent_udf(result_table.prompt)
)
```
Setup and Tools
```python
from typing import Optional

import yfinance as yf

import pixeltable as pxt
from pixeltable.functions.openai import chat_completions, invoke_tools

DIRECTORY = 'agent'
OPENAI_MODEL = 'gpt-4o-mini'

# Create a fresh directory
pxt.drop_dir(DIRECTORY, force=True)
pxt.create_dir(DIRECTORY, if_exists='ignore')

# yfinance tool for getting stock information
@pxt.udf
def stock_info(ticker: str) -> Optional[dict]:
    """Get stock info for a given ticker symbol."""
    stock = yf.Ticker(ticker)
    return stock.info

# Helper UDF to create a prompt with tool outputs
@pxt.udf
def create_prompt(question: str, tool_outputs: list[dict]) -> str:
    return f"""
    QUESTION:
    {question}

    RESULTS:
    {tool_outputs}
    """
```
Step 1: Create Agent Table
```python
# Create Financial Analyst Agent Table
finance_agent = pxt.create_table(
    f'{DIRECTORY}.financial_analyst',
    {'prompt': pxt.String},
    if_exists='ignore'
)

# Prepare initial messages for LLM
messages = [{'role': 'user', 'content': finance_agent.prompt}]

# Define available tools
tools = pxt.tools(stock_info)

# Get initial response with tool calls
finance_agent.add_computed_column(
    initial_response=chat_completions(
        model=OPENAI_MODEL,
        messages=messages,
        tools=tools,
        tool_choice=tools.choice(required=True)
    )
)

# Execute the requested tools
finance_agent.add_computed_column(
    tool_output=invoke_tools(tools, finance_agent.initial_response)
)

# Create prompt with tool results
finance_agent.add_computed_column(
    stock_response_prompt=create_prompt(
        finance_agent.prompt,
        finance_agent.tool_output
    )
)

# Generate final response using tool results
final_messages = [
    {'role': 'system', 'content': "Answer the user's question based on the results."},
    {'role': 'user', 'content': finance_agent.stock_response_prompt},
]
finance_agent.add_computed_column(
    final_response=chat_completions(
        model=OPENAI_MODEL,
        messages=final_messages
    )
)

# Extract answer text
finance_agent.add_computed_column(
    answer=finance_agent.final_response.choices[0].message.content
)
```
Step 2: Convert to UDF
```python
# Convert the finance_agent table to a UDF
finance_agent_udf = pxt.udf(
    finance_agent,
    return_value=finance_agent.answer
)
```
Step 3: Create Consumer Table
```python
# Create a Portfolio Manager table that uses the finance agent
portfolio_manager = pxt.create_table(
    f'{DIRECTORY}.portfolio_manager',
    {'prompt': pxt.String},
    if_exists='ignore'
)

# Add the finance agent UDF as a computed column
portfolio_manager.add_computed_column(
    result=finance_agent_udf(portfolio_manager.prompt)
)
```
Step 4: Test the Workflow
```python
# Get the portfolio manager table
portfolio_manager = pxt.get_table(f'{DIRECTORY}.portfolio_manager')

# Insert a test query
portfolio_manager.insert([
    {'prompt': 'What is the price of NVIDIA?'}
])

# View results
result = portfolio_manager.select(portfolio_manager.result).collect()
print(result)
```
You can create a workflow of table UDFs to handle complex multi-stage processing:
```python
# Create a chain of specialized agents
research_agent = pxt.udf(research_table, return_value=research_table.findings)
analysis_agent = pxt.udf(analysis_table, return_value=analysis_table.insights)
report_agent = pxt.udf(report_table, return_value=report_table.document)

# Use them in sequence
workflow.add_computed_column(research=research_agent(workflow.query))
workflow.add_computed_column(analysis=analysis_agent(workflow.research))
workflow.add_computed_column(report=report_agent(workflow.analysis))
```
Parallel Processing with Table UDFs
Execute multiple table UDFs in parallel and combine their results:
```python
# Define specialized agents for different tasks
stock_agent = pxt.udf(stock_table, return_value=stock_table.analysis)
news_agent = pxt.udf(news_table, return_value=news_table.summary)
sentiment_agent = pxt.udf(sentiment_table, return_value=sentiment_table.score)

# Process in parallel
portfolio.add_computed_column(stock_data=stock_agent(portfolio.ticker))
portfolio.add_computed_column(news_data=news_agent(portfolio.ticker))
portfolio.add_computed_column(sentiment=sentiment_agent(portfolio.ticker))

# Combine results
portfolio.add_computed_column(report=combine_insights(
    portfolio.stock_data,
    portfolio.news_data,
    portfolio.sentiment
))
```