FANCOMI Ad-Tech Blog

株式会社ファンコミュニケーションズ nend・新規事業のエンジニア・技術ブログ

【前編】Apache Sparkを使って、メモリ使用量が大きいバッチ処理をスケールアウト

初めまして、弊社で1ヶ月半ほどインターンとして働いているt_sakaiです。

インターンシップの課題として、メモリ使用量が大きくて将来問題になりそうなバッチ処理をスケールアウトできるように書き換えるという課題に取り組みました。 解決手段として流行りのApache Sparkを使ってみたので、本記事ではこれについて書こうと思います。

本記事で使っている言語はScalaです。

Spark + Scala

Scalaについては、弊社のk_oomoriが書いた記事があるのでよろしければご覧ください。

fancs.hatenablog.com

紹介する内容

前編(本記事)

  • 今回解決したかった課題
  • なぜSparkを選んだのか
  • RDDについて
  • 簡単なプログラムをSparkで書き換える

後編

  • 本番プログラムをSpark用に書き換える
  • はまりどころ

紹介しない内容

  • Hadoopとの比較
  • Spark環境の構築
  • 性能チューニング

解決したかった課題

弊社のScalaで書かれたある毎時のバッチ処理が、80GBほどのメモリを食っていました。消費メモリ量は扱うデータ量の大きさに依存して増加するようだったので、将来的に1台のサーバでは処理しきれなくなる日が来るのは明らかです。

今回の課題は、このバッチ処理複数台のサーバで分散処理するようなプログラムを書いて、この処理をスケールアウトできるようにすることです。また、毎時行われる処理なので、1時間以内に終わらないといけないという制約もあります。

なぜSparkを選んだのか

分散処理用のソフトウェアはいくつもありますが、だいたい以下がSparkを採用した理由です。

  • 処理がオンメモリで行われるため、実行速度が早い
  • RDDという大規模データのMap-Reduce処理を行うためのデータ型を用いて、簡単に分散処理を記述できる(後述)
  • Scalaで書ける(そもそもSpark自体がScalaで書かれている)
  • 最近流行ってる

特に既存のコードをかなり再利用できるという点で、Scalaを使って書けるのが今回はかなり大きな利点でした。

RDDについて

さてここで、Sparkで分散処理を行う上で欠かせない概念、RDD (Resilient Distributed Dataset) について触れたいと思います。

RDDとはSparkライブラリに含まれるクラスです。RDD型として宣言された変数の中身はSparkクラスタ上に分散して保存されます。RDD型はScalaのList型とMap型を合わせたようなもので、様々なメソッド(map, filter, reduce, length...)を持っています。これらのメソッドを用いれば、すべての処理は自動的にSparkクラスタ上で分散処理されるという優れものです。

簡単なMap-Reduce処理をRDDで書いてみましょう。

val sc = new SparkContext(new SparkConf)

val input = sc.parallelize(List(1, 2, 3)) // RDD型の変数を作成
val output = input.map(_ + 1)
println(output.collect.toList) // => List(2, 3, 4)
println(output.reduce(_ + _)) // => 9

このように、分散処理であることを全く意識せずにコードを書くことができます。すごい!

( ^o^)<すごい!List型のMap-Reduceが超簡単だ!

( ˘⊖˘) 。o(ん?でもMap型はどうするんだ?)

そんな疑問を持ったあなたのために、Key/Value Pair RDDというのが用意されています。

以下の処理では、2015年のプロ野球の勝ち数・負け数から順位を計算しています。

val winGameRdd = sc.parallelize(List(
    ('Giants', 75), // 「値を2つ持ったタプル」のリストを渡す
    ('Swallows', 76),
    ('Tigers', 70) )
) // Key/Value Pair RDD型の変数を作成

val loseGameRdd = sc.parallelize(Map(
    'Giants' -> 67,
    'Swallows' -> 65,
    'Tigers' -> 71
).toList) // こうやっても作れる

// 同じKeyの値でjoin
val joinedRdd = winGameRdd join loseGameRdd

joinedRdd.collect foreach println
// (Giants,(75,67))
// (Swallows,(76,65))
// (Tigers,(70,71))

// 勝ち負けの差(貯金)を計算
val chokinRdd = joinedRdd mapValues {
    case(win, lose) => win - lose
}

// 貯金の多い順にソートして表示
chokinRdd.sortBy(_._2, false).collect foreach println
// (Swallows,11)
// (Giants,8)
// (Tigers,-1)

簡単なプログラムをSparkで書き換える

それでは、実際のユースケースに近い(?)プログラムをSpark用に書き換えてみましょう。

今回僕が書き換えたプログラムには、多対多の関係の2種類のIDの組合せを再構成する処理が含まれていました。以下のような処理です。

val input: Map[Int, List[String]] = Map(
    1 -> List('a', 'b', 'c'),
    2 -> List('a', 'c', 'd'),
    3 -> List('c', 'd', 'e')
)

// ...処理

val output: Map[String, List[Int]] = Map(
    'a' -> List(1, 2),
    'b' -> List(1),
    'c' -> List(1, 2, 3),
    'd' -> List(2, 3),
    'e' -> List(3)
)

この処理を実現するのは実は非常に簡単で、たった数行でできます。

val output = sc.parallelize(input.toList)
    .flatMap { case (idA, idBs) => idBs.map(idB => (idB, idA)) }
    .groupByKey
    .mapValues(_.toList)
    .collectAsMap

まとめ

以上のように、SparkのRDDを使うと普通のコレクション操作を行う感じで分散コンピューティングを行うことができます。

ただし、今回僕が取り組んだ課題のように普通のScalaで書かれた既存のバッチ処理を書き換えようとすると、色々なはまりどころがあります。後編では、既存処理をSpark用に書き換える際にどのような問題があり、それをどのように解決するかについて説明します。

fancs.hatenablog.com