無印吉澤

Site Reliability Engineering(SRE)、ソフトウェア開発、クラウドコンピューティングなどについて、吉澤が調べたり試したことを書いていくブログです。

Treasure Data Tech Talk 201607 レポート(古橋さんと成瀬さんの講演メモのみ)

f:id:muziyoshiz:20160724153127p:plain:w320

先週末に、Treasure Data Tech Talk に参加してきました。このイベントは毎回濃い話を聞けるので、行けるときはなるべく参加するようにしています。

今回は、古橋さんによる Digdag での YAML 利用の話と、成瀬さんによる PerfectQueue の話が特に面白かったです。以下、講演内容のメモと、公開済みのスライドです。

講演内容

DigdagはなぜYAMLなのか? (Sadayuki Furuhashi, @frsyuki)

  • Digdag とは何か?

    • Workflow automation system
    • Digdag で一番やりたいのはバッチデータ解析の自動化
  • Digdag の競合

    • OSS, Proprietary それぞれに競合がある
    • Workflow automation system は、ワークフローの定義方法によって3つに分類できる
      • プログラミング言語型:Luigi など
      • GUI型:Rundeck など
      • 定義ファイル+スクリプト型:Azkaban など
    • ワークフローの作りやすさと、カスタマイズの柔軟性のトレードオフ
  • Digdag

    • Digdag は定義ファイル+スクリプト型
    • 定義ファイル+スクリプト+俺たちのYAML
    • YAMLは便利だが、include できない、変数の埋め込みができない、(言語内DSLのように)プログラムが書けない、という欠点がある
    • Digdag では、YAML の仕様に従ったうえで、これらの欠点を克服した
  • include できる

    • YAML の仕様では、値(scalar)の前に "!" から始まる文字(タグと呼ばれる)を付与できる
    • 通常、YAMLパーサは、正規表現によるマッチでタグを決定して、自動的にタグを付与している
    • Digdag では "!include : filename" という表記を、ファイルインクルードの文法として使っている
    • !include の後ろに、" " を書く必要がある。このスペースが大事。このスペースのおかげで、通常の YAML パーサでも、キーが " "、値が filename のハッシュとして読み込める
    • ただし、この !include を複数書くと、キーが " " のハッシュが重複してしまう。Digdag の YAML パーサでは、複数の !include を書けるように、内部的にキーを UUID に書き換えている
  • 変数の埋め込みができる、プログラムが書ける

    • Java 8 は、Nashorn(ナスホーン)という JavaScript Engine を同梱している。これを使って ${} 内を評価している
    • だから Digdag は Java8 必須
  • Q) 何故YAMLをベースにした?

    • A) 比較的書きやすい、読みやすい。YAMLとして既存のプログラムから扱える。
  • Q) YAMLはグラフ構造を表現するのに適さないのでは?

    • A) YAMLはDAG、グラフを扱うわけではない。ツリーを扱っている。
  • Q) YAMLにコードを書けるとのことだが、悪さはできないのか?

    • A) 悪さできないように対策している。JavaScript はサンドボックス内でしか動作しない。そのためにJavaScriptを採用した。

PerfectQueueはいかにパーフェクトか、あるいはRubyとMySQLでジョブキューを作る試みについて (Yui Naruse, @nalsh)

  • Who is naruse

    • nkfメンテナ
    • Rubyコミッタ
    • Treasure DataではバックエンドのRubyを担当
  • そもそもジョブキューとは

    • FIFO
    • フロントエンドとバックエンドを疎結合化
  • PerfectQueue の特徴

    • MySQL で実装
    • At-least-once を優先(at-least-once と at-most-once はトレードオフの関係)
  • キューのデータ構造

CREATE TABLE `queue` (
    /* unique key (-> at most once) */
    id VARCHAR(255) NOT NULL,
    /* for FIFO's timeline */
    timeout INT NOT NULL,
    /* opaque data */
    data LONGBLOB NOT NULL,
    /* alive or finished */
    created_at INT,
    PRIMARY KEY (id)
)
  • タスクのライフサイクル

    • タスクの投入時に、timeout, created_at を現在時刻にする
    • タスクの取得時は、timeout が小さいものから優先的に取得し、timeout を 300 秒後に更新
    • タスクの実行中は、ハートビートとして、timeout を定期的に 300 秒後に更新
    • タスクの完了後は、created_at を NULL にして、一定時間保存するために timeout を 720 秒後に更新(タスクの重複を検出するため)
    • 前述の retention time を過ぎたら物理削除
  • タスクの取得、削除時の排他制御が大変

    • 排他処理のために、最初期は FOR UPDATE を使っていた
      • しかし、頻繁にデッドロックする
      • デッドロックを避けるには MySQL の気持ちになってクエリを書く必要がある
    • LOCK TABLES を使うと、テーブル全体をロックしてしまうので、SELECT にも影響
    • GET_LOCK が一番安全、でもクエリの書き方によってはデッドロックが発生した
      • ロックの delete クエリと acquire のクエリがバッティングしないように、タイミングの調整が必要(間隔を乱数で変える)
      • 調整しないと性能問題が起きた
      • さらに、ネットワーク遅延が発生すると影響大
        • GET_LOCK から RELEASE LOCK まで 3 RTT かかるので、影響大
    • queue テーブルに owner カラムを追加したうえで、FOR UPDATE を使うクエリに変更し、テーブルロックを不要にした
      • MySQL の気持ちになって書いたので安全
  • 結論

    • PerfectQueue はより完璧になった
  • Q) 何故 MySQL をキューに選んだのか?

    • A) 当時、Amazon RDS で PostgreSQL が使えなかった。RDBMS を選んだ理由は、Amazonで提供されている、フェイルオーバーがある、など。新しいジョブキューを開発したのは、ジョブキューにフェアスケジューリングの機能(Treasure Dataのサービスで必要)を付けたかったから。(古橋)

感想

Digdag での YAML の拡張(いや、標準の仕様に従っているので拡張と言うのは不適切か?)については、話としては面白かったんですが、そもそも YAML でプログラミングするのは辛そう、というのが第一の感想でした。Ansible もそんな感じのつらみがありますしね。AnsibleSpec みたいな、Digdag で書いたワークフローをテストするツールとかが、いずれ出てきたりするんでしょうか。

また、マークアップ言語として YAML にこだわる必要があるんだろうか、とも思ったのですが、じゃあ代替手段として何があるのか、と考えてみると、なかなか難しそうです。XML よりもマシな選択肢となると、いまは YAML なのかな……。HashiCorp の HCL のように、独自方式を作る方向もあったと思いますが、Digdag に独自方式を作るほどの要件はなかったんですかね。

手を動かす Spark MLlib & Word2Vec Part 2 (Wikipedia 英語版から Word2Vec モデルを作るまで)

f:id:muziyoshiz:20160626223709p:plain

このシリーズについて

実際に手を動かして Spark MLlib に慣れていこう、というシリーズです。

