State & Checkpointing

All transformations in Flink may look like functions (in the functional processing terminology), but are in fact stateful operators. You can make every transformation (map, filter, etc) stateful by using Flink’s state interface or checkpointing instance fields of your function. You can register any instance field as managed state by implementing an interface. In this case, and also in the case of using Flink’s native state interface, Flink will automatically take consistent snapshots of your state periodically, and restore its value in the case of a failure.

The end effect is that updates to any form of state are the same under failure-free execution and execution under failures.

First, we look at how to make instance fields consistent under failures, and then we look at Flink’s state interface.

By default state checkpoints will be stored in-memory at the JobManager. For proper persistence of large state, Flink supports storing the checkpoints on file systems (HDFS, S3, or any mounted POSIX file system), which can be configured in the flink-conf.yaml or via StreamExecutionEnvironment.setStateBackend(…). See state backends for information about the available state backends and how to configure them.

Enabling Checkpointing

Flink has a checkpointing mechanism that recovers streaming jobs after failures. The checkpointing mechanism requires a persistent (or durable) source that can be asked for prior records again (Apache Kafka is a good example of such a source).

The checkpointing mechanism stores the progress in the data sources and data sinks, the state of windows, as well as the user-defined state (see Working with State) consistently to provide exactly once processing semantics. Where the checkpoints are stored (e.g., JobManager memory, file system, database) depends on the configured state backend.

The docs on streaming fault tolerance describe in detail the technique behind Flink’s streaming fault tolerance mechanism.

By default, checkpointing is disabled. To enable checkpointing, call enableCheckpointing(n) on the StreamExecutionEnvironment, where n is the checkpoint interval in milliseconds.

Other parameters for checkpointing include:

  • Number of retries: The setNumberOfExecutionRerties() method defines how many times the job is restarted after a failure. When checkpointing is activated, but this value is not explicitly set, the job is restarted infinitely often.

  • exactly-once vs. at-least-once: You can optionally pass a mode to the enableCheckpointing(n) method to choose between the two guarantee levels. Exactly-once is preferrable for most applications. At-least-once may be relevant for certain super-low-latency (consistently few milliseconds) applications.

  • number of concurrent checkpoints: By default, the system will not trigger another checkpoint while one is still in progress. This ensures that the topology does not spend too much time on checkpoints and not make progress with processing the streams. It is possible to allow for multiple overlapping checkpoints, which is interesting for pipelines that have a certain processing delay (for example because the functions call external services that need some time to respond) but that still want to do very frequent checkpoints (100s of milliseconds) to re-process very little upon failures.

  • checkpoint timeout: The time after which a checkpoint-in-progress is aborted, if it did not complete by then.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);

// advanced options:

// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);

// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
val env = StreamExecutionEnvironment.getExecutionEnvironment()

// start a checkpoint every 1000 ms
env.enableCheckpointing(1000)

// advanced options:

// set mode to exactly-once (this is the default)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig.setCheckpointTimeout(60000)

// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

Back to top

Using the Key/Value State Interface

The Key/Value state interface provides access to different types of state that are all scoped to the key of the current input element. This means that this type of state can only be used on a KeyedStream, which can be created via stream.keyBy(…).

Now, we will first look at the different types of state available and then we will see how they can be used in a program. The available state primitives are:

  • ValueState<T>: This keeps a value that can be updated and retrieved (scoped to key of the input element, mentioned above, so there will possibly be one value for each key that the operation sees). The value can be set using update(T) and retrieved using T value().

  • ListState<T>: This keeps a list of elements. You can append elements and retrieve an Iterable over all currently stored elements. Elements are added using add(T), the Iterable can be retrieved using Iterable<T> get().

  • ReducingState<T>: This keeps a single value that represents the aggregation of all values added to the state. The interface is the same as for ListState but elements added using add(T) are reduced to an aggregate using a specified ReduceFunction.

All types of state also have a method clear() that clears the state for the currently active key (i.e. the key of the input element).

It is important to keep in mind that these state objects are only used for interfacing with state. The state is not necessarily stored inside but might reside on disk or somewhere else. The second thing to keep in mind is that the value you get from the state depends on the key of the input element. So the value you get in one invocation of your user function can differ from the value in another invocation if the keys involved are different.

To get a state handle you have to create a StateDescriptor. This holds the name of the state (as we will later see you can create several states, and they have to have unique names so that you can reference them), the type of the values that the state holds, and possibly a user-specified function, such as a ReduceFunction. Depending on what type of state you want to retrieve, you create either a ValueStateDescriptor, a ListStateDescriptor or a ReducingStateDescriptor.

