Skip to content

Falkordb

FalkorDBGraphStore #

Bases: GraphStore

FalkorDB Graph Store.

In this graph store, triplets are stored within FalkorDB.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `simple_graph_store_data_dict` | `Optional[dict]` | Data dict containing the triplets. See `FalkorDBGraphStoreData` for more details. | required |
Source code in llama-index-integrations/graph_stores/llama-index-graph-stores-falkordb/llama_index/graph_stores/falkordb/base.py
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
class FalkorDBGraphStore(GraphStore):
    """FalkorDB Graph Store.

    In this graph store, triplets are stored within FalkorDB: subjects and
    objects are nodes carrying an ``id`` property, and each relation is a
    directed edge whose type is the relation text with spaces replaced by
    underscores and upper-cased.

    Args:
        url (str): connection URL handed to ``FalkorDB.from_url``.
        database (str): name of the graph to select. Defaults to "falkor".
        node_label (str): label used for every node this store writes or
            matches. Defaults to "Entity".
        **kwargs (Any): accepted but not used by this constructor.
    """

    def __init__(
        self,
        url: str,
        database: str = "falkor",
        node_label: str = "Entity",
        **kwargs: Any,
    ) -> None:
        """Connect to FalkorDB, select the graph and try to index node ids."""
        self._node_label = node_label
        self._database = database

        self._driver = FalkorDB.from_url(url).select_graph(database)

        try:
            self._driver.query(f"CREATE INDEX FOR (n:`{self._node_label}`) ON (n.id)")
        except redis.ResponseError as e:
            # Index creation is best effort: a server-side rejection is logged
            # and construction continues.
            # TODO: to find an appropriate way to handle this issue.
            logger.warning("Create index failed: %s", e)

        # Schema text is filled lazily by refresh_schema()/get_schema().
        self.schema = ""
        # Query template used by get(); the subject id is bound as $subj.
        self.get_query = f"""
            MATCH (n1:`{self._node_label}`)-[r]->(n2:`{self._node_label}`)
            WHERE n1.id = $subj RETURN type(r), n2.id
        """

    @property
    def client(self) -> Any:
        """Return the underlying FalkorDB graph handle."""
        return self._driver

    def get(self, subj: str) -> List[List[str]]:
        """Get triplets.

        Args:
            subj: id of the subject node.

        Returns:
            One ``[relation type, object id]`` row per outgoing edge of
            ``subj`` between nodes carrying this store's label.
        """
        result = self._driver.query(
            self.get_query, params={"subj": subj}, read_only=True
        )
        return result.result_set

    def get_rel_map(
        self, subjs: Optional[List[str]] = None, depth: int = 2, limit: int = 30
    ) -> Dict[str, List[List[str]]]:
        """Get flat rel map.

        Args:
            subjs: ids of the start nodes. ``None`` or an empty list yields
                an empty map (there is no "get all" fallback).
            depth: maximum number of hops per path.
            limit: maximum number of paths returned by the query overall.

        Returns:
            Mapping from subject id to the flattened paths that start there,
            each path being ``[rel, obj, rel, obj, ...]``.
        """
        # The flat means for multi-hop relation path, we could get
        # knowledge like: subj -> rel -> obj -> rel -> obj -> rel -> obj.
        # This type of knowledge is useful for some tasks.
        # +-------------+------------------------------------+
        # | subj        | flattened_rels                     |
        # +-------------+------------------------------------+
        # | "player101" | [95, "player125", 2002, "team204"] |
        # | "player100" | [1997, "team204"]                  |
        # ...
        # +-------------+------------------------------------+

        rel_map: Dict[Any, List[Any]] = {}
        if not subjs:
            # unlike simple graph_store, we don't do get_all here
            return rel_map

        # The label is backtick-quoted like in every other query of this
        # class; depth and limit are coerced to int because they are spliced
        # into the query text rather than bound as parameters.
        query = f"""
            MATCH (n1:`{self._node_label}`)
            WHERE n1.id IN $subjs
            WITH n1
            MATCH p=(n1)-[e*1..{int(depth)}]->(z)
            RETURN p LIMIT {int(limit)}
        """

        data = self.query(query, params={"subjs": subjs})
        if not data:
            return rel_map

        for record in data:
            nodes = record[0].nodes()
            edges = record[0].edges()

            subj_id = nodes[0].properties["id"]
            path = []
            # Edge i leads to node i + 1 of the same path.
            for i, edge in enumerate(edges):
                path.append(edge.relation)
                path.append(nodes[i + 1].properties["id"])

            rel_map.setdefault(subj_id, []).append(path)

        return rel_map

    def upsert_triplet(self, subj: str, rel: str, obj: str) -> None:
        """Add triplet.

        Both nodes and the edge are written with ``MERGE``. The relationship
        type is ``rel`` with spaces replaced by underscores, upper-cased.

        Args:
            subj: id of the source node.
            rel: relation text.
            obj: id of the target node.
        """
        query = """
            MERGE (n1:`%s` {id:$subj})
            MERGE (n2:`%s` {id:$obj})
            MERGE (n1)-[:`%s`]->(n2)
        """

        # Label and relationship type are spliced into the query text; only
        # the node ids are bound as parameters.
        prepared_statement = query % (
            self._node_label,
            self._node_label,
            rel.replace(" ", "_").upper(),
        )

        # Call FalkorDB with prepared statement
        self._driver.query(prepared_statement, params={"subj": subj, "obj": obj})

    def delete(self, subj: str, rel: str, obj: str) -> None:
        """Delete triplet.

        Removes the ``subj -[rel]-> obj`` edge, then removes either endpoint
        that is left without any edge.

        Args:
            subj: id of the source node.
            rel: relation text, normalised the same way as in
                ``upsert_triplet``.
            obj: id of the target node.
        """

        def delete_rel(subj: str, obj: str, rel: str) -> None:
            rel = rel.replace(" ", "_").upper()
            query = f"""
                MATCH (n1:`{self._node_label}`)-[r:`{rel}`]->(n2:`{self._node_label}`)
                WHERE n1.id = $subj AND n2.id = $obj DELETE r
            """

            # Call FalkorDB with prepared statement
            self._driver.query(query, params={"subj": subj, "obj": obj})

        def delete_entity(entity: str) -> None:
            query = f"MATCH (n:`{self._node_label}`) WHERE n.id = $entity DELETE n"

            # Call FalkorDB with prepared statement
            self._driver.query(query, params={"entity": entity})

        def check_edges(entity: str) -> bool:
            query = f"""
                MATCH (n1:`{self._node_label}`)--()
                WHERE n1.id = $entity RETURN count(*)
            """

            # Call FalkorDB with prepared statement
            result = self._driver.query(
                query, params={"entity": entity}, read_only=True
            )
            rows = result.result_set
            # count(*) comes back as a row even when it is 0, so the count
            # itself has to be inspected; a non-empty result set alone does
            # not mean the node still has edges.
            return bool(rows) and rows[0][0] > 0

        delete_rel(subj, obj, rel)
        if not check_edges(subj):
            delete_entity(subj)
        if not check_edges(obj):
            delete_entity(obj)

    def refresh_schema(self) -> None:
        """
        Refreshes the FalkorDB graph schema information.

        Stores the results of ``DB.PROPERTYKEYS()`` and
        ``DB.RELATIONSHIPTYPES()`` as text in ``self.schema``.
        """
        node_properties = self.query("CALL DB.PROPERTYKEYS()")
        relationships = self.query("CALL DB.RELATIONSHIPTYPES()")

        self.schema = f"""
        Properties: {node_properties}
        Relationships: {relationships}
        """

    def get_schema(self, refresh: bool = False) -> str:
        """Get the schema of the FalkorDBGraph store.

        Args:
            refresh: when True, re-query the database even if a schema
                string is already cached.

        Returns:
            The cached schema text, refreshed first when it is empty or
            ``refresh`` is set.
        """
        if self.schema and not refresh:
            return self.schema
        self.refresh_schema()
        # Lazy %-style arguments: the message is only built when DEBUG is on.
        logger.debug("get_schema() schema:\n%s", self.schema)
        return self.schema

    def query(self, query: str, params: Optional[Dict[str, Any]] = None) -> Any:
        """Run ``query`` with the optional ``params`` and return its result set."""
        result = self._driver.query(query, params=params)
        return result.result_set

