無印吉澤

Site Reliability Engineering(SRE)、ソフトウェア開発、クラウドコンピューティングなどについて、吉澤が調べたり試したことを書いていくブログです。

Fluentd Meetup 2016 Summer レポート 〜 v0.14 の新機能からプラグイン開発者向け API まで

f:id:muziyoshiz:20160602000413p:plain
  • イベント名:Fluentd Meetup 2016 Summer
  • 開催日時:2016-06-01(月)
  • 会場:イベント&コミュニティスペース dots.

約1年ぶりに開催された Fluentd Meetup に参加してきました。今回は、5月31日にリリースされたメジャーバージョンアップの v0.14 について、ユーザ向けの機能紹介から、プラグイン開発者向けの深い話まで、盛りだくさんの内容でした。自分でプラグインを書くくらい、Fluentd をヘビーに使う人向けのイベントという感じで、どの話も面白かったです。

最近、私は Fluentd を使う機会が全然なかったこともあって、「Fluentd も機能的には枯れてきて、そろそろ新機能もあまりないだろう」と思っていたのですが、まだこんなに改善の余地があったのか……とちょっと驚きました。個人的には、古橋さんの講演で将来の構想として出てきた、Kafka っぽい PubSub は便利そうなので是非実装してほしいです。

以下、講演内容のメモです。プレゼンに書かれてなくて口頭で説明されていたことや、個人的に気になったことを中心にメモしています。

講演内容

Past & Future of Fluentd (古橋 貞之, @frsyuki)

  • Fluentdの歴史

    • 2011年6月に最初のバージョンをリリースした。今年の6月で5年。ここまで続くと、かなり成功したOSSと言っていいのではないか。
    • 新メジャーバージョンの 0.14 が昨日リリースされた。
  • 2年前にリリースされたv0.12の機能の振り返り

    • タグの書き換えを不要にする label と filter
  • 将来の構想

    • Advanced back pressure - 転送先が詰まるのを防ぐ
    • PubSub & pull forward - Kafkaに近い発想。ログが必要なアプリが取りに行く
    • Compressed buffer
    • Distributed buffer - twitterがリリースした distributedlog や、Apache BookKeeper のような機能を Fluentd の buffer のところに入れる
    • Service plugins - パッケージ化されたソフト(例えば Norikra)をFluentdで動作させるようにする。service label を書くだけで起動。インストールが簡単になる
    • Schema-validated logging - バリデートされたログだけを転送することで、無駄な転送や、データベース書き込み時のエラーを回避する
    • Centralized config manager - Chef や Ansible を使うことも考えられるが、Dockerで起動したインスタンスのなかの設定に Chef などを使いたくない。これに fluentd-ui が組み合わさると嬉しい
  • Fluentdのロゴが変わる予定

v0.14 Overview (中川 真宏, @repeatedly)

  • New Plugin APIs

    • v0.12向けに作ったプラグインは、互換性レイヤがあるので、ほぼ動くはず
    • インスタンス変数に直接アクセスしているようなプラグインは、動かない可能性がある
    • 次の v1 でも互換性レイヤは残す予定
  • buffer の設計の大幅な見直し

    • v0.12 buffer design: chunkの保存方法。ElasticSearch に書き込むときに、インデックスの値を見て書き込み先を変える、といった場合に問題になった。リトライがうまくいかない。
    • v0.14で、chunkの管理が柔軟になり、このような問題は解決できる
  • Plugin Storage & Helpers

    • プラグイン内で使えるストレージレイヤ(KVS)の導入
    • プラグインの作成時によく使う雛形コードをhelperで提供
  • Time with nanosecond

    • ミリ秒とナノ秒のどちらにするか、という議論があったが、ナノ秒でログを出すシステムが増えてきたのでナノ秒
    • 内部的には Fluent::EventTime という形で持つ。既存のプラグインに対しては Integer っぽく振る舞うクラス。
  • ServerEngine based Supervisor

    • プロセスモデルを ServerEngine に置き換える
  • Windows support

    • Linux 向けに特化したコードを、Windows で動くように直したりした
    • HTTP RPC でバッファのフラッシュなどを行う
  • v0.14.x - v1

    • https://github.com/fluent/fluentd/issues/1000 に並んでいる
    • v1 を出す。v1 で新機能を入れることはない。v1 は今まで入れたAPIをFixする、と宣言するためのバージョン。フィードバックは v0.14 のうちにしてほしい
    • v1 は 2016 4Q か 2017 1Q に出る
    • いくつか v1 で消したい機能はある
  • v0.14 系でこれから入れる予定の目玉機能

    • Symmetric multi-core processing: パフォーマンス向上。TCPポートをWorker間で共用し、Workerで処理を分ける。ServerEngine を使って実現する
    • Counter API: Workerが分かれるので、カウンタを作りづらくなる点をカバー
    • secure-forward をコアに入れる。forward と secure-forward の細かい違いをなくす
  • ベンチマーク結果

    • CPU使用率が、v0.14 で若干上がっている
    • EC2で、100,000msgs/sec での測定結果によると、CPU使用率が4〜5%上がった
    • v0.14はリリースしたばかりで、性能改善できる余地が残っている。これから改善する予定
  • td-agent 3

    • fluentd v0.14 は td-agent 3 から採用(現行の td-agent 2 系には入らない)
    • Ruby 2.3 に上げる
    • Windows 向けパッケージも提供予定
    • リリース日は未定(v0.14 がまだ stable と言えないので)
  • Q) chunk から取得したログの一部で処理が失敗した場合、リトライしたいと思うが、その場合 chunk の詰め直しはできるか?

    • A) できない。その場合は、エラーストリームという機能を使ってほしい

