Logstash Tips

Finding the lag in your ELK cluster

By default, logstash sets @timestamp to the time when it processes the log.  Most people (correctly) use the date filter to set this to the timestamp from the log event itself.

Before you run the date{} filter, tuck away the value of @timestamp with a little bit of ruby:

ruby {
  code => "
    begin
      event['logstash_ts'] = event['@timestamp']
    rescue Exception => e
      event['logstash_ruby_exception'] = '[logstash_ts]: ' + e.message
    end
  "
}

This will create a new field called ‘logstash_ts’ that contains the timestamp when logstash received the event.

Now it’s safe to run your date{} filter to change @timestamp to the event’s timestamp.

After the date{} filter, compute the difference between the event’s time and the received time.  That’s the lag in getting the event processed by logstash:

ruby {
  code => "
    begin
      event['logstash_lag'] = ( event['logstash_ts'] - event['@timestamp'] ).to_f
    rescue Exception => e
      event['logstash_ruby_exception'] = '[logstash_lag]: ' + e.message
    end
  "
}

If this lag is high, the problem could be:

  • a delay in shipping the logs.  Check the health of your shipper.
  • a delay in processing the logs.  Is your logstash machine powerful enough?  Is your config “economical”?
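
To keep an eye on the lag across the whole cluster, a stats aggregation over the new field works nicely.  A minimal sketch, using the Elasticsearch 1.x aggregation syntax and assuming your indexes match “logstash-*”:

curl -XGET 'localhost:9200/logstash-*/_search?search_type=count' -d '
{
  "aggs" : {
    "lag" : { "stats" : { "field" : "logstash_lag" } }
  }
}'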

There’s one more timestamp available, which is when Elasticsearch actually indexes the document.   You can enable the field in your mapping:

curl -XPUT localhost:9200/_template/timestamp -d '
{
  "template" : "*",
  "order" : 10000,
  "mappings" : {
    "_default_" : {
      "_timestamp" : { "enabled" : true }
    }
  }
}'

Note that this is deprecated in elasticsearch 2.0 and there’s no viable alternative!

Any new indexes created after that template is applied will have _timestamp stored for each document.
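
You can confirm that the template was applied with a quick GET:

curl -XGET 'localhost:9200/_template/timestamp?pretty'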

If the difference between our logstash_ts field and elasticsearch’s _timestamp is high, then:

  • you have connectivity problems between logstash and elasticsearch
  • elasticsearch is busy doing something else (check garbage collection!)
  • (maybe) you have a high index refresh_interval.

I haven’t found an easy way to compute the difference between these last two timestamps.  Scripted fields should work, but they’re disabled and/or poorly documented.
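
For what it’s worth, here’s the shape of the scripted-field request I would expect to work, assuming dynamic scripting is enabled on your cluster (the field names come from the examples above; I haven’t verified this):

curl -XGET 'localhost:9200/logstash-*/_search' -d '
{
  "script_fields" : {
    "index_lag" : {
      "script" : "doc[\"_timestamp\"].value - doc[\"logstash_ts\"].value"
    }
  }
}'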


Handling log levels in logstash

Most logging frameworks include a concept of severity or priority, including tags like “WARNING”, “CRITICAL”, etc.

Breaking these off into their own fields in logstash makes a lot of sense.  Unfortunately, when you go to look for problems, you end up with a search like this:

log_level:("EMERG" OR "ALERT" OR "CRIT" OR "ERROR")

which is both inefficient (four string comparisons) and unclear (did I miss or misspell one?).

What I like to do is create an additional, numeric representation of the log level, so that my search looks like this:

log_code:<=3

With the two fields, you can easily query for bad stuff, but still use log_level for display (in aggregations, etc).

I have standardized on the Apache LogLevel definitions:

Level   Description                        Example
emerg   Emergencies – system is unusable.  “Child cannot open lock file. Exiting”
alert   Action must be taken immediately.  “getpwuid: couldn’t determine user name from uid”
crit    Critical conditions.               “socket: Failed to get a socket, exiting child”
error   Error conditions.                  “Premature end of script headers”
warn    Warning conditions.                “child process 1234 did not exit, sending another SIGHUP”
notice  Normal but significant condition.  “httpd: caught SIGBUS, attempting to dump core in …”
info    Informational.                     “Server seems busy, (you may need to increase StartServers, or Min/MaxSpareServers)…”
debug   Debug-level messages.              “Opening config file …”

My logstash config is broken into many smaller pieces.  The filter{} stanza for each type of log is contained in a separate file, and there are generic stanzas that run before and after the business logic.

If you’re processing a log file whose levels are different, you need to normalize them.  The translate{} filter is great for this:

translate {
  dictionary => [
    "WRN", "warn",
    "INF", "info",
    "DBG", "debug"
  ]
  field => "[loglevel][tmp]"
  destination => "[loglevel][name]"
  remove_field => [ "[loglevel][tmp]" ]
}

From what I can tell, translate{} won’t replace the source field, so the earlier grok{} matches into a temporary variable that is removed here.
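
For reference, that earlier grok{} might look something like this; the pattern itself is hypothetical, the point being that the capture lands in [loglevel][tmp]:

grok {
  match => ["message", "^%{WORD:[loglevel][tmp]} %{GREEDYDATA:message}"]
  overwrite => [ "message" ]
}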

Once [loglevel][name] is normalized, I use a post-processing config file to add the second [loglevel][code] field:

if [loglevel] and [loglevel][name] {
  translate {
    dictionary => [
      "emerg", "0",
      "alert", "1",
      "crit", "2",
      "error", "3",
      "warn", "4",
      "notice", "5",
      "info", "6",
      "debug", "7"
    ]
    field => "[loglevel][name]"
    destination => "[loglevel][code]"
  }

  # make it an int
  mutate {
    convert => [ "[loglevel][code]", "integer" ]
  }
}

Using conditionals in your filter{} blocks

Wrapping your filter logic in conditional expressions can be very important.

If your configuration is split up into many files, logstash will combine and run all of the stanzas against every event, so use conditionals to make sure each filter only touches the events it was written for.  Limiting the amount of processing performed this way also makes things faster.
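
For example, to run a grok{} only against events from a particular input, wrap it in a test on the event’s type (this assumes you set type on that input; the pattern is just an illustration):

filter {
  if [type] == "apache" {
    grok {
      match => [ "message", "%{COMBINEDAPACHELOG}" ]
    }
  }
}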

To tell if an event contains a given tag:

if "value" in [tags] {
}

For string fields:

if [FIELD] =~ /.+/ {
     # exists
}
 
if [FIELD] !~ /.+/ {
     # doesn't exist
}

For numeric fields:

if [FIELD] {
     # exists
}
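
These tests combine as you’d expect.  A sketch, with hypothetical tag and field names:

if "apache" in [tags] and [response] {
     # tagged events that also have a [response] field
}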

Processing common event information with grok{}

If you’re lucky, most of your log messages in a given input will arrive in a standard format, typically with a set of common fields at the front (date, time, server, etc).

Rather than multiple grok{} patterns that are looking across the entire message, like these:

grok {
    match => ["message", "%{SYSLOGTIMESTAMP:syslogtime} %{HOSTNAME:sysloghost} Save this %{WORD:word1}"]
    tag_on_failure => ["_grokparsefailure_match1"]
}

grok {
    match => ["message", "%{SYSLOGTIMESTAMP:syslogtime} %{HOSTNAME:sysloghost} Save this other %{WORD:word2}"]
    tag_on_failure => ["_grokparsefailure_match2"]
}

I like to split off the common stuff:

grok {
    match => ["message", "%{SYSLOGTIMESTAMP:syslogtime} %{HOSTNAME:sysloghost} %{GREEDYDATA:message}"]
    overwrite => [ "message" ]
    tag_on_failure => ["_grokparsefailure_syslog"]
}

Note that the last pattern puts the results into the field “message”.  Since that field already exists, we have to use the “overwrite” setting to update it.

Then use smaller patterns against this smaller “message” for your application specific info:

grok {
    match => ["message", "Save this %{WORD:word1}"]
    tag_on_failure => ["_grokparsefailure_match1"]
}

This is easier to read, and the later grok{}s will be running smaller regexps against smaller input, which should be faster.

Handling exceptions in the ruby{} filter

If you need to drop into ruby for your processing, be sure to handle any exceptions.  If you don’t, the filter worker will exit, possibly halting your processing.

The following code will allow the worker to continue, and will add a field to the event so you can track them down later:

ruby {
  code => "
    begin
      # your great code goes here
    rescue Exception => e
      event['ruby_exception'] = 'YOUR_FILTER_NAME: ' + e.message
    end
  "
}

I add a panel for this field to my cluster health dashboard in kibana.
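
A simple query will drive that panel (this uses the field name from the example above):

_exists_:ruby_exception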

Casting variables in the ruby{} filter

Note that the mutate{} filter has a convert option, which is very useful.

If you find yourself in ruby for another purpose, you might want to cast the variables while you’re there.  Here are some examples:

ruby {
  code => "
    begin
      event['foo'] = event['foo'].to_f   # cast to float
      event['bar'] = event['bar'].to_i   # cast to integer
    rescue Exception => e
      event['ruby_exception'] = 'cast: ' + e.message
    end
  "
}
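
For comparison, if casting is all you need, mutate{} does the same job without any exception handling:

mutate {
  convert => [ "foo", "float", "bar", "integer" ]
}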

Combining lines with the multiline{} filter

The multiline filter is designed to combine messages that span lines into a single event that can be easily processed with other logstash filters.  Systems that throw large exceptions (e.g. Java) are the standard use-case for this filter.

At the most basic, you need to provide three pieces of information to the filter:

  • ‘pattern’: the regular expression that signals the start of a new event.
  • ‘what’: whether a line belongs with the ‘previous’ event or the ‘next’ one.
  • ‘negate’: whether ‘what’ applies to lines that match the pattern or to lines that don’t.

When ‘negate’ is set to true, read it as “when my PATTERN doesn’t match, do WHAT”; when false, read it as “when my PATTERN does match, do WHAT”.

In this example, ‘negate’ is true, so we read it as “when my timestamp pattern doesn’t match, keep the line with the previous entry”:

filter {
    multiline {
      negate => true
      pattern => "^%{TIMESTAMP_ISO8601} "
      what => "previous"
    }
}

This filter should be used first, so that other filters will see the single event.

Until a new line matches the pattern, logstash is expecting more lines to join, so it won’t release the combined event.  There is an enable_flush option, but it should not be used in production.  In logstash version 1.5, the flush will be “production ready”.

When using multiline, you cannot use multiple filter workers, as each worker would be reading a different line.  If you attempt this configuration, logstash will not start.

If your application writes log entries in a way where they can overlap with each other, the basic filter can’t help you.  However, if your system prints a common string in each message (a UUID, etc), you can use that to combine messages.  See the ‘stream_identity’ option.

You should also consider using the multiline{} codec, so that messages are combined in the input{} phase.  Note that the codec doesn’t offer the ‘stream_identity’ option.
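
Here’s a sketch of the codec version on a file input (the path is hypothetical):

input {
  file {
    path => "/var/log/myapp/app.log"
    codec => multiline {
      pattern => "^%{TIMESTAMP_ISO8601} "
      negate => true
      what => "previous"
    }
  }
}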

Changing @timestamp with the date{} filter

By default, logstash sets the @timestamp field to the time that it first sees the event.  For log file data, one of the first things you’ll want to do with logstash is to set @timestamp to be the value from the log event you’re processing.

To keep this example simple, we’ll assume that you’ve already grok’ed the event and have a field called “my_timestamp” that contains the correct information from the log event, e.g. “14/Mar/2015:09:26:53 -0500”.

filter {
  date {
    match => [ 'my_timestamp', "dd/MMM/yyyy:HH:mm:ss Z" ]
    remove_field => [ 'my_timestamp' ]
  }
}

The match option takes the “my_timestamp” field, parses it with the specified pattern, and puts the resulting date object into the @timestamp field.

Once @timestamp has been populated, we no longer need “my_timestamp”, so it is removed.
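
One last note: match accepts more than one pattern, which helps when a source emits timestamps in several formats.  Something like this, assuming your events might also carry ISO8601 timestamps:

filter {
  date {
    match => [ 'my_timestamp', "dd/MMM/yyyy:HH:mm:ss Z", "ISO8601" ]
    remove_field => [ 'my_timestamp' ]
  }
}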