Skip to content

Commit

Permalink
Upgrade csp to perspective 3.x (#392)
Browse files Browse the repository at this point in the history
* Upgrade csp to perspective 3.x

Signed-off-by: Pascal Tomecek <pascal.tomecek@cubistsystematic.com>
Signed-off-by: Tim Paine <3105306+timkpaine@users.noreply.github.com>
  • Loading branch information
ptomecek authored Nov 19, 2024
1 parent 8274827 commit 25c2b67
Show file tree
Hide file tree
Showing 10 changed files with 352 additions and 109 deletions.
12 changes: 8 additions & 4 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -670,9 +670,9 @@ jobs:
python-version:
- 3.9
package:
- "sqlalchemy>=2"
- "sqlalchemy<2"
- "numpy==1.19.5"
- "numpy==1.22.4" # Min supported version of pandas 2.2
- "perspective-python<3"

runs-on: ${{ matrix.os }}

Expand Down Expand Up @@ -709,11 +709,15 @@ jobs:

- name: Python Test Steps
run: make test TEST_ARGS="-k TestDBReader"
if: ${{ contains( 'sqlalchemy', matrix.package )}}
if: ${{ contains( matrix.package, 'sqlalchemy' )}}

- name: Python Test Steps
run: make test
if: ${{ contains( 'numpy', matrix.package )}}
if: ${{ contains( matrix.package, 'numpy' )}}

- name: Python Test Steps
run: make test TEST_ARGS="-k Perspective"
if: ${{ contains( matrix.package, 'perspective' )}}

###########################################################################################################
#.........................................................................................................#
Expand Down
2 changes: 1 addition & 1 deletion conda/dev-environment-unix.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ dependencies:
- librdkafka
- lz4-c
- mamba
- mdformat>=0.7.17,<0.8
- mdformat=0.7.17
- ninja
- numpy<2
- pandas
Expand Down
2 changes: 1 addition & 1 deletion conda/dev-environment-win.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ dependencies:
- lz4-c
- make
- mamba
- mdformat>=0.7.17,<0.8
- mdformat=0.7.17
- ninja
- numpy<2
- pandas
Expand Down
78 changes: 57 additions & 21 deletions csp/adapters/perspective.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
import threading
from datetime import timedelta
from perspective import Table as Table_, View as View_
from typing import Dict, Optional, Union

import csp
from csp import ts
from csp.impl.perspective_common import (
date_to_perspective,
datetime_to_perspective,
is_perspective3,
perspective_type_map,
)
from csp.impl.wiring.delayed_node import DelayedNodeWrapperDef

try:
Expand All @@ -14,20 +21,17 @@
raise ImportError("perspective adapter requires tornado package")


try:
from perspective import PerspectiveManager, Table as Table_, View as View_, __version__, set_threadpool_size

MAJOR, MINOR, PATCH = map(int, __version__.split("."))
if (MAJOR, MINOR, PATCH) < (0, 6, 2):
raise ImportError("perspective adapter requires 0.6.2 or greater of the perspective-python package")
except ImportError:
raise ImportError("perspective adapter requires 0.6.2 or greater of the perspective-python package")
_PERSPECTIVE_3 = is_perspective3()
if _PERSPECTIVE_3:
from perspective import Server
else:
from perspective import PerspectiveManager


# Run perspective update in a separate tornado loop
def perspective_thread(manager):
def perspective_thread(client):
loop = tornado.ioloop.IOLoop()
manager.set_loop_callback(loop.add_callback)
client.set_loop_callback(loop.add_callback)
loop.start()


Expand All @@ -38,12 +42,25 @@ def _apply_updates(table: object, data: {str: ts[object]}, throttle: timedelta):

with csp.state():
s_buffer = []
s_datetime_cols = set()
s_date_cols = set()

with csp.start():
csp.schedule_alarm(alarm, throttle, True)
if _PERSPECTIVE_3:
s_datetime_cols = set([c for c, t in table.schema().items() if t == "datetime"])
s_date_cols = set([c for c, t in table.schema().items() if t == "date"])

if csp.ticked(data):
s_buffer.append(dict(data.tickeditems()))
row = dict(data.tickeditems())
if _PERSPECTIVE_3:
for col, value in row.items():
if col in s_datetime_cols:
row[col] = datetime_to_perspective(row[col])
if col in s_date_cols:
row[col] = date_to_perspective(row[col])

s_buffer.append(row)

if csp.ticked(alarm):
if len(s_buffer) > 0:
Expand All @@ -54,19 +71,25 @@ def _apply_updates(table: object, data: {str: ts[object]}, throttle: timedelta):


@csp.node
def _launch_application(port: int, manager: object, stub: ts[object]):
def _launch_application(port: int, server: object, stub: ts[object]):
with csp.state():
s_app = None
s_ioloop = None
s_iothread = None

with csp.start():
from perspective import PerspectiveTornadoHandler
if _PERSPECTIVE_3:
from perspective.handlers.tornado import PerspectiveTornadoHandler

handler_args = {"perspective_server": server, "check_origin": True}
else:
from perspective import PerspectiveTornadoHandler

handler_args = {"manager": server, "check_origin": True}
s_app = tornado.web.Application(
[
# create a websocket endpoint that the client Javascript can access
(r"/websocket", PerspectiveTornadoHandler, {"manager": manager, "check_origin": True})
(r"/websocket", PerspectiveTornadoHandler, handler_args)
],
websocket_ping_interval=15,
)
Expand Down Expand Up @@ -196,21 +219,34 @@ def create_table(self, name, limit=None, index=None):
return table

def _instantiate(self):
set_threadpool_size(self._threadpool_size)

manager = PerspectiveManager()
if _PERSPECTIVE_3:
server = Server()
client = server.new_local_client()
thread = threading.Thread(target=perspective_thread, kwargs=dict(client=client))
else:
from perspective import set_threadpool_size

thread = threading.Thread(target=perspective_thread, kwargs=dict(manager=manager))
set_threadpool_size(self._threadpool_size)
manager = PerspectiveManager()
thread = threading.Thread(target=perspective_thread, kwargs=dict(manager=manager))
thread.daemon = True
thread.start()

for table_name, table in self._tables.items():
schema = {
k: v.tstype.typ if not issubclass(v.tstype.typ, csp.Enum) else str for k, v in table.columns.items()
}
ptable = Table(schema, limit=table.limit, index=table.index)
manager.host_table(table_name, ptable)
if _PERSPECTIVE_3:
psp_type_map = perspective_type_map()
schema = {col: psp_type_map.get(typ, typ) for col, typ in schema.items()}
ptable = client.table(schema, name=table_name, limit=table.limit, index=table.index)
else:
ptable = Table(schema, limit=table.limit, index=table.index)
manager.host_table(table_name, ptable)

_apply_updates(ptable, table.columns, self._throttle)

_launch_application(self._port, manager, csp.const("stub"))
if _PERSPECTIVE_3:
_launch_application(self._port, server, csp.const("stub"))
else:
_launch_application(self._port, manager, csp.const("stub"))
40 changes: 33 additions & 7 deletions csp/dataframe.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from datetime import datetime, timedelta
from datetime import date, datetime, timedelta
from packaging import version
from typing import Dict, Optional

import csp.baselib
Expand All @@ -12,6 +13,7 @@ class DataFrame:
def __init__(self, data: Optional[Dict] = None):
self._data = data or {}
self._columns = list(self._data.keys())
self._psp_client = None

@property
def columns(self):
Expand Down Expand Up @@ -204,10 +206,17 @@ def to_perspective(self, starttime: datetime, endtime: datetime = None, realtime
try:
import perspective

if version.parse(perspective.__version__) >= version.parse("3"):
_PERSPECTIVE_3 = True
from perspective.widget import PerspectiveWidget
else:
_PERSPECTIVE_3 = False
from perspective import PerspectiveWidget

global RealtimePerspectiveWidget
if RealtimePerspectiveWidget is None:

class RealtimePerspectiveWidget(perspective.PerspectiveWidget):
class RealtimePerspectiveWidget(PerspectiveWidget):
def __init__(self, engine_runner, *args, **kwargs):
super().__init__(*args, **kwargs)
self._runner = engine_runner
Expand All @@ -222,14 +231,14 @@ def join(self):
self._runner.join()

except ImportError:
raise ImportError("eval_perspective requires perspective-python installed")
raise ImportError("to_perspective requires perspective-python installed")

if not realtime:
df = self.to_pandas(starttime, endtime)
return perspective.PerspectiveWidget(df.ffill(), plugin="Y Line", columns=self._columns, group_by="index")
return PerspectiveWidget(df.ffill(), plugin="Y Line", columns=self._columns, group_by="index")

@csp.node
def apply_updates(table: object, data: {str: csp.ts[object]}, timecol: str, throttle: timedelta):
def apply_updates(table: object, data: Dict[str, csp.ts[object]], timecol: str, throttle: timedelta):
with csp.alarms():
alarm = csp.alarm(bool)
with csp.state():
Expand All @@ -240,7 +249,10 @@ def apply_updates(table: object, data: {str: csp.ts[object]}, timecol: str, thro

if csp.ticked(data):
s_buffer.append(dict(data.tickeditems()))
s_buffer[-1][timecol] = csp.now()
if _PERSPECTIVE_3:
s_buffer[-1][timecol] = int(csp.now().timestamp() * 1000)
else:
s_buffer[-1][timecol] = csp.now()

if csp.ticked(alarm):
if len(s_buffer) > 0:
Expand All @@ -252,7 +264,21 @@ def apply_updates(table: object, data: {str: csp.ts[object]}, timecol: str, thro
timecol = "time"
schema = {k: v.tstype.typ for k, v in self._data.items()}
schema[timecol] = datetime
table = perspective.Table(schema)
if _PERSPECTIVE_3:
perspective_type_map = {
str: "string",
float: "float",
int: "integer",
date: "date",
datetime: "datetime",
bool: "boolean",
}
schema = {col: perspective_type_map[typ] for col, typ in schema.items()}
if self._psp_client is None:
self._psp_client = perspective.Server().new_local_client()
table = self._psp_client.table(schema)
else:
table = perspective.Table(schema)
runner = csp.run_on_thread(
apply_updates,
table,
Expand Down
Loading

0 comments on commit 25c2b67

Please sign in to comment.