@lefred
Last active October 14, 2022 07:54
Fn Application to store slow query log to Object Storage
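
The handler below reads the OCI credentials and the target bucket from the function configuration (ctx.Config()), and the MySQL DB System connection details from the invocation payload. A minimal sketch of the expected configuration keys and payload follows; every value shown is an illustrative placeholder, not a real credential.

import json

# Keys read from ctx.Config() inside handler(); set them on the Fn function
# before deploying (values are illustrative placeholders).
function_config = {
    "bucket": "mysql-slowlog-bucket",
    "namespace": "mytenancynamespace",
    "oci_fingerprint": "aa:bb:cc:dd:ee:ff:00:11:22:33:44:55:66:77:88:99",
    "oci_region": "us-ashburn-1",
    "oci_user": "ocid1.user.oc1..exampleuser",
    "oci_tenancy": "ocid1.tenancy.oc1..exampletenancy",
    "oci_key": "<base64-encoded private API key in PEM format>",
}

# Invocation payload read from `data` inside handler(); pass it as the body
# when invoking the function (placeholder connection details).
payload = json.dumps(
    {
        "mds_host": "10.0.1.10",
        "mds_port": 3306,
        "mds_user": "slowlog_collector",
        "mds_password": "MySecretPassword!",
        "mds_name": "my mds instance",
    }
)
print(payload)
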
import io
import json
import logging
import oci
import base64
from mysql.connector import connection
from datetime import datetime
from fdk import response


def handler(ctx, data: io.BytesIO = None):
    # Read the function configuration (the OCI credentials and the target
    # Object Storage bucket/namespace).
    try:
        cfg = ctx.Config()
        obs_bucket = cfg["bucket"]
        obs_namespace = cfg["namespace"]
        oci_fingerprint = cfg["oci_fingerprint"]
        oci_region = cfg["oci_region"]
        oci_user = cfg["oci_user"]
        oci_tenancy = cfg["oci_tenancy"]
        oci_key = cfg["oci_key"]
    except (Exception, ValueError) as ex:
        logging.getLogger().error("ERROR: Missing configuration key: %s", ex)
        raise
    # Read the invocation payload: connection details of the MySQL DB System
    # and the name used to build the generated object's name.
    try:
        body = json.loads(data.getvalue())
        mds_host = body.get("mds_host")
        mds_port = body.get("mds_port")
        mds_user = body.get("mds_user")
        mds_pwd = body.get("mds_password")
        mds_name = body.get("mds_name")
        slowlog_name = mds_name.replace(".", "_")
        slowlog_name = slowlog_name.replace(" ", "_")
        slowlog_name = "slow_{}_{}.json".format(
            slowlog_name, datetime.utcnow().strftime("%Y%m%d%H%M")
        )
    except (Exception, ValueError) as ex:
        logging.getLogger().error("ERROR: Missing parameter: %s", ex)
        raise
    # Write a temporary OCI config file and API key so the SDK can authenticate.
    try:
        with open("/tmp/.oci_config", "w") as f:
            f.write("[DEFAULT]\n")
            f.write("user={}\n".format(oci_user))
            f.write("fingerprint={}\n".format(oci_fingerprint))
            f.write("tenancy={}\n".format(oci_tenancy))
            f.write("region={}\n".format(oci_region))
            f.write("key_file=/tmp/key.pem\n")
        with open("/tmp/key.pem", "w") as g:
            g.write(base64.b64decode(oci_key.encode("ascii")).decode("ascii"))
    except (Exception, ValueError) as ex:
        logging.getLogger().error("ERROR: Problem creating OCI config: %s", ex)
        raise
    logging.getLogger().info("Inside Python MySQL Slow Log function")
    # Connect to the MySQL DB System.
    try:
        cnx = connection.MySQLConnection(
            user=mds_user,
            password=mds_pwd,
            host=mds_host,
            port=mds_port,
            database="performance_schema",
        )
        cursor = cnx.cursor(dictionary=True)
    except (Exception, ValueError) as ex:
        logging.getLogger().error("ERROR: Problem connecting to MySQL: %s", ex)
        raise
    # Make sure the statement history consumers are enabled.
    query = (
        "update performance_schema.setup_consumers set ENABLED='YES' "
        "where NAME like 'events_statements_histo%'"
    )
    logging.getLogger().info("Enabling consumers")
    try:
        cursor.execute(query)
    except (Exception, ValueError) as ex:
        logging.getLogger().error("ERROR: Problem executing the query: %s", ex)
        raise
    # Retrieve all collected statements; TIMER_START and TIMER_WAIT are in
    # picoseconds, so derived columns convert them to wall-clock values.
    query = """select *, concat(date_sub(now(),
        INTERVAL (
           select VARIABLE_VALUE from performance_schema.global_status
           where variable_name='UPTIME')-TIMER_START*10e-13 second)) start_time,
        concat(timer_wait/1e+9) timer_wait_ms, concat(round(timer_wait/1e+12,6)) timer_wait_s,
        concat(round(lock_time/1e+12,6)) lock_time_s,
        format_pico_time(timer_wait) wait_human,
        concat(round(unix_timestamp(date_sub(now(),INTERVAL (
           select VARIABLE_VALUE from performance_schema.global_status
           where variable_name='UPTIME')-TIMER_START*10e-13 second)))) timestamp_rnd,
        concat(unix_timestamp(date_sub(now(),INTERVAL (
           select VARIABLE_VALUE from performance_schema.global_status
           where variable_name='UPTIME')-TIMER_START*10e-13 second))) timestamp
        from performance_schema.events_statements_history_long"""
    cursor.execute(query)
    logging.getLogger().info("Query to retrieve all statements")
    content_log = json.dumps(cursor.fetchall())
    logging.getLogger().info(
        "Data fetched from Performance_Schema... let's truncate it now"
    )
    query = "truncate table performance_schema.events_statements_history_long"
    cursor.execute(query)
    # Upload the collected statements to Object Storage as a JSON document.
    config = oci.config.from_file("/tmp/.oci_config")
    logging.getLogger().info("Connecting to Object Storage")
    object_storage_client = oci.object_storage.ObjectStorageClient(config)
    namespace = object_storage_client.get_namespace().data
    logging.getLogger().info("Storing to Object Storage {}".format(slowlog_name))
    object_storage_client.put_object(namespace, obs_bucket, slowlog_name, content_log)
    return response.Response(
        ctx,
        response_data=json.dumps(
            {"message": "MySQL Slow Log saved: {}".format(slowlog_name)}
        ),
        headers={"Content-Type": "application/json"},
    )
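
The variant above uploads the raw performance_schema rows as a JSON document. The variant below renders the same data in the classic slow query log text format before uploading it, so the resulting object reads like a regular slow query log file.
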
import io
import json
import logging
import oci
import base64
from mysql.connector import connection
from datetime import datetime
from fdk import response


def handler(ctx, data: io.BytesIO = None):
    # Read the function configuration (the OCI credentials and the target
    # Object Storage bucket/namespace).
    try:
        cfg = ctx.Config()
        obs_bucket = cfg["bucket"]
        obs_namespace = cfg["namespace"]
        oci_fingerprint = cfg["oci_fingerprint"]
        oci_region = cfg["oci_region"]
        oci_user = cfg["oci_user"]
        oci_tenancy = cfg["oci_tenancy"]
        oci_key = cfg["oci_key"]
    except (Exception, ValueError) as ex:
        logging.getLogger().error("ERROR: Missing configuration key: %s", ex)
        raise
    # Read the invocation payload: connection details of the MySQL DB System
    # and the name used to build the generated object's name.
    try:
        body = json.loads(data.getvalue())
        mds_host = body.get("mds_host")
        mds_port = body.get("mds_port")
        mds_user = body.get("mds_user")
        mds_pwd = body.get("mds_password")
        mds_name = body.get("mds_name")
        slowlog_name = mds_name.replace(".", "_")
        slowlog_name = slowlog_name.replace(" ", "_")
        slowlog_name = "slow_{}_{}.log".format(
            slowlog_name, datetime.utcnow().strftime("%Y%m%d%H%M")
        )
    except (Exception, ValueError) as ex:
        logging.getLogger().error("ERROR: Missing parameter: %s", ex)
        raise
    # Write a temporary OCI config file and API key so the SDK can authenticate.
    try:
        with open("/tmp/.oci_config", "w") as f:
            f.write("[DEFAULT]\n")
            f.write("user={}\n".format(oci_user))
            f.write("fingerprint={}\n".format(oci_fingerprint))
            f.write("tenancy={}\n".format(oci_tenancy))
            f.write("region={}\n".format(oci_region))
            f.write("key_file=/tmp/key.pem\n")
        with open("/tmp/key.pem", "w") as g:
            g.write(base64.b64decode(oci_key.encode("ascii")).decode("ascii"))
    except (Exception, ValueError) as ex:
        logging.getLogger().error("ERROR: Problem creating OCI config: %s", ex)
        raise
    logging.getLogger().info("Inside Python MySQL Slow Log function")
    # Connect to the MySQL DB System.
    try:
        cnx = connection.MySQLConnection(
            user=mds_user,
            password=mds_pwd,
            host=mds_host,
            port=mds_port,
            database="performance_schema",
        )
        cursor = cnx.cursor(dictionary=True)
    except (Exception, ValueError) as ex:
        logging.getLogger().error("ERROR: Problem connecting to MySQL: %s", ex)
        raise
    # Make sure the statement history consumers are enabled.
    query = (
        "update performance_schema.setup_consumers set ENABLED='YES' "
        "where NAME like 'events_statements_histo%'"
    )
    logging.getLogger().info("Enabling consumers")
    try:
        cursor.execute(query)
    except (Exception, ValueError) as ex:
        logging.getLogger().error("ERROR: Problem executing the query: %s", ex)
        raise
    # Retrieve all collected statements; TIMER_START and TIMER_WAIT are in
    # picoseconds, so derived columns convert them to wall-clock values.
    query = """select *, concat(date_sub(now(),
        INTERVAL (
           select VARIABLE_VALUE from performance_schema.global_status
           where variable_name='UPTIME')-TIMER_START*10e-13 second)) start_time,
        concat(timer_wait/1e+9) timer_wait_ms, concat(round(timer_wait/1e+12,6)) timer_wait_s,
        concat(round(lock_time/1e+12,6)) lock_time_s,
        format_pico_time(timer_wait) wait_human,
        concat(round(unix_timestamp(date_sub(now(),INTERVAL (
           select VARIABLE_VALUE from performance_schema.global_status
           where variable_name='UPTIME')-TIMER_START*10e-13 second)))) timestamp_rnd,
        concat(unix_timestamp(date_sub(now(),INTERVAL (
           select VARIABLE_VALUE from performance_schema.global_status
           where variable_name='UPTIME')-TIMER_START*10e-13 second))) timestamp
        from performance_schema.events_statements_history_long"""
    cursor.execute(query)
    logging.getLogger().info("Query to retrieve all statements")
    content_log = ""
    rows = cursor.fetchall()
    logging.getLogger().info(
        "Data fetched from Performance_Schema... let's truncate it now"
    )
    query = "truncate table performance_schema.events_statements_history_long"
    cursor.execute(query)
    # Render every collected statement in the classic slow query log text format.
    for row in rows:
        if row["SQL_TEXT"]:
            log_time = datetime.strptime(row["start_time"], "%Y-%m-%d %H:%M:%S.%f")
            content_log = content_log + "# Time: {}Z\n".format(log_time.isoformat("T"))
            content_log = content_log + "# User@Host: n/a [] @ n/a [] Id: {}\n".format(
                row["THREAD_ID"]
            )
            content_log = (
                content_log
                + "# Query_time: {} Lock_time: {} Rows_sent: {} Rows_examined: {} Rows_affected: {}\n".format(
                    row["timer_wait_s"],
                    row["lock_time_s"],
                    row["ROWS_SENT"],
                    row["ROWS_EXAMINED"],
                    row["ROWS_AFFECTED"],
                )
            )
            content_log = (
                content_log
                + "# Bytes_sent: n/a Tmp_tables: {} Tmp_disk_tables: {} Tmp_table_sizes: n/a\n".format(
                    row["CREATED_TMP_TABLES"], row["CREATED_TMP_DISK_TABLES"]
                )
            )
            content_log = (
                content_log
                + "# Full_scan: {} Full_join: {} Tmp_table: {} Tmp_table_on_disk: {}\n".format(
                    ["no", "yes"][int(row["SELECT_SCAN"]) > 0],
                    ["no", "yes"][int(row["SELECT_FULL_JOIN"]) > 0],
                    ["no", "yes"][int(row["CREATED_TMP_TABLES"]) > 0],
                    ["no", "yes"][int(row["CREATED_TMP_DISK_TABLES"]) > 0],
                )
            )
            content_log = (
                content_log
                + "# Merge_passes: {} Execution_engine: {}\n".format(
                    row["SORT_MERGE_PASSES"], row["EXECUTION_ENGINE"]
                )
            )
            content_log = (
                content_log
                + "# No_index_used: {} Cpu_time: {}\n".format(
                    ["no", "yes"][int(row["NO_INDEX_USED"]) > 0],
                    row["CPU_TIME"],
                )
            )
            # MAX_TOTAL_MEMORY is only present in recent MySQL versions.
            if "MAX_TOTAL_MEMORY" in row.keys():
                content_log = content_log + "# Max_memory: {}\n".format(
                    row["MAX_TOTAL_MEMORY"],
                )
            content_log = content_log + "SET timestamp={};\n".format(
                row["timestamp_rnd"]
            )
            content_log = content_log + "{};\n".format(row["SQL_TEXT"])
    # Upload the generated slow query log to Object Storage.
    config = oci.config.from_file("/tmp/.oci_config")
    logging.getLogger().info("Connecting to Object Storage")
    object_storage_client = oci.object_storage.ObjectStorageClient(config)
    namespace = object_storage_client.get_namespace().data
    logging.getLogger().info("Storing to Object Storage {}".format(slowlog_name))
    object_storage_client.put_object(namespace, obs_bucket, slowlog_name, content_log)
    return response.Response(
        ctx,
        response_data=json.dumps(
            {"message": "MySQL Slow Log saved: {}".format(slowlog_name)}
        ),
        headers={"Content-Type": "application/json"},
    )