Skip to content

Neptune

NeptuneAnalyticsGraphStore #

Bases: NeptuneBaseGraphStore

Source code in llama-index-integrations/graph_stores/llama-index-graph-stores-neptune/llama_index/graph_stores/neptune/analytics.py
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
class NeptuneAnalyticsGraphStore(NeptuneBaseGraphStore):
    """Graph store wrapper that sends openCypher queries to a Neptune Analytics graph."""

    def __init__(
        self,
        graph_identifier: str,
        client: Any = None,
        credentials_profile_name: Optional[str] = None,
        region_name: Optional[str] = None,
        node_label: str = "Entity",
        **kwargs: Any,
    ) -> None:
        """Create a new Neptune Analytics graph wrapper instance.

        Args:
            graph_identifier: Value sent as ``graphIdentifier`` on every request.
            client: Ready-made client to use. When given, no boto3 session is
                created and the credential/region arguments are not consulted.
            credentials_profile_name: boto3 profile to build the session from;
                the default credentials are used when ``None``.
            region_name: Region passed to the client when truthy.
            node_label: Stored on the instance as ``node_label``.
            **kwargs: Accepted for signature compatibility; unused here.

        Raises:
            ModuleNotFoundError: boto3 cannot be imported, or it does not know
                the ``neptune-graph`` service (``UnknownServiceError``).
            ValueError: Any other failure while creating the client.
        """
        self.node_label = node_label
        # Set unconditionally: query() and _get_summary() read this attribute
        # even when the caller supplies its own client.
        self.graph_identifier = graph_identifier
        try:
            if client is not None:
                self._client = client
            else:
                import boto3

                if credentials_profile_name is not None:
                    session = boto3.Session(profile_name=credentials_profile_name)
                else:
                    # use default credentials
                    session = boto3.Session()

                if region_name:
                    self._client = session.client(
                        "neptune-graph", region_name=region_name
                    )
                else:
                    self._client = session.client("neptune-graph")

        except ImportError as e:
            raise ModuleNotFoundError(
                "Could not import boto3 python package. "
                "Please install it with `pip install boto3`."
            ) from e
        except Exception as e:
            # Compared by name so botocore does not have to be imported here.
            if type(e).__name__ == "UnknownServiceError":
                raise ModuleNotFoundError(
                    "NeptuneGraph requires a boto3 version 1.34.40 or greater. "
                    "Please install it with `pip install -U boto3`."
                ) from e
            else:
                raise ValueError(
                    "Could not load credentials to authenticate with AWS client. "
                    "Please check that credentials in the specified "
                    "profile name are valid."
                ) from e

        # Schema retrieval is best-effort: a failure is logged and leaves
        # ``schema`` as None instead of aborting construction.
        try:
            self._refresh_schema()
        except Exception as e:
            logger.error(
                f"Could not retrieve schema for Neptune due to the following error: {e}"
            )
            self.schema = None

    def query(self, query: str, params: dict = {}) -> Dict[str, Any]:
        """Query Neptune Analytics graph.

        Args:
            query: openCypher query string.
            params: Query parameters; logged as JSON and forwarded unchanged.
                The shared default dict is never mutated here.

        Returns:
            The ``results`` entry of the JSON-decoded response payload.

        Raises:
            NeptuneQueryException: Any failure while logging, executing the
                query or decoding the response.
        """
        try:
            logger.debug(f"query() query: {query} parameters: {json.dumps(params)}")
            # NOTE(review): ``self.client`` is presumably an accessor for
            # ``self._client`` provided by the base class - confirm.
            resp = self.client.execute_query(
                graphIdentifier=self.graph_identifier,
                queryString=query,
                parameters=params,
                language="OPEN_CYPHER",
            )
            return json.loads(resp["payload"].read().decode("UTF-8"))["results"]
        except Exception as e:
            raise NeptuneQueryException(
                {
                    "message": "An error occurred while executing the query.",
                    "details": str(e),
                }
            ) from e

    def _get_summary(self) -> Dict:
        """Return the ``graphSummary`` section of the detailed graph summary.

        Raises:
            NeptuneQueryException: The summary call fails, or its response has
                no ``graphSummary`` entry.
        """
        try:
            response = self.client.get_graph_summary(
                graphIdentifier=self.graph_identifier, mode="detailed"
            )
        except Exception as e:
            raise NeptuneQueryException(
                {
                    "message": ("Summary API error occurred on Neptune Analytics"),
                    "details": str(e),
                }
            ) from e

        try:
            summary = response["graphSummary"]
        except Exception as e:
            # The response may be a plain mapping without ``.content``; reading
            # that attribute blindly would raise AttributeError and hide the
            # intended error, so fall back to the response's string form.
            raw = getattr(response, "content", None)
            if isinstance(raw, (bytes, bytearray)):
                details = raw.decode()
            else:
                details = str(response)
            raise NeptuneQueryException(
                {
                    "message": "Summary API did not return a valid response.",
                    "details": details,
                }
            ) from e
        else:
            return summary

