初めまして、弊社で1ヶ月半ほどインターンとして働いているt_sakaiです。
インターンシップの課題として、メモリ使用量が大きくて将来問題になりそうなバッチ処理をスケールアウトできるように書き換えるという課題に取り組みました。 解決手段として流行りのApache Sparkを使ってみたので、本記事ではこれについて書こうと思います。
本記事で使っている言語はScalaです。
Scalaについては、弊社のk_oomoriが書いた記事があるのでよろしければご覧ください。
紹介する内容
前編(本記事)
- 今回解決したかった課題
- なぜSparkを選んだのか
- RDDについて
- 簡単なプログラムをSparkで書き換える
後編
- 本番プログラムをSpark用に書き換える
- はまりどころ
紹介しない内容
- Hadoopとの比較
- Spark環境の構築
- 性能チューニング
解決したかった課題
弊社のScalaで書かれたある毎時のバッチ処理が、80GBほどのメモリを食っていました。消費メモリ量は扱うデータ量の大きさに依存して増加するようだったので、将来的に1台のサーバでは処理しきれなくなる日が来るのは明らかです。
今回の課題は、このバッチ処理は複数台のサーバで分散処理するようなプログラムを書いて、この処理をスケールアウトできるようにすることです。また、毎時行われる処理なので、1時間以内に終わらないといけないという制約もあります。
なぜSparkを選んだのか
分散処理用のソフトウェアはいくつもありますが、だいたい以下がSparkを採用した理由です。
- 処理がオンメモリで行われるため、実行速度が早い
- RDDという大規模データのMap-Reduce処理を行うためのデータ型を用いて、簡単に分散処理を記述できる(後述)
- Scalaで書ける(そもそもSpark自体がScalaで書かれている)
- 最近流行ってる
特に既存のコードをかなり再利用できるという点で、Scalaを使って書けるのが今回はかなり大きな利点でした。
RDDについて
さてここで、Sparkで分散処理を行う上で欠かせない概念、RDD (Resilient Distributed Dataset) について触れたいと思います。
RDDとはSparkライブラリに含まれるクラスです。RDD型として宣言された変数の中身はSparkクラスタ上に分散して保存されます。RDD型はScalaのList型とMap型を合わせたようなもので、様々なメソッド(map, filter, reduce, length...)を持っています。これらのメソッドを用いれば、すべての処理は自動的にSparkクラスタ上で分散処理されるという優れものです。
簡単なMap-Reduce処理をRDDで書いてみましょう。
val sc = new SparkContext(new SparkConf) val input = sc.parallelize(List(1, 2, 3)) // RDD型の変数を作成 val output = input.map(_ + 1) println(output.collect.toList) // => List(2, 3, 4) println(output.reduce(_ + _)) // => 9
このように、分散処理であることを全く意識せずにコードを書くことができます。すごい!
( ^o^)<すごい!List型のMap-Reduceが超簡単だ!
( ˘⊖˘) 。o(ん?でもMap型はどうするんだ?)
そんな疑問を持ったあなたのために、Key/Value Pair RDDというのが用意されています。
以下の処理では、2015年のプロ野球の勝ち数・負け数から順位を計算しています。
val winGameRdd = sc.parallelize(List( ('Giants', 75), // 「値を2つ持ったタプル」のリストを渡す ('Swallows', 76), ('Tigers', 70) ) ) // Key/Value Pair RDD型の変数を作成 val loseGameRdd = sc.parallelize(Map( 'Giants' -> 67, 'Swallows' -> 65, 'Tigers' -> 71 ).toList) // こうやっても作れる // 同じKeyの値でjoin val joinedRdd = winGameRdd join loseGameRdd joinedRdd.collect foreach println // (Giants,(75,67)) // (Swallows,(76,65)) // (Tigers,(70,71)) // 勝ち負けの差(貯金)を計算 val chokinRdd = joinedRdd mapValues { case(win, lose) => win - lose } // 貯金の多い順にソートして表示 chokinRdd.sortBy(_._2, false).collect foreach println // (Swallows,11) // (Giants,8) // (Tigers,-1)
簡単なプログラムをSparkで書き換える
それでは、実際のユースケースに近い(?)プログラムをSpark用に書き換えてみましょう。
今回僕が書き換えたプログラムには、多対多の関係の2種類のIDの組合せを再構成する処理が含まれていました。以下のような処理です。
val input: Map[Int, List[String]] = Map( 1 -> List('a', 'b', 'c'), 2 -> List('a', 'c', 'd'), 3 -> List('c', 'd', 'e') ) // ...処理 val output: Map[String, List[Int]] = Map( 'a' -> List(1, 2), 'b' -> List(1), 'c' -> List(1, 2, 3), 'd' -> List(2, 3), 'e' -> List(3) )
この処理を実現するのは実は非常に簡単で、たった数行でできます。
val output = sc.parallelize(input.toList) .flatMap { case (idA, idBs) => idBs.map(idB => (idB, idA)) } .groupByKey .mapValues(_.toList) .collectAsMap
まとめ
以上のように、SparkのRDDを使うと普通のコレクション操作を行う感じで分散コンピューティングを行うことができます。
ただし、今回僕が取り組んだ課題のように普通のScalaで書かれた既存のバッチ処理を書き換えようとすると、色々なはまりどころがあります。後編では、既存処理をSpark用に書き換える際にどのような問題があり、それをどのように解決するかについて説明します。