@mjpowersjr
Last active October 10, 2020 08:25
Parsing the MySQL slow query log via Logstash (the easy way?)

The MySQL slow query log is a difficult format to extract information from. After looking at various examples with mixed results, I realized that it's much easier to configure MySQL to write the slow query log to a table in CSV format!

From the MySQL documentation:

By default, the log tables use the CSV storage engine that writes data in comma-separated values format. For users who have access to the .CSV files that contain log table data, the files are easy to import into other programs such as spreadsheets that can process CSV input.

my.cnf

Note: don't forget to open up permissions on your slow query log CSV file so logstash can read it!

# enable slow query log
slow_query_log = 1
# configure log output to CSV table
log_output = table

# optionally tweak slow query log behavior
#log_queries_not_using_indexes = 1
#long_query_time = 10
#min_examined_row_limit = 100

mysql_slow_log.conf

Example logstash configuration:

input {
  file {
    # slow query table is located at:
    #   <mysql datadir>/mysql/slow_log.CSV
    path => "/var/lib/mysql/mysql/slow_log.CSV"
    type => "mysql_slow_query"
    start_position => "beginning"
  }
}

filter {
  # mysql escapes double quotes with backslashes, but
  # ruby expects pairs of double quotes
  mutate { gsub => [ "message", '\\"', '""' ] }

  csv {
    columns => [ "start_time", "user_host", "query_time", "lock_time",
                 "rows_sent", "rows_examined", "db", "last_insert_id",
                 "insert_id", "server_id", "sql_text" ]
  }

  # convert various fields to integer
  mutate { convert => [ "rows_sent", "integer" ] }
  mutate { convert => [ "rows_examined", "integer" ] }
  mutate { convert => [ "last_insert_id", "integer" ] }
  mutate { convert => [ "insert_id", "integer" ] }
  mutate { convert => [ "server_id", "integer" ] }

  # convert start_time to @timestamp
  # (pattern letters are case-sensitive: "dd" is day of month, "DD" is day of year)
  date {
    match => [ "start_time", "yyyy-MM-dd HH:mm:ss" ]
    remove_field => [ "start_time" ]
  }

  # normalize query_time from HH:mm:ss to seconds
  ruby { code => "event['query_time'] = event['query_time'] ? event['query_time'].split(':').inject(0){|a, m| a = a * 60 + m.to_i} : 0" }

  # normalize lock_time from HH:mm:ss to seconds
  ruby { code => "event['lock_time'] = event['lock_time'] ? event['lock_time'].split(':').inject(0){|a, m| a = a * 60 + m.to_i} : 0" }

  # remove original message
  mutate { remove_field => [ "message" ] }

}

output {
  stdout { codec => rubydebug }
}
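
As a rough sketch of what the two ruby filters compute, here is the same fold in plain Ruby, runnable outside Logstash. The helper name `duration_to_seconds` is mine and is not part of Logstash; the sample values are made up.

```ruby
# Hypothetical helper (not a Logstash API): fold "HH:mm:ss" into whole seconds,
# mirroring the inject expression used in the ruby filters above.
def duration_to_seconds(text)
  return 0 if text.nil?

  # Each step multiplies the running total by 60 and adds the next part:
  # "00:01:05" -> ((0 * 60 + 0) * 60 + 1) * 60 + 5
  text.split(':').inject(0) { |total, part| total * 60 + part.to_i }
end

puts duration_to_seconds('00:01:05') # 65
puts duration_to_seconds('01:00:00') # 3600
puts duration_to_seconds(nil)        # 0
```

The nil check plays the same role as the `? ... : 0` ternary in the filter: a missing field becomes 0 instead of raising.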

The logstash filter was adapted from the slow_log table definition:

CREATE TABLE `slow_log` (
  `start_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  `user_host` mediumtext NOT NULL,
  `query_time` time NOT NULL,
  `lock_time` time NOT NULL,
  `rows_sent` int(11) NOT NULL,
  `rows_examined` int(11) NOT NULL,
  `db` varchar(512) NOT NULL,
  `last_insert_id` int(11) NOT NULL,
  `insert_id` int(11) NOT NULL,
  `server_id` int(10) unsigned NOT NULL,
  `sql_text` mediumtext NOT NULL
) ENGINE=CSV DEFAULT CHARSET=utf8 COMMENT='Slow log'

logstash output

{
          "@version" => "1",
        "@timestamp" => "2015-01-07T18:20:02.000Z",
              "type" => "mysql_slow_query",
              "host" => "hera",
              "path" => "/var/lib/mysql/mysql/slow_log.CSV",
         "user_host" => "root[root] @ localhost []",
        "query_time" => 0,
         "lock_time" => 0,
         "rows_sent" => 0,
     "rows_examined" => 21,
                "db" => "foo",
    "last_insert_id" => 0,
         "insert_id" => 0,
         "server_id" => 0,
          "sql_text" => "select * from person where name like \"foo%\""
}
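
The `sql_text` value above contains double quotes, which is where the gsub step in the filter earns its keep. Here is a rough Ruby sketch of that step outside Logstash, using the standard `csv` library. The sample line and the helper name `parse_slow_log_line` are made up for illustration, and I'm assuming the csv filter treats quotes the way Ruby's CSV parser does, as the comment in the config suggests.

```ruby
require 'csv'

# Hypothetical raw line, shaped like a row of slow_log.CSV (not from a real log).
RAW_LINE = '"2015-01-07 18:20:02","root[root] @ localhost []","00:00:00","00:00:00",0,21,"foo",0,0,0,"select * from person where name like \"foo%\""'

# Column names taken from the csv filter in the config above.
COLUMNS = %w[start_time user_host query_time lock_time rows_sent rows_examined
             db last_insert_id insert_id server_id sql_text].freeze

# Hypothetical helper: rewrite backslash-escaped quotes as doubled quotes,
# then parse the line and pair each value with its column name.
def parse_slow_log_line(line)
  fixed = line.gsub('\\"', '""')
  COLUMNS.zip(CSV.parse_line(fixed)).to_h
end

row = parse_slow_log_line(RAW_LINE)
puts row['sql_text']
puts row['rows_examined']
```

Note that every value comes back as a string, which is why the config follows the csv filter with the mutate/convert steps.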

ghost commented Mar 5, 2015

Recently I needed this to parse MariaDB slow logs, and it worked for me after slight changes to the filter section:

filter {
  # mysql escapes double quotes with backslashes, but
  # ruby expects pairs of double quotes
  mutate { gsub => [ "message", '\\"', '""' ] }

  csv {
    columns => [ "start_time", "user_host", "query_time", "lock_time",
                 "rows_sent", "rows_examined", "db", "last_insert_id",
                 "insert_id", "server_id", "sql_text" ]
  }

  # convert various fields to integer
  mutate { convert => [ "rows_sent", "integer" ] }
  mutate { convert => [ "rows_examined", "integer" ] }
  mutate { convert => [ "last_insert_id", "integer" ] }
  mutate { convert => [ "insert_id", "integer" ] }
  mutate { convert => [ "server_id", "integer" ] }

  #remove microseconds from start_time
  grok {
     match => ["start_time","%{DATESTAMP_EVENTLOG:logtime}\.[0-9]+"]
     #remove_field => [ "start_time" ]
 }

  # convert logtime to @timestamp
  date {
    match => [ "logtime", "yyyy-MM-dd HH:mm:ss" ]
    #remove_field => [ "logtime" ]
  }

  # normalize query_time from HH:mm:ss.SSSSSS to seconds
  ruby { code => "event['query_time'] = event['query_time'] ? event['query_time'].split(':').inject(0){|a, m| a = a * 60 + m.to_f} : 0" }

  # normalize lock_time from HH:mm:ss to seconds
  ruby { code => "event['lock_time'] = event['lock_time'] ? event['lock_time'].split(':').inject(0){|a, m| a = a * 60 + m.to_f} : 0" }

  # remove original message
  mutate { remove => [ "message" ] }

}

The changes are needed mainly because MariaDB logs times with microseconds, e.g. 2015-10-10 10:11:12.456852.
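
To illustrate why the filter switches from `to_i` to `to_f`, here is a small Ruby sketch comparing the two on a duration with a fractional part. The method names and the sample value are hypothetical.

```ruby
# Integer fold, as in the original gist: the fractional part is dropped.
def seconds_truncated(duration)
  duration.split(':').inject(0) { |total, part| total * 60 + part.to_i }
end

# Float fold, as in the MariaDB variant: the fractional part is kept.
def seconds_fractional(duration)
  duration.split(':').inject(0) { |total, part| total * 60 + part.to_f }
end

sample = '00:00:01.750000' # made-up MariaDB-style duration

puts seconds_truncated(sample)  # 1
puts seconds_fractional(sample) # 1.75
```

With `to_i`, any query that takes less than a second is reported as 0, which is easy to miss when most slow queries are in that range.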

@mjpowersjr
Author

Thanks for sharing!

@zozo6015

Hi, how did you open up the permissions? I have tried just about everything, but logstash-forwarder was still not able to access the file; I kept getting "permission denied".

@zozo6015

Never mind, I just reconfigured logstash-forwarder to run as root without chroot.

@aharsani

Corrected version of @jurajsim's MariaDB slow_log.CSV filter:

filter {
    if [type] == "mysql_slow" {

      # mysql escapes double quotes with backslashes, but
      # ruby expects pairs of double quotes
      mutate { gsub => [ "message", '\\"', '""' ] }

      csv {
        columns => [ "start_time", "user_host", "query_time", "lock_time",
                 "rows_sent", "rows_examined", "db", "last_insert_id",
                 "insert_id", "server_id", "sql_text", "thread_id"]
      }

      # convert various fields to integer
      mutate { convert => [ "rows_sent", "integer" ] }
      mutate { convert => [ "rows_examined", "integer" ] }
      mutate { convert => [ "last_insert_id", "integer" ] }
      mutate { convert => [ "insert_id", "integer" ] }
      mutate { convert => [ "server_id", "integer" ] }
      mutate { convert => [ "thread_id", "integer" ] }

      # convert start_time to @timestamp
      date {
        match => [ "start_time", "yyyy-MM-dd HH:mm:ss.SSSSSS" ]
        remove_field => [ "start_time" ]
      }

      # normalize query_time from HH:mm:ss.SSSSSS to seconds
      ruby { code => "event['query_time'] = event['query_time'] ? event['query_time'].split(':').inject(0){|a, m| a * 60 + m.to_f} : 0" }

      # normalize lock_time from HH:mm:ss.SSSSSS to seconds
      ruby { code => "event['lock_time'] = event['lock_time'] ? event['lock_time'].split(':').inject(0){|a, m| a * 60 + m.to_f} : 0" }

      # remove original message
      mutate { remove => [ "message" ] }
    } 
}

@strarsis

strarsis commented Dec 4, 2016

@aharsani:
Apparently the wrong option name is used ('remove' instead of 'remove_field'):

[...]
# remove original message
      mutate { remove => [ "message" ] }
[...]

Fixed configuration:

[...]
# remove original message
      mutate { remove_field => [ "message" ] }
[...]

@mjpowersjr
Author

@strarsis - Thanks for posting. It looks like the 'remove' option was renamed to 'remove_field' in a recent Logstash release. I updated the gist to reflect your fix.

@WAlekseev

Logstash 5+ has changed the event API:

Ruby exception occurred: Direct event field references (i.e. event['field']) have been disabled in favor of using event get and set methods (e.g. event.get('field')). Please consult the Logstash 5.0 breaking changes documentation for more details.

This code works for me:

ruby { code => "event.get('query_time') ? event.set('query_time', event.get('query_time').split(':').inject(0){|a, m| a = a * 60 + m.to_i}) : 0" }
ruby { code => "event.get('lock_time') ? event.set('lock_time', event.get('lock_time').split(':').inject(0){|a, m| a = a * 60 + m.to_i}) : 0" }
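
One thing worth noting about the one-liners above: when the field is missing, the `: 0` branch only yields 0 as the value of the expression and the field is never set. Here is a sketch of a variant that always sets the field, written against a stand-in event class so it runs outside Logstash. `FakeEvent` and `normalize_duration` are hypothetical names; only the `get`/`set` method names come from the error message quoted above.

```ruby
# Hypothetical stand-in for a Logstash event, for illustration only.
class FakeEvent
  def initialize(fields = {})
    @fields = fields.dup
  end

  def get(name)
    @fields[name]
  end

  def set(name, value)
    @fields[name] = value
  end
end

# Hypothetical helper: convert "HH:mm:ss" to seconds using the get/set style,
# and fall back to 0 when the field is absent.
def normalize_duration(event, field)
  raw = event.get(field)
  seconds = raw ? raw.split(':').inject(0) { |total, part| total * 60 + part.to_i } : 0
  event.set(field, seconds)
end

event = FakeEvent.new('query_time' => '00:01:05')
normalize_duration(event, 'query_time')
normalize_duration(event, 'lock_time') # field was never present
puts event.get('query_time') # 65
puts event.get('lock_time')  # 0
```

Inside a ruby filter the body of `normalize_duration` would be inlined into the `code` string, with `event` provided by Logstash.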

@alexanderWallrabenstein

alexanderWallrabenstein commented Jun 12, 2017

Thanks for sharing. Helped a lot.

I had to tweak the date filter:

# convert start_time to @timestamp
      date {
         match => [ "start_time", "yyyy-MM-dd HH:mm:ss" ]
         remove_field => [ "start_time" ]
      }

@viperey

viperey commented Oct 10, 2017

For some reason, the start_time date format was not correct in my case.

    date {
      match => [ "start_time", "yyyy-MM-dd HH:mm:ss" ]
      remove_field => [ "start_time" ]
    }

With these values (lowercase) it was correct. The one you provided was sending today's log (10th of October) to the 1st of October.

Logstash version is 2.2

@choppedpork

Huge thanks to @mjpowersjr and everyone who contributed; this is a groundbreaking hack as far as I'm concerned! I initially set out on my slowlog-in-ELK adventure by attempting to use the Filebeat mysql module but failed miserably, possibly because it doesn't understand MariaDB timestamps.

I've combined the fixes from above into a working config for MariaDB 10.1.22 and Logstash 5.4. I've also added the rows_affected field present in the CSV and converted query_time and lock_time to float, because being able to sort on those rocks! :)

filter {
  # mysql escapes double quotes with backslashes, but
  # ruby expects pairs of double quotes
  mutate { gsub => [ "message", '\\"', '""' ] }

  csv {
    columns => [ "start_time", "user_host", "query_time", "lock_time",
                 "rows_sent", "rows_examined", "db", "last_insert_id",
                 "insert_id", "server_id", "sql_text", "thread_id", "rows_affected" ]
  }

  # convert various fields to integer
  mutate { convert => [ "rows_sent", "integer" ] }
  mutate { convert => [ "rows_examined", "integer" ] }
  mutate { convert => [ "rows_affected", "integer" ] }
  mutate { convert => [ "last_insert_id", "integer" ] }
  mutate { convert => [ "insert_id", "integer" ] }
  mutate { convert => [ "server_id", "integer" ] }

  # convert start_time to @timestamp
  date {
    match => [ "start_time", "yyyy-MM-dd HH:mm:ss.SSSSSS" ]
    remove_field => [ "start_time" ]
  }

  # normalize query_time from HH:mm:ss.SSSSSS to seconds
  ruby { code => "event.get('query_time') ? event.set('query_time', event.get('query_time').split(':').inject(0){|a, m| a = a * 60 + m.to_f}) : 0" }

  # normalize lock_time from HH:mm:ss.SSSSSS to seconds
  ruby { code => "event.get('lock_time') ? event.set('lock_time', event.get('lock_time').split(':').inject(0){|a, m| a * 60 + m.to_f}) : 0" }

  mutate { convert => [ "query_time", "float" ] }
  mutate { convert => [ "lock_time", "float" ] }

  # remove original message
  mutate { remove_field => [ "message" ] }
}
