Skip to content

Instantly share code, notes, and snippets.

@lefred
Last active August 23, 2023 15:24
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Embed
What would you like to do?
Get replication information
-- Make sys the default schema so the unqualified view names in the
-- DROP VIEW / CREATE VIEW statements below resolve to sys.<view_name>.
use sys;
-- replication_status_full
--
-- One row per replication channel and applier worker, combining:
--   * connection configuration (host, port, user, heartbeat interval),
--   * receiver (io_*), coordinator (co_*) and worker (w_*) state and last error,
--   * lag figures derived from the transaction currently being applied,
--   * human-readable durations for the last queued / last applied transaction.
--
-- The duration columns (transport_time, time_to_relay_log, apply_time) are
-- computed as seconds * 10^12 (picoseconds) and rendered by format_pico_time().
DROP VIEW IF EXISTS replication_status_full;
CREATE
    ALGORITHM = MERGE
    SQL SECURITY INVOKER
VIEW replication_status_full
AS
SELECT
    CONCAT(s.channel_name, ' (', w.worker_id, ')') AS channel,
    c.host,
    c.port,
    c.user,
    s.source_uuid,
    s.group_name,
    s.last_heartbeat_timestamp,
    c.heartbeat_interval,
    -- Receiver (I/O) thread.
    s.service_state AS io_state,
    st.processlist_state AS io_thread_state,
    s.last_error_number AS io_errno,
    s.last_error_message AS io_errmsg,
    s.last_error_timestamp AS io_errtime,
    -- Coordinator thread (all NULL when the LEFT JOIN finds no coordinator row).
    co.service_state AS co_state,
    cot.processlist_state AS co_thread_state,
    co.last_error_number AS co_errno,
    co.last_error_message AS co_errmsg,
    co.last_error_timestamp AS co_errtime,
    -- Applier worker thread.
    w.service_state AS w_state,
    wt.processlist_state AS w_thread_state,
    w.last_error_number AS w_errno,
    w.last_error_message AS w_errmsg,
    w.last_error_timestamp AS w_errtime,
    -- Time elapsed since the more recent of "last transaction started queueing"
    -- and "last heartbeat received".
    TIMEDIFF(
        NOW(6),
        IF(
            TIMEDIFF(
                s.last_queued_transaction_start_queue_timestamp,
                s.last_heartbeat_timestamp
            ) >= 0,
            s.last_queued_transaction_start_queue_timestamp,
            s.last_heartbeat_timestamp
        )
    ) AS time_since_last_message,
    -- IDLE when nothing was ever queued, or when the last queued transaction is
    -- the one most recently applied on this channel; otherwise APPLYING.
    CASE
        WHEN s.last_queued_transaction = ''
            OR s.last_queued_transaction = latest_w.last_applied_transaction
            THEN 'IDLE'
        ELSE 'APPLYING'
    END AS applier_busy_state,
    -- 'null' : receiver or coordinator is OFF, lag cannot be evaluated.
    -- 'none' : this worker already applied the last queued transaction, or it
    --          has no transaction in progress (zero timestamp).
    -- else   : age of the in-progress transaction relative to its commit on
    --          the original source.
    CASE
        WHEN co.service_state = 'OFF' OR s.service_state = 'OFF'
            THEN 'null'
        WHEN GTID_SUBTRACT(s.last_queued_transaction, w.last_applied_transaction) = ''
            OR UNIX_TIMESTAMP(w.applying_transaction_original_commit_timestamp) = 0
            THEN 'none'
        ELSE TIMEDIFF(NOW(6), w.applying_transaction_original_commit_timestamp)
    END AS lag_from_original,
    -- Same as lag_from_original, measured from the commit on the immediate source.
    CASE
        WHEN co.service_state = 'OFF' OR s.service_state = 'OFF'
            THEN 'null'
        WHEN GTID_SUBTRACT(s.last_queued_transaction, w.last_applied_transaction) = ''
            OR UNIX_TIMESTAMP(w.applying_transaction_immediate_commit_timestamp) = 0
            THEN 'none'
        ELSE TIMEDIFF(NOW(6), w.applying_transaction_immediate_commit_timestamp)
    END AS lag_from_immediate,
    -- Seconds -> picoseconds is * 10^12. The previous definition multiplied
    -- transport_time by 10^11, under-reporting it by a factor of ten relative to
    -- time_to_relay_log and apply_time.
    format_pico_time(
        (UNIX_TIMESTAMP(s.last_queued_transaction_start_queue_timestamp)
            - UNIX_TIMESTAMP(s.last_queued_transaction_original_commit_timestamp))
        * 1000000000000
    ) AS transport_time,
    format_pico_time(
        (UNIX_TIMESTAMP(s.last_queued_transaction_end_queue_timestamp)
            - UNIX_TIMESTAMP(s.last_queued_transaction_start_queue_timestamp))
        * 1000000000000
    ) AS time_to_relay_log,
    format_pico_time(
        (UNIX_TIMESTAMP(w.last_applied_transaction_end_apply_timestamp)
            - UNIX_TIMESTAMP(w.last_applied_transaction_start_apply_timestamp))
        * 1000000000000
    ) AS apply_time,
    w.last_applied_transaction AS last_applied_transaction,
    s.last_queued_transaction AS last_queued_transaction,
    -- GTIDs received on this channel that are not yet in gtid_executed.
    -- gtid_executed is read from performance_schema.global_variables because a
    -- view definition cannot reference a system variable directly.
    GTID_SUBTRACT(
        s.received_transaction_set,
        (
            SELECT gv.variable_value
            FROM performance_schema.global_variables AS gv
            WHERE gv.variable_name = 'gtid_executed'
        )
    ) AS queued_gtid_set_to_apply
