Skip to content

Latest commit

 

History

History
 
 

azure-eventgrid

Azure Event Grid client library for Python

Azure Event Grid is a fully-managed intelligent event routing service that allows for uniform event consumption using a publish-subscribe model.

Source code | Package (PyPI) | Package (Conda) | API reference documentation | Product documentation | Samples | Changelog

Getting started

Prerequisites

Install the package

Install the Azure Event Grid client library for Python with pip:

pip install azure-eventgrid
  • An existing Event Grid topic or domain is required. You can create the resource using Azure Portal or Azure CLI

If you use Azure CLI, replace <resource-group-name> and <resource-name> with your own unique names.

Create an Event Grid Topic

az eventgrid topic --create --location <location> --resource-group <resource-group-name> --name <resource-name>

Create an Event Grid Domain

az eventgrid domain --create --location <location> --resource-group <resource-group-name> --name <resource-name>

Authenticate the client

In order to interact with the Event Grid service, you will need to create an instance of a client. An endpoint and credential are necessary to instantiate the client object.

Using Azure Active Directory (AAD)

Azure Event Grid provides integration with Azure Active Directory (Azure AD) for identity-based authentication of requests. With Azure AD, you can use role-based access control (RBAC) to grant access to your Azure Event Grid resources to users, groups, or applications.

To send events to a topic or domain with a TokenCredential, the authenticated identity should have the "EventGrid Data Sender" role assigned.

With the azure-identity package, you can seamlessly authorize requests in both development and production environments. To learn more about Azure Active Directory, see the azure-identity README.

For example, you can use DefaultAzureCredential to construct a client which will authenticate using Azure Active Directory:

from azure.identity import DefaultAzureCredential
from azure.eventgrid import EventGridPublisherClient, EventGridEvent

default_az_credential = DefaultAzureCredential()
endpoint = os.environ["EVENTGRID_TOPIC_ENDPOINT"]
client = EventGridPublisherClient(endpoint, default_az_credential)

Looking up the endpoint

You can find the topic endpoint within the Event Grid Topic resource on the Azure portal. This will look like: "https://<event-grid-topic-name>.<topic-location>.eventgrid.azure.net/api/events"

Create the client with AzureKeyCredential

To use an Access key as the credential parameter, pass the key as a string into an instance of AzureKeyCredential.

Note: The Access Key may be found in the azure portal in the "Access Keys" menu of the Event Grid Topic resource. They may also be obtained via the azure CLI, or the azure-mgmt-eventgrid library. A guide for getting access keys can be found here.

import os
from azure.eventgrid import EventGridPublisherClient
from azure.core.credentials import AzureKeyCredential

topic_key = os.environ["EVENTGRID_TOPIC_KEY"]
endpoint = os.environ["EVENTGRID_TOPIC_ENDPOINT"]

credential_key = AzureKeyCredential(topic_key)
client = EventGridPublisherClient(endpoint, credential_key)

Note: A client may also be authenticated via SAS signature, using the AzureSasCredential. A sample demonstrating this, is available here (async_version).

Note: The generate_sas method can be used to generate a shared access signature. A sample demonstrating this can be seen here.

Key concepts

Topic

A topic is a channel within the EventGrid service to send events. The event schema that a topic accepts is decided at topic creation time. If events of a schema type are sent to a topic that requires a different schema type, errors will be raised.

Domain

An event domain is a management tool for large numbers of Event Grid topics related to the same application. They allow you to publish events to thousands of topics. Domains also give you authorization and authentication control over each topic. For more information, visit Event domain overview.

When you create an event domain, a publishing endpoint for this domain is made available to you. This process is similar to creating an Event Grid Topic. The only difference is that, when publishing to a domain, you must specify the topic within the domain that you'd like the event to be delivered to.

Event schemas

An event is the smallest amount of information that fully describes something that happened in the system. When a custom topic or domain is created, you must specify the schema that will be used when publishing events.

Event Grid supports multiple schemas for encoding events.

Event Grid schema

While you may configure your topic to use a custom schema, it is more common to use the already-defined Event Grid schema. See the specifications and requirements here.

CloudEvents v1.0 schema

Another option is to use the CloudEvents v1.0 schema. CloudEvents is a Cloud Native Computing Foundation project which produces a specification for describing event data in a common way. The service summary of CloudEvents can be found here.

EventGridPublisherClient

EventGridPublisherClient provides operations to send event data to a topic hostname specified during client initialization.

Regardless of the schema that your topic or domain is configured to use, EventGridPublisherClient will be used to publish events to it. Use the send method publishing events.

