Skip to content

Commit

Permalink
coord: unmaterialize all logging views by default (MaterializeInc#3752)
Browse files Browse the repository at this point in the history
* coord: unmaterialize all logging views by default

These logging views can eat up a lot of CPU. Make them all
unmaterialized instead. They can still be queried, but doing so will run
the required joins/aggregations on the underlying sources on demand.

* fix tests

Co-authored-by: Brennan Vincent <brennan@materialize.io>
  • Loading branch information
benesch and umanwizard authored Jul 29, 2020
1 parent 3ca9278 commit 77664ac
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 82 deletions.
60 changes: 16 additions & 44 deletions src/coord/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,35 +283,23 @@ where
}
}
GlobalId::System(_) => {
// TODO(benesch): find a smarter way to determine whether this system index
// is on a logging source or a logging view. Probably logging sources
// should not be catalog views.
if logging
.unwrap()
.active_views()
.iter()
.any(|v| v.index_id == *id)
{
coord.create_index_dataflow(name.to_string(), *id, index.clone())
} else {
if is_cat_index(&id) {
let entry = coord.catalog.get_by_id(&index.on);
coord.views.insert(*id, ViewState::new(false, vec![]));
broadcast(
&mut coord.broadcast_tx,
SequencedCommand::CreateLocalInput {
name: name.to_string(),
index_id: *id,
index: IndexDesc {
on_id: *id,
keys: index.keys.clone(),
},
on_type: entry.desc().unwrap().typ().clone(),
if is_cat_index(&id) {
let entry = coord.catalog.get_by_id(&index.on);
coord.views.insert(*id, ViewState::new(false, vec![]));
broadcast(
&mut coord.broadcast_tx,
SequencedCommand::CreateLocalInput {
name: name.to_string(),
index_id: *id,
index: IndexDesc {
on_id: *id,
keys: index.keys.clone(),
},
);
}
coord.insert_index(*id, &index, Some(1_000))
on_type: entry.desc().unwrap().typ().clone(),
},
);
}
coord.insert_index(*id, &index, Some(1_000))
}
},
}
Expand Down Expand Up @@ -2733,7 +2721,7 @@ fn open_catalog(
}) => {
assert!(replace.is_none());
assert!(!if_not_exists);
assert!(materialize);
assert!(!materialize);
// Optimize the expression so that we can form an accurately typed description.
let optimized_expr = optimizer
.optimize(view.expr, catalog.indexes())
Expand All @@ -2752,23 +2740,7 @@ fn open_catalog(
schema: "mz_catalog".into(),
item: log_view.name.into(),
};
let index_name = format!("{}_primary_idx", log_view.name);
let index = auto_generate_primary_idx(
index_name.clone(),
view_name.clone(),
log_view.id,
&view.desc,
);
catalog.insert_item(log_view.id, view_name, CatalogItem::View(view));
catalog.insert_item(
log_view.index_id,
FullName {
database: DatabaseSpecifier::Ambient,
schema: "mz_catalog".into(),
item: index_name,
},
CatalogItem::Index(index),
);
}
err => panic!(
"internal error: failed to load bootstrap view:\n{}\nerror:\n{:?}",
Expand Down
35 changes: 12 additions & 23 deletions src/dataflow-types/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,15 +308,14 @@ pub struct LogView {
pub name: &'static str,
pub sql: &'static str,
pub id: GlobalId,
pub index_id: GlobalId,
}

// Selects all addresses that have exactly one slot (slot 0) in
// mz_dataflow_operator_addresses. The resulting addresses are either channels
// or dataflows.
const VIEW_ADDRESSES_WITH_UNIT_LENGTH: LogView = LogView {
name: "mz_addresses_with_unit_length",
sql: "CREATE MATERIALIZED VIEW mz_addresses_with_unit_length AS SELECT
sql: "CREATE VIEW mz_addresses_with_unit_length AS SELECT
mz_dataflow_operator_addresses.id,
mz_dataflow_operator_addresses.worker
FROM
Expand All @@ -326,14 +325,13 @@ GROUP BY
mz_dataflow_operator_addresses.worker
HAVING count(*) = 1",
id: GlobalId::System(33),
index_id: GlobalId::System(34),
};

/// Lists the current dataflow operator ids, and their
/// corresponding operator names and local ids (per worker).
const VIEW_DATAFLOW_NAMES: LogView = LogView {
name: "mz_dataflow_names",
sql: "CREATE MATERIALIZED VIEW mz_dataflow_names AS SELECT
sql: "CREATE VIEW mz_dataflow_names AS SELECT
mz_dataflow_operator_addresses.id,
mz_dataflow_operator_addresses.worker,
mz_dataflow_operator_addresses.value as local_id,
Expand All @@ -349,14 +347,13 @@ WHERE
mz_dataflow_operator_addresses.worker = mz_addresses_with_unit_length.worker AND
mz_dataflow_operator_addresses.slot = 0",
id: GlobalId::System(35),
index_id: GlobalId::System(36),
};

/// Lists all operators bound to a dataflow and their
/// corresponding names and dataflow names and ids (per worker).
const VIEW_DATAFLOW_OPERATOR_DATAFLOWS: LogView = LogView {
name: "mz_dataflow_operator_dataflows",
sql: "CREATE MATERIALIZED VIEW mz_dataflow_operator_dataflows AS SELECT
sql: "CREATE VIEW mz_dataflow_operator_dataflows AS SELECT
mz_dataflow_operators.id,
mz_dataflow_operators.name,
mz_dataflow_operators.worker,
Expand All @@ -373,14 +370,13 @@ WHERE
mz_dataflow_names.local_id = mz_dataflow_operator_addresses.value AND
mz_dataflow_names.worker = mz_dataflow_operator_addresses.worker",
id: GlobalId::System(37),
index_id: GlobalId::System(38),
};

/// Reports the number of records used by each operator in a dataflow (per
/// worker). Operators not using any records are not shown.
const VIEW_RECORDS_PER_DATAFLOW_OPERATOR: LogView = LogView {
name: "mz_records_per_dataflow_operator",
sql: "CREATE MATERIALIZED VIEW mz_records_per_dataflow_operator AS SELECT
sql: "CREATE VIEW mz_records_per_dataflow_operator AS SELECT
mz_dataflow_operator_dataflows.id,
mz_dataflow_operator_dataflows.name,
mz_dataflow_operator_dataflows.worker,
Expand All @@ -393,13 +389,12 @@ WHERE
mz_dataflow_operator_dataflows.id = mz_arrangement_sizes.operator AND
mz_dataflow_operator_dataflows.worker = mz_arrangement_sizes.worker",
id: GlobalId::System(39),
index_id: GlobalId::System(40),
};

/// Reports the number of records used by each dataflow (per worker).
const VIEW_RECORDS_PER_DATAFLOW: LogView = LogView {
name: "mz_records_per_dataflow",
sql: "CREATE MATERIALIZED VIEW mz_records_per_dataflow AS SELECT
sql: "CREATE VIEW mz_records_per_dataflow AS SELECT
mz_records_per_dataflow_operator.dataflow_id as id,
mz_dataflow_names.name,
mz_records_per_dataflow_operator.worker,
Expand All @@ -415,13 +410,12 @@ GROUP BY
mz_dataflow_names.name,
mz_records_per_dataflow_operator.worker",
id: GlobalId::System(41),
index_id: GlobalId::System(42),
};

/// Reports the number of records used by each dataflow (across all workers).
const VIEW_RECORDS_PER_DATAFLOW_GLOBAL: LogView = LogView {
name: "mz_records_per_dataflow_global",
sql: "CREATE MATERIALIZED VIEW mz_records_per_dataflow_global AS SELECT
sql: "CREATE VIEW mz_records_per_dataflow_global AS SELECT
mz_records_per_dataflow.id,
mz_records_per_dataflow.name,
SUM(mz_records_per_dataflow.records) as records
Expand All @@ -431,12 +425,11 @@ GROUP BY
mz_records_per_dataflow.id,
mz_records_per_dataflow.name",
id: GlobalId::System(43),
index_id: GlobalId::System(44),
};

const VIEW_PERF_DEPENDENCY_FRONTIERS: LogView = LogView {
name: "mz_perf_dependency_frontiers",
sql: "CREATE MATERIALIZED VIEW mz_perf_dependency_frontiers AS SELECT DISTINCT
sql: "CREATE VIEW mz_perf_dependency_frontiers AS SELECT DISTINCT
coalesce(mcn.name, index_deps.dataflow) as dataflow,
coalesce(mcn_source.name, frontier_source.global_id) as source,
frontier_source.time - frontier_df.time as lag_ms
Expand All @@ -447,21 +440,20 @@ JOIN mz_catalog.mz_materialization_frontiers frontier_df ON index_deps.dataflow
LEFT JOIN mz_catalog.mz_catalog_names mcn ON mcn.global_id = index_deps.dataflow
LEFT JOIN mz_catalog.mz_catalog_names mcn_source ON mcn_source.global_id = frontier_source.global_id",
id: GlobalId::System(45),
index_id: GlobalId::System(46),
};

const VIEW_PERF_ARRANGEMENT_RECORDS: LogView = LogView {
name: "mz_perf_arrangement_records",
sql: "CREATE MATERIALIZED VIEW mz_perf_arrangement_records AS SELECT mas.worker, name, records, operator
sql:
"CREATE VIEW mz_perf_arrangement_records AS SELECT mas.worker, name, records, operator
FROM mz_catalog.mz_arrangement_sizes mas
LEFT JOIN mz_catalog.mz_dataflow_operators mdo ON mdo.id = mas.operator AND mdo.worker = mas.worker",
id: GlobalId::System(47),
index_id: GlobalId::System(48),
};

const VIEW_PERF_PEEK_DURATIONS_CORE: LogView = LogView {
name: "mz_perf_peek_durations_core",
sql: "CREATE MATERIALIZED VIEW mz_perf_peek_durations_core AS SELECT
sql: "CREATE VIEW mz_perf_peek_durations_core AS SELECT
d_upper.worker,
CAST(d_upper.duration_ns AS TEXT) AS le,
sum(d_summed.count) AS count
Expand All @@ -473,27 +465,24 @@ WHERE
d_upper.duration_ns >= d_summed.duration_ns
GROUP BY d_upper.worker, d_upper.duration_ns",
id: GlobalId::System(49),
index_id: GlobalId::System(50),
};

const VIEW_PERF_PEEK_DURATIONS_BUCKET: LogView = LogView {
name: "mz_perf_peek_durations_bucket",
sql: "CREATE MATERIALIZED VIEW mz_perf_peek_durations_bucket AS
sql: "CREATE VIEW mz_perf_peek_durations_bucket AS
(
SELECT * FROM mz_catalog.mz_perf_peek_durations_core
) UNION (
SELECT worker, '+Inf', max(count) AS count FROM mz_catalog.mz_perf_peek_durations_core
GROUP BY worker
)",
id: GlobalId::System(51),
index_id: GlobalId::System(52),
};

const VIEW_PERF_PEEK_DURATIONS_AGGREGATES: LogView = LogView {
name: "mz_perf_peek_durations_aggregates",
sql: "CREATE MATERIALIZED VIEW mz_perf_peek_durations_aggregates AS SELECT worker, sum(duration_ns * count) AS sum, sum(count) AS count
sql: "CREATE VIEW mz_perf_peek_durations_aggregates AS SELECT worker, sum(duration_ns * count) AS sum, sum(count) AS count
FROM mz_catalog.mz_peek_durations lpd
GROUP BY worker",
id: GlobalId::System(53),
index_id: GlobalId::System(54),
};
7 changes: 3 additions & 4 deletions src/materialized/tests/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,9 @@ fn test_persistence() -> Result<(), Box<dyn Error>> {
vec![
"u6", "u1", "u4", "s27", "s23", "s55", "u2", "s31", "s25", "s35", "s57", "s3",
"s11", "s17", "s1", "s5", "s13", "s29", "s7", "u3", "u5", "s15", "s41", "s28",
"s24", "s56", "s47", "s49", "s21", "s32", "s45", "s9", "s26", "s33", "s36", "s51",
"s58", "s37", "s43", "s4", "s12", "s18", "s19", "s2", "s6", "s14", "s30", "s39",
"s53", "s8", "s16", "s42", "s48", "s50", "s22", "s46", "s34", "s52", "s10", "s38",
"s44", "s20", "s40", "s54"
"s24", "s56", "s47", "s49", "s21", "s32", "s45", "s9", "s26", "s33", "s51", "s58",
"s37", "s43", "s4", "s12", "s18", "s19", "s2", "s6", "s14", "s30", "s39", "s53",
"s8", "s16", "s22", "s10", "s20"
]
);
}
Expand Down
22 changes: 11 additions & 11 deletions test/testdrive/show.td
Original file line number Diff line number Diff line change
Expand Up @@ -223,17 +223,17 @@ mz_view_keys SYSTEM true
> SHOW FULL VIEWS FROM mz_catalog
VIEWS TYPE QUERYABLE MATERIALIZED
---------------------------------------------------------------
mz_addresses_with_unit_length SYSTEM true true
mz_dataflow_names SYSTEM true true
mz_dataflow_operator_dataflows SYSTEM true true
mz_perf_arrangement_records SYSTEM true true
mz_perf_dependency_frontiers SYSTEM true true
mz_perf_peek_durations_aggregates SYSTEM true true
mz_perf_peek_durations_bucket SYSTEM true true
mz_perf_peek_durations_core SYSTEM true true
mz_records_per_dataflow SYSTEM true true
mz_records_per_dataflow_global SYSTEM true true
mz_records_per_dataflow_operator SYSTEM true true
mz_addresses_with_unit_length SYSTEM true false
mz_dataflow_names SYSTEM true false
mz_dataflow_operator_dataflows SYSTEM true false
mz_perf_arrangement_records SYSTEM true false
mz_perf_dependency_frontiers SYSTEM true false
mz_perf_peek_durations_aggregates SYSTEM true false
mz_perf_peek_durations_bucket SYSTEM true false
mz_perf_peek_durations_core SYSTEM true false
mz_records_per_dataflow SYSTEM true false
mz_records_per_dataflow_global SYSTEM true false
mz_records_per_dataflow_operator SYSTEM true false

# test that information in shows correctly responds to materialization and unmaterialization of views

Expand Down

0 comments on commit 77664ac

Please sign in to comment.