Skip to content

Instantly share code, notes, and snippets.

@tom-clickhouse
Created December 28, 2023 16:50
Show Gist options
  • Save tom-clickhouse/2363ddd25ee96700ebcd66dcca8a8ba9 to your computer and use it in GitHub Desktop.
Save tom-clickhouse/2363ddd25ee96700ebcd66dcca8a8ba9 to your computer and use it in GitHub Desktop.
Parameterized views for monitoring async inserts
-- we use [arraySlice(groupArray(column), 1, length_parameter)] instead of [groupArray(length_parameter)(column)]
-- because the latter is currently not supported - i.e. the max_size parameter of groupArray can't be a view parameter
-- SELECT * FROM monitor_parts(db_name='default', table_name='upclick_metrics', last_x_minutes=10,last_m_parts_per_node_per_partition=4)
-- Parameterized view: newly written parts of one table, listed per node and per partition.
-- Parameters:
--   db_name, table_name                  - the table whose parts are inspected
--   last_x_minutes                       - only part_log events newer than now() minus this many minutes are considered
--   last_m_parts_per_node_per_partition  - upper bound on the parts kept per (node, partition) group
CREATE OR REPLACE VIEW monitor_parts AS
WITH
    {db_name:String} AS db_name,
    {table_name:String} AS table_name,
    {last_x_minutes:UInt32} AS last_x_minutes,
    {last_m_parts_per_node_per_partition:UInt32} AS last_m_parts_per_node_per_partition,
    -- 'NewPart' events for the table inside the look-back window, collected from every replica
    new_part_events AS (
        SELECT
            hostName() AS node_name,
            event_time,
            part_name,
            rows,
            size_in_bytes AS bytes_on_disk,
            partition_id,
            part_type
        FROM clusterAllReplicas(default, system.part_log)
        WHERE
            event_type = 'NewPart' -- Replicas have event_type 'DownloadPart', therefore we only see the new parts here on the node that got the insert
            AND database = db_name
            AND table = table_name
            AND event_time > now() - toIntervalMinute(last_x_minutes)
    ),
    -- optional extra details per part, looked up in system.parts of the node running the query
    current_parts AS (
        SELECT
            name,
            partition AS opt_partition_name,
            -- part_storage_type AS opt_part_storage_type,   (disabled; would also have to be threaded through the CTEs below)
            data_compressed_bytes AS opt_data_compressed_bytes,
            data_uncompressed_bytes AS opt_data_uncompressed_bytes
        FROM system.parts
        WHERE
            database = db_name
            AND table = table_name
    ),
    -- every part_log event, enriched with the system.parts details when the part name matches; newest first
    events_with_part_details AS (
        SELECT
            node_name,
            event_time,
            part_name,
            rows,
            bytes_on_disk,
            opt_data_compressed_bytes,
            opt_data_uncompressed_bytes,
            partition_id,
            opt_partition_name,
            part_type
        FROM new_part_events AS e
        LEFT JOIN current_parts AS p
            ON e.part_name = p.name
        ORDER BY event_time DESC
    ),
    -- one row per (node, partition); each array keeps the first last_m_parts_per_node_per_partition collected values
    parts_per_node_and_partition AS (
        SELECT
            node_name,
            partition_id,
            any(opt_partition_name) AS opt_partition_name,
            arraySlice(groupArray(event_time), 1, last_m_parts_per_node_per_partition) AS event_time_,
            -- readable gap between each kept part and the next one in the array; the last entry has no successor and gets '-'
            arrayPushBack(arrayMap(d -> formatReadableTimeDelta(d), arrayPopFront(arrayDifference(arrayMap(t -> -1 * toUnixTimestamp(t), event_time_)))), '-') AS diff_,
            arraySlice(groupArray(rows), 1, last_m_parts_per_node_per_partition) AS rows_,
            arraySlice(groupArray(part_name), 1, last_m_parts_per_node_per_partition) AS part_name_,
            arraySlice(groupArray(bytes_on_disk), 1, last_m_parts_per_node_per_partition) AS bytes_on_disk_,
            arraySlice(groupArray(opt_data_compressed_bytes), 1, last_m_parts_per_node_per_partition) AS opt_data_compressed_bytes_,
            arraySlice(groupArray(opt_data_uncompressed_bytes), 1, last_m_parts_per_node_per_partition) AS opt_data_uncompressed_bytes_,
            arraySlice(groupArray(part_type), 1, last_m_parts_per_node_per_partition) AS part_type_
        FROM events_with_part_details
        GROUP BY
            node_name,
            partition_id
    ),
    -- back to one row per part via ARRAY JOIN; node names and partition ids are replaced by compact dense ranks
    part_rows AS (
        SELECT
            DENSE_RANK() OVER (ORDER BY node_name) AS host_id,
            DENSE_RANK() OVER (ORDER BY partition_id) AS partition_id,
            opt_partition_name,
            formatDateTime(event_time_, '%T') AS event_time,
            diff AS duration_since_prev_part,
            rows,
            bytes_on_disk,
            opt_data_compressed_bytes,
            opt_data_uncompressed_bytes,
            part_type
        FROM parts_per_node_and_partition
        ARRAY JOIN
            event_time_ AS event_time_,
            diff_ AS diff,
            rows_ AS rows,
            bytes_on_disk_ AS bytes_on_disk,
            opt_data_compressed_bytes_ AS opt_data_compressed_bytes,
            opt_data_uncompressed_bytes_ AS opt_data_uncompressed_bytes,
            part_type_ AS part_type
        ORDER BY
            host_id ASC,
            partition_id ASC,
            event_time_ DESC
    )
