Logging

  • Implement Python logging in UDFs and application endpoints
  • Track execution time, errors, and API call latency
  • Use structured logging (JSON) for log aggregation; a minimal formatter sketch follows the example below
import logging
import time
import pixeltable as pxt

logger = logging.getLogger(__name__)

@pxt.udf
def process_video(video: pxt.Video) -> pxt.Json:
    start = time.perf_counter()  # monotonic clock, preferred for timing
    try:
        # Your processing logic here
        result = {'processed': True}
        logger.info(f"Processed in {time.perf_counter() - start:.2f}s")
        return result
    except Exception:
        logger.exception("Processing failed")  # records the full traceback
        raise
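
For the structured-logging bullet above, here is a minimal JSON formatter sketch using only the standard library; the field names ('ts', 'level', 'logger', 'message') are an assumption and should match your log aggregator's schema:

import json
import logging

class JsonFormatter(logging.Formatter):
    """Render each log record as one JSON line, ready for log aggregation."""
    def format(self, record: logging.LogRecord) -> str:
        payload = {
            'ts': self.formatTime(record),
            'level': record.levelname,
            'logger': record.name,
            'message': record.getMessage(),
        }
        if record.exc_info:
            payload['exception'] = self.formatException(record.exc_info)
        return json.dumps(payload)

handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logging.getLogger().addHandler(handler)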

Resource Monitoring

  • Monitor CPU, RAM, disk I/O, and network on the Pixeltable host (a sampling sketch follows the table below)
  • Track UDF execution time and model inference latency
  • Alert on resource exhaustion
Key Metrics to Track:

Metric      What to Watch
------      -------------
CPU         Sustained high usage during inference
Memory      Growth over time (potential leaks)
Disk I/O    Bottlenecks during media processing
Network     API call latency to external services
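
A minimal sampling sketch for the metrics above, assuming psutil is installed (any metrics agent, e.g. Prometheus node_exporter, covers the same ground); the 90% memory threshold and 60s interval are illustrative values:

import logging
import time
import psutil

def sample_host_metrics() -> dict:
    # One point-in-time sample of the four metrics in the table
    return {
        'cpu_percent': psutil.cpu_percent(interval=1),        # averaged over 1s
        'memory_percent': psutil.virtual_memory().percent,
        'disk_read_bytes': psutil.disk_io_counters().read_bytes,
        'net_sent_bytes': psutil.net_io_counters().bytes_sent,
    }

while True:
    metrics = sample_host_metrics()
    if metrics['memory_percent'] > 90:  # alert on resource exhaustion
        logging.warning("Memory at %.0f%%", metrics['memory_percent'])
    time.sleep(60)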

Optimization

Batch Operations

Use batch processing for better throughput:
# Batch UDF execution for GPU models
@pxt.udf(batch_size=32)
def embed_batch(texts: pxt.Batch[str]) -> pxt.Batch[list[float]]:
    # Process the whole batch in one forward pass; 'model' is assumed to be
    # a preloaded embedding model (e.g., a sentence-transformers instance)
    return model.encode(texts)

# Batch inserts (more efficient than individual inserts)
table.insert([row1, row2, row3, ...])
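
A sketch of wiring the batched UDF into a table; the table and column names ('docs', 'text', 'embedding') are assumptions for illustration. Pixeltable groups pending rows into batches of 32 automatically when it populates the computed column:

t = pxt.create_table('docs', {'text': pxt.String})
t.insert([{'text': 'first document'}, {'text': 'second document'}])

# Each row's text is routed through embed_batch, 32 rows at a time
t.add_computed_column(embedding=embed_batch(t.text))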

Performance Tips

  • Batch Operations: Use @pxt.udf(batch_size=32) for GPU model inference
  • Batch Inserts: Insert multiple rows at once: table.insert([row1, row2, ...])
  • Profile UDFs: Add execution time logging to identify bottlenecks
  • Embedding Indexes: Use pgvector-backed embedding indexes for efficient similarity search (see the sketch below)
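
As referenced in the last bullet, a sketch of an embedding index and similarity query, assuming the sentence_transformer function from pixeltable.functions.huggingface and the 'docs' table from the batching example; check the Pixeltable docs for the exact signatures in your version:

from pixeltable.functions.huggingface import sentence_transformer

# Build the index (backed by pgvector) over the text column
t.add_embedding_index(
    'text',
    string_embed=sentence_transformer.using(model_id='all-MiniLM-L6-v2'),
)

# Rank rows by similarity to a query string
sim = t.text.similarity('how do I batch inserts?')
results = t.order_by(sim, asc=False).limit(5).select(t.text, score=sim).collect()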

Rate Limiting

Built-In Provider Limits

Pixeltable automatically rate-limits calls to hosted providers such as OpenAI, Anthropic, and Gemini; the limits can be tuned in config.toml:
# ~/.pixeltable/config.toml
[openai]
requests_per_minute = 500
tokens_per_minute = 90000

Custom API Rate Limiting

Use resource_pool to throttle calls to self-hosted models or custom endpoints:
# Default: 600 requests per minute
@pxt.udf(resource_pool='request-rate:my_service')
async def call_custom_api(prompt: str) -> dict:
    # Your logic to call custom endpoint
    return await custom_api_call(prompt)

# Example: Custom rate-limited UDF for self-hosted model
@pxt.udf(resource_pool='request-rate:my_ray_cluster')
async def call_ray_model(prompt: str, model: str) -> dict:
    # Your logic to call FastAPI + Ray cluster
    return await custom_api_call(prompt, model)

Advanced Features