The following formats of events are allowed to be sent:

  • A list or a single instance of strongly typed EventGridEvents.

  • A dict representation of a serialized EventGridEvent object.

  • A list or a single instance of strongly typed CloudEvents.

  • A dict representation of a serialized CloudEvent object.

  • A dict representation of any Custom Schema.

Please have a look at the samples for detailed examples.

Note: It is important to know if your topic supports CloudEvents or EventGridEvents before publishing. If you send to a topic that does not support the schema of the event you are sending, send() will throw an exception.

System Topics

A system topic in Event Grid represents one or more events published by Azure services such as Azure Storage or Azure Event Hubs. For example, a system topic may represent all blob events or only blob creation and blob deletion events published for a specific storage account.

The names of the various event types for the system events published to Azure Event Grid are available in azure.eventgrid.SystemEventNames. For complete list of recognizable system topics, visit System Topics.

For more information about the key concepts on Event Grid, see Concepts in Azure Event Grid.

Event Grid on Kubernetes with Azure Arc

Event Grid on Kubernetes with Azure Arc is an offering that allows you to run Event Grid on your own Kubernetes cluster. This capability is enabled by the use of Azure Arc enabled Kubernetes. Through Azure Arc enabled Kubernetes, a supported Kubernetes cluster connects to Azure. Once connected, you are able to install Event Grid on it. Learn more about it here.

Support for CNCF Cloud Events

Starting with v4.7.0, this package also supports publishing a CNCF cloud event from https://pypi.org/project/cloudevents/. You would be able to pass a CloudEvent object from this library to the send API.

from cloudevents.http import CloudEvent

event = CloudEvent(...)

client.send(event)

Examples

The following sections provide several code snippets covering some of the most common Event Grid tasks, including:

Send an Event Grid Event

This example publishes an Event Grid event.

import os
from azure.core.credentials import AzureKeyCredential
from azure.eventgrid import EventGridPublisherClient, EventGridEvent

key = os.environ["EG_ACCESS_KEY"]
endpoint = os.environ["EG_TOPIC_HOSTNAME"]

event = EventGridEvent(
    data={"team": "azure-sdk"},
    subject="Door1",
    event_type="Azure.Sdk.Demo",
    data_version="2.0"
)

credential = AzureKeyCredential(key)
client = EventGridPublisherClient(endpoint, credential)

client.send(event)

Send a Cloud Event

This example publishes a Cloud event.

import os
from azure.core.credentials import AzureKeyCredential
from azure.core.messaging import CloudEvent
from azure.eventgrid import EventGridPublisherClient

key = os.environ["CLOUD_ACCESS_KEY"]
endpoint = os.environ["CLOUD_TOPIC_HOSTNAME"]

event = CloudEvent(
    type="Azure.Sdk.Sample",
    source="https://egsample.dev/sampleevent",
    data={"team": "azure-sdk"}
)

credential = AzureKeyCredential(key)
client = EventGridPublisherClient(endpoint, credential)

client.send(event)

Send Multiple events

It is possible to send events as a batch when sending multiple events to a topic or a domain. This example sends a list of CloudEvents using the send method.

WARNING: When sending a list of multiple events at one time, iterating over and sending each event will not result in optimal performance. For best performance, it is highly recommended to send a list of events.

import os
from azure.core.credentials import AzureKeyCredential
from azure.core.messaging import CloudEvent
from azure.eventgrid import EventGridPublisherClient

key = os.environ["CLOUD_ACCESS_KEY"]
endpoint = os.environ["CLOUD_TOPIC_HOSTNAME"]

event0 = CloudEvent(
    type="Azure.Sdk.Sample",
    source="https://egsample.dev/sampleevent",
    data={"team": "azure-sdk"}
)
event1 = CloudEvent(
    type="Azure.Sdk.Sample",
    source="https://egsample.dev/sampleevent",
    data={"team2": "azure-eventgrid"}
)

events = [event0, event1]

credential = AzureKeyCredential(key)
client = EventGridPublisherClient(endpoint, credential)

client.send(events)

Send events as dictionaries

A dict representation of respective serialized models can also be used to publish CloudEvent(s) or EventGridEvent(s) apart from the strongly typed objects.

Use a dict-like representation to send to a topic with custom schema as shown below.

import os
import uuid
import datetime as dt
from msrest.serialization import UTC
from azure.core.credentials import AzureKeyCredential
from azure.eventgrid import EventGridPublisherClient

key = os.environ["CUSTOM_SCHEMA_ACCESS_KEY"]
endpoint = os.environ["CUSTOM_SCHEMA_TOPIC_HOSTNAME"]