Spark を使うならそれなりに大きなデータを分散処理しないと面白くないと思い、Wikipedia のデータから Word2Vec のモデルを作るところまでやってみました。環境構築については Part 1 をご参照ください。

muziyoshiz.hatenablog.com

Part 2 の範囲

Wikipedia 英語版のデータから作成したコーパスを Amazon EC2 上の Spark Cluster で処理して、Word2Vec のモデルを作成するところまで。

Wikipedia 英語版のデータからコーパス作成

最終的にやったこと

Wikipedia:Database download から辿って、https://dumps.wikimedia.org/enwiki/ (HTTP) または Data dump torrents (BitTorrent) からダウンロードできます。

Wikipedia のデータは XML および SQL で公開されており、それぞれ色々ファイルがあります。今回は Wikipedia の本文からコーパスを作りたいので enwiki-latest-pages-articles.xml.bz2 をダウンロードしました。私がダウンロードした時の最新版は 2016-06-03 作成の 12.1 GB のファイルでした。

このファイルを解凍すると以下のような XML が入っています。<text> タブの中身が本文です。

  <page>
    <title>Anarchism</title>
    <ns>0</ns>
    <id>12</id>
    <revision>
      <id>721573764</id>
      <parentid>719202660</parentid>
      <timestamp>2016-05-22T19:25:12Z</timestamp>
      <contributor>
        <username>PBS-AWB</username>
        <id>11989454</id>
      </contributor>
      <comment>modification to template Cite SEP and possibly some gen fixes using [[Project:AWB|AWB]]</comment>
      <model>wikitext</model>
      <format>text/x-wiki</format>
      <text xml:space="preserve">{{Redirect2|Anarchist|Anarchists|the fictional character|Anarchist (comics)|other uses|Anarchists (disambiguation)}}
{{pp-move-indef}}
{{Use British English|date=January 2014}}
{{Anarchism sidebar}}
'''Anarchism''' is a [[political philosophy]] that advocates [[self-governance|self-governed]] societies based on voluntary institutions.
(中略)
[[Category:Far-left politics]]</text>
      <sha1>sfoc30irh5k1bj62ubt29wp1ygxark0</sha1>
    </revision>
  </page>

この XML から本文だけ取り出す方法を色々探してみたところ、wp2txt というツールがあったので、今回はこちらを使わせてもらいました。

github.com

wp2txt は Ruby で書かれており、関連する gem のインストールに苦労したという記事も見かけたのですが、以下の手順で問題なく動作しました。

% mkdir wp2txt
% cd wp2txt
% bundle init
% echo 'gem "wp2txt"' >> Gemfile
% bundle install
% mkdir output_dir
% bundle exec wp2txt -i enwiki-latest-pages-articles.xml.bz2 -o enwiki --no-heading --no-title --no-marker

今回は本文だけが必要なので、余分な出力をなるべく減らすためのオプション(--no-heading, --no-title, --no-marker)を付けました。それでも、MediaWiki のマークアップなどは残ってしまうのですが、それは Spark で除去することにします。

wp2txt の処理が終わると、enwiki ディレクトリに enwiki-latest-pages-articles.xml-<連番>.txt というファイルが作成されます。今回は合計 1,754 ファイル、18 GB でした。ちなみに、私の環境(MacBook Pro 2.2 GHz, 16GB DDR3)では、実行に19時間くらいかかりました。

つまづいたこと

Wikipedia のデータからコーパスを作る方法を色々探したのですが、英語圏も含めて、簡単にできる方法がなかなか見つかりませんでした。

wp2txt 以外で良さそうなものとしては wiki2vec というツールが、コーパスを作る機能を持っているようでした。ただ、時間の都合で今回は試せませんでした。ちなみに、この記事を書くために読み返していて気付きましたが、wiki2vec のパラメータとして minCount = 50, vectorSize = 500, windowSize = 10 という例が載っていました。次はこれで試してみるのもよいかもしれません。

github.com

その後、Word2Vec 関係の記事を探しまわるなかで、以下の記事から wp2txt の存在に気づき、今回は wp2txt を使うことにしました。

techblog.gmo-ap.jp

コーパスの、S3 へのアップロード

最終的にやったこと

Apache EC2 上に構築した Spark Cluster に Wikipedia のファイルを渡さなければいけないのですが、データサイズが大きいので、今回は S3 経由で渡しました。

Spark の textFile メソッドは gzip 圧縮されたファイルも読み込めるので、まずは先程のファイルを圧縮します。圧縮後のファイルは 6 GB になりました。

