Keep track of your Zimbra 8.7+ server with an ELK stack

Kibana graphs are extremely useful for keeping track of server health! So why not use them to keep track of your Zimbra server?

This article describes how to set up grok filters for parsing email relay traffic (inbound, outbound, internal, etc.) and spam rejections from postscreen, the new and awesome filter that has really cleaned up spam in Zimbra since the 8.7 release.

This was actually what triggered the work: I wanted to know how effective the spam processing in postscreen was.

Assumptions:

  • Zimbra 8.7 server up and running (the email relay filters probably work for older Zimbra versions as well)
  • ELK stack up and running, with the Zimbra syslog forwarded to Elasticsearch via Logstash (this is where the grok magic happens).

Links:

  • Must-have tool for debugging grok filters: https://grokdebug.herokuapp.com/

Let's start with the postscreen blocks. These are logged with messages similar to:
    NOQUEUE: reject: RCPT from [89.36.215.249]:51025: 550 5.7.1 Service unavailable; client [89.36.215.249] blocked using b.barracudacentral.org; from=<test@mail.com>, to=<test@mail.com>, proto=ESMTP, helo=<test.biz>

After some fiddling I ended up with the following grok filter to parse the interesting fields (I will update this as needed if it fails to parse):

    %{NOTSPACE:intro}: %{NOTSPACE:action}: (?<request>[^\[]*)()?\[%{IP:ip}\]:(%{WORD:port:int}:)? (?<response>(.+?);) (?<info>(.+?);)?( )?from=<(%{NOTSPACE:from})?>([ ,]*)?to=<%{NOTSPACE:to}>([ ,]*)?%{GREEDYDATA:rest}
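If you want to sanity-check the pattern outside of Logstash, the grok shorthands expand to ordinary regular expressions (%{NOTSPACE} is \S+, %{GREEDYDATA} is .*, %{IP} is a full IP matcher), so roughly the same match can be reproduced in plain Python. This is a sketch: the :int type conversions are dropped, %{IP} is simplified to an IPv4 matcher, and the from/to captures are renamed to sender/rcpt for readability:

```python
import re

# Rough Python equivalent of the postscreen grok pattern (a sketch;
# %{NOTSPACE} -> \S+, %{IP} simplified to IPv4, %{GREEDYDATA} -> .*)
postscreen_re = re.compile(
    r"(?P<intro>\S+): (?P<action>\S+): (?P<request>[^\[]*)"
    r"\[(?P<ip>\d{1,3}(?:\.\d{1,3}){3})\]:(?:(?P<port>\w+):)? "
    r"(?P<response>.+?;) (?P<info>.+?;)? ?"
    r"from=<(?P<sender>\S+)?>[ ,]*to=<(?P<rcpt>\S+)>[ ,]*(?P<rest>.*)"
)

# The sample postscreen reject line from above
line = ("NOQUEUE: reject: RCPT from [89.36.215.249]:51025: 550 5.7.1 "
        "Service unavailable; client [89.36.215.249] blocked using "
        "b.barracudacentral.org; from=<test@mail.com>, to=<test@mail.com>, "
        "proto=ESMTP, helo=<test.biz>")

m = postscreen_re.match(line)
print(m.group("ip"), m.group("response"))
```

Pasting new log lines into a small script like this is a quick way to catch parse failures before they show up as _grokparsefailure tags in Elasticsearch.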

As for the postfix relay logs, entries look similar to the following:

    (03790-02) Passed CLEAN {RelayedInbound}, [194.9.95.232]:53692 [194.9.95.232] <test@mail.com> -> <test@mail.com>, Queue-ID: 1226A2527B4, Message-ID: <20160801035920.292A71125184@s543.mail.com>, mail_id: 3tRGIENE25RS, Hits: -3.187, size: 646354, queued_as: B562F25279C, 4548 ms
    
    (03781-02) Passed CLEAN {RelayedInternal}, ORIGINATING_POST/MYNETS LOCAL [127.0.0.1]:46216 <test@mail.com> -> <test@mail.com>, Queue-ID: E4898252753, Message-ID: <20160801030201.A5B08200D1@test.com>, mail_id: 7TEfKxqG7WtY, Hits: -2.9, size: 1731, queued_as: 65A12250A02, 2445 ms
    
    (32215-01) Passed CLEAN {RelayedOutbound}, ORIGINATING_POST/MYNETS LOCAL [127.0.0.1]:43169 <test@mail.com> -> <test@mail.com>, Queue-ID: DC109248CA6, Message-ID: <1132251092.44016.1469999135139.JavaMail.zimbra@test.com>, mail_id: BGm5Pu_UkU_5, Hits: 2.678, size: 401360, queued_as: 1BB3025241C, 2103 ms

I will try to update the above as more examples come in with emails hitting my Zimbra server.
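The relay pattern (shown in the full Logstash config below) can be sanity-checked the same way. This is a simplified Python sketch: the optional quarantine:/dkim_sd= parts of the full grok pattern are omitted, and from/to are renamed to sender/rcpt:

```python
import re

# Simplified Python equivalent of the relay grok pattern (a sketch;
# the optional quarantine:/dkim_sd= fields are left out here)
relay_re = re.compile(
    r"\((?P<id>\S+)\) (?P<result>[^{]*) \{(?P<action>\S+)\}, "
    r"(?P<source_str>[^\[]*)\[(?P<ip>[\d.]+)\]:(?P<port>\d+) "
    r"(?:\[(?P<ip2>[\d.]+)\] )?<(?P<sender>\S+)?> -> <(?P<rcpt>\S+)>, "
    r"Queue-ID: (?P<queue_id>\S+), Message-ID: <(?P<message_id>\S+)>, "
    r"mail_id: (?P<mail_id>\S+), Hits: (?P<hits>\S+), size: (?P<size>\d+), "
    r"queued_as: (?P<queued_as>\S+), (?P<ms>\d+) ms"
)

# The RelayedInbound sample from above
line = ("(03790-02) Passed CLEAN {RelayedInbound}, [194.9.95.232]:53692 "
        "[194.9.95.232] <test@mail.com> -> <test@mail.com>, "
        "Queue-ID: 1226A2527B4, Message-ID: "
        "<20160801035920.292A71125184@s543.mail.com>, mail_id: 3tRGIENE25RS, "
        "Hits: -3.187, size: 646354, queued_as: B562F25279C, 4548 ms")

m = relay_re.match(line)
print(m.group("action"), m.group("hits"))
```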

Just in case you don't have everything sorted with Logstash parsing etc., here's an example of how I've set up Logstash to parse syslog and send it on to Elasticsearch. With this setup you can configure rsyslog to forward logs directly to Logstash on port 10514.

# This input block listens on port 10514 for incoming syslog messages.
# The syslog input parses standard syslog framing and adds fields such as
# severity and facility to each event.
# type => "logs" tags the events so they can be routed in the output block below.
input {
        syslog {
                port => 10514
                type => "logs"
        }
}

# This filter block parses the two Zimbra message types we care about:
# postscreen rejects and relayed-mail entries.
filter {
  if [message] =~ "NOQUEUE: reject:" {
    grok {
      match => [ "message", "%{NOTSPACE:intro}: %{NOTSPACE:action}: (?<request>[^\[]*)()?\[%{IP:ip}\]:(%{WORD:port}:)? (?<response>(.+?);) (?<info>(.+?);)?( )?from=<(%{NOTSPACE:from})?>([ ,]*)?to=<%{NOTSPACE:to}>([ ,]*)?%{GREEDYDATA:rest}" ]
    }
    if "_grokparsefailure" not in [tags] {
      geoip { # https://www.elastic.co/guide/en/logstash/current/plugins-filters-geoip.html
        source => "ip"
      }
      mutate { 
        remove_field => [ "severity_label", "severity", "priority", "logsource", "facility_label", "facility" ] 
        replace => { "type" => "zimbra-block" }
      }
    }
  } 
  else if "Relayed" in [message] {
    grok {
      match => [ "message", "\(%{NOTSPACE:id}\) (?<result>[^\{]*) \{%{NOTSPACE:action}\}, (?<source_str>[^\[]*)\[%{IP:ip}\]:%{INT:port:int} (\[%{IP:ip2}\] )?\<(%{NOTSPACE:from})?\> -\> \<%{NOTSPACE:to}\>, (quarantine: %{NOTSPACE:quarantine},)?Queue-ID: %{NOTSPACE:queue_id}, Message-ID: \<%{NOTSPACE:message_id}\>, mail_id: %{NOTSPACE:mail_id}, Hits: %{NOTSPACE:hits:float}, size: %{NOTSPACE:size:int}, queued_as: %{NOTSPACE:queued_as}, (dkim_sd=%{NOTSPACE:dkim}, )?%{INT:proccesing_time:int} ms" ]
    }
    if "_grokparsefailure" not in [tags] {
      if [ip] not in ["127.0.0.1", "10.1.1.1"] {
        geoip { # https://www.elastic.co/guide/en/logstash/current/plugins-filters-geoip.html
          source => "ip"
        }
      }
      mutate { 
        remove_field => [ "severity_label", "severity", "priority", "logsource", "facility_label", "facility" ] 
        replace => { "type" => "zimbra-relayed" }
      }
    }
  }
}

# Output to Elasticsearch, routed by type into separate daily indices
output {
  if [type] == "logs" {
    elasticsearch {
      hosts => [ "127.0.0.1" ]
      index => "logstash-syslog-%{+YYYY.MM.dd}"
    }
  }
  else if [type] == "zimbra-block" {
    elasticsearch {
      hosts => [ "127.0.0.1" ]
      index => "logstash-zimbra-block-%{+YYYY.MM.dd}"
    }
  }
  else if [type] == "zimbra-relayed" {
    elasticsearch {
      hosts => [ "127.0.0.1" ]
      index => "logstash-zimbra-relayed-%{+YYYY.MM.dd}"
    }
  }
}
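On the sending side, rsyslog only needs a single forwarding rule pointing at the Logstash host. The hostname and file path below are assumptions, adjust them to your setup:

```
# /etc/rsyslog.d/60-logstash.conf (assumed path)
# A single @ forwards over UDP; use @@ for TCP
*.* @logstash.example.com:10514
```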


Sending dicts, lists etc. as arguments to a fabric task using json

A quick tip if you need to send more complex objects than strings to a fabric task.

It's basically just a matter of proper escaping and using Python's json library to parse. Keep in mind:

  • Put single quotes around the argument (the whole JSON string) and use double quotes inside the JSON structure itself
  • Escape every comma in the JSON as "\," (Fabric would otherwise treat it as an argument separator)

In short you can call a task like so:

fab [args] mytask:petdict='{"cat": "1"\, "dog": "1"}'

Then, from within the task itself you just…

import json
...
def mytask(petdict=None):
    if petdict:
        parsed_dict = json.loads(petdict)
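To see the round trip end to end, here's a minimal sketch of what the task body receives: by the time Fabric has stripped the "\," escapes, the argument is plain JSON again and json.loads gives you the dict back:

```python
import json

# The string the task sees after Fabric's own comma-unescaping
# (the "\," escapes from the command line are already gone)
petdict_arg = '{"cat": "1", "dog": "1"}'

parsed_dict = json.loads(petdict_arg)
print(parsed_dict["cat"], parsed_dict["dog"])
```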

Python Script to clean out old elasticsearch indices

Below you will find a simple Python script that can be used from crontab or manually to clear out old daily Elasticsearch indices. You just specify the index name prefix and how many days (back from today) you want to keep; anything older is deleted.

Example use to delete syslog indices older than 90 days:

python clean-old-indices.py syslog 90
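The script leans on the fact that daily index names carry a zero-padded YYYY.MM.dd suffix, which sorts lexicographically in the same order as the dates themselves, so deciding whether an index is older than the cutoff is a plain string comparison:

```python
# Zero-padded YYYY.MM.dd suffixes sort lexicographically in date order,
# so string comparison against a cutoff name is enough
cutoff = "logstash-syslog-2016.05.03"

assert "logstash-syslog-2016.04.28" < cutoff        # older -> deleted
assert not ("logstash-syslog-2016.07.01" < cutoff)  # newer -> kept
```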

#!/usr/bin/env python
# -*- coding: utf8 -*-

__author__ = "Kristofer Borgström"
__credits__ = "Mattias Hemmingsson"

import sys, getopt, urllib2, datetime


def usage():
    print "clean-old-indices.py [-h <elastic host>] <index> <days-to-keep>"
    print "Example: python clean-old-indices.py -h 1.1.1.1 logstash-syslog 90"
    print "Default host is: localhost"


def es_execute(hostname, path, method="GET"):
    """
    Run the command against the elasticsearch server
    """

    url = "http://{0}:9200/{1}".format(hostname, path)

    req = urllib2.Request(url)
    req.get_method = lambda: method

    res = urllib2.urlopen(req)

    return res.read()


try:
    # Parse the arguments and options
    argv = sys.argv[1:]
    opts, args = getopt.getopt(argv, "h:")

    if len(args) != 2:
        raise getopt.GetoptError("")

    host = "localhost"
    for o, a in opts:
        if o == '-h':
            host = a

    arg_iter = iter(args)
    index_name = arg_iter.next()
    days = int(arg_iter.next())

    # Index cutoff definition, remove older than this date
    earliest_to_keep = datetime.date.today() - datetime.timedelta(days=days)

    index_cutoff = "{0}-{1}".format(index_name, earliest_to_keep.strftime("%Y.%m.%d"))

    all_indices = es_execute(host, '_cat/indices')

    for line in all_indices.splitlines():
        # split() without an argument handles the variable-width
        # whitespace between _cat/indices columns
        index = line.split()[2]

        if index.startswith(index_name) and index < index_cutoff:
            print "Deleting index: %s" % index
            es_execute(host, index, method="DELETE")

except getopt.GetoptError:
    usage()
    sys.exit(2)
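To run the cleanup automatically, a nightly crontab entry is enough. The paths below are assumptions, adjust them to wherever you keep the script:

```
# crontab -e: every night at 03:30, keep 90 days of syslog indices
30 3 * * * /usr/bin/python /opt/scripts/clean-old-indices.py logstash-syslog 90
```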