get #

get(subj: str) -> List[List[str]]

Get triplets.

Source code in llama-index-integrations/graph_stores/llama-index-graph-stores-falkordb/llama_index/graph_stores/falkordb/base.py
54
55
56
57
58
59
def get(self, subj: str) -> List[List[str]]:
    """Return the rows produced by ``self.get_query`` for the subject ``subj``.

    The subject id is bound as the ``$subj`` query parameter and the query is
    issued in read-only mode.
    """
    bound_params = {"subj": subj}
    return self._driver.query(
        self.get_query, params=bound_params, read_only=True
    ).result_set

get_rel_map #

get_rel_map(subjs: Optional[List[str]] = None, depth: int = 2, limit: int = 30) -> Dict[str, List[List[str]]]

Get flat rel map.

Source code in llama-index-integrations/graph_stores/llama-index-graph-stores-falkordb/llama_index/graph_stores/falkordb/base.py
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
def get_rel_map(
    self, subjs: Optional[List[str]] = None, depth: int = 2, limit: int = 30
) -> Dict[str, List[List[str]]]:
    """Get flat rel map.

    Args:
        subjs: ids of the start nodes. ``None`` or an empty list yields an
            empty map (there is no "get all" fallback).
        depth: maximum number of hops per path.
        limit: maximum number of paths returned by the query overall.

    Returns:
        Mapping from subject id to the flattened paths that start there,
        each path being ``[rel, obj, rel, obj, ...]``.
    """
    # The flat means for multi-hop relation path, we could get
    # knowledge like: subj -> rel -> obj -> rel -> obj -> rel -> obj.
    # This type of knowledge is useful for some tasks.
    # +-------------+------------------------------------+
    # | subj        | flattened_rels                     |
    # +-------------+------------------------------------+
    # | "player101" | [95, "player125", 2002, "team204"] |
    # | "player100" | [1997, "team204"]                  |
    # ...
    # +-------------+------------------------------------+

    rel_map: Dict[Any, List[Any]] = {}
    if not subjs:
        # unlike simple graph_store, we don't do get_all here
        return rel_map

    # The label is backtick-quoted so labels that need quoting keep working;
    # depth and limit are coerced to int because they are spliced into the
    # query text rather than bound as parameters.
    query = f"""
        MATCH (n1:`{self._node_label}`)
        WHERE n1.id IN $subjs
        WITH n1
        MATCH p=(n1)-[e*1..{int(depth)}]->(z)
        RETURN p LIMIT {int(limit)}
    """

    data = self.query(query, params={"subjs": subjs})
    if not data:
        return rel_map

    for record in data:
        nodes = record[0].nodes()
        edges = record[0].edges()

        subj_id = nodes[0].properties["id"]
        path = []
        # Edge i leads to node i + 1 of the same path.
        for i, edge in enumerate(edges):
            path.append(edge.relation)
            path.append(nodes[i + 1].properties["id"])

        rel_map.setdefault(subj_id, []).append(path)

    return rel_map

