masscan meets ELK

Using Logstash, ElasticSearch and Kibana to visualize masscan findings

Posted by Emre Bastuz on January 21, 2016

Introduction

Let's say you did a masscan of your internal network, ended up with hundreds of thousands of findings, and now want to visualize that data.

This post describes how this can be achieved by using a combination of

  • Logstash
  • ElasticSearch and
  • Kibana4

Before we start though, please make sure to read my earlier article masscan logging demystified.

All of the following details and configuration steps assume that a patched version of masscan has been used, so that the "Timestamp" field is included in the scan results.

Prerequisites

The prerequisites are as follows:

  • The scan results are in binary format (option "-oB")
  • The original scan was done either with a non-patched or patched version of masscan (for the scan itself it does not matter)
  • The original binary file has been converted to "grepable" format with the patched version of masscan (option --output-format grepable) as "myscan.txt" (an example command is shown after this list)
  • The grepable format data file has been saved to the database system as /var/tmp/myscan.txt
  • ElasticSearch has been installed on the database system [1]
  • Logstash has been installed on the database system [2]
  • GeoIP has been installed for use with Logstash [3]
  • Kibana4 has been installed on the database system [4]
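
If the conversion still needs to be done, the patched masscan can read the binary results and write the grepable file directly to the expected location - something along these lines, with "myscan.bin" being a placeholder for your binary results file:

masscan --readscan myscan.bin --output-format grepable --output-filename /var/tmp/myscan.txt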

Let's look at some log lines, just to get an impression of which data fields will be made available:

Timestamp: 1453384972   Host: 192.168.148.154 ()        Ports: 3306/open/tcp////
Timestamp: 1453384973   Host: 192.168.219.187 ()        Port: 161       Service: snmp   Banner: sysDescr:Huawei Versatile Routing Platform Software

Logstash configuration

Logstash needs to be configured so that the fields in the scan results can be identified. GROK patterns will be used to extract the relevant data from the log lines.

The data fields that we will concentrate on are as follows:

  • timestamp
  • ip
  • port
  • state
  • ip protocol
  • service
  • banner

To further enhance the user experience, adding GeoIP data during processing is strongly recommended to make country, city, etc. information available - of course, this only makes sense if the scanned network spans different countries.

Two "match" directives with different regular expressions/GROK patterns are required to process the findings (type "port open" or more details like "Service" or "Banner"):

  • Port is open: ^Timestamp: %{BASE10NUM:time_t}\s*Host: %{IPV4:ip} \(\)\s*Ports: %{BASE10NUM:port}/%{WORD:state}/%{WORD:ip_proto}////
  • Port is open, service details are provided: ^Timestamp: %{BASE10NUM:time_t}\s*Host: %{IPV4:ip} \(\)\s*Port: %{BASE10NUM:port}\s*Service: %{WORD:service}\s*Banner: %{GREEDYDATA:banner}
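
As an illustration, applying the first pattern to the first sample log line from above yields the following fields:

time_t   = 1453384972
ip       = 192.168.148.154
port     = 3306
state    = open
ip_proto = tcp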

The complete Logstash configuration looks like this ("/etc/logstash/conf.d/server.conf"):

input {
  file {
    path => "/var/tmp/myscan.txt"
    start_position => "beginning"
  }
}

filter {
  grok {
    match => [
      "message", "^Timestamp: %{BASE10NUM:time_t}\s*Host: %{IPV4:ip} \(\)\s*Ports: %{BASE10NUM:port}/%{WORD:state}/%{WORD:ip_proto}////",
      "message", "^Timestamp: %{BASE10NUM:time_t}\s*Host: %{IPV4:ip} \(\)\s*Port: %{BASE10NUM:port}\s*Service: %{WORD:service}\s*Banner: %{GREEDYDATA:banner}"
    ]
  }

  date {
    match => [ "time_t", "UNIX" ]
  }

  geoip {
    source => "ip"
    target => "geoip"
    database => "/etc/logstash/GeoLiteCity.dat"
    add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
    add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
  }

  mutate {
    remove_field => [ "message", "host", "path", "time_t" ]
    convert => [ "[geoip][coordinates]", "float" ]
  }
}

output {
  elasticsearch {
    hosts => "127.0.0.1"
    index => "scan-%{+YYYY.MM.dd}"
    template_name => "scanresults"
  }
}

Just a quick note on the following instruction:

date {
        match => [ "time_t", "UNIX" ]
}

This instruction replaces the time at which the log entry was sent to ElasticSearch with the timestamp contained in the masscan log entry. I believe it provides more insight to know when the scan was actually done, as opposed to when the result was saved in the database. For example, the "Timestamp" value 1453384972 from the first sample line above ends up as an @timestamp of 2016-01-21T14:02:52 (UTC).

With this configuration, Logstash is basically ready, but before firing it up, some configuration needs to be done in ElasticSearch.

ElasticSearch index template configuration

A custom index template needs to be provided in ElasticSearch.

This can be done with curl:

curl -XPUT 'http://127.0.0.1:9200/_template/scanresults?pretty=true' -d '
{
  "template": "scan-*",
  "settings": {
    "number_of_shards": 1,
    "index.refresh_interval": "5s"
  },
  "mappings": {
    "default": {
      "all": {
        "enabled": false
      },
      "properties": {
        "@timestamp": {
          "type": "date",
          "format": "dateOptionalTime"
        },
        "@version": {
          "type": "integer",
          "index": "not_analyzed"
        },
        "time_t": {
          "type": "date",
          "format": "strict_date_optional_time||epoch_millis"
        },
        "ip": {
          "type": "ip",
          "norms": {
            "enabled": false
          }
        },
        "ip_proto": {
          "type": "string",
          "index": "not_analyzed"
        },
        "port": {
          "type": "integer",
          "index": "notanalyzed"
        },
        "service": {
          "type": "string",
          "index": "not_analyzed"
        },
        "state": {
          "type": "string",
          "index": "not_analyzed"
        },
        "banner": {
          "type": "string",
          "index": "analyzed"
        },
"geoip": { "type": "object", "dynamic": true, "properties": { "location": { "type": "geo_point" } } } } } } }'

With the command above, the following data types are assigned to the fields (just mentioning the relevant ones):

field      type
time_t     date
ip         ip
ip_proto   string
port       integer
service    string
state      string
banner     string

One benefit of setting the field type "ip", for example, is that search queries can contain IP ranges:

port: 23 AND ip: [192.168.0.0 TO 192.168.0.255]
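
Before feeding any data, it is worth verifying that ElasticSearch has accepted the template:

curl -XGET 'http://127.0.0.1:9200/_template/scanresults?pretty=true'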

Starting up Logstash will result in the grepable scan results being parsed and fed into the database.
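
Depending on how Logstash was installed, it can either be started as a service or run directly against the configuration file - the paths below are the ones used by the Debian/Ubuntu package and may differ on your system. A quick document count against the index afterwards shows whether the findings have arrived:

# start Logstash as a service (package installation)
sudo service logstash start

# alternatively, run it in the foreground with the configuration file
/opt/logstash/bin/logstash -f /etc/logstash/conf.d/server.conf

# check how many documents have been indexed so far
curl -XGET 'http://127.0.0.1:9200/scan-*/_count?pretty=true'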

Setting up Kibana

Let's start off by connecting to the newly started Kibana 4 by accessing http://yourip:5601/:

Screenshot - Initial Kibana4 page

The default settings are geared towards the Logstash defaults, so two tweaks are needed before creating the index pattern:

  • Replace the index pattern "logstash-*" with "scan-*"
  • Uncheck "Index contains time-based events"
  • Click "Create"

Screenshot - Tweak index

After the index pattern has been created, you should be able to see the fields as configured in the index template:

Screenshot - Fields

Switching to the "Discover" pane in Kibana should now show a list of the entries in ElasticSearch. The overview containing all fields might be a little chaotic; it can be tuned by selecting only the relevant fields, hovering over each one and clicking the "add" button:

Screenshot - Add field

Adding the fields ip, port, ip_proto, service and banner to "Selected Fields" will show you a nice grid with an overview of the findings:

Screenshot - Grid
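
The search bar in "Discover" uses the same query syntax shown earlier, so the list can also be narrowed down - for example to the SNMP finding from the sample log lines:

service: snmp AND banner: Huawei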

Creating a bar chart in Kibana

Let's say you want to create a fancy bar chart that shows an overview of all the ports that have been found and their distribution.

Follow these steps:

  • Go to the pane "Visualize"
  • Click "Vertical bar chart"
  • Select "from a new search" (default search being "*")
  • Select "X-Axis"
  • Select the aggregation "Terms" (which is like a "unique" sort on a specific field)
  • Select "Port" in the dropdown "Field"
  • Press the "Apply changes" button (the green one in the upper left corner with the "play" symbol)

Screenshot - Bar chart

You might have noticed: the port numbers greater than 999 are displayed on the bar chart with a divider, which is kind of annoying:

Screenshot - Divider

This can be resolved by configuring Kibana 4 not to display the divider, using field-level formatting:

  • Click the "Settings" pane
  • Click "Indices"
  • Click "scan-*"
  • Click the "Edit" or "controls" icon for the field "port"
  • Select "Number" in the pulldown menu for "Format"
  • Edit the value "0,0.[000]" in "Numeral.js format pattern" and replace it with "0"
  • Click "Update Field"

Screenshot - Edit field

Going back to "Visualize" will now show the barchart again with the port numbers without dividers:

Screenshot - No divider

Next steps

Now that a great tool is in place to list and visualize the data, the next step is to come up with some nice data analysis for security research - and for fun!

  • How do the charts look that you have created?
  • What data aggregation do you consider useful?

Feel free to use the comments section below to share your thoughts! :-)

References

[1] https://www.elastic.co/guide/en/elasticsearch/reference/current/setup-repositories.html
[2] https://www.elastic.co/guide/en/logstash/current/package-repositories.html
[3] https://www.digitalocean.com/community/tutorials/how-to-map-user-location-with-geoip-and-elk-elasticsearch-logstash-and-kibana
[4] https://www.elastic.co/downloads/kibana