Commit
Start project
leonelluiscorado committed Feb 15, 2024
1 parent d0834a8 commit 64e3580
Showing 7 changed files with 525 additions and 0 deletions.
Empty file added example-notebook.ipynb
2 changes: 2 additions & 0 deletions gedi_pipeline.py
@@ -0,0 +1,2 @@
from pipeline import GEDIPipeline

1 change: 1 addition & 0 deletions pipeline/__init__.py
@@ -0,0 +1 @@
from .gedi_pipeline import GEDIPipeline
77 changes: 77 additions & 0 deletions pipeline/gedi_downloader.py
@@ -0,0 +1,77 @@
import os
import requests
from tqdm import tqdm

class SessionNASA(requests.Session):
    """requests.Session that keeps Earthdata Login credentials across redirects."""

    AUTH_HOST = 'urs.earthdata.nasa.gov'

    def __init__(self, username, password):
        super().__init__()
        self.auth = (username, password)

    def rebuild_auth(self, prepared_request, response):
        # Keep the Authorization header only when redirecting within the original
        # host or the Earthdata auth host; drop it for any other host.
        headers = prepared_request.headers
        url = prepared_request.url
        if 'Authorization' in headers:
            original_parsed = requests.utils.urlparse(response.request.url)
            redirect_parsed = requests.utils.urlparse(url)
            if (original_parsed.hostname != redirect_parsed.hostname) \
                    and redirect_parsed.hostname != self.AUTH_HOST \
                    and original_parsed.hostname != self.AUTH_HOST:
                del headers['Authorization']
        return

class GEDIDownloader:

    def __init__(self, username, password, save_path=None):
        self.save_path = save_path
        self.session = SessionNASA(username, password)

    def _download(self, content, save_path, length):
        with open(save_path, "wb") as file, tqdm(total=int(length)) as pbar:
            for chunk in content:
                # Filter out keep-alive chunks
                if not chunk:
                    continue

                file.write(chunk)
                pbar.update(len(chunk))

    def _check_file_integrity(self, file_path, size):
        return os.path.getsize(file_path) == size


    def _precheck_file(self, file_path, size):
        if not os.path.exists(file_path):
            print(f"[Downloader] Downloading granule and saving to \"{file_path}\"...")
            return False

        # File exists but is incomplete: restart the download
        if os.path.getsize(file_path) != size:
            print(f"[Downloader] File at \"{file_path}\" exists but is incomplete. Downloading again...")
            # Delete the file and restart the download
            os.remove(file_path)
            return False

        # File exists and is complete: skip the download
        print(f"[Downloader] File at \"{file_path}\" exists. Skipping download...")
        return True

    def download_granule(self, url):
        filename = url.split("/")[-1]
        file_path = os.path.join(self.save_path, filename)
        chunk_size = 1024 * 128  # 128 KB chunks

        http_response = self.session.get(url, stream=True)
        response_length = http_response.headers.get('content-length')

        # If the file does not exist or is incomplete, download it
        if not self._precheck_file(file_path, int(response_length)):
            self._download(http_response.iter_content(chunk_size=chunk_size), file_path, response_length)

        if not self._check_file_integrity(file_path, int(response_length)):
            # File was not downloaded correctly; signal the caller to retry
            return False

        return True

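For context, here is a minimal sketch of how GEDIDownloader might be used on its own; the credentials, directory, and granule URL below are placeholders, not values from the commit:

# Hypothetical usage sketch -- the credentials, directory, and granule URL are placeholders.
import os

from pipeline.gedi_downloader import GEDIDownloader

os.makedirs("./gedi_data", exist_ok=True)   # download_granule expects save_path to exist

downloader = GEDIDownloader(
    username="EARTHDATA_USER",   # NASA Earthdata Login username
    password="EARTHDATA_PASS",   # NASA Earthdata Login password
    save_path="./gedi_data",
)

granule_url = "https://e4ftl01.cr.usgs.gov/GEDI/GEDI02_A.002/2019.04.18/GEDI02_A_granule_placeholder.h5"  # placeholder link

# download_granule returns False when the post-download size check fails, so retry a few times
for attempt in range(3):
    if downloader.download_granule(granule_url):
        break
else:
    print("Download failed after 3 attempts.")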
109 changes: 109 additions & 0 deletions pipeline/gedi_finder.py
@@ -0,0 +1,109 @@
import os
import requests as r
from datetime import datetime

class GEDIFinder:

    def __init__(self, product='GEDI02_A', version='002', date_start='', date_end='', roi=None):

        self.product = product
        self.version = version

        # Date format must be "Year.month.day"
        try:
            self.date_start = datetime.strptime(date_start, "%Y.%m.%d")
            self.date_end = datetime.strptime(date_end, "%Y.%m.%d")
        except ValueError:
            print("Provided dates are not valid. The valid format is \"Y.m.d\" (e.g. 2019.01.01).")

        if roi is not None:
            # roi is given as [ul_lat, ul_lon, lr_lat, lr_lon]; CMR expects the bbox as (LL_lon, LL_lat, UR_lon, UR_lat)
            [ul_lat, ul_lon, lr_lat, lr_lon] = roi
            self.roi = " ".join(map(str, [ul_lon, lr_lat, lr_lon, ul_lat]))


    def _find_all_granules(self):
        """
        Requests all the links and download sizes for each granule found over the provided ROI.
        Based on the GEDI Data Resources GitHub repository.
        """

        # Base CMR granule search URL, including the LPDAAC provider name and max page size (2000 is the max allowed)
        cmr = "https://cmr.earthdata.nasa.gov/search/granules.json?pretty=true&provider=LPDAAC_ECS&page_size=2000&concept_id="

        # Dictionary keyed by GEDI short name + version
        concept_ids = {'GEDI01_B.002': 'C1908344278-LPDAAC_ECS',
                       'GEDI02_A.002': 'C1908348134-LPDAAC_ECS',
                       'GEDI02_B.002': 'C1908350066-LPDAAC_ECS'}

        # CMR uses pagination for queries that return more features than the page size
        page = 1
        bbox = self.roi.replace(' ', ',')  # CMR expects a comma-separated bounding box
        product = self.product + "." + self.version

        try:
            # Send a GET request to the CMR granule search endpoint with the product concept ID, bbox and page number, and parse the JSON response
            cmr_response = r.get(f"{cmr}{concept_ids[product]}&bounding_box={bbox}&pageNum={page}").json()['feed']['entry']

            # If exactly 2000 features are returned, move to the next page, submit another request, and append to the response
            while len(cmr_response) > 0 and len(cmr_response) % 2000 == 0:
                page += 1
                cmr_response += r.get(f"{cmr}{concept_ids[product]}&bounding_box={bbox}&pageNum={page}").json()['feed']['entry']
            # CMR returns more info than just the Data Pool links; keep only the (link, size) pairs, skipping browse images
            return [(c['links'][0]['href'], c['granule_size']) for c in cmr_response if ".png" not in c['links'][0]['href']]
        except:
            # If the request did not complete successfully, print out the response from CMR
            print("[Finder] Request not successful.")
            print(r.get(f"{cmr}{concept_ids[product]}&bounding_box={bbox.replace(' ', '')}&pageNum={page}").json())
            exit(0)


    def _date_filter(self, granules):
        """
        GEDI Finder, by default, finds all the granules that pass over the ROI.
        This function filters the found granules by the provided dates.
        """
        filter_g = []

        for g in granules:
            # The acquisition date is at index 7 when splitting the CMR download link on "/"
            date_sec = datetime.strptime(g[0].split("/")[7], "%Y.%m.%d")

            # Stop early once a granule date passes date_end (the link list is assumed to be date-ordered)
            if date_sec > self.date_end:
                return filter_g

            if date_sec >= self.date_start and date_sec <= self.date_end:
                filter_g.append(g)

        return filter_g


    def _check_download_size(self, link_list):
        """
        Sums the granule sizes (in MB) of all the links in *link_list* and returns the total in GB.
        """
        return sum(float(l[1]) for l in link_list) / 1000


    def find(self, output_filepath, save_file=True):

        all_granules = self._find_all_granules()

        print(f"[Finder] Found {len(all_granules)} granules over bbox [{self.roi}]")

        granules_date_filtered = self._date_filter(all_granules)

        print(f"[Finder] Between ({self.date_start}) and ({self.date_end}) there are {len(granules_date_filtered)} granules over bbox [{self.roi}]")
        print(f"[Finder] Estimated download size for the selected granules: {self._check_download_size(granules_date_filtered):.2f} GB")

        if save_file:
            filename = f"{self.product.replace('.', '_')}_GranuleList_{datetime.now().strftime('%Y%m%d%H%M%S')}.txt"
            # Open the file and write each granule link on a new line
            with open(os.path.join(output_filepath, filename), "w") as gf:
                for g in granules_date_filtered:
                    gf.write(f"{g[0]}\n")

            print(f"[Finder] Saved links to file {os.path.join(output_filepath, filename)}")

        return granules_date_filtered
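
A minimal sketch of using GEDIFinder directly; the ROI, dates, and output directory below are illustrative placeholders:

# Hypothetical usage sketch -- ROI, dates, and output path are placeholders.
from pipeline.gedi_finder import GEDIFinder

finder = GEDIFinder(
    product="GEDI02_A",
    version="002",
    date_start="2019.04.18",
    date_end="2019.12.31",
    roi=[44.0, -106.0, 43.0, -105.0],   # ul_lat, ul_lon, lr_lat, lr_lon
)

# Writes a <product>_GranuleList_<timestamp>.txt file and returns (link, size_MB) tuples
granules = finder.find(output_filepath="./gedi_data", save_file=True)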
69 changes: 69 additions & 0 deletions pipeline/gedi_pipeline.py
@@ -0,0 +1,69 @@
import os

# Explicit imports avoid relying on the package's star export; the subsetter
# module name is assumed from the class used below (its file is not shown in this diff).
from pipeline.gedi_finder import GEDIFinder
from pipeline.gedi_downloader import GEDIDownloader
from pipeline.gedi_subsetter import GEDISubsetter


"""
Script that controls the entire GEDI Finder - Downloader - Subsetter pipeline.
"""

class GEDIPipeline:

    def __init__(self, username, password, out_directory, product, version, date_start, date_end, roi, sds, beams):

        self.product = product
        self.version = version
        self.date_start, self.date_end = date_start, date_end
        self.roi = [float(c) for c in roi.split(",")]
        self.username = username
        self.password = password
        self.out_directory = out_directory
        self.sds = sds
        self.beams = beams

        self.finder = GEDIFinder(
            product=self.product,
            version=self.version,
            date_start=self.date_start,
            date_end=self.date_end,
            roi=self.roi
        )

        self.downloader = GEDIDownloader(
            username=self.username,
            password=self.password,
            save_path=self.out_directory
        )

        self.subsetter = GEDISubsetter(
            roi=self.roi,
            product=self.product,
            out_dir=self.out_directory,
            sds=self.sds,
            beams=self.beams
        )

        # Create the output directory if it does not exist
        if not os.path.exists(out_directory):
            os.mkdir(out_directory)

    def run_pipeline(self):

        all_granules = self.finder.find(output_filepath=self.out_directory, save_file=True)

        # Start the download for every granule
        for g in all_granules:

            if os.path.exists(os.path.join(self.out_directory, g[0].split("/")[-1].replace(".h5", ".gpkg"))):
                print(f"Skipping granule from link {g} as it is already subsetted.")
                continue

            # Try to download; skip subsetting if the download failed
            if not self.downloader.download_granule(g[0]):
                print(f"[Download FAIL] Failed to download link {g}")
                continue

            # Subset
            self.subsetter.subset(os.path.join(self.out_directory, g[0].split("/")[-1]))

            # Delete the original file and keep only the ROI subset to save space
            os.remove(os.path.join(self.out_directory, g[0].split("/")[-1]))

        return all_granules
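
Putting the pieces together, a minimal sketch of running the full pipeline programmatically; the credentials, ROI, and the sds/beams selections are placeholders, and their exact expected format depends on GEDISubsetter, which is not shown in this diff:

# Hypothetical usage sketch -- credentials, ROI, sds, and beams are placeholders.
from pipeline import GEDIPipeline

pipeline = GEDIPipeline(
    username="EARTHDATA_USER",
    password="EARTHDATA_PASS",
    out_directory="./gedi_data",
    product="GEDI02_A",
    version="002",
    date_start="2019.04.18",
    date_end="2019.12.31",
    roi="44.0,-106.0,43.0,-105.0",      # comma-separated ul_lat, ul_lon, lr_lat, lr_lon
    sds=["/rh", "/lat_lowestmode", "/lon_lowestmode"],       # assumed subset-dataset format
    beams=["BEAM0101", "BEAM0110", "BEAM1000", "BEAM1011"],  # assumed beam-name format
)

granules = pipeline.run_pipeline()
print(f"Processed {len(granules)} granules.")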
