Flink uses a concept called windows to divide a (potentially) infinite DataStream
into finite
slices based on the timestamps of elements or other criteria. This division is required when working
with infinite streams of data and performing transformations that aggregate elements.
Info We will mostly talk about keyed windowing here, i.e.
windows that are applied on a KeyedStream
. Keyed windows have the advantage that elements are
subdivided based on both window and key before being given to
a user function. The work can thus be distributed across the cluster
because the elements for different keys can be processed independently. If you absolutely have to,
you can check out non-keyed windowing where we describe how non-keyed
windows work.
For a windowed transformation you must at least specify a key (see specifying keys), a window assigner, and a window function. The key divides the infinite, non-keyed stream into logical keyed streams, while the window assigner assigns elements to finite per-key windows. Finally, the window function is used to process the elements of each window.
The basic structure of a windowed transformation is thus as follows:
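As a sketch, with placeholders in angle brackets standing for the concrete choices discussed in the following sections:

```
stream.keyBy(<key selector>)
      .window(<window assigner>)
      .<windowed transformation>(<window function>);
```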
We will cover window assigners in a separate section below.
The window transformation can be one of reduce(), fold(), or apply(), which take a ReduceFunction, FoldFunction, or WindowFunction, respectively. We describe each of these ways of specifying a windowed transformation in detail below: window functions.
For more advanced use cases you can also specify a Trigger that determines when exactly a window is considered ready for processing. Triggers are covered in more detail in triggers.
The window assigner specifies how elements of the stream are divided into finite slices. Flink comes
with pre-implemented window assigners for the most typical use cases, namely tumbling windows,
sliding windows, session windows and global windows, but you can implement your own by
extending the WindowAssigner
class. All the built-in window assigners, except for the global
windows one, assign elements to windows based on time, which can either be processing time or event
time. Please take a look at our section on event time for more
information about how Flink deals with time.
Let’s first look at how each of these window assigners works before looking at how they can be used in a Flink program. We will be using abstract figures to visualize the workings of each assigner: in the following, the purple circles are elements of the stream, they are partitioned by some key (in this case user 1, user 2 and user 3) and the x-axis shows the progress of time.
Global windows are a way of specifying that we don’t want to subdivide our elements into windows. Each element is assigned to one single per-key global window. This windowing scheme is only useful if you also specify a custom trigger. Otherwise, no computation is ever going to be performed, as the global window does not have a natural end at which we could process the aggregated elements.
A tumbling windows assigner assigns elements to fixed length, non-overlapping windows of a specified window size. For example, if you specify a window size of 5 minutes, the window function will get 5 minutes worth of elements in each invocation.
The sliding windows assigner assigns elements to fixed-length windows, just as the tumbling windows assigner does, but in this case windows can overlap. The size of the overlap is determined by the user-specified window slide parameter. Because windows overlap, an element can be assigned to multiple windows.
For example, you could have windows of size 10 minutes that slide by 5 minutes. With this you get 10 minutes worth of elements in each invocation of the window function and it will be invoked for every 5 minutes of data.
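The assignment logic behind the tumbling and sliding assigners can be illustrated with a few lines of plain Java (a sketch with no Flink dependency; offsets are ignored and timestamps are assumed non-negative):

```java
import java.util.ArrayList;
import java.util.List;

public class WindowAssignment {

    // Start of the tumbling window of length `size` that contains `timestamp`.
    static long tumblingWindowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    // Starts of all sliding windows of length `size` (sliding by `slide`)
    // that contain `timestamp`.
    static List<Long> slidingWindowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - (timestamp % slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // With 5-minute tumbling windows, an element at minute 7 lands in [5, 10).
        System.out.println(tumblingWindowStart(7, 5)); // 5
        // With size 10 and slide 5, an element at minute 7 is in the
        // windows starting at minute 5 and at minute 0.
        System.out.println(slidingWindowStarts(7, 10, 5)); // [5, 0]
    }
}
```

This also shows why each element is assigned to size / slide windows when the slide evenly divides the size.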
The session windows assigner is ideal for cases where the window boundaries need to adjust to the incoming data. Both the tumbling windows and sliding windows assigner assign elements to windows that start at fixed time points and have a fixed window size. With session windows it is possible to have windows that start at individual points in time for each key and that end once there has been a certain period of inactivity. The configuration parameter is the session gap that specifies how long to wait for new data before considering a session as closed.
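The session-gap idea can be sketched in plain Java (no Flink involved): an element extends the current session if it arrives within the gap of the session's current end, otherwise it starts a new session.

```java
import java.util.ArrayList;
import java.util.List;

public class SessionWindows {

    // Group already-sorted timestamps into sessions: a gap of inactivity
    // longer than `gap` closes the current session and starts a new one.
    static List<long[]> sessions(long[] sortedTimestamps, long gap) {
        List<long[]> result = new ArrayList<>();
        long start = sortedTimestamps[0];
        long end = sortedTimestamps[0];
        for (int i = 1; i < sortedTimestamps.length; i++) {
            long t = sortedTimestamps[i];
            if (t - end > gap) {
                // inactivity longer than the session gap: close the session
                result.add(new long[] {start, end});
                start = t;
            }
            end = t;
        }
        result.add(new long[] {start, end});
        return result;
    }

    public static void main(String[] args) {
        // With a gap of 3, timestamps 1, 2, 4 form one session and 10, 11 another.
        for (long[] s : sessions(new long[] {1, 2, 4, 10, 11}, 3)) {
            System.out.println(s[0] + " - " + s[1]);
        }
    }
}
```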
The built-in window assigners (except GlobalWindows) come in two versions: one for processing-time windowing and one for event-time windowing. The processing-time assigners assign elements to
windows based on the current clock of the worker machines while the event-time assigners assign
windows based on the timestamps of elements. Please have a look at
event time to learn about the difference between processing time
and event time and about how timestamps can be assigned to elements.
The following code snippets show how each of the window assigners can be used in a program:
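As a sketch in the Java DataStream API (the input stream and key selector are placeholders; the classes shown are the event-time and global variants, with processing-time variants named accordingly):

```
DataStream<T> input = ...;

// tumbling event-time windows
input.keyBy(<key selector>)
     .window(TumblingEventTimeWindows.of(Time.seconds(5)))
     .<windowed transformation>(<window function>);

// sliding event-time windows
input.keyBy(<key selector>)
     .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
     .<windowed transformation>(<window function>);

// event-time session windows with a 10-minute session gap
input.keyBy(<key selector>)
     .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
     .<windowed transformation>(<window function>);

// global windows (remember to also specify a custom trigger)
input.keyBy(<key selector>)
     .window(GlobalWindows.create())
     .<windowed transformation>(<window function>);
```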
Note how we can specify a time interval by using one of Time.milliseconds(x)
, Time.seconds(x)
,
Time.minutes(x)
, and so on.
The time-based window assigners also take an optional offset
parameter that can be used to
change the alignment of windows. For example, without offsets hourly windows are aligned
with the epoch, that is, you will get windows such as 1:00 - 1:59
, 2:00 - 2:59
and so on. If you
want to change that you can give an offset. With an offset of 15 minutes you would, for example,
get 1:15 - 2:14
, 2:15 - 3:14
etc. Another important use case for offsets is when you
want to have daily windows and live in a timezone other than UTC-0. For example, in China
you would have to specify an offset of Time.hours(-8)
.
This example shows how an offset can be specified for tumbling event time windows (the other windows work accordingly):
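A sketch of how this can look in the Java API, assuming the two-argument of(size, offset) variant of the tumbling event-time assigner:

```
input.keyBy(<key selector>)
     .window(TumblingEventTimeWindows.of(Time.hours(1), Time.minutes(15)))
     .<windowed transformation>(<window function>);
```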
The window function is used to process the elements of each window (and key) once the system determines that a window is ready for processing (see triggers for how the system determines when a window is ready).
The window function can be one of ReduceFunction
, FoldFunction
or WindowFunction
. The first
two can be executed more efficiently because Flink can incrementally aggregate the elements for each
window as they arrive. A WindowFunction
gets an Iterable
for all the elements contained in a
window and additional meta information about the window to which the elements belong.
A windowed transformation with a WindowFunction
cannot be executed as efficiently as the other
cases because Flink has to buffer all elements for a window internally before invoking the function.
This can be mitigated by combining a WindowFunction
with a ReduceFunction
or FoldFunction
to
get both incremental aggregation of window elements and the additional information that the
WindowFunction
receives. We will look at examples for each of these variants.
A reduce function specifies how two values can be combined to form one element. Flink can use this to incrementally aggregate the elements in a window.
A ReduceFunction
can be used in a program like this:
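A sketch of such a program in the Java API, assuming an input stream of Tuple2<String, Long> elements (key selector and window assigner are placeholders):

```
DataStream<Tuple2<String, Long>> input = ...;

input.keyBy(<key selector>)
     .window(<window assigner>)
     .reduce(new ReduceFunction<Tuple2<String, Long>>() {
         public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
             // combine two elements into one by summing the second field
             return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
         }
     });
```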
A ReduceFunction
specifies how two elements from the input can be combined to produce
an output element. This example will sum up the second field of the tuple for all elements
in a window.
A fold function can be specified like this:
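A sketch in the Java API, again assuming Tuple2<String, Long> input elements:

```
DataStream<Tuple2<String, Long>> input = ...;

input.keyBy(<key selector>)
     .window(<window assigner>)
     .fold("", new FoldFunction<Tuple2<String, Long>, String>() {
         public String fold(String accumulator, Tuple2<String, Long> value) {
             // append each Long field to the running string accumulator
             return accumulator + value.f1;
         }
     });
```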
A FoldFunction
specifies how elements from the input will be added to an initial
accumulator value (""
, the empty string, in our example). This example will compute
a concatenation of all the Long
fields of the input.
Using a WindowFunction
provides the most flexibility, at the cost of performance. The reason for this
is that elements cannot be incrementally aggregated for a window and instead need to be buffered
internally until the window is considered ready for processing. A WindowFunction
gets an
Iterable
containing all the elements of the window being processed. The signature of
WindowFunction
is this:
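Approximately (imports omitted; IN and OUT are the input and output types, KEY the key type, and W the window type):

```
public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {

    /**
     * Evaluates the window and outputs none or several elements.
     */
    void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}
```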
Here we show an example that uses a WindowFunction
to count the elements in a window. We do this
because we want to access information about the window itself to emit it along with the count.
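A sketch in the Java API, assuming Tuple2<String, Long> input elements keyed by a String key and an event-time window assigner (so the window type is TimeWindow):

```
DataStream<Tuple2<String, Long>> input = ...;

input.keyBy(<key selector>)
     .window(<window assigner>)
     .apply(new MyWindowFunction());

// counts the elements in the window and emits the window along with the count
public class MyWindowFunction
        implements WindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {

    public void apply(String key, TimeWindow window,
            Iterable<Tuple2<String, Long>> input, Collector<String> out) {
        long count = 0;
        for (Tuple2<String, Long> in : input) {
            count++;
        }
        out.collect("Window: " + window + " count: " + count);
    }
}
```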
This is very inefficient, however, and should be implemented with a
ReduceFunction
in practice. Below, we will see an example of how a ReduceFunction
can
be combined with a WindowFunction
to get both incremental aggregation and the added
information of a WindowFunction
.
A WindowFunction
can be combined with either a ReduceFunction
or a FoldFunction
to
incrementally aggregate elements as they arrive in the window.
When the window is closed, the WindowFunction
will be provided with the aggregated result.
This allows windows to be computed incrementally while retaining access to the
additional window meta information of the WindowFunction
.
The following example shows how an incremental FoldFunction
can be combined with
a WindowFunction
to extract the number of events in the window and also return
the key and the end time of the window.
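A sketch of such a combination in the Java API, assuming Tuple2<String, Long> input elements keyed by a String key and the three-argument apply(initialValue, foldFunction, windowFunction) variant; the fold incrementally counts elements and the window function adds the key and the window end time:

```
DataStream<Tuple2<String, Long>> input = ...;

input.keyBy(<key selector>)
     .window(<window assigner>)
     .apply(new Tuple3<String, Long, Long>("", 0L, 0L),
         new FoldFunction<Tuple2<String, Long>, Tuple3<String, Long, Long>>() {
             public Tuple3<String, Long, Long> fold(
                     Tuple3<String, Long, Long> acc, Tuple2<String, Long> value) {
                 // only the count (third field) is accumulated incrementally
                 return new Tuple3<>(acc.f0, acc.f1, acc.f2 + 1);
             }
         },
         new WindowFunction<Tuple3<String, Long, Long>, Tuple3<String, Long, Long>, String, TimeWindow>() {
             public void apply(String key, TimeWindow window,
                     Iterable<Tuple3<String, Long, Long>> folded,
                     Collector<Tuple3<String, Long, Long>> out) {
                 // the Iterable contains exactly one element: the fold result
                 Long count = folded.iterator().next().f2;
                 out.collect(new Tuple3<>(key, window.getEnd(), count));
             }
         });
```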
The following example shows how an incremental ReduceFunction
can be combined with
a WindowFunction
to return the smallest event in a window along
with the start time of the window.
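Correspondingly, a sketch using the two-argument apply(reduceFunction, windowFunction) variant; SensorReading and its value() accessor are hypothetical stand-ins for the event type:

```
DataStream<SensorReading> input = ...;

input.keyBy(<key selector>)
     .window(<window assigner>)
     .apply(
         new ReduceFunction<SensorReading>() {
             public SensorReading reduce(SensorReading r1, SensorReading r2) {
                 // incrementally keep only the smallest element
                 return r1.value() > r2.value() ? r2 : r1;
             }
         },
         new WindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow>() {
             public void apply(String key, TimeWindow window,
                     Iterable<SensorReading> minReadings,
                     Collector<Tuple2<Long, SensorReading>> out) {
                 // the Iterable contains exactly one element: the reduced minimum
                 SensorReading min = minReadings.iterator().next();
                 out.collect(new Tuple2<>(window.getStart(), min));
             }
         });
```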
When working with event-time windowing it can happen that elements arrive late, i.e. the watermark that Flink uses to keep track of the progress of event time is already past the end timestamp of a window to which an element belongs. Please see event time and especially late elements for a more thorough discussion of how Flink deals with event time.
You can specify how a windowed transformation should deal with late elements and how much lateness is allowed. The parameter for this is called allowed lateness. This specifies by how much time elements can be late. Elements that arrive within the allowed lateness are still put into windows and are considered when computing window results. If elements arrive after the allowed lateness they will be dropped. Flink will also make sure that any state held by the windowing operation is garbage collected once the watermark passes the end of a window plus the allowed lateness.
Default By default, the allowed lateness is set to
0
. That is, elements that arrive behind the watermark will be dropped.
You can specify an allowed lateness like this:
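As a sketch (the allowedLateness() call takes a Time interval):

```
input.keyBy(<key selector>)
     .window(<window assigner>)
     .allowedLateness(<time>)
     .<windowed transformation>(<window function>);
```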
Note When using the GlobalWindows
window assigner no
data is ever considered late because the end timestamp of the global window is Long.MAX_VALUE
.
A Trigger
determines when a window (as assigned by the WindowAssigner
) is ready for being
processed by the window function. The trigger observes how elements are added to windows
and can also keep track of the progress of processing time and event time. Once a trigger
determines that a window is ready for processing, it fires. This is the signal for the
window operation to take the elements that are currently in the window and pass them along to
the window function to produce output for the firing window.
Each WindowAssigner
(except GlobalWindows
) comes with a default trigger that should be
appropriate for most use cases. For example, TumblingEventTimeWindows
has an EventTimeTrigger
as
default trigger. This trigger simply fires once the watermark passes the end of a window.
You can specify the trigger to be used by calling trigger()
with a given Trigger
. The
whole specification of the windowed transformation would then look like this:
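As a sketch, the trigger() call slots in between the window assigner and the window function:

```
input.keyBy(<key selector>)
     .window(<window assigner>)
     .trigger(<trigger>)
     .<windowed transformation>(<window function>);
```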
Flink comes with a few triggers out of the box: there is the already mentioned EventTimeTrigger
that
fires based on the progress of event-time as measured by the watermark, the ProcessingTimeTrigger
does the same but based on processing time and the CountTrigger
fires once the number of elements
in a window exceeds the given limit.
Attention By specifying a trigger using trigger()
you
are overwriting the default trigger of a WindowAssigner
. For example, if you specify a
CountTrigger
for TumblingEventTimeWindows
you will no longer get window firings based on the
progress of time but only by count. Right now, you have to write your own custom trigger if
you want to react based on both time and count.
The internal Trigger
API is still considered experimental but you can check out the code
if you want to write your own custom trigger:
Trigger.java.
You can also leave out the keyBy()
when specifying a windowed transformation. This means, however,
that Flink cannot process windows for different keys in parallel, essentially turning the
transformation into a non-parallel operation.
Warning As mentioned in the introduction, non-keyed windows have the disadvantage that work cannot be distributed in the cluster because windows cannot be computed independently per key. This can have severe performance implications.
The basic structure of a non-keyed windowed transformation is as follows:
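As a sketch: instead of keyBy() and window(), the non-keyed variant uses a single windowAll() call that takes the window assigner:

```
stream.windowAll(<window assigner>)
      .<windowed transformation>(<window function>);
```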