Skip to content

Commit

Permalink
Merge pull request #1164 from finos/fix-remote-view
Browse files Browse the repository at this point in the history
Fixes #1159 - deltas are now generated on first update for 0-sided contexts
  • Loading branch information
texodus authored Aug 26, 2020
2 parents 823bede + 523d24f commit 1ee6766
Show file tree
Hide file tree
Showing 10 changed files with 876 additions and 42 deletions.
93 changes: 68 additions & 25 deletions cpp/perspective/src/cpp/context_zero.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -381,20 +381,12 @@ t_ctx0::sidedness() const {
return 0;
}

/**
* @brief Handle additions and new data, calculating deltas along the way.
*
* @param flattened
* @param delta
* @param prev
* @param curr
* @param transitions
* @param existed
*/
void
t_ctx0::notify(const t_data_table& flattened, const t_data_table& delta,
const t_data_table& prev, const t_data_table& curr, const t_data_table& transitions,
const t_data_table& existed) {
// Notify the context with new data when the `t_gstate` master table is
// not empty, and being updated with new data.
psp_log_time(repr() + " notify.enter");
t_uindex nrecs = flattened.size();
std::shared_ptr<const t_column> pkey_sptr = flattened.get_const_column("psp_pkey");
Expand All @@ -406,6 +398,7 @@ t_ctx0::notify(const t_data_table& flattened, const t_data_table& delta,
const t_column* existed_col = existed_sptr.get();

bool delete_encountered = false;

if (m_config.has_filters()) {
t_mask msk_prev = filter_table_for_config(prev, m_config);
t_mask msk_curr = filter_table_for_config(curr, m_config);
Expand Down Expand Up @@ -441,20 +434,24 @@ t_ctx0::notify(const t_data_table& flattened, const t_data_table& delta,
default: { PSP_COMPLAIN_AND_ABORT("Unexpected OP"); } break;
}

// add the pkey for updated rows
// add the pkey for row delta
add_delta_pkey(pkey);
}
psp_log_time(repr() + " notify.has_filter_path.updated_traversal");

// calculate deltas
calc_step_delta(flattened, prev, curr, transitions);
// calculate cell deltas if enabled
if (get_deltas_enabled()) {
calc_step_delta(flattened, prev, curr, transitions);
}

m_has_delta = m_deltas->size() > 0 || m_delta_pkeys.size() > 0 || delete_encountered;

psp_log_time(repr() + " notify.has_filter_path.exit");

return;
}

// Context does not have filters applied
for (t_uindex idx = 0; idx < nrecs; ++idx) {
t_tscalar pkey = m_symtable.get_interned_tscalar(pkey_col->get_scalar(idx));
std::uint8_t op_ = *(op_col->get_nth<std::uint8_t>(idx));
Expand All @@ -476,26 +473,25 @@ t_ctx0::notify(const t_data_table& flattened, const t_data_table& delta,
default: { PSP_COMPLAIN_AND_ABORT("Unexpected OP"); } break;
}

// add the pkey for updated rows
// add the pkey for row delta
add_delta_pkey(pkey);
}

psp_log_time(repr() + " notify.no_filter_path.updated_traversal");

// calculate deltas
calc_step_delta(flattened, prev, curr, transitions);
// calculate cell deltas if enabled
if (get_deltas_enabled()) {
calc_step_delta(flattened, prev, curr, transitions);
}
m_has_delta = m_deltas->size() > 0 || m_delta_pkeys.size() > 0 || delete_encountered;

psp_log_time(repr() + " notify.no_filter_path.exit");
}

/**
* @brief Handle the addition of new data.
*
* @param flattened
*/
void
t_ctx0::notify(const t_data_table& flattened) {
// Notify the context with new data after the `t_gstate`'s master table
// has been updated for the first time with data.
t_uindex nrecs = flattened.size();
std::shared_ptr<const t_column> pkey_sptr = flattened.get_const_column("psp_pkey");
std::shared_ptr<const t_column> op_sptr = flattened.get_const_column("psp_op");
Expand All @@ -518,11 +514,19 @@ t_ctx0::notify(const t_data_table& flattened) {
m_traversal->add_row(m_gstate, m_config, pkey);
}
} break;
default: {
// pass
} break;
default: break;
}

