Visit the full docs here to see additional examples and the API reference.
prefect-azure is a collection of prebuilt Prefect tasks that can be used to quickly construct Prefect flows.
Requires an installation of Python 3.7+
We recommend using a Python virtual environment manager such as pipenv, conda or virtualenv.
These tasks are designed to work with Prefect 2.0. For more information about how to use Prefect, please refer to the Prefect documentation.
Install prefect-azure
with pip
pip install prefect-azure
To use Blob Storage:
pip install "prefect-azure[blob_storage]"
To use Cosmos DB:
pip install "prefect-azure[cosmos_db]"
To use ML Datastore:
pip install "prefect-azure[ml_datastore]"
A list of available blocks in prefect-azure
and their setup instructions can be found here.
from prefect import flow
from prefect_azure import AzureBlobStorageCredentials
from prefect_azure.blob_storage import blob_storage_download
@flow
def example_blob_storage_download_flow():
connection_string = "connection_string"
blob_storage_credentials = AzureBlobStorageCredentials(
connection_string=connection_string,
)
data = blob_storage_download(
blob="prefect.txt",
container="prefect",
azure_credentials=blob_storage_credentials,
)
return data
example_blob_storage_download_flow()
Use with_options
to customize options on any existing task or flow:
custom_blob_storage_download_flow = example_blob_storage_download_flow.with_options(
name="My custom task name",
retries=2,
retry_delay_seconds=10,
)
from prefect import flow
from prefect_azure import AzureContainerInstanceCredentials
from prefect_azure.container_instance import AzureContainerInstanceJob
@flow
def container_instance_job_flow():
aci_credentials = AzureContainerInstanceCredentials.load("MY_BLOCK_NAME")
container_instance_job = AzureContainerInstanceJob(
aci_credentials=aci_credentials,
resource_group_name="azure_resource_group.example.name",
subscription_id="<MY_AZURE_SUBSCRIPTION_ID>",
command=["echo", "hello world"],
)
return container_instance_job.run()
If we have a_flow_module.py
:
from prefect import flow, get_run_logger
@flow
def log_hello_flow(name="Marvin"):
logger = get_run_logger()
logger.info(f"{name} said hello!")
if __name__ == "__main__":
log_hello_flow()
We can run that flow using an Azure Container Instance, but first create the infrastructure block:
from prefect_azure import AzureContainerInstanceCredentials
from prefect_azure.container_instance import AzureContainerInstanceJob
container_instance_job = AzureContainerInstanceJob(
aci_credentials=AzureContainerInstanceCredentials.load("MY_BLOCK_NAME"),
resource_group_name="azure_resource_group.example.name",
subscription_id="<MY_AZURE_SUBSCRIPTION_ID>",
)
container_instance_job.save("aci-dev")
Then, create the deployment either on the UI or through the CLI:
prefect deployment build a_flow_module.py:log_hello_flow --name aci-dev -ib container-instance-job/aci-dev
Visit Prefect Deployments for more information about deployments.
For more tips on how to use tasks and flows in a Collection, check out Using Collections!
If you encounter and bugs while using prefect-azure
, feel free to open an issue in the prefect-azure repository.
If you have any questions or issues while using prefect-azure
, you can find help in either the Prefect Discourse forum or the Prefect Slack community
Feel free to star or watch prefect-azure
for updates too!
If you'd like to help contribute to fix an issue or add a feature to prefect-azure
, please propose changes through a pull request from a fork of the repository.
Here are the steps:
- Fork the repository
- Clone the forked repository
- Install the repository and its dependencies:
pip install -e ".[dev]"
- Make desired changes
- Add tests
- Insert an entry to CHANGELOG.md
- Install
pre-commit
to perform quality checks prior to commit:
pre-commit install
git commit
,git push
, and create a pull request