Skip to content

Commit

Permalink
⚙️ improve localbox error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
shroominic committed Feb 6, 2024
1 parent 23f77dd commit 579ef3e
Showing 1 changed file with 129 additions and 62 deletions.
191 changes: 129 additions & 62 deletions src/codeboxapi/box/localbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,16 +108,40 @@ def start(self) -> CodeBoxStatus:
return CodeBoxStatus(status="started")

def _connect(self) -> None:
response = requests.post(
f"{self.kernel_url}/kernels",
headers={"Content-Type": "application/json"},
timeout=270,
)
self.kernel_id = response.json()["id"]
# Implement retry logic for kernel connection
for attempt in range(5):
try:
response = requests.post(
f"{self.kernel_url}/kernels",
headers={"Content-Type": "application/json"},
timeout=60,
)
if response.status_code == 201:
self.kernel_id = response.json().get("id", None)
if self.kernel_id:
break
except requests.RequestException as e:
print(f"Could not connect to kernel. {e}")
time.sleep(5) # Wait for 5 seconds before retrying

if self.kernel_id is None:
raise Exception("Could not start kernel")
raise Exception("Could not start kernel after multiple attempts")

self.ws = ws_connect_sync(f"{self.ws_url}/kernels/{self.kernel_id}/channels")
# Connect to WebSocket with retry logic
for attempt in range(5):
try:
self.ws = ws_connect_sync(
f"{self.ws_url}/kernels/{self.kernel_id}/channels",
open_timeout=60,
close_timeout=60,
)
break # Break the loop if connection is successful
except (ConnectionClosedError, TimeoutError) as e:
print(f"Attempt {attempt + 1}: WebSocket connection failed. Error: {e}")
time.sleep(5) # Wait for 5 seconds before retrying

if not self.ws:
raise Exception("Could not connect to WebSocket after multiple attempts")

def _check_port(self) -> None:
try:
Expand Down Expand Up @@ -189,14 +213,45 @@ async def astart(self) -> CodeBoxStatus:

async def _aconnect(self) -> None:
if self.aiohttp_session is None:
self.aiohttp_session = aiohttp.ClientSession()
response = await self.aiohttp_session.post(
f"{self.kernel_url}/kernels", headers={"Content-Type": "application/json"}
)
self.kernel_id = (await response.json())["id"]
timeout = aiohttp.ClientTimeout(total=270)
self.aiohttp_session = aiohttp.ClientSession(timeout=timeout)

# Implement retry logic for kernel connection
for attempt in range(5):
try:
response = await self.aiohttp_session.post(
f"{self.kernel_url}/kernels",
headers={"Content-Type": "application/json"},
)
if response.status == 201:
self.kernel_id = (await response.json()).get("id", None)
if self.kernel_id:
break
except aiohttp.ClientError as e:
print(f"Attempt {attempt + 1}: Could not connect to kernel. Error: {e}")
await asyncio.sleep(5) # Wait for 5 seconds before retrying

if self.kernel_id is None:
raise Exception("Could not start kernel")
self.ws = await ws_connect(f"{self.ws_url}/kernels/{self.kernel_id}/channels")
raise Exception("Could not start kernel after multiple attempts")

# Connect to WebSocket with increased timeout and retry logic
for attempt in range(5):
try:
self.ws = await ws_connect(
f"{self.ws_url}/kernels/{self.kernel_id}/channels",
timeout=60,
open_timeout=60,
close_timeout=60,
)
break # Break the loop if connection is successful
except asyncio.TimeoutError as e:
print(
f"Attempt {attempt + 1}: WebSocket connection timeout. Error: {e}"
)
await asyncio.sleep(5) # Wait for 5 seconds before retrying

if not self.ws:
raise Exception("Could not connect to WebSocket after multiple attempts")

