Tambourine作業メモ

主にスキル習得のためにやった作業のメモ。他人には基本的に無用のものです。

Apache Spark で遊びたい(4)

Quick Startの続き。"More on Dataset Operations"に沿って遊んでみる。

データは、大昔のとあるサーバのjavaheapのデータを使うことにする。こんなデータ。

> head -n5 javaheap.app01 
09:00:11	550792720	1543502336
09:01:10	321361976	1543502336
09:02:15	485179520	1543502336
09:03:10	96284072	1543502336
09:04:11	839722800	1543502336

時刻、ヒープ使用量、ヒープサイズの順。ヒープ使用量がヒープサイズに近づくとヒープ拡張が起きる。ヒープ拡張と、大量のGCが起きるタイミングがサーバのパフォーマンス的にヤバいポイントだったので、こういうデータを取っていた。もう10年前のデータで、(今はよく知らないんだけど)その当時の32bit JDKで最大ヒープサイズ4GBは結構キツかったことを覚えている。

読み込んで、ヒープ使用量の最大値を探してみよう。

まず、ファイルを読み込む。

scala> val heap01 = spark.read.textFile("javaheap.app01")
2018-05-03 22:54:45 WARN  ObjectStore:568 - Failed to get database global_temp, returning NoSuchObjectException
heap01: org.apache.spark.sql.Dataset[String] = [value: string]

次に、ヒープ使用量の列だけ取り出す。Scalaの文字列は基本的にjava.lang.Stringだけど、いろいろとトレイトは足されているらしい。まだ勉強中でよくわからない。とりあえず、String#splitはjava.lang.Stringのものが使える。それにしても、正規表現正規表現オブジェクトじゃなくて、文字列で渡すの意味わからない。そして、それを数字に変換する。

scala> heap01.map(line => line.split("\\s+")).map(ary => ary(1).toInt).first
2018-05-03 23:19:37 ERROR Executor:91 - Exception in task 0.0 in stage 2.0 (TID 2)
java.lang.NumberFormatException: For input string: "2456209624"
	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
	at java.lang.Integer.parseInt(Integer.java:583)
	at java.lang.Integer.parseInt(Integer.java:615)
(以下略)

おっと、軽く2^31を上回ってる値があった。

scala> heap01.map(line => line.split("\\s+")).map(ary => ary(1).toLong).first
res4: Long = 550792720

最大を求めてみよう。まずは、reduceで。

scala> heap01.map(line => line.split("\\s+")).map(ary => ary(1).toLong).reduce((a, b) => if (a > b) a else b)
res5: Long = 3353203544

もちろん、普通はmaxを使うだろう。

scala> heap01.map(line => line.split("\\s+")).map(ary => ary(1).toLong).max
<console>:26: error: value max is not a member of org.apache.spark.sql.Dataset[Long]
       heap01.map(line => line.split("\\s+")).map(ary => ary(1).toLong).max

え、違うの?

Quick Startに書いてあるのはこうだった。

scala> heap01.map(line => line.split("\\s+")).map(ary => ary(1).toLong).reduce((a, b) => Math.max(a, b))
res7: Long = 3353203544

あっているか自信がないので、irbで確かめる。

> irb
irb(main):001:0> heap01 = File.readlines("javaheap.app01")
=> ["09:00:11\t550792720\t1543502336\n", (以下略)
irb(main):002:0> heap01.map{|line| line.split(/\s+/).at(1).to_i}.max
=> 3353203544

整数のサイズを気にしなくて良いのは、Rubyの美点である。そういえば、最近(かな?2.4からですね)、Fixnumなくなったらしい。もちろん、内部的にCのintでおさまる整数値は特別扱いされることは変わらない。

次の例は、flatMapとgroupByKeyを使うもの。flatMapは置いておくことにして、groupByKeyで、各最大ヒープ値の列が何行あるか、数えてみることにする。

scala> heap01.map(line => line.split("\\s+")).groupByKey(ary => ary(2))
res0: org.apache.spark.sql.KeyValueGroupedDataset[String,Array[String]] = KeyValueGroupedDataset: [key: [value: string], value: [value: array<string>]]

scala> val maxHeaps = heap01.map(line => line.split("\\s+")).groupByKey(ary => ary(2)).count
maxHeaps: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]

scala> maxHeaps.collect
res1: Array[(String, Long)] = Array((3162503680,4), (2558523904,1), (1543502336,19), (1816132096,2), (2462054912,14), (2822765056,65), (0,2), (4253022720,1), (3103783424,26), (2101344768,18), (2554329600,1), (4248828416,27), (4244634112,249), (2407528960,2), (2378168832,34), (2864708096,3), (2575301120,2), (3154115072,3), (2483026432,2), (2537552384,2), (2629827072,3), (4299160064,6), (3674208768,2), (2466249216,7), (3124754944,35), (2625632768,21), (2533358080,42), (2793404928,106), (3091200512,64), (2109733376,5), (1795160576,9), (2881485312,3))

groupByKeyすると、戻りの型はKeyValueGroupedDataset[String, Array[String] ] になる。これはなんとなくわからなくもない。直感的にはDataset[Taple[String, Array[String] ] ] が出来れば良いような気がする。.NETのGroupByはそんな感じだった気がする。調べた。VB風に書くと、戻りの型はIEnumerable(Of IGrouping(Of TKey, TElement))である。RubyもEnumerable#group_byの戻りはHashだ。やってみよう。

irb(main):004:0> heap01.map{|line| line.split(/\s+/)}.group_by{|ary| ary[2]}
=> {"1543502336"=>[["09:00:11", "550792720", "1543502336"], (略)

それに対してcountを実施すると、Mapのキーの数が返ってくることを期待すると思うんだ。

irb(main):005:0> heap01.map{|line| line.split(/\s+/)}.group_by{|ary| ary[2]}.count
=> 32

しかし、KeyValueGroupedDataset#countはDataset[(K, Long)]を返すことになっている。つまり、キーとそのキーでグルーピングされた要素の数のタプルのDatasetである。うーん、便利なのか。便利かな。よくわからない。