This section describes how the parallel execution of programs can be configured in Flink. A Flink
program consists of multiple tasks (transformations/operators, data sources, and sinks). A task is split into
several parallel instances for execution and each parallel instance processes a subset of the task’s
input data. The number of parallel instances of a task is called its parallelism.
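As a rough illustration of the terms above, the following plain-Java sketch models one task that is split into parallel instances, each receiving a subset of the input. It does not use any Flink API. The class name ParallelInstancesSketch, the method split, and the round-robin distribution are assumptions made for the example; how Flink actually routes records to instances depends on the program.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model (hypothetical, not Flink API): one task, several parallel instances.
final class ParallelInstancesSketch {

    // Returns one list per parallel instance; element i is assigned to instance i % parallelism.
    // Round-robin assignment is an illustrative assumption only.
    static <T> List<List<T>> split(List<T> input, int parallelism) {
        if (parallelism < 1) {
            throw new IllegalArgumentException("parallelism must be at least 1");
        }
        List<List<T>> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            instances.add(new ArrayList<>());
        }
        for (int i = 0; i < input.size(); i++) {
            instances.get(i % parallelism).add(input.get(i));
        }
        return instances;
    }

    public static void main(String[] args) {
        // A task with parallelism 3 processing five input elements.
        List<List<String>> instances = split(Arrays.asList("a", "b", "c", "d", "e"), 3);
        for (int i = 0; i < instances.size(); i++) {
            System.out.println("instance " + i + " processes " + instances.get(i));
        }
    }
}
```

With a parallelism of 3 and five input elements, instance 0 processes [a, d], instance 1 processes [b, e], and instance 2 processes [c].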
The parallelism of a task can be specified at different levels in Flink.
Operator Level
The parallelism of an individual operator, data source, or data sink can be defined by calling its
setParallelism() method. For example, calling setParallelism(5) on an operator runs that operator
with five parallel instances.
Execution Environment Level
Flink programs are executed in the context of an execution environment. An
execution environment defines a default parallelism for all operators, data sources, and data sinks
it executes. This default can be overridden by explicitly configuring the
parallelism of an operator.
The default parallelism of an execution environment can be specified by calling its
setParallelism() method. For example, calling setParallelism(3) on the execution environment
executes all operators, data sources, and data sinks with a parallelism of 3, unless an operator
sets its own value.
Client Level
The parallelism can be set at the Client when submitting jobs to Flink. The
Client can be either a Java or a Scala program. One example of such a Client is
Flink’s Command-line Interface (CLI).
For the CLI client, the parallelism parameter can be specified with -p. For
example:
./bin/flink run -p 10 ../examples/*WordCount-java*.jar
A Java/Scala client program can likewise set the parallelism when it submits the job.
System Level
A system-wide default parallelism for all execution environments can be defined by setting the
parallelism.default property in ./conf/flink-conf.yaml. See the
Configuration documentation for details.
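The four levels above can be read as a fallback chain: the most specific setting that is present determines the parallelism. The plain-Java sketch below models that chain and does not use any Flink API. The class name ParallelismPrecedenceSketch and the method resolve are hypothetical. The text states only that an operator setting overrides the execution environment; the ordering of environment over client over system default is an assumption that follows the order in which the levels are presented.

```java
import java.util.OptionalInt;

// Plain-Java model (hypothetical, not Flink API): choose the effective parallelism of one operator.
final class ParallelismPrecedenceSketch {

    // Returns the first value that is set, checking operator, environment, and client in that
    // order, and falling back to the system-wide default (parallelism.default) otherwise.
    // The environment > client > system ordering is an assumption of this sketch.
    static int resolve(OptionalInt operator, OptionalInt environment,
                       OptionalInt client, int systemDefault) {
        if (operator.isPresent()) {
            return operator.getAsInt();
        }
        if (environment.isPresent()) {
            return environment.getAsInt();
        }
        if (client.isPresent()) {
            return client.getAsInt();
        }
        return systemDefault;
    }

    public static void main(String[] args) {
        OptionalInt unset = OptionalInt.empty();
        // Operator level set to 5, environment to 3, client (-p) to 10, system default 1.
        System.out.println(resolve(OptionalInt.of(5), OptionalInt.of(3), OptionalInt.of(10), 1));
        // Only the client value (-p 10) and the system default are present.
        System.out.println(resolve(unset, unset, OptionalInt.of(10), 1));
        // Nothing is set explicitly, so the system default applies.
        System.out.println(resolve(unset, unset, unset, 1));
    }
}
```

Running the sketch prints 5, 10, and 1, one per line.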