Register a custom serializer for your Flink program

If your Flink program uses a custom type that Flink's own type serializers cannot handle, Flink falls back to the generic Kryo serializer. You can register your own serializer, or a serialization system such as Google Protobuf or Apache Thrift, with Kryo. To do so, register the type class and the serializer in the ExecutionConfig of your Flink program.

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// register the class of the serializer as serializer for a type
env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, MyCustomSerializer.class);

// register an instance as serializer for a type
// (the instance must also implement java.io.Serializable)
MyCustomSerializer mySerializer = new MyCustomSerializer();
env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, mySerializer);
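For illustration, a hand-written serializer for the type registered above might look roughly like the following sketch. The fields of MyCustomType (name and count) are hypothetical and chosen only for this example. The code assumes the Kryo 2.x API; newer Kryo releases declare a slightly different read signature, so treat this as a starting point rather than a definitive implementation.

```java
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

import java.io.Serializable;

// Hypothetical custom type; the two fields are assumptions made for this sketch.
// In a real project this and the serializer would typically be public classes
// in their own files.
class MyCustomType {
    final String name;
    final int count;

    MyCustomType(String name, int count) {
        this.name = name;
        this.count = count;
    }
}

// Extends Kryo's Serializer. Implementing java.io.Serializable as well is only
// needed when an instance (rather than the class) is registered.
class MyCustomSerializer extends Serializer<MyCustomType> implements Serializable {
    private static final long serialVersionUID = 1L;

    @Override
    public void write(Kryo kryo, Output output, MyCustomType object) {
        // Write the fields in a fixed order.
        output.writeString(object.name);
        output.writeInt(object.count);
    }

    @Override
    public MyCustomType read(Kryo kryo, Input input, Class<MyCustomType> type) {
        // Read the fields back in the same order they were written.
        String name = input.readString();
        int count = input.readInt();
        return new MyCustomType(name, count);
    }
}
```

Either registration call shown above should accept a class like this. The important design point is that write and read handle the fields in the same order, since the byte stream carries no field names.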

Note that your custom serializer has to extend Kryo’s Serializer class. In the case of Google Protobuf or Apache Thrift, this has already been done for you:

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// register the Google Protobuf serializer (ProtobufSerializer from chill-protobuf) with Kryo
env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, ProtobufSerializer.class);

// register the Apache Thrift serializer (TBaseSerializer from chill-thrift) as a default serializer
// TBaseSerializer states it should be initialized as a default Kryo serializer
env.getConfig().addDefaultKryoSerializer(MyCustomType.class, TBaseSerializer.class);

For the above example to work, you need to include the necessary dependencies in your Maven project file (pom.xml). In the dependency section, add the following for Apache Thrift:

<dependency>
	<groupId>com.twitter</groupId>
	<artifactId>chill-thrift</artifactId>
	<version>0.5.2</version>
</dependency>
<!-- libthrift is required by chill-thrift -->
<dependency>
	<groupId>org.apache.thrift</groupId>
	<artifactId>libthrift</artifactId>
	<version>0.6.1</version>
	<exclusions>
		<exclusion>
			<groupId>javax.servlet</groupId>
			<artifactId>servlet-api</artifactId>
		</exclusion>
		<exclusion>
			<groupId>org.apache.httpcomponents</groupId>
			<artifactId>httpclient</artifactId>
		</exclusion>
	</exclusions>
</dependency>

For Google Protobuf you need the following Maven dependency:

<dependency>
	<groupId>com.twitter</groupId>
	<artifactId>chill-protobuf</artifactId>
	<version>0.5.2</version>
</dependency>
<!-- We need protobuf for chill-protobuf -->
<dependency>
	<groupId>com.google.protobuf</groupId>
	<artifactId>protobuf-java</artifactId>
	<version>2.5.0</version>
</dependency>

Adjust the versions of both libraries to match the ones your project uses.