FROM performance_schema.replication_connection_configuration AS c
INNER JOIN performance_schema.replication_connection_status AS s
    ON c.channel_name = s.channel_name
LEFT JOIN performance_schema.replication_applier_status_by_coordinator AS co
    ON c.channel_name = co.channel_name
-- No column of "a" is selected; the inner join is kept so the set of channels
-- returned stays the same as before.
INNER JOIN performance_schema.replication_applier_status AS a
    ON c.channel_name = a.channel_name
INNER JOIN performance_schema.replication_applier_status_by_worker AS w
    ON c.channel_name = w.channel_name
-- Per channel, the worker row whose last apply finished most recently.
-- The previous "SELECT * ... LIMIT 1" had no ORDER BY and no per-channel scope:
-- it picked an arbitrary worker, and every channel other than that worker's
-- channel compared against NULL and was always reported as APPLYING.
LEFT JOIN (
    SELECT
        ranked.channel_name,
        ranked.last_applied_transaction
    FROM (
        SELECT
            lw.channel_name,
            lw.last_applied_transaction,
            ROW_NUMBER() OVER (
                PARTITION BY lw.channel_name
                ORDER BY
                    lw.last_applied_transaction_end_apply_timestamp DESC,
                    lw.worker_id
            ) AS recency_rank
        FROM performance_schema.replication_applier_status_by_worker AS lw
    ) AS ranked
    WHERE ranked.recency_rank = 1
) AS latest_w
    ON c.channel_name = latest_w.channel_name
LEFT JOIN performance_schema.threads AS st
    ON s.thread_id = st.thread_id
LEFT JOIN performance_schema.threads AS cot
    ON co.thread_id = cot.thread_id
LEFT JOIN performance_schema.threads AS wt
    ON w.thread_id = wt.thread_id;
-- x$replication_status_full
--
-- Raw-value counterpart of replication_status_full: same rows and columns, but
-- transport_time, time_to_relay_log and apply_time are returned as plain
-- numbers (seconds * 10^12, i.e. picoseconds) instead of being passed through
-- format_pico_time().
DROP VIEW IF EXISTS x$replication_status_full;
CREATE
    ALGORITHM = MERGE
    SQL SECURITY INVOKER