-- final presentation: short column names and human-readable sizes
SELECT
    host_id AS n,
    replaceOne(opt_partition_name, 'tuple()', '') AS ptn,
    partition_id AS ptn_id,
    event_time AS write,
    replaceOne(duration_since_prev_part, 'seconds', 's') AS prev,
    rows,
    formatReadableSize(bytes_on_disk) AS on_disk,
    formatReadableSize(opt_data_compressed_bytes) AS data_compressed,
    formatReadableSize(opt_data_uncompressed_bytes) AS data_uncompressed,
    part_type AS type
FROM part_rows
SETTINGS skip_unavailable_shards = 1;
-- SELECT * FROM monitor_flushes(db_name='default', table_name='upclick_metrics', last_x_minutes=10,last_m_flushes_per_node_per_shape_per_settings=4)
-- Parameterized view: successful async-insert buffer flushes of one table, listed per host,
-- per query shape (normalized_query_hash) and per settings (cityHash64 of Settings).
-- Parameters:
--   db_name, table_name  - the table whose flushes are inspected
--   last_x_minutes       - look-back window, applied to flush_time (asynchronous_insert_log) and event_time (query_log)
--   last_m_flushes_per_node_per_shape_per_settings - upper bound on the flushes kept per (host, query shape, settings) group
CREATE OR REPLACE VIEW monitor_flushes AS
WITH
    {db_name:String} AS db_name,
    {table_name:String} AS table_name,
    {last_x_minutes:UInt32} AS last_x_minutes,
    {last_m_flushes_per_node_per_shape_per_settings:UInt32} AS last_m_flushes_per_node_per_shape_per_settings,
    -- async insert log entries with status 'Ok' for the table, collected from every replica
    T_ASYNC_INSERT_LOG AS (
        SELECT
            hostName() AS host_name,
            query,
            format,
            query_id,
            bytes,
            rows,
            flush_time,
            flush_query_id
        FROM
            clusterAllReplicas(default, system.asynchronous_insert_log)
        WHERE
            status = 'Ok'
            AND database = db_name
            AND table = table_name
            AND flush_time > now() - toIntervalMinute(last_x_minutes)
    ),
    -- finished INSERT queries touching the table; supplies query shape, settings and client details
    T_QUERY_LOG AS (
        SELECT
            query_id,
            normalized_query_hash,
            Settings,
            cityHash64(Settings) AS settings_hash,
            user,
            client_name,
            http_user_agent
        FROM
            clusterAllReplicas(default, system.query_log)
        WHERE
            has(tables, db_name || '.' || table_name)
            AND query_kind = 'Insert'
            AND type = 'QueryFinish'
            AND event_time > now() - toIntervalMinute(last_x_minutes)
    ),
    -- match every async insert log entry with its query log entry via query_id
    T_BOTH AS (
        SELECT
            *
        FROM T_ASYNC_INSERT_LOG tail JOIN T_QUERY_LOG tql ON tail.query_id = tql.query_id
    ),
    -- group matches by flush_query_id to get one row per buffer flush
    T1 AS (
        SELECT
            any(host_name) AS host_name,
            any(query) AS sample_query,
            arraySort(arrayDistinct(groupArray(format))) AS formats,
            groupArray(query_id) AS query_ids,
            sum(bytes) AS data_uncompressed_bytes,
            sum(rows) AS rows,
            any(flush_time) AS flush_time,
            count() AS queries,
            --
            any(normalized_query_hash) AS normalized_query_hash,
            any(Settings) AS Settings,
            any(settings_hash) AS settings_hash,
            arraySort(arrayDistinct(groupArray(user))) AS users,
            arraySort(arrayDistinct(groupArray(client_name))) AS client_names,
            arraySort(arrayDistinct(groupArray(http_user_agent))) AS http_user_agents
        FROM
            T_BOTH
        GROUP BY
            flush_query_id
        ORDER BY
            host_name,
            normalized_query_hash, -- it is (per table) per query shape per settings per host, so by ordering by it, it becomes clear
            settings_hash,
            flush_time DESC
    ),
    -- but that is too many rows; it is hard to get a quick overview of all involved hosts, query shapes and settings
    -- therefore it would be nice if we could see only the last_m_flushes_per_node_per_shape_per_settings latest flushes
    -- that is easy to formulate in ClickHouse - we can group by host and query shape and settings
    -- and then use groupArray in order to select the first last_m_flushes_per_node_per_shape_per_settings entries
    -- NOTE: "latest" relies on groupArray receiving the rows in the flush_time DESC order produced by T1
    T2 AS (
        SELECT
            host_name,
            normalized_query_hash,
            settings_hash,
            arraySlice(groupArray(flush_time), 1, last_m_flushes_per_node_per_shape_per_settings) AS flush_time_,
            -- readable gap between each kept flush and the next one in the array; the last entry has no successor and gets '-'
            arrayPushBack(arrayMap(d -> formatReadableTimeDelta(d), arrayPopFront(arrayDifference(arrayMap(t -> -1 * toUnixTimestamp(t) ,flush_time_)))), '-') AS diff_,
            arraySlice(groupArray(queries), 1, last_m_flushes_per_node_per_shape_per_settings) AS queries_,
            arraySlice(groupArray(data_uncompressed_bytes), 1, last_m_flushes_per_node_per_shape_per_settings) AS data_uncompressed_bytes_,
            arraySlice(groupArray(rows), 1, last_m_flushes_per_node_per_shape_per_settings) AS rows_,
            any(sample_query) AS sample_query,
            arraySlice(groupArray(formats), 1, last_m_flushes_per_node_per_shape_per_settings) AS formats_,
            arraySlice(groupArray(query_ids), 1, last_m_flushes_per_node_per_shape_per_settings) AS query_ids_,
            any(Settings) AS Settings,
            arraySlice(groupArray(users), 1, last_m_flushes_per_node_per_shape_per_settings) AS users_,
            arraySlice(groupArray(client_names), 1, last_m_flushes_per_node_per_shape_per_settings) AS client_names_,
            arraySlice(groupArray(http_user_agents), 1, last_m_flushes_per_node_per_shape_per_settings) AS http_user_agents_
        FROM
            T1
        GROUP BY
            host_name,
            normalized_query_hash,
            settings_hash
    ),
    -- now for easier readability bring everything back into tabular format by using the inverse of groupArray: ARRAY JOIN
    T3 AS (
        SELECT
            DENSE_RANK() OVER(ORDER BY host_name) AS host_id,
            -- normalized_query_hash,
            -- settings_hash,
            DENSE_RANK() OVER(ORDER BY normalized_query_hash) AS query_shape_id,
            DENSE_RANK() OVER(ORDER BY settings_hash) AS settings_id,
            formatDateTime(flush_time_, '%T') AS flush_time,
            diff AS duration_since_prev_flush,
            queries,
            data_uncompressed_bytes,
            rows,
            sample_query,
            Settings,
            query_ids,
            formats,
            users,
            client_names,
            http_user_agents
        FROM
            T2 ARRAY JOIN
                flush_time_ AS flush_time_,
                diff_ AS diff,
                queries_ AS queries,
                data_uncompressed_bytes_ AS data_uncompressed_bytes,
                rows_ AS rows,
                query_ids_ AS query_ids,
                formats_ AS formats,
                users_ AS users,
                client_names_ AS client_names,
                http_user_agents_ AS http_user_agents
        ORDER BY
            host_name,
            query_shape_id,
            settings_id,
            -- sort on the DateTime value (flush_time_), not on the time-of-day string alias flush_time,
            -- so that the newest-first order stays correct when the look-back window crosses midnight
            flush_time_ DESC
    )