State is accessed using the RuntimeContext, so it is only possible in rich functions. Please see here for information about that, but we will also see an example shortly. The RuntimeContext that is available in a RichFunction has these methods for accessing state:

  • ValueState<T> getState(ValueStateDescriptor<T>)
  • ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
  • ListState<T> getListState(ListStateDescriptor<T>)

This is an example FlatMapFunction that shows how all of the parts fit together:

public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    /**
     * The ValueState handle. The first field is the count, the second field a running sum.
     */
    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {

        // access the state value
        Tuple2<Long, Long> currentSum = sum.value();

        // update the count
        currentSum.f0 += 1;

        // add the second field of the input value
        currentSum.f1 += input.f1;

        // update the state
        sum.update(currentSum);

        // if the count reaches 2, emit the average and clear the state
        if (currentSum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average", // the state name
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
                        Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
        sum = getRuntimeContext().getState(descriptor);
    }
}

// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
        .keyBy(0)
        .flatMap(new CountWindowAverage())
        .print();

// the printed output will be (1,4) and (1,5)

This example implements a poor man’s counting window. We key the tuples by the first field (in the example all have the same key 1). The function stores the count and a running sum in a ValueState. Once the count reaches 2 it will emit the average and clear the state so that we start over from 0. Note that this would keep a different state value for each different input key if we had tuples with different values in the first field.

State in the Scala DataStream API

In addition to the interface described above, the Scala API has shortcuts for stateful map() or flatMap() functions with a single ValueState on KeyedStream. The user function gets the current value of the ValueState in an Option and must return an updated value that will be used to update the state.

val stream: DataStream[(String, Int)] = ...

val counts: DataStream[(String, Int)] = stream
  .keyBy(_._1)
  .mapWithState((in: (String, Int), count: Option[Int]) =>
    count match {
      case Some(c) => ( (in._1, c), Some(c + in._2) )
      case None => ( (in._1, 0), Some(in._2) )
    })

Checkpointing Instance Fields

Instance fields can be checkpointed by using the Checkpointed interface.

When the user-defined function implements the Checkpointed interface, the snapshotState(…) and restoreState(…) methods will be executed to draw and restore function state.

In addition to that, user functions can also implement the CheckpointNotifier interface to receive notifications on completed checkpoints via the notifyCheckpointComplete(long checkpointId) method. Note that there is no guarantee for the user function to receive a notification if a failure happens between checkpoint completion and notification. The notifications should hence be treated in a way that notifications from later checkpoints can subsume missing notifications.

The above example for ValueState can be implemented using instance fields like this:

public class CountWindowAverage
        extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
        implements Checkpointed<Tuple2<Long, Long>> {

    private Tuple2<Long, Long> sum = null;

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {

        // update the count
        sum.f0 += 1;

        // add the second field of the input value
        sum.f1 += input.f1;


        // if the count reaches 2, emit the average and clear the state
        if (sum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, sum.f1 / sum.f0));
            sum = Tuple2.of(0L, 0L);
        }
    }

    @Override
    public void open(Configuration config) {
        if (sum == null) {
            // only recreate if null
            // restoreState will be called before open()
            // so this will already set the sum to the restored value
            sum = Tuple2.of(0L, 0L);
        }
    }

    // regularly persists state during normal operation
    @Override
    public Serializable snapshotState(long checkpointId, long checkpointTimestamp) {
        return sum;
    }

    // restores state on recovery from failure
    @Override
    public void restoreState(Tuple2<Long, Long> state) {
        sum = state;
    }
}

Stateful Source Functions

Stateful sources require a bit more care as opposed to other operators. In order to make the updates to the state and output collection atomic (required for exactly-once semantics on failure/recovery), the user is required to get a lock from the source’s context.

public static class CounterSource
        extends RichParallelSourceFunction<Long>
        implements Checkpointed<Long> {

    /**  current offset for exactly once semantics */
    private long offset;

    /** flag for job cancellation */
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Long> ctx) {
        final Object lock = ctx.getCheckpointLock();

        while (isRunning) {
            // output and state update are atomic
            synchronized (lock) {
                ctx.collect(offset);
                offset += 1;
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public Long snapshotState(long checkpointId, long checkpointTimestamp) {
        return offset;

    }

    @Override
	public void restoreState(Long state) {
        offset = state;
    }
}

Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the flink.streaming.api.checkpoint.CheckpointNotifier interface.

State Checkpoints in Iterative Jobs

Flink currently only provides processing guarantees for jobs without iterations. Enabling checkpointing on an iterative job causes an exception. In order to force checkpointing on an iterative program the user needs to set a special flag when enabling checkpointing: env.enableCheckpointing(interval, force = true).

Please note that records in flight in the loop edges (and the state changes associated with them) will be lost during failure.

Back to top