Handling log levels in logstash

Most logging frameworks include a concept of severity or priority, including tags like “WARNING”, “CRITICAL”, etc.

Breaking these off into their own fields in logstash makes a lot of sense.  Unfortunately, when you go to look for problems, you end up with a search like this:

log_level:("EMERG" OR "ALERT" OR "CRIT" OR "ERROR")

which is both inefficient (5 string comparisons) and unclear (did I miss or misspell one?).

What I like to do is create an additional, numeric representation of the log level, so that my search looks like this:

log_code:<=3

With the two fields, you can easily query for bad stuff, but still use log_level for display (in aggregations, etc).

I have standardized on the Apache LogLevel definitions:

Level Description Example
emerg Emergencies – system is unusable. “Child cannot open lock file. Exiting”
alert Action must be taken immediately. “getpwuid: couldn’t determine user name from uid”
crit Critical Conditions. “socket: Failed to get a socket, exiting child”
error Error conditions. “Premature end of script headers”
warn Warning conditions. “child process 1234 did not exit, sending another SIGHUP”
notice Normal but significant condition. “httpd: caught SIGBUS, attempting to dump core in …”
info Informational. “Server seems busy, (you may need to increase StartServers, or Min/MaxSpareServers)…”
debug Debug-level messages “Opening config file …”

My logstash config is broken into many smaller pieces.  The filter{} stanza for each type of log is contained in a separate file, and there are generic stanzas that run before and after the business logic.

If you’re processing a log file whose levels are different, you need to normalize them.  The translate{} filter is great for this:

translate {
  dictionary => [
    "WRN", "warn",
    "INF", "info",
    "DBG", "debug"
  ]
  field => "[loglevel][tmp]"
  destination => "[loglevel][name]"
  remove_field => [ "[loglevel][tmp]" ]
}

From what I can tell, translate{} won’t replace the source field, so the earlier grok{} matches into a temporary variable that is removed here.

Once [loglevel][name] is normalized, I use a post-processing config file to add the second [loglevel][code] field:

if [loglevel] and [loglevel][name] {
  translate {
    dictionary => [
      "emerg", "0",
      "alert", "1",
      "crit", "2",
      "error", "3",
      "warn", "4",
      "notice", "5",
      "info", "6",
      "debug", "7"
    ]
    field => "[loglevel][name]"
    destination => "[loglevel][code]"
  }

  # make it an int
  mutate {
    convert => [ "[loglevel][code]", "integer" ]
  }

 

Leave a Reply

Your email address will not be published. Required fields are marked *