読者です 読者をやめる 読者になる 読者になる

無印吉澤

運用管理、プログラミング、クラウドコンピューティングなどに関する技術メモ置き場です。

Treasure Data Tech Talk 201607 レポート(古橋さんと成瀬さんの講演メモのみ)

Event Report Digdag PerfectQueue MySQL
f:id:muziyoshiz:20160724153127p:plain:w320

先週末に、Treasure Data Tech Talk に参加してきました。このイベントは毎回濃い話を聞けるので、行けるときはなるべく参加するようにしています。

今回は、古橋さんによる Digdag での YAML 利用の話と、成瀬さんによる PerfectQueue の話が特に面白かったです。以下、講演内容のメモと、公開済みのスライドです。

講演内容

DigdagはなぜYAMLなのか? (Sadayuki Furuhashi, @frsyuki)

  • Digdag とは何か?

    • Workflow automation system
    • Digdag で一番やりたいのはバッチデータ解析の自動化
  • Digdag の競合

    • OSS, Proprietary それぞれに競合がある
    • Workflow automation system は、ワークフローの定義方法によって3つに分類できる
      • プログラミング言語型:Luigi など
      • GUI型:Rundeck など
      • 定義ファイル+スクリプト型:Azkaban など
    • ワークフローの作りやすさと、カスタマイズの柔軟性のトレードオフ
  • Digdag

    • Digdag は定義ファイル+スクリプト型
    • 定義ファイル+スクリプト+俺たちのYAML
    • YAMLは便利だが、include できない、変数の埋め込みができない、(言語内DSLのように)プログラムが書けない、という欠点がある
    • Digdag では、YAML の仕様に従ったうえで、これらの欠点を克服した
  • include できる

    • YAML の仕様では、値(scalar)の前に "!" から始まる文字(タグと呼ばれる)を付与できる
    • 通常、YAMLパーサは、正規表現によるマッチでタグを決定して、自動的にタグを付与している
    • Digdag では "!include : filename" という表記を、ファイルインクルードの文法として使っている
    • !include の後ろに、" " を書く必要がある。このスペースが大事。このスペースのおかげで、通常の YAML パーサでも、キーが " "、値が filename のハッシュとして読み込める
    • ただし、この !include を複数書くと、キーが " " のハッシュが重複してしまう。Digdag の YAML パーサでは、複数の !include を書けるように、内部的にキーを UUID に書き換えている
  • 変数の埋め込みができる、プログラムが書ける

    • Java 8 は、Nashorn(ナスホーン)という JavaScript Engine を同梱している。これを使って ${} 内を評価している
    • だから Digdag は Java8 必須
  • Q) 何故YAMLをベースにした?

    • A) 比較的書きやすい、読みやすい。YAMLとして既存のプログラムから扱える。
  • Q) YAMLはグラフ構造を表現するのに適さないのでは?

    • A) YAMLはDAG、グラフを扱うわけではない。ツリーを扱っている。
  • Q) YAMLにコードを書けるとのことだが、悪さはできないのか?

    • A) 悪さできないように対策している。JavaScript はサンドボックス内でしか動作しない。そのためにJavaScriptを採用した。

PerfectQueueはいかにパーフェクトか、あるいはRubyとMySQLでジョブキューを作る試みについて (Yui Naruse, @nalsh)

  • Who is naruse

    • nkfメンテナ
    • Rubyコミッタ
    • Treasure DataではバックエンドのRubyを担当
  • そもそもジョブキューとは

    • FIFO
    • フロントエンドとバックエンドを疎結合化
  • PerfectQueue の特徴

    • MySQL で実装
    • At-least-once を優先(at-least-once と at-most-once はトレードオフの関係)
  • キューのデータ構造

CREATE TABLE `queue` (
    /* unique key (-> at most once) */
    id VARCHAR(255) NOT NULL,
    /* for FIFO's timeline */
    timeout INT NOT NULL,
    /* opaque data */
    data LONGBLOB NOT NULL,
    /* alive or finished */
    created_at INT,
    PRIMARY KEY (id)
)
  • タスクのライフサイクル

    • タスクの投入時に、timeout, created_at を現在時刻にする
    • タスクの取得時は、timeout が小さいものから優先的に取得し、timeout を 300 秒後に更新
    • タスクの実行中は、ハートビートとして、timeout を定期的に 300 秒後に更新
    • タスクの完了後は、created_at を NULL にして、一定時間保存するために timeout を 720 秒後に更新(タスクの重複を検出するため)
    • 前述の retention time を過ぎたら物理削除
  • タスクの取得、削除時の排他制御が大変

    • 排他処理のために、最初期は FOR UPDATE を使っていた
      • しかし、頻繁にデッドロックする
      • デッドロックを避けるには MySQL の気持ちになってクエリを書く必要がある
    • LOCK TABLES を使うと、テーブル全体をロックしてしまうので、SELECT にも影響
    • GET_LOCK が一番安全、でもクエリの書き方によってはデッドロックが発生した
      • ロックの delete クエリと acquire のクエリがバッティングしないように、タイミングの調整が必要(間隔を乱数で変える)
      • 調整しないと性能問題が起きた
      • さらに、ネットワーク遅延が発生すると影響大
        • GET_LOCK から RELEASE LOCK まで 3 RTT かかるので、影響大
    • queue テーブルに owner カラムを追加したうえで、FOR UPDATE を使うクエリに変更し、テーブルロックを不要にした
      • MySQL の気持ちになって書いたので安全
  • 結論

    • PerfectQueue はより完璧になった
  • Q) 何故 MySQL をキューに選んだのか?

    • A) 当時、Amazon RDS で PostgreSQL が使えなかった。RDBMS を選んだ理由は、Amazonで提供されている、フェイルオーバーがある、など。新しいジョブキューを開発したのは、ジョブキューにフェアスケジューリングの機能(Treasure Dataのサービスで必要)を付けたかったから。(古橋)

感想

Digdag での YAML の拡張(いや、標準の仕様に従っているので拡張と言うのは不適切か?)については、話としては面白かったんですが、そもそも YAML でプログラミングするのは辛そう、というのが第一の感想でした。Ansible もそんな感じのつらみがありますしね。AnsibleSpec みたいな、Digdag で書いたワークフローをテストするツールとかが、いずれ出てきたりするんでしょうか。

また、マークアップ言語として YAML にこだわる必要があるんだろうか、とも思ったのですが、じゃあ代替手段として何があるのか、と考えてみると、なかなか難しそうです。XML よりもマシな選択肢となると、いまは YAML なのかな……。HashiCorp の HCL のように、独自方式を作る方向もあったと思いますが、Digdag に独自方式を作るほどの要件はなかったんですかね。

手を動かす Spark MLlib & Word2Vec Part 2 (Wikipedia 英語版から Word2Vec モデルを作るまで)

Spark MLlib Machine Learning
f:id:muziyoshiz:20160626223709p:plain

このシリーズについて

実際に手を動かして Spark MLlib に慣れていこう、というシリーズです。

Spark を使うならそれなりに大きなデータを分散処理しないと面白くないと思い、Wikipedia のデータから Word2Vec のモデルを作るところまでやってみました。環境構築については Part 1 をご参照ください。

muziyoshiz.hatenablog.com

Part 2 の範囲

Wikipedia 英語版のデータから作成したコーパスを Amazon EC2 上の Spark Cluster で処理して、Word2Vec のモデルを作成するところまで。

Wikipedia 英語版のデータからコーパス作成

最終的にやったこと

Wikipedia:Database download から辿って、https://dumps.wikimedia.org/enwiki/ (HTTP) または Data dump torrents (BitTorrent) からダウンロードできます。

Wikipedia のデータは XML および SQL で公開されており、それぞれ色々ファイルがあります。今回は Wikipedia の本文からコーパスを作りたいので enwiki-latest-pages-articles.xml.bz2 をダウンロードしました。私がダウンロードした時の最新版は 2016-06-03 作成の 12.1 GB のファイルでした。

このファイルを解凍すると以下のような XML が入っています。<text> タブの中身が本文です。

  <page>
    <title>Anarchism</title>
    <ns>0</ns>
    <id>12</id>
    <revision>
      <id>721573764</id>
      <parentid>719202660</parentid>
      <timestamp>2016-05-22T19:25:12Z</timestamp>
      <contributor>
        <username>PBS-AWB</username>
        <id>11989454</id>
      </contributor>
      <comment>modification to template Cite SEP and possibly some gen fixes using [[Project:AWB|AWB]]</comment>
      <model>wikitext</model>
      <format>text/x-wiki</format>
      <text xml:space="preserve">{{Redirect2|Anarchist|Anarchists|the fictional character|Anarchist (comics)|other uses|Anarchists (disambiguation)}}
{{pp-move-indef}}
{{Use British English|date=January 2014}}
{{Anarchism sidebar}}
'''Anarchism''' is a [[political philosophy]] that advocates [[self-governance|self-governed]] societies based on voluntary institutions.
(中略)
[[Category:Far-left politics]]</text>
      <sha1>sfoc30irh5k1bj62ubt29wp1ygxark0</sha1>
    </revision>
  </page>

この XML から本文だけ取り出す方法を色々探してみたところ、wp2txt というツールがあったので、今回はこちらを使わせてもらいました。

github.com

wp2txt は Ruby で書かれており、関連する gem のインストールに苦労したという記事も見かけたのですが、以下の手順で問題なく動作しました。

% mkdir wp2txt
% cd wp2txt
% bundle init
% echo 'gem "wp2txt"' >> Gemfile
% bundle install
% mkdir output_dir
% bundle exec wp2txt -i enwiki-latest-pages-articles.xml.bz2 -o enwiki --no-heading --no-title --no-marker

今回は本文だけが必要なので、余分な出力をなるべく減らすためのオプション(--no-heading, --no-title, --no-marker)を付けました。それでも、MediaWiki のマークアップなどは残ってしまうのですが、それは Spark で除去することにします。

wp2txt の処理が終わると、enwiki ディレクトリに enwiki-latest-pages-articles.xml-<連番>.txt というファイルが作成されます。今回は合計 1,754 ファイル、18 GB でした。ちなみに、私の環境(MacBook Pro 2.2 GHz, 16GB DDR3)では、実行に19時間くらいかかりました。

つまづいたこと

Wikipedia のデータからコーパスを作る方法を色々探したのですが、英語圏も含めて、簡単にできる方法がなかなか見つかりませんでした。

wp2txt 以外で良さそうなものとしては wiki2vec というツールが、コーパスを作る機能を持っているようでした。ただ、時間の都合で今回は試せませんでした。ちなみに、この記事を書くために読み返していて気付きましたが、wiki2vec のパラメータとして minCount = 50, vectorSize = 500, windowSize = 10 という例が載っていました。次はこれで試してみるのもよいかもしれません。

github.com

その後、Word2Vec 関係の記事を探しまわるなかで、以下の記事から wp2txt の存在に気づき、今回は wp2txt を使うことにしました。

techblog.gmo-ap.jp

コーパスの、S3 へのアップロード

最終的にやったこと

Apache EC2 上に構築した Spark Cluster に Wikipedia のファイルを渡さなければいけないのですが、データサイズが大きいので、今回は S3 経由で渡しました。

Spark の textFile メソッドは gzip 圧縮されたファイルも読み込めるので、まずは先程のファイルを圧縮します。圧縮後のファイルは 6 GB になりました。

% gzip enwiki/*.txt

そして、これを S3 にアップロードします。aws s3 cp コマンドはワイルドカードが使えないので、一括アップロード時には、ディレクトリ名を指定して --recursive を指定する必要があります。

% aws s3 cp enwiki s3://my-bucket-name/ --recursive

aws コマンドがない場合は、Installing the AWS Command Line Interface - AWS Command Line Interface に従ってインストールしてください。Mac の場合は pip でインストールします。

つまづいたこと

最初は、以下のように1ファイルにまとめてアップロードしたのですが、これだとファイル読み込みが全く分散されませんでした。

% cat enwiki/*.txt > enwiki.txt
% gzip enwiki.txt
% aws s3 cp enwiki.txt.gz s3://my-bucket-name/

今回は wp2txt がファイルを複数に分割してくれていましたが、他の方法でコーパスを作った場合も、ファイルを分けてアップロードしたほうがいいですね。

spark-submit で使う jar の作成

最終的にやったこと

いままでは spark-shell で Word2Vec を実行していましたが、データ量が増えると実行時間が長くなって、EC2 インスタンスへの接続が切れてしまう可能性が出てきます。そのため、Word2Vec を実行する簡単な jar を作って、spark-submit で実行することにします。

以下がそのコードです。

Simple Word2Vec application

Feature Extraction and Transformation にあるサンプルに、以下の修正を加えています。

  • コマンドライン引数から、読み込むファイル、モデルの出力先、Word2Vecのパラメータを設定
  • repartition メソッドを使って、RDD を分割(これをしないと読み込み後の処理が分散されない)
  • split する際の区切り文字に、スペース以外の文字も含めることで、MediaWiki 記法の文字を除去
  • filter で長さが 1 の文字を除去
  • 処理の最後で、実行時間を出力

jar をビルドしたい場合は、以下のリポジトリを使ってください。

github.com

sbt assembly を実行すると target/scala-2.11 ディレクトリに word2vec-model-generator-assembly-0.0.1.jar ができます。この jar ファイルだけ master ノードに持っていけば実行できます。

つまづいたこと

最初は repartition メソッドを使わなかったのですが、その場合、ファイルの読み込みは分散しても、その後の処理が途中まで全く分散されませんでした。repartition メソッドに渡す引数は scala - Spark: Repartition strategy after reading text file - Stack Overflow を参考にしました。

MediaWiki記法を除去するルールは、ローカルマシンで enwiki-latest-pages-articles.xml-0001.txt を処理して手探りで決めたのですが、もっと良い方法がありそうです。例えば、先ほど紹介した wiki2vec は、うまく処理していそうです("Word2Vec Corpus" の節を参照)。

Amazon EC2 への Spark クラスタの構築(5台構成)

最終的にやったこと

Part 1 でもクラスタを構築しましたが、それは1回削除して、Slave の台数とスペックを増やしたクラスタを作り直しました。ちなみに、spark-ec2 destory spark-cluster でクラスタを削除できます。

大規模データで Spark MLlib を試すのは初めてなので、手間取っている間にマシンが無駄に動いている……という可能性があったので(というか実際そうなったので)、少しケチって以下のスペックで構築しました。

  • master: r3.large ($0.185/hour, 2 vCPU, 15 GB memory, 1 x 32 SSD) 1台
  • slave: m3.2xlarge ($0.616/hour, 8 vCPU, 30 GB memory, 2 x 80 SSD) 5台

構築時のコマンドは以下の通りです。

./spark-ec2 \
--key-pair=word2vec-key-pair \
--identity-file=/Users/myoshiz/.ssh/word2vec-key-pair.pem \
--region=us-west-1 \
--zone=us-west-1a \
--master-instance-type=r3.large \
--instance-type=m3.2xlarge \
--copy-aws-credentials \
--hadoop-major-version=yarn \
--slaves 5 \
launch spark-cluster > spark-submit.log 2>&1 &

構築が完了したら、前回同様に .bash_profile に環境変数 EC2_SPARK_MASTER を設定します。また、今回は環境変数 AWS_ACCESS_KEY_ID と AWS_SECRET_ACCESS_KEY も設定します。これは、Spark アプリケーションから S3 にアクセスするために必要な設定です。

export EC2_SPARK_MASTER=ec2-xxx-xxx-xxx-xxx.us-west-1.compute.amazonaws.com
export AWS_ACCESS_KEY_ID={{ IAM ユーザ "word2vec-user" のアクセスキーID }}
export AWS_SECRET_ACCESS_KEY={{ IAM ユーザ "word2vec-user" のシークレットアクセスキー}}

つまづいたこと

Part 1 でも書きましたが、--copy-aws-credential を指定しても、Spark に対してはアクセスキーID、シークレットアクセスキーが設定されません。環境変数を設定したところ、Spark(spark-shell, spark-submit)から S3 にアクセスできるようになったので、今回はこの方法で済ませました。

なお、AWS のインスタンスプロファイルを使ってアクセス権限を与えることもできると思いますが、今回は試していません。クラスタを作ったり壊したりを繰り返していたため、そのたびに AWS マネジメントコンソールをいじるのは面倒だったので……。

spark-submit の実行

最終的にやったこと

先ほど作った jar を、master にコピーします。

% scp -i ~/.ssh/word2vec-key-pair.pem word2vec-model-generator-assembly-0.0.1.jar root@${EC2_SPARK_MASTER}:/root/

そして、以下のようにコマンドを実行すると、Word2Vec アプリケーションが実行されます。ssh 接続が切れた場合のために、バックグラウンドで実行し、標準出力はファイルに出力させておきます。

$ ./spark/bin/spark-submit \
--master spark://${EC2_SPARK_MASTER}:7077 \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.driver.memory=11g \
--conf spark.akka.frameSize=1024 \
--class jp.muziyoshiz.word2vec.Word2VecModelGenerator \
word2vec-model-generator-assembly-0.0.1.jar \
s3n://my-bucket-name/enwiki/*.txt.gz \
s3n://my-bucket-name/enwiki_model \
10 20 50 > spark-submit-wp8.log 2>&1 &

Spark から S3 にアクセスする際は、aws コマンドで指定するときの s3:// ではなくて、s3n:// を指定する必要がある点に注意です。

Word2Vec のパラメータを変えて、何度か実行してみたところ、実行時間は以下のようになりました。

Pattern No. numPartition numIteration minCount vectorSize 実行時間
1 10 1 5 10 75.0 min
2 10 1 20 20 82.6 min
3 10 1 20 50 130.6 min

最初のパターン1は、vectorSize をかなり減らしたにも関わらず時間がかかり、後述するようにモデルの精度もよくありませんでした。

そのため、パターン2では minCount(単語の最小出現回数)を大きくして、vocabSize(モデルに含まれる単語数)を減らしました。その結果、vectorSize をパターン1の2倍にしたにも関わらず、実行時間は1.1倍程度に収まり、モデルの精度も若干上がりました。

最後に、他のパラメータは同じままで vectorSize のみ50に増やしたところ、vectorSizeはパターン2の2.5倍で、実行時間は1.5倍になりました。

実行時間の内訳を Spark UI で確認したところ、処理の最後に slave 1台で実行するタスク(Locality Level NODE_LOCAL のタスク)があり、これが1〜2時間かかっていました。このタスクがボトルネックになっているということは、少なくとも Word2Vec については、Spark MLlib による分散処理のメリットって、もしかしてあまり無いとか……?

ただ、今回は numIteration を 1 で固定にしましたが、精度を上げるためには numPartition と同じ 10 まで上げたほうがよいはずです。numIteration を増やせば、分散処理されるタスクが占める割合も増えるので、Spark の恩恵が得られるのではないかと思います。それはまた今度試してみます。

つまづいたこと(1):ドライバのメモリ使用量を増やさないと落ちる

最初に実行したところ、以下のエラーが発生してジョブが止まりました。

16/06/25 09:39:46 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() for one-way message.
java.lang.IllegalStateException: unread block data

Spark UI の Environment タブで、Spark Properties を確認したところ、spark.executor.memory は m3.2xlarge(メモリ 30GB)に合わせて 26837m に指定されていたのですが、spark.driver.memory のほうは何も指定されていませんでした。spark.driver.memory のデフォルトは 1g です。

これを r3.large のメモリ 15GB から 4GB を引いた 11GB(11g)に指定したところ、このエラーは出なくなりました。また、Environment タブで、spark.driver.memory が指定されていることを確認できました。

つまづいたこと(2):vocabSize*vectorSize が大きすぎると落ちる

numPartition numIteration minCount vectorSize
10 1 5 100

Spark MLlib の vectorSize のデフォルト値は 100 なので、最初はこの値を使っていました。しかし、上記のパラメータの組み合わせで実行したところ、Stage 1 の処理の途中で以下のエラーが出て、タスクが止まってしまいました。

Exception in thread "main" java.lang.RuntimeException: Please increase minCount or decrease vectorSize in Word2Vec to avoid an OOM. You are highly recommended to make your vocabSize*vectorSize, which is 3856720*100 for now, less than `Int.MaxValue/8`.
        at org.apache.spark.mllib.feature.Word2Vec.fit(Word2Vec.scala:319)
(スタックトレースは省略)

Int.MaxValue/8 = 268435455 です。つまり、vocabSize(単語数)*vectorSize がこの上限を大幅に超えていることが原因のようです。Int の最大値の8分の1ってなんでまた……。

とにかく、単語数を減らすか、vectorSize を減らす必要があることがわかりました。そのため、これ以降のテストでは vectorSize を減らすパターン(パターン1)と、minCount を増やして単語数を減らすパターン(パターン2〜3)を試しました。

つまづいたこと(3):モデルのサイズが大きすぎると akka のフレームサイズ上限を超えて落ちる

Word2Vec.fit() の最後、生成された Word2Vec モデルを parquet 形式で出力するところで、以下のエラーが出て落ちました。

16/06/25 11:28:06 ERROR InsertIntoHadoopFsRelation: Aborting job.
org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 1793:0 was 213710572 bytes, which exceeds max allowed: spark.akka.frameSize (134217728 bytes) - reserved (204800 bytes). Consider increasing spark.akka.frameSize or using broadcast variables for large values.

spark.akka.frameSize のデフォルトは 128(単位は MB)で、タスクのサイズがこれを超えると落ちるようです。小さいデータセットでは出なかったエラーなのですが、Wikipedia 規模になると出るようです。1〜2時間待ったあとで、最後の最後にこのエラーで落ちると、非常に(精神的にも金銭的にも)痛いです……。

設定可能な上限値は調べても分かりませんでしたが、ひとまず --conf spark.akka.frameSize=1024 を指定して 1GB にしたところ、Word2Vec モデルの出力まで成功しました。

ローカルマシン上での Word2Vec モデルの利用

最終的にやったこと

先ほどの spark-submit の実行により、Word2Vec モデルが S3 にアップロードされました。このモデルをローカルマシンにダウンロードして使ってみます。

% aws s3 cp s3://my-bucket-name/enwiki_model ./enwiki_model --recursive

以下のコマンドで spark-shell を起動します。Part 1 で使ったオプションに加えて、--conf spark.kryoserializer.buffer.max=1g を指定しています。

% spark-shell --master local \
--conf spark.driver.memory=5g \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.kryoserializer.buffer.max=1g

そして、spark-shell のプロンプトで以下を実行し、Word2Vec モデルをロードします。

import org.apache.spark._
import org.apache.spark.rdd._
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}

val model = Word2VecModel.load(sc, "enwiki_model")

また、今回は Apache Spark 入門に書かれたメソッドを使って、関係性を加味した推測を行ってみます。有名な例で言うと、「"king" に対する "kings" は、"queen" に対する何か?」という関係を、Word2Vecモデルから推測することができます。

以下は、Apache Spark 入門の p.224 から抜粋したコードです。これを spark-shell に貼り付けるか、ファイルに書いておいて :load <file name> でロードして使います。

import org.apache.spark.mllib.linalg.Vectors

def relationWords(w1: String, w2: String, target: String, model: Word2VecModel) :Array[(String, Double)] = {
    val b = breeze.linalg.Vector(model.getVectors(w1))
    val a = breeze.linalg.Vector(model.getVectors(w2))
    val c = breeze.linalg.Vector(model.getVectors(target))
    val x = c + (a - b)
    model.findSynonyms(Vectors.dense(x.toArray.map(_.toDouble)), 10)
}

パターン1:minCount = 5, vectorSize = 10

ベクトル数が小さいためか、精度はかなり悪いです。Tokyoという都市名、Japanという国名に対する類似語を求めても、上位10件にそれらしいものが現れません。

scala> model.findSynonyms("Tokyo", 10).foreach(println)
(Rebounder,5.026514312082014)
(Fivepenny,5.006271473525809)
(Riviera,4.9806280562664655)
(Pirmahal,4.977896311409738)
(A2217,4.973896329049228)
(Pestújhely,4.967955406306887)
(Tri,4.966647609406325)
(Cigarros,4.966214313196464)
(Seahorses,4.9657892250050715)
(Club,4.965424934604451)

scala> model.findSynonyms("Japan", 10).foreach(println)
(Prabda,3.8591253451462766)
(Skateabout,3.789246081518729)
(detailslink,3.756286768742609)
(Oceania,3.7439580152901946)
(Daeges,3.743037606956309)
(Equestrianism,3.73990681262581)
(Miegs,3.7392088293670396)
(Fleuth,3.735308547592705)
(KBID-LP,3.730579527776324)
(Powerlifting,3.717090309581691)

関係性を加味した推測も、以下のようにうまく行きませんでした。

scala> relationWords("king", "kings", "queen", model).foreach(println)
(satsuma-biwa,4.95347264322314)
(shoguns,4.93869343414127)
(mystics,4.931215483461304)
(Zelimxan,4.925167012454619)
(Christianized,4.922235458369835)
(veneration,4.921893688910249)
(Shi’i,4.921205040607001)
(Russified,4.917586471812209)
(pagan,4.912822109308089)
(revered,4.911351827558269)

scala> relationWords("prince", "king", "princess", model).foreach(println)
(Pandava,4.2101984410814834)
(Aegisthus,4.207452272387961)
(bandit,4.202362575975742)
(amanuensis,4.194580140364399)
(Aerope,4.188601884423512)
(tradesman,4.178661804898081)
(Candaules,4.177194064593601)
(princess,4.173209621638307)
(Shoulang,4.165125455530385)
(Seibei,4.163678291883964)

パターン2:minCount = 20, vectorSize = 20

単語数を減らし、ベクトル数を上げた結果、精度が若干向上しました。Tokyo に対する類義語として、日本の都市の Osaka、Sapporo が出てくるようになりました。一方で、Japan に対する類義語のほうは、あまり改善が見られません。

scala> model.findSynonyms("Tokyo", 10).foreach(println)
(Wrestle,7.689458080058069)
(Split,7.626499879518354)
(Osaka,7.620597049534027)
(Sapporo,7.556529623946273)
(Setagaya,7.513748270603075)
(Hiroshima,7.490792005499523)
(Shinjuku,7.45951304352636)
(Kanazawa,7.459122453399323)
(Expo,7.453010168798164)
(ESCOM,7.447874763780933)

scala> model.findSynonyms("Japan", 10).foreach(println)
(Tokyo,5.679376270328159)
(AXN,5.640570343734289)
(Wrestle,5.60396135079362)
(Expo,5.590382781259281)
(TV2,5.522196857434101)
(Hanoi,5.495135749493573)
(TV6,5.490184062697079)
(Kyoto,5.486577183328772)
(Skate,5.4760554670281065)
(Benelux,5.430530293625971)

関係性を加味した推測は、まだあまりうまくいきません。ただ、後者のほうは5位に正解の "queen" が出ているので、若干精度が向上しています。

scala> relationWords("king", "kings", "queen", model).foreach(println)
(pagan,6.667731329068959)
(garb,6.659426546093454)
(gods,6.648366573398432)
(symbolised,6.648168276539841)
(sacred,6.6085783714277975)
(personages,6.598811565877372)
(veneration,6.597536687593547)
(puranas,6.590383098194837)
(deities,6.588936982768422)
(beauties,6.588806331810932)

scala> relationWords("prince", "king", "princess", model).foreach(println)
(lord,6.574825899196509)
(princess,6.522661208674787)
(bride,6.521167177599623)
(lady,6.492377997870626)
(queen,6.479450084505509)
(first-born,6.466189456944019)
(king,6.441766970616445)
(blessed,6.441764119985444)
(beloved,6.4396910737789606)
(bridegroom,6.423838321417851)

パターン3:minCount = 20, vectorSize = 50

ベクトル数を大幅に増やした結果、かなり精度が向上しました。Tokyo に対して日本の都市、Japan に対してアジアの国名や首都が表示されるようになりました。

scala> model.findSynonyms("Tokyo", 10).foreach(println)
(Osaka,6.442472711716078)
(Fukuoka,6.3918200759436)
(Saitama,6.343209033208874)
(Setagaya,6.237343626467007)
(Japan,6.063812875793321)
(Sapporo,6.027676167552773)
(Nagano,5.955215285602899)
(Kobe,5.891646194480255)
(Yamagata,5.86912171881318)
(Shibuya,5.835765966270005)

scala> model.findSynonyms("Japan", 10).foreach(println)
(Tokyo,5.510337298616405)
(Korea,5.509610108188756)
(China,5.486622516556292)
(Fukuoka,5.378651363703807)
(Taiwan,5.377869828524535)
(Seoul,5.321357314331263)
(Shizuoka,5.31678565272272)
(Prefecture,5.297746506109964)
(Hamamatsu,5.159312705112953)
(Kanagawa,5.157422752148916)

関係性を加味した推測も、かなり精度が向上しました。前者は正解の "queens" が2位、後者も正解の "queen" が2位に表示されています。イテレーション数(numIteration)を増やすなど、更にパラメータを調整すれば、これらの単語が1位に上がることが期待できます。

scala> relationWords("king", "kings", "queen", model).foreach(println)
(realms,6.380416904839411)
(queens,6.292521776188793)
(knightly,6.2558567330155626)
(consorts,6.241017073100756)
(kings,6.200374546691251)
(kindreds,6.17249501613232)
(lamas,6.1721177720161915)
(monuments,6.147651372785442)
(patrilineal,6.1288029631730545)
(depictions,6.121416883901753)

scala> relationWords("prince", "king", "princess", model).foreach(println)
(princess,5.956775488416378)
(queen,5.9055082324742685)
(slaying,5.793197818446893)
(king’s,5.696965618712307)
(betrothal,5.59067630474941)
(goddess,5.58159904439838)
(apparition,5.554027664552106)
(martyrdom,5.534826668619817)
(Pelops,5.503355785910461)
(ancestress,5.4953139994512545)

つまづいたこと

spark-shell の起動時に --conf spark.kryoserializer.buffer.max=1g を指定しないと、Word2VecModel.load() の呼び出しで落ちました。

16/06/25 22:00:26 WARN TaskSetManager: Lost task 1.0 in stage 2.0 (TID 3, localhost): TaskKilled (killed intentionally)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, localhost): org.apache.spark.SparkException: Kryo serialization failed: Buffer overflow. Available: 1, required: 4. To avoid this, increase spark.kryoserializer.buffer.max value.
    at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:299)
(スタックトレースは省略)

あと、余談ですが、zsh を使っていると --master local[*] の指定ができないようです。以下のようなエラーが出ます。bash なら指定できるので、spark-shell の実行時だけ bash に切り替えました。

% spark-shell --master local[*] \
--conf spark.driver.memory=5g \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.kryoserializer.buffer.max=1g
zsh: no matches found: local[*]

Part 2 のまとめ

Wikipedia の大規模データを、Amazon EC2 上に構築した Spark Cluster で処理できました。また、実際にいろいろなパラメータを試してみて、大規模データを処理する際に注意すべき Spark のプロパティや、Word2Vec のパラメータを把握することができました。

今回生成した Word2Vec のモデルはまだあまり精度が良くありませんが、以下の方針で精度を上げることができそうです。

  • minCount を大きくして、単語数(vocabSize)を小さくする
  • vectorSize を増やす
  • numIteration を numPartition に近づける
  • 元データからノイズ(MediaWiki記法など)を除去する方法を改善する

実際に試してみて、Word2Vec の場合は slave 1台で実行される処理がボトルネックになっていることがわかりました。Amazon EC2 の高いマシンを借りているのに、slave 1台だけが頑張っていて、残りの4台は遊んでいる、というのはなかなか焦ります。

Slave 1台しか動かない時間を短くする方法としては、vocabSize*vectorSize を小さくするしかないのですかね? 実際のアプリで Word2Vec を使う際には、Spark MLlib での処理の前に、そのアプリで使わない単語を除去してしまって単語数を大幅に減らしておく、などの対策が必要かもしれません。

Part 2 の主な参考文献

手を動かす Spark MLlib & Word2Vec Part 1 (spark-ec2 でクラスタを構築するまで)

Spark MLlib Machine Learning
f:id:muziyoshiz:20160626223709p:plain

このシリーズについて

機械学習系のツールを全然使ったことがなかったので、勉強のためになにか1つ選んで、実際に手を動かしてみることにしました。マシンを並べて負荷分散することを想定して、まずは Spark MLlib を選びました。

このシリーズでは、Amazon EC2 上に構築した Spark Cluster (Standalone Mode) で、Wikipedia のデータから Word2Vec のモデルを作るところまでの方法を解説していきます。ただ、実際やってみてわかったのですが、Spark 自体、Spark MLlib の Word2Vec クラス、およびクラスタ構築に使った spark-ec2 に設定項目が多いせいで、細かいところで何度も何度もつまづきました……。

そのため、このシリーズでは各ステップについて、「最終的にやったこと」と、その最終的なやり方にたどり着くまでに「つまづいたこと」を分けました。やり方を知りたいだけの場合は「最終的にやったこと」の方だけ読んでください。「つまづいたこと」は、うまく行かなかった場合のための参考情報です。

Part 1 の範囲

Amazon EC2 に master 1台、slave 3台構成の Spark Cluster (Standalone mode) を構築し、spark-shell から Word2Vec を実行するところまで。

Spark をローカル環境(Mac)にインストールする

最終的にやったこと

まず、ローカル環境で Spark MLlib が動くかどうかを試してみました。環境は以下の通りです。

  • MacBook Pro (Retina, 15-inch, Mid 2014)
  • OS: OS X Yosemite 10.10.5
  • CPU: 2.2 GHz Intel Core i7
  • メモリ: 16GB 1600 MHz DDR3

OS X に Spark をインストールする場合、以下のコマンドだけでインストールできます(参考:ApacheSpark — BrewFormulas)。

% brew update
% brew install apache-spark

私が試した時点では Spark 1.6.1 でした。Java は Java 8 です。

% spark-shell --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.1
      /_/

Type --help for more information.

つまづいたこと

spark-shell ローカルモードで(--master local を指定して)実行すると、spark> というプロンプトが表示されるまでに、色々と WARN が出ます。

16/06/07 00:17:36 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)

BoneCP は JDBC Connection Pool ライブラリの名前です。scala - What do WARN messages mean when starting spark-shell? - Stack Overflow によると、ローカルモードで実行しているときは問題ないとのこと。

16/06/07 00:17:38 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
16/06/07 00:17:38 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException

こちらも、Hive metastore に接続できないことを表す WARN なので、ローカルモードでは関係ないと判断しました。

ローカル環境での Word2Vec の実行

最終的にやったこと

Spark MLlib のページ(Feature Extraction and Transformation)に、Spark MLlib に含まれる Word2Vec クラスを使ったサンプルコードがあります。これをローカルモードで実行してみます。

まず、サンプルコードで使っている text8.zip をダウンロードして、解凍します。これは、スペースで区切られた英単語が羅列された(意味のある文章ではない)100 MB のテキストファイルです。

% wget http://mattmahoney.net/dc/text8.zip
% unzip text8.zip
% ls -la text8
-rw-r--r--@ 1 myoshiz  staff  100000000  6  9  2006 text8

この text8 を置いたディレクトリで、以下のコマンドを実行します。spark.driver.memory はドライバのメモリ使用量を表すオプションで、デフォルトは 1g (1GB)です。

% spark-shell --master local \
--conf spark.driver.memory=5g \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer

spark-shell のプロンプトで、Word2Vec のサンプルコード を入力すれば、myModelPath ディレクトリ以下に、Word2Vec のモデルデータが生成されます。

なお、spark-shell の起動中は http://localhost:4040/ にアクセスすることで、ジョブの状態を確認できます。

つまづいたこと

最初は --conf spark.driver.memory=5g" を指定せずに spark-shell を起動していました。その状態でword2vec.fit(input)` を実行すると、OutOfMemoryError で spark-shell が落ちます。私の環境では、ファイルが 100MB だと落ちて、80MB まで減らすと落ちない、という状態でした。

scala> val model = word2vec.fit(input)
[Stage 0:>                                                          (0 + 1) / 3]
Exception in thread "refresh progress" java.lang.OutOfMemoryError: GC overhead limit exceeded
    at scala.StringContext.s(StringContext.scala:90)
(スタックトレース、および後続のエラーは省略)

エラーメッセージをもとに調べたところ、JVM の設定が悪いような情報をいくつか見かけました。

しかし、これを指定してもエラーメッセージは変わりませんでした。というか、私は Java 8 で実行していたので、そもそもこの設定には意味がありませんでした。

この Java の仕様変更を踏まえて、以下のように spark-shell を実行したところ、落ちなくなりました。ただし、この方法だと、OutOfMemoryError が出ないだけで、いつまでも処理が終わらないという状態になってしまいました……。

% SPARK_REPL_OPTS="-XX:MaxMetaspaceSize=1024m" spark-shell --master local

結局、Configuration に載っているメモリ関係のパラメータを一通り確認して、前述の spark.driver.memory を増やしたところ、うまく動いたようで、処理が完了しました。JVM のパラメータを変更する必要はなかったようです。

Amazon EC2 への Spark クラスタの構築(spark-ec2 を使った方法)

最終的にやったこと

Slave の台数を増やすことで、Spark MLlib の実行時間が短くなることを確認するために、Spark クラスタを構築しました。今回は Spark に同梱されている spark-ec2 というスクリプトを使って構築しました。このスクリプトの説明は Running Spark on EC2 - Spark 1.6.1 Documentation にあります。

Amazon Elastic MapReduce (EMR) で Spark を使えることは知っていますが、いずれオンプレに Spark クラスタを構築したかったのと、かといってマシンスペックを何パターンか試すときに手作業での構築は大変すぎたので spark-ec2 を使いました。

まず、AWS のマネジメントコンソールを使って、以下の設定を行います。今回は Spark の話がメインなので、AWS の設定の詳細は省略します。

  • IAM ユーザ "word2vec-user" の作成
  • IAM ユーザ "word2vec-user" に対する "AdministratorAccess" ポリシーのアタッチ(EC2 と S3 に絞っても良い)
  • EC2 でのキーペア "word2vec-key-pair" の作成
  • ローカルマシンに対する環境変数 AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY の設定
  • ローカルマシンに対するキーペアの配置(以下では /Users/myoshiz/.ssh/word2vec-key-pair.pem に置いたと仮定)

brew で Spark をインストールすると、spark-ec2 は入っていません。そのため、spark-ec2 を使うために、Apache Spark のダウンロードページ から zip ファイルをダウンロードします。今回は、以下のファイルを選択しました。

  • Spark release: 1.6.1
  • Package type: Pre-built for Hadoop 2.6 and later

ダウンロードした zip ファイルを解凍すると、ec2 ディレクトリに spark-ec2 というスクリプトが入っています。このディレクトリに移動し、まずは master 1台、slave 3台のクラスタを構築してみます。そのためには、以下のコマンドを実行します。

% ./spark-ec2 \
--key-pair=word2vec-key-pair \
--identity-file=/Users/myoshiz/.ssh/word2vec-key-pair.pem \
--region=us-west-1 \
--zone=us-west-1a \
--instance-type=m4.large \
--copy-aws-credentials \
--hadoop-major-version=yarn \
--slaves 3 \
launch spark-cluster

各オプションの意味と、上記の値を指定した理由は以下の通りです。

  • --region は、デフォルトはバージニア北部(us-east-1)が使われる。国内だとインスタンス利用費が若干高く、東海岸は遠いので、北カリフォルニア(us-west-1)を指定した。
  • --instance-type は、デフォルトでは m1.large が使われる。m1.large は古いインスタンスタイプのため、スペックに比して割高のため、同じく 2 vCPU、メモリ8GBの m4.large を指定。調べた時点では m1.large が $0.19/hour、m4.large が $0.14/hour だった。
  • --copy-aws-credentials を指定すると、環境変数に設定された AWS のアクセスキーが、master の hadoop にも設定される。ただし、後述の通り Spark に対しては設定されないので、hadoop コマンドを使わないなら、指定しなくても良い。
  • --hadoop-major-version=yarn は、使用する Hadoop のバージョンを指定する。今回は Pre-built for Hadoop 2.6 and later をダウンロードしているので、yarn を指定する必要がある。デフォルトは 1(Hadoop 1.0.4)。
  • --slaves は slave の台数を指定する。

10〜20分待つとクラスタの構築が完了し、以下のようなメッセージが表示されます。Mac から以下の URL にアクセスすると、Spark UI や、Ganglia の画面を確認できます。

Spark standalone cluster started at http://ec2-xxx-xxx-xxx-xxx.us-west-1.compute.amazonaws.com:8080
Ganglia started at http://ec2-xxx-xxx-xxx-xxx.us-west-1.compute.amazonaws.com:5080/ganglia
Done!

上記のホスト名は、以降の作業でも使うので、以下のように環境変数に設定しておきます。シェルの設定ファイル(.bash_profile とか)で指定してもいいですが、クラスタを作るたびにホスト名が変わる点だけは注意が必要です。

% export EC2_SPARK_MASTER=ec2-xxx-xxx-xxx-xxx.us-west-1.compute.amazonaws.com

つまづいたこと(1):GitHub の spark-ec2

brew で spark をインストールすると、そのなかには spark-ec2 が入っていません。そのため、このスクリプトだけ別に入手できないかと思い、GitHub で公開されている spark-ec2 を clone して実行してみました。

github.com

この spark-ec2 を実行すると、エラーも出ずに最後まで処理が進むのですが、Spark クラスタが起動しないようです。http://ec2-xxx-xxx-xxx-xxx.us-west-1.compute.amazonaws.com:8080 にアクセスしても応答がなく、spark-shell で --master spark://ec2-xxx-xxx-xxx-xxx.us-west-1.compute.amazonaws.com:7077 を指定しても接続できない、という状態になりました。

色々悩んだ結果、大人しく Apache Spark のダウンロードページ から zip ファイルをダウンロードして、そのなかの spark-ec2 を使ったところ、実行したコマンドの引数は同じにも関わらず、クラスタが起動しました。

GitHub 版も見た目はきちんと動いているように見えたために、他に原因があると思い込んでしまい、この問題で数日詰まってしまいました……。

つまづいたこと(2):--hadoop-major-version=yarn の指定

このオプションは、公式サイトの Running Spark on EC2 には書かれていません。しかし spark-ec2 --help を実行すると、以下のオプションが表示されます。

  --hadoop-major-version=HADOOP_MAJOR_VERSION
                        Major version of Hadoop. Valid options are 1 (Hadoop
                        1.0.4), 2 (CDH 4.2.0), yarn (Hadoop 2.4.0) (default:
                        1)

上記のオプションを指定しないと、クラスタの構築後に spark-shell を実行した時に、以下のようなエラーが出て sqlContext の初期化に失敗しました。

16/06/13 14:08:02 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MResourceUri" is tagged as "embedded-only" so does not have its own datastore table.
java.lang.RuntimeException: java.io.IOException: Filesystem closed
    at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522)
(中略)
<console>:16: error: not found: value sqlContext
         import sqlContext.implicits._
                ^
<console>:16: error: not found: value sqlContext
         import sqlContext.sql
                ^

Spark クラスタでの Word2Vec の実行

最終的にやったこと

spark-ec2 の login コマンドを使用すると、master にログインできます。もちろん ssh でもログインできますが、master のホスト名を書かなくてよいのがメリットだと思います。ちなみに、オプションの指定が面倒ですが、以下の3つは必須のようです。

% ./spark-ec2 \
--key-pair=word2vec-key-pair \
--identity-file=/Users/myoshiz/.ssh/word2vec-key-pair.pem \
--region=us-west-1 \
login spark-cluster

次に、先ほどと同じサンプルコードを実行するために、text8.zip をダウンロードします。また、このファイルを、クラスタ上で動作する HDFS にアップロードします。これは、ファイルを slave からアクセス可能にするための作業です。後ほど、HDFS の代わりに S3 を使う方法も紹介します。

$ wget http://mattmahoney.net/dc/text8.zip
$ unzip text8.zip
$ ./ephemeral-hdfs/bin/hadoop fs -put text8 /

ここまでの準備が終わったら、master 上で spark-shell を実行します。指定するオプションは以下の通りです。ローカルモードの場合とは、--master の指定が変わっています。

$ ./spark/bin/spark-shell \
--master spark://${EC2_SPARK_MASTER}:7077 \
--conf spark.driver.memory=5g \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer

ホスト名の指定が面倒ですが、--master spark://localhost:7077 という指定では接続できませんでした。

あとは、spark-shell で以下のように入力すると、Word2Vec が実行されます。ローカルモードとの違いは、textFile() や save() に渡されたファイルパスが、HDFS のファイルパスとして扱われることです。今回はルート直下に text8 を置いたため、/text8 のように指定しています。

import org.apache.spark._
import org.apache.spark.rdd._
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}

val input = sc.textFile("/text8").map(line => line.split(" ").toSeq)

val word2vec = new Word2Vec()

val model = word2vec.fit(input)

val synonyms = model.findSynonyms("china", 40)

for((synonym, cosineSimilarity) <- synonyms) {
  println(s"$synonym $cosineSimilarity")
}

// Save and load model
model.save(sc, "/model_text8")
val sameModel = Word2VecModel.load(sc, "/model_text8")

以上により、Word2Vec のジョブが slave 上で実行されます。ただ、http://ec2-xxx-xxx-xxx-xxx.us-west-1.compute.amazonaws.com:8080 にアクセスするとわかるのですが、このままだと3台ある slave のうち、1台しか使われません。次は、ジョブを分散するために、Word2Vec のパラメータを変更します。

つまづいたこと

最初、ローカルディスクのファイルにアクセスできないことに気づきませんでした。file:// を付けても駄目でした。

次に、HDFS 上にファイルをアップロードする方法で悩んだのですが、これは ./ephemeral-hdfs/bin 以下のコマンドが使えることに気付いたあとは簡単でした。hadoop コマンドに馴染みのない人は、Apache Hadoop 2.7.1 – などが参考になると思います。

すべての slave に処理が分散されることの確認(Word2Vec のパラメータ変更)

最終的にやったこと

Word2Vec のパラメータは、Word2Vec クラスの setter で指定できます。用意された setter とそのデフォルト値は Word2Vec の API リファレンス に記載されています。

これらの setter のうち、setNumPartitions() でパーティション数を1よりも大きくすると、複数の slave 間で処理が分散されます。この値のデフォルトが1なので、そのままでは slave が1台しか使われません。

val word2vec = new Word2Vec()

// Set this
word2vec.setNumPartitions(4)

val model = word2vec.fit(input)

slave 3台で試したところ、パーティション数を4まで増やした段階で、すべてのslaveに処理が分散されました。ただ、5台で試したときには、パーティション数を6にしても、slave 4台しか使われませんでした。単純に slave の台数 + 1 にすればよいというわけではなさそうで、詳細はまだわかりませんが、少なくとも slave の台数よりも大きい数を指定する必要がありそうです。

ただ、このパーティション数を増やすと、増やした分だけ負荷分散されて処理時間が短くなっていく一方で、計算結果の正確さも落ちていくとのことです。Word2Vec の処理が分散しない理由を調べている際に、以下の情報を見かけました。

stackoverflow.com

  • イテレーションの数は、パーティション数と同じか、それ以下にすべき
  • 正確さのために、パーティション数は小さい値を使うべき
  • 結果(モデル)を正確にするためには、複数のイテレーションが必要

どれくらい結果が変わっていくのか、text8 を3台の slave 上で処理して調べてみました。以下は、numPartition = 1, 3, 6 での、"china" に類似した単語の上位10件です。パーティションが1個の場合の上位3件を太字にしています。text8 は意味のない文字列ですが、結果が変わっていく様子は参考になると思います。

numPartitions 1 3 6
1位 taiwan taiwan indonesia
2位 korea korea taiwan
3位 japan japan afghanistan
4位 mongolia mainland kazakhstan
5位 shanghai indonesia pakistan
6位 tibet india japan
7位 republic pakistan ireland
8位 india mongolia india
9位 manchuria thailand uzbekistan
10位 thailand africa iran

ちなみに、使われた slave の台数と、処理時間の関係は以下のようになりました。text8 くらいのデータ量(100 MB)だと、おおよそ、使われる slave の台数に応じて大きく処理時間が減るようです。

numPartitions 1 3 6
slave の台数 1 2 3
処理時間 6.3 min 3.6 min 1.9 min

つまづいたこと

負荷分散しない理由が Word2Vec のパラメータの方にある、ということが最初なかなか分からずに苦労しました。

普通に考えると「Spark MLlib から Word2Vec を使いたい人=負荷分散を期待している人」だから、Word2Vec のパラメータの初期値は負荷分散するようになっているはずだ(だから Spark のパラメータの方に問題があるはずだ)と思い込んでいました……。

Part 1 のまとめ

ここまでの手順で、Amazon EC2 上に Spark クラスタを構築する方法を確認できました。

次の Part 2 では、この Spark クラスタの slave の台数およびマシンスペックを強化し、処理するデータ量も Wikipedia 英語版(Gzip 圧縮した状態で 12 GB)まで増やしてみます。

Part 1 の主な参考文献

Habitat を触っていて気になった、細かいことあれこれ

Habitat Chef
f:id:muziyoshiz:20160617201522p:plain

Habitat について知りたい方は、まずは私がエンジニアブログに書いた Habitat の概要説明をご覧ください。自作のイメージ図を使って、Habitat のわかりにくい独自用語を解説しています。

recruit.gmo.jp

で、上記の記事を書いた時に、あまりにも細かすぎるので省いた話題がいくつかありました。放っておくと忘れそうなので、今回はその細かいことあれこれをご紹介します。

"Habitat" の意味

  • Habitat とは、居住環境、居住地、生息地、などの意味を持つ英単語です。
  • これを書いている時点で "Habitat" でググったところ 約 146,000,000 件ヒットしました。Chef といい、この会社はどうしてこう、検索しにくい名前をツールに付けてしまうのか……。

Habitat のバージョン番号

  • これを書いている時点で hab -V を実行したら hab 0.7.0/20160614231131 と出てきました。
  • 0.1 でも 1.0 でもない、これまた微妙なところを……。開発陣としては、現時点の実装をどれくらいの完成度だと思っているんでしょう? もう実サービスで使えるレベル? 少なくとも、ドキュメントのなかに「まだ production に使うな」というありがちな文章は見当たりませんでした。

Habitat の推奨環境

  • Habitat が推奨する、あるいは Habitat 開発者が最初にテストしている Linux ディストリビューションって何なんでしょう?
  • Habitat ファーストインプレッション にも書いた通り、現時点では Linux でしか Supervisor は動作しません。とはいえ、私が VirtualBox & CentOS 7 で試したところ、それでもチュートリアル通りには動きませんでした。
  • habitat-sh/habitat: Modern applications with built-in automation のトップディレクトリにある Dockerfile が FROM ubuntu:xenial で始まってるので、Ubuntu の可能性大。次に試すときは Ubuntu でやります。

Package と Artifact

  • ドキュメントを読んでいると、Package と同じ概念を Artifact と呼んでいる箇所がいくつかありました。Artifact という用語も使われているのか、この用語はもう廃止されたけどドキュメントの一部に残っているだけなのか?
  • そういえば、Package ファイルの拡張子の hart って、Habitat Artifact の略称なんですかね。

Depot への Package のアップロード

  • Habitat CLI reference の目次には "hab pkg upload" のリンクが載っているんですが、このリンクをクリックした先の説明はありませんでした。
  • hap pkg upload --help を実行すると、hab pkg upload [FLAGS] [OPTIONS] <HART_FILE>... と出てきます。なんだあるんじゃーんと思ってコマンドを叩いたら、チュートリアルで作った muziyoshiz/mytutorialapp を見事アップロードできました(アップロード先)。
  • で、アップロードはできたんですが、Habitat Web って、パッケージの削除機能がまだ無いみたいです。もしかして、これが CLI reference からコマンドが消されている理由では。削除機能が追加されたら消しますごめんなさい……。

Habitat の P2P ネットワークと Topology

  • Habitat は Supervisor 同士で P2P ネットワークを構築します。このネットワークのことを Ring あるいは Supervisor Ring と呼ぶようです。しかし、Supervisor Internals によると、このネットワークは SWIM (Scalable Weakly-consistent Infection-style process group Membership protocol) で構成されるとのこと。リングネットワークを組んでいるわけでもないのに Ring というのはモヤモヤします。
  • 一方、Habitat には Topology という用語があり、Supervisor 間の論理的な関係を定義できます。Running packages in topologies によると、現時点では standalone, leader-follower, initializer の3種類から選べるようです。Topology という単語が Network Topology を指しているわけではない、というのもなんだかモヤモヤします。

設定ファイルの一部で TOML 形式を採用

  • Habitat は設定ファイルを Handlebars 形式で書くことができて、この Handlebars に渡す変数を TOML 形式で定義できます。
  • TOML 形式って初耳だったので調べてみたところ、toml/toml-v0.4.0.md に仕様がありました。Tom's Obvious, Minimal Language の略で TOML なんですね。僕も便乗して YOML とか作ってやろうか(紛らわしすぎる)。
  • ちなみにこの仕様、有志による日本語訳が toml/toml-v0.4.0.md にて公開されていました。
  • HashiCorp にも HCL とかありますし、運用管理ツールを開発していると Yet Another な YAML が欲しくなるものなんでしょうか。

Habitat と Google Analytics

  • hab setup を実行してセットアップすると、その最後に、利用データを Habitat の Google Analytics アカウントにアップロードしてよいかと尋ねられます。No を選択すると ~/.hab/cache/analytics/OPTED_OUT に空のファイルが作られて、それきり何も質問されません。
  • ツールの利用状況を収集したい気持ちはよくわかります。でもまあ、No を選択しますよね。そういえば、最近 Google Analytics を後から導入して揉めたソフトがなにかあった気がしますけど、何でしたっけ?

とりとめもなくなってきたので、今日はこのへんで。

Fluentd Meetup 2016 Summer レポート 〜 v0.14 の新機能からプラグイン開発者向け API まで

Fluentd Event Report
f:id:muziyoshiz:20160602000413p:plain
  • イベント名:Fluentd Meetup 2016 Summer
  • 開催日時:2016-06-01(月)
  • 会場:イベント&コミュニティスペース dots.

約1年ぶりに開催された Fluentd Meetup に参加してきました。今回は、5月31日にリリースされたメジャーバージョンアップの v0.14 について、ユーザ向けの機能紹介から、プラグイン開発者向けの深い話まで、盛りだくさんの内容でした。自分でプラグインを書くくらい、Fluentd をヘビーに使う人向けのイベントという感じで、どの話も面白かったです。

最近、私は Fluentd を使う機会が全然なかったこともあって、「Fluentd も機能的には枯れてきて、そろそろ新機能もあまりないだろう」と思っていたのですが、まだこんなに改善の余地があったのか……とちょっと驚きました。個人的には、古橋さんの講演で将来の構想として出てきた、Kafka っぽい PubSub は便利そうなので是非実装してほしいです。

以下、講演内容のメモです。プレゼンに書かれてなくて口頭で説明されていたことや、個人的に気になったことを中心にメモしています。

講演内容

Past & Future of Fluentd (古橋 貞之, @frsyuki)

  • Fluentdの歴史

    • 2011年6月に最初のバージョンをリリースした。今年の6月で5年。ここまで続くと、かなり成功したOSSと言っていいのではないか。
    • 新メジャーバージョンの 0.14 が昨日リリースされた。
  • 2年前にリリースされたv0.12の機能の振り返り

    • タグの書き換えを不要にする label と filter
  • 将来の構想

    • Advanced back pressure - 転送先が詰まるのを防ぐ
    • PubSub & pull forward - Kafkaに近い発想。ログが必要なアプリが取りに行く
    • Compressed buffer
    • Distributed buffer - twitterがリリースした distributedlog や、Apache BookKeeper のような機能を Fluentd の buffer のところに入れる
    • Service plugins - パッケージ化されたソフト(例えば Norikra)をFluentdで動作させるようにする。service label を書くだけで起動。インストールが簡単になる
    • Schema-validated logging - バリデートされたログだけを転送することで、無駄な転送や、データベース書き込み時のエラーを回避する
    • Centralized config manager - Chef や Ansible を使うことも考えられるが、Dockerで起動したインスタンスのなかの設定に Chef などを使いたくない。これに fluentd-ui が組み合わさると嬉しい
  • Fluentdのロゴが変わる予定

v0.14 Overview (中川 真宏, @repeatedly)

  • New Plugin APIs

    • v0.12向けに作ったプラグインは、互換性レイヤがあるので、ほぼ動くはず
    • インスタンス変数に直接アクセスしているようなプラグインは、動かない可能性がある
    • 次の v1 でも互換性レイヤは残す予定
  • buffer の設計の大幅な見直し

    • v0.12 buffer design: chunkの保存方法。ElasticSearch に書き込むときに、インデックスの値を見て書き込み先を変える、といった場合に問題になった。リトライがうまくいかない。
    • v0.14で、chunkの管理が柔軟になり、このような問題は解決できる
  • Plugin Storage & Helpers

    • プラグイン内で使えるストレージレイヤ(KVS)の導入
    • プラグインの作成時によく使う雛形コードをhelperで提供
  • Time with nanosecond

    • ミリ秒とナノ秒のどちらにするか、という議論があったが、ナノ秒でログを出すシステムが増えてきたのでナノ秒
    • 内部的には Fluent::EventTime という形で持つ。既存のプラグインに対しては Integer っぽく振る舞うクラス。
  • ServerEngine based Supervisor

    • プロセスモデルを ServerEngine に置き換える
  • Windows support

    • Linux 向けに特化したコードを、Windows で動くように直したりした
    • HTTP RPC でバッファのフラッシュなどを行う
  • v0.14.x - v1

    • https://github.com/fluent/fluentd/issues/1000 に並んでいる
    • v1 を出す。v1 で新機能を入れることはない。v1 は今まで入れたAPIをFixする、と宣言するためのバージョン。フィードバックは v0.14 のうちにしてほしい
    • v1 は 2016 4Q か 2017 1Q に出る
    • いくつか v1 で消したい機能はある
  • v0.14 系でこれから入れる予定の目玉機能

    • Symmetric multi-core processing: パフォーマンス向上。TCPポートをWorker間で共用し、Workerで処理を分ける。ServerEngine を使って実現する
    • Counter API: Workerが分かれるので、カウンタを作りづらくなる点をカバー
    • secure-forward をコアに入れる。forward と secure-forward の細かい違いをなくす
  • ベンチマーク結果

    • CPU使用率が、v0.14 で若干上がっている
    • EC2で、100,000msgs/sec での測定結果によると、CPU使用率が4〜5%上がった
    • v0.14はリリースしたばかりで、性能改善できる余地が残っている。これから改善する予定
  • td-agent 3

    • fluentd v0.14 は td-agent 3 から採用(現行の td-agent 2 系には入らない)
    • Ruby 2.3 に上げる
    • Windows 向けパッケージも提供予定
    • リリース日は未定(v0.14 がまだ stable と言えないので)
  • Q) chunk から取得したログの一部で処理が失敗した場合、リトライしたいと思うが、その場合 chunk の詰め直しはできるか?

    • A) できない。その場合は、エラーストリームという機能を使ってほしい

Fluentd ServerEngine Integration & Windows Support (成田 律太, @naritta)

  • 自己紹介

    • 2015年の Treasure Data のインターンシップに参加し、今年入社した
  • ServerEngine

    • Unicorn と大体同じようなもの
    • Supervisor, server, worker の構成を簡単に作れるようにするためのフレームワーク
    • Worker Module, Server Module を Ruby で書く
    • Worker type: thread, process, spawn
    • v0.14 では spawn worker type を利用している。Windows に fork の機能がないため
  • Fluentd で ServerEngine を採用したことによるメリット

    • auto restart
      • Linux でも Windows でも、fluentd のプロセス(worker)をKILLすると、自動的にリスタートするのを確認できる
    • live restart
      • 設定変更したあとでシグナル(または HTTP RPC)を送ると、worker だけをリスタートできる
      • server が TCP 接続を持っているので、ログは欠損しない
    • socket manager
      • server 側で listen して、socket を worker に共有する
      • 将来的にはマルチコアでソケットを共有するために ServerEngine を使う(いまはマルチコアで動かすには in_multiprocess plugin を使う必要がある)
    • signal handler
      • signal handler と log rotation は、Fluentd ユーザにとってのメリットというより、内部実装に関するメリット
      • シグナルをキューに溜めて、シグナル同士の競合を防ぐ
    • log rotation
      • ruby core にすでにポーティングされている機能
      • マルチプロセスでのログローテーションなど
  • Q) ServerEngine が入ることで、設定ファイルが増える?

    • A) Fluentd が内部的に ServerEngine の設定ファイルを作って、ServerEngine に渡す。ユーザ側は Fluentd/td-agent の設定ファイルだけを編集すれば良い

Fluentd v0.14 Plugin API Updates (田籠 聡, @tagomoris)

  • 何故新しい API セットを作ったか?

    • いままでのプラグインには、開発者向けドキュメントがなかった。みんな他の人の書いたプラグインの真似をして書いている
    • プラグイン内で独自にスレッドを起動するようなことが必要で、そのせいでテストしづらい
    • あるプラグインで使えるパラメータが、他のプラグインでも使えるのかどうかがわかりづらい
    • 割りとコアな処理をプラグイン側で上書きしてしまっていることがよくある
    • などなど
    • 上記のような問題を解決したい。また、便利な機能も一緒に提供することで、移行を進めたい
  • Compatibility of plugins

    • v0.12のプラグインは Fluent::Input などのクラスのサブクラス
    • compatibility layer: Fluent::Compat::Klass で、v0.12 にしかないメソッドを提供するなどして、互換性を維持
  • Compatibility of configurations

    • v0.12 形式のパラメータを v0.14 形式に自動変換する helper を提供する。plugin 開発者にこれを使ってもらう必要がある
    • v0.14 API を使ったプラグインを、v0.12 で動かすことはできない(ようにした)。gemspec に依存関係を書いてもらう必要がある
  • v0.14 プラグインクラスの書き方

    • すべてのクラスを Fluent::Plugin モジュールの下に置く(いままでは Fluent 直下だった)
    • 親クラスのメソッド(#configure とか)をオーバライドするときは必ず super を呼ぶ
    • super を呼んでない場合は、互換性レイヤが代わりに super を呼んで、WARNING を出す
    • 親クラス(Fluent::Plugin::Input とか)の階層を整理
  • Fluent::Plugin::Output

    • Output Plugin は物凄くいろいろ変わった。v0.12 まではやりたいことによって異なるクラスを使い分ける必要があったが、1個の Output クラスに統合された
    • chunk を分ける条件を細かく指定できるようになった。いままではサイズのみの chunking だったが、例えば時間でも chunking できるし、時間とサイズの組合せでもできる
    • Non-buffered, buffered synchronous, buffered asynchronous のいずれか1個を実装してあれば動く
  • Delayed commit

    • 書き込みの ACK を待ちたい場合、いままでは write メソッドの中で待つ必要があった。write メソッドが終わると、送ったデータが chunk から消されてしまうため
    • v0.14 では try_write メソッドで commit を待たずに return する。この時点では chunk が消されない。非同期のチェックスレッドが ACK を受け取って #commit_write が呼ばれた時点で、chunk が消される
    • ユースケース:分散ファイルシステムに書き込んでから、正しく読めることを確認してから chunk を消したい場合
  • configurations: flushing buffers

    • chunk を flush するタイミングを、細かく制御できるようになった
    • flush_mode: immediate → すぐに書き込んで欲しいが、失敗した時はリトライして欲しい場合に使うモード
  • Retries

    • いままではリトライ回数(retry_count)しか指定できなかったが、タイムアウト(retry_timeout)を指定できるようになった(どれだけの期間、リトライし続けるか)
    • いままでは72時間リトライし続けていた。この仕様は自明ではなかった。いつまで待てば諦めが付くのか、誰にもわからない状態だった
  • その他のプラグイン: Buffer, Parser, Formatter, Storage, ...

    • これらを v0.14 では "Owned" plugin と呼ぶ → 他のプラグイン経由で実体化されるプラグイン
      • 対義語は primary plugins → Input, Output, Filter plugin
    • Storage plugin:プラグイン内でデータを永続化したい場合に使うプラグイン
      • 例:file plugin の pos file で管理している情報を、storage plugin で永続化
  • Plugin Helpers

    • ヘルパーを使いたいときは、明示的に "helpers :name" と指定してもらうことで、どのヘルパーを使っているのか明示されるようにした
  • New Test Drivers

    • いままでは実行タイミングに依存していたようなコードを、きちんとテストできるようにした
    • 例えば、flushのタイミングを制御するとか
  • Plans for v0.14.x

    • 古橋さんの話+α
    • plugin generator を入れようと思っている。いまは、他の人の書いたプラグインの真似をしてプラグインを書く、という状態で、これは良くない
    • 新しいAPI:Fluentd 全体で使えるバッファサイズの上限を指定、Counter API
  • 開発者向けのドキュメントはこれから書く(書かなければいけないと思ってはいる)

OS X + Docker Machine + Cloudera QuickStart Docker Image で Spark MLlib のお試し環境を構築する

Docker Spark Machine Learning
f:id:muziyoshiz:20160529223041p:plain

はじめに

Cloudera は以前から、Hadoop の機能を簡単に試すための VM イメージを配布しています(Cloudera QuickStart VM のダウンロードページ)。配布されているイメージは KVM 版、Virtual Box 版、VMWare 版の3種類です。

しかし、Vagrant で使える box ファイル版は提供されておらず、コマンド一発での環境構築はできませんでした。もちろん、Virtual Box 版のイメージから box ファイルを作ることは可能ですが、Cloudera のバージョンアップに追従する作業は面倒でした。

しかし、去年の12月から、Cloudera 社が QuickStart VM の Docker イメージ版を公式に配布するようになりました。以下は、公式配布に関する Cloudera 公式のブログ記事です。

blog.cloudera.com

最近、Spark MLlib を勉強するための環境を作る機会があったので、せっかくなので Cloudera QuickStart Docker Image で環境構築してみました。その際に、普通に進めるとうまくいかないポイントがいくつかあったので、そのときの構築手順をまとめておきます。

動作環境

  • MacBook Pro (Retina, 15-inch, Mid 2014)
    • 16GB onboard memory
  • OS X Yosemite version 10.10.5

構築される環境

今回の手順を実行すると、最終的に以下のような環境が構築されます。

  • VirtualBox 上で、Docker 用の Linux VM(名前:default)が動作する。
    • この Linux VM は CPU 4コア、メモリ 9GB を使用(デフォルトは CPU 1コア、メモリ 2GB)
  • 上記の Linux VM 上で、Cloudera Quickstart Docker Image から作られたコンテナが動作する。
    • このコンテナはメモリ 8GB を使用
  • Docker コンテナ内のシェルから、spark-shell を実行できる。

Cloudera Express は 8GB 以上のメモリを必要とします(参考:Cloudera QuickStart VM)。そのため、構築手順のなかで、Linux VM のメモリをデフォルトより増やす作業を行います。

構築手順

Docker Toolboxのインストール

OS X への Docker のインストール方法は、Installation on Mac OS X が詳しいです。OS X 上で Linux の Docker イメージを直接動かすことはできないため、VirtualBox で Linux VM(名前は default)を動作させ、この Linux VM 上で Docker daemon を動作させます。

f:id:muziyoshiz:20160529182710p:plain
(※ Installation on Mac OS X より抜粋)

まず、Docker Toolbox からインストーラをダウンロードして、Docker Toolbox をインストールします。現時点の最新版は DockerToolbox-1.11.1b.pkg です。

インストール後に以下のコマンドを実行すると、Docker のバージョンを確認できます。この時点では、まだ default VM が動いていないので、Docker daemon に接続できないというメッセージが出ます。

% docker version
Client:
 Version:      1.11.1
 API version:  1.23
 Go version:   go1.5.4
 Git commit:   5604cbe
 Built:        Tue Apr 26 23:44:17 2016
 OS/Arch:      darwin/amd64
Cannot connect to the Docker daemon. Is the docker daemon running on this host?

default VM の初期設定

OS X の Launchpad から Docker Quickstart Terminal を実行します。しばらく待って、Docker のアイコンが出てくれば設定完了です。この例では、Docker ホストの IP アドレスは 192.168.99.100 になりました。

Last login: Sat May 21 00:24:50 on ttys004
bash --login '/Applications/Docker/Docker Quickstart Terminal.app/Contents/Resources/Scripts/start.sh'
% bash --login '/Applications/Docker/Docker Quickstart Terminal.app/Contents/Resources/Scripts/start.sh'
Running pre-create checks...
Creating machine...
(default) Copying /Users/myoshiz/.docker/machine/cache/boot2docker.iso to /Users/myoshiz/.docker/machine/machines/default/boot2docker.iso...
(default) Creating VirtualBox VM...
(default) Creating SSH key...
(default) Starting the VM...
(default) Check network to re-create if needed...
(default) Waiting for an IP...
Waiting for machine to be running, this may take a few minutes...
Detecting operating system of created instance...
Waiting for SSH to be available...
Detecting the provisioner...
Provisioning with boot2docker...
Copying certs to the local machine directory...
Copying certs to the remote machine...
Setting Docker configuration on the remote daemon...
Checking connection to Docker...
Docker is up and running!
To see how to connect your Docker Client to the Docker Engine running on this virtual machine, run: /usr/local/bin/docker-machine env default


                        ##         .
                  ## ## ##        ==
               ## ## ## ## ##    ===
           /"""""""""""""""""\___/ ===
      ~~~ {~~ ~~~~ ~~~ ~~~~ ~~~ ~ /  ===- ~~~
           \______ o           __/
             \    \         __/
              \____\_______/


docker is configured to use the default machine with IP 192.168.99.100
For help getting started, check out the docs at https://docs.docker.com

ちなみに、この表示が出た後で VirtualBox Manager を開くと、default という名前の VM が「実行中」になっていることが確認できます。

default VM のCPUコア、メモリ使用量の変更

Docker コンテナが 8GB のメモリを使えるようにするには、Docker が動作する default VM のメモリ使用量を 8GB 以上にする必要があります。メモリを増やす方法は、以下のページを参考にしました。

具体的な手順は以下の通りです。

  • docker-machine stop default を実行し、default VM を停止
  • VirtualBox Manager を起動し、"default" という名前の VM の「設定」欄を開く
  • 設定の「システム」タブで、「マザーボード」を選択し、メインメモリーを 2048MB から 9216MB(9GB)に変更
  • 同じく、設定の「システム」タブで、「プロセッサー」を選択し、プロセッサー数を 1 から 4 に変更
  • 「OK」ボタンを押して、設定を閉じる
  • docker-machine start default を実行し、default VM を再起動

docker info コマンドを実行すると、メモリとCPUコア数が増えていることが確認できます。

% docker info
Containers: 0
 Running: 0
 Paused: 0
 Stopped: 0
Images: 0
Server Version: 1.11.1
Storage Driver: aufs
 Root Dir: /mnt/sda1/var/lib/docker/aufs
 Backing Filesystem: extfs
 Dirs: 0
 Dirperm1 Supported: true
Logging Driver: json-file
Cgroup Driver: cgroupfs
Plugins:
 Volume: local
 Network: bridge null host
Kernel Version: 4.4.8-boot2docker
Operating System: Boot2Docker 1.11.1 (TCL 7.0); HEAD : 7954f54 - Wed Apr 27 16:36:45 UTC 2016
OSType: linux
Architecture: x86_64
CPUs: 4
Total Memory: 8.762 GiB
Name: default
ID: 5L66:CKKI:BIK7:BRUT:H6NI:XMFL:AR36:R6UQ:YOFG:OQPM:M5L6:PU6L
Docker Root Dir: /mnt/sda1/var/lib/docker
Debug mode (client): false
Debug mode (server): true
 File Descriptors: 13
 Goroutines: 32
 System Time: 2016-05-24T13:18:09.388371356Z
 EventsListeners: 0
Registry: https://index.docker.io/v1/
Labels:
 provider=virtualbox

Cloudera QuickStart Docker Image のダウンロード

Cloudera QuickStart Docker Image は Docker Hub で公開されています(Docker Hub の cloudera/quickstart ページ)。そのため docker pull コマンドでイメージをダウンロードできます。ダウンロードサイズが 4.4GB あるので、結構時間がかかりました。

% docker pull cloudera/quickstart:latest
latest: Pulling from cloudera/quickstart
1d00652ce734: Pull complete
Digest: sha256:f91bee4cdfa2c92ea3652929a22f729d4d13fc838b00f120e630f91c941acb63
Status: Downloaded newer image for cloudera/quickstart:latest
% docker images
REPOSITORY            TAG                 IMAGE ID            CREATED             SIZE
cloudera/quickstart   latest              4239cd2958c6        6 weeks ago         6.336 GB

コンテナの起動

以下のコマンドを実行して、コンテナを起動します。

docker run --hostname=quickstart.cloudera \
--privileged=true -t -i -d -p 18888:8888 -p 17180:7180 -p 10080:80 \
-m 8192m cloudera/quickstart:latest /usr/bin/docker-quickstart

このコマンドは Docker Hub の cloudera/quickstart ページ にあるものをベースに、以下の修正を加えたものです。

  • コンテナをデーモンとして起動(あとから docker exec で接続)
  • -m 8192m を指定して、メモリ使用量を 8GB に指定
  • マッピングされるポート番号が起動のたびに変わるのを避けるために、マッピング先を、元のポート番号に10000足した値に固定

コンテナの起動後は、docker exec コマンドで、コンテナ上のシェルに接続します。以下の例では、コンテナ ID の一部を指定して接続しています。

% docker ps
CONTAINER ID        IMAGE                        COMMAND                  CREATED             STATUS              PORTS                                                                     NAMES
048c433261a7        cloudera/quickstart:latest   "/usr/bin/docker-quic"   23 seconds ago      Up 23 seconds       0.0.0.0:10080->80/tcp, 0.0.0.0:17180->7180/tcp, 0.0.0.0:18888->8888/tcp   gigantic_poitras
% docker exec -it 048c bash

接続後に free コマンドでメモリ使用量を見ると、8GB 以上のメモリを使っているように見えます。

[root@quickstart /]# free
             total       used       free     shared    buffers     cached
Mem:       9187728    5623316    3564412     169020      34224    1127784
-/+ buffers/cache:    4461308    4726420
Swap:      3227556          0    3227556

ただ、cgroup の設定を確認すると、1024*1024*1024*8 = 8589934592 で 8GB になっていたので、制限は効いているのではないかと思います。このあたりはちょっと自信がないです。

[root@quickstart /]# cat /sys/fs/cgroup/memory/memory.limit_in_bytes
8589934592

Cloudera Express の起動

コンテナ内で以下のコマンドを実行し、Cloudera Express を起動します。このコマンドは、コンテナの再起動後にも実行する必要があります。

[root@quickstart /]# /home/cloudera/cloudera-manager --express --force
[QuickStart] Shutting down CDH services via init scripts...
kafka-server: unrecognized service
JMX enabled by default
Using config: /etc/zookeeper/conf/zoo.cfg
[QuickStart] Disabling CDH services on boot...
error reading information on service kafka-server: No such file or directory
[QuickStart] Starting Cloudera Manager server...
[QuickStart] Waiting for Cloudera Manager API...
[QuickStart] Starting Cloudera Manager agent...
[QuickStart] Configuring deployment...
Submitted jobs: 15
[QuickStart] Deploying client configuration...
Submitted jobs: 16
[QuickStart] Starting Cloudera Management Service...
Submitted jobs: 24
[QuickStart] Enabling Cloudera Manager daemons on boot...
________________________________________________________________________________

Success! You can now log into Cloudera Manager from the QuickStart VM's browser:

    http://quickstart.cloudera:7180

    Username: cloudera
    Password: cloudera

出てくるメッセージは QuickStart VM の場合と全く一緒ですね……。

Web UI の接続確認

ここまで来れば、ホスト OS(OS X)の Web ブラウザから、Cloudera の Web UI の動作を確認できます。

また、この時点ではアクセスできませんが、Cloudera Manager から Hue を起動した後に、以下のアドレスで Hue にアクセスできます。

Cloudera Manager からサービス起動

http://192.168.99.100:17180/ にアクセスし、以下の手順で、Spark Shell の動作に必要なサービスを起動します。

  • Cloudera Manager Service の起動を待つ
  • クロックオフセットの設定を修正する(後述)
  • 他のサービスを、以下の順に起動する
    • HDFS
    • Hive
    • YARN
    • Spark

サービスの起動までに時間が掛かります。起動した直後は Bad Health のことがありますが、その場合はしばらく待てば起動します。

また、他の人の環境でも起こる問題かはわからないのですが、私の環境では Host のステータスが以下のようになる問題が発生しました。

致命的なヘルスの問題:クロックのオフセット
(英語では Bad health: Clock Offset)

この問題は、同じ OS X マシンで Cloudera QuickStart VM を使ったときにも発生したのですが、そのときと同じ対処方法が有効でした。以下の手順に従って、オフセットの設定を無効にすれば、ステータスが Good に戻って、問題なく動作するようになります。

  • Cloudera Managerのメニューから、 Host(ホスト) → Configuration(設定) と遷移する
  • 検索キーワード欄を使って、Host Clock Offset Thresholds(ホストクロックオフセットのしきい値) を表示する
  • この設定の「Warning(警告)」と「Critical(致命的)」を、両方とも「Never(行わない)」に変更する
  • 「Save Changes(変更の保存)」ボタンを押す

Spark Shell の起動

Spark Shell を初めて起動する前に、以下のコマンドを実行してください。

[root@quickstart /]# sudo -u hdfs hadoop fs -chmod 777 /user/spark
[root@quickstart /]# sudo -u spark hadoop fs -chmod 777 /user/spark/applicationHistory

上記のコマンドを実行しなかった場合、Spark Shell の起動中に以下のエラーが表示されてしまいます。どうも、Cloudera QuickStart の不備のようです(参考:Solved: [CDH 5.5 VirtualBox] unable to connect to Spark Ma... - Cloudera Community)。

16/05/29 06:37:15 ERROR spark.SparkContext: Error initializing SparkContext.
org.apache.hadoop.security.AccessControlException: Permission denied: user=root, access=WRITE, inode="/user/spark/applicationHistory":spark:supergroup:drwxr-xr-x

そして、以下のコマンドを実行すると Spark Shell が起動します。Cloudera Quickstart VM は擬似分散モードで動作しているので、引数には --master yarn-client を指定する必要があります。

[root@quickstart /]# spark-shell --master yarn-client
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.0
      /_/

Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_67)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc (master = yarn-client, app id = application_1464516865260_0001).
SQL context available as sqlContext.

scala>

以上で、Spark MLlib を試すための環境構築は完了です。

Spark MLlib を試す

Apache Spark入門 動かして学ぶ最新並列分散処理フレームワーク (NEXT ONE)

Apache Spark入門 動かして学ぶ最新並列分散処理フレームワーク (NEXT ONE)

  • 作者: 株式会社NTTデータ,猿田浩輔,土橋昌,吉田耕陽,佐々木徹,都築正宜,下垣徹
  • 出版社/メーカー: 翔泳社
  • 発売日: 2015/10/29
  • メディア: 大型本
  • この商品を含むブログを見る

構築に成功したか確認するために、Apache Spark 入門の8章に掲載されていたコードを Spark Shell で実行してみました。内容は、K-means を用いたクラスタリングです。

HDFS へのサンプルデータのアップロード

Cloudera QuickStart には Spark MLlib のサンプルデータが含まれていませんでした。そのため、まずはテストに使うサンプルデータをダウンロードして、HDFS にアップロードします。

[root@quickstart /]# curl https://raw.githubusercontent.com/apache/spark/master/data/mllib/kmeans_data.txt > kmeans_data.txt
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0    72    0    72    0     0     40      0 --:--:--  0:00:01 --:--:--    47
[root@quickstart /]# cat kmeans_data.txt
0.0 0.0 0.0
0.1 0.1 0.1
0.2 0.2 0.2
9.0 9.0 9.0
9.1 9.1 9.1
9.2 9.2 9.2
[root@quickstart /]# hadoop fs -put kmeans_data.txt /tmp
[root@quickstart /]# hadoop fs -ls /tmp
Found 5 items
drwxrwxrwx   - hdfs   supergroup          0 2016-05-29 13:02 /tmp/.cloudera_health_monitoring_canary_files
drwxrwxrwt   - mapred mapred              0 2016-04-06 02:26 /tmp/hadoop-yarn
drwx-wx-wx   - hive   supergroup          0 2016-05-29 09:57 /tmp/hive
-rw-r--r--   1 root   supergroup         72 2016-05-29 13:02 /tmp/kmeans_data.txt
drwxrwxrwt   - mapred hadoop              0 2016-05-29 10:19 /tmp/logs

サンプルデータの RDD への変換

Spark Shell を起動して、HDFS からサンプルデータを読み込んで、RDD に変換します。

[root@quickstart /]# spark-shell --master yarn-client
(中略)

scala> import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.clustering.KMeans

scala> import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.Vectors

scala> val data = sc.textFile("hdfs://localhost/tmp/kmeans_data.txt")
data: org.apache.spark.rdd.RDD[String] = hdfs://localhost/tmp/kmeans_data.txt MapPartitionsRDD[1] at textFile at <console>:29

scala> val parsedData = data.map{ s =>
     |   Vectors.dense(
     |     s.split(' ').map(_.toDouble))
     | }.cache()
parsedData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MapPartitionsRDD[2] at map at <console>:31

KMeans のモデルの作成

これ以降は、本に載っているコードをそのまま実行できました。KMeans.train メソッドでモデルを生成します。

scala> val numClusters = 2
numClusters: Int = 2

scala> val numIterations = 20
numIterations: Int = 20

scala> val clusters = KMeans.train(parsedData, numClusters, numIterations)
16/05/29 13:07:00 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
16/05/29 13:07:00 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
clusters: org.apache.spark.mllib.clustering.KMeansModel = org.apache.spark.mllib.clustering.KMeansModel@5d27cdd2

上記の WARN は、native library のロードに失敗したことを表しているようです(参考:Using Spark MLlib)。Hardware acceleration が効かないだけなので、この WARN は無視します。

生成されたモデルの確認

生成されたクラスタの数、および各クラスタの中心の座標を確認します。

scala> clusters.k
res0: Int = 2

scala> clusters.clusterCenters
res1: Array[org.apache.spark.mllib.linalg.Vector] = Array([0.1,0.1,0.1], [9.099999999999998,9.099999999999998,9.099999999999998])

新たなベクトルを与えて、いずれのクラスタに分類されるか確認します。

scala> val vec1 = Vectors.dense(0.3, 0.3, 0.3)
vec1: org.apache.spark.mllib.linalg.Vector = [0.3,0.3,0.3]

scala> clusters.predict(vec1)
res2: Int = 0

scala> val vec2 = Vectors.dense(8.0, 8.0, 8.0)
vec2: org.apache.spark.mllib.linalg.Vector = [8.0,8.0,8.0]

scala> clusters.predict(vec2)
res3: Int = 1

元のファイルに含まれていた座標が、いずれのクラスタに分類されるかを判定します。判定結果は HDFS に出力します。

scala> val predictedLabels = parsedData.map(vec => clusters.predict(vec))
predictedLabels: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[32] at map at <console>:39

scala> predictedLabels.saveAsTextFile("/tmp/output")

scala> exit
[root@quickstart mllib]# hadoop fs -ls /tmp/output
Found 3 items
-rw-r--r--   1 root supergroup          0 2016-05-29 07:12 /tmp/output/_SUCCESS
-rw-r--r--   1 root supergroup          8 2016-05-29 07:12 /tmp/output/part-00000
-rw-r--r--   1 root supergroup          4 2016-05-29 07:12 /tmp/output/part-00001
[root@quickstart mllib]# hadoop fs -cat /tmp/output/part-*
1
1
1
0
0
0

まとめ

一通りの作業を試してみて、Cloudera QuickStart Docker Image で Spark MLlib のお試し環境を構築できることがわかりました。

使ってみた感想としては、コマンドラインから QuickStart のコンテナを起動・停止できるのは便利でした。その一方で、この QuickStart Docker Image は、Cloudera QuickStart VM を単純に Docker image にしただけ、という感じで、Web ブラウザから行わなければいけない設定やサービス起動は相変わらず多かったです。そのため、この Docker image をベースに環境構築を自動化するのは大変そうです。

Spark を CDH と一緒に使いたいという特別な理由がなければ、もっと軽量な Docker イメージを探したほうがよいかもしれませんね……。

社内Wikiに情報を書くときに守ってほしい、たったひとつのルール

Wiki
f:id:muziyoshiz:20160425013924j:plain

このページについて

これは、社内 Wiki に情報を書くときに、私が個人的に守っていて、チームメンバにもできるだけ守ってほしいルールの紹介記事です。このルールを実際に運用するためのコツについても、基本ルールの派生という形で紹介します。

想定する環境

この記事は、社内に Wiki があって、フォーマットが決まったドキュメント(仕様書や手順書など)とは別に、各人がメモを自由に書いて共有できるような環境を想定しています。

Wiki の種類は問いません。PukiWiki、MediaWiki、Confluence、Esa、Redmine などのプロジェクト管理ツール付属の Wiki などなど……何でもいいです。Word 文書などにも適用できなくはないですが、文書を気軽に分けるのが難しい場面には、あまり向かないかと思います。

ルール:「このページについて」という欄をページの先頭に用意し、そのページの概要を1〜2文で書く

守ってほしいルールはこれだけです。単純ですが強力で、きちんと守ろうとすると意外と面倒なルールでもあります。

例えば、何かのインストール作業をメモしておきたい場合、「このページについて」欄には以下のような文章を書きます。

## このページについて

これは、システムAの開発環境を構築するために、Windows 7 にソフトウェアBをインストールした際のメモです。

## 手順

(これ以降に、実際に実行した手順のメモ)

または、何か新機能の実装をしたくて、事前に設計を検討した場合は、こんな感じで書きます。

## このページについて

これは、機能Cの設計についてチーム内で議論するために、個人的に作成した設計案です。設計の事前知識として、既存のソースコードを調査した結果も含めています。

## 設計案

(これ以降に設計案とソースコードの解説)

Wikipedia の記事は、よく「〜とは、…である。」という一文から始まりますが、それをイメージしてもらえると分かりやすいかもしれません。

同じような内容がページの先頭に書いてあれば、この欄の名前は「このページについて」にしなくても、「概要」でも "About this page" でもなんでも良いです。

このルールの目的

このルールを徹底する目的は、そのページを開いた人に 「このページに、自分がいま知りたい情報が書かれているか?」を瞬時に判断する ための情報を与えることです。

社内 Wiki を実際に使っている人なら、このページにはこんなことが書いてあるだろうなーと思って読み始めたら、実際は全然違うことが書かれていた、という経験が何度もあると思います。例えば、こんな感じです。

  • インストール手順書かと思って読んでいたら、インストール時に試したことを適当な順序で書いてあるだけのメモだった。書いてある手順を上から順に実行したところ、途中で失敗してしまった。で、メモにも「このコマンドは失敗した」とか書いてある。
  • ある機能の設計に関するメモかと思ったら、単なる設計案の一つだった。最終的に実装されたソースコードの設計とは全く一致していなかった。

誤解を避けるために強調しておくと、私は、中途半端なメモを Wiki に書かないで欲しい、とか、そういう中途半端なメモは消して欲しい、とか言いたいわけではありません。

ただ、そのページがどういう状態なのかが明示されていないと、

あるページAを、ここには正確な情報が書いてある、と思って読み始める
↓
期待を裏切られる
↓
別のページBを、ここには正確な情報が書いてある、と思って読み始める
↓
期待を裏切られる
↓
この Wiki に書いてある情報はどれも信用できない! となって Wiki を読まなくなる

という悪循環を起こしかねません。そうなると他の人がいくら Wiki に有効な情報を書いていっても、その情報は読まれない、というもったいないことになってしまいます。

ページの先頭にただのメモだと明示してあれば、読み手が 「他に有力な情報源が見当たらないので、これを頑張って読み解こう」 と判断することも十分ありえますし、結果として必要な情報が書いてなくても、期待を大きく裏切ることにはなりません。Wiki 全体でそういう状態を保ち、モラルハザードを防ぐ……というのがこのルールの目的です。

以下は、このルールを実際に運用するために守ったほうがよい、このルールの派生系です。

派生(1):ページを更新し終わったタイミングで「このページについて」の内容を見直す

ページの内容を色々書いているうちに、最初に書こうと思っていた内容とはズレていってしまう、ということはよくあると思います。理想的には、内容がズレた時点で直したほうがよいのですが、少なくともページを更新し終わったタイミングでは必ず直すようにしましょう。

例えば、設計案を色々書いていたつもりが、最終的な設計についてもそのページに書いてしまい、他のページにはそこまで詳しい情報がまだ書かれてない、という状態になったとします。

もちろん最終的な設計のページをきちんと用意したほうが良いですが、「このページについて」欄を以下のように直すだけでも、後から読む人にとっては役立ちます。

## このページについて

これは、機能Cの設計についてチーム内で議論するために、個人的に作成した設計案です。最終的に採用したのは案3で、このページに書いてある案3の詳細は、最終的な設計と同じです。

派生(2):「このページについて」から外れる内容が増えてきたら、別のページに分ける

書いているうちに「このページについて」には書いてない内容が増えてきた、あるいは内容に合わせて「このページについて」を直したら1〜2行では収まらなくなった、という場合にはページを分けましょう。

例えば、あるシステムAの開発環境構築手順書を作っているうちに、内容が膨らんできて、「このページについて」欄が、

## このページについて

これは、システムAの開発環境を構築するための手順書です。このシステムのサブシステムB、C、Dの知識がないと、この手順は意味不明に見えるかもしれません。そのため、このシステムのサブシステムB、C、Dの関係についても、構築手順書の途中で説明します。

のようになったら、恐らくそのサブシステムB、C、Dの解説は別のページに分けたほうがいいでしょう。そして、以下のようにリンクを書くだけにするのが無難です。

## このページについて

これは、システムAの開発環境を構築するための手順書です。これらの手順が何をしているか理解するためには、このシステムAの構成に関する知識が必要です。システムAのサブシステムについての知識がない場合は、必ず先に [サブシステムの解説](http://wiki.example.com/subsystem) を読んでください。

そうしないと、読み手が、1個の長いページのなかで、自分の知りたい情報を探しまわる羽目になってしまいます。

別のページに分けたら読まれないかも?と思うかもしれませんが、そういう人は、同じページ内に書いてあってもどうせ読みません。それなら、読む気のある人向けに、少しでもわかりやすくしたほうが良いです。

派生(3):他の人が書いた「このページについて」も直していい

他の人が書いたページでも、「このページについて」に書かれた内容と、本文が違っていたら、積極的に直しましょう。また、そもそも「このページについて」欄が無かったら追加しましょう。

基本ルールに書いた通り、目的は Wiki 全体の利用価値向上、モラルハザード防止です。そのためには、内容は直さずに、「このページについて」欄を直すだけでも十分意味があります。

例えば、あるページを読んで、その内容が間違っていた場合、私だったらこういう文章をページの冒頭に書いて、内容は修正も削除もせずに残しておきます。

## このページについて

私(吉澤)が 2016-04-25 に調べた範囲では、このページに書かれた設計は、現在の実装と違っているようです。そのため、設計情報としては参考にしないことを推奨します。

ただ、当時の設計思想など、何かの参考になる可能性を否定しきれないため、削除せずに残しておきます。

派生(4):本文は頑張らない

Wiki に書かれた情報の品質は高ければ高いほど良いですが、敷居が上がって、Wiki に書くのが一人か二人だけ、になってしまうのも困ります。そのため、「このページについて」欄以外のルールはあまり多くしないほうが良いでしょう。

また、一般的な話として、「他人のために細かく書いてあげている」というつもりで書いていると、読んでもらえなかったときの心理的ダメージが大きいです。そのため、手間のかかる本文の品質は、自分が後で読んで困らない程度にしておくことをお勧めします。

まとめ

今回は、普段はあまり書かないタイプの、仕事のノウハウについての記事でした。

私は自他共に認めるメモ魔で、やたらあちこちにメモを書き散らしています。そうやって書き溜めているメモがたまに同僚から「役立った」と言ってもらえることもあるのですが、それと同時に「自分はそこまでできないし、やらない」という反応をされることが多かったりします。

書く分量が少なくても、少しの工夫で、後から読んで役立つような文章を書くことはできるはずなんだけどなあ……と思い、自分なりに本質的と考える部分をまとめてみました。Wiki も普及してだいぶ経つので、今更感のある話題ではありますが、他の人のノウハウも色々伺ってみたいところです。