Building a Multimodal Chat Application

Learn how to build a production-ready chat application that can process and understand multiple types of media using Pixeltable.

See the complete example on GitHub.

Table Structure and Types

First, define your core data structure:

import pixeltable as pxt
from pixeltable.functions import openai
from pixeltable.functions.huggingface import sentence_transformer
from pixeltable.functions.video import extract_audio
from pixeltable.iterators import DocumentSplitter
from pixeltable.iterators.string import StringSplitter

# Create a directory to organize data (optional)
pxt.drop_dir("chatbot", force=True)
pxt.create_dir("chatbot")

# Define core tables
docs_table = pxt.create_table(
    "chatbot.documents",
    {
        "document": pxt.Document,  # For text documents
        "video": pxt.Video,        # For video files
        "audio": pxt.Audio,        # For audio files
        "question": pxt.String,    # User queries
    }
)

conversations = pxt.create_table(
    "chatbot.conversations",
    {
        "role": pxt.String,        # 'user' or 'assistant'
        "content": pxt.String,     # Message content
        "timestamp": pxt.Timestamp # Message timestamp
    }
)
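
To confirm the schema was created as expected, you can list what now lives under the chatbot directory (a quick sanity check, assuming pxt.list_tables is available in your Pixeltable version):

# List the tables created under the 'chatbot' directory
print(pxt.list_tables())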

Views and Chunking

Create specialized views for processing different media types. Note that the two transcription views below reference the transcription_text and audio_transcription_text computed columns defined in the Computed Columns section, so add those columns to the documents table before creating these views:

# Document chunks view
chunks_view = pxt.create_view(
    "chatbot.chunks",
    docs_table,
    iterator=DocumentSplitter.create(
        document=docs_table.document,
        separators="sentence",
        metadata="title,heading,sourceline",
    )
)

# Video transcription chunks
transcription_chunks = pxt.create_view(
    "chatbot.transcription_chunks",
    docs_table,
    iterator=StringSplitter.create(
        text=docs_table.transcription_text,
        separators="sentence"
    )
)

# Audio transcription chunks
audio_chunks = pxt.create_view(
    "chatbot.audio_chunks",
    docs_table,
    iterator=StringSplitter.create(
        text=docs_table.audio_transcription_text,
        separators="sentence"
    )
)
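
Once documents have been inserted, it helps to spot-check the chunk output (a minimal sketch; the heading column comes from the metadata argument passed to DocumentSplitter above):

# Inspect a few document chunks to verify the splitter settings
sample_chunks = chunks_view.select(chunks_view.text, chunks_view.heading).limit(5).collect()
print(sample_chunks)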

User-Defined Functions (UDFs)

Define the query functions and UDFs used to retrieve chat history and assemble prompts:

@conversations.query
def get_chat_history():
    """Retrieve chat history in chronological order"""
    return conversations.order_by(
        conversations.timestamp
    ).select(
        role=conversations.role,
        content=conversations.content
    )

@pxt.udf
def create_messages(history: list[dict], prompt: str) -> list[dict]:
    """Create message list for chat completion"""
    messages = [{
        'role': 'system',
        'content': 'You are a helpful AI assistant maintaining conversation context.'
    }]
    
    # Add historical messages
    messages.extend({
        'role': msg['role'],
        'content': msg['content']
    } for msg in history)
    
    # Add current prompt
    messages.append({
        'role': 'user',
        'content': prompt
    })
    
    return messages

@pxt.udf
def create_prompt(
    doc_context: list[dict],
    video_context: list[dict],
    audio_context: list[dict],
    question: str
) -> str:
    """Create a unified prompt from multiple context sources"""
    context_parts = []
    
    if doc_context:
        context_parts.append(
            "Document Context:\n" + "\n\n".join(
                item["text"] for item in doc_context if item and "text" in item
            )
        )
    
    if video_context:
        context_parts.append(
            "Video Context:\n" + "\n\n".join(
                item["text"] for item in video_context if item and "text" in item
            )
        )
    
    if audio_context:
        context_parts.append(
            "Audio Context:\n" + "\n\n".join(
                item["text"] for item in audio_context if item and "text" in item
            )
        )
    
    full_context = "\n\n---\n\n".join(context_parts) if context_parts else "No relevant context found."
    return f"Context:\n{full_context}\n\nQuestion:\n{question}"

Search and Filtering

Set up semantic search capabilities:

# Add embedding indexes for semantic search
chunks_view.add_embedding_index(
    "text",
    string_embed=sentence_transformer.using(model_id="intfloat/e5-large-v2")
)

transcription_chunks.add_embedding_index(
    "text",
    string_embed=sentence_transformer.using(model_id="intfloat/e5-large-v2")
)

audio_chunks.add_embedding_index(
    "text",
    string_embed=sentence_transformer.using(model_id="intfloat/e5-large-v2")
)

# Define search queries
@chunks_view.query
def get_relevant_chunks(query_text: str):
    """Find relevant document chunks"""
    sim = chunks_view.text.similarity(query_text)
    return (
        chunks_view.order_by(sim, asc=False)
        .select(chunks_view.text, sim=sim)
        .limit(20)
    )

@transcription_chunks.query
def get_relevant_transcript_chunks(query_text: str):
    """Find relevant video transcript chunks"""
    sim = transcription_chunks.text.similarity(query_text)
    return (
        transcription_chunks.order_by(sim, asc=False)
        .select(transcription_chunks.text, sim=sim)
        .limit(20)
    )

@audio_chunks.query
def get_relevant_audio_chunks(query_text: str):
    """Find relevant audio transcript chunks"""
    sim = audio_chunks.text.similarity(query_text)
    return (
        audio_chunks.order_by(sim, asc=False)
        .select(audio_chunks.text, sim=sim)
        .limit(20)
    )
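
The same similarity expression also works ad hoc, outside a query function, which is convenient when tuning the index or the result limit (a minimal sketch with an illustrative query string):

# Ad-hoc semantic search over the document chunks
query_text = "What does the report say about pricing?"  # illustrative query
sim = chunks_view.text.similarity(query_text)
top_hits = (
    chunks_view.order_by(sim, asc=False)
    .select(chunks_view.text, sim=sim)
    .limit(5)
    .collect()
)
print(top_hits)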

Computed Columns

Define your processing workflow with computed columns:

# Video processing workflow
docs_table.add_computed_column(
    audio_extract=extract_audio(docs_table.video, format="mp3")
)
docs_table.add_computed_column(
    transcription=openai.transcriptions(
        audio=docs_table.audio_extract,
        model="whisper-1"
    )
)
docs_table.add_computed_column(
    transcription_text=docs_table.transcription.text
)

# Audio processing workflow
docs_table.add_computed_column(
    audio_transcription=openai.transcriptions(
        audio=docs_table.audio,
        model="whisper-1"
    )
)
docs_table.add_computed_column(
    audio_transcription_text=docs_table.audio_transcription.text
)

# Chat processing workflow
docs_table.add_computed_column(
    context_doc=chunks_view.queries.get_relevant_chunks(docs_table.question)
)
docs_table.add_computed_column(
    context_video=transcription_chunks.queries.get_relevant_transcript_chunks(docs_table.question)
)
docs_table.add_computed_column(
    context_audio=audio_chunks.queries.get_relevant_audio_chunks(docs_table.question)
)
docs_table.add_computed_column(
    prompt=create_prompt(
        docs_table.context_doc,
        docs_table.context_video,
        docs_table.context_audio,
        docs_table.question
    )
)
docs_table.add_computed_column(
    chat_history=conversations.queries.get_chat_history()
)
docs_table.add_computed_column(
    messages=create_messages(
        docs_table.chat_history,
        docs_table.prompt
    )
)
docs_table.add_computed_column(
    response=openai.chat_completions(
        messages=docs_table.messages,
        model="gpt-4o-mini"
    )
)
docs_table.add_computed_column(
    answer=docs_table.response.choices[0].message.content
)
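
Because computed columns are materialized as rows are inserted, each intermediate stage can be inspected directly, which is useful when debugging the workflow (a minimal sketch using only the columns defined above):

# Inspect intermediate stages alongside the final answer
docs_table.select(
    docs_table.transcription_text,
    docs_table.prompt,
    docs_table.answer
).collect()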

Usage Example

Here’s how to use the application:

import pixeltable as pxt
from datetime import datetime

# Connect to your app
docs_table = pxt.get_table("chatbot.documents")
conversations = pxt.get_table("chatbot.conversations")

# Add a document
docs_table.insert([{
    "document": "path/to/document.pdf"
}])

# Add a video
docs_table.insert([{
    "video": "path/to/video.mp4"
}])

# Ask a question
question = "What are the key points from all sources?"

# Store user message
conversations.insert([{
    "role": "user",
    "content": question,
    "timestamp": datetime.now()
}])

# Get answer
docs_table.insert([{"question": question}])
result = docs_table.where(docs_table.question == question).select(docs_table.answer).collect()
answer = result["answer"][0]

# Store assistant response
conversations.insert([{
    "role": "assistant",
    "content": answer,
    "timestamp": datetime.now()
}])

# View conversation history
history = conversations.order_by(conversations.timestamp).collect().to_pandas()
print(history)
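
The insert-and-collect steps above can be wrapped in a small helper so each conversational turn is a single call (a minimal sketch built only from the tables defined earlier; the chat function name is illustrative):

def chat(question: str) -> str:
    """Run one turn: store the user message, compute the answer, store the reply."""
    conversations.insert([{
        "role": "user",
        "content": question,
        "timestamp": datetime.now()
    }])
    docs_table.insert([{"question": question}])
    result = docs_table.where(docs_table.question == question).select(docs_table.answer).collect()
    answer = result["answer"][0]
    conversations.insert([{
        "role": "assistant",
        "content": answer,
        "timestamp": datetime.now()
    }])
    return answer

print(chat("Summarize the main findings across all sources."))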

Best Practices

Table Structure

  • Keep table schemas focused and specific
  • Use appropriate column types
  • Document schema dependencies

Computed Columns

  • Group related computations
  • Consider computation cost
  • Monitor workflow performance

UDFs

  • Keep functions single-purpose
  • Add clear documentation
  • Handle edge cases

Search & Filtering

  • Choose appropriate embedding models
  • Tune chunk sizes for your use case (see the chunking sketch after this list)
  • Balance result count vs relevance
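
For example, switching DocumentSplitter from sentence-level chunks to token-bounded chunks trades finer-grained matches for more context per chunk (a sketch, assuming your Pixeltable version supports the token_limit separator and limit parameter; the view name is illustrative):

# Alternative chunking: larger, token-bounded chunks
coarse_chunks = pxt.create_view(
    "chatbot.coarse_chunks",
    docs_table,
    iterator=DocumentSplitter.create(
        document=docs_table.document,
        separators="token_limit",
        limit=300,  # approximate tokens per chunk; tune for your documents
        metadata="title,heading,sourceline",
    )
)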

Additional Resources

Source Code

Find the complete implementation in our sample apps repository.