Apache Spark の ペアRDD(PairRDDFunctions)について。
ペアRDDの生成
キー/値のペア RDD に対して適用できる特別な関数がPairRDDFunctions
クラスに実装されています。この関数を使用するために タプル型の RDD が必要になってくるのですが、使用する言語によって取り扱いが違っています。
Working with Key-Value Pairs - Spark Programming Guide
Scalaの場合
Scala でペア RDD を生成するにはmap
関数でタプル型を返してRDD[(K, V)]
を生成します。そして、タプル型からなる RDD に対しては暗黙的な型変換によってPairRDDFunctions
の関数が使用できますので、使用に関してあまり意識する必要はありません
val lines = sc.textFile("data.txt") val pairs = lines.map(s => (s, 1)) val counts = pairs.reduceByKey((a, b) => a + b)
Javaの場合
Java でペア RDD を扱うにはJavaPairRDD
というクラスに明示的な変換をする必要があります。JavaPairRDD
からはPairRDDFunctions
の関数が使用できます。
変換にはmap
関数の代わりにmapToPair
やflatMapToPair
関数を使用しなければいけません。これらは引数にタプル型を返すPairFunction
やPairFlatMapFunction
を受け取ります。
そして、Java には組み込みのタプル型がないので、scala.Table2
クラスを使ってnew Tuple2(key, val)
でタプルを生成します。要素にはtuble._1()
およびtuble._2()
というメソッドでアクセス可能です。
JavaRDD<String> lines = sc.textFile("data.txt"); JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2(s, 1)); JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);
JavaPairRDD - Java Doc
PairRDDFunctions - Java Doc
Pythonの場合
Python でペア RDD を生成するにはmap
関数でタプル型を返す必要があります。Python は Java や Scala とは異なっており、全ての関数は RDD のベースクラスに実装されているので、ペア RDD 用の関数を実行しても RDD 内のデータ型が適切でなかった場合には実行時に失敗します。
lines = sc.textFile("data.txt") pairs = lines.map(lambda x: (x.split(" ")[0], 1)) counts = pairs.reduceByKey(lambda a, b: a + b)
ペアRDDの変換
ペアRDD では 標準の RDD で使えるすべての変換が使えます。ただし、ペアRDD にはタプルが含まれるので、個々の要素ではなくタプルを操作するものでなければいけません。
以下は、ペアRDD に対して適用できるPairRDDFunctions
クラスに実装されている変換で、データがキー/値形式になっている利点を生かすことができます。
メソッド | 引数 | 内容 |
---|---|---|
reduceByKey | (func: (V, V) => V) | 同じキーの値を結合する。 |
(func: (V, V) => V, numPartitions: Int) | ||
(partitioner: Partitioner, func: (V, V) => V) | ||
groupByKey | - | 同じキーの値をグループ化して返す。 戻り型: RDD[(K, Iterable[V])] |
(numPartitions: Int) | ||
(partitioner: Partitioner) | ||
combineByKey | (createCombiner: (V) ⇒ C, mergeValue: (C, V) ⇒ C, mergeCombiners: (C, C) ⇒ C) |
キー毎に値を結合する。 |
(createCombiner: (V) ⇒ C, mergeValue: (C, V) ⇒ C, mergeCombiners: (C, C) ⇒ C, numPartitions: Int) | ||
(createCombiner: (V) ⇒ C, mergeValue: (C, V) ⇒ C, mergeCombiners: (C, C) ⇒ C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null) | ||
aggregateByKey | ||
mapValues | (f: V => U) | 値のみに関数を適用する。 |
flatMapValues | (f: (V) ⇒ TraversableOnce[U]) | イテレータを返す関数を値に適用する。 |
keys | - | キーのみからなる`RDD[K]`を返す。 |
values | - | 値のみからなる`RDD[V]`を返す。 |
countApproxDistinctByKey | (relativeSD: Double = 0.05) | キー毎に値の種類数を返す。 戻り型: RDD[(K, Long)] |
(relativeSD: Double, numPartitions: Int) | ||
(relativeSD: Double, partitioner: Partitioner) | ||
(p: Int, sp: Int, partitioner: Partitioner) | ||
foldByKey | (zeroValue: V) (func: (V, V) ⇒ V) |
キー毎に値を畳み込む。 戻り型: RDD[(K, V)] |
(zeroValue: V, numPartitions: Int) (func: (V, V) ⇒ V) | ||
(zeroValue: V, partitioner: Partitioner) (func: (V, V) ⇒ V) | ||
partitionBy | (partitioner: Partitioner) | partitionerで分割されたRDDを返す。 戻り型: RDD[(K, V)] |
sampleByKey | (withReplacement: Boolean, fractions: Map[K, Double], seed: Long = Utils.random.nextLong) |
Basic Statistics - spark.mllib |
sampleByKeyExact | (withReplacement: Boolean, fractions: Map[K, Double], seed: Long = Utils.random.nextLong) |
Basic Statistics - spark.mllib |
復数のペアRDDに対する変換
メソッド | 引数 | 内容 |
---|---|---|
join | (other: RDD[(K, W)]) | 2つのRDDの内部結合(Inner Join)を行う。 戻り型: RDD[(K, (V, W))] |
(other: RDD[(K, W)], numPartitions: Int | ||
(other: RDD[(K, W)], partitioner: Partitioner) | ||
leftOuterJoin | (other: RDD[(K, W)]) | 2つのRDDの外部結合(Left Outer Join)を行う。 戻り型: RDD[(K, (V, Option[W]))] |
(other: RDD[(K, W)], numPartitions: Int) | ||
(other: RDD[(K, W)], partitioner: Partitioner) | ||
rightOuterJoin | (other: RDD[(K, W)]) | 2つのRDDの外部結合(Right Outer Join)を行う。 戻り型: RDD[(K, (V, Option[W]))] |
(other: RDD[(K, W)], numPartitions: Int) | ||
(other: RDD[(K, W)], partitioner: Partitioner) | ||
fullOuterJoin | (other: RDD[(K, W)]) | 2つのRDDの完全外部結合(Full Outer Join)を行う。 戻り型: RDD[(K, (Option[V], Option[W]))] |
(other: RDD[(K, W)], numPartitions: Int) | ||
(other: RDD[(K, W)], partitioner: Partitioner) | ||
subtractByKey | (other: RDD[(K, W)]) | 他のRDDに含まれているキーを持つ要素を取り除く。 戻り型: RDD[(K, V)] |
(other: RDD[(K, W)], numPartitions: Int) | ||
(other: RDD[(K, W)], partitioner: Partitioner) | ||
cogroup | (other: RDD[(K, W)]) | 両方のRDDの同じキーを持つデータをグループ化する。 同名メソッドで引数に他のRDDを3つまで指定可能。 戻り型: RDD[(K, (Iterable[V], Iterable[W]))] |
(other: RDD[(K, W)], numPartitions: Int) | ||
(other: RDD[(K, W)], partitioner: Partitioner) | ||
groupWith | (other: RDD[(K, W)]) | cogroup のエイリアス。他のRDDを3つまで指定可能。 戻り型: RDD[(K, (Iterable[V], Iterable[W]))] |
ペアRDDのアクション
変換と同様に、 標準の RDD で使えるすべてのアクションが使えます。
以下は、ペアRDD に対して適用できるPairRDDFunctions
クラスに実装されているアクションで、データがキー/値形式になっている利点を生かすことができます。
メソッド | 引数 | 内容 |
---|---|---|
countByKey | - | キー毎に要素数をカウントする。 戻り型: Map[K, Long] |
countByKeyApprox | (timeout: Long, confidence: Double = 0.95) |
タイムアウト付きの countByKey。 戻り型: PartialResult[Map[K, BoundedDouble]] |
collectAsMap | - | 要素をMapに格納し直す。同じキーが復数ある場合は、 そのうちの1つだけ返される。戻り型: Map[K, V] |
lookup | (key: K) | 指定したキーに関連づけられたすべての値を返す。 戻り型: Seq[V] |
reduceByKeyLocally | (func: (V, V) ⇒ V) | キー毎に値の演算を行う。reduceByKeyToDriver は非推奨になり、reduceByKeyLocallyのエイリアスとなっている。 戻り型: Map[K, V] |