Parameterized views for monitoring async inserts
-- we use [arraySlice(groupArray(column), 1, length_parameter)] instead of [groupArray(length_parameter)(column)]
-- because the latter is currently not supported - i.e. the max_size parameter of groupArray can't be a view parameter
-- SELECT * FROM monitor_parts(db_name='default', table_name='upclick_metrics', last_x_minutes=10, last_m_parts_per_node_per_partition=4)
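-- a minimal illustration of the workaround (not part of the views): with a constant,
-- the parametric form of groupArray works, with a view parameter it is rejected:
--   SELECT groupArray(3)(number) FROM numbers(10)                 -- [0, 1, 2]
--   SELECT arraySlice(groupArray(number), 1, 3) FROM numbers(10)  -- [0, 1, 2], parameter-friendly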
CREATE OR REPLACE VIEW monitor_parts AS
WITH
    {db_name:String} AS db_name,
    {table_name:String} AS table_name,
    {last_x_minutes:UInt32} AS last_x_minutes,
    {last_m_parts_per_node_per_partition:UInt32} AS last_m_parts_per_node_per_partition,
    T_PART_LOG AS (
        SELECT
            hostName() AS node_name,
            event_time,
            part_name,
            rows,
            size_in_bytes AS bytes_on_disk,
            partition_id,
            part_type
        FROM
            clusterAllReplicas(default, system.part_log)
        WHERE
            database = db_name
            AND table = table_name
            AND event_time > now() - toIntervalMinute(last_x_minutes)
            AND event_type = 'NewPart' -- replicas log event_type 'DownloadPart', so we only see new parts on the node that received the insert
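            -- (to sanity-check this on your own cluster, you can inspect the event types directly, e.g.
            --  SELECT event_type, count() FROM clusterAllReplicas(default, system.part_log) GROUP BY event_type)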
    ),
    T_PARTS AS (
        SELECT
            name,
            partition AS opt_partition_name,
            -- part_storage_type AS opt_part_storage_type,
            data_compressed_bytes AS opt_data_compressed_bytes,
            data_uncompressed_bytes AS opt_data_uncompressed_bytes
        FROM
            system.parts
        WHERE
            database = db_name
            AND table = table_name
    ),
    T_BOTH AS (
        SELECT
            node_name,
            event_time,
            part_name,
            rows,
            bytes_on_disk,
            opt_data_compressed_bytes,
            opt_data_uncompressed_bytes,
            partition_id,
            opt_partition_name,
            part_type
            -- ,opt_part_storage_type
        FROM T_PART_LOG tpl LEFT OUTER JOIN T_PARTS tp ON tpl.part_name = tp.name
        ORDER BY
            event_time DESC
    ),
    T1 AS (
        SELECT
            node_name,
            partition_id,
            any(opt_partition_name) AS opt_partition_name,
            arraySlice(groupArray(event_time), 1, last_m_parts_per_node_per_partition) AS event_time_,
            arrayPushBack(arrayMap(d -> formatReadableTimeDelta(d), arrayPopFront(arrayDifference(arrayMap(t -> -1 * toUnixTimestamp(t), event_time_)))), '-') AS diff_,
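            -- worked example for the delta trick above: event_time_ is sorted newest-first,
            -- e.g. ['10:00:30', '10:00:20', '10:00:05'] -> negate -> arrayDifference -> [0, 10, 15]
            -- -> arrayPopFront -> [10, 15] -> formatReadableTimeDelta + arrayPushBack('-') -> ['10 seconds', '15 seconds', '-']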
            arraySlice(groupArray(rows), 1, last_m_parts_per_node_per_partition) AS rows_,
            arraySlice(groupArray(part_name), 1, last_m_parts_per_node_per_partition) AS part_name_,
            arraySlice(groupArray(bytes_on_disk), 1, last_m_parts_per_node_per_partition) AS bytes_on_disk_,
            arraySlice(groupArray(opt_data_compressed_bytes), 1, last_m_parts_per_node_per_partition) AS opt_data_compressed_bytes_,
            arraySlice(groupArray(opt_data_uncompressed_bytes), 1, last_m_parts_per_node_per_partition) AS opt_data_uncompressed_bytes_,
            arraySlice(groupArray(part_type), 1, last_m_parts_per_node_per_partition) AS part_type_
            -- ,groupArray(last_m_parts_per_node_per_partition)(opt_part_storage_type) AS opt_part_storage_type_
        FROM T_BOTH
        GROUP BY node_name, partition_id
    ),
    T2 AS (
        SELECT
            DENSE_RANK() OVER (ORDER BY node_name) AS host_id,
            DENSE_RANK() OVER (ORDER BY partition_id) AS partition_id,
            -- partition_id,
            opt_partition_name,
            formatDateTime(event_time_, '%T') AS event_time,
            diff AS duration_since_prev_part,
            rows,
            bytes_on_disk,
            opt_data_compressed_bytes,
            opt_data_uncompressed_bytes,
            part_type
            -- ,opt_part_storage_type
        FROM
            T1 ARRAY JOIN
                event_time_ AS event_time_,
                diff_ AS diff,
                rows_ AS rows,
                bytes_on_disk_ AS bytes_on_disk,
                opt_data_compressed_bytes_ AS opt_data_compressed_bytes,
                opt_data_uncompressed_bytes_ AS opt_data_uncompressed_bytes,
                part_type_ AS part_type
                -- ,opt_part_storage_type_ AS opt_part_storage_type
        ORDER BY
            host_id ASC,
            partition_id ASC,
            event_time_ DESC
    )
SELECT
    host_id AS n,
    replaceOne(opt_partition_name, 'tuple()', '') AS ptn,
    partition_id AS ptn_id,
    event_time AS write,
    replaceOne(duration_since_prev_part, 'seconds', 's') AS prev,
    rows,
    formatReadableSize(bytes_on_disk) AS on_disk
    ,formatReadableSize(opt_data_compressed_bytes) AS data_compressed
    ,formatReadableSize(opt_data_uncompressed_bytes) AS data_uncompressed
    ,part_type AS type
    -- ,opt_part_storage_type AS storage
FROM T2
SETTINGS skip_unavailable_shards = 1;
-- SELECT * FROM monitor_flushes(db_name='default', table_name='upclick_metrics', last_x_minutes=10, last_m_flushes_per_node_per_shape_per_settings=4)
CREATE OR REPLACE VIEW monitor_flushes AS
WITH
    {db_name:String} AS db_name,
    {table_name:String} AS table_name,
    {last_x_minutes:UInt32} AS last_x_minutes,
    {last_m_flushes_per_node_per_shape_per_settings:UInt32} AS last_m_flushes_per_node_per_shape_per_settings,
    T_ASYNC_INSERT_LOG AS (
        SELECT
            hostName() AS host_name,
            query,
            format,
            query_id,
            bytes,
            rows,
            flush_time,
            flush_query_id
        FROM
            clusterAllReplicas(default, system.asynchronous_insert_log)
        WHERE
            status = 'Ok'
            AND database = db_name
            AND table = table_name
            AND flush_time > now() - toIntervalMinute(last_x_minutes)
    ),
    T_QUERY_LOG AS (
        SELECT
            query_id,
            normalized_query_hash,
            Settings,
            cityHash64(Settings) AS settings_hash,
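            -- hashing the Settings map yields a compact key for grouping inserts that ran with identical settings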
            user,
            client_name,
            http_user_agent
        FROM
            clusterAllReplicas(default, system.query_log)
        WHERE
            has(tables, db_name || '.' || table_name)
            AND query_kind = 'Insert'
            AND type = 'QueryFinish'
            AND event_time > now() - toIntervalMinute(last_x_minutes)
    ),
    T_BOTH AS (
        SELECT
            *
        FROM T_ASYNC_INSERT_LOG tail JOIN T_QUERY_LOG tql ON tail.query_id = tql.query_id
    ),
    -- group matches by flush_query_id to get one row per buffer flush
    T1 AS (
        SELECT
            any(host_name) AS host_name,
            any(query) AS sample_query,
            arraySort(arrayDistinct(groupArray(format))) AS formats,
            groupArray(query_id) AS query_ids,
            sum(bytes) AS data_uncompressed_bytes,
            sum(rows) AS rows,
            any(flush_time) AS flush_time,
            count() AS queries,
            --
            any(normalized_query_hash) AS normalized_query_hash,
            any(Settings) AS Settings,
            any(settings_hash) AS settings_hash,
            arraySort(arrayDistinct(groupArray(user))) AS users,
            arraySort(arrayDistinct(groupArray(client_name))) AS client_names,
            arraySort(arrayDistinct(groupArray(http_user_agent))) AS http_user_agents
        FROM
            T_BOTH
        GROUP BY
            flush_query_id
        ORDER BY
            host_name,
            normalized_query_hash, -- flushes happen (per table) per host, per query shape, per settings; ordering by these makes that grouping visible
            settings_hash,
            flush_time DESC
    ),
    -- but that is too many rows for a quick overview of all involved hosts, query shapes, and settings,
    -- so we only want to see the last m flushes per node, per query shape, per settings.
    -- that is easy to formulate in ClickHouse: group by host, query shape, and settings,
    -- and then use groupArray (sliced, see the note at the top) to select the last m flushes
    T2 AS (
        SELECT
            host_name,
            normalized_query_hash,
            settings_hash,
            arraySlice(groupArray(flush_time), 1, last_m_flushes_per_node_per_shape_per_settings) AS flush_time_,
            arrayPushBack(arrayMap(d -> formatReadableTimeDelta(d), arrayPopFront(arrayDifference(arrayMap(t -> -1 * toUnixTimestamp(t), flush_time_)))), '-') AS diff_,
            arraySlice(groupArray(queries), 1, last_m_flushes_per_node_per_shape_per_settings) AS queries_,
            arraySlice(groupArray(data_uncompressed_bytes), 1, last_m_flushes_per_node_per_shape_per_settings) AS data_uncompressed_bytes_,
            arraySlice(groupArray(rows), 1, last_m_flushes_per_node_per_shape_per_settings) AS rows_,
            any(sample_query) AS sample_query,
            arraySlice(groupArray(formats), 1, last_m_flushes_per_node_per_shape_per_settings) AS formats_,
            arraySlice(groupArray(query_ids), 1, last_m_flushes_per_node_per_shape_per_settings) AS query_ids_,
            any(Settings) AS Settings,
            arraySlice(groupArray(users), 1, last_m_flushes_per_node_per_shape_per_settings) AS users_,
            arraySlice(groupArray(client_names), 1, last_m_flushes_per_node_per_shape_per_settings) AS client_names_,
            arraySlice(groupArray(http_user_agents), 1, last_m_flushes_per_node_per_shape_per_settings) AS http_user_agents_
        FROM
            T1
        GROUP BY
            host_name,
            normalized_query_hash,
            settings_hash
    ),
    -- now, for easier readability, bring everything back into tabular format by using the inverse of groupArray: ARRAY JOIN
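    -- (a minimal illustration of ARRAY JOIN unnesting an array into rows:
    --  SELECT x FROM (SELECT [1, 2, 3] AS xs) ARRAY JOIN xs AS x  -- returns three rows: 1, 2, 3)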
    T3 AS (
        SELECT
            DENSE_RANK() OVER (ORDER BY host_name) AS host_id,
            -- normalized_query_hash,
            -- settings_hash,
            DENSE_RANK() OVER (ORDER BY normalized_query_hash) AS query_shape_id,
            DENSE_RANK() OVER (ORDER BY settings_hash) AS settings_id,
            formatDateTime(flush_time_, '%T') AS flush_time,
            diff AS duration_since_prev_flush,
            queries,
            data_uncompressed_bytes,
            rows,
            sample_query,
            Settings,
            query_ids,
            formats,
            users,
            client_names,
            http_user_agents
        FROM
            T2 ARRAY JOIN
                flush_time_ AS flush_time_,
                diff_ AS diff,
                queries_ AS queries,
                data_uncompressed_bytes_ AS data_uncompressed_bytes,
                rows_ AS rows,
                query_ids_ AS query_ids,
                formats_ AS formats,
                users_ AS users,
                client_names_ AS client_names,
                http_user_agents_ AS http_user_agents
        ORDER BY
            host_name,
            query_shape_id,
            settings_id,
            flush_time DESC
    )
SELECT
    host_id AS n,
    query_shape_id AS q,
    settings_id AS s,
    flush_time AS flush,
    replaceOne(duration_since_prev_flush, 'seconds', 's') AS prev,
    rows,
    formatReadableSize(data_uncompressed_bytes) AS data
    ,sample_query
    ,Settings AS sample_settings
    -- ,query_ids
    -- ,formats
    -- ,users
    -- ,client_names
    -- ,http_user_agents
FROM
    T3
SETTINGS skip_unavailable_shards = 1;
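-- For the flush views to show anything, inserts must run asynchronously. A minimal sketch
-- (table and values are placeholders; the settings can also be set per user or per connection):
-- INSERT INTO default.upclick_metrics SETTINGS async_insert = 1, wait_for_async_insert = 1 VALUES (...)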
-- SELECT * FROM monitor_flush_errors(db_name='default', table_name='upclick_metrics', last_x_minutes=10)
CREATE OR REPLACE VIEW monitor_flush_errors AS
WITH
    {db_name:String} AS db_name,
    {table_name:String} AS table_name,
    {last_x_minutes:UInt32} AS last_x_minutes
SELECT
    DENSE_RANK() OVER (ORDER BY hostName()) AS n,
    max(event_time) AS flush,
    status,
    exception,
    any(query_id) AS query_id
FROM
    clusterAllReplicas(default, system.asynchronous_insert_log)
WHERE
    status <> 'Ok'
    AND database = db_name
    AND table = table_name
    AND event_time > now() - toIntervalMinute(last_x_minutes)
GROUP BY hostName(), status, exception
ORDER BY
    hostName(), flush DESC;
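-- Note: with wait_for_async_insert = 0 the client is acknowledged before the buffer is flushed,
-- so flush failures never reach the client; this view is where to spot them.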