[WIP] Apply obstore as storage backend #3033
base: master
Conversation
Code Review Agent Run #39883a (Actionable Suggestions: 7, Additional Suggestions: 3)
Review Details
Changelist by Bito: This pull request implements the following key changes.
driver_pod=self.driver_pod,
executor_pod=self.executor_pod,
Consider adding `driver_pod` and `executor_pod` to the `with_overrides` method to maintain consistency with the constructor parameters.
Code suggestion
@@ -56,6 +56,8 @@ def with_overrides(
     new_spark_conf: Optional[Dict[str, str]] = None,
     new_hadoop_conf: Optional[Dict[str, str]] = None,
     new_databricks_conf: Optional[Dict[str, Dict]] = None,
+    driver_pod: Optional[K8sPod] = None,
+    executor_pod: Optional[K8sPod] = None,
 ) -> "SparkJob":
     if not new_spark_conf:
         new_spark_conf = self.spark_conf
@@ -65,6 +67,12 @@ def with_overrides(
     if not new_databricks_conf:
         new_databricks_conf = self.databricks_conf
+    if not driver_pod:
+        driver_pod = self.driver_pod
+
+    if not executor_pod:
+        executor_pod = self.executor_pod
+
     return SparkJob(
         spark_type=self.spark_type,
         application_file=self.application_file,
Code Review Run #39883a
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
driverPod=self.driver_pod.to_flyte_idl() if self.driver_pod else None,
executorPod=self.executor_pod.to_flyte_idl() if self.executor_pod else None,
Consider adding null checks for `to_flyte_idl()` calls on `driver_pod` and `executor_pod` to avoid potential NoneType errors.
Code suggestion
- driverPod=self.driver_pod.to_flyte_idl() if self.driver_pod else None,
- executorPod=self.executor_pod.to_flyte_idl() if self.executor_pod else None,
+ driverPod=self.driver_pod.to_flyte_idl() if self.driver_pod and hasattr(self.driver_pod, 'to_flyte_idl') else None,
+ executorPod=self.executor_pod.to_flyte_idl() if self.executor_pod and hasattr(self.executor_pod, 'to_flyte_idl') else None,
Code Review Run #39883a
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
flytekit/core/data_persistence.py
Outdated
if "file" in path: | ||
# no bucket for file | ||
return "", path |
The condition `if "file" in path` may match paths containing "file" anywhere in the string, not just the protocol. Consider using `if get_protocol(path) == "file"` for more precise protocol checking.
Code suggestion
if "file" in path: | |
# no bucket for file | |
return "", path | |
if get_protocol(path) == "file": | |
# no bucket for file | |
return "", path |
Code Review Run #39883a
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
flytekit/core/data_persistence.py
Outdated
support_types = ["s3", "gs", "abfs"]
if protocol in support_types:
    file_path = "/".join(path_li[1:])
    return (bucket, file_path)
else:
    return bucket, path
The list of supported storage types `support_types = ["s3", "gs", "abfs"]` could be defined as a module-level constant since it's used for validation. Consider moving it outside the function to improve maintainability.
Code suggestion
@@ -53,1 +53,2 @@
 _ANON = "anon"
+SUPPORTED_STORAGE_TYPES = ["s3", "gs", "abfs"]
@@ -136,2 +136,1 @@
-    support_types = ["s3", "gs", "abfs"]
-    if protocol in support_types:
+    if protocol in SUPPORTED_STORAGE_TYPES:
Code Review Run #39883a
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
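Taken together, the two `split_path` comments above imply a helper along the following lines. This is a minimal sketch only, assuming `get_protocol` from `fsspec.utils` and the module-level constant proposed above; the exact parsing in the PR may differ.

```python
from fsspec.utils import get_protocol

SUPPORTED_STORAGE_TYPES = ["s3", "gs", "abfs"]  # proposed module-level constant


def split_path(path: str) -> tuple:
    """Split e.g. 's3://bucket/a/b.txt' into ('bucket', 'a/b.txt')."""
    protocol = get_protocol(path)
    if protocol == "file":
        # no bucket for local file paths
        return "", path
    # drop the 'scheme://' prefix, then peel off the bucket/container name
    path_li = path.split("://", 1)[-1].split("/")
    bucket = path_li[0]
    if protocol in SUPPORTED_STORAGE_TYPES:
        return bucket, "/".join(path_li[1:])
    return bucket, path
```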
flytekit/core/data_persistence.py
Outdated
kwargs["store"] = store | ||
|
||
if anonymous: | ||
kwargs[_ANON] = True |
Consider using `kwargs[_ANON] = anonymous` instead of hardcoding `True` to maintain consistency with the input parameter value.
Code suggestion
- kwargs[_ANON] = True
+ kwargs[_ANON] = anonymous
Code Review Run #39883a
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
bucket, to_path_file_only = split_path(to_path)
file_system = await self.get_async_filesystem_for_path(to_path, bucket)
Consider validating the `bucket` parameter before passing it to `get_async_filesystem_for_path()`. An empty bucket could cause issues with certain storage backends. Similar issues were also found in:
- flytekit/core/data_persistence.py (line 318)
- flytekit/core/data_persistence.py (line 521)
- flytekit/core/data_persistence.py (line 308)
Code suggestion
  bucket, to_path_file_only = split_path(to_path)
+ protocol = get_protocol(to_path)
+ if protocol in ['s3', 'gs', 'abfs'] and not bucket:
+     raise ValueError(f'Bucket cannot be empty for {protocol} protocol')
  file_system = await self.get_async_filesystem_for_path(to_path, bucket)
Code Review Run #39883a
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
Code Review Agent Run #8926b7 (Actionable Suggestions: 0)
Review Details
successfully run it on local, not yet tested remote
Signed-off-by: machichima <nary12321@gmail.com>
Force-pushed from 7c76cc6 to 17bde4a (Compare)
Code Review Agent Run #0b7f4d (Actionable Suggestions: 4, Additional Suggestions: 1)
Review Details
bucket, to_path_file_only = split_path(to_path)
file_system = await self.get_async_filesystem_for_path(to_path, bucket)
Consider extracting the bucket and path splitting logic into a separate method to improve code reusability and maintainability. The `split_path` function is used in multiple places and could be encapsulated better.
Code suggestion
- bucket, to_path_file_only = split_path(to_path)
+ bucket, path = self._split_and_get_bucket_path(to_path)
  file_system = await self.get_async_filesystem_for_path(to_path, bucket)
Code Review Run #0b7f4d
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
bucket, from_path_file_only = split_path(from_path)
file_system = await self.get_async_filesystem_for_path(from_path, bucket)
Consider handling the case where `split_path()` returns an empty bucket for non-file protocols. Currently, passing an empty bucket to `get_async_filesystem_for_path()` could cause issues with cloud storage access.
Code suggestion
  bucket, from_path_file_only = split_path(from_path)
+ protocol = get_protocol(from_path)
+ if protocol not in ['file'] and not bucket:
+     raise ValueError(f'Empty bucket not allowed for protocol {protocol}')
  file_system = await self.get_async_filesystem_for_path(from_path, bucket)
Code Review Run #0b7f4d
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
flytekit/core/data_persistence.py
Outdated
fsspec.register_implementation("s3", AsyncFsspecStore)
fsspec.register_implementation("gs", AsyncFsspecStore)
fsspec.register_implementation("abfs", AsyncFsspecStore)
Consider moving the `fsspec` implementation registrations to a more appropriate initialization location, such as a module-level `__init__.py` or a dedicated setup function. This would improve code organization and make the registrations more discoverable.
Code suggestion
- fsspec.register_implementation("s3", AsyncFsspecStore)
- fsspec.register_implementation("gs", AsyncFsspecStore)
- fsspec.register_implementation("abfs", AsyncFsspecStore)
+ def register_fsspec_implementations():
+     fsspec.register_implementation("s3", AsyncFsspecStore)
+     fsspec.register_implementation("gs", AsyncFsspecStore)
+     fsspec.register_implementation("abfs", AsyncFsspecStore)
+
+ # Call during module initialization
+ register_fsspec_implementations()
Code Review Run #0b7f4d
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
Specify the class properties for each file storage
Signed-off-by: machichima <nary12321@gmail.com>
Code Review Agent Run #2ba550 (Actionable Suggestions: 4, Additional Suggestions: 2)
Review Details
assert kwargs == {"cache_regions": True}

mock_from_env.return_value = mock.Mock()
mock_from_env.assert_called_with("")
The `mock_from_env` assertion appears to be missing configuration parameters that were previously set for AWS HTTP connections and virtual hosted style requests. This could affect test coverage of S3 configuration behavior.
Code suggestion
- mock_from_env.assert_called_with("")
+ mock_from_env.assert_called_with(
+     "",
+     config={
+         "aws_allow_http": "true",
+         "aws_virtual_hosted_style_request": "false",
+     },
+ )
Code Review Run #2ba550
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
flytekit/core/data_persistence.py
Outdated
@lru_cache
def s3store_from_env(bucket: str, retries: int, **store_kwargs) -> S3Store:
    store = S3Store.from_env(
        bucket,
        config={
            **store_kwargs,
            "aws_allow_http": "true",  # Allow HTTP connections
            "aws_virtual_hosted_style_request": "false",  # Use path-style addressing
        },
    )
    return store


@lru_cache
def gcsstore_from_env(bucket: str) -> GCSStore:
    store = GCSStore.from_env(bucket)
    return store


@lru_cache
def azurestore_from_env(container: str, **store_kwargs) -> AzureStore:
    store = AzureStore.from_env(
        container,
        config=store_kwargs,
    )
    return store
Consider consolidating the store creation functions into a single factory function to reduce code duplication. The current implementation has similar patterns repeated across `s3store_from_env`, `gcsstore_from_env`, and `azurestore_from_env`.
Code suggestion
-@lru_cache
-def s3store_from_env(bucket: str, retries: int, **store_kwargs) -> S3Store:
-    store = S3Store.from_env(
-        bucket,
-        config={
-            **store_kwargs,
-            "aws_allow_http": "true",
-            "aws_virtual_hosted_style_request": "false",
-        },
-    )
-    return store
-
-@lru_cache
-def gcsstore_from_env(bucket: str) -> GCSStore:
-    store = GCSStore.from_env(bucket)
-    return store
-
-@lru_cache
-def azurestore_from_env(container: str, **store_kwargs) -> AzureStore:
-    store = AzureStore.from_env(
-        container,
-        config=store_kwargs,
-    )
-    return store
+@lru_cache
+def create_store(store_type: str, container: str, **kwargs) -> Union[S3Store, GCSStore, AzureStore]:
+    if store_type == "s3":
+        return S3Store.from_env(container, config={**kwargs, "aws_allow_http": "true", "aws_virtual_hosted_style_request": "false"})
+    elif store_type == "gcs":
+        return GCSStore.from_env(container)
+    elif store_type == "azure":
+        return AzureStore.from_env(container, config=kwargs)
+    raise ValueError(f"Unsupported store type: {store_type}")
Code Review Run #2ba550
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
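One behavior worth keeping in mind for either shape: since these helpers are wrapped in `@lru_cache`, store instances are memoized per argument tuple, so repeated calls for the same bucket reuse a single obstore store instead of re-reading the environment. A small self-contained sketch of that caching behavior (the `object()` return value is a stand-in for the real `GCSStore`):

```python
from functools import lru_cache


@lru_cache
def gcsstore_from_env(bucket: str) -> object:
    # In the PR this would call GCSStore.from_env(bucket); object() is a stand-in.
    print(f"creating store for {bucket}")  # runs once per distinct bucket
    return object()


s1 = gcsstore_from_env("my-bucket")  # prints: creating store for my-bucket
s2 = gcsstore_from_env("my-bucket")  # served from cache, nothing printed
assert s1 is s2
```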
def gcsstore_from_env(bucket: str) -> GCSStore:
    store = GCSStore.from_env(bucket)
    return store
Consider passing the `anonymous` parameter to `gcsstore_from_env()`, since it was previously used to set the token to `_ANON` when `anonymous=True`.
Code suggestion
- def gcsstore_from_env(bucket: str) -> GCSStore:
-     store = GCSStore.from_env(bucket)
-     return store
+ def gcsstore_from_env(bucket: str, anonymous: bool = False) -> GCSStore:
+     store = GCSStore.from_env(bucket)
+     if anonymous:
+         store.token = _ANON
+     return store
Code Review Run #2ba550
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
flytekit/core/data_persistence.py
Outdated
if anonymous:
    kwargs[_ANON] = "true"

store = azurestore_from_env(container, **store_kwargs)
Consider using direct function call parameters instead of unpacking `store_kwargs` to improve code readability and maintainability. The function call could be more explicit about what parameters are being passed.
Code suggestion
- store_kwargs: Dict[str, Any] = {}
- if azure_cfg.account_name:
-     store_kwargs["account_name"] = azure_cfg.account_name
- if azure_cfg.account_key:
-     store_kwargs["account_key"] = azure_cfg.account_key
- if azure_cfg.client_id:
-     store_kwargs["client_id"] = azure_cfg.client_id
- if azure_cfg.client_secret:
-     store_kwargs["client_secret"] = azure_cfg.client_secret
- if azure_cfg.tenant_id:
-     store_kwargs["tenant_id"] = azure_cfg.tenant_id
- store = azurestore_from_env(container, **store_kwargs)
+ store = azurestore_from_env(
+     container,
+     account_name=azure_cfg.account_name,
+     account_key=azure_cfg.account_key,
+     client_id=azure_cfg.client_id,
+     client_secret=azure_cfg.client_secret,
+     tenant_id=azure_cfg.tenant_id
+ )
Code Review Run #2ba550
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
Signed-off-by: machichima <nary12321@gmail.com>
Force-pushed from 9f5daf0 to 749a7fe (Compare)
Code Review Agent Run #ab65d8 (Actionable Suggestions: 6, Additional Suggestions: 5)
Review Details
_FSSPEC_S3_SECRET = "secret"
_ANON = "anon"
_FSSPEC_S3_KEY_ID = "access_key_id"
_FSSPEC_S3_SECRET = "secret_access_key"
The variable name '_FSSPEC_S3_SECRET' suggests a hardcoded password/secret. Consider using a more secure method of handling secrets.
Code suggestion
- _FSSPEC_S3_SECRET = "secret_access_key"
+ _FSSPEC_S3_ACCESS_KEY = "secret_access_key"
Code Review Run #ab65d8
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
flytekit/core/obstore_filesystem.py
Outdated
def __init__(self, retries: Optional[int] = None, **kwargs):
    """
    Initialize the ObstoreS3FileSystem with optional retries.

    Args:
        retries (int): Number of retry for requests
        **kwargs: Other keyword arguments passed to the parent class
    """
    if retries is not None:
        self.retries = retries

    super().__init__(**kwargs)
The `__init__` method only handles the `retries` parameter but ignores validation of other important parameters like `connect_timeout` and `read_timeout`.
Code suggestion
@@ -25,12 +25,18 @@
 def __init__(self, retries: Optional[int] = None, **kwargs):
     if retries is not None:
         self.retries = retries
+    connect_timeout = kwargs.get('connect_timeout')
+    if connect_timeout is not None:
+        self.connect_timeout = connect_timeout
+    read_timeout = kwargs.get('read_timeout')
+    if read_timeout is not None:
+        self.read_timeout = read_timeout
     super().__init__(**kwargs)
Code Review Run #ab65d8
Consider consolidating the `blocksize` property definition since it appears to be duplicated across multiple classes with the same value. Could potentially move this to a common base class or mixin.
Code suggestion
@@ -9,28 +9,33 @@
+class BaseObstoreFileSystem(AsyncFsspecStore):
+    blocksize = DEFAULT_BLOCK_SIZE
+
-class ObstoreS3FileSystem(AsyncFsspecStore):
+class ObstoreS3FileSystem(BaseObstoreFileSystem):
     """
     Add following property used in S3FileSystem
     """
     root_marker = ""
-    blocksize = DEFAULT_BLOCK_SIZE
     protocol = ("s3", "s3a")
     _extra_tokenize_attributes = ("default_block_size",)

-class ObstoreGCSFileSystem(AsyncFsspecStore):
+class ObstoreGCSFileSystem(BaseObstoreFileSystem):
     """
     Add following property used in GCSFileSystem
     """
     scopes = {"read_only", "read_write", "full_control"}
-    blocksize = DEFAULT_BLOCK_SIZE
     protocol = "gcs", "gs"
Code Review Run #29efcc
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
scopes = {"read_only", "read_write", "full_control"}
retries = 6  # number of retries on http failure
default_block_size = DEFAULT_BLOCK_SIZE
protocol = "gcs", "gs"
The `protocol` tuple definition is missing parentheses, which could lead to incorrect protocol handling. Consider adding parentheses: `protocol = ("gcs", "gs")`.
Code suggestion
protocol = "gcs", "gs" | |
protocol = ("gcs", "gs") |
Code Review Run #ab65d8
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
**kwargs,
}
Consider removing the duplicate `**kwargs` in the return statement since it's already included in `azure_setup_args()`.
Code suggestion
- **kwargs,
  }
Code Review Run #ab65d8
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
else:
    dst = file_system.get(from_path, to_path, recursive=recursive, **kwargs)
    if isinstance(dst, (str, pathlib.Path)):
        return dst
    return to_path
- except OSError as oe:
+ except (OSError, GenericError) as oe:
Consider handling `GenericError` separately from `OSError`, since they may require different error handling approaches. The current implementation treats them the same way, which could mask important error details.
Code suggestion
- except (OSError, GenericError) as oe:
+ except OSError as oe:
+     logger.debug(f"Error in getting {from_path} to {to_path} rec {recursive} {oe}")
+     await self._handle_get_error(file_system, from_path, to_path, recursive, oe, **kwargs)
+ except GenericError as ge:
+     logger.debug(f"Error in getting {from_path} to {to_path} rec {recursive} {ge}")
+     exists = True  # Force anonymous filesystem retry for GenericError
Code Review Run #ab65d8
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
Signed-off-by: machichima <nary12321@gmail.com>
Code Review Agent Run #29efcc (Actionable Suggestions: 1)
Review Details
Force-pushed from fd22002 to 31015e0 (Compare)
Code Review Agent Run #77d6ec (Actionable Suggestions: 0)
Review Details
Code Review Agent Run #1a212c (Actionable Suggestions: 1)
Review Details
flytekit/core/data_persistence.py
Outdated
"aws_allow_http": "true", # Allow HTTP connections | ||
"aws_virtual_hosted_style_request": "false", # Use path-style addressing | ||
}, | ||
client_options={"timeout": "999s"}, # need to put this to somewhere for user to config? |
The hardcoded timeout value of `999s` in `client_options` may be too short for large data transfers. Consider making this configurable through environment variables or configuration settings.
Code suggestion
- client_options={"timeout": "999s"},  # need to put this to somewhere for user to config?
+ client_options={"timeout": os.getenv("S3_CLIENT_TIMEOUT", "999s")},
Code Review Run #1a212c
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
Signed-off-by: machichima <nary12321@gmail.com>
Force-pushed from 42d400c to 9645dc7 (Compare)
Code Review Agent Run #52212e (Actionable Suggestions: 7, Additional Suggestions: 3)
Review Details
scopes = {"read_only", "read_write", "full_control"}
blocksize = DEFAULT_BLOCK_SIZE
protocol = "gcs", "gs"
The `protocol` assignment for `ObstoreGCSFileSystem` should be enclosed in parentheses to ensure it is a tuple.
Code suggestion
protocol = "gcs", "gs" | |
protocol = ("gcs", "gs") |
Code Review Run #52212e
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
import os
import random
import shutil
import tempfile
from uuid import UUID
import typing
import asyncio
from botocore.parsers import base64
The import statement `from botocore.parsers import base64` seems incorrect. The `base64` module is not part of `botocore.parsers`. Consider using Python's built-in `base64` module instead.
Code suggestion
- from botocore.parsers import base64
+ import base64
Code Review Run #52212e
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
storage_options = get_fsspec_storage_options(
    "gs", DataConfig.auto(), anonymous=True, other_argument="value"
)
assert storage_options == {"other_argument": "value"}
The assertion in the test case for GCS storage options should include the `token` key with the value `anon` to ensure the `anonymous` parameter is correctly handled.
Code suggestion
- assert storage_options == {"other_argument": "value"}
+ assert storage_options == {"token": "anon", "other_argument": "value"}
Code Review Run #52212e
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
@@ -39,6 +39,7 @@ dependencies = [
     "marshmallow-jsonschema>=0.12.0",
     "mashumaro>=3.15",
     "msgpack>=1.1.0",
+    "obstore==0.3.0b10",
The version pinning of `obstore` to `0.3.0b10` might limit flexibility. Consider using a version range to allow for updates.
Code suggestion
"obstore==0.3.0b10", | |
"obstore>=0.3.0b10,<0.4.0", |
Code Review Run #52212e
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
if azure_cfg.tenant_id:
    kwargs["tenant_id"] = azure_cfg.tenant_id
kwargs[_ANON] = anonymous
store = gcsstore_from_env(bucket)
The function `gcsstore_from_env` is called without checking whether the `bucket` parameter is valid or non-empty. Consider adding validation for the `bucket` parameter.
Code suggestion
@@ -132,0 +132,2 @@
+    if not bucket:
+        raise ValueError("Bucket name must be provided")
Code Review Run #52212e
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
fsspec.register_implementation("s3", ObstoreS3FileSystem)
fsspec.register_implementation("gs", ObstoreGCSFileSystem)
fsspec.register_implementation("abfs", ObstoreAzureBlobFileSystem)
Consider adding error handling for the `fsspec.register_implementation` calls to ensure that any issues during registration are caught and handled appropriately.
Code suggestion
- fsspec.register_implementation("s3", ObstoreS3FileSystem)
- fsspec.register_implementation("gs", ObstoreGCSFileSystem)
- fsspec.register_implementation("abfs", ObstoreAzureBlobFileSystem)
+ try:
+     fsspec.register_implementation("s3", ObstoreS3FileSystem)
+ except Exception as e:
+     logger.error(f"Failed to register S3 implementation: {e}")
+ try:
+     fsspec.register_implementation("gs", ObstoreGCSFileSystem)
+ except Exception as e:
+     logger.error(f"Failed to register GCS implementation: {e}")
+ try:
+     fsspec.register_implementation("abfs", ObstoreAzureBlobFileSystem)
+ except Exception as e:
+     logger.error(f"Failed to register Azure Blob implementation: {e}")
Code Review Run #52212e
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
Tracking issue
Related to flyteorg/flyte#4081
Why are the changes needed?
Use a Rust/PyO3 package, obstore, as the storage backend for cloud storage. This provides a smaller dependency size and enables users to use their own versions of s3fs, gsfs, abfs, etc.
What changes were proposed in this pull request?
Use obstore as the storage backend to replace s3fs, gsfs, and abfs.
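As a rough sketch of the wiring (based on the registration calls and class names visible in the review threads above; the module path `flytekit.core.obstore_filesystem` is taken from this PR and may have changed since):

```python
import fsspec

# Filesystem classes introduced by this PR (see flytekit/core/obstore_filesystem.py)
from flytekit.core.obstore_filesystem import (
    ObstoreS3FileSystem,
    ObstoreGCSFileSystem,
    ObstoreAzureBlobFileSystem,
)

# Swap the default s3fs/gcsfs/adlfs implementations for obstore-backed ones.
fsspec.register_implementation("s3", ObstoreS3FileSystem, clobber=True)
fsspec.register_implementation("gs", ObstoreGCSFileSystem, clobber=True)
fsspec.register_implementation("abfs", ObstoreAzureBlobFileSystem, clobber=True)

# Ordinary fsspec usage now routes through obstore:
fs = fsspec.filesystem("s3")
```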
How was this patch tested?
Setup process
Screenshots
Performance
Check all the applicable boxes
Related PRs
Docs link
Summary by Bito
This pull request introduces obstore as the new storage backend for Flytekit, replacing existing implementations for S3, GCS, and Azure. It includes enhancements in data persistence, updates to dependencies, and improved unit tests to ensure correctness and compatibility.
Unit tests added: True
Estimated effort to review (1-5, lower is better): 5