はじめに
こんにちは、t_oookaです。 今回は、Hadoop Streamingを使用して、PHPでmap,reduce処理を実装してみます。
Hadoop Streamingとは、hadoopが提供するユーティリティで、当該ユーティリティを使用することでJava以外の任意の言語やコマンド(標準入力、標準出力を扱える)などでmap,reduce処理を記述することができるようになります。
環境
構成
完全分散モードで、管理ノード(hadoop-master)1台、計算ノード(hadoop-slave)1台及びクライアント(hadoop-client)1台の合計3台の構成です。
なお、ジョブの投入などはクライアント(hadoop-client)から実行することとします。
Hadoop Streamingの実行方法
[bash] $ hadoop \ jar /path/to/hadoop-streaming.jar \ -files /path/to/sample_mapper.php,/path/to/sample_reducer.php \ -input /path/to/input_dir \ -output /path/to/output_dir \ -mapper mapper_script_name \ -reducer reducer_script_name [/bash]
Hadoop Streamingを利用する場合は、hadoop jarコマンドに「hadoop-streaming.jar」を指定します。
その他のオプションについては、以下の通りです。
オプション | 内容 |
---|---|
files | hadoopのクラスタへコピーするファイルを指定します。複数指定する場合は、「,」で区切ります |
input | 解析対象のファイル名または、ディレクトリ名を指定します |
output | 解析結果を出力するディレクトリ名を指定します |
mapper | map処理を実装したスクリプトをなどを指定します |
reducer | reduce処理を実装したスクリプトなどを指定します |
PHPによるMapReduceの実装
今回は、AWSのELBのアクセスログからELBのステータスコード毎の件数を集計する処理を実装して動作を確認します。
ELBのアクセスログは、1行がスペースを区切り文字として13項目で構成されます。 ※ 詳細はElastic Load Balancing 開発者ガイド アクセスログをご参照ください。
map処理の実装
map処理では、hadoopの仕組みにより標準入力へELBのアクセスログが行単位で渡されてくるため、1行ごとに標準入力より読み取り、"ステータスコード"<TAB>"件数"を出力するようにしています。 ※ 今回の場合は、アクセスログ1行につき1件のため"件数"は1固定にしています。
!/usr/bin/php
<?php while( ! feof(STDIN) ) { // ELBのアクセスログ1行分を標準入力から読込 $line = trim(fgets(STDIN)); if($line == '') continue;
// スペースを区切り文字として、項目を分割して配列へ格納 $values = explode(" ", $line); // 8項目目の「elb_status_code」をステータスコードとして"ステータスコード"<TAB>"1"として出力 printf("%s\t1\n", $values[7]); } ?> [/php]
reduce処理の実装
reduce処理では、map処理での出力が"ステータスコード"でソートされた状態で、標準入力に渡されてきます。 今回はステータスコード毎の件数を取得したいため、単純に件数分を合計する実装としています。
!/usr/bin/php
<?php
$current_code = null; $total = 0;
while ( ! feof(STDIN) ) { // map処理の出力 "ステータスコード"<TAB>"1" $line = trim(fgets(STDIN)); if($line == '') continue;
list($code, $count) = explode("\t", $line); if ($current_code != null && $current_code != $code) { printf("%s\t%d\n", $current_code, $total); $current_code = $code; $total = $count; } else { $current_code = $code; $total = $total + $count; } } printf("%s\t%d\n", $current_code, $total); ?> [/php]
※ hadoop-streamingの動作イメージは第28回 RubyとHadoopで分散処理 Hadoop Streamingの仕組みに、わかりやすい解説がありますので、こちらをご参照ください。
動作確認
Hadoop Streamingで、MapReduce処理を実行してみます。
S3上に保存されたELBのアクセスログ(100ファイル、8,014,771行)を解析し、結果をHDFS上へ出力します。
[bash] $ hadoop \ jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \ -files /tmp/elblogs/sample_mapper.php,/tmp/elblogs/sample_reducer.php \ -input s3n://hadoop-streaming/input \ -output output/elblogs/php \ -mapper sample_mapper.php \ -reducer sample_reducer.php [/bash]
解析結果を確認します。
[bash] $ hadoop fs -ls 'output/elblogs/php/' Found 2 items -rw-r--r-- 3 hdfs hadoop 0 2014-09-25 08:35 output/elblogs/php/_SUCCESS -rw-r--r-- 3 hdfs hadoop 44 2014-09-25 08:35 output/elblogs/php/part-00000 $ sudo -u hdfs hadoop fs -cat 'output/elblogs/php/part-*' 200 7998596 302 15629 400 4 403 291 404 251 [/bash]
おまけ
hadoopからAmazon S3を利用する
hadoopで使用するファイルシステムはHDFSに限定されず、Amazon S3も利用できるようになっています。 S3を利用するために、以下の設定を追加しています。
/etc/hadoop/conf/core-site.xml [xml] <configuration> ...(省略)... <property> <name>fs.s3n.awsAccessKeyId</name> <value>${s3のアクセスキーID}</value> </property> <property> <name>fs.s3n.awsSecretAccessKey</name> <value>${s3のシークレットキー}</value> </property> </configuration> [/xml]
Javaで実装した場合とのパフォーマンスを検証してみる
timeコマンドよる処理時間の比較
回数 | real | user | sys |
---|---|---|---|
1 | 6m 0.508s | 2m46.480s | 2m29.841s |
2 | 5m56.944s | 2m48.870s | 2m27.128s |
3 | 5m48.787s | 2m49.020s | 2m28.295s |
4 | 6m 1.281s | 2m46.608s | 2m29.689s |
5 | 5m58.266s | 2m47.257s | 2m28.369s |
Javaの場合の処理時間
回数 | real | user | sys |
---|---|---|---|
1 | 3m19.946s | 1m49.552s | 0m20.004s |
2 | 2m34.365s | 1m45.980s | 0m19.789s |
3 | 2m48.140s | 1m46.779s | 0m19.896s |
4 | 2m56.134s | 1m48.744s | 0m19.982s |
5 | 2m36.993s | 1m48.872s | 0m19.816s |
Javaでの実装
com.fancs.adn.StatusCodeCounter.java [java] package com.fancs.adn;
import java.io.IOException; import java.util.Iterator;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class StatusCodeCounter {
public static void main(String[] args) { boolean result = false; try { // ジョブの設定 Configuration conf = new Configuration(); Job job = new Job(conf, "StatusCodeCounter"); job.setJarByClass(StatusCodeCounter.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // map処理を実装したクラスの指定 job.setMapperClass(MapperForStatusCodeCounter.class); // reduce処理を実装したクラスを指定 job.setReducerClass(ReducerForStatusCodeCounter.class); // map処理の入力元を引数から取得して指定する FileInputFormat.addInputPath(job, new Path(args[0])); // reudce処理の出力先を引数から取得して指定する FileOutputFormat.setOutputPath(job, new Path(args[1])); // ジョブの実行を開始する result = job.waitForCompletion(true); } catch(Exception e) { e.printStackTrace(); } finally { System.exit(result ? 0 : 1); } }
public static class MapperForStatusCodeCounter extends Mapper<LongWritable, Text, Text, IntWritable> {
private Text outKey = new Text();
private IntWritable outValue = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// ELBのアクセスログ1行を文字列として取得
String line = value.toString();
if(StringUtils.isEmpty(line)) {
return;
}
line = line.trim();
// スペースを区切り文字として、各項目を配列へ格納
String[] items = line.split("\\s");
// 8項目目のELBのステータスコードをキーとして設定
outKey.set(items[7]);
// key : value = "ステータスコード" : 1 として出力
context.write(outKey, outValue);
}
}
public static class ReducerForStatusCodeCounter extends Reducer<Text, IntWritable, Text, IntWritable> {
Text outKey = new Text();
IntWritable outValue = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
// ステータスコードをキーとしてそのまま設定
outKey.set(key);
int count = 0;
/*
* ステータスコードに対する件数が、[1, 1, 1, ....]の形式で
* 渡されてくるため、件数分イテレートして件数をカウント
*/
Iterator<IntWritable> ite = values.iterator();
while(ite.hasNext()) {
int c = ite.next().get();
count += c;
}
outValue.set(count);
// key : value = "ステータスコード" : "合計"として出力
context.write(outKey, outValue);
}
} } [/java]
パッケージング(jarの生成)
hadoopで実行するため、Javaによる実装をjarファイルへパッケージングします。 上記のJavaのコンパイル及びパッケージングにApache Antを利用しました。
build.xml [xml] <?xml version="1.0" encoding="UTF-8"?> <project name="hadoop-streaming-sample" basedir="." default="package">
<property name="project" value="hadoop-streaming-sample" /> <property name="version" value="0.0.1" /> <property name="src.dir" value="src/main/java" /> <property name="dest.dir" value="target/classes" />
<!-- クラスパスの設定 --> <path id="classpath"> <fileset dir="/usr/lib/hadoop/client"> <include name="*.jar" /> </fileset> <pathelement location="/usr/share/java/commons-lang.jar" /> </path>
<!-- パッケージング(jarの生成) --> <target name="package" depends="compile"> <jar destfile="${project}-${version}.jar" basedir="${dest.dir}" /> </target>
<!-- Javaのコンパイル --> <target name="compile" depends="clean"> <mkdir dir="${dest.dir}" /> <javac srcdir="${src.dir}" destdir="${dest.dir}"> <classpath> <path refid="classpath" /> </classpath> </javac> </target>
<target name="clean"> <delete dir="${dest.dir}" /> <delete file="${project}-${version}.jar" /> </target> </project> [/xml]
antでパッケージングを実行します。
[bash] $ ant Buildfile: build.xml
clean: [delete] Deleting directory /path/to/projects/hadoop-streaming-sample/target/classes [delete] Deleting: /path/to/projects/hadoop-streaming-sample/hadoop-streaming-sample-0.0.1.jar
compile: [mkdir] Created dir: /path/to/projects/hadoop-streaming-sample/target/classes [javac] Compiling 2 source files to /path/to/projects/hadoop-streaming-sample/target/classes [javac] Note: Some input files use or override a deprecated API. [javac] Note: Recompile with -Xlint:deprecation for details.
package: [jar] Building jar: /path/to/projects/hadoop-streaming-sample/hadoop-streaming-sample-0.0.1.jar
BUILD SUCCESSFUL Total time: 1 second [/bash]
実行
[bash] $ hadoop jar hadoop-streaming-sample-0.0.1.jar \ com.fancs.adn.StatusCodeCounter \ s3n://hadoop-streaming/input \ output/elblogs/java [/bash]
Javaで実装した場合は、hadoop jarコマンドにパッケージングしたjarファイルを指定します。 そのあとの引数は、順に、ジョブの設定を記述したmainメソッドをもつクラス名、map処理に入力元、reduce処理の出力先を指定します。
所感
Hadoopというと、なぜかJavaで実装しなくてはというような思い込みがありmap,reduce処理を実装するのはちょっと...面倒という印象がありましたが、Hadoop Streamingを使用すれば、簡単なものであればシェルやコマンドで十分実装できてしまうので、お手軽に分散処理を実装したい場合は、アリだなと思いました。
今回の場合は、思っていた以上に処理時間の差がでてしまったのですが、全てがHadoop Streamingの仕組み上のオーバヘッドなのかどうかはソースコードを調べるなど今後の課題にしたいと思います。 オーバヘッドなしで利用したいのであれば、Javaで、ある程度のオーバヘッドが許容できるのであればHadoop Streamingを選択するなどの考え方ができそうです。