ビッグデータを分散処理する手法として、とても有名です。いまさら感は否めないのですが、この手法をとても身近なやり方でわかりやすく実装しているプログラムを見つけましたので、取り上げてみたいと思います。
その名は、bashreduce。文字通り、Linuxのシェルスクリプト(Bash)で、MapReduceを実現しています。
https://github.com/rcrowley/bashreduce
map/reduceができるという興味以上に、コードの中身がすごい。Unixのパイプ処理のすさまじさを体験できます。
ということで、注目する部分をピックアップして、コードリーディングをしてみたいと思います。
https://github.com/rcrowley/bashreduce/blob/master/br
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
mkfifo $jobpath/{in,out}/$host_idx # Listen for work (remote) ssh -n $host "mkdir -p $nodepath/" pid=$(ssh -n $host "nc -l -p $port_out >$nodepath/$host_idx \ 2>>$tmp/br_stderr </dev/null & jobs -l" \ | awk {'print $2'}) # Do work (remote) ssh -n $host "tail -s0.1 -f --pid=$pid $nodepath/$host_idx \ 2>>$tmp/br_stderr </dev/null \ | LC_ALL='$LC_ALL' sort -S$sort_mem -T$tmp -k$column,$column \ 2>>$tmp/br_stderr \ $map $reduce \ | nc -q0 -l -p $port_in >>$tmp/br_stderr &" # Send work (local) nc $host $port_in >$jobpath/in/$host_idx & # Receive results (local) nc -q0 $host $port_out <$jobpath/out/$host_idx & out_files="$out_files $jobpath/out/$host_idx" # ++i port_in=$(($port_in + 2)) port_out=$(($port_in + 1)) host_idx=$(($host_idx + 1)) |
このプログラムの前準備の部分です。(ループ内部)
リモートホスト$hostは何台でも増設ができるようになっており、リモートホストにはSSHでパスワードなしでログインできるようにしておくこと以外、何も準備する必要はありません。
1 |
ssh-keygen |
これで、生成される.ssh/id_rsa.pubをリモードの.ssh/authorized_keysに追加します。
・名前付きパイプ
・netcat(nc)によるバックドア
この二つを駆使して、リモートへの接続、リモートでのコマンド実行、結果の取得をしています。ローカル・リモートホスト間は、ホストの数だけそれぞれ送受信二つずつコネクションをはります。
(tail –pidで、プロセスが死んだときに終了する部分、こうやって使うのか・・フムフム)
ローカル > リモート > ローカル とパイプ接続がされた状態にして、データ入力を待ちます。
次は、データを分散して書き込むところです。(mapに相当)
https://github.com/rcrowley/bashreduce/blob/master/brutils/brp.c
1 |
fputs(line.buf, pouts[fnv_hash(line.col_beg, line.col_end) % pouts_len]); // write it to the correct file |
https://github.com/rcrowley/bashreduce/blob/master/brutils/brutils.h
1 2 3 4 5 6 |
unsigned int fnv_hash(const char *p, const char *end) { unsigned int h = 2166136261UL; for (; p != end; ++p) h = (h * 16777619) ^ *p; return h; } |
fnv_hash関数でデータ内容からハッシュ値を生成して、それを出力ファイル数pouts_len分に振り分けます。この出力ファイルというのは名前付きパイプです。ここにデータを流し込むと、リモードに送られるしくみです。ちなみにfnvというのは、
Fowler-Noll-Vo hash function
という意味で、簡単なアルゴリズムでハッシュをつくるしくみのようです。
http://en.wikipedia.org/wiki/Fowler%E2%80%93Noll%E2%80%93Vo_hash_function
http://wowdev.jp/?p=873
これにより、同じデータが違うサーバに振り分けられることがなくなり、独立性がたもたれ、処理効率があがります。ここが肝なのでしょう。
最後はデータをまとめるところです。(reduceに相当)
1 2 |
eval "$BRM - $(($column - 1)) $(find $jobpath/in/ -type p | xargs) \ ${output:+| pv >$output}" |
$outputに使われている、コロン演算子が標準出力かファイルかの切り替えをシンプルにしています。
bashreduceはオリジナルや他のバージョンもあるのですが、このバージョンがわかりやすかったので、使いました。実行は、Ubuntuがよさそうです。
Cのプログラムがない場合もLinuxコマンドで代替えしています。
とても勉強になるコードでした。(ほとんど個人メモ)