All transformations in Flink may look like functions (in the functional processing terminology), but they are in fact stateful operators. You can make every transformation (map, filter, etc.) stateful by using Flink’s state interface or by checkpointing instance fields of your function. You can register any instance field as managed state by implementing an interface. In this case, as well as when you use Flink’s native state interface, Flink automatically takes consistent snapshots of your state periodically and restores its value in the case of a failure.
The end effect is that updates to any form of state have the same effect under failure-free execution as under execution with failures and recovery.
We first look at Flink’s state interface and then at how to make instance fields consistent under failures.
By default, state checkpoints are stored in memory at the JobManager. For proper persistence of large state, Flink supports storing the checkpoints on file systems (HDFS, S3, or any mounted POSIX file system), which can be configured in the flink-conf.yaml or via StreamExecutionEnvironment.setStateBackend(…). See state backends for information about the available state backends and how to configure them.
Flink has a checkpointing mechanism that recovers streaming jobs after failures. The checkpointing mechanism requires a persistent (or durable) source that can replay earlier records on request (Apache Kafka is a good example of such a source).
The checkpointing mechanism consistently stores the progress in the data sources and data sinks, the state of windows, and the user-defined state (see Working with State) to provide exactly-once processing semantics. Where the checkpoints are stored (e.g., JobManager memory, file system, database) depends on the configured state backend.
The docs on streaming fault tolerance describe in detail the technique behind Flink’s streaming fault tolerance mechanism.
By default, checkpointing is disabled. To enable checkpointing, call enableCheckpointing(n) on the StreamExecutionEnvironment, where n is the checkpoint interval in milliseconds.
Other parameters for checkpointing include:
Number of retries: The setNumberOfExecutionRetries() method defines how many times the job is restarted after a failure. When checkpointing is activated but this value is not explicitly set, the job is restarted infinitely often.
Exactly-once vs. at-least-once: You can optionally pass a mode to the enableCheckpointing(n) method to choose between the two guarantee levels. Exactly-once is preferable for most applications. At-least-once may be relevant for certain super-low-latency applications (consistently a few milliseconds).
Number of concurrent checkpoints: By default, the system will not trigger another checkpoint while one is still in progress. This ensures that the topology does not spend too much time on checkpoints instead of making progress with processing the streams. It is possible to allow multiple overlapping checkpoints. This is useful for pipelines that have a certain processing delay (for example, because the functions call external services that need some time to respond) but that still want to take very frequent checkpoints (every few hundred milliseconds) so that very little needs to be re-processed upon failures.
Checkpoint timeout: The time after which an in-progress checkpoint is aborted if it has not completed by then.
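To make the interplay of the last two settings concrete, here is a small plain-Java model of a checkpoint trigger. It is a hypothetical sketch, not Flink’s checkpoint coordinator: the class CheckpointTriggerModel and its parameters maxConcurrent and timeoutMillis are invented for illustration. On each interval tick it skips a new checkpoint while the allowed number of checkpoints is already in progress, and it aborts checkpoints that have been running for at least the timeout.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified model of the "concurrent checkpoints" and
// "checkpoint timeout" settings; it is not Flink's checkpoint coordinator.
class CheckpointTriggerModel {
    private final int maxConcurrent;   // how many checkpoints may overlap
    private final long timeoutMillis;  // abort a checkpoint after this long
    private final List<long[]> inProgress = new ArrayList<>(); // {id, startTime}
    private long nextId = 1;
    private int abortedCount = 0;

    CheckpointTriggerModel(int maxConcurrent, long timeoutMillis) {
        this.maxConcurrent = maxConcurrent;
        this.timeoutMillis = timeoutMillis;
    }

    /** Called once per checkpoint interval; returns the new id, or -1 if skipped. */
    long tryTrigger(long nowMillis) {
        abortExpired(nowMillis);
        if (inProgress.size() >= maxConcurrent) {
            return -1; // too many checkpoints still running: skip this interval
        }
        long id = nextId++;
        inProgress.add(new long[] {id, nowMillis});
        return id;
    }

    /** Marks a checkpoint as completed, e.g. once all parts acknowledged it. */
    void complete(long checkpointId) {
        inProgress.removeIf(cp -> cp[0] == checkpointId);
    }

    int abortedCount() {
        return abortedCount;
    }

    // Aborts every checkpoint that did not complete within the timeout.
    private void abortExpired(long nowMillis) {
        Iterator<long[]> it = inProgress.iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next()[1] >= timeoutMillis) {
                it.remove();
                abortedCount++;
            }
        }
    }
}
```

For example, with maxConcurrent = 1 and a 500 ms timeout, a trigger at 100 ms is skipped while checkpoint 1 (started at 0 ms) is still running; raising maxConcurrent lets such triggers start overlapping checkpoints instead.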
The Key/Value state interface provides access to different types of state that are all scoped to the key of the current input element. This means that this type of state can only be used on a KeyedStream, which can be created via stream.keyBy(…).
We first look at the different types of state available and then see how they can be used in a program. The available state primitives are:
ValueState<T>: This keeps a value that can be updated and retrieved (scoped to the key of the input element, as mentioned above, so there will possibly be one value for each key that the operation sees). The value can be set using update(T) and retrieved using T value().
ListState<T>: This keeps a list of elements. You can append elements and retrieve an Iterable over all currently stored elements. Elements are added using add(T), and the Iterable can be retrieved using Iterable<T> get().
ReducingState<T>: This keeps a single value that represents the aggregation of all values added to the state. The interface is the same as for ListState, but elements added using add(T) are reduced to an aggregate using a specified ReduceFunction.
All types of state also have a method clear() that clears the state for the currently active key (i.e., the key of the input element).
It is important to keep in mind that these state objects are only used for interfacing with state. The state is not necessarily stored inside them; it might reside on disk or somewhere else. The second thing to keep in mind is that the value you get from the state depends on the key of the input element. So the value you get in one invocation of your user function can differ from the value in another invocation if the keys involved are different.
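The key scoping can be illustrated with a plain-Java model that needs no Flink dependency. This is a hypothetical sketch: KeyedStateModel and its nested classes are invented names that only mirror the methods listed above (update/value, add/get, clear), and the HashMaps stand in for whatever storage a real state backend uses. The "runtime" sets the current key before each element, and every state object reads and writes only the entry for that key.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical in-memory model of key-scoped state. Method names mirror the
// primitives described above, but this is not how Flink stores state.
class KeyedStateModel<K> {
    private K currentKey; // set by the "runtime" before each input element

    void setCurrentKey(K key) { currentKey = key; }

    <T> ValueStateModel<T> valueState() { return new ValueStateModel<>(); }
    <T> ListStateModel<T> listState() { return new ListStateModel<>(); }
    <T> ReducingStateModel<T> reducingState(BinaryOperator<T> reduceFunction) {
        return new ReducingStateModel<>(reduceFunction);
    }

    class ValueStateModel<T> {
        private final Map<K, T> values = new HashMap<>();
        T value() { return values.get(currentKey); } // null if nothing set for this key
        void update(T value) { values.put(currentKey, value); }
        void clear() { values.remove(currentKey); }
    }

    class ListStateModel<T> {
        private final Map<K, List<T>> lists = new HashMap<>();
        void add(T value) { lists.computeIfAbsent(currentKey, k -> new ArrayList<>()).add(value); }
        Iterable<T> get() {
            return Collections.unmodifiableList(lists.getOrDefault(currentKey, Collections.emptyList()));
        }
        void clear() { lists.remove(currentKey); }
    }

    class ReducingStateModel<T> {
        private final Map<K, T> aggregates = new HashMap<>();
        private final BinaryOperator<T> reduceFunction; // plays the role of a ReduceFunction
        ReducingStateModel(BinaryOperator<T> reduceFunction) { this.reduceFunction = reduceFunction; }
        void add(T value) { aggregates.merge(currentKey, value, reduceFunction); } // fold into the aggregate
        T get() { return aggregates.get(currentKey); }
        void clear() { aggregates.remove(currentKey); }
    }
}
```

Updating the value for key "a" leaves the value for key "b" untouched, and clear() removes only the entry of the key that is currently set.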
To get a state handle, you have to create a StateDescriptor. This holds the name of the state (as we will see later, you can create several states, and they have to have unique names so that you can reference them), the type of the values that the state holds, and possibly a user-specified function, such as a ReduceFunction. Depending on what type of state you want to retrieve, you create either a ValueStateDescriptor, a ListStateDescriptor, or a ReducingStateDescriptor.
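A descriptor only describes a state; the data lives in the backend, which finds it by name. The following hypothetical model (DescriptorModel and NamedStateStoreModel are invented for this sketch and are not Flink classes) shows why the names must be unique: two descriptors with the same name address the same stored value, and a descriptor that carries a reduce function folds each added value into the aggregate instead of overwriting it. Keys are left out to keep the sketch short.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical stand-in for a StateDescriptor: it only describes a state
// (its name and, for reducing state, a reduce function); it stores no data.
final class DescriptorModel<T> {
    final String name;
    final BinaryOperator<T> reduceFunction; // null for plain value state

    DescriptorModel(String name, BinaryOperator<T> reduceFunction) {
        this.name = name;
        this.reduceFunction = reduceFunction;
    }
}

// Hypothetical backend that locates state purely by descriptor name.
final class NamedStateStoreModel {
    private final Map<String, Object> byName = new HashMap<>();

    /** Overwrites without a reduce function; reduces into the aggregate with one. */
    <T> void add(DescriptorModel<T> d, T value) {
        @SuppressWarnings("unchecked")
        T current = (T) byName.get(d.name);
        T next = (d.reduceFunction == null || current == null)
                ? value
                : d.reduceFunction.apply(current, value);
        byName.put(d.name, next);
    }

    @SuppressWarnings("unchecked")
    <T> T get(DescriptorModel<T> d) {
        return (T) byName.get(d.name);
    }
}
```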
State is accessed using the RuntimeContext, so it is only possible in rich functions. See the documentation on rich functions for more information about them; we will also see an example shortly. The RuntimeContext that is available in a RichFunction has these methods for accessing state:
ValueState<T> getState(ValueStateDescriptor<T>)
ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
ListState<T> getListState(ListStateDescriptor<T>)
Consider an example FlatMapFunction that shows how all of the parts fit together. It implements a poor man’s counting window. We key the tuples by the first field (in the example, all have the same key 1). The function stores the count and a running sum in a ValueState. Once the count reaches 2, it emits the average and clears the state so that we start over from 0. Note that this would keep a different state value for each different input key if we had tuples with different values in the first field.
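The logic of that counting window can be traced with a plain-Java model. This is a hypothetical sketch rather than the Flink FlatMapFunction itself: the class name CountWindowAverageModel is invented, a HashMap from key to {count, sum} stands in for the ValueState, and the method returns the emitted tuples instead of passing them to a Collector.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of the counting-window example: per-key state of
// {count, running sum}; emits (key, sum / count) once two elements have arrived.
// The map stands in for a ValueState; this is not Flink code.
class CountWindowAverageModel {
    private final Map<Long, long[]> state = new HashMap<>(); // key -> {count, sum}

    /** Processes one (key, value) element and returns the emitted (key, average) tuples. */
    List<long[]> flatMap(long key, long value) {
        long[] current = state.getOrDefault(key, new long[] {0L, 0L}); // default if unset
        current[0] += 1;     // update the count
        current[1] += value; // add the value to the running sum
        state.put(key, current);

        List<long[]> out = new ArrayList<>();
        if (current[0] >= 2) {
            out.add(new long[] {key, current[1] / current[0]}); // integer average
            state.remove(key); // like clear(): the next window starts over from 0
        }
        return out;
    }
}
```

Feeding the elements (1,3), (1,5), (1,7), (1,4), (1,2) emits (1,4) and (1,5); the last element stays in the state because its window is not yet complete. Because the average is computed on long values, the division truncates.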
In addition to the interface described above, the Scala API has shortcuts for stateful map() or flatMap() functions with a single ValueState on a KeyedStream. The user function gets the current value of the ValueState in an Option and must return an updated value that will be used to update the state.
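The contract of these shortcuts can be sketched in Java. The model below is hypothetical: MapWithStateModel is an invented name, Optional stands in for Scala’s Option, and the user function returns its output together with the new state for the element’s key (in the Scala API, these shortcuts are, to our knowledge, mapWithState and flatMapWithState). Treating an empty result as "clear the state" is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;

// Hypothetical Java model of a stateful map with a single value state per key.
// Optional plays the role of Scala's Option; this is not the Scala API itself.
class MapWithStateModel<K, IN, OUT, S> {
    private final Map<K, S> state = new HashMap<>(); // stands in for the ValueState
    private final BiFunction<IN, Optional<S>, Map.Entry<OUT, Optional<S>>> fun;

    MapWithStateModel(BiFunction<IN, Optional<S>, Map.Entry<OUT, Optional<S>>> fun) {
        this.fun = fun;
    }

    OUT map(K key, IN input) {
        // Hand the current state for this key (possibly empty) to the user function.
        Map.Entry<OUT, Optional<S>> result = fun.apply(input, Optional.ofNullable(state.get(key)));
        // Store the returned state; an empty result clears it (assumption of this sketch).
        Optional<S> newState = result.getValue();
        if (newState.isPresent()) {
            state.put(key, newState.get());
        } else {
            state.remove(key);
        }
        return result.getKey();
    }
}
```

A running count per word, for instance, is a function that reads count.orElse(0), adds one, and returns the new count both as output and as the updated state.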
Instance fields can be checkpointed by using the Checkpointed interface. When the user-defined function implements the Checkpointed interface, the snapshotState(…) and restoreState(…) methods are executed to take and restore snapshots of the function’s state.
In addition, user functions can implement the CheckpointNotifier interface to receive notifications on completed checkpoints via the notifyCheckpointComplete(long checkpointId) method. Note that the user function is not guaranteed to receive a notification if a failure happens between checkpoint completion and notification. Notifications should therefore be handled so that a notification for a later checkpoint can subsume any missing notifications for earlier ones.
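One way to make missing notifications harmless is to treat a notification for checkpoint N as confirming every checkpoint up to and including N. The hypothetical model below (CommitOnNotifyModel is invented for this sketch, and its notifyCheckpointComplete only mirrors the name of the interface method) stages the records written before each checkpoint and, when notified, publishes everything staged with an ID up to N.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sink-side model: output is staged per checkpoint and published
// only when a completion notification arrives. A notification for checkpoint N
// also publishes everything staged for earlier checkpoints, so a lost earlier
// notification is subsumed by a later one. This is not Flink code.
class CommitOnNotifyModel {
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();
    private final List<String> currentBuffer = new ArrayList<>();
    private final List<String> published = new ArrayList<>();

    void write(String record) {
        currentBuffer.add(record);
    }

    /** Called when a checkpoint is taken: stage the records written since the last one. */
    void snapshotState(long checkpointId) {
        pending.put(checkpointId, new ArrayList<>(currentBuffer));
        currentBuffer.clear();
    }

    /** Mirrors notifyCheckpointComplete(long checkpointId). */
    void notifyCheckpointComplete(long checkpointId) {
        Map<Long, List<String>> done = pending.headMap(checkpointId, true); // ids <= checkpointId
        for (List<String> records : done.values()) { // ascending checkpoint order
            published.addAll(records);
        }
        done.clear(); // headMap is a view, so this also removes them from 'pending'
    }

    List<String> published() {
        return published;
    }
}
```

If the notification for checkpoint 1 is lost, the notification for checkpoint 2 publishes the records of both checkpoints, and a late notification for checkpoint 1 then finds nothing left to publish.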
The above example for ValueState can also be implemented using instance fields.
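A plain-Java sketch of that variant follows. To compile without Flink, it declares its own CheckpointedModel interface whose snapshotState(checkpointId, checkpointTimestamp) and restoreState(state) methods only mirror the shape described above; the interface and class names are hypothetical. Unlike the ValueState version, the instance fields are not scoped to a key: one count and sum serve every element that reaches this function instance.

```java
import java.io.Serializable;

// Hypothetical interface mirroring the shape of the snapshot/restore methods
// described above; declared here so that the sketch compiles without Flink.
interface CheckpointedModel<T extends Serializable> {
    T snapshotState(long checkpointId, long checkpointTimestamp);
    void restoreState(T state);
}

// Counting-window average kept in plain instance fields instead of a ValueState.
// The fields are not scoped to a key: one count and sum serve every element
// that reaches this function instance.
class CountWindowAverageWithFields implements CheckpointedModel<long[]> {
    private long count = 0;
    private long sum = 0;

    /** Returns the average once two elements have arrived, otherwise null. */
    Long flatMap(long value) {
        count += 1;
        sum += value;
        if (count >= 2) {
            long average = sum / count;
            count = 0; // reset so that the next window starts over from 0
            sum = 0;
            return average;
        }
        return null;
    }

    @Override
    public long[] snapshotState(long checkpointId, long checkpointTimestamp) {
        return new long[] {count, sum}; // a copy: later updates do not change the snapshot
    }

    @Override
    public void restoreState(long[] state) {
        count = state[0];
        sum = state[1];
    }
}
```

Restoring a fresh instance from a snapshot taken after the first element and replaying the second element yields the same average as the failure-free run.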
Stateful sources require a bit more care than other operators. To make the updates to the state and the output collection atomic (which is required for exactly-once semantics on failure and recovery), the user is required to get a lock from the source’s context.
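The locking pattern can be modeled in plain Java. In Flink, the lock object is obtained from the source context (getCheckpointLock()); in this hypothetical sketch, CountingSourceModel simply owns an Object and a list that stands in for the emitted output. Emitting a record and advancing the offset happen inside one synchronized block, and the snapshot takes the same lock, so no snapshot can observe a record without the matching offset update.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a stateful source. Emitting a record and advancing the
// offset happen under one lock, and the snapshot takes the same lock, so a
// snapshot never sees a record emitted without the matching offset update.
class CountingSourceModel {
    private final Object checkpointLock = new Object(); // in Flink: from the source context
    private final List<Long> emitted = new ArrayList<>(); // stands in for the emitted output
    private long offset = 0; // the source's state: the next value to emit

    void emitNext() {
        synchronized (checkpointLock) {
            emitted.add(offset); // emit the record...
            offset += 1;         // ...and update the state atomically with it
        }
    }

    /** Returns {offset, numberEmitted}, observed consistently under the lock. */
    long[] snapshotState() {
        synchronized (checkpointLock) {
            return new long[] {offset, emitted.size()};
        }
    }
}
```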
Some operators need to know when a checkpoint has been fully acknowledged by Flink in order to communicate that to the outside world. In this case, see the flink.streaming.api.checkpoint.CheckpointNotifier interface.
Flink currently only provides processing guarantees for jobs without iterations. Enabling checkpointing on an iterative job causes an exception. To force checkpointing on an iterative program, the user needs to set a special flag when enabling checkpointing: env.enableCheckpointing(interval, force = true).
Please note that records in flight on the loop edges (and the state changes associated with them) will be lost during a failure.