読者です 読者をやめる 読者になる 読者になる

SparkのRDDについて

スポンサーリンク

Apache Spark の RDD について。

RDDの基本

耐障害性分散データセットであるRDD(Resilient Distributed Dataset)の特徴は以下の通りです。

  • イミュータブルなオブジェクトの分散コレクションである。
  • 復数のパーティションに分割されクラスタの各ノード上で処理される。
  • 生成や変換が遅延評価される。

RDD は復数のマシンから構成されるクラスタ上での分散処理を前提として設計されており、内部的にはパーティションに分割されています。Spark ではこのパーティションが分散処理の単位となり、パーティションごとに復数のマシンで処理することによって、単一のマシンでは処理しきれない大量のデータを扱うことができるのです。

 Scala Doc - org.apache.spark.rdd.RDD
 Java Doc - org.apache.spark.api.java.JavaRDD
 Python Doc - pyspark.RDD

RDDの生成

2つの方法で RDD を生成することができます。

1つは外部のデータセットをロードする方法です。

JavaRDD<String> distFile = sc.textFile("data.txt");  // Java
val distFile = sc.textFile("data.txt")  // Scala

もう1つはオブジェクトのコレクションをSparkContext#parallelizeメソッドに渡して分散させることです。Java の場合はJavaSparkContext#parallelizeになります。一度生成すると分散データセットは並列して操作することが可能です。

JavaRDD<String> distData = sc.parallelize(Arrays.asList("hoge", "fuga", "bar"));  // Java
val distData = sc.parallelize(Array("hoge", "fuga", "bar"))  // Scala

RDDの操作

RDD には変換アクションという2種類の処理が適用できます。

変換は既存の RDD を変化させるわけではなく「 RDD を加工して新しい RDD を得る」操作であり、アクションは「 RDD の内容を元に目的の結果を得る」操作です。 変換とアクションを判別する簡単な方法は、返される型を確認する方法です。変換は RDD を返しますが、アクションはそれ以外のデータ型を返します。

変換とアクションが区別されているのは、RDD に対する Spark 演算処理のやり方が異なるためです。 変換は定義された新しい RDD を返すだけであり、Spark はその処理を遅延評価形式でしか実行しません。 すなわち、定義された RDD は初めてアクションで使用された時点で生成処理が行われることになります。

変換(Transformations)

変換は既存の RDD を変化させるわけではなく、新しい RDD を得る操作です。

メソッド 備考
filter 条件を満たす要素のみ抽出する。
map 各要素に関数を適用して別の要素に変換する。
flatMap 各要素に関数を適用して別の要素に変換する。関数の返り値はイテレータ。
distinct 重複する要素を取り除く。

 RDD 変換 - Spark Documentation

アクション(Actions)

メソッド 備考
collect 要素を配列に格納して返す。
first 先頭の要素を返す。
count 要素数を返す。
max 最大の値を返す。
min 最小の値を返す。
take 先頭n個の要素を配列に格納して返す。

collectは RDD 全体を取り出すため、データセット全体がマシンのメモリに収まらなくてはいけません。多くの場合、RDD は大きすぎるので collect を使用するべきではなく、HDFS や S3 にデータを書き出すのが一般的です。

重要なのは、新しいアクションを呼ぶたびに、RDD 全体が最初から計算し直される点です。この非効率性を回避するために中間結果を永続化しておくことができます。

 RDD アクション - Spark Documentation

遅延評価

RDD に対する変換の評価は遅延させられます。すなわち、Spark はアクションが実行されるまで、変換の処理を始めません。また、データのロード(RDD の生成)も変換と同様に遅延評価され、必要になる(アクションが実行される)までデータはロードされません。

もう少しわかりやすく言うと、変換のfilterを呼んだ場合、Spark はその操作を実行する変わりに、操作が要求されたことを示すメタデータを内部的に記録します。RDD は特定のデータを格納しているものと考えるのではなく、変換を通じて構成されるデータの計算方法を含むものと考えるのが良いでしょう。

