This section is relevant for programs running on Event Time. For an introduction to Event Time,
Processing Time, and Ingestion Time, please refer to the event time introduction.
To work with Event Time, streaming programs need to set the time characteristic accordingly, by calling
setStreamTimeCharacteristic(TimeCharacteristic.EventTime) on the StreamExecutionEnvironment.
Assigning Timestamps
In order to work with Event Time, Flink needs to know the events’ timestamps, meaning each element in the
stream needs to have its event timestamp assigned. This is usually done by accessing/extracting the
timestamp from some field in the element.
Timestamp assignment goes hand-in-hand with generating watermarks, which tell the system about
the progress in event time.
There are two ways to assign timestamps and generate Watermarks:
Directly in the data stream source
Via a timestamp assigner / watermark generator: in Flink, timestamp assigners also define the watermarks to be emitted
Attention: Both timestamps and watermarks are specified as
milliseconds since the Java epoch of 1970-01-01T00:00:00Z.
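As a small illustration of this unit, the following sketch converts an ISO-8601 instant into milliseconds since the epoch using only java.time. The class EpochMillisExample and the helper toEpochMillis are hypothetical names introduced for this example; they are not part of Flink.

```java
import java.time.Instant;

// Hypothetical helper, not part of Flink: shows the unit used for timestamps and watermarks.
class EpochMillisExample {

    // Parses an ISO-8601 instant such as "2016-05-01T12:00:00Z" into epoch milliseconds.
    static long toEpochMillis(String isoInstant) {
        return Instant.parse(isoInstant).toEpochMilli();
    }

    public static void main(String[] args) {
        System.out.println(toEpochMillis("1970-01-01T00:00:00Z"));     // prints 0
        System.out.println(toEpochMillis("1970-01-01T00:00:01.500Z")); // prints 1500
    }
}
```

If the events carry their time in another unit (for example seconds), the value would need to be converted to milliseconds before it is used as a timestamp.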
Source Functions with Timestamps and Watermarks
Stream sources can also directly assign timestamps to the elements they produce and emit Watermarks. In that case,
no Timestamp Assigner is needed.
To assign a timestamp to an element in the source directly, the source must use the collectWithTimestamp(...)
method on the SourceContext. To generate Watermarks, the source must call the emitWatermark(Watermark) function.
Below is a simple example of a source (non-checkpointed) that assigns timestamps and generates Watermarks
depending on special events:
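The following is a minimal sketch of such a source, not a definitive implementation. To keep it compilable without Flink on the classpath, Watermark and SourceContext are small local stand-ins that mirror only the methods named above; in a real job the Flink types would be imported instead. MyType, TimestampedSource, and RecordingContext are hypothetical names, and the source replays a fixed list rather than reading from an external system.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Local stand-in for Flink's Watermark so this sketch compiles without Flink.
final class Watermark {
    private final long timestamp;
    Watermark(long timestamp) { this.timestamp = timestamp; }
    long getTimestamp() { return timestamp; }
}

// Local stand-in exposing only the two SourceContext methods named in the text.
interface SourceContext<T> {
    void collectWithTimestamp(T element, long timestamp);
    void emitWatermark(Watermark mark);
}

// Hypothetical event type: payload, event time, and an optional watermark time.
final class MyType {
    final String payload;
    final long eventTimestamp; // milliseconds since the epoch
    final Long watermarkTime;  // non-null only for "special" events

    MyType(String payload, long eventTimestamp, Long watermarkTime) {
        this.payload = payload;
        this.eventTimestamp = eventTimestamp;
        this.watermarkTime = watermarkTime;
    }

    boolean hasWatermarkTime() { return watermarkTime != null; }
}

// Hypothetical non-checkpointed source that replays a fixed list of events.
final class TimestampedSource {
    private final List<MyType> input;
    private volatile boolean running = true;

    TimestampedSource(List<MyType> input) { this.input = input; }

    void run(SourceContext<MyType> ctx) {
        for (MyType next : input) {
            if (!running) {
                break;
            }
            // Attach the event timestamp at the moment the element is emitted.
            ctx.collectWithTimestamp(next, next.eventTimestamp);
            // Only special events advance event time.
            if (next.hasWatermarkTime()) {
                ctx.emitWatermark(new Watermark(next.watermarkTime));
            }
        }
    }

    void cancel() { running = false; }
}

// Records emissions as strings so the behaviour can be inspected.
final class RecordingContext implements SourceContext<MyType> {
    final List<String> log = new ArrayList<>();

    @Override
    public void collectWithTimestamp(MyType element, long timestamp) {
        log.add("element " + element.payload + " @ " + timestamp);
    }

    @Override
    public void emitWatermark(Watermark mark) {
        log.add("watermark " + mark.getTimestamp());
    }
}

class SourceSketchDemo {
    public static void main(String[] args) {
        RecordingContext ctx = new RecordingContext();
        new TimestampedSource(Arrays.asList(
                new MyType("a", 1000L, null),
                new MyType("b", 2000L, 1500L))).run(ctx);
        ctx.log.forEach(System.out::println);
    }
}
```

In this sketch the element is always emitted before any watermark it triggers, so a watermark never precedes the element that carries it.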
Note: If the streaming program uses a TimestampAssigner on a stream where elements have a timestamp already,
those timestamps will be overwritten by the TimestampAssigner. Similarly, Watermarks will be overwritten as well.
Timestamp Assigners / Watermark Generators
Timestamp Assigners take a stream and produce a new stream with timestamped elements and watermarks. If the
original stream had timestamps and/or watermarks already, the timestamp assigner overwrites them.
Timestamp assigners are usually specified immediately after the data source, but this is not strictly required.
A common pattern is, for example, to parse (MapFunction) and filter (FilterFunction) before the timestamp assigner.
In any case, the timestamp assigner needs to be specified before the first operation on event time
(such as the first window operation). As a special case, when using Kafka as the source of a streaming job,
Flink allows the specification of a timestamp assigner / watermark emitter inside
the source (or consumer) itself. More information on how to do so can be found in the
Kafka Connector documentation.
NOTE: The remainder of this section presents the main interfaces a programmer has
to implement in order to create her own timestamp extractors/watermark emitters.
To see the pre-implemented extractors that ship with Flink, please refer to the
Pre-defined Timestamp Extractors / Watermark Emitters page.
With Periodic Watermarks
The AssignerWithPeriodicWatermarks assigns timestamps and generates watermarks periodically (possibly depending
on the stream elements, or purely based on processing time).
The interval (every n milliseconds) at which the watermark will be generated is defined via
ExecutionConfig.setAutoWatermarkInterval(...). Each time the interval elapses, the assigner’s getCurrentWatermark()
method is called, and a new Watermark is emitted if the returned Watermark is non-null and its timestamp is larger
than that of the previous Watermark.
Two simple examples of timestamp assigners with periodic watermark generation are below.
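The two variants mentioned above (watermarks derived from the stream elements, and watermarks derived purely from processing time) could be sketched as follows. This is an illustration under stated assumptions rather than Flink's own code: Watermark and AssignerWithPeriodicWatermarks are local stand-ins that mirror only the methods named in this section, so that the sketch compiles without Flink; MyEvent, the class names, and the bounds of 3.5 and 5 seconds are hypothetical.

```java
// Local stand-in for Flink's Watermark so this sketch compiles without Flink.
final class Watermark {
    private final long timestamp;
    Watermark(long timestamp) { this.timestamp = timestamp; }
    long getTimestamp() { return timestamp; }
}

// Local stand-in mirroring the two methods of the interface described in the text.
interface AssignerWithPeriodicWatermarks<T> {
    long extractTimestamp(T element, long previousElementTimestamp);
    Watermark getCurrentWatermark();
}

// Hypothetical event type carrying its creation time in epoch milliseconds.
final class MyEvent {
    private final long creationTime;
    MyEvent(long creationTime) { this.creationTime = creationTime; }
    long getCreationTime() { return creationTime; }
}

// Variant 1: the watermark trails the highest timestamp seen so far by a fixed bound.
final class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {
    private final long maxOutOfOrderness = 3500L; // assumed bound: 3.5 seconds
    // Start low enough that the subtraction in getCurrentWatermark() cannot underflow.
    private long currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        long timestamp = element.getCreationTime();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}

// Variant 2: the watermark trails processing time by a fixed lag,
// assuming elements arrive with a bounded delay.
final class TimeLagWatermarkGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {
    private final long maxTimeLag = 5000L; // assumed lag: 5 seconds

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        return element.getCreationTime();
    }

    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(System.currentTimeMillis() - maxTimeLag);
    }
}

class PeriodicSketchDemo {
    public static void main(String[] args) {
        BoundedOutOfOrdernessGenerator generator = new BoundedOutOfOrdernessGenerator();
        // Long.MIN_VALUE is a placeholder meaning the element carried no earlier timestamp.
        generator.extractTimestamp(new MyEvent(10_000L), Long.MIN_VALUE);
        generator.extractTimestamp(new MyEvent(8_000L), Long.MIN_VALUE); // out of order
        System.out.println(generator.getCurrentWatermark().getTimestamp()); // prints 6500
    }
}
```

The first variant tolerates elements that are late by up to the chosen bound; the out-of-order element in the demo does not move the watermark backwards because only the maximum timestamp is tracked. In a Flink job of the API generation this page describes, such an assigner would be attached to a stream with assignTimestampsAndWatermarks(...).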
With Punctuated Watermarks
To generate Watermarks whenever a certain event indicates that a new watermark can be generated, use the
AssignerWithPunctuatedWatermarks. For this assigner, Flink will first call the extractTimestamp(...) method
to assign the element a timestamp, and then immediately call the checkAndGetNextWatermark(...) method
for that same element.
The checkAndGetNextWatermark(...) method gets the timestamp that was assigned in the extractTimestamp(...)
method, and can decide whether it wants to generate a Watermark. Whenever the checkAndGetNextWatermark(...)
method returns a non-null Watermark, and that Watermark is larger than the latest previous Watermark, that
new Watermark will be emitted.
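The call order and the emission rule described above can be sketched as follows. Watermark and AssignerWithPunctuatedWatermarks are local stand-ins that mirror only the methods named in this section, so the sketch compiles without Flink. MyEvent, its watermark marker flag, PunctuatedAssigner, and PunctuatedDriver are hypothetical; the driver is a simplified model of the behaviour the text describes, not Flink's actual operator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Local stand-in for Flink's Watermark so this sketch compiles without Flink.
final class Watermark {
    private final long timestamp;
    Watermark(long timestamp) { this.timestamp = timestamp; }
    long getTimestamp() { return timestamp; }
}

// Local stand-in mirroring the two methods of the interface described in the text.
interface AssignerWithPunctuatedWatermarks<T> {
    long extractTimestamp(T element, long previousElementTimestamp);
    Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp);
}

// Hypothetical event type: creation time plus a flag marking "special" events.
final class MyEvent {
    private final long creationTime;
    private final boolean watermarkMarker;

    MyEvent(long creationTime, boolean watermarkMarker) {
        this.creationTime = creationTime;
        this.watermarkMarker = watermarkMarker;
    }

    long getCreationTime() { return creationTime; }
    boolean hasWatermarkMarker() { return watermarkMarker; }
}

// Proposes a watermark only for events that carry the marker.
final class PunctuatedAssigner implements AssignerWithPunctuatedWatermarks<MyEvent> {
    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        return element.getCreationTime();
    }

    @Override
    public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
        return lastElement.hasWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
    }
}

// Simplified model of the runtime: extract, then check, then emit only if the watermark advances.
final class PunctuatedDriver {
    static List<Long> emittedWatermarks(AssignerWithPunctuatedWatermarks<MyEvent> assigner,
                                        List<MyEvent> events) {
        List<Long> emitted = new ArrayList<>();
        long lastEmitted = Long.MIN_VALUE;
        for (MyEvent event : events) {
            long timestamp = assigner.extractTimestamp(event, Long.MIN_VALUE);
            Watermark candidate = assigner.checkAndGetNextWatermark(event, timestamp);
            if (candidate != null && candidate.getTimestamp() > lastEmitted) {
                lastEmitted = candidate.getTimestamp();
                emitted.add(lastEmitted);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<MyEvent> events = Arrays.asList(
                new MyEvent(1000L, true),   // marker: watermark 1000 is emitted
                new MyEvent(2000L, false),  // no marker: no watermark
                new MyEvent(500L, true),    // marker, but 500 does not advance past 1000
                new MyEvent(3000L, true));  // marker: watermark 3000 is emitted
        System.out.println(emittedWatermarks(new PunctuatedAssigner(), events)); // prints [1000, 3000]
    }
}
```

The third event shows why a returned watermark is not always emitted: its candidate watermark is not larger than the one already sent, so it is dropped.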
Note: It is possible to generate a watermark on every single event. However, because each watermark causes some
computation downstream, an excessive number of watermarks degrades performance.