無印吉澤

Site Reliability Engineering(SRE)、ソフトウェア開発、クラウドコンピューティングなどについて、吉澤が調べたり試したことを書いていくブログです。

手を動かす Spark MLlib & Word2Vec Part 2 (Wikipedia 英語版から Word2Vec モデルを作るまで)

f:id:muziyoshiz:20160626223709p:plain

このシリーズについて

実際に手を動かして Spark MLlib に慣れていこう、というシリーズです。

Spark を使うならそれなりに大きなデータを分散処理しないと面白くないと思い、Wikipedia のデータから Word2Vec のモデルを作るところまでやってみました。環境構築については Part 1 をご参照ください。

muziyoshiz.hatenablog.com

Part 2 の範囲

Wikipedia 英語版のデータから作成したコーパスを Amazon EC2 上の Spark Cluster で処理して、Word2Vec のモデルを作成するところまで。

Wikipedia 英語版のデータからコーパス作成

最終的にやったこと

Wikipedia:Database download から辿って、https://dumps.wikimedia.org/enwiki/ (HTTP) または Data dump torrents (BitTorrent) からダウンロードできます。

Wikipedia のデータは XML および SQL で公開されており、それぞれ色々ファイルがあります。今回は Wikipedia の本文からコーパスを作りたいので enwiki-latest-pages-articles.xml.bz2 をダウンロードしました。私がダウンロードした時の最新版は 2016-06-03 作成の 12.1 GB のファイルでした。

このファイルを解凍すると以下のような XML が入っています。<text> タブの中身が本文です。

  <page>
    <title>Anarchism</title>
    <ns>0</ns>
    <id>12</id>
    <revision>
      <id>721573764</id>
      <parentid>719202660</parentid>
      <timestamp>2016-05-22T19:25:12Z</timestamp>
      <contributor>
        <username>PBS-AWB</username>
        <id>11989454</id>
      </contributor>
      <comment>modification to template Cite SEP and possibly some gen fixes using [[Project:AWB|AWB]]</comment>
      <model>wikitext</model>
      <format>text/x-wiki</format>
      <text xml:space="preserve">{{Redirect2|Anarchist|Anarchists|the fictional character|Anarchist (comics)|other uses|Anarchists (disambiguation)}}
{{pp-move-indef}}
{{Use British English|date=January 2014}}
{{Anarchism sidebar}}
'''Anarchism''' is a [[political philosophy]] that advocates [[self-governance|self-governed]] societies based on voluntary institutions.
(中略)
[[Category:Far-left politics]]</text>
      <sha1>sfoc30irh5k1bj62ubt29wp1ygxark0</sha1>
    </revision>
  </page>

この XML から本文だけ取り出す方法を色々探してみたところ、wp2txt というツールがあったので、今回はこちらを使わせてもらいました。

github.com

wp2txt は Ruby で書かれており、関連する gem のインストールに苦労したという記事も見かけたのですが、以下の手順で問題なく動作しました。

% mkdir wp2txt
% cd wp2txt
% bundle init
% echo 'gem "wp2txt"' >> Gemfile
% bundle install
% mkdir output_dir
% bundle exec wp2txt -i enwiki-latest-pages-articles.xml.bz2 -o enwiki --no-heading --no-title --no-marker

今回は本文だけが必要なので、余分な出力をなるべく減らすためのオプション(--no-heading, --no-title, --no-marker)を付けました。それでも、MediaWiki のマークアップなどは残ってしまうのですが、それは Spark で除去することにします。

wp2txt の処理が終わると、enwiki ディレクトリに enwiki-latest-pages-articles.xml-<連番>.txt というファイルが作成されます。今回は合計 1,754 ファイル、18 GB でした。ちなみに、私の環境(MacBook Pro 2.2 GHz, 16GB DDR3)では、実行に19時間くらいかかりました。

つまづいたこと

Wikipedia のデータからコーパスを作る方法を色々探したのですが、英語圏も含めて、簡単にできる方法がなかなか見つかりませんでした。

wp2txt 以外で良さそうなものとしては wiki2vec というツールが、コーパスを作る機能を持っているようでした。ただ、時間の都合で今回は試せませんでした。ちなみに、この記事を書くために読み返していて気付きましたが、wiki2vec のパラメータとして minCount = 50, vectorSize = 500, windowSize = 10 という例が載っていました。次はこれで試してみるのもよいかもしれません。

github.com