% gzip enwiki/*.txt

そして、これを S3 にアップロードします。aws s3 cp コマンドはワイルドカードが使えないので、一括アップロード時には、ディレクトリ名を指定して --recursive を指定する必要があります。

% aws s3 cp enwiki s3://my-bucket-name/ --recursive

aws コマンドがない場合は、Installing the AWS Command Line Interface - AWS Command Line Interface に従ってインストールしてください。Mac の場合は pip でインストールします。

つまづいたこと

最初は、以下のように1ファイルにまとめてアップロードしたのですが、これだとファイル読み込みが全く分散されませんでした。

% cat enwiki/*.txt > enwiki.txt
% gzip enwiki.txt
% aws s3 cp enwiki.txt.gz s3://my-bucket-name/

今回は wp2txt がファイルを複数に分割してくれていましたが、他の方法でコーパスを作った場合も、ファイルを分けてアップロードしたほうがいいですね。

spark-submit で使う jar の作成

最終的にやったこと

いままでは spark-shell で Word2Vec を実行していましたが、データ量が増えると実行時間が長くなって、EC2 インスタンスへの接続が切れてしまう可能性が出てきます。そのため、Word2Vec を実行する簡単な jar を作って、spark-submit で実行することにします。

以下がそのコードです。

Simple Word2Vec application

Feature Extraction and Transformation にあるサンプルに、以下の修正を加えています。

  • コマンドライン引数から、読み込むファイル、モデルの出力先、Word2Vecのパラメータを設定
  • repartition メソッドを使って、RDD を分割(これをしないと読み込み後の処理が分散されない)
  • split する際の区切り文字に、スペース以外の文字も含めることで、MediaWiki 記法の文字を除去
  • filter で長さが 1 の文字を除去
  • 処理の最後で、実行時間を出力

jar をビルドしたい場合は、以下のリポジトリを使ってください。

github.com

sbt assembly を実行すると target/scala-2.11 ディレクトリに word2vec-model-generator-assembly-0.0.1.jar ができます。この jar ファイルだけ master ノードに持っていけば実行できます。

つまづいたこと

最初は repartition メソッドを使わなかったのですが、その場合、ファイルの読み込みは分散しても、その後の処理が途中まで全く分散されませんでした。repartition メソッドに渡す引数は scala - Spark: Repartition strategy after reading text file - Stack Overflow を参考にしました。

MediaWiki記法を除去するルールは、ローカルマシンで enwiki-latest-pages-articles.xml-0001.txt を処理して手探りで決めたのですが、もっと良い方法がありそうです。例えば、先ほど紹介した wiki2vec は、うまく処理していそうです("Word2Vec Corpus" の節を参照)。

Amazon EC2 への Spark クラスタの構築(5台構成)

最終的にやったこと

Part 1 でもクラスタを構築しましたが、それは1回削除して、Slave の台数とスペックを増やしたクラスタを作り直しました。ちなみに、spark-ec2 destory spark-cluster でクラスタを削除できます。

大規模データで Spark MLlib を試すのは初めてなので、手間取っている間にマシンが無駄に動いている……という可能性があったので(というか実際そうなったので)、少しケチって以下のスペックで構築しました。

  • master: r3.large ($0.185/hour, 2 vCPU, 15 GB memory, 1 x 32 SSD) 1台
  • slave: m3.2xlarge ($0.616/hour, 8 vCPU, 30 GB memory, 2 x 80 SSD) 5台

構築時のコマンドは以下の通りです。

./spark-ec2 \
--key-pair=word2vec-key-pair \
--identity-file=/Users/myoshiz/.ssh/word2vec-key-pair.pem \
--region=us-west-1 \
--zone=us-west-1a \
--master-instance-type=r3.large \
--instance-type=m3.2xlarge \
--copy-aws-credentials \
--hadoop-major-version=yarn \
--slaves 5 \
launch spark-cluster > spark-submit.log 2>&1 &

構築が完了したら、前回同様に .bash_profile に環境変数 EC2_SPARK_MASTER を設定します。また、今回は環境変数 AWS_ACCESS_KEY_ID と AWS_SECRET_ACCESS_KEY も設定します。これは、Spark アプリケーションから S3 にアクセスするために必要な設定です。

export EC2_SPARK_MASTER=ec2-xxx-xxx-xxx-xxx.us-west-1.compute.amazonaws.com
export AWS_ACCESS_KEY_ID={{ IAM ユーザ "word2vec-user" のアクセスキーID }}
export AWS_SECRET_ACCESS_KEY={{ IAM ユーザ "word2vec-user" のシークレットアクセスキー}}

つまづいたこと

Part 1 でも書きましたが、--copy-aws-credential を指定しても、Spark に対してはアクセスキーID、シークレットアクセスキーが設定されません。環境変数を設定したところ、Spark(spark-shell, spark-submit)から S3 にアクセスできるようになったので、今回はこの方法で済ませました。

なお、AWS のインスタンスプロファイルを使ってアクセス権限を与えることもできると思いますが、今回は試していません。クラスタを作ったり壊したりを繰り返していたため、そのたびに AWS マネジメントコンソールをいじるのは面倒だったので……。

spark-submit の実行

最終的にやったこと

先ほど作った jar を、master にコピーします。

% scp -i ~/.ssh/word2vec-key-pair.pem word2vec-model-generator-assembly-0.0.1.jar root@${EC2_SPARK_MASTER}:/root/

そして、以下のようにコマンドを実行すると、Word2Vec アプリケーションが実行されます。ssh 接続が切れた場合のために、バックグラウンドで実行し、標準出力はファイルに出力させておきます。

$ ./spark/bin/spark-submit \
--master spark://${EC2_SPARK_MASTER}:7077 \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.driver.memory=11g \
--conf spark.akka.frameSize=1024 \
--class jp.muziyoshiz.word2vec.Word2VecModelGenerator \
word2vec-model-generator-assembly-0.0.1.jar \
s3n://my-bucket-name/enwiki/*.txt.gz \
s3n://my-bucket-name/enwiki_model \
10 20 50 > spark-submit-wp8.log 2>&1 &

Spark から S3 にアクセスする際は、aws コマンドで指定するときの s3:// ではなくて、s3n:// を指定する必要がある点に注意です。

Word2Vec のパラメータを変えて、何度か実行してみたところ、実行時間は以下のようになりました。

Pattern No. numPartition numIteration minCount vectorSize 実行時間
1 10 1 5 10 75.0 min
2 10 1 20 20 82.6 min
3 10 1 20 50 130.6 min

最初のパターン1は、vectorSize をかなり減らしたにも関わらず時間がかかり、後述するようにモデルの精度もよくありませんでした。

そのため、パターン2では minCount(単語の最小出現回数)を大きくして、vocabSize(モデルに含まれる単語数)を減らしました。その結果、vectorSize をパターン1の2倍にしたにも関わらず、実行時間は1.1倍程度に収まり、モデルの精度も若干上がりました。

最後に、他のパラメータは同じままで vectorSize のみ50に増やしたところ、vectorSizeはパターン2の2.5倍で、実行時間は1.5倍になりました。

実行時間の内訳を Spark UI で確認したところ、処理の最後に slave 1台で実行するタスク(Locality Level NODE_LOCAL のタスク)があり、これが1〜2時間かかっていました。このタスクがボトルネックになっているということは、少なくとも Word2Vec については、Spark MLlib による分散処理のメリットって、もしかしてあまり無いとか……?

ただ、今回は numIteration を 1 で固定にしましたが、精度を上げるためには numPartition と同じ 10 まで上げたほうがよいはずです。numIteration を増やせば、分散処理されるタスクが占める割合も増えるので、Spark の恩恵が得られるのではないかと思います。それはまた今度試してみます。

つまづいたこと(1):ドライバのメモリ使用量を増やさないと落ちる

最初に実行したところ、以下のエラーが発生してジョブが止まりました。

16/06/25 09:39:46 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() for one-way message.
java.lang.IllegalStateException: unread block data

Spark UI の Environment タブで、Spark Properties を確認したところ、spark.executor.memory は m3.2xlarge(メモリ 30GB)に合わせて 26837m に指定されていたのですが、spark.driver.memory のほうは何も指定されていませんでした。spark.driver.memory のデフォルトは 1g です。

これを r3.large のメモリ 15GB から 4GB を引いた 11GB(11g)に指定したところ、このエラーは出なくなりました。また、Environment タブで、spark.driver.memory が指定されていることを確認できました。

つまづいたこと(2):vocabSize*vectorSize が大きすぎると落ちる

numPartition numIteration minCount vectorSize
10 1 5 100

Spark MLlib の vectorSize のデフォルト値は 100 なので、最初はこの値を使っていました。しかし、上記のパラメータの組み合わせで実行したところ、Stage 1 の処理の途中で以下のエラーが出て、タスクが止まってしまいました。

Exception in thread "main" java.lang.RuntimeException: Please increase minCount or decrease vectorSize in Word2Vec to avoid an OOM. You are highly recommended to make your vocabSize*vectorSize, which is 3856720*100 for now, less than `Int.MaxValue/8`.
        at org.apache.spark.mllib.feature.Word2Vec.fit(Word2Vec.scala:319)
(スタックトレースは省略)

Int.MaxValue/8 = 268435455 です。つまり、vocabSize(単語数)*vectorSize がこの上限を大幅に超えていることが原因のようです。Int の最大値の8分の1ってなんでまた……。

とにかく、単語数を減らすか、vectorSize を減らす必要があることがわかりました。そのため、これ以降のテストでは vectorSize を減らすパターン(パターン1)と、minCount を増やして単語数を減らすパターン(パターン2〜3)を試しました。

つまづいたこと(3):モデルのサイズが大きすぎると akka のフレームサイズ上限を超えて落ちる

Word2Vec.fit() の最後、生成された Word2Vec モデルを parquet 形式で出力するところで、以下のエラーが出て落ちました。

16/06/25 11:28:06 ERROR InsertIntoHadoopFsRelation: Aborting job.
org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 1793:0 was 213710572 bytes, which exceeds max allowed: spark.akka.frameSize (134217728 bytes) - reserved (204800 bytes). Consider increasing spark.akka.frameSize or using broadcast variables for large values.

spark.akka.frameSize のデフォルトは 128(単位は MB)で、タスクのサイズがこれを超えると落ちるようです。小さいデータセットでは出なかったエラーなのですが、Wikipedia 規模になると出るようです。1〜2時間待ったあとで、最後の最後にこのエラーで落ちると、非常に(精神的にも金銭的にも)痛いです……。

設定可能な上限値は調べても分かりませんでしたが、ひとまず --conf spark.akka.frameSize=1024 を指定して 1GB にしたところ、Word2Vec モデルの出力まで成功しました。

ローカルマシン上での Word2Vec モデルの利用

最終的にやったこと

先ほどの spark-submit の実行により、Word2Vec モデルが S3 にアップロードされました。このモデルをローカルマシンにダウンロードして使ってみます。

% aws s3 cp s3://my-bucket-name/enwiki_model ./enwiki_model --recursive

以下のコマンドで spark-shell を起動します。Part 1 で使ったオプションに加えて、--conf spark.kryoserializer.buffer.max=1g を指定しています。

% spark-shell --master local \
--conf spark.driver.memory=5g \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.kryoserializer.buffer.max=1g

そして、spark-shell のプロンプトで以下を実行し、Word2Vec モデルをロードします。

import org.apache.spark._
import org.apache.spark.rdd._
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}

val model = Word2VecModel.load(sc, "enwiki_model")

また、今回は Apache Spark 入門に書かれたメソッドを使って、関係性を加味した推測を行ってみます。有名な例で言うと、「"king" に対する "kings" は、"queen" に対する何か?」という関係を、Word2Vecモデルから推測することができます。

以下は、Apache Spark 入門の p.224 から抜粋したコードです。これを spark-shell に貼り付けるか、ファイルに書いておいて :load <file name> でロードして使います。

import org.apache.spark.mllib.linalg.Vectors

def relationWords(w1: String, w2: String, target: String, model: Word2VecModel) :Array[(String, Double)] = {
    val b = breeze.linalg.Vector(model.getVectors(w1))
    val a = breeze.linalg.Vector(model.getVectors(w2))
    val c = breeze.linalg.Vector(model.getVectors(target))
    val x = c + (a - b)
    model.findSynonyms(Vectors.dense(x.toArray.map(_.toDouble)), 10)
}

パターン1:minCount = 5, vectorSize = 10

ベクトル数が小さいためか、精度はかなり悪いです。Tokyoという都市名、Japanという国名に対する類似語を求めても、上位10件にそれらしいものが現れません。

scala> model.findSynonyms("Tokyo", 10).foreach(println)
(Rebounder,5.026514312082014)
(Fivepenny,5.006271473525809)
(Riviera,4.9806280562664655)
(Pirmahal,4.977896311409738)
(A2217,4.973896329049228)
(Pestújhely,4.967955406306887)
(Tri,4.966647609406325)
(Cigarros,4.966214313196464)
(Seahorses,4.9657892250050715)
(Club,4.965424934604451)

scala> model.findSynonyms("Japan", 10).foreach(println)
(Prabda,3.8591253451462766)
(Skateabout,3.789246081518729)
(detailslink,3.756286768742609)
(Oceania,3.7439580152901946)
(Daeges,3.743037606956309)
(Equestrianism,3.73990681262581)
(Miegs,3.7392088293670396)
(Fleuth,3.735308547592705)
(KBID-LP,3.730579527776324)
(Powerlifting,3.717090309581691)

関係性を加味した推測も、以下のようにうまく行きませんでした。

scala> relationWords("king", "kings", "queen", model).foreach(println)
(satsuma-biwa,4.95347264322314)
(shoguns,4.93869343414127)
(mystics,4.931215483461304)
(Zelimxan,4.925167012454619)
(Christianized,4.922235458369835)
(veneration,4.921893688910249)
(Shi’i,4.921205040607001)
(Russified,4.917586471812209)
(pagan,4.912822109308089)
(revered,4.911351827558269)

scala> relationWords("prince", "king", "princess", model).foreach(println)
(Pandava,4.2101984410814834)
(Aegisthus,4.207452272387961)
(bandit,4.202362575975742)
(amanuensis,4.194580140364399)
(Aerope,4.188601884423512)
(tradesman,4.178661804898081)
(Candaules,4.177194064593601)
(princess,4.173209621638307)
(Shoulang,4.165125455530385)
(Seibei,4.163678291883964)

パターン2:minCount = 20, vectorSize = 20

単語数を減らし、ベクトル数を上げた結果、精度が若干向上しました。Tokyo に対する類義語として、日本の都市の Osaka、Sapporo が出てくるようになりました。一方で、Japan に対する類義語のほうは、あまり改善が見られません。

scala> model.findSynonyms("Tokyo", 10).foreach(println)
(Wrestle,7.689458080058069)
(Split,7.626499879518354)
(Osaka,7.620597049534027)
(Sapporo,7.556529623946273)
(Setagaya,7.513748270603075)
(Hiroshima,7.490792005499523)
(Shinjuku,7.45951304352636)
(Kanazawa,7.459122453399323)
(Expo,7.453010168798164)
(ESCOM,7.447874763780933)

scala> model.findSynonyms("Japan", 10).foreach(println)
(Tokyo,5.679376270328159)
(AXN,5.640570343734289)
(Wrestle,5.60396135079362)
(Expo,5.590382781259281)
(TV2,5.522196857434101)
(Hanoi,5.495135749493573)
(TV6,5.490184062697079)
(Kyoto,5.486577183328772)
(Skate,5.4760554670281065)
(Benelux,5.430530293625971)

関係性を加味した推測は、まだあまりうまくいきません。ただ、後者のほうは5位に正解の "queen" が出ているので、若干精度が向上しています。

scala> relationWords("king", "kings", "queen", model).foreach(println)
(pagan,6.667731329068959)
(garb,6.659426546093454)
(gods,6.648366573398432)
(symbolised,6.648168276539841)
(sacred,6.6085783714277975)
(personages,6.598811565877372)
(veneration,6.597536687593547)
(puranas,6.590383098194837)
(deities,6.588936982768422)
(beauties,6.588806331810932)

scala> relationWords("prince", "king", "princess", model).foreach(println)
(lord,6.574825899196509)
(princess,6.522661208674787)
(bride,6.521167177599623)
(lady,6.492377997870626)
(queen,6.479450084505509)
(first-born,6.466189456944019)
(king,6.441766970616445)
(blessed,6.441764119985444)
(beloved,6.4396910737789606)
(bridegroom,6.423838321417851)

パターン3:minCount = 20, vectorSize = 50

ベクトル数を大幅に増やした結果、かなり精度が向上しました。Tokyo に対して日本の都市、Japan に対してアジアの国名や首都が表示されるようになりました。

scala> model.findSynonyms("Tokyo", 10).foreach(println)
(Osaka,6.442472711716078)
(Fukuoka,6.3918200759436)
(Saitama,6.343209033208874)
(Setagaya,6.237343626467007)
(Japan,6.063812875793321)
(Sapporo,6.027676167552773)
(Nagano,5.955215285602899)
(Kobe,5.891646194480255)
(Yamagata,5.86912171881318)
(Shibuya,5.835765966270005)

scala> model.findSynonyms("Japan", 10).foreach(println)
(Tokyo,5.510337298616405)
(Korea,5.509610108188756)
(China,5.486622516556292)
(Fukuoka,5.378651363703807)
(Taiwan,5.377869828524535)
(Seoul,5.321357314331263)
(Shizuoka,5.31678565272272)
(Prefecture,5.297746506109964)
(Hamamatsu,5.159312705112953)
(Kanagawa,5.157422752148916)

関係性を加味した推測も、かなり精度が向上しました。前者は正解の "queens" が2位、後者も正解の "queen" が2位に表示されています。イテレーション数(numIteration)を増やすなど、更にパラメータを調整すれば、これらの単語が1位に上がることが期待できます。

scala> relationWords("king", "kings", "queen", model).foreach(println)
(realms,6.380416904839411)
(queens,6.292521776188793)
(knightly,6.2558567330155626)
(consorts,6.241017073100756)
(kings,6.200374546691251)
(kindreds,6.17249501613232)
(lamas,6.1721177720161915)
(monuments,6.147651372785442)
(patrilineal,6.1288029631730545)
(depictions,6.121416883901753)

scala> relationWords("prince", "king", "princess", model).foreach(println)
(princess,5.956775488416378)
(queen,5.9055082324742685)
(slaying,5.793197818446893)
(king’s,5.696965618712307)
(betrothal,5.59067630474941)
(goddess,5.58159904439838)
(apparition,5.554027664552106)
(martyrdom,5.534826668619817)
(Pelops,5.503355785910461)
(ancestress,5.4953139994512545)

つまづいたこと

spark-shell の起動時に --conf spark.kryoserializer.buffer.max=1g を指定しないと、Word2VecModel.load() の呼び出しで落ちました。

16/06/25 22:00:26 WARN TaskSetManager: Lost task 1.0 in stage 2.0 (TID 3, localhost): TaskKilled (killed intentionally)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, localhost): org.apache.spark.SparkException: Kryo serialization failed: Buffer overflow. Available: 1, required: 4. To avoid this, increase spark.kryoserializer.buffer.max value.
    at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:299)
(スタックトレースは省略)

あと、余談ですが、zsh を使っていると --master local[*] の指定ができないようです。以下のようなエラーが出ます。bash なら指定できるので、spark-shell の実行時だけ bash に切り替えました。

% spark-shell --master local[*] \
--conf spark.driver.memory=5g \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.kryoserializer.buffer.max=1g
zsh: no matches found: local[*]

Part 2 のまとめ

Wikipedia の大規模データを、Amazon EC2 上に構築した Spark Cluster で処理できました。また、実際にいろいろなパラメータを試してみて、大規模データを処理する際に注意すべき Spark のプロパティや、Word2Vec のパラメータを把握することができました。

今回生成した Word2Vec のモデルはまだあまり精度が良くありませんが、以下の方針で精度を上げることができそうです。

  • minCount を大きくして、単語数(vocabSize)を小さくする
  • vectorSize を増やす
  • numIteration を numPartition に近づける
  • 元データからノイズ(MediaWiki記法など)を除去する方法を改善する

実際に試してみて、Word2Vec の場合は slave 1台で実行される処理がボトルネックになっていることがわかりました。Amazon EC2 の高いマシンを借りているのに、slave 1台だけが頑張っていて、残りの4台は遊んでいる、というのはなかなか焦ります。

Slave 1台しか動かない時間を短くする方法としては、vocabSize*vectorSize を小さくするしかないのですかね? 実際のアプリで Word2Vec を使う際には、Spark MLlib での処理の前に、そのアプリで使わない単語を除去してしまって単語数を大幅に減らしておく、などの対策が必要かもしれません。

Part 2 の主な参考文献

手を動かす Spark MLlib & Word2Vec Part 1 (spark-ec2 でクラスタを構築するまで)

f:id:muziyoshiz:20160626223709p:plain

このシリーズについて

機械学習系のツールを全然使ったことがなかったので、勉強のためになにか1つ選んで、実際に手を動かしてみることにしました。マシンを並べて負荷分散することを想定して、まずは Spark MLlib を選びました。

このシリーズでは、Amazon EC2 上に構築した Spark Cluster (Standalone Mode) で、Wikipedia のデータから Word2Vec のモデルを作るところまでの方法を解説していきます。ただ、実際やってみてわかったのですが、Spark 自体、Spark MLlib の Word2Vec クラス、およびクラスタ構築に使った spark-ec2 に設定項目が多いせいで、細かいところで何度も何度もつまづきました……。

そのため、このシリーズでは各ステップについて、「最終的にやったこと」と、その最終的なやり方にたどり着くまでに「つまづいたこと」を分けました。やり方を知りたいだけの場合は「最終的にやったこと」の方だけ読んでください。「つまづいたこと」は、うまく行かなかった場合のための参考情報です。

Part 1 の範囲

Amazon EC2 に master 1台、slave 3台構成の Spark Cluster (Standalone mode) を構築し、spark-shell から Word2Vec を実行するところまで。

Spark をローカル環境(Mac)にインストールする

最終的にやったこと

まず、ローカル環境で Spark MLlib が動くかどうかを試してみました。環境は以下の通りです。

  • MacBook Pro (Retina, 15-inch, Mid 2014)
  • OS: OS X Yosemite 10.10.5
  • CPU: 2.2 GHz Intel Core i7
  • メモリ: 16GB 1600 MHz DDR3

OS X に Spark をインストールする場合、以下のコマンドだけでインストールできます(参考:ApacheSpark — BrewFormulas)。

% brew update
% brew install apache-spark

私が試した時点では Spark 1.6.1 でした。Java は Java 8 です。

% spark-shell --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.1
      /_/

Type --help for more information.

つまづいたこと

spark-shell ローカルモードで(--master local を指定して)実行すると、spark> というプロンプトが表示されるまでに、色々と WARN が出ます。

16/06/07 00:17:36 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)

BoneCP は JDBC Connection Pool ライブラリの名前です。scala - What do WARN messages mean when starting spark-shell? - Stack Overflow によると、ローカルモードで実行しているときは問題ないとのこと。

16/06/07 00:17:38 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
16/06/07 00:17:38 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException

こちらも、Hive metastore に接続できないことを表す WARN なので、ローカルモードでは関係ないと判断しました。

ローカル環境での Word2Vec の実行

最終的にやったこと

Spark MLlib のページ(Feature Extraction and Transformation)に、Spark MLlib に含まれる Word2Vec クラスを使ったサンプルコードがあります。これをローカルモードで実行してみます。

まず、サンプルコードで使っている text8.zip をダウンロードして、解凍します。これは、スペースで区切られた英単語が羅列された(意味のある文章ではない)100 MB のテキストファイルです。

% wget http://mattmahoney.net/dc/text8.zip
% unzip text8.zip
% ls -la text8
-rw-r--r--@ 1 myoshiz  staff  100000000  6  9  2006 text8

この text8 を置いたディレクトリで、以下のコマンドを実行します。spark.driver.memory はドライバのメモリ使用量を表すオプションで、デフォルトは 1g (1GB)です。

% spark-shell --master local \
--conf spark.driver.memory=5g \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer

spark-shell のプロンプトで、Word2Vec のサンプルコード を入力すれば、myModelPath ディレクトリ以下に、Word2Vec のモデルデータが生成されます。

なお、spark-shell の起動中は http://localhost:4040/ にアクセスすることで、ジョブの状態を確認できます。

つまづいたこと

最初は --conf spark.driver.memory=5g" を指定せずに spark-shell を起動していました。その状態でword2vec.fit(input)` を実行すると、OutOfMemoryError で spark-shell が落ちます。私の環境では、ファイルが 100MB だと落ちて、80MB まで減らすと落ちない、という状態でした。

scala> val model = word2vec.fit(input)
[Stage 0:>                                                          (0 + 1) / 3]
Exception in thread "refresh progress" java.lang.OutOfMemoryError: GC overhead limit exceeded
    at scala.StringContext.s(StringContext.scala:90)
(スタックトレース、および後続のエラーは省略)

エラーメッセージをもとに調べたところ、JVM の設定が悪いような情報をいくつか見かけました。

しかし、これを指定してもエラーメッセージは変わりませんでした。というか、私は Java 8 で実行していたので、そもそもこの設定には意味がありませんでした。

この Java の仕様変更を踏まえて、以下のように spark-shell を実行したところ、落ちなくなりました。ただし、この方法だと、OutOfMemoryError が出ないだけで、いつまでも処理が終わらないという状態になってしまいました……。

% SPARK_REPL_OPTS="-XX:MaxMetaspaceSize=1024m" spark-shell --master local

結局、Configuration に載っているメモリ関係のパラメータを一通り確認して、前述の spark.driver.memory を増やしたところ、うまく動いたようで、処理が完了しました。JVM のパラメータを変更する必要はなかったようです。

Amazon EC2 への Spark クラスタの構築(spark-ec2 を使った方法)

最終的にやったこと

Slave の台数を増やすことで、Spark MLlib の実行時間が短くなることを確認するために、Spark クラスタを構築しました。今回は Spark に同梱されている spark-ec2 というスクリプトを使って構築しました。このスクリプトの説明は Running Spark on EC2 - Spark 1.6.1 Documentation にあります。

Amazon Elastic MapReduce (EMR) で Spark を使えることは知っていますが、いずれオンプレに Spark クラスタを構築したかったのと、かといってマシンスペックを何パターンか試すときに手作業での構築は大変すぎたので spark-ec2 を使いました。

まず、AWS のマネジメントコンソールを使って、以下の設定を行います。今回は Spark の話がメインなので、AWS の設定の詳細は省略します。

  • IAM ユーザ "word2vec-user" の作成
  • IAM ユーザ "word2vec-user" に対する "AdministratorAccess" ポリシーのアタッチ(EC2 と S3 に絞っても良い)
  • EC2 でのキーペア "word2vec-key-pair" の作成
  • ローカルマシンに対する環境変数 AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY の設定
  • ローカルマシンに対するキーペアの配置(以下では /Users/myoshiz/.ssh/word2vec-key-pair.pem に置いたと仮定)

brew で Spark をインストールすると、spark-ec2 は入っていません。そのため、spark-ec2 を使うために、Apache Spark のダウンロードページ から zip ファイルをダウンロードします。今回は、以下のファイルを選択しました。

  • Spark release: 1.6.1
  • Package type: Pre-built for Hadoop 2.6 and later

ダウンロードした zip ファイルを解凍すると、ec2 ディレクトリに spark-ec2 というスクリプトが入っています。このディレクトリに移動し、まずは master 1台、slave 3台のクラスタを構築してみます。そのためには、以下のコマンドを実行します。

% ./spark-ec2 \
--key-pair=word2vec-key-pair \
--identity-file=/Users/myoshiz/.ssh/word2vec-key-pair.pem \
--region=us-west-1 \
--zone=us-west-1a \
--instance-type=m4.large \
--copy-aws-credentials \
--hadoop-major-version=yarn \
--slaves 3 \
launch spark-cluster

各オプションの意味と、上記の値を指定した理由は以下の通りです。

  • --region は、デフォルトはバージニア北部(us-east-1)が使われる。国内だとインスタンス利用費が若干高く、東海岸は遠いので、北カリフォルニア(us-west-1)を指定した。
  • --instance-type は、デフォルトでは m1.large が使われる。m1.large は古いインスタンスタイプのため、スペックに比して割高のため、同じく 2 vCPU、メモリ8GBの m4.large を指定。調べた時点では m1.large が $0.19/hour、m4.large が $0.14/hour だった。
  • --copy-aws-credentials を指定すると、環境変数に設定された AWS のアクセスキーが、master の hadoop にも設定される。ただし、後述の通り Spark に対しては設定されないので、hadoop コマンドを使わないなら、指定しなくても良い。
  • --hadoop-major-version=yarn は、使用する Hadoop のバージョンを指定する。今回は Pre-built for Hadoop 2.6 and later をダウンロードしているので、yarn を指定する必要がある。デフォルトは 1(Hadoop 1.0.4)。
  • --slaves は slave の台数を指定する。

10〜20分待つとクラスタの構築が完了し、以下のようなメッセージが表示されます。Mac から以下の URL にアクセスすると、Spark UI や、Ganglia の画面を確認できます。

Spark standalone cluster started at http://ec2-xxx-xxx-xxx-xxx.us-west-1.compute.amazonaws.com:8080
Ganglia started at http://ec2-xxx-xxx-xxx-xxx.us-west-1.compute.amazonaws.com:5080/ganglia
Done!

上記のホスト名は、以降の作業でも使うので、以下のように環境変数に設定しておきます。シェルの設定ファイル(.bash_profile とか)で指定してもいいですが、クラスタを作るたびにホスト名が変わる点だけは注意が必要です。

% export EC2_SPARK_MASTER=ec2-xxx-xxx-xxx-xxx.us-west-1.compute.amazonaws.com

つまづいたこと(1):GitHub の spark-ec2

brew で spark をインストールすると、そのなかには spark-ec2 が入っていません。そのため、このスクリプトだけ別に入手できないかと思い、GitHub で公開されている spark-ec2 を clone して実行してみました。

github.com

この spark-ec2 を実行すると、エラーも出ずに最後まで処理が進むのですが、Spark クラスタが起動しないようです。http://ec2-xxx-xxx-xxx-xxx.us-west-1.compute.amazonaws.com:8080 にアクセスしても応答がなく、spark-shell で --master spark://ec2-xxx-xxx-xxx-xxx.us-west-1.compute.amazonaws.com:7077 を指定しても接続できない、という状態になりました。

色々悩んだ結果、大人しく Apache Spark のダウンロードページ から zip ファイルをダウンロードして、そのなかの spark-ec2 を使ったところ、実行したコマンドの引数は同じにも関わらず、クラスタが起動しました。

GitHub 版も見た目はきちんと動いているように見えたために、他に原因があると思い込んでしまい、この問題で数日詰まってしまいました……。

つまづいたこと(2):--hadoop-major-version=yarn の指定

このオプションは、公式サイトの Running Spark on EC2 には書かれていません。しかし spark-ec2 --help を実行すると、以下のオプションが表示されます。

  --hadoop-major-version=HADOOP_MAJOR_VERSION
                        Major version of Hadoop. Valid options are 1 (Hadoop
                        1.0.4), 2 (CDH 4.2.0), yarn (Hadoop 2.4.0) (default:
                        1)

上記のオプションを指定しないと、クラスタの構築後に spark-shell を実行した時に、以下のようなエラーが出て sqlContext の初期化に失敗しました。

16/06/13 14:08:02 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MResourceUri" is tagged as "embedded-only" so does not have its own datastore table.
java.lang.RuntimeException: java.io.IOException: Filesystem closed
    at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522)
(中略)
<console>:16: error: not found: value sqlContext
         import sqlContext.implicits._
                ^
<console>:16: error: not found: value sqlContext
         import sqlContext.sql
                ^

Spark クラスタでの Word2Vec の実行

最終的にやったこと

spark-ec2 の login コマンドを使用すると、master にログインできます。もちろん ssh でもログインできますが、master のホスト名を書かなくてよいのがメリットだと思います。ちなみに、オプションの指定が面倒ですが、以下の3つは必須のようです。

% ./spark-ec2 \
--key-pair=word2vec-key-pair \
--identity-file=/Users/myoshiz/.ssh/word2vec-key-pair.pem \
--region=us-west-1 \
login spark-cluster

次に、先ほどと同じサンプルコードを実行するために、text8.zip をダウンロードします。また、このファイルを、クラスタ上で動作する HDFS にアップロードします。これは、ファイルを slave からアクセス可能にするための作業です。後ほど、HDFS の代わりに S3 を使う方法も紹介します。

$ wget http://mattmahoney.net/dc/text8.zip
$ unzip text8.zip
$ ./ephemeral-hdfs/bin/hadoop fs -put text8 /

ここまでの準備が終わったら、master 上で spark-shell を実行します。指定するオプションは以下の通りです。ローカルモードの場合とは、--master の指定が変わっています。

$ ./spark/bin/spark-shell \
--master spark://${EC2_SPARK_MASTER}:7077 \
--conf spark.driver.memory=5g \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer

ホスト名の指定が面倒ですが、--master spark://localhost:7077 という指定では接続できませんでした。

あとは、spark-shell で以下のように入力すると、Word2Vec が実行されます。ローカルモードとの違いは、textFile() や save() に渡されたファイルパスが、HDFS のファイルパスとして扱われることです。今回はルート直下に text8 を置いたため、/text8 のように指定しています。

import org.apache.spark._
import org.apache.spark.rdd._
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}

val input = sc.textFile("/text8").map(line => line.split(" ").toSeq)

val word2vec = new Word2Vec()

val model = word2vec.fit(input)

val synonyms = model.findSynonyms("china", 40)

for((synonym, cosineSimilarity) <- synonyms) {
  println(s"$synonym $cosineSimilarity")
}

// Save and load model
model.save(sc, "/model_text8")
val sameModel = Word2VecModel.load(sc, "/model_text8")

以上により、Word2Vec のジョブが slave 上で実行されます。ただ、http://ec2-xxx-xxx-xxx-xxx.us-west-1.compute.amazonaws.com:8080 にアクセスするとわかるのですが、このままだと3台ある slave のうち、1台しか使われません。次は、ジョブを分散するために、Word2Vec のパラメータを変更します。

つまづいたこと

最初、ローカルディスクのファイルにアクセスできないことに気づきませんでした。file:// を付けても駄目でした。

次に、HDFS 上にファイルをアップロードする方法で悩んだのですが、これは ./ephemeral-hdfs/bin 以下のコマンドが使えることに気付いたあとは簡単でした。hadoop コマンドに馴染みのない人は、Apache Hadoop 2.7.1 – などが参考になると思います。

すべての slave に処理が分散されることの確認(Word2Vec のパラメータ変更)

最終的にやったこと

Word2Vec のパラメータは、Word2Vec クラスの setter で指定できます。用意された setter とそのデフォルト値は Word2Vec の API リファレンス に記載されています。

これらの setter のうち、setNumPartitions() でパーティション数を1よりも大きくすると、複数の slave 間で処理が分散されます。この値のデフォルトが1なので、そのままでは slave が1台しか使われません。

val word2vec = new Word2Vec()

// Set this
word2vec.setNumPartitions(4)

val model = word2vec.fit(input)

slave 3台で試したところ、パーティション数を4まで増やした段階で、すべてのslaveに処理が分散されました。ただ、5台で試したときには、パーティション数を6にしても、slave 4台しか使われませんでした。単純に slave の台数 + 1 にすればよいというわけではなさそうで、詳細はまだわかりませんが、少なくとも slave の台数よりも大きい数を指定する必要がありそうです。

ただ、このパーティション数を増やすと、増やした分だけ負荷分散されて処理時間が短くなっていく一方で、計算結果の正確さも落ちていくとのことです。Word2Vec の処理が分散しない理由を調べている際に、以下の情報を見かけました。

stackoverflow.com

  • イテレーションの数は、パーティション数と同じか、それ以下にすべき
  • 正確さのために、パーティション数は小さい値を使うべき
  • 結果(モデル)を正確にするためには、複数のイテレーションが必要

どれくらい結果が変わっていくのか、text8 を3台の slave 上で処理して調べてみました。以下は、numPartition = 1, 3, 6 での、"china" に類似した単語の上位10件です。パーティションが1個の場合の上位3件を太字にしています。text8 は意味のない文字列ですが、結果が変わっていく様子は参考になると思います。

numPartitions 1 3 6
1位 taiwan taiwan indonesia
2位 korea korea taiwan
3位 japan japan afghanistan
4位 mongolia mainland kazakhstan
5位 shanghai indonesia pakistan
6位 tibet india japan
7位 republic pakistan ireland
8位 india mongolia india
9位 manchuria thailand uzbekistan
10位 thailand africa iran

ちなみに、使われた slave の台数と、処理時間の関係は以下のようになりました。text8 くらいのデータ量(100 MB)だと、おおよそ、使われる slave の台数に応じて大きく処理時間が減るようです。

numPartitions 1 3 6
slave の台数 1 2 3
処理時間 6.3 min 3.6 min 1.9 min

つまづいたこと

負荷分散しない理由が Word2Vec のパラメータの方にある、ということが最初なかなか分からずに苦労しました。

普通に考えると「Spark MLlib から Word2Vec を使いたい人=負荷分散を期待している人」だから、Word2Vec のパラメータの初期値は負荷分散するようになっているはずだ(だから Spark のパラメータの方に問題があるはずだ)と思い込んでいました……。

Part 1 のまとめ

ここまでの手順で、Amazon EC2 上に Spark クラスタを構築する方法を確認できました。

次の Part 2 では、この Spark クラスタの slave の台数およびマシンスペックを強化し、処理するデータ量も Wikipedia 英語版(Gzip 圧縮した状態で 12 GB)まで増やしてみます。

Part 1 の主な参考文献

Habitat を触っていて気になった、細かいことあれこれ

f:id:muziyoshiz:20160617201522p:plain

Habitat について知りたい方は、まずは私がエンジニアブログに書いた Habitat の概要説明をご覧ください。自作のイメージ図を使って、Habitat のわかりにくい独自用語を解説しています。

recruit.gmo.jp

で、上記の記事を書いた時に、あまりにも細かすぎるので省いた話題がいくつかありました。放っておくと忘れそうなので、今回はその細かいことあれこれをご紹介します。

"Habitat" の意味

  • Habitat とは、居住環境、居住地、生息地、などの意味を持つ英単語です。
  • これを書いている時点で "Habitat" でググったところ 約 146,000,000 件ヒットしました。Chef といい、この会社はどうしてこう、検索しにくい名前をツールに付けてしまうのか……。

Habitat のバージョン番号

  • これを書いている時点で hab -V を実行したら hab 0.7.0/20160614231131 と出てきました。
  • 0.1 でも 1.0 でもない、これまた微妙なところを……。開発陣としては、現時点の実装をどれくらいの完成度だと思っているんでしょう? もう実サービスで使えるレベル? 少なくとも、ドキュメントのなかに「まだ production に使うな」というありがちな文章は見当たりませんでした。

Habitat の推奨環境

  • Habitat が推奨する、あるいは Habitat 開発者が最初にテストしている Linux ディストリビューションって何なんでしょう?
  • Habitat ファーストインプレッション にも書いた通り、現時点では Linux でしか Supervisor は動作しません。とはいえ、私が VirtualBox & CentOS 7 で試したところ、それでもチュートリアル通りには動きませんでした。
  • habitat-sh/habitat: Modern applications with built-in automation のトップディレクトリにある Dockerfile が FROM ubuntu:xenial で始まってるので、Ubuntu の可能性大。次に試すときは Ubuntu でやります。

Package と Artifact

  • ドキュメントを読んでいると、Package と同じ概念を Artifact と呼んでいる箇所がいくつかありました。Artifact という用語も使われているのか、この用語はもう廃止されたけどドキュメントの一部に残っているだけなのか?
  • そういえば、Package ファイルの拡張子の hart って、Habitat Artifact の略称なんですかね。

Depot への Package のアップロード

  • Habitat CLI reference の目次には "hab pkg upload" のリンクが載っているんですが、このリンクをクリックした先の説明はありませんでした。
  • hap pkg upload --help を実行すると、hab pkg upload [FLAGS] [OPTIONS] <HART_FILE>... と出てきます。なんだあるんじゃーんと思ってコマンドを叩いたら、チュートリアルで作った muziyoshiz/mytutorialapp を見事アップロードできました(アップロード先)。
  • で、アップロードはできたんですが、Habitat Web って、パッケージの削除機能がまだ無いみたいです。もしかして、これが CLI reference からコマンドが消されている理由では。削除機能が追加されたら消しますごめんなさい……。

Habitat の P2P ネットワークと Topology

  • Habitat は Supervisor 同士で P2P ネットワークを構築します。このネットワークのことを Ring あるいは Supervisor Ring と呼ぶようです。しかし、Supervisor Internals によると、このネットワークは SWIM (Scalable Weakly-consistent Infection-style process group Membership protocol) で構成されるとのこと。リングネットワークを組んでいるわけでもないのに Ring というのはモヤモヤします。
  • 一方、Habitat には Topology という用語があり、Supervisor 間の論理的な関係を定義できます。Running packages in topologies によると、現時点では standalone, leader-follower, initializer の3種類から選べるようです。Topology という単語が Network Topology を指しているわけではない、というのもなんだかモヤモヤします。

設定ファイルの一部で TOML 形式を採用

  • Habitat は設定ファイルを Handlebars 形式で書くことができて、この Handlebars に渡す変数を TOML 形式で定義できます。
  • TOML 形式って初耳だったので調べてみたところ、toml/toml-v0.4.0.md に仕様がありました。Tom's Obvious, Minimal Language の略で TOML なんですね。僕も便乗して YOML とか作ってやろうか(紛らわしすぎる)。
  • ちなみにこの仕様、有志による日本語訳が toml/toml-v0.4.0.md にて公開されていました。
  • HashiCorp にも HCL とかありますし、運用管理ツールを開発していると Yet Another な YAML が欲しくなるものなんでしょうか。

Habitat と Google Analytics

  • hab setup を実行してセットアップすると、その最後に、利用データを Habitat の Google Analytics アカウントにアップロードしてよいかと尋ねられます。No を選択すると ~/.hab/cache/analytics/OPTED_OUT に空のファイルが作られて、それきり何も質問されません。
  • ツールの利用状況を収集したい気持ちはよくわかります。でもまあ、No を選択しますよね。そういえば、最近 Google Analytics を後から導入して揉めたソフトがなにかあった気がしますけど、何でしたっけ?

とりとめもなくなってきたので、今日はこのへんで。