
Pathway Retriever#

Pathway is an open data processing framework. It allows you to easily develop data transformation pipelines and Machine Learning applications that work with live data sources and changing data.

This notebook demonstrates how to set up a live data indexing pipeline. You can query the results of this pipeline from your LLM application using the provided PathwayRetriever. Under the hood, Pathway updates the index on each data change, so you always get up-to-date answers.

In this notebook, we will use a simple document processing pipeline that:

  1. Monitors several data sources (files, S3 folders, cloud storage) for data changes.

  2. Parses, splits, and embeds the documents using LlamaIndex transformations.

  3. Builds a vector index for the data.

We will connect to the index using the llama_index.retrievers.pathway.PathwayRetriever retriever, which implements the retrieve interface.

The basic pipeline described in this document lets you effortlessly build a simple index of files stored in a cloud location. However, Pathway provides everything needed to build realtime data pipelines and apps, including SQL-like operations such as groupby-reductions and joins between disparate data sources, time-based grouping and windowing of data, and a wide array of connectors.
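To give a flavor of these table operations (they are independent of the indexing pipeline built below), here is a minimal sketch of a SQL-like groupby-reduction over a small hand-written table. The data is made up purely for illustration and assumes pathway is already installed.

import pathway as pw

# Illustrative only: count pets per owner with a groupby-reduction.
pets = pw.debug.table_from_markdown(
    """
    owner | pet
    Alice | dog
    Bob   | cat
    Alice | cat
    """
)
pets_per_owner = pets.groupby(pets.owner).reduce(
    pets.owner, count=pw.reducers.count()
)
pw.debug.compute_and_print(pets_per_owner)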

For more details about the Pathway data ingestion pipeline and vector store, see the vector store pipeline documentation.

Prerequisites#

Install the pathway and llama-index packages, then download the sample data.

%pip install llama-index-embeddings-openai
%pip install llama-index-retrievers-pathway
!pip install pathway
!pip install llama-index

!mkdir -p 'data/'
!wget 'https://gist.githubusercontent.com/janchorowski/dd22a293f3d99d1b726eedc7d46d2fc0/raw/pathway_readme.md' -O 'data/pathway_readme.md'

Set up your OpenAI API key.

import getpass
import os

# omit if embedder of choice is not OpenAI
if "OPENAI_API_KEY" not in os.environ:
    os.environ["OPENAI_API_KEY"] = getpass.getpass("OpenAI API Key:")
OpenAI API Key: ········

Configure logging

import logging
import sys

logging.basicConfig(stream=sys.stdout, level=logging.ERROR)
logging.getLogger().addHandler(logging.StreamHandler(stream=sys.stdout))

Define data sources tracked by Pathway#

Pathway can simultaneously monitor many sources for data changes, such as local files, S3 folders, cloud storage, and arbitrary data streams.

See pathway-io for more information.

import pathway as pw

data_sources = []
data_sources.append(
    pw.io.fs.read(
        "./data",
        format="binary",
        mode="streaming",
        with_metadata=True,
    )  # This creates a `pathway` connector that tracks
    # all the files in the ./data directory
)

# This creates a connector that tracks files in Google drive.
# please follow the instructions at https://pathway.com/developers/tutorials/connectors/gdrive-connector/ to get credentials
# data_sources.append(
#     pw.io.gdrive.read(object_id="17H4YpBOAKQzEJ93xmC2z170l0bP2npMy", service_user_credentials_file="credentials.json", with_metadata=True))

Create the document indexing pipeline#

Let us create the document indexing pipeline. The transformations should be a list of TransformComponents ending with an Embedding transformation.

In this example, let’s first split the text using TokenTextSplitter, then embed it with OpenAIEmbedding.

from llama_index.retrievers.pathway import PathwayVectorServer
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.node_parser import TokenTextSplitter

embed_model = OpenAIEmbedding(embed_batch_size=10)

transformations_example = [
    TokenTextSplitter(
        chunk_size=150,
        chunk_overlap=10,
        separator=" ",
    ),
    embed_model,
]

processing_pipeline = PathwayVectorServer(
    *data_sources,
    transformations=transformations_example,
)

# Define the host and port on which the Pathway server will listen
PATHWAY_HOST = "127.0.0.1"
PATHWAY_PORT = 8754

# `threaded` runs Pathway in detached mode; set it to False when running from a terminal or container
# for more information on `with_cache` check out https://pathway.com/developers/api-docs/persistence-api
processing_pipeline.run_server(
    host=PATHWAY_HOST, port=PATHWAY_PORT, with_cache=False, threaded=True
)
<Thread(Thread-5 (run), started 140158811412032)>
======== Running on http://127.0.0.1:8754 ========
(Press CTRL+C to quit)
148

Create Retriever for llama-index#

from llama_index.retrievers.pathway import PathwayRetriever

retriever = PathwayRetriever(host=PATHWAY_HOST, port=PATHWAY_PORT)
retriever.retrieve(str_or_query_bundle="what is pathway")
[NodeWithScore(node=TextNode(id_='7f507732-bf58-4a45-948c-dc6bb762812e', embedding=None, metadata={'created_at': None, 'modified_at': 1703700445, 'owner': 'janek', 'path': '/home/janek/projects/llama_index/docs/examples/retrievers/data/pathway_readme.md'}, excluded_embed_metadata_keys=[], excluded_llm_metadata_keys=[], relationships={}, hash='28f5c0d0e8cae502a0c4d7a72947449c6297f56196fa5c830ebb889026e4ccb9', text="If you want to do streaming in Python, build an AI data pipeline, or if you are looking for your next Python data processing framework, keep reading.\n\nPathway provides a high-level programming interface in Python for defining data transformations, aggregations, and other operations on data streams.\nWith Pathway, you can effortlessly design and deploy sophisticated data workflows that efficiently handle high volumes of data in real time.\n\nPathway is interoperable with various data sources and sinks such as Kafka, CSV files, SQL/noSQL databases, and REST API's, allowing you to connect and process data from different storage systems.\n\nTypical use-cases of Pathway include realtime data processing, ETL (Extract, Transform, Load) pipelines, data analytics,", start_char_idx=None, end_char_idx=None, text_template='{metadata_str}\n\n{content}', metadata_template='{key}: {value}', metadata_seperator='\n'), score=0.16584277747494724),
 NodeWithScore(node=TextNode(id_='274b0481-dedd-41a8-8d94-d1319cd4bc2f', embedding=None, metadata={'created_at': None, 'modified_at': 1703700445, 'owner': 'janek', 'path': '/home/janek/projects/llama_index/docs/examples/retrievers/data/pathway_readme.md'}, excluded_embed_metadata_keys=[], excluded_llm_metadata_keys=[], relationships={}, hash='1c57f8f0d152933320cca745a42239f5ef7c70966c1e5b52fc83dfd515243012', text="Pathway is an open framework for high-throughput and low-latency real-time data processing. It is used to create Python code which seamlessly combines batch processing, streaming, and real-time API's for LLM apps. Pathway's distributed runtime (🦀-🐍) provides fresh results of your data pipelines whenever new inputs and requests are received.\n\nIn the first place, Pathway was designed to be a life-saver (or at least a time-saver) for Python developers and ML/AI engineers faced with live data sources, where you need to react quickly to fresh data. Still, Pathway is a powerful tool that can be used for a lot of things. If you want to do streaming in Python,", start_char_idx=None, end_char_idx=None, text_template='{metadata_str}\n\n{content}', metadata_template='{key}: {value}', metadata_seperator='\n'), score=0.1441072441651936),
 NodeWithScore(node=TextNode(id_='be82f00d-553a-47df-b639-5fc15ee7bdc8', embedding=None, metadata={'created_at': None, 'modified_at': 1703700445, 'owner': 'janek', 'path': '/home/janek/projects/llama_index/docs/examples/retrievers/data/pathway_readme.md'}, excluded_embed_metadata_keys=[], excluded_llm_metadata_keys=[], relationships={}, hash='625d489d1a465cbfcb4f50fb84a5e29ca07cffeb3c5ef900e530822e9ff8cf7f', text="Get Help\n\nIf you have any questions, issues, or just want to chat about Pathway, we're here to help! Feel free to:\n- Check out the documentation in https://pathway.com/developers/ for detailed information.\n- Reach out to us via email at [email protected].\n\nOur team is always happy to help you and ensure that you get the most out of Pathway.\nIf you would like to better understand how best to use Pathway in your project, please don't hesitate to reach out to us.", start_char_idx=None, end_char_idx=None, text_template='{metadata_str}\n\n{content}', metadata_template='{key}: {value}', metadata_seperator='\n'), score=0.186929683249915)]

Your turn! Now edit the contents of the source file, or upload a new file to the ./data directory and repeat the query - the set of retrieved documents will reflect the changes!
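For example, here is a rough sketch of such an update; the appended sentence and the sleep interval are arbitrary, and the streaming pipeline may need a few seconds to pick up the change.

import time

# Append new content to a tracked file; the pipeline reindexes it automatically.
with open("data/pathway_readme.md", "a") as f:
    f.write("\nPathway can also power live retrieval for LLM applications.\n")

time.sleep(5)  # give the streaming pipeline a moment to refresh the index
retriever.retrieve(str_or_query_bundle="what is pathway")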

Use in Query Engine#

from llama_index.core.query_engine import RetrieverQueryEngine

query_engine = RetrieverQueryEngine.from_args(
    retriever,
)
response = query_engine.query("Tell me about Pathway")
print(str(response))
Pathway is an open framework for high-throughput and low-latency real-time data processing. It provides a high-level programming interface in Python for defining data transformations, aggregations, and other operations on data streams. With Pathway, you can effortlessly design and deploy sophisticated data workflows that efficiently handle high volumes of data in real time. It is interoperable with various data sources and sinks such as Kafka, CSV files, SQL/noSQL databases, and REST API's, allowing you to connect and process data from different storage systems. Pathway was designed to be a life-saver for Python developers and ML/AI engineers faced with live data sources, where quick reactions to fresh data are necessary. It can be used for a variety of purposes including streaming in Python, building AI data pipelines, and general data processing tasks. If you have any questions or need assistance with Pathway, you can check out the documentation on the official website or reach out to the team via email.
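If you want control over which LLM synthesizes the answer, RetrieverQueryEngine.from_args also accepts an llm argument. The sketch below assumes the llama-index-llms-openai package bundled with llama-index and uses an arbitrary model name.

from llama_index.llms.openai import OpenAI

# Example only: choose an explicit LLM for response synthesis.
llm = OpenAI(model="gpt-3.5-turbo")
query_engine = RetrieverQueryEngine.from_args(retriever, llm=llm)
print(query_engine.query("How does Pathway interoperate with Kafka?"))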