例えば次のようにファイルを読み込んでfilterの変換を実行した後にfirstのアクションを呼ぶ処理があった場合で考えてみたいと思います。

val input = sc.textFile("data.txt")
val result = input.filter(a => a.contains("Spark"))
result.first()

通常であれば初めにsc.textFileでファイルを読み込んだ時点で、全ての行が保存されたinputオブジェクトが生成され、次にfilterメソッドでフィルタリングされた結果としてresultオブジェクトが生成されます。 そうするとinputオブジェクトで使用された多くのストレージ領域が無駄になってしまいますが、遅延評価の場合は全体を見渡し、結果として必要なデータだけを計算することが可能になります。 そのためfirstアクションが呼ばれた場合は、ファイルの読み込みは条件にマッチする最初の行を見つけるまでになるので、ファイル全体を読み込むこともありません。

これらは直感に反することですが、遅延評価というアプローチもビッグデータを扱っていれば納得できますし、プログラムも複雑な処理を書くことなく、シンプルな操作群として構成することが可能です。

永続化(キャッシング)

Spark の RDD は遅延評価されるため、アクションが呼ばれるたびにデータをロードして計算し直されるのがデフォルトです。 これでは非効率なので、同じ RDD を何回もアクションで再利用したいのであれば、永続化させることで複数回計算されることを防ぐことができます。

永続化メソッド

RDD を永続化するには、RDD に対してpersistメソッドを呼び出します。引数には目的に応じて永続化レベルを設定できます。ただし、一度設定したら永続化を解除するまで変更することは出来ません。 永続化レベルはorg.apache.spark.storage.StorageLevelオブジェクトの定数から選択でき、以下の通りです。 レプリケーションは基本1台のみですが、2台のマシンにレプリケーションしたい場合は各永続化レベルの末尾に_2を付けて下さい。

永続化レベル 永続化先 シリアライズ
DISK_ONLY ディスクのみに格納
MEMORY_ONLY メモリのみに格納
MEMORY_ONLY_SER メモリのみに格納
MEMORY_AND_DISK メモリからあふれた分はディスクに格納
MEMORY_AND_DISK_SER メモリからあふれた分はディスクに格納
OFF_HEAP  Tachyonに格納

また、RDD にはcacheメソッドが用意されていますが、これはpersistStorageLevel.MEMORY_ONLYを設定して呼び出すのと同様です。

永続化のタイミング

RDD に対するpersistは最初のアクションよりも前に呼ぶ必要があります。 persistメソッドやcacheメソッドは、RDD を永続化することを宣言しているだけで遅延評価されるためです。

val result = input.map(x =>x * x)
result.persist(StorageLevel.MEMORY_AND_DISK)
println(result.count())
println(result.collect().mkString(","))
result.unpersist()

障害が発生した場合

RDD を計算するノードは、自分のパーティションを保存することになり、データを永続化したノードに障害があった場合、Spark は失われたデータのパーティションのみを再度実体化するようにタスクがスケジューリングされます。 そして当該パーティションの処理を含むタスクが割り当てられたエグゼキュータは再度永続化を試みます。このように RDD をいつでも再計算できることが、RDD が Resilient である理由でもあります。

また、ノードに障害があっても速度の低下を起こさないように、データを復数のノードに複製(レプリケーションを2台)しておくこともできます。

容量を超える場合

メモリに収まらないデータをキャッシュしようとした場合、Spark は Least Recently Used(LRU)キャッシュポリシーを使い、古いパーティションを自動的に退避させます。 メモリのみを使用する永続化レベルの場合、当該パーティションは次にアクセスされた時には計算し直されることになります。 ただし、メモリとディスクを使用する永続化レベルであれば、そのパーティションはディスクに書き出されます。 すなわちどちらの場合でも、ユーザーは Spark が大量にキャッシュしすぎてジョブが失敗することを心配する必要はありません。

とはいえ、永続化は便利な一方でエグゼキュータのメモリやディスクを消費するので、利用しない RDD はunpersistメソッドを呼び出して永続化を解除するようにしてください。

 RDD 永続化 - Spark Documentation