Skip to content

Commit

Permalink
Upgrade csp to perspective 3.x
Browse files Browse the repository at this point in the history
Signed-off-by: Pascal Tomecek <pascal.tomecek@cubistsystematic.com>
Signed-off-by: Tim Paine <3105306+timkpaine@users.noreply.github.com>
  • Loading branch information
ptomecek authored and timkpaine committed Nov 17, 2024
1 parent 8274827 commit 8030fcb
Show file tree
Hide file tree
Showing 6 changed files with 255 additions and 96 deletions.
10 changes: 7 additions & 3 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -670,9 +670,9 @@ jobs:
python-version:
- 3.9
package:
- "sqlalchemy>=2"
- "sqlalchemy<2"
- "numpy==1.19.5"
- "perspective-python<3"

runs-on: ${{ matrix.os }}

Expand Down Expand Up @@ -709,11 +709,15 @@ jobs:

- name: Python Test Steps
run: make test TEST_ARGS="-k TestDBReader"
if: ${{ contains( 'sqlalchemy', matrix.package )}}
if: ${{ contains( matrix.package, 'sqlalchemy' )}}

- name: Python Test Steps
run: make test
if: ${{ contains( 'numpy', matrix.package )}}
if: ${{ contains( matrix.package, 'numpy' )}}

- name: Python Test Steps
run: make test TEST_ARGS="-k Perspective"
if: ${{ contains( matrix.package, 'perspective' )}}

###########################################################################################################
#.........................................................................................................#
Expand Down
48 changes: 33 additions & 15 deletions csp/adapters/perspective.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import threading
from datetime import timedelta
from packaging import version
from typing import Dict, Optional, Union

import csp
Expand All @@ -13,21 +14,25 @@
except ImportError:
raise ImportError("perspective adapter requires tornado package")


try:
from perspective import PerspectiveManager, Table as Table_, View as View_, __version__, set_threadpool_size
from perspective import Server, Table as Table_, View as View_, __version__, set_threadpool_size

if version.parse(__version__) >= version.parse("3"):
_PERSPECTIVE_3 = True
elif version.parse(__version__) >= version.parse("0.6.2"):
from perspective import PerspectiveManager

MAJOR, MINOR, PATCH = map(int, __version__.split("."))
if (MAJOR, MINOR, PATCH) < (0, 6, 2):
_PERSPECTIVE_3 = False
else:
raise ImportError("perspective adapter requires 0.6.2 or greater of the perspective-python package")
except ImportError:
raise ImportError("perspective adapter requires 0.6.2 or greater of the perspective-python package")


# Run perspective update in a separate tornado loop
def perspective_thread(manager):
def perspective_thread(client):
loop = tornado.ioloop.IOLoop()
manager.set_loop_callback(loop.add_callback)
client.set_loop_callback(loop.add_callback)
loop.start()


Expand All @@ -54,7 +59,7 @@ def _apply_updates(table: object, data: {str: ts[object]}, throttle: timedelta):


@csp.node
def _launch_application(port: int, manager: object, stub: ts[object]):
def _launch_application(port: int, server: object, stub: ts[object]):
with csp.state():
s_app = None
s_ioloop = None
Expand All @@ -63,10 +68,14 @@ def _launch_application(port: int, manager: object, stub: ts[object]):
with csp.start():
from perspective import PerspectiveTornadoHandler

if _PERSPECTIVE_3:
handler_args = {"perspective_server": server, "check_origin": True}
else:
handler_args = {"manager": server, "check_origin": True}
s_app = tornado.web.Application(
[
# create a websocket endpoint that the client Javascript can access
(r"/websocket", PerspectiveTornadoHandler, {"manager": manager, "check_origin": True})
(r"/websocket", PerspectiveTornadoHandler, handler_args)
],
websocket_ping_interval=15,
)
Expand Down Expand Up @@ -197,20 +206,29 @@ def create_table(self, name, limit=None, index=None):

def _instantiate(self):
set_threadpool_size(self._threadpool_size)

manager = PerspectiveManager()

thread = threading.Thread(target=perspective_thread, kwargs=dict(manager=manager))
if _PERSPECTIVE_3:
server = Server()
client = server.new_local_client()
thread = threading.Thread(target=perspective_thread, kwargs=dict(client=client))
else:
manager = PerspectiveManager()
thread = threading.Thread(target=perspective_thread, kwargs=dict(manager=manager))
thread.daemon = True
thread.start()

