FANCOMI Ad-Tech Blog

株式会社ファンコミュニケーションズ nend・新規事業のエンジニア・技術ブログ

MongoDBとHiveでごにょごにょ事始め

こんにちは、s_mamedaifukuです。 先頭の ”s" は「塩」の ”s" です。

MongoDBとHiveの連携を試みている今日この頃です。

各ソフトウェアのバージョンは MongoDB : 2.6.1 Hadoop(EMR) : 1.0.3 Hive : 0.11.0.1 です。

HiveからMongoDBのデータを操作する

「MongoDB Java Driver 」と「MongoDB Connector for Hadoop」を使うことで、MongoDBをHadoopの入出力先とすることが出来ます。

今回は、mongo-java-driver-2.12.3.jar、mongo-hadoop-core-1.3.0.jarの2つ、そしてHiveを使用するためmongo-hadoop-hive-1.3.0.jarを加えた3つのJarファイルを予めダウンロードしS3へ置いておきます。

EMRを使う場合は、bootstrapスクリプトで3つのJarファイルを/home/haodoop/libへコピーします。 bootstrap.sh

[shell]

!/bin/sh

for JAR in mongo-java-driver-2.12.3.jar mongo-hadoop-core-1.3.0.jar mongo-hadoop-hive-1.3.0.jar do if [ ! -f /home/hadoop/lib/$JAR ]; then hadoop fs -copyToLocal s3://<bukect_name>/lib/$JAR /home/hadoop/lib/ fi done [/shell]

bootstrapスクリプトを指定して、EMRクラスタ立ち上げます。ここではインタラクティブモードで立ち上げています。

[shell] $ elastic-mapreduce \ --create \ --name mongodb_test \ --num-instances <num_instance> \ --instance-type <instance_type> \ --bootstrap-action s3://<bucket_name>/emr/script/bootstrap.sh \ --hive-versions 0.11.0.1 \ --hadoop-version 1.0.3 \ --hive-interactive \ --alive [/shell]

MongoDBのコレクションに対応するテーブルをHive上に作成します。

[shell] hive > CREATE EXTERNAL TABLE users ( > h_name STRING, > h_number INT, > h_created TIMESTAMP > ) > STORED BY 'com.mongodb.hadoop.hive.MongoStorageHandler' > WITH SERDEPROPERTIES('mongo.columns.mapping'='{"h_name":"name","h_number":"number","h_created":"created"}') > TBLPROPERTIES('mongo.uri'='mongodb://<mongodb_server>:27017/test.users'); [/shell]

カラム名に「h_」とプレフィックスを付けていますが、MongoDBのコレクションのフィールド名と揃えてしまっても問題ありません。 その場合は「 WITH SERDEPROPERTIES」の部分は必要なくなります。

ここでMongoDBにテストデータを保存してみると、

[shell] $ mongo --quiet test << EOS > for (var i = 1; i<= 1000 ; i++) { > var user = { > name : "name" + i , > number : i , > created : new Date() > }; > > db.users.save(user); > } > EOS [/shell]

Hive上からも確認できるようになりました。

[sql] hive> SELECT * FROM users; OK name1 1 2014-07-28 10:18:19.764 name2 2 2014-07-28 10:18:19.771 name3 3 2014-07-28 10:18:19.772 name4 4 2014-07-28 10:18:19.773 name5 5 2014-07-28 10:18:19.773 ... [/sql]

MongoDBのISODate型とHiveのTIMESTAMP型が対応するようになっています。 ログやHive上で時刻をJSTで扱っている場合などは注意が必要です。

以上でMongoDBのデータをHiveから操作できるようになりました。

S3上のログデータをMongoDBへ保存する

実運用ではfluent-plugin-mongoを使い、MongoDBへログデータを流し込んでいますが、 fluent-plugin-s3でS3に蓄積しておいたログにHiveで処理を加え、MongoDBへ保存するというのもありがちなパターンです。

元になるログデータはこんな感じです

[shell] 2014-07-25T18:00:00+09:00 data.nend.conversion {"name":"name1","number":1,"address":"address1","age":10,"created":"2014-07-25 18:00:00"} 2014-07-25T18:00:10+09:00 data.nend.conversion {"name":"name2","number":2,"address":"address2","age":20,"created":"2014-07-25 18:00:10"} 2014-07-25T18:00:20+09:00 data.nend.conversion {"name":"name3","number":3,"address":"address3","age":30,"created":"2014-07-25 18:00:20"} 2014-07-25T18:00:30+09:00 data.nend.conversion {"name":"name4","number":4,"address":"address4","age":40,"created":"2014-07-25 18:00:30"} 2014-07-25T18:00:40+09:00 data.nend.conversion {"name":"name5","number":5,"address":"address5","age":50,"created":"2014-07-25 18:00:40"} ... [/shell]

S3上のログをHiveから読み込むために外部テーブルを作成します。 1時間毎にパスが分かれていることを想定して、テーブルでも1時間毎にパーティションを作ってみます。

[sql] CREATE EXTERNAL TABLE IF NOT EXISTS user_logs ( time STRING, tag STRING, record STRING ) PARTITIONED BY ( dt STRING ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'; [/sql]

処理対象時間のパーティションを作成すると、Hiveからログを認識できるようになります。

[sql] ALTER TABLE user_logs ADD IF NOT EXISTS PARTITION ( dt = "2014072518" ) LOCATION "s3://<bucket_name>/log/user/2014/07/25/18"; [/sql]

実際はHive上でもっとごにょごにょ感のある処理をしそうなものですが、 ここでは簡単に、ログデータのJSONを読み込んで必要なデータをusersテーブルにINSERTします。 MongoDBと関連付けられたHiveのテーブルに対してはINSERT OVERWRITEをしてもINSERT INTOの挙動となるようです。

[sql] INSERT INTO TABLE users SELECT j.name, j.number, j.created FROM user_logs LATERAL VIEW json_tuple( user_logs.record, 'name', 'number', 'created' ) j AS name, number, created WHERE dt = "2014072518"; [/sql]

実行後、MongoDBからデータの存在を確認できます。

[shell] mongo> db.users.find() { "id" : ObjectId("53d62f9ce4b047e6d515d5fe"), "name" : "name1", "number" : 1, "created" : ISODate("2014-07-25T09:00:00Z") } { "id" : ObjectId("53d62f9ce4b047e6d515d5ff"), "name" : "name2", "number" : 2, "created" : ISODate("2014-07-25T09:00:10Z") } { "id" : ObjectId("53d62f9ce4b047e6d515d600"), "name" : "name3", "number" : 3, "created" : ISODate("2014-07-25T09:00:20Z") } { "id" : ObjectId("53d62f9ce4b047e6d515d601"), "name" : "name4", "number" : 4, "created" : ISODate("2014-07-25T09:00:30Z") } { "_id" : ObjectId("53d62f9ce4b047e6d515d602"), "name" : "name5", "number" : 5, "created" : ISODate("2014-07-25T09:00:40Z") } ... [/shell]

データ量を増やしていった時のパフォーマンスや MongoDBに作成してあるインデックスのHive上での振る舞いなど 興味深い点はまだまだたくさんありますね。今後の研究課題としたいと思います。