diff --git a/stomp/adapter/ws.py b/stomp/adapter/ws.py index 59515aeb..2dd8b2ca 100644 --- a/stomp/adapter/ws.py +++ b/stomp/adapter/ws.py @@ -5,6 +5,9 @@ import time from time import monotonic import websocket +import json +from .ws_utils import SockJSFrame + try: from socket import SOL_SOCKET, SO_KEEPALIVE, SOL_TCP, TCP_KEEPIDLE, TCP_KEEPINTVL, TCP_KEEPCNT @@ -95,7 +98,8 @@ def __init__(self, is_eol_fc=is_eol_default, bind_host_port=None, ws_path=None, - header=None): + header=None, + sockjs_transport=False): BaseTransport.__init__(self, auto_decode, encoding, is_eol_fc) if host_and_ports is None: @@ -147,6 +151,7 @@ def __init__(self, self.vhost = vhost self.ws_path = ws_path self.header = header + self.__sockjs = sockjs_transport # setup SSL self.__ssl_params = {} @@ -213,7 +218,12 @@ def send(self, encoded_frame): if self.socket is not None: try: with self.__socket_semaphore: - self.socket.send(encoded_frame) + if self.__sockjs is False: + self.socket.send(encoded_frame) + else: + frame = encoded_frame.decode() + frame = json.dumps([frame]) + self.socket.send(frame) except Exception: _, e, _ = sys.exc_info() logging.error("error sending frame", exc_info=True) @@ -226,7 +236,16 @@ def receive(self): :rtype: bytes """ try: - return self.socket.recv().encode() + frame = self.socket.recv().encode() + if self.__sockjs is False: + return frame + else: + if frame == SockJSFrame.HEARTBEAT or frame == SockJSFrame.OPEN: + return b'\x0a' # EOL should be treated as HEARTBEAT by the receiver + if frame == SockJSFrame.ARRAY: + return json.loads(frame.decode()[2:-1]).encode() + if frame == SockJSFrame.CLOSE: + return None except socket.error: _, e, _ = sys.exc_info() if get_errno(e) in (errno.EAGAIN, errno.EINTR): @@ -429,6 +448,11 @@ class WSStompConnection(StompConnection12): """ Represents a 1.2 connection (comprising transport plus 1.2 protocol class). See :py:class:`stomp.transport.Transport` for details on the initialisation parameters. + + :param ws_path: Websocket path (after the base url :/) + :type ws_path: str + :param sock_js: Enable SockJS protocol + :type sock_js: bool """ def __init__(self, host_and_ports=None, @@ -450,12 +474,13 @@ def __init__(self, bind_host_port=None, ws=None, ws_path=None, - header=None): + header=None, + sock_js=False): transport = WSTransport(host_and_ports, prefer_localhost, try_loopback_connect, reconnect_sleep_initial, reconnect_sleep_increase, reconnect_sleep_jitter, reconnect_sleep_max, reconnect_attempts_max, timeout, keepalive, vhost, auto_decode, encoding, bind_host_port=bind_host_port, - header=header, ws_path=ws_path) + header=header, ws_path=ws_path, sockjs_transport=sock_js) BaseConnection.__init__(self, transport) Protocol12.__init__(self, transport, heartbeats, auto_content_length, heart_beat_receive_scale=heart_beat_receive_scale) diff --git a/stomp/adapter/ws_utils.py b/stomp/adapter/ws_utils.py new file mode 100644 index 00000000..610f7d83 --- /dev/null +++ b/stomp/adapter/ws_utils.py @@ -0,0 +1,14 @@ +import enum + + +class SockJSFrame(bytes, enum.Enum): + + OPEN = b'o' + CLOSE = b'c' + ARRAY = b'a' + HEARTBEAT = b'h' + + def __eq__(self, other): + if not isinstance(other, bytes): + return False + return other.startswith(self)