FANCOMI Ad-Tech Blog

株式会社ファンコミュニケーションズ nend・新規事業のエンジニア・技術ブログ

Hadoop Streamingで、MapReduceをPHPで実装する

はじめに

こんにちは、t_oookaです。 今回は、Hadoop Streamingを使用して、PHPでmap,reduce処理を実装してみます。

Hadoop Streamingとは、hadoopが提供するユーティリティで、当該ユーティリティを使用することでJava以外の任意の言語やコマンド(標準入力、標準出力を扱える)などでmap,reduce処理を記述することができるようになります。

環境

構成

完全分散モードで、管理ノード(hadoop-master)1台、計算ノード(hadoop-slave)1台及びクライアント(hadoop-client)1台の合計3台の構成です。

なお、ジョブの投入などはクライアント(hadoop-client)から実行することとします。

Hadoop Streamingの実行方法

[bash] $ hadoop \ jar /path/to/hadoop-streaming.jar \ -files /path/to/sample_mapper.php,/path/to/sample_reducer.php \ -input /path/to/input_dir \ -output /path/to/output_dir \ -mapper mapper_script_name \ -reducer reducer_script_name [/bash]

Hadoop Streamingを利用する場合は、hadoop jarコマンドに「hadoop-streaming.jar」を指定します。

その他のオプションについては、以下の通りです。

オプション内容
fileshadoopクラスタへコピーするファイルを指定します。複数指定する場合は、「,」で区切ります
input解析対象のファイル名または、ディレクトリ名を指定します
output解析結果を出力するディレクトリ名を指定します
mappermap処理を実装したスクリプトをなどを指定します
reducerreduce処理を実装したスクリプトなどを指定します

PHPによるMapReduceの実装

今回は、AWSのELBのアクセスログからELBのステータスコード毎の件数を集計する処理を実装して動作を確認します。

ELBのアクセスログは、1行がスペースを区切り文字として13項目で構成されます。 ※ 詳細はElastic Load Balancing 開発者ガイド アクセスログをご参照ください。

map処理の実装

map処理では、hadoopの仕組みにより標準入力へELBのアクセスログが行単位で渡されてくるため、1行ごとに標準入力より読み取り、"ステータスコード"<TAB>"件数"を出力するようにしています。 ※ 今回の場合は、アクセスログ1行につき1件のため"件数"は1固定にしています。

sample_mapper.php [php]

!/usr/bin/php

