Skip to content

Commit

Permalink
[ServiceBus] Http proxy/transport type bug fixing and sample update (A…
Browse files Browse the repository at this point in the history
…zure#10721)

* 1.Http proxy/transport type bug fixing
2.sample update

* update changelog

* add test for proxy and websocket
  • Loading branch information
yunhaoling authored Apr 8, 2020
1 parent 2b2cfd4 commit 0225748
Show file tree
Hide file tree
Showing 8 changed files with 195 additions and 1 deletion.
3 changes: 3 additions & 0 deletions sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## 7.0.0b2 (Unreleased)

**BugFixes**

* Fig bug where http_proxy and transport_type in ServiceBusClient are not propagated into Sender/Receiver creation properly.

## 7.0.0b1 (2020-04-06)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ def get_queue_sender(self, queue_name, **kwargs):
queue_name=queue_name,
credential=self._credential,
logging_enable=self._config.logging_enable,
transport_type=self._config.transport_type,
http_proxy=self._config.http_proxy,
connection=self._connection,
**kwargs
)
Expand Down Expand Up @@ -213,6 +215,8 @@ def get_queue_receiver(self, queue_name, **kwargs):
queue_name=queue_name,
credential=self._credential,
logging_enable=self._config.logging_enable,
transport_type=self._config.transport_type,
http_proxy=self._config.http_proxy,
connection=self._connection,
**kwargs
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ def get_queue_sender(self, queue_name, **kwargs):
queue_name=queue_name,
credential=self._credential,
logging_enable=self._config.logging_enable,
transport_type=self._config.transport_type,
http_proxy=self._config.http_proxy,
connection=self._connection,
**kwargs
)
Expand Down Expand Up @@ -211,6 +213,8 @@ def get_queue_receiver(self, queue_name, **kwargs):
queue_name=queue_name,
credential=self._credential,
logging_enable=self._config.logging_enable,
transport_type=self._config.transport_type,
http_proxy=self._config.http_proxy,
connection=self._connection,
**kwargs
)
Expand Down
2 changes: 2 additions & 0 deletions sdk/servicebus/azure-servicebus/samples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ Both [sync version](./sync_samples) and [async version](./async_samples) of samp
- Receive messages from session-enabled queue
- [client_identity_authentication.py](./sync_samples/client_identity_authentication.py) ([async_version](./async_samples/client_identity_authentication_async.py)) - Examples to authenticate the client by Azure Activate Directory
- Authenticate and create the client utilizing the `azure.identity` library
- [proxy.py](./sync_samples/proxy.py) ([async_version](./async_samples/proxy_async.py)) - Examples to send message behind a proxy:
- Send message behind a proxy


## Prerequisites
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#!/usr/bin/env python

# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

"""
Example to show sending message through http proxy to a Service Bus Queue asynchronously.
"""

# pylint: disable=C0111

import os
import asyncio
from azure.servicebus import Message
from azure.servicebus.aio import ServiceBusClient

CONNECTION_STR = os.environ['SERVICE_BUS_CONNECTION_STR']
QUEUE_NAME = os.environ["SERVICE_BUS_QUEUE_NAME"]


HTTP_PROXY = {
'proxy_hostname': '127.0.0.1', # proxy hostname.
'proxy_port': 8899, # proxy port.
'username': 'admin', # username used for proxy authentication if needed.
'password': '123456' # password used for proxy authentication if needed.
}


async def send_single_message(sender):
message = Message("DATA" * 64)
await sender.send(message)


async def main():
servicebus_client = ServiceBusClient.from_connection_string(
conn_str=CONNECTION_STR,
http_proxy=HTTP_PROXY
)

async with servicebus_client:
sender = servicebus_client.get_queue_sender(queue_name=QUEUE_NAME)
async with sender:
await send_single_message(sender)

print("Send message is done.")


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
44 changes: 44 additions & 0 deletions sdk/servicebus/azure-servicebus/samples/sync_samples/proxy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#!/usr/bin/env python

# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

"""
Example to show sending message through http proxy to a Service Bus Queue.
"""

# pylint: disable=C0111

import os
from azure.servicebus import ServiceBusClient, Message

CONNECTION_STR = os.environ['SERVICE_BUS_CONNECTION_STR']
QUEUE_NAME = os.environ["SERVICE_BUS_QUEUE_NAME"]


HTTP_PROXY = {
'proxy_hostname': '127.0.0.1', # proxy hostname.
'proxy_port': 8899, # proxy port.
'username': 'admin', # username used for proxy authentication if needed.
'password': '123456' # password used for proxy authentication if needed.
}


def send_single_message(sender):
message = Message("DATA" * 64)
sender.send(message)


servicebus_client = ServiceBusClient.from_connection_string(
conn_str=CONNECTION_STR,
http_proxy=HTTP_PROXY
)

with servicebus_client:
sender = servicebus_client.get_queue_sender(queue_name=QUEUE_NAME)
with sender:
send_single_message(sender)

