Skip to content

Commit

Permalink
Feat/read file (TransformerOptimus#995)
Browse files Browse the repository at this point in the history
  • Loading branch information
AdarshJha619 authored Aug 7, 2023
1 parent 53f34f4 commit 0539263
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 43 deletions.
1 change: 0 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -154,4 +154,3 @@ html2text==2020.1.16
duckduckgo-search==3.8.3
google-generativeai==0.1.0
unstructured==0.8.1
beautifulsoup4==4.12.2
29 changes: 24 additions & 5 deletions superagi/tools/file/read_file.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@

import os
from typing import Type, Optional
import ebooklib
import bs4
from bs4 import BeautifulSoup

from pydantic import BaseModel, Field
from ebooklib import epub

from superagi.helper.resource_helper import ResourceHelper
from superagi.helper.s3_helper import S3Helper
Expand All @@ -11,7 +16,7 @@
from superagi.models.agent import Agent
from superagi.types.storage_types import StorageType
from superagi.config.config import get_config

from unstructured.partition.auto import partition

class ReadFileSchema(BaseModel):
"""Input for CopyFileTool."""
Expand Down Expand Up @@ -57,8 +62,22 @@ def _execute(self, file_name: str):
raise FileNotFoundError(f"File '{file_name}' not found.")
directory = os.path.dirname(final_path)
os.makedirs(directory, exist_ok=True)

# Check if the file is an .epub file
if final_path.lower().endswith('.epub'):
# Use ebooklib to read the epub file
book = epub.read_epub(final_path)
# Get the text content from each item in the book
content = []
for item in book.get_items_of_type(ebooklib.ITEM_DOCUMENT):
soup = BeautifulSoup(item.get_content(), 'html.parser')
content.append(soup.get_text())

content = "\n".join(content)
else:
elements = partition(final_path)
content = "\n\n".join([str(el) for el in elements])

return content


with open(final_path, 'r') as file:
file_content = file.read()
max_length = len(' '.join(file_content.split(" ")[:1000]))
return file_content[:max_length] + "\n File " + file_name + " read successfully."
151 changes: 114 additions & 37 deletions tests/unit_tests/tools/file/test_read_file.py
Original file line number Diff line number Diff line change
@@ -1,49 +1,126 @@
import os
import pytest
from unittest.mock import patch, mock_open, MagicMock
import tempfile
from unittest.mock import MagicMock, patch
from superagi.tools.file.read_file import ReadFileTool

from superagi.models.agent_execution import AgentExecution
from superagi.tools.file.read_file import ReadFileTool
from superagi.models.agent import Agent

@pytest.fixture
def mock_os_path_exists():
    """Replace ``os.path.exists`` with a mock for the duration of a test.

    Yields:
        The mock standing in for ``os.path.exists``; the patch is undone
        when the test finishes.
    """
    with patch("os.path.exists") as exists_stub:
        yield exists_stub

@pytest.fixture
def mock_os_makedirs():
    """Replace ``os.makedirs`` with a mock so tests never create directories.

    Yields:
        The mock standing in for ``os.makedirs``.
    """
    with patch("os.makedirs") as makedirs_stub:
        yield makedirs_stub

@pytest.fixture
def mock_get_config():
    """Patch ``superagi.config.config.get_config`` and yield the mock.

    Yields:
        The mock installed at ``superagi.config.config.get_config``.
    """
    # NOTE(review): this replaces the attribute on ``superagi.config.config``
    # only. A module that ran ``from superagi.config.config import get_config``
    # before the patch keeps its own reference -- confirm the mock actually
    # reaches the code under test.
    with patch("superagi.config.config.get_config") as config_stub:
        yield config_stub


@pytest.fixture
def read_file_tool():
    """Provide a ``ReadFileTool`` instance with a preset dummy ``agent_id``.

    Yields:
        A freshly constructed ``ReadFileTool`` whose ``agent_id`` is ``1``.
    """
    tool = ReadFileTool()
    # Any fixed ID works here; the tests patch the agent lookups.
    tool.agent_id = 1

    yield tool


def test_read_file_success(read_file_tool):
    """Reading an existing text file returns its content plus a success note."""
    target_name = 'file.txt'
    # Stand-in for the built-in ``open``: the returned file object yields
    # 'Hello, World!' when read.
    fake_open = mock_open(read_data='Hello, World!')

    with patch('builtins.open', fake_open), \
            patch('os.path.exists', return_value=True), \
            patch('os.makedirs', return_value=True), \
            patch('superagi.helper.resource_helper.ResourceHelper.get_root_input_dir',
                  return_value="/input_dir/{agent_id}/"), \
            patch('superagi.helper.resource_helper.ResourceHelper.get_root_output_dir',
                  return_value="/output_dir/{agent_id}/"), \
            patch('superagi.models.agent.Agent.get_agent_from_id',
                  return_value=Agent(id=1, name='TestAgent')), \
            patch('superagi.models.agent_execution.AgentExecution.get_agent_execution_from_id',
                  return_value=AgentExecution(id=1, name='TestExecution')):
        # The tool reads its DB session from the toolkit config; a mock suffices
        # because the agent lookups above are patched.
        read_file_tool.toolkit_config.session = MagicMock()
        observed = read_file_tool._execute(target_name)

        assert observed == 'Hello, World!\n File file.txt read successfully.'


def test_read_file_file_not_found(read_file_tool):
    """``_execute`` raises ``FileNotFoundError`` when the path does not exist."""
    missing_name = 'file.txt'

    with patch('os.path.exists', return_value=False), \
            patch('superagi.helper.resource_helper.ResourceHelper.get_root_input_dir',
                  return_value="/input_dir/{agent_id}/"), \
            patch('superagi.helper.resource_helper.ResourceHelper.get_root_output_dir',
                  return_value="/output_dir/{agent_id}/"), \
            patch('superagi.models.agent.Agent.get_agent_from_id',
                  return_value=Agent(id=1, name='TestAgent')), \
            patch('superagi.models.agent_execution.AgentExecution.get_agent_execution_from_id',
                  return_value=AgentExecution(id=1, name='TestExecution')):
        # A mock session is enough: the agent lookups are patched above.
        read_file_tool.toolkit_config.session = MagicMock()
        with pytest.raises(FileNotFoundError):
            read_file_tool._execute(missing_name)
@pytest.fixture
def mock_s3_helper():
    """Patch ``superagi.helper.s3_helper.S3Helper`` and yield the mock class.

    Yields:
        The mock installed at ``superagi.helper.s3_helper.S3Helper``.
    """
    # NOTE(review): modules that imported ``S3Helper`` by name before this
    # patch still hold the real class -- confirm the patch target matches the
    # namespace where the code under test looks the name up.
    with patch("superagi.helper.s3_helper.S3Helper") as s3_stub:
        yield s3_stub

@pytest.fixture
def mock_partition():
    """Patch ``unstructured.partition.auto.partition`` and yield the mock.

    Yields:
        The mock installed at ``unstructured.partition.auto.partition``.
    """
    # NOTE(review): a module that did
    # ``from unstructured.partition.auto import partition`` before this patch
    # keeps calling the real function -- confirm whether the patch should
    # target the importing module's namespace instead.
    with patch("unstructured.partition.auto.partition") as partition_stub:
        yield partition_stub

@pytest.fixture
def mock_get_agent_from_id():
    """Patch ``Agent.get_agent_from_id`` so no agent lookup is performed.

    Yields:
        The mock replacing ``superagi.models.agent.Agent.get_agent_from_id``.
    """
    with patch("superagi.models.agent.Agent.get_agent_from_id") as agent_lookup_stub:
        yield agent_lookup_stub

@pytest.fixture
def mock_get_agent_execution_from_id():
    """Patch ``AgentExecution.get_agent_execution_from_id`` and yield the mock.

    Yields:
        The mock replacing
        ``superagi.models.agent_execution.AgentExecution.get_agent_execution_from_id``.
    """
    target = "superagi.models.agent_execution.AgentExecution.get_agent_execution_from_id"
    with patch(target) as execution_lookup_stub:
        yield execution_lookup_stub
@pytest.fixture
def mock_resource_helper():
    """Patch ``ResourceHelper.get_agent_read_resource_path`` and yield the mock.

    Tests set ``return_value`` on the yielded mock to control which path the
    patched helper returns.

    Yields:
        The mock replacing
        ``superagi.helper.resource_helper.ResourceHelper.get_agent_read_resource_path``.
    """
    target = "superagi.helper.resource_helper.ResourceHelper.get_agent_read_resource_path"
    with patch(target) as read_path_stub:
        yield read_path_stub

def test_read_file_tool(mock_os_path_exists, mock_os_makedirs, mock_get_config, mock_s3_helper, mock_partition,
                        mock_get_agent_from_id, mock_get_agent_execution_from_id, mock_resource_helper):
    """``_execute`` returns the text of a local file as a single string.

    A real temporary ``.txt`` file is written to disk and its path is returned
    by the patched resource helper; the result must contain both lines.
    """
    mock_os_path_exists.return_value = True
    mock_partition.return_value = ["This is a file.", "This is the second line."]
    mock_get_config.return_value = "FILE"
    mock_get_agent_from_id.return_value = MagicMock()
    mock_get_agent_execution_from_id.return_value = MagicMock()

    tool = ReadFileTool()

    # delete=False keeps the file on disk after the handle is closed, so the
    # tool can reopen it by path. Leaving the ``with`` block flushes and closes
    # the handle; no explicit seek()/close() is needed.
    with tempfile.NamedTemporaryFile('w', delete=False, suffix='.txt') as tmp:
        tmp.write("This is a file.\nThis is the second line.")

    try:
        mock_resource_helper.return_value = tmp.name

        result = tool._execute(tmp.name)
        assert isinstance(result, str)
        assert "This is a file." in result
        assert "This is the second line." in result
    finally:
        os.remove(tmp.name)  # Always delete the temporary file, even on failure.

def test_read_file_tool_s3(mock_os_path_exists, mock_os_makedirs, mock_get_config, mock_s3_helper, mock_partition,
                           mock_get_agent_from_id, mock_get_agent_execution_from_id, mock_resource_helper):
    """``_execute`` returns the file text when the storage type is set to S3.

    The S3 helper mock is primed with the content of a real temporary file,
    and the patched resource helper returns that file's path.
    """
    mock_os_path_exists.return_value = True
    mock_get_config.return_value = "S3"  # Configure the mocked config lookup to report S3 storage.
    mock_get_agent_from_id.return_value = MagicMock()
    mock_get_agent_execution_from_id.return_value = MagicMock()

    tool = ReadFileTool()

    # delete=False keeps the file on disk after the handle is closed. Leaving
    # the ``with`` block flushes and closes it, so no seek()/close() is needed.
    with tempfile.NamedTemporaryFile('w', delete=False, suffix='.txt') as tmp:
        tmp.write("This is a file.\nThis is the second line.")

    try:
        mock_resource_helper.return_value = tmp.name
        # Read the content through a context manager so the handle is closed
        # deterministically instead of being leaked.
        with open(tmp.name, 'r') as written:
            mock_s3_helper.return_value.read_from_s3.return_value = written.read()

        result = tool._execute(tmp.name)
        assert isinstance(result, str)
        assert "This is a file." in result
        assert "This is the second line." in result
    finally:
        os.remove(tmp.name)  # Always delete the temporary file, even on failure.


def test_read_file_tool_not_found(mock_os_path_exists, mock_os_makedirs, mock_get_config, mock_s3_helper,
                                  mock_partition, mock_get_agent_from_id, mock_get_agent_execution_from_id,
                                  mock_resource_helper):
    """``_execute`` raises ``FileNotFoundError`` when ``os.path.exists`` is False.

    A temporary file is still created so a realistic path is passed in; the
    patched ``os.path.exists`` reports it as missing.
    """
    mock_os_path_exists.return_value = False
    mock_get_agent_from_id.return_value = MagicMock()
    mock_get_agent_execution_from_id.return_value = MagicMock()

    tool = ReadFileTool()

    # Leaving the ``with`` block flushes and closes the handle; delete=False
    # keeps the file on disk until it is removed below.
    with tempfile.NamedTemporaryFile('w', delete=False, suffix='.txt') as tmp:
        tmp.write("This is a file.\nThis is the second line.")

    try:
        with pytest.raises(FileNotFoundError):
            tool._execute(tmp.name)
    finally:
        os.remove(tmp.name)  # Always delete the temporary file, even on failure.


0 comments on commit 0539263

Please sign in to comment.