Async Mode (GIL unlock) Does Not Work With Tables Based On Views #1704

Closed
choandrew opened this issue Jan 14, 2022 · 1 comment · Fixed by #1707
Labels
bug Concrete, reproducible bugs

Comments

@choandrew

Bug Report

High Level Overview

  1. You have a regular base_table and a manager.
  2. You create a view from that base_table, then create a derived_table from that view, and the derived_table is updated with deltas from the view.
  3. You then host BOTH tables with the manager.

Any code that follows this pattern segfaults very quickly:

    # you have a base_table + manager
    MANAGER = perspective.PerspectiveManager()
    BASE_TABLE = perspective.Table(columns, index="index")

    # create a view from that base_table then create a derived_table from that view
    VIEW = BASE_TABLE.view()
    DERIVED_TABLE = perspective.Table(VIEW.to_arrow(), index=BASE_TABLE.get_index())
    VIEW.on_update(updater_func, "row")

    # then you host BOTH tables
    MANAGER.host(BASE_TABLE, "base")
    MANAGER.host(DERIVED_TABLE, "DERIVED")

where the updater_func passed to VIEW.on_update can be either of the following:

manager.call_loop(table.update, delta)
# OR
table.update(delta)
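
For readers less familiar with the distinction, here is a generic, stdlib-only sketch of the difference between the two variants: calling a function directly on the current thread versus handing it to an event loop that runs on another thread. It does not use Perspective at all. `start_loop_thread` and `run_on_loop` are hypothetical helper names, and `loop.call_soon_threadsafe` is only an assumed stand-in for whatever callback was registered with `set_loop_callback`.

```python
import asyncio
import threading


def start_loop_thread():
    """Start an asyncio event loop on a daemon thread; return (loop, thread)."""
    loop = asyncio.new_event_loop()
    ready = threading.Event()

    def run():
        asyncio.set_event_loop(loop)
        # Signal readiness from inside the loop, once it is actually running.
        loop.call_soon(ready.set)
        loop.run_forever()

    thread = threading.Thread(target=run, daemon=True)
    thread.start()
    ready.wait()
    return loop, thread


def run_on_loop(loop, func, *args):
    """Schedule func(*args) on the loop's thread and wait until it has run."""
    done = threading.Event()
    outcome = {}

    def wrapper():
        try:
            # Record which thread actually executed the function.
            outcome["thread_id"] = threading.get_ident()
            outcome["value"] = func(*args)
        finally:
            done.set()

    # call_soon_threadsafe is the thread-safe way to hand work to a loop
    # that is running on a different thread.
    loop.call_soon_threadsafe(wrapper)
    done.wait(timeout=5)
    return outcome
```

With this sketch, `run_on_loop(loop, f, x)` executes `f` on the loop's thread, whereas a plain `f(x)` executes on whichever thread made the call. The report above is that the segfault occurs with either variant.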

Bug Reproduction Code:

Minimal code reproduction on perspective 1.1.0

import asyncio
import socket
import threading

import tornado
import tornado.web
import tornado.ioloop
import tornado.gen

import perspective


class MainHandler(tornado.web.RequestHandler):
    def initialize(self, domain: str, port: int, table_name: str):
        self.port = port
        self.domain = domain
        self.table_name = table_name

    def get(self):
        self.write("")


def make_app(
    MANAGER: perspective.PerspectiveManager, domain: str, port: int, table_name: str,
):

    handlers = [
        (
            r"/websocket",
            perspective.PerspectiveTornadoHandler,
            {"manager": MANAGER, "check_origin": True},
        ),
        (
            r"/",
            MainHandler,
            {"domain": domain, "port": port, "table_name": table_name},
        ),
    ]
    app = tornado.web.Application(handlers)
    return app


def perspective_thread(MANAGER: perspective.PerspectiveManager):
    loop = tornado.ioloop.IOLoop()
    MANAGER.set_loop_callback(loop.add_callback)
    loop.start()


def async_updater(table: perspective.Table, manager: perspective.PerspectiveManager):
    def update_function(port: int, delta):
        manager.call_loop(table.update, delta)

    return update_function


def sync_updater(table: perspective.Table, manager: perspective.PerspectiveManager):
    def update_function(port: int, delta):
        table.update(delta)

    return update_function


async def main(async_mode: bool):
    print("starting up.")
    columns = {"index": int, "num1": int, "num2": int}

    MANAGER = perspective.PerspectiveManager()

    BASE_TABLE = perspective.Table(columns, index="index")
    table_name = "base_table"

    VIEW = BASE_TABLE.view()

    DERIVED_TABLE = perspective.Table(VIEW.to_arrow(), index=BASE_TABLE.get_index())

    MANAGER.host(BASE_TABLE, table_name)

    MANAGER.host(DERIVED_TABLE, "DERIVED")
    VIEW.on_update(async_updater(DERIVED_TABLE, MANAGER), "row")
    # if you comment out *either* of the above 2 lines, it works
    # it still crashes if you replace async_updater with sync_updater

    """
    # below does not work either if you put it after the loop_callback is set
    MANAGER.call_loop(
        VIEW.on_update, async_updater(DERIVED_TABLE, MANAGER), "row",
    )
    """

    if async_mode:
        thread = threading.Thread(target=perspective_thread, args=(MANAGER,))
        thread.daemon = True
        thread.start()
        while MANAGER._loop_callback is None:
            print("Pausing to let Perspective Manager thread set up.")
            await asyncio.sleep(1)
    else:
        loop = asyncio.get_event_loop()
        MANAGER.set_loop_callback(loop.call_soon)

    domain = socket.gethostname()
    port = 8080
    app = make_app(MANAGER, domain, port, table_name)
    app.listen(port)

    print(f"url: {domain}:{port}")

    i = 0

    MANAGER.call_loop(BASE_TABLE.update, [{"index": i, "num1": i, "num2": 2 * i}])
    while True:
        await asyncio.sleep(0.001)
        MANAGER.call_loop(BASE_TABLE.update, [{"index": i, "num1": i, "num2": 2 * i}])
        MANAGER.call_loop(
            BASE_TABLE.update,
            [{"index": p, "num1": p % 3, "num2": 2 * p + i} for p in range(i)],
        )
        print(i)
        i += 1


if __name__ == "__main__":
    async_mode = True
    asyncio.run(main(async_mode), debug=True)

Expected Result:

No crashing

Actual Result:

Segfault very soon after the script starts running

Environment:

linux
perspective 1.1.0

Additional Context:

I suspect this is the same problem I ran into in issue #1313, but I have now narrowed down what specifically triggers it.
That would mean this is not a new bug.

@texodus
Member

texodus commented Jan 18, 2022

Thanks for the report @choandrew!

This has been fixed in #1707 and will be in the v1.1.1 release. I find the PYTHONMALLOC=debug environment variable quite helpful for isolating GIL issues, as opposed to relying on thread contention to trigger them. I've added this mode to our test suite so that it asserts not just passing tests but strictly correct GIL usage, along with a simplified version of your repro, which fails consistently in this mode.
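
For anyone who wants to try this mode locally, below is a minimal sketch of launching a script in a child interpreter with `PYTHONMALLOC=debug` set. It is not how the project's test suite is configured; `run_with_debug_malloc` is a hypothetical helper name, and turning on `PYTHONFAULTHANDLER` is an extra assumption made here so that a crash prints a Python traceback. In this mode CPython installs debug hooks on its memory allocators, which among other checks verify that the GIL is held when the object and mem allocator domains are used.

```python
import os
import subprocess
import sys


def run_with_debug_malloc(args, extra_env=None):
    """Run a child Python process with allocator debug hooks enabled.

    `args` are the arguments passed to the interpreter, for example
    ["repro.py"] or ["-c", "print('hi')"].
    """
    env = dict(os.environ)
    # Enable CPython's debug hooks on the memory allocators.
    env["PYTHONMALLOC"] = "debug"
    # Assumption for convenience: dump a traceback on fatal signals.
    env["PYTHONFAULTHANDLER"] = "1"
    if extra_env:
        env.update(extra_env)
    return subprocess.run(
        [sys.executable, *args],
        env=env,
        capture_output=True,
        text=True,
    )
```

A call such as `run_with_debug_malloc(["repro.py"])`, where `repro.py` is a placeholder for the reproduction script, returns a `CompletedProcess` whose `returncode` and `stderr` can be inspected for a failure.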
