Update By Query API

edit

The update-by-query API is new and should still be considered experimental. The API may change in ways that are not backwards compatible

The simplest usage of _update_by_query just performs an update on every document in the index without changing the source. This is useful to pick up a new property or some other online mapping change. Here is the API:

POST /twitter/_update_by_query?conflicts=proceed

That will return something like this:

{
  "took" : 147,
  "timed_out": false,
  "updated": 120,
  "batches": 1,
  "version_conflicts": 0,
  "noops": 0,
  "retries": 0,
  "throttled_millis": 0,
  "requests_per_second": "unlimited",
  "throttled_until_millis": 0,
  "total": 120,
  "failures" : [ ]
}

_update_by_query gets a snapshot of the index when it starts and indexes what it finds using internal versioning. That means that you’ll get a version conflict if the document changes between the time when the snapshot was taken and when the index request is processed. When the versions match the document is updated and the version number is incremented.

All update and query failures cause the _update_by_query to abort and are returned in the failures of the response. The updates that have been performed still stick. In other words, the process is not rolled back, only aborted. While the first failure causes the abort all failures that are returned by the failing bulk request are returned in the failures element so it’s possible for there to be quite a few.

If you want to simply count version conflicts not cause the _update_by_query to abort you can set conflicts=proceed on the url or "conflicts": "proceed" in the request body. The first example does this because it is just trying to pick up an online mapping change and a version conflict simply means that the conflicting document was updated between the start of the _update_by_query and the time when it attempted to update the document. This is fine because that update will have picked up the online mapping update.

Back to the API format, you can limit _update_by_query to a single type. This will only update tweet`s from the `twitter index:

POST /twitter/tweet/_update_by_query?conflicts=proceed

You can also limit _update_by_query using the Query DSL. This will update all documents from the twitter index for the user kimchy:

POST /twitter/_update_by_query?conflicts=proceed
{
  "query": { 
    "term": {
      "user": "kimchy"
    }
  }
}

The query must be passed as a value to the query key, in the same way as the Search API. You can also use the q parameter in the same way as the search api.

So far we’ve only been updating documents without changing their source. That is genuinely useful for things like picking up new properties but it’s only half the fun. _update_by_query supports a script object to update the document. This will increment the likes field on all of kimchy’s tweets:

POST /twitter/_update_by_query
{
  "script": {
    "inline": "ctx._source.likes++"
  },
  "query": {
    "term": {
      "user": "kimchy"
    }
  }
}

Just as in Update API you can set ctx.op = "noop" if your script decides that it doesn’t have to make any changes. That will cause _update_by_query to omit that document from its updates. Setting ctx.op to anything else is an error. If you want to delete by a query you can use the Delete by Query plugin instead. Setting any other field in ctx is an error.

Note that we stopped specifying conflicts=proceed. In this case we want a version conflict to abort the process so we can handle the failure.

This API doesn’t allow you to move the documents it touches, just modify their source. This is intentional! We’ve made no provisions for removing the document from its original location.

It’s also possible to do this whole thing on multiple indexes and multiple types at once, just like the search API:

POST /twitter,blog/tweet,post/_update_by_query

If you provide routing then the routing is copied to the scroll query, limiting the process to the shards that match that routing value:

POST /twitter/_update_by_query?routing=1

By default _update_by_query uses scroll batches of 1000. You can change the batch size with the scroll_size URL parameter:

POST twitter/_update_by_query?scroll_size=100

URL Parameters

edit

In addition to the standard parameters like pretty, the Update By Query API also supports refresh, wait_for_completion, consistency, and timeout.

Sending the refresh will update all shards in the index being updated when the request completes. This is different than the Index API’s refresh parameter which causes just the shard that received the new data to be indexed.

If the request contains wait_for_completion=false then Elasticsearch will perform some preflight checks, launch the request, and then return a task which can be used with Tasks APIs to cancel or get the status of the task. For now, once the request is finished the task is gone and the only place to look for the ultimate result of the task is in the Elasticsearch log file. This will be fixed soon.

consistency controls how many copies of a shard must respond to each write request. timeout controls how long each write request waits for unavailable shards to become available. Both work exactly how they work in the Bulk API.

requests_per_second can be set to any decimal number (1.4, 6, 1000, etc) and throttles the number of requests per second that the update by query issues. The throttling is done waiting between bulk batches so that it can manipulate the scroll timeout. The wait time is the difference between the time it took the batch to complete and the time requests_per_second * requests_in_the_batch. Since the batch isn’t broken into multiple bulk requests large batch sizes will cause Elasticsearch to create many requests and then wait for a while before starting the next set. This is "bursty" instead of "smooth". The default is unlimited which is also the only non-number value that it accepts.

Response body

edit

The JSON response looks like this:

{
  "took" : 639,
  "updated": 0,
  "batches": 1,
  "version_conflicts": 2,
  "retries": 0,
  "throttled_millis": 0,
  "failures" : [ ]
}
took
The number of milliseconds from start to end of the whole operation.
updated
The number of documents that were successfully updated.
batches
The number of scroll responses pulled back by the the update by query.
version_conflicts
The number of version conflicts that the update by query hit.
retries
The number of retries that the update by query did in response to a full queue.
throttled_millis
Number of milliseconds the request slept to conform to requests_per_second.
failures
Array of all indexing failures. If this is non-empty then the request aborted because of those failures. See conflicts for how to prevent version conflicts from aborting the operation.

Works with the Task API

edit

While Update By Query is running you can fetch their status using the Task API:

POST /_tasks/?pretty&detailed=true&action=*byquery

The responses looks like:

{
  "nodes" : {
    "r1A2WoRbTwKZ516z6NEs5A" : {
      "name" : "Tyrannus",
      "transport_address" : "127.0.0.1:9300",
      "host" : "127.0.0.1",
      "ip" : "127.0.0.1:9300",
      "attributes" : {
        "testattr" : "test",
        "portsfile" : "true"
      },
      "tasks" : {
        "r1A2WoRbTwKZ516z6NEs5A:36619" : {
          "node" : "r1A2WoRbTwKZ516z6NEs5A",
          "id" : 36619,
          "type" : "transport",
          "action" : "indices:data/write/update/byquery",
          "status" : {    
            "total" : 6154,
            "updated" : 3500,
            "created" : 0,
            "deleted" : 0,
            "batches" : 4,
            "version_conflicts" : 0,
            "noops" : 0,
            "retries": 0,
            "throttled_millis": 0
          },
          "description" : ""
        }
      }
    }
  }
}

this object contains the actual status. It is just like the response json with the important addition of the total field. total is the total number of operations that the reindex expects to perform. You can estimate the progress by adding the updated, created, and deleted fields. The request will finish when their sum is equal to the total field.

Works with the Cancel Task API

edit

Any Update By Query can be canceled using the Task Cancel API:

POST /_tasks/{task_id}/_cancel

The task_id can be found using the tasks API above.

Cancelation should happen quickly but might take a few seconds. The task status API above will continue to list the task until it is wakes to cancel itself.

Rethrottling

edit

The value of requests_per_second can be changed on a running update by query using the _rethrottle API:

POST /_update_by_query/{task_id}/_rethrottle?requests_per_second=unlimited

The task_id can be found using the tasks API above.

Just like when setting it on the _update_by_query API requests_per_second can be either unlimited to disable throttling or any decimal number like 1.7 or 12 to throttle to that level. Rethrottling that speeds up the query takes effect immediately but rethrotting that slows down the query will take effect on after completing the current batch. This prevents scroll timeouts.

Pick up a new property

edit

Say you created an index without dynamic mapping, filled it with data, and then added a mapping value to pick up more fields from the data:

PUT test
{
  "mappings": {
    "test": {
      "dynamic": false,   
      "properties": {
        "text": {"type": "string"}
      }
    }
  }
}

POST test/test?refresh
{
  "text": "words words",
  "flag": "bar"
}'
POST test/test?refresh
{
  "text": "words words",
  "flag": "foo"
}'
PUT test/_mapping/test   
{
  "properties": {
    "text": {"type": "string"},
    "flag": {"type": "string", "analyzer": "keyword"}
  }
}

This means that new fields won’t be indexed, just stored in _source.

This updates the mapping to add the new flag field. To pick up the new field you have to reindex all documents with it.

Searching for the data won’t find anything:

POST test/_search?filter_path=hits.total
{
  "query": {
    "match": {
      "flag": "foo"
    }
  }
}
{
  "hits" : {
    "total" : 0
  }
}

But you can issue an _update_by_query request to pick up the new mapping:

POST test/_update_by_query?refresh&conflicts=proceed
POST test/_search?filter_path=hits.total
{
  "query": {
    "match": {
      "flag": "foo"
    }
  }
}
{
  "hits" : {
    "total" : 1
  }
}

You can do the exact same thing when adding a field to a multifield.