query #

query(query: str, params: dict = {}) -> Dict[str, Any]

Query Neptune Analytics graph.

Source code in llama-index-integrations/graph_stores/llama-index-graph-stores-neptune/llama_index/graph_stores/neptune/analytics.py
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def query(self, query: str, params: dict = {}) -> Dict[str, Any]:
    """Query Neptune Analytics graph.

    Args:
        query: openCypher query string.
        params: Query parameters; logged as JSON and forwarded unchanged.
            The shared default dict is never mutated here.

    Returns:
        The ``results`` entry of the JSON-decoded response payload.

    Raises:
        NeptuneQueryException: Any failure while logging, executing the query
            or decoding the response; the original error is attached as the
            cause.
    """
    try:
        logger.debug(f"query() query: {query} parameters: {json.dumps(params)}")
        resp = self.client.execute_query(
            graphIdentifier=self.graph_identifier,
            queryString=query,
            parameters=params,
            language="OPEN_CYPHER",
        )
        # The payload is read fully and decoded as UTF-8 JSON.
        return json.loads(resp["payload"].read().decode("UTF-8"))["results"]
    except Exception as e:
        # Chain explicitly so the traceback marks ``e`` as the direct cause.
        raise NeptuneQueryException(
            {
                "message": "An error occurred while executing the query.",
                "details": str(e),
            }
        ) from e

NeptuneDatabaseGraphStore #

Bases: NeptuneBaseGraphStore

