Skip to content

Commit

Permalink
Merge pull request #1375 from finos/call-loop
Browse files Browse the repository at this point in the history
Add `call_loop` and `get_table_names`
  • Loading branch information
texodus authored Apr 6, 2021
2 parents 28eddda + db62cbf commit 795f619
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 8 deletions.
8 changes: 4 additions & 4 deletions docs/md/python.md
Original file line number Diff line number Diff line change
Expand Up @@ -247,10 +247,10 @@ using a websocket API.
By default, `perspective` will run with a synchronous interface. Using the
`PerspectiveManager.set_loop_callback()` method, `perspective` can be configured
to defer the application of side-effectful calls like `update()` to an event
loop, such as `asyncio`. When running in Async mode, Perspective will release
the GIL for some operations, enabling better parallelism and overall better
server performance. There are a few important differences when running
`PerspectiveManager` in this mode:
loop, such as `tornado.ioloop.IOLoop`. When running in Async mode, Perspective
will release the GIL for some operations, enabling better parallelism and
overall better server performance. There are a few important differences when
running `PerspectiveManager` in this mode:

* Calls to methods like `update()` will return immediately, and the reciprocal
`on_update()` callbacks will be invoked on an event later scheduled. Calls
Expand Down
26 changes: 23 additions & 3 deletions python/perspective/perspective/manager/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
# the Apache License 2.0. The full license can be found in the LICENSE file.
#

import six
import random
import string
from functools import partial
Expand Down Expand Up @@ -103,9 +104,24 @@ def get_table(self, name):
"""Return a table under management by name."""
return self._tables.get(name, None)

def get_table_names(self):
"""Return the tables that are hosted with this manager by name."""
return list(six.iterkeys(self._tables))

def new_session(self):
return PerspectiveSession(self)

def call_loop(self, f, *args, **kwargs):
"""Calls `f()` on this `PerspectiveManager`'s event loop if it has one,
or raise a PerspectiveError if there is no loop callback set using
`set_loop_callback()`.
"""
if self._loop_callback is None:
raise PerspectiveError(
"Event loop not set on this PerspectiveManager - use set_loop_callback() before calling call_loop()."
)
return self._loop_callback(f, *args, **kwargs)

def set_loop_callback(self, loop_callback):
"""Sets this `PerspectiveManager` to run in Async mode, defering
`update()` application and releasing the GIL for expensive operations.
Expand All @@ -114,10 +130,14 @@ def set_loop_callback(self, loop_callback):
hosts must only be interacted with from the same thread.
Args:
loop_callback: A Function which accepts a Function arguments
and schedules it to run on the same thread on which
`set_loop_callback()` was originally invoked.
loop_callback: A function which accepts a function reference and
its args/kwargs, and schedules it to run on the same thread
on which set_loop_callback()` was originally invoked.
"""
if self._loop_callback is not None:
raise PerspectiveError("PerspectiveManager already has a `loop_callback`")
if not callable(loop_callback):
raise PerspectiveError("`loop_callback` must be a function")
self._loop_callback = loop_callback
for table in self._tables.values():
loop_callback(lambda: table._table.get_pool().set_event_loop())
Expand Down
38 changes: 37 additions & 1 deletion python/perspective/perspective/tests/core/test_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
import threading

from functools import partial
from perspective import Table, PerspectiveManager
from pytest import raises
from perspective import Table, PerspectiveManager, PerspectiveError


def syncify(f):
Expand Down Expand Up @@ -76,6 +77,41 @@ def _task():
assert _task() == 5
tbl.delete()

def test_async_call_loop(self):
tbl = Table({"a": int, "b": float, "c": str})
manager = PerspectiveManager()
manager.set_loop_callback(TestAsync.loop.add_callback)
manager.call_loop(tbl.update, data)
manager.host(tbl)

@syncify
def _task():
return tbl.size()

assert _task() == 10
tbl.delete()

def test_async_call_loop_error_if_no_loop(self):
tbl = Table({"a": int, "b": float, "c": str})
manager = PerspectiveManager()

with raises(PerspectiveError):
# loop not set - errors
manager.call_loop(tbl.update, data)

manager.set_loop_callback(TestAsync.loop.add_callback)
manager.call_loop(tbl.update, data)
manager.host(tbl)

@syncify
def _task():
return tbl.size()

# subsequent calls to call_loop will work if loop_callback is set.
assert _task() == 10

tbl.delete()

def test_async_multiple_managers_queue_process(self):
tbl = Table({"a": int, "b": float, "c": str})
tbl2 = Table({"a": int, "b": float, "c": str})
Expand Down
8 changes: 8 additions & 0 deletions python/perspective/perspective/tests/manager/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ def test_manager_host_invalid(self):
with raises(PerspectiveError):
manager.host({})

def test_manager_host(self):
manager = PerspectiveManager()
table = Table(data)
manager.host(table)
table.update({"a": [4, 5, 6], "b": ["d", "e", "f"]})
names = manager.get_table_names()
assert manager.get_table(names[0]).size() == 6

def test_manager_host_table_transitive(self):
manager = PerspectiveManager()
table = Table(data)
Expand Down

0 comments on commit 795f619

Please sign in to comment.