Building a Custom Agent#

In this cookbook we show you how to build a custom agent using LlamaIndex.

The easiest way to build a custom agent is to simply subclass CustomSimpleAgentWorker and implement a few required functions. You have complete flexibility in defining the agent step-wise logic.

This lets you add arbitrarily complex reasoning logic on top of your RAG pipeline.

We show you how to build a simple agent that adds a retry layer on top of a RouterQueryEngine, allowing it to retry queries until the task is complete. We build this on top of both a SQL tool and a vector index query tool. Even if the tool makes an error or only answers part of the question, the agent can continue retrying the question until the task is complete.

Setup the Custom Agent#

Here we setup the custom agent.

Refresher#

An agent in LlamaIndex consists of both an agent runner + agent worker. An agent runner is an orchestrator that stores state like memory, whereas an agent worker controls the step-wise execution of a Task. Agent runners include sequential, parallel execution. More details can be found in our lower level API guide.

Most core agent logic (e.g. ReAct, function calling loops), can be executed in the agent worker. Therefore we’ve made it easy to subclass an agent worker, letting you plug it into any agent runner.

Creating a Custom Agent Worker Subclass#

As mentioned above we subclass CustomSimpleAgentWorker. This is a class that already sets up some scaffolding for you. This includes being able to take in tools, callbacks, LLM, and also ensures that the state/steps are properly formatted. In the meantime you mostly have to implement the following functions:

  • _initialize_state

  • _run_step

  • _finalize_task

Some additional notes:

  • You can implement _arun_step as well if you want to support async chat in the agent.

  • You can choose to override __init__ as long as you pass all remaining args, kwargs to super()

  • CustomSimpleAgentWorker is implemented as a Pydantic BaseModel meaning that you can define your own custom properties as well.

Here are the full set of base properties on each CustomSimpleAgentWorker (that you need to/can pass in when constructing your custom agent):

  • tools: Sequence[BaseTool]

  • tool_retriever: Optional[ObjectRetriever[BaseTool]]

  • llm: LLM

  • callback_manager: CallbackManager

  • verbose: bool

Note that tools and tool_retriever are mutually exclusive, you can only pass in one or the either (e.g. define a static list of tools or define a callable function that returns relevant tools given a user message). You can call get_tools(message: str) to return relevant tools given a message.

All of these properties are accessible via self when defining your custom agent.

from llama_index.agent import CustomSimpleAgentWorker, Task, AgentChatResponse
from typing import Dict, Any, List, Tuple, Optional
from llama_index.tools import BaseTool, QueryEngineTool
from llama_index.program import LLMTextCompletionProgram
from llama_index.output_parsers import PydanticOutputParser
from llama_index.query_engine import RouterQueryEngine
from llama_index.prompts import ChatPromptTemplate, PromptTemplate
from llama_index.selectors import PydanticSingleSelector
from pydantic import Field, BaseModel

Here we define some helper variables and methods. E.g. the prompt template to use to detect errors as well as the response format in Pydantic.

from llama_index.llms import ChatMessage, MessageRole

DEFAULT_PROMPT_STR = """
Given previous question/response pairs, please determine if an error has occurred in the response, and suggest \
    a modified question that will not trigger the error.

Examples of modified questions:
- The question itself is modified to elicit a non-erroneous response
- The question is augmented with context that will help the downstream system better answer the question.
- The question is augmented with examples of negative responses, or other negative questions.

An error means that either an exception has triggered, or the response is completely irrelevant to the question.

Please return the evaluation of the response in the following JSON format.

"""


def get_chat_prompt_template(
    system_prompt: str, current_reasoning: Tuple[str, str]
) -> ChatPromptTemplate:
    system_msg = ChatMessage(role=MessageRole.SYSTEM, content=system_prompt)
    messages = [system_msg]
    for raw_msg in current_reasoning:
        if raw_msg[0] == "user":
            messages.append(
                ChatMessage(role=MessageRole.USER, content=raw_msg[1])
            )
        else:
            messages.append(
                ChatMessage(role=MessageRole.ASSISTANT, content=raw_msg[1])
            )
    return ChatPromptTemplate(message_templates=messages)


class ResponseEval(BaseModel):
    """Evaluation of whether the response has an error."""

    has_error: bool = Field(
        ..., description="Whether the response has an error."
    )
    new_question: str = Field(..., description="The suggested new question.")
    explanation: str = Field(
        ...,
        description=(
            "The explanation for the error as well as for the new question."
            "Can include the direct stack trace as well."
        ),
    )
from pydantic import PrivateAttr


