Apache Spark の RDD について。
RDDの基本
耐障害性分散データセットであるRDD(Resilient Distributed Dataset)の特徴は以下の通りです。
- イミュータブルなオブジェクトの分散コレクションである。
- 復数のパーティションに分割されクラスタの各ノード上で処理される。
- 生成や変換が遅延評価される。
RDD は復数のマシンから構成されるクラスタ上での分散処理を前提として設計されており、内部的にはパーティションに分割されています。Spark ではこのパーティションが分散処理の単位となり、パーティションごとに復数のマシンで処理することによって、単一のマシンでは処理しきれない大量のデータを扱うことができるのです。
Scala Doc - org.apache.spark.rdd.RDD
Java Doc - org.apache.spark.api.java.JavaRDD
Python Doc - pyspark.RDD
RDDの生成
2つの方法で RDD を生成することができます。
1つは外部のデータセットをロードする方法です。
JavaRDD<String> distFile = sc.textFile("data.txt"); // Java val distFile = sc.textFile("data.txt") // Scala
もう1つはオブジェクトのコレクションをSparkContext#parallelize
メソッドに渡して分散させることです。Java の場合はJavaSparkContext#parallelize
になります。一度生成すると分散データセットは並列して操作することが可能です。
JavaRDD<String> distData = sc.parallelize(Arrays.asList("hoge", "fuga", "bar")); // Java val distData = sc.parallelize(Array("hoge", "fuga", "bar")) // Scala
RDDの操作
RDD には変換とアクションという2種類の処理が適用できます。
変換は既存の RDD を変化させるわけではなく「 RDD を加工して新しい RDD を得る」操作であり、アクションは「 RDD の内容を元に目的の結果を得る」操作です。 変換とアクションを判別する簡単な方法は、返される型を確認する方法です。変換は RDD を返しますが、アクションはそれ以外のデータ型を返します。
変換とアクションが区別されているのは、RDD に対する Spark 演算処理のやり方が異なるためです。 変換は定義された新しい RDD を返すだけであり、Spark はその処理を遅延評価形式でしか実行しません。 すなわち、定義された RDD は初めてアクションで使用された時点で生成処理が行われることになります。
変換(Transformations)
変換は既存の RDD を変化させるわけではなく、新しい RDD を得る操作です。
メソッド | 備考 |
---|---|
filter | 条件を満たす要素のみ抽出する。 |
map | 各要素に関数を適用して別の要素に変換する。 |
flatMap | 各要素に関数を適用して別の要素に変換する。関数の返り値はイテレータ。 |
distinct | 重複する要素を取り除く。 |
アクション(Actions)
メソッド | 備考 |
---|---|
collect | 要素を配列に格納して返す。 |
first | 先頭の要素を返す。 |
count | 要素数を返す。 |
max | 最大の値を返す。 |
min | 最小の値を返す。 |
take | 先頭n個の要素を配列に格納して返す。 |
collect
は RDD 全体を取り出すため、データセット全体がマシンのメモリに収まらなくてはいけません。多くの場合、RDD は大きすぎるので collect を使用するべきではなく、HDFS や S3 にデータを書き出すのが一般的です。
重要なのは、新しいアクションを呼ぶたびに、RDD 全体が最初から計算し直される点です。この非効率性を回避するために中間結果を永続化しておくことができます。
RDD アクション - Spark Documentation
遅延評価
RDD に対する変換の評価は遅延させられます。すなわち、Spark はアクションが実行されるまで、変換の処理を始めません。また、データのロード(RDD の生成)も変換と同様に遅延評価され、必要になる(アクションが実行される)までデータはロードされません。
もう少しわかりやすく言うと、変換のfilter
を呼んだ場合、Spark はその操作を実行する変わりに、操作が要求されたことを示すメタデータを内部的に記録します。RDD は特定のデータを格納しているものと考えるのではなく、変換を通じて構成されるデータの計算方法を含むものと考えるのが良いでしょう。
例えば次のようにファイルを読み込んでfilter
の変換を実行した後にfirst
のアクションを呼ぶ処理があった場合で考えてみたいと思います。
val input = sc.textFile("data.txt") val result = input.filter(a => a.contains("Spark")) result.first()
通常であれば初めにsc.textFile
でファイルを読み込んだ時点で、全ての行が保存されたinput
オブジェクトが生成され、次にfilter
メソッドでフィルタリングされた結果としてresult
オブジェクトが生成されます。
そうするとinput
オブジェクトで使用された多くのストレージ領域が無駄になってしまいますが、遅延評価の場合は全体を見渡し、結果として必要なデータだけを計算することが可能になります。
そのためfirst
アクションが呼ばれた場合は、ファイルの読み込みは条件にマッチする最初の行を見つけるまでになるので、ファイル全体を読み込むこともありません。
これらは直感に反することですが、遅延評価というアプローチもビッグデータを扱っていれば納得できますし、プログラムも複雑な処理を書くことなく、シンプルな操作群として構成することが可能です。
永続化(キャッシング)
Spark の RDD は遅延評価されるため、アクションが呼ばれるたびにデータをロードして計算し直されるのがデフォルトです。 これでは非効率なので、同じ RDD を何回もアクションで再利用したいのであれば、永続化させることで複数回計算されることを防ぐことができます。
永続化メソッド
RDD を永続化するには、RDD に対してpersist
メソッドを呼び出します。引数には目的に応じて永続化レベルを設定できます。ただし、一度設定したら永続化を解除するまで変更することは出来ません。
永続化レベルはorg.apache.spark.storage.StorageLevel
オブジェクトの定数から選択でき、以下の通りです。
レプリケーションは基本1台のみですが、2台のマシンにレプリケーションしたい場合は各永続化レベルの末尾に_2
を付けて下さい。
永続化レベル | 永続化先 | シリアライズ |
---|---|---|
DISK_ONLY | ディスクのみに格納 | 有 |
MEMORY_ONLY | メモリのみに格納 | 無 |
MEMORY_ONLY_SER | メモリのみに格納 | 有 |
MEMORY_AND_DISK | メモリからあふれた分はディスクに格納 | 無 |
MEMORY_AND_DISK_SER | メモリからあふれた分はディスクに格納 | 有 |
OFF_HEAP | Tachyonに格納 | 有 |
また、RDD にはcache
メソッドが用意されていますが、これはpersist
にStorageLevel.MEMORY_ONLY
を設定して呼び出すのと同様です。
永続化のタイミング
RDD に対するpersist
は最初のアクションよりも前に呼ぶ必要があります。
persist
メソッドやcache
メソッドは、RDD を永続化することを宣言しているだけで遅延評価されるためです。
val result = input.map(x =>x * x) result.persist(StorageLevel.MEMORY_AND_DISK) println(result.count()) println(result.collect().mkString(",")) result.unpersist()
障害が発生した場合
RDD を計算するノードは、自分のパーティションを保存することになり、データを永続化したノードに障害があった場合、Spark は失われたデータのパーティションのみを再度実体化するようにタスクがスケジューリングされます。 そして当該パーティションの処理を含むタスクが割り当てられたエグゼキュータは再度永続化を試みます。このように RDD をいつでも再計算できることが、RDD が Resilient である理由でもあります。
また、ノードに障害があっても速度の低下を起こさないように、データを復数のノードに複製(レプリケーションを2台)しておくこともできます。
容量を超える場合
メモリに収まらないデータをキャッシュしようとした場合、Spark は Least Recently Used(LRU)キャッシュポリシーを使い、古いパーティションを自動的に退避させます。 メモリのみを使用する永続化レベルの場合、当該パーティションは次にアクセスされた時には計算し直されることになります。 ただし、メモリとディスクを使用する永続化レベルであれば、そのパーティションはディスクに書き出されます。 すなわちどちらの場合でも、ユーザーは Spark が大量にキャッシュしすぎてジョブが失敗することを心配する必要はありません。
とはいえ、永続化は便利な一方でエグゼキュータのメモリやディスクを消費するので、利用しない RDD はunpersist
メソッドを呼び出して永続化を解除するようにしてください。