Logstash
Logstash is an open source alternative to Splunk: a log capturing and grokking (parsing and processing) application.
Installation
It's probably easiest to run Logstash inside a container.
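For example, a minimal sketch using the official Docker image (the image tag, port, and pipeline path here are illustrative, not taken from this setup):
docker run -d --name logstash \
  -p 5544:5544 \
  -v /etc/logstash/conf.d/:/usr/share/logstash/pipeline/ \
  docker.elastic.co/logstash/logstash:8.11.3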
Out of date information
For detailed information, consult Logstash's tutorial (http://logstash.net/docs/). Prior to Logstash 1.4.0, the Logstash package came as a monolithic .jar file. To get started, install Java and run the jar file.
If you are using Logstash >= 1.4.0, it's probably easier to just install Logstash and ElasticSearch from their repository:
[logstash-1.4]
name=logstash repository for 1.4.x packages
baseurl=http://packages.elasticsearch.org/logstash/1.4/centos
gpgcheck=1
gpgkey=http://packages.elasticsearch.org/GPG-KEY-elasticsearch
enabled=1
[elasticsearch-1.3]
name=Elasticsearch repository for 1.3.x packages
baseurl=http://packages.elasticsearch.org/elasticsearch/1.3/centos
gpgcheck=1
gpgkey=http://packages.elasticsearch.org/GPG-KEY-elasticsearch
enabled=1
Then run yum install logstash elasticsearch
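After installing, enable and start the services. A sketch, assuming the SysV init scripts shipped with these packages on CentOS 6:
chkconfig elasticsearch on && service elasticsearch start
chkconfig logstash on && service logstash start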
Configuration files
The configuration files for Logstash are located at:
/etc/sysconfig/logstash
/etc/logstash/conf.d/
Logstash treats the files in the conf.d directory as a single configuration, concatenated in lexical order of their filenames.
You may want to change the DATA_DIR path in /etc/sysconfig/logstash. The actual input/filter/output configuration goes in the conf.d directory; more on this below.
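For example, a layout like the following (the filenames are illustrative) keeps inputs, filters, and outputs in separate files that are loaded in order:
/etc/logstash/conf.d/10-inputs.conf
/etc/logstash/conf.d/50-filters.conf
/etc/logstash/conf.d/90-outputs.conf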
ElasticSearch's files are at:
/etc/elasticsearch/elasticsearch.yml
You will need to configure ElasticSearch based on how you want to set up your search cluster. For replication/sharding, you should ideally have more than one server. If you do, make sure the node names are set and discovery is configured.
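A minimal sketch of /etc/elasticsearch/elasticsearch.yml for a small cluster, assuming unicast discovery (the node name and peer addresses are illustrative; the cluster name matches the Logstash output below):
cluster.name: logstash_es
node.name: es_node_0
# List peers explicitly if multicast discovery is not available on your network:
discovery.zen.ping.unicast.hosts: ["10.1.2.10", "10.1.2.11"]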
Configuration
Logstash requires configuration for the pipelines (inputs, grokking/filtering, outputs) and for any custom parsing patterns you use.
Pipelines
A pipeline consists of inputs, filters (where parsing, grokking, and processing happen), and outputs (where the processed logs are sent, usually ElasticSearch).
You can see the entire list of available plugins for each of these sections at: http://logstash.net/docs/1.4.2/
input {
# Import syslog messages
tcp {
type => syslog_import
port => 4401
}
# Accept syslog messages from hosts
syslog {
type => syslog
port => 5544
}
}
filter {
if [type] == "syslog" {
# Does the syslog parsing.
syslog_pri { }
mutate {
replace => [ "@source", "%{logsource}" ]
replace => [ "@message", "%{message}" ]
replace => [ "@program", "%{program}" ]
replace => [ "@type", "syslog" ]
}
# Date is parsed and placed into @timestamp.
date {
match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss", "ISO8601" ]
}
# Clean up the extra syslog_ fields generated above from grok.
mutate {
remove_field => [ "syslog_hostname", "syslog_message", "syslog_program", "syslog_timestamp", "type", "message", "logsource", "program"]
}
}
# For imported syslog messages...
if [type] == "syslog_import" {
if [message] =~ /last message repeated.*/ {
drop {
}
}
if [message] == "" {
drop {
}
}
# Parse with grok
grok {
# Use the custom SYSLOGYEARTIMESTAMP pattern from the patterns
# directory. We need this to define year.
patterns_dir => "./patterns"
# The pattern to match.
# This is the standard syslog pattern.
match => { "message" => "%{SYSLOGYEARTIMESTAMP:syslog_timestamp} (%{USER:syslog_user}\@)?%{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
# Add a few intermediate fields
add_field => [ "received_at", "%{@timestamp}" ]
add_field => [ "received_from", "%{host}" ]
}
# When the above grok parse fails, a '_grokparsefailure' tag gets
# added to the event. Only rewrite the fields below when the parse
# succeeded (i.e. the tag is absent).
if !("_grokparsefailure" in [tags]) {
mutate {
replace => [ "@source", "%{syslog_hostname}" ]
replace => [ "@message", "%{syslog_message}" ]
replace => [ "@program", "%{syslog_program}" ]
replace => [ "@type", "syslog imported" ]
}
}
# Parse the date. This puts it into the @timestamp field on a successful
# parse.
date {
match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss", "YYYY MMM d HH:mm:ss", "YYYY MMM dd HH:mm:ss" ]
}
# Clean up the extra syslog_ fields generated above from grok.
mutate {
remove_field => [ "syslog_hostname", "syslog_message", "syslog_program", "syslog_timestamp", "type", "message", "host" ]
}
}
}
output {
# Debugging
# stdout {
# codec => json
# }
elasticsearch {
# Define our own... hosted on my computer
# bind_host => "leo-linux"
# bind_port => 9200
host => "127.0.0.1"
port => 9300
cluster => "logstash_es"
node_name => "logstash_0"
# Index defaults to 'logstash-%{+YYYY.MM.dd}'
# The templates being used can be defined using:
template => "/etc/logstash/template/logstash.json"
}
}
Inputs
The inputs define where Logstash listens for data. The configuration above listens on one TCP port for raw text (used to import a backlog of logs) and on another port for live syslog messages.
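A quick way to exercise both inputs (a sketch; assumes the util-linux logger and netcat are installed and Logstash is running locally):
# Raw text to the tcp/4401 import input:
echo "2014 Aug 12 14:03:01 web01 sshd[4321]: test import" | nc 127.0.0.1 4401
# A real syslog message to the syslog/5544 input:
logger --server 127.0.0.1 --port 5544 --udp "test syslog message"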
Parsing / Filtering
For each of the inputs, a series of operations parses the input data into fields (variables), which are then passed on to the output section.
Operations on the data are performed through further sets of plugins (think: functions). For example, text parsing is done through grok. Parameters to these 'functions' are passed inside the grok block. When grok fails to parse a string, it adds a tag called _grokparsefailure to the event, which can be checked later in the filter section.
Fields starting with '@' are used by ElasticSearch to denote mandatory fields (... I think?), which are defined in the ElasticSearch template file.
Example
grok {
# Use the custom SYSLOGYEARTIMESTAMP pattern from the patterns
# directory. We need this to define year.
patterns_dir => "./patterns"
# The pattern to match.
# This is the standard syslog pattern.
match => { "message" => "%{SYSLOGYEARTIMESTAMP:syslog_timestamp} (%{USER:syslog_user}\@)?%{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
# Add a few intermediate fields
add_field => [ "received_at", "%{@timestamp}" ]
add_field => [ "received_from", "%{host}" ]
}
This grok instance attempts to match the incoming message against the defined pattern. The syntax for capturing matched strings is %{PATTERN_NAME:variable_name}, where PATTERN_NAME is a grok pattern defined in patterns/* inside the .jar file (or in the directory given by the patterns_dir option), and variable_name is the name used to reference the matched value later on.
The SYSLOGYEARTIMESTAMP pattern is a custom pattern defined in my ./patterns directory:
cat patterns/extra
SYSLOGYEARTIMESTAMP %{YEAR} %{MONTH} +%{MONTHDAY} %{TIME}
In the case above, when the date field of an imported syslog message matches the format given by SYSLOGYEARTIMESTAMP, the matched text is placed in the syslog_timestamp variable.
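For example, an imported line like the following (hostname and program are made up) would match; the leading "2014 Aug 12 14:03:01" is captured into syslog_timestamp and later parsed by the date filter's "YYYY MMM dd HH:mm:ss" pattern:
2014 Aug 12 14:03:01 web01 sshd[4321]: Accepted publickey for admin from 10.1.2.50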
Outputs
The outputs section defines what Logstash will do with the variables generated from the parsing/filtering section.
To debug the inputs/filtering section, you can do:
stdout {
codec => json
}
The generated fields and tags can then be inspected as part of a JSON object printed to stdout.
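For example, a successfully parsed imported event would come out roughly like this (values are illustrative; the json codec actually prints one event per line):
{
"@timestamp": "2014-08-12T12:03:01.000Z",
"@source": "web01",
"@program": "sshd",
"@message": "Accepted publickey for admin from 10.1.2.50",
"@type": "syslog imported",
"received_at": "2014-08-12T14:03:05.123Z",
"received_from": "10.1.2.3",
"tags": []
}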
ElasticSearch takes a template which defines the schema of the indexes generated by Logstash. The template is optional; without one, Logstash applies its own default template.
In the configuration example above, a template was defined for the ElasticSearch output.
{
"template": "logstash-*",
"settings" : {
"index.query.default_field" : "@message"
},
"mappings": {
"_default_": {
"_all": { "enabled": false },
"_source": { "compress": false },
"dynamic_templates": [
{
"fields_template" : {
"mapping": { "type": "string", "index": "not_analyzed" },
"path_match": "@fields.*"
}
},
{
"tags_template" : {
"mapping": { "type": "string", "index": "not_analyzed" },
"path_match": "@tags.*"
}
}
],
"properties" : {
"@fields": { "type": "object", "dynamic": true, "path": "full" },
"@timestamp" : { "type" : "date", "index" : "not_analyzed" },
"@program" : { "type" : "string", "index" : "not_analyzed" },
"@source" : { "type" : "string", "index" : "not_analyzed" },
"@message" : { "type" : "string", "analyzer" : "whitespace" },
"@type" : { "type" : "string", "index" : "not_analyzed" }
}
}
}
}
The fields/tags generated in the filter section should match the property names defined in the template file. Depending on what you want out of ElasticSearch, you may or may not want every field analyzed.
Be careful with templates though. If the properties defined in the template file are not provided by the filtering/parsing section, the log entry will not be added to ElasticSearch.
Example
Rsyslog Logging
Use a pipeline similar to the following:
input {
udp {
port => 5144
type => "syslog"
tags => ["linux","system","syslog"]
}
}
# determine and parse type of syslog message
filter {
if [type] == "syslog" {
# look for and, if found, decode syslog priority
if [message] =~ "^<[0-9]{1,3}>" {
grok {
match => [ "message", "^<%{NONNEGINT:priority:int}>" ]
}
if [priority] <= 191 {
# check for RFC 3164 vs RFC 5424
if [message] =~ "^<[0-9]{1,3}>[0-9]{1,2} " {
mutate {
add_tag => ["syslog_rfc5424"]
}
}
else {
mutate {
add_tag => ["syslog_rfc3164"]
}
}
}
else {
mutate {
add_tag => ["syslog_priority_invalid"]
}
}
} else {
# only RFC 3164 allows a message to specify no priority
mutate {
add_tag => [ "syslog_rfc3164", "syslog_priority_missing" ]
}
}
# RFC 3164 suggests adding priority if it's missing.
# Even if missing, syslog_pri filter adds the default priority.
syslog_pri {
syslog_pri_field_name => "priority"
}
# parse both RFC 3164 and 5424
grok {
patterns_dir => "/etc/logstash/pattern.d"
match => [ "message", "%{SYSLOG}" ]
tag_on_failure => [ "_grokparsefailure_syslog" ]
}
# Check whether a timestamp source was found and work out the elapsed time receiving the log.
# Note: the mutate filter would convert a date object to a string not in ISO8601 format, so use the ruby filter instead.
ruby {
code => "event.set('timestamp_logstash', event.get('@timestamp'))"
# old style ruby code (<v5.0)
# code => "event['timestamp_logstash'] = event['@timestamp']"
}
if [timestamp_source] {
date {
locale => "en"
# assume timezone for cases where it isn't provided
timezone => "Europe/Berlin"
match => [ "timestamp_source", "MMM d H:m:s", "MMM d H:m:s", "ISO8601" ]
}
# add a field for delta (in seconds) between logsource and logstash
ruby {
code => "event.set('time_elapsed_logstash', (event.get('timestamp_logstash') - event.get('@timestamp')))"
# old style ruby code (<v5.0)
# code => "event['time_elapsed_logstash'] = event['timestamp_logstash'] - event['@timestamp']"
}
}
else {
mutate {
add_tag => ["syslog_timestamp_source_missing"]
}
}
# Check if a host source was found
if ! [host_source] {
mutate {
add_tag => ["syslog_host_source_missing"]
}
}
# discard redundant info
mutate {
remove_field => [ "priority" ] #redundant and less useful once severity and facility are decoded
replace => { "message" => "%{message_content}" }
remove_field => [ "message_syslog", "message_content" ] #already in content message
}
# normalize for logstash fields
mutate {
rename => { "host" => "syslog_hostname" }
rename => { "host_source" => "received_from" }
rename => { "program" => "syslog_program" }
}
}
}
output {
# stdout { codec => rubydebug } # - Useful for debugging
elasticsearch {
hosts => [ "elasticsearch:9200" ]
index => "syslog-%{+YYYY.MM.dd}"
}
}
Notice that the grok uses %{SYSLOG}. This is a grok pattern defined in a file under the patterns directory (/etc/logstash/pattern.d, as referenced by patterns_dir above). Its contents are given below:
### ref: https://github.com/logstash-plugins/logstash-input-syslog/issues/15#issuecomment-355655279
# This is a flexible grok pattern file for syslog. By default, it attempts to be
# relaxed and accommodate implementation variations.
# Valid priorities range from 0 to 191, but 00 or 001 are technically not legitimate
# according to RFC 3164.
SYSLOGPRINUMSTRICT (?:0|(?:(?:[1-9][0-9])|(?:1[0-8][0-9])|(?:19[0-1])))
# The example below is less precise but hopefully faster. Use range-checking
# logic in the conf instead.
SYSLOGPRINUMRELAXED [0-9]{1,3}
SYSLOGPRISTRICT <%{SYSLOGPRINUMSTRICT:priority:int}>
SYSLOGPRIRELAXED <%{SYSLOGPRINUMRELAXED:priority:int}>
SYSLOGPRI %{SYSLOGPRIRELAXED}
# RFC3164
SYSLOG3164TIMESTAMPSTRICT (?:(?:Jan)|(?:Feb)|(?:Mar)|(?:Apr)|(?:May)|(?:Jun)|(?:Jul)|(?:Aug)|(?:Sep)|(?:Oct)|(?:Nov)|(?:Dec)) (?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9]) (?:2[0123]|[01][0-9]):(?:[0-5][0-9]):(?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?)
# Try to be even more flexible than RFC 3164 and also allow ISO8601 timestamps.
SYSLOG3164TIMESTAMPRELAXED (?:%{MONTH} +%{MONTHDAY} %{TIME})|%{TIMESTAMP_ISO8601}
SYSLOG3164TIMESTAMP %{SYSLOG3164TIMESTAMPRELAXED:timestamp_source}
# Hostname or IP allowed in RFC 3164, but not supposed to be an FQDN. Be
# flexible and allow it.
HOSTNAMEONLY (?!-)[a-zA-Z0-9-]{1,63}(?<!-)
SYSLOG3164HOSTNAMESTRICT (?:%{HOSTNAMEONLY}|%{IP})
SYSLOG3164HOSTNAMERELAXED %{IPORHOST}
SYSLOG3164HOSTNAME %{SYSLOG3164HOSTNAMERELAXED:host_source}
# For the RFC 3164 header, avoid matching RFC 5424 with a negative lookahead for a
# 5424 version number. Also assume that, given a timestamp, a hostname ought
# to follow.
SYSLOG3164HDR ^(?:%{SYSLOGPRI}(?!%{SYSLOG5424VER} ))?(?:%{SYSLOG3164TIMESTAMP} (:?%{SYSLOG3164HOSTNAME} )?)?
# The pattern below is a bit stricter than the RFC definition for tags. Technically
# the tag is supposed to be only alphanumeric and terminate on the first
# non-alphanumeric character. However, many programs don't obey that. Generally
# a colon or left square bracket terminates the tag. In addition, exclude the '<'
# character as not appropriate for a program name, given it can cause confusion
# with a syslog priority header.
SYSLOG3164TAG [^:\[<]{1,32}
SYSLOG3164PID \[%{POSINT:pid}\]
SYSLOG3164CONTENT %{GREEDYDATA:message_content}
SYSLOG3164MSG (%{SYSLOG3164TAG:program}(?:%{SYSLOG3164PID})?: ?)?%{SYSLOG3164CONTENT}
SYSLOG3164 %{SYSLOG3164HDR}%{SYSLOG3164MSG:message_syslog}
# RFC5424
SYSLOG5424VER [0-9]{1,2}
# Timestamp is ISO8601 - the version in grok-patterns wasn't as strict as it was defined in the RFC
SYSLOG5424TIMESTAMPSTRICT [0-9]{4}-(?:0[1-9]|1[0-2])-(?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])T(?:[01][0-9]|2[0123]):(?:[0-5][0-9]):(?:[0-5][0-9])(?:[.][0-9]{1,6})?(?:Z|[+-](?:[01][0-9]|2[0123]):[0-5][0-9])
SYSLOG5424TIMESTAMPRELAXED %{TIMESTAMP_ISO8601}
SYSLOG5424TIMESTAMP %{SYSLOG5424TIMESTAMPRELAXED}
# Hostname can be FQDN, DNS label/hostname only or IP
SYSLOGRFC5424HOSTNAME %{IPORHOST}
SYSLOG5424PRINTASCII [!-~]+
SYSLOG5424APPNAME [!-~]{1,48}
SYSLOG5424PROCID [!-~]{1,128}
SYSLOG5424MSGID [!-~]{1,32}
# Practically, there is only one version for now, and trying to parse future
# versions would be unwise, so '1' is hardcoded.
SYSLOG5424HDR ^%{SYSLOGPRI}1 (?:%{SYSLOG5424TIMESTAMP:timestamp_source}|-) (?:%{SYSLOGRFC5424HOSTNAME:host_source}|-) (?:%{SYSLOG5424APPNAME:program}|-) (?:%{SYSLOG5424PROCID:pid}|-) (?:%{SYSLOG5424MSGID:msgid}|-)
# Replace the 1 above with %{SYSLOG5424VER:syslog_version} to cater for
# additional versions.
SYSLOG5424STRUCTDATA \[%{DATA}\]+
SYSLOG5424MSG %{GREEDYDATA:message_content}
SYSLOG5424 %{SYSLOG5424HDR} (?<message_syslog>(?:%{SYSLOG5424STRUCTDATA:structured_data}|-)( ?%{SYSLOG5424MSG})?)
# Try to match and capture RFC 5424 first, given RFC 3164 allows messages without any syslog header.
# Otherwise, RFC 3164 could accidentally capture an RFC 5424 priority and header as the tag or host of a raw message
SYSLOG %{SYSLOG5424}|%{SYSLOG3164}
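Before starting Logstash, it can help to confirm the configuration and pattern files parse cleanly. A sketch; the binary path varies by package and version, and older releases spell the flag --configtest:
/usr/share/logstash/bin/logstash -t -f /etc/logstash/conf.d/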
Start Logstash. Configure rsyslog so that all logs are sent to the Logstash server by appending the following to rsyslog.conf:
*.* @10.1.2.3:5144
A single '@' denotes UDP. Use '@@' to have logs shipped via TCP. TCP might be a bad idea, since the sending rsyslog host can get bogged down trying to resend logs if the log server goes down.
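The TCP equivalent of the line above would be (note that the example pipeline only defines a udp input, so a matching tcp input would also be needed):
*.* @@10.1.2.3:5144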
Tasks
Deleting specific logs
Determine which index your logs are stored in (indices are typically named after the date of the logs in question; see the index-listing query after the example below). In Kibana, navigate to the Dev Tools page and run the following:
POST /syslog-2023.11.08/_delete_by_query
{
"query": {
"bool": {
"must": [],
"filter": [
{
"range": {
"@timestamp": {
"format": "strict_date_optional_time",
"gte": "2023-11-08T17:42:21.633Z",
"lte": "2023-11-08T17:51:03.818Z"
}
}
},
{
"match_phrase": {
"syslog_facility": "user-level"
}
}
],
"should": [],
"must_not": []
}
}
}
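To list the candidate indices first (also from Dev Tools), something like the following works:
GET /_cat/indices/syslog-*?v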
See also
- https://groups.google.com/forum/#!msg/logstash-users/X6kNHU0alBg/j95HZkTLo-EJ
- http://www.chriscowley.me.uk/blog/2014/03/21/logstash-on-centos-6/
- https://www.digitalocean.com/community/tutorials/how-to-use-logstash-and-kibana-to-centralize-logs-on-centos-6