Speed up on, saving to cloud #214

Merged
merged 30 commits into main from concurrent-jobs on Jan 7, 2025
Conversation

@peterdudfield (Contributor) commented on Jan 2, 2025

Pull Request

Description

  • Only use processes, not threads, in the cloud
  • Reduce the number of chunks in lat, lon and variables from 8 to 2, which reduces the number of files by a factor of 64, from ~50,000 down to ~700. This makes writing to S3 much quicker and cuts the run time from about 1 hour to about 10 minutes (see the sketch below).
  • For the archive, all the options default to what they were before
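As a rough, purely illustrative sketch of the file-count arithmetic (the dimension names are just for the example, not the consumer's actual chunking config):

# Illustrative only: the number of Zarr chunk files scales with the product
# of the number of chunks per dimension, so going from 8 chunks to 2 in each
# of three dimensions cuts the file count by (8 / 2) ** 3 == 64.
chunks_before = {"latitude": 8, "longitude": 8, "variable": 8}
chunks_after = {"latitude": 2, "longitude": 2, "variable": 2}

def n_chunk_files(chunks_per_dim: dict[str, int]) -> int:
    """Chunk files written per slice of the remaining (unchanged) dimensions."""
    n = 1
    for count in chunks_per_dim.values():
        n *= count
    return n

print(n_chunk_files(chunks_before) / n_chunk_files(chunks_after))  # 64.0, i.e. ~50,000 files -> ~700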

openclimatefix/ocf-infrastructure#697

How Has This Been Tested?

  • CI tests
  • Ran locally
  • Tested on Development

Checklist:

  • My code follows OCF's coding style guidelines
  • I have performed a self-review of my own code
  • I have made corresponding changes to the documentation
  • I have added tests that prove my fix is effective or that my feature works
  • I have checked my code and corrected any misspellings

@peterdudfield requested a review from devsjc on January 2, 2025 13:06
@peterdudfield marked this pull request as draft on January 2, 2025 13:11
@peterdudfield changed the title from "add NUMBER_CONCURRENT_JOBS" to "Speed up on, saving to cloud" on Jan 3, 2025
@peterdudfield marked this pull request as ready for review on January 3, 2025 10:07
@devsjc (Collaborator) left a comment

I can see the utility in this, but I'd personally change the implementation a little. Additionally, there are some consequences to this change that you might not be aware of; I'll try to explain here:

A bit of context first: the previous consumer iteration had just one chunk for the geographical coordinates. This was fine because, at any given point when making the dataset, all of that data was available to be written. The old consumer would pull all the raw data, merge it all in memory, and then write it to the store. The pre-merge step was memory-intensive and slow, but it guaranteed that the whole chunk of geographic data was always written at once.

The new consumer doesn't do this - it pulls the raw files and writes them on the fly to specified regions of the store, which is what enables parallel processing. However, for this to be safe, each individual write has to cover whole chunks, i.e. no chunk can be bigger than the data expected to arrive in a single file. See

Concurrent writes with region are safe as long as they modify distinct chunks in the underlying Zarr arrays (or use an appropriate lock).

from https://docs.xarray.dev/en/stable/user-guide/io.html?appending-to-existing-zarr-stores=#distributed-writes

Most sources of NWP data provide files per parameter, so as long as there is one parameter per chunk in the store, they can be written in parallel. But some (CEDA, for one) split their files by area, so there are four files each covering a different geographical region. As such, there have to be at least four chunks per geographical coordinate in the store, whether it's being saved to S3 or otherwise, or else the regional writing will break.
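To make that concrete, here is a minimal sketch of the pattern (illustrative only - the store path, variable names and array sizes are made up, not the consumer's actual code): initialise the store metadata first, then have each worker write its own region, with the chunking chosen so that every incoming file covers whole chunks.

# Minimal sketch of safe parallel region writes; requires xarray, dask and zarr.
import numpy as np
import xarray as xr

# A store whose latitude axis arrives as four separate area files,
# so it needs at least four chunks along latitude.
ds = xr.Dataset(
    {"temperature": (("latitude", "longitude"), np.zeros((64, 64)))},
    coords={"latitude": np.arange(64), "longitude": np.arange(64)},
).chunk({"latitude": 16, "longitude": 64})

# Write the metadata only, laying out the chunk grid without any data.
ds.to_zarr("store.zarr", mode="w", compute=False)

# Each worker then writes one region. The region lines up exactly with whole
# chunks, so concurrent writes touch distinct chunks and no lock is needed
# (per the xarray distributed-writes docs quoted above).
part = (
    ds.isel(latitude=slice(0, 16))
    .drop_vars("longitude")  # vars lacking the region's dims must be dropped
    .load()
)
part.to_zarr("store.zarr", region={"latitude": slice(0, 16)})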

The long and short of this is that the default chunking modification isn't a coordinate issue but a RawRepository one - it depends on how each repository provides its data. By using 8 by default I had a catch-all for everything covered so far, but I appreciate that it is only necessary for the sources that provide multiple area files. As such, I think this PR should be slightly reworked to move this logic out of the NWPDimensionCoordinateMap class and into the RepositoryMetadata. Alternatively, we could drop the chunk size down to equal the length of each coordinate and use an appropriate lock, as instructed by the docs - but that might lose us some of the concurrency performance gains.

I'm happy to make the changes - hopefully my reasoning above makes sense?

@@ -194,6 +196,7 @@ def _download(self, url: str) -> ResultE[pathlib.Path]:
).with_suffix(".grib").expanduser()

# Only download the file if not already present
log.info("Checking for local file: '%s'", local_path)
@devsjc (Collaborator) commented:

I think this and the below should be debug logs

@devsjc (Collaborator) commented:

I had it as threads because it's IO that's intensive in each iteration, as opposed to compute. How come you changed it to processes? Also, if concurrency is set to True, why would n_jobs then want to be set to 1? Would that not make it not concurrent again?

if os.getenv("CONCURRENCY", "True").capitalize() == "False":
    prefer = "threads"

concurrency = os.getenv("CONCURRENCY", "True").capitalize() == "False"
@peterdudfield (Contributor, author) commented:

Sorry, there is some funny logic here that is not clear - I will tidy it up.
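For reference, a quick illustrative trace (not from the PR itself) of what that snippet evaluates to, which shows why the naming reads inverted:

# The flag named `concurrency` ends up True exactly when the CONCURRENCY
# env var is set to a "false"-looking string.
import os

for value in ("True", "False", "false", None):
    if value is None:
        os.environ.pop("CONCURRENCY", None)  # unset, so the default "True" applies
    else:
        os.environ["CONCURRENCY"] = value
    concurrency = os.getenv("CONCURRENCY", "True").capitalize() == "False"
    print(f"CONCURRENCY={value!r} -> concurrency={concurrency}")

# CONCURRENCY='True'  -> concurrency=False
# CONCURRENCY='False' -> concurrency=True
# CONCURRENCY='false' -> concurrency=True
# CONCURRENCY=None    -> concurrency=False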

@peterdudfield (Contributor, author) replied:

> I can see the utility in this, but I'd personally change the implementation a little. [...]

Yeah, it seems a long way round to do it, but I couldn't quite work out how to change it otherwise. Perhaps there's a way you can modify it.

@devsjc merged commit 0f51e30 into main on Jan 7, 2025
4 checks passed
@devsjc deleted the concurrent-jobs branch on January 7, 2025 09:10