async def _acheck_port(self) -> None:
try:
Expand Down Expand Up @@ -294,44 +349,49 @@ def run(
self.start()
return self.run(code, file_path, retry - 1)

msg_header = received_msg.get("header", {})
msg_parent_header = received_msg.get("parent_header", {})
msg_content = received_msg.get("content", {})
msg_data = msg_content.get("data", {})

if (
received_msg["header"]["msg_type"] == "stream"
and received_msg["parent_header"]["msg_id"] == msg_id
msg_header["msg_type"] == "stream"
and msg_parent_header["msg_id"] == msg_id
):
msg = received_msg["content"]["text"].strip()
msg = msg_content["text"].strip()
if "Requirement already satisfied:" in msg:
continue
result += msg + "\n"
if settings.VERBOSE:
print("Output:\n", result)

elif (
received_msg["header"]["msg_type"] == "execute_result"
and received_msg["parent_header"]["msg_id"] == msg_id
msg_header["msg_type"] == "execute_result"
and msg_parent_header["msg_id"] == msg_id
):
result += received_msg["content"]["data"]["text/plain"].strip() + "\n"
result += msg_data["text/plain"].strip() + "\n"
if settings.VERBOSE:
print("Output:\n", result)

elif received_msg["header"]["msg_type"] == "display_data":
if "image/png" in received_msg["content"]["data"]:
elif msg_header["msg_type"] == "display_data":
if "image/png" in msg_data:
return CodeBoxOutput(
type="image/png",
content=received_msg["content"]["data"]["image/png"],
content=msg_data["image/png"],
)
if "text/plain" in received_msg["content"]["data"]:
if "text/plain" in msg_data:
return CodeBoxOutput(
type="text",
content=received_msg["content"]["data"]["text/plain"],
content=msg_data["text/plain"],
)
return CodeBoxOutput(
type="error",
content="Could not parse output",
)
elif (
received_msg["header"]["msg_type"] == "status"
and received_msg["parent_header"]["msg_id"] == msg_id
and received_msg["content"]["execution_state"] == "idle"
msg_header["msg_type"] == "status"
and msg_parent_header["msg_id"] == msg_id
and msg_content["execution_state"] == "idle"
):
if len(result) > 500:
result = "[...]\n" + result[-500:]
Expand All @@ -340,13 +400,10 @@ def run(
)

elif (
received_msg["header"]["msg_type"] == "error"
and received_msg["parent_header"]["msg_id"] == msg_id
msg_header["msg_type"] == "error"
and msg_parent_header["msg_id"] == msg_id
):
error = (
f"{received_msg['content']['ename']}: "
f"{received_msg['content']['evalue']}"
)
error = f"{msg_content['ename']}: " f"{msg_content['evalue']}"
if settings.VERBOSE:
print("Error:\n", error)
return CodeBoxOutput(type="error", content=error)
Expand All @@ -367,8 +424,6 @@ async def arun(
raise RuntimeError("Could not connect to kernel")
if not self.ws:
await self._aconnect()
if not self.ws:
raise RuntimeError("Jupyter not running. Make sure to start it first.")

if settings.VERBOSE:
print("Running code:\n", code)
Expand Down Expand Up @@ -406,40 +461,45 @@ async def arun(
await self.astart()
return await self.arun(code, file_path, retry - 1)

msg_header = received_msg.get("header", {})
msg_parent_header = received_msg.get("parent_header", {})
msg_content = received_msg.get("content", {})
msg_data = msg_content.get("data", {})

if (
received_msg["header"]["msg_type"] == "stream"
and received_msg["parent_header"]["msg_id"] == msg_id
msg_header["msg_type"] == "stream"
and msg_parent_header["msg_id"] == msg_id
):
msg = received_msg["content"]["text"].strip()
msg = msg_content["text"].strip()
if "Requirement already satisfied:" in msg:
continue
result += msg + "\n"
if settings.VERBOSE:
print("Output:\n", result)

elif (
received_msg["header"]["msg_type"] == "execute_result"
and received_msg["parent_header"]["msg_id"] == msg_id
msg_header["msg_type"] == "execute_result"
and msg_parent_header["msg_id"] == msg_id
):
result += received_msg["content"]["data"]["text/plain"].strip() + "\n"
result += msg_data["text/plain"].strip() + "\n"
if settings.VERBOSE:
print("Output:\n", result)

elif received_msg["header"]["msg_type"] == "display_data":
if "image/png" in received_msg["content"]["data"]:
elif msg_header["msg_type"] == "display_data":
if "image/png" in msg_data:
return CodeBoxOutput(
type="image/png",
content=received_msg["content"]["data"]["image/png"],
content=msg_data["image/png"],
)
if "text/plain" in received_msg["content"]["data"]:
if "text/plain" in msg_data:
return CodeBoxOutput(
type="text",
content=received_msg["content"]["data"]["text/plain"],
content=msg_data["text/plain"],
)
elif (
received_msg["header"]["msg_type"] == "status"
and received_msg["parent_header"]["msg_id"] == msg_id
and received_msg["content"]["execution_state"] == "idle"
msg_header["msg_type"] == "status"
and msg_parent_header["msg_id"] == msg_id
and msg_content["execution_state"] == "idle"
):
if len(result) > 500:
result = "[...]\n" + result[-500:]
Expand All @@ -448,13 +508,10 @@ async def arun(
)

elif (
received_msg["header"]["msg_type"] == "error"
and received_msg["parent_header"]["msg_id"] == msg_id
msg_header["msg_type"] == "error"
and msg_parent_header["msg_id"] == msg_id
):
error = (
f"{received_msg['content']['ename']}: "
f"{received_msg['content']['evalue']}"
)
error = f"{msg_content['ename']}: " f"{msg_content['evalue']}"
if settings.VERBOSE:
print("Error:\n", error)
return CodeBoxOutput(type="error", content=error)
Expand Down Expand Up @@ -500,18 +557,27 @@ async def alist_files(self) -> List[CodeBoxFile]:
return await asyncio.to_thread(self.list_files)

def restart(self) -> CodeBoxStatus:
# self.stop()
# self.start()
return CodeBoxStatus(status="restarted")

async def arestart(self) -> CodeBoxStatus:
# await self.astop()
# await self.astart()
return CodeBoxStatus(status="restarted")

def stop(self) -> CodeBoxStatus:
try:
if self.jupyter is not None:
self.jupyter.terminate()
self.jupyter.wait()
self.jupyter = None
time.sleep(2)
if isinstance(self.jupyter, subprocess.Popen):
self.jupyter.terminate()
self.jupyter.wait()
self.jupyter = None
time.sleep(2)
elif isinstance(self.jupyter, Process):
self.jupyter.terminate()
self.jupyter = None
time.sleep(5)
else:
for pid in self._jupyter_pids:
os.kill(pid, signal.SIGTERM)
Expand All @@ -534,8 +600,9 @@ def stop(self) -> CodeBoxStatus:
async def astop(self) -> CodeBoxStatus:
if self.jupyter is not None:
self.jupyter.terminate()
await asyncio.create_subprocess_exec("kill", "-9", str(self.jupyter.pid))
await asyncio.sleep(5)
self.jupyter = None
await asyncio.sleep(2)

if self.ws is not None:
try:
Expand Down

0 comments on commit 579ef3e

Please sign in to comment.