for table_name, table in self._tables.items():
schema = {
k: v.tstype.typ if not issubclass(v.tstype.typ, csp.Enum) else str for k, v in table.columns.items()
}
ptable = Table(schema, limit=table.limit, index=table.index)
manager.host_table(table_name, ptable)
if _PERSPECTIVE_3:
ptable = Table(schema, limit=table.limit, index=table.index)
manager.host_table(table_name, ptable)
else:
ptable = client.table(schema, name=table_name, limit=table.limit, index=table.index)

_apply_updates(ptable, table.columns, self._throttle)

_launch_application(self._port, manager, csp.const("stub"))
if _PERSPECTIVE_3:
_launch_application(self._port, server, csp.const("stub"))
else:
_launch_application(self._port, manager, csp.const("stub"))
40 changes: 33 additions & 7 deletions csp/dataframe.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from datetime import datetime, timedelta
from datetime import date, datetime, timedelta
from packaging import version
from typing import Dict, Optional

import csp.baselib
Expand All @@ -12,6 +13,7 @@ class DataFrame:
def __init__(self, data: Optional[Dict] = None):
self._data = data or {}
self._columns = list(self._data.keys())
self._psp_client = None

@property
def columns(self):
Expand Down Expand Up @@ -204,10 +206,17 @@ def to_perspective(self, starttime: datetime, endtime: datetime = None, realtime
try:
import perspective

if version.parse(perspective.__version__) >= version.parse("3"):
_PERSPECTIVE_3 = True
from perspective.widget import PerspectiveWidget
else:
_PERSPECTIVE_3 = False
from perspective import PerspectiveWidget

global RealtimePerspectiveWidget
if RealtimePerspectiveWidget is None:

class RealtimePerspectiveWidget(perspective.PerspectiveWidget):
class RealtimePerspectiveWidget(PerspectiveWidget):
def __init__(self, engine_runner, *args, **kwargs):
super().__init__(*args, **kwargs)
self._runner = engine_runner
Expand All @@ -222,14 +231,14 @@ def join(self):
self._runner.join()

except ImportError:
raise ImportError("eval_perspective requires perspective-python installed")
raise ImportError("to_perspective requires perspective-python installed")

if not realtime:
df = self.to_pandas(starttime, endtime)
return perspective.PerspectiveWidget(df.ffill(), plugin="Y Line", columns=self._columns, group_by="index")
return PerspectiveWidget(df.ffill(), plugin="Y Line", columns=self._columns, group_by="index")

@csp.node
def apply_updates(table: object, data: {str: csp.ts[object]}, timecol: str, throttle: timedelta):
def apply_updates(table: object, data: Dict[str, csp.ts[object]], timecol: str, throttle: timedelta):
with csp.alarms():
alarm = csp.alarm(bool)
with csp.state():
Expand All @@ -240,7 +249,10 @@ def apply_updates(table: object, data: {str: csp.ts[object]}, timecol: str, thro

if csp.ticked(data):
s_buffer.append(dict(data.tickeditems()))
s_buffer[-1][timecol] = csp.now()
if _PERSPECTIVE_3:
s_buffer[-1][timecol] = int(csp.now().timestamp() * 1000)
else:
s_buffer[-1][timecol] = csp.now()

if csp.ticked(alarm):
if len(s_buffer) > 0:
Expand All @@ -252,7 +264,21 @@ def apply_updates(table: object, data: {str: csp.ts[object]}, timecol: str, thro
timecol = "time"
schema = {k: v.tstype.typ for k, v in self._data.items()}
schema[timecol] = datetime
table = perspective.Table(schema)
if _PERSPECTIVE_3:
perspective_type_map = {
str: "string",
float: "float",
int: "integer",
date: "date",
datetime: "datetime",
bool: "boolean",
}
schema = {col: perspective_type_map[typ] for col, typ in schema.items()}
if self._psp_client is None:
self._psp_client = perspective.Server().new_local_client()
table = self._psp_client.table(schema)
else:
table = perspective.Table(schema)
runner = csp.run_on_thread(
apply_updates,
table,
Expand Down
Loading

0 comments on commit 8030fcb

Please sign in to comment.