読者です 読者をやめる 読者になる 読者になる

SparkのPairRDDについて

Spark
スポンサーリンク

Apache Spark の ペアRDD(PairRDDFunctions)について。

ペアRDDの生成

キー/値のペア RDD に対して適用できる特別な関数がPairRDDFunctionsクラスに実装されています。この関数を使用するために タプル型の RDD が必要になってくるのですが、使用する言語によって取り扱いが違っています。

 Working with Key-Value Pairs - Spark Programming Guide

Scalaの場合

Scala でペア RDD を生成するにはmap関数でタプル型を返してRDD[(K, V)]を生成します。そして、タプル型からなる RDD に対しては暗黙的な型変換によってPairRDDFunctionsの関数が使用できますので、使用に関してあまり意識する必要はありません

val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)

 PairRDDFunctions - Scala Doc

Javaの場合

Java でペア RDD を扱うにはJavaPairRDDというクラスに明示的な変換をする必要があります。JavaPairRDDからはPairRDDFunctionsの関数が使用できます。

変換にはmap関数の代わりにmapToPairflatMapToPair関数を使用しなければいけません。これらは引数にタプル型を返すPairFunctionPairFlatMapFunctionを受け取ります。

そして、Java には組み込みのタプル型がないので、scala.Table2クラスを使ってnew Tuple2(key, val)でタプルを生成します。要素にはtuble._1()およびtuble._2()というメソッドでアクセス可能です。

JavaRDD<String> lines = sc.textFile("data.txt");
JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2(s, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);

 JavaPairRDD - Java Doc
 PairRDDFunctions - Java Doc

Pythonの場合

Python でペア RDD を生成するにはmap関数でタプル型を返す必要があります。Python は Java や Scala とは異なっており、全ての関数は RDD のベースクラスに実装されているので、ペア RDD 用の関数を実行しても RDD 内のデータ型が適切でなかった場合には実行時に失敗します。

lines = sc.textFile("data.txt")
pairs = lines.map(lambda x: (x.split(" ")[0], 1))
counts = pairs.reduceByKey(lambda a, b: a + b)

 pyspark.RDD - Python Doc

ペアRDDの変換

ペアRDD では  標準の RDD で使えるすべての変換が使えます。ただし、ペアRDD にはタプルが含まれるので、個々の要素ではなくタプルを操作するものでなければいけません。

以下は、ペアRDD に対して適用できるPairRDDFunctionsクラスに実装されている変換で、データがキー/値形式になっている利点を生かすことができます。

メソッド 引数 内容
reduceByKey (func: (V, V) => V) 同じキーの値を結合する。
(func: (V, V) => V,
numPartitions: Int)
(partitioner: Partitioner,
func: (V, V) => V)
groupByKey - 同じキーの値をグループ化して返す。
戻り型:RDD[(K, Iterable[V])]
(numPartitions: Int)
(partitioner: Partitioner)
combineByKey (createCombiner: (V) ⇒ C,
mergeValue: (C, V) ⇒ C,
mergeCombiners: (C, C) ⇒ C)
キー毎に値を結合する。
(createCombiner: (V) ⇒ C,
mergeValue: (C, V) ⇒ C,
mergeCombiners: (C, C) ⇒ C,
numPartitions: Int)
(createCombiner: (V) ⇒ C,
mergeValue: (C, V) ⇒ C,
mergeCombiners: (C, C) ⇒ C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null)
aggregateByKey
mapValues (f: V => U) 値のみに関数を適用する。
flatMapValues (f: (V) ⇒ TraversableOnce[U]) イテレータを返す関数を値に適用する。
keys - キーのみからなる`RDD[K]`を返す。
values - 値のみからなる`RDD[V]`を返す。
countApproxDistinctByKey (relativeSD: Double = 0.05) キー毎に値の種類数を返す。
戻り型:RDD[(K, Long)]
(relativeSD: Double,
numPartitions: Int)
(relativeSD: Double,
partitioner: Partitioner)
(p: Int, sp: Int,
partitioner: Partitioner)
foldByKey (zeroValue: V)
(func: (V, V) ⇒ V)
キー毎に値を畳み込む。
戻り型:RDD[(K, V)]
(zeroValue: V,
numPartitions: Int)
(func: (V, V) ⇒ V)
(zeroValue: V,
partitioner: Partitioner)
(func: (V, V) ⇒ V)
partitionBy (partitioner: Partitioner) partitionerで分割されたRDDを返す。
戻り型:RDD[(K, V)]
sampleByKey (withReplacement: Boolean,
fractions: Map[K, Double],
seed: Long = Utils.random.nextLong)
 Basic Statistics - spark.mllib