upsert_triplet #

upsert_triplet(subj: str, rel: str, obj: str) -> None

Add triplet.

Source code in llama-index-integrations/graph_stores/llama-index-graph-stores-falkordb/llama_index/graph_stores/falkordb/base.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def upsert_triplet(self, subj: str, rel: str, obj: str) -> None:
    """Merge the ``subj -[rel]-> obj`` triplet into the graph.

    The relationship type is ``rel`` with spaces replaced by underscores and
    upper-cased. Node ids are passed as bound query parameters.
    """
    label = self._node_label
    rel_type = rel.replace(" ", "_").upper()

    # Label and relationship type are spliced into the query text; only the
    # ids travel as parameters.
    template = """
        MERGE (n1:`%s` {id:$subj})
        MERGE (n2:`%s` {id:$obj})
        MERGE (n1)-[:`%s`]->(n2)
    """
    cypher = template % (label, label, rel_type)

    node_ids = {"subj": subj, "obj": obj}
    self._driver.query(cypher, params=node_ids)

delete #

delete(subj: str, rel: str, obj: str) -> None

Delete triplet.

Source code in llama-index-integrations/graph_stores/llama-index-graph-stores-falkordb/llama_index/graph_stores/falkordb/base.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
def delete(self, subj: str, rel: str, obj: str) -> None:
    """Delete triplet.

    Removes the ``subj -[rel]-> obj`` edge, then removes either endpoint that
    is left without any edge.

    Args:
        subj: id of the source node.
        rel: relation text; spaces are replaced by underscores and the result
            is upper-cased to obtain the relationship type.
        obj: id of the target node.
    """

    def delete_rel(subj: str, obj: str, rel: str) -> None:
        rel = rel.replace(" ", "_").upper()
        query = f"""
            MATCH (n1:`{self._node_label}`)-[r:`{rel}`]->(n2:`{self._node_label}`)
            WHERE n1.id = $subj AND n2.id = $obj DELETE r
        """

        # Call FalkorDB with prepared statement
        self._driver.query(query, params={"subj": subj, "obj": obj})

    def delete_entity(entity: str) -> None:
        query = f"MATCH (n:`{self._node_label}`) WHERE n.id = $entity DELETE n"

        # Call FalkorDB with prepared statement
        self._driver.query(query, params={"entity": entity})

    def check_edges(entity: str) -> bool:
        query = f"""
            MATCH (n1:`{self._node_label}`)--()
            WHERE n1.id = $entity RETURN count(*)
        """

        # Call FalkorDB with prepared statement
        result = self._driver.query(
            query, params={"entity": entity}, read_only=True
        )
        rows = result.result_set
        # count(*) comes back as a row even when it is 0, so the count itself
        # has to be inspected; a non-empty result set alone does not mean the
        # node still has edges.
        return bool(rows) and rows[0][0] > 0

    delete_rel(subj, obj, rel)
    if not check_edges(subj):
        delete_entity(subj)
    if not check_edges(obj):
        delete_entity(obj)