class RetryAgentWorker(CustomSimpleAgentWorker):
    """Agent worker that adds a retry layer on top of a router.

    Continues iterating until there's no errors / task is done.

    """

    prompt_str: str = Field(default=DEFAULT_PROMPT_STR)
    max_iterations: int = Field(default=10)

    _router_query_engine: RouterQueryEngine = PrivateAttr()

    def __init__(self, tools: List[BaseTool], **kwargs: Any) -> None:
        """Init params."""
        # validate that all tools are query engine tools
        for tool in tools:
            if not isinstance(tool, QueryEngineTool):
                raise ValueError(
                    f"Tool {tool.metadata.name} is not a query engine tool."
                )
        self._router_query_engine = RouterQueryEngine(
            selector=PydanticSingleSelector.from_defaults(),
            query_engine_tools=tools,
            verbose=kwargs.get("verbose", False),
        )
        super().__init__(
            tools=tools,
            **kwargs,
        )

    def _initialize_state(self, task: Task, **kwargs: Any) -> Dict[str, Any]:
        """Initialize state."""
        return {"count": 0, "current_reasoning": []}

    def _run_step(
        self, state: Dict[str, Any], task: Task, input: Optional[str] = None
    ) -> Tuple[AgentChatResponse, bool]:
        """Run step.

        Returns:
            Tuple of (agent_response, is_done)

        """
        if input is not None:
            # if input is specified, override input
            new_input = input
        elif "new_input" not in state:
            new_input = task.input
        else:
            new_input = state["new_input"]

        if self.verbose:
            print(f"> Current Input: {new_input}")

        # first run router query engine
        response = self._router_query_engine.query(new_input)

        # append to current reasoning
        state["current_reasoning"].extend(
            [("user", new_input), ("assistant", str(response))]
        )

        # Then, check for errors
        # dynamically create pydantic program for structured output extraction based on template
        chat_prompt_tmpl = get_chat_prompt_template(
            self.prompt_str, state["current_reasoning"]
        )
        llm_program = LLMTextCompletionProgram.from_defaults(
            output_parser=PydanticOutputParser(output_cls=ResponseEval),
            prompt=chat_prompt_tmpl,
            llm=self.llm,
        )
        # run program, look at the result
        response_eval = llm_program(
            query_str=new_input, response_str=str(response)
        )
        if not response_eval.has_error:
            is_done = True
        else:
            is_done = False
        state["new_input"] = response_eval.new_question

        if self.verbose:
            print(f"> Question: {new_input}")
            print(f"> Response: {response}")
            print(f"> Response eval: {response_eval.dict()}")

        # return response
        return AgentChatResponse(response=str(response)), is_done

    def _finalize_task(self, state: Dict[str, Any], **kwargs) -> None:
        """Finalize task."""
        # nothing to finalize here
        # this is usually if you want to modify any sort of
        # internal state beyond what is set in `_initialize_state`
        pass

Setup Data and Tools#

We setup both a SQL Tool as well as vector index tools for each city.

from llama_index.tools.query_engine import QueryEngineTool

Setup SQL DB + Tool#

from sqlalchemy import (
    create_engine,
    MetaData,
    Table,
    Column,
    String,
    Integer,
    select,
    column,
)
from llama_index import SQLDatabase

engine = create_engine("sqlite:///:memory:", future=True)
metadata_obj = MetaData()
# create city SQL table
table_name = "city_stats"
city_stats_table = Table(
    table_name,
    metadata_obj,
    Column("city_name", String(16), primary_key=True),
    Column("population", Integer),
    Column("country", String(16), nullable=False),
)

metadata_obj.create_all(engine)
from sqlalchemy import insert

rows = [
    {"city_name": "Toronto", "population": 2930000, "country": "Canada"},
    {"city_name": "Tokyo", "population": 13960000, "country": "Japan"},
    {"city_name": "Berlin", "population": 3645000, "country": "Germany"},
]
for row in rows:
    stmt = insert(city_stats_table).values(**row)
    with engine.begin() as connection:
        cursor = connection.execute(stmt)
from llama_index.indices.struct_store.sql_query import NLSQLTableQueryEngine

sql_database = SQLDatabase(engine, include_tables=["city_stats"])
sql_query_engine = NLSQLTableQueryEngine(
    sql_database=sql_database, tables=["city_stats"], verbose=True
)
sql_tool = QueryEngineTool.from_defaults(
    query_engine=sql_query_engine,
    description=(
        "Useful for translating a natural language query into a SQL query over"
        " a table containing: city_stats, containing the population/country of"
        " each city"
    ),
)

Setup Vector Tools#

from llama_index.readers import WikipediaReader
from llama_index import VectorStoreIndex
cities = ["Toronto", "Berlin", "Tokyo"]
wiki_docs = WikipediaReader().load_data(pages=cities)
# build a separate vector index per city
# You could also choose to define a single vector index across all docs, and annotate each chunk by metadata
vector_tools = []
for city, wiki_doc in zip(cities, wiki_docs):
    vector_index = VectorStoreIndex.from_documents([wiki_doc])
    vector_query_engine = vector_index.as_query_engine()
    vector_tool = QueryEngineTool.from_defaults(
        query_engine=vector_query_engine,
        description=f"Useful for answering semantic questions about {city}",
    )
    vector_tools.append(vector_tool)