Fluentd ServerEngine Integration & Windows Support (成田 律太, @naritta)

  • 自己紹介

    • 2015年の Treasure Data のインターンシップに参加し、今年入社した
  • ServerEngine

    • Unicorn と大体同じようなもの
    • Supervisor, server, worker の構成を簡単に作れるようにするためのフレームワーク
    • Worker Module, Server Module を Ruby で書く
    • Worker type: thread, process, spawn
    • v0.14 では spawn worker type を利用している。Windows に fork の機能がないため
  • Fluentd で ServerEngine を採用したことによるメリット

    • auto restart
      • Linux でも Windows でも、fluentd のプロセス(worker)をKILLすると、自動的にリスタートするのを確認できる
    • live restart
      • 設定変更したあとでシグナル(または HTTP RPC)を送ると、worker だけをリスタートできる
      • server が TCP 接続を持っているので、ログは欠損しない
    • socket manager
      • server 側で listen して、socket を worker に共有する
      • 将来的にはマルチコアでソケットを共有するために ServerEngine を使う(いまはマルチコアで動かすには in_multiprocess plugin を使う必要がある)
    • signal handler
      • signal handler と log rotation は、Fluentd ユーザにとってのメリットというより、内部実装に関するメリット
      • シグナルをキューに溜めて、シグナル同士の競合を防ぐ
    • log rotation
      • ruby core にすでにポーティングされている機能
      • マルチプロセスでのログローテーションなど
  • Q) ServerEngine が入ることで、設定ファイルが増える?

    • A) Fluentd が内部的に ServerEngine の設定ファイルを作って、ServerEngine に渡す。ユーザ側は Fluentd/td-agent の設定ファイルだけを編集すれば良い

