Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue where contexts were being notified before gnode state was updated #1136

Merged
merged 4 commits into from
Jul 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 42 additions & 15 deletions cpp/perspective/src/cpp/gnode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,10 +275,19 @@ t_gnode::_process_table(t_uindex port_id) {

// first update - master table is empty
if (m_gstate->mapping_size() == 0) {
// Update context from state first - computes columns during update
_update_contexts_from_state(flattened);
// Compute columns here on the flattened table, as the flattened table
// does not have any of the computed columns that are stored on the
// gnode, i.e. from all created contexts.
_compute_all_columns({flattened});

m_gstate->update_master_table(flattened.get());

m_oports[PSP_PORT_FLATTENED]->set_table(flattened);

// Update context from state after gnode state has been updated, as
// contexts obliquely read gnode state at various points.
_update_contexts_from_state(flattened);

release_inputs();
release_outputs();

Expand Down Expand Up @@ -589,8 +598,8 @@ t_gnode::process(t_uindex port_id) {

if (result.m_flattened_data_table) {
notify_contexts(*result.m_flattened_data_table);
}
}

// Whether the user should be notified - False if process_table exited
// early, True otherwise.
return result.m_should_notify_userspace;
Expand Down Expand Up @@ -756,10 +765,12 @@ t_gnode::_register_context(const std::string& name, t_ctx_type type, std::int64_
bool should_update = m_gstate->mapping_size() > 0;

// TODO: shift columns forward in cleanup, translate dead indices
std::shared_ptr<t_data_table> flattened;
std::shared_ptr<t_data_table> pkeyed_table;

if (should_update) {
flattened = m_gstate->get_pkeyed_table();
// Will not have computed columns added in the context to be
// registered, but all previously computed columns.
pkeyed_table = m_gstate->get_pkeyed_table();
}

std::vector<t_computed_column_definition> computed_columns;
Expand All @@ -770,36 +781,52 @@ t_gnode::_register_context(const std::string& name, t_ctx_type type, std::int64_
t_ctx2* ctx = static_cast<t_ctx2*>(ptr_);
ctx->reset();
computed_columns = ctx->get_config().get_computed_columns();
m_computed_column_map.add_computed_columns(computed_columns);
if (should_update)
update_context_from_state<t_ctx2>(ctx, flattened);
m_computed_column_map.add_computed_columns(computed_columns);

if (should_update) {
// Compute all valid computed columns + new computed columns that
// were added as part of this context. Do so separately from
// update_context_from_state, so that registration-specific logic
// is centralized in one place.
_compute_all_columns({pkeyed_table});
update_context_from_state<t_ctx2>(ctx, pkeyed_table);
}
} break;
case ONE_SIDED_CONTEXT: {
set_ctx_state<t_ctx1>(ptr_);
t_ctx1* ctx = static_cast<t_ctx1*>(ptr_);
ctx->reset();
computed_columns = ctx->get_config().get_computed_columns();
m_computed_column_map.add_computed_columns(computed_columns);
if (should_update)
update_context_from_state<t_ctx1>(ctx, flattened);

if (should_update) {
_compute_all_columns({pkeyed_table});
update_context_from_state<t_ctx1>(ctx, pkeyed_table);
}
} break;
case ZERO_SIDED_CONTEXT: {
set_ctx_state<t_ctx0>(ptr_);
t_ctx0* ctx = static_cast<t_ctx0*>(ptr_);
ctx->reset();
computed_columns = ctx->get_config().get_computed_columns();
m_computed_column_map.add_computed_columns(computed_columns);
if (should_update)
update_context_from_state<t_ctx0>(ctx, flattened);

if (should_update) {
_compute_all_columns({pkeyed_table});
update_context_from_state<t_ctx0>(ctx, pkeyed_table);
}
} break;
case GROUPED_PKEY_CONTEXT: {
set_ctx_state<t_ctx0>(ptr_);
auto ctx = static_cast<t_ctx_grouped_pkey*>(ptr_);
ctx->reset();
computed_columns = ctx->get_config().get_computed_columns();
m_computed_column_map.add_computed_columns(computed_columns);
if (should_update)
update_context_from_state<t_ctx_grouped_pkey>(ctx, flattened);

if (should_update) {
_compute_all_columns({pkeyed_table});
update_context_from_state<t_ctx_grouped_pkey>(ctx, pkeyed_table);
}
} break;
default: { PSP_COMPLAIN_AND_ABORT("Unexpected context type"); } break;
}
Expand Down
11 changes: 0 additions & 11 deletions cpp/perspective/src/include/perspective/gnode.h
Original file line number Diff line number Diff line change
Expand Up @@ -464,17 +464,6 @@ t_gnode::update_context_from_state(
if (flattened->size() == 0)
return;

// Flattened won't have the computed columns if it didn't pass through the
// main body of `process_table`, i.e. creating a 1/2 sided context, so
// compute again here.
auto computed_columns = m_computed_column_map.m_computed_columns;

if (computed_columns.size() > 0) {
for (const auto& computed : computed_columns) {
_compute_column(computed.second, flattened);
}
}

// Need to cast shared ptr to a const reference before passing to notify,
// reference is valid as `notify` is not async
const t_data_table& const_flattened =
Expand Down
60 changes: 60 additions & 0 deletions packages/perspective/test/js/computed/functionality.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,41 @@ module.exports = perspective => {
table.delete();
});

it("Should be able to create chained computed columns in `view()` from schema, and updates propagate", async function() {
const table = perspective.table({
w: "float",
x: "integer",
y: "string",
z: "boolean"
});
const view = table.view({
computed_columns: [
{
column: "int + float",
computed_function_name: "+",
inputs: ["w", "x"]
},
{
column: "computed2",
computed_function_name: "pow2",
inputs: ["int + float"]
}
]
});

const result = await view.to_columns();
expect(result).toEqual({});

table.update(common.int_float_data);

const new_result = await view.to_columns();
expect(new_result["int + float"]).toEqual([2.5, 4.5, 6.5, 8.5]);
expect(new_result["computed2"]).toEqual([2.5, 4.5, 6.5, 8.5].map(x => Math.pow(x, 2)));

view.delete();
table.delete();
});

it("Should be able to create multiple computed columns in `view()`", async function() {
const table = perspective.table(common.int_float_data);
const view = table.view({
Expand Down Expand Up @@ -682,6 +717,31 @@ module.exports = perspective => {
table.delete();
});

it("Should be able to row pivot on a non-computed column and get correct results.", async function() {
const table = perspective.table(common.int_float_data);
const view = table.view({
row_pivots: ["w"],
computed_columns: [
{
column: "int + float",
computed_function_name: "+",
inputs: ["w", "x"]
}
]
});
const result = await view.to_columns();
expect(result).toEqual({
__ROW_PATH__: [[], [1.5], [2.5], [3.5], [4.5]],
"int + float": [22, 2.5, 4.5, 6.5, 8.5],
w: [12, 1.5, 2.5, 3.5, 4.5],
x: [10, 1, 2, 3, 4],
y: [4, 1, 1, 1, 1],
z: [4, 1, 1, 1, 1]
});
view.delete();
table.delete();
});

it("Should be able to row pivot on a computed column.", async function() {
const table = perspective.table(common.int_float_data);
const view = table.view({
Expand Down
52 changes: 52 additions & 0 deletions packages/perspective/test/js/pivots.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,33 @@ module.exports = perspective => {
table.delete();
});

it("['z'], weighted mean on a table created from schema should return valid values after update", async function() {
const table = perspective.table({
x: "integer",
y: "integer",
z: "boolean"
});

const view = table.view({
row_pivots: ["z"],
columns: ["x"],
aggregates: {x: ["weighted mean", "y"]}
});

const answer = [
{__ROW_PATH__: [], x: 2.8333333333333335},
{__ROW_PATH__: [false], x: 3.3333333333333335},
{__ROW_PATH__: [true], x: 2.3333333333333335}
];

table.update(data2);

let result = await view.to_json();
expect(result).toEqual(answer);
view.delete();
table.delete();
});

it("['z'], mean", async function() {
var table = perspective.table(data);
var view = table.view({
Expand All @@ -113,6 +140,31 @@ module.exports = perspective => {
table.delete();
});

it("['z'], mean on a table created from schema should return valid values after update", async function() {
const table = perspective.table({
x: "integer",
y: "string",
z: "boolean"
});
const view = table.view({
row_pivots: ["z"],
columns: ["x"],
aggregates: {x: "mean"}
});
const answer = [
{__ROW_PATH__: [], x: 2.5},
{__ROW_PATH__: [false], x: 3},
{__ROW_PATH__: [true], x: 2}
];

table.update(data);

let result = await view.to_json();
expect(result).toEqual(answer);
view.delete();
table.delete();
});

it("['z'], first by index", async function() {
var table = perspective.table(data);
var view = table.view({
Expand Down
9 changes: 9 additions & 0 deletions python/perspective/perspective/tests/core/test_aggregates.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,15 @@ def test_aggregates_widget_load(self):
widget = PerspectiveWidget(data, aggregates=aggs)
assert widget.aggregates == aggs

def test_aggregates_widget_load_weighted_mean(self):
aggs = {
"a": Aggregate.AVG,
"b": ["weighted mean", "a"]
}
data = {"a": [1, 2, 3], "b": ["a", "b", "c"]}
widget = PerspectiveWidget(data, aggregates=aggs)
assert widget.aggregates == aggs

def test_aggregates_widget_setattr(self):
data = {"a": [1, 2, 3], "b": ["a", "b", "c"]}
widget = PerspectiveWidget(data)
Expand Down
43 changes: 41 additions & 2 deletions python/perspective/perspective/tests/table/test_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,46 @@ def test_view_aggregate_datetime_leading_zeroes(self):
{"__ROW_PATH__": [datetime(2019, 1, 1, 5, 5, 5)], "a": 1}
]

def test_view_aggregate_multiple_columns(self):
def test_view_aggregate_mean(self):
data = [
{"a": "a", "x": 1, "y": 200},
{"a": "a", "x": 2, "y": 100},
{"a": "a", "x": 3, "y": None}
]
tbl = Table(data)
view = tbl.view(
aggregates={"y": "mean"},
row_pivots=["a"],
columns=['y']
)
assert view.to_records() == [
{"__ROW_PATH__": [], "y": 300 / 2},
{"__ROW_PATH__": ["a"], "y": 300 / 2}
]

def test_view_aggregate_mean_from_schema(self):
data = [
{"a": "a", "x": 1, "y": 200},
{"a": "a", "x": 2, "y": 100},
{"a": "a", "x": 3, "y": None}
]
tbl = Table({
"a": str,
"x": int,
"y": float
})
view = tbl.view(
aggregates={"y": "mean"},
row_pivots=["a"],
columns=['y']
)
tbl.update(data)
assert view.to_records() == [
{"__ROW_PATH__": [], "y": 300 / 2},
{"__ROW_PATH__": ["a"], "y": 300 / 2}
]

def test_view_aggregate_weighted_mean(self):
data = [
{"a": "a", "x": 1, "y": 200},
{"a": "a", "x": 2, "y": 100},
Expand All @@ -449,7 +488,7 @@ def test_view_aggregate_multiple_columns(self):
{"__ROW_PATH__": ["a"], "y": (1.0 * 200 + 2 * 100) / (1.0 + 2)}
]

def test_view_aggregate_multiple_columns_with_negative_weights(self):
def test_view_aggregate_weighted_mean_with_negative_weights(self):
data = [
{"a": "a", "x": 1, "y": 200},
{"a": "a", "x": -2, "y": 100},
Expand Down
5 changes: 5 additions & 0 deletions python/perspective/perspective/viewer/validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ def validate_aggregates(aggregates):
elif isinstance(v, string_types):
if v not in Aggregate.options():
raise PerspectiveError('Unrecognized aggregate: %s', v)
elif isinstance(v, list):
# Parse weighted mean aggregate in ["weighted mean", "COLUMN"]
if len(v) == 2 and v[0] == "weighted mean":
continue
raise PerspectiveError("Unrecognized aggregate in incorrect syntax for weighted mean: %s - Syntax should be: ['weighted mean', 'COLUMN']", v)
else:
raise PerspectiveError('Cannot parse aggregation of type %s', str(type(v)))
return aggregates
Expand Down
2 changes: 1 addition & 1 deletion python/perspective/perspective/viewer/viewer_traitlets.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class PerspectiveTraitlets(HasTraits):
columns = List(default_value=[]).tag(sync=True)
row_pivots = List(trait=Unicode(), default_value=[]).tag(sync=True, o=True)
column_pivots = List(trait=Unicode(), default_value=[]).tag(sync=True)
aggregates = Dict(trait=Unicode(), default_value={}).tag(sync=True)
aggregates = Dict(default_value={}).tag(sync=True)
sort = List(default_value=[]).tag(sync=True)
filters = List(default_value=[]).tag(sync=True)
computed_columns = List(default_value=[]).tag(sync=True)
Expand Down