Quick Startの続き。"More on Dataset Operations"に沿って遊んでみる。
データは、大昔のとあるサーバのjavaheapのデータを使うことにする。こんなデータ。
> head -n5 javaheap.app01 09:00:11 550792720 1543502336 09:01:10 321361976 1543502336 09:02:15 485179520 1543502336 09:03:10 96284072 1543502336 09:04:11 839722800 1543502336
時刻、ヒープ使用量、ヒープサイズの順。ヒープ使用量がヒープサイズに近づくとヒープ拡張が起きる。ヒープ拡張と、大量のGCが起きるタイミングがサーバのパフォーマンス的にヤバいポイントだったので、こういうデータを取っていた。もう10年前のデータで、(今はよく知らないんだけど)その当時の32bit JDKで最大ヒープサイズ4GBは結構キツかったことを覚えている。
読み込んで、ヒープ使用量の最大値を探してみよう。
まず、ファイルを読み込む。
scala> val heap01 = spark.read.textFile("javaheap.app01") 2018-05-03 22:54:45 WARN ObjectStore:568 - Failed to get database global_temp, returning NoSuchObjectException heap01: org.apache.spark.sql.Dataset[String] = [value: string]
次に、ヒープ使用量の列だけ取り出す。Scalaの文字列は基本的にjava.lang.Stringだけど、いろいろとトレイトは足されているらしい。まだ勉強中でよくわからない。とりあえず、String#splitはjava.lang.Stringのものが使える。それにしても、正規表現を正規表現オブジェクトじゃなくて、文字列で渡すの意味わからない。そして、それを数字に変換する。
scala> heap01.map(line => line.split("\\s+")).map(ary => ary(1).toInt).first 2018-05-03 23:19:37 ERROR Executor:91 - Exception in task 0.0 in stage 2.0 (TID 2) java.lang.NumberFormatException: For input string: "2456209624" at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) at java.lang.Integer.parseInt(Integer.java:583) at java.lang.Integer.parseInt(Integer.java:615) (以下略)
おっと、軽く2^31を上回ってる値があった。
scala> heap01.map(line => line.split("\\s+")).map(ary => ary(1).toLong).first res4: Long = 550792720
最大を求めてみよう。まずは、reduceで。
scala> heap01.map(line => line.split("\\s+")).map(ary => ary(1).toLong).reduce((a, b) => if (a > b) a else b) res5: Long = 3353203544
もちろん、普通はmaxを使うだろう。
scala> heap01.map(line => line.split("\\s+")).map(ary => ary(1).toLong).max <console>:26: error: value max is not a member of org.apache.spark.sql.Dataset[Long] heap01.map(line => line.split("\\s+")).map(ary => ary(1).toLong).max
え、違うの?
Quick Startに書いてあるのはこうだった。
scala> heap01.map(line => line.split("\\s+")).map(ary => ary(1).toLong).reduce((a, b) => Math.max(a, b)) res7: Long = 3353203544
あっているか自信がないので、irbで確かめる。
> irb irb(main):001:0> heap01 = File.readlines("javaheap.app01") => ["09:00:11\t550792720\t1543502336\n", (以下略) irb(main):002:0> heap01.map{|line| line.split(/\s+/).at(1).to_i}.max => 3353203544
整数のサイズを気にしなくて良いのは、Rubyの美点である。そういえば、最近(かな?2.4からですね)、Fixnumなくなったらしい。もちろん、内部的にCのintでおさまる整数値は特別扱いされることは変わらない。
次の例は、flatMapとgroupByKeyを使うもの。flatMapは置いておくことにして、groupByKeyで、各最大ヒープ値の列が何行あるか、数えてみることにする。
scala> heap01.map(line => line.split("\\s+")).groupByKey(ary => ary(2)) res0: org.apache.spark.sql.KeyValueGroupedDataset[String,Array[String]] = KeyValueGroupedDataset: [key: [value: string], value: [value: array<string>]] scala> val maxHeaps = heap01.map(line => line.split("\\s+")).groupByKey(ary => ary(2)).count maxHeaps: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint] scala> maxHeaps.collect res1: Array[(String, Long)] = Array((3162503680,4), (2558523904,1), (1543502336,19), (1816132096,2), (2462054912,14), (2822765056,65), (0,2), (4253022720,1), (3103783424,26), (2101344768,18), (2554329600,1), (4248828416,27), (4244634112,249), (2407528960,2), (2378168832,34), (2864708096,3), (2575301120,2), (3154115072,3), (2483026432,2), (2537552384,2), (2629827072,3), (4299160064,6), (3674208768,2), (2466249216,7), (3124754944,35), (2625632768,21), (2533358080,42), (2793404928,106), (3091200512,64), (2109733376,5), (1795160576,9), (2881485312,3))
groupByKeyすると、戻りの型はKeyValueGroupedDataset[String, Array[String] ] になる。これはなんとなくわからなくもない。直感的にはDataset[Taple[String, Array[String] ] ] が出来れば良いような気がする。.NETのGroupByはそんな感じだった気がする。調べた。VB風に書くと、戻りの型はIEnumerable(Of IGrouping(Of TKey, TElement))である。RubyもEnumerable#group_byの戻りはHashだ。やってみよう。
irb(main):004:0> heap01.map{|line| line.split(/\s+/)}.group_by{|ary| ary[2]} => {"1543502336"=>[["09:00:11", "550792720", "1543502336"], (略)
それに対してcountを実施すると、Mapのキーの数が返ってくることを期待すると思うんだ。
irb(main):005:0> heap01.map{|line| line.split(/\s+/)}.group_by{|ary| ary[2]}.count => 32
しかし、KeyValueGroupedDataset#countはDataset[(K, Long)]を返すことになっている。つまり、キーとそのキーでグルーピングされた要素の数のタプルのDatasetである。うーん、便利なのか。便利かな。よくわからない。