Scala API Extensions

In order to keep a fair amount of consistency between the Scala and Java APIs, some of the features that allow a high level of expressiveness in Scala have been left out of the standard APIs for both batch and streaming.

If you want to enjoy the full Scala experience, you can choose to opt in to extensions that enhance the Scala API via implicit conversions.

To use all the available extensions, you can just add a simple import for the DataSet API:

import org.apache.flink.api.scala.extensions._

or the DataStream API:

import org.apache.flink.streaming.api.scala.extensions._

Alternatively, you can import individual extensions à la carte to only use those you prefer.

Accept partial functions

Normally, both the DataSet and DataStream APIs don’t accept anonymous pattern matching functions to deconstruct tuples, case classes or collections, like the following:

val data: DataSet[(Int, String, Double)] = // [...]
data.map {
  case (id, name, temperature) => // [...]
  // The previous line causes the following compilation error:
  // "The argument types of an anonymous function must be fully known. (SLS 8.5)"
}
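
If you cannot use the extensions, a common workaround (shown here as a minimal sketch, assuming org.apache.flink.api.scala._ is in scope for the implicit type information) is to pass a total function and pattern match inside its body, where the argument type is already known:

data.map { record =>
  // Inside the body the type of `record` is already known,
  // so an anonymous pattern match compiles fine.
  record match {
    case (id, name, temperature) => s"$id: $name at $temperature"
  }
}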

This extension introduces new methods in both the DataSet and DataStream Scala APIs, each with a one-to-one correspondence to a method in the standard API. These delegating methods do support anonymous pattern matching functions.

DataSet API

Each extension method is listed below with the original method it delegates to, followed by an example:

mapWith (original: map on DataSet)

data.mapWith {
  case (_, value) => value.toString
}

mapPartitionWith (original: mapPartition on DataSet)

data.mapPartitionWith {
  case head #:: _ => head
}

flatMapWith (original: flatMap on DataSet)

data.flatMapWith {
  case (_, name, visitTimes) => visitTimes.map(name -> _)
}

filterWith (original: filter on DataSet)

data.filterWith {
  case Train(_, isOnTime) => isOnTime
}

reduceWith (original: reduce on DataSet, GroupedDataSet)

data.reduceWith {
  case ((_, amount1), (_, amount2)) => amount1 + amount2
}

reduceGroupWith (original: reduceGroup on GroupedDataSet)

data.reduceGroupWith {
  case id #:: value #:: _ => id -> value
}

groupingBy (original: groupBy on DataSet)

data.groupingBy {
  case (id, _, _) => id
}

sortGroupWith (original: sortGroup on GroupedDataSet)

grouped.sortGroupWith(Order.ASCENDING) {
  case House(_, value) => value
}

combineGroupWith (original: combineGroup on GroupedDataSet)

grouped.combineGroupWith {
  case header #:: amounts => amounts.sum
}

projecting (original: apply on JoinDataSet, CrossDataSet)

data1.join(data2).
  whereClause { case (pk, _) => pk }.
  isEqualTo { case (_, fk) => fk }.
  projecting {
    case ((pk, tx), (products, fk)) => tx -> products
  }

data1.cross(data2).projecting {
  case ((a, _), (_, b)) => a -> b
}

projecting (original: apply on CoGroupDataSet)

data1.coGroup(data2).
  whereClause { case (pk, _) => pk }.
  isEqualTo { case (_, fk) => fk }.
  projecting {
    case (head1 #:: _, head2 #:: _) => head1 -> head2
  }
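
To see the join variant in context, here is a minimal self-contained sketch; the (userId, name) and (userId, orderTotal) inputs are made-up sample data for illustration:

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.extensions._

val env = ExecutionEnvironment.getExecutionEnvironment
// Hypothetical sample data for illustration only
val users  = env.fromElements(1 -> "alice", 2 -> "bob")
val orders = env.fromElements(1 -> 10.0, 1 -> 2.5, 2 -> 7.0)

// whereClause/isEqualTo select the join keys, projecting shapes the output
val nameAndTotal = users.join(orders)
  .whereClause { case (id, _) => id }
  .isEqualTo { case (id, _) => id }
  .projecting { case ((_, name), (_, total)) => name -> total }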

DataStream API

As above, each extension method is listed with its original counterpart, followed by an example:

mapWith (original: map on DataStream)

data.mapWith {
  case (_, value) => value.toString
}

mapPartitionWith (original: mapPartition on DataStream)

data.mapPartitionWith {
  case head #:: _ => head
}

flatMapWith (original: flatMap on DataStream)

data.flatMapWith {
  case (_, name, visits) => visits.map(name -> _)
}

filterWith (original: filter on DataStream)

data.filterWith {
  case Train(_, isOnTime) => isOnTime
}

keyingBy (original: keyBy on DataStream)

data.keyingBy {
  case (id, _, _) => id
}

mapWith (original: map on ConnectedDataStream)

data.mapWith(
  map1 = { case (_, value) => value.toString },
  map2 = { case (_, _, value, _) => value + 1 }
)

flatMapWith (original: flatMap on ConnectedDataStream)

data.flatMapWith(
  flatMap1 = { case (_, json) => parse(json) },
  flatMap2 = { case (_, _, json, _) => parse(json) }
)

keyingBy (original: keyBy on ConnectedDataStream)

data.keyingBy(
  key1 = { case (_, timestamp) => timestamp },
  key2 = { case (id, _, _) => id }
)

reduceWith (original: reduce on KeyedDataStream, WindowedDataStream)

data.reduceWith {
  case ((_, sum1), (_, sum2)) => sum1 + sum2
}

foldWith (original: fold on KeyedDataStream, WindowedDataStream)

data.foldWith(User(bought = 0)) {
  case (User(b), (_, items)) => User(b + items.size)
}

applyWith (original: apply on WindowedDataStream)

data.applyWith(0)(
  foldFunction = { case (sum, amount) => sum + amount },
  windowFunction = {
    case (k, w, sum) => // [...]
  }
)

projecting (original: apply on JoinedDataStream)

data1.join(data2).
  whereClause { case (pk, _) => pk }.
  isEqualTo { case (_, fk) => fk }.
  projecting {
    case ((pk, tx), (products, fk)) => tx -> products
  }
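
To see a few of the streaming variants combined, here is a minimal sketch; the (sensorId, temperature) readings are made-up sample data:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.extensions._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Hypothetical sample readings for illustration only
val readings = env.fromElements("a" -> 21.0, "b" -> 35.5, "a" -> 19.2)

readings
  .filterWith { case (_, temperature) => temperature > 20.0 } // drop cold readings
  .keyingBy { case (sensorId, _) => sensorId }                // key by sensor id
  .reduceWith { case ((id, t1), (_, t2)) => id -> math.max(t1, t2) } // running max per sensor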

For more information on the semantics of each method, please refer to the DataSet and DataStream API documentation.

To use this extension exclusively, you can add the following import:

import org.apache.flink.api.scala.extensions.acceptPartialFunctions

for the DataSet extensions and

import org.apache.flink.streaming.api.scala.extensions.acceptPartialFunctions

for the DataStream extensions.

The following snippet shows a minimal example of how to use these extension methods together (with the DataSet API):

object Main {
  import org.apache.flink.api.scala._
  import org.apache.flink.api.scala.extensions._

  case class Point(x: Double, y: Double)

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val ds = env.fromElements(Point(1, 2), Point(3, 4), Point(5, 6))
    ds.filterWith {
      case Point(x, _) => x > 1
    }.reduceWith {
      case (Point(x1, y1), Point(x2, y2)) => Point(x1 + x2, y1 + y2)
    }.mapWith {
      case Point(x, y) => (x, y)
    }.flatMapWith {
      case (x, y) => Seq("x" -> x, "y" -> y)
    }.groupingBy {
      case (id, value) => id
    }
  }
}
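
Note that this example only defines the transformation plan: to actually run it you would still attach a sink to the final result (for instance print(), which in the DataSet API also triggers execution).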