Enrich your Elasticsearch documents with Logstash
We saw in the previous post that we can enrich data within Elasticsearch® using the Enrich Processor in an ingest pipeline. But sometimes you need to perform more complex tasks, your source of data is not Elasticsearch, or you want to store the result both in Elasticsearch and in a third-party system. In those cases, moving the execution of your pipeline to Logstash® makes a lot of sense.
Enriching Elasticsearch data with Elasticsearch
With Logstash, that is super easy with a pipeline similar to this:
input {
  # Read all documents from Elasticsearch
  elasticsearch {
    hosts => ["${ELASTICSEARCH_URL}"]
    user => "elastic"
    password => "${ELASTIC_PASSWORD}"
    index => "kibana_sample_data_logs"
    docinfo => true
    ecs_compatibility => "disabled"
  }
}
filter {
  # Enrich every document with Elasticsearch
  elasticsearch {
    hosts => ["${ELASTICSEARCH_URL}"]
    user => "elastic"
    password => "${ELASTIC_PASSWORD}"
    index => "vip"
    query => "ip:%{[clientip]}"
    sort => "ip:desc"
    fields => {
      "[name]" => "[name]"
      "[vip]" => "[vip]"
    }
  }
  mutate {
    remove_field => ["@version", "@timestamp"]
  }
}
output {
  if [name] {
    # Write all modified documents to Elasticsearch
    elasticsearch {
      manage_template => false
      hosts => ["${ELASTICSEARCH_URL}"]
      user => "elastic"
      password => "${ELASTIC_PASSWORD}"
      index => "%{[@metadata][_index]}"
      document_id => "%{[@metadata][_id]}"
    }
  }
}
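Before running the pipeline, you can check how many documents are in the source index. This is just a quick sanity check; the curl call below assumes the same ELASTICSEARCH_URL and ELASTIC_PASSWORD variables used by the pipeline:
# Count the documents in the source index (sanity check)
curl -s -u "elastic:${ELASTIC_PASSWORD}" "${ELASTICSEARCH_URL}/kibana_sample_data_logs/_count"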
In total, we have 14,074 events to parse. Not a lot, but enough for this demo. Here's a sample event:
{
  "agent": "Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24",
  "bytes": 1831,
  "clientip": "30.156.16.164",
  "extension": "",
  "geo": {
    "srcdest": "US:IN",
    "src": "US",
    "dest": "IN",
    "coordinates": {
      "lat": 55.53741389,
      "lon": -132.3975144
    }
  },
  "host": "elastic-elastic-elastic.org",
  "index": "kibana_sample_data_logs",
  "ip": "30.156.16.163",
  "machine": {
    "ram": 9663676416,
    "os": "win xp"
  },
  "memory": 73240,
  "message": "30.156.16.163 - - [2018-09-01T12:43:49.756Z] \"GET /wp-login.php HTTP/1.1\" 404 1831 \"-\" \"Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24\"",
  "phpmemory": 73240,
  "referer": "http://www.elastic-elastic-elastic.com/success/timothy-l-kopra",
  "request": "/wp-login.php",
  "response": 404,
  "tags": [
    "success",
    "info"
  ],
  "timestamp": "2023-03-18T12:43:49.756Z",
  "url": "https://elastic-elastic-elastic.org/wp-login.php",
  "utc_time": "2023-03-18T12:43:49.756Z",
  "event": {
    "dataset": "sample_web_logs"
  }
}
As we saw in the previous post, the vip index contains information about our customers:
{
  "ip": "30.156.16.164",
  "vip": true,
  "name": "David P"
}
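If you did not follow the previous post, you can create a similar document yourself. This is just a sketch; the document id ("1" here) is hypothetical and the way the index was actually populated in the previous post may differ:
# Index a sample customer document into the vip index (hypothetical document id)
curl -s -u "elastic:${ELASTIC_PASSWORD}" -X PUT \
  "${ELASTICSEARCH_URL}/vip/_doc/1" \
  -H "Content-Type: application/json" \
  -d '{ "ip": "30.156.16.164", "vip": true, "name": "David P" }'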
We can run the pipeline with:
docker run \
--name=logstash \
--rm -it \
-v $(pwd)/logstash-config/pipeline/:/usr/share/logstash/pipeline/ \
-e XPACK_MONITORING_ENABLED=false \
-e ELASTICSEARCH_URL="$ELASTICSEARCH_URL" \
-e ELASTIC_PASSWORD="$ELASTIC_PASSWORD" \
docker.elastic.co/logstash/logstash:8.12.0
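This assumes the ELASTICSEARCH_URL and ELASTIC_PASSWORD environment variables are already exported in your shell, for example (placeholder values, replace with your own):
# Placeholder values - replace with your own cluster URL and password
export ELASTICSEARCH_URL="https://your-cluster.example.com:9243"
export ELASTIC_PASSWORD="changeme"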
The enriched document now looks like this:
{
  "agent": "Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24",
  "bytes": 1831,
  "clientip": "30.156.16.164",
  "extension": "",
  "geo": {
    "srcdest": "US:IN",
    "src": "US",
    "dest": "IN",
    "coordinates": {
      "lat": 55.53741389,
      "lon": -132.3975144
    }
  },
  "host": "elastic-elastic-elastic.org",
  "index": "kibana_sample_data_logs",
  "ip": "30.156.16.163",
  "machine": {
    "ram": 9663676416,
    "os": "win xp"
  },
  "memory": 73240,
  "message": "30.156.16.163 - - [2018-09-01T12:43:49.756Z] \"GET /wp-login.php HTTP/1.1\" 404 1831 \"-\" \"Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24\"",
  "phpmemory": 73240,
  "referer": "http://www.elastic-elastic-elastic.com/success/timothy-l-kopra",
  "request": "/wp-login.php",
  "response": 404,
  "tags": [
    "success",
    "info"
  ],
  "timestamp": "2023-03-18T12:43:49.756Z",
  "url": "https://elastic-elastic-elastic.org/wp-login.php",
  "utc_time": "2023-03-18T12:43:49.756Z",
  "event": {
    "dataset": "sample_web_logs"
  },
  "vip": true,
  "name": "David P"
}
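To verify the result yourself, you can run a quick search for the documents that were flagged. This is just a verification query, not part of the pipeline:
# List one enriched document (those with vip set to true)
curl -s -u "elastic:${ELASTIC_PASSWORD}" \
  "${ELASTICSEARCH_URL}/kibana_sample_data_logs/_search?q=vip:true&size=1&pretty"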
It's actually easy, but there's a problem: it's slow. Doing a lookup over the network for every event, even though Elasticsearch is blazing fast, still slows down the whole pipeline.
Using a static JDBC filter
I recently met Laurent, from the amazing Elastic Consulting team, at the ParisJUG, and we spoke about that problem. He told me that one of his customers had faced that issue, and he suggested using a local cache of the Elasticsearch data in Logstash instead.
The problem is: there's no such cache filter plugin in Logstash. He found a very smart way to address the problem by leveraging the JDBC static filter plugin and the Elasticsearch JDBC driver.
Note that this requires a Platinum license (or a trial).
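If you don't have a Platinum license yet, one way to start a trial is through the licensing API (you can also do it from Kibana):
# Start a trial license on the cluster
curl -s -u "elastic:${ELASTIC_PASSWORD}" -X POST \
  "${ELASTICSEARCH_URL}/_license/start_trial?acknowledge=true"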
Adding the Elasticsearch JDBC driver
First, we need to add the JDBC driver to our Logstash instance.
mkdir -p logstash-config/lib
wget https://artifacts.elastic.co/maven/org/elasticsearch/plugin/x-pack-sql-jdbc/8.12.0/x-pack-sql-jdbc-8.12.0.jar
mv x-pack-sql-jdbc-8.12.0.jar logstash-config/lib
We just need to share this directory with our Logstash Docker instance.
time docker run \
--name=logstash \
--rm -it \
-v $(pwd)/logstash-config/pipeline/:/usr/share/logstash/pipeline/ \
-v $(pwd)/logstash-config/lib/:/tmp/lib/ \
-e XPACK_MONITORING_ENABLED=false \
-e ELASTICSEARCH_URL="$ELASTICSEARCH_URL" \
-e ELASTIC_PASSWORD="$ELASTIC_PASSWORD" \
docker.elastic.co/logstash/logstash:8.12.0
Updating the pipeline
The input part does not change. But now, we want to create a temporary in-memory table named vip (for consistency). The table structure is defined with the local_db_objects parameter:
jdbc_static {
  local_db_objects => [ {
    name => "vip"
    index_columns => ["ip"]
    columns => [
      ["name", "VARCHAR(255)"],
      ["vip", "BOOLEAN"],
      ["ip", "VARCHAR(64)"]
    ]
  } ]
}
When the jdbc_static filter starts, we first want to read the whole data set from the Elasticsearch vip index. This is done with the loaders option:
jdbc_static {
  loaders => [ {
    query => "select name, vip, ip from vip"
    local_table => "vip"
  } ]
  jdbc_user => "elastic"
  jdbc_password => "${ELASTIC_PASSWORD}"
  jdbc_driver_class => "org.elasticsearch.xpack.sql.jdbc.EsDriver"
  jdbc_driver_library => "/tmp/lib/x-pack-sql-jdbc-8.12.0.jar"
  jdbc_connection_string => "jdbc:es://${ELASTICSEARCH_URL}"
}
Every time we need to do a lookup, we want to perform it using the following statement:
SELECT name, vip FROM vip WHERE ip = 'THE_IP'
This can be defined using the local_lookups parameter:
jdbc_static {
  local_lookups => [ {
    query => "SELECT name, vip FROM vip WHERE ip = :ip"
    parameters => { "ip" => "clientip" }
    target => "vip"
  } ]
}
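Note that the loader query runs against Elasticsearch through the SQL JDBC driver, while the lookup query runs against the local in-memory table. If you want to double-check the data the loader will pull, you can run a similar query against the Elasticsearch SQL endpoint first, for example:
# Run the loader query directly against Elasticsearch SQL (sanity check)
curl -s -u "elastic:${ELASTIC_PASSWORD}" -X POST \
  "${ELASTICSEARCH_URL}/_sql?format=txt" \
  -H "Content-Type: application/json" \
  -d '{ "query": "SELECT name, vip, ip FROM vip" }'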
If no data is found, we can provide a default value using the default_hash option:
jdbc_static {
  local_lookups => [ {
    query => "SELECT name, vip FROM vip WHERE ip = :ip"
    parameters => { "ip" => "clientip" }
    target => "vip"
    default_hash => {
      name => nil
      vip => false
    }
  } ]
}
In the end, this generates vip.name and vip.vip fields in the event.
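Since the lookup target holds the list of matching rows, for our sample event the temporary vip field should look something like this (which is why the add_field directives below reference the [0] index):
"vip": [
  {
    "name": "David P",
    "vip": true
  }
]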
We can now define what we want to do with those temporary fields:
jdbc_static {
  add_field => { name => "%{[vip][0][name]}" }
  add_field => { vip => "%{[vip][0][vip]}" }
  remove_field => ["vip"]
}
This gives the following filter:
filter {
  # Enrich every document with Elasticsearch via static JDBC
  jdbc_static {
    loaders => [ {
      query => "select name, vip, ip from vip"
      local_table => "vip"
    } ]
    local_db_objects => [ {
      name => "vip"
      index_columns => ["ip"]
      columns => [
        ["name", "VARCHAR(255)"],
        ["vip", "BOOLEAN"],
        ["ip", "VARCHAR(64)"]
      ]
    } ]
    local_lookups => [ {
      query => "SELECT name, vip FROM vip WHERE ip = :ip"
      parameters => { "ip" => "clientip" }
      target => "vip"
      default_hash => {
        name => nil
        vip => false
      }
    } ]
    add_field => { name => "%{[vip][0][name]}" }
    add_field => { vip => "%{[vip][0][vip]}" }
    remove_field => ["vip"]
    jdbc_user => "elastic"
    jdbc_password => "${ELASTIC_PASSWORD}"
    jdbc_driver_class => "org.elasticsearch.xpack.sql.jdbc.EsDriver"
    jdbc_driver_library => "/tmp/lib/x-pack-sql-jdbc-8.12.0.jar"
    jdbc_connection_string => "jdbc:es://${ELASTICSEARCH_URL}"
  }
  mutate {
    remove_field => ["@version", "@timestamp"]
  }
}
Writing modified documents to Elasticsearch
In the first pipeline, we were testing if the name field actually exists in the event:
if [name] {
  # Index to Elasticsearch
}
We could still use something similar, but because we provided default values for when the ip cannot be found in the Elasticsearch vip index, the filter now adds a _jdbcstaticdefaultsused tag to the tags field.
We can use that tag to know whether the lookup found something and, if it did, send the document to Elasticsearch:
output {
  if "_jdbcstaticdefaultsused" not in [tags] {
    # Write all the modified documents to Elasticsearch
    elasticsearch {
      manage_template => false
      hosts => ["${ELASTICSEARCH_URL}"]
      user => "elastic"
      password => "${ELASTIC_PASSWORD}"
      index => "%{[@metadata][_index]}"
      document_id => "%{[@metadata][_id]}"
    }
  }
}
Is it faster?
When we run the test on this small data set with the Elasticsearch filter approach, it takes a bit more than two minutes to enrich it:
real 2m3.146s
user 0m0.077s
sys 0m0.042s
When running the pipeline with the JDBC static filter approach, it now takes less than a minute:
real 0m48.575s
user 0m0.064s
sys 0m0.039s
As we can see, we have significantly reduced the execution time of this enrichment pipeline (a gain of around 60%).
You could try this strategy (or a similar one) if you have a tiny Elasticsearch index that can easily fit in the Logstash JVM memory. If you have hundreds of millions of documents, you should still use the Elasticsearch filter plugin.
Conclusion
In this post, we saw how we can use the JDBC static filter plugin to speed up our data enrichment pipeline when we need to perform some lookups in Elasticsearch. In the next post, we will see how we can do similar enrichment on the edge with the Elastic Agent.
The release and timing of any features or functionality described in this post remain at Elastic's sole discretion. Any features or functionality not currently available may not be delivered on time or at all.