Remove legacy provider DAG logic (WordPress#849)
* Remove legacy branches for DAG wrappers

* Derive provider script and media types from ingester class

* Rename 'ingestion_callable' to 'ingester_class'

* Fix tests

* Remove unused import

* Update docs

* Fix extended tests

* Fix bad merge

* Add Europeana reingestion doc
stacimc authored Oct 28, 2022
1 parent c3c1e6b commit 92771e3
Showing 8 changed files with 186 additions and 289 deletions.
99 changes: 94 additions & 5 deletions DAGs.md
@@ -100,11 +100,11 @@ The following are DAGs grouped by their primary tag:

| DAG ID | Schedule Interval |
| --- | --- |
-| `europeana_reingestion_workflow` | `@weekly` |
-| `flickr_reingestion_workflow` | `@weekly` |
-| `metropolitan_museum_reingestion_workflow` | `@weekly` |
-| `phylopic_reingestion_workflow` | `@weekly` |
-| `wikimedia_reingestion_workflow` | `@weekly` |
+| [`europeana_reingestion_workflow`](#europeana_reingestion_workflow) | `@weekly` |
+| [`flickr_reingestion_workflow`](#flickr_reingestion_workflow) | `@weekly` |
+| [`metropolitan_museum_reingestion_workflow`](#metropolitan_museum_reingestion_workflow) | `@weekly` |
+| [`phylopic_reingestion_workflow`](#phylopic_reingestion_workflow) | `@weekly` |
+| [`wikimedia_reingestion_workflow`](#wikimedia_reingestion_workflow) | `@weekly` |


# DAG documentation
@@ -114,15 +114,19 @@ The following is documentation associated with each DAG (where available):
1. [`airflow_log_cleanup`](#airflow_log_cleanup)
1. [`audio_data_refresh`](#audio_data_refresh)
1. [`check_silenced_dags`](#check_silenced_dags)
+1. [`europeana_reingestion_workflow`](#europeana_reingestion_workflow)
1. [`europeana_workflow`](#europeana_workflow)
+1. [`flickr_reingestion_workflow`](#flickr_reingestion_workflow)
1. [`flickr_workflow`](#flickr_workflow)
1. [`freesound_workflow`](#freesound_workflow)
1. [`image_data_refresh`](#image_data_refresh)
1. [`inaturalist_workflow`](#inaturalist_workflow)
1. [`jamendo_workflow`](#jamendo_workflow)
+1. [`metropolitan_museum_reingestion_workflow`](#metropolitan_museum_reingestion_workflow)
1. [`metropolitan_museum_workflow`](#metropolitan_museum_workflow)
1. [`oauth2_authorization`](#oauth2_authorization)
1. [`oauth2_token_refresh`](#oauth2_token_refresh)
+1. [`phylopic_reingestion_workflow`](#phylopic_reingestion_workflow)
1. [`phylopic_workflow`](#phylopic_workflow)
1. [`pr_review_reminders`](#pr_review_reminders)
1. [`rawpixel_workflow`](#rawpixel_workflow)
@@ -132,6 +136,7 @@ The following is documentation associated with each DAG (where available):
1. [`smithsonian_workflow`](#smithsonian_workflow)
1. [`stocksnap_workflow`](#stocksnap_workflow)
1. [`wikimedia_commons_workflow`](#wikimedia_commons_workflow)
+1. [`wikimedia_reingestion_workflow`](#wikimedia_reingestion_workflow)
1. [`wordpress_workflow`](#wordpress_workflow)


@@ -214,6 +219,19 @@ after the issue has been resolved.
The DAG runs weekly.


+## `europeana_reingestion_workflow`
+
+
+Content Provider: Europeana
+
+ETL Process: Use the API to identify all CC licensed images.
+
+Output: TSV file containing the images and the
+respective meta-data.
+
+Notes: https://www.europeana.eu/api/v2/search.json
+
+
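As a rough illustration of the endpoint noted above, the sketch below queries the Europeana v2 Search API for openly licensed items. Parameter names (`wskey`, `query`, `reusability`, `cursor`) follow the public API documentation; the API key is a placeholder, and this is not the catalog's actual ingestion code.

```python
import requests


def search_europeana(api_key: str, cursor: str = "*") -> dict:
    """Fetch one page of openly licensed Europeana items."""
    params = {
        "wskey": api_key,       # Europeana API key (placeholder)
        "query": "*",           # match all records
        "reusability": "open",  # restrict to openly (CC) licensed items
        "media": "true",        # only records with associated media
        "rows": 100,            # page size
        "cursor": cursor,       # cursor-based pagination; "*" starts a scan
    }
    response = requests.get(
        "https://www.europeana.eu/api/v2/search.json", params=params
    )
    response.raise_for_status()
    return response.json()
```
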
## `europeana_workflow`


@@ -227,6 +245,20 @@ Output: TSV file containing the images and the
Notes: https://www.europeana.eu/api/v2/search.json


+## `flickr_reingestion_workflow`
+
+
+Content Provider: Flickr
+
+ETL Process: Use the API to identify all CC licensed images.
+
+Output: TSV file containing the images and the
+respective meta-data.
+
+Notes: https://www.flickr.com/help/terms/api
+Rate limit: 3600 requests per hour.
+
+
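Flickr's published limit of 3600 requests per hour averages out to one request per second. The sketch below shows a minimal delay-based requester honoring that budget; it is a simplified stand-in for the catalog's `DelayedRequester`, not its implementation.

```python
import time

import requests


class SimpleDelayedRequester:
    """Space out requests so the hourly budget is never exceeded."""

    def __init__(self, delay: float = 1.0):  # 3600 req/hour == 1 req/sec
        self.delay = delay
        self._last_request = 0.0

    def get(self, url: str, params: dict | None = None) -> requests.Response:
        # Sleep off any remaining portion of the delay window.
        wait = self.delay - (time.monotonic() - self._last_request)
        if wait > 0:
            time.sleep(wait)
        self._last_request = time.monotonic()
        return requests.get(url, params=params)
```
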
## `flickr_workflow`


@@ -335,6 +367,35 @@ Notes: https://api.jamendo.com/v3.0/tracks/
channels: 1/2


+## `metropolitan_museum_reingestion_workflow`
+
+
+Content Provider: Metropolitan Museum of Art
+
+ETL Process: Use the API to identify all CC0 artworks.
+
+Output: TSV file containing the images and their respective
+meta-data.
+
+Notes: https://metmuseum.github.io/#search
+"Please limit requests to 80 requests per second." May need to
+bump up the delay (e.g. to 3 seconds) to avoid being blocked
+during local development testing.
+
+Some analysis to improve data quality was conducted using a
+separate csv file here: https://github.com/metmuseum/openaccess
+
+Get a list of object IDs:
+https://collectionapi.metmuseum.org/public/collection/v1/objects?metadataDate=2022-08-10
+Get a specific object:
+https://collectionapi.metmuseum.org/public/collection/v1/objects/1027
+The search functionality requires a specific query (term search)
+in addition to date and public domain. It seems it won't return
+results with just date and license.
+https://collectionapi.metmuseum.org/public/collection/v1/search?isPublicDomain=true&metadataDate=2022-08-07
+
+
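The two endpoints above can be exercised directly; a minimal sketch, assuming the documented response shape (`total` and `objectIDs` for the listing endpoint):

```python
import requests

BASE = "https://collectionapi.metmuseum.org/public/collection/v1"

# Object IDs whose metadata changed on or after the given date.
listing = requests.get(
    f"{BASE}/objects", params={"metadataDate": "2022-08-10"}
).json()
print(listing["total"], "objects to consider")

# A single object record, fetched by ID.
obj = requests.get(f"{BASE}/objects/1027").json()
```
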
## `metropolitan_museum_workflow`


@@ -402,6 +463,20 @@ update the tokens stored in the Variable upon successful refresh.
- Freesound


+## `phylopic_reingestion_workflow`
+
+
+Content Provider: PhyloPic
+
+ETL Process: Use the API to identify all CC licensed images.
+
+Output: TSV file containing the images and
+their respective meta-data.
+
+Notes: http://phylopic.org/api/
+No rate limit specified.
+
+
## `phylopic_workflow`


@@ -526,6 +601,20 @@ Notes: https://stocksnap.io/api/load-photos/date/desc/1
## `wikimedia_commons_workflow`


Content Provider: Wikimedia Commons

ETL Process: Use the API to identify all CC-licensed images.

Output: TSV file containing the images and the
respective meta-data.

Notes: https://commons.wikimedia.org/wiki/API:Main_page
No rate limit specified.


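A minimal sketch of one such query, listing recent uploads through the MediaWiki Action API's `allimages` module (illustrative only; this is not the catalog's ingestion code):

```python
import requests

response = requests.get(
    "https://commons.wikimedia.org/w/api.php",
    params={
        "action": "query",
        "list": "allimages",    # enumerate files on the wiki
        "aisort": "timestamp",  # newest first, together with aidir
        "aidir": "descending",
        "ailimit": 10,
        "format": "json",
    },
)
print([image["name"] for image in response.json()["query"]["allimages"]])
```
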
+## `wikimedia_reingestion_workflow`
+
+
+Content Provider: Wikimedia Commons
+
+ETL Process: Use the API to identify all CC-licensed images.
87 changes: 20 additions & 67 deletions openverse_catalog/dags/providers/factory_utils.py
@@ -1,48 +1,19 @@
-import inspect
import logging
import time
from datetime import datetime
-from types import FunctionType
-from typing import Callable, Sequence
+from typing import Sequence, Type

from airflow.models import DagRun, TaskInstance
from airflow.utils.dates import cron_presets
from common.constants import MediaType
-from common.storage.media import MediaStore
+from providers.provider_api_scripts.provider_data_ingester import ProviderDataIngester


logger = logging.getLogger(__name__)


-def _load_provider_script_stores(
-    ingestion_callable: Callable,
-    media_types: list[MediaType],
-    **kwargs,
-) -> dict[str, MediaStore]:
-    """
-    Load the stores associated with a provided ingestion callable. This callable is
-    assumed to be a legacy provider script and NOT a provider data ingestion class.
-    """
-    # Stores exist at the module level, so in order to retrieve the output values we
-    # must first pull the stores from the module.
-    module = inspect.getmodule(ingestion_callable)
-    stores = {}
-    for media_type in media_types:
-        store = getattr(module, f"{media_type}_store", None)
-        if not store:
-            continue
-        stores[media_type] = store
-
-    if len(stores) != len(media_types):
-        raise ValueError(
-            f"Expected stores in {module.__name__} were missing: "
-            f"{list(set(media_types) - set(stores))}"
-        )
-    return stores


def generate_tsv_filenames(
-    ingestion_callable: Callable,
+    ingester_class: Type[ProviderDataIngester],
    media_types: list[MediaType],
    ti: TaskInstance,
    dag_run: DagRun,
@@ -60,20 +31,14 @@ def generate_tsv_filenames(
    args = args or []
    logger.info("Pushing available store paths to XComs")

-    # TODO: This entire branch can be removed when all of the provider scripts have been
-    # TODO: refactored to subclass ProviderDataIngester.
-    if isinstance(ingestion_callable, FunctionType):
-        stores = _load_provider_script_stores(ingestion_callable, media_types)
-
-    else:
-        # A ProviderDataIngester class was passed instead. First we initialize the
-        # class, which will initialize the media stores and DelayedRequester.
-        logger.info(
-            f"Initializing ProviderIngester {ingestion_callable.__name__} in"
-            f"order to generate store filenames."
-        )
-        ingester = ingestion_callable(dag_run.conf, *args)
-        stores = ingester.media_stores
+    # Initialize the ProviderDataIngester class, which will initialize the
+    # DelayedRequester and appropriate media stores.
+    logger.info(
+        f"Initializing ProviderIngester {ingester_class.__name__} in "
+        "order to generate store filenames."
+    )
+    ingester = ingester_class(dag_run.conf, *args)
+    stores = ingester.media_stores

    # Push the media store output paths to XComs.
    for store in stores.values():
@@ -84,7 +49,7 @@ def generate_tsv_filenames(


def pull_media_wrapper(
-    ingestion_callable: Callable,
+    ingester_class: Type[ProviderDataIngester],
    media_types: list[MediaType],
    tsv_filenames: list[str],
    ti: TaskInstance,
@@ -107,23 +72,11 @@ def pull_media_wrapper(
    )
    logger.info("Setting media stores to the appropriate output filenames")

-    # TODO: This entire branch can be removed when all of the provider scripts have been
-    # TODO: refactored to subclass ProviderDataIngester.
-    if isinstance(ingestion_callable, FunctionType):
-        # Stores exist at the module level, so in order to set the output values we
-        # must first pull the stores from the module.
-        stores = _load_provider_script_stores(ingestion_callable, media_types)
-        run_func = ingestion_callable
-    else:
-        # A ProviderDataIngester class was passed instead. First we initialize the
-        # class, which will initialize the media stores and DelayedRequester.
-        logger.info(f"Initializing ProviderIngester {ingestion_callable.__name__}")
-        ingester = ingestion_callable(dag_run.conf, *args)
-        stores = ingester.media_stores
-        run_func = ingester.ingest_records
-        # args have already been passed into the ingester, we don't need them passed
-        # in again to the ingest_records function, so we clear the list
-        args = []
+    # Initialize the ProviderDataIngester class, which will initialize the
+    # media stores and DelayedRequester.
+    logger.info(f"Initializing ProviderIngester {ingester_class.__name__}")
+    ingester = ingester_class(dag_run.conf, *args)
+    stores = ingester.media_stores

    for store, tsv_filename in zip(stores.values(), tsv_filenames):
        logger.info(
@@ -134,10 +87,10 @@ def pull_media_wrapper(

logger.info("Beginning ingestion")
start_time = time.perf_counter()
# Not passing kwargs here because Airflow throws a bunch of stuff in there that
# none of our provider scripts are expecting.
# Not passing args or kwargs here because Airflow throws a bunch of stuff in
# there that none of our provider scripts are expecting.
try:
data = run_func(*args)
data = ingester.ingest_records()
finally:
end_time = time.perf_counter()
# Report duration
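The refactored helpers above reduce to a simple contract: the `ProviderDataIngester` subclass is instantiated with the `dag_run` conf, its `media_stores` supply the TSV output paths, and `ingest_records()` performs ingestion with no extra arguments. A self-contained sketch of that contract, using hypothetical stand-ins rather than real catalog classes:

```python
class FakeStore:
    """Stand-in for a MediaStore that tracks an output path."""

    output_path = "/tmp/example_image_v001.tsv"


class FakeIngester:
    """Stand-in for a ProviderDataIngester subclass."""

    def __init__(self, conf=None, *args):
        # Real ingesters build a DelayedRequester and MediaStores here.
        self.media_stores = {"image": FakeStore()}

    def ingest_records(self):
        return None  # placeholder; mirrors the ingest_records() call above


ingester = FakeIngester(conf={})                                 # instantiate
paths = [s.output_path for s in ingester.media_stores.values()]  # wire TSVs
data = ingester.ingest_records()                                 # ingest
```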
2 changes: 1 addition & 1 deletion openverse_catalog/dags/providers/provider_dag_factory.py
@@ -124,7 +124,7 @@ def append_day_shift(id_str):
    identifier = f"{provider_name}_{{{{ ts_nodash }}}}_{day_shift}"

    ingestion_kwargs = {
-        "ingestion_callable": conf.ingestion_callable,
+        "ingester_class": conf.ingester_class,
        "media_types": conf.media_types,
    }
    if conf.dated:
24 changes: 12 additions & 12 deletions openverse_catalog/dags/providers/provider_reingestion_workflows.py
@@ -47,14 +47,19 @@ class ProviderReingestionWorkflow(ProviderWorkflow):

    def __post_init__(self):
        if not self.dag_id:
-            self.dag_id = f"{self.provider_script}_reingestion_workflow"
+            # Call super() first to initialize the provider_name
+            super().__post_init__()
+            # Override the dag_id
+            self.dag_id = f"{self.provider_name}_reingestion_workflow"
+            return

        super().__post_init__()


PROVIDER_REINGESTION_WORKFLOWS = [
    ProviderReingestionWorkflow(
        # 60 total reingestion days
-        provider_script="europeana",
-        ingestion_callable=EuropeanaDataIngester,
+        ingester_class=EuropeanaDataIngester,
        max_active_tasks=3,
        pull_timeout=timedelta(hours=12),
        daily_list_length=7,
@@ -63,8 +68,7 @@ def __post_init__(self):
    ),
    ProviderReingestionWorkflow(
        # 128 total reingestion days
-        provider_script="flickr",
-        ingestion_callable=FlickrDataIngester,
+        ingester_class=FlickrDataIngester,
        pull_timeout=timedelta(minutes=30),
        daily_list_length=7,
        weekly_list_length=12,
@@ -75,8 +79,7 @@ def __post_init__(self):
    ),
    ProviderReingestionWorkflow(
        # 64 total reingestion days
-        provider_script="metropolitan_museum",
-        ingestion_callable=MetMuseumDataIngester,
+        ingester_class=MetMuseumDataIngester,
        max_active_tasks=2,
        pull_timeout=timedelta(hours=12),
        daily_list_length=6,
@@ -86,8 +89,7 @@ def __post_init__(self):
    ),
    ProviderReingestionWorkflow(
        # 64 total reingestion days
-        provider_script="phylopic",
-        ingestion_callable=PhylopicDataIngester,
+        ingester_class=PhylopicDataIngester,
        max_active_tasks=2,
        pull_timeout=timedelta(hours=12),
        daily_list_length=6,
@@ -98,10 +100,8 @@ def __post_init__(self):
    ProviderReingestionWorkflow(
        # 64 total reingestion days
        dag_id="wikimedia_reingestion_workflow",
-        provider_script="wikimedia_commons",
-        ingestion_callable=WikimediaCommonsDataIngester,
+        ingester_class=WikimediaCommonsDataIngester,
        pull_timeout=timedelta(minutes=90),
        media_types=("image", "audio"),
        max_active_tasks=2,
        daily_list_length=6,
        one_month_list_length=9,
