無印吉澤

Site Reliability Engineering(SRE)、ソフトウェア開発、クラウドコンピューティングなどについて、吉澤が調べたり試したことを書いていくブログです。

手を動かす Spark MLlib & Word2Vec Part 1 (spark-ec2 でクラスタを構築するまで)

f:id:muziyoshiz:20160626223709p:plain

このシリーズについて

機械学習系のツールを全然使ったことがなかったので、勉強のためになにか1つ選んで、実際に手を動かしてみることにしました。マシンを並べて負荷分散することを想定して、まずは Spark MLlib を選びました。

このシリーズでは、Amazon EC2 上に構築した Spark Cluster (Standalone Mode) で、Wikipedia のデータから Word2Vec のモデルを作るところまでの方法を解説していきます。ただ、実際やってみてわかったのですが、Spark 自体、Spark MLlib の Word2Vec クラス、およびクラスタ構築に使った spark-ec2 に設定項目が多いせいで、細かいところで何度も何度もつまづきました……。

そのため、このシリーズでは各ステップについて、「最終的にやったこと」と、その最終的なやり方にたどり着くまでに「つまづいたこと」を分けました。やり方を知りたいだけの場合は「最終的にやったこと」の方だけ読んでください。「つまづいたこと」は、うまく行かなかった場合のための参考情報です。

Part 1 の範囲

Amazon EC2 に master 1台、slave 3台構成の Spark Cluster (Standalone mode) を構築し、spark-shell から Word2Vec を実行するところまで。

Spark をローカル環境(Mac)にインストールする

最終的にやったこと

まず、ローカル環境で Spark MLlib が動くかどうかを試してみました。環境は以下の通りです。

  • MacBook Pro (Retina, 15-inch, Mid 2014)
  • OS: OS X Yosemite 10.10.5
  • CPU: 2.2 GHz Intel Core i7
  • メモリ: 16GB 1600 MHz DDR3

OS X に Spark をインストールする場合、以下のコマンドだけでインストールできます(参考:ApacheSpark — BrewFormulas)。

% brew update
% brew install apache-spark

私が試した時点では Spark 1.6.1 でした。Java は Java 8 です。

% spark-shell --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.1
      /_/

Type --help for more information.

つまづいたこと

spark-shell ローカルモードで(--master local を指定して)実行すると、spark> というプロンプトが表示されるまでに、色々と WARN が出ます。

16/06/07 00:17:36 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)

BoneCP は JDBC Connection Pool ライブラリの名前です。scala - What do WARN messages mean when starting spark-shell? - Stack Overflow によると、ローカルモードで実行しているときは問題ないとのこと。

16/06/07 00:17:38 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
16/06/07 00:17:38 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException

こちらも、Hive metastore に接続できないことを表す WARN なので、ローカルモードでは関係ないと判断しました。

ローカル環境での Word2Vec の実行

最終的にやったこと

Spark MLlib のページ(Feature Extraction and Transformation)に、Spark MLlib に含まれる Word2Vec クラスを使ったサンプルコードがあります。これをローカルモードで実行してみます。

まず、サンプルコードで使っている text8.zip をダウンロードして、解凍します。これは、スペースで区切られた英単語が羅列された(意味のある文章ではない)100 MB のテキストファイルです。

% wget http://mattmahoney.net/dc/text8.zip
% unzip text8.zip
% ls -la text8
-rw-r--r--@ 1 myoshiz  staff  100000000  6  9  2006 text8

この text8 を置いたディレクトリで、以下のコマンドを実行します。spark.driver.memory はドライバのメモリ使用量を表すオプションで、デフォルトは 1g (1GB)です。

% spark-shell --master local \
--conf spark.driver.memory=5g \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer

spark-shell のプロンプトで、Word2Vec のサンプルコード を入力すれば、myModelPath ディレクトリ以下に、Word2Vec のモデルデータが生成されます。

なお、spark-shell の起動中は http://localhost:4040/ にアクセスすることで、ジョブの状態を確認できます。

つまづいたこと

最初は --conf spark.driver.memory=5g" を指定せずに spark-shell を起動していました。その状態でword2vec.fit(input)` を実行すると、OutOfMemoryError で spark-shell が落ちます。私の環境では、ファイルが 100MB だと落ちて、80MB まで減らすと落ちない、という状態でした。

scala> val model = word2vec.fit(input)
[Stage 0:>                                                          (0 + 1) / 3]
Exception in thread "refresh progress" java.lang.OutOfMemoryError: GC overhead limit exceeded
    at scala.StringContext.s(StringContext.scala:90)
(スタックトレース、および後続のエラーは省略)

エラーメッセージをもとに調べたところ、JVM の設定が悪いような情報をいくつか見かけました。

しかし、これを指定してもエラーメッセージは変わりませんでした。というか、私は Java 8 で実行していたので、そもそもこの設定には意味がありませんでした。

この Java の仕様変更を踏まえて、以下のように spark-shell を実行したところ、落ちなくなりました。ただし、この方法だと、OutOfMemoryError が出ないだけで、いつまでも処理が終わらないという状態になってしまいました……。

% SPARK_REPL_OPTS="-XX:MaxMetaspaceSize=1024m" spark-shell --master local

結局、Configuration に載っているメモリ関係のパラメータを一通り確認して、前述の spark.driver.memory を増やしたところ、うまく動いたようで、処理が完了しました。JVM のパラメータを変更する必要はなかったようです。

Amazon EC2 への Spark クラスタの構築(spark-ec2 を使った方法)

最終的にやったこと

Slave の台数を増やすことで、Spark MLlib の実行時間が短くなることを確認するために、Spark クラスタを構築しました。今回は Spark に同梱されている spark-ec2 というスクリプトを使って構築しました。このスクリプトの説明は Running Spark on EC2 - Spark 1.6.1 Documentation にあります。

Amazon Elastic MapReduce (EMR) で Spark を使えることは知っていますが、いずれオンプレに Spark クラスタを構築したかったのと、かといってマシンスペックを何パターンか試すときに手作業での構築は大変すぎたので spark-ec2 を使いました。

まず、AWS のマネジメントコンソールを使って、以下の設定を行います。今回は Spark の話がメインなので、AWS の設定の詳細は省略します。

  • IAM ユーザ "word2vec-user" の作成
  • IAM ユーザ "word2vec-user" に対する "AdministratorAccess" ポリシーのアタッチ(EC2 と S3 に絞っても良い)
  • EC2 でのキーペア "word2vec-key-pair" の作成
  • ローカルマシンに対する環境変数 AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY の設定
  • ローカルマシンに対するキーペアの配置(以下では /Users/myoshiz/.ssh/word2vec-key-pair.pem に置いたと仮定)