VIEW x$replication_status_full
AS
SELECT
    CONCAT(s.channel_name, ' (', w.worker_id, ')') AS channel,
    c.host,
    c.port,
    c.user,
    s.source_uuid,
    s.group_name,
    s.last_heartbeat_timestamp,
    c.heartbeat_interval,
    -- Receiver (I/O) thread.
    s.service_state AS io_state,
    st.processlist_state AS io_thread_state,
    s.last_error_number AS io_errno,
    s.last_error_message AS io_errmsg,
    s.last_error_timestamp AS io_errtime,
    -- Coordinator thread (all NULL when the LEFT JOIN finds no coordinator row).
    co.service_state AS co_state,
    cot.processlist_state AS co_thread_state,
    co.last_error_number AS co_errno,
    co.last_error_message AS co_errmsg,
    co.last_error_timestamp AS co_errtime,
    -- Applier worker thread.
    w.service_state AS w_state,
    wt.processlist_state AS w_thread_state,
    w.last_error_number AS w_errno,
    w.last_error_message AS w_errmsg,
    w.last_error_timestamp AS w_errtime,
    -- Time elapsed since the more recent of "last transaction started queueing"
    -- and "last heartbeat received".
    TIMEDIFF(
        NOW(6),
        IF(
            TIMEDIFF(
                s.last_queued_transaction_start_queue_timestamp,
                s.last_heartbeat_timestamp
            ) >= 0,
            s.last_queued_transaction_start_queue_timestamp,
            s.last_heartbeat_timestamp
        )
    ) AS time_since_last_message,
    -- IDLE when nothing was ever queued, or when the last queued transaction is
    -- the one most recently applied on this channel; otherwise APPLYING.
    CASE
        WHEN s.last_queued_transaction = ''
            OR s.last_queued_transaction = latest_w.last_applied_transaction
            THEN 'IDLE'
        ELSE 'APPLYING'
    END AS applier_busy_state,
    -- 'null' : receiver or coordinator is OFF, lag cannot be evaluated.
    -- 'none' : this worker already applied the last queued transaction, or it
    --          has no transaction in progress (zero timestamp).
    -- else   : age of the in-progress transaction relative to its commit on
    --          the original source.
    CASE
        WHEN co.service_state = 'OFF' OR s.service_state = 'OFF'
            THEN 'null'
        WHEN GTID_SUBTRACT(s.last_queued_transaction, w.last_applied_transaction) = ''
            OR UNIX_TIMESTAMP(w.applying_transaction_original_commit_timestamp) = 0
            THEN 'none'
        ELSE TIMEDIFF(NOW(6), w.applying_transaction_original_commit_timestamp)
    END AS lag_from_original,
    -- Same as lag_from_original, measured from the commit on the immediate source.
    CASE
        WHEN co.service_state = 'OFF' OR s.service_state = 'OFF'
            THEN 'null'
        WHEN GTID_SUBTRACT(s.last_queued_transaction, w.last_applied_transaction) = ''
            OR UNIX_TIMESTAMP(w.applying_transaction_immediate_commit_timestamp) = 0
            THEN 'none'
        ELSE TIMEDIFF(NOW(6), w.applying_transaction_immediate_commit_timestamp)
    END AS lag_from_immediate,
    -- Seconds -> picoseconds is * 10^12. The previous definition multiplied
    -- transport_time by 10^11, making it ten times too small compared with
    -- time_to_relay_log and apply_time.
    (
        (UNIX_TIMESTAMP(s.last_queued_transaction_start_queue_timestamp)
            - UNIX_TIMESTAMP(s.last_queued_transaction_original_commit_timestamp))
        * 1000000000000
    ) AS transport_time,
    (
        (UNIX_TIMESTAMP(s.last_queued_transaction_end_queue_timestamp)
            - UNIX_TIMESTAMP(s.last_queued_transaction_start_queue_timestamp))
        * 1000000000000
    ) AS time_to_relay_log,
    (
        (UNIX_TIMESTAMP(w.last_applied_transaction_end_apply_timestamp)
            - UNIX_TIMESTAMP(w.last_applied_transaction_start_apply_timestamp))
        * 1000000000000
    ) AS apply_time,
    w.last_applied_transaction AS last_applied_transaction,
    s.last_queued_transaction AS last_queued_transaction,
    -- GTIDs received on this channel that are not yet in gtid_executed.
    -- gtid_executed is read from performance_schema.global_variables because a
    -- view definition cannot reference a system variable directly.
    GTID_SUBTRACT(
        s.received_transaction_set,
        (
            SELECT gv.variable_value
            FROM performance_schema.global_variables AS gv
            WHERE gv.variable_name = 'gtid_executed'
        )
    ) AS queued_gtid_set_to_apply
FROM performance_schema.replication_connection_configuration AS c
INNER JOIN performance_schema.replication_connection_status AS s
    ON c.channel_name = s.channel_name
LEFT JOIN performance_schema.replication_applier_status_by_coordinator AS co
    ON c.channel_name = co.channel_name
-- No column of "a" is selected; the inner join is kept so the set of channels
-- returned stays the same as before.
INNER JOIN performance_schema.replication_applier_status AS a
    ON c.channel_name = a.channel_name
INNER JOIN performance_schema.replication_applier_status_by_worker AS w
    ON c.channel_name = w.channel_name
