Elasticsearch 2.x Connector

This connector provides a Sink that can write to an Elasticsearch 2.x Index. To use this connector, add the following dependency to your project:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch2_2.10</artifactId>
  <version>1.2-SNAPSHOT</version>
</dependency>

Note that the streaming connectors are currently not part of the binary distribution. See here for information about how to package the program with the libraries for cluster execution.

Installing Elasticsearch 2.x

Instructions for setting up an Elasticsearch cluster can be found here. Make sure to set and remember a cluster name, because it must be specified when creating a Sink that writes to your cluster.

Elasticsearch 2.x Sink

The connector provides a Sink that can send data to an Elasticsearch 2.x Index.

The sink communicates with Elasticsearch via a Transport Client.

See here for information about the Transport Client.

The code below shows how to create a sink that uses a TransportClient for communication:

File dataDir = ....;

DataStream<String> input = ...;

Map<String, String> config = new HashMap<>();
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1");
config.put("cluster.name", "my-cluster-name");

List<InetSocketAddress> transports = new ArrayList<>();
transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
transports.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));

input.addSink(new ElasticsearchSink<>(config, transports, new ElasticsearchSinkFunction<String>() {
  public IndexRequest createIndexRequest(String element) {
    Map<String, String> json = new HashMap<>();
    json.put("data", element);

    return Requests.indexRequest()
            .index("my-index")
            .type("my-type")
            .source(json);
  }

  @Override
  public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
    indexer.add(createIndexRequest(element));
  }
}));

The same sink in Scala:

val dataDir = ....;

val input: DataStream[String] = ...

val config = new util.HashMap[String, String]
config.put("bulk.flush.max.actions", "1")
config.put("cluster.name", "my-cluster-name")

val transports = new util.ArrayList[InetSocketAddress]
transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))
transports.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300))

input.addSink(new ElasticsearchSink(config, transports, new ElasticsearchSinkFunction[String] {
  def createIndexRequest(element: String): IndexRequest = {
    val json = new util.HashMap[String, AnyRef]
    json.put("data", element)
    Requests.indexRequest.index("my-index").`type`("my-type").source(json)
  }

  override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
    indexer.add(createIndexRequest(element))
  }
}))

A Map of Strings is used to configure the Sink. The configuration keys are documented in the Elasticsearch documentation here. Especially important is the cluster.name parameter, which must correspond to the name of your cluster. With Elasticsearch 2.x you also need to specify path.home.

Internally, the sink uses a BulkProcessor to send Action requests to the cluster. This will buffer elements and Action Requests before sending to the cluster. The behaviour of the BulkProcessor can be configured using these config keys:

  • bulk.flush.max.actions: Maximum number of elements to buffer
  • bulk.flush.max.size.mb: Maximum amount of data (in megabytes) to buffer
  • bulk.flush.interval.ms: Interval (in milliseconds) at which to flush data regardless of the other two settings
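
As a minimal sketch of how these keys might be combined, the following builds a configuration map that sets all three thresholds. The class name BulkFlushConfigSketch, the method buildConfig and the threshold values are assumptions chosen for illustration, not recommended settings; only the key names come from the list above.

```java
import java.util.HashMap;
import java.util.Map;

public class BulkFlushConfigSketch {

    // Builds the String-to-String map that is handed to the sink.
    // The three threshold values are illustrative, not recommendations.
    static Map<String, String> buildConfig(String clusterName) {
        Map<String, String> config = new HashMap<>();
        config.put("cluster.name", clusterName);
        // Flush once 500 elements are buffered ...
        config.put("bulk.flush.max.actions", "500");
        // ... or once the buffered requests reach 5 MB ...
        config.put("bulk.flush.max.size.mb", "5");
        // ... or every 10 seconds, regardless of the two limits above.
        config.put("bulk.flush.interval.ms", "10000");
        return config;
    }

    public static void main(String[] args) {
        System.out.println(buildConfig("my-cluster-name"));
    }
}
```

All values are passed as Strings because the sink is configured through a Map of Strings. Setting bulk.flush.max.actions to "1", as in the example further up, effectively disables buffering.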

The list of transport addresses passed to the sink (transports in the example above) names the Elasticsearch nodes to which the sink should connect via a TransportClient.
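
In practice the node addresses often come from a configuration value rather than being hard-coded. The following sketch, which uses only JDK classes, parses a comma-separated host:port string into the List<InetSocketAddress> that the sink constructor in the example above accepts. The class name TransportAddressSketch, the method parseTransports and the input format are assumptions made for illustration and are not part of the connector.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;

public class TransportAddressSketch {

    // Parses "host:port,host:port" into a list of socket addresses.
    // Host names are resolved eagerly via InetAddress.getByName.
    static List<InetSocketAddress> parseTransports(String hostPorts) throws UnknownHostException {
        List<InetSocketAddress> transports = new ArrayList<>();
        for (String entry : hostPorts.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate stray commas
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            transports.add(new InetSocketAddress(InetAddress.getByName(host), port));
        }
        return transports;
    }

    public static void main(String[] args) throws UnknownHostException {
        // 9300 is the transport port used in the example above.
        List<InetSocketAddress> transports = parseTransports("127.0.0.1:9300, 10.2.3.1:9300");
        System.out.println(transports);
    }
}
```

Listing more than one node is useful because the client can still connect if one of the listed nodes is unavailable.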

More information about Elasticsearch can be found here.

Packaging the Elasticsearch Connector into an Uber-jar

For the execution of your Flink program, it is recommended to build a so-called uber-jar (executable jar) containing all your dependencies (see here for further information).

However, when an uber-jar containing an Elasticsearch sink is executed, an IllegalArgumentException may occur, which is caused by conflicting files of Elasticsearch and its dependencies in META-INF/services:

IllegalArgumentException[An SPI class of type org.apache.lucene.codecs.PostingsFormat with name 'Lucene50' does not exist.  You need to add the corresponding JAR file supporting this SPI to your classpath.  The current classpath supports the following names: [es090, completion090, XBloomFilter]]

If the uber-jar is built with Maven, this issue can be avoided by adding the following transformers to the maven-shade-plugin configuration in the pom file:

<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
    <resource>META-INF/services/org.apache.lucene.codecs.Codec</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
    <resource>META-INF/services/org.apache.lucene.codecs.DocValuesFormat</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
    <resource>META-INF/services/org.apache.lucene.codecs.PostingsFormat</resource>
</transformer>
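
To illustrate why appending helps, the sketch below models two jars that each ship a service file with the same path. If only one copy survives in the uber-jar, the implementations listed in the other copy can no longer be found, which matches the error shown above. Appending keeps the entries of both. The class ServiceFileMergeSketch and its methods are hypothetical, the listed implementation class names are illustrative and may not match the actual file contents, and the code is a simplified model, not how the shade plugin is implemented.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ServiceFileMergeSketch {

    // Models a merge in which only one of the same-named files survives.
    static List<String> keepSingleCopy(List<List<String>> files, int survivor) {
        return new ArrayList<>(files.get(survivor));
    }

    // Models an appending merge: the entries of every file are concatenated.
    static List<String> append(List<List<String>> files) {
        List<String> merged = new ArrayList<>();
        for (List<String> file : files) {
            merged.addAll(file);
        }
        return merged;
    }

    // Illustrative contents of META-INF/services/org.apache.lucene.codecs.PostingsFormat
    // in two different jars (assumed entries, see the note above).
    static List<List<String>> exampleFiles() {
        List<String> luceneJar = Arrays.asList(
                "org.apache.lucene.codecs.lucene50.Lucene50PostingsFormat");
        List<String> elasticsearchJar = Arrays.asList(
                "org.elasticsearch.index.codec.postingsformat.Elasticsearch090PostingsFormat",
                "org.elasticsearch.search.suggest.completion.Completion090PostingsFormat",
                "org.elasticsearch.index.codec.postingsformat.BloomFilterPostingsFormat");
        return Arrays.asList(luceneJar, elasticsearchJar);
    }

    public static void main(String[] args) {
        List<List<String>> files = exampleFiles();
        // Only the Elasticsearch copy survives: the Lucene50 entry is lost.
        System.out.println(keepSingleCopy(files, 1));
        // Appending keeps the entries from both jars.
        System.out.println(append(files));
    }
}
```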