Azure Event Grid is a fully-managed intelligent event routing service that allows for uniform event consumption using a publish-subscribe model.
- Python 3.7 or later is required to use this package.
- You must have an Azure subscription and an Event Grid Topic resource to use this package. Follow this step-by-step tutorial to register the Event Grid resource provider and create Event Grid topics using the Azure portal. There is a similar tutorial using Azure CLI.
Install the Azure Event Grid client library for Python with pip:
pip install azure-eventgrid
- An existing Event Grid topic or domain is required. You can create the resource using the Azure portal or Azure CLI.
If you use Azure CLI, replace <resource-group-name> and <resource-name> with your own unique names.
az eventgrid topic create --location <location> --resource-group <resource-group-name> --name <resource-name>
az eventgrid domain create --location <location> --resource-group <resource-group-name> --name <resource-name>
In order to interact with the Event Grid service, you will need to create an instance of a client. An endpoint and credential are necessary to instantiate the client object.
Azure Event Grid provides integration with Azure Active Directory (Azure AD) for identity-based authentication of requests. With Azure AD, you can use role-based access control (RBAC) to grant access to your Azure Event Grid resources to users, groups, or applications.
To send events to a topic or domain with a TokenCredential, the authenticated identity should have the "EventGrid Data Sender" role assigned.
With the azure-identity package, you can seamlessly authorize requests in both development and production environments. To learn more about Azure Active Directory, see the azure-identity README.
For example, you can use DefaultAzureCredential to construct a client which will authenticate using Azure Active Directory:
import os
from azure.identity import DefaultAzureCredential
from azure.eventgrid import EventGridPublisherClient, EventGridEvent
default_az_credential = DefaultAzureCredential()
endpoint = os.environ["EVENTGRID_TOPIC_ENDPOINT"]
client = EventGridPublisherClient(endpoint, default_az_credential)
You can find the topic endpoint within the Event Grid Topic resource on the Azure portal. This will look like:
"https://<event-grid-topic-name>.<topic-location>.eventgrid.azure.net/api/events"
To use an access key as the credential parameter, pass the key as a string into an instance of AzureKeyCredential.
Note: The access key can be found in the Azure portal in the "Access Keys" menu of the Event Grid Topic resource. Access keys can also be obtained via the Azure CLI or the azure-mgmt-eventgrid library. A guide for getting access keys can be found here.
import os
from azure.eventgrid import EventGridPublisherClient
from azure.core.credentials import AzureKeyCredential
topic_key = os.environ["EVENTGRID_TOPIC_KEY"]
endpoint = os.environ["EVENTGRID_TOPIC_ENDPOINT"]
credential_key = AzureKeyCredential(topic_key)
client = EventGridPublisherClient(endpoint, credential_key)
Note: A client may also be authenticated via a SAS signature, using AzureSasCredential. A sample demonstrating this is available here (async_version).
Note: The generate_sas method can be used to generate a shared access signature. A sample demonstrating this can be seen here.
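As a minimal sketch of how the two notes above fit together, the helper below generates a signature with generate_sas and wraps it in AzureSasCredential. The function names sas_expiration and build_sas_client, and the one-hour default lifetime, are illustrative assumptions and not part of the library. The SDK imports sit inside the function only so that the snippet can be loaded without the package installed.

```python
from datetime import datetime, timedelta, timezone


def sas_expiration(hours: float = 1.0) -> datetime:
    """Return a UTC timestamp `hours` from now (hypothetical helper)."""
    return datetime.now(timezone.utc) + timedelta(hours=hours)


def build_sas_client(endpoint: str, access_key: str, hours: float = 1.0):
    """Create a publisher client authenticated with a freshly generated SAS.

    Illustrative only: the lifetime and the helper name are assumptions.
    """
    # Imported lazily so this snippet loads even where the SDK is absent.
    from azure.core.credentials import AzureSasCredential
    from azure.eventgrid import EventGridPublisherClient, generate_sas

    signature = generate_sas(endpoint, access_key, sas_expiration(hours))
    return EventGridPublisherClient(endpoint, AzureSasCredential(signature))
```

A caller would pass the same endpoint and access key used in the AzureKeyCredential example, for instance build_sas_client(os.environ["EVENTGRID_TOPIC_ENDPOINT"], os.environ["EVENTGRID_TOPIC_KEY"]).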
A topic is a channel within the EventGrid service to send events. The event schema that a topic accepts is decided at topic creation time. If events of a schema type are sent to a topic that requires a different schema type, errors will be raised.
An event domain is a management tool for large numbers of Event Grid topics related to the same application. They allow you to publish events to thousands of topics. Domains also give you authorization and authentication control over each topic. For more information, visit Event domain overview.
When you create an event domain, a publishing endpoint for this domain is made available to you. This process is similar to creating an Event Grid Topic. The only difference is that, when publishing to a domain, you must specify the topic within the domain that you'd like the event to be delivered to.
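To illustrate the difference described above, here is a sketch of an Event Grid schema event in its dict form with the topic field set, which is how the target topic within a domain is named. The helper make_domain_event and the sample values ("orders", "Contoso.Orders.Created") are hypothetical; only the field names come from the Event Grid event schema.

```python
import json
import uuid
from datetime import datetime, timezone


def make_domain_event(domain_topic, subject, event_type, data, data_version="1.0"):
    """Build an Event Grid schema event addressed to a topic inside a domain."""
    return {
        "topic": domain_topic,  # the topic within the domain that receives the event
        "id": str(uuid.uuid4()),
        "subject": subject,
        "eventType": event_type,
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "data": data,
        "dataVersion": data_version,
    }


event = make_domain_event("orders", "orders/1234", "Contoso.Orders.Created", {"total": 10})
print(json.dumps(event, indent=2))
```

A dict of this shape can be passed to the send method of a client created with the domain endpoint.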
An event is the smallest amount of information that fully describes something that happened in the system. When a custom topic or domain is created, you must specify the schema that will be used when publishing events.
Event Grid supports multiple schemas for encoding events.
While you may configure your topic to use a custom schema, it is more common to use the already-defined Event Grid schema. See the specifications and requirements here.
Another option is to use the CloudEvents v1.0 schema. CloudEvents is a Cloud Native Computing Foundation project which produces a specification for describing event data in a common way. The service summary of CloudEvents can be found here.
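For comparison with the Event Grid schema, the sketch below builds the dict form of a CloudEvents v1.0 event. The attributes specversion, id, source, and type are the ones the CloudEvents specification requires; the helper name make_cloud_event_dict and the sample values are assumptions made for illustration.

```python
import json
import uuid
from datetime import datetime, timezone


def make_cloud_event_dict(source, event_type, data):
    """Build a CloudEvents v1.0 event as a plain dict (hypothetical helper)."""
    return {
        "specversion": "1.0",
        "id": str(uuid.uuid4()),
        "source": source,
        "type": event_type,
        "time": datetime.now(timezone.utc).isoformat(),
        "datacontenttype": "application/json",
        "data": data,
    }


cloud_event = make_cloud_event_dict(
    "https://egsample.dev/sampleevent", "Azure.Sdk.Sample", {"team": "azure-sdk"}
)
print(json.dumps(cloud_event, indent=2))
```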
EventGridPublisherClient provides operations to send event data to a topic hostname specified during client initialization.
Regardless of the schema that your topic or domain is configured to use, EventGridPublisherClient will be used to publish events to it. Use the send method to publish events.
The following formats of events are allowed to be sent:
- A list or a single instance of strongly typed EventGridEvents.
- A dict representation of a serialized EventGridEvent object.
- A list or a single instance of strongly typed CloudEvents.
- A dict representation of a serialized CloudEvent object.
- A dict representation of any Custom Schema.
Please have a look at the samples for detailed examples.
Note: It is important to know if your topic supports CloudEvents or EventGridEvents before publishing. If you send to a topic that does not support the schema of the event you are sending, send() will raise an exception.
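When events are handled as dicts, a small pre-flight check can catch a schema mix-up before the request is made. The sketch below is a heuristic, not the library's own logic: guess_schema and the two key sets are assumptions, based on the attributes each schema requires.

```python
# Attributes required by each schema; used here only as a heuristic.
CLOUD_EVENT_KEYS = {"specversion", "id", "source", "type"}
EVENT_GRID_KEYS = {"id", "subject", "eventType", "eventTime"}


def guess_schema(event: dict) -> str:
    """Classify a dict event as "CloudEvent", "EventGridEvent", or "custom"."""
    keys = set(event)
    if CLOUD_EVENT_KEYS <= keys:
        return "CloudEvent"
    if EVENT_GRID_KEYS <= keys:
        return "EventGridEvent"
    return "custom"


def check_batch(events, expected: str) -> None:
    """Raise ValueError if any event does not look like the expected schema."""
    for index, event in enumerate(events):
        found = guess_schema(event)
        if found != expected:
            raise ValueError(f"event {index} looks like {found}, expected {expected}")
```

Calling check_batch(events, "CloudEvent") before client.send(events) would fail fast on the client side instead of waiting for the service to reject the request.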
A system topic in Event Grid represents one or more events published by Azure services such as Azure Storage or Azure Event Hubs. For example, a system topic may represent all blob events or only blob creation and blob deletion events published for a specific storage account.
The names of the various event types for the system events published to Azure Event Grid are available in azure.eventgrid.SystemEventNames.
For a complete list of recognizable system topics, visit System Topics.
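As a sketch of how event type names are typically used, the example below routes already-deserialized events to handlers keyed by event type. The two strings are the blob creation and deletion event types published by Azure Storage; in real code they would normally be taken from SystemEventNames instead of being hard-coded. The helpers event_type_of and route and the sample subjects are hypothetical.

```python
from typing import Callable, Dict, Iterable, List

BLOB_CREATED = "Microsoft.Storage.BlobCreated"
BLOB_DELETED = "Microsoft.Storage.BlobDeleted"


def event_type_of(event: dict) -> str:
    """Read the type from an Event Grid ("eventType") or CloudEvents ("type") dict."""
    return event.get("eventType") or event.get("type") or ""


def route(events: Iterable[dict], handlers: Dict[str, Callable[[dict], str]]) -> List[str]:
    """Call the handler registered for each event's type; skip unknown types."""
    results = []
    for event in events:
        handler = handlers.get(event_type_of(event))
        if handler is not None:
            results.append(handler(event))
    return results


handlers = {
    BLOB_CREATED: lambda e: f"created: {e['subject']}",
    BLOB_DELETED: lambda e: f"deleted: {e['subject']}",
}
sample_events = [
    {"eventType": BLOB_CREATED, "subject": "containers/demo/blobs/a.txt"},
    {"type": BLOB_DELETED, "subject": "containers/demo/blobs/b.txt"},
    {"eventType": "Contoso.Unrelated", "subject": "ignored"},
]
print(route(sample_events, handlers))
```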
For more information about the key concepts on Event Grid, see Concepts in Azure Event Grid.
Event Grid on Kubernetes with Azure Arc is an offering that allows you to run Event Grid on your own Kubernetes cluster. This capability is enabled by the use of Azure Arc enabled Kubernetes. Through Azure Arc enabled Kubernetes, a supported Kubernetes cluster connects to Azure. Once connected, you are able to install Event Grid on it. Learn more about it here.
Starting with v4.7.0, this package also supports publishing a CNCF cloud event from https://pypi.org/project/cloudevents/. You can pass a CloudEvent object from this library to the send API.
from cloudevents.http import CloudEvent
event = CloudEvent(...)
client.send(event)
The following sections provide several code snippets covering some of the most common Event Grid tasks, including:
- Send an Event Grid Event
- Send a Cloud Event
- Send Multiple Events
- Send events as Dictionaries
- Consume a payload from storage queue
- Consume from ServiceBus
This example publishes an Event Grid event.
import os
from azure.core.credentials import AzureKeyCredential
from azure.eventgrid import EventGridPublisherClient, EventGridEvent
key = os.environ["EG_ACCESS_KEY"]
endpoint = os.environ["EG_TOPIC_HOSTNAME"]
event = EventGridEvent(
    data={"team": "azure-sdk"},
    subject="Door1",
    event_type="Azure.Sdk.Demo",
    data_version="2.0"
)
credential = AzureKeyCredential(key)
client = EventGridPublisherClient(endpoint, credential)
client.send(event)
This example publishes a Cloud event.
import os
from azure.core.credentials import AzureKeyCredential
from azure.core.messaging import CloudEvent
from azure.eventgrid import EventGridPublisherClient
key = os.environ["CLOUD_ACCESS_KEY"]
endpoint = os.environ["CLOUD_TOPIC_HOSTNAME"]
event = CloudEvent(
    type="Azure.Sdk.Sample",
    source="https://egsample.dev/sampleevent",
    data={"team": "azure-sdk"}
)
credential = AzureKeyCredential(key)
client = EventGridPublisherClient(endpoint, credential)
client.send(event)
It is possible to send events as a batch when sending multiple events to a topic or a domain. This example sends a list of CloudEvents using the send method.
WARNING: When sending a list of multiple events at one time, iterating over and sending each event will not result in optimal performance. For best performance, it is highly recommended to send a list of events.
import os
from azure.core.credentials import AzureKeyCredential
from azure.core.messaging import CloudEvent
from azure.eventgrid import EventGridPublisherClient
key = os.environ["CLOUD_ACCESS_KEY"]
endpoint = os.environ["CLOUD_TOPIC_HOSTNAME"]
event0 = CloudEvent(
    type="Azure.Sdk.Sample",
    source="https://egsample.dev/sampleevent",
    data={"team": "azure-sdk"}
)
event1 = CloudEvent(
    type="Azure.Sdk.Sample",
    source="https://egsample.dev/sampleevent",
    data={"team2": "azure-eventgrid"}
)
events = [event0, event1]
credential = AzureKeyCredential(key)
client = EventGridPublisherClient(endpoint, credential)
client.send(events)
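When there are many events to publish, one way to follow the batching advice above is to split the list into fixed-size chunks and send each chunk with a single call. The helper batched and the batch size are assumptions chosen for illustration; choose a size that fits the request limits of your topic.

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def batched(events: Sequence[T], batch_size: int) -> Iterator[List[T]]:
    """Yield consecutive slices of `events`, each at most `batch_size` long."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(events), batch_size):
        yield list(events[start:start + batch_size])


# Usage with a publisher client (not executed here):
#     for batch in batched(events, 100):
#         client.send(batch)
print(list(batched(["e0", "e1", "e2", "e3", "e4"], 2)))
```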
A dict representation of respective serialized models can also be used to publish CloudEvent(s) or EventGridEvent(s) apart from the strongly typed objects.
Use a dict-like representation to send to a topic with custom schema as shown below.
import os
import uuid
import datetime as dt
from azure.core.credentials import AzureKeyCredential
from azure.eventgrid import EventGridPublisherClient
key = os.environ["CUSTOM_SCHEMA_ACCESS_KEY"]
endpoint = os.environ["CUSTOM_SCHEMA_TOPIC_HOSTNAME"]
# the field names must match the custom schema configured on the topic
custom_schema_event = {
    "customSubject": "sample",
    "customEventType": "sample.event",
    "customDataVersion": "2.0",
    "customId": str(uuid.uuid4()),  # str() keeps the ID JSON-serializable
    "customEventTime": dt.datetime.now(dt.timezone.utc).isoformat(),
    "customData": "sample data"
}
credential = AzureKeyCredential(key)
client = EventGridPublisherClient(endpoint, credential)
client.send(custom_schema_event)
This example consumes a message received from storage queue and deserializes it to a CloudEvent object.
from azure.core.messaging import CloudEvent
from azure.storage.queue import QueueServiceClient, BinaryBase64DecodePolicy
import os
import json
connection_str = os.environ['STORAGE_QUEUE_CONN_STR']
queue_name = os.environ['STORAGE_QUEUE_NAME']
with QueueServiceClient.from_connection_string(connection_str) as qsc:
    # peek at the queued messages without removing them from the queue
    payload = qsc.get_queue_client(
        queue=queue_name,
        message_decode_policy=BinaryBase64DecodePolicy()
    ).peek_messages()
    ## deserialize payload into a list of typed Events
    events = [CloudEvent.from_dict(json.loads(msg.content)) for msg in payload]
This example consumes a payload message received from ServiceBus and deserializes it to an EventGridEvent object.
from azure.eventgrid import EventGridEvent
from azure.servicebus import ServiceBusClient
import os
import json
connection_str = os.environ['SERVICE_BUS_CONN_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']
with ServiceBusClient.from_connection_string(connection_str) as sb_client:
    # receive a batch of messages from the queue
    payload = sb_client.get_queue_receiver(queue_name).receive_messages()
    ## deserialize payload into a list of typed Events
    events = [EventGridEvent.from_dict(json.loads(next(msg.body).decode('utf-8'))) for msg in payload]
You can use OpenTelemetry for Python as usual with EventGrid since it's compatible with azure-core tracing integration.
Here is an example of using OpenTelemetry to trace sending a CloudEvent.
First, set OpenTelemetry as the enabled tracing plugin for EventGrid.
from azure.core.settings import settings
from azure.core.tracing.ext.opentelemetry_span import OpenTelemetrySpan
settings.tracing_implementation = OpenTelemetrySpan
From here, use OpenTelemetry as usual. See OpenTelemetry for details.
This example uses a simple console exporter to export the traces. Any exporter can be used here, including azure-monitor-opentelemetry-exporter, jaeger, zipkin, etc.
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter
from opentelemetry.sdk.trace.export import SimpleSpanProcessor # this requires opentelemetry >= 1.0.0
# Simple console exporter
exporter = ConsoleSpanExporter()
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)
trace.get_tracer_provider().add_span_processor(
    SimpleSpanProcessor(exporter)
)
Once the tracer and exporter are set, follow the example below to start collecting traces while using the send method from the EventGridPublisherClient to send a CloudEvent object.
import os
from azure.eventgrid import EventGridPublisherClient
from azure.core.messaging import CloudEvent
from azure.core.credentials import AzureKeyCredential
hostname = os.environ['CLOUD_TOPIC_HOSTNAME']
key = AzureKeyCredential(os.environ['CLOUD_ACCESS_KEY'])
cloud_event = CloudEvent(
    source='demo',
    type='sdk.demo',
    data={'test': 'hello'},
)
# spans created by the client are recorded under this parent span
with tracer.start_as_current_span(name="MyApplication"):
    client = EventGridPublisherClient(hostname, key)
    client.send(cloud_event)
- Enable the azure.eventgrid logger to collect traces from the library.
Event Grid client library will raise exceptions defined in Azure Core.
This library uses the standard logging library for logging. Basic information about HTTP sessions (URLs, headers, etc.) is logged at INFO level.
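A minimal sketch of turning this logging on with the standard library is shown below. It configures the parent "azure" logger, which the azure.eventgrid logger inherits from through Python's logger hierarchy; the choice of stdout and the message format are assumptions.

```python
import logging
import sys

# Configure the parent logger; "azure.eventgrid" is a child and inherits its level.
azure_logger = logging.getLogger("azure")
azure_logger.setLevel(logging.INFO)

# Write log records to stdout with a timestamp and the emitting logger's name.
handler = logging.StreamHandler(stream=sys.stdout)
handler.setFormatter(logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s"))
azure_logger.addHandler(handler)

logging.getLogger("azure.eventgrid").info("logging configured")
```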
Optional keyword arguments can be passed in at the client and per-operation level. The azure-core reference documentation describes available configurations for retries, logging, transport protocols, and more.
The following section provides several code snippets illustrating common patterns used in the Event Grid Python API.
These code samples show common champion scenario operations with the Azure Event Grid client library.
- Generate Shared Access Signature: sample_generate_sas.py
- Authenticate the client: sample_authentication.py (async_version)
- Publish events to a topic using SAS: sample_publish_events_to_a_topic_using_sas_credential_async.py (async_version)
- Publish Event Grid Events to a topic: sample_publish_eg_events_to_a_topic.py (async_version)
- Publish EventGrid Events to a domain topic: sample_publish_eg_events_to_a_domain_topic.py (async_version)
- Publish a Cloud Event: sample_publish_events_using_cloud_events_1.0_schema.py (async_version)
- Publish a Custom Schema: sample_publish_custom_schema_to_a_topic.py (async_version)
The following samples cover publishing and consuming dict representations of EventGridEvents and CloudEvents.
- Publish EventGridEvent as dict like representation: sample_publish_eg_event_using_dict.py (async_version)
- Publish CloudEvent as dict like representation: sample_publish_cloud_event_using_dict.py (async_version)
- Consume a Custom Payload of raw cloudevent data: sample_consume_custom_payload.py
More samples can be found here.
- More samples related to the send scenario can be seen here.
- To see more samples related to consuming a payload from different messaging services as a typed object, please visit Consume Samples
For more extensive documentation on Azure Event Grid, see the Event Grid documentation on docs.microsoft.com.
This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit cla.microsoft.com.
When you submit a pull request, a CLA-bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repos using our CLA.
This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact opencode@microsoft.com with any additional questions or comments.