event = custom_schema_event = {
    "customSubject": "sample",
    "customEventType": "sample.event",
    "customDataVersion": "2.0",
    "customId": uuid.uuid4(),
    "customEventTime": dt.datetime.now(UTC()).isoformat(),
    "customData": "sample data"
    }

credential = AzureKeyCredential(key)
client = EventGridPublisherClient(endpoint, credential)

client.send(event)

Consume from storage queue

This example consumes a message received from storage queue and deserializes it to a CloudEvent object.

from azure.core.messaging import CloudEvent
from azure.storage.queue import QueueServiceClient, BinaryBase64DecodePolicy
import os
import json

# all types of CloudEvents below produce same DeserializedEvent
connection_str = os.environ['STORAGE_QUEUE_CONN_STR']
queue_name = os.environ['STORAGE_QUEUE_NAME']

with QueueServiceClient.from_connection_string(connection_str) as qsc:
    payload =  qsc.get_queue_client(
        queue=queue_name,
        message_decode_policy=BinaryBase64DecodePolicy()
        ).peek_messages()

    ## deserialize payload into a list of typed Events
    events = [CloudEvent.from_dict(json.loads(msg.content)) for msg in payload]

Consume from servicebus

This example consumes a payload message received from ServiceBus and deserializes it to an EventGridEvent object.

from azure.eventgrid import EventGridEvent
from azure.servicebus import ServiceBusClient
import os
import json

# all types of EventGridEvents below produce same DeserializedEvent
connection_str = os.environ['SERVICE_BUS_CONN_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']

with ServiceBusClient.from_connection_string(connection_str) as sb_client:
    payload =  sb_client.get_queue_receiver(queue_name).receive_messages()

    ## deserialize payload into a list of typed Events
    events = [EventGridEvent.from_dict(json.loads(next(msg.body).decode('utf-8'))) for msg in payload]

Distributed Tracing with EventGrid

You can use OpenTelemetry for Python as usual with EventGrid since it's compatible with azure-core tracing integration.

Here is an example of using OpenTelemetry to trace sending a CloudEvent.

First, set OpenTelemetry as enabled tracing plugin for EventGrid.

from azure.core.settings import settings
from azure.core.tracing.ext.opentelemetry_span import OpenTelemetrySpan

settings.tracing_implementation = OpenTelemetrySpan

Regular open telemetry usage from here. See OpenTelemetry for details. This example uses a simple console exporter to export the traces. Any exporter can be used here including azure-monitor-opentelemetry-exporter, jaeger, zipkin etc.

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter
from opentelemetry.sdk.trace.export import SimpleSpanProcessor  # this requires opentelemetry >= 1.0.0

# Simple console exporter
exporter = ConsoleSpanExporter()

trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)
trace.get_tracer_provider().add_span_processor(
    SimpleSpanProcessor(exporter)
)

Once the tracer and exporter are set, please follow the example below to start collecting traces while using the send method from the EventGridPublisherClient to send a CloudEvent object.

import os
from azure.eventgrid import EventGridPublisherClient
from azure.core.messaging import CloudEvent
from azure.core.credentials import AzureKeyCredential

hostname = os.environ['CLOUD_TOPIC_HOSTNAME']
key = AzureKeyCredential(os.environ['CLOUD_ACCESS_KEY'])
cloud_event = CloudEvent(
    source = 'demo',
    type = 'sdk.demo',
    data = {'test': 'hello'},
)
with tracer.start_as_current_span(name="MyApplication"):
    client = EventGridPublisherClient(hostname, key)
    client.send(cloud_event)

Troubleshooting

  • Enable azure.eventgrid logger to collect traces from the library.

General

Event Grid client library will raise exceptions defined in Azure Core.

Logging

This library uses the standard logging library for logging. Basic information about HTTP sessions (URLs, headers, etc.) is logged at INFO level.

Optional Configuration

Optional keyword arguments can be passed in at the client and per-operation level. The azure-core reference documentation describes available configurations for retries, logging, transport protocols, and more.

Next steps

The following section provides several code snippets illustrating common patterns used in the Event Grid Python API.

More sample code

These code samples show common champion scenario operations with the Azure Event Grid client library.

The following samples cover publishing and consuming dict representations of EventGridEvents and CloudEvents.

More samples can be found here.

  • More samples related to the send scenario can be seen here.
  • To see more samples related to consuming a payload from different messaging services as a typed object, please visit Consume Samples

Additional documentation

For more extensive documentation on Azure Event Grid, see the Event Grid documentation on docs.microsoft.com.

Contributing

This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit cla.microsoft.com.

When you submit a pull request, a CLA-bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repos using our CLA.

This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact opencode@microsoft.com with any additional questions or comments.