Redis Connector

This connector provides a Sink that can write to Redis and also can publish data to Redis PubSub. To use this connector, add the following dependency to your project:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-redis_2.10</artifactId>
  <version>1.2-SNAPSHOT</version>
</dependency>

Version Compatibility: This module is compatible with Redis 2.8.5.

Note that the streaming connectors are currently not part of the binary distribution. You need to link them for cluster execution explicitly.

Installing Redis

Follow the instructions from the Redis download page.

Redis Sink

A class providing an interface for sending data to Redis. The sink can use three different methods for communicating with different type of Redis environments:

  1. Single Redis Server
  2. Redis Cluster
  3. Redis Sentinel

This code shows how to create a sink that communicate to a single redis server:

public static class RedisExampleMapper implements RedisMapper<Tuple2<String, String>>{

    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME");
    }

    @Override
    public String getKeyFromData(Tuple2<String, String> data) {
        return data.f0;
    }

    @Override
    public String getValueFromData(Tuple2<String, String> data) {
        return data.f1;
    }
}
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build();

DataStream<String> stream = ...;
stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper());
class RedisExampleMapper extends RedisMapper[(String, String)]{
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME")
  }

  override def getKeyFromData(data: (String, String)): String = data._1

  override def getValueFromData(data: (String, String)): String = data._2
}
val conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build()
stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper))

This example code does the same, but for Redis Cluster:

FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
    .setNodes(new HashSet<InetSocketAddress>(Arrays.asList(new InetSocketAddress(5601)))).build();

DataStream<String> stream = ...;
stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper());
val conf = new FlinkJedisPoolConfig.Builder().setNodes(...).build()
stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper))

This example shows when the Redis environment is with Sentinels:

FlinkJedisSentinelConfig conf = new FlinkJedisSentinelConfig.Builder()
    .setMasterName("master").setSentinels(...).build();

DataStream<String> stream = ...;
stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper());
val conf = new FlinkJedisSentinelConfig.Builder().setMasterName("master").setSentinels(...).build()
stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper))

This section gives a description of all the available data types and what Redis command used for that.

Data Type Redis Command [Sink] Redis Command [Source]
HASHHSET--NA--
LIST RPUSH, LPUSH --NA--
SETSADD--NA--
PUBSUBPUBLISH--NA--
STRINGSET--NA--
HYPER_LOG_LOGPFADD--NA--
SORTED_SETZADD--NA--

More about Redis can be found here.