brew で Spark をインストールすると、spark-ec2 は入っていません。そのため、spark-ec2 を使うために、Apache Spark のダウンロードページ から zip ファイルをダウンロードします。今回は、以下のファイルを選択しました。

  • Spark release: 1.6.1
  • Package type: Pre-built for Hadoop 2.6 and later

ダウンロードした zip ファイルを解凍すると、ec2 ディレクトリに spark-ec2 というスクリプトが入っています。このディレクトリに移動し、まずは master 1台、slave 3台のクラスタを構築してみます。そのためには、以下のコマンドを実行します。

% ./spark-ec2 \
--key-pair=word2vec-key-pair \
--identity-file=/Users/myoshiz/.ssh/word2vec-key-pair.pem \
--region=us-west-1 \
--zone=us-west-1a \
--instance-type=m4.large \
--copy-aws-credentials \
--hadoop-major-version=yarn \
--slaves 3 \
launch spark-cluster

各オプションの意味と、上記の値を指定した理由は以下の通りです。

  • --region は、デフォルトはバージニア北部(us-east-1)が使われる。国内だとインスタンス利用費が若干高く、東海岸は遠いので、北カリフォルニア(us-west-1)を指定した。
  • --instance-type は、デフォルトでは m1.large が使われる。m1.large は古いインスタンスタイプのため、スペックに比して割高のため、同じく 2 vCPU、メモリ8GBの m4.large を指定。調べた時点では m1.large が $0.19/hour、m4.large が $0.14/hour だった。
  • --copy-aws-credentials を指定すると、環境変数に設定された AWS のアクセスキーが、master の hadoop にも設定される。ただし、後述の通り Spark に対しては設定されないので、hadoop コマンドを使わないなら、指定しなくても良い。
  • --hadoop-major-version=yarn は、使用する Hadoop のバージョンを指定する。今回は Pre-built for Hadoop 2.6 and later をダウンロードしているので、yarn を指定する必要がある。デフォルトは 1(Hadoop 1.0.4)。
  • --slaves は slave の台数を指定する。

10〜20分待つとクラスタの構築が完了し、以下のようなメッセージが表示されます。Mac から以下の URL にアクセスすると、Spark UI や、Ganglia の画面を確認できます。

Spark standalone cluster started at http://ec2-xxx-xxx-xxx-xxx.us-west-1.compute.amazonaws.com:8080
Ganglia started at http://ec2-xxx-xxx-xxx-xxx.us-west-1.compute.amazonaws.com:5080/ganglia
Done!

上記のホスト名は、以降の作業でも使うので、以下のように環境変数に設定しておきます。シェルの設定ファイル(.bash_profile とか)で指定してもいいですが、クラスタを作るたびにホスト名が変わる点だけは注意が必要です。

% export EC2_SPARK_MASTER=ec2-xxx-xxx-xxx-xxx.us-west-1.compute.amazonaws.com

つまづいたこと(1):GitHub の spark-ec2

brew で spark をインストールすると、そのなかには spark-ec2 が入っていません。そのため、このスクリプトだけ別に入手できないかと思い、GitHub で公開されている spark-ec2 を clone して実行してみました。

github.com

この spark-ec2 を実行すると、エラーも出ずに最後まで処理が進むのですが、Spark クラスタが起動しないようです。http://ec2-xxx-xxx-xxx-xxx.us-west-1.compute.amazonaws.com:8080 にアクセスしても応答がなく、spark-shell で --master spark://ec2-xxx-xxx-xxx-xxx.us-west-1.compute.amazonaws.com:7077 を指定しても接続できない、という状態になりました。

色々悩んだ結果、大人しく Apache Spark のダウンロードページ から zip ファイルをダウンロードして、そのなかの spark-ec2 を使ったところ、実行したコマンドの引数は同じにも関わらず、クラスタが起動しました。

GitHub 版も見た目はきちんと動いているように見えたために、他に原因があると思い込んでしまい、この問題で数日詰まってしまいました……。

つまづいたこと(2):--hadoop-major-version=yarn の指定

このオプションは、公式サイトの Running Spark on EC2 には書かれていません。しかし spark-ec2 --help を実行すると、以下のオプションが表示されます。

  --hadoop-major-version=HADOOP_MAJOR_VERSION
                        Major version of Hadoop. Valid options are 1 (Hadoop
                        1.0.4), 2 (CDH 4.2.0), yarn (Hadoop 2.4.0) (default:
                        1)

上記のオプションを指定しないと、クラスタの構築後に spark-shell を実行した時に、以下のようなエラーが出て sqlContext の初期化に失敗しました。

16/06/13 14:08:02 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MResourceUri" is tagged as "embedded-only" so does not have its own datastore table.
java.lang.RuntimeException: java.io.IOException: Filesystem closed
    at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522)
(中略)
<console>:16: error: not found: value sqlContext
         import sqlContext.implicits._
                ^
<console>:16: error: not found: value sqlContext
         import sqlContext.sql
                ^

Spark クラスタでの Word2Vec の実行

最終的にやったこと

spark-ec2 の login コマンドを使用すると、master にログインできます。もちろん ssh でもログインできますが、master のホスト名を書かなくてよいのがメリットだと思います。ちなみに、オプションの指定が面倒ですが、以下の3つは必須のようです。

% ./spark-ec2 \
--key-pair=word2vec-key-pair \
--identity-file=/Users/myoshiz/.ssh/word2vec-key-pair.pem \
--region=us-west-1 \
login spark-cluster

次に、先ほどと同じサンプルコードを実行するために、text8.zip をダウンロードします。また、このファイルを、クラスタ上で動作する HDFS にアップロードします。これは、ファイルを slave からアクセス可能にするための作業です。後ほど、HDFS の代わりに S3 を使う方法も紹介します。

$ wget http://mattmahoney.net/dc/text8.zip
$ unzip text8.zip
$ ./ephemeral-hdfs/bin/hadoop fs -put text8 /

ここまでの準備が終わったら、master 上で spark-shell を実行します。指定するオプションは以下の通りです。ローカルモードの場合とは、--master の指定が変わっています。

$ ./spark/bin/spark-shell \
--master spark://${EC2_SPARK_MASTER}:7077 \
--conf spark.driver.memory=5g \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer

ホスト名の指定が面倒ですが、--master spark://localhost:7077 という指定では接続できませんでした。

あとは、spark-shell で以下のように入力すると、Word2Vec が実行されます。ローカルモードとの違いは、textFile() や save() に渡されたファイルパスが、HDFS のファイルパスとして扱われることです。今回はルート直下に text8 を置いたため、/text8 のように指定しています。

import org.apache.spark._
import org.apache.spark.rdd._
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}

val input = sc.textFile("/text8").map(line => line.split(" ").toSeq)

val word2vec = new Word2Vec()

val model = word2vec.fit(input)

