Skip to content

Commit

Permalink
use message queue for incoming messages
Browse files Browse the repository at this point in the history
Signed-off-by: hwipl <33433250+hwipl@users.noreply.github.com>
  • Loading branch information
hwipl committed Nov 11, 2020
1 parent e4ee33b commit 4639645
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 27 deletions.
18 changes: 1 addition & 17 deletions nuqql_based/account.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ def __init__(self, config: "Config", callbacks: "Callbacks",
self.status = "online"
self._buddies: List[Buddy] = []
self._buddies_lock = Lock()
self._messages: List[str] = []
self._messages_lock = Lock()
self._history: List[str] = []
self._history_lock = Lock()
self.config = config
Expand Down Expand Up @@ -66,28 +64,14 @@ def receive_msg(self, msg: str) -> None:
Receive a message from other users or the backend
"""

self._messages_lock.acquire()
self._messages.append(msg)
self._messages_lock.release()
self.queue.put(msg)

if Message.is_message(msg) and self.config.get_history():
# TODO: add timestamp?
self._history_lock.acquire()
self._history.append(msg)
self._history_lock.release()

def get_messages(self) -> List[str]:
"""
Get received messages as list
"""

self._messages_lock.acquire()
messages = self._messages[:]
self._messages = []
self._messages_lock.release()

return messages

def get_history(self) -> List[str]:
"""
Get the message history
Expand Down
19 changes: 9 additions & 10 deletions nuqql_based/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,11 @@ async def _handle_incoming(self, writer: asyncio.StreamWriter) -> None:
"""

# get messages from callback for each account
accounts = self.account_list.get()
for acc in accounts.values():
messages = acc.get_messages()
# TODO: this expects a list. change to string? document list req?
messages += self.callbacks.call(Callback.GET_MESSAGES, acc, ())
for msg in messages:
writer.write(msg.encode())
await writer.drain()
while True:
msg = await self.queue.get()
writer.write(msg.encode())
await writer.drain()
self.queue.task_done()

async def _handle_messages(self, reader: asyncio.StreamReader, writer:
asyncio.StreamWriter) -> str:
Expand Down Expand Up @@ -102,9 +99,10 @@ async def _handle_client(self, reader: asyncio.StreamReader, writer:
writer.write(accounts.encode())
await writer.drain()

# start sending incoming messages to client
inc_task = asyncio.create_task(self._handle_incoming(writer))

while True:
# handle incoming messages
await self._handle_incoming(writer)

# handle each complete message
cmd = await self._handle_messages(reader, writer)
Expand All @@ -113,6 +111,7 @@ async def _handle_client(self, reader: asyncio.StreamReader, writer:
if cmd == "bye":
# some error occured handling the messages or user said bye,
# drop the client
inc_task.cancel()
writer.close()
await writer.wait_closed()
self.connected = False
Expand Down

0 comments on commit 4639645

Please sign in to comment.