Fluentd v0.14 Plugin API Updates (田籠 聡, @tagomoris)

  • 何故新しい API セットを作ったか?

    • いままでのプラグインには、開発者向けドキュメントがなかった。みんな他の人の書いたプラグインの真似をして書いている
    • プラグイン内で独自にスレッドを起動するようなことが必要で、そのせいでテストしづらい
    • あるプラグインで使えるパラメータが、他のプラグインでも使えるのかどうかがわかりづらい
    • 割りとコアな処理をプラグイン側で上書きしてしまっていることがよくある
    • などなど
    • 上記のような問題を解決したい。また、便利な機能も一緒に提供することで、移行を進めたい
  • Compatibility of plugins

    • v0.12のプラグインは Fluent::Input などのクラスのサブクラス
    • compatibility layer: Fluent::Compat::Klass で、v0.12 にしかないメソッドを提供するなどして、互換性を維持
  • Compatibility of configurations

    • v0.12 形式のパラメータを v0.14 形式に自動変換する helper を提供する。plugin 開発者にこれを使ってもらう必要がある
    • v0.14 API を使ったプラグインを、v0.12 で動かすことはできない(ようにした)。gemspec に依存関係を書いてもらう必要がある
  • v0.14 プラグインクラスの書き方

    • すべてのクラスを Fluent::Plugin モジュールの下に置く(いままでは Fluent 直下だった)
    • 親クラスのメソッド(#configure とか)をオーバライドするときは必ず super を呼ぶ
    • super を呼んでない場合は、互換性レイヤが代わりに super を呼んで、WARNING を出す
    • 親クラス(Fluent::Plugin::Input とか)の階層を整理
  • Fluent::Plugin::Output

    • Output Plugin は物凄くいろいろ変わった。v0.12 まではやりたいことによって異なるクラスを使い分ける必要があったが、1個の Output クラスに統合された
    • chunk を分ける条件を細かく指定できるようになった。いままではサイズのみの chunking だったが、例えば時間でも chunking できるし、時間とサイズの組合せでもできる
    • Non-buffered, buffered synchronous, buffered asynchronous のいずれか1個を実装してあれば動く
  • Delayed commit

    • 書き込みの ACK を待ちたい場合、いままでは write メソッドの中で待つ必要があった。write メソッドが終わると、送ったデータが chunk から消されてしまうため
    • v0.14 では try_write メソッドで commit を待たずに return する。この時点では chunk が消されない。非同期のチェックスレッドが ACK を受け取って #commit_write が呼ばれた時点で、chunk が消される
    • ユースケース:分散ファイルシステムに書き込んでから、正しく読めることを確認してから chunk を消したい場合
  • configurations: flushing buffers

    • chunk を flush するタイミングを、細かく制御できるようになった
    • flush_mode: immediate → すぐに書き込んで欲しいが、失敗した時はリトライして欲しい場合に使うモード
  • Retries

    • いままではリトライ回数(retry_count)しか指定できなかったが、タイムアウト(retry_timeout)を指定できるようになった(どれだけの期間、リトライし続けるか)
    • いままでは72時間リトライし続けていた。この仕様は自明ではなかった。いつまで待てば諦めが付くのか、誰にもわからない状態だった
  • その他のプラグイン: Buffer, Parser, Formatter, Storage, ...

    • これらを v0.14 では "Owned" plugin と呼ぶ → 他のプラグイン経由で実体化されるプラグイン
      • 対義語は primary plugins → Input, Output, Filter plugin
    • Storage plugin:プラグイン内でデータを永続化したい場合に使うプラグイン
      • 例:file plugin の pos file で管理している情報を、storage plugin で永続化
  • Plugin Helpers

    • ヘルパーを使いたいときは、明示的に "helpers :name" と指定してもらうことで、どのヘルパーを使っているのか明示されるようにした
  • New Test Drivers

    • いままでは実行タイミングに依存していたようなコードを、きちんとテストできるようにした
    • 例えば、flushのタイミングを制御するとか
  • Plans for v0.14.x

    • 古橋さんの話+α
    • plugin generator を入れようと思っている。いまは、他の人の書いたプラグインの真似をしてプラグインを書く、という状態で、これは良くない
    • 新しいAPI:Fluentd 全体で使えるバッファサイズの上限を指定、Counter API
  • 開発者向けのドキュメントはこれから書く(書かなければいけないと思ってはいる)

OS X + Docker Machine + Cloudera QuickStart Docker Image で Spark MLlib のお試し環境を構築する

f:id:muziyoshiz:20160529223041p:plain

はじめに

Cloudera は以前から、Hadoop の機能を簡単に試すための VM イメージを配布しています(Cloudera QuickStart VM のダウンロードページ)。配布されているイメージは KVM 版、Virtual Box 版、VMWare 版の3種類です。

しかし、Vagrant で使える box ファイル版は提供されておらず、コマンド一発での環境構築はできませんでした。もちろん、Virtual Box 版のイメージから box ファイルを作ることは可能ですが、Cloudera のバージョンアップに追従する作業は面倒でした。

しかし、去年の12月から、Cloudera 社が QuickStart VM の Docker イメージ版を公式に配布するようになりました。以下は、公式配布に関する Cloudera 公式のブログ記事です。

blog.cloudera.com

最近、Spark MLlib を勉強するための環境を作る機会があったので、せっかくなので Cloudera QuickStart Docker Image で環境構築してみました。その際に、普通に進めるとうまくいかないポイントがいくつかあったので、そのときの構築手順をまとめておきます。

動作環境

  • MacBook Pro (Retina, 15-inch, Mid 2014)
    • 16GB onboard memory
  • OS X Yosemite version 10.10.5

構築される環境

今回の手順を実行すると、最終的に以下のような環境が構築されます。

  • VirtualBox 上で、Docker 用の Linux VM(名前:default)が動作する。
    • この Linux VM は CPU 4コア、メモリ 9GB を使用(デフォルトは CPU 1コア、メモリ 2GB)
  • 上記の Linux VM 上で、Cloudera Quickstart Docker Image から作られたコンテナが動作する。
    • このコンテナはメモリ 8GB を使用
  • Docker コンテナ内のシェルから、spark-shell を実行できる。

Cloudera Express は 8GB 以上のメモリを必要とします(参考:Cloudera QuickStart VM)。そのため、構築手順のなかで、Linux VM のメモリをデフォルトより増やす作業を行います。

構築手順

Docker Toolboxのインストール

OS X への Docker のインストール方法は、Installation on Mac OS X が詳しいです。OS X 上で Linux の Docker イメージを直接動かすことはできないため、VirtualBox で Linux VM(名前は default)を動作させ、この Linux VM 上で Docker daemon を動作させます。

f:id:muziyoshiz:20160529182710p:plain
(※ Installation on Mac OS X より抜粋)

まず、Docker Toolbox からインストーラをダウンロードして、Docker Toolbox をインストールします。現時点の最新版は DockerToolbox-1.11.1b.pkg です。

インストール後に以下のコマンドを実行すると、Docker のバージョンを確認できます。この時点では、まだ default VM が動いていないので、Docker daemon に接続できないというメッセージが出ます。

% docker version
Client:
 Version:      1.11.1
 API version:  1.23
 Go version:   go1.5.4
 Git commit:   5604cbe
 Built:        Tue Apr 26 23:44:17 2016
 OS/Arch:      darwin/amd64
Cannot connect to the Docker daemon. Is the docker daemon running on this host?

default VM の初期設定

OS X の Launchpad から Docker Quickstart Terminal を実行します。しばらく待って、Docker のアイコンが出てくれば設定完了です。この例では、Docker ホストの IP アドレスは 192.168.99.100 になりました。

Last login: Sat May 21 00:24:50 on ttys004
bash --login '/Applications/Docker/Docker Quickstart Terminal.app/Contents/Resources/Scripts/start.sh'
% bash --login '/Applications/Docker/Docker Quickstart Terminal.app/Contents/Resources/Scripts/start.sh'
Running pre-create checks...
Creating machine...
(default) Copying /Users/myoshiz/.docker/machine/cache/boot2docker.iso to /Users/myoshiz/.docker/machine/machines/default/boot2docker.iso...
(default) Creating VirtualBox VM...
(default) Creating SSH key...
(default) Starting the VM...
(default) Check network to re-create if needed...
(default) Waiting for an IP...
Waiting for machine to be running, this may take a few minutes...
Detecting operating system of created instance...
Waiting for SSH to be available...
Detecting the provisioner...
Provisioning with boot2docker...
Copying certs to the local machine directory...
Copying certs to the remote machine...
Setting Docker configuration on the remote daemon...
Checking connection to Docker...
Docker is up and running!
To see how to connect your Docker Client to the Docker Engine running on this virtual machine, run: /usr/local/bin/docker-machine env default


                        ##         .
                  ## ## ##        ==
               ## ## ## ## ##    ===
           /"""""""""""""""""\___/ ===
      ~~~ {~~ ~~~~ ~~~ ~~~~ ~~~ ~ /  ===- ~~~
           \______ o           __/
             \    \         __/
              \____\_______/


docker is configured to use the default machine with IP 192.168.99.100
For help getting started, check out the docs at https://docs.docker.com

ちなみに、この表示が出た後で VirtualBox Manager を開くと、default という名前の VM が「実行中」になっていることが確認できます。

default VM のCPUコア、メモリ使用量の変更

Docker コンテナが 8GB のメモリを使えるようにするには、Docker が動作する default VM のメモリ使用量を 8GB 以上にする必要があります。メモリを増やす方法は、以下のページを参考にしました。

具体的な手順は以下の通りです。

  • docker-machine stop default を実行し、default VM を停止
  • VirtualBox Manager を起動し、"default" という名前の VM の「設定」欄を開く
  • 設定の「システム」タブで、「マザーボード」を選択し、メインメモリーを 2048MB から 9216MB(9GB)に変更
  • 同じく、設定の「システム」タブで、「プロセッサー」を選択し、プロセッサー数を 1 から 4 に変更
  • 「OK」ボタンを押して、設定を閉じる
  • docker-machine start default を実行し、default VM を再起動

docker info コマンドを実行すると、メモリとCPUコア数が増えていることが確認できます。

% docker info
Containers: 0
 Running: 0
 Paused: 0
 Stopped: 0
Images: 0
Server Version: 1.11.1
Storage Driver: aufs
 Root Dir: /mnt/sda1/var/lib/docker/aufs
 Backing Filesystem: extfs
 Dirs: 0
 Dirperm1 Supported: true
Logging Driver: json-file
Cgroup Driver: cgroupfs
Plugins:
 Volume: local
 Network: bridge null host
Kernel Version: 4.4.8-boot2docker
Operating System: Boot2Docker 1.11.1 (TCL 7.0); HEAD : 7954f54 - Wed Apr 27 16:36:45 UTC 2016
OSType: linux
Architecture: x86_64
CPUs: 4
Total Memory: 8.762 GiB
Name: default
ID: 5L66:CKKI:BIK7:BRUT:H6NI:XMFL:AR36:R6UQ:YOFG:OQPM:M5L6:PU6L
Docker Root Dir: /mnt/sda1/var/lib/docker
Debug mode (client): false
Debug mode (server): true
 File Descriptors: 13
 Goroutines: 32
 System Time: 2016-05-24T13:18:09.388371356Z
 EventsListeners: 0
Registry: https://index.docker.io/v1/
Labels:
 provider=virtualbox

Cloudera QuickStart Docker Image のダウンロード

Cloudera QuickStart Docker Image は Docker Hub で公開されています(Docker Hub の cloudera/quickstart ページ)。そのため docker pull コマンドでイメージをダウンロードできます。ダウンロードサイズが 4.4GB あるので、結構時間がかかりました。

% docker pull cloudera/quickstart:latest
latest: Pulling from cloudera/quickstart
1d00652ce734: Pull complete
Digest: sha256:f91bee4cdfa2c92ea3652929a22f729d4d13fc838b00f120e630f91c941acb63
Status: Downloaded newer image for cloudera/quickstart:latest
% docker images
REPOSITORY            TAG                 IMAGE ID            CREATED             SIZE
cloudera/quickstart   latest              4239cd2958c6        6 weeks ago         6.336 GB

コンテナの起動

以下のコマンドを実行して、コンテナを起動します。

docker run --hostname=quickstart.cloudera \
--privileged=true -t -i -d -p 18888:8888 -p 17180:7180 -p 10080:80 \
-m 8192m cloudera/quickstart:latest /usr/bin/docker-quickstart

このコマンドは Docker Hub の cloudera/quickstart ページ にあるものをベースに、以下の修正を加えたものです。

  • コンテナをデーモンとして起動(あとから docker exec で接続)
  • -m 8192m を指定して、メモリ使用量を 8GB に指定
  • マッピングされるポート番号が起動のたびに変わるのを避けるために、マッピング先を、元のポート番号に10000足した値に固定

コンテナの起動後は、docker exec コマンドで、コンテナ上のシェルに接続します。以下の例では、コンテナ ID の一部を指定して接続しています。

% docker ps
CONTAINER ID        IMAGE                        COMMAND                  CREATED             STATUS              PORTS                                                                     NAMES
048c433261a7        cloudera/quickstart:latest   "/usr/bin/docker-quic"   23 seconds ago      Up 23 seconds       0.0.0.0:10080->80/tcp, 0.0.0.0:17180->7180/tcp, 0.0.0.0:18888->8888/tcp   gigantic_poitras
% docker exec -it 048c bash

接続後に free コマンドでメモリ使用量を見ると、8GB 以上のメモリを使っているように見えます。

[root@quickstart /]# free
             total       used       free     shared    buffers     cached
Mem:       9187728    5623316    3564412     169020      34224    1127784
-/+ buffers/cache:    4461308    4726420
Swap:      3227556          0    3227556

ただ、cgroup の設定を確認すると、1024*1024*1024*8 = 8589934592 で 8GB になっていたので、制限は効いているのではないかと思います。このあたりはちょっと自信がないです。

[root@quickstart /]# cat /sys/fs/cgroup/memory/memory.limit_in_bytes
8589934592

Cloudera Express の起動

コンテナ内で以下のコマンドを実行し、Cloudera Express を起動します。このコマンドは、コンテナの再起動後にも実行する必要があります。

[root@quickstart /]# /home/cloudera/cloudera-manager --express --force
[QuickStart] Shutting down CDH services via init scripts...
kafka-server: unrecognized service
JMX enabled by default
Using config: /etc/zookeeper/conf/zoo.cfg
[QuickStart] Disabling CDH services on boot...
error reading information on service kafka-server: No such file or directory
[QuickStart] Starting Cloudera Manager server...
[QuickStart] Waiting for Cloudera Manager API...
[QuickStart] Starting Cloudera Manager agent...
[QuickStart] Configuring deployment...
Submitted jobs: 15
[QuickStart] Deploying client configuration...
Submitted jobs: 16
[QuickStart] Starting Cloudera Management Service...
Submitted jobs: 24
[QuickStart] Enabling Cloudera Manager daemons on boot...
________________________________________________________________________________

Success! You can now log into Cloudera Manager from the QuickStart VM's browser:

    http://quickstart.cloudera:7180

    Username: cloudera
    Password: cloudera

出てくるメッセージは QuickStart VM の場合と全く一緒ですね……。

Web UI の接続確認

ここまで来れば、ホスト OS(OS X)の Web ブラウザから、Cloudera の Web UI の動作を確認できます。

また、この時点ではアクセスできませんが、Cloudera Manager から Hue を起動した後に、以下のアドレスで Hue にアクセスできます。

Cloudera Manager からサービス起動

http://192.168.99.100:17180/ にアクセスし、以下の手順で、Spark Shell の動作に必要なサービスを起動します。

  • Cloudera Manager Service の起動を待つ
  • クロックオフセットの設定を修正する(後述)
  • 他のサービスを、以下の順に起動する
    • HDFS
    • Hive
    • YARN
    • Spark

サービスの起動までに時間が掛かります。起動した直後は Bad Health のことがありますが、その場合はしばらく待てば起動します。

また、他の人の環境でも起こる問題かはわからないのですが、私の環境では Host のステータスが以下のようになる問題が発生しました。

致命的なヘルスの問題:クロックのオフセット
(英語では Bad health: Clock Offset)

この問題は、同じ OS X マシンで Cloudera QuickStart VM を使ったときにも発生したのですが、そのときと同じ対処方法が有効でした。以下の手順に従って、オフセットの設定を無効にすれば、ステータスが Good に戻って、問題なく動作するようになります。

  • Cloudera Managerのメニューから、 Host(ホスト) → Configuration(設定) と遷移する
  • 検索キーワード欄を使って、Host Clock Offset Thresholds(ホストクロックオフセットのしきい値) を表示する
  • この設定の「Warning(警告)」と「Critical(致命的)」を、両方とも「Never(行わない)」に変更する
  • 「Save Changes(変更の保存)」ボタンを押す

Spark Shell の起動

Spark Shell を初めて起動する前に、以下のコマンドを実行してください。

[root@quickstart /]# sudo -u hdfs hadoop fs -chmod 777 /user/spark
[root@quickstart /]# sudo -u spark hadoop fs -chmod 777 /user/spark/applicationHistory

上記のコマンドを実行しなかった場合、Spark Shell の起動中に以下のエラーが表示されてしまいます。どうも、Cloudera QuickStart の不備のようです(参考:Solved: [CDH 5.5 VirtualBox] unable to connect to Spark Ma... - Cloudera Community)。

16/05/29 06:37:15 ERROR spark.SparkContext: Error initializing SparkContext.
org.apache.hadoop.security.AccessControlException: Permission denied: user=root, access=WRITE, inode="/user/spark/applicationHistory":spark:supergroup:drwxr-xr-x

そして、以下のコマンドを実行すると Spark Shell が起動します。Cloudera Quickstart VM は擬似分散モードで動作しているので、引数には --master yarn-client を指定する必要があります。

[root@quickstart /]# spark-shell --master yarn-client
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.0
      /_/

Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_67)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc (master = yarn-client, app id = application_1464516865260_0001).
SQL context available as sqlContext.

scala>

以上で、Spark MLlib を試すための環境構築は完了です。

Spark MLlib を試す

Apache Spark入門 動かして学ぶ最新並列分散処理フレームワーク (NEXT ONE)

Apache Spark入門 動かして学ぶ最新並列分散処理フレームワーク (NEXT ONE)

構築に成功したか確認するために、Apache Spark 入門の8章に掲載されていたコードを Spark Shell で実行してみました。内容は、K-means を用いたクラスタリングです。

HDFS へのサンプルデータのアップロード

Cloudera QuickStart には Spark MLlib のサンプルデータが含まれていませんでした。そのため、まずはテストに使うサンプルデータをダウンロードして、HDFS にアップロードします。

[root@quickstart /]# curl https://raw.githubusercontent.com/apache/spark/master/data/mllib/kmeans_data.txt > kmeans_data.txt
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0    72    0    72    0     0     40      0 --:--:--  0:00:01 --:--:--    47
[root@quickstart /]# cat kmeans_data.txt
0.0 0.0 0.0
0.1 0.1 0.1
0.2 0.2 0.2
9.0 9.0 9.0
9.1 9.1 9.1
9.2 9.2 9.2
[root@quickstart /]# hadoop fs -put kmeans_data.txt /tmp
[root@quickstart /]# hadoop fs -ls /tmp
Found 5 items
drwxrwxrwx   - hdfs   supergroup          0 2016-05-29 13:02 /tmp/.cloudera_health_monitoring_canary_files
drwxrwxrwt   - mapred mapred              0 2016-04-06 02:26 /tmp/hadoop-yarn
drwx-wx-wx   - hive   supergroup          0 2016-05-29 09:57 /tmp/hive
-rw-r--r--   1 root   supergroup         72 2016-05-29 13:02 /tmp/kmeans_data.txt
drwxrwxrwt   - mapred hadoop              0 2016-05-29 10:19 /tmp/logs

サンプルデータの RDD への変換

Spark Shell を起動して、HDFS からサンプルデータを読み込んで、RDD に変換します。

[root@quickstart /]# spark-shell --master yarn-client
(中略)

scala> import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.clustering.KMeans

scala> import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.Vectors

scala> val data = sc.textFile("hdfs://localhost/tmp/kmeans_data.txt")
data: org.apache.spark.rdd.RDD[String] = hdfs://localhost/tmp/kmeans_data.txt MapPartitionsRDD[1] at textFile at <console>:29

scala> val parsedData = data.map{ s =>
     |   Vectors.dense(
     |     s.split(' ').map(_.toDouble))
     | }.cache()
parsedData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MapPartitionsRDD[2] at map at <console>:31

KMeans のモデルの作成

これ以降は、本に載っているコードをそのまま実行できました。KMeans.train メソッドでモデルを生成します。

scala> val numClusters = 2
numClusters: Int = 2

scala> val numIterations = 20
numIterations: Int = 20

scala> val clusters = KMeans.train(parsedData, numClusters, numIterations)
16/05/29 13:07:00 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
16/05/29 13:07:00 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
clusters: org.apache.spark.mllib.clustering.KMeansModel = org.apache.spark.mllib.clustering.KMeansModel@5d27cdd2

上記の WARN は、native library のロードに失敗したことを表しているようです(参考:Using Spark MLlib)。Hardware acceleration が効かないだけなので、この WARN は無視します。

生成されたモデルの確認

生成されたクラスタの数、および各クラスタの中心の座標を確認します。

scala> clusters.k
res0: Int = 2

scala> clusters.clusterCenters
res1: Array[org.apache.spark.mllib.linalg.Vector] = Array([0.1,0.1,0.1], [9.099999999999998,9.099999999999998,9.099999999999998])

新たなベクトルを与えて、いずれのクラスタに分類されるか確認します。

scala> val vec1 = Vectors.dense(0.3, 0.3, 0.3)
vec1: org.apache.spark.mllib.linalg.Vector = [0.3,0.3,0.3]

scala> clusters.predict(vec1)
res2: Int = 0

scala> val vec2 = Vectors.dense(8.0, 8.0, 8.0)
vec2: org.apache.spark.mllib.linalg.Vector = [8.0,8.0,8.0]

scala> clusters.predict(vec2)
res3: Int = 1

元のファイルに含まれていた座標が、いずれのクラスタに分類されるかを判定します。判定結果は HDFS に出力します。

scala> val predictedLabels = parsedData.map(vec => clusters.predict(vec))
predictedLabels: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[32] at map at <console>:39

scala> predictedLabels.saveAsTextFile("/tmp/output")

scala> exit
[root@quickstart mllib]# hadoop fs -ls /tmp/output
Found 3 items
-rw-r--r--   1 root supergroup          0 2016-05-29 07:12 /tmp/output/_SUCCESS
-rw-r--r--   1 root supergroup          8 2016-05-29 07:12 /tmp/output/part-00000
-rw-r--r--   1 root supergroup          4 2016-05-29 07:12 /tmp/output/part-00001
[root@quickstart mllib]# hadoop fs -cat /tmp/output/part-*
1
1
1
0
0
0

まとめ

一通りの作業を試してみて、Cloudera QuickStart Docker Image で Spark MLlib のお試し環境を構築できることがわかりました。

使ってみた感想としては、コマンドラインから QuickStart のコンテナを起動・停止できるのは便利でした。その一方で、この QuickStart Docker Image は、Cloudera QuickStart VM を単純に Docker image にしただけ、という感じで、Web ブラウザから行わなければいけない設定やサービス起動は相変わらず多かったです。そのため、この Docker image をベースに環境構築を自動化するのは大変そうです。

Spark を CDH と一緒に使いたいという特別な理由がなければ、もっと軽量な Docker イメージを探したほうがよいかもしれませんね……。

社内Wikiに情報を書くときに守ってほしい、たったひとつのルール

f:id:muziyoshiz:20160425013924j:plain

このページについて

これは、社内 Wiki に情報を書くときに、私が個人的に守っていて、チームメンバにもできるだけ守ってほしいルールの紹介記事です。このルールを実際に運用するためのコツについても、基本ルールの派生という形で紹介します。

想定する環境

この記事は、社内に Wiki があって、フォーマットが決まったドキュメント(仕様書や手順書など)とは別に、各人がメモを自由に書いて共有できるような環境を想定しています。

Wiki の種類は問いません。PukiWiki、MediaWiki、Confluence、Esa、Redmine などのプロジェクト管理ツール付属の Wiki などなど……何でもいいです。Word 文書などにも適用できなくはないですが、文書を気軽に分けるのが難しい場面には、あまり向かないかと思います。

ルール:「このページについて」という欄をページの先頭に用意し、そのページの概要を1〜2文で書く

守ってほしいルールはこれだけです。単純ですが強力で、きちんと守ろうとすると意外と面倒なルールでもあります。

例えば、何かのインストール作業をメモしておきたい場合、「このページについて」欄には以下のような文章を書きます。

## このページについて

これは、システムAの開発環境を構築するために、Windows 7 にソフトウェアBをインストールした際のメモです。

## 手順

(これ以降に、実際に実行した手順のメモ)

または、何か新機能の実装をしたくて、事前に設計を検討した場合は、こんな感じで書きます。

## このページについて

これは、機能Cの設計についてチーム内で議論するために、個人的に作成した設計案です。設計の事前知識として、既存のソースコードを調査した結果も含めています。

## 設計案

(これ以降に設計案とソースコードの解説)

Wikipedia の記事は、よく「〜とは、…である。」という一文から始まりますが、それをイメージしてもらえると分かりやすいかもしれません。

同じような内容がページの先頭に書いてあれば、この欄の名前は「このページについて」にしなくても、「概要」でも "About this page" でもなんでも良いです。

このルールの目的

このルールを徹底する目的は、そのページを開いた人に 「このページに、自分がいま知りたい情報が書かれているか?」を瞬時に判断する ための情報を与えることです。

社内 Wiki を実際に使っている人なら、このページにはこんなことが書いてあるだろうなーと思って読み始めたら、実際は全然違うことが書かれていた、という経験が何度もあると思います。例えば、こんな感じです。

  • インストール手順書かと思って読んでいたら、インストール時に試したことを適当な順序で書いてあるだけのメモだった。書いてある手順を上から順に実行したところ、途中で失敗してしまった。で、メモにも「このコマンドは失敗した」とか書いてある。
  • ある機能の設計に関するメモかと思ったら、単なる設計案の一つだった。最終的に実装されたソースコードの設計とは全く一致していなかった。

誤解を避けるために強調しておくと、私は、中途半端なメモを Wiki に書かないで欲しい、とか、そういう中途半端なメモは消して欲しい、とか言いたいわけではありません。

ただ、そのページがどういう状態なのかが明示されていないと、

あるページAを、ここには正確な情報が書いてある、と思って読み始める
↓
期待を裏切られる
↓
別のページBを、ここには正確な情報が書いてある、と思って読み始める
↓
期待を裏切られる
↓
この Wiki に書いてある情報はどれも信用できない! となって Wiki を読まなくなる

という悪循環を起こしかねません。そうなると他の人がいくら Wiki に有効な情報を書いていっても、その情報は読まれない、というもったいないことになってしまいます。

ページの先頭にただのメモだと明示してあれば、読み手が 「他に有力な情報源が見当たらないので、これを頑張って読み解こう」 と判断することも十分ありえますし、結果として必要な情報が書いてなくても、期待を大きく裏切ることにはなりません。Wiki 全体でそういう状態を保ち、モラルハザードを防ぐ……というのがこのルールの目的です。

以下は、このルールを実際に運用するために守ったほうがよい、このルールの派生系です。

派生(1):ページを更新し終わったタイミングで「このページについて」の内容を見直す

ページの内容を色々書いているうちに、最初に書こうと思っていた内容とはズレていってしまう、ということはよくあると思います。理想的には、内容がズレた時点で直したほうがよいのですが、少なくともページを更新し終わったタイミングでは必ず直すようにしましょう。

例えば、設計案を色々書いていたつもりが、最終的な設計についてもそのページに書いてしまい、他のページにはそこまで詳しい情報がまだ書かれてない、という状態になったとします。

もちろん最終的な設計のページをきちんと用意したほうが良いですが、「このページについて」欄を以下のように直すだけでも、後から読む人にとっては役立ちます。

## このページについて

これは、機能Cの設計についてチーム内で議論するために、個人的に作成した設計案です。最終的に採用したのは案3で、このページに書いてある案3の詳細は、最終的な設計と同じです。

派生(2):「このページについて」から外れる内容が増えてきたら、別のページに分ける

書いているうちに「このページについて」には書いてない内容が増えてきた、あるいは内容に合わせて「このページについて」を直したら1〜2行では収まらなくなった、という場合にはページを分けましょう。

例えば、あるシステムAの開発環境構築手順書を作っているうちに、内容が膨らんできて、「このページについて」欄が、

## このページについて

これは、システムAの開発環境を構築するための手順書です。このシステムのサブシステムB、C、Dの知識がないと、この手順は意味不明に見えるかもしれません。そのため、このシステムのサブシステムB、C、Dの関係についても、構築手順書の途中で説明します。

のようになったら、恐らくそのサブシステムB、C、Dの解説は別のページに分けたほうがいいでしょう。そして、以下のようにリンクを書くだけにするのが無難です。

## このページについて

これは、システムAの開発環境を構築するための手順書です。これらの手順が何をしているか理解するためには、このシステムAの構成に関する知識が必要です。システムAのサブシステムについての知識がない場合は、必ず先に [サブシステムの解説](http://wiki.example.com/subsystem) を読んでください。

そうしないと、読み手が、1個の長いページのなかで、自分の知りたい情報を探しまわる羽目になってしまいます。

別のページに分けたら読まれないかも?と思うかもしれませんが、そういう人は、同じページ内に書いてあってもどうせ読みません。それなら、読む気のある人向けに、少しでもわかりやすくしたほうが良いです。

派生(3):他の人が書いた「このページについて」も直していい

他の人が書いたページでも、「このページについて」に書かれた内容と、本文が違っていたら、積極的に直しましょう。また、そもそも「このページについて」欄が無かったら追加しましょう。

基本ルールに書いた通り、目的は Wiki 全体の利用価値向上、モラルハザード防止です。そのためには、内容は直さずに、「このページについて」欄を直すだけでも十分意味があります。

例えば、あるページを読んで、その内容が間違っていた場合、私だったらこういう文章をページの冒頭に書いて、内容は修正も削除もせずに残しておきます。

## このページについて

私(吉澤)が 2016-04-25 に調べた範囲では、このページに書かれた設計は、現在の実装と違っているようです。そのため、設計情報としては参考にしないことを推奨します。

ただ、当時の設計思想など、何かの参考になる可能性を否定しきれないため、削除せずに残しておきます。

派生(4):本文は頑張らない

Wiki に書かれた情報の品質は高ければ高いほど良いですが、敷居が上がって、Wiki に書くのが一人か二人だけ、になってしまうのも困ります。そのため、「このページについて」欄以外のルールはあまり多くしないほうが良いでしょう。

また、一般的な話として、「他人のために細かく書いてあげている」というつもりで書いていると、読んでもらえなかったときの心理的ダメージが大きいです。そのため、手間のかかる本文の品質は、自分が後で読んで困らない程度にしておくことをお勧めします。

まとめ

今回は、普段はあまり書かないタイプの、仕事のノウハウについての記事でした。

私は自他共に認めるメモ魔で、やたらあちこちにメモを書き散らしています。そうやって書き溜めているメモがたまに同僚から「役立った」と言ってもらえることもあるのですが、それと同時に「自分はそこまでできないし、やらない」という反応をされることが多かったりします。

書く分量が少なくても、少しの工夫で、後から読んで役立つような文章を書くことはできるはずなんだけどなあ……と思い、自分なりに本質的と考える部分をまとめてみました。Wiki も普及してだいぶ経つので、今更感のある話題ではありますが、他の人のノウハウも色々伺ってみたいところです。

Swim.com の workout データをエクスポートする方法

Swim.com 活用のすすめ

自分以外に使っている人を見たことないですが、Swim.com というスイマー用の SNS があります。

この SNS は、泳いだ距離や速さを自動的に記録できるスイムウォッチや、Swim.com アプリが動くスマートウォッチに対応しており、スイマー同士でスコアを競えるようになってます(参考:Swim.com に対応したスイムウォッチ一覧)。まあ、SNS ですけど、自分のデータをアップロードして、確認するためだけにも使えます。

僕は、去年の9月までは Pebble Time で水泳データを記録していたのですが、10月以降はより高機能な Garmin Swim というスイムウォッチに乗り換えました。Garmin Swim のデータは Garmin Connect というサイトで確認できるのですが、以前のタイムとも比較したいので、Garmin Connect のデータを Swim.com にも連携させています。

図にすると、以下のような感じです。

f:id:muziyoshiz:20160422003752p:plain

このように連携させることで、以下の Workout 画面(https://www.swim.com/my-workouts/)で、Pebble Time の測定値と Garmin Swim の測定値を一覧できます。

f:id:muziyoshiz:20160422002408p:plain

今回の目的

Swim.com で一覧表示はできるんですが、1回あたりに泳いだ距離や、泳ぐペースの変化をグラフ表示するような機能はありません。また、Garmin Connect には CSV エクスポート機能があるのですが、Swim.com の方には CSV エクスポート機能も API もなんにもありません。厳しい……。

異なるスイムウォッチで測定した結果をなんとかグラフ化したいと思い、Workout 画面のデータをエクスポートする処理を実装しました。

採用した方法:JSON を手作業でテキストファイルに貼り付けて、Ruby のスクリプトで CSV に変換

色々試したのですが、Swim.com はログイン機能が特殊で、単にユーザ名とパスワードを POST するだけではログインできそうになかったので、自動化は早々に諦めました。厳しい……。

その一方で、一度ログインしてしまえば、Workout 画面の情報は GET で簡単に取得できることがわかりました。Workout 画面は最新の10件のデータを表示し、ページの末尾に到達すると次の10件を読み込むのですが、内部的には以下の URL に GET でアクセスしていました。

https://www.swim.com/workout/listing/search?pageIndex=1&pageSize=10&orderBy=workoutdate&orderDir=DESC&_=1461254054552

ざっと試した感じでは、この pageSize を 10 から 1000 に変えれば 1000 件取得できるし、orderDir を DESC から ASC に変えれば昇順に取得できるみたいです。やさしい……。

というわけで、こんな手順でエクスポートすることにしました。

  1. Web ブラウザで Swim.com にログインする
  2. Web ブラウザで https://www.swim.com/workout/listing/search?pageIndex=1&pageSize=1000&orderBy=workoutdate&orderDir=ASC にアクセスする
  3. 2 で表示された JSON を swim_com.json にコピペする
  4. swim_com.json と同じディレクトリに swim_com_parser.rb を置いて、ruby swim_com_parser.rb を実行する
  5. パース結果が swim_com.csv に出力されるので、このファイルをよしなにする

swim_com_parser.rb は Gist で公開しておきました。ご参考ください。

JSON Parser for swim.com workout data

関連記事

qiita.com