その後、Word2Vec 関係の記事を探しまわるなかで、以下の記事から wp2txt の存在に気づき、今回は wp2txt を使うことにしました。

techblog.gmo-ap.jp

コーパスの、S3 へのアップロード

最終的にやったこと

Apache EC2 上に構築した Spark Cluster に Wikipedia のファイルを渡さなければいけないのですが、データサイズが大きいので、今回は S3 経由で渡しました。

Spark の textFile メソッドは gzip 圧縮されたファイルも読み込めるので、まずは先程のファイルを圧縮します。圧縮後のファイルは 6 GB になりました。

% gzip enwiki/*.txt

そして、これを S3 にアップロードします。aws s3 cp コマンドはワイルドカードが使えないので、一括アップロード時には、ディレクトリ名を指定して --recursive を指定する必要があります。

% aws s3 cp enwiki s3://my-bucket-name/ --recursive

aws コマンドがない場合は、Installing the AWS Command Line Interface - AWS Command Line Interface に従ってインストールしてください。Mac の場合は pip でインストールします。

つまづいたこと

最初は、以下のように1ファイルにまとめてアップロードしたのですが、これだとファイル読み込みが全く分散されませんでした。

% cat enwiki/*.txt > enwiki.txt
% gzip enwiki.txt
% aws s3 cp enwiki.txt.gz s3://my-bucket-name/

今回は wp2txt がファイルを複数に分割してくれていましたが、他の方法でコーパスを作った場合も、ファイルを分けてアップロードしたほうがいいですね。

spark-submit で使う jar の作成

最終的にやったこと

いままでは spark-shell で Word2Vec を実行していましたが、データ量が増えると実行時間が長くなって、EC2 インスタンスへの接続が切れてしまう可能性が出てきます。そのため、Word2Vec を実行する簡単な jar を作って、spark-submit で実行することにします。

以下がそのコードです。

Simple Word2Vec application

Feature Extraction and Transformation にあるサンプルに、以下の修正を加えています。

  • コマンドライン引数から、読み込むファイル、モデルの出力先、Word2Vecのパラメータを設定
  • repartition メソッドを使って、RDD を分割(これをしないと読み込み後の処理が分散されない)
  • split する際の区切り文字に、スペース以外の文字も含めることで、MediaWiki 記法の文字を除去
  • filter で長さが 1 の文字を除去
  • 処理の最後で、実行時間を出力

jar をビルドしたい場合は、以下のリポジトリを使ってください。

github.com

sbt assembly を実行すると target/scala-2.11 ディレクトリに word2vec-model-generator-assembly-0.0.1.jar ができます。この jar ファイルだけ master ノードに持っていけば実行できます。

つまづいたこと

最初は repartition メソッドを使わなかったのですが、その場合、ファイルの読み込みは分散しても、その後の処理が途中まで全く分散されませんでした。repartition メソッドに渡す引数は scala - Spark: Repartition strategy after reading text file - Stack Overflow を参考にしました。

MediaWiki記法を除去するルールは、ローカルマシンで enwiki-latest-pages-articles.xml-0001.txt を処理して手探りで決めたのですが、もっと良い方法がありそうです。例えば、先ほど紹介した wiki2vec は、うまく処理していそうです("Word2Vec Corpus" の節を参照)。

Amazon EC2 への Spark クラスタの構築(5台構成)

最終的にやったこと

Part 1 でもクラスタを構築しましたが、それは1回削除して、Slave の台数とスペックを増やしたクラスタを作り直しました。ちなみに、spark-ec2 destory spark-cluster でクラスタを削除できます。

大規模データで Spark MLlib を試すのは初めてなので、手間取っている間にマシンが無駄に動いている……という可能性があったので(というか実際そうなったので)、少しケチって以下のスペックで構築しました。

  • master: r3.large ($0.185/hour, 2 vCPU, 15 GB memory, 1 x 32 SSD) 1台
  • slave: m3.2xlarge ($0.616/hour, 8 vCPU, 30 GB memory, 2 x 80 SSD) 5台

構築時のコマンドは以下の通りです。

./spark-ec2 \
--key-pair=word2vec-key-pair \
--identity-file=/Users/myoshiz/.ssh/word2vec-key-pair.pem \
--region=us-west-1 \
--zone=us-west-1a \
--master-instance-type=r3.large \
--instance-type=m3.2xlarge \
--copy-aws-credentials \
--hadoop-major-version=yarn \
--slaves 5 \
launch spark-cluster > spark-submit.log 2>&1 &

構築が完了したら、前回同様に .bash_profile に環境変数 EC2_SPARK_MASTER を設定します。また、今回は環境変数 AWS_ACCESS_KEY_ID と AWS_SECRET_ACCESS_KEY も設定します。これは、Spark アプリケーションから S3 にアクセスするために必要な設定です。

export EC2_SPARK_MASTER=ec2-xxx-xxx-xxx-xxx.us-west-1.compute.amazonaws.com
export AWS_ACCESS_KEY_ID={{ IAM ユーザ "word2vec-user" のアクセスキーID }}
export AWS_SECRET_ACCESS_KEY={{ IAM ユーザ "word2vec-user" のシークレットアクセスキー}}

つまづいたこと

Part 1 でも書きましたが、--copy-aws-credential を指定しても、Spark に対してはアクセスキーID、シークレットアクセスキーが設定されません。環境変数を設定したところ、Spark(spark-shell, spark-submit)から S3 にアクセスできるようになったので、今回はこの方法で済ませました。

なお、AWS のインスタンスプロファイルを使ってアクセス権限を与えることもできると思いますが、今回は試していません。クラスタを作ったり壊したりを繰り返していたため、そのたびに AWS マネジメントコンソールをいじるのは面倒だったので……。

spark-submit の実行

最終的にやったこと

先ほど作った jar を、master にコピーします。

% scp -i ~/.ssh/word2vec-key-pair.pem word2vec-model-generator-assembly-0.0.1.jar root@${EC2_SPARK_MASTER}:/root/

そして、以下のようにコマンドを実行すると、Word2Vec アプリケーションが実行されます。ssh 接続が切れた場合のために、バックグラウンドで実行し、標準出力はファイルに出力させておきます。

$ ./spark/bin/spark-submit \
--master spark://${EC2_SPARK_MASTER}:7077 \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.driver.memory=11g \
--conf spark.akka.frameSize=1024 \
--class jp.muziyoshiz.word2vec.Word2VecModelGenerator \
word2vec-model-generator-assembly-0.0.1.jar \
s3n://my-bucket-name/enwiki/*.txt.gz \
s3n://my-bucket-name/enwiki_model \
10 20 50 > spark-submit-wp8.log 2>&1 &

Spark から S3 にアクセスする際は、aws コマンドで指定するときの s3:// ではなくて、s3n:// を指定する必要がある点に注意です。

Word2Vec のパラメータを変えて、何度か実行してみたところ、実行時間は以下のようになりました。

Pattern No. numPartition numIteration minCount vectorSize 実行時間
1 10 1 5 10 75.0 min
2 10 1 20 20 82.6 min
3 10 1 20 50 130.6 min

最初のパターン1は、vectorSize をかなり減らしたにも関わらず時間がかかり、後述するようにモデルの精度もよくありませんでした。

そのため、パターン2では minCount(単語の最小出現回数)を大きくして、vocabSize(モデルに含まれる単語数)を減らしました。その結果、vectorSize をパターン1の2倍にしたにも関わらず、実行時間は1.1倍程度に収まり、モデルの精度も若干上がりました。

最後に、他のパラメータは同じままで vectorSize のみ50に増やしたところ、vectorSizeはパターン2の2.5倍で、実行時間は1.5倍になりました。

実行時間の内訳を Spark UI で確認したところ、処理の最後に slave 1台で実行するタスク(Locality Level NODE_LOCAL のタスク)があり、これが1〜2時間かかっていました。このタスクがボトルネックになっているということは、少なくとも Word2Vec については、Spark MLlib による分散処理のメリットって、もしかしてあまり無いとか……?

ただ、今回は numIteration を 1 で固定にしましたが、精度を上げるためには numPartition と同じ 10 まで上げたほうがよいはずです。numIteration を増やせば、分散処理されるタスクが占める割合も増えるので、Spark の恩恵が得られるのではないかと思います。それはまた今度試してみます。

つまづいたこと(1):ドライバのメモリ使用量を増やさないと落ちる

最初に実行したところ、以下のエラーが発生してジョブが止まりました。

16/06/25 09:39:46 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() for one-way message.
java.lang.IllegalStateException: unread block data

Spark UI の Environment タブで、Spark Properties を確認したところ、spark.executor.memory は m3.2xlarge(メモリ 30GB)に合わせて 26837m に指定されていたのですが、spark.driver.memory のほうは何も指定されていませんでした。spark.driver.memory のデフォルトは 1g です。

これを r3.large のメモリ 15GB から 4GB を引いた 11GB(11g)に指定したところ、このエラーは出なくなりました。また、Environment タブで、spark.driver.memory が指定されていることを確認できました。

つまづいたこと(2):vocabSize*vectorSize が大きすぎると落ちる

numPartition numIteration minCount vectorSize
10 1 5 100

Spark MLlib の vectorSize のデフォルト値は 100 なので、最初はこの値を使っていました。しかし、上記のパラメータの組み合わせで実行したところ、Stage 1 の処理の途中で以下のエラーが出て、タスクが止まってしまいました。

Exception in thread "main" java.lang.RuntimeException: Please increase minCount or decrease vectorSize in Word2Vec to avoid an OOM. You are highly recommended to make your vocabSize*vectorSize, which is 3856720*100 for now, less than `Int.MaxValue/8`.
        at org.apache.spark.mllib.feature.Word2Vec.fit(Word2Vec.scala:319)
(スタックトレースは省略)

Int.MaxValue/8 = 268435455 です。つまり、vocabSize(単語数)*vectorSize がこの上限を大幅に超えていることが原因のようです。Int の最大値の8分の1ってなんでまた……。

とにかく、単語数を減らすか、vectorSize を減らす必要があることがわかりました。そのため、これ以降のテストでは vectorSize を減らすパターン(パターン1)と、minCount を増やして単語数を減らすパターン(パターン2〜3)を試しました。

つまづいたこと(3):モデルのサイズが大きすぎると akka のフレームサイズ上限を超えて落ちる

Word2Vec.fit() の最後、生成された Word2Vec モデルを parquet 形式で出力するところで、以下のエラーが出て落ちました。

16/06/25 11:28:06 ERROR InsertIntoHadoopFsRelation: Aborting job.
org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 1793:0 was 213710572 bytes, which exceeds max allowed: spark.akka.frameSize (134217728 bytes) - reserved (204800 bytes). Consider increasing spark.akka.frameSize or using broadcast variables for large values.

spark.akka.frameSize のデフォルトは 128(単位は MB)で、タスクのサイズがこれを超えると落ちるようです。小さいデータセットでは出なかったエラーなのですが、Wikipedia 規模になると出るようです。1〜2時間待ったあとで、最後の最後にこのエラーで落ちると、非常に(精神的にも金銭的にも)痛いです……。

設定可能な上限値は調べても分かりませんでしたが、ひとまず --conf spark.akka.frameSize=1024 を指定して 1GB にしたところ、Word2Vec モデルの出力まで成功しました。

ローカルマシン上での Word2Vec モデルの利用

最終的にやったこと

先ほどの spark-submit の実行により、Word2Vec モデルが S3 にアップロードされました。このモデルをローカルマシンにダウンロードして使ってみます。

% aws s3 cp s3://my-bucket-name/enwiki_model ./enwiki_model --recursive

以下のコマンドで spark-shell を起動します。Part 1 で使ったオプションに加えて、--conf spark.kryoserializer.buffer.max=1g を指定しています。

% spark-shell --master local \
--conf spark.driver.memory=5g \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.kryoserializer.buffer.max=1g

そして、spark-shell のプロンプトで以下を実行し、Word2Vec モデルをロードします。

import org.apache.spark._
import org.apache.spark.rdd._
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}

val model = Word2VecModel.load(sc, "enwiki_model")

また、今回は Apache Spark 入門に書かれたメソッドを使って、関係性を加味した推測を行ってみます。有名な例で言うと、「"king" に対する "kings" は、"queen" に対する何か?」という関係を、Word2Vecモデルから推測することができます。

以下は、Apache Spark 入門の p.224 から抜粋したコードです。これを spark-shell に貼り付けるか、ファイルに書いておいて :load <file name> でロードして使います。

import org.apache.spark.mllib.linalg.Vectors

def relationWords(w1: String, w2: String, target: String, model: Word2VecModel) :Array[(String, Double)] = {
    val b = breeze.linalg.Vector(model.getVectors(w1))
    val a = breeze.linalg.Vector(model.getVectors(w2))
    val c = breeze.linalg.Vector(model.getVectors(target))
    val x = c + (a - b)
    model.findSynonyms(Vectors.dense(x.toArray.map(_.toDouble)), 10)
}

パターン1:minCount = 5, vectorSize = 10

ベクトル数が小さいためか、精度はかなり悪いです。Tokyoという都市名、Japanという国名に対する類似語を求めても、上位10件にそれらしいものが現れません。

scala> model.findSynonyms("Tokyo", 10).foreach(println)
(Rebounder,5.026514312082014)
(Fivepenny,5.006271473525809)
(Riviera,4.9806280562664655)
(Pirmahal,4.977896311409738)
(A2217,4.973896329049228)
(Pestújhely,4.967955406306887)
(Tri,4.966647609406325)
(Cigarros,4.966214313196464)
(Seahorses,4.9657892250050715)
(Club,4.965424934604451)

scala> model.findSynonyms("Japan", 10).foreach(println)
(Prabda,3.8591253451462766)
(Skateabout,3.789246081518729)
(detailslink,3.756286768742609)
(Oceania,3.7439580152901946)
(Daeges,3.743037606956309)
(Equestrianism,3.73990681262581)
(Miegs,3.7392088293670396)
(Fleuth,3.735308547592705)
(KBID-LP,3.730579527776324)
(Powerlifting,3.717090309581691)

関係性を加味した推測も、以下のようにうまく行きませんでした。

scala> relationWords("king", "kings", "queen", model).foreach(println)
(satsuma-biwa,4.95347264322314)
(shoguns,4.93869343414127)
(mystics,4.931215483461304)
(Zelimxan,4.925167012454619)
(Christianized,4.922235458369835)
(veneration,4.921893688910249)
(Shi’i,4.921205040607001)
(Russified,4.917586471812209)
(pagan,4.912822109308089)
(revered,4.911351827558269)

scala> relationWords("prince", "king", "princess", model).foreach(println)
(Pandava,4.2101984410814834)
(Aegisthus,4.207452272387961)
(bandit,4.202362575975742)
(amanuensis,4.194580140364399)
(Aerope,4.188601884423512)
(tradesman,4.178661804898081)
(Candaules,4.177194064593601)
(princess,4.173209621638307)
(Shoulang,4.165125455530385)
(Seibei,4.163678291883964)

パターン2:minCount = 20, vectorSize = 20

単語数を減らし、ベクトル数を上げた結果、精度が若干向上しました。Tokyo に対する類義語として、日本の都市の Osaka、Sapporo が出てくるようになりました。一方で、Japan に対する類義語のほうは、あまり改善が見られません。

scala> model.findSynonyms("Tokyo", 10).foreach(println)
(Wrestle,7.689458080058069)
(Split,7.626499879518354)
(Osaka,7.620597049534027)
(Sapporo,7.556529623946273)
(Setagaya,7.513748270603075)
(Hiroshima,7.490792005499523)
(Shinjuku,7.45951304352636)
(Kanazawa,7.459122453399323)
(Expo,7.453010168798164)
(ESCOM,7.447874763780933)

scala> model.findSynonyms("Japan", 10).foreach(println)
(Tokyo,5.679376270328159)
(AXN,5.640570343734289)
(Wrestle,5.60396135079362)
(Expo,5.590382781259281)
(TV2,5.522196857434101)
(Hanoi,5.495135749493573)
(TV6,5.490184062697079)
(Kyoto,5.486577183328772)
(Skate,5.4760554670281065)
(Benelux,5.430530293625971)

関係性を加味した推測は、まだあまりうまくいきません。ただ、後者のほうは5位に正解の "queen" が出ているので、若干精度が向上しています。

scala> relationWords("king", "kings", "queen", model).foreach(println)
(pagan,6.667731329068959)
(garb,6.659426546093454)
(gods,6.648366573398432)
(symbolised,6.648168276539841)
(sacred,6.6085783714277975)
(personages,6.598811565877372)
(veneration,6.597536687593547)
(puranas,6.590383098194837)
(deities,6.588936982768422)
(beauties,6.588806331810932)

scala> relationWords("prince", "king", "princess", model).foreach(println)
(lord,6.574825899196509)
(princess,6.522661208674787)
(bride,6.521167177599623)
(lady,6.492377997870626)
(queen,6.479450084505509)
(first-born,6.466189456944019)
(king,6.441766970616445)
(blessed,6.441764119985444)
(beloved,6.4396910737789606)
(bridegroom,6.423838321417851)

パターン3:minCount = 20, vectorSize = 50

ベクトル数を大幅に増やした結果、かなり精度が向上しました。Tokyo に対して日本の都市、Japan に対してアジアの国名や首都が表示されるようになりました。

scala> model.findSynonyms("Tokyo", 10).foreach(println)
(Osaka,6.442472711716078)
(Fukuoka,6.3918200759436)
(Saitama,6.343209033208874)
(Setagaya,6.237343626467007)
(Japan,6.063812875793321)
(Sapporo,6.027676167552773)
(Nagano,5.955215285602899)
(Kobe,5.891646194480255)
(Yamagata,5.86912171881318)
(Shibuya,5.835765966270005)

scala> model.findSynonyms("Japan", 10).foreach(println)
(Tokyo,5.510337298616405)
(Korea,5.509610108188756)
(China,5.486622516556292)
(Fukuoka,5.378651363703807)
(Taiwan,5.377869828524535)
(Seoul,5.321357314331263)
(Shizuoka,5.31678565272272)
(Prefecture,5.297746506109964)
(Hamamatsu,5.159312705112953)
(Kanagawa,5.157422752148916)

関係性を加味した推測も、かなり精度が向上しました。前者は正解の "queens" が2位、後者も正解の "queen" が2位に表示されています。イテレーション数(numIteration)を増やすなど、更にパラメータを調整すれば、これらの単語が1位に上がることが期待できます。

scala> relationWords("king", "kings", "queen", model).foreach(println)
(realms,6.380416904839411)
(queens,6.292521776188793)
(knightly,6.2558567330155626)
(consorts,6.241017073100756)
(kings,6.200374546691251)
(kindreds,6.17249501613232)
(lamas,6.1721177720161915)
(monuments,6.147651372785442)
(patrilineal,6.1288029631730545)
(depictions,6.121416883901753)

scala> relationWords("prince", "king", "princess", model).foreach(println)
(princess,5.956775488416378)
(queen,5.9055082324742685)
(slaying,5.793197818446893)
(king’s,5.696965618712307)
(betrothal,5.59067630474941)
(goddess,5.58159904439838)
(apparition,5.554027664552106)
(martyrdom,5.534826668619817)
(Pelops,5.503355785910461)
(ancestress,5.4953139994512545)

つまづいたこと

spark-shell の起動時に --conf spark.kryoserializer.buffer.max=1g を指定しないと、Word2VecModel.load() の呼び出しで落ちました。

16/06/25 22:00:26 WARN TaskSetManager: Lost task 1.0 in stage 2.0 (TID 3, localhost): TaskKilled (killed intentionally)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, localhost): org.apache.spark.SparkException: Kryo serialization failed: Buffer overflow. Available: 1, required: 4. To avoid this, increase spark.kryoserializer.buffer.max value.
    at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:299)
(スタックトレースは省略)

あと、余談ですが、zsh を使っていると --master local[*] の指定ができないようです。以下のようなエラーが出ます。bash なら指定できるので、spark-shell の実行時だけ bash に切り替えました。

% spark-shell --master local[*] \
--conf spark.driver.memory=5g \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.kryoserializer.buffer.max=1g
zsh: no matches found: local[*]

Part 2 のまとめ

Wikipedia の大規模データを、Amazon EC2 上に構築した Spark Cluster で処理できました。また、実際にいろいろなパラメータを試してみて、大規模データを処理する際に注意すべき Spark のプロパティや、Word2Vec のパラメータを把握することができました。

今回生成した Word2Vec のモデルはまだあまり精度が良くありませんが、以下の方針で精度を上げることができそうです。

  • minCount を大きくして、単語数(vocabSize)を小さくする
  • vectorSize を増やす
  • numIteration を numPartition に近づける
  • 元データからノイズ(MediaWiki記法など)を除去する方法を改善する

実際に試してみて、Word2Vec の場合は slave 1台で実行される処理がボトルネックになっていることがわかりました。Amazon EC2 の高いマシンを借りているのに、slave 1台だけが頑張っていて、残りの4台は遊んでいる、というのはなかなか焦ります。

Slave 1台しか動かない時間を短くする方法としては、vocabSize*vectorSize を小さくするしかないのですかね? 実際のアプリで Word2Vec を使う際には、Spark MLlib での処理の前に、そのアプリで使わない単語を除去してしまって単語数を大幅に減らしておく、などの対策が必要かもしれません。

Part 2 の主な参考文献