<?php while( ! feof(STDIN) ) { // ELBのアクセスログ1行分を標準入力から読込 $line = trim(fgets(STDIN)); if($line == '') continue;

// スペースを区切り文字として、項目を分割して配列へ格納 $values = explode(" ", $line); // 8項目目の「elb_status_code」をステータスコードとして"ステータスコード"<TAB>"1"として出力 printf("%s\t1\n", $values[7]); } ?> [/php]

reduce処理の実装

reduce処理では、map処理での出力が"ステータスコード"でソートされた状態で、標準入力に渡されてきます。 今回はステータスコード毎の件数を取得したいため、単純に件数分を合計する実装としています。

sample_reducer.php [php]

!/usr/bin/php

<?php

$current_code = null; $total = 0;

while ( ! feof(STDIN) ) { // map処理の出力 "ステータスコード"<TAB>"1" $line = trim(fgets(STDIN)); if($line == '') continue;

list($code, $count) = explode("\t", $line); if ($current_code != null && $current_code != $code) { printf("%s\t%d\n", $current_code, $total); $current_code = $code; $total = $count; } else { $current_code = $code; $total = $total + $count; } } printf("%s\t%d\n", $current_code, $total); ?> [/php]

hadoop-streamingの動作イメージは第28回 RubyとHadoopで分散処理 Hadoop Streamingの仕組みに、わかりやすい解説がありますので、こちらをご参照ください。

動作確認

Hadoop Streamingで、MapReduce処理を実行してみます。

S3上に保存されたELBのアクセスログ(100ファイル、8,014,771行)を解析し、結果をHDFS上へ出力します。

[bash] $ hadoop \ jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \ -files /tmp/elblogs/sample_mapper.php,/tmp/elblogs/sample_reducer.php \ -input s3n://hadoop-streaming/input \ -output output/elblogs/php \ -mapper sample_mapper.php \ -reducer sample_reducer.php [/bash]

解析結果を確認します。

[bash] $ hadoop fs -ls 'output/elblogs/php/' Found 2 items -rw-r--r-- 3 hdfs hadoop 0 2014-09-25 08:35 output/elblogs/php/_SUCCESS -rw-r--r-- 3 hdfs hadoop 44 2014-09-25 08:35 output/elblogs/php/part-00000 $ sudo -u hdfs hadoop fs -cat 'output/elblogs/php/part-*' 200 7998596 302 15629 400 4 403 291 404 251 [/bash]

おまけ

hadoopからAmazon S3を利用する

hadoopで使用するファイルシステムHDFSに限定されず、Amazon S3も利用できるようになっています。 S3を利用するために、以下の設定を追加しています。

/etc/hadoop/conf/core-site.xml [xml] <configuration> ...(省略)... <property> <name>fs.s3n.awsAccessKeyId</name> <value>${s3のアクセスキーID}</value> </property> <property> <name>fs.s3n.awsSecretAccessKey</name> <value>${s3のシークレットキー}</value> </property> </configuration> [/xml]

Javaで実装した場合とのパフォーマンスを検証してみる

timeコマンドよる処理時間の比較

PHP(Hadoop Streaming)の場合の処理時間

回数realusersys
16m 0.508s2m46.480s2m29.841s
25m56.944s2m48.870s2m27.128s
35m48.787s2m49.020s2m28.295s
46m 1.281s2m46.608s2m29.689s
55m58.266s2m47.257s2m28.369s

Javaの場合の処理時間

回数realusersys
13m19.946s1m49.552s0m20.004s
22m34.365s1m45.980s0m19.789s
32m48.140s1m46.779s0m19.896s
42m56.134s1m48.744s0m19.982s
52m36.993s1m48.872s0m19.816s

Javaでの実装

com.fancs.adn.StatusCodeCounter.java [java] package com.fancs.adn;

import java.io.IOException; import java.util.Iterator;

import org.apache.commons.lang.StringUtils;

import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class StatusCodeCounter {

public static void main(String[] args) {   boolean result = false; try { // ジョブの設定 Configuration conf = new Configuration(); Job job = new Job(conf, "StatusCodeCounter"); job.setJarByClass(StatusCodeCounter.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // map処理を実装したクラスの指定 job.setMapperClass(MapperForStatusCodeCounter.class); // reduce処理を実装したクラスを指定 job.setReducerClass(ReducerForStatusCodeCounter.class); // map処理の入力元を引数から取得して指定する FileInputFormat.addInputPath(job, new Path(args[0])); // reudce処理の出力先を引数から取得して指定する FileOutputFormat.setOutputPath(job, new Path(args[1])); // ジョブの実行を開始する result = job.waitForCompletion(true); } catch(Exception e) { e.printStackTrace(); } finally { System.exit(result ? 0 : 1); } }

public static class MapperForStatusCodeCounter extends Mapper<LongWritable, Text, Text, IntWritable> {

private Text outKey = new Text();
private IntWritable outValue = new IntWritable(1);

@Override
protected void map(LongWritable key, Text value, Context context)
  throws IOException, InterruptedException {
  // ELBのアクセスログ1行を文字列として取得
  String line = value.toString();
  if(StringUtils.isEmpty(line)) {
    return;
  }
  line = line.trim();
  // スペースを区切り文字として、各項目を配列へ格納
  String[] items = line.split(&quot;\\s&quot;);
  // 8項目目のELBのステータスコードをキーとして設定
  outKey.set(items[7]);
  // key : value = &quot;ステータスコード&quot; : 1 として出力
  context.write(outKey, outValue);
}

}

public static class ReducerForStatusCodeCounter extends Reducer<Text, IntWritable, Text, IntWritable> {

Text outKey = new Text();
IntWritable outValue = new IntWritable();

@Override
protected void reduce(Text key, Iterable&lt;IntWritable&gt; values, Context context)
  throws IOException, InterruptedException {

  // ステータスコードをキーとしてそのまま設定
  outKey.set(key);
  int count = 0;

  /*
   * ステータスコードに対する件数が、[1, 1, 1, ....]の形式で
   * 渡されてくるため、件数分イテレートして件数をカウント
   */
  Iterator&lt;IntWritable&gt; ite = values.iterator();
  while(ite.hasNext()) {
    int c = ite.next().get();
    count += c;
  }

  outValue.set(count);
  // key : value = &quot;ステータスコード&quot; : &quot;合計&quot;として出力
  context.write(outKey, outValue);
}

} } [/java]

パッケージング(jarの生成)

hadoopで実行するため、Javaによる実装をjarファイルへパッケージングします。 上記のJavaコンパイル及びパッケージングにApache Antを利用しました。

build.xml [xml] <?xml version="1.0" encoding="UTF-8"?> <project name="hadoop-streaming-sample" basedir="." default="package">

<property name="project" value="hadoop-streaming-sample" /> <property name="version" value="0.0.1" /> <property name="src.dir" value="src/main/java" /> <property name="dest.dir" value="target/classes" />

<!-- クラスパスの設定 --> <path id="classpath"> <fileset dir="/usr/lib/hadoop/client"> <include name="*.jar" /> </fileset> <pathelement location="/usr/share/java/commons-lang.jar" /> </path>

<!-- パッケージング(jarの生成) --> <target name="package" depends="compile"> <jar destfile="${project}-${version}.jar" basedir="${dest.dir}" /> </target>

<!-- Javaコンパイル --> <target name="compile" depends="clean"> <mkdir dir="${dest.dir}" /> <javac srcdir="${src.dir}" destdir="${dest.dir}"> <classpath> <path refid="classpath" /> </classpath> </javac> </target>

<target name="clean"> <delete dir="${dest.dir}" /> <delete file="${project}-${version}.jar" /> </target> </project> [/xml]

antでパッケージングを実行します。

[bash] $ ant Buildfile: build.xml

clean: [delete] Deleting directory /path/to/projects/hadoop-streaming-sample/target/classes [delete] Deleting: /path/to/projects/hadoop-streaming-sample/hadoop-streaming-sample-0.0.1.jar

compile: [mkdir] Created dir: /path/to/projects/hadoop-streaming-sample/target/classes [javac] Compiling 2 source files to /path/to/projects/hadoop-streaming-sample/target/classes [javac] Note: Some input files use or override a deprecated API. [javac] Note: Recompile with -Xlint:deprecation for details.

package: [jar] Building jar: /path/to/projects/hadoop-streaming-sample/hadoop-streaming-sample-0.0.1.jar

BUILD SUCCESSFUL Total time: 1 second [/bash]

実行

[bash] $ hadoop jar hadoop-streaming-sample-0.0.1.jar \ com.fancs.adn.StatusCodeCounter \ s3n://hadoop-streaming/input \ output/elblogs/java [/bash]

Javaで実装した場合は、hadoop jarコマンドにパッケージングしたjarファイルを指定します。 そのあとの引数は、順に、ジョブの設定を記述したmainメソッドをもつクラス名、map処理に入力元、reduce処理の出力先を指定します。

所感

Hadoopというと、なぜかJavaで実装しなくてはというような思い込みがありmap,reduce処理を実装するのはちょっと...面倒という印象がありましたが、Hadoop Streamingを使用すれば、簡単なものであればシェルやコマンドで十分実装できてしまうので、お手軽に分散処理を実装したい場合は、アリだなと思いました。

今回の場合は、思っていた以上に処理時間の差がでてしまったのですが、全てがHadoop Streamingの仕組み上のオーバヘッドなのかどうかはソースコードを調べるなど今後の課題にしたいと思います。 オーバヘッドなしで利用したいのであれば、Javaで、ある程度のオーバヘッドが許容できるのであればHadoop Streamingを選択するなどの考え方ができそうです。

参考