Milvus

MilvusVectorStore #

Bases: BasePydanticVectorStore

The Milvus Vector Store.

In this vector store we store the text, its embedding and its metadata in a Milvus collection. This implementation allows the use of an already existing collection. It also supports creating a new one if the collection doesn't exist or if overwrite is set to True.
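
The reuse-or-create behaviour described above can be sketched as a small decision function. This is an illustrative stand-in that does not touch Milvus; `plan_collection_setup` and its action names are hypothetical, chosen to mirror the order of checks in the constructor source listed further down.

```python
from typing import List, Optional, Set


def plan_collection_setup(
    existing: Set[str], name: str, overwrite: bool, dim: Optional[int]
) -> List[str]:
    """Return the ordered actions the constructor would take (illustrative only)."""
    actions: List[str] = []
    remaining = set(existing)  # copy so the caller's set is not modified

    # An existing collection is dropped only when overwrite is requested.
    if overwrite and name in remaining:
        actions.append("drop")
        remaining.discard(name)

    # A missing (or just dropped) collection is created, which requires dim.
    if name not in remaining:
        if dim is None:
            raise ValueError("Dim argument required for collection creation.")
        actions.append("create")
    else:
        actions.append("reuse")
    return actions


print(plan_collection_setup({"docs"}, "docs", overwrite=False, dim=None))  # ['reuse']
print(plan_collection_setup({"docs"}, "docs", overwrite=True, dim=1536))   # ['drop', 'create']
```

Note that `dim` can be omitted only when an existing collection is reused without overwriting.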

Parameters:

Name Type Description Default
uri str

The URI to connect to, in the form "http://address:port".

'http://localhost:19530'
token str

The token used to log in. Leave empty if RBAC is not enabled; with RBAC it will most likely be "username:password".

''
collection_name str

The name of the collection where data will be stored. Defaults to "llamacollection".

'llamacollection'
dim int

The dimension of the embedding vectors for the collection. Required if creating a new collection.

None
embedding_field str

The name of the embedding field for the collection, defaults to DEFAULT_EMBEDDING_KEY.

DEFAULT_EMBEDDING_KEY
doc_id_field str

The name of the doc_id field for the collection, defaults to DEFAULT_DOC_ID_KEY.

DEFAULT_DOC_ID_KEY
similarity_metric str

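The similarity metric to use. Supported values are "IP", "L2" (or "euclidean") and "COSINE", matched case-insensitively; unrecognized values fall back to "L2".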

'IP'
consistency_level str

Which consistency level to use for a newly created collection. Defaults to "Strong".

'Strong'
overwrite bool

Whether to overwrite an existing collection with the same name. Defaults to False.

False
text_key str

The key under which text is stored in the passed collection. Used when bringing your own collection. Defaults to None.

None
index_config dict

The configuration used for building the Milvus index. Defaults to None.

None
search_config dict

The configuration used for searching the Milvus index. Note that this must be compatible with the index type specified by index_config. Defaults to None.

None
batch_size int

Configures the number of documents processed in one batch when inserting data into Milvus. Defaults to DEFAULT_BATCH_SIZE.

DEFAULT_BATCH_SIZE
enable_sparse bool

A boolean flag indicating whether to enable support for sparse embeddings for hybrid retrieval. Defaults to False.

False
sparse_embedding_function BaseSparseEmbeddingFunction

If enable_sparse is True, this object should be provided to convert text to a sparse embedding.

None
hybrid_ranker str

Specifies the type of ranker used in hybrid search queries. Supported values are "RRFRanker" and "WeightedRanker". Defaults to "RRFRanker".

'RRFRanker'
hybrid_ranker_params dict

Configuration parameters for the hybrid ranker. The structure of this dictionary depends on the specific ranker being used:

- For "RRFRanker", it should include:
    - 'k' (int): A parameter used in Reciprocal Rank Fusion (RRF). This value is used to calculate the rank scores as part of the RRF algorithm, which combines multiple ranking strategies into a single score to improve search relevance.
- For "WeightedRanker", it expects:
    - 'weights' (list of float): A list of exactly two weights:
        1. The weight for the dense embedding component.
        2. The weight for the sparse embedding component.
      These weights are used to adjust the importance of the dense and sparse components of the embeddings in the hybrid retrieval process.

Defaults to an empty dictionary, implying that the ranker will operate with its predefined default settings.

{}
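
To make the two hybrid_ranker options more concrete, here is a pure-Python sketch of the ideas behind them. It is not Milvus's implementation: `rrf_fuse` and `weighted_fuse` are hypothetical helpers, the RRF score uses the textbook form 1 / (k + rank) with ranks starting at 1, and the weighted variant assumes both score sets are already on a comparable scale. The fallback values `k=60` and `weights=[1.0, 1.0]` are the ones the query source below uses when hybrid_ranker_params is empty.

```python
from collections import defaultdict
from typing import Dict, List, Sequence


def rrf_fuse(rankings: Sequence[Sequence[str]], k: int = 60) -> List[str]:
    """Fuse ranked id lists with Reciprocal Rank Fusion (textbook form)."""
    scores: Dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    # Highest score first; ties broken by id for a stable result.
    return sorted(scores, key=lambda d: (-scores[d], d))


def weighted_fuse(
    dense: Dict[str, float], sparse: Dict[str, float], weights: Sequence[float]
) -> List[str]:
    """Fuse dense and sparse scores with exactly two weights: [dense, sparse]."""
    w_dense, w_sparse = weights
    ids = set(dense) | set(sparse)
    scores = {
        i: w_dense * dense.get(i, 0.0) + w_sparse * sparse.get(i, 0.0) for i in ids
    }
    return sorted(scores, key=lambda d: (-scores[d], d))


dense_hits = ["a", "b", "c"]   # ids ranked by the dense search
sparse_hits = ["b", "c", "a"]  # ids ranked by the sparse search
print(rrf_fuse([dense_hits, sparse_hits], k=60))  # ['b', 'a', 'c']
```

A larger `k` flattens the difference between adjacent ranks, while the two weights shift importance between the dense and sparse results.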

Raises:

Type Description
ImportError

Unable to import pymilvus.

MilvusException

Error communicating with Milvus; more detail can be found in the debug-level logs.

Returns:

Name Type Description
MilvusVectorStore

Vectorstore that supports add, delete, and query.

Examples:

pip install llama-index-vector-stores-milvus

from llama_index.vector_stores.milvus import MilvusVectorStore

# Setup MilvusVectorStore
vector_store = MilvusVectorStore(
    dim=1536,
    collection_name="your_collection_name",
    uri="http://milvus_address:port",
    token="your_milvus_token_here",
    overwrite=True
)
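
The example above leaves similarity_metric at its default of "IP". For other values, the constructor source below normalizes the name through a small lookup table. The sketch reproduces that lookup in isolation; `normalize_similarity_metric` is a hypothetical name used only for illustration.

```python
# Lookup table copied from the constructor source shown on this page.
SIMILARITY_METRICS = {
    "ip": "IP",
    "l2": "L2",
    "euclidean": "L2",
    "cosine": "COSINE",
}


def normalize_similarity_metric(name: str) -> str:
    """Map a user-supplied metric name to the value passed to Milvus."""
    # Matching is case-insensitive; anything unrecognized becomes "L2".
    return SIMILARITY_METRICS.get(name.lower(), "L2")


print(normalize_similarity_metric("Cosine"))  # COSINE
print(normalize_similarity_metric("dot"))     # L2 (silent fallback)
```

Because the fallback is silent, a misspelled metric name results in L2 rather than an error, so it is worth double-checking the value you pass.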
Source code in llama-index-integrations/vector_stores/llama-index-vector-stores-milvus/llama_index/vector_stores/milvus/base.py
class MilvusVectorStore(BasePydanticVectorStore):
    """The Milvus Vector Store.

    In this vector store we store the text, its embedding and
    its metadata in a Milvus collection. This implementation
    allows the use of an already existing collection.
    It also supports creating a new one if the collection doesn't
    exist or if `overwrite` is set to True.

    Args:
        uri (str, optional): The URI to connect to, in the form
            "http://address:port".
        token (str, optional): The token used to log in. Leave empty if RBAC is
            not enabled; with RBAC it will most likely be "username:password".
        collection_name (str, optional): The name of the collection where data will be
            stored. Defaults to "llamacollection".
        dim (int, optional): The dimension of the embedding vectors for the collection.
            Required if creating a new collection.
        embedding_field (str, optional): The name of the embedding field for the
            collection, defaults to DEFAULT_EMBEDDING_KEY.
        doc_id_field (str, optional): The name of the doc_id field for the collection,
            defaults to DEFAULT_DOC_ID_KEY.
        similarity_metric (str, optional): The similarity metric to use. Supported
            values are "IP", "L2" (or "euclidean") and "COSINE", matched
            case-insensitively; unrecognized values fall back to "L2".
        consistency_level (str, optional): Which consistency level to use for a newly
            created collection. Defaults to "Strong".
        overwrite (bool, optional): Whether to overwrite an existing collection with
            the same name. Defaults to False.
        text_key (str, optional): The key under which text is stored in the passed
            collection. Used when bringing your own collection. Defaults to None.
        index_config (dict, optional): The configuration used for building the
            Milvus index. Defaults to None.
        search_config (dict, optional): The configuration used for searching
            the Milvus index. Note that this must be compatible with the index
            type specified by `index_config`. Defaults to None.
        batch_size (int): Configures the number of documents processed in one
            batch when inserting data into Milvus. Defaults to DEFAULT_BATCH_SIZE.
        enable_sparse (bool): A boolean flag indicating whether to enable support
            for sparse embeddings for hybrid retrieval. Defaults to False.
        sparse_embedding_function (BaseSparseEmbeddingFunction, optional): If enable_sparse
             is True, this object should be provided to convert text to a sparse embedding.
        hybrid_ranker (str): Specifies the type of ranker used in hybrid search queries.
            Currently only supports ['RRFRanker','WeightedRanker']. Defaults to "RRFRanker".
        hybrid_ranker_params (dict, optional): Configuration parameters for the hybrid ranker.
            The structure of this dictionary depends on the specific ranker being used:
            - For "RRFRanker", it should include:
                - 'k' (int): A parameter used in Reciprocal Rank Fusion (RRF). This value is used
                             to calculate the rank scores as part of the RRF algorithm, which combines
                             multiple ranking strategies into a single score to improve search relevance.
            - For "WeightedRanker", it expects:
                - 'weights' (list of float): A list of exactly two weights:
                     1. The weight for the dense embedding component.
                     2. The weight for the sparse embedding component.
                  These weights are used to adjust the importance of the dense and sparse components of the embeddings
                  in the hybrid retrieval process.
            Defaults to an empty dictionary, implying that the ranker will operate with its predefined default settings.

    Raises:
        ImportError: Unable to import `pymilvus`.
        MilvusException: Error communicating with Milvus; more detail can be found
            in the debug-level logs.

    Returns:
        MilvusVectorStore: Vectorstore that supports add, delete, and query.

    Examples:
        `pip install llama-index-vector-stores-milvus`

        ```python
        from llama_index.vector_stores.milvus import MilvusVectorStore

        # Setup MilvusVectorStore
        vector_store = MilvusVectorStore(
            dim=1536,
            collection_name="your_collection_name",
            uri="http://milvus_address:port",
            token="your_milvus_token_here",
            overwrite=True
        )
        ```
    """

    stores_text: bool = True
    stores_node: bool = True

    uri: str = "http://localhost:19530"
    token: str = ""
    collection_name: str = "llamacollection"
    dim: Optional[int]
    embedding_field: str = DEFAULT_EMBEDDING_KEY
    doc_id_field: str = DEFAULT_DOC_ID_KEY
    similarity_metric: str = "IP"
    consistency_level: str = "Strong"
    overwrite: bool = False
    text_key: Optional[str]
    output_fields: List[str] = Field(default_factory=list)
    index_config: Optional[dict]
    search_config: Optional[dict]
    batch_size: int = DEFAULT_BATCH_SIZE
    enable_sparse: bool = False
    sparse_embedding_field: str = "sparse_embedding"
    sparse_embedding_function: Any
    hybrid_ranker: str
    hybrid_ranker_params: dict = {}

    _milvusclient: MilvusClient = PrivateAttr()
    _collection: Any = PrivateAttr()

    def __init__(
        self,
        uri: str = "http://localhost:19530",
        token: str = "",
        collection_name: str = "llamacollection",
        dim: Optional[int] = None,
        embedding_field: str = DEFAULT_EMBEDDING_KEY,
        doc_id_field: str = DEFAULT_DOC_ID_KEY,
        similarity_metric: str = "IP",
        consistency_level: str = "Strong",
        overwrite: bool = False,
        text_key: Optional[str] = None,
        output_fields: Optional[List[str]] = None,
        index_config: Optional[dict] = None,
        search_config: Optional[dict] = None,
        batch_size: int = DEFAULT_BATCH_SIZE,
        enable_sparse: bool = False,
        sparse_embedding_function: Optional[BaseSparseEmbeddingFunction] = None,
        hybrid_ranker: str = "RRFRanker",
        hybrid_ranker_params: dict = {},
        **kwargs: Any,
    ) -> None:
        """Init params."""
        super().__init__(
            collection_name=collection_name,
            dim=dim,
            embedding_field=embedding_field,
            doc_id_field=doc_id_field,
            consistency_level=consistency_level,
            overwrite=overwrite,
            text_key=text_key,
            output_fields=output_fields or [],
            index_config=index_config if index_config else {},
            search_config=search_config if search_config else {},
            batch_size=batch_size,
            enable_sparse=enable_sparse,
            sparse_embedding_function=sparse_embedding_function,
            hybrid_ranker=hybrid_ranker,
            hybrid_ranker_params=hybrid_ranker_params,
        )

        # Select the similarity metric
        similarity_metrics_map = {
            "ip": "IP",
            "l2": "L2",
            "euclidean": "L2",
            "cosine": "COSINE",
        }
        self.similarity_metric = similarity_metrics_map.get(
            similarity_metric.lower(), "L2"
        )
        # Connect to Milvus instance
        self._milvusclient = MilvusClient(
            uri=uri,
            token=token,
            **kwargs,  # pass additional arguments such as server_pem_path
        )
        # Delete previous collection if overwriting
        if overwrite and collection_name in self.client.list_collections():
            self._milvusclient.drop_collection(collection_name)

        # Create the collection if it does not exist
        if collection_name not in self.client.list_collections():
            if dim is None:
                raise ValueError("Dim argument required for collection creation.")
            if self.enable_sparse is False:
                self._milvusclient.create_collection(
                    collection_name=collection_name,
                    dimension=dim,
                    primary_field_name=MILVUS_ID_FIELD,
                    vector_field_name=embedding_field,
                    id_type="string",
                    metric_type=self.similarity_metric,
                    max_length=65_535,
                    consistency_level=consistency_level,
                )
            else:
                try:
                    _ = DataType.SPARSE_FLOAT_VECTOR
                except Exception as e:
                    logger.error(
                        "Hybrid retrieval is only supported in Milvus 2.4.0 or later."
                    )
                    raise NotImplementedError(
                        "Hybrid retrieval requires Milvus 2.4.0 or later."
                    ) from e
                self._create_hybrid_index(collection_name)

        if self.enable_sparse is False:
            self._collection = Collection(
                collection_name, using=self._milvusclient._using
            )
        else:
            host, port = extract_host_port(uri)
            connections.connect("default", host, port)
            self._collection = Collection(collection_name)

        self._create_index_if_required()

        self.enable_sparse = enable_sparse
        if self.enable_sparse is True and sparse_embedding_function is None:
            logger.warning("Sparse embedding function is not provided, using default.")
            self.sparse_embedding_function = get_defualt_sparse_embedding_function()
        elif self.enable_sparse is True and sparse_embedding_function is not None:
            self.sparse_embedding_function = sparse_embedding_function
        else:
            pass

        logger.debug(f"Successfully created a new collection: {self.collection_name}")

    @property
    def client(self) -> Any:
        """Get client."""
        return self._milvusclient

    def add(self, nodes: List[BaseNode], **add_kwargs: Any) -> List[str]:
        """Add the embeddings and their nodes into Milvus.

        Args:
            nodes (List[BaseNode]): List of nodes with embeddings
                to insert.

        Raises:
            MilvusException: Failed to insert data.

        Returns:
            List[str]: List of ids inserted.
        """
        insert_list = []
        insert_ids = []

        if self.enable_sparse is True and self.sparse_embedding_function is None:
            logger.fatal(
                "sparse_embedding_function is None when enable_sparse is True."
            )

        # Process the data we are going to insert
        for node in nodes:
            entry = node_to_metadata_dict(node)
            entry[MILVUS_ID_FIELD] = node.node_id
            entry[self.embedding_field] = node.embedding

            if self.enable_sparse is True:
                entry[
                    self.sparse_embedding_field
                ] = self.sparse_embedding_function.encode_documents([node.text])[0]

            insert_ids.append(node.node_id)
            insert_list.append(entry)

        # Insert the data into milvus
        for insert_batch in iter_batch(insert_list, self.batch_size):
            self._collection.insert(insert_batch)
        if add_kwargs.get("force_flush", False):
            self._collection.flush()
        self._create_index_if_required()
        logger.debug(
            f"Successfully inserted embeddings into: {self.collection_name} "
            f"Num Inserted: {len(insert_list)}"
        )
        return insert_ids

    def delete(self, ref_doc_id: str, **delete_kwargs: Any) -> None:
        """
        Delete nodes using ref_doc_id.

        Args:
            ref_doc_id (str): The doc_id of the document to delete.

        Raises:
            MilvusException: Failed to delete the doc.
        """
        # Adds ability for multiple doc delete in future.
        doc_ids: List[str]
        if isinstance(ref_doc_id, list):
            doc_ids = ref_doc_id  # type: ignore
        else:
            doc_ids = [ref_doc_id]

        # Begin by querying for the primary keys to delete
        doc_ids = ['"' + entry + '"' for entry in doc_ids]
        entries = self._milvusclient.query(
            collection_name=self.collection_name,
            filter=f"{self.doc_id_field} in [{','.join(doc_ids)}]",
        )
        if len(entries) > 0:
            ids = [entry["id"] for entry in entries]
            self._milvusclient.delete(collection_name=self.collection_name, pks=ids)
            logger.debug(f"Successfully deleted embedding with doc_id: {doc_ids}")

    def query(self, query: VectorStoreQuery, **kwargs: Any) -> VectorStoreQueryResult:
        """Query index for top k most similar nodes.

        Args:
            query_embedding (List[float]): query embedding
            similarity_top_k (int): top k most similar nodes
            doc_ids (Optional[List[str]]): list of doc_ids to filter by
            node_ids (Optional[List[str]]): list of node_ids to filter by
            output_fields (Optional[List[str]]): list of fields to return
            embedding_field (Optional[str]): name of embedding field
        """
        if query.mode == VectorStoreQueryMode.DEFAULT:
            pass
        elif query.mode == VectorStoreQueryMode.HYBRID:
            if self.enable_sparse is False:
                raise ValueError("QueryMode is HYBRID, but enable_sparse is False.")
        else:
            raise ValueError(f"Milvus does not support {query.mode} yet.")

        expr = []
        output_fields = ["*"]

        # Parse the filter
        if query.filters is not None:
            expr.extend(_to_milvus_filter(query.filters))

        # Parse any docs we are filtering on
        if query.doc_ids is not None and len(query.doc_ids) != 0:
            expr_list = ['"' + entry + '"' for entry in query.doc_ids]
            expr.append(f"{self.doc_id_field} in [{','.join(expr_list)}]")

        # Parse any nodes we are filtering on
        if query.node_ids is not None and len(query.node_ids) != 0:
            expr_list = ['"' + entry + '"' for entry in query.node_ids]
            expr.append(f"{MILVUS_ID_FIELD} in [{','.join(expr_list)}]")

        # Limit output fields
        if query.output_fields is not None:
            output_fields = query.output_fields
        elif len(self.output_fields) > 0:
            output_fields = self.output_fields

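        # Convert to string expression
        string_expr = ""
        if len(expr) != 0:
            # query.filters is None when only doc_ids/node_ids are given,
            # so fall back to "and" in that case.
            condition = (
                query.filters.condition.value if query.filters is not None else "and"
            )
            string_expr = f" {condition} ".join(expr)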

        # Perform the search
        if query.mode == VectorStoreQueryMode.DEFAULT:
            # Perform default search
            res = self._milvusclient.search(
                collection_name=self.collection_name,
                data=[query.query_embedding],
                filter=string_expr,
                limit=query.similarity_top_k,
                output_fields=output_fields,
                search_params=self.search_config,
                anns_field=self.embedding_field,
            )
            logger.debug(
                f"Successfully searched embedding in collection: {self.collection_name}"
                f" Num Results: {len(res[0])}"
            )

            nodes = []
            similarities = []
            ids = []
            # Parse the results
            for hit in res[0]:
                if not self.text_key:
                    node = metadata_dict_to_node(
                        {
                            "_node_content": hit["entity"].get("_node_content", None),
                            "_node_type": hit["entity"].get("_node_type", None),
                        }
                    )
                else:
                    try:
                        text = hit["entity"].get(self.text_key)
                    except Exception:
                        raise ValueError(
                            "The passed in text_key value does not exist "
                            "in the retrieved entity."
                        )

                    metadata = {
                        key: hit["entity"].get(key) for key in self.output_fields
                    }
                    node = TextNode(text=text, metadata=metadata)

                nodes.append(node)
                similarities.append(hit["distance"])
                ids.append(hit["id"])

        else:
            # Perform hybrid search
            sparse_emb = self.sparse_embedding_function.encode_queries(
                [query.query_str]
            )[0]
            sparse_search_params = {"metric_type": "IP"}

            sparse_req = AnnSearchRequest(
                [sparse_emb],
                self.sparse_embedding_field,
                sparse_search_params,
                limit=query.similarity_top_k,
            )

            dense_search_params = {
                "metric_type": self.similarity_metric,
                "params": self.search_config,
            }
            dense_emb = query.query_embedding
            dense_req = AnnSearchRequest(
                [dense_emb],
                self.embedding_field,
                dense_search_params,
                limit=query.similarity_top_k,
            )
            ranker = None

            if WeightedRanker is None or RRFRanker is None:
                logger.error(
                    "Hybrid retrieval is only supported in Milvus 2.4.0 or later."
                )
                raise ValueError(
                    "Hybrid retrieval is only supported in Milvus 2.4.0 or later."
                )
            if self.hybrid_ranker == "WeightedRanker":
                if self.hybrid_ranker_params == {}:
                    self.hybrid_ranker_params = {"weights": [1.0, 1.0]}
                ranker = WeightedRanker(*self.hybrid_ranker_params["weights"])
            elif self.hybrid_ranker == "RRFRanker":
                if self.hybrid_ranker_params == {}:
                    self.hybrid_ranker_params = {"k": 60}
                ranker = RRFRanker(self.hybrid_ranker_params["k"])
            else:
                raise ValueError(f"Unsupported ranker: {self.hybrid_ranker}")

            res = self._collection.hybrid_search(
                [dense_req, sparse_req],
                rerank=ranker,
                limit=query.similarity_top_k,
                output_fields=output_fields,
            )

            logger.debug(
                f"Successfully searched embedding in collection: {self.collection_name}"
                f" Num Results: {len(res[0])}"
            )

            nodes = []
            similarities = []
            ids = []
            # Parse the results
            for hit in res[0]:
                if not self.text_key:
                    node = metadata_dict_to_node(
                        {
                            "_node_content": hit.entity.get("_node_content"),
                            "_node_type": hit.entity.get("_node_type"),
                        }
                    )
                else:
                    try:
                        text = hit.entity.get(self.text_key)
                    except Exception:
                        raise ValueError(
                            "The passed in text_key value does not exist "
                            "in the retrieved entity."
                        )

                    metadata = {key: hit.entity.get(key) for key in self.output_fields}
                    node = TextNode(text=text, metadata=metadata)

                nodes.append(node)
                similarities.append(hit.distance)
                ids.append(hit.id)

        return VectorStoreQueryResult(nodes=nodes, similarities=similarities, ids=ids)

    def _create_index_if_required(self, force: bool = False) -> None:
        # This helper method is introduced to allow the index to be created
        # both in the constructor and in the `add` method. The `force` flag is
        # provided to ensure that the index is created in the constructor even
        # if self.overwrite is false. In the `add` method, the index is
        # recreated only if self.overwrite is true.
        if self.enable_sparse is False:
            if (self._collection.has_index() and self.overwrite) or force:
                self._collection.release()
                self._collection.drop_index()
                base_params: Dict[str, Any] = self.index_config.copy()
                index_type: str = base_params.pop("index_type", "FLAT")
                index_params: Dict[str, Union[str, Dict[str, Any]]] = {
                    "params": base_params,
                    "metric_type": self.similarity_metric,
                    "index_type": index_type,
                }
                self._collection.create_index(
                    self.embedding_field, index_params=index_params
                )
                self._collection.load()
        else:
            if (
                self._collection.has_index(index_name=self.embedding_field)
                and self.overwrite
            ) or force:
                if self._collection.has_index(index_name=self.embedding_field) is True:
                    self._collection.release()
                    self._collection.drop_index(index_name=self.embedding_field)
                if (
                    self._collection.has_index(index_name=self.sparse_embedding_field)
                    is True
                ):
                    self._collection.drop_index(index_name=self.sparse_embedding_field)
                self._create_hybrid_index(self.collection_name)
                self._collection.load()

    def _create_hybrid_index(self, collection_name):
        schema = MilvusClient.create_schema(auto_id=False, enable_dynamic_field=True)

        schema.add_field(
            field_name="id",
            datatype=DataType.VARCHAR,
            max_length=65535,
            is_primary=True,
        )
        schema.add_field(
            field_name=self.embedding_field,
            datatype=DataType.FLOAT_VECTOR,
            dim=self.dim,
        )
        schema.add_field(
            field_name=self.sparse_embedding_field,
            datatype=DataType.SPARSE_FLOAT_VECTOR,
        )
        self._collection = Collection(
            collection_name, schema=schema, using=self._milvusclient._using
        )

        sparse_index = {"index_type": "SPARSE_INVERTED_INDEX", "metric_type": "IP"}
        self._collection.create_index(self.sparse_embedding_field, sparse_index)
        base_params = self.index_config.copy()
        index_type = base_params.pop("index_type", "FLAT")
        dense_index = {
            "params": base_params,
            "metric_type": self.similarity_metric,
            "index_type": index_type,
        }
        self._collection.create_index(self.embedding_field, dense_index)
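
Both `_create_index_if_required` and `_create_hybrid_index` above turn the flat `index_config` dictionary into the nested structure passed to `create_index`. The transformation can be sketched on its own; `build_index_params` is a hypothetical helper, and the HNSW values in the usage lines are example inputs rather than recommended settings.

```python
from typing import Any, Dict, Optional


def build_index_params(
    index_config: Optional[Dict[str, Any]], similarity_metric: str
) -> Dict[str, Any]:
    """Split index_config into index_type and the remaining build params."""
    base_params = dict(index_config or {})  # copy so the input is not mutated
    # "index_type" is pulled out; when absent the source falls back to "FLAT".
    index_type = base_params.pop("index_type", "FLAT")
    return {
        "params": base_params,
        "metric_type": similarity_metric,
        "index_type": index_type,
    }


config = {"index_type": "HNSW", "M": 16, "efConstruction": 200}
print(build_index_params(config, "IP"))
# {'params': {'M': 16, 'efConstruction': 200}, 'metric_type': 'IP', 'index_type': 'HNSW'}
```

Every key other than `index_type` ends up under `params`, so `index_config` should contain only options that the chosen index type accepts.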

client property #

client: Any

Get client.

add #

add(nodes: List[BaseNode], **add_kwargs: Any) -> List[str]

Add the embeddings and their nodes into Milvus.

Parameters:

Name Type Description Default
nodes List[BaseNode]

List of nodes with embeddings to insert.

required

Raises:

Type Description
MilvusException

Failed to insert data.

Returns:

Type Description
List[str]

List[str]: List of ids inserted.
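
Insertion happens in chunks of `batch_size` entries, and passing `force_flush=True` as a keyword argument to `add` triggers a flush after the last batch (both visible in the source below). The chunking itself can be sketched as follows; `iter_batches` is a hypothetical stand-in for the library's `iter_batch` helper, whose exact implementation is not shown on this page.

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def iter_batches(items: Sequence[T], batch_size: int) -> Iterator[List[T]]:
    """Yield consecutive slices of at most batch_size items."""
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")
    for start in range(0, len(items), batch_size):
        yield list(items[start : start + batch_size])


entries = [{"id": str(i)} for i in range(5)]
for batch in iter_batches(entries, batch_size=2):
    # In the real method each batch is passed to the collection's insert call.
    print(len(batch))  # prints 2, 2, 1
```

A smaller `batch_size` means more round trips with smaller payloads, while a larger one reduces the number of insert calls.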

Source code in llama-index-integrations/vector_stores/llama-index-vector-stores-milvus/llama_index/vector_stores/milvus/base.py
def add(self, nodes: List[BaseNode], **add_kwargs: Any) -> List[str]:
    """Add the embeddings and their nodes into Milvus.

    Args:
        nodes (List[BaseNode]): List of nodes with embeddings
            to insert.

    Raises:
        MilvusException: Failed to insert data.

    Returns:
        List[str]: List of ids inserted.
    """
    insert_list = []
    insert_ids = []

    if self.enable_sparse is True and self.sparse_embedding_function is None:
        logger.fatal(
            "sparse_embedding_function is None when enable_sparse is True."
        )

    # Process the data we are going to insert
    for node in nodes:
        entry = node_to_metadata_dict(node)
        entry[MILVUS_ID_FIELD] = node.node_id
        entry[self.embedding_field] = node.embedding

        if self.enable_sparse is True:
            entry[
                self.sparse_embedding_field
            ] = self.sparse_embedding_function.encode_documents([node.text])[0]

        insert_ids.append(node.node_id)
        insert_list.append(entry)

    # Insert the data into milvus
    for insert_batch in iter_batch(insert_list, self.batch_size):
        self._collection.insert(insert_batch)
    if add_kwargs.get("force_flush", False):
        self._collection.flush()
    self._create_index_if_required()
    logger.debug(
        f"Successfully inserted embeddings into: {self.collection_name} "
        f"Num Inserted: {len(insert_list)}"
    )
    return insert_ids

delete #

delete(ref_doc_id: str, **delete_kwargs: Any) -> None

Delete nodes using ref_doc_id.

Parameters:

Name Type Description Default
ref_doc_id str

The doc_id of the document to delete.

required

Raises:

Type Description
MilvusException

Failed to delete the doc.
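
Deletion is a two-step process: the method first queries for the primary keys whose doc_id field matches, then deletes those keys. The filter string used for the first step can be sketched in isolation; `build_doc_id_filter` is a hypothetical helper, and `"doc_id"` is an assumed example field name.

```python
from typing import List, Union


def build_doc_id_filter(doc_id_field: str, ref_doc_id: Union[str, List[str]]) -> str:
    """Build the `field in [...]` expression used to look up primary keys."""
    # A single id is wrapped in a list, as in the method source.
    doc_ids = ref_doc_id if isinstance(ref_doc_id, list) else [ref_doc_id]
    quoted = ['"' + entry + '"' for entry in doc_ids]
    return f"{doc_id_field} in [{','.join(quoted)}]"


print(build_doc_id_filter("doc_id", "report-2023"))  # doc_id in ["report-2023"]
print(build_doc_id_filter("doc_id", ["a", "b"]))     # doc_id in ["a","b"]
```

The ids are wrapped in double quotes without escaping, so ids that themselves contain a double quote would produce a malformed expression.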

Source code in llama-index-integrations/vector_stores/llama-index-vector-stores-milvus/llama_index/vector_stores/milvus/base.py
def delete(self, ref_doc_id: str, **delete_kwargs: Any) -> None:
    """
    Delete the nodes that belong to the given ref_doc_id.

    Args:
        ref_doc_id (str): The doc_id of the document to delete.

    Raises:
        MilvusException: Failed to delete the doc.
    """
    # Adds ability for multiple doc delete in future.
    doc_ids: List[str]
    if isinstance(ref_doc_id, list):
        doc_ids = ref_doc_id  # type: ignore
    else:
        doc_ids = [ref_doc_id]

    # Begin by querying for the primary keys to delete
    doc_ids = ['"' + entry + '"' for entry in doc_ids]
    entries = self._milvusclient.query(
        collection_name=self.collection_name,
        filter=f"{self.doc_id_field} in [{','.join(doc_ids)}]",
    )
    if len(entries) > 0:
        ids = [entry["id"] for entry in entries]
        self._milvusclient.delete(collection_name=self.collection_name, pks=ids)
        logger.debug(f"Successfully deleted embedding with doc_id: {doc_ids}")
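
The listing above deletes in two steps: it first queries for the primary keys of every row whose doc_id field matches, then deletes those rows by primary key. The sketch below mirrors that flow against an in-memory stand-in. `InMemoryStore` and `delete_by_ref_doc_id` are hypothetical names used only for illustration and are not part of the Milvus client or of this class.

```python
from typing import Dict, List


class InMemoryStore:
    """Hypothetical stand-in for a collection: rows keyed by primary key."""

    def __init__(self, rows: List[Dict[str, str]]) -> None:
        self.rows = {row["id"]: row for row in rows}

    def query_by_doc_ids(self, doc_ids: List[str]) -> List[Dict[str, str]]:
        # Step 1: find every row whose doc_id is in the requested set.
        wanted = set(doc_ids)
        return [row for row in self.rows.values() if row["doc_id"] in wanted]

    def delete(self, pks: List[str]) -> None:
        # Step 2: remove rows by primary key.
        for pk in pks:
            self.rows.pop(pk, None)


def delete_by_ref_doc_id(store: InMemoryStore, ref_doc_id: str) -> List[str]:
    """Delete all rows for one document and return the removed primary keys."""
    entries = store.query_by_doc_ids([ref_doc_id])
    ids = [entry["id"] for entry in entries]
    if ids:  # skip the delete call when nothing matched
        store.delete(pks=ids)
    return ids


store = InMemoryStore(
    [
        {"id": "n1", "doc_id": "doc-a"},
        {"id": "n2", "doc_id": "doc-a"},
        {"id": "n3", "doc_id": "doc-b"},
    ]
)
deleted = delete_by_ref_doc_id(store, "doc-a")
print(deleted)             # ['n1', 'n2']
print(sorted(store.rows))  # ['n3']
```

One document usually maps to several nodes, which is why the lookup by doc_id comes before the delete by primary key.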

query #

query(query: VectorStoreQuery, **kwargs: Any) -> VectorStoreQueryResult

Query index for top k most similar nodes.

Parameters:

Name Type Description Default
query_embedding List[float]

query embedding

required
similarity_top_k int

top k most similar nodes

required
doc_ids Optional[List[str]]

list of doc_ids to filter by

required
node_ids Optional[List[str]]

list of node_ids to filter by

required
output_fields Optional[List[str]]

list of fields to return

required
embedding_field Optional[str]

name of embedding field

required
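
The doc_ids and node_ids filters are turned into Milvus boolean expressions of the form `field in ["a","b"]` and joined with a condition keyword. A minimal sketch of that assembly is below. `build_filter_expr` and `quote_all` are hypothetical helpers, and the field names `doc_id` and `id` are assumptions chosen for the example.

```python
from typing import List, Optional


def quote_all(values: List[str]) -> List[str]:
    """Wrap each value in double quotes, as the filter expression expects."""
    return ['"' + value + '"' for value in values]


def build_filter_expr(
    doc_id_field: str,
    id_field: str,
    doc_ids: Optional[List[str]] = None,
    node_ids: Optional[List[str]] = None,
    condition: str = "and",
) -> str:
    """Join the optional doc_id and node_id clauses into one expression string."""
    clauses: List[str] = []
    if doc_ids:
        clauses.append(f"{doc_id_field} in [{','.join(quote_all(doc_ids))}]")
    if node_ids:
        clauses.append(f"{id_field} in [{','.join(quote_all(node_ids))}]")
    # An empty string means "no filter".
    return f" {condition} ".join(clauses)


expr = build_filter_expr("doc_id", "id", doc_ids=["d1", "d2"], node_ids=["n7"])
print(expr)  # doc_id in ["d1","d2"] and id in ["n7"]
```

Values are quoted but not escaped in this sketch, so ids that themselves contain double quotes would need extra handling.
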
Source code in llama-index-integrations/vector_stores/llama-index-vector-stores-milvus/llama_index/vector_stores/milvus/base.py
def query(self, query: VectorStoreQuery, **kwargs: Any) -> VectorStoreQueryResult:
    """Query index for top k most similar nodes.

    Args:
        query_embedding (List[float]): query embedding
        similarity_top_k (int): top k most similar nodes
        doc_ids (Optional[List[str]]): list of doc_ids to filter by
        node_ids (Optional[List[str]]): list of node_ids to filter by
        output_fields (Optional[List[str]]): list of fields to return
        embedding_field (Optional[str]): name of embedding field
    """
    if query.mode == VectorStoreQueryMode.DEFAULT:
        pass
    elif query.mode == VectorStoreQueryMode.HYBRID:
        if self.enable_sparse is False:
            raise ValueError("QueryMode is HYBRID, but enable_sparse is False.")
    else:
        raise ValueError(f"Milvus does not support {query.mode} yet.")

    expr = []
    output_fields = ["*"]

    # Parse the filter
    if query.filters is not None:
        expr.extend(_to_milvus_filter(query.filters))

    # Parse any docs we are filtering on
    if query.doc_ids is not None and len(query.doc_ids) != 0:
        expr_list = ['"' + entry + '"' for entry in query.doc_ids]
        expr.append(f"{self.doc_id_field} in [{','.join(expr_list)}]")

    # Parse any nodes we are filtering on
    if query.node_ids is not None and len(query.node_ids) != 0:
        expr_list = ['"' + entry + '"' for entry in query.node_ids]
        expr.append(f"{MILVUS_ID_FIELD} in [{','.join(expr_list)}]")

    # Limit output fields
    if query.output_fields is not None:
        output_fields = query.output_fields
    elif len(self.output_fields) > 0:
        output_fields = self.output_fields

    # Convert to string expression
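    string_expr = ""
    if len(expr) != 0:
        # query.filters is None when only doc_ids/node_ids are given; use "and".
        condition = (
            query.filters.condition.value if query.filters is not None else "and"
        )
        string_expr = f" {condition} ".join(expr)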

    # Perform the search
    if query.mode == VectorStoreQueryMode.DEFAULT:
        # Perform default search
        res = self._milvusclient.search(
            collection_name=self.collection_name,
            data=[query.query_embedding],
            filter=string_expr,
            limit=query.similarity_top_k,
            output_fields=output_fields,
            search_params=self.search_config,
            anns_field=self.embedding_field,
        )
        logger.debug(
            f"Successfully searched embedding in collection: {self.collection_name}"
            f" Num Results: {len(res[0])}"
        )

        nodes = []
        similarities = []
        ids = []
        # Parse the results
        for hit in res[0]:
            if not self.text_key:
                node = metadata_dict_to_node(
                    {
                        "_node_content": hit["entity"].get("_node_content", None),
                        "_node_type": hit["entity"].get("_node_type", None),
                    }
                )
            else:
                try:
                    text = hit["entity"].get(self.text_key)
                except Exception:
                    raise ValueError(
                        "The passed in text_key value does not exist "
                        "in the retrieved entity."
                    )

                metadata = {
                    key: hit["entity"].get(key) for key in self.output_fields
                }
                node = TextNode(text=text, metadata=metadata)

            nodes.append(node)
            similarities.append(hit["distance"])
            ids.append(hit["id"])

    else:
        # Perform hybrid search
        sparse_emb = self.sparse_embedding_function.encode_queries(
            [query.query_str]
        )[0]
        sparse_search_params = {"metric_type": "IP"}

        sparse_req = AnnSearchRequest(
            [sparse_emb],
            self.sparse_embedding_field,
            sparse_search_params,
            limit=query.similarity_top_k,
        )

        dense_search_params = {
            "metric_type": self.similarity_metric,
            "params": self.search_config,
        }
        dense_emb = query.query_embedding
        dense_req = AnnSearchRequest(
            [dense_emb],
            self.embedding_field,
            dense_search_params,
            limit=query.similarity_top_k,
        )
        ranker = None

        if WeightedRanker is None or RRFRanker is None:
            logger.error(
                "Hybrid retrieval is only supported in Milvus 2.4.0 or later."
            )
            raise ValueError(
                "Hybrid retrieval is only supported in Milvus 2.4.0 or later."
            )
        if self.hybrid_ranker == "WeightedRanker":
            if self.hybrid_ranker_params == {}:
                self.hybrid_ranker_params = {"weights": [1.0, 1.0]}
            ranker = WeightedRanker(*self.hybrid_ranker_params["weights"])
        elif self.hybrid_ranker == "RRFRanker":
            if self.hybrid_ranker_params == {}:
                self.hybrid_ranker_params = {"k": 60}
            ranker = RRFRanker(self.hybrid_ranker_params["k"])
        else:
            raise ValueError(f"Unsupported ranker: {self.hybrid_ranker}")

        res = self._collection.hybrid_search(
            [dense_req, sparse_req],
            rerank=ranker,
            limit=query.similarity_top_k,
            output_fields=output_fields,
        )

        logger.debug(
            f"Successfully searched embedding in collection: {self.collection_name}"
            f" Num Results: {len(res[0])}"
        )

        nodes = []
        similarities = []
        ids = []
        # Parse the results
        for hit in res[0]:
            if not self.text_key:
                node = metadata_dict_to_node(
                    {
                        "_node_content": hit.entity.get("_node_content"),
                        "_node_type": hit.entity.get("_node_type"),
                    }
                )
            else:
                try:
                    text = hit.entity.get(self.text_key)
                except Exception:
                    raise ValueError(
                        "The passed in text_key value does not exist "
                        "in the retrieved entity."
                    )

                metadata = {key: hit.entity.get(key) for key in self.output_fields}
                node = TextNode(text=text, metadata=metadata)

            nodes.append(node)
            similarities.append(hit.distance)
            ids.append(hit.id)

    return VectorStoreQueryResult(nodes=nodes, similarities=similarities, ids=ids)
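
In the hybrid branch above, the RRFRanker defaults to k=60 when no ranker parameters are given. As a rough illustration of what reciprocal rank fusion does in general, the sketch below merges a dense and a sparse ranking. It assumes 1-based ranks and a per-list score of 1 / (k + rank); it is not Milvus's internal implementation, and `rrf_fuse` is a hypothetical name.

```python
from collections import defaultdict
from typing import Dict, List


def rrf_fuse(rankings: List[List[str]], k: int = 60) -> List[str]:
    """Fuse several ranked id lists with reciprocal rank fusion (sketch)."""
    scores: Dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            # Each list contributes 1 / (k + rank) for every id it contains.
            scores[doc_id] += 1.0 / (k + rank)
    # Highest fused score first; ties are broken by id for a stable order.
    return sorted(scores, key=lambda doc_id: (-scores[doc_id], doc_id))


dense = ["a", "b", "c"]   # ranking from the dense (embedding) search
sparse = ["c", "a", "d"]  # ranking from the sparse search
fused = rrf_fuse([dense, sparse])
print(fused)  # ['a', 'c', 'b', 'd']
```

Ids that rank well in both lists ("a" and "c") rise above ids that appear in only one, which is the behavior a hybrid query relies on.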