print("Send message is done.")
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
ServiceBusClient,
ReceivedMessage,
AutoLockRenew)
from azure.servicebus import TransportType
from azure.servicebus._common.message import Message, BatchMessage, PeekMessage
from azure.servicebus._common.constants import ReceiveSettleMode
from azure.servicebus._common.utils import utc_now
Expand Down Expand Up @@ -1033,3 +1034,45 @@ async def test_async_queue_cancel_scheduled_messages(self, servicebus_namespace_

messages = await receiver.receive(max_wait_time=120)
assert len(messages) == 0

@pytest.mark.liveTest
@pytest.mark.live_test_only
@CachedResourceGroupPreparer(name_prefix='servicebustest')
@CachedServiceBusNamespacePreparer(name_prefix='servicebustest')
@ServiceBusQueuePreparer(name_prefix='servicebustest', dead_lettering_on_message_expiration=True)
async def test_queue_message_amqp_over_websocket(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs):
async with ServiceBusClient.from_connection_string(
servicebus_namespace_connection_string,
transport_type=TransportType.AmqpOverWebsocket,
logging_enable=False) as sb_client:

async with sb_client.get_queue_sender(servicebus_queue.name) as sender:
assert sender._config.transport_type == TransportType.AmqpOverWebsocket
message = Message("Test")
await sender.send(message)

async with sb_client.get_queue_receiver(servicebus_queue.name, mode=ReceiveSettleMode.ReceiveAndDelete) as receiver:
assert receiver._config.transport_type == TransportType.AmqpOverWebsocket
messages = await receiver.receive(max_wait_time=5)
assert len(messages) == 1

def test_queue_message_http_proxy_setting(self):
mock_conn_str = "Endpoint=sb://mock.servicebus.windows.net/;SharedAccessKeyName=mock;SharedAccessKey=mock"
http_proxy = {
'proxy_hostname': '127.0.0.1',
'proxy_port': 8899,
'username': 'admin',
'password': '123456'
}

sb_client = ServiceBusClient.from_connection_string(mock_conn_str, http_proxy=http_proxy)
assert sb_client._config.http_proxy == http_proxy
assert sb_client._config.transport_type == TransportType.AmqpOverWebsocket

sender = sb_client.get_queue_sender(queue_name="mock")
assert sender._config.http_proxy == http_proxy
assert sender._config.transport_type == TransportType.AmqpOverWebsocket

receiver = sb_client.get_queue_receiver(queue_name="mock")
assert receiver._config.http_proxy == http_proxy
assert receiver._config.transport_type == TransportType.AmqpOverWebsocket
45 changes: 44 additions & 1 deletion sdk/servicebus/azure-servicebus/tests/test_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import uuid
from datetime import datetime, timedelta

from azure.servicebus import ServiceBusClient, AutoLockRenew
from azure.servicebus import ServiceBusClient, AutoLockRenew, TransportType
from azure.servicebus._common.message import Message, PeekMessage, ReceivedMessage, BatchMessage
from azure.servicebus._common.constants import ReceiveSettleMode, _X_OPT_LOCK_TOKEN
from azure.servicebus._common.utils import utc_now
Expand Down Expand Up @@ -1158,3 +1158,46 @@ def test_queue_cancel_scheduled_messages(self, servicebus_namespace_connection_s
print(str(m))
m.complete()
raise


@pytest.mark.liveTest
@pytest.mark.live_test_only
@CachedResourceGroupPreparer(name_prefix='servicebustest')
@CachedServiceBusNamespacePreparer(name_prefix='servicebustest')
@ServiceBusQueuePreparer(name_prefix='servicebustest', dead_lettering_on_message_expiration=True)
def test_queue_message_amqp_over_websocket(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs):
with ServiceBusClient.from_connection_string(
servicebus_namespace_connection_string,
transport_type=TransportType.AmqpOverWebsocket,
logging_enable=False) as sb_client:

with sb_client.get_queue_sender(servicebus_queue.name) as sender:
assert sender._config.transport_type == TransportType.AmqpOverWebsocket
message = Message("Test")
sender.send(message)

with sb_client.get_queue_receiver(servicebus_queue.name, mode=ReceiveSettleMode.ReceiveAndDelete) as receiver:
assert receiver._config.transport_type == TransportType.AmqpOverWebsocket
messages = receiver.receive(max_wait_time=5)
assert len(messages) == 1

def test_queue_message_http_proxy_setting(self):
mock_conn_str = "Endpoint=sb://mock.servicebus.windows.net/;SharedAccessKeyName=mock;SharedAccessKey=mock"
http_proxy = {
'proxy_hostname': '127.0.0.1',
'proxy_port': 8899,
'username': 'admin',
'password': '123456'
}

sb_client = ServiceBusClient.from_connection_string(mock_conn_str, http_proxy=http_proxy)
assert sb_client._config.http_proxy == http_proxy
assert sb_client._config.transport_type == TransportType.AmqpOverWebsocket

sender = sb_client.get_queue_sender(queue_name="mock")
assert sender._config.http_proxy == http_proxy
assert sender._config.transport_type == TransportType.AmqpOverWebsocket

receiver = sb_client.get_queue_receiver(queue_name="mock")
assert receiver._config.http_proxy == http_proxy
assert receiver._config.transport_type == TransportType.AmqpOverWebsocket

0 comments on commit 0225748

Please sign in to comment.