IOManager can not handle both Partitioned and Un-partitioned assets at the same materialization #26840

Open
tnhgiang opened this issue Jan 6, 2025 · 0 comments
Labels
area: docs (Related to documentation in general), area: io-manager (Related to I/O Managers)

tnhgiang commented Jan 6, 2025

What's the issue or suggestion?

Issue

According to the documentation, a custom I/O manager that handles partitions should be implemented as follows:

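from dagster import InputContext, IOManager, OutputContext

# write_csv and read_csv are assumed to be user-defined helpers; they are not defined here.
class MyPartitionedIOManager(IOManager):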
    def _get_path(self, context) -> str:
        if context.has_partition_key:
            return "/".join(context.asset_key.path + [context.asset_partition_key])
        else:
            return "/".join(context.asset_key.path)

    def handle_output(self, context: OutputContext, obj):
        write_csv(self._get_path(context), obj)

    def load_input(self, context: InputContext):
        return read_csv(self._get_path(context))

If a partitioned asset and an unpartitioned asset are materialized in the same run, the code above raises an error:

  • The partitioned asset works normally because context.has_partition_key is True and context.asset_partition_key has a value.
  • The unpartitioned asset crashes because context.has_partition_key is also True for it, yet context.asset_partition_key has no value. has_partition_key appears to reflect the run's partition key rather than whether the asset itself is partitioned.
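
The failure described above can be sketched without Dagster installed. StubContext below is a hypothetical stand-in, not a Dagster class. It assumes, based only on the behaviour reported in this issue, that has_partition_key follows the run and that asset_partition_key raises for an asset without partitions. The asset names and the RuntimeError type are illustrative as well.

```python
from types import SimpleNamespace


class StubContext:
    """Hypothetical stand-in for a Dagster I/O context.

    Models only the behaviour reported in this issue; it is not Dagster's implementation.
    """

    def __init__(self, asset_path, run_partition_key=None, asset_is_partitioned=False):
        self.asset_key = SimpleNamespace(path=list(asset_path))
        self._run_partition_key = run_partition_key
        self._asset_is_partitioned = asset_is_partitioned

    @property
    def has_partition_key(self):
        # Assumption: true whenever the run was launched with a partition key.
        return self._run_partition_key is not None

    @property
    def asset_partition_key(self):
        # Assumption: only a partitioned asset can resolve a partition key.
        if not self._asset_is_partitioned:
            raise RuntimeError("asset has no partitions")
        return self._run_partition_key


def get_path(context):
    # Same branching as the documented _get_path.
    if context.has_partition_key:
        return "/".join(context.asset_key.path + [context.asset_partition_key])
    return "/".join(context.asset_key.path)


# Both assets are materialized in one run that carries a partition key.
partitioned = StubContext(["daily_sales"], "2025-01-06", asset_is_partitioned=True)
unpartitioned = StubContext(["lookup_table"], "2025-01-06", asset_is_partitioned=False)

print(get_path(partitioned))  # daily_sales/2025-01-06

try:
    get_path(unpartitioned)
except RuntimeError as exc:
    print(f"unpartitioned asset failed: {exc}")
```

Under these assumptions the partitioned asset resolves its path, while the unpartitioned asset enters the partition branch and fails when the key is read.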

Suggestion

To check whether an asset has partitions, we should use context.has_asset_partitions instead of context.has_partition_key:

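from dagster import InputContext, IOManager, OutputContext

# write_csv and read_csv are assumed to be user-defined helpers; they are not defined here.
class MyPartitionedIOManager(IOManager):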
    def _get_path(self, context) -> str:
        if context.has_asset_partitions:
            return "/".join(context.asset_key.path + [context.asset_partition_key])
        else:
            return "/".join(context.asset_key.path)

    def handle_output(self, context: OutputContext, obj):
        write_csv(self._get_path(context), obj)

    def load_input(self, context: InputContext):
        return read_csv(self._get_path(context))
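
As a rough check of the suggested branching, the path logic can be pulled into a plain function and exercised with attribute-bag stubs. This is only a sketch: build_path, the stub contexts, and the asset names are hypothetical, and nothing here calls Dagster itself. The unpartitioned stub deliberately has no asset_partition_key attribute, so any read of it would raise.

```python
from types import SimpleNamespace


def build_path(asset_path, partition_key=None):
    """Join the asset key parts, appending the partition key when one is given."""
    parts = list(asset_path)
    if partition_key is not None:
        parts.append(partition_key)
    return "/".join(parts)


def get_path(context):
    # Branch on the asset rather than the run, so asset_partition_key
    # is read only for assets that actually have partitions.
    if context.has_asset_partitions:
        return build_path(context.asset_key.path, context.asset_partition_key)
    return build_path(context.asset_key.path)


# Stub contexts: plain attribute bags, not Dagster objects.
partitioned = SimpleNamespace(
    asset_key=SimpleNamespace(path=["daily_sales"]),
    has_asset_partitions=True,
    asset_partition_key="2025-01-06",
)
# No asset_partition_key attribute at all: reading it would raise AttributeError.
unpartitioned = SimpleNamespace(
    asset_key=SimpleNamespace(path=["lookup_table"]),
    has_asset_partitions=False,
)

print(get_path(partitioned))    # daily_sales/2025-01-06
print(get_path(unpartitioned))  # lookup_table
```

Keeping the path construction separate from the context also makes it easy to unit-test without a running Dagster instance.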

Additional information

Dagster version: 1.9.3

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

tnhgiang added the area: docs label Jan 6, 2025
garethbrickman added the area: io-manager label Jan 6, 2025