-- final presentation: short column names and a human-readable data size
-- (T3 additionally offers query_ids, formats, users, client_names and http_user_agents)
SELECT
    host_id AS n,
    query_shape_id AS q,
    settings_id AS s,
    flush_time AS flush,
    replaceOne(duration_since_prev_flush, 'seconds', 's') AS prev,
    rows,
    formatReadableSize(data_uncompressed_bytes) AS data,
    sample_query,
    Settings AS sample_settings
FROM
    T3
SETTINGS skip_unavailable_shards = 1;
-- SELECT * FROM monitor_flush_errors(db_name='default', table_name='upclick_metrics', last_x_minutes=10)
-- Parameterized view: failed async inserts of one table, one row per (host, status, exception).
-- Parameters:
--   db_name, table_name  - the table whose async insert errors are inspected
--   last_x_minutes       - only log entries with event_time newer than now() minus this many minutes are considered
-- Output columns:
--   n         - dense rank of the host name
--   flush     - latest event_time within the group
--   status    - status of the log entries in the group (anything except 'Ok')
--   exception - exception text of the log entries in the group
--   query_id  - one arbitrary query_id from the group, as a sample
CREATE OR REPLACE VIEW monitor_flush_errors AS
WITH
    {db_name:String} AS db_name,
    {table_name:String} AS table_name,
    {last_x_minutes:UInt32} AS last_x_minutes
SELECT
    DENSE_RANK() OVER(ORDER BY hostName()) AS n,
    max(event_time) AS flush,
    status,
    exception,
    any(query_id) AS query_id
FROM
    clusterAllReplicas(default, system.asynchronous_insert_log)
WHERE
    status <> 'Ok'
    AND database = db_name AND table = table_name
    AND event_time > now() - toIntervalMinute(last_x_minutes)
GROUP BY hostName(), status, exception
ORDER BY
    hostName(), flush DESC
-- same as monitor_parts and monitor_flushes: an unreachable replica must not make the monitoring query fail
SETTINGS skip_unavailable_shards = 1;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment