This connector provides a Sink that can write to an
Elasticsearch Index. To use this connector, add the
following dependency to your project:
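The dependency snippet itself did not survive extraction. A Maven sketch is below; the artifact id and version are assumptions, so check them against the Flink release you are actually using:

```xml
<!-- Assumed coordinates; substitute the Flink version you build against. -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch</artifactId>
  <version>${flink.version}</version>
</dependency>
```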
Note that the streaming connectors are currently not part of the binary
distribution. See
here
for information about how to package the program with the libraries for
cluster execution.
Installing Elasticsearch
Instructions for setting up an Elasticsearch cluster can be found
here.
Make sure to set and remember a cluster name. This name must also be set when
creating a Sink for writing to your cluster.
Elasticsearch Sink
The connector provides a Sink that can send data to an Elasticsearch Index.
The sink can use two different methods for communicating with Elasticsearch:
An embedded Node
The TransportClient
See here
for information about the differences between the two modes.
This code shows how to create a sink that uses an embedded Node for
communication:
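The code block appears to be missing here. The sketch below shows roughly what such a sink looks like, assuming the connector's ElasticsearchSink and IndexRequestBuilder API; the cluster, index, and type names are illustrative placeholders:

```java
// Assumed imports from the Flink Elasticsearch connector and the Elasticsearch client.
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.HashMap;
import java.util.Map;

public class EmbeddedNodeSinkExample {
    public static void attachSink(DataStream<String> input) {
        Map<String, String> config = new HashMap<>();
        // cluster.name must match the name of your Elasticsearch cluster
        config.put("cluster.name", "my-cluster-name");

        input.addSink(new ElasticsearchSink<>(config, new IndexRequestBuilder<String>() {
            @Override
            public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
                // Wrap each stream element in a JSON document
                Map<String, Object> json = new HashMap<>();
                json.put("data", element);
                // "my-index" and "my-type" are illustrative names
                return Requests.indexRequest()
                        .index("my-index")
                        .type("my-type")
                        .source(json);
            }
        }));
    }
}
```

The IndexRequestBuilder is called once per stream element and turns it into an IndexRequest that the sink then sends to the cluster.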
Note how a Map of Strings is used to configure the Sink. The configuration keys
are documented in the Elasticsearch documentation
here.
Especially important is the cluster.name parameter, which must match the name of
your cluster.
Internally, the sink uses a BulkProcessor to send index requests to the cluster.
It buffers elements and sends them to the cluster in bulk. The behaviour of the
BulkProcessor can be configured using these config keys:
bulk.flush.max.actions: Maximum number of elements to buffer
bulk.flush.max.size.mb: Maximum amount of data (in megabytes) to buffer
bulk.flush.interval.ms: Interval (in milliseconds) at which to flush the buffer,
regardless of the other two settings
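These keys go into the same String-to-String Map that configures the sink. A minimal sketch, with illustrative values rather than recommendations:

```java
import java.util.HashMap;
import java.util.Map;

public class BulkConfigExample {
    public static Map<String, String> bulkConfig() {
        Map<String, String> config = new HashMap<>();
        // must match your Elasticsearch cluster name
        config.put("cluster.name", "my-cluster-name");
        // flush after 1000 buffered elements...
        config.put("bulk.flush.max.actions", "1000");
        // ...or after 5 MB of buffered data...
        config.put("bulk.flush.max.size.mb", "5");
        // ...or at the latest every 30 seconds
        config.put("bulk.flush.interval.ms", "30000");
        return config;
    }
}
```

Whichever threshold is reached first triggers a bulk request to the cluster.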
This example code does the same, but with a TransportClient:
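The example code seems to have been lost in extraction as well. A sketch, again assuming the connector's ElasticsearchSink API; the host names and the port (9300 is Elasticsearch's default transport port) are placeholders:

```java
// Assumed imports from the Flink Elasticsearch connector and the Elasticsearch client.
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TransportClientSinkExample {
    public static void attachSink(DataStream<String> input) {
        Map<String, String> config = new HashMap<>();
        config.put("cluster.name", "my-cluster-name");

        // Addresses of the Elasticsearch nodes the TransportClient connects to
        List<TransportAddress> transports = new ArrayList<>();
        transports.add(new InetSocketTransportAddress("node-1", 9300));
        transports.add(new InetSocketTransportAddress("node-2", 9300));

        input.addSink(new ElasticsearchSink<>(config, transports, new IndexRequestBuilder<String>() {
            @Override
            public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
                Map<String, Object> json = new HashMap<>();
                json.put("data", element);
                return Requests.indexRequest()
                        .index("my-index")
                        .type("my-type")
                        .source(json);
            }
        }));
    }
}
```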
The difference is that we now need to provide a list of Elasticsearch nodes
to which the sink connects using a TransportClient.
More information about Elasticsearch can be found here.