Logstash


Logstash is an open source alternative to Splunk: a log capturing and grokking (parsing and processing) application.

Installation

It's probably easiest to run Logstash inside a container.
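
For example, the official Logstash image can be pointed at a local directory of pipeline configuration files. This is only a minimal sketch; the image tag, exposed port, and local paths below are assumptions based on the official Docker image layout:

# Run Logstash from the official image, using ./pipeline/ for the *.conf files
docker run --rm -it \
    -p 5144:5144/udp \
    -v "$PWD/pipeline/":/usr/share/logstash/pipeline/ \
    docker.elastic.co/logstash/logstash:8.11.1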

Out of date information

For detailed information, consult Logstash's documentation (http://logstash.net/docs/). Prior to Logstash 1.4.0, the Logstash package came as a monolithic .jar file. To get started, install Java and run the jar file.

If you are using Logstash >= 1.4.0, it's probably easier to install Logstash and ElasticSearch from their yum repositories:

[logstash-1.4]
name=logstash repository for 1.4.x packages
baseurl=http://packages.elasticsearch.org/logstash/1.4/centos
gpgcheck=1
gpgkey=http://packages.elasticsearch.org/GPG-KEY-elasticsearch
enabled=1


[elasticsearch-1.3]
name=Elasticsearch repository for 1.3.x packages
baseurl=http://packages.elasticsearch.org/elasticsearch/1.3/centos
gpgcheck=1
gpgkey=http://packages.elasticsearch.org/GPG-KEY-elasticsearch
enabled=1

Then run yum install logstash elasticsearch
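
For example, on a CentOS 6 era system (the repo file names and init script names here are assumptions based on what the packages install):

# Save the repo definitions, then install and start both services
vi /etc/yum.repos.d/logstash.repo          # paste the [logstash-1.4] block
vi /etc/yum.repos.d/elasticsearch.repo     # paste the [elasticsearch-1.3] block
yum install logstash elasticsearch
service elasticsearch start && chkconfig elasticsearch on
service logstash start && chkconfig logstash on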

Configuration

The configuration files for Logstash are located at:

  1. /etc/sysconfig/logstash
  2. /etc/logstash/conf.d/

Logstash treats the files in the conf.d directory as a single configuration, concatenated in order of their filenames.

You may want to change the DATA_DIR path in the Logstash configuration (/etc/sysconfig/logstash). The actual input/filter/output configurations are placed in the conf.d directory. More on this below.
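
Since the files are concatenated in filename order, a common convention is to prefix them with numbers so that inputs, filters, and outputs load in a predictable order (the file names below are just an example):

ls /etc/logstash/conf.d/
10-inputs.conf  20-filter-syslog.conf  30-output-elasticsearch.conf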

ElasticSearch's files are at:

  1. /etc/elasticsearch/elasticsearch.yml

You will need to configure ElasticSearch based on how you want to set up your search. For replication/sharding, you should ideally have more than one server. If you do, make sure the node names are set and auto-discovery is configured.
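
As a minimal sketch for an older (1.x) ElasticSearch, reusing the cluster and node names from the Logstash output below (the peer hostnames are placeholders):

# /etc/elasticsearch/elasticsearch.yml
cluster.name: logstash_es
node.name: "logstash_0"
# With more than one node, disable multicast and list the peers explicitly
discovery.zen.ping.multicast.enabled: false
discovery.zen.ping.unicast.hosts: ["es-node-1", "es-node-2"]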

Logstash requires configuration for the pipelines (inputs, grokking, output) and any parsing patterns that you may have.

Pipelines

A pipeline consists of inputs, filtering (where parsing, groking, processing happens), and output (where the processed logs should be sent, usually ElasticSearch).

You can see the entire list of available plugins for each of these sections at: http://logstash.net/docs/1.4.2/

input {

	# Import syslog messages
	tcp {
		type => "syslog_import"
		port => 4401
	}

	# Accept syslog messages from hosts
	syslog {
		type => "syslog"
		port => 5544
	}
}



filter {

	if [type] == "syslog" {
		# Does the syslog parsing.
		syslog_pri { }

		mutate {
			replace => [ "@source", "%{logsource}" ]
			replace => [ "@message", "%{message}" ]
			replace => [ "@program", "%{program}" ]
			replace => [ "@type", "syslog" ]
		}

		# Date is parsed and placed into @timestamp.
		date {
			match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss", "ISO8601" ]
		}

		# Clean up the extra syslog_ fields generated above from grok.
		mutate {
			remove_field => [ "syslog_hostname", "syslog_message", "syslog_program", "syslog_timestamp", "type", "message", "logsource", "program"]
		}
	}

	# For imported syslog messages...
	if [type] == "syslog_import" {
		
		if [message] =~ /last message repeated.*/ {
			drop {
			}
		}
		
		if [message] == "" {
			drop {
			}
		}


		# Parse with grok
		grok {
			# Use the custom SYSLOGYEARTIMESTAMP pattern from the patterns
			# directory. We need this to define year.
			patterns_dir => "./patterns"

			# The pattern to match.
			# This is the standard syslog pattern.
			match => { "message" => "%{SYSLOGYEARTIMESTAMP:syslog_timestamp} (%{USER:syslog_user}\@)?%{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }

			# Add a few intermediate fields
			add_field => [ "received_at", "%{@timestamp}" ]
			add_field => [ "received_from", "%{host}" ]
		}
		
		# When grok parsing fails, a '_grokparsefailure' tag gets added to
		# the event. Only rewrite these fields when the parse succeeded
		# (i.e. the tag is absent).
		if !("_grokparsefailure" in [tags]) {
			mutate {
				replace => [ "@source", "%{syslog_hostname}" ]
				replace => [ "@message", "%{syslog_message}" ]
				replace => [ "@program", "%{syslog_program}" ]
				replace => [ "@type", "syslog imported" ]
			}
		}

		# Parse the date. This puts it into the @timestamp field on a successful
		# parse.
		date {
			match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss", "YYYY MMM  d HH:mm:ss", "YYYY MMM dd HH:mm:ss" ]
		}

		# Clean up the extra syslog_ fields generated above from grok.
		mutate {
			remove_field => [ "syslog_hostname", "syslog_message", "syslog_program", "syslog_timestamp", "type", "message", "host" ]
		}

	}

}


output {
	# Debugging
	# stdout {
	# 	codec => json
	# }

	elasticsearch {
		# Define our own... hosted on my computer
		# bind_host => "leo-linux"
		# bind_port => 9200
		host => "127.0.0.1"
		port => 9300

		cluster => "logstash_es"
		node_name => "logstash_0"

		# Index defaults to 'logstash-%{+YYYY.MM.dd}'
		# The templates being used can be defined using:
		template => "/etc/logstash/template/logstash.json"
		
	}
}


Inputs

The inputs define what Logstash will listen to for information. The configuration above listens on a TCP port for raw text (for backlog imports) and on another port for syslog.
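
For example, a backlog of old log files can be pushed into the raw TCP input with something like netcat (the hostname and file name are placeholders):

# Import an old log file, line by line, into the tcp input on port 4401
nc logstash-server 4401 < /var/log/messages.1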

Parsing / Filtering

For each of the inputs, the filter section parses the incoming data into fields, which are then passed on to the output section.

Operations on the data are performed through further sets of plugins (think: functions). For example, text parsing is done through grok, with parameters passed as settings inside the grok block. When grok fails to parse a string, it adds a tag called _grokparsefailure to the event, which can be checked later on in the filter section.
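
For example, events that grok could not parse can be singled out later in the filter block (a minimal sketch):

filter {
	# Anything grok failed to parse carries the _grokparsefailure tag;
	# drop it (or route it elsewhere) here.
	if "_grokparsefailure" in [tags] {
		drop { }
	}
}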

Variables starting with a '@' are used by ElasticSearch to denote mandatory fields (... I think?) which are defined in the ElasticSearch template file.

Example

grok {
	# Use the custom SYSLOGYEARTIMESTAMP pattern from the patterns
	# directory. We need this to define year.
	patterns_dir => "./patterns"

	# The pattern to match.
	# This is the standard syslog pattern.
	match => { "message" => "%{SYSLOGYEARTIMESTAMP:syslog_timestamp} (%{USER:syslog_user}\@)?%{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }

	# Add a few intermediate fields
	add_field => [ "received_at", "%{@timestamp}" ]
	add_field => [ "received_from", "%{host}" ]
}

This grok instance attempts to match the incoming message against the defined pattern. The syntax for capturing matched strings is %{PATTERN_NAME:variable_name}, where PATTERN_NAME is a grok pattern defined in patterns/* inside the .jar file (or in the directory specified by patterns_dir), and variable_name is the name that can be used to reference the matched value later on.

The SYSLOGYEARTIMESTAMP pattern is a custom pattern defined in my ./patterns directory.

cat patterns/extra 
SYSLOGYEARTIMESTAMP %{YEAR} %{MONTH} +%{MONTHDAY} %{TIME}

In the case above, syslog messages being imported whose date field matches the format given in SYSLOGYEARTIMESTAMP will be placed in the variable syslog_timestamp.
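
For example, an imported line like the following (host, program, and message made up for illustration) would have "2014 Aug  5 10:15:00" captured into syslog_timestamp, "webserver1" into syslog_hostname, "sshd" into syslog_program, "2231" into syslog_pid, and the rest into syslog_message:

2014 Aug  5 10:15:00 webserver1 sshd[2231]: Accepted publickey for root from 10.0.0.5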

Outputs

The outputs section defines what Logstash will do with the variables generated from the parsing/filtering section.

To debug the input/filter sections, you can add the following to the output block:

stdout {
	codec => json
}

The generated fields and tags can then be inspected as part of a JSON object printed to stdout.
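
An event from the syslog_import input would be printed roughly like this (an illustrative example, not captured output):

{"@timestamp":"2014-08-05T10:15:00.000Z","@source":"webserver1","@program":"sshd","@type":"syslog imported","@message":"Accepted publickey for root from 10.0.0.5"}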

ElasticSearch takes in a template which defines the schema of the indexes generated by Logstash. The template is optional; Logstash applies a built-in default template if none is given.

In the configuration example above, a template was defined for the ElasticSearch output.

{
    "template": "logstash-*",
    "settings" : {
        "index.query.default_field" : "@message"
    },
    "mappings": {
        "_default_": {
            "_all": { "enabled": false },
            "_source": { "compress": false },
            "dynamic_templates": [
                {
                    "fields_template" : {
                        "mapping": { "type": "string", "index": "not_analyzed" },
                        "path_match": "@fields.*"
                    }
                },
                {
                    "tags_template" : {
                        "mapping": { "type": "string", "index": "not_analyzed" },
                        "path_match": "@tags.*"
                    }
                }
            ],
            "properties" : {
                "@fields": { "type": "object", "dynamic": true, "path": "full" },
                "@timestamp" : { "type" : "date", "index" : "not_analyzed" },
                "@program" : { "type" : "string", "index" : "not_analyzed" },
                "@source" : { "type" : "string", "index" : "not_analyzed" },
                "@message" : { "type" : "string", "analyzer" : "whitespace" },
                "@type" : { "type" : "string", "index" : "not_analyzed" }
             }
        }
    }
}

The fields/tags generated in the filter section should match the property names defined in the template file. Depending on what you want out of ElasticSearch, you may or may not want every field analyzed.

Be careful with templates though. If the properties defined in the template file are not provided by the filtering/parsing section, the log entry will not be added to ElasticSearch.
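
To load or inspect the template by hand instead of letting Logstash push it, the index template API can be used, e.g.:

# Upload the template manually
curl -XPUT -H 'Content-Type: application/json' 'http://127.0.0.1:9200/_template/logstash' -d @/etc/logstash/template/logstash.json

# Inspect the currently installed template
curl 'http://127.0.0.1:9200/_template/logstash?pretty'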



Example

Rsyslog Logging

Use a pipeline similar to the following:

input {
  udp {
    port => 5144
    type => "syslog"
    tags => ["linux","system","syslog"]
  }
}


# determine and parse type of syslog message
filter {

  if [type] == "syslog" { 
  
    # look for and, if found, decode syslog priority
    if [message] =~ "^<[0-9]{1,3}>" { 
      grok {
        match => [ "message", "^<%{NONNEGINT:priority:int}>" ]
      }
      if [priority] <= 191 {
        # check for RFC 3164 vs RFC 5424
        if [message] =~ "^<[0-9]{1,3}>[0-9]{1,2} " {
          mutate {
            add_tag => ["syslog_rfc5424"]
          }
        }
        else {
          mutate {
            add_tag =>  ["syslog_rfc3164"]
          }
        }
      }
      else {
        mutate {  
          add_tag => ["syslog_priority_invalid"]
        }
      }
    } else {
      # only RFC 3164 allows a message to specify no priority
      mutate {  
        add_tag => [ "syslog_rfc3164", "syslog_priority_missing" ]
      }
    }

    # RFC 3164 suggests adding priority if it's missing. 
    # Even if missing, syslog_pri filter adds the default priority.
    syslog_pri {
      syslog_pri_field_name => "priority"
    }

    # parse both RFC 3164 and 5424
    grok {
      patterns_dir => "/etc/logstash/pattern.d"
      match => [ "message", "%{SYSLOG}" ]
      tag_on_failure => [ "_grokparsefailure_syslog" ]
    }
    
    # Check if a timestamp source was found and work out the elapsed time receiving the log.
    # Note: the mutate filter converts a date object to a string that is not in ISO8601 format, so use the ruby filter instead.
    ruby {
      code => "event.set('timestamp_logstash', event.get('@timestamp'))"
#      old style ruby code (<v5.0)
#      code => "event['timestamp_logstash'] = event['@timestamp']"
    }

    if [timestamp_source] {
      date {
        locale => en
        # assume timezone for cases where it isn't provided
        timezone => "Europe/Berlin"
        match => [ "timestamp_source", "MMM  d H:m:s", "MMM d H:m:s", "ISO8601" ]
      }
      # add a field for delta (in seconds) between logsource and logstash
      ruby {
        code => "event.set('time_elapsed_logstash', (event.get('timestamp_logstash') - event.get('@timestamp')))"
#        old style ruby code (<v5.0)
#        code => "event['time_elapsed_logstash'] = event['timestamp_logstash'] - event['@timestamp']"
      }
    }
    else {
      mutate {
        add_tag => ["syslog_timestamp_source_missing"]
      }
    }
    
    # Check if a host source was found
    if ! [host_source] {
      mutate {
        add_tag => ["syslog_host_source_missing"]
      }
    }

    # discard redundant info
    mutate {
      remove_field => [ "priority" ] #redundant and less useful once severity and facility are decoded
      replace => { "message" => "%{message_content}" } 
      remove_field => [ "message_syslog", "message_content" ] #already in content message
    } 
    
    # normalize for logstash fields
    mutate {
      rename => { "host" => "syslog_hostname" }
      rename => { "host_source" => "received_from" }
      rename => { "program" => "syslog_program" }
    }
  }
}

output {
#  stdout { codec => rubydebug } # - Useful for debugging
  elasticsearch {
    hosts => [ "elasticsearch:9200" ]
    index => "syslog-%{+YYYY.MM.dd}"
  }
}

Notice that the grok filter uses %{SYSLOG}. This is a grok pattern defined in a file under the patterns directory (/etc/logstash/pattern.d above). Its contents are given below:

### ref: https://github.com/logstash-plugins/logstash-input-syslog/issues/15#issuecomment-355655279

# This is a flexible grok pattern file for syslog. By default, it attempts to be
# relaxed and accommodate implementation variations.

# valid priority range from 0 to 191, but 00 or 001 technically not legitimate
# according to RFC 3164.
SYSLOGPRINUMSTRICT (?:0|(?:(?:[1-9][0-9])|(?:1[0-8][0-9])|(?:19[0-1])))
# the example below is less precise but hopefully faster. Rather use range 
# checking logic in conf.
SYSLOGPRINUMRELAXED [0-9]{1,3}
SYSLOGPRISTRICT <%{SYSLOGPRINUMSTRICT:priority:int}>
SYSLOGPRIRELAXED <%{SYSLOGPRINUMRELAXED:priority:int}>
SYSLOGPRI %{SYSLOGPRIRELAXED}

# RFC3164
SYSLOG3164TIMESTAMPSTRICT (?:(?:Jan)|(?:Feb)|(?:Mar)|(?:Apr)|(?:May)|(?:Jun)|(?:Jul)|(?:Aug)|(?:Sep)|(?:Oct)|(?:Nov)|(?:Dec)) (?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9]) (?:2[0123]|[01][0-9]):(?:[0-5][0-9]):(?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?)
# Try to be even more flexible than RFC 3164 and also allow ISO8601 timestamps.
SYSLOG3164TIMESTAMPRELAXED (?:%{MONTH} +%{MONTHDAY} %{TIME})|%{TIMESTAMP_ISO8601}
SYSLOG3164TIMESTAMP %{SYSLOG3164TIMESTAMPRELAXED:timestamp_source}
# Hostname or IP allowed in RFC 3164, but not supposed to be an FQDN. Can be
# flexible and allow it.
HOSTNAMEONLY (?!-)[a-zA-Z0-9-]{1,63}(?<!-)
SYSLOG3164HOSTNAMESTRICT (?:%{HOSTNAMEONLY}|%{IP})
SYSLOG3164HOSTNAMERELAXED %{IPORHOST}
SYSLOG3164HOSTNAME %{SYSLOG3164HOSTNAMERELAXED:host_source}
# For the RFC3164 header, avoid matching RFC 5424 with a negative lookahead for a
# 5424 version number. Also assume that given a timestamp, a hostname ought
# to follow.
SYSLOG3164HDR ^(?:%{SYSLOGPRI}(?!%{SYSLOG5424VER} ))?(?:%{SYSLOG3164TIMESTAMP} (:?%{SYSLOG3164HOSTNAME} )?)?
# The pattern below is a bit stricter than the RFC definition for tags. Technically
# the tag is supposed to be only alphanumeric and terminate on the first
# non-alphanumeric character. However, many programs don't obey that. Generally
# a colon or left square bracket terminates the tag. In addition, exclude the '<'
# character as not appropriate for a program name, given it can cause confusion
# with a syslog priority header.
SYSLOG3164TAG [^:\[<]{1,32}
SYSLOG3164PID \[%{POSINT:pid}\]
SYSLOG3164CONTENT %{GREEDYDATA:message_content}
SYSLOG3164MSG (%{SYSLOG3164TAG:program}(?:%{SYSLOG3164PID})?: ?)?%{SYSLOG3164CONTENT}
SYSLOG3164 %{SYSLOG3164HDR}%{SYSLOG3164MSG:message_syslog}

# RFC5424
SYSLOG5424VER [0-9]{1,2}
# Timestamp is ISO8601 - the version in grok-patterns wasn't as strict as it was defined in the RFC
SYSLOG5424TIMESTAMPSTRICT [0-9]{4}-(?:0[1-9]|1[0-2])-(?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])T(?:[01][0-9]|2[0123]):(?:[0-5][0-9]):(?:[0-5][0-9])(?:[.][0-9]{1,6})?(?:Z|[+-](?:[01][0-9]|2[0123]):[0-5][0-9])
SYSLOG5424TIMESTAMPRELAXED %{TIMESTAMP_ISO8601}
SYSLOG5424TIMESTAMP %{SYSLOG5424TIMESTAMPRELAXED}
# Hostname can be FQDN, DNS label/hostname only or IP
SYSLOGRFC5424HOSTNAME %{IPORHOST}
SYSLOG5424PRINTASCII [!-~]+
SYSLOG5424APPNAME [!-~]{1,48}
SYSLOG5424PROCID [!-~]{1,128}
SYSLOG5424MSGID [!-~]{1,32}
# Practically, there is only one version for now, and trying to parse future
# versions would be unwise. So 1 is hardcoded.
SYSLOG5424HDR ^%{SYSLOGPRI}1 (?:%{SYSLOG5424TIMESTAMP:timestamp_source}|-) (?:%{SYSLOGRFC5424HOSTNAME:host_source}|-) (?:%{SYSLOG5424APPNAME:program}|-) (?:%{SYSLOG5424PROCID:pid}|-) (?:%{SYSLOG5424MSGID:msgid}|-)
# Replace the 1 above with %{SYSLOG5424VER:syslog_version} to cater for 
# additional versions.
SYSLOG5424STRUCTDATA \[%{DATA}\]+
SYSLOG5424MSG %{GREEDYDATA:message_content}
SYSLOG5424 %{SYSLOG5424HDR} (?<message_syslog>(?:%{SYSLOG5424STRUCTDATA:structured_data}|-)( ?%{SYSLOG5424MSG})?)

# Try match and capture RFC 5424 first, given RFC 3164 allows messages without any syslog header. 
# Otherwise, RFC 3164 could accidentally capture an RFC 5424 priority and header as the tag or host of a raw message
SYSLOG %{SYSLOG5424}|%{SYSLOG3164}

Start Logstash. Configure rsyslog so that all logs are sent to the Logstash server by appending the following to /etc/rsyslog.conf:

*.* @10.1.2.3:5144

A single '@' denotes UDP; use '@@' to ship logs via TCP. TCP might be a bad idea since the rsyslog client can get bogged down trying to resend logs if the Logstash server goes down.
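
If you do ship via TCP, rsyslog's queueing can be tuned so the client spools to disk instead of blocking when Logstash is unreachable. A rough sketch using rsyslog's action() syntax (queue sizes and file names are example values):

# Forward via TCP with a disk-assisted queue so a dead Logstash server
# doesn't bog down the client
action(type="omfwd" target="10.1.2.3" port="5144" protocol="tcp"
       queue.type="LinkedList" queue.filename="fwd_logstash"
       queue.maxDiskSpace="256m" queue.saveOnShutdown="on"
       action.resumeRetryCount="-1")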

Tasks

Deleting specific logs

Determine which index your logs are stored in (indexes are typically named after the date of the logs in question). In Kibana, navigate to the Dev Tools page and run the following:

POST /syslog-2023.11.08/_delete_by_query
{
    "query": {
        "bool": {
            "must": [],
            "filter": [
                {
                    "range": {
                        "@timestamp": {
                            "format": "strict_date_optional_time",
                            "gte": "2023-11-08T17:42:21.633Z",
                            "lte": "2023-11-08T17:51:03.818Z"
                        }
                    }
                },
                {
                    "match_phrase": {
                        "syslog_facility": "user-level"
                    }
                }
            ],
            "should": [],
            "must_not": []
        }
    }
}
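
The same delete-by-query can also be issued without Kibana by POSTing the query body directly to ElasticSearch (adjust the host to wherever ElasticSearch is listening), e.g.:

curl -XPOST 'http://localhost:9200/syslog-2023.11.08/_delete_by_query' \
     -H 'Content-Type: application/json' \
     -d '{ "query": { "match_phrase": { "syslog_facility": "user-level" } } }'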
