This connector provides a Sink that can write to an Elasticsearch 2.x Index. To use this connector, add the following dependency to your project:
Note that the streaming connectors are currently not part of the binary distribution. See here for information about how to package the program with the libraries for cluster execution.
Instructions for setting up an Elasticsearch cluster can be found here. Make sure to set and remember a cluster name. You must supply this name when creating a Sink that writes to your cluster.
The Sink sends data to an Elasticsearch 2.x index and communicates with the cluster through a Transport Client. See here for information about the Transport Client.
The code below shows how to create a sink that uses a TransportClient for communication:
A Map of Strings is used to configure the Sink. The configuration keys are documented in the Elasticsearch documentation here. Especially important is the cluster.name parameter, which must correspond to the name of your cluster. With Elasticsearch 2.x you also need to specify path.home.
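As a minimal sketch of the configuration described above, the following Java snippet builds such a Map using only the standard library. The class name SinkConfigSketch, the helper buildConfig, and the values my-cluster-name and /tmp/es-home are placeholders chosen for illustration; only the keys cluster.name and path.home come from the text.

```java
import java.util.HashMap;
import java.util.Map;

class SinkConfigSketch {

    // Builds the String-to-String configuration Map for the sink.
    // Both arguments are placeholders: use the name of your own cluster
    // and a home path that is valid in your environment.
    static Map<String, String> buildConfig(String clusterName, String pathHome) {
        Map<String, String> config = new HashMap<>();
        // Must match the cluster name configured on the Elasticsearch side.
        config.put("cluster.name", clusterName);
        // Additionally required with Elasticsearch 2.x, per the text above.
        config.put("path.home", pathHome);
        return config;
    }

    public static void main(String[] args) {
        Map<String, String> config = buildConfig("my-cluster-name", "/tmp/es-home");
        System.out.println(config.get("cluster.name"));
    }
}
```

The resulting Map would then be handed to the sink when it is constructed.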
Internally, the sink uses a BulkProcessor to send Action requests to the cluster. The BulkProcessor buffers elements and Action requests before sending them to the cluster. Its behaviour can be configured using these config keys:
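A minimal sketch of tuning the buffering follows. It assumes the key names bulk.flush.max.actions (maximum number of buffered elements), bulk.flush.max.size.mb (maximum buffered data in megabytes), and bulk.flush.interval.ms (flush interval in milliseconds); treat these names as an assumption and verify them against the connector version in use. The class name, helper name, and numeric values are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

class BulkFlushConfigSketch {

    // Returns a copy of the base configuration with bulk-flush settings added.
    // The three key names are assumed, not taken from the text above.
    static Map<String, String> withBulkFlush(
            Map<String, String> base, int maxActions, int maxSizeMb, long intervalMs) {
        Map<String, String> config = new HashMap<>(base);
        // Flush once this many elements have been buffered.
        config.put("bulk.flush.max.actions", Integer.toString(maxActions));
        // Flush once this many megabytes have been buffered.
        config.put("bulk.flush.max.size.mb", Integer.toString(maxSizeMb));
        // Flush at this interval regardless of the two limits above.
        config.put("bulk.flush.interval.ms", Long.toString(intervalMs));
        return config;
    }

    public static void main(String[] args) {
        Map<String, String> base = new HashMap<>();
        base.put("cluster.name", "my-cluster-name"); // placeholder cluster name
        Map<String, String> config = withBulkFlush(base, 1000, 5, 60000L);
        System.out.println(config);
    }
}
```

Setting the maximum number of actions to 1 would make the sink emit after every element, which can be convenient for testing but gives up the benefit of batching.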
In addition to the configuration Map, the sink is given a list of Elasticsearch nodes to which it should connect via a TransportClient.
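The node list can be sketched with the standard library alone. This example assumes that the sink accepts the nodes as a List of java.net.InetSocketAddress and that the nodes listen on port 9300, the default Elasticsearch transport port; the class name, helper name, and IP addresses are hypothetical placeholders.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;

class TransportAddressesSketch {

    // Turns host names or IP literals into socket addresses on the given port.
    // An unresolvable host is reported as an IllegalArgumentException so that
    // callers do not have to handle a checked exception.
    static List<InetSocketAddress> buildTransports(int port, String... hosts) {
        List<InetSocketAddress> transports = new ArrayList<>();
        for (String host : hosts) {
            try {
                transports.add(new InetSocketAddress(InetAddress.getByName(host), port));
            } catch (UnknownHostException e) {
                throw new IllegalArgumentException("Cannot resolve host: " + host, e);
            }
        }
        return transports;
    }

    public static void main(String[] args) {
        // Placeholder addresses; replace them with the nodes of your cluster.
        List<InetSocketAddress> transports = buildTransports(9300, "127.0.0.1", "10.2.3.1");
        System.out.println(transports.size());
    }
}
```

Listing more than one node lets the client reach the cluster even when a single node is unavailable.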
More information about Elasticsearch can be found here.
For the execution of your Flink program, it is recommended to build a so-called uber-jar (executable jar) containing all your dependencies (see here for further information).
However, when an uber-jar containing an Elasticsearch sink is executed, an IllegalArgumentException may occur, which is caused by conflicting files of Elasticsearch and its dependencies in META-INF/services:
IllegalArgumentException[An SPI class of type org.apache.lucene.codecs.PostingsFormat with name 'Lucene50' does not exist. You need to add the corresponding JAR file supporting this SPI to your classpath. The current classpath supports the following names: [es090, completion090, XBloomFilter]]
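If the uber-jar is built with Maven, this issue can be avoided by merging the conflicting service files instead of letting one copy overwrite the others. To do so, add the following transformers to the maven-shade-plugin configuration in the pom file: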
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
    <resource>META-INF/services/org.apache.lucene.codecs.Codec</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
    <resource>META-INF/services/org.apache.lucene.codecs.DocValuesFormat</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
    <resource>META-INF/services/org.apache.lucene.codecs.PostingsFormat</resource>
</transformer>