
[WIP] Apply obstore as storage backend #3033

Open · wants to merge 21 commits into master

Conversation

@machichima (Contributor) commented Jan 4, 2025

Tracking issue

Related to flyteorg/flyte#4081

Why are the changes needed?

Use a Rust/PyO3 package, obstore, as the storage backend for cloud storage. This shrinks flytekit's dependency footprint and lets users install their own versions of s3fs, gsfs, abfs, and so on.

What changes were proposed in this pull request?

Use obstore as the storage backend to replace s3fs, gsfs, and abfs.
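For orientation, the sketch below shows how the obstore-backed filesystems plug into fsspec. The class names come from flytekit/core/obstore_filesystem.py in this branch; the explicit registration call shown here (and the clobber flag) is illustrative, not the PR's exact wiring.

```python
# Minimal sketch, assuming the classes introduced in this PR.
import fsspec

from flytekit.core.obstore_filesystem import (
    ObstoreAzureBlobFileSystem,
    ObstoreGCSFileSystem,
    ObstoreS3FileSystem,
)

# clobber=True replaces the default s3fs/gcsfs/adlfs-backed implementations.
fsspec.register_implementation("s3", ObstoreS3FileSystem, clobber=True)
fsspec.register_implementation("gs", ObstoreGCSFileSystem, clobber=True)
fsspec.register_implementation("abfs", ObstoreAzureBlobFileSystem, clobber=True)

# Subsequent fsspec lookups for these protocols route through obstore.
# (Real usage may require store-specific kwargs, e.g. credentials.)
fs = fsspec.filesystem("s3")
```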

How was this patch tested?

Setup process

Screenshots

Performance

  • put file to minio

(screenshot: put_file runtime comparison)
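For reference, a timing like the one above can be reproduced with a sketch such as the following. The bucket and file paths are placeholders, and this is not the author's actual benchmark script; it assumes MinIO credentials and endpoint are already configured for flytekit's S3 access.

```python
# Hedged sketch: time a single put to MinIO via flytekit's FileAccessProvider.
import time

from flytekit.core.data_persistence import FileAccessProvider

fap = FileAccessProvider(
    local_sandbox_dir="/tmp/sandbox",
    raw_output_prefix="s3://my-bucket/test/",  # hypothetical MinIO bucket
)

start = time.perf_counter()
fap.put_data("/tmp/bigfile.bin", "s3://my-bucket/test/bigfile.bin")
print(f"put_data took {time.perf_counter() - start:.2f}s")
```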

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Related PRs

Docs link

Summary by Bito

This pull request introduces obstore as the new storage backend for Flytekit, replacing existing implementations for S3, GCS, and Azure. It includes enhancements in data persistence, updates to dependencies, and improved unit tests to ensure correctness and compatibility.

Unit tests added: True

Estimated effort to review (1-5, lower is better): 5

@flyte-bot (Contributor) commented Jan 4, 2025

Code Review Agent Run #39883a

Actionable Suggestions - 7
  • plugins/flytekit-spark/flytekitplugins/spark/models.py - 2
    • Missing pod parameters in with_overrides method · Line 79-80
    • Consider adding null validation checks · Line 193-194
  • flytekit/core/data_persistence.py - 5
Additional Suggestions - 3
  • flytekit/core/data_persistence.py - 2
    • Consider optimizing bucket extraction timing · Line 521-522
    • Consider combining empty dict initializations · Line 59-60
  • plugins/flytekit-spark/tests/test_spark_task.py - 1
Review Details
  • Files reviewed - 5 · Commit Range: 64c6c79..0187150
    • Dockerfile.dev
    • flytekit/core/data_persistence.py
    • plugins/flytekit-spark/flytekitplugins/spark/models.py
    • plugins/flytekit-spark/flytekitplugins/spark/task.py
    • plugins/flytekit-spark/tests/test_spark_task.py
  • Files skipped - 0
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful


@flyte-bot (Contributor) commented Jan 4, 2025

Changelist by Bito

This pull request implements the following key changes.

Key Change | Files Impacted

  • New Feature - Integration of obstore as Storage Backend
    • obstore_filesystem.py - Introduced new classes for obstore filesystem integration.
  • Feature Improvement - Enhancements in Data Persistence
    • data_persistence.py - Replaced s3fs, gsfs, and abfs with obstore for cloud storage operations.
  • Other Improvements - Dependency Update
    • Dockerfile.dev - Updated obstore dependency to version 0.3.0b9.
    • pyproject.toml - Updated obstore dependency to version 0.3.0b10.
  • Testing - Unit Test Enhancements for obstore Integration
    • test_data.py - Refactored tests to accommodate obstore integration and improved test structure.
    • test_data_persistence.py - Enhanced tests with mock patches for Azure file provider initialization.
    • test_flyte_directory.py - Updated mock patches to reflect changes in obstore S3 filesystem.


Comment on lines 79 to 80
driver_pod=self.driver_pod,
executor_pod=self.executor_pod,

Missing pod parameters in with_overrides method

Consider adding driver_pod and executor_pod to the with_overrides method to maintain consistency with the constructor parameters.

Code suggestion
Check the AI-generated fix before applying
 @@ -56,6 +56,8 @@ def with_overrides(
          new_spark_conf: Optional[Dict[str, str]] = None,
          new_hadoop_conf: Optional[Dict[str, str]] = None,
          new_databricks_conf: Optional[Dict[str, Dict]] = None,
 +        driver_pod: Optional[K8sPod] = None,
 +        executor_pod: Optional[K8sPod] = None,
      ) -> "SparkJob":
          if not new_spark_conf:
              new_spark_conf = self.spark_conf
 @@ -65,6 +67,12 @@ def with_overrides(
          if not new_databricks_conf:
              new_databricks_conf = self.databricks_conf
 
 +        if not driver_pod:
 +            driver_pod = self.driver_pod
 +
 +        if not executor_pod:
 +            executor_pod = self.executor_pod
 +
          return SparkJob(
              spark_type=self.spark_type,
              application_file=self.application_file,

Code Review Run #39883a



Comment on lines 193 to 194
driverPod=self.driver_pod.to_flyte_idl() if self.driver_pod else None,
executorPod=self.executor_pod.to_flyte_idl() if self.executor_pod else None,

Consider adding null validation checks

Consider adding null checks for to_flyte_idl() calls on driver_pod and executor_pod to avoid potential NoneType errors.

Code suggestion
Check the AI-generated fix before applying
Suggested change
driverPod=self.driver_pod.to_flyte_idl() if self.driver_pod else None,
executorPod=self.executor_pod.to_flyte_idl() if self.executor_pod else None,
driverPod=self.driver_pod.to_flyte_idl() if self.driver_pod and hasattr(self.driver_pod, 'to_flyte_idl') else None,
executorPod=self.executor_pod.to_flyte_idl() if self.executor_pod and hasattr(self.executor_pod, 'to_flyte_idl') else None,

Code Review Run #39883a



Comment on lines 119 to 121
if "file" in path:
# no bucket for file
return "", path

Improve file protocol detection precision

The condition if "file" in path may match paths containing 'file' anywhere in the string, not just the protocol. Consider using if get_protocol(path) == "file" for more precise protocol checking.

Code suggestion
Check the AI-generated fix before applying
Suggested change
if "file" in path:
# no bucket for file
return "", path
if get_protocol(path) == "file":
# no bucket for file
return "", path

Code Review Run #39883a



Comment on lines 136 to 141
support_types = ["s3", "gs", "abfs"]
if protocol in support_types:
    file_path = "/".join(path_li[1:])
    return (bucket, file_path)
else:
    return bucket, path

Consider moving storage types to constant

The list of supported storage types support_types = ['s3', 'gs', 'abfs'] could be defined as a module-level constant since it's used for validation. Consider moving it outside the function to improve maintainability.

Code suggestion
Check the AI-generated fix before applying
 @@ -53,1 +53,2 @@
  _ANON = "anon"
 +SUPPORTED_STORAGE_TYPES = ["s3", "gs", "abfs"]
 @@ -136,2 +136,1 @@
 -        support_types = ["s3", "gs", "abfs"]
 -        if protocol in support_types:
 +        if protocol in SUPPORTED_STORAGE_TYPES:

Code Review Run #39883a


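For context on the two hunks above, the contract of split_path can be summarized as below. This is a reconstruction from the quoted snippets, not the PR's exact implementation; the protocol-prefix handling is an assumption.

```python
# Reconstructed contract of split_path: local "file" paths carry no bucket
# and are returned unchanged; cloud URIs split into (bucket, key).
def split_path(path: str) -> tuple:
    if path.startswith("file://") or "://" not in path:
        return "", path  # no bucket for local files
    rest = path.split("://", 1)[1]
    bucket, _, key = rest.partition("/")
    return bucket, key

assert split_path("s3://my-bucket/dir/data.txt") == ("my-bucket", "dir/data.txt")
assert split_path("/tmp/data.txt") == ("", "/tmp/data.txt")
```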

kwargs["store"] = store

if anonymous:
kwargs[_ANON] = True

Consider using anonymous parameter for _ANON

Consider using kwargs[_ANON] = anonymous instead of hardcoding True to maintain consistency with the input parameter value.

Code suggestion
Check the AI-generated fix before applying
Suggested change
kwargs[_ANON] = True
kwargs[_ANON] = anonymous

Code Review Run #39883a



Comment on lines +433 to +434
bucket, to_path_file_only = split_path(to_path)
file_system = await self.get_async_filesystem_for_path(to_path, bucket)

Consider validating bucket before filesystem call

Consider validating the bucket parameter before passing it to get_async_filesystem_for_path(). An empty bucket could cause issues with certain storage backends. Similar issues were also found in:

  • flytekit/core/data_persistence.py (line 318)
  • flytekit/core/data_persistence.py (line 521)
  • flytekit/core/data_persistence.py (line 308)
Code suggestion
Check the AI-generated fix before applying
Suggested change
bucket, to_path_file_only = split_path(to_path)
file_system = await self.get_async_filesystem_for_path(to_path, bucket)
bucket, to_path_file_only = split_path(to_path)
protocol = get_protocol(to_path)
if protocol in ['s3', 'gs', 'abfs'] and not bucket:
    raise ValueError(f'Bucket cannot be empty for {protocol} protocol')
file_system = await self.get_async_filesystem_for_path(to_path, bucket)

Code Review Run #39883a



@flyte-bot (Contributor) commented Jan 4, 2025

Code Review Agent Run #8926b7

Actionable Suggestions - 0
Review Details
  • Files reviewed - 1 · Commit Range: 0187150..7c76cc6
    • pyproject.toml
  • Files skipped - 0
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful


Successfully ran it locally; not yet tested remotely.

Signed-off-by: machichima <nary12321@gmail.com>
@flyte-bot (Contributor) commented Jan 5, 2025

Code Review Agent Run #0b7f4d

Actionable Suggestions - 4
  • flytekit/core/data_persistence.py - 4
Additional Suggestions - 1
  • flytekit/core/data_persistence.py - 1
    • Consider combining dictionary initializations · Line 59-60
Review Details
  • Files reviewed - 3 · Commit Range: 58ba73c..353f000
    • Dockerfile.dev
    • flytekit/core/data_persistence.py
    • pyproject.toml
  • Files skipped - 0
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful


Comment on lines +433 to +434
bucket, to_path_file_only = split_path(to_path)
file_system = await self.get_async_filesystem_for_path(to_path, bucket)

Consider extracting path splitting logic

Consider extracting the bucket and path splitting logic into a separate method to improve code reusability and maintainability. The split_path function is used in multiple places and could be encapsulated better.

Code suggestion
Check the AI-generated fix before applying
Suggested change
bucket, to_path_file_only = split_path(to_path)
file_system = await self.get_async_filesystem_for_path(to_path, bucket)
bucket, path = self._split_and_get_bucket_path(to_path)
file_system = await self.get_async_filesystem_for_path(to_path, bucket)

Code Review Run #0b7f4d



Comment on lines +391 to +392
bucket, from_path_file_only = split_path(from_path)
file_system = await self.get_async_filesystem_for_path(from_path, bucket)

Handle empty bucket case for storage

Consider handling the case where split_path() returns empty bucket for non-file protocols. Currently passing empty bucket to get_async_filesystem_for_path() could cause issues with cloud storage access.

Code suggestion
Check the AI-generated fix before applying
Suggested change
bucket, from_path_file_only = split_path(from_path)
file_system = await self.get_async_filesystem_for_path(from_path, bucket)
bucket, from_path_file_only = split_path(from_path)
protocol = get_protocol(from_path)
if protocol not in ['file'] and not bucket:
    raise ValueError(f'Empty bucket not allowed for protocol {protocol}')
file_system = await self.get_async_filesystem_for_path(from_path, bucket)

Code Review Run #0b7f4d



Comment on lines 735 to 737
fsspec.register_implementation("s3", AsyncFsspecStore)
fsspec.register_implementation("gs", AsyncFsspecStore)
fsspec.register_implementation("abfs", AsyncFsspecStore)

Consider relocating fsspec implementation registrations

Consider moving the fsspec implementation registrations to a more appropriate initialization location, such as a module-level __init__.py or a dedicated setup function. This would improve code organization and make the registrations more discoverable.

Code suggestion
Check the AI-generated fix before applying
Suggested change
fsspec.register_implementation("s3", AsyncFsspecStore)
fsspec.register_implementation("gs", AsyncFsspecStore)
fsspec.register_implementation("abfs", AsyncFsspecStore)
def register_fsspec_implementations():
    fsspec.register_implementation("s3", AsyncFsspecStore)
    fsspec.register_implementation("gs", AsyncFsspecStore)
    fsspec.register_implementation("abfs", AsyncFsspecStore)

# Call during module initialization
register_fsspec_implementations()

Code Review Run #0b7f4d



Specify the class properties for each file storage

Signed-off-by: machichima <nary12321@gmail.com>
@flyte-bot (Contributor) commented Jan 11, 2025

Code Review Agent Run #2ba550

Actionable Suggestions - 4
  • tests/flytekit/unit/core/test_data.py - 1
    • Consider retaining AWS config test parameters · Line 312-312
  • flytekit/core/data_persistence.py - 3
    • Consider consolidating store creation functions · Line 61-85
    • Consider preserving anonymous access functionality · Line 74-76
    • Consider explicit parameters over kwargs unpacking · Line 179-179
Additional Suggestions - 2
  • tests/flytekit/unit/core/test_data.py - 2
    • Consider if storage options test is complete · Line 338-338
    • Consider updating mock patch path consistently · Line 241-241
Review Details
  • Files reviewed - 4 · Commit Range: 9c7e8db..9f5daf0
    • flytekit/core/data_persistence.py
    • flytekit/core/obstore_filesystem.py
    • tests/flytekit/unit/core/test_data.py
    • tests/flytekit/unit/core/test_data_persistence.py
  • Files skipped - 0
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful


assert kwargs == {"cache_regions": True}

mock_from_env.return_value = mock.Mock()
mock_from_env.assert_called_with("")

Consider retaining AWS config test parameters

The mock_from_env assertion appears to be missing configuration parameters that were previously set for AWS HTTP connections and virtual hosted style requests. This could affect test coverage of S3 configuration behavior.

Code suggestion
Check the AI-generated fix before applying
Suggested change
mock_from_env.assert_called_with("")
mock_from_env.assert_called_with(
    "",
    config={
        "aws_allow_http": "true",
        "aws_virtual_hosted_style_request": "false",
    },
)

Code Review Run #2ba550



Comment on lines 61 to 92
def s3store_from_env(bucket: str, retries: int, **store_kwargs) -> S3Store:
    store = S3Store.from_env(
        bucket,
        config={
            **store_kwargs,
            "aws_allow_http": "true",  # Allow HTTP connections
            "aws_virtual_hosted_style_request": "false",  # Use path-style addressing
        },
    )
    return store


@lru_cache
def gcsstore_from_env(bucket: str) -> GCSStore:
    store = GCSStore.from_env(bucket)
    return store


@lru_cache
def azurestore_from_env(container: str, **store_kwargs) -> AzureStore:
    store = AzureStore.from_env(
        container,
        config=store_kwargs,
    )
    return store
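One note on the @lru_cache decorator visible in this hunk: repeated calls with the same bucket or container reuse a single store instance instead of rebuilding it from the environment each time. A standalone illustration of that caching behavior (not PR code):

```python
# lru_cache memoizes by argument value: one store per distinct bucket.
from functools import lru_cache

@lru_cache
def make_store(bucket: str) -> dict:
    print(f"building store for {bucket}")  # printed once per distinct bucket
    return {"bucket": bucket}

a = make_store("my-bucket")
b = make_store("my-bucket")  # cache hit: no rebuild
assert a is b
```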

Consider consolidating store creation functions

Consider consolidating the store creation functions into a single factory function to reduce code duplication. The current implementation has similar patterns repeated across s3store_from_env, gcsstore_from_env, and azurestore_from_env.

Code suggestion
Check the AI-generated fix before applying
 -@lru_cache
 -def s3store_from_env(bucket: str, retries: int, **store_kwargs) -> S3Store:
 -    store = S3Store.from_env(
 -        bucket,
 -        config={
 -            **store_kwargs,
 -            "aws_allow_http": "true",
 -            "aws_virtual_hosted_style_request": "false",
 -        },
 -    )
 -    return store
 -
 -@lru_cache
 -def gcsstore_from_env(bucket: str) -> GCSStore:
 -    store = GCSStore.from_env(bucket)
 -    return store
 -
 -@lru_cache
 -def azurestore_from_env(container: str, **store_kwargs) -> AzureStore:
 -    store = AzureStore.from_env(
 -        container,
 -        config=store_kwargs,
 -    )
 -    return store
 +@lru_cache
 +def create_store(store_type: str, container: str, **kwargs) -> Union[S3Store, GCSStore, AzureStore]:
 +    if store_type == "s3":
 +        return S3Store.from_env(container, config={**kwargs, "aws_allow_http": "true", "aws_virtual_hosted_style_request": "false"})
 +    elif store_type == "gcs":
 +        return GCSStore.from_env(container)
 +    elif store_type == "azure":
 +        return AzureStore.from_env(container, config=kwargs)
 +    raise ValueError(f"Unsupported store type: {store_type}")

Code Review Run #2ba550



Comment on lines +74 to +83
def gcsstore_from_env(bucket: str) -> GCSStore:
    store = GCSStore.from_env(bucket)
    return store

Consider preserving anonymous access functionality

Consider passing the anonymous parameter to gcsstore_from_env() since it was previously used to set the token to _ANON when anonymous=True

Code suggestion
Check the AI-generated fix before applying
Suggested change
def gcsstore_from_env(bucket: str) -> GCSStore:
    store = GCSStore.from_env(bucket)
    return store
def gcsstore_from_env(bucket: str, anonymous: bool = False) -> GCSStore:
    store = GCSStore.from_env(bucket)
    if anonymous:
        store.token = _ANON
    return store

Code Review Run #2ba550



if anonymous:
    kwargs[_ANON] = "true"

store = azurestore_from_env(container, **store_kwargs)

Consider explicit parameters over kwargs unpacking

Consider using direct function call parameters instead of unpacking store_kwargs to improve code readability and maintainability. The function call could be more explicit about what parameters are being passed.

Code suggestion
Check the AI-generated fix before applying
 -    store_kwargs: Dict[str, Any] = {}
 -    if azure_cfg.account_name:
 -        store_kwargs["account_name"] = azure_cfg.account_name
 -    if azure_cfg.account_key:
 -        store_kwargs["account_key"] = azure_cfg.account_key
 -    if azure_cfg.client_id:
 -        store_kwargs["client_id"] = azure_cfg.client_id
 -    if azure_cfg.client_secret:
 -        store_kwargs["client_secret"] = azure_cfg.client_secret
 -    if azure_cfg.tenant_id:
 -        store_kwargs["tenant_id"] = azure_cfg.tenant_id
 -    store = azurestore_from_env(container, **store_kwargs)
 +    store = azurestore_from_env(
 +        container,
 +        account_name=azure_cfg.account_name,
 +        account_key=azure_cfg.account_key,
 +        client_id=azure_cfg.client_id,
 +        client_secret=azure_cfg.client_secret,
 +        tenant_id=azure_cfg.tenant_id
 +    )

Code Review Run #2ba550



Signed-off-by: machichima <nary12321@gmail.com>
@flyte-bot (Contributor) commented Jan 12, 2025

Code Review Agent Run #ab65d8

Actionable Suggestions - 6
  • flytekit/core/data_persistence.py - 4
    • Potential hardcoded secret variable name · Line 61-61
    • Potential hardcoded secret variable name · Line 61-61
    • Remove duplicate kwargs parameter passing · Line 215-216
    • Consider separate handling for different exceptions · Line 447-447
  • flytekit/core/obstore_filesystem.py - 2
    • Consider validating additional connection parameters · Line 25-36
    • Missing parentheses in protocol tuple definition · Line 47-47
Additional Suggestions - 5
  • flytekit/core/data_persistence.py - 2
    • Consider caching filesystem instance for bucket · Line 571-571
    • Remove duplicate kwargs parameter passing · Line 207-208
  • tests/flytekit/unit/core/test_data.py - 3
    • Consider single line provider initialization · Line 549-551
    • Consider single line function signature · Line 572-574
    • Consider single line constructor initialization · Line 599-601
Review Details
  • Files reviewed - 7 · Commit Range: 58ba73c..749a7fe
    • Dockerfile.dev
    • flytekit/core/data_persistence.py
    • flytekit/core/obstore_filesystem.py
    • pyproject.toml
    • tests/flytekit/unit/core/test_data.py
    • tests/flytekit/unit/core/test_data_persistence.py
    • tests/flytekit/unit/core/test_flyte_directory.py
  • Files skipped - 0
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful


_FSSPEC_S3_SECRET = "secret"
_ANON = "anon"
_FSSPEC_S3_KEY_ID = "access_key_id"
_FSSPEC_S3_SECRET = "secret_access_key"

Potential hardcoded secret variable name

The variable name '_FSSPEC_S3_SECRET' suggests a hardcoded password/secret. Consider using a more secure method of handling secrets.

Code suggestion
Check the AI-generated fix before applying
Suggested change
_FSSPEC_S3_SECRET = "secret_access_key"
_FSSPEC_S3_ACCESS_KEY = "secret_access_key"

Code Review Run #ab65d8



Comment on lines 25 to 36
def __init__(self, retries: Optional[int] = None, **kwargs):
    """
    Initialize the ObstoreS3FileSystem with optional retries.

    Args:
        retries (int): Number of retry for requests
        **kwargs: Other keyword arguments passed to the parent class
    """
    if retries is not None:
        self.retries = retries

    super().__init__(**kwargs)
@flyte-bot (Contributor) commented Jan 12, 2025

Consider validating additional connection parameters

The __init__ method only handles retries parameter but ignores validation of other important parameters like connect_timeout and read_timeout

Code suggestion
Check the AI-generated fix before applying
 @@ -25,12 +25,18 @@
     def __init__(self, retries: Optional[int] = None, **kwargs):
         if retries is not None:
             self.retries = retries
 +        connect_timeout = kwargs.get('connect_timeout')
 +        if connect_timeout is not None:
 +            self.connect_timeout = connect_timeout
 +        read_timeout = kwargs.get('read_timeout')
 +        if read_timeout is not None:
 +            self.read_timeout = read_timeout
         super().__init__(**kwargs)

Code Review Run #ab65d8

Consider consolidating duplicate blocksize property

Consider consolidating the blocksize property definition since it appears to be duplicated across multiple classes with the same value. Could potentially move this to a common base class or mixin.

Code suggestion
Check the AI-generated fix before applying
 @@ -9,28 +9,33 @@
 +class BaseObstoreFileSystem(AsyncFsspecStore):
 +    blocksize = DEFAULT_BLOCK_SIZE
 +
 -class ObstoreS3FileSystem(AsyncFsspecStore):
 +class ObstoreS3FileSystem(BaseObstoreFileSystem):
      """
      Add following property used in S3FileSystem
      """
      root_marker = ""
 -    blocksize = DEFAULT_BLOCK_SIZE
      protocol = ("s3", "s3a")
      _extra_tokenize_attributes = ("default_block_size",)
 
 -class ObstoreGCSFileSystem(AsyncFsspecStore):
 +class ObstoreGCSFileSystem(BaseObstoreFileSystem):
      """
      Add following property used in GCSFileSystem
      """
      scopes = {"read_only", "read_write", "full_control"}
 -    blocksize = DEFAULT_BLOCK_SIZE
      protocol = "gcs", "gs"

Code Review Run #29efcc



scopes = {"read_only", "read_write", "full_control"}
retries = 6 # number of retries on http failure
default_block_size = DEFAULT_BLOCK_SIZE
protocol = "gcs", "gs"

Missing parentheses in protocol tuple definition

The protocol tuple is defined without parentheses. Python still evaluates protocol = "gcs", "gs" as a tuple, so behavior is unchanged, but explicit parentheses make the intent clearer: protocol = ("gcs", "gs")

Code suggestion
Check the AI-generated fix before applying
Suggested change
protocol = "gcs", "gs"
protocol = ("gcs", "gs")
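A quick standalone check confirming both spellings produce the same tuple (illustration only, not PR code):

```python
# Bare comma-separated values already form a tuple; parentheses add clarity.
implicit = "gcs", "gs"
explicit = ("gcs", "gs")
assert implicit == explicit and isinstance(implicit, tuple)
```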

Code Review Run #ab65d8



Comment on lines +215 to +216
**kwargs,
}

Remove duplicate kwargs parameter passing

Consider removing the duplicate **kwargs in the return statement since it's already included in azure_setup_args()

Code suggestion
Check the AI-generated fix before applying
Suggested change
**kwargs,
}
}

Code Review Run #ab65d8



else:
    dst = file_system.get(from_path, to_path, recursive=recursive, **kwargs)
    if isinstance(dst, (str, pathlib.Path)):
        return dst
    return to_path
except OSError as oe:                    # before this PR
except (OSError, GenericError) as oe:    # after this PR

Consider separate handling for different exceptions

Consider handling GenericError separately from OSError since they may require different error handling approaches. The current implementation treats them the same way which could mask important error details.

Code suggestion
Check the AI-generated fix before applying
Suggested change
except (OSError, GenericError) as oe:
except OSError as oe:
    logger.debug(f"Error in getting {from_path} to {to_path} rec {recursive} {oe}")
    await self._handle_get_error(file_system, from_path, to_path, recursive, oe, **kwargs)
except GenericError as ge:
    logger.debug(f"Error in getting {from_path} to {to_path} rec {recursive} {ge}")
    exists = True  # Force anonymous filesystem retry for GenericError

Code Review Run #ab65d8



Signed-off-by: machichima <nary12321@gmail.com>
@flyte-bot (Contributor) commented Jan 15, 2025

Code Review Agent Run #29efcc

Actionable Suggestions - 1
  • flytekit/core/obstore_filesystem.py - 1
    • Consider consolidating duplicate blocksize property · Line 25-36
Review Details
  • Files reviewed - 2 · Commit Range: 749a7fe..31d8880
    • flytekit/core/data_persistence.py
    • flytekit/core/obstore_filesystem.py
  • Files skipped - 0
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful


@flyte-bot (Contributor) commented Jan 17, 2025

Code Review Agent Run #77d6ec

Actionable Suggestions - 0
Review Details
  • Files reviewed - 2 · Commit Range: 31d8880..31015e0
    • flytekit/core/data_persistence.py
    • pyproject.toml
  • Files skipped - 0
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful


@flyte-bot (Contributor) commented Jan 17, 2025

Code Review Agent Run #1a212c

Actionable Suggestions - 1
  • flytekit/core/data_persistence.py - 1
    • Consider configurable timeout for S3 operations · Line 77-77
Review Details
  • Files reviewed - 1 · Commit Range: 31015e0..42d400c
    • flytekit/core/data_persistence.py
  • Files skipped - 0
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful


"aws_allow_http": "true", # Allow HTTP connections
"aws_virtual_hosted_style_request": "false", # Use path-style addressing
},
client_options={"timeout": "999s"}, # need to put this to somewhere for user to config?

Consider configurable timeout for S3 operations

The hardcoded timeout value of 999s in client_options may be too short for large data transfers. Consider making this configurable through environment variables or configuration settings.

Code suggestion
Check the AI-generated fix before applying
Suggested change
client_options={"timeout": "999s"}, # need to put this to somewhere for user to config?
client_options={"timeout": os.getenv("S3_CLIENT_TIMEOUT", "999s")},

Code Review Run #1a212c



Signed-off-by: machichima <nary12321@gmail.com>
@flyte-bot (Contributor) commented Jan 23, 2025

Code Review Agent Run #52212e

Actionable Suggestions - 7
  • flytekit/core/obstore_filesystem.py - 2
  • tests/flytekit/unit/core/test_data.py - 2
  • pyproject.toml - 1
    • Consider version range for obstore · Line 42-42
  • flytekit/core/data_persistence.py - 2
Additional Suggestions - 3
  • tests/flytekit/unit/core/test_data.py - 1
  • flytekit/core/obstore_filesystem.py - 1
  • tests/flytekit/unit/core/test_data_persistence.py - 1
Review Details
  • Files reviewed - 7 · Commit Range: 58ba73c..9645dc7
    • Dockerfile.dev
    • flytekit/core/data_persistence.py
    • flytekit/core/obstore_filesystem.py
    • pyproject.toml
    • tests/flytekit/unit/core/test_data.py
    • tests/flytekit/unit/core/test_data_persistence.py
    • tests/flytekit/unit/core/test_flyte_directory.py
  • Files skipped - 0
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful



scopes = {"read_only", "read_write", "full_control"}
blocksize = DEFAULT_BLOCK_SIZE
protocol = "gcs", "gs"

Ensure protocol is a tuple

The protocol assignment for ObstoreGCSFileSystem already evaluates to a tuple without parentheses; enclosing it in parentheses simply makes that explicit.

Code suggestion
Check the AI-generated fix before applying
Suggested change
protocol = "gcs", "gs"
protocol = ("gcs", "gs")

Code Review Run #52212e



import os
import random
import shutil
import tempfile
from uuid import UUID
import typing
import asyncio
from botocore.parsers import base64

Incorrect import statement

The import statement from botocore.parsers import base64 seems incorrect. The base64 module is not part of botocore.parsers. Consider using Python's built-in base64 module instead.

Code suggestion
Check the AI-generated fix before applying
Suggested change
from botocore.parsers import base64
import base64

Code Review Run #52212e



storage_options = get_fsspec_storage_options(
    "gs", DataConfig.auto(), anonymous=True, other_argument="value"
)
assert storage_options == {"other_argument": "value"}

Assertion missing expected token key

The assertion in the test case for GCS storage options should include the token key with the value anon to ensure the anonymous parameter is correctly handled.

Code suggestion
Check the AI-generated fix before applying
Suggested change
assert storage_options == {"other_argument": "value"}
assert storage_options == {"token": "anon", "other_argument": "value"}

Code Review Run #52212e



@@ -39,6 +39,7 @@ dependencies = [
"marshmallow-jsonschema>=0.12.0",
"mashumaro>=3.15",
"msgpack>=1.1.0",
"obstore==0.3.0b10",

Consider version range for obstore

The version pinning of obstore to 0.3.0b10 might limit flexibility. Consider using a version range to allow for updates.

Code suggestion
Check the AI-generated fix before applying
Suggested change
"obstore==0.3.0b10",
"obstore>=0.3.0b10,<0.4.0",

Code Review Run #52212e



if azure_cfg.tenant_id:
    kwargs["tenant_id"] = azure_cfg.tenant_id
kwargs[_ANON] = anonymous
store = gcsstore_from_env(bucket)

Add validation for bucket parameter

The function gcsstore_from_env is called without checking if the bucket parameter is valid or non-empty. Consider adding validation for the bucket parameter.

Code suggestion
Check the AI-generated fix before applying
 @@ -132,0 +132,2 @@
 +    if not bucket:
 +        raise ValueError("Bucket name must be provided")

Code Review Run #52212e



Comment on lines +793 to +795
fsspec.register_implementation("s3", ObstoreS3FileSystem)
fsspec.register_implementation("gs", ObstoreGCSFileSystem)
fsspec.register_implementation("abfs", ObstoreAzureBlobFileSystem)

Add error handling for registration

Consider adding error handling for the fsspec.register_implementation calls to ensure that any issues during registration are caught and handled appropriately.

Code suggestion
Check the AI-generated fix before applying
Suggested change
fsspec.register_implementation("s3", ObstoreS3FileSystem)
fsspec.register_implementation("gs", ObstoreGCSFileSystem)
fsspec.register_implementation("abfs", ObstoreAzureBlobFileSystem)
fsspec.register_implementation("s3", ObstoreS3FileSystem)
fsspec.register_implementation("gs", ObstoreGCSFileSystem)
fsspec.register_implementation("abfs", ObstoreAzureBlobFileSystem)
try:
    fsspec.register_implementation("s3", ObstoreS3FileSystem)
except Exception as e:
    logger.error(f"Failed to register S3 implementation: {e}")
try:
    fsspec.register_implementation("gs", ObstoreGCSFileSystem)
except Exception as e:
    logger.error(f"Failed to register GCS implementation: {e}")
try:
    fsspec.register_implementation("abfs", ObstoreAzureBlobFileSystem)
except Exception as e:
    logger.error(f"Failed to register Azure Blob implementation: {e}")

Code Review Run #52212e


