Flink exposes a metric system that allows gathering and exposing metrics to external systems.
You can access the metric system from any user function that extends RichFunction by calling getRuntimeContext().getMetricGroup(). This method returns a MetricGroup object on which you can create and register new metrics.
Flink supports Counters, Gauges, Histograms and Meters.
A Counter is used to count something. The current value can be incremented or decremented using inc()/inc(long n) or dec()/dec(long n). You can create and register a Counter by calling counter(String name) on a MetricGroup.
Alternatively, you can use your own Counter implementation by passing it to counter(String name, Counter counter).
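A custom Counter can be as small as the sketch below. To keep the example compilable without Flink on the classpath, the Counter interface here is a local stand-in that mirrors the methods described above; in a real job you would implement org.apache.flink.metrics.Counter instead. The class name AtomicCounter is hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;

// Local stand-in mirroring the Counter methods described above (assumption:
// a real job would implement org.apache.flink.metrics.Counter instead).
interface Counter {
    void inc();
    void inc(long n);
    void dec();
    void dec(long n);
    long getCount();
}

// Hypothetical custom implementation: a thread-safe counter backed by AtomicLong.
class AtomicCounter implements Counter {
    private final AtomicLong count = new AtomicLong();

    @Override public void inc() { count.incrementAndGet(); }
    @Override public void inc(long n) { count.addAndGet(n); }
    @Override public void dec() { count.decrementAndGet(); }
    @Override public void dec(long n) { count.addAndGet(-n); }
    @Override public long getCount() { return count.get(); }
}

class CounterSketch {
    public static void main(String[] args) {
        Counter counter = new AtomicCounter();
        counter.inc();     // 1
        counter.inc(5);    // 6
        counter.dec(2);    // 4
        System.out.println(counter.getCount()); // prints 4
    }
}
```

Inside a RichFunction, an instance of such a class would typically be created and registered once, for example in open(), and then updated from the processing method.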
A Gauge provides a value of any type on demand. In order to use a Gauge you must first create a class that implements the org.apache.flink.metrics.Gauge interface. There is no restriction on the type of the returned value. You can register a gauge by calling gauge(String name, Gauge gauge) on a MetricGroup.
Note that reporters will turn the exposed object into a String, which means that a meaningful toString() implementation is required.
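The toString() requirement is easiest to see with a gauge that returns a custom value type. The following sketch uses a local stand-in for the Gauge interface so that it compiles on its own; QueueStats and QueueGauge are hypothetical names chosen for illustration.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Local stand-in for the single-method Gauge interface (assumption: a real
// job would implement org.apache.flink.metrics.Gauge instead).
interface Gauge<T> {
    T getValue();
}

// Hypothetical value type. Reporters turn the exposed object into a String,
// so toString() is overridden to give a readable representation.
final class QueueStats {
    final int size;
    final int capacity;

    QueueStats(int size, int capacity) {
        this.size = size;
        this.capacity = capacity;
    }

    @Override
    public String toString() {
        return size + "/" + capacity;
    }
}

// Gauge that computes its value on demand from the current queue state.
class QueueGauge implements Gauge<QueueStats> {
    private final Queue<?> queue;
    private final int capacity;

    QueueGauge(Queue<?> queue, int capacity) {
        this.queue = queue;
        this.capacity = capacity;
    }

    @Override
    public QueueStats getValue() {
        return new QueueStats(queue.size(), capacity);
    }
}

class GaugeSketch {
    public static void main(String[] args) {
        Queue<String> pending = new ArrayDeque<>();
        Gauge<QueueStats> gauge = new QueueGauge(pending, 10);
        pending.add("a");
        pending.add("b");
        // What a reporter would see after converting the value to a String.
        System.out.println(String.valueOf(gauge.getValue())); // prints 2/10
    }
}
```

Without the toString() override, the reported value would be the default Object representation (class name and hash code), which is of little use in a monitoring system.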
A Histogram measures the distribution of long values. You can register one by calling histogram(String name, Histogram histogram) on a MetricGroup.
Flink does not provide a default implementation for Histogram, but offers a wrapper that allows the use of Codahale/DropWizard histograms. To use this wrapper, add the flink-metrics-dropwizard dependency to your pom.xml. You can then register a Codahale/DropWizard histogram by wrapping it in a DropwizardHistogramWrapper and passing the wrapper to histogram(String name, Histogram histogram).
A Meter measures an average throughput. An occurrence of an event can be registered with the markEvent() method. The occurrence of multiple events at the same time can be registered with the markEvent(long n) method. You can register a meter by calling meter(String name, Meter meter) on a MetricGroup.
Flink offers a wrapper that allows the use of Codahale/DropWizard meters. To use this wrapper, add the flink-metrics-dropwizard dependency to your pom.xml. You can then register a Codahale/DropWizard meter by wrapping it in a DropwizardMeterWrapper and passing the wrapper to meter(String name, Meter meter).
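Independently of the Dropwizard wrapper, the Meter contract can be illustrated in plain Java. The sketch below is an assumption-laden stand-in, not Flink's or Dropwizard's implementation: the Meter interface is declared locally so that the example compiles on its own, AverageRateMeter is a hypothetical name, and the rate is a simple average since creation (Dropwizard meters also offer exponentially weighted moving averages).

```java
import java.util.function.LongSupplier;

// Local stand-in for the Meter interface (assumption: a real job would
// implement org.apache.flink.metrics.Meter or use the Dropwizard wrapper).
interface Meter {
    void markEvent();
    void markEvent(long n);
    double getRate();   // events per second
    long getCount();
}

// Hypothetical meter that reports the average rate since it was created.
class AverageRateMeter implements Meter {
    private final LongSupplier clockMillis; // injectable clock, eases testing
    private final long startMillis;
    private long count;

    AverageRateMeter(LongSupplier clockMillis) {
        this.clockMillis = clockMillis;
        this.startMillis = clockMillis.getAsLong();
    }

    @Override
    public void markEvent() {
        markEvent(1);
    }

    @Override
    public synchronized void markEvent(long n) {
        count += n;
    }

    @Override
    public synchronized double getRate() {
        long elapsedMillis = clockMillis.getAsLong() - startMillis;
        // No time has passed yet: report 0 instead of dividing by zero.
        return elapsedMillis <= 0 ? 0.0 : count * 1000.0 / elapsedMillis;
    }

    @Override
    public synchronized long getCount() {
        return count;
    }
}

class MeterSketch {
    public static void main(String[] args) throws InterruptedException {
        Meter meter = new AverageRateMeter(System::currentTimeMillis);
        meter.markEvent();    // a single event
        meter.markEvent(9);   // nine events at once
        Thread.sleep(100);
        System.out.println(meter.getCount() + " events, " + meter.getRate() + " events/s");
    }
}
```

The clock is passed in as a LongSupplier so that the rate calculation can be checked deterministically with a fake clock.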
Every metric is assigned an identifier under which it will be reported. The identifier is based on three components: the user-provided name given when registering the metric, an optional user-defined scope, and a system-provided scope. For example, if A.B is the system scope, C.D the user scope and E the name, then the identifier for the metric will be A.B.C.D.E.
You can configure which delimiter to use for the identifier (default: .) by setting the metrics.scope.delimiter key in conf/flink-conf.yaml.
You can define a user scope by calling either MetricGroup#addGroup(String name) or MetricGroup#addGroup(int name).
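The way the three components combine can be sketched in a few lines of plain Java. This is only an illustration of the rule described above, not Flink's internal code; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class MetricIdentifierSketch {
    // Joins system scope, user scope and metric name with the given delimiter.
    static String identifier(List<String> systemScope, List<String> userScope,
                             String name, String delimiter) {
        List<String> parts = new ArrayList<>(systemScope);
        parts.addAll(userScope);
        parts.add(name);
        return String.join(delimiter, parts);
    }

    public static void main(String[] args) {
        // System scope A.B, user scope C.D, name E, default delimiter "."
        System.out.println(identifier(
                Arrays.asList("A", "B"), Arrays.asList("C", "D"), "E", ".")); // prints A.B.C.D.E
    }
}
```

Each addGroup call contributes one more element to the user scope, so nesting two groups named C and D corresponds to the user scope C.D in the example.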
The system scope contains context information about the metric, for example in which task it was registered or what job that task belongs to.
Which context information should be included can be configured by setting the following keys in conf/flink-conf.yaml. Each of these keys expects a format string that may contain constants (e.g. “taskmanager”) and variables (e.g. “<task_id>”) which will be replaced at runtime.
metrics.scope.jm
metrics.scope.jm.job
metrics.scope.tm
metrics.scope.tm.job
metrics.scope.task
metrics.scope.operator
There are no restrictions on the number or order of variables. Variables are case sensitive.
The default scope for operator metrics will result in an identifier similar to localhost.taskmanager.1234.MyJob.MyOperator.0.MyMetric.
If you also want to include the task name but omit the task manager information you can specify the following format:
metrics.scope.operator: <host>.<job_name>.<task_name>.<operator_name>.<subtask_index>
This could create the identifier localhost.MyJob.MySource_->_MyOperator.MyOperator.0.MyMetric.
Note that with this format string an identifier clash can occur if the same job is run multiple times concurrently, which can lead to inconsistent metric data. It is therefore advisable either to use format strings that provide a degree of uniqueness by including IDs (e.g. <job_id>) or to assign unique names to jobs and operators.
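To make the substitution of variables concrete, here is a minimal sketch of resolving such a format string. It is not Flink's implementation: the class name is hypothetical, and leaving unknown variables untouched is an assumption made for the sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class ScopeFormatSketch {
    // Matches variables of the form <name>.
    private static final Pattern VARIABLE = Pattern.compile("<([^<>]+)>");

    // Replaces each <variable> that has a known value in a single pass.
    // Lookups are case sensitive; constants and unknown variables are kept
    // as written (the latter is an assumption of this sketch).
    static String resolve(String format, Map<String, String> variables) {
        Matcher matcher = VARIABLE.matcher(format);
        StringBuffer resolved = new StringBuffer();
        while (matcher.find()) {
            String value = variables.get(matcher.group(1));
            String replacement = value != null ? value : matcher.group();
            matcher.appendReplacement(resolved, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(resolved);
        return resolved.toString();
    }

    public static void main(String[] args) {
        Map<String, String> variables = new HashMap<>();
        variables.put("host", "localhost");
        variables.put("job_name", "MyJob");
        variables.put("task_name", "MySource_->_MyOperator");
        variables.put("operator_name", "MyOperator");
        variables.put("subtask_index", "0");

        String scope = resolve(
                "<host>.<job_name>.<task_name>.<operator_name>.<subtask_index>", variables);
        // Appending the metric name yields the full identifier.
        System.out.println(scope + ".MyMetric");
    }
}
```

Because none of the variables in this format differ between two concurrent runs of the same job, both runs resolve to the same identifier, which is the clash described above.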
Metrics can be exposed to an external system by configuring one or several reporters in conf/flink-conf.yaml.
metrics.reporters: The list of named reporters.
metrics.reporter.<name>.<config>: Generic setting <config> for the reporter named <name>.
metrics.reporter.<name>.class: The reporter class to use for the reporter named <name>.
metrics.reporter.<name>.interval: The reporter interval to use for the reporter named <name>.
metrics.reporter.<name>.scope.delimiter: The delimiter to use for the identifier for the reporter named <name> (defaults to the value of metrics.scope.delimiter).
All reporters must have at least the class property; some also allow specifying a reporting interval. Settings specific to each reporter are listed further below.
Example reporter configuration that specifies multiple reporters:
metrics.reporters: my_jmx_reporter,my_other_reporter
metrics.reporter.my_jmx_reporter.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.my_jmx_reporter.port: 9020-9040
metrics.reporter.my_other_reporter.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.my_other_reporter.host: 192.168.1.1
metrics.reporter.my_other_reporter.port: 10000
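The naming convention behind these keys (a list of reporter names, then per-reporter settings under metrics.reporter.<name>.) can be illustrated by grouping a flat key/value configuration by reporter. The sketch below only demonstrates the convention; it is not how Flink loads its configuration, and the class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

class ReporterConfigSketch {
    // Groups all "metrics.reporter.<name>.<config>" entries by reporter name,
    // for the names listed under "metrics.reporters".
    static Map<String, Map<String, String>> settingsByReporter(Map<String, String> conf) {
        Map<String, Map<String, String>> result = new LinkedHashMap<>();
        String names = conf.getOrDefault("metrics.reporters", "");
        for (String rawName : names.split(",")) {
            String name = rawName.trim();
            if (name.isEmpty()) {
                continue;
            }
            String prefix = "metrics.reporter." + name + ".";
            Map<String, String> settings = new TreeMap<>();
            for (Map.Entry<String, String> entry : conf.entrySet()) {
                if (entry.getKey().startsWith(prefix)) {
                    // Strip the prefix so only the <config> part remains.
                    settings.put(entry.getKey().substring(prefix.length()), entry.getValue());
                }
            }
            result.put(name, settings);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("metrics.reporters", "my_jmx_reporter,my_other_reporter");
        conf.put("metrics.reporter.my_jmx_reporter.class", "org.apache.flink.metrics.jmx.JMXReporter");
        conf.put("metrics.reporter.my_jmx_reporter.port", "9020-9040");
        conf.put("metrics.reporter.my_other_reporter.class", "org.apache.flink.metrics.graphite.GraphiteReporter");
        conf.put("metrics.reporter.my_other_reporter.host", "192.168.1.1");
        conf.put("metrics.reporter.my_other_reporter.port", "10000");
        System.out.println(settingsByReporter(conf));
    }
}
```

Both reporters in the example define a port setting; the name prefix keeps the two values apart.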
You can write your own Reporter by implementing the org.apache.flink.metrics.reporter.MetricReporter interface. If the Reporter should send out reports regularly, you have to implement the Scheduled interface as well.
The following sections list the supported reporters.
JMX (org.apache.flink.metrics.jmx.JMXReporter)
You don’t have to include an additional dependency since the JMX reporter is available by default but not activated.
Parameters:
port - the port on which JMX listens for connections. This can also be a port range. When a range is specified, the actual port is shown in the relevant job or task manager log. If you don’t specify a port, no extra JMX server will be started. Metrics are still available on the default local JMX interface.
Ganglia (org.apache.flink.metrics.ganglia.GangliaReporter)
Dependency: flink-metrics-ganglia
Parameters:
host - the gmond host address configured under udp_recv_channel.bind in gmond.conf
port - the gmond port configured under udp_recv_channel.port in gmond.conf
tmax - soft limit for how long an old metric should be retained
dmax - hard limit for how long an old metric should be retained
ttl - time-to-live for transmitted UDP packets
addressingMode - UDP addressing mode to use (UNICAST/MULTICAST)
Graphite (org.apache.flink.metrics.graphite.GraphiteReporter)
Dependency: flink-metrics-graphite
Parameters:
host - the Graphite server host
port - the Graphite server port
protocol - protocol to use (TCP/UDP)
StatsD (org.apache.flink.metrics.statsd.StatsDReporter)
Dependency: flink-metrics-statsd
Parameters:
host - the StatsD server host
port - the StatsD server port
Flink exposes the following system metrics:
Scope | Metrics | Description |
---|---|---|
JobManager | | |
TaskManager.Status | Network.AvailableMemorySegments | The number of unused memory segments. |
TaskManager.Status | Network.TotalMemorySegments | The number of allocated memory segments. |
TaskManager.Status.JVM | ClassLoader.ClassesLoaded | The total number of classes loaded since the start of the JVM. |
TaskManager.Status.JVM | ClassLoader.ClassesUnloaded | The total number of classes unloaded since the start of the JVM. |
TaskManager.Status.JVM | GarbageCollector.<garbageCollector>.Count | The total number of collections that have occurred. |
TaskManager.Status.JVM | GarbageCollector.<garbageCollector>.Time | The total time spent performing garbage collection. |
TaskManager.Status.JVM | Memory.Heap.Used | The amount of heap memory currently used. |
TaskManager.Status.JVM | Memory.Heap.Committed | The amount of heap memory guaranteed to be available to the JVM. |
TaskManager.Status.JVM | Memory.Heap.Max | The maximum amount of heap memory that can be used for memory management. |
TaskManager.Status.JVM | Memory.NonHeap.Used | The amount of non-heap memory currently used. |
TaskManager.Status.JVM | Memory.NonHeap.Committed | The amount of non-heap memory guaranteed to be available to the JVM. |
TaskManager.Status.JVM | Memory.NonHeap.Max | The maximum amount of non-heap memory that can be used for memory management. |
TaskManager.Status.JVM | Memory.Direct.Count | The number of buffers in the direct buffer pool. |
TaskManager.Status.JVM | Memory.Direct.MemoryUsed | The amount of memory used by the JVM for the direct buffer pool. |
TaskManager.Status.JVM | Memory.Direct.TotalCapacity | The total capacity of all buffers in the direct buffer pool. |
TaskManager.Status.JVM | Memory.Mapped.Count | The number of buffers in the mapped buffer pool. |
TaskManager.Status.JVM | Memory.Mapped.MemoryUsed | The amount of memory used by the JVM for the mapped buffer pool. |
TaskManager.Status.JVM | Memory.Mapped.TotalCapacity | The total capacity of all buffers in the mapped buffer pool. |
TaskManager.Status.JVM | Threads.Count | The total number of live threads. |
TaskManager.Status.JVM | CPU.Load | The recent CPU usage of the JVM. |
TaskManager.Status.JVM | CPU.Time | The CPU time used by the JVM. |
Job | | |
Task | currentLowWatermark | The lowest watermark a task has received. |
Task | lastCheckpointDuration | The time it took to complete the last checkpoint. |
Task | lastCheckpointSize | The total size of the last checkpoint. |
Task | restartingTime | The time it took to restart the job. |
Task | numBytesInLocal | The total number of bytes this task has read from a local source. |
Task | numBytesInRemote | The total number of bytes this task has read from a remote source. |
Task | numBytesOut | The total number of bytes this task has emitted. |
Operator | numRecordsIn | The total number of records this operator has received. |
Operator | numRecordsOut | The total number of records this operator has emitted. |
Operator | numSplitsProcessed | The total number of InputSplits this data source has processed (if the operator is a data source). |
Operator | latency | A latency gauge reporting the latency distribution from the different sources. |
Flink allows tracking the latency of records traveling through the system. To enable latency tracking, a latencyTrackingInterval (in milliseconds) has to be set to a positive value in the ExecutionConfig.
At the latencyTrackingInterval, the sources will periodically emit a special record, called a LatencyMarker. The marker contains a timestamp from the time when the record was emitted at the sources. Latency markers cannot overtake regular user records, so if records are queuing up in front of an operator, this adds to the latency tracked by the marker.
Note that the latency markers do not account for the time user records spend in operators, as they bypass them. In particular, the markers do not account for the time records spend, for example, in window buffers. Only if operators are unable to accept new records, so that records queue up, will the latency measured using the markers reflect that.
All intermediate operators keep a list of the last n latencies from each source to compute a latency distribution. The sink operators keep a list for each source and each parallel source instance, to allow detecting latency issues caused by individual machines.
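The bookkeeping described above (a bounded list of the last n latencies per source, from which a distribution is computed) can be sketched as follows. This is an illustrative model only, not Flink's internal implementation; the class name, the method names and the use of string source IDs are assumptions.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.LongSummaryStatistics;
import java.util.Map;

class LatencyWindowSketch {
    private final int windowSize; // the "n" in "last n latencies"
    private final Map<String, Deque<Long>> latenciesBySource = new HashMap<>();

    LatencyWindowSketch(int windowSize) {
        if (windowSize <= 0) {
            throw new IllegalArgumentException("windowSize must be positive");
        }
        this.windowSize = windowSize;
    }

    // Called when a marker arrives. The latency is the arrival time minus the
    // timestamp the source put into the marker, which is only meaningful if
    // the clocks of the machines involved are in sync.
    void onMarker(String sourceId, long markedTimeMillis, long nowMillis) {
        Deque<Long> window = latenciesBySource.computeIfAbsent(sourceId, k -> new ArrayDeque<>());
        if (window.size() == windowSize) {
            window.removeFirst(); // drop the oldest latency
        }
        window.addLast(nowMillis - markedTimeMillis);
    }

    // Summarizes the retained latencies (count, min, max, average) for one source.
    LongSummaryStatistics summary(String sourceId) {
        Deque<Long> window = latenciesBySource.get(sourceId);
        if (window == null) {
            return new LongSummaryStatistics();
        }
        return window.stream().mapToLong(Long::longValue).summaryStatistics();
    }

    public static void main(String[] args) {
        LatencyWindowSketch tracker = new LatencyWindowSketch(3);
        tracker.onMarker("source-1", 1_000, 1_010);
        tracker.onMarker("source-1", 2_000, 2_020);
        tracker.onMarker("source-1", 3_000, 3_030);
        tracker.onMarker("source-1", 4_000, 4_040); // evicts the 10 ms sample
        System.out.println(tracker.summary("source-1"));
    }
}
```

Keying the windows by source and parallel source instance, rather than by source alone, would give the per-instance view that sinks use to spot slow machines.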
Currently, Flink assumes that the clocks of all machines in the cluster are in sync. We recommend setting up an automated clock synchronisation service (like NTP) to avoid false latency results.