Document Indexing and RAG
A hands-on guide to building a question-answering system (chatbot) on your documents.
In this tutorial, we'll demonstrate how RAG operations can be implemented in Pixeltable. In particular, we'll develop a RAG application that summarizes a collection of PDF documents and uses ChatGPT to answer questions about them.
In a traditional RAG workflow, such operations might be implemented as a Python script that runs on a periodic schedule or in response to certain events. In Pixeltable, they are implemented as persistent tables that are updated automatically and incrementally as new data becomes available.
If you are running this tutorial in Colab:
To make the tutorial run a bit snappier, let's switch to a GPU-equipped instance for this Colab session. To do that, click on the Runtime -> Change runtime type menu item at the top, then select the GPU radio button and click on Save.
We first set up our OpenAI API key:
import os
import getpass
if 'OPENAI_API_KEY' not in os.environ:
    os.environ['OPENAI_API_KEY'] = getpass.getpass('OpenAI API Key:')
OpenAI API Key: ········
Next, we install the packages needed for this tutorial and set up our environment.
%pip install -q pixeltable sentence-transformers tiktoken openai openpyxl
import numpy as np
import pixeltable as pxt
# Ensure a clean slate for the demo
pxt.drop_dir('rag_demo', force=True)
pxt.create_dir('rag_demo')
Connected to Pixeltable database at:
postgresql://postgres:@/pixeltable?host=/Users/asiegel/.pixeltable/pgdata
Created directory `rag_demo`.
<pixeltable.catalog.dir.Dir at 0x32381b520>
Next we'll create a table containing the sample questions we want to answer. The questions are stored in an Excel spreadsheet, along with a set of "ground truth" answers to help evaluate our model pipeline. We can use Pixeltable's handy import_excel() utility to load them. Note that we can pass the URL of the spreadsheet directly to the import utility.
base = 'https://github.com/pixeltable/pixeltable/raw/release/docs/source/data/rag-demo/'
qa_url = base + 'Q-A-Rag.xlsx'
queries_t = pxt.io.import_excel('rag_demo.queries', qa_url)
Created table `queries`.
Inserting rows into `queries`: 8 rows [00:00, 4485.29 rows/s]
Inserted 8 rows with 0 errors.
queries_t.head()
S__No_ | Question | correct_answer |
---|---|---|
1 | What is roughly the current mortage rate? | 0.07 |
2 | What is the current dividend yield for Alphabet Inc. (\$GOOGL)? | 0.0046 |
3 | What is the market capitalization of Alphabet? | \$2182.8 Billion |
4 | What are the latest financial metrics for Accenture PLC? | missed consensus forecasts and strong total bookings rising by 22% annually |
5 | What is the overall latest rating for Amazon.com from analysts? | SELL |
6 | What is the operating cash flow of Amazon in Q1 2024? | 18,989 Million |
7 | What is the expected EPS for Nvidia in Q1 2026? | 0.73 EPS |
8 | What are the main reasons to buy Nvidia? | Datacenter, GPUs Demands, Self-driving, and cash-flow |
Outline
There are two major parts to our RAG application:
- Document Indexing: Load the documents, split them into chunks, and index them using a vector embedding.
- Querying: For each question on our list, do a top-k lookup for the most relevant chunks, use them to construct a ChatGPT prompt, and send the enriched prompt to an LLM.
We'll implement both parts in Pixeltable.
1. Document Indexing
All data in Pixeltable, including documents, resides in tables.
Tables are persistent containers that can serve as the store of record for your data. Since we are starting from scratch, we will start with an empty table rag_demo.documents with a single column, document.
documents_t = pxt.create_table(
    'rag_demo.documents',
    {'document': pxt.DocumentType()}
)
documents_t
Created table `documents`.
Column Name | Type | Computed With |
---|---|---|
document | document |
Next, we'll insert our first few source documents into the new table. We'll leave the rest for later, in order to show how to update the indexed document base incrementally.
document_urls = [
    base + 'Argus-Market-Digest-June-2024.pdf',
    base + 'Argus-Market-Watch-June-2024.pdf',
    base + 'Company-Research-Alphabet.pdf',
    base + 'Jefferson-Amazon.pdf',
    base + 'Mclean-Equity-Alphabet.pdf',
    base + 'Zacks-Nvidia-Repeport.pdf',
]
documents_t.insert({'document': url} for url in document_urls[:3])
documents_t.show()
Inserting rows into `documents`: 3 rows [00:00, 2290.30 rows/s]
Inserted 3 rows with 0 errors.
In RAG applications, we often decompose documents into smaller units, or chunks, rather than treating each document as a single entity. In this example, we'll use Pixeltable's built-in DocumentSplitter, but in general the chunking methodology is highly customizable. DocumentSplitter has a variety of options for controlling the chunking behavior, and it's also possible to replace it entirely with a user-defined iterator (or an adapter for a third-party document splitter).
In Pixeltable, operations such as chunking can be automated by creating views of the base documents table. A view is a virtual derived table: rather than adding data directly to the view, we define it via a computation over the base table. In this example, the view is defined by iteration over the chunks of a DocumentSplitter.
from pixeltable.iterators import DocumentSplitter
chunks_t = pxt.create_view(
    'rag_demo.chunks',
    documents_t,
    iterator=DocumentSplitter.create(
        document=documents_t.document,
        separators='token_limit',
        limit=300
    )
)
Inserting rows into `chunks`: 41 rows [00:00, 10517.83 rows/s]
Created view `chunks` with 41 rows, 0 exceptions.
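To make the idea of token-limited chunking concrete, here is a minimal, dependency-free sketch. It is not how DocumentSplitter is implemented: the function name split_by_token_limit is hypothetical, and whitespace-separated words stand in for model tokens, which is a simplifying assumption. Each chunk carries a pos sequence number, mirroring the pos column of the view.

```python
def split_by_token_limit(text: str, limit: int) -> list[dict]:
    """Split text into consecutive chunks of at most `limit` tokens.

    Assumption: a "token" here is a whitespace-separated word, not a
    model token as produced by a real tokenizer.
    """
    if limit <= 0:
        raise ValueError('limit must be positive')
    tokens = text.split()
    chunks = []
    # Step through the token list in strides of `limit`; `pos` numbers the chunks from 0.
    for pos, start in enumerate(range(0, len(tokens), limit)):
        chunks.append({'pos': pos, 'text': ' '.join(tokens[start:start + limit])})
    return chunks


sample = 'one two three four five six seven'
chunks = split_by_token_limit(sample, limit=3)
for chunk in chunks:
    print(chunk)
```

A smaller limit yields more, shorter chunks; the last chunk may be shorter than the limit.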
Our chunks view now has 3 columns:
chunks_t
Column Name | Type | Computed With |
---|---|---|
pos | int | |
text | string | |
document | document |
- text is the chunk text produced by the DocumentSplitter.
- pos is a system-generated integer column, starting at 0, that provides a sequence number for each row.
- document is simply the document column from the base table documents. We won't need it here, but having access to the base table's columns (in effect a parent-child join) can be quite useful.
Notice that as soon as we created it, chunks was automatically populated with data from the existing documents in our base table. We can select the first 2 chunks from each document using common dataframe operations, in order to get a feel for what was extracted:
chunks_t.where(chunks_t.pos < 2).show()
Now let's compute vector embeddings for the document chunks and store them in a vector index. Pixeltable has built-in support for vector indexing using a variety of embedding model families, and it's easy for users to add new ones via UDFs. In this demo, we're going to use the E5 model from the Hugging Face sentence_transformers library, which runs locally.
The following command creates a vector index on the text column in the chunks table, using the E5 embedding model. Note that defining the index is enough to load it with the existing data (and also to update it when the underlying data changes, as we'll see later).
from pixeltable.functions.huggingface import sentence_transformer
@pxt.expr_udf
def e5_embed(text: str) -> np.ndarray:
    return sentence_transformer(text, model_id='intfloat/e5-large-v2')
chunks_t.add_embedding_index('text', string_embed=e5_embed)
Computing cells: 100%|██████████████████████████████████████████| 41/41 [00:03<00:00, 12.02 cells/s]
This completes the first part of our application, creating an indexed document base. Next, we'll use it to run some queries.
2. Querying
To express a top-k lookup against our index, we use Pixeltable's similarity operator in combination with the standard order_by and limit operations. Before building this into our application, let's run a sample query to make sure it works.
query_text = "What is the expected EPS for Nvidia in Q1 2026?"
sim = chunks_t.text.similarity(query_text)
nvidia_eps_query = (
    chunks_t
    .order_by(sim, asc=False)
    .select(similarity=sim, text=chunks_t.text)
    .limit(5)
)
nvidia_eps_query.collect()
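For intuition about what a top-k similarity lookup computes, the following sketch ranks a handful of toy vectors against a query vector. It assumes cosine similarity as the metric and uses hand-written 3-dimensional vectors in place of real E5 embeddings; the helper names cosine_similarity and top_k_chunks are hypothetical and not part of Pixeltable.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two non-zero vectors of equal length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def top_k_chunks(query_vec: list[float], indexed_chunks: list[tuple], k: int) -> list[dict]:
    """Score every (text, vector) pair, sort by descending similarity, keep the first k."""
    scored = [
        {'sim': cosine_similarity(query_vec, vec), 'text': text}
        for text, vec in indexed_chunks
    ]
    scored.sort(key=lambda row: row['sim'], reverse=True)
    return scored[:k]


# Toy "index": the vectors are made up for illustration only.
toy_index = [
    ('mortgage rates', [1.0, 0.0, 0.0]),
    ('dividend yield', [0.0, 1.0, 0.0]),
    ('mortgage and housing', [0.8, 0.0, 0.6]),
]
results = top_k_chunks([1.0, 0.0, 0.0], toy_index, k=2)
for row in results:
    print(round(row['sim'], 3), row['text'])
```

This brute-force scan is only meant to show the ordering logic; a vector index exists precisely to avoid scoring every row.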
We perform this context retrieval for each row of our queries table by adding it as a computed column. In this case, the operation is a top-k similarity lookup against the data in the chunks table. To implement this operation, we'll use Pixeltable's @query decorator to enhance the capabilities of the chunks table.
# A @query is essentially a reusable, parameterized query that is attached to a table (or view),
# which is a modular way of getting data from that table.
@chunks_t.query
def top_k(query_text: str):
    sim = chunks_t.text.similarity(query_text)
    return (
        chunks_t.order_by(sim, asc=False)
        .select(chunks_t.text, sim=sim)
        .limit(5)
    )
# Now add a computed column to `queries_t`, calling the stored query
# `chunks_t.top_k` that we just defined.
queries_t['question_context'] = chunks_t.top_k(queries_t.Question)
Computing cells: 100%|████████████████████████████████████████████| 8/8 [00:04<00:00, 1.85 cells/s]
Added 8 column values with 0 errors.
Our queries table now looks like this:
queries_t
Column Name | Type | Computed With |
---|---|---|
S__No_ | int | |
Question | string | |
correct_answer | string | |
question_context | json | top_k(Question) |
The new column question_context now contains the result of executing the query for each row, formatted as a list of dictionaries:
queries_t.select(queries_t.question_context).head(1)
question_context |
---|
[{"sim": 0.795, "text": " that simply hasn't happened, and current sentiment \nreflects pent-up frustration with the overall lack of purchase affordability.\" \nBased on the ...... .5% for April. The Zillow Home Value index rose by 4.3% in April and \n4.3% in May. High mortgage rates are a challenge, but we remain bullish on \n"}, {"sim": 0.794, "text": "\n37058.23\n5473.17\n4831.39\n17721.59\n15160.55\n1.48\n1.37\n2.07\n1.86\nCURRENT\nRANKING\nFive-Day Put/Call:\nMomentum:\nBullish Sentiment:\nMutual Fund Cash:\n ...... \nEconomist \nat \nFannie \nMae, \nsaid \n\"While \nmany \nrespondents expressed optimism at the beginning of the year that mortgage \nrates would decline,"}, {"sim": 0.779, "text": "+11.78%\nEnterprise Value\n\$2103.1 B\n6/20/2024\nEx. Dividend Date\n6/10/2024\nDividend\n\$0.200000\nDividend Yield (Annualized)\n0.45%\n6/20/2024\nP/E (TTM)\n ...... e\nTrading Characteristics\n52 Week High\n6/12/2024\n\$180.41\n52 Week Low\n7/11/2023\n\$115.35\n% Price Above/Below\n 20-Day Average\n2.4\n 50-Day Average"}, {"sim": 0.77, "text": ", duplication, redistribution or disclosure is prohibited by law and can result in prosecution. The content of this report \nmay be derived from Ar ...... no liability for any loss arising from the use of this report, nor shall Argus treat all recipients of this report as \ncustomers simply by virtue"}, {"sim": 0.768, "text": " \$11.97-\$12.20; the new guidance implies growth of 2%-3% from FY23. That is down from 3%-5% growth\nguidance offered in March 2024.\nWe are reducing ...... h flow from operations was \$9.52 billion in FY23, \$9.54 billion in FY22, \$8.98 billion in FY21, \$8.36 billion in FY20,\nand \$6.63 billion in FY19.\n"}] |
Asking the LLM
Now it's time for the final step in our application: feeding the document chunks and questions to an LLM for resolution. In this demo, we'll use OpenAI for this, but any other inference cloud or local model could be used instead.
We start by defining a UDF that takes a top-k list of context chunks and a question and turns them into a ChatGPT prompt.
# Define a UDF to create an LLM prompt given a top-k list of
# context chunks and a question.
@pxt.udf
def create_prompt(top_k_list: list[dict], question: str) -> str:
    concat_top_k = '\n\n'.join(
        elt['text'] for elt in reversed(top_k_list)
    )
    return f'''
PASSAGES:
{concat_top_k}
QUESTION:
{question}'''
We then add that again as a computed column to queries:
queries_t['prompt'] = create_prompt(
    queries_t.question_context, queries_t.Question
)
Computing cells: 100%|███████████████████████████████████████████| 8/8 [00:00<00:00, 498.68 cells/s]
Added 8 column values with 0 errors.
We now have a new string column containing the prompt:
queries_t
Column Name | Type | Computed With |
---|---|---|
S__No_ | int | |
Question | string | |
correct_answer | string | |
question_context | json | top_k(Question) |
prompt | string | create_prompt(question_context, Question) |
queries_t.select(queries_t.prompt).head(1)
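To see the shape of the resulting prompt without calling Pixeltable, here is a plain-Python sketch of the same assembly logic. The function name build_prompt and the two context entries are made up for illustration. Because the context list arrives sorted with the most similar chunk first, reversing it places the most similar passage last, directly above the question; the tutorial does not state the reason for the reversal, so treat that reading as an assumption.

```python
def build_prompt(top_k_list: list[dict], question: str) -> str:
    """Join passage texts (least similar first) and append the question."""
    passages = '\n\n'.join(elt['text'] for elt in reversed(top_k_list))
    return f'\nPASSAGES:\n{passages}\n\nQUESTION:\n{question}'


# Hypothetical context in the list-of-dicts format, sorted by descending similarity.
context = [
    {'sim': 0.9, 'text': 'most relevant passage'},
    {'sim': 0.5, 'text': 'least relevant passage'},
]
prompt = build_prompt(context, 'What is the expected EPS?')
print(prompt)
```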
We now add another computed column to call OpenAI. For the chat_completions() call, we need to construct two messages, containing the instructions to the model and the prompt. For the latter, we can simply reference the prompt column we just added.
from pixeltable.functions import openai
# Assemble the prompt and instructions into OpenAI's message format
messages = [
    {
        'role': 'system',
        'content': 'Please read the following passages and answer the question based on their contents.'
    },
    {
        'role': 'user',
        'content': queries_t.prompt
    }
]
# Add a computed column that calls OpenAI
queries_t['response'] = openai.chat_completions(
    model='gpt-4o-mini', messages=messages
)
Computing cells: 100%|████████████████████████████████████████████| 8/8 [00:09<00:00, 1.17s/ cells]
Added 8 column values with 0 errors.
Our queries table now contains a JSON-structured column response, which holds the entire API response structure. At the moment, we're only interested in the response content, which we can extract easily into another computed column:
queries_t['answer'] = queries_t.response.choices[0].message.content
Computing cells: 100%|███████████████████████████████████████████| 8/8 [00:00<00:00, 618.75 cells/s]
Added 8 column values with 0 errors.
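The path response.choices[0].message.content follows the layout of a chat completion response. The sketch below applies the same path to a plain Python dictionary. The sample values (the id and the answer text) are invented for illustration, and the dictionary is trimmed to the fields relevant here; a real response carries additional fields such as usage statistics.

```python
# Hypothetical, trimmed-down chat completion response.
sample_response = {
    'id': 'chatcmpl-hypothetical',
    'choices': [
        {
            'index': 0,
            'message': {'role': 'assistant', 'content': 'About 7%.'},
            'finish_reason': 'stop',
        }
    ],
}

# Same navigation as the computed column: first choice, its message, then the text content.
answer = sample_response['choices'][0]['message']['content']
print(answer)
```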
We now have the following queries schema:
queries_t
Column Name | Type | Computed With |
---|---|---|
S__No_ | int | |
Question | string | |
correct_answer | string | |
question_context | json | top_k(Question) |
prompt | string | create_prompt(question_context, Question) |
response | json | chat_completions([{'role': 'system', 'content': 'Please read the following passages and answer the question based on their contents.'}, {'role': 'user', 'content': prompt}], model='gpt-4o-mini') |
answer | json | response.choices[0].message.content |
Let's take a look at what we got back:
queries_t.select(queries_t.Question, queries_t.correct_answer, queries_t.answer).show()
The application works, but, as expected, a few questions couldn't be answered due to the missing documents. As a final step, let's add the remaining documents to our document base, and run the queries again.
Incremental Updates
Pixeltable's views and computed columns update automatically in response to new data. We can see this when we add the remaining documents to our documents table. Watch how the chunks view is updated to stay in sync with documents:
documents_t.insert({'document': p} for p in document_urls[3:])
Inserting rows into `documents`: 3 rows [00:00, 1808.41 rows/s]
Inserting rows into `chunks`: 68 rows [00:00, 727.28 rows/s]
Inserted 71 rows with 0 errors.
UpdateStatus(num_rows=71, num_computed_values=0, num_excs=0, updated_cols=[], cols_with_excs=[])
(Note: although Pixeltable updates documents and chunks, it does not automatically update the queries table. This is by design: we don't want all rows in queries to be re-executed automatically every time a single new document is added to the document base. However, newly added rows will be run over the new, incrementally updated index.)
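The incremental behavior can be pictured with a toy model in which only newly inserted documents are processed while existing rows are left untouched. This is a conceptual sketch, not Pixeltable's mechanism: the class ToyChunkView, its on_insert method, and the word-based chunking are all hypothetical.

```python
class ToyChunkView:
    """Toy stand-in for a derived view that processes only newly inserted documents."""

    def __init__(self, chunk_size: int):
        self.chunk_size = chunk_size
        self.rows = []  # (document name, pos, chunk text)

    def on_insert(self, new_documents: list[tuple]) -> int:
        """Chunk just the new documents and return the number of rows added."""
        added = 0
        for name, text in new_documents:
            words = text.split()
            for pos, start in enumerate(range(0, len(words), self.chunk_size)):
                chunk = ' '.join(words[start:start + self.chunk_size])
                self.rows.append((name, pos, chunk))
                added += 1
        return added


view = ToyChunkView(chunk_size=2)
first_batch = view.on_insert([('a.pdf', 'w1 w2 w3'), ('b.pdf', 'w4 w5')])
second_batch = view.on_insert([('c.pdf', 'w6 w7 w8 w9 w10')])
print(first_batch, second_batch, len(view.rows))
```

The second call touches only c.pdf: the rows derived from a.pdf and b.pdf are neither recomputed nor rewritten.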
To confirm that the chunks index got updated, we'll re-run the chunk retrieval query for the question
What is the expected EPS for Nvidia in Q1 2026?
Previously, our most similar chunk had a similarity score of ~0.81. Let's see what we get now:
nvidia_eps_query.collect()
Our most similar chunk now has a score of ~0.86, and the query pulls in more relevant chunks from the newly inserted documents.
Let's force all the queries to run again by removing them from the queries table and re-inserting them as new rows. Note how the structure of the tables and computed columns is preserved, even after we delete the existing data.
questions = list(
    queries_t.select(
        queries_t.S__No_,
        queries_t.Question,
        queries_t.correct_answer
    ).collect()
)
queries_t.delete()
queries_t.insert(questions)
Computing cells: 100%|██████████████████████████████████████████| 56/56 [00:13<00:00, 4.26 cells/s]
Inserting rows into `queries`: 8 rows [00:00, 658.76 rows/s]
Computing cells: 100%|██████████████████████████████████████████| 56/56 [00:13<00:00, 4.25 cells/s]
Inserted 8 rows with 0 errors.
UpdateStatus(num_rows=8, num_computed_values=56, num_excs=0, updated_cols=[], cols_with_excs=[])
As a final step, let's confirm that all the queries now have answers:
queries_t.select(
    queries_t.Question,
    queries_t.correct_answer,
    queries_t.answer
).show()
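Since the spreadsheet includes ground-truth answers, a rough automated comparison is possible once the answers are pulled into plain Python. The sketch below checks whether the expected answer appears in the model's answer after normalizing case and whitespace. The helper contains_expected and the two sample rows are hypothetical, and substring matching is a crude heuristic that will miss paraphrases and differently formatted numbers.

```python
def contains_expected(answer: str, expected: str) -> bool:
    """True if the expected answer occurs in the model answer, ignoring case and extra whitespace."""
    def normalize(s: str) -> str:
        return ' '.join(s.lower().split())
    return normalize(expected) in normalize(answer)


# Made-up rows in the shape of (answer, correct_answer) pairs; not actual model output.
rows = [
    {'answer': 'Analysts currently rate the stock a SELL.', 'correct_answer': 'SELL'},
    {'answer': 'The passages do not say.', 'correct_answer': '0.73 EPS'},
]
hits = [contains_expected(r['answer'], r['correct_answer']) for r in rows]
print(f'{sum(hits)} of {len(hits)} answers contain the expected text')
```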