Flink is compatible with Apache Hadoop MapReduce interfaces and therefore allows reusing code that was implemented for Hadoop MapReduce.
You can:

- use Hadoop Writable data types in Flink programs.
- use any Hadoop InputFormat as a DataSource.
- use any Hadoop OutputFormat as a DataSink.
- use a Hadoop Mapper as FlatMapFunction.
- use a Hadoop Reducer as GroupReduceFunction.

This document shows how to use existing Hadoop MapReduce code with Flink. Please refer to the Connecting to other systems guide for reading from Hadoop supported file systems.
Support for Hadoop input/output formats is part of the flink-java and flink-scala Maven modules that are always required when writing Flink jobs. The code is located in org.apache.flink.api.java.hadoop and org.apache.flink.api.scala.hadoop in an additional sub-package for the mapred and mapreduce APIs.
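For reference, declaring these base modules in your pom.xml might look like the following sketch; the exact artifact names (in particular the _2.10 Scala suffix of flink-scala) are assumptions that should be checked against your Flink setup.

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-java</artifactId>
  <version>1.2-SNAPSHOT</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-scala_2.10</artifactId>
  <version>1.2-SNAPSHOT</version>
</dependency>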
Support for Hadoop Mappers and Reducers is contained in the flink-hadoop-compatibility Maven module. This code resides in the org.apache.flink.hadoopcompatibility package. Add the following dependency to your pom.xml if you want to reuse Mappers and Reducers.
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-hadoop-compatibility_2.10</artifactId>
  <version>1.2-SNAPSHOT</version>
</dependency>
Flink supports all Hadoop Writable and WritableComparable data types out-of-the-box. You do not need to include the Hadoop Compatibility dependency if you only want to use your Hadoop data types. See the Programming Guide for more details.
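As an illustration, here is a minimal sketch of Hadoop data types used directly in a Flink program; the values are made up, and the snippet only demonstrates that Writable and WritableComparable types work as regular field and key types.

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Hadoop Writable types can be used directly as Flink tuple field types.
DataSet<Tuple2<Text, LongWritable>> data = env.fromElements(
  new Tuple2<Text, LongWritable>(new Text("flink"), new LongWritable(1L)),
  new Tuple2<Text, LongWritable>(new Text("hadoop"), new LongWritable(2L)));

// WritableComparable types such as Text also work as grouping keys.
data.groupBy(0).first(1).print();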
Hadoop input formats can be used to create a data source by using one of the methods readHadoopFile or createHadoopInput of the ExecutionEnvironment. The former is used for input formats derived from FileInputFormat while the latter has to be used for general purpose input formats.

The resulting DataSet contains 2-tuples where the first field is the key and the second field is the value retrieved from the Hadoop InputFormat.

The following example shows how to use Hadoop’s TextInputFormat.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Tuple2<LongWritable, Text>> input =
  env.readHadoopFile(new TextInputFormat(), LongWritable.class, Text.class, textPath);

// Do something with the data.
[...]
val env = ExecutionEnvironment.getExecutionEnvironment

val input: DataSet[(LongWritable, Text)] =
  env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath)

// Do something with the data.
[...]
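Input formats that are not derived from FileInputFormat are created with createHadoopInput instead. The following is only a sketch: MyNonFileInputFormat is a hypothetical class implementing org.apache.hadoop.mapred.InputFormat<Text, Text>, and its configuration through the JobConf is omitted.

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Configure the (hypothetical) general purpose input format via a JobConf.
JobConf jobConf = new JobConf();
// jobConf.set(...);

// Wrap the Hadoop InputFormat (mapred API) as a Flink data source.
DataSet<Tuple2<Text, Text>> input =
  env.createHadoopInput(new MyNonFileInputFormat(), Text.class, Text.class, jobConf);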
Flink provides a compatibility wrapper for Hadoop OutputFormats. Any class that implements org.apache.hadoop.mapred.OutputFormat or extends org.apache.hadoop.mapreduce.OutputFormat is supported.

The OutputFormat wrapper expects its input data to be a DataSet containing 2-tuples of key and value. These are to be processed by the Hadoop OutputFormat.

The following example shows how to use Hadoop’s TextOutputFormat.
// Obtain the result we want to emit
DataSet<Tuple2<Text, IntWritable>> hadoopResult = [...]

// Set up the Hadoop TextOutputFormat.
HadoopOutputFormat<Text, IntWritable> hadoopOF =
  // create the Flink wrapper.
  new HadoopOutputFormat<Text, IntWritable>(
    // set the Hadoop OutputFormat and specify the job.
    new TextOutputFormat<Text, IntWritable>(), job
  );

hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
TextOutputFormat.setOutputPath(job, new Path(outputPath));

// Emit data using the Hadoop TextOutputFormat.
hadoopResult.output(hadoopOF);
// Obtain your result to emit.
val hadoopResult: DataSet[(Text, IntWritable)] = [...]

val hadoopOF = new HadoopOutputFormat[Text, IntWritable](
  new TextOutputFormat[Text, IntWritable],
  new JobConf)

hadoopOF.getJobConf.set("mapred.textoutputformat.separator", " ")
FileOutputFormat.setOutputPath(hadoopOF.getJobConf, new Path(resultPath))

hadoopResult.output(hadoopOF)
Hadoop Mappers are semantically equivalent to Flink’s FlatMapFunctions and Hadoop Reducers are equivalent to Flink’s GroupReduceFunctions. Flink provides wrappers for implementations of Hadoop MapReduce’s Mapper and Reducer interfaces, i.e., you can reuse your Hadoop Mappers and Reducers in regular Flink programs. At the moment, only the Mapper and Reducer interfaces of Hadoop’s mapred API (org.apache.hadoop.mapred) are supported.
The wrappers take a DataSet<Tuple2<KEYIN,VALUEIN>> as input and produce a DataSet<Tuple2<KEYOUT,VALUEOUT>> as output, where KEYIN and KEYOUT are the keys and VALUEIN and VALUEOUT are the values of the Hadoop key-value pairs that are processed by the Hadoop functions. For Reducers, Flink offers a wrapper for a GroupReduceFunction with a Combiner (HadoopReduceCombineFunction) and without a Combiner (HadoopReduceFunction). The wrappers accept an optional JobConf object to configure the Hadoop Mapper or Reducer.
Flink’s function wrappers are

- org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction,
- org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction, and
- org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction

and can be used as regular Flink FlatMapFunctions or GroupReduceFunctions.

The following example shows how to use Hadoop Mapper and Reducer functions.
// Obtain data to process somehow.
DataSet<Tuple2<Text, LongWritable>> text = [...]

DataSet<Tuple2<Text, LongWritable>> result = text
  // use Hadoop Mapper (Tokenizer) as MapFunction
  .flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
    new Tokenizer()
  ))
  .groupBy(0)
  // use Hadoop Reducer (Counter) as Reduce- and CombineFunction
  .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
    new Counter(), new Counter()
  ));
Please note: The Reducer wrapper works on groups as defined by Flink’s groupBy() operation. It does not consider any custom partitioners, sort or grouping comparators you might have set in the JobConf.
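If no Combiner is needed, or if the Hadoop Reducer requires configuration, the HadoopReduceFunction wrapper can be used instead. The following sketch assumes the same text DataSet and Counter Reducer as above and passes an otherwise empty JobConf as a placeholder.

// Optional JobConf to configure the Hadoop Reducer.
JobConf conf = new JobConf();
// conf.set(...);

DataSet<Tuple2<Text, LongWritable>> reduced = text
  .groupBy(0)
  // use Hadoop Reducer (Counter) as GroupReduceFunction without a Combiner
  .reduceGroup(new HadoopReduceFunction<Text, LongWritable, Text, LongWritable>(
    new Counter(), conf
  ));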
The following example shows a complete WordCount implementation using Hadoop data types, Input- and OutputFormats, and Mapper and Reducer implementations.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Set up the Hadoop TextInputFormat.
Job job = Job.getInstance();
HadoopInputFormat<LongWritable, Text> hadoopIF =
  new HadoopInputFormat<LongWritable, Text>(
    new TextInputFormat(), LongWritable.class, Text.class, job
  );
TextInputFormat.addInputPath(job, new Path(inputPath));

// Read data using the Hadoop TextInputFormat.
DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopIF);

DataSet<Tuple2<Text, LongWritable>> result = text
  // use Hadoop Mapper (Tokenizer) as MapFunction
  .flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
    new Tokenizer()
  ))
  .groupBy(0)
  // use Hadoop Reducer (Counter) as Reduce- and CombineFunction
  .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
    new Counter(), new Counter()
  ));
// Set up the Hadoop TextOutputFormat; the value type must match the result DataSet (LongWritable).
HadoopOutputFormat<Text, LongWritable> hadoopOF =
  new HadoopOutputFormat<Text, LongWritable>(
    new TextOutputFormat<Text, LongWritable>(), job
  );
hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
TextOutputFormat.setOutputPath(job, new Path(outputPath));

// Emit data using the Hadoop TextOutputFormat.
result.output(hadoopOF);

// Execute Program
env.execute("Hadoop WordCount");