// Add primary key to track row delta
add_delta_pkey(pkey);
}

// Calculate the step delta, if enabled in the context through an on_update
// callback with the "cell" or "row" mode set.
if (get_deltas_enabled()) {
calc_step_delta(flattened);
}

return;
}

Expand All @@ -535,13 +539,51 @@ t_ctx0::notify(const t_data_table& flattened) {
case OP_INSERT: {
m_traversal->add_row(m_gstate, m_config, pkey);
} break;
default: { } break; }
default: break;
}

// Add primary key to track row delta
add_delta_pkey(pkey);
}

// Calculate the step delta, if enabled in the context through an on_update
// callback with the "cell" or "row" mode set.
if (get_deltas_enabled()) {
calc_step_delta(flattened);
}
}

void
t_ctx0::calc_step_delta(const t_data_table& flattened) {
// Calculate step deltas when the `t_gstate` master table is updated with
// data for the first time, so every single row is a new delta.
t_uindex nrows = flattened.size();
const auto& column_names = m_config.get_column_names();
const t_column* pkey_col = flattened.get_const_column("psp_pkey").get();

// Add every row and every column to the delta
for (const auto& name : column_names) {
auto cidx = m_config.get_colidx(name);
const t_column* flattened_column = flattened.get_const_column(name).get();

for (t_uindex ridx = 0; ridx < nrows; ++ridx) {
m_deltas->insert(
t_zcdelta(
get_interned_tscalar(pkey_col->get_scalar(ridx)),
cidx,
mknone(),
get_interned_tscalar(flattened_column->get_scalar(ridx))
)
);
}
}
}

void
t_ctx0::calc_step_delta(const t_data_table& flattened, const t_data_table& prev,
const t_data_table& curr, const t_data_table& transitions) {
// Calculate step deltas when the `t_gstate` master table already has
// data, so we can take transitions into account.
t_uindex nrows = flattened.size();

PSP_VERBOSE_ASSERT(prev.size() == nrows, "Shape violation detected");
Expand Down Expand Up @@ -580,6 +622,7 @@ t_ctx0::calc_step_delta(const t_data_table& flattened, const t_data_table& prev,
}
}


/**
* @brief Mark a primary key as updated by adding it to the tracking set.
*
Expand Down
10 changes: 0 additions & 10 deletions cpp/perspective/src/cpp/view.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -596,22 +596,12 @@ View<CTX_T>::_get_deltas_enabled() const {
return m_ctx->get_deltas_enabled();
}

template <>
bool
View<t_ctx0>::_get_deltas_enabled() const {
return true;
}

template <typename CTX_T>
void
View<CTX_T>::_set_deltas_enabled(bool enabled_state) {
m_ctx->set_deltas_enabled(enabled_state);
}

template <>
void
View<t_ctx0>::_set_deltas_enabled(bool enabled_state) {}

// Pivot table operations
template <typename CTX_T>
std::int32_t
Expand Down
22 changes: 20 additions & 2 deletions cpp/perspective/src/include/perspective/context_zero.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,29 @@ class PERSPECTIVE_EXPORT t_ctx0 : public t_ctxbase<t_ctx0> {
std::vector<t_tscalar> get_all_pkeys(
const std::vector<std::pair<t_uindex, t_uindex>>& cells) const;

/**
* @brief During a call to `notify` when the master table is being updated
* with data for the first time (prev, curr, and transitions tables are
* empty), take all added rows in the traversal and store them in
* `m_deltas`.
*
* @param flattened
*/
void calc_step_delta(const t_data_table& flattened);

/**
* @brief During a call to `notify` when the master table has data,
* calculate the deltas - both changed and added cells, and write them
* to `m_deltas`.
*
* @param flattened
* @param prev
* @param curr
* @param transitions
*/
void calc_step_delta(const t_data_table& flattened, const t_data_table& prev,
const t_data_table& curr, const t_data_table& transitions);

void calc_row_delta(const t_data_table& flattened, const t_data_table& transitions);

void add_delta_pkey(t_tscalar pkey);

private:
Expand Down
Loading

0 comments on commit 1ee6766

Please sign in to comment.