RAG Tutorial #
In this tutorial, we will build a generative question-answering pipeline using retrieval-augmented generation (RAG) in less than 100 lines of Python code.
We will be using Haystack, an open-source framework for building production-ready LLM applications, retrieval-augmented generative pipelines and state-of-the-art search systems that work intelligently over large document collections.
For this tutorial, you will need Python 3.10 installed on your system.
Install Haystack and necessary dependencies #
1pip install haystack-ai
2pip install markdown-it-py mdit_plain pypdf
3pip install gdown
4pip install pdfminer.six
Import the necessary modules #
1# coding: utf-8
2
3from pprint import pprint
4from haystack import Pipeline
5from haystack.components.writers import DocumentWriter
6from haystack.components.converters import TextFileToDocument
7from haystack.components.preprocessors import DocumentSplitter, DocumentCleaner
8from haystack.components.routers import FileTypeRouter
9from haystack.components.joiners import DocumentJoiner
10from haystack.components.embedders import OpenAIDocumentEmbedder, OpenAITextEmbedder
11from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
12from haystack.components.builders import PromptBuilder
13from haystack.components.generators import OpenAIGenerator
14from haystack.utils import Secret
15from haystack.document_stores.in_memory import InMemoryDocumentStore
Sample data #
In this example, we will be using a sample documentation.txt
file which contains a basic overview of an imaginary solution: DataSyncX.
The file can be downloaded here.
Initialize the documentStore #
Set up a DocumentStore
to organize your documents. A DocumentStore
holds the documents that the question-answering system relies on to retrieve answers to your queries. In this guide, for simplicity, you'll be working with the InMemoryDocumentStore
.
In more advanced setups, a dedicated Vectore Database such as Milvus is recommended.
1document_store = InMemoryDocumentStore()
Create a pipeline to index documents #
Create a pipeline to index documents, here we will only allow the text/plain
media type.
Define objects #
1file_type_router = FileTypeRouter(mime_types=["text/plain"])
2text_file_converter = TextFileToDocument()
3document_joiner = DocumentJoiner()
4document_cleaner = DocumentCleaner()
5document_splitter = DocumentSplitter(
6 split_by="word", split_length=150, split_overlap=50)
7document_writer = DocumentWriter(document_store)
Define the embedder #
Here will call the Ektos AI API directly to generate embeddings using of the gte-multilingual-base
embedding model.
1document_embedder = OpenAIDocumentEmbedder(
2 api_base_url="https://api.ektos.ai/v1/",
3 api_key=Secret.from_token("YOUR_EKTOS_API_KEY_HERE"),
4 # api_key=Secret.from_token(os.environ.get("EKTOS_API_KEY")),
5 model="gte-multilingual-base")
Assemble the pipeline #
1preprocessing_pipeline = Pipeline()
2preprocessing_pipeline.add_component(instance=file_type_router, name="file_type_router")
3preprocessing_pipeline.add_component(
4 instance=text_file_converter, name="text_file_converter")
5preprocessing_pipeline.add_component(instance=document_joiner, name="document_joiner")
6preprocessing_pipeline.add_component(instance=document_cleaner, name="document_cleaner")
7preprocessing_pipeline.add_component(
8 instance=document_splitter, name="document_splitter")
9preprocessing_pipeline.add_component(
10 instance=document_embedder, name="document_embedder")
11preprocessing_pipeline.add_component(instance=document_writer, name="document_writer")
12
13preprocessing_pipeline.connect(
14 "file_type_router.text/plain", "text_file_converter.sources")
15preprocessing_pipeline.connect("text_file_converter", "document_joiner")
16preprocessing_pipeline.connect("document_joiner", "document_cleaner")
17preprocessing_pipeline.connect("document_cleaner", "document_splitter")
18preprocessing_pipeline.connect("document_splitter", "document_embedder")
19preprocessing_pipeline.connect("document_embedder", "document_writer")
Add the documents to process #
1preprocessing_pipeline.run(
2 {"file_type_router": {"sources": ["./documentation.txt"]}})
You can already run the code to ensure the embeddings are generated properly and saved in the in-memory vectore database.
1$ python rag.py
2Calculating embeddings: 100%|███████████████████| 1/1 [00:00<00:00, 4.78it/s]
Create a pipeline for generation #
Define a template prompt #
Design a custom prompt for a generative question-answering task using the RAG method. The prompt should accept two inputs: documents, retrieved from a document store, and a user-submitted question. Utilize Jinja2's looping syntax to merge the contents of the retrieved documents into the prompt.
1template = """
2Answer the questions based on the given context.
3Do not add anything else after the answer.
4
5Context:
6{% for document in documents %}
7 {{ document.content }}
8{% endfor %}
9
10Question: {{ question }}
11Answer:
12"""
Define the prompt builder #
Then, create a PromptBuilder instance using your prompt template. The PromptBuilder will automatically insert the required values when provided, generating a full prompt. This method enables a more customized and efficient question-answering experience.
1prompt_buidler = PromptBuilder(template=template)
Define pipeline components: text embedder, retriever and LLM generator #
Here will call the Ektos AI API again to generate embeddings using the gte-multilingual-base
embedding model and the phi-3.5-mini-instruct
LLM for generation.
1text_embedder = OpenAITextEmbedder(
2 api_base_url="https://api.ektos.ai/v1/",
3 api_key=Secret.from_token("YOUR_EKTOS_API_KEY_HERE"),
4 # api_key=Secret.from_token(os.environ.get("EKTOS_API_KEY")),
5 model="gte-multilingual-base")
6retriever = InMemoryEmbeddingRetriever(document_store=document_store)
7llm_generator = OpenAIGenerator(
8 api_base_url="https://api.ektos.ai/v1/",
9 api_key=Secret.from_token("YOUR_EKTOS_API_KEY_HERE"),
10 # api_key=Secret.from_token(os.environ.get("EKTOS_API_KEY")),
11 model="phi-3.5-mini-instruct")
Assemble the pipeline #
1pipe = Pipeline()
2pipe.add_component("embedder", text_embedder)
3pipe.add_component("retriever", retriever)
4pipe.add_component("prompt_builder", prompt_buidler)
5pipe.add_component("llm", llm_generator)
6pipe.connect("embedder.embedding", "retriever.query_embedding")
7pipe.connect("retriever", "prompt_builder.documents")
8pipe.connect("prompt_builder", "llm")
Ask a question #
1question = (
2 "What is the phone number to contact DataSyncX support?"
3)
4
5pprint(pipe.run(
6 {
7 "embedder": {"text": question},
8 "prompt_builder": {"question": question},
9 }
10))
Executing the code #
When running the script, the following will happen sequentially:
- Indexing: The data (here
documentation.txt
) is converted to embeddings and stored in an in-memory vector database. - Retrieval: Given a user query that is converted to embeddings, a retriever is called to select the most relevant documents to augment the future query to the LLM.
- Augmentation: The query to be sent to the LLM is augmented with the retrieved context in addition to the original question.
- Generation: The LLM is called and generates output based on both the question and the additional context (both provided in the same prompt)
1$ python rag.py
2Calculating embeddings: 100%|██████████████████████████████████| 1/1 [00:00<00:00, 4.82it/s]
3{'embedder': {'meta': {'model': 'gte-multilingual-base',
4 'usage': {'completion_tokens': 0,
5 'prompt_tokens': 54,
6 'total_tokens': 54}}},
7 'llm': {'meta': [{'finish_reason': 'stop',
8 'index': 0,
9 'model': 'phi-3.5-mini-instruct',
10 'usage': {'completion_tokens': 16,
11 'completion_tokens_details': None,
12 'prompt_tokens': 1780,
13 'total_tokens': 1796}}],
14 'replies': [' +1-800-555-0123']}}
The LLM responded with the correct response: +1-800-555-0123
which was originally in the documentation.txt
file.
Notice the number of prompt_tokens
(1780), this indicates is due to the added context in addition to the question.
Full code #
1# coding: utf-8
2
3from pprint import pprint
4from haystack import Pipeline
5from haystack.components.writers import DocumentWriter
6from haystack.components.converters import TextFileToDocument
7from haystack.components.preprocessors import DocumentSplitter, DocumentCleaner
8from haystack.components.routers import FileTypeRouter
9from haystack.components.joiners import DocumentJoiner
10from haystack.components.embedders import OpenAIDocumentEmbedder, OpenAITextEmbedder
11from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
12from haystack.components.builders import PromptBuilder
13from haystack.components.generators import OpenAIGenerator
14from haystack.utils import Secret
15from haystack.document_stores.in_memory import InMemoryDocumentStore
16
17document_store = InMemoryDocumentStore()
18file_type_router = FileTypeRouter(mime_types=["text/plain"])
19text_file_converter = TextFileToDocument()
20document_joiner = DocumentJoiner()
21document_cleaner = DocumentCleaner()
22document_splitter = DocumentSplitter(
23 split_by="word", split_length=150, split_overlap=50)
24document_writer = DocumentWriter(document_store)
25document_embedder = OpenAIDocumentEmbedder(
26 api_base_url="https://api.ektos.ai/v1/",
27 api_key=Secret.from_token("YOUR_EKTOS_API_KEY_HERE"),
28 # api_key=Secret.from_token(os.environ.get("EKTOS_API_KEY")),
29 model="gte-multilingual-base")
30
31preprocessing_pipeline = Pipeline()
32preprocessing_pipeline.add_component(instance=file_type_router, name="file_type_router")
33preprocessing_pipeline.add_component(
34 instance=text_file_converter, name="text_file_converter")
35preprocessing_pipeline.add_component(instance=document_joiner, name="document_joiner")
36preprocessing_pipeline.add_component(instance=document_cleaner, name="document_cleaner")
37preprocessing_pipeline.add_component(
38 instance=document_splitter, name="document_splitter")
39preprocessing_pipeline.add_component(
40 instance=document_embedder, name="document_embedder")
41preprocessing_pipeline.add_component(instance=document_writer, name="document_writer")
42
43preprocessing_pipeline.connect(
44 "file_type_router.text/plain", "text_file_converter.sources")
45preprocessing_pipeline.connect("text_file_converter", "document_joiner")
46preprocessing_pipeline.connect("document_joiner", "document_cleaner")
47preprocessing_pipeline.connect("document_cleaner", "document_splitter")
48preprocessing_pipeline.connect("document_splitter", "document_embedder")
49preprocessing_pipeline.connect("document_embedder", "document_writer")
50
51
52preprocessing_pipeline.run(
53 {"file_type_router": {"sources": ["./documentation.txt"]}})
54
55
56template = """
57Answer the questions based on the given context.
58Do not add anything else after the answer.
59
60Context:
61{% for document in documents %}
62 {{ document.content }}
63{% endfor %}
64
65Question: {{ question }}
66Answer:
67"""
68prompt_buidler = PromptBuilder(template=template)
69text_embedder = OpenAITextEmbedder(
70 api_base_url="https://api.ektos.ai/v1/",
71 api_key=Secret.from_token("YOUR_EKTOS_API_KEY_HERE"),
72 # api_key=Secret.from_token(os.environ.get("EKTOS_API_KEY")),
73 model="gte-multilingual-base")
74retriever = InMemoryEmbeddingRetriever(document_store=document_store)
75llm_generator = OpenAIGenerator(
76 api_base_url="https://api.ektos.ai/v1/",
77 api_key=Secret.from_token("YOUR_EKTOS_API_KEY_HERE"),
78 # api_key=Secret.from_token(os.environ.get("EKTOS_API_KEY")),
79 model="phi-3.5-mini-instruct")
80
81pipe = Pipeline()
82pipe.add_component("embedder", text_embedder)
83pipe.add_component("retriever", retriever)
84pipe.add_component("prompt_builder", prompt_buidler)
85pipe.add_component("llm", llm_generator)
86pipe.connect("embedder.embedding", "retriever.query_embedding")
87pipe.connect("retriever", "prompt_builder.documents")
88pipe.connect("prompt_builder", "llm")
89
90question = (
91 "What is the phone number to contact DataSyncX support?"
92)
93
94pprint(pipe.run(
95 {
96 "embedder": {"text": question},
97 "prompt_builder": {"question": question},
98 }
99))