Elasticsearch and SIEM: implementing host portscan detection
Intro: using a SIEM approach
Effectively monitoring security across a large organization is a non-trivial task that all sorts of organizations face every day. The speed, scalability and flexibility of the Elastic stack can be a great asset when trying to gain visibility into, and proactively monitor, large amounts of data.
The traditional SIEM approach relies on normalizing raw data into a common schema.
For example, a failed login, be it from a Linux host,
Nov 26 12:15:04 zeus sshd[19571]: Failed password for ciro from 10.0.4.23 port 57961 ssh2
or a Windows host,
Log Name:      Security
Source:        Microsoft-Windows-Security-Auditing
Date:          27/11/2015 2:07:33 PM
Event ID:      4625
Task Category: Logon
Level:         Information
Keywords:      Audit Failure
User:          N/A
Computer:      minerva
Description:
An account failed to log on.
Subject:
    Security ID:    NULL SID
    Account Name:   -
    Account Domain: -
    Logon ID:       0x0
Logon Type: 3
Account For Which Logon Failed:
    Account Name: gennaro
<.....>
will be indexed using a common structured format:
"src_user": "ciro"
"src_ip": "10.0.0.111"
"auth_type": "ssh2"
“src_user”:”gennaro”
“src_ip”:”10.0.0.118”
“auth_type”:”3”
Using a field naming convention allows us to build correlation logic that abstracts away the source the event originated from, be it a Windows or a Linux failed login.
Some tagging or categorization of the data can also be performed:
grok {
  match => { "message" => ["%{SSH_AUTH_1}","%{SSH_AUTH_2}"] }
  add_tag => [ "auth_success" ]
}
grok {
  match => { "message" => ["%{SSH_AUTH_3}","%{SSH_AUTH_4}"] }
  add_tag => [ "auth_failure" ]
}
where SSH_AUTH_X are our custom-defined grok patterns to match success/failure events.
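For illustration, hypothetical definitions of such patterns could look like the following, built from stock grok primitives (a sketch only: the exact regexes depend on your sshd log format, and pattern order matters when they overlap):

# Hypothetical custom patterns file (e.g. ./patterns/ssh_auth)
# success events
SSH_AUTH_1 Accepted publickey for %{USERNAME:src_user} from %{IP:src_ip} port %{INT:src_port} %{WORD:auth_type}
SSH_AUTH_2 Accepted password for %{USERNAME:src_user} from %{IP:src_ip} port %{INT:src_port} %{WORD:auth_type}
# failure events
SSH_AUTH_3 Failed password for %{USERNAME:src_user} from %{IP:src_ip} port %{INT:src_port} %{WORD:auth_type}
SSH_AUTH_4 Failed password for invalid user %{USERNAME:src_user} from %{IP:src_ip} port %{INT:src_port} %{WORD:auth_type}

A match of SSH_AUTH_3 against the Linux log line shown earlier would populate src_user, src_ip and auth_type exactly as in the normalized document above.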
Using this approach, correlation logic can be applied to all events, regardless of the datasource from which they originated.
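As a quick illustration of such correlation (a sketch, assuming the auth_failure tagging and field names above, and that events from all sources land in logstash-* indices), counting failed logins per user across Windows and Linux at once becomes a single query:

GET logstash-*/_search
{
  "size": 0,
  "query": {
    "match": { "tags": "auth_failure" }
  },
  "aggs": {
    "failures_by_user": {
      "terms": { "field": "src_user" }
    }
  }
}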
Following the same approach, we will show how to use the Elastic stack to cover a basic network security use case, TCP host portscan detection, for which we'll implement alerting via email.
Implementation I: datasource
When trying to detect whether a portscan was carried out against a given host on your premises, network traffic data becomes relevant.
For this use case we will want to monitor all events indicating a new TCP connection being initiated from source to target host, in short all TCP packets with SYN=1, ACK=0.
While we impatiently wait for Packetbeat Flows to be released and bring more out-of-the-box network protocol level capture capabilities, for the purpose of this blog we'll capture traffic with tcpdump, using the command below:
sudo tcpdump -i eth0 -n -tttt 'tcp[13] == 2' | nc localhost 5001
The above command listens on the eth0 network interface of the monitored host and captures all and only the TCP packets indicating that a new TCP connection handshake was initiated: byte 13 of the TCP header is the flags byte, and a value of 2 means SYN is set while all other flags, including ACK, are clear. The -n flag avoids resolving IPs to hostnames for faster execution. We then pipe the results to netcat, which sends them to our Logstash instance for event processing, assumed here to be running locally.
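On the receiving end, a minimal sketch of the corresponding Logstash input (the port mirrors the nc target above, and the type matches the indexed documents shown later in this post):

input {
  tcp {
    # receive the raw tcpdump lines forwarded by netcat
    port => 5001
    type => "tcpdump"
  }
}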
For convenience, we can launch the above command using an all-time favourite Linux CLI utility: screen.
#!/bin/bash
screen -d -m /bin/bash -c "sudo tcpdump -i eth0 -n -tttt 'tcp[13] == 2' | nc localhost 5001"
This is what the captured raw data looks like:
2016-02-09 13:51:09.625253 IP 192.168.1.105.60805 > 192.168.1.1.80: Flags [S], seq 2127832187, win 29200, options [mss 1460,sackOK,TS val 259965981 ecr 0,nop,wscale 7], length 0
Implementation II: event processing
We'll use Logstash to mangle the data and extract the information relevant to this use case, namely the timestamp, src_ip, dst_ip and dst_port.
grok {
  match => { "message" => "%{TCPD_TIMESTAMP:timestamp} IP %{IP:src_ip}\.%{INT:src_port} > %{IP:dst_ip}\.%{INT:dst_port}(?<payload>[^$]+)" }
  add_tag => ["network","tcp_connection_started"]
}
where TCPD_TIMESTAMP is a custom-defined grok pattern to match a timestamp like 2016-02-09 13:51:09.625253.
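One possible definition, assembled from stock grok primitives (an illustrative sketch; the built-in TIMESTAMP_ISO8601 pattern happens to match this format too):

# Hypothetical custom pattern for tcpdump's -tttt timestamp format
TCPD_TIMESTAMP %{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{HOUR}:%{MINUTE}:%{SECOND}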
As we have extracted the information we were after (timestamp, src_ip, dst_ip, dst_port), we can drop the message and payload fields:
mutate {
  remove_field => ["message","payload"]
}
Next, we send these events to the Elasticsearch index logstash-tcpdump-%{+YYYY.MM.dd}:
elasticsearch {
  hosts => "es-server:9200"
  index => "logstash-tcpdump-%{+YYYY.MM.dd}"
  user => "logstash"
  password => "verysecretpassword"
  ssl => true
  cacert => "/path/to/cacert.pem"
}
Implementation III: searching for a portscan
We're now at the stage where events are flowing into Elasticsearch, and we want to be automatically alerted whenever our monitored host receives (or launches!) a portscan.
This is what our indexed event looks like:
{ "@version": "1", "@timestamp": "2016-02-08T00:56:58.407Z", "host": "127.0.0.1", "port": 41433, "type": "tcpdump", "timestamp": "2016-02-08 01:56:58.407625", "src_ip": "192.168.1.105", "src_port": "55203", "dst_ip": "192.168.1.1", "dst_port": "80" }
We can define a TCP host portscan as a large number of connections attempted within a short amount of time between a source and a target host, where the target port changes from connection to connection. How would this translate into an Elasticsearch query?
GET logstash-tcpdump-*/_search
{
  "size": 0,
  "query": {
    "bool": {
      "must": [
        { "match": { "tags": "tcp_connection_started" } },
        { "range": { "@timestamp": { "gte": "now-30s" } } }
      ]
    }
  },
  "aggs": {
    "by_src_ip": {
      "terms": { "field": "src_ip" },
      "aggs": {
        "by_target_ip": {
          "terms": {
            "field": "dst_ip",
            "order": { "unique_port_count": "desc" }
          },
          "aggs": {
            "unique_port_count": {
              "cardinality": { "field": "dst_port" }
            }
          }
        }
      }
    }
  }
}
Here we leverage a killer feature of Elasticsearch: aggregations, specifically the terms and cardinality aggregations. (Note that cardinality returns an approximate count, which is more than accurate enough at these bucket sizes.)
Note we're purely interested in the aggregated results, hence setting size: 0. The response we receive looks like:
{ "took": 9, "timed_out": false, "_shards": { "total": 24, "successful": 24, "failed": 0 }, "hits": { "total": 46, "max_score": 0, "hits": [] }, "aggregations": { "by_src_ip": { "doc_count_error_upper_bound": 0, "sum_other_doc_count": 0, "buckets": [ { "key": "192.168.1.17", "doc_count": 44, "by_target_ip": { "doc_count_error_upper_bound": 0, "sum_other_doc_count": 0, "buckets": [ { "key": "192.168.1.105", "doc_count": 44, "unique_port_count": { "value": 41 } } ] } }, { "key": "192.168.1.105", "doc_count": 2, "by_target_ip": { "doc_count_error_upper_bound": 0, "sum_other_doc_count": 0, "buckets": [ { "key": "192.168.1.10", "doc_count": 1, "unique_port_count": { "value": 1 } }, { "key": "192.168.1.32", "doc_count": 1, "unique_port_count": { "value": 1 } } ] } } ] } } }
From the above we can infer that host 192.168.1.17 has attempted connections to 41 unique ports of host 192.168.1.105 (44 connection attempts in total), which looks suspicious: 192.168.1.17 is our attacker.
Host 192.168.1.105, on the other hand, has initiated one TCP connection each against hosts 192.168.1.10 and 192.168.1.32, which looks legitimate.
Next we'll see how we can use Watcher to automatically receive an email when an event like this happens.
Implementation IV: alert me!
Watcher is our friend here: all we need to do is configure a service email account, define a new Watch, and specify how to act when a portscan is detected.
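For reference, the email account is configured in elasticsearch.yml; a minimal sketch might look like this (account name, SMTP host and credentials are placeholders to adapt to your environment):

# Hypothetical Watcher email account configuration in elasticsearch.yml
watcher.actions.email.service.account:
  work:
    profile: standard
    smtp:
      auth: true
      starttls.enable: true
      host: smtp.example.com
      port: 587
      user: alerts
      password: verysecretpassword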
First we define a schedule, i.e. how often the Watch should be executed:
"trigger": { "schedule": { "interval": "10s" } }
Next, we define the input: what query to run (and with which search_type), on which indices and document types:
"input": { "search": { "request": { "search_type": "query_then_fetch", "indices": [ "logstash-tcpdump-*" ], "types": [ "tcpdump" ], "body": { #<insert query discussed in previous paragraph here> }
Now specify what condition would trigger the watch:
"condition": { "script": { "inline": "for (int i = 0; i < ctx.payload.aggregations.by_src_ip.buckets.size(); i++) {for (int j = 0; j < ctx.payload.aggregations.by_src_ip.buckets[i].by_target_ip.buckets.size(); j++) {if (ctx.payload.aggregations.by_src_ip.buckets[i].by_target_ip.buckets[j].unique_port_count.value > threshold) return true;};};return false;", "params": { "threshold": 50 } } }
The above Groovy script scans through our aggregated results, looking for a unique_port_count bucket whose cardinality is greater than 50. Putting this into context: if, within a 30-second time range, a host has attempted connections to more than 50 distinct ports of another host, we will call this a portscan.
Last, what action should our Watch perform once its conditions are met? Send a nice email to warn us!
"actions": { "email_administrator": { "transform": { "script": { "inline": "def target='';def attacker='';def body='';for (int i = 0; i < ctx.payload.aggregations.by_src_ip.buckets.size(); i++) {for (int j = 0; j < ctx.payload.aggregations.by_src_ip.buckets[i].by_target_ip.buckets.size(); j++) {if (ctx.payload.aggregations.by_src_ip.buckets[i].by_target_ip.buckets[j].unique_port_count.value > threshold) {target=ctx.payload.aggregations.by_src_ip.buckets[i].by_target_ip.buckets[j].key;attacker=ctx.payload.aggregations.by_src_ip.buckets[i].key;body='Detected portscan from ['+attacker+'] to ['+target+']. '+ctx.payload.aggregations.by_src_ip.buckets[i].by_target_ip.buckets[j].unique_port_count.value+ ' unique ports scanned.'; return [ body : body ];};};};", "params": { "threshold": 50 } } }, "email": { "profile": "standard", "attach_data": true, "priority": "high", "to": [ "[email protected]" ], "subject": "[Security Alert] - Port scan detected", "body": "{{ctx.payload.body}}" } } }
What we do here is scan through the results again to pick out the attacker and target hosts, plus the count of how many unique ports were scanned.
The resulting watch then becomes:
PUT _watcher/watch/port_scan_watch
{
  "trigger": {
    "schedule": {
      "interval": "10s"
    }
  },
  "input": {
    "search": {
      "request": {
        "search_type": "query_then_fetch",
        "indices": [ "logstash-tcpdump-*" ],
        "types": [ "tcpdump" ],
        "body": {
          "size": 0,
          "query": {
            "bool": {
              "must": [
                { "match": { "tags": "tcp_connection_started" } },
                { "range": { "@timestamp": { "gte": "now-30s" } } }
              ]
            }
          },
          "aggs": {
            "by_src_ip": {
              "terms": { "field": "src_ip" },
              "aggs": {
                "by_target_ip": {
                  "terms": {
                    "field": "dst_ip",
                    "order": { "unique_port_count": "desc" }
                  },
                  "aggs": {
                    "unique_port_count": {
                      "cardinality": { "field": "dst_port" }
                    }
                  }
                }
              }
            }
          }
        }
      }
    }
  },
  "condition": {
    "script": {
      "inline": "for (int i = 0; i < ctx.payload.aggregations.by_src_ip.buckets.size(); i++) {for (int j = 0; j < ctx.payload.aggregations.by_src_ip.buckets[i].by_target_ip.buckets.size(); j++) {if (ctx.payload.aggregations.by_src_ip.buckets[i].by_target_ip.buckets[j].unique_port_count.value > threshold) return true;};};return false;",
      "params": {
        "threshold": 50
      }
    }
  },
  "throttle_period": "30s",
  "actions": {
    "email_administrator": {
      "transform": {
        "script": {
          "inline": "def target='';def attacker='';def body='';for (int i = 0; i < ctx.payload.aggregations.by_src_ip.buckets.size(); i++) {for (int j = 0; j < ctx.payload.aggregations.by_src_ip.buckets[i].by_target_ip.buckets.size(); j++) {if (ctx.payload.aggregations.by_src_ip.buckets[i].by_target_ip.buckets[j].unique_port_count.value > threshold) {target=ctx.payload.aggregations.by_src_ip.buckets[i].by_target_ip.buckets[j].key;attacker=ctx.payload.aggregations.by_src_ip.buckets[i].key;body='Detected portscan from ['+attacker+'] to ['+target+']. '+ctx.payload.aggregations.by_src_ip.buckets[i].by_target_ip.buckets[j].unique_port_count.value+ ' unique ports scanned.'; return [ body : body ];};};};",
          "params": {
            "threshold": 50
          }
        }
      },
      "email": {
        "profile": "standard",
        "attach_data": true,
        "priority": "high",
        "to": [ "[email protected]" ],
        "subject": "[Security Alert] - Port scan detected",
        "body": "{{ctx.payload.body}}"
      }
    }
  }
}
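Before throwing traffic at it, we can optionally sanity-check that the watch was registered and inspect Watcher's execution state (standard Watcher API calls; not part of the detection logic itself):

GET _watcher/watch/port_scan_watch
GET _watcher/stats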
Testing our setup: you've got mail!
Now on to seeing some action: let's log in to a host that has connectivity to our monitored host (in this example 192.168.1.105) and launch a port scan against it:
Elastic-MacBook-Air:~ user$ nmap 192.168.1.105 -p1-500

Starting Nmap 6.47 ( http://nmap.org ) at 2016-02-09 15:38 CET
Nmap scan report for w530 (192.168.1.105)
Host is up (0.0078s latency).
Not shown: 495 closed ports
PORT    STATE SERVICE
22/tcp  open  ssh
80/tcp  open  http
139/tcp open  netbios-ssn
389/tcp open  ldap
445/tcp open  microsoft-ds
Here we explicitly probe the privileged ports from 1 to 500. A few seconds later, we receive an email:
Et voilà! The alert was triggered and the intended watch action was performed.
Note that we could have multiple detections from different hosts; however, for the purpose of this blog post we limit ourselves to detecting and reporting only the first one in the list.
As a side note, if you like Nmap, take a look at this blog post to see all the awesome things you can do using logstash-codec-nmap.
This is just one example of how to leverage the Elastic stack for security monitoring; creativity is the only limit.
Happy alerting!