Build Custom Agent#

from llama_index.agent import AgentRunner
from llama_index.llms import OpenAI
llm = OpenAI(model="gpt-4")
callback_manager = llm.callback_manager

query_engine_tools = [sql_tool] + vector_tools
agent_worker = RetryAgentWorker.from_tools(
    query_engine_tools,
    llm=llm,
    verbose=True,
    callback_manager=callback_manager,
)
agent = AgentRunner(agent_worker, callback_manager=callback_manager)

Try Out Some Queries#

Let’s run some e2e queries through agent.chat.

response = agent.chat("Which countries are each city from?")
print(str(response))
> Current Input: Which countries are each city from?
Selecting query engine 0: The first choice is the most relevant because it mentions a table containing city_stats, which likely includes information about the country of each city..
> Question: Which countries are each city from?
> Response: The city of Toronto is from Canada, Tokyo is from Japan, and Berlin is from Germany.
> Response eval: {'has_error': True, 'new_question': 'Which country is each of the following cities from: Toronto, Tokyo, Berlin?', 'explanation': 'The original question was too vague and did not specify which cities the user was interested in. The new question provides specific cities for the system to provide information on.'}
> Current Input: Which country is each of the following cities from: Toronto, Tokyo, Berlin?
Selecting query engine 0: This choice is relevant because it mentions a table containing city_stats, which likely includes information about the country of each city..
> Question: Which country is each of the following cities from: Toronto, Tokyo, Berlin?
> Response: Toronto is from Canada, Tokyo is from Japan, and Berlin is from Germany.
> Response eval: {'has_error': False, 'new_question': '', 'explanation': ''}
Toronto is from Canada, Tokyo is from Japan, and Berlin is from Germany.
response = agent.chat(
    "What are the top modes of transporation fo the city with the higehest population?"
)
print(str(response))
> Current Input: What are the top modes of transporation fo the city with the higehest population?
Selecting query engine 0: The question is asking about the top modes of transportation for the city with the highest population, which requires translating a natural language query into a SQL query over a table containing city statistics..
> Question: What are the top modes of transporation fo the city with the higehest population?
> Response: I'm sorry, but there seems to be an error in the SQL query. Please check the syntax and try again.
> Response eval: {'has_error': True, 'new_question': 'What are the top modes of transportation for the city with the highest population?', 'explanation': 'The original question had spelling errors which might have caused confusion. The corrected question now clearly asks for the top modes of transportation in the city with the highest population.'}
> Current Input: What are the top modes of transportation for the city with the highest population?
Selecting query engine 0: This choice is relevant because it mentions translating a natural language query into a SQL query over a table containing city statistics, which could include information about the population of cities..
> Question: What are the top modes of transportation for the city with the highest population?
> Response: The city with the highest population is Tokyo, Japan.
> Response eval: {'has_error': True, 'new_question': 'What are the top modes of transportation in Tokyo, Japan?', 'explanation': 'The assistant did not answer the original question correctly. The question asked for the top modes of transportation in the city with the highest population, but the assistant only provided the city with the highest population. The new question directly asks for the top modes of transportation in Tokyo, Japan, which is the city with the highest population.'}
> Current Input: What are the top modes of transportation in Tokyo, Japan?
Selecting query engine 3: Tokyo is mentioned in choice 4, which is specifically about answering semantic questions about Tokyo..
> Question: What are the top modes of transportation in Tokyo, Japan?
> Response: The top modes of transportation in Tokyo, Japan are trains and subways, which are considered clean and efficient. There are also buses, monorails, and trams that play a secondary role in the transportation system. Additionally, there are expressways that connect Tokyo to other points in the Greater Tokyo Area and beyond. Taxis and long-distance ferries are also available for transportation within the city and to other islands.
> Response eval: {'has_error': True, 'new_question': 'What are the top modes of transportation in Tokyo, Japan?', 'explanation': 'The original question was not answered correctly. The assistant provided information about the city with the highest population, but did not answer the question about the top modes of transportation in that city. The new question directly asks about the modes of transportation in Tokyo, Japan, which is the city with the highest population.'}
> Current Input: What are the top modes of transportation in Tokyo, Japan?
Selecting query engine 3: The question specifically asks about Tokyo, and choice 4 is about answering semantic questions about Tokyo..
> Question: What are the top modes of transportation in Tokyo, Japan?
> Response: The top modes of transportation in Tokyo, Japan are trains and subways. Tokyo has an extensive network of clean and efficient trains and subways operated by various operators. There are up to 62 electric train lines and more than 900 train stations in Tokyo. Buses, monorails, and trams also play a secondary role in public transportation within the city. Additionally, Tokyo has expressways, taxis, and long-distance ferries as other means of transportation.
> Response eval: {'has_error': False, 'new_question': '', 'explanation': ''}
The top modes of transportation in Tokyo, Japan are trains and subways. Tokyo has an extensive network of clean and efficient trains and subways operated by various operators. There are up to 62 electric train lines and more than 900 train stations in Tokyo. Buses, monorails, and trams also play a secondary role in public transportation within the city. Additionally, Tokyo has expressways, taxis, and long-distance ferries as other means of transportation.
response = agent.chat("What are the sports teams of each city in Asia?")
print(str(response))
> Current Input: What are the sports teams of each city in Asia?
Selecting query engine 3: The question is asking about sports teams in Asia, and Tokyo is a city in Asia..
> Question: What are the sports teams of each city in Asia?
> Response: I'm sorry, but the context information does not provide a comprehensive list of sports teams in each city in Asia. It only mentions some of the sports teams in Tokyo, Japan. To answer your question, I would need more specific information or a different source that provides a broader overview of sports teams in cities across Asia.
> Response eval: {'has_error': True, 'new_question': 'What are some popular sports teams in Tokyo, Japan?', 'explanation': 'The original question is too broad and the system does not have enough information to provide a comprehensive list of sports teams in each city in Asia. The new question is more specific and focuses on sports teams in Tokyo, Japan, which the system has information on.'}
> Current Input: What are some popular sports teams in Tokyo, Japan?
Selecting query engine 3: Tokyo is mentioned in choice 4, which is about answering semantic questions about Tokyo..
> Question: What are some popular sports teams in Tokyo, Japan?
> Response: Some popular sports teams in Tokyo, Japan include the Yomiuri Giants and Tokyo Yakult Swallows in baseball, F.C. Tokyo and Tokyo Verdy 1969 in soccer, and the Hitachi SunRockers, Toyota Alvark Tokyo, and Tokyo Excellence in basketball. Tokyo is also known for its sumo wrestling tournaments held at the RyĹŤgoku Kokugikan sumo arena.
> Response eval: {'has_error': False, 'new_question': '', 'explanation': ''}
Some popular sports teams in Tokyo, Japan include the Yomiuri Giants and Tokyo Yakult Swallows in baseball, F.C. Tokyo and Tokyo Verdy 1969 in soccer, and the Hitachi SunRockers, Toyota Alvark Tokyo, and Tokyo Excellence in basketball. Tokyo is also known for its sumo wrestling tournaments held at the RyĹŤgoku Kokugikan sumo arena.

Step-wise Queries#

We can also try some step-wise queries. This allows us to inject user feedback in the middle of a task execution to guide responses towards the correct state faster.

agent_worker = RetryAgentWorker.from_tools(
    query_engine_tools,
    llm=llm,
    verbose=True,
    callback_manager=callback_manager,
)
agent = AgentRunner(agent_worker, callback_manager=callback_manager)
task = agent.create_task("Which countries are each city from?")
step_output = agent.run_step(task.task_id)
step_output.output
> Current Input: Which countries are each city from?
Selecting query engine 0: The first choice is the most relevant because it mentions a table containing city_stats, which likely includes information about the country of each city..
> Question: Which countries are each city from?
> Response: The city of Toronto is from Canada, Tokyo is from Japan, and Berlin is from Germany.
> Response eval: {'has_error': True, 'new_question': 'Can you tell me the country of origin for each of these cities: Toronto, Tokyo, Berlin?', 'explanation': 'The original question was too vague and did not specify which cities the user was interested in. The new question provides specific cities for the system to provide information on.'}
AgentChatResponse(response='The city of Toronto is from Canada, Tokyo is from Japan, and Berlin is from Germany.', sources=[], source_nodes=[])
step_output = agent.run_step(
    task.task_id,
    input="Which country is each of the following cities from: Toronto, Tokyo, Berlin?",
)
step_output.output
> Current Input: Which country is each of the following cities from: Toronto, Tokyo, Berlin?
Selecting query engine 0: This choice is relevant because it mentions a table containing city_stats, which likely includes information about the country of each city..
> Question: Which country is each of the following cities from: Toronto, Tokyo, Berlin?
> Response: Toronto is from Canada, Tokyo is from Japan, and Berlin is from Germany.
> Response eval: {'has_error': False, 'new_question': '', 'explanation': ''}
AgentChatResponse(response='Toronto is from Canada, Tokyo is from Japan, and Berlin is from Germany.', sources=[], source_nodes=[])
if step_output.is_last:
    response = agent.finalize_response(task.task_id)
    print(str(response))
Toronto is from Canada, Tokyo is from Japan, and Berlin is from Germany.