forked from shroominic/codeinterpreter-api
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsession.py
198 lines (170 loc) · 7.94 KB
/
session.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
import uuid, base64, re
from io import BytesIO
from typing import Optional
from codeboxapi import CodeBox # type: ignore
from codeboxapi.schema import CodeBoxOutput # type: ignore
from langchain.tools import StructuredTool
from langchain.chat_models import ChatOpenAI
from langchain.chat_models.base import BaseChatModel
from langchain.prompts.chat import MessagesPlaceholder
from langchain.agents import AgentExecutor, BaseSingleActionAgent
from langchain.memory import ConversationBufferMemory
from codeinterpreterapi.schema import CodeInterpreterResponse, CodeInput, File, UserRequest
from codeinterpreterapi.config import settings
from codeinterpreterapi.chains.functions_agent import OpenAIFunctionsAgent
from codeinterpreterapi.prompts import code_interpreter_system_message
from codeinterpreterapi.callbacks import CodeCallbackHandler
from codeinterpreterapi.chains.modifications_check import get_file_modifications
from codeinterpreterapi.chains.remove_download_link import remove_download_link
class CodeInterpreterSession:
    """Chat session in which an LLM agent can execute Python code in a CodeBox.

    The session owns the CodeBox instance (``codebox``), the chat model
    (``llm``), the single ``python`` tool exposed to the agent (``tools``) and
    the agent executor (``agent_executor``). Files uploaded by the user are
    recorded in ``input_files``; files produced by executed code are collected
    in ``output_files`` until they are attached to a response.

    The class is an async context manager: entering calls ``astart`` and
    exiting calls ``astop``.
    """

    def __init__(self, model=None, openai_api_key=None) -> None:
        """Build the CodeBox handle, tool list, chat model and agent executor.

        Args:
            model: Chat model name; ``_llm`` uses ``"gpt-4"`` when ``None``.
            openai_api_key: OpenAI API key; ``_llm`` falls back to
                ``settings.OPENAI_API_KEY`` when ``None``.

        Raises:
            ValueError: If no API key is passed and none is configured.
        """
        self.codebox = CodeBox()
        # Create the file registries before anything that is handed ``self``
        # (tool handlers, callback handler) so the session is usable by then.
        self.input_files: list[File] = []
        self.output_files: list[File] = []
        self.tools: list[StructuredTool] = self._tools()
        self.llm: BaseChatModel = self._llm(model, openai_api_key)
        self.agent_executor: AgentExecutor = self._agent_executor()

    async def astart(self) -> None:
        """Start the underlying CodeBox."""
        await self.codebox.astart()

    def _tools(self) -> list[StructuredTool]:
        """Return the tools offered to the agent: one ``python`` code runner."""
        return [
            StructuredTool(
                name="python",
                # TODO: variables as context to the agent
                # TODO: current files as context to the agent
                description=(
                    "Input a string of code to a python interpreter (jupyter kernel). "
                    "Variables are preserved between runs. "
                ),
                func=self.run_handler,
                coroutine=self.arun_handler,
                args_schema=CodeInput,
            ),
        ]

    def _llm(
        self,
        model: Optional[str] = None,
        openai_api_key: Optional[str] = None,
    ) -> BaseChatModel:
        """Create the chat model used by the agent and the helper chains.

        Args:
            model: Model name; defaults to ``"gpt-4"``.
            openai_api_key: API key; defaults to ``settings.OPENAI_API_KEY``.

        Raises:
            ValueError: If neither the argument nor the settings provide a key.
        """
        if model is None:
            model = "gpt-4"
        if openai_api_key is None:
            if settings.OPENAI_API_KEY is None:
                raise ValueError("OpenAI API key missing.")
            openai_api_key = settings.OPENAI_API_KEY
        return ChatOpenAI(
            temperature=0.03,
            model=model,
            openai_api_key=openai_api_key,
            max_retries=3,
            request_timeout=60 * 3,
        )  # type: ignore

    def _agent(self) -> BaseSingleActionAgent:
        """Build the functions agent from the session's model and tools."""
        return OpenAIFunctionsAgent.from_llm_and_tools(
            llm=self.llm,
            tools=self.tools,
            system_message=code_interpreter_system_message,
            # The placeholder name must match the memory_key used in
            # _agent_executor so the conversation history is injected.
            extra_prompt_messages=[MessagesPlaceholder(variable_name="memory")],
        )

    def _agent_executor(self) -> AgentExecutor:
        """Wrap the agent in an executor with conversation memory and callbacks."""
        return AgentExecutor.from_agent_and_tools(
            agent=self._agent(),
            callbacks=[CodeCallbackHandler(self)],
            max_iterations=9,
            tools=self.tools,
            verbose=settings.VERBOSE,
            memory=ConversationBufferMemory(memory_key="memory", return_messages=True),
        )

    async def show_code(self, code: str) -> None:
        """Callback function to show code to the user."""
        if settings.VERBOSE:
            print(code)

    def run_handler(self, code: str):
        """Synchronous tool entry point; not implemented.

        Raises:
            NotImplementedError: Always. Use ``arun_handler`` instead.
        """
        raise NotImplementedError("Use arun_handler for now.")

    async def arun_handler(self, code: str):
        """Run ``code`` in the CodeBox and return the text handed back to the agent.

        Depending on the output type this also records an image in
        ``output_files``, installs a missing package, or downloads files the
        code modified into ``output_files``.

        Args:
            code: Python source to execute.

        Returns:
            A status message for image output or after installing a missing
            package; otherwise the raw output content.

        Raises:
            TypeError: If the CodeBox output content is not a string.
        """
        output: CodeBoxOutput = await self.codebox.arun(code)

        if not isinstance(output.content, str):
            raise TypeError("Expected output.content to be a string.")

        if output.type == "image/png":
            # Image output arrives base64-encoded; store the decoded bytes.
            filename = f"image-{uuid.uuid4()}.png"
            self.output_files.append(
                File(name=filename, content=base64.b64decode(output.content))
            )
            return f"Image {filename} got send to the user."

        if output.type == "error":
            # [^']+ stops at the closing quote of the module name instead of
            # greedily running to the last quote on the line.
            missing = re.search(
                r"ModuleNotFoundError: No module named '([^']+)'", output.content
            )
            if missing:
                package = missing.group(1)
                await self.codebox.ainstall(package)
                return f"{package} was missing but got installed now. Please try again."
            # TODO: preanalyze error to optimize next code generation
            if settings.VERBOSE:
                print("Error:", output.content)
            return output.content

        modifications = await get_file_modifications(code, self.llm)
        if modifications:
            # Files the user uploaded are not sent back as outputs.
            uploaded_names = {file.name for file in self.input_files}
            for filename in modifications:
                if filename in uploaded_names:
                    continue
                fileb = await self.codebox.adownload(filename)
                if not fileb.content:
                    continue
                self.output_files.append(File(name=filename, content=fileb.content))

        return output.content

    async def input_handler(self, request: UserRequest):
        """Upload the request's files to the CodeBox and mention them in the prompt.

        Does nothing when the request has no files. Otherwise each file is
        recorded in ``input_files``, uploaded, and listed in
        ``request.content`` (which is modified in place).
        """
        if not request.files:
            return
        if not request.content:
            request.content = (
                "I uploaded, just text me back and confirm that you got the file(s)."
            )
        request.content += "\n**The user uploaded the following files: **\n"
        for file in request.files:
            self.input_files.append(file)
            request.content += f"[Attachment: {file.name}]\n"
            await self.codebox.aupload(file.name, file.content)
        request.content += "**File(s) are now available in the cwd. **\n"

    async def output_handler(self, final_response: str) -> CodeInterpreterResponse:
        """Clean up the agent's answer and attach the collected output files.

        Markdown image embeds are stripped when the answer mentions an output
        file, and remaining markdown links are passed to
        ``remove_download_link`` when there are output files. The collected
        files are handed to the response and the session's list is reset, so
        each response only carries files produced since the previous one.
        """
        files = self.output_files

        if any(str(file.name) in final_response for file in files):
            # rm ![Any](file.name) from the response.
            # Negated character classes keep each match inside a single
            # ![alt](target) construct; a greedy ``.*`` would also delete any
            # text up to the last ")" on the same line.
            final_response = re.sub(r"\n\n!\[[^\]]*\]\([^)]*\)", "", final_response)

        if files and re.search(r"\n\[.*\]\(.*\)", final_response):
            final_response = await remove_download_link(final_response, self.llm)

        # Start a fresh list so these files are not re-sent with later responses.
        self.output_files = []
        return CodeInterpreterResponse(content=final_response, files=files)

    async def generate_response(
        self,
        user_msg: str,
        files: Optional[list[File]] = None,
        detailed_error: bool = False,
    ) -> CodeInterpreterResponse:
        """Generate a Code Interpreter response based on the user's input.

        Args:
            user_msg: The user's message.
            files: Files to upload alongside the message; ``None`` means none.
            detailed_error: When true, a failure is reported with the
                exception class and message instead of a generic apology.

        Returns:
            The agent's response, or an error response if anything raised.
        """
        # ``None`` instead of a shared ``[]`` default avoids one list object
        # being reused across calls.
        user_request = UserRequest(
            content=user_msg,
            files=files if files is not None else [],
        )
        try:
            await self.input_handler(user_request)
            response = await self.agent_executor.arun(input=user_request.content)
            return await self.output_handler(response)
        except Exception as e:
            # Top-level boundary: failures are turned into a response object
            # rather than propagated to the caller.
            if settings.VERBOSE:
                import traceback

                traceback.print_exc()
            if detailed_error:
                return CodeInterpreterResponse(
                    content=f"Error in CodeInterpreterSession: {e.__class__.__name__} - {e}"
                )
            return CodeInterpreterResponse(
                content="Sorry, something went wrong while generating your response. "
                "Please try again or restart the session."
            )

    async def is_running(self) -> bool:
        """Return whether the CodeBox reports the status ``"running"``."""
        return await self.codebox.astatus() == "running"

    async def astop(self) -> None:
        """Stop the underlying CodeBox."""
        await self.codebox.astop()

    async def __aenter__(self) -> "CodeInterpreterSession":
        """Start the session and return it for use in ``async with``."""
        await self.astart()
        return self

    async def __aexit__(self, exc_type, exc_value, traceback) -> None:
        """Stop the session when leaving the ``async with`` block."""
        await self.astop()