Milvus

MilvusVectorStore

Bases: BasePydanticVectorStore

The Milvus Vector Store.

This vector store keeps each node's text, embedding, and metadata in a Milvus collection. It can use an already existing collection, and it creates a new one if the collection doesn't exist or if overwrite is set to True.

Parameters:

Name Type Description Default
uri str

The URI to connect to, in the form "http://address:port".

'http://localhost:19530'
token str

The token for login. Leave empty if RBAC is not enabled; with RBAC it will most likely be "username:password".

''
collection_name str

The name of the collection where data will be stored. Defaults to "llamacollection".

'llamacollection'
dim int

The dimension of the embedding vectors for the collection. Required if creating a new collection.

None
embedding_field str

The name of the embedding field for the collection, defaults to DEFAULT_EMBEDDING_KEY.

DEFAULT_EMBEDDING_KEY
doc_id_field str

The name of the doc_id field for the collection, defaults to DEFAULT_DOC_ID_KEY.

DEFAULT_DOC_ID_KEY
similarity_metric str

The similarity metric to use. The constructor recognizes "IP", "L2" (or "euclidean"), and "cosine", case-insensitively; any other value falls back to L2.

'IP'
consistency_level str

Which consistency level to use for a newly created collection. Defaults to "Strong".

'Strong'
overwrite bool

Whether to overwrite an existing collection with the same name. Defaults to False.

False
text_key str

The key under which text is stored in the passed collection. Used when bringing your own collection. Defaults to None.

None
index_config dict

The configuration used for building the Milvus index. Defaults to None.

None
search_config dict

The configuration used for searching the Milvus index. Note that this must be compatible with the index type specified by index_config. Defaults to None.

None
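
The constructor source further down this page normalizes `similarity_metric` through a small lookup table before using it. The sketch below reproduces that lookup in isolation to show which spellings are accepted; `normalize_metric` and `SIMILARITY_METRICS_MAP` are hypothetical names used only for illustration and are not part of the library.

```python
# Stand-alone sketch of the metric lookup performed in __init__.
# `normalize_metric` is a hypothetical name, not a library function.
SIMILARITY_METRICS_MAP = {
    "ip": "IP",
    "l2": "L2",
    "euclidean": "L2",
    "cosine": "COSINE",
}


def normalize_metric(similarity_metric: str) -> str:
    """Map a user-supplied metric name to a Milvus metric type."""
    # The lookup is case-insensitive; unrecognized names fall back to "L2".
    return SIMILARITY_METRICS_MAP.get(similarity_metric.lower(), "L2")


if __name__ == "__main__":
    for name in ("IP", "Cosine", "euclidean", "dot"):
        print(name, "->", normalize_metric(name))
```

Because unknown names silently become L2, a typo such as "cosin" would not raise an error, so it may be worth validating the value before constructing the store.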

Raises:

Type Description
ImportError

Unable to import pymilvus.

MilvusException

Error communicating with Milvus; more detail is available in the debug-level logs.

Returns:

Name Type Description
MilvusVectorStore

Vectorstore that supports add, delete, and query.

Examples:

pip install llama-index-vector-stores-milvus

from llama_index.vector_stores.milvus import MilvusVectorStore

# Setup MilvusVectorStore
vector_store = MilvusVectorStore(
    dim=1536,
    collection_name="your_collection_name",
    uri="http://milvus_address:port",
    token="your_milvus_token_here",
    overwrite=True
)
Source code in llama-index-integrations/vector_stores/llama-index-vector-stores-milvus/llama_index/vector_stores/milvus/base.py
class MilvusVectorStore(BasePydanticVectorStore):
    """The Milvus Vector Store.

    In this vector store we store the text, its embedding and
    its metadata in a Milvus collection. This implementation
    allows the use of an already existing collection.
    It also supports creating a new one if the collection doesn't
    exist or if `overwrite` is set to True.

    Args:
        uri (str, optional): The URI to connect to, in the form
            "http://address:port".
        token (str, optional): The token for login. Leave empty if RBAC is not
            enabled; with RBAC it will most likely be "username:password".
        collection_name (str, optional): The name of the collection where data will be
            stored. Defaults to "llamacollection".
        dim (int, optional): The dimension of the embedding vectors for the collection.
            Required if creating a new collection.
        embedding_field (str, optional): The name of the embedding field for the
            collection, defaults to DEFAULT_EMBEDDING_KEY.
        doc_id_field (str, optional): The name of the doc_id field for the collection,
            defaults to DEFAULT_DOC_ID_KEY.
        similarity_metric (str, optional): The similarity metric to use. Recognizes
            "IP", "L2" (or "euclidean"), and "cosine"; other values fall back to L2.
        consistency_level (str, optional): Which consistency level to use for a newly
            created collection. Defaults to "Strong".
        overwrite (bool, optional): Whether to overwrite an existing collection with
            the same name. Defaults to False.
        text_key (str, optional): The key under which text is stored in the passed
            collection. Used when bringing your own collection. Defaults to None.
        index_config (dict, optional): The configuration used for building the
            Milvus index. Defaults to None.
        search_config (dict, optional): The configuration used for searching
            the Milvus index. Note that this must be compatible with the index
            type specified by `index_config`. Defaults to None.

    Raises:
        ImportError: Unable to import `pymilvus`.
        MilvusException: Error communicating with Milvus; more detail is available
            in the debug-level logs.

    Returns:
        MilvusVectorStore: Vectorstore that supports add, delete, and query.

    Examples:
        `pip install llama-index-vector-stores-milvus`

        ```python
        from llama_index.vector_stores.milvus import MilvusVectorStore

        # Setup MilvusVectorStore
        vector_store = MilvusVectorStore(
            dim=1536,
            collection_name="your_collection_name",
            uri="http://milvus_address:port",
            token="your_milvus_token_here",
            overwrite=True
        )
        ```
    """

    stores_text: bool = True
    stores_node: bool = True

    uri: str = "http://localhost:19530"
    token: str = ""
    collection_name: str = "llamacollection"
    dim: Optional[int]
    embedding_field: str = DEFAULT_EMBEDDING_KEY
    doc_id_field: str = DEFAULT_DOC_ID_KEY
    similarity_metric: str = "IP"
    consistency_level: str = "Strong"
    overwrite: bool = False
    text_key: Optional[str]
    output_fields: List[str] = Field(default_factory=list)
    index_config: Optional[dict]
    search_config: Optional[dict]
    batch_size: int = DEFAULT_BATCH_SIZE

    _milvusclient: MilvusClient = PrivateAttr()
    _collection: Any = PrivateAttr()

    def __init__(
        self,
        uri: str = "http://localhost:19530",
        token: str = "",
        collection_name: str = "llamacollection",
        dim: Optional[int] = None,
        embedding_field: str = DEFAULT_EMBEDDING_KEY,
        doc_id_field: str = DEFAULT_DOC_ID_KEY,
        similarity_metric: str = "IP",
        consistency_level: str = "Strong",
        overwrite: bool = False,
        text_key: Optional[str] = None,
        output_fields: Optional[List[str]] = None,
        index_config: Optional[dict] = None,
        search_config: Optional[dict] = None,
        batch_size: int = DEFAULT_BATCH_SIZE,
        **kwargs: Any,
    ) -> None:
        """Init params."""
        super().__init__(
            collection_name=collection_name,
            dim=dim,
            embedding_field=embedding_field,
            doc_id_field=doc_id_field,
            consistency_level=consistency_level,
            overwrite=overwrite,
            text_key=text_key,
            output_fields=output_fields or [],
            index_config=index_config if index_config else {},
            search_config=search_config if search_config else {},
            batch_size=batch_size,
        )

        # Select the similarity metric
        similarity_metrics_map = {
            "ip": "IP",
            "l2": "L2",
            "euclidean": "L2",
            "cosine": "COSINE",
        }
        self.similarity_metric = similarity_metrics_map.get(
            similarity_metric.lower(), "L2"
        )

        # Connect to Milvus instance
        self._milvusclient = MilvusClient(
            uri=uri,
            token=token,
            **kwargs,  # pass additional arguments such as server_pem_path
        )
        # Delete previous collection if overwriting
        if overwrite and collection_name in self.client.list_collections():
            self._milvusclient.drop_collection(collection_name)

        # Create the collection if it does not exist
        if collection_name not in self.client.list_collections():
            if dim is None:
                raise ValueError("Dim argument required for collection creation.")
            self._milvusclient.create_collection(
                collection_name=collection_name,
                dimension=dim,
                primary_field_name=MILVUS_ID_FIELD,
                vector_field_name=embedding_field,
                id_type="string",
                metric_type=self.similarity_metric,
                max_length=65_535,
                consistency_level=consistency_level,
            )

        self._collection = Collection(collection_name, using=self._milvusclient._using)
        self._create_index_if_required()

        logger.debug(f"Successfully created a new collection: {self.collection_name}")

    @property
    def client(self) -> Any:
        """Get client."""
        return self._milvusclient

    def add(self, nodes: List[BaseNode], **add_kwargs: Any) -> List[str]:
        """Add the embeddings and their nodes into Milvus.

        Args:
            nodes (List[BaseNode]): List of nodes with embeddings
                to insert.

        Raises:
            MilvusException: Failed to insert data.

        Returns:
            List[str]: List of ids inserted.
        """
        insert_list = []
        insert_ids = []

        # Process the data we are going to insert
        for node in nodes:
            entry = node_to_metadata_dict(node)
            entry[MILVUS_ID_FIELD] = node.node_id
            entry[self.embedding_field] = node.embedding

            insert_ids.append(node.node_id)
            insert_list.append(entry)

        # Insert the data into milvus
        for insert_batch in iter_batch(insert_list, self.batch_size):
            self._collection.insert(insert_batch)
        if add_kwargs.get("force_flush", False):
            self._collection.flush()
        self._create_index_if_required()
        logger.debug(
            f"Successfully inserted embeddings into: {self.collection_name} "
            f"Num Inserted: {len(insert_list)}"
        )
        return insert_ids

    def delete(self, ref_doc_id: str, **delete_kwargs: Any) -> None:
        """
        Delete nodes using ref_doc_id.

        Args:
            ref_doc_id (str): The doc_id of the document to delete.

        Raises:
            MilvusException: Failed to delete the doc.
        """
        # Adds ability for multiple doc delete in future.
        doc_ids: List[str]
        if isinstance(ref_doc_id, list):
            doc_ids = ref_doc_id  # type: ignore
        else:
            doc_ids = [ref_doc_id]

        # Begin by querying for the primary keys to delete
        doc_ids = ['"' + entry + '"' for entry in doc_ids]
        entries = self._milvusclient.query(
            collection_name=self.collection_name,
            filter=f"{self.doc_id_field} in [{','.join(doc_ids)}]",
        )
        if len(entries) > 0:
            ids = [entry["id"] for entry in entries]
            self._milvusclient.delete(collection_name=self.collection_name, pks=ids)
            logger.debug(f"Successfully deleted embedding with doc_id: {doc_ids}")

    def query(self, query: VectorStoreQuery, **kwargs: Any) -> VectorStoreQueryResult:
        """Query index for top k most similar nodes.

        Args:
            query_embedding (List[float]): query embedding
            similarity_top_k (int): top k most similar nodes
            doc_ids (Optional[List[str]]): list of doc_ids to filter by
            node_ids (Optional[List[str]]): list of node_ids to filter by
            output_fields (Optional[List[str]]): list of fields to return
            embedding_field (Optional[str]): name of embedding field
        """
        if query.mode != VectorStoreQueryMode.DEFAULT:
            raise ValueError(f"Milvus does not support {query.mode} yet.")

        expr = []
        output_fields = ["*"]

        # Parse the filter
        if query.filters is not None:
            expr.extend(_to_milvus_filter(query.filters))

        # Parse any docs we are filtering on
        if query.doc_ids is not None and len(query.doc_ids) != 0:
            expr_list = ['"' + entry + '"' for entry in query.doc_ids]
            expr.append(f"{self.doc_id_field} in [{','.join(expr_list)}]")

        # Parse any nodes we are filtering on
        if query.node_ids is not None and len(query.node_ids) != 0:
            expr_list = ['"' + entry + '"' for entry in query.node_ids]
            expr.append(f"{MILVUS_ID_FIELD} in [{','.join(expr_list)}]")

        # Limit output fields
        if query.output_fields is not None:
            output_fields = query.output_fields
        elif len(self.output_fields) > 0:
            output_fields = self.output_fields

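        # Convert to string expression. Fall back to "and" when only
        # doc_ids/node_ids are given and query.filters is None.
        string_expr = ""
        if len(expr) != 0:
            condition = (
                query.filters.condition.value if query.filters is not None else "and"
            )
            string_expr = f" {condition} ".join(expr)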

        # Perform the search
        res = self._milvusclient.search(
            collection_name=self.collection_name,
            data=[query.query_embedding],
            filter=string_expr,
            limit=query.similarity_top_k,
            output_fields=output_fields,
            search_params=self.search_config,
        )

        logger.debug(
            f"Successfully searched embedding in collection: {self.collection_name}"
            f" Num Results: {len(res[0])}"
        )

        nodes = []
        similarities = []
        ids = []

        # Parse the results
        for hit in res[0]:
            if not self.text_key:
                node = metadata_dict_to_node(
                    {
                        "_node_content": hit["entity"].get("_node_content", None),
                        "_node_type": hit["entity"].get("_node_type", None),
                    }
                )
            else:
                try:
                    text = hit["entity"].get(self.text_key)
                except Exception:
                    raise ValueError(
                        "The passed in text_key value does not exist "
                        "in the retrieved entity."
                    )

                metadata = {key: hit["entity"].get(key) for key in self.output_fields}
                node = TextNode(text=text, metadata=metadata)

            nodes.append(node)
            similarities.append(hit["distance"])
            ids.append(hit["id"])

        return VectorStoreQueryResult(nodes=nodes, similarities=similarities, ids=ids)

    def _create_index_if_required(self, force: bool = False) -> None:
        # This helper method is introduced to allow the index to be created
        # both in the constructor and in the `add` method. The `force` flag is
        # provided to ensure that the index is created in the constructor even
        # if self.overwrite is false. In the `add` method, the index is
        # recreated only if self.overwrite is true.
        if (self._collection.has_index() and self.overwrite) or force:
            self._collection.release()
            self._collection.drop_index()
            base_params: Dict[str, Any] = self.index_config.copy()
            index_type: str = base_params.pop("index_type", "FLAT")
            index_params: Dict[str, Union[str, Dict[str, Any]]] = {
                "params": base_params,
                "metric_type": self.similarity_metric,
                "index_type": index_type,
            }
            self._collection.create_index(
                self.embedding_field, index_params=index_params
            )
            self._collection.load()
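
`_create_index_if_required` splits `index_config` into an `index_type` and the remaining build parameters before calling `create_index`. The following sketch mirrors that dictionary handling without connecting to Milvus; `build_index_params` is a hypothetical helper, and the HNSW values are illustrative assumptions rather than recommended settings.

```python
from typing import Any, Dict


def build_index_params(
    index_config: Dict[str, Any], similarity_metric: str
) -> Dict[str, Any]:
    """Assemble an index_params dict the way the helper above does."""
    base_params = index_config.copy()  # leave the caller's dict untouched
    # "index_type" is pulled out; everything else becomes build params.
    index_type = base_params.pop("index_type", "FLAT")
    return {
        "params": base_params,
        "metric_type": similarity_metric,
        "index_type": index_type,
    }


if __name__ == "__main__":
    # Illustrative HNSW settings (assumed values).
    config = {"index_type": "HNSW", "M": 16, "efConstruction": 200}
    print(build_index_params(config, "IP"))
    # An empty config falls back to a FLAT index with no extra params.
    print(build_index_params({}, "L2"))
```

With an HNSW index, a matching `search_config` would typically carry the `ef` search parameter, which is what the compatibility note on `search_config` refers to.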

client property

client: Any

Get client.

add

add(nodes: List[BaseNode], **add_kwargs: Any) -> List[str]

Add the embeddings and their nodes into Milvus.

Parameters:

Name Type Description Default
nodes List[BaseNode]

List of nodes with embeddings to insert.

required

Raises:

Type Description
MilvusException

Failed to insert data.

Returns:

Type Description
List[str]

List of ids inserted.

Source code in llama-index-integrations/vector_stores/llama-index-vector-stores-milvus/llama_index/vector_stores/milvus/base.py
def add(self, nodes: List[BaseNode], **add_kwargs: Any) -> List[str]:
    """Add the embeddings and their nodes into Milvus.

    Args:
        nodes (List[BaseNode]): List of nodes with embeddings
            to insert.

    Raises:
        MilvusException: Failed to insert data.

    Returns:
        List[str]: List of ids inserted.
    """
    insert_list = []
    insert_ids = []

    # Process the data we are going to insert
    for node in nodes:
        entry = node_to_metadata_dict(node)
        entry[MILVUS_ID_FIELD] = node.node_id
        entry[self.embedding_field] = node.embedding

        insert_ids.append(node.node_id)
        insert_list.append(entry)

    # Insert the data into milvus
    for insert_batch in iter_batch(insert_list, self.batch_size):
        self._collection.insert(insert_batch)
    if add_kwargs.get("force_flush", False):
        self._collection.flush()
    self._create_index_if_required()
    logger.debug(
        f"Successfully inserted embeddings into: {self.collection_name} "
        f"Num Inserted: {len(insert_list)}"
    )
    return insert_ids
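
`add` builds one dictionary per node and then inserts the dictionaries in chunks of `batch_size`. A minimal sketch of that chunking, assuming a stand-in `iter_batch` written here for illustration (the library imports its own helper) and plain dictionaries in place of nodes; `plan_inserts` is likewise hypothetical:

```python
from itertools import islice
from typing import Any, Dict, Iterable, Iterator, List


def iter_batch(items: Iterable[Any], size: int) -> Iterator[List[Any]]:
    """Yield successive lists of at most `size` items (illustrative stand-in)."""
    source = iter(items)
    while True:
        batch = list(islice(source, size))
        if not batch:
            return
        yield batch


def plan_inserts(entries: List[Dict[str, Any]], batch_size: int) -> List[int]:
    """Return the size of each batch that would be sent to Milvus."""
    return [len(batch) for batch in iter_batch(entries, batch_size)]


if __name__ == "__main__":
    # Seven fake entries with a batch size of three.
    fake_entries = [{"id": f"node-{i}", "embedding": [0.0, 1.0]} for i in range(7)]
    print(plan_inserts(fake_entries, 3))
```

Passing `force_flush=True` through `add_kwargs` triggers a flush after the last batch, as the source above shows.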

delete

delete(ref_doc_id: str, **delete_kwargs: Any) -> None

Delete nodes using ref_doc_id.

Parameters:

Name Type Description Default
ref_doc_id str

The doc_id of the document to delete.

required

Raises:

Type Description
MilvusException

Failed to delete the doc.

Source code in llama-index-integrations/vector_stores/llama-index-vector-stores-milvus/llama_index/vector_stores/milvus/base.py
def delete(self, ref_doc_id: str, **delete_kwargs: Any) -> None:
    """
    Delete nodes using ref_doc_id.

    Args:
        ref_doc_id (str): The doc_id of the document to delete.

    Raises:
        MilvusException: Failed to delete the doc.
    """
    # Adds ability for multiple doc delete in future.
    doc_ids: List[str]
    if isinstance(ref_doc_id, list):
        doc_ids = ref_doc_id  # type: ignore
    else:
        doc_ids = [ref_doc_id]

    # Begin by querying for the primary keys to delete
    doc_ids = ['"' + entry + '"' for entry in doc_ids]
    entries = self._milvusclient.query(
        collection_name=self.collection_name,
        filter=f"{self.doc_id_field} in [{','.join(doc_ids)}]",
    )
    if len(entries) > 0:
        ids = [entry["id"] for entry in entries]
        self._milvusclient.delete(collection_name=self.collection_name, pks=ids)
        logger.debug(f"Successfully deleted embedding with doc_id: {doc_ids}")
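
`delete` first looks up primary keys with a filter of the form `doc_id_field in ["a","b"]`, then deletes by those keys. The sketch below builds only that filter string; `build_doc_id_filter` and the field name `doc_id` are assumptions for illustration.

```python
from typing import List, Union


def build_doc_id_filter(doc_id_field: str, ref_doc_id: Union[str, List[str]]) -> str:
    """Build the boolean expression used to find entries to delete."""
    # Accept a single id or a list, mirroring the isinstance check above.
    doc_ids = ref_doc_id if isinstance(ref_doc_id, list) else [ref_doc_id]
    quoted = ['"' + entry + '"' for entry in doc_ids]
    return f"{doc_id_field} in [{','.join(quoted)}]"


if __name__ == "__main__":
    print(build_doc_id_filter("doc_id", "report-2023"))
    print(build_doc_id_filter("doc_id", ["a", "b"]))
```

Ids are interpolated without escaping, so an id containing a double quote would produce a malformed expression.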

query

query(query: VectorStoreQuery, **kwargs: Any) -> VectorStoreQueryResult

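Query index for top k most similar nodes. The parameters listed below are read from the query object (a VectorStoreQuery) rather than passed as separate arguments.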

Parameters:

Name Type Description Default
query_embedding List[float]

query embedding

required
similarity_top_k int

top k most similar nodes

required
doc_ids Optional[List[str]]

list of doc_ids to filter by

required
node_ids Optional[List[str]]

list of node_ids to filter by

required
output_fields Optional[List[str]]

list of fields to return

required
embedding_field Optional[str]

name of embedding field

required
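
The `doc_ids` and `node_ids` values each become an `in [...]` clause, and the clauses are joined into one filter string for the search. A sketch of that step under two assumptions: the primary-key field is named `id`, and clauses are joined with `and` when no metadata filters are supplied. `build_search_filter` and `_in_clause` are hypothetical helpers.

```python
from typing import List, Optional


def _in_clause(field: str, values: List[str]) -> str:
    """Render `field in ["v1","v2"]` for a list of string values."""
    quoted = ",".join('"' + v + '"' for v in values)
    return f"{field} in [{quoted}]"


def build_search_filter(
    doc_id_field: str,
    doc_ids: Optional[List[str]] = None,
    node_ids: Optional[List[str]] = None,
    id_field: str = "id",  # assumed primary-key field name
    condition: str = "and",  # assumed joiner when no metadata filters exist
) -> str:
    """Combine doc-id and node-id restrictions into one filter expression."""
    clauses = []
    if doc_ids:
        clauses.append(_in_clause(doc_id_field, doc_ids))
    if node_ids:
        clauses.append(_in_clause(id_field, node_ids))
    # An empty string means "no filter".
    return f" {condition} ".join(clauses)


if __name__ == "__main__":
    print(build_search_filter("doc_id", doc_ids=["d1"], node_ids=["n1", "n2"]))
```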
Source code in llama-index-integrations/vector_stores/llama-index-vector-stores-milvus/llama_index/vector_stores/milvus/base.py
def query(self, query: VectorStoreQuery, **kwargs: Any) -> VectorStoreQueryResult:
    """Query index for top k most similar nodes.

    Args:
        query_embedding (List[float]): query embedding
        similarity_top_k (int): top k most similar nodes
        doc_ids (Optional[List[str]]): list of doc_ids to filter by
        node_ids (Optional[List[str]]): list of node_ids to filter by
        output_fields (Optional[List[str]]): list of fields to return
        embedding_field (Optional[str]): name of embedding field
    """
    if query.mode != VectorStoreQueryMode.DEFAULT:
        raise ValueError(f"Milvus does not support {query.mode} yet.")

    expr = []
    output_fields = ["*"]

    # Parse the filter
    if query.filters is not None:
        expr.extend(_to_milvus_filter(query.filters))

    # Parse any docs we are filtering on
    if query.doc_ids is not None and len(query.doc_ids) != 0:
        expr_list = ['"' + entry + '"' for entry in query.doc_ids]
        expr.append(f"{self.doc_id_field} in [{','.join(expr_list)}]")

    # Parse any nodes we are filtering on
    if query.node_ids is not None and len(query.node_ids) != 0:
        expr_list = ['"' + entry + '"' for entry in query.node_ids]
        expr.append(f"{MILVUS_ID_FIELD} in [{','.join(expr_list)}]")

    # Limit output fields
    if query.output_fields is not None:
        output_fields = query.output_fields
    elif len(self.output_fields) > 0:
        output_fields = self.output_fields

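    # Convert to string expression. Fall back to "and" when only
    # doc_ids/node_ids are given and query.filters is None.
    string_expr = ""
    if len(expr) != 0:
        condition = (
            query.filters.condition.value if query.filters is not None else "and"
        )
        string_expr = f" {condition} ".join(expr)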

    # Perform the search
    res = self._milvusclient.search(
        collection_name=self.collection_name,
        data=[query.query_embedding],
        filter=string_expr,
        limit=query.similarity_top_k,
        output_fields=output_fields,
        search_params=self.search_config,
    )

    logger.debug(
        f"Successfully searched embedding in collection: {self.collection_name}"
        f" Num Results: {len(res[0])}"
    )

    nodes = []
    similarities = []
    ids = []

    # Parse the results
    for hit in res[0]:
        if not self.text_key:
            node = metadata_dict_to_node(
                {
                    "_node_content": hit["entity"].get("_node_content", None),
                    "_node_type": hit["entity"].get("_node_type", None),
                }
            )
        else:
            try:
                text = hit["entity"].get(self.text_key)
            except Exception:
                raise ValueError(
                    "The passed in text_key value does not exist "
                    "in the retrieved entity."
                )

            metadata = {key: hit["entity"].get(key) for key in self.output_fields}
            node = TextNode(text=text, metadata=metadata)

        nodes.append(node)
        similarities.append(hit["distance"])
        ids.append(hit["id"])

    return VectorStoreQueryResult(nodes=nodes, similarities=similarities, ids=ids)
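
When `text_key` is set, `query` builds each result node from the named text field plus the configured `output_fields`. The sketch below imitates that parsing on a hand-written search result; the hit layout (`id`, `distance`, `entity`) follows the source above, while `parse_hits`, the sample field names, and the use of plain dictionaries in place of `TextNode` are assumptions for illustration.

```python
from typing import Any, Dict, List, Tuple


def parse_hits(
    hits: List[Dict[str, Any]], text_key: str, output_fields: List[str]
) -> Tuple[List[Dict[str, Any]], List[float], List[str]]:
    """Split raw hits into (nodes, similarities, ids)."""
    nodes: List[Dict[str, Any]] = []
    similarities: List[float] = []
    ids: List[str] = []
    for hit in hits:
        entity = hit["entity"]
        if text_key not in entity:
            raise ValueError(f"text_key {text_key!r} is missing from the entity")
        nodes.append(
            {
                "text": entity[text_key],
                "metadata": {key: entity.get(key) for key in output_fields},
            }
        )
        similarities.append(hit["distance"])
        ids.append(hit["id"])
    return nodes, similarities, ids


if __name__ == "__main__":
    sample = [
        {"id": "n1", "distance": 0.92, "entity": {"body": "hello", "author": "ada"}}
    ]
    print(parse_hits(sample, text_key="body", output_fields=["author"]))
```

Unlike the `.get` call in the source, which returns None for a missing key, this sketch raises explicitly when the text field is absent.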