This connector provides access to data streams from RabbitMQ. To use this connector, add the following dependency to your project:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-rabbitmq_2.10</artifactId>
  <version>1.2-SNAPSHOT</version>
</dependency>
Note that the streaming connectors are currently not part of the binary distribution. See the Flink documentation on linking with them for cluster execution.
To install RabbitMQ, follow the instructions on the RabbitMQ download page. After installation, the server starts automatically, and the application connecting to RabbitMQ can be launched.
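RMQSource is a class that provides an interface for receiving data from RabbitMQ.
The RMQSource(…) constructor takes the following arguments, in order:
- connectionConfig: the RabbitMQ connection configuration (an RMQConnectionConfig).
- queueName: the name of the RabbitMQ queue.
- usesCorrelationId: true when correlation ids should be used, false otherwise (default is false).
- deserializationSchema: the schema used to turn messages into Java objects.

This source can be operated in three different modes:
1. Exactly-once (when checkpointed) with RabbitMQ transactions and messages with unique correlation ids.
2. At-least-once (when checkpointed) with RabbitMQ transactions but no deduplication mechanism (correlation id is not set).
3. No strong delivery guarantees (without checkpointing) with RabbitMQ auto-commit mode.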
Correlation ids are a RabbitMQ application feature. You have to set them in the message properties when injecting messages into RabbitMQ. If you set usesCorrelationId to true and do not supply unique correlation ids, the source will throw an exception (if the correlation id is null) or ignore messages with non-unique correlation ids. If you set usesCorrelationId to false, you don't have to supply correlation ids.
Example:
// Java
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
    .setHost("localhost").setPort(5000).setUserName(..)
    .setPassword(..).setVirtualHost("/").build();

// usesCorrelationId is omitted and therefore defaults to false
DataStream<String> streamWithoutCorrelationIds = env
    .addSource(new RMQSource<String>(connectionConfig, "hello", new SimpleStringSchema()));
streamWithoutCorrelationIds.print();

// usesCorrelationId = true: messages must carry unique correlation ids
DataStream<String> streamWithCorrelationIds = env
    .addSource(new RMQSource<String>(connectionConfig, "hello", true, new SimpleStringSchema()));
streamWithCorrelationIds.print();
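// Scala
val connectionConfig = new RMQConnectionConfig.Builder()
    .setHost("localhost").setPort(5000).setUserName(..)
    .setPassword(..).setVirtualHost("/").build()

// usesCorrelationId is omitted and therefore defaults to false
val streamWithoutCorrelationIds = env
    .addSource(new RMQSource[String](connectionConfig, "hello", new SimpleStringSchema))
streamWithoutCorrelationIds.print()

// usesCorrelationId = true: messages must carry unique correlation ids
val streamWithCorrelationIds = env
    .addSource(new RMQSource[String](connectionConfig, "hello", true, new SimpleStringSchema))
streamWithCorrelationIds.print()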
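RMQSink is a class that provides an interface for sending data to RabbitMQ.
The RMQSink(…) constructor takes the following arguments, in order:
- connectionConfig: the RabbitMQ connection configuration (an RMQConnectionConfig).
- queueName: the name of the RabbitMQ queue.
- serializationSchema: the schema used to turn Java objects into messages.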
Example:
// Java
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
    .setHost("localhost").setPort(5000).setUserName(..)
    .setPassword(..).setVirtualHost("/").build();
stream.addSink(new RMQSink<String>(connectionConfig, "hello", new SimpleStringSchema()));

// Scala
val connectionConfig = new RMQConnectionConfig.Builder()
    .setHost("localhost").setPort(5000).setUserName(..)
    .setPassword(..).setVirtualHost("/").build()
stream.addSink(new RMQSink[String](connectionConfig, "hello", new SimpleStringSchema))
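The schema argument passed to RMQSink in the example above is what converts each stream element into the bytes of a message body. As a rough illustration of that idea only, the sketch below uses a hypothetical MessageSerializer interface and a Utf8StringSerializer class; both names are assumptions made for this example, they are not Flink's interfaces, and the choice of UTF-8 is likewise an assumption.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a serialization schema; not Flink's interface.
interface MessageSerializer<T> {
    byte[] serialize(T element);
}

// Turns strings into UTF-8 bytes, the kind of payload a message body carries.
class Utf8StringSerializer implements MessageSerializer<String> {
    @Override
    public byte[] serialize(String element) {
        return element.getBytes(StandardCharsets.UTF_8);
    }

    // Inverse direction, as a source-side deserialization step would perform it.
    static String deserialize(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] body = new Utf8StringSerializer().serialize("hello");
        System.out.println(body.length);       // prints 5
        System.out.println(deserialize(body)); // prints hello
    }
}
```

A sink and a source that exchange messages through the same queue need schemas that agree on the encoding, otherwise the round trip shown in main would not restore the original string.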
More information about RabbitMQ can be found on the RabbitMQ website.