Skip to content

Redis ingestion pipeline

RedisIngestionPipelinePack #

Bases: BaseLlamaPack

Redis Ingestion Pipeline Completion pack.

Source code in llama-index-packs/llama-index-packs-redis-ingestion-pipeline/llama_index/packs/redis_ingestion_pipeline/base.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
class RedisIngestionPipelinePack(BaseLlamaPack):
    """Pack bundling an ingestion pipeline with a Redis cache and vector store.

    Construction creates three objects and keeps them as attributes:
    ``vector_store`` (a ``RedisVectorStore``), ``ingest_cache`` (an
    ``IngestionCache`` wrapping a ``RedisCache``) and ``pipeline`` (an
    ``IngestionPipeline`` wired to both of the former).
    """

    def __init__(
        self,
        transformations: List[TransformComponent],
        hostname: str = "localhost",
        port: int = 6379,
        cache_collection_name: str = "ingest_cache",
        vector_collection_name: str = "vector_store",
        **kwargs: Any,
    ) -> None:
        """Build the vector store, the ingestion cache and the pipeline.

        Args:
            transformations: Components handed to the ``IngestionPipeline``.
            hostname: Host passed to both ``RedisVectorStore`` and
                ``RedisCache``.
            port: Port passed to both ``RedisVectorStore`` and ``RedisCache``.
            cache_collection_name: Collection name given to the
                ``IngestionCache``.
            vector_collection_name: Collection name given to the
                ``RedisVectorStore``.
            **kwargs: Accepted but not used by this constructor.
        """
        # The vector store and the cache are both given this host/port pair.
        redis_endpoint = {"hostname": hostname, "port": port}

        vector_store = RedisVectorStore(
            collection_name=vector_collection_name,
            **redis_endpoint,
        )
        self.vector_store = vector_store

        redis_cache = RedisCache(**redis_endpoint)
        ingest_cache = IngestionCache(
            cache=redis_cache,
            collection_name=cache_collection_name,
        )
        self.ingest_cache = ingest_cache

        self.pipeline = IngestionPipeline(
            transformations=transformations,
            cache=ingest_cache,
            vector_store=vector_store,
        )

    def get_modules(self) -> Dict[str, Any]:
        """Return the pack's components keyed by their attribute names."""
        exposed = ("pipeline", "vector_store", "ingest_cache")
        return {name: getattr(self, name) for name in exposed}

    def run(self, inputs: List[BaseNode], **kwargs: Any) -> List[BaseNode]:
        """Pass ``inputs`` to the pipeline as ``nodes`` and return its result.

        Any extra keyword arguments are forwarded to ``pipeline.run``
        unchanged.
        """
        pipeline = self.pipeline
        return pipeline.run(nodes=inputs, **kwargs)

get_modules #

get_modules() -> Dict[str, Any]

Get modules.

Source code in llama-index-packs/llama-index-packs-redis-ingestion-pipeline/llama_index/packs/redis_ingestion_pipeline/base.py
47
48
49
50
51
52
53
def get_modules(self) -> Dict[str, Any]:
    """Return the pack's components keyed by their attribute names.

    The mapping holds ``pipeline``, ``vector_store`` and ``ingest_cache``,
    in that order, each read from the attribute of the same name.
    """
    exposed = ("pipeline", "vector_store", "ingest_cache")
    return {name: getattr(self, name) for name in exposed}

run #

run(inputs: List[BaseNode], **kwargs: Any) -> List[BaseNode]

Run the pipeline.

Source code in llama-index-packs/llama-index-packs-redis-ingestion-pipeline/llama_index/packs/redis_ingestion_pipeline/base.py
55
56
57
def run(self, inputs: List[BaseNode], **kwargs: Any) -> List[BaseNode]:
    """Pass ``inputs`` to the pipeline as ``nodes`` and return its result.

    Any extra keyword arguments are forwarded to ``pipeline.run`` unchanged.
    """
    pipeline = self.pipeline
    processed = pipeline.run(nodes=inputs, **kwargs)
    return processed