Category: Technical

ELK

How to push temperature and humidity from Telldus API…

Introduction

After noticing that logstash now has an http_poller plugin, and since Telldus Live provides a JSON API, I thought getting the data into elasticsearch for graphing in Kibana would be a piece of cake. It turned out it wasn't quite that easy. I took the time to piece together the challenges and package the solution, so it will hopefully be a breeze for you, assuming you already have an ELK stack set up. There are plenty of guides for setting that up, so I won't go into it in this post.

Temperatures from Telldus API visualized in Kibana

Here we go…

Authentication

The first reason you can't just point the http_poller at the Telldus API is that the API is protected by OAuth 1 authentication. I have some familiarity with Python programming, so I decided to build a simple Python web server. It authenticates against the Telldus API server and exposes the API without authentication locally on my network. Mapping the terminology was the biggest challenge I ran into. This is how Telldus API terminology maps to OAuth terminology:

Public key = Consumer key
Private key = Consumer secret
Token = Token
Token Secret = Token secret

I then created a Docker container and a docker-compose file to make this simple OAuth web server easy to configure and run.

Incompatible Data Structure

If you don't care about how this solution works, you can safely skip this section. Once you get a sensor's data from the Telldus API, the JSON is not structured in a way that elasticsearch handles well: all the sensor values sit inside a list of objects. Elasticsearch flattens such a list into separate arrays (data.name and data.value), so the link between each name and its value is lost. As a result, you end up with sensor values that cannot be graphed.

The JSON looks like this:

 ...
    "data" :
        [
            {
                "name": "temperature",
                "value": 12
            },
            {
                "name": "humidity",
                "value": 40
            }
        ]
    ...

To solve this, I added a transformation that turns lists of JSON objects into key-value objects, using an attribute that can serve as the key. In the example above, the “name” attribute is a perfect fit. After the transformation, the much more digestible JSON looks like this:

    "data" :
        {
            "temperature":
                {
                    "value": 12
                },
            "humidity":
                {
                    "value": 40
                }
        }
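The transformation can be sketched in a few lines of Python. This is an illustration of the idea only, not the proxy's source code. `list_to_map` is a hypothetical name.

- The key attribute is removed from each entry, matching the output above.
- If any list element lacks the key attribute, the document is returned unchanged.
- If two entries share a name, the later one wins.

```python
import json


def list_to_map(obj, list_key, key_attr):
    """Turn obj[list_key], a list of dicts, into a dict keyed by key_attr.

    Returns a new document; the input is not modified.
    """
    items = obj.get(list_key)
    if not isinstance(items, list) or not all(
            isinstance(item, dict) and key_attr in item for item in items):
        return obj  # nothing (safely) transformable, leave the document as-is
    result = dict(obj)
    result[list_key] = {
        item[key_attr]: {k: v for k, v in item.items() if k != key_attr}
        for item in items
    }
    return result


sensor = json.loads('{"id": "1", "data": [{"name": "temperature", "value": 12}, '
                    '{"name": "humidity", "value": 40}]}')
print(json.dumps(list_to_map(sensor, "data", "name")))
# {"id": "1", "data": {"temperature": {"value": 12}, "humidity": {"value": 40}}}
```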

Putting the pieces together

Set up the OAuth proxy using docker-compose. First, make sure you have docker-compose installed. Again, there are plenty of guides available for this, so I won't cover it in this post.

Essentially, you need to clone the repo, configure the keys and the transformation setting in the docker-compose file, and then run the docker container…

git clone https://github.com/kribor/oauth-proxy.git
cd oauth-proxy

Then use your favorite text editor to modify the docker-compose file:

version: '2'
services:
   oauth_proxy:
     build: .
     image: kribor/oauth-proxy
     ports:
       - "5000:5000"
     volumes:
       - .:/code
     environment:
       - OAUTH_PROXY_BASE_URL=https://api.telldus.com/
       - OAUTH_PROXY_CONSUMER_KEY=<consumer-key>
       - OAUTH_PROXY_CONSUMER_SECRET=<consumer-secret>
       - OAUTH_PROXY_TOKEN=<token>
       - OAUTH_PROXY_TOKEN_SECRET=<token-secret>
       - OAUTH_PROXY_JSON_LIST_TO_MAP=data:name

Then you're ready to run it using:

docker-compose up

Voilà! You now have a nice JSON endpoint for the logstash http_poller plugin to poll.

You can test it out by finding the URL to one of your sensors (log in to Telldus Live and go to the API explorer to find URLs and IDs of your sensors). Once you find one you can verify that everything is working by performing a curl toward the local proxy. You should see the sensor output JSON, something like this:

curl http://localhost:5000/json/sensor/info?id=11204641

{"clientName": "tec", "id": "11204641", "timezoneoffset": 7200, "editable": 1, "lastUpdated": 1493491193, "protocol": "fineoffset", "sensorId": "151", "name": "vaxthus", "ignored": 0, "keepHistory": "0", "data": {"humidity": {"value": "60", "scale": "0"}, "temp": {"value": "5.8", "scale": "0"}}, "battery": "254"}
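The same check can be scripted in Python with only the standard library. The sketch below fetches a sensor document from the proxy and pulls out the readings as numbers. It fails loudly if "data" is still a list, which would mean the list-to-map setting is not active.

- It assumes the proxy listens on localhost:5000, as in the docker-compose file.
- `fetch_sensor` and `readings` are hypothetical helper names.

The values arrive as strings ("60", "5.8"), which is why the logstash config converts them.

```python
import json
from urllib.request import urlopen

# Assumes the proxy from the docker-compose setup is listening on localhost:5000
PROXY_URL = "http://localhost:5000/json/sensor/info?id={0}"


def fetch_sensor(sensor_id):
    """Fetch one sensor document from the local OAuth proxy."""
    with urlopen(PROXY_URL.format(sensor_id), timeout=10) as res:
        return json.loads(res.read().decode("utf-8"))


def readings(sensor):
    """Return {reading name: float value} from a transformed sensor document."""
    data = sensor.get("data")
    if not isinstance(data, dict):
        # A list here means the list-to-map transformation is not active
        raise ValueError("expected 'data' to be an object, got %s" % type(data).__name__)
    # Telldus returns the values as strings, e.g. "5.8"
    return {name: float(entry["value"]) for name, entry in data.items()}


# Usage (needs the proxy running):
# print(readings(fetch_sensor("11204641")))
```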

The last piece now is to configure logstash to poll this data and push it into elasticsearch. While this is not the focus of this post, here’s an example logstash config to achieve this:

input {
  http_poller {
    # List of urls to hit
    # URLs can either have a simple format for a get request
    # Or use more complex HTTP features
    urls => {
      in => "http://localhost:5000/json/sensor/info?id=738560"
      out => "http://localhost:5000/json/sensor/info?id=738560"
      greenhouse => "http://localhost:5000/json/sensor/info?id=11204641"
    }
    # Maximum amount of time to wait for a request to complete
    request_timeout => 30
    # How far apart requests should be - 5 min
    schedule => { cron => "*/5 * * * * UTC"}
    # Decode the results as JSON
    codec => "json"
    # Store metadata about the request in this key
    metadata_target => "http_poller_metadata"
  }
}
filter {
  # Fix the lastUpdated field so that it becomes a valid date.
  date {
    match => [ "lastUpdated","UNIX" ]
    target => "lastUpdatedDate"
  }
  mutate {
    remove_field => [ "lastUpdated" ]
    # Ensure elasticsearch treats sensor values as numbers
    convert => { 
      "battery" => "integer" 
      "[data][temp][scale]" => "float"
      "[data][temp][value]" => "float"
      "id" => "integer"
      "[data][humidity][scale]" => "float"
      "[data][humidity][value]" => "float"
      "keepHistory" => "boolean"
    }
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "logstash-telldus-%{+YYYY.MM.dd}"
  }
}
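To sanity-check what ends up in elasticsearch without restarting logstash, the filter stage can be previewed on a sample document with a rough Python equivalent. It does three things:

- turns the `lastUpdated` epoch seconds into an ISO 8601 UTC timestamp in `lastUpdatedDate`;
- drops `lastUpdated`;
- applies the same type conversions, using the `keepHistory` spelling from the sensor JSON.

This only approximates the date and mutate filters. For example, the boolean spellings below are my assumption of what mutate accepts, and logstash formats timestamps differently. `preview` is a hypothetical name.

```python
import copy
from datetime import datetime, timezone


def _to_bool(value):
    # Approximation of the spellings mutate's boolean conversion accepts
    text = str(value).lower()
    if text in ("true", "t", "yes", "y", "1", "1.0"):
        return True
    if text in ("false", "f", "no", "n", "0", "0.0"):
        return False
    return value  # leave unrecognised values unchanged


# (field path, converter) pairs mirroring the mutate convert block
CONVERSIONS = [
    (("battery",), int),
    (("id",), int),
    (("data", "temp", "scale"), float),
    (("data", "temp", "value"), float),
    (("data", "humidity", "scale"), float),
    (("data", "humidity", "value"), float),
    (("keepHistory",), _to_bool),
]


def preview(event):
    """Return a converted copy of a sensor event; the input is not modified."""
    doc = copy.deepcopy(event)
    if "lastUpdated" in doc:
        # date filter with "UNIX": seconds since the epoch, interpreted as UTC
        ts = datetime.fromtimestamp(int(doc.pop("lastUpdated")), tz=timezone.utc)
        doc["lastUpdatedDate"] = ts.isoformat()
    for path, convert in CONVERSIONS:
        parent = doc
        for key in path[:-1]:
            parent = parent.get(key) if isinstance(parent, dict) else None
        if isinstance(parent, dict) and path[-1] in parent:
            parent[path[-1]] = convert(parent[path[-1]])
    return doc
```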
Linux

Keep track of your Zimbra 8.7+ server with an…

Kibana graphs are extremely useful for keeping track of server health! So why not use them to keep track of your Zimbra server?

This article describes how to set up grok filters for parsing two kinds of logs. The first is email relay logs (inbound, outbound, internal etc.). The second is spam rejections from the new and awesome postscreen, which has really cleaned up spam in Zimbra with the 8.7 release.

This is actually what prompted me to do this work: I wanted to know how effective the spam processing in postscreen was.

Assumptions:

  • Zimbra 8.7 server up and running (the email relay filters probably work for older Zimbra versions as well)
  • ELK stack up and running, with Zimbra syslog forwarded to elasticsearch via logstash (this is where the grok magic happens).

Links:

  • Must-have for debugging grok filters: https://grokdebug.herokuapp.com/

Let's start with the postscreen blocks. These are logged with messages similar to:

NOQUEUE: reject: RCPT from [89.36.215.249]:51025: 550 5.7.1 Service unavailable; client [89.36.215.249] blocked using b.barracudacentral.org; from=<test@mail.com>, to=<test@mail.com>, proto=ESMTP, helo=<test.biz>

After some fiddling I ended up with the following grok filter to parse the interesting fields (I will update it as needed if it fails to parse):

%{NOTSPACE:intro}: %{NOTSPACE:action}: (?<request>[^\[]*)()?\[%{IP:ip}\]:(%{WORD:port:int}:)? (?<response>(.+?);) (?<info>(.+?);)?( )?from=<(%{NOTSPACE:from})?>([ ,]*)?to=<%{NOTSPACE:to}>([ ,]*)?%{GREEDYDATA:rest}

As for the postfix relay logs, entries look similar to the following:

(03790-02) Passed CLEAN {RelayedInbound}, [194.9.95.232]:53692 [194.9.95.232] <test@mail.com> -> <test@mail.com>, Queue-ID: 1226A2527B4, Message-ID: <20160801035920.292A71125184@s543.mail.com>, mail_id: 3tRGIENE25RS, Hits: -3.187, size: 646354, queued_as: B562F25279C, 4548 ms

(03781-02) Passed CLEAN {RelayedInternal}, ORIGINATING_POST/MYNETS LOCAL [127.0.0.1]:46216 <test@mail.com> -> <test@mail.com>, Queue-ID: E4898252753, Message-ID: <20160801030201.A5B08200D1@test.com>, mail_id: 7TEfKxqG7WtY, Hits: -2.9, size: 1731, queued_as: 65A12250A02, 2445 ms

(32215-01) Passed CLEAN {RelayedOutbound}, ORIGINATING_POST/MYNETS LOCAL [127.0.0.1]:43169 <test@mail.com> -> <test@mail.com>, Queue-ID: DC109248CA6, Message-ID: <1132251092.44016.1469999135139.JavaMail.zimbra@test.com>, mail_id: BGm5Pu_UkU_5, Hits: 2.678, size: 401360, queued_as: 1BB3025241C, 2103 ms

I will try to update the above as more examples come in with emails hitting my Zimbra server.
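Before wiring the postscreen pattern into logstash, a quick offline check can save some round trips. The sketch below is a hand-translated Python approximation of that grok filter, tried against the sample NOQUEUE line:

- `NOTSPACE` becomes `\S+`.
- `WORD` becomes `\w+`.
- grok's `IP` is simplified to IPv4 only.

It is not what logstash runs internally, so the grok debugger stays the authoritative check. `parse_reject` is a hypothetical name.

```python
import re

IPV4 = r"\d{1,3}(?:\.\d{1,3}){3}"  # grok's IP also matches IPv6; IPv4 is enough here

POSTSCREEN_REJECT = re.compile(
    r"(?P<intro>\S+): (?P<action>\S+): (?P<request>[^\[]*)"
    r"\[(?P<ip>" + IPV4 + r")\]:(?:(?P<port>\w+):)? "
    r"(?P<response>.+?;) (?P<info>.+?;)?(?: )?"
    r"from=<(?P<from>\S+)?>[ ,]*to=<(?P<to>\S+)>[ ,]*(?P<rest>.*)"
)


def parse_reject(line):
    """Return the named fields as a dict, or None if the line does not match."""
    m = POSTSCREEN_REJECT.search(line)
    if m is None:
        return None
    fields = m.groupdict()
    if fields["port"] is not None:
        fields["port"] = int(fields["port"])  # mirrors %{WORD:port:int}
    return fields
```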

Just in case you don't have everything sorted with logstash parsing etc., here's an example of how I've set up logstash to parse syslog and send it on to elasticsearch. With this setup you can configure rsyslog to forward logs directly to logstash on port 10514.

# This input block will listen on port 10514 for syslog messages to come in.
# The syslog input parses standard syslog lines itself, so no extra codec is needed.
# type => "logs" is an identifier used further down to route events to the right index.
input {
        syslog {
                port => 10514
                type => "logs"
        }
}

# Parse postscreen rejections and relay log lines; everything else passes
# through untouched with type "logs".
filter {
  if [message] =~ "NOQUEUE: reject:" {
    grok {
      match => [ "message", "%{NOTSPACE:intro}: %{NOTSPACE:action}: (?<request>[^\[]*)()?\[%{IP:ip}\]:(%{WORD:port}:)? (?<response>(.+?);) (?<info>(.+?);)?( )?from=<(%{NOTSPACE:from})?>([ ,]*)?to=<%{NOTSPACE:to}>([ ,]*)?%{GREEDYDATA:rest}" ]
    }
    if "_grokparsefailure" not in [tags] {
      geoip { # https://www.elastic.co/guide/en/logstash/current/plugins-filters-geoip.html
        source => "ip"
      }
      mutate { 
        remove_field => [ "severity_label", "severity", "priority", "logsource", "facility_label", "facility" ] 
        replace => { "type" => "zimbra-block" }
      }
    }
  } 
  else if "Relayed" in [message] {
    grok {
      match => [ "message", "\(%{NOTSPACE:id}\) (?<result>[^\{]*) \{%{NOTSPACE:action}\}, (?<source_str>[^\[]*)\[%{IP:ip}\]:%{INT:port:int} (\[%{IP:ip2}\] )?\<(%{NOTSPACE:from})?\> -\> \<%{NOTSPACE:to}\>, (quarantine: %{NOTSPACE:quarantine},)?Queue-ID: %{NOTSPACE:queue_id}, Message-ID: \<%{NOTSPACE:message_id}\>, mail_id: %{NOTSPACE:mail_id}, Hits: %{NOTSPACE:hits:float}, size: %{NOTSPACE:size:int}, queued_as: %{NOTSPACE:queued_as}, (dkim_sd=%{NOTSPACE:dkim}, )?%{INT:proccesing_time:int} ms" ]
    }
    if "_grokparsefailure" not in [tags] {
      if [ip] not in ["127.0.0.1", "10.1.1.1"] {
        geoip { # https://www.elastic.co/guide/en/logstash/current/plugins-filters-geoip.html
          source => "ip"
        }
      }
      mutate { 
        remove_field => [ "severity_label", "severity", "priority", "logsource", "facility_label", "facility" ] 
        replace => { "type" => "zimbra-relayed" }
      }
    }
  }
}

# Output to elasticsearch, one index per event type
output {
  if [type] == "logs" {
    elasticsearch {
      hosts => [ "127.0.0.1" ]
      index => "logstash-syslog-%{+YYYY.MM.dd}"
    }
  }
  else if [type] == "zimbra-block" {
    elasticsearch {
      hosts => [ "127.0.0.1" ]
      index => "logstash-zimbra-block-%{+YYYY.MM.dd}"
    }
  }
  else if [type] == "zimbra-relayed" {
    elasticsearch {
      hosts => [ "127.0.0.1" ]
      index => "logstash-zimbra-relayed-%{+YYYY.MM.dd}"
    }
  }
}
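The relay grok pattern in the filter block is the longest one, so a hand-translated Python approximation of it can help with offline testing. It is tried here against the three sample relay lines from earlier:

- `NOTSPACE` becomes `\S+`.
- `INT` becomes `\d+`.
- grok's `IP` is simplified to IPv4.
- The type conversions mirror the `:int`/`:float` suffixes.

The field name `proccesing_time` is kept exactly as spelled in the logstash config so the results line up. This is a testing aid under those assumptions, not what logstash executes. `parse_relayed` is a hypothetical name.

```python
import re

IPV4 = r"\d{1,3}(?:\.\d{1,3}){3}"  # simplified stand-in for grok's IP pattern

RELAYED = re.compile(
    r"\((?P<id>\S+)\) (?P<result>[^{]*) \{(?P<action>\S+)\}, "
    r"(?P<source_str>[^\[]*)\[(?P<ip>" + IPV4 + r")\]:(?P<port>\d+) "
    r"(?:\[(?P<ip2>" + IPV4 + r")\] )?<(?P<from>\S+)?> -> <(?P<to>\S+)>, "
    r"(?:quarantine: (?P<quarantine>\S+),)?Queue-ID: (?P<queue_id>\S+), "
    r"Message-ID: <(?P<message_id>\S+)>, mail_id: (?P<mail_id>\S+), "
    r"Hits: (?P<hits>\S+), size: (?P<size>\S+), queued_as: (?P<queued_as>\S+), "
    r"(?:dkim_sd=(?P<dkim>\S+), )?(?P<proccesing_time>\d+) ms"
)


def parse_relayed(line):
    """Return the named fields as a dict, or None if the line does not match."""
    m = RELAYED.search(line)
    if m is None:
        return None
    fields = m.groupdict()
    # Same type hints as the grok pattern (:int / :float)
    fields["port"] = int(fields["port"])
    fields["hits"] = float(fields["hits"])
    fields["size"] = int(fields["size"])
    fields["proccesing_time"] = int(fields["proccesing_time"])
    return fields
```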

 

Python

Sending dicts, lists etc. as arguments to a fabric…

A quick tip if you need to send more complex objects than strings to a fabric task.

It's basically just a matter of proper escaping and using Python's json library to parse the argument. Keep in mind:

  • Put single quotes around the “argument” (the whole JSON string) and use double quotes in the JSON structure itself
  • Escape every “,” in the JSON as “\,”

In short you can call a task like so:

fab [args] mytask:petdict='{"cat": "1"\, "dog": "1"}'

Then, from within the task itself you just…

import json
...
def mytask(petdict=None):
    # Fabric passes task arguments as plain strings, with "\," unescaped to ","
    if petdict:
        parsed_dict = json.loads(petdict)
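Writing these arguments by hand gets fiddly for bigger structures, so they can be generated instead. The helper below serializes a dict or list with `json.dumps`, escapes the commas as described above, and wraps the result in single quotes for the shell.

- `fab_arg` is a hypothetical name, not part of Fabric.
- It assumes the JSON contains no single quotes, since those would end the shell quoting, so it refuses such values.

```python
import json


def fab_arg(name, value):
    """Build a name='<json>' task argument for the fab command line."""
    # Fabric splits task arguments on unescaped commas, so escape them all
    encoded = json.dumps(value).replace(",", "\\,")
    if "'" in encoded:
        raise ValueError("single quotes would break the shell quoting")
    return "{0}='{1}'".format(name, encoded)


print("fab mytask:" + fab_arg("petdict", {"cat": "1", "dog": "1"}))
# fab mytask:petdict='{"cat": "1"\, "dog": "1"}'
```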
Linux

Python Script to clean out old elasticsearch indices

Below you will find a simple Python script that can be used from crontab, or manually, to clear out old daily elasticsearch indices. You just need to specify the index name prefix and how many days (counting back from today) of indices to keep. Anything older is deleted.

Example use to delete syslog indices older than 90 days:

python clean-old-indices.py logstash-syslog 90

#!/usr/bin/env python3
# -*- coding: utf8 -*-

__author__ = "Kristofer Borgström"
__credits__ = "Mattias Hemmingsson"

import datetime
import getopt
import sys
import urllib.request


def usage():
    print("clean-old-indices.py [-h <elastic host>] <index> <days-to-keep>")
    print("Example: python clean-old-indices.py -h 1.1.1.1 logstash-syslog 90")
    print("Default host is: localhost")


def es_execute(hostname, path, method="GET"):
    """
    Run the command against the elasticsearch server and return the response body
    """

    url = "http://{0}:9200/{1}".format(hostname, path)

    req = urllib.request.Request(url, method=method)

    with urllib.request.urlopen(req) as res:
        return res.read().decode("utf-8")


try:
    # Parse the arguments and options
    opts, args = getopt.getopt(sys.argv[1:], "h:")

    if len(args) != 2:
        raise getopt.GetoptError("expected <index> and <days-to-keep>")

    host = "localhost"
    for o, a in opts:
        if o == '-h':
            host = a

    index_name = args[0]
    days = int(args[1])

    # Index cutoff definition, remove older than this date
    earliest_to_keep = datetime.date.today() - datetime.timedelta(days=days)

    # Daily indices end in YYYY.MM.DD, so string comparison orders them by date
    index_cutoff = "{0}-{1}".format(index_name, earliest_to_keep.strftime("%Y.%m.%d"))

    # h=index returns only the index names, one per line; the default output
    # pads its columns with extra spaces, which breaks split(" ")
    all_indices = es_execute(host, "_cat/indices?h=index")

    for index in all_indices.split():
        if index.startswith(index_name) and index < index_cutoff:
            print("Deleting index: %s" % index)
            es_execute(host, index, method="DELETE")

except getopt.GetoptError:
    usage()
    sys.exit(2)