Skip to content

Mymagic

MyMagicAI #

Bases: LLM

MyMagicAI LLM.

Examples:

pip install llama-index-llms-mymagic

from llama_index.llms.mymagic import MyMagicAI

llm = MyMagicAI(api_key="<api_key>", storage_provider="gcs", bucket_name="<bucket_name>", session="<session>", system_prompt="<system_prompt>")

resp = llm.complete("Paul Graham is ")

print(resp)
Source code in llama-index-integrations/llms/llama-index-llms-mymagic/llama_index/llms/mymagic/base.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
class MyMagicAI(LLM):
    """MyMagicAI LLM.

    Client for the MyMagicAI HTTP API. A question is posted to
    ``<base_url>/submit_question``; unless ``return_output`` is set, the
    client then polls ``<base_url>/get_result/<task_id>`` until the reported
    status is no longer ``"PENDING"``.

    Examples:
        `pip install llama-index-llms-mymagic`

        ```python
        from llama_index.llms.mymagic import MyMagicAI

        llm = MyMagicAI(
            api_key="<api_key>",
            storage_provider="gcs",
            bucket_name="<bucket_name>",
            session="<session>",
            system_prompt="<system_prompt>",
        )

        resp = llm.complete("Paul Graham is ")

        print(resp)
        ```
    """

    # Endpoint template; ``{model}`` is substituted with the model name.
    base_url_template: str = "https://{model}.mymagic.ai"
    # Sent to the API as ``personal_access_token`` (see ``__init__``).
    api_key: Optional[str] = None
    model: str = Field(default="mistral7b", description="The MyMagicAI model to use.")
    max_tokens: int = Field(
        default=10, description="The maximum number of tokens to generate."
    )
    question: str = Field(default="", description="The user question.")
    storage_provider: str = Field(
        default="gcs", description="The storage provider to use."
    )
    bucket_name: str = Field(
        default="your-bucket-name",
        description="The bucket name where the data is stored.",
    )
    session: str = Field(
        default="test-session",
        description="The session to use. This is a subfolder in the bucket where your data is located.",
    )
    role_arn: Optional[str] = Field(
        None, description="ARN for role assumption in AWS S3."
    )
    system_prompt: str = Field(
        default="Answer the question based only on the given content. Do not give explanations or examples. Do not continue generating more text after the answer.",
        description="The system prompt to use.",
    )
    question_data: Dict[str, Any] = Field(
        default_factory=dict, description="The data to send to the MyMagicAI API."
    )
    region: Optional[str] = Field(
        "eu-west-2", description="The region the bucket is in. Only used for AWS S3."
    )
    return_output: Optional[bool] = Field(
        False, description="Whether MyMagic API should return the output json"
    )
    input_json_file: Optional[str] = None

    structured_output: Optional[Dict[str, Any]] = Field(
        None, description="User-defined structure for the response output"
    )

    def __init__(
        self,
        api_key: str,
        storage_provider: str,
        bucket_name: str,
        session: str,
        system_prompt: Optional[str],
        role_arn: Optional[str] = None,
        region: Optional[str] = None,
        return_output: Optional[bool] = False,
        input_json_file: Optional[str] = None,
        structured_output: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> None:
        """Create the client and build the base request payload.

        Args:
            api_key: Token sent to the API as ``personal_access_token``.
            storage_provider: Storage provider holding the input data.
            bucket_name: Bucket where the data is stored.
            session: Sub-folder of the bucket to use.
            system_prompt: System prompt sent with every question.
            role_arn: ARN for role assumption in AWS S3.
            region: Bucket region (only used for AWS S3).
            return_output: If truthy, ``complete``/``acomplete`` return the
                submit response directly instead of polling for a result.
            input_json_file: Forwarded to the API unchanged.
            structured_output: User-defined structure for the response output.
            **kwargs: Remaining model fields (e.g. ``model``, ``max_tokens``)
                forwarded to the base-class initialiser.
        """
        # Mirror the explicit arguments onto the model fields so attributes
        # such as ``self.bucket_name`` report what the caller passed instead
        # of the class defaults. ``None`` values are skipped so that the
        # declared field defaults (and their non-optional types) still apply.
        explicit_fields = {
            "api_key": api_key,
            "storage_provider": storage_provider,
            "bucket_name": bucket_name,
            "session": session,
            "system_prompt": system_prompt,
            "role_arn": role_arn,
            "region": region,
            "input_json_file": input_json_file,
            "structured_output": structured_output,
        }
        kwargs.update(
            {name: value for name, value in explicit_fields.items() if value is not None}
        )
        super().__init__(**kwargs)
        self.return_output = return_output

        # The payload is built from the raw arguments (including ``None``
        # values), exactly as callers passed them.
        self.question_data = {
            "storage_provider": storage_provider,
            "bucket_name": bucket_name,
            "personal_access_token": api_key,
            "session": session,
            "max_tokens": self.max_tokens,
            "role_arn": role_arn,
            "system_prompt": system_prompt,
            "region": region,
            "return_output": return_output,
            "input_json_file": input_json_file,
            "structured_output": structured_output,
        }

    @classmethod
    def class_name(cls) -> str:
        """Return the class identifier ``"MyMagicAI"``."""
        return "MyMagicAI"

    def _construct_url(self, model: str) -> str:
        """Constructs the API endpoint URL based on the specified model."""
        return self.base_url_template.format(model=model)

    def _prepare_question(
        self,
        question: str,
        model: Optional[str],
        max_tokens: Optional[int],
    ) -> Dict[str, Any]:
        """Record the per-call settings and return the request payload.

        Falsy ``model``/``max_tokens`` fall back to the current instance
        values. The chosen values are written back to ``self.model`` and
        ``self.max_tokens``, so they persist for subsequent calls.
        """
        self.question_data["question"] = question
        self.model = self.question_data["model"] = model or self.model
        self.max_tokens = self.question_data["max_tokens"] = (
            max_tokens or self.max_tokens
        )
        return self.question_data

    @staticmethod
    def _to_completion_response(result: Dict[str, Any]) -> CompletionResponse:
        """Wrap a finished task result in a ``CompletionResponse``."""
        return CompletionResponse(
            text=result.get("message", ""),
            additional_kwargs={"status": result["status"]},
        )

    async def _submit_question(self, question_data: Dict[str, Any]) -> Dict[str, Any]:
        """Submits a question to the model asynchronously.

        Raises an ``httpx`` status error on a non-2xx response.
        """
        timeout_config = httpx.Timeout(600.0, connect=60.0)

        async with httpx.AsyncClient(timeout=timeout_config) as client:
            url = f"{self._construct_url(self.model)}/submit_question"
            resp = await client.post(url, json=question_data)
            resp.raise_for_status()
            return resp.json()

    def _submit_question_sync(self, question_data: Dict[str, Any]) -> Dict[str, Any]:
        """Submits a question to the model synchronously."""
        url = f"{self._construct_url(self.model)}/submit_question"
        # (connect, read) seconds; mirrors the async client's 60 s / 600 s
        # budget so a stalled server cannot block the caller forever.
        resp = requests.post(url, json=question_data, timeout=(60.0, 600.0))
        resp.raise_for_status()
        return resp.json()

    def _get_result_sync(self, task_id: str) -> Dict[str, Any]:
        """Polls for the result of a task synchronously."""
        url = f"{self._construct_url(self.model)}/get_result/{task_id}"
        # Same (connect, read) bound as the sync submit call.
        response = requests.get(url, timeout=(60.0, 600.0))
        response.raise_for_status()
        return response.json()

    async def _get_result(self, task_id: str) -> Dict[str, Any]:
        """Polls for the result of a task asynchronously."""
        async with httpx.AsyncClient() as client:
            resp = await client.get(
                f"{self._construct_url(self.model)}/get_result/{task_id}"
            )
            resp.raise_for_status()
            return resp.json()

    async def acomplete(
        self,
        question: str,
        model: Optional[str] = None,
        max_tokens: Optional[int] = None,
        poll_interval: float = 1.0,
    ) -> CompletionResponse:
        """Asynchronously submit ``question`` and wait for the answer.

        Args:
            question: The question to send.
            model: Model override; also becomes the instance's model.
            max_tokens: Token-limit override; also becomes the instance's
                limit.
            poll_interval: Seconds to sleep between result polls.

        Returns:
            A ``CompletionResponse`` whose text is the result's ``message``
            and whose ``additional_kwargs`` carries the final ``status``.
            When ``return_output`` is truthy, the decoded JSON of the submit
            call is returned as-is instead.
        """
        payload = self._prepare_question(question, model, max_tokens)
        task_response = await self._submit_question(payload)

        if self.return_output:
            # NOTE(review): this path hands back the raw submit payload, not
            # a CompletionResponse; kept unchanged for existing callers.
            return task_response

        task_id = task_response.get("task_id")
        while True:
            result = await self._get_result(task_id)
            if result["status"] != "PENDING":
                # Return the same response type as the sync ``complete``.
                return self._to_completion_response(result)
            await asyncio.sleep(poll_interval)

    def complete(
        self,
        question: str,
        model: Optional[str] = None,
        max_tokens: Optional[int] = None,
        poll_interval: float = 1.0,
    ) -> CompletionResponse:
        """Submit ``question`` and block until the answer is available.

        Args:
            question: The question to send.
            model: Model override; also becomes the instance's model.
            max_tokens: Token-limit override; also becomes the instance's
                limit.
            poll_interval: Seconds to sleep between result polls.

        Returns:
            A ``CompletionResponse`` whose text is the result's ``message``
            and whose ``additional_kwargs`` carries the final ``status``.
            When ``return_output`` is truthy, the decoded JSON of the submit
            call is returned as-is instead.
        """
        payload = self._prepare_question(question, model, max_tokens)
        task_response = self._submit_question_sync(payload)

        if self.return_output:
            # NOTE(review): this path hands back the raw submit payload, not
            # a CompletionResponse; kept unchanged for existing callers.
            return task_response

        task_id = task_response.get("task_id")
        while True:
            result = self._get_result_sync(task_id)
            if result["status"] != "PENDING":
                return self._to_completion_response(result)
            time.sleep(poll_interval)

    def stream_complete(self, question: str) -> CompletionResponseGen:
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError(
            "MyMagicAI does not currently support streaming completion."
        )

    async def achat(self, question: str) -> ChatResponse:
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError("MyMagicAI does not currently support chat.")

    def chat(self, question: str) -> ChatResponse:
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError("MyMagicAI does not currently support chat.")

    async def astream_complete(self, question: str) -> CompletionResponseAsyncGen:
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError("MyMagicAI does not currently support streaming.")

    async def astream_chat(self, question: str) -> ChatResponseAsyncGen:
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError("MyMagicAI does not currently support streaming.")

    def stream_chat(self, question: str) -> ChatResponseGen:
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError("MyMagicAI does not currently support chat.")

    @property
    def metadata(self) -> LLMMetadata:
        """LLM metadata."""
        return LLMMetadata(
            num_output=self.max_tokens,
            model_name=self.model,
            is_chat_model=False,
        )

metadata property #

metadata: LLMMetadata

LLM metadata.