Source code in llama-index-integrations/graph_stores/llama-index-graph-stores-neptune/llama_index/graph_stores/neptune/database.py
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
class NeptuneDatabaseGraphStore(NeptuneBaseGraphStore):
    """Graph store wrapper that sends openCypher queries to a Neptune Database endpoint."""

    def __init__(
        self,
        host: str,
        port: int = 8182,
        use_https: bool = True,
        client: Any = None,
        credentials_profile_name: Optional[str] = None,
        region_name: Optional[str] = None,
        sign: bool = True,
        node_label: str = "Entity",
        **kwargs: Any,
    ) -> None:
        """Create a new Neptune Database graph wrapper instance.

        Args:
            host: Host name used to build the endpoint URL.
            port: Port used to build the endpoint URL.
            use_https: Use ``https`` for the endpoint when true, else ``http``.
            client: Ready-made client to use. When given, no boto3 session is
                created and the endpoint/credential arguments are not consulted.
            credentials_profile_name: boto3 profile to build the session from;
                the default credentials are used when ``None``.
            region_name: Region passed to the client when truthy.
            sign: When false, the client is configured with an unsigned
                signature version.
            node_label: Stored on the instance as ``node_label``.
            **kwargs: Accepted for signature compatibility; unused here.

        Raises:
            ModuleNotFoundError: A required import fails, or boto3 does not know
                the ``neptunedata`` service (``UnknownServiceError``).
            ValueError: Any other failure while creating the client.
        """
        self.node_label = node_label
        try:
            if client is not None:
                # Honor a caller-supplied client instead of silently
                # discarding it and building a new one.
                self._client = client
            else:
                if credentials_profile_name is not None:
                    session = boto3.Session(profile_name=credentials_profile_name)
                else:
                    # use default credentials
                    session = boto3.Session()

                client_params = {}
                if region_name:
                    client_params["region_name"] = region_name

                protocol = "https" if use_https else "http"

                client_params["endpoint_url"] = f"{protocol}://{host}:{port}"

                if sign:
                    self._client = session.client("neptunedata", **client_params)
                else:
                    from botocore import UNSIGNED
                    from botocore.config import Config

                    self._client = session.client(
                        "neptunedata",
                        **client_params,
                        config=Config(signature_version=UNSIGNED),
                    )

        except ImportError as e:
            raise ModuleNotFoundError(
                "Could not import boto3 python package. "
                "Please install it with `pip install boto3`."
            ) from e
        except Exception as e:
            # Compared by name so the exception class need not be imported.
            if type(e).__name__ == "UnknownServiceError":
                raise ModuleNotFoundError(
                    "Neptune Database requires a boto3 version 1.34.40 or greater. "
                    "Please install it with `pip install -U boto3`."
                ) from e
            else:
                raise ValueError(
                    "Could not load credentials to authenticate with AWS client. "
                    "Please check that credentials in the specified "
                    "profile name are valid."
                ) from e

        # Schema retrieval is best-effort: a failure is logged and leaves
        # ``schema`` as None instead of aborting construction.
        try:
            self._refresh_schema()
        except Exception as e:
            logger.error(
                f"Could not retrieve schema for Neptune due to the following error: {e}"
            )
            self.schema = None

    def query(self, query: str, params: dict = {}) -> Dict[str, Any]:
        """Query Neptune database.

        Args:
            query: openCypher query string.
            params: Query parameters; serialized to JSON before being sent.
                The shared default dict is never mutated here.

        Returns:
            The ``results`` entry of the response.

        Raises:
            NeptuneQueryException: Any failure while serializing the parameters
                or executing the query.
        """
        try:
            logger.debug(f"query() query: {query} parameters: {json.dumps(params)}")
            # NOTE(review): ``self.client`` is presumably an accessor for
            # ``self._client`` provided by the base class - confirm.
            return self.client.execute_open_cypher_query(
                openCypherQuery=query, parameters=json.dumps(params)
            )["results"]
        except Exception as e:
            raise NeptuneQueryException(
                {
                    "message": "An error occurred while executing the query.",
                    "details": str(e),
                }
            ) from e

    def _get_summary(self) -> Dict:
        """Return the ``graphSummary`` section of the property-graph summary.

        Raises:
            NeptuneQueryException: The summary call fails, or its response has
                no ``payload`` / ``graphSummary`` entry.
        """
        try:
            response = self.client.get_propertygraph_summary()
        except Exception as e:
            raise NeptuneQueryException(
                {
                    "message": (
                        "Summary API is not available for this instance of Neptune, "
                        "ensure the engine version is >=1.2.1.0"
                    ),
                    "details": str(e),
                }
            ) from e

        try:
            summary = response["payload"]["graphSummary"]
        except Exception as e:
            # The response may be a plain mapping without ``.content``; reading
            # that attribute blindly would raise AttributeError and hide the
            # intended error, so fall back to the response's string form.
            raw = getattr(response, "content", None)
            if isinstance(raw, (bytes, bytearray)):
                details = raw.decode()
            else:
                details = str(response)
            raise NeptuneQueryException(
                {
                    "message": "Summary API did not return a valid response.",
                    "details": details,
                }
            ) from e
        else:
            return summary

query #

query(query: str, params: dict = {}) -> Dict[str, Any]

Query Neptune database.

Source code in llama-index-integrations/graph_stores/llama-index-graph-stores-neptune/llama_index/graph_stores/neptune/database.py
81
82
83
84
85
86
87
88
89
90
91
92
93
94
def query(self, query: str, params: dict = {}) -> Dict[str, Any]:
    """Query Neptune database.

    Args:
        query: openCypher query string.
        params: Query parameters; serialized to JSON before being sent.
            The shared default dict is never mutated here.

    Returns:
        The ``results`` entry of the response.

    Raises:
        NeptuneQueryException: Any failure while serializing the parameters or
            executing the query; the original error is attached as the cause.
    """
    try:
        logger.debug(f"query() query: {query} parameters: {json.dumps(params)}")
        return self.client.execute_open_cypher_query(
            openCypherQuery=query, parameters=json.dumps(params)
        )["results"]
    except Exception as e:
        # Chain explicitly so the traceback marks ``e`` as the direct cause.
        raise NeptuneQueryException(
            {
                "message": "An error occurred while executing the query.",
                "details": str(e),
            }
        ) from e