Apache Storm support
Added in 2.1.
Apache Storm is a free and open source distributed realtime computation system. Storm makes it easy to reliably process unbounded streams of data, doing for realtime processing what Hadoop did for batch processing.
-- Storm website
With Storm, one can compute, transform and filter data, typically in a streaming scenario. As opposed to the rest of the libraries mentioned in this documentation, Apache Storm is a computational framework that is not tied to Map/Reduce itself; however, it does integrate with Hadoop, mainly through HDFS.
Since elasticsearch-hadoop 5.0, Storm 1.0 or higher is required (older versions unfortunately cannot be used, as Storm 1.x broke backwards compatibility through package renaming).
Installation
In order to use elasticsearch-hadoop, its jar needs to be available in Storm's classpath. The Storm documentation covers this in detail but, in short, one can either make the jar available on all Storm nodes or include elasticsearch-hadoop in the jar being deployed (which we recommend). The latter approach allows isolation between jobs and, since the jar is self-contained, it can easily be moved across environments without additional setup, making it much more robust.
Configuration
The Storm integration supports the configuration options described in the Configuration chapter plus a few more that are specific to Storm, namely:
Spout specific
es.storm.spout.reliable (default false)
Indicates whether the dedicated EsSpout is reliable, that is, whether it replays the documents in case of failure. By default it is set to false since replaying requires the documents to be kept in memory until they are acknowledged.

es.storm.spout.fields (default "")
Specifies what fields EsSpout will declare in its topology and extract from the returned documents. By default it is unset, meaning the documents are returned as Maps under the default field doc.

es.storm.spout.reliable.queue.size (default 0)
Applicable only if es.storm.spout.reliable is true. Sets the size of the queue which holds documents in memory to be replayed until they are acknowledged. By default the queue is unbounded (0); however, in a production environment it is recommended to limit the queue in order to limit memory consumption. If the queue is full, the Bolt drops any incoming Tuples and throws an exception.

es.storm.spout.reliable.retries.per.tuple (default 5)
Applicable only if es.storm.spout.reliable is true. Sets the number of retries (replays) of a failed tuple before giving up. Setting it to a negative value causes the tuple to be replayed until it is acknowledged.

es.storm.spout.reliable.handle.tuple.failure (default abort)
Applicable only if es.storm.spout.reliable is true. Indicates how to handle failing tuples once the number of retries is depleted. Possible values are:

- ignore - the tuple is discarded
- warn - a warning message is logged and the tuple is discarded
- strict - an exception is thrown, aborting the current job
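As a rough sketch (the index, component names and values below are illustrative, not required names), these spout settings can be passed per component like any other elasticsearch-hadoop property, as described in Setting the configuration below:

Map conf = new HashMap();
// replay documents that have not been acknowledged
conf.put("es.storm.spout.reliable", "true");
// cap the in-memory replay queue instead of leaving it unbounded
conf.put("es.storm.spout.reliable.queue.size", "100");

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("es-spout", new EsSpout("storm/docs", conf));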
Bolt specific
es.storm.bolt.write.ack (default false)
Indicates whether the dedicated EsBolt is reliable, that is, whether it acknowledges the Tuple after it is written to Elasticsearch instead of when it receives it. By default it is false. Note that turning this on increases the memory requirements of the Bolt since it has to keep the data in memory until it is fully written.

es.storm.bolt.flush.entries.size (default 1000)
The number of entries that trigger a micro-batch write to Elasticsearch. By default, it uses the same value as es.batch.size.entries, which by default is 1000.

es.storm.bolt.tick.tuple.flush (default true)
Whether or not to flush the existing data if the Bolt receives a Tick tuple. This heartbeat-like mechanism goes hand in hand with the flush limit above to create both a time and a size trigger. When using Storm's internal ticks, remember to set the tick interval:
// tick every 2 seconds
builder.setBolt(...).addConfiguration(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 2)
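Similarly, a rough sketch of a bolt-side configuration (component names, resource and values are illustrative) that enables acknowledgement on write, lowers the micro-batch size and sets a 2 second tick:

Map conf = new HashMap();
// acknowledge tuples only after they have been written to Elasticsearch
conf.put("es.storm.bolt.write.ack", "true");
// flush a micro-batch every 500 entries instead of the default 1000
conf.put("es.storm.bolt.flush.entries.size", "500");

TopologyBuilder builder = new TopologyBuilder();
builder.setBolt("es-bolt", new EsBolt("storm/docs", conf))
       .addConfiguration(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 2)
       .shuffleGrouping("spout");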
Setting the configuration
The configuration can be set through Storm's Config class, in which case the settings are available globally, or individually for each EsSpout and EsBolt instance. In general, it is recommended to set globally only the properties that apply to all the components and are unlikely to change:
Config conf = new Config();
conf.put("es.index.auto.create", "true");
StormSubmitter.submitTopology("myTopology", conf, topology);
For this reason, typically, one should use the per-component configuration model since it allows different configurations to be used within the same Storm topology:
Map conf = new HashMap();
conf.put("es.index.auto.create", "true");

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("esSpout", new EsSpout("index/type", conf));
Writing data to Elasticsearch
Through elasticsearch-hadoop, Elasticsearch is exposed to Storm through a native Bolt, namely org.elasticsearch.storm.EsBolt, that writes the Storm Tuples to Elasticsearch:
import org.elasticsearch.storm.EsBolt;

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 10);
builder.setBolt("es-bolt", new EsBolt("storm/docs"), 5)
       .shuffleGrouping("spout");
- elasticsearch-hadoop EsBolt
- Various constructors are available for EsBolt; at the very least the target Elasticsearch resource (here storm/docs) needs to be specified
- The number of EsBolt instances for this topology
The number of bolt instances depends highly on your topology and environment. In general, a good rule of thumb is to take into account the number of shards of the target index as well as the number of spouts sending data to it; a good formula is the minimum of the source spouts and the index shards - in this example, 5. A high number of Bolts does not translate to higher throughput; increase the number only if the Bolts are the bottleneck, since otherwise the extra instances simply waste cycles.
For cases where the id (or other metadata fields like ttl or timestamp) of the document needs to be specified, one can do so by setting the appropriate mapping, namely es.mapping.id. Thus, assuming the documents contain a field called sentenceId which is unique and suitable as an identifier, one can update the job configuration as follows:
Map conf = new HashMap();
conf.put("es.mapping.id", "sentenceId");

TopologyBuilder builder = new TopologyBuilder();
builder.setBolt("es-bolt", new EsBolt("index/type", conf));
Writing existing JSON to Elasticsearch
If the data passed to Storm is already in JSON format, EsBolt can pass it directly to Elasticsearch without any transformation; the data is taken as is and sent over the wire. In such cases, one needs to indicate the JSON input by setting the es.input.json parameter to true. Furthermore, the Bolt expects the receiving Tuple to contain only one value/field representing the JSON document. By default, common textual types are recognized, such as chararray or bytearray; otherwise it falls back to calling toString to get hold of the JSON content.
String json1 = "{\"reason\" : \"business\",\"airport\" : \"SFO\"}";
String json2 = "{\"participants\" : 5,\"airport\" : \"OTP\"}";

Map conf = new HashMap();
conf.put("es.input.json", "true");

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("json-spout", new StringSpout(Arrays.asList(json1, json2)));
builder.setBolt("es-bolt", new EsBolt("storm/json-trips", conf))
       .shuffleGrouping("json-spout");
- JSON document represented as a String
- Option indicating the input is in JSON format
- Basic Spout emitting the JSON documents (StringSpout here is simply an example spout)
- Configure EsBolt with the JSON configuration
Writing to dynamic/multi-resources
In cases where the data needs to be indexed based on its content, one can choose the target index based on a Tuple field. Reusing the aforementioned media example, one can partition the documents based on their type. Assuming the document tuple contains the fields media_type, title and year, one can index them as follows:
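A sketch of such a bolt (the resource pattern and component names here are illustrative, not fixed names):

import org.elasticsearch.storm.EsBolt;

TopologyBuilder builder = new TopologyBuilder();
// {media_type} is resolved at runtime from each tuple's media_type field
builder.setBolt("es-bolt", new EsBolt("my-collection-{media_type}/doc"))
       .shuffleGrouping("spout");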
For each tuple about to be written, elasticsearch-hadoop will extract the media_type field and use its value to determine the target resource. The same functionality is also available when dealing with raw JSON; in this case, the value will be extracted from the JSON document itself. Assuming the JSON source contains documents with the following structure:
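(a sketch of such a document; the field values are purely illustrative)

{
  "media_type" : "game",
  "title" : "Final Fantasy VI",
  "year" : "1994"
}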
the EsBolt can be configured as follows:
Map conf = new HashMap();
conf.put("es.input.json", "true");

builder.setBolt("es-bolt", new EsBolt("my-collection-{year}/{media_type}", conf))
       .shuffleGrouping("spout");
- Option indicating the input is in JSON format
- Resource pattern - notice how the pattern is used both in the index and the type
- Pass the configuration to EsBolt
Reading data from Elasticsearch
As you can expect, for reading data (typically executing queries) elasticsearch-hadoop offers a dedicated Spout through org.elasticsearch.storm.EsSpout, which executes the query in Elasticsearch and streams the results back to Apache Storm:
import org.elasticsearch.storm.EsSpout;

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("es-spout", new EsSpout("storm/docs", "?q=me*"), 5);
builder.setBolt("bolt", new PrinterBolt()).shuffleGrouping("es-spout");
- elasticsearch-hadoop EsSpout
- The source Elasticsearch resource (index and type) for the data
- The query to execute (optional) - if no query is specified, the entire indexed data is streamed
- The number of EsSpout instances for this topology
The number of Spout instances depends highly on your topology and environment. Typically you should use the number of shards of your target data as an indicator - if your index has 5 shards, create 5 EsSpouts. However, sometimes the number of shards might be considerably bigger than the number of Spouts you can add to your Apache Storm cluster; in that case, it is better to limit the number of EsSpout instances. Last but not least, adding more EsSpout instances than the number of shards of the source index does not improve performance; the extra instances will just waste resources without processing anything.
Customizing EsSpout fields
Since Storm requires each Spout to declare its fields when creating a topology, by default EsSpout declares for its tuples a generic doc field containing the documents returned (one per tuple) from Elasticsearch. When dealing with structured data (documents sharing the same fields), one can configure the EsSpout to declare the document properties as fields, effectively unwrapping the document into a Tuple. By setting es.storm.spout.fields, EsSpout will use them to indicate to the Storm topology the tuple content and to extract them from the returned document.
For example, if the Elasticsearch documents contain three fields, name, age and gender, setting es.storm.spout.fields to name, age, gender causes EsSpout to return a tuple containing the three named fields (name, age and gender) instead of a tuple with a single doc field containing the whole document.
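A minimal sketch of this setup (the component names and the storm/docs resource are illustrative; PrinterBolt is the demo bolt used earlier):

Map conf = new HashMap();
// unwrap each returned document into the declared tuple fields
conf.put("es.storm.spout.fields", "name, age, gender");

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("es-spout", new EsSpout("storm/docs", conf));
// downstream bolts can now access the values directly, e.g. tuple.getStringByField("name")
builder.setBolt("print-bolt", new PrinterBolt()).shuffleGrouping("es-spout");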