sampleByKeyExact (withReplacement: Boolean,
fractions: Map[K, Double],
seed: Long = Utils.random.nextLong)
 Basic Statistics - spark.mllib

復数のペアRDDに対する変換

メソッド 引数 内容
join (other: RDD[(K, W)]) 2つのRDDの内部結合(Inner Join)を行う。
戻り型:RDD[(K, (V, W))]
(other: RDD[(K, W)],
numPartitions: Int
(other: RDD[(K, W)],
partitioner: Partitioner)
leftOuterJoin (other: RDD[(K, W)]) 2つのRDDの外部結合(Left Outer Join)を行う。
戻り型:RDD[(K, (V, Option[W]))]
(other: RDD[(K, W)],
numPartitions: Int)
(other: RDD[(K, W)],
partitioner: Partitioner)
rightOuterJoin (other: RDD[(K, W)]) 2つのRDDの外部結合(Right Outer Join)を行う。
戻り型:RDD[(K, (V, Option[W]))]
(other: RDD[(K, W)],
numPartitions: Int)
(other: RDD[(K, W)],
partitioner: Partitioner)
fullOuterJoin (other: RDD[(K, W)]) 2つのRDDの完全外部結合(Full Outer Join)を行う。
戻り型:RDD[(K, (Option[V], Option[W]))]
(other: RDD[(K, W)],
numPartitions: Int)
(other: RDD[(K, W)],
partitioner: Partitioner)
subtractByKey (other: RDD[(K, W)]) 他のRDDに含まれているキーを持つ要素を取り除く。
戻り型:RDD[(K, V)]
(other: RDD[(K, W)],
numPartitions: Int)
(other: RDD[(K, W)],
partitioner: Partitioner)
cogroup (other: RDD[(K, W)]) 両方のRDDの同じキーを持つデータをグループ化する。
同名メソッドで引数に他のRDDを3つまで指定可能。
戻り型:RDD[(K, (Iterable[V], Iterable[W]))]
(other: RDD[(K, W)],
numPartitions: Int)
(other: RDD[(K, W)],
partitioner: Partitioner)
groupWith (other: RDD[(K, W)]) cogroup のエイリアス。他のRDDを3つまで指定可能。
戻り型:RDD[(K, (Iterable[V], Iterable[W]))]

ペアRDDのアクション

変換と同様に、 標準の RDD で使えるすべてのアクションが使えます。

以下は、ペアRDD に対して適用できるPairRDDFunctionsクラスに実装されているアクションで、データがキー/値形式になっている利点を生かすことができます。

メソッド 引数 内容
countByKey - キー毎に要素数をカウントする。
戻り型:Map[K, Long]
countByKeyApprox (timeout: Long,
confidence: Double = 0.95)
タイムアウト付きの countByKey。
戻り型:PartialResult[Map[K, BoundedDouble]]
collectAsMap - 要素をMapに格納し直す。同じキーが復数ある場合は、
そのうちの1つだけ返される。戻り型:Map[K, V]
lookup (key: K) 指定したキーに関連づけられたすべての値を返す。
戻り型:Seq[V]
reduceByKeyLocally (func: (V, V) ⇒ V) キー毎に値の演算を行う。reduceByKeyToDriver は非推奨になり、reduceByKeyLocallyのエイリアスとなっている。
戻り型:Map[K, V]