Sparkアプリケーションの実行方法(spark-submit)

スポンサーリンク

Spark アプリケーションの実行コマンドである spark-submit の使用方法と実行のサンプルプログラムです。

spark-submitコマンド

spark-submitの基本構文は以下の通りです。

$ ${SPARK_HOME}/bin/spark-submit \
  --master <master-url> \
  --class <main-class>
  --name <name>
  ... # other options
  <application-jar> \
  [application-arguments]

 Submitting Applications - Spark Documentation

次のような起動オプションがあります。全てのオプションはspark-submit --helpで確認できます。

Options 内容
--master 動作モード。省略した場合は local モード。詳細は下記 Master URL の表を参照。
--class Java または Scala の main メソッドが実装されているアプリケーションのクラス。
--name アプリケーション名。Spark の Web UI に表示される。
--deploy-mode ドライバの起動先を Client か Cluster かを指定する。デフォルトは Client。
YARN の場合は--masterで指定するため通常使用しない。
--jars ドライバやエグゼキュータのクラスパスに追加したい JAR ファイルを指定。
通常は uber-JAR を使用するのが一般的。(カンマ区切り)
--files エグゼキュータにファイルを配布(カンマ区切り)
--conf SparkConf の設定オプションをプロパティ名=値の形式で指定する。
spark.*で始まるプロパティ名のみで、Java オプションや環境変数とは違う。
--properties-file SparkConf の設定が記述されたファイルのパスを指定。
デフォルトは${SPARK_HOME}/conf/spark-defaults.conf ※ テンプレート有
--driver-cores ドライバに割り当てるコア数(デフォルト1)
--driver-memory ドライバに割り当てるメモリ量(デフォルト1024M)
--num-executors 起動するエグゼキュータの数(デフォルト2)
--executor-cores エグゼキュータのコア数(デフォルト1)
--executor-memory エグゼキュータに割り当てるメモリ量(デフォルト1024M)

 Job Scheduling - Spark 1.5.2 Documentation

--masterの Master URL には次のような値が指定できます。

Master URL 内容
local ローカルでエグゼキュータに1 つのスレッドを割り当てる。
local[n] ローカルでエグゼキュータに n 個のスレッドを割り当てる。
local[*] ローカルでエグゼキュータにクライアントPCのコア数と同数のスレッドを割り当てる。
yarn-client YARN クラスタに接続。ドライバプログラムがクライアント上で動作。
 Running Spark on YARN
yarn-cluster YARN クラスタに接続。ドライバプログラムがクラスタ内の NodeManager 上で動作。
 Running Spark on YARN
spark://HOST:PORT Standalone クラスタに接続。デフォルトは7077ポート。
 Spark Standalone Mode
mesos://HOST:PORT Mesos クラスタに接続。デフォルトは5050ポート。ZooKeeper を使用してる場合はmesos://zk://...になる。 Running Spark on Mesos

Sparkアプリケーションの実行

 Quick Start にあるサンプルプログラムを Scala、Java、Python それぞれのパターンで実行します。--classの指定を分かり易くするためにパッケージ名を追加したことと、ファイルのパスを引数で受け取るようにしたこと以外は同じです。

Scala

ルートディレクトリを作成してソースコードを配置するディレクトリを作成します。パッケージ名はcom.example.sparkとします。

$ mkdir scala-sample
$ cd scala-sample
$ mkdir -p src/main/scala/com/example/spark
$ vi src/main/scala/com/example/spark/SimpleApp.scala

--classで指定するクラスであるSimpleApp.scalaを作成ます。

package com.example.spark

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = args(0) + "/README.md"
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}

sbt でアプリケーションをビルドするためにsimple.sbtというファイルをプロジェクトのルートディレクトリ直下に作成します。

name := "Simple Project"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.2"

ディレクトリ構造は以下の通りです。

$ tree .
.
├── simple.sbt
└── src
    └── main
        └── scala
            └── com
                └── example
                    └── spark
                        └── SimpleApp.scala

6 directories, 2 files

sbt でビルドして作成された JAR ファイルを使用して実行してください。

$ sbt package
$ spark-submit \
  --master local \
  --class com.example.spark.SimpleApp \
  target/scala-2.10/simple-project_2.10-1.0.jar \
  /usr/local/Cellar/apache-spark/1.5.2
...
Lines with a: 60, Lines with b: 29

Java

基本は Scala と同じです。SimpleApp.javaを作成します。

package com.example.spark

import org.apache.spark.api.java.*;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;

public class SimpleApp {
  public static void main(String[] args) {
    String logFile = args[0] + "/README.md";
    SparkConf conf = new SparkConf().setAppName("Simple Application");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<String> logData = sc.textFile(logFile).cache();

    long numAs = logData.filter(new Function<String, Boolean>() {
      public Boolean call(String s) { return s.contains("a"); }
    }).count();

    long numBs = logData.filter(new Function<String, Boolean>() {
      public Boolean call(String s) { return s.contains("b"); }
    }).count();

    System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
  }
}

Maven でビルドするためpom.xmlをルートディレクトリ直下に作成します。

<project>
  <groupId>edu.berkeley</groupId>
  <artifactId>simple-project</artifactId>
  <modelVersion>4.0.0</modelVersion>
  <name>Simple Project</name>
  <packaging>jar</packaging>
  <version>1.0</version>
  <dependencies>
    <dependency> <!-- Spark dependency -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.5.2</version>
    </dependency>
  </dependencies>
</project>

ディレクトリ構造は以下の通りです。

$ tree .
.
├── pom.xml
└── src
    └── main
        └── java
            └── com
                └── example
                    └── spark
                        └── SimpleApp.java

6 directories, 2 files

Maven でビルドして作成された JAR ファイルを使用して実行してください。

$ mvn package
$ spark-submit \
  --master local \
  --class com.example.spark.SimpleApp \
  target/simple-project-1.0.jar \
  /usr/local/Cellar/apache-spark/1.5.2
...
Lines with a: 60, Lines with b: 29

Python

SimpleApp.pyを作成します。

import sys
from pyspark import SparkContext

logFile = sys.argv[1] + "/README.md"
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()

print("Lines with a: %i, lines with b: %i" % (numAs, numBs))

Python の場合はビルドする必要がないのでスクリプトファイルをのまま指定してください。

$ spark-submit \
  --master local \
  SimpleApp.py \
  /usr/local/Cellar/apache-spark/1.5.2
...
Lines with a: 60, Lines with b: 29