val synonyms = model.findSynonyms("china", 40)

for((synonym, cosineSimilarity) <- synonyms) {
  println(s"$synonym $cosineSimilarity")
}

// Save and load model
model.save(sc, "/model_text8")
val sameModel = Word2VecModel.load(sc, "/model_text8")

以上により、Word2Vec のジョブが slave 上で実行されます。ただ、http://ec2-xxx-xxx-xxx-xxx.us-west-1.compute.amazonaws.com:8080 にアクセスするとわかるのですが、このままだと3台ある slave のうち、1台しか使われません。次は、ジョブを分散するために、Word2Vec のパラメータを変更します。

つまづいたこと

最初、ローカルディスクのファイルにアクセスできないことに気づきませんでした。file:// を付けても駄目でした。

次に、HDFS 上にファイルをアップロードする方法で悩んだのですが、これは ./ephemeral-hdfs/bin 以下のコマンドが使えることに気付いたあとは簡単でした。hadoop コマンドに馴染みのない人は、Apache Hadoop 2.7.1 – などが参考になると思います。

すべての slave に処理が分散されることの確認(Word2Vec のパラメータ変更)

最終的にやったこと

Word2Vec のパラメータは、Word2Vec クラスの setter で指定できます。用意された setter とそのデフォルト値は Word2Vec の API リファレンス に記載されています。

これらの setter のうち、setNumPartitions() でパーティション数を1よりも大きくすると、複数の slave 間で処理が分散されます。この値のデフォルトが1なので、そのままでは slave が1台しか使われません。

val word2vec = new Word2Vec()

// Set this
word2vec.setNumPartitions(4)

val model = word2vec.fit(input)

slave 3台で試したところ、パーティション数を4まで増やした段階で、すべてのslaveに処理が分散されました。ただ、5台で試したときには、パーティション数を6にしても、slave 4台しか使われませんでした。単純に slave の台数 + 1 にすればよいというわけではなさそうで、詳細はまだわかりませんが、少なくとも slave の台数よりも大きい数を指定する必要がありそうです。

ただ、このパーティション数を増やすと、増やした分だけ負荷分散されて処理時間が短くなっていく一方で、計算結果の正確さも落ちていくとのことです。Word2Vec の処理が分散しない理由を調べている際に、以下の情報を見かけました。

stackoverflow.com

  • イテレーションの数は、パーティション数と同じか、それ以下にすべき
  • 正確さのために、パーティション数は小さい値を使うべき
  • 結果(モデル)を正確にするためには、複数のイテレーションが必要

どれくらい結果が変わっていくのか、text8 を3台の slave 上で処理して調べてみました。以下は、numPartition = 1, 3, 6 での、"china" に類似した単語の上位10件です。パーティションが1個の場合の上位3件を太字にしています。text8 は意味のない文字列ですが、結果が変わっていく様子は参考になると思います。

numPartitions 1 3 6
1位 taiwan taiwan indonesia
2位 korea korea taiwan
3位 japan japan afghanistan
4位 mongolia mainland kazakhstan
5位 shanghai indonesia pakistan
6位 tibet india japan
7位 republic pakistan ireland
8位 india mongolia india
9位 manchuria thailand uzbekistan
10位 thailand africa iran

ちなみに、使われた slave の台数と、処理時間の関係は以下のようになりました。text8 くらいのデータ量(100 MB)だと、おおよそ、使われる slave の台数に応じて大きく処理時間が減るようです。

numPartitions 1 3 6
slave の台数 1 2 3
処理時間 6.3 min 3.6 min 1.9 min

つまづいたこと

負荷分散しない理由が Word2Vec のパラメータの方にある、ということが最初なかなか分からずに苦労しました。

普通に考えると「Spark MLlib から Word2Vec を使いたい人=負荷分散を期待している人」だから、Word2Vec のパラメータの初期値は負荷分散するようになっているはずだ(だから Spark のパラメータの方に問題があるはずだ)と思い込んでいました……。

Part 1 のまとめ

ここまでの手順で、Amazon EC2 上に Spark クラスタを構築する方法を確認できました。

次の Part 2 では、この Spark クラスタの slave の台数およびマシンスペックを強化し、処理するデータ量も Wikipedia 英語版(Gzip 圧縮した状態で 12 GB)まで増やしてみます。

Part 1 の主な参考文献

Habitat を触っていて気になった、細かいことあれこれ

f:id:muziyoshiz:20160617201522p:plain

Habitat について知りたい方は、まずは私がエンジニアブログに書いた Habitat の概要説明をご覧ください。自作のイメージ図を使って、Habitat のわかりにくい独自用語を解説しています。

recruit.gmo.jp

で、上記の記事を書いた時に、あまりにも細かすぎるので省いた話題がいくつかありました。放っておくと忘れそうなので、今回はその細かいことあれこれをご紹介します。

"Habitat" の意味

  • Habitat とは、居住環境、居住地、生息地、などの意味を持つ英単語です。
  • これを書いている時点で "Habitat" でググったところ 約 146,000,000 件ヒットしました。Chef といい、この会社はどうしてこう、検索しにくい名前をツールに付けてしまうのか……。

Habitat のバージョン番号

  • これを書いている時点で hab -V を実行したら hab 0.7.0/20160614231131 と出てきました。
  • 0.1 でも 1.0 でもない、これまた微妙なところを……。開発陣としては、現時点の実装をどれくらいの完成度だと思っているんでしょう? もう実サービスで使えるレベル? 少なくとも、ドキュメントのなかに「まだ production に使うな」というありがちな文章は見当たりませんでした。

Habitat の推奨環境

  • Habitat が推奨する、あるいは Habitat 開発者が最初にテストしている Linux ディストリビューションって何なんでしょう?
  • Habitat ファーストインプレッション にも書いた通り、現時点では Linux でしか Supervisor は動作しません。とはいえ、私が VirtualBox & CentOS 7 で試したところ、それでもチュートリアル通りには動きませんでした。
  • habitat-sh/habitat: Modern applications with built-in automation のトップディレクトリにある Dockerfile が FROM ubuntu:xenial で始まってるので、Ubuntu の可能性大。次に試すときは Ubuntu でやります。

Package と Artifact

  • ドキュメントを読んでいると、Package と同じ概念を Artifact と呼んでいる箇所がいくつかありました。Artifact という用語も使われているのか、この用語はもう廃止されたけどドキュメントの一部に残っているだけなのか?
  • そういえば、Package ファイルの拡張子の hart って、Habitat Artifact の略称なんですかね。

Depot への Package のアップロード

  • Habitat CLI reference の目次には "hab pkg upload" のリンクが載っているんですが、このリンクをクリックした先の説明はありませんでした。
  • hap pkg upload --help を実行すると、hab pkg upload [FLAGS] [OPTIONS] <HART_FILE>... と出てきます。なんだあるんじゃーんと思ってコマンドを叩いたら、チュートリアルで作った muziyoshiz/mytutorialapp を見事アップロードできました(アップロード先)。
  • で、アップロードはできたんですが、Habitat Web って、パッケージの削除機能がまだ無いみたいです。もしかして、これが CLI reference からコマンドが消されている理由では。削除機能が追加されたら消しますごめんなさい……。

Habitat の P2P ネットワークと Topology

  • Habitat は Supervisor 同士で P2P ネットワークを構築します。このネットワークのことを Ring あるいは Supervisor Ring と呼ぶようです。しかし、Supervisor Internals によると、このネットワークは SWIM (Scalable Weakly-consistent Infection-style process group Membership protocol) で構成されるとのこと。リングネットワークを組んでいるわけでもないのに Ring というのはモヤモヤします。
  • 一方、Habitat には Topology という用語があり、Supervisor 間の論理的な関係を定義できます。Running packages in topologies によると、現時点では standalone, leader-follower, initializer の3種類から選べるようです。Topology という単語が Network Topology を指しているわけではない、というのもなんだかモヤモヤします。

設定ファイルの一部で TOML 形式を採用

  • Habitat は設定ファイルを Handlebars 形式で書くことができて、この Handlebars に渡す変数を TOML 形式で定義できます。
  • TOML 形式って初耳だったので調べてみたところ、toml/toml-v0.4.0.md に仕様がありました。Tom's Obvious, Minimal Language の略で TOML なんですね。僕も便乗して YOML とか作ってやろうか(紛らわしすぎる)。
  • ちなみにこの仕様、有志による日本語訳が toml/toml-v0.4.0.md にて公開されていました。
  • HashiCorp にも HCL とかありますし、運用管理ツールを開発していると Yet Another な YAML が欲しくなるものなんでしょうか。

Habitat と Google Analytics

  • hab setup を実行してセットアップすると、その最後に、利用データを Habitat の Google Analytics アカウントにアップロードしてよいかと尋ねられます。No を選択すると ~/.hab/cache/analytics/OPTED_OUT に空のファイルが作られて、それきり何も質問されません。
  • ツールの利用状況を収集したい気持ちはよくわかります。でもまあ、No を選択しますよね。そういえば、最近 Google Analytics を後から導入して揉めたソフトがなにかあった気がしますけど、何でしたっけ?

とりとめもなくなってきたので、今日はこのへんで。

Fluentd Meetup 2016 Summer レポート 〜 v0.14 の新機能からプラグイン開発者向け API まで

f:id:muziyoshiz:20160602000413p:plain
  • イベント名:Fluentd Meetup 2016 Summer
  • 開催日時:2016-06-01(月)
  • 会場:イベント&コミュニティスペース dots.

約1年ぶりに開催された Fluentd Meetup に参加してきました。今回は、5月31日にリリースされたメジャーバージョンアップの v0.14 について、ユーザ向けの機能紹介から、プラグイン開発者向けの深い話まで、盛りだくさんの内容でした。自分でプラグインを書くくらい、Fluentd をヘビーに使う人向けのイベントという感じで、どの話も面白かったです。

最近、私は Fluentd を使う機会が全然なかったこともあって、「Fluentd も機能的には枯れてきて、そろそろ新機能もあまりないだろう」と思っていたのですが、まだこんなに改善の余地があったのか……とちょっと驚きました。個人的には、古橋さんの講演で将来の構想として出てきた、Kafka っぽい PubSub は便利そうなので是非実装してほしいです。

以下、講演内容のメモです。プレゼンに書かれてなくて口頭で説明されていたことや、個人的に気になったことを中心にメモしています。

講演内容

Past & Future of Fluentd (古橋 貞之, @frsyuki)

  • Fluentdの歴史

    • 2011年6月に最初のバージョンをリリースした。今年の6月で5年。ここまで続くと、かなり成功したOSSと言っていいのではないか。
    • 新メジャーバージョンの 0.14 が昨日リリースされた。
  • 2年前にリリースされたv0.12の機能の振り返り

    • タグの書き換えを不要にする label と filter
  • 将来の構想

    • Advanced back pressure - 転送先が詰まるのを防ぐ
    • PubSub & pull forward - Kafkaに近い発想。ログが必要なアプリが取りに行く
    • Compressed buffer
    • Distributed buffer - twitterがリリースした distributedlog や、Apache BookKeeper のような機能を Fluentd の buffer のところに入れる
    • Service plugins - パッケージ化されたソフト(例えば Norikra)をFluentdで動作させるようにする。service label を書くだけで起動。インストールが簡単になる
    • Schema-validated logging - バリデートされたログだけを転送することで、無駄な転送や、データベース書き込み時のエラーを回避する
    • Centralized config manager - Chef や Ansible を使うことも考えられるが、Dockerで起動したインスタンスのなかの設定に Chef などを使いたくない。これに fluentd-ui が組み合わさると嬉しい
  • Fluentdのロゴが変わる予定

v0.14 Overview (中川 真宏, @repeatedly)

  • New Plugin APIs

    • v0.12向けに作ったプラグインは、互換性レイヤがあるので、ほぼ動くはず
    • インスタンス変数に直接アクセスしているようなプラグインは、動かない可能性がある
    • 次の v1 でも互換性レイヤは残す予定
  • buffer の設計の大幅な見直し

    • v0.12 buffer design: chunkの保存方法。ElasticSearch に書き込むときに、インデックスの値を見て書き込み先を変える、といった場合に問題になった。リトライがうまくいかない。
    • v0.14で、chunkの管理が柔軟になり、このような問題は解決できる
  • Plugin Storage & Helpers

    • プラグイン内で使えるストレージレイヤ(KVS)の導入
    • プラグインの作成時によく使う雛形コードをhelperで提供
  • Time with nanosecond

    • ミリ秒とナノ秒のどちらにするか、という議論があったが、ナノ秒でログを出すシステムが増えてきたのでナノ秒
    • 内部的には Fluent::EventTime という形で持つ。既存のプラグインに対しては Integer っぽく振る舞うクラス。
  • ServerEngine based Supervisor

    • プロセスモデルを ServerEngine に置き換える
  • Windows support

    • Linux 向けに特化したコードを、Windows で動くように直したりした
    • HTTP RPC でバッファのフラッシュなどを行う
  • v0.14.x - v1

    • https://github.com/fluent/fluentd/issues/1000 に並んでいる
    • v1 を出す。v1 で新機能を入れることはない。v1 は今まで入れたAPIをFixする、と宣言するためのバージョン。フィードバックは v0.14 のうちにしてほしい
    • v1 は 2016 4Q か 2017 1Q に出る
    • いくつか v1 で消したい機能はある
  • v0.14 系でこれから入れる予定の目玉機能

    • Symmetric multi-core processing: パフォーマンス向上。TCPポートをWorker間で共用し、Workerで処理を分ける。ServerEngine を使って実現する
    • Counter API: Workerが分かれるので、カウンタを作りづらくなる点をカバー
    • secure-forward をコアに入れる。forward と secure-forward の細かい違いをなくす
  • ベンチマーク結果

    • CPU使用率が、v0.14 で若干上がっている
    • EC2で、100,000msgs/sec での測定結果によると、CPU使用率が4〜5%上がった
    • v0.14はリリースしたばかりで、性能改善できる余地が残っている。これから改善する予定
  • td-agent 3

    • fluentd v0.14 は td-agent 3 から採用(現行の td-agent 2 系には入らない)
    • Ruby 2.3 に上げる
    • Windows 向けパッケージも提供予定
    • リリース日は未定(v0.14 がまだ stable と言えないので)
  • Q) chunk から取得したログの一部で処理が失敗した場合、リトライしたいと思うが、その場合 chunk の詰め直しはできるか?

    • A) できない。その場合は、エラーストリームという機能を使ってほしい

Fluentd ServerEngine Integration & Windows Support (成田 律太, @naritta)

  • 自己紹介

    • 2015年の Treasure Data のインターンシップに参加し、今年入社した
  • ServerEngine

    • Unicorn と大体同じようなもの
    • Supervisor, server, worker の構成を簡単に作れるようにするためのフレームワーク
    • Worker Module, Server Module を Ruby で書く
    • Worker type: thread, process, spawn
    • v0.14 では spawn worker type を利用している。Windows に fork の機能がないため
  • Fluentd で ServerEngine を採用したことによるメリット

    • auto restart
      • Linux でも Windows でも、fluentd のプロセス(worker)をKILLすると、自動的にリスタートするのを確認できる
    • live restart
      • 設定変更したあとでシグナル(または HTTP RPC)を送ると、worker だけをリスタートできる
      • server が TCP 接続を持っているので、ログは欠損しない
    • socket manager
      • server 側で listen して、socket を worker に共有する
      • 将来的にはマルチコアでソケットを共有するために ServerEngine を使う(いまはマルチコアで動かすには in_multiprocess plugin を使う必要がある)
    • signal handler
      • signal handler と log rotation は、Fluentd ユーザにとってのメリットというより、内部実装に関するメリット
      • シグナルをキューに溜めて、シグナル同士の競合を防ぐ
    • log rotation
      • ruby core にすでにポーティングされている機能
      • マルチプロセスでのログローテーションなど
  • Q) ServerEngine が入ることで、設定ファイルが増える?

    • A) Fluentd が内部的に ServerEngine の設定ファイルを作って、ServerEngine に渡す。ユーザ側は Fluentd/td-agent の設定ファイルだけを編集すれば良い

Fluentd v0.14 Plugin API Updates (田籠 聡, @tagomoris)

  • 何故新しい API セットを作ったか?

    • いままでのプラグインには、開発者向けドキュメントがなかった。みんな他の人の書いたプラグインの真似をして書いている
    • プラグイン内で独自にスレッドを起動するようなことが必要で、そのせいでテストしづらい
    • あるプラグインで使えるパラメータが、他のプラグインでも使えるのかどうかがわかりづらい
    • 割りとコアな処理をプラグイン側で上書きしてしまっていることがよくある
    • などなど
    • 上記のような問題を解決したい。また、便利な機能も一緒に提供することで、移行を進めたい
  • Compatibility of plugins

    • v0.12のプラグインは Fluent::Input などのクラスのサブクラス
    • compatibility layer: Fluent::Compat::Klass で、v0.12 にしかないメソッドを提供するなどして、互換性を維持
  • Compatibility of configurations

    • v0.12 形式のパラメータを v0.14 形式に自動変換する helper を提供する。plugin 開発者にこれを使ってもらう必要がある
    • v0.14 API を使ったプラグインを、v0.12 で動かすことはできない(ようにした)。gemspec に依存関係を書いてもらう必要がある
  • v0.14 プラグインクラスの書き方

    • すべてのクラスを Fluent::Plugin モジュールの下に置く(いままでは Fluent 直下だった)
    • 親クラスのメソッド(#configure とか)をオーバライドするときは必ず super を呼ぶ
    • super を呼んでない場合は、互換性レイヤが代わりに super を呼んで、WARNING を出す
    • 親クラス(Fluent::Plugin::Input とか)の階層を整理
  • Fluent::Plugin::Output

    • Output Plugin は物凄くいろいろ変わった。v0.12 まではやりたいことによって異なるクラスを使い分ける必要があったが、1個の Output クラスに統合された
    • chunk を分ける条件を細かく指定できるようになった。いままではサイズのみの chunking だったが、例えば時間でも chunking できるし、時間とサイズの組合せでもできる
    • Non-buffered, buffered synchronous, buffered asynchronous のいずれか1個を実装してあれば動く
  • Delayed commit

    • 書き込みの ACK を待ちたい場合、いままでは write メソッドの中で待つ必要があった。write メソッドが終わると、送ったデータが chunk から消されてしまうため
    • v0.14 では try_write メソッドで commit を待たずに return する。この時点では chunk が消されない。非同期のチェックスレッドが ACK を受け取って #commit_write が呼ばれた時点で、chunk が消される
    • ユースケース:分散ファイルシステムに書き込んでから、正しく読めることを確認してから chunk を消したい場合
  • configurations: flushing buffers

    • chunk を flush するタイミングを、細かく制御できるようになった
    • flush_mode: immediate → すぐに書き込んで欲しいが、失敗した時はリトライして欲しい場合に使うモード
  • Retries

    • いままではリトライ回数(retry_count)しか指定できなかったが、タイムアウト(retry_timeout)を指定できるようになった(どれだけの期間、リトライし続けるか)
    • いままでは72時間リトライし続けていた。この仕様は自明ではなかった。いつまで待てば諦めが付くのか、誰にもわからない状態だった
  • その他のプラグイン: Buffer, Parser, Formatter, Storage, ...

    • これらを v0.14 では "Owned" plugin と呼ぶ → 他のプラグイン経由で実体化されるプラグイン
      • 対義語は primary plugins → Input, Output, Filter plugin
    • Storage plugin:プラグイン内でデータを永続化したい場合に使うプラグイン
      • 例:file plugin の pos file で管理している情報を、storage plugin で永続化
  • Plugin Helpers

    • ヘルパーを使いたいときは、明示的に "helpers :name" と指定してもらうことで、どのヘルパーを使っているのか明示されるようにした
  • New Test Drivers

    • いままでは実行タイミングに依存していたようなコードを、きちんとテストできるようにした
    • 例えば、flushのタイミングを制御するとか
  • Plans for v0.14.x

    • 古橋さんの話+α
    • plugin generator を入れようと思っている。いまは、他の人の書いたプラグインの真似をしてプラグインを書く、という状態で、これは良くない
    • 新しいAPI:Fluentd 全体で使えるバッファサイズの上限を指定、Counter API
  • 開発者向けのドキュメントはこれから書く(書かなければいけないと思ってはいる)

OS X + Docker Machine + Cloudera QuickStart Docker Image で Spark MLlib のお試し環境を構築する

f:id:muziyoshiz:20160529223041p:plain

はじめに

Cloudera は以前から、Hadoop の機能を簡単に試すための VM イメージを配布しています(Cloudera QuickStart VM のダウンロードページ)。配布されているイメージは KVM 版、Virtual Box 版、VMWare 版の3種類です。

しかし、Vagrant で使える box ファイル版は提供されておらず、コマンド一発での環境構築はできませんでした。もちろん、Virtual Box 版のイメージから box ファイルを作ることは可能ですが、Cloudera のバージョンアップに追従する作業は面倒でした。

しかし、去年の12月から、Cloudera 社が QuickStart VM の Docker イメージ版を公式に配布するようになりました。以下は、公式配布に関する Cloudera 公式のブログ記事です。

blog.cloudera.com

最近、Spark MLlib を勉強するための環境を作る機会があったので、せっかくなので Cloudera QuickStart Docker Image で環境構築してみました。その際に、普通に進めるとうまくいかないポイントがいくつかあったので、そのときの構築手順をまとめておきます。

動作環境

  • MacBook Pro (Retina, 15-inch, Mid 2014)
    • 16GB onboard memory
  • OS X Yosemite version 10.10.5

構築される環境

今回の手順を実行すると、最終的に以下のような環境が構築されます。

  • VirtualBox 上で、Docker 用の Linux VM(名前:default)が動作する。
    • この Linux VM は CPU 4コア、メモリ 9GB を使用(デフォルトは CPU 1コア、メモリ 2GB)
  • 上記の Linux VM 上で、Cloudera Quickstart Docker Image から作られたコンテナが動作する。
    • このコンテナはメモリ 8GB を使用
  • Docker コンテナ内のシェルから、spark-shell を実行できる。

Cloudera Express は 8GB 以上のメモリを必要とします(参考:Cloudera QuickStart VM)。そのため、構築手順のなかで、Linux VM のメモリをデフォルトより増やす作業を行います。

構築手順

Docker Toolboxのインストール

OS X への Docker のインストール方法は、Installation on Mac OS X が詳しいです。OS X 上で Linux の Docker イメージを直接動かすことはできないため、VirtualBox で Linux VM(名前は default)を動作させ、この Linux VM 上で Docker daemon を動作させます。

f:id:muziyoshiz:20160529182710p:plain
(※ Installation on Mac OS X より抜粋)

まず、Docker Toolbox からインストーラをダウンロードして、Docker Toolbox をインストールします。現時点の最新版は DockerToolbox-1.11.1b.pkg です。

インストール後に以下のコマンドを実行すると、Docker のバージョンを確認できます。この時点では、まだ default VM が動いていないので、Docker daemon に接続できないというメッセージが出ます。

% docker version
Client:
 Version:      1.11.1
 API version:  1.23
 Go version:   go1.5.4
 Git commit:   5604cbe
 Built:        Tue Apr 26 23:44:17 2016
 OS/Arch:      darwin/amd64
Cannot connect to the Docker daemon. Is the docker daemon running on this host?

default VM の初期設定

OS X の Launchpad から Docker Quickstart Terminal を実行します。しばらく待って、Docker のアイコンが出てくれば設定完了です。この例では、Docker ホストの IP アドレスは 192.168.99.100 になりました。

Last login: Sat May 21 00:24:50 on ttys004
bash --login '/Applications/Docker/Docker Quickstart Terminal.app/Contents/Resources/Scripts/start.sh'
% bash --login '/Applications/Docker/Docker Quickstart Terminal.app/Contents/Resources/Scripts/start.sh'
Running pre-create checks...
Creating machine...
(default) Copying /Users/myoshiz/.docker/machine/cache/boot2docker.iso to /Users/myoshiz/.docker/machine/machines/default/boot2docker.iso...
(default) Creating VirtualBox VM...
(default) Creating SSH key...
(default) Starting the VM...
(default) Check network to re-create if needed...
(default) Waiting for an IP...
Waiting for machine to be running, this may take a few minutes...
Detecting operating system of created instance...
Waiting for SSH to be available...
Detecting the provisioner...
Provisioning with boot2docker...
Copying certs to the local machine directory...
Copying certs to the remote machine...
Setting Docker configuration on the remote daemon...
Checking connection to Docker...
Docker is up and running!
To see how to connect your Docker Client to the Docker Engine running on this virtual machine, run: /usr/local/bin/docker-machine env default


                        ##         .
                  ## ## ##        ==
               ## ## ## ## ##    ===
           /"""""""""""""""""\___/ ===
      ~~~ {~~ ~~~~ ~~~ ~~~~ ~~~ ~ /  ===- ~~~
           \______ o           __/
             \    \         __/
              \____\_______/


docker is configured to use the default machine with IP 192.168.99.100
For help getting started, check out the docs at https://docs.docker.com

ちなみに、この表示が出た後で VirtualBox Manager を開くと、default という名前の VM が「実行中」になっていることが確認できます。

default VM のCPUコア、メモリ使用量の変更

Docker コンテナが 8GB のメモリを使えるようにするには、Docker が動作する default VM のメモリ使用量を 8GB 以上にする必要があります。メモリを増やす方法は、以下のページを参考にしました。

具体的な手順は以下の通りです。

  • docker-machine stop default を実行し、default VM を停止
  • VirtualBox Manager を起動し、"default" という名前の VM の「設定」欄を開く
  • 設定の「システム」タブで、「マザーボード」を選択し、メインメモリーを 2048MB から 9216MB(9GB)に変更
  • 同じく、設定の「システム」タブで、「プロセッサー」を選択し、プロセッサー数を 1 から 4 に変更
  • 「OK」ボタンを押して、設定を閉じる
  • docker-machine start default を実行し、default VM を再起動

docker info コマンドを実行すると、メモリとCPUコア数が増えていることが確認できます。

% docker info
Containers: 0
 Running: 0
 Paused: 0
 Stopped: 0
Images: 0
Server Version: 1.11.1
Storage Driver: aufs
 Root Dir: /mnt/sda1/var/lib/docker/aufs
 Backing Filesystem: extfs
 Dirs: 0
 Dirperm1 Supported: true
Logging Driver: json-file
Cgroup Driver: cgroupfs
Plugins:
 Volume: local
 Network: bridge null host
Kernel Version: 4.4.8-boot2docker
Operating System: Boot2Docker 1.11.1 (TCL 7.0); HEAD : 7954f54 - Wed Apr 27 16:36:45 UTC 2016
OSType: linux
Architecture: x86_64
CPUs: 4
Total Memory: 8.762 GiB
Name: default
ID: 5L66:CKKI:BIK7:BRUT:H6NI:XMFL:AR36:R6UQ:YOFG:OQPM:M5L6:PU6L
Docker Root Dir: /mnt/sda1/var/lib/docker
Debug mode (client): false
Debug mode (server): true
 File Descriptors: 13
 Goroutines: 32
 System Time: 2016-05-24T13:18:09.388371356Z
 EventsListeners: 0
Registry: https://index.docker.io/v1/
Labels:
 provider=virtualbox

Cloudera QuickStart Docker Image のダウンロード

Cloudera QuickStart Docker Image は Docker Hub で公開されています(Docker Hub の cloudera/quickstart ページ)。そのため docker pull コマンドでイメージをダウンロードできます。ダウンロードサイズが 4.4GB あるので、結構時間がかかりました。

% docker pull cloudera/quickstart:latest
latest: Pulling from cloudera/quickstart
1d00652ce734: Pull complete
Digest: sha256:f91bee4cdfa2c92ea3652929a22f729d4d13fc838b00f120e630f91c941acb63
Status: Downloaded newer image for cloudera/quickstart:latest
% docker images
REPOSITORY            TAG                 IMAGE ID            CREATED             SIZE
cloudera/quickstart   latest              4239cd2958c6        6 weeks ago         6.336 GB

コンテナの起動

以下のコマンドを実行して、コンテナを起動します。

docker run --hostname=quickstart.cloudera \
--privileged=true -t -i -d -p 18888:8888 -p 17180:7180 -p 10080:80 \
-m 8192m cloudera/quickstart:latest /usr/bin/docker-quickstart

このコマンドは Docker Hub の cloudera/quickstart ページ にあるものをベースに、以下の修正を加えたものです。

  • コンテナをデーモンとして起動(あとから docker exec で接続)
  • -m 8192m を指定して、メモリ使用量を 8GB に指定
  • マッピングされるポート番号が起動のたびに変わるのを避けるために、マッピング先を、元のポート番号に10000足した値に固定

コンテナの起動後は、docker exec コマンドで、コンテナ上のシェルに接続します。以下の例では、コンテナ ID の一部を指定して接続しています。

% docker ps
CONTAINER ID        IMAGE                        COMMAND                  CREATED             STATUS              PORTS                                                                     NAMES
048c433261a7        cloudera/quickstart:latest   "/usr/bin/docker-quic"   23 seconds ago      Up 23 seconds       0.0.0.0:10080->80/tcp, 0.0.0.0:17180->7180/tcp, 0.0.0.0:18888->8888/tcp   gigantic_poitras
% docker exec -it 048c bash

接続後に free コマンドでメモリ使用量を見ると、8GB 以上のメモリを使っているように見えます。

[root@quickstart /]# free
             total       used       free     shared    buffers     cached
Mem:       9187728    5623316    3564412     169020      34224    1127784
-/+ buffers/cache:    4461308    4726420
Swap:      3227556          0    3227556

ただ、cgroup の設定を確認すると、1024*1024*1024*8 = 8589934592 で 8GB になっていたので、制限は効いているのではないかと思います。このあたりはちょっと自信がないです。

[root@quickstart /]# cat /sys/fs/cgroup/memory/memory.limit_in_bytes
8589934592

Cloudera Express の起動

コンテナ内で以下のコマンドを実行し、Cloudera Express を起動します。このコマンドは、コンテナの再起動後にも実行する必要があります。

[root@quickstart /]# /home/cloudera/cloudera-manager --express --force
[QuickStart] Shutting down CDH services via init scripts...
kafka-server: unrecognized service
JMX enabled by default
Using config: /etc/zookeeper/conf/zoo.cfg
[QuickStart] Disabling CDH services on boot...
error reading information on service kafka-server: No such file or directory
[QuickStart] Starting Cloudera Manager server...
[QuickStart] Waiting for Cloudera Manager API...
[QuickStart] Starting Cloudera Manager agent...
[QuickStart] Configuring deployment...
Submitted jobs: 15
[QuickStart] Deploying client configuration...
Submitted jobs: 16
[QuickStart] Starting Cloudera Management Service...
Submitted jobs: 24
[QuickStart] Enabling Cloudera Manager daemons on boot...
________________________________________________________________________________

Success! You can now log into Cloudera Manager from the QuickStart VM's browser:

    http://quickstart.cloudera:7180

    Username: cloudera
    Password: cloudera

出てくるメッセージは QuickStart VM の場合と全く一緒ですね……。

Web UI の接続確認

ここまで来れば、ホスト OS(OS X)の Web ブラウザから、Cloudera の Web UI の動作を確認できます。

また、この時点ではアクセスできませんが、Cloudera Manager から Hue を起動した後に、以下のアドレスで Hue にアクセスできます。

Cloudera Manager からサービス起動

http://192.168.99.100:17180/ にアクセスし、以下の手順で、Spark Shell の動作に必要なサービスを起動します。

  • Cloudera Manager Service の起動を待つ
  • クロックオフセットの設定を修正する(後述)
  • 他のサービスを、以下の順に起動する
    • HDFS
    • Hive
    • YARN
    • Spark

サービスの起動までに時間が掛かります。起動した直後は Bad Health のことがありますが、その場合はしばらく待てば起動します。

また、他の人の環境でも起こる問題かはわからないのですが、私の環境では Host のステータスが以下のようになる問題が発生しました。

致命的なヘルスの問題:クロックのオフセット
(英語では Bad health: Clock Offset)

この問題は、同じ OS X マシンで Cloudera QuickStart VM を使ったときにも発生したのですが、そのときと同じ対処方法が有効でした。以下の手順に従って、オフセットの設定を無効にすれば、ステータスが Good に戻って、問題なく動作するようになります。

  • Cloudera Managerのメニューから、 Host(ホスト) → Configuration(設定) と遷移する
  • 検索キーワード欄を使って、Host Clock Offset Thresholds(ホストクロックオフセットのしきい値) を表示する
  • この設定の「Warning(警告)」と「Critical(致命的)」を、両方とも「Never(行わない)」に変更する
  • 「Save Changes(変更の保存)」ボタンを押す

Spark Shell の起動

Spark Shell を初めて起動する前に、以下のコマンドを実行してください。

[root@quickstart /]# sudo -u hdfs hadoop fs -chmod 777 /user/spark
[root@quickstart /]# sudo -u spark hadoop fs -chmod 777 /user/spark/applicationHistory

上記のコマンドを実行しなかった場合、Spark Shell の起動中に以下のエラーが表示されてしまいます。どうも、Cloudera QuickStart の不備のようです(参考:Solved: [CDH 5.5 VirtualBox] unable to connect to Spark Ma... - Cloudera Community)。

16/05/29 06:37:15 ERROR spark.SparkContext: Error initializing SparkContext.
org.apache.hadoop.security.AccessControlException: Permission denied: user=root, access=WRITE, inode="/user/spark/applicationHistory":spark:supergroup:drwxr-xr-x

そして、以下のコマンドを実行すると Spark Shell が起動します。Cloudera Quickstart VM は擬似分散モードで動作しているので、引数には --master yarn-client を指定する必要があります。

[root@quickstart /]# spark-shell --master yarn-client
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.0
      /_/

Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_67)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc (master = yarn-client, app id = application_1464516865260_0001).
SQL context available as sqlContext.

scala>

以上で、Spark MLlib を試すための環境構築は完了です。

Spark MLlib を試す

Apache Spark入門 動かして学ぶ最新並列分散処理フレームワーク (NEXT ONE)

Apache Spark入門 動かして学ぶ最新並列分散処理フレームワーク (NEXT ONE)

構築に成功したか確認するために、Apache Spark 入門の8章に掲載されていたコードを Spark Shell で実行してみました。内容は、K-means を用いたクラスタリングです。

HDFS へのサンプルデータのアップロード

Cloudera QuickStart には Spark MLlib のサンプルデータが含まれていませんでした。そのため、まずはテストに使うサンプルデータをダウンロードして、HDFS にアップロードします。

[root@quickstart /]# curl https://raw.githubusercontent.com/apache/spark/master/data/mllib/kmeans_data.txt > kmeans_data.txt
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0    72    0    72    0     0     40      0 --:--:--  0:00:01 --:--:--    47
[root@quickstart /]# cat kmeans_data.txt
0.0 0.0 0.0
0.1 0.1 0.1
0.2 0.2 0.2
9.0 9.0 9.0
9.1 9.1 9.1
9.2 9.2 9.2
[root@quickstart /]# hadoop fs -put kmeans_data.txt /tmp
[root@quickstart /]# hadoop fs -ls /tmp
Found 5 items
drwxrwxrwx   - hdfs   supergroup          0 2016-05-29 13:02 /tmp/.cloudera_health_monitoring_canary_files
drwxrwxrwt   - mapred mapred              0 2016-04-06 02:26 /tmp/hadoop-yarn
drwx-wx-wx   - hive   supergroup          0 2016-05-29 09:57 /tmp/hive
-rw-r--r--   1 root   supergroup         72 2016-05-29 13:02 /tmp/kmeans_data.txt
drwxrwxrwt   - mapred hadoop              0 2016-05-29 10:19 /tmp/logs

サンプルデータの RDD への変換

Spark Shell を起動して、HDFS からサンプルデータを読み込んで、RDD に変換します。

[root@quickstart /]# spark-shell --master yarn-client
(中略)

scala> import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.clustering.KMeans

scala> import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.Vectors

scala> val data = sc.textFile("hdfs://localhost/tmp/kmeans_data.txt")
data: org.apache.spark.rdd.RDD[String] = hdfs://localhost/tmp/kmeans_data.txt MapPartitionsRDD[1] at textFile at <console>:29

scala> val parsedData = data.map{ s =>
     |   Vectors.dense(
     |     s.split(' ').map(_.toDouble))
     | }.cache()
parsedData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MapPartitionsRDD[2] at map at <console>:31

KMeans のモデルの作成

これ以降は、本に載っているコードをそのまま実行できました。KMeans.train メソッドでモデルを生成します。

scala> val numClusters = 2
numClusters: Int = 2

scala> val numIterations = 20
numIterations: Int = 20

scala> val clusters = KMeans.train(parsedData, numClusters, numIterations)
16/05/29 13:07:00 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
16/05/29 13:07:00 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
clusters: org.apache.spark.mllib.clustering.KMeansModel = org.apache.spark.mllib.clustering.KMeansModel@5d27cdd2

上記の WARN は、native library のロードに失敗したことを表しているようです(参考:Using Spark MLlib)。Hardware acceleration が効かないだけなので、この WARN は無視します。

生成されたモデルの確認

生成されたクラスタの数、および各クラスタの中心の座標を確認します。

scala> clusters.k
res0: Int = 2

scala> clusters.clusterCenters
res1: Array[org.apache.spark.mllib.linalg.Vector] = Array([0.1,0.1,0.1], [9.099999999999998,9.099999999999998,9.099999999999998])

新たなベクトルを与えて、いずれのクラスタに分類されるか確認します。

scala> val vec1 = Vectors.dense(0.3, 0.3, 0.3)
vec1: org.apache.spark.mllib.linalg.Vector = [0.3,0.3,0.3]

scala> clusters.predict(vec1)
res2: Int = 0

scala> val vec2 = Vectors.dense(8.0, 8.0, 8.0)
vec2: org.apache.spark.mllib.linalg.Vector = [8.0,8.0,8.0]

scala> clusters.predict(vec2)
res3: Int = 1

元のファイルに含まれていた座標が、いずれのクラスタに分類されるかを判定します。判定結果は HDFS に出力します。

scala> val predictedLabels = parsedData.map(vec => clusters.predict(vec))
predictedLabels: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[32] at map at <console>:39

scala> predictedLabels.saveAsTextFile("/tmp/output")

scala> exit
[root@quickstart mllib]# hadoop fs -ls /tmp/output
Found 3 items
-rw-r--r--   1 root supergroup          0 2016-05-29 07:12 /tmp/output/_SUCCESS
-rw-r--r--   1 root supergroup          8 2016-05-29 07:12 /tmp/output/part-00000
-rw-r--r--   1 root supergroup          4 2016-05-29 07:12 /tmp/output/part-00001
[root@quickstart mllib]# hadoop fs -cat /tmp/output/part-*
1
1
1
0
0
0

まとめ

一通りの作業を試してみて、Cloudera QuickStart Docker Image で Spark MLlib のお試し環境を構築できることがわかりました。

使ってみた感想としては、コマンドラインから QuickStart のコンテナを起動・停止できるのは便利でした。その一方で、この QuickStart Docker Image は、Cloudera QuickStart VM を単純に Docker image にしただけ、という感じで、Web ブラウザから行わなければいけない設定やサービス起動は相変わらず多かったです。そのため、この Docker image をベースに環境構築を自動化するのは大変そうです。

Spark を CDH と一緒に使いたいという特別な理由がなければ、もっと軽量な Docker イメージを探したほうがよいかもしれませんね……。