無印吉澤

Site Reliability Engineering(SRE)、ソフトウェア開発、クラウドコンピューティングなどについて、吉澤が調べたり試したことを書いていくブログです。

Rails 5 で艦これアーケードのプレイデータ管理ツール "Admiral Stats" を開発中

f:id:muziyoshiz:20160828002458p:plain:w600

開発のきっかけ

このブログでゲームのことは書いたことなかったと思いますが、個人的には、長い期間かけてチマチマやるゲームが好きで、ここ数年は Ingress と艦これをやってます。

今年の4月には、艦これのアーケード版(艦これアーケード)もリリースされて、これも週1くらいのペースでゲーセンに通ってプレイしてたりします。

この艦これアーケードは筐体がネットワーク接続されており、自分のプレイデータをあとから SEGA の公式サイト で閲覧できるようになってます。このサイトで結構細かいデータまで見られるのですが、見られるデータはアクセス時の最新状態のみです。

このプレイデータを過去の分まで記録して時系列データとして可視化したら面白そう

と思いついたのと、

どうせ作るなら6月末にリリースされた Ruby on Rails 5 でも使ってみようか

ということで、この夏休みを使ってプレイデータ管理ツールを作ってみました。今回の記事は、このツール "Admiral Stats" の開発中間報告です。

2016-09-03追記

この記事の公開後、9/3にサービスリリースしました。Twitter アカウントでログインして使えます。ぜひお試しください。

www.admiral-stats.com

艦これアーケードとは?

艦これを全く知らない人向けに説明すると、艦これアーケードとは、艦娘と呼ばれるキャラのカードを集めて、選りすぐりのデッキを作成し、ステージを攻略していくアクションゲームです。ステージをクリアするたびに、ランダムで新たなカードが排出されます。

どのステージでどのカードが出やすいか、などのカード排出に関する法則性は全く公開されていないため、「自分(たち)が試したらこうだった」という情報が Wiki などで頻繁にやりとりされています。この法則性をつかむために(あるいは自分の不運をなぐさめるために)プレイデータを記録している人も多いと思います。

ちなみに、サービス開始直後は何時間も待たないとプレイできないほど人気でしたが、最近は少し待てばプレイできる程度に空いてきています。秋にゲーム内イベントがあるらしいので、それまでは空いてるんじゃないでしょうか。

Admiral Stats とは?

今回開発した Admiral Stats は、この艦これアーケードのプレイデータを可視化するサイトです。SEGA 公式のプレイヤーズサイト が対応していない、時系列での可視化に対応しています。

Ingress を知っている人なら Agent Stats の艦これアーケード版」 という説明が一番分かりやすいと思います。実際、Agent Stats からの連想で Admiral Stats を作ることを思いつきましたし、名前も Agent Stats からの連想で付けました*1

f:id:muziyoshiz:20160828004335p:plain
Agent Stats の画面例

Admiral Stats の画面サンプル

サンプル 1:カードの入手履歴

Admiral Stats にまずログインすると、最近のプレイで入手したカードの一覧が表示されます。ずっとプレイしていてカードが増えてくると、「あれ、このカードって前にゲットしたっけ? 今日が初めてだっけ?」とわからなくなってくるのですが(自分はそうでした)、そういう場合を想定した機能です。

f:id:muziyoshiz:20160828003035p:plain

サンプル 2:カードの入手数・入手率のグラフ

カードの種類(ノーマル、レアなど)ごとの入手数、入手率のグラフです。Admiral Stats の内部に各カードのリリース時期のデータを登録してあるため、入手率は減少することもあります。

f:id:muziyoshiz:20160828003052p:plain

サンプル 3:レベル・経験値のグラフ

艦娘のレベル・経験値だけでなく、艦種(駆逐艦とか)や艦隊全体の累計レベル・経験値も表示できます。

f:id:muziyoshiz:20160828003101p:plain

サンプル 4:カード入手状況の一覧表示

公式サイトでも見られる情報なのですが、Admiral Stats では情報量を絞る代わりに、1ページにまとめて表示します。

f:id:muziyoshiz:20160828003114p:plain

データのアップロード方法

艦これアーケードの公式サイトは、残念ながら、プレイデータのダウンロード機能を提供していません。ただ、このサイトはとても綺麗に作られていて、プレイデータはすべて API 用の URL から JSON で取得し、Web ブラウザ側で画面を描画しています。

そのため、今回はこの JSON をそのままファイルに出力する admiral_stats_exporter というエクスポートツールを作りました。このツールが出力した JSON ファイルを Admiral Stats にアップロードすると、上記のサンプルのような画面が表示されます。

f:id:muziyoshiz:20160828002458p:plain:w600

Admiral Stats へのログイン方法

メールアドレスの管理をしたくなかったので、Twitter アカウントでログインする方法を採用しました。 Admiral Stats から SEGA のサイトに直接アクセスすることはないので、SEGA ID などの登録は必要ありません。

Admiral Stats の公開予定

実装は一通り終わりました。ローカルの仮想マシンで動かせば、自分1人で使う分には実用的に使えています。

ただ、どうせなら元ネタの Agent Stats の 「全ユーザとの比較」ページ のように統計情報を表示できると、もっと面白くなるんじゃないかと思ってます。Agent Stats ではレベルや経験値の分布、プレイ傾向がわかる指標(攻撃重視か構築重視か、など)の分布が公開されています。艦これの場合、レアカードの所有率の分布とかでしょうか。

そこで他のユーザのデータもアップロードしてもらえるように、Admiral Stats を設置したサイトを公開するための準備中です。ただ、以下のような作業がまだ残っていて、公開できるのは1〜2週間先になる見込みです。

  • サーバのレンタル
  • SSL 証明書導入(Let's Encrypt)と HTTPS 対応
  • デプロイ自動化スクリプトの作成(場当たり的に開発環境を作ったので、必要な手順や設定を整理できてなくて……)
  • production 設定での動作確認
  • 最低限のテスト

もし、Admiral Stats を使ってみたい方は、admiral_stats_exporter で事前にプレイデータをエクスポートしておいてください。ただ、こちらはあくまで非公式のツールなので、リンク先の説明を理解したうえで、利用は自己責任でお願いします。

あと、このエクスポータは突貫で実装したツールなので、使いづらいのはご容赦ください……。本当は Agent Stats のように、スマホだけでエクスポートからインポートまで完結できると良いと思うんですけどね。そこまで手が回りませんでした。

Admiral Stats についての紹介はここまでで、これ以降は Rails 5 での実装に関する細かい話です。

実装の詳細

最近は PHP や Java で Web アプリを作っていたので、rails でまともにアプリを作るのは、Ruby on Rails 2 以来だったりします。そのため、Rails を使い慣れている人には当たり前の話が多いかもしれません。

開発環境

コーディングはホストOS(Mac OS X Yosemite)、実行はゲストOS(Vagrant + VirtualBox + CentOS 7.2)で行いました。IDE は、最近 IntelliJ に慣れてきたので RubyMine にしました。

  • IDE: RubyMine 2016.2.1
  • Ruby: ruby 2.3.1p112
  • Ruby on Rails: Rails 5.0.0.1

プラグイン

画面は Bootstrap のデフォルトのデザインをほぼそのまま採用し、グラフは Highcharts、表は Datatables で作りました。いずれも gem でインストールできました。便利ですね。

自分で明示的に導入したプラグインと、導入方法、参考にしたページなどは以下の通りです。

bootstrap-sass (3.3.7)

  • twbs/bootstrap-sass: Official Sass port of Bootstrap 2 and 3.
  • rails new を実行した時点で、Gemfiles に gem 'sass-rails', '~> 5.0' が入っていた。そのため、追加したのは gem 'bootstrap-sass', '~> 3.3.6' のみ。
  • application.css のファイル名を application.scss に変更し、以下の行を追加。
@import "bootstrap-sprockets";
@import "bootstrap";
  • application.scss にした時点で、元の CSS ファイルにあった *= require_tree . の文法は使えなくなる。そのため、rails generate controller <controller_name> で自動生成される <controller_name>.scss は、自動的には読み込まれない。もし読み込みたければ、各ファイルを明示的に @import で指定するか、css - Proper SCSS Asset Structure in Rails - Stack Overflow の回答(日本語訳)にあるような手段を使う必要がある。
  • Sass 自体については、後述する書籍と、Sass + Railsの基礎 - Qiita を主に参考にした。
  • Bootstrap の使い方については、公式サイトの Getting Started の Examples と、Components および CSS を参考にした。

jquery-datatables-rails (3.4.0)

  • jquery-datatables-rails の "Twitter Bootstrap 3 Installation" の手順に従ってインストール。ただし、Sass 版の Bootstrap をインストールしたので、application.scss には以下のように記載する。
@import "dataTables/bootstrap/3/jquery.dataTables.bootstrap";

highcharts-rails (4.2.5)

//= require highcharts
//= require highcharts/highcharts-more

// チャート画像のダウンロード機能
//= require highcharts/modules/exporting
//= require highcharts/modules/offline-exporting

omniauth (1.3.1), omniauth-twitter (1.2.1)

google-analytics-rails (1.1.0)

Ruby on Rails 5 を使ってみた感想

Rails 2 時代の知識のアップデートするために、まずは本屋で Rails 4 の本をいくつか流し読みしてから、そのうちの1冊を買ってきて読みました。これは内容が網羅的で、かつ読みやすい良書でした。

Ruby on Rails 4 アプリケーションプログラミング

Ruby on Rails 4 アプリケーションプログラミング

また、Rails 5 に関するページをいくつか流し読みしました。主に参考にしたページはこのあたりです。

今回の開発の範囲では、基本的な機能しか使わなかったせいか、Rails 5 だからという理由でつまづくことは特にありませんでした。本当に何もなくて、拍子抜けしたくらいです。

Rails 5 からデフォルトの開発用 Web サーバが Webrick から Puma に変わったとのことですが、特に意識せずに使えました。また、プラグインも、Rails 5 だから動かない、というものはありませんでした。

強いて言えば、いままでは rake db:migrate のように rake で実行していたコマンドが、rails db:migrate で実行できるようになったので、新しいやり方に慣れるためになるべく rails の方を使っていました。まあ、rake の方も使えるので、無理に rails を使う必要はなさそうですけど。

今後、Admiral Stats に機能を追加する機会があれば、API mode など、Rails 5 の新機能をうまく入れ込んでみたいと思います。

*1:Ingress ではプレイヤーのことを Agent と呼び、艦これでは提督(Admiral)と呼ぶため。

Treasure Data Tech Talk 201607 レポート(古橋さんと成瀬さんの講演メモのみ)

f:id:muziyoshiz:20160724153127p:plain:w320

先週末に、Treasure Data Tech Talk に参加してきました。このイベントは毎回濃い話を聞けるので、行けるときはなるべく参加するようにしています。

今回は、古橋さんによる Digdag での YAML 利用の話と、成瀬さんによる PerfectQueue の話が特に面白かったです。以下、講演内容のメモと、公開済みのスライドです。

講演内容

DigdagはなぜYAMLなのか? (Sadayuki Furuhashi, @frsyuki)

  • Digdag とは何か?

    • Workflow automation system
    • Digdag で一番やりたいのはバッチデータ解析の自動化
  • Digdag の競合

    • OSS, Proprietary それぞれに競合がある
    • Workflow automation system は、ワークフローの定義方法によって3つに分類できる
      • プログラミング言語型:Luigi など
      • GUI型:Rundeck など
      • 定義ファイル+スクリプト型:Azkaban など
    • ワークフローの作りやすさと、カスタマイズの柔軟性のトレードオフ
  • Digdag

    • Digdag は定義ファイル+スクリプト型
    • 定義ファイル+スクリプト+俺たちのYAML
    • YAMLは便利だが、include できない、変数の埋め込みができない、(言語内DSLのように)プログラムが書けない、という欠点がある
    • Digdag では、YAML の仕様に従ったうえで、これらの欠点を克服した
  • include できる

    • YAML の仕様では、値(scalar)の前に "!" から始まる文字(タグと呼ばれる)を付与できる
    • 通常、YAMLパーサは、正規表現によるマッチでタグを決定して、自動的にタグを付与している
    • Digdag では "!include : filename" という表記を、ファイルインクルードの文法として使っている
    • !include の後ろに、" " を書く必要がある。このスペースが大事。このスペースのおかげで、通常の YAML パーサでも、キーが " "、値が filename のハッシュとして読み込める
    • ただし、この !include を複数書くと、キーが " " のハッシュが重複してしまう。Digdag の YAML パーサでは、複数の !include を書けるように、内部的にキーを UUID に書き換えている
  • 変数の埋め込みができる、プログラムが書ける

    • Java 8 は、Nashorn(ナスホーン)という JavaScript Engine を同梱している。これを使って ${} 内を評価している
    • だから Digdag は Java8 必須
  • Q) 何故YAMLをベースにした?

    • A) 比較的書きやすい、読みやすい。YAMLとして既存のプログラムから扱える。
  • Q) YAMLはグラフ構造を表現するのに適さないのでは?

    • A) YAMLはDAG、グラフを扱うわけではない。ツリーを扱っている。
  • Q) YAMLにコードを書けるとのことだが、悪さはできないのか?

    • A) 悪さできないように対策している。JavaScript はサンドボックス内でしか動作しない。そのためにJavaScriptを採用した。

PerfectQueueはいかにパーフェクトか、あるいはRubyとMySQLでジョブキューを作る試みについて (Yui Naruse, @nalsh)

  • Who is naruse

    • nkfメンテナ
    • Rubyコミッタ
    • Treasure DataではバックエンドのRubyを担当
  • そもそもジョブキューとは

    • FIFO
    • フロントエンドとバックエンドを疎結合化
  • PerfectQueue の特徴

    • MySQL で実装
    • At-least-once を優先(at-least-once と at-most-once はトレードオフの関係)
  • キューのデータ構造

CREATE TABLE `queue` (
    /* unique key (-> at most once) */
    id VARCHAR(255) NOT NULL,
    /* for FIFO's timeline */
    timeout INT NOT NULL,
    /* opaque data */
    data LONGBLOB NOT NULL,
    /* alive or finished */
    created_at INT,
    PRIMARY KEY (id)
)
  • タスクのライフサイクル

    • タスクの投入時に、timeout, created_at を現在時刻にする
    • タスクの取得時は、timeout が小さいものから優先的に取得し、timeout を 300 秒後に更新
    • タスクの実行中は、ハートビートとして、timeout を定期的に 300 秒後に更新
    • タスクの完了後は、created_at を NULL にして、一定時間保存するために timeout を 720 秒後に更新(タスクの重複を検出するため)
    • 前述の retention time を過ぎたら物理削除
  • タスクの取得、削除時の排他制御が大変

    • 排他処理のために、最初期は FOR UPDATE を使っていた
      • しかし、頻繁にデッドロックする
      • デッドロックを避けるには MySQL の気持ちになってクエリを書く必要がある
    • LOCK TABLES を使うと、テーブル全体をロックしてしまうので、SELECT にも影響
    • GET_LOCK が一番安全、でもクエリの書き方によってはデッドロックが発生した
      • ロックの delete クエリと acquire のクエリがバッティングしないように、タイミングの調整が必要(間隔を乱数で変える)
      • 調整しないと性能問題が起きた
      • さらに、ネットワーク遅延が発生すると影響大
        • GET_LOCK から RELEASE LOCK まで 3 RTT かかるので、影響大
    • queue テーブルに owner カラムを追加したうえで、FOR UPDATE を使うクエリに変更し、テーブルロックを不要にした
      • MySQL の気持ちになって書いたので安全
  • 結論

    • PerfectQueue はより完璧になった
  • Q) 何故 MySQL をキューに選んだのか?

    • A) 当時、Amazon RDS で PostgreSQL が使えなかった。RDBMS を選んだ理由は、Amazonで提供されている、フェイルオーバーがある、など。新しいジョブキューを開発したのは、ジョブキューにフェアスケジューリングの機能(Treasure Dataのサービスで必要)を付けたかったから。(古橋)

感想

Digdag での YAML の拡張(いや、標準の仕様に従っているので拡張と言うのは不適切か?)については、話としては面白かったんですが、そもそも YAML でプログラミングするのは辛そう、というのが第一の感想でした。Ansible もそんな感じのつらみがありますしね。AnsibleSpec みたいな、Digdag で書いたワークフローをテストするツールとかが、いずれ出てきたりするんでしょうか。

また、マークアップ言語として YAML にこだわる必要があるんだろうか、とも思ったのですが、じゃあ代替手段として何があるのか、と考えてみると、なかなか難しそうです。XML よりもマシな選択肢となると、いまは YAML なのかな……。HashiCorp の HCL のように、独自方式を作る方向もあったと思いますが、Digdag に独自方式を作るほどの要件はなかったんですかね。

手を動かす Spark MLlib & Word2Vec Part 2 (Wikipedia 英語版から Word2Vec モデルを作るまで)

f:id:muziyoshiz:20160626223709p:plain

このシリーズについて

実際に手を動かして Spark MLlib に慣れていこう、というシリーズです。

Spark を使うならそれなりに大きなデータを分散処理しないと面白くないと思い、Wikipedia のデータから Word2Vec のモデルを作るところまでやってみました。環境構築については Part 1 をご参照ください。

muziyoshiz.hatenablog.com

Part 2 の範囲

Wikipedia 英語版のデータから作成したコーパスを Amazon EC2 上の Spark Cluster で処理して、Word2Vec のモデルを作成するところまで。

Wikipedia 英語版のデータからコーパス作成

最終的にやったこと

Wikipedia:Database download から辿って、https://dumps.wikimedia.org/enwiki/ (HTTP) または Data dump torrents (BitTorrent) からダウンロードできます。

Wikipedia のデータは XML および SQL で公開されており、それぞれ色々ファイルがあります。今回は Wikipedia の本文からコーパスを作りたいので enwiki-latest-pages-articles.xml.bz2 をダウンロードしました。私がダウンロードした時の最新版は 2016-06-03 作成の 12.1 GB のファイルでした。

このファイルを解凍すると以下のような XML が入っています。<text> タブの中身が本文です。

  <page>
    <title>Anarchism</title>
    <ns>0</ns>
    <id>12</id>
    <revision>
      <id>721573764</id>
      <parentid>719202660</parentid>
      <timestamp>2016-05-22T19:25:12Z</timestamp>
      <contributor>
        <username>PBS-AWB</username>
        <id>11989454</id>
      </contributor>
      <comment>modification to template Cite SEP and possibly some gen fixes using [[Project:AWB|AWB]]</comment>
      <model>wikitext</model>
      <format>text/x-wiki</format>
      <text xml:space="preserve">{{Redirect2|Anarchist|Anarchists|the fictional character|Anarchist (comics)|other uses|Anarchists (disambiguation)}}
{{pp-move-indef}}
{{Use British English|date=January 2014}}
{{Anarchism sidebar}}
'''Anarchism''' is a [[political philosophy]] that advocates [[self-governance|self-governed]] societies based on voluntary institutions.
(中略)
[[Category:Far-left politics]]</text>
      <sha1>sfoc30irh5k1bj62ubt29wp1ygxark0</sha1>
    </revision>
  </page>

この XML から本文だけ取り出す方法を色々探してみたところ、wp2txt というツールがあったので、今回はこちらを使わせてもらいました。

github.com

wp2txt は Ruby で書かれており、関連する gem のインストールに苦労したという記事も見かけたのですが、以下の手順で問題なく動作しました。

% mkdir wp2txt
% cd wp2txt
% bundle init
% echo 'gem "wp2txt"' >> Gemfile
% bundle install
% mkdir output_dir
% bundle exec wp2txt -i enwiki-latest-pages-articles.xml.bz2 -o enwiki --no-heading --no-title --no-marker

今回は本文だけが必要なので、余分な出力をなるべく減らすためのオプション(--no-heading, --no-title, --no-marker)を付けました。それでも、MediaWiki のマークアップなどは残ってしまうのですが、それは Spark で除去することにします。

wp2txt の処理が終わると、enwiki ディレクトリに enwiki-latest-pages-articles.xml-<連番>.txt というファイルが作成されます。今回は合計 1,754 ファイル、18 GB でした。ちなみに、私の環境(MacBook Pro 2.2 GHz, 16GB DDR3)では、実行に19時間くらいかかりました。

つまづいたこと

Wikipedia のデータからコーパスを作る方法を色々探したのですが、英語圏も含めて、簡単にできる方法がなかなか見つかりませんでした。

wp2txt 以外で良さそうなものとしては wiki2vec というツールが、コーパスを作る機能を持っているようでした。ただ、時間の都合で今回は試せませんでした。ちなみに、この記事を書くために読み返していて気付きましたが、wiki2vec のパラメータとして minCount = 50, vectorSize = 500, windowSize = 10 という例が載っていました。次はこれで試してみるのもよいかもしれません。

github.com

その後、Word2Vec 関係の記事を探しまわるなかで、以下の記事から wp2txt の存在に気づき、今回は wp2txt を使うことにしました。

techblog.gmo-ap.jp

コーパスの、S3 へのアップロード

最終的にやったこと

Apache EC2 上に構築した Spark Cluster に Wikipedia のファイルを渡さなければいけないのですが、データサイズが大きいので、今回は S3 経由で渡しました。

Spark の textFile メソッドは gzip 圧縮されたファイルも読み込めるので、まずは先程のファイルを圧縮します。圧縮後のファイルは 6 GB になりました。

% gzip enwiki/*.txt

そして、これを S3 にアップロードします。aws s3 cp コマンドはワイルドカードが使えないので、一括アップロード時には、ディレクトリ名を指定して --recursive を指定する必要があります。

% aws s3 cp enwiki s3://my-bucket-name/ --recursive

aws コマンドがない場合は、Installing the AWS Command Line Interface - AWS Command Line Interface に従ってインストールしてください。Mac の場合は pip でインストールします。

つまづいたこと

最初は、以下のように1ファイルにまとめてアップロードしたのですが、これだとファイル読み込みが全く分散されませんでした。

% cat enwiki/*.txt > enwiki.txt
% gzip enwiki.txt
% aws s3 cp enwiki.txt.gz s3://my-bucket-name/

今回は wp2txt がファイルを複数に分割してくれていましたが、他の方法でコーパスを作った場合も、ファイルを分けてアップロードしたほうがいいですね。

spark-submit で使う jar の作成

最終的にやったこと

いままでは spark-shell で Word2Vec を実行していましたが、データ量が増えると実行時間が長くなって、EC2 インスタンスへの接続が切れてしまう可能性が出てきます。そのため、Word2Vec を実行する簡単な jar を作って、spark-submit で実行することにします。

以下がそのコードです。

Simple Word2Vec application

Feature Extraction and Transformation にあるサンプルに、以下の修正を加えています。

  • コマンドライン引数から、読み込むファイル、モデルの出力先、Word2Vecのパラメータを設定
  • repartition メソッドを使って、RDD を分割(これをしないと読み込み後の処理が分散されない)
  • split する際の区切り文字に、スペース以外の文字も含めることで、MediaWiki 記法の文字を除去
  • filter で長さが 1 の文字を除去
  • 処理の最後で、実行時間を出力

jar をビルドしたい場合は、以下のリポジトリを使ってください。

github.com

sbt assembly を実行すると target/scala-2.11 ディレクトリに word2vec-model-generator-assembly-0.0.1.jar ができます。この jar ファイルだけ master ノードに持っていけば実行できます。

つまづいたこと

最初は repartition メソッドを使わなかったのですが、その場合、ファイルの読み込みは分散しても、その後の処理が途中まで全く分散されませんでした。repartition メソッドに渡す引数は scala - Spark: Repartition strategy after reading text file - Stack Overflow を参考にしました。

MediaWiki記法を除去するルールは、ローカルマシンで enwiki-latest-pages-articles.xml-0001.txt を処理して手探りで決めたのですが、もっと良い方法がありそうです。例えば、先ほど紹介した wiki2vec は、うまく処理していそうです("Word2Vec Corpus" の節を参照)。

Amazon EC2 への Spark クラスタの構築(5台構成)

最終的にやったこと

Part 1 でもクラスタを構築しましたが、それは1回削除して、Slave の台数とスペックを増やしたクラスタを作り直しました。ちなみに、spark-ec2 destory spark-cluster でクラスタを削除できます。

大規模データで Spark MLlib を試すのは初めてなので、手間取っている間にマシンが無駄に動いている……という可能性があったので(というか実際そうなったので)、少しケチって以下のスペックで構築しました。

  • master: r3.large ($0.185/hour, 2 vCPU, 15 GB memory, 1 x 32 SSD) 1台
  • slave: m3.2xlarge ($0.616/hour, 8 vCPU, 30 GB memory, 2 x 80 SSD) 5台

構築時のコマンドは以下の通りです。

./spark-ec2 \
--key-pair=word2vec-key-pair \
--identity-file=/Users/myoshiz/.ssh/word2vec-key-pair.pem \
--region=us-west-1 \
--zone=us-west-1a \
--master-instance-type=r3.large \
--instance-type=m3.2xlarge \
--copy-aws-credentials \
--hadoop-major-version=yarn \
--slaves 5 \
launch spark-cluster > spark-submit.log 2>&1 &

構築が完了したら、前回同様に .bash_profile に環境変数 EC2_SPARK_MASTER を設定します。また、今回は環境変数 AWS_ACCESS_KEY_ID と AWS_SECRET_ACCESS_KEY も設定します。これは、Spark アプリケーションから S3 にアクセスするために必要な設定です。

export EC2_SPARK_MASTER=ec2-xxx-xxx-xxx-xxx.us-west-1.compute.amazonaws.com
export AWS_ACCESS_KEY_ID={{ IAM ユーザ "word2vec-user" のアクセスキーID }}
export AWS_SECRET_ACCESS_KEY={{ IAM ユーザ "word2vec-user" のシークレットアクセスキー}}

つまづいたこと

Part 1 でも書きましたが、--copy-aws-credential を指定しても、Spark に対してはアクセスキーID、シークレットアクセスキーが設定されません。環境変数を設定したところ、Spark(spark-shell, spark-submit)から S3 にアクセスできるようになったので、今回はこの方法で済ませました。

なお、AWS のインスタンスプロファイルを使ってアクセス権限を与えることもできると思いますが、今回は試していません。クラスタを作ったり壊したりを繰り返していたため、そのたびに AWS マネジメントコンソールをいじるのは面倒だったので……。

spark-submit の実行

最終的にやったこと

先ほど作った jar を、master にコピーします。

% scp -i ~/.ssh/word2vec-key-pair.pem word2vec-model-generator-assembly-0.0.1.jar root@${EC2_SPARK_MASTER}:/root/

そして、以下のようにコマンドを実行すると、Word2Vec アプリケーションが実行されます。ssh 接続が切れた場合のために、バックグラウンドで実行し、標準出力はファイルに出力させておきます。

$ ./spark/bin/spark-submit \
--master spark://${EC2_SPARK_MASTER}:7077 \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.driver.memory=11g \
--conf spark.akka.frameSize=1024 \
--class jp.muziyoshiz.word2vec.Word2VecModelGenerator \
word2vec-model-generator-assembly-0.0.1.jar \
s3n://my-bucket-name/enwiki/*.txt.gz \
s3n://my-bucket-name/enwiki_model \
10 20 50 > spark-submit-wp8.log 2>&1 &

Spark から S3 にアクセスする際は、aws コマンドで指定するときの s3:// ではなくて、s3n:// を指定する必要がある点に注意です。

Word2Vec のパラメータを変えて、何度か実行してみたところ、実行時間は以下のようになりました。

Pattern No. numPartition numIteration minCount vectorSize 実行時間
1 10 1 5 10 75.0 min
2 10 1 20 20 82.6 min
3 10 1 20 50 130.6 min

最初のパターン1は、vectorSize をかなり減らしたにも関わらず時間がかかり、後述するようにモデルの精度もよくありませんでした。

そのため、パターン2では minCount(単語の最小出現回数)を大きくして、vocabSize(モデルに含まれる単語数)を減らしました。その結果、vectorSize をパターン1の2倍にしたにも関わらず、実行時間は1.1倍程度に収まり、モデルの精度も若干上がりました。

最後に、他のパラメータは同じままで vectorSize のみ50に増やしたところ、vectorSizeはパターン2の2.5倍で、実行時間は1.5倍になりました。

実行時間の内訳を Spark UI で確認したところ、処理の最後に slave 1台で実行するタスク(Locality Level NODE_LOCAL のタスク)があり、これが1〜2時間かかっていました。このタスクがボトルネックになっているということは、少なくとも Word2Vec については、Spark MLlib による分散処理のメリットって、もしかしてあまり無いとか……?

ただ、今回は numIteration を 1 で固定にしましたが、精度を上げるためには numPartition と同じ 10 まで上げたほうがよいはずです。numIteration を増やせば、分散処理されるタスクが占める割合も増えるので、Spark の恩恵が得られるのではないかと思います。それはまた今度試してみます。

つまづいたこと(1):ドライバのメモリ使用量を増やさないと落ちる

最初に実行したところ、以下のエラーが発生してジョブが止まりました。

16/06/25 09:39:46 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() for one-way message.
java.lang.IllegalStateException: unread block data

Spark UI の Environment タブで、Spark Properties を確認したところ、spark.executor.memory は m3.2xlarge(メモリ 30GB)に合わせて 26837m に指定されていたのですが、spark.driver.memory のほうは何も指定されていませんでした。spark.driver.memory のデフォルトは 1g です。

これを r3.large のメモリ 15GB から 4GB を引いた 11GB(11g)に指定したところ、このエラーは出なくなりました。また、Environment タブで、spark.driver.memory が指定されていることを確認できました。

つまづいたこと(2):vocabSize*vectorSize が大きすぎると落ちる

numPartition numIteration minCount vectorSize
10 1 5 100

Spark MLlib の vectorSize のデフォルト値は 100 なので、最初はこの値を使っていました。しかし、上記のパラメータの組み合わせで実行したところ、Stage 1 の処理の途中で以下のエラーが出て、タスクが止まってしまいました。

Exception in thread "main" java.lang.RuntimeException: Please increase minCount or decrease vectorSize in Word2Vec to avoid an OOM. You are highly recommended to make your vocabSize*vectorSize, which is 3856720*100 for now, less than `Int.MaxValue/8`.
        at org.apache.spark.mllib.feature.Word2Vec.fit(Word2Vec.scala:319)
(スタックトレースは省略)

Int.MaxValue/8 = 268435455 です。つまり、vocabSize(単語数)*vectorSize がこの上限を大幅に超えていることが原因のようです。Int の最大値の8分の1ってなんでまた……。

とにかく、単語数を減らすか、vectorSize を減らす必要があることがわかりました。そのため、これ以降のテストでは vectorSize を減らすパターン(パターン1)と、minCount を増やして単語数を減らすパターン(パターン2〜3)を試しました。

つまづいたこと(3):モデルのサイズが大きすぎると akka のフレームサイズ上限を超えて落ちる

Word2Vec.fit() の最後、生成された Word2Vec モデルを parquet 形式で出力するところで、以下のエラーが出て落ちました。

16/06/25 11:28:06 ERROR InsertIntoHadoopFsRelation: Aborting job.
org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 1793:0 was 213710572 bytes, which exceeds max allowed: spark.akka.frameSize (134217728 bytes) - reserved (204800 bytes). Consider increasing spark.akka.frameSize or using broadcast variables for large values.

spark.akka.frameSize のデフォルトは 128(単位は MB)で、タスクのサイズがこれを超えると落ちるようです。小さいデータセットでは出なかったエラーなのですが、Wikipedia 規模になると出るようです。1〜2時間待ったあとで、最後の最後にこのエラーで落ちると、非常に(精神的にも金銭的にも)痛いです……。

設定可能な上限値は調べても分かりませんでしたが、ひとまず --conf spark.akka.frameSize=1024 を指定して 1GB にしたところ、Word2Vec モデルの出力まで成功しました。

ローカルマシン上での Word2Vec モデルの利用

最終的にやったこと

先ほどの spark-submit の実行により、Word2Vec モデルが S3 にアップロードされました。このモデルをローカルマシンにダウンロードして使ってみます。

% aws s3 cp s3://my-bucket-name/enwiki_model ./enwiki_model --recursive

以下のコマンドで spark-shell を起動します。Part 1 で使ったオプションに加えて、--conf spark.kryoserializer.buffer.max=1g を指定しています。

% spark-shell --master local \
--conf spark.driver.memory=5g \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.kryoserializer.buffer.max=1g

そして、spark-shell のプロンプトで以下を実行し、Word2Vec モデルをロードします。

import org.apache.spark._
import org.apache.spark.rdd._
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}

val model = Word2VecModel.load(sc, "enwiki_model")

また、今回は Apache Spark 入門に書かれたメソッドを使って、関係性を加味した推測を行ってみます。有名な例で言うと、「"king" に対する "kings" は、"queen" に対する何か?」という関係を、Word2Vecモデルから推測することができます。

以下は、Apache Spark 入門の p.224 から抜粋したコードです。これを spark-shell に貼り付けるか、ファイルに書いておいて :load <file name> でロードして使います。

import org.apache.spark.mllib.linalg.Vectors

def relationWords(w1: String, w2: String, target: String, model: Word2VecModel) :Array[(String, Double)] = {
    val b = breeze.linalg.Vector(model.getVectors(w1))
    val a = breeze.linalg.Vector(model.getVectors(w2))
    val c = breeze.linalg.Vector(model.getVectors(target))
    val x = c + (a - b)
    model.findSynonyms(Vectors.dense(x.toArray.map(_.toDouble)), 10)
}

パターン1:minCount = 5, vectorSize = 10

ベクトル数が小さいためか、精度はかなり悪いです。Tokyoという都市名、Japanという国名に対する類似語を求めても、上位10件にそれらしいものが現れません。

scala> model.findSynonyms("Tokyo", 10).foreach(println)
(Rebounder,5.026514312082014)
(Fivepenny,5.006271473525809)
(Riviera,4.9806280562664655)
(Pirmahal,4.977896311409738)
(A2217,4.973896329049228)
(Pestújhely,4.967955406306887)
(Tri,4.966647609406325)
(Cigarros,4.966214313196464)
(Seahorses,4.9657892250050715)
(Club,4.965424934604451)

scala> model.findSynonyms("Japan", 10).foreach(println)
(Prabda,3.8591253451462766)
(Skateabout,3.789246081518729)
(detailslink,3.756286768742609)
(Oceania,3.7439580152901946)
(Daeges,3.743037606956309)
(Equestrianism,3.73990681262581)
(Miegs,3.7392088293670396)
(Fleuth,3.735308547592705)
(KBID-LP,3.730579527776324)
(Powerlifting,3.717090309581691)

関係性を加味した推測も、以下のようにうまく行きませんでした。

scala> relationWords("king", "kings", "queen", model).foreach(println)
(satsuma-biwa,4.95347264322314)
(shoguns,4.93869343414127)
(mystics,4.931215483461304)
(Zelimxan,4.925167012454619)
(Christianized,4.922235458369835)
(veneration,4.921893688910249)
(Shi’i,4.921205040607001)
(Russified,4.917586471812209)
(pagan,4.912822109308089)
(revered,4.911351827558269)

scala> relationWords("prince", "king", "princess", model).foreach(println)
(Pandava,4.2101984410814834)
(Aegisthus,4.207452272387961)
(bandit,4.202362575975742)
(amanuensis,4.194580140364399)
(Aerope,4.188601884423512)
(tradesman,4.178661804898081)
(Candaules,4.177194064593601)
(princess,4.173209621638307)
(Shoulang,4.165125455530385)
(Seibei,4.163678291883964)

パターン2:minCount = 20, vectorSize = 20

単語数を減らし、ベクトル数を上げた結果、精度が若干向上しました。Tokyo に対する類義語として、日本の都市の Osaka、Sapporo が出てくるようになりました。一方で、Japan に対する類義語のほうは、あまり改善が見られません。

scala> model.findSynonyms("Tokyo", 10).foreach(println)
(Wrestle,7.689458080058069)
(Split,7.626499879518354)
(Osaka,7.620597049534027)
(Sapporo,7.556529623946273)
(Setagaya,7.513748270603075)
(Hiroshima,7.490792005499523)
(Shinjuku,7.45951304352636)
(Kanazawa,7.459122453399323)
(Expo,7.453010168798164)
(ESCOM,7.447874763780933)

scala> model.findSynonyms("Japan", 10).foreach(println)
(Tokyo,5.679376270328159)
(AXN,5.640570343734289)
(Wrestle,5.60396135079362)
(Expo,5.590382781259281)
(TV2,5.522196857434101)
(Hanoi,5.495135749493573)
(TV6,5.490184062697079)
(Kyoto,5.486577183328772)
(Skate,5.4760554670281065)
(Benelux,5.430530293625971)

関係性を加味した推測は、まだあまりうまくいきません。ただ、後者のほうは5位に正解の "queen" が出ているので、若干精度が向上しています。

scala> relationWords("king", "kings", "queen", model).foreach(println)
(pagan,6.667731329068959)
(garb,6.659426546093454)
(gods,6.648366573398432)
(symbolised,6.648168276539841)
(sacred,6.6085783714277975)
(personages,6.598811565877372)
(veneration,6.597536687593547)
(puranas,6.590383098194837)
(deities,6.588936982768422)
(beauties,6.588806331810932)

scala> relationWords("prince", "king", "princess", model).foreach(println)
(lord,6.574825899196509)
(princess,6.522661208674787)
(bride,6.521167177599623)
(lady,6.492377997870626)
(queen,6.479450084505509)
(first-born,6.466189456944019)
(king,6.441766970616445)
(blessed,6.441764119985444)
(beloved,6.4396910737789606)
(bridegroom,6.423838321417851)

パターン3:minCount = 20, vectorSize = 50

ベクトル数を大幅に増やした結果、かなり精度が向上しました。Tokyo に対して日本の都市、Japan に対してアジアの国名や首都が表示されるようになりました。

scala> model.findSynonyms("Tokyo", 10).foreach(println)
(Osaka,6.442472711716078)
(Fukuoka,6.3918200759436)
(Saitama,6.343209033208874)
(Setagaya,6.237343626467007)
(Japan,6.063812875793321)
(Sapporo,6.027676167552773)
(Nagano,5.955215285602899)
(Kobe,5.891646194480255)
(Yamagata,5.86912171881318)
(Shibuya,5.835765966270005)

scala> model.findSynonyms("Japan", 10).foreach(println)
(Tokyo,5.510337298616405)
(Korea,5.509610108188756)
(China,5.486622516556292)
(Fukuoka,5.378651363703807)
(Taiwan,5.377869828524535)
(Seoul,5.321357314331263)
(Shizuoka,5.31678565272272)
(Prefecture,5.297746506109964)
(Hamamatsu,5.159312705112953)
(Kanagawa,5.157422752148916)

関係性を加味した推測も、かなり精度が向上しました。前者は正解の "queens" が2位、後者も正解の "queen" が2位に表示されています。イテレーション数(numIteration)を増やすなど、更にパラメータを調整すれば、これらの単語が1位に上がることが期待できます。

scala> relationWords("king", "kings", "queen", model).foreach(println)
(realms,6.380416904839411)
(queens,6.292521776188793)
(knightly,6.2558567330155626)
(consorts,6.241017073100756)
(kings,6.200374546691251)
(kindreds,6.17249501613232)
(lamas,6.1721177720161915)
(monuments,6.147651372785442)
(patrilineal,6.1288029631730545)
(depictions,6.121416883901753)

scala> relationWords("prince", "king", "princess", model).foreach(println)
(princess,5.956775488416378)
(queen,5.9055082324742685)
(slaying,5.793197818446893)
(king’s,5.696965618712307)
(betrothal,5.59067630474941)
(goddess,5.58159904439838)
(apparition,5.554027664552106)
(martyrdom,5.534826668619817)
(Pelops,5.503355785910461)
(ancestress,5.4953139994512545)

つまづいたこと

spark-shell の起動時に --conf spark.kryoserializer.buffer.max=1g を指定しないと、Word2VecModel.load() の呼び出しで落ちました。

16/06/25 22:00:26 WARN TaskSetManager: Lost task 1.0 in stage 2.0 (TID 3, localhost): TaskKilled (killed intentionally)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, localhost): org.apache.spark.SparkException: Kryo serialization failed: Buffer overflow. Available: 1, required: 4. To avoid this, increase spark.kryoserializer.buffer.max value.
    at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:299)
(スタックトレースは省略)

あと、余談ですが、zsh を使っていると --master local[*] の指定ができないようです。以下のようなエラーが出ます。bash なら指定できるので、spark-shell の実行時だけ bash に切り替えました。

% spark-shell --master local[*] \
--conf spark.driver.memory=5g \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.kryoserializer.buffer.max=1g
zsh: no matches found: local[*]

Part 2 のまとめ

Wikipedia の大規模データを、Amazon EC2 上に構築した Spark Cluster で処理できました。また、実際にいろいろなパラメータを試してみて、大規模データを処理する際に注意すべき Spark のプロパティや、Word2Vec のパラメータを把握することができました。

今回生成した Word2Vec のモデルはまだあまり精度が良くありませんが、以下の方針で精度を上げることができそうです。

  • minCount を大きくして、単語数(vocabSize)を小さくする
  • vectorSize を増やす
  • numIteration を numPartition に近づける
  • 元データからノイズ(MediaWiki記法など)を除去する方法を改善する

実際に試してみて、Word2Vec の場合は slave 1台で実行される処理がボトルネックになっていることがわかりました。Amazon EC2 の高いマシンを借りているのに、slave 1台だけが頑張っていて、残りの4台は遊んでいる、というのはなかなか焦ります。

Slave 1台しか動かない時間を短くする方法としては、vocabSize*vectorSize を小さくするしかないのですかね? 実際のアプリで Word2Vec を使う際には、Spark MLlib での処理の前に、そのアプリで使わない単語を除去してしまって単語数を大幅に減らしておく、などの対策が必要かもしれません。

Part 2 の主な参考文献

手を動かす Spark MLlib & Word2Vec Part 1 (spark-ec2 でクラスタを構築するまで)

f:id:muziyoshiz:20160626223709p:plain

このシリーズについて

機械学習系のツールを全然使ったことがなかったので、勉強のためになにか1つ選んで、実際に手を動かしてみることにしました。マシンを並べて負荷分散することを想定して、まずは Spark MLlib を選びました。

このシリーズでは、Amazon EC2 上に構築した Spark Cluster (Standalone Mode) で、Wikipedia のデータから Word2Vec のモデルを作るところまでの方法を解説していきます。ただ、実際やってみてわかったのですが、Spark 自体、Spark MLlib の Word2Vec クラス、およびクラスタ構築に使った spark-ec2 に設定項目が多いせいで、細かいところで何度も何度もつまづきました……。

そのため、このシリーズでは各ステップについて、「最終的にやったこと」と、その最終的なやり方にたどり着くまでに「つまづいたこと」を分けました。やり方を知りたいだけの場合は「最終的にやったこと」の方だけ読んでください。「つまづいたこと」は、うまく行かなかった場合のための参考情報です。

Part 1 の範囲

Amazon EC2 に master 1台、slave 3台構成の Spark Cluster (Standalone mode) を構築し、spark-shell から Word2Vec を実行するところまで。

Spark をローカル環境(Mac)にインストールする

最終的にやったこと

まず、ローカル環境で Spark MLlib が動くかどうかを試してみました。環境は以下の通りです。

  • MacBook Pro (Retina, 15-inch, Mid 2014)
  • OS: OS X Yosemite 10.10.5
  • CPU: 2.2 GHz Intel Core i7
  • メモリ: 16GB 1600 MHz DDR3

OS X に Spark をインストールする場合、以下のコマンドだけでインストールできます(参考:ApacheSpark — BrewFormulas)。

% brew update
% brew install apache-spark

私が試した時点では Spark 1.6.1 でした。Java は Java 8 です。

% spark-shell --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.1
      /_/

Type --help for more information.

つまづいたこと

spark-shell ローカルモードで(--master local を指定して)実行すると、spark> というプロンプトが表示されるまでに、色々と WARN が出ます。

16/06/07 00:17:36 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)

BoneCP は JDBC Connection Pool ライブラリの名前です。scala - What do WARN messages mean when starting spark-shell? - Stack Overflow によると、ローカルモードで実行しているときは問題ないとのこと。

16/06/07 00:17:38 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
16/06/07 00:17:38 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException

こちらも、Hive metastore に接続できないことを表す WARN なので、ローカルモードでは関係ないと判断しました。

ローカル環境での Word2Vec の実行

最終的にやったこと

Spark MLlib のページ(Feature Extraction and Transformation)に、Spark MLlib に含まれる Word2Vec クラスを使ったサンプルコードがあります。これをローカルモードで実行してみます。

まず、サンプルコードで使っている text8.zip をダウンロードして、解凍します。これは、スペースで区切られた英単語が羅列された(意味のある文章ではない)100 MB のテキストファイルです。

% wget http://mattmahoney.net/dc/text8.zip
% unzip text8.zip
% ls -la text8
-rw-r--r--@ 1 myoshiz  staff  100000000  6  9  2006 text8

この text8 を置いたディレクトリで、以下のコマンドを実行します。spark.driver.memory はドライバのメモリ使用量を表すオプションで、デフォルトは 1g (1GB)です。

% spark-shell --master local \
--conf spark.driver.memory=5g \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer

spark-shell のプロンプトで、Word2Vec のサンプルコード を入力すれば、myModelPath ディレクトリ以下に、Word2Vec のモデルデータが生成されます。

なお、spark-shell の起動中は http://localhost:4040/ にアクセスすることで、ジョブの状態を確認できます。

つまづいたこと

最初は --conf spark.driver.memory=5g" を指定せずに spark-shell を起動していました。その状態でword2vec.fit(input)` を実行すると、OutOfMemoryError で spark-shell が落ちます。私の環境では、ファイルが 100MB だと落ちて、80MB まで減らすと落ちない、という状態でした。

scala> val model = word2vec.fit(input)
[Stage 0:>                                                          (0 + 1) / 3]
Exception in thread "refresh progress" java.lang.OutOfMemoryError: GC overhead limit exceeded
    at scala.StringContext.s(StringContext.scala:90)
(スタックトレース、および後続のエラーは省略)

エラーメッセージをもとに調べたところ、JVM の設定が悪いような情報をいくつか見かけました。

しかし、これを指定してもエラーメッセージは変わりませんでした。というか、私は Java 8 で実行していたので、そもそもこの設定には意味がありませんでした。

この Java の仕様変更を踏まえて、以下のように spark-shell を実行したところ、落ちなくなりました。ただし、この方法だと、OutOfMemoryError が出ないだけで、いつまでも処理が終わらないという状態になってしまいました……。

% SPARK_REPL_OPTS="-XX:MaxMetaspaceSize=1024m" spark-shell --master local

結局、Configuration に載っているメモリ関係のパラメータを一通り確認して、前述の spark.driver.memory を増やしたところ、うまく動いたようで、処理が完了しました。JVM のパラメータを変更する必要はなかったようです。

Amazon EC2 への Spark クラスタの構築(spark-ec2 を使った方法)

最終的にやったこと

Slave の台数を増やすことで、Spark MLlib の実行時間が短くなることを確認するために、Spark クラスタを構築しました。今回は Spark に同梱されている spark-ec2 というスクリプトを使って構築しました。このスクリプトの説明は Running Spark on EC2 - Spark 1.6.1 Documentation にあります。

Amazon Elastic MapReduce (EMR) で Spark を使えることは知っていますが、いずれオンプレに Spark クラスタを構築したかったのと、かといってマシンスペックを何パターンか試すときに手作業での構築は大変すぎたので spark-ec2 を使いました。

まず、AWS のマネジメントコンソールを使って、以下の設定を行います。今回は Spark の話がメインなので、AWS の設定の詳細は省略します。

  • IAM ユーザ "word2vec-user" の作成
  • IAM ユーザ "word2vec-user" に対する "AdministratorAccess" ポリシーのアタッチ(EC2 と S3 に絞っても良い)
  • EC2 でのキーペア "word2vec-key-pair" の作成
  • ローカルマシンに対する環境変数 AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY の設定
  • ローカルマシンに対するキーペアの配置(以下では /Users/myoshiz/.ssh/word2vec-key-pair.pem に置いたと仮定)

brew で Spark をインストールすると、spark-ec2 は入っていません。そのため、spark-ec2 を使うために、Apache Spark のダウンロードページ から zip ファイルをダウンロードします。今回は、以下のファイルを選択しました。

  • Spark release: 1.6.1
  • Package type: Pre-built for Hadoop 2.6 and later

ダウンロードした zip ファイルを解凍すると、ec2 ディレクトリに spark-ec2 というスクリプトが入っています。このディレクトリに移動し、まずは master 1台、slave 3台のクラスタを構築してみます。そのためには、以下のコマンドを実行します。

% ./spark-ec2 \
--key-pair=word2vec-key-pair \
--identity-file=/Users/myoshiz/.ssh/word2vec-key-pair.pem \
--region=us-west-1 \
--zone=us-west-1a \
--instance-type=m4.large \
--copy-aws-credentials \
--hadoop-major-version=yarn \
--slaves 3 \
launch spark-cluster

各オプションの意味と、上記の値を指定した理由は以下の通りです。

  • --region は、デフォルトはバージニア北部(us-east-1)が使われる。国内だとインスタンス利用費が若干高く、東海岸は遠いので、北カリフォルニア(us-west-1)を指定した。
  • --instance-type は、デフォルトでは m1.large が使われる。m1.large は古いインスタンスタイプのため、スペックに比して割高のため、同じく 2 vCPU、メモリ8GBの m4.large を指定。調べた時点では m1.large が $0.19/hour、m4.large が $0.14/hour だった。
  • --copy-aws-credentials を指定すると、環境変数に設定された AWS のアクセスキーが、master の hadoop にも設定される。ただし、後述の通り Spark に対しては設定されないので、hadoop コマンドを使わないなら、指定しなくても良い。
  • --hadoop-major-version=yarn は、使用する Hadoop のバージョンを指定する。今回は Pre-built for Hadoop 2.6 and later をダウンロードしているので、yarn を指定する必要がある。デフォルトは 1(Hadoop 1.0.4)。
  • --slaves は slave の台数を指定する。

10〜20分待つとクラスタの構築が完了し、以下のようなメッセージが表示されます。Mac から以下の URL にアクセスすると、Spark UI や、Ganglia の画面を確認できます。

Spark standalone cluster started at http://ec2-xxx-xxx-xxx-xxx.us-west-1.compute.amazonaws.com:8080
Ganglia started at http://ec2-xxx-xxx-xxx-xxx.us-west-1.compute.amazonaws.com:5080/ganglia
Done!

上記のホスト名は、以降の作業でも使うので、以下のように環境変数に設定しておきます。シェルの設定ファイル(.bash_profile とか)で指定してもいいですが、クラスタを作るたびにホスト名が変わる点だけは注意が必要です。

% export EC2_SPARK_MASTER=ec2-xxx-xxx-xxx-xxx.us-west-1.compute.amazonaws.com

つまづいたこと(1):GitHub の spark-ec2

brew で spark をインストールすると、そのなかには spark-ec2 が入っていません。そのため、このスクリプトだけ別に入手できないかと思い、GitHub で公開されている spark-ec2 を clone して実行してみました。

github.com

この spark-ec2 を実行すると、エラーも出ずに最後まで処理が進むのですが、Spark クラスタが起動しないようです。http://ec2-xxx-xxx-xxx-xxx.us-west-1.compute.amazonaws.com:8080 にアクセスしても応答がなく、spark-shell で --master spark://ec2-xxx-xxx-xxx-xxx.us-west-1.compute.amazonaws.com:7077 を指定しても接続できない、という状態になりました。

色々悩んだ結果、大人しく Apache Spark のダウンロードページ から zip ファイルをダウンロードして、そのなかの spark-ec2 を使ったところ、実行したコマンドの引数は同じにも関わらず、クラスタが起動しました。

GitHub 版も見た目はきちんと動いているように見えたために、他に原因があると思い込んでしまい、この問題で数日詰まってしまいました……。

つまづいたこと(2):--hadoop-major-version=yarn の指定

このオプションは、公式サイトの Running Spark on EC2 には書かれていません。しかし spark-ec2 --help を実行すると、以下のオプションが表示されます。

  --hadoop-major-version=HADOOP_MAJOR_VERSION
                        Major version of Hadoop. Valid options are 1 (Hadoop
                        1.0.4), 2 (CDH 4.2.0), yarn (Hadoop 2.4.0) (default:
                        1)

上記のオプションを指定しないと、クラスタの構築後に spark-shell を実行した時に、以下のようなエラーが出て sqlContext の初期化に失敗しました。

16/06/13 14:08:02 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MResourceUri" is tagged as "embedded-only" so does not have its own datastore table.
java.lang.RuntimeException: java.io.IOException: Filesystem closed
    at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522)
(中略)
<console>:16: error: not found: value sqlContext
         import sqlContext.implicits._
                ^
<console>:16: error: not found: value sqlContext
         import sqlContext.sql
                ^

Spark クラスタでの Word2Vec の実行

最終的にやったこと

spark-ec2 の login コマンドを使用すると、master にログインできます。もちろん ssh でもログインできますが、master のホスト名を書かなくてよいのがメリットだと思います。ちなみに、オプションの指定が面倒ですが、以下の3つは必須のようです。

% ./spark-ec2 \
--key-pair=word2vec-key-pair \
--identity-file=/Users/myoshiz/.ssh/word2vec-key-pair.pem \
--region=us-west-1 \
login spark-cluster

次に、先ほどと同じサンプルコードを実行するために、text8.zip をダウンロードします。また、このファイルを、クラスタ上で動作する HDFS にアップロードします。これは、ファイルを slave からアクセス可能にするための作業です。後ほど、HDFS の代わりに S3 を使う方法も紹介します。

$ wget http://mattmahoney.net/dc/text8.zip
$ unzip text8.zip
$ ./ephemeral-hdfs/bin/hadoop fs -put text8 /

ここまでの準備が終わったら、master 上で spark-shell を実行します。指定するオプションは以下の通りです。ローカルモードの場合とは、--master の指定が変わっています。

$ ./spark/bin/spark-shell \
--master spark://${EC2_SPARK_MASTER}:7077 \
--conf spark.driver.memory=5g \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer

ホスト名の指定が面倒ですが、--master spark://localhost:7077 という指定では接続できませんでした。

あとは、spark-shell で以下のように入力すると、Word2Vec が実行されます。ローカルモードとの違いは、textFile() や save() に渡されたファイルパスが、HDFS のファイルパスとして扱われることです。今回はルート直下に text8 を置いたため、/text8 のように指定しています。

import org.apache.spark._
import org.apache.spark.rdd._
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}

val input = sc.textFile("/text8").map(line => line.split(" ").toSeq)

val word2vec = new Word2Vec()

val model = word2vec.fit(input)

val synonyms = model.findSynonyms("china", 40)

for((synonym, cosineSimilarity) <- synonyms) {
  println(s"$synonym $cosineSimilarity")
}

// Save and load model
model.save(sc, "/model_text8")
val sameModel = Word2VecModel.load(sc, "/model_text8")

以上により、Word2Vec のジョブが slave 上で実行されます。ただ、http://ec2-xxx-xxx-xxx-xxx.us-west-1.compute.amazonaws.com:8080 にアクセスするとわかるのですが、このままだと3台ある slave のうち、1台しか使われません。次は、ジョブを分散するために、Word2Vec のパラメータを変更します。

つまづいたこと

最初、ローカルディスクのファイルにアクセスできないことに気づきませんでした。file:// を付けても駄目でした。

次に、HDFS 上にファイルをアップロードする方法で悩んだのですが、これは ./ephemeral-hdfs/bin 以下のコマンドが使えることに気付いたあとは簡単でした。hadoop コマンドに馴染みのない人は、Apache Hadoop 2.7.1 – などが参考になると思います。

すべての slave に処理が分散されることの確認(Word2Vec のパラメータ変更)

最終的にやったこと

Word2Vec のパラメータは、Word2Vec クラスの setter で指定できます。用意された setter とそのデフォルト値は Word2Vec の API リファレンス に記載されています。

これらの setter のうち、setNumPartitions() でパーティション数を1よりも大きくすると、複数の slave 間で処理が分散されます。この値のデフォルトが1なので、そのままでは slave が1台しか使われません。

val word2vec = new Word2Vec()

// Set this
word2vec.setNumPartitions(4)

val model = word2vec.fit(input)

slave 3台で試したところ、パーティション数を4まで増やした段階で、すべてのslaveに処理が分散されました。ただ、5台で試したときには、パーティション数を6にしても、slave 4台しか使われませんでした。単純に slave の台数 + 1 にすればよいというわけではなさそうで、詳細はまだわかりませんが、少なくとも slave の台数よりも大きい数を指定する必要がありそうです。

ただ、このパーティション数を増やすと、増やした分だけ負荷分散されて処理時間が短くなっていく一方で、計算結果の正確さも落ちていくとのことです。Word2Vec の処理が分散しない理由を調べている際に、以下の情報を見かけました。

stackoverflow.com

  • イテレーションの数は、パーティション数と同じか、それ以下にすべき
  • 正確さのために、パーティション数は小さい値を使うべき
  • 結果(モデル)を正確にするためには、複数のイテレーションが必要

どれくらい結果が変わっていくのか、text8 を3台の slave 上で処理して調べてみました。以下は、numPartition = 1, 3, 6 での、"china" に類似した単語の上位10件です。パーティションが1個の場合の上位3件を太字にしています。text8 は意味のない文字列ですが、結果が変わっていく様子は参考になると思います。

numPartitions 1 3 6
1位 taiwan taiwan indonesia
2位 korea korea taiwan
3位 japan japan afghanistan
4位 mongolia mainland kazakhstan
5位 shanghai indonesia pakistan
6位 tibet india japan
7位 republic pakistan ireland
8位 india mongolia india
9位 manchuria thailand uzbekistan
10位 thailand africa iran

ちなみに、使われた slave の台数と、処理時間の関係は以下のようになりました。text8 くらいのデータ量(100 MB)だと、おおよそ、使われる slave の台数に応じて大きく処理時間が減るようです。

numPartitions 1 3 6
slave の台数 1 2 3
処理時間 6.3 min 3.6 min 1.9 min

つまづいたこと

負荷分散しない理由が Word2Vec のパラメータの方にある、ということが最初なかなか分からずに苦労しました。

普通に考えると「Spark MLlib から Word2Vec を使いたい人=負荷分散を期待している人」だから、Word2Vec のパラメータの初期値は負荷分散するようになっているはずだ(だから Spark のパラメータの方に問題があるはずだ)と思い込んでいました……。

Part 1 のまとめ

ここまでの手順で、Amazon EC2 上に Spark クラスタを構築する方法を確認できました。

次の Part 2 では、この Spark クラスタの slave の台数およびマシンスペックを強化し、処理するデータ量も Wikipedia 英語版(Gzip 圧縮した状態で 12 GB)まで増やしてみます。

Part 1 の主な参考文献