Apache Pig support
Apache Pig is a platform for analyzing large data sets that consists of a high-level language for expressing data analysis programs, coupled with infrastructure for evaluating these programs.
-- Pig website
It provides a high-level, powerful, scripting-like transformation language, which is compiled into Map/Reduce jobs at runtime by the Pig compiler. To simplify working with arbitrary data, Pig associates a schema (or type information) with each data set for validation and performance. The schema, in turn, breaks the data set down into discrete data types that can be transformed through various operators or custom functions (UDFs). Data can be loaded from and stored to various storage systems such as the local file-system or HDFS, and, with elasticsearch-hadoop, Elasticsearch as well.
Installation
In order to use elasticsearch-hadoop, its jar needs to be in Pig’s classpath. There are various ways of doing that, though typically the REGISTER command is used:
REGISTER /path/elasticsearch-hadoop.jar;
The command expects a proper URI that can be found either on the local file-system or remotely. Typically it’s best to use a distributed file-system (such as HDFS or Amazon S3), since the script might be executed on various machines.
As an alternative, when using the command line, one can register additional jars through the -Dpig.additional.jars option (which accepts a URI as well):
$ pig -Dpig.additional.jars=/path/elasticsearch-hadoop.jar:<other.jars> script.pig
or, if the jars are on HDFS:
$ pig \
    -Dpig.additional.jars=hdfs://<cluster-name>:<cluster-port>/<path>/elasticsearch-hadoop.jar:<other.jars> script.pig
Configuration
With Pig, one can specify the configuration properties (as an alternative to the Hadoop Configuration object) as constructor parameters when declaring EsStorage:
STORE B INTO 'radio/artists' USING org.elasticsearch.hadoop.pig.EsStorage ('es.http.timeout = 5m', 'es.index.auto.create = false');
- 'radio/artists': elasticsearch-hadoop configuration (target resource)
- 'es.http.timeout = 5m': elasticsearch-hadoop option (HTTP timeout)
- 'es.index.auto.create = false': another elasticsearch-hadoop configuration (disable automatic index creation)
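Each constructor argument above is a single key=value string. As a rough illustration of how such arguments map onto settings, here is a small Python sketch. The helper name parse_es_storage_args is hypothetical and this is not the connector's own parsing code; it assumes whitespace around '=' is insignificant, as the example above suggests.

```python
from typing import Dict


def parse_es_storage_args(*args: str) -> Dict[str, str]:
    """Hypothetical helper: turn EsStorage-style 'key = value' strings into a dict.

    Illustration only; assumes whitespace around '=' is not significant.
    """
    settings: Dict[str, str] = {}
    for arg in args:
        # Split on the first '=' only, so a value may itself contain '='.
        key, sep, value = arg.partition("=")
        if not sep:
            raise ValueError(f"expected 'key=value', got {arg!r}")
        settings[key.strip()] = value.strip()
    return settings


cfg = parse_es_storage_args("es.http.timeout = 5m", "es.index.auto.create = false")
print(cfg)  # {'es.http.timeout': '5m', 'es.index.auto.create': 'false'}
```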
To avoid having to specify the fully qualified class name (org.elasticsearch.hadoop.pig.EsStorage), consider using a shortcut through DEFINE:
DEFINE EsStorage org.elasticsearch.hadoop.pig.EsStorage();
Note that it is possible (and recommended) to specify configuration parameters, such as es.query or es.mapping.names, in the DEFINE declaration itself to reduce script duplication:
DEFINE EsStorage org.elasticsearch.hadoop.pig.EsStorage('my.cfg.param=value');
Pig definitions are replaced as is; even though the syntax allows parametrization, Pig will silently ignore any parameters passed outside the DEFINE declaration.
Tuple field names
Among the various types available in Pig, tuples are used the most. Tuples are defined as “ordered sets of fields” (e.g. (19,2)); structurally, however, they are shaped as ordered maps, since each field has a name, which may or may not be defined (e.g. (field:19, another:2)). The ordering is important and forces elasticsearch-hadoop to use JSON arrays for tuples: JSON objects are not an option, since they do not preserve ordering and they require keys/names, which may or may not be available in a tuple.
Obeying the rule of least surprise, elasticsearch-hadoop by default will disregard a tuple’s field names, both when writing and reading.
To change this behavior (which in effect means treating tuples as arrays of maps instead of plain arrays), set the boolean property es.mapping.pig.tuple.use.field.names (false by default) to true.
The table below illustrates the difference between the two settings:
Tuple schema | Tuple value | Resulting JSON representation |
When using tuples, it is highly recommended to create the index mapping beforehand, as it is quite common for tuples to contain mixed types (numbers, strings, other tuples, etc.) which, when mapped as an array (the default), can cause parsing errors (the automatic mapping can infer the fields to be numbers instead of strings, etc.). In fact, the example above falls into this category since the tuple contains both a number (1) and a string ("kimchy"), which will cause the auto-detection to map both foo and bar as a number, thus causing an exception when encountering "kimchy". Please refer to this for more information.
Additionally, consider breaking up (flattening) the tuple into primitive data atoms before sending the data off to Elasticsearch.
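To make the two tuple representations and the mixed-type problem concrete, the Python sketch below models them. It is an illustration, not connector code: it assumes that with es.mapping.pig.tuple.use.field.names=true each field becomes a single-entry JSON object in the array, and dynamic_type_of is a deliberately simplified toy of the Elasticsearch rule that the first value of an array decides the field type.

```python
import json
from typing import Any, List, Tuple

Field = Tuple[str, Any]  # (field name, value) pair of a Pig tuple


def tuple_to_json(fields: List[Field], use_field_names: bool = False) -> str:
    """Serialize a tuple as a JSON array, optionally keeping field names.

    Assumption: with use_field_names=True every field becomes a
    single-entry object, i.e. the tuple is written as an array of maps.
    """
    if use_field_names:
        items: List[Any] = [{name: value} for name, value in fields]
    else:
        items = [value for _, value in fields]
    return json.dumps(items)


def dynamic_type_of(values: List[Any]) -> str:
    """Toy model of dynamic mapping for arrays: the first value picks the
    type and later values must be coercible to it (a loose approximation)."""
    field_type = "long" if isinstance(values[0], int) else "text"
    if field_type == "long":
        for value in values[1:]:
            int(value)  # raises ValueError for "kimchy", mirroring the parsing error
    return field_type


foo = [("nr", 1), ("name", "kimchy")]
print(tuple_to_json(foo))                        # [1, "kimchy"]
print(tuple_to_json(foo, use_field_names=True))  # [{"nr": 1}, {"name": "kimchy"}]
try:
    dynamic_type_of([value for _, value in foo])
except ValueError as err:
    print("mapping conflict:", err)
```

Creating the mapping up front (for example, declaring the field as a string type) sidesteps the first-value inference that the toy model reproduces.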
Reducers parallelism
By default, Pig will use only one reducer per job, which in most cases is inefficient. To address this issue:
- Use the Parallel Features
As explained in the reference docs, out of the box Pig expects each reducer to process about 1 GB of data; unfortunately, if the data is scattered around the network, this becomes inefficient as the entire job is effectively serialized. Change this by increasing the number of reducers to match the number of your shards, through either the default_parallel property or the PARALLEL keyword:
-- launch the Map/Reduce job with 5 reducers
SET default_parallel 5;
or by using the PARALLEL keyword with reduce-side operators such as COGROUP, JOIN (inner), JOIN (outer) and ORDER BY.
- Disable split combination
Out of the box, Pig over-eagerly combines its input splits even if it does not know how big they are. This again kills parallelism, since it serializes the queries to Elasticsearch; typically this looks as follows in the logs:
20yy-mm-dd hh:mm:ss,mss [JobControl] INFO org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 25
20yy-mm-dd hh:mm:ss,mss [JobControl] INFO org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths (combined) to process : 1
Avoid this by setting pig.noSplitCombination to true (one can also set pig.splitCombination to false, however we recommend the former), either by passing the property when invoking the script:
pig -Dpig.noSplitCombination=true myScript.pig
or in the Pig script itself:
SET pig.noSplitCombination TRUE;
or through the global pig.properties configuration in your Pig install:
pig.noSplitCombination=true
Unfortunately, elasticsearch-hadoop cannot set these properties automatically, so the user has to set them manually, either per script or globally through the Pig configuration as described above.
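Since the split-combination symptom shows up in the logs, a quick way to check whether a run was affected is to compare the two MapRedUtil counts. The Python helper below is a hypothetical convenience (split_counts is not part of Pig or elasticsearch-hadoop) and only assumes the log wording shown above.

```python
import re
from typing import Optional, Tuple

# Matches both MapRedUtil lines; group 1 is set only for the "(combined)" variant.
_PATTERN = re.compile(r"Total input paths( \(combined\))? to process : (\d+)")


def split_counts(log_text: str) -> Tuple[Optional[int], Optional[int]]:
    """Return (input paths, combined input paths) found in a Pig log excerpt."""
    raw: Optional[int] = None
    combined: Optional[int] = None
    for match in _PATTERN.finditer(log_text):
        count = int(match.group(2))
        if match.group(1):
            combined = count
        else:
            raw = count
    return raw, combined


log = (
    "[JobControl] INFO ...MapRedUtil - Total input paths to process : 25\n"
    "[JobControl] INFO ...MapRedUtil - Total input paths (combined) to process : 1\n"
)
raw, combined = split_counts(log)
if raw is not None and combined is not None and combined < raw:
    print(f"{raw} input paths were combined into {combined}; consider pig.noSplitCombination=true")
```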
Mapping
Out of the box, elasticsearch-hadoop uses the Pig schema to map the data in Elasticsearch, using both the field names and types in the process. There are cases, however, when the names in Pig cannot be used with Elasticsearch (invalid characters, existing names with a different layout, etc.). For such cases, one can use the es.mapping.names setting, which accepts a comma-separated list of mapped names in the following format: Pig field name:Elasticsearch field name
For example:
STORE B INTO '...' USING org.elasticsearch.hadoop.pig.EsStorage('es.mapping.names=date:@timestamp, uRL:url');
Here the Pig column date is mapped in Elasticsearch to @timestamp and the Pig column uRL is mapped to url.
Since elasticsearch-hadoop 2.1, the case of Pig schema field names is preserved when writing to Elasticsearch and when reading back.
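The es.mapping.names format is simple enough to model directly. The Python sketch below parses the value used above and applies the renames to a document. The helper names are hypothetical and the sample values are placeholders, so treat it as an illustration of the format rather than of the connector's internals.

```python
from typing import Any, Dict


def parse_mapping_names(spec: str) -> Dict[str, str]:
    """Parse 'pigField:esField, other:renamed' into a rename table."""
    renames: Dict[str, str] = {}
    for entry in spec.split(","):
        # Split on the first ':' only: left is the Pig name, right the Elasticsearch name.
        pig_name, sep, es_name = entry.strip().partition(":")
        if not sep:
            raise ValueError(f"expected 'pig:es', got {entry!r}")
        renames[pig_name.strip()] = es_name.strip()
    return renames


def rename_fields(doc: Dict[str, Any], renames: Dict[str, str]) -> Dict[str, Any]:
    """Apply the rename table; unmapped fields keep their (case-preserved) name."""
    return {renames.get(name, name): value for name, value in doc.items()}


names = parse_mapping_names("date:@timestamp, uRL:url")
print(rename_fields({"date": "2014-01-01", "uRL": "http://example.com", "name": "x"}, names))
# {'@timestamp': '2014-01-01', 'url': 'http://example.com', 'name': 'x'}
```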
Writing data to Elasticsearch
Elasticsearch is exposed as a native Storage to Pig, so it can be used to store data into it:
-- load data from HDFS into Pig using a schema
A = LOAD 'src/test/resources/artists.dat' USING PigStorage()
    AS (id:long, name, url:chararray, picture: chararray);
-- transform data
B = FOREACH A GENERATE name, TOTUPLE(url, picture) AS links;
-- save the result to Elasticsearch
STORE B INTO 'radio/artists' USING org.elasticsearch.hadoop.pig.EsStorage();
- 'radio/artists': Elasticsearch resource (index and type) associated with the given storage
- EsStorage(): additional configuration parameters can be passed inside the parentheses; here none are given, so the defaults are used
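In plain terms, each input line becomes one document with a name field and a links array (a tuple is written as a JSON array by default, as described under Tuple field names). The Python function below is a hypothetical model of that FOREACH/STORE pair, with placeholder values; it is not how EsStorage is implemented.

```python
import json
from typing import Dict, List


def to_document(row: List[str]) -> Dict[str, object]:
    """Model of B = FOREACH A GENERATE name, TOTUPLE(url, picture) AS links;

    `row` stands for one parsed line of artists.dat: (id, name, url, picture).
    The id column is dropped by the FOREACH, and the tuple becomes a JSON array.
    """
    _id, name, url, picture = row
    return {"name": name, "links": [url, picture]}


row = ["1", "Some Artist", "http://example.com/artist", "http://example.com/artist.jpg"]
print(json.dumps(to_document(row)))
# {"name": "Some Artist", "links": ["http://example.com/artist", "http://example.com/artist.jpg"]}
```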
For cases where the id (or other metadata fields like ttl or timestamp) of the document needs to be specified, one can do so by setting the appropriate mapping, namely es.mapping.id. Following the previous example, to indicate to Elasticsearch to use the field id as the document id, update the Storage configuration:
STORE B INTO 'radio/artists' USING org.elasticsearch.hadoop.pig.EsStorage('es.mapping.id=id'...);
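Conceptually, es.mapping.id tells the connector which field supplies the document _id. The Python sketch below builds one Elasticsearch Bulk API action/source pair that way. The name bulk_entry is hypothetical, and it assumes the target index is given by the request URL so the action line only carries _id; the connector's actual request layout may differ.

```python
import json
from typing import Any, Dict


def bulk_entry(doc: Dict[str, Any], id_field: str) -> str:
    """Build one NDJSON action/source pair whose _id comes from `id_field`.

    Assumes the index is part of the request URL (e.g. /radio/artists/_bulk),
    so the action line only needs the _id.
    """
    action = {"index": {"_id": str(doc[id_field])}}
    # In this sketch the id field also stays in the document source.
    return json.dumps(action) + "\n" + json.dumps(doc) + "\n"


print(bulk_entry({"id": 42, "name": "Some Artist"}, "id"), end="")
# {"index": {"_id": "42"}}
# {"id": 42, "name": "Some Artist"}
```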
Writing existing JSON to Elasticsearch
When the job input data is already in JSON, elasticsearch-hadoop allows direct indexing without applying any transformation; the data is taken as is and sent directly to Elasticsearch. In such cases, one needs to indicate the JSON input by setting the es.input.json parameter. elasticsearch-hadoop then expects to receive a tuple with a single field (representing the JSON document); the library recognizes common textual types such as chararray or bytearray, otherwise it simply calls toString to get hold of the JSON content.
Table 4. Pig types to use for JSON representation
Pig type | Comment |
bytearray | use this when the JSON data is represented as a byte[] or similar |
chararray | use this if the JSON data is represented as a String |
anything else | make sure the toString() returns the desired JSON document |
Make sure the data is properly encoded, in UTF-8. The field content is considered the final form of the document sent to Elasticsearch.
A = LOAD '/resources/artists.json' USING PigStorage() AS (json:chararray);
STORE A INTO 'radio/artists' USING org.elasticsearch.hadoop.pig.EsStorage('es.input.json=true'...);
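The type handling described above can be summarized in a few lines. The Python function below is a hypothetical model (json_payload is not a connector API): byte content passes through, text is encoded as UTF-8, and any other object falls back to its string form, Python's analogue of Java's toString.

```python
import json
from typing import Any


def json_payload(field: Any) -> bytes:
    """Return the request body for a single-field tuple when es.input.json=true."""
    if isinstance(field, (bytes, bytearray)):
        return bytes(field)  # bytearray-like: taken as is, assumed to be UTF-8 already
    if isinstance(field, str):
        return field.encode("utf-8")  # chararray-like: encode as UTF-8
    return str(field).encode("utf-8")  # anything else: rely on its string form


class ArtistRecord:
    """Hypothetical object whose string form is the desired JSON document."""

    def __init__(self, name: str) -> None:
        self.name = name

    def __str__(self) -> str:
        return json.dumps({"name": self.name})


for value in ('{"name": "Some Artist"}', b'{"name": "Some Artist"}', ArtistRecord("Some Artist")):
    print(json.loads(json_payload(value)))  # {'name': 'Some Artist'} each time
```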
Writing to dynamic/multi-resources
One can index the data to a different resource, depending on the row being read, by using patterns. Reusing the aforementioned media example, one could configure it as follows:
A = LOAD 'src/test/resources/media.dat' USING PigStorage()
    AS (name:chararray, type:chararray, year: chararray);
STORE A INTO 'my-collection-{type}/doc' USING org.elasticsearch.hadoop.pig.EsStorage();
- type: tuple field used by the resource pattern. Any of the declared fields can be used.
- 'my-collection-{type}/doc': resource pattern using field type
For each tuple about to be written, elasticsearch-hadoop will extract the type field and use its value to determine the target resource.
The functionality is also available when dealing with raw JSON; in this case, the value will be extracted from the JSON document itself. Assuming the JSON source contains documents with a top-level media_type field, the script can be as follows:
A = LOAD '/resources/media.json' USING PigStorage() AS (json:chararray);
STORE A INTO 'my-collection-{media_type}/doc' USING org.elasticsearch.hadoop.pig.EsStorage('es.input.json=true');
- (json:chararray): schema declaration for the tuple. Since JSON input is used, the schema is simply a holder for the raw data
- 'my-collection-{media_type}/doc': resource pattern relying on fields within the JSON document and not on the tuple schema
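Resolving such a pattern amounts to substituting each {field} placeholder with a value taken either from the tuple or, with es.input.json, from the JSON document. The Python sketch below models both cases. resolve_resource is a hypothetical name, only plain {field} placeholders are handled, and the sample values are placeholders.

```python
import json
import re
from typing import Any, Dict, Union

_PLACEHOLDER = re.compile(r"\{([^}]+)\}")


def resolve_resource(pattern: str, record: Union[Dict[str, Any], str]) -> str:
    """Substitute every {field} in `pattern` with the record's field value.

    `record` is either a dict standing for a tuple's named fields or a raw
    JSON string (es.input.json=true), whose top-level fields are used.
    """
    fields = json.loads(record) if isinstance(record, str) else record

    def substitute(match: re.Match) -> str:
        name = match.group(1)
        if name not in fields:
            raise KeyError(f"field {name!r} required by the pattern is missing")
        return str(fields[name])

    return _PLACEHOLDER.sub(substitute, pattern)


row = {"name": "Some Title", "type": "music", "year": "1987"}
print(resolve_resource("my-collection-{type}/doc", row))  # my-collection-music/doc

raw = '{"media_type": "music", "title": "Some Title"}'
print(resolve_resource("my-collection-{media_type}/doc", raw))  # my-collection-music/doc
```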
Reading data from Elasticsearch
As you would expect, loading the data is straightforward:
-- execute Elasticsearch query and load data into Pig
A = LOAD 'radio/artists' USING org.elasticsearch.hadoop.pig.EsStorage('es.query=?me*');
DUMP A;
Due to a bug in Pig, LoadFunctions are not aware of any schema associated with them. This means EsStorage is forced to fully parse the documents from Elasticsearch before passing the data to Pig for projection. In practice, this has little impact as long as a document's top-level fields are used; for nested fields, consider extracting the values yourself in Pig.
Reading data from Elasticsearch as JSON
In the case where the results from Elasticsearch need to be in JSON format (typically to be sent down the wire to some other system), one can instruct elasticsearch-hadoop to return the data as is. By setting es.output.json to true, the connector will parse the response from Elasticsearch, identify the documents and, without converting them, return their content to the user as String/chararray.
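A rough model of this behavior: locate the documents in the search response and hand back their source as text. The Python sketch below (hits_as_json is a hypothetical name) works on a standard search response body. Unlike the connector, which returns the content without converting it, the sketch re-serializes each _source with json.dumps, so whitespace may differ from the stored document.

```python
import json
from typing import List


def hits_as_json(response_body: str) -> List[str]:
    """Return each hit's _source as a JSON string (one per document)."""
    response = json.loads(response_body)
    # Search responses list the matching documents under hits.hits[*]._source.
    return [json.dumps(hit["_source"]) for hit in response["hits"]["hits"]]


body = '{"hits": {"hits": [{"_id": "1", "_source": {"name": "Some Artist"}}]}}'
print(hits_as_json(body))  # ['{"name": "Some Artist"}']
```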
Type conversion
If automatic index creation is used, please review this section for more information.
Pig internally uses native Java types for most of its types, and elasticsearch-hadoop abides by that convention.
Pig type | Elasticsearch type |
Available in Pig 0.10 or higher |
Available in Pig 0.11 or higher |
Available in Pig 0.12 or higher |
While elasticsearch-hadoop understands the Pig types up to version 0.12.1, it is backwards compatible with Pig 0.9.
It is worth mentioning that rich data types available only in Elasticsearch, such as GeoPoint or GeoShape, are supported by converting their structure into the primitives available in the table above. For example, depending on how it is stored, a geo_point might be returned as a chararray or a tuple.