-- Per channel, the worker row whose last apply finished most recently.
-- The previous "SELECT * ... LIMIT 1" had no ORDER BY and no per-channel scope:
-- it picked an arbitrary worker, and every channel other than that worker's
-- channel compared against NULL and was always reported as APPLYING.
LEFT JOIN (
    SELECT
        ranked.channel_name,
        ranked.last_applied_transaction
    FROM (
        SELECT
            lw.channel_name,
            lw.last_applied_transaction,
            ROW_NUMBER() OVER (
                PARTITION BY lw.channel_name
                ORDER BY
                    lw.last_applied_transaction_end_apply_timestamp DESC,
                    lw.worker_id
            ) AS recency_rank
        FROM performance_schema.replication_applier_status_by_worker AS lw
    ) AS ranked
    WHERE ranked.recency_rank = 1
) AS latest_w
    ON c.channel_name = latest_w.channel_name
LEFT JOIN performance_schema.threads AS st
    ON s.thread_id = st.thread_id
LEFT JOIN performance_schema.threads AS cot
    ON co.thread_id = cot.thread_id
LEFT JOIN performance_schema.threads AS wt
    ON w.thread_id = wt.thread_id;
-- replication_status
--
-- Compact status: one row per replication channel and applier worker with the
-- receiver / coordinator / worker service states and the two lag figures.
--
-- lag_from_original / lag_from_immediate:
--   'null' : receiver or coordinator is OFF, lag cannot be evaluated.
--   'none' : this worker already applied the last queued transaction, or it
--            has no transaction in progress (zero timestamp).
--   else   : TIMEDIFF between now and the commit timestamp (on the original /
--            immediate source) of the transaction being applied.
DROP VIEW IF EXISTS replication_status;
CREATE
    ALGORITHM = MERGE
    SQL SECURITY INVOKER
VIEW replication_status
AS
SELECT
    CONCAT(s.channel_name, ' (', w.worker_id, ')') AS channel,
    s.service_state AS io_state,
    co.service_state AS co_state,
    w.service_state AS w_state,
    CASE
        WHEN co.service_state = 'OFF' OR s.service_state = 'OFF'
            THEN 'null'
        WHEN GTID_SUBTRACT(s.last_queued_transaction, w.last_applied_transaction) = ''
            OR UNIX_TIMESTAMP(w.applying_transaction_original_commit_timestamp) = 0
            THEN 'none'
        ELSE TIMEDIFF(NOW(6), w.applying_transaction_original_commit_timestamp)
    END AS lag_from_original,
    CASE
        WHEN co.service_state = 'OFF' OR s.service_state = 'OFF'
            THEN 'null'
        WHEN GTID_SUBTRACT(s.last_queued_transaction, w.last_applied_transaction) = ''
            OR UNIX_TIMESTAMP(w.applying_transaction_immediate_commit_timestamp) = 0
            THEN 'none'
        ELSE TIMEDIFF(NOW(6), w.applying_transaction_immediate_commit_timestamp)
    END AS lag_from_immediate
FROM performance_schema.replication_connection_configuration AS c
INNER JOIN performance_schema.replication_connection_status AS s
    ON c.channel_name = s.channel_name
LEFT JOIN performance_schema.replication_applier_status_by_coordinator AS co
    ON c.channel_name = co.channel_name
-- No column of "a" is selected; the inner join is kept so the set of channels
-- returned stays the same as before.
INNER JOIN performance_schema.replication_applier_status AS a
    ON c.channel_name = a.channel_name
INNER JOIN performance_schema.replication_applier_status_by_worker AS w
    ON c.channel_name = w.channel_name;
-- The former LEFT JOINs to a one-row "latest_w" derived table and to
-- performance_schema.threads (three aliases) were dropped: none of their
-- columns were selected, and each could match at most one row, so they added
-- work without affecting the result.
-- replication_lag
--
-- One row per replication channel with the maximum lag over its applier
-- workers, measured from the commit on the original source and on the
-- immediate source. Rows are ordered by both maxima, largest first.
--
-- Per-worker value fed to MAX():
--   'null' : receiver or coordinator is OFF.
--   0      : this worker already applied the last queued transaction, or it
--            has no transaction in progress (zero timestamp).
--   else   : TIMEDIFF between now and the commit timestamp of the transaction
--            being applied.
-- NOTE(review): the branches mix a string, a number and a TIME value, so the
-- expression is a string and MAX() compares textually — confirm this is the
-- intended ordering for very large lags.
--
-- ALGORITHM is TEMPTABLE because a view that uses GROUP BY / aggregate
-- functions cannot be processed with MERGE.
DROP VIEW IF EXISTS replication_lag;
CREATE
    ALGORITHM = TEMPTABLE
    SQL SECURITY INVOKER
VIEW replication_lag
AS
SELECT
    s.channel_name,
    MAX(
        CASE
            WHEN co.service_state = 'OFF' OR s.service_state = 'OFF'
                THEN 'null'
            WHEN GTID_SUBTRACT(s.last_queued_transaction, w.last_applied_transaction) = ''
                OR UNIX_TIMESTAMP(w.applying_transaction_original_commit_timestamp) = 0
                THEN 0
            ELSE TIMEDIFF(NOW(6), w.applying_transaction_original_commit_timestamp)
        END
    ) AS max_lag_from_original,
    MAX(
        CASE
            WHEN co.service_state = 'OFF' OR s.service_state = 'OFF'
                THEN 'null'
            WHEN GTID_SUBTRACT(s.last_queued_transaction, w.last_applied_transaction) = ''
                OR UNIX_TIMESTAMP(w.applying_transaction_immediate_commit_timestamp) = 0
                THEN 0
            ELSE TIMEDIFF(NOW(6), w.applying_transaction_immediate_commit_timestamp)
        END
    ) AS max_lag_from_immediate
FROM performance_schema.replication_connection_configuration AS c
INNER JOIN performance_schema.replication_connection_status AS s
    ON c.channel_name = s.channel_name
LEFT JOIN performance_schema.replication_applier_status_by_coordinator AS co
    ON c.channel_name = co.channel_name
INNER JOIN performance_schema.replication_applier_status_by_worker AS w
    ON c.channel_name = w.channel_name
-- Named columns instead of positional references (GROUP BY 1 / ORDER BY 2, 3)
-- so reordering the select list cannot silently change grouping or ordering.
GROUP BY s.channel_name
ORDER BY
    max_lag_from_original DESC,
    max_lag_from_immediate DESC;
-- replication_lag_human
--
-- One row per replication channel with a human-readable lag behind the
-- immediate source. Two per-channel estimates, both in microseconds, are
-- computed and the larger one is reported:
--   1. the oldest in-progress transaction: MAX over workers of
--      now - APPLYING_TRANSACTION_IMMEDIATE_COMMIT_TIMESTAMP;
--   2. the freshest completed apply: MIN over workers of
--      now - LAST_APPLIED_TRANSACTION_IMMEDIATE_COMMIT_TIMESTAMP, counted as 0
--      for a worker that has already applied the last queued transaction.
-- Microseconds * 10^6 gives the picoseconds that format_pico_time() receives.
--
-- Changes from the previous definition:
--   * the join to replication_connection_status now has an ON clause; without
--     it every worker row was paired with every channel's connection row;
--   * NOW(6) replaces NOW(), whose one-second precision could make the
--     microsecond difference negative for a transaction committed within the
--     current second;
--   * UNION ALL replaces UNION (the outer MAX makes de-duplication pointless);
--   * ALGORITHM is TEMPTABLE because a view that uses GROUP BY / aggregate
--     functions cannot be processed with MERGE.
DROP VIEW IF EXISTS replication_lag_human;
CREATE
    ALGORITHM = TEMPTABLE
    SQL SECURITY INVOKER
VIEW replication_lag_human
AS
SELECT
    lag_sources.channel_name,
    format_pico_time(MAX(lag_sources.lag_microseconds) * 1000000) AS lag_behind_source
FROM (
    -- Estimate 1: age of the oldest transaction currently being applied.
    SELECT
        w.channel_name,
        MAX(
            TIMESTAMPDIFF(
                MICROSECOND,
                w.applying_transaction_immediate_commit_timestamp,
                NOW(6)
            )
        ) AS lag_microseconds
    FROM performance_schema.replication_applier_status_by_worker AS w
    GROUP BY w.channel_name

    UNION ALL

    -- Estimate 2: age of the most recently applied transaction, or 0 when a
    -- worker has nothing left to apply from what was queued.
    SELECT
        w.channel_name,
        MIN(
            CASE
                WHEN GTID_SUBTRACT(s.last_queued_transaction, w.last_applied_transaction) = ''
                    THEN 0
                ELSE TIMESTAMPDIFF(
                    MICROSECOND,
                    w.last_applied_transaction_immediate_commit_timestamp,
                    NOW(6)
                )
            END
        ) AS lag_microseconds
    FROM performance_schema.replication_applier_status_by_worker AS w
    INNER JOIN performance_schema.replication_connection_status AS s
        ON w.channel_name = s.channel_name
    GROUP BY w.channel_name
) AS lag_sources
GROUP BY lag_sources.channel_name;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment