Aaron Binns

Elasticsearch Failure and Recovery

@ 31 Oct 2014

elasticsearch java

Here at TaskRabbit, we use Elasticsearch a bunch. Broadly speaking, our use of Elasticsearch can be split into two buckets:

  • production services
  • event logging and analytics

Production services encompass a wide range of complex queries against a number of indexes representing Taskers, Clients, Jobs and chat threads. For event logging and analytics, we have a Resque Bus worker that listens for all events and indexes them in Elasticsearch. Then, we use Kibana to monitor, track and analyze the events.

Up until about two months ago we had a single cluster for everything: production services as well as logging & analysis. This was working OK, but the event log indexes were growing and the VMs all had not-very-big SSDs. At the rate the event log indexes were growing, we’d soon be feeling the pinch on storage space. In addition, we experienced some mysterious performance problems, which we thought could be (partially) attributable to the difference in operational characteristics between production services and event logging and analysis. The production services require lots of queries with some amount of document updates, whereas event logging adds a new document for every new event, with a low volume of queries from Kibana reports and ad hoc analysis.

Due to these factors, it became clear that we needed to split up the cluster. At the time, our simplest option was to provision some “bigdata” VMs that had sufficient CPU/RAM to support the analysis needs but were backed with larger, and slower, spinning disks. We figured that we could spin up a couple of these “bigdata” VMs, add them to the cluster, then (somehow) migrate the event log indexes over to them.

After some investigation, it looked like using tags to manage index shard allocation would do the trick. We could tag the event log indexes with “bigdata” and then route requests to the corresponding nodes. The main thing we weren’t sure of was whether this approach would also re-balance all the shards so that the event log index shards would be automagically migrated over to the “bigdata” nodes.
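As a rough sketch of what tag-based allocation looks like in the Elasticsearch 1.x line (this is not our exact configuration): each “bigdata” node declares a custom attribute in its elasticsearch.yml, for example `node.tag: bigdata`, and each event log index gets a matching `index.routing.allocation.include.tag` setting. The attribute name `tag`, the `ES_URL` default, the helper function names and the index name in the usage comment are all assumptions made for illustration.

```shell
#!/usr/bin/env bash
# Sketch only. Assumes each "bigdata" node has `node.tag: bigdata` in its
# elasticsearch.yml; the attribute name "tag" is an arbitrary choice.

ES_URL="${ES_URL:-http://localhost:9200}"   # assumed cluster address

# Build the settings body that restricts an index's shards to tagged nodes.
allocation_settings() {
  local tag="$1"
  printf '{"index.routing.allocation.include.tag":"%s"}' "$tag"
}

# Apply the setting to one index through the index settings API.
pin_index_to_tag() {
  local index="$1" tag="$2"
  curl -s -XPUT "${ES_URL}/${index}/_settings" \
    -H 'Content-Type: application/json' \
    -d "$(allocation_settings "$tag")"
}

# Usage (hypothetical index name):
#   pin_index_to_tag events_2014_07 bigdata
```

Note that `include` is a filter on where shards may live, not a guarantee about what the cluster does when the tagged nodes disappear.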

We gave it a whirl, and it looked like everything worked! The event log index shards automagically migrated over to the “bigdata” nodes and new documents and queries were routed to the correct nodes.

All was well…or so it seemed.

Fast forward to the middle of October; something was rotten in Denmark.


We noticed data missing from the event log index. Oh noes!

Mind the gap

We’re not entirely confident in our diagnosis, but we think it had to do with a partial index replication due to staggered failure of the “bigdata” nodes, as discussed in our message to the Elasticsearch mailing list.

To summarize, we had two “bigdata” nodes. When bigdata1 crashed, bigdata2 was left as the sole “bigdata” node. But the indexes required 1 replica of each shard, so the cluster dutifully chose to honor replication over the shard allocation strategy and started replicating shards from bigdata2 to the “production” nodes. Then, before the indexes were completely replicated, bigdata2 crashed, leaving a partial replica on one of the “production” nodes.

This is bad enough, but it gets worse because there are always new events to be logged and the Elasticsearch cluster dutifully added them to the partially replicated indexes on the “production” nodes. So, now we have new documents being written to a partially replicated index. To the Elasticsearch cluster, these are the “current” indexes.

Then, the bigdata[1,2] nodes come back online and Elasticsearch decides that their indexes are older than the “current” ones on the “production” nodes and dutifully replicates the “current” event log index from the “production” nodes back over to the “bigdata” nodes.

In the end, we wound up with event log indexes on the “bigdata” nodes missing a big chunk of data.

Split the Cluster

We quickly decided that having a single, partitioned cluster – with the event log index on “bigdata” nodes and the production indexes on the rest – was not a good long-term solution; so, we decided to split our cluster in two: one cluster for production indexes and one cluster for event logs and analysis.

The problem was that we were changing the transmission on the school bus full of children as it hurtled down the freeway at breakneck speed.

We didn’t have extra “bigdata” VMs in our back pocket, so we had to quickly pull them out of the production cluster, move their Elasticsearch index data directories to a safe spot, then reconstitute them into a new “bigdata” cluster as fast as possible, because new events were queuing up in the Resque Bus, waiting to be written to the Elasticsearch event log index.

Fortunately, that all went off without a hitch and we soon had two clusters and all our back-end services talking to the appropriate one. The only remaining thing we needed to do was restore the backed-up event log data into the new “bigdata” cluster. We had saved the index shards on the “bigdata” nodes before reconstituting them into the new cluster, but they were just sitting in a backup directory, not part of the live cluster.

Restore Event Log Data

Elasticsearch does not let you just drop index shards into its index data directory. There is cluster metadata stored in the shards and trying to copy them from one cluster to another is verboten.

If we had had extra “bigdata” VMs to spare, we could have spun those up, used ElasticDump to dump the documents out of the production cluster, and then imported them into the new “bigdata” cluster. But, as described above, we didn’t have that luxury: all we had were these Elasticsearch Lucene index shards sitting in a backup directory. We needed to extract the event documents from these on-disk indexes and re-import them into the new “bigdata” cluster.

In theory, this is fairly straightforward. By default, when a document is added to Elasticsearch, the original JSON document is stored in the _source field of the Elasticsearch document. We did not change these defaults for our event log indexes, so all of the events that we indexed with Elasticsearch should be stored verbatim in the documents’ _source field. So, to retrieve the original event exactly as it was sent to Elasticsearch, we just had to go get it from the _source field of the Lucene document.

This is also, in theory, straightforward. Just a few lines of Java code are necessary to use the Lucene API to open up an index, get a document and retrieve the value of any stored field. But there is always a difference between theory and practice, and in this situation that difference is due to an enhancement in Lucene 4.x which allows applications to define their own custom binary formats for storing field values. And this is exactly what Elasticsearch does: it registers some custom code to store fields in a compression format of its own choosing.

Fortunately, the same code that Elasticsearch uses to store compressed values can be used to retrieve and decompress them.

Elasticsearch Index Dumper

I whipped up a simple command-line Java program to open up a Lucene index (presumably from Elasticsearch), iterate through the documents, and emit the contents of the _source stored field.

You can get it from GitHub: Elasticsearch Index Dumper (https://github.com/taskrabbit/esid)

With this, I could now just run it with a list of Elasticsearch Lucene index shard directories and capture the output to a file. Then, use ElasticDump to read the documents from that file and re-index them into the new “bigdata” cluster.

For example, the backed-up event log indexes were originally partitioned by month, so the backup directory (/data/es/indexes/) held one directory per index, such as events_2014_07, where the name reflects the year and month of the event log data. In each of these directories were the index shards, such as:

$ ls -1F /data/es/indexes/events_2014_07/

where each shard contains an Elasticsearch Lucene index, e.g.

$ ls -1F /data/es/indexes/events_2014_07/0/

So, to dump out the documents from these index shards, simply run:

$ mkdir /data/es_export
$ git clone https://github.com/taskrabbit/esid
$ cd esid
$ export ES_HOME=/usr/local/elasticsearch
$ ant all
$ ./bin/esid /data/es/indexes/events_2014_07/?/index > /data/es_export/events_2014_07.json

And, as mentioned earlier, the output is of a form that ElasticDump can read, which makes re-importing this data into the new “bigdata” Elasticsearch cluster a snap:

$ elasticdump --input=/data/es_export/events_2014_07.json --output=http://<server magic>/events_2014_07

Et voilà.

Left as an exercise to the reader is a short Bash script to iterate over the indexes, dump each one to a file, then re-import them into a corresponding index in the new cluster.
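For the impatient, here is one possible take on that exercise. It is a sketch, not the script we actually ran: it reuses the paths and commands shown above, is meant to be run from the esid checkout, and expects the address of the new cluster in `ES_URL`. The `ES_URL`, `BACKUP_DIR`, `EXPORT_DIR` and `DRY_RUN` variables and the function names are assumptions introduced for illustration; they are not part of esid or ElasticDump.

```shell
#!/usr/bin/env bash
# Sketch: dump every backed-up index with esid, then re-import it with
# elasticdump. Run from the esid checkout. Variable and function names
# here are illustrative, not part of either tool.

BACKUP_DIR="${BACKUP_DIR:-/data/es/indexes}"
EXPORT_DIR="${EXPORT_DIR:-/data/es_export}"
ES_URL="${ES_URL:-}"   # base URL of the new cluster; the caller must set it

# Print the index names found under a backup directory, one per line.
list_indexes() {
  local dir="$1" path
  for path in "$dir"/*/; do
    [ -d "$path" ] || continue
    basename "$path"
  done
}

# Dump one index's shards to a JSON file, then load it into the new cluster.
# With DRY_RUN set, only print the commands that would be run.
restore_index() {
  local name="$1"
  local out="${EXPORT_DIR}/${name}.json"
  if [ -n "${DRY_RUN:-}" ]; then
    echo "./bin/esid ${BACKUP_DIR}/${name}/?/index > ${out}"
    echo "elasticdump --input=${out} --output=${ES_URL}/${name}"
    return 0
  fi
  # `?` matches single-character shard directories, as in the example above.
  ./bin/esid "${BACKUP_DIR}/${name}"/?/index > "$out" || return 1
  elasticdump --input="$out" --output="${ES_URL}/${name}"
}

# Restore every index, stopping at the first failure.
restore_all() {
  local name
  [ -n "$ES_URL" ] || { echo "ES_URL must be set" >&2; return 1; }
  mkdir -p "$EXPORT_DIR" || return 1
  for name in $(list_indexes "$BACKUP_DIR"); do
    restore_index "$name" || { echo "failed: $name" >&2; return 1; }
  done
}

# Usage: ES_URL=<address of the new cluster> restore_all
```

Running it first with `DRY_RUN=1` prints the esid and elasticdump commands without executing them, which is a cheap sanity check before touching the new cluster.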