refresh_schema #

refresh_schema() -> None

Refreshes the FalkorDB graph schema information.

Source code in llama-index-integrations/graph_stores/llama-index-graph-stores-falkordb/llama_index/graph_stores/falkordb/base.py
165
166
167
168
169
170
171
172
173
174
175
def refresh_schema(self) -> None:
    """
    Rebuild ``self.schema`` from the graph's current metadata.

    Runs ``CALL DB.PROPERTYKEYS()`` and ``CALL DB.RELATIONSHIPTYPES()``
    through ``self.query`` and stores both results as text.
    """
    property_keys = self.query("CALL DB.PROPERTYKEYS()")
    relationship_types = self.query("CALL DB.RELATIONSHIPTYPES()")

    # Assembled piecewise; the resulting text (including the leading newline
    # and the indentation) is the layout callers already receive.
    self.schema = (
        "\n"
        f"    Properties: {property_keys}\n"
        f"    Relationships: {relationship_types}\n"
        "    "
    )

get_schema #

get_schema(refresh: bool = False) -> str

Get the schema of the FalkorDBGraph store.

Source code in llama-index-integrations/graph_stores/llama-index-graph-stores-falkordb/llama_index/graph_stores/falkordb/base.py
177
178
179
180
181
182
183
def get_schema(self, refresh: bool = False) -> str:
    """Return the schema text, refreshing it first when needed.

    A refresh happens when ``refresh`` is true or no schema text is cached
    yet; otherwise the cached text is returned untouched.
    """
    must_reload = refresh or not self.schema
    if must_reload:
        self.refresh_schema()
        logger.debug(f"get_schema() schema:\n{self.schema}")
    return self.schema