Java8のStreamの目的と書きやすさや可読性、並行処理の効果について

さて、前回Java8のStreamの使い方をざっと見てみたのですけど、はてなブックマークのコメントで「Javaが使われている領域でこんな言語拡張は必要か」「可読性が損なわれていて単なる自己満足ではないか」のようなコメントがついていました。
実際どうなのか考えてみます。

Java8のStreamの目的

では、いまJavaが使われている領域を考えてみましょう。
Javaがいまよく使われているのは、クライアントサイドではなくサーバーサイドです。とくに、直接アクセスをうけつけるサーバーではなく、分散データ処理のためのHadoopやHBase、全文検索エンジンLuceneなど、バックエンド処理を行う製品のシェアが大きいように見えます。
TwitterGoogleでも、Javaで書かれたバックエンドが動いているようです。Facebookも分析系ではJavaを使っているようです。
大手サービスでバックエンドでの事例が多いとはいえ、手元をみると、TomcatGlassFishなど、Javaで書かれたアプリケーションサーバーが動いています。
このようにJavaは、インフラ系のサーバーを記述するためやデータ処理のために多く使われています。


そしていま、サーバーだけでなく携帯端末まで、すべてのコンピュータはマルチコア化してきています。
そのような中で、パフォーマンスのよいプログラムを書くためには、マルチコアを活かした並行処理が書ける必要があります。
つまり、Javaが使われている領域を考えれば、並行処理が書きやすくなる必要があるわけです。


並行処理を記述するためのJava仕様としては、JSR166シリーズがあります。
ここで、必要なインタフェースとして、intとintをうけとってintを返す演算を表すインタフェース、intとlongをうけとってintを返す演算を表すインタフェース、のように、さまざまな型の組み合わせを考えると132種類のインタフェースが必要という話がありました。
Java技術最前線 - Java SE 7徹底理解 第3回 Fork/Join FrameworkからProject Lambdaへ:ITpro


これを実際に使い分けるのは、いかに冗長な記述に慣れたJavaプログラマといえども面倒です。そこで、型推論によって、いちいち必要な型をプログラマが判断しなくてもよくなる必要がありました。
この点でいえば、実はlambda記法というのは飾りで、本命はそこに組み込まれた型推論であるということができます。
もっといえば、並行処理ではないほうのStreamは、単なるおまけ、並行処理ができない場合の記述も統一的に書けるようにしているにすぎないといえます。


もちろん、個人的には並行処理ではないStreamも非常にうれしいです。
前のエントリの反応を見て思ったのですが、Streamとlambda記法によって、Javaプログラマの間でも当たり前に関数型の話題ができるような土壌ができるように感じます。いままでは、関数型の話題ができるようになった人は、Scalaその他、別言語のプログラマになっちゃってました。
他の言語をやらなくても、関数型の話題ができる(可能性がある)というのは、Javaプログラマの能力の底上げになりそうです。実際には「底」はあがらないでしょうから、中間層の能力があがる中上げですかね。

Java8 Streamを使った並行処理の効果

というところで、実際にStreamを使った並行処理の効果を見てみましょう。
まずは、テスト用データとして、アルファベット大文字と数字の20文字からなるランダムな文字列を100万要素ほど作成します。

Random r = new Random(2111);
//List<String> data = intRange(0, 1_000_000)
List<String> data = range(0, 1_000_000)
    .mapToObj(i->
        r.ints().limit(20)
            .map(n -> Math.abs(n) % 36)
            .map(code -> (code < 10) ? '0' + code : 'A' + code - 10)
            .mapToObj(ch -> String.valueOf((char)ch))
            .collect(toStringBuilder())
            .toString())
    .collect(toList());

前のエントリでランダムを出力する無限ストリームを作ってみましたが、標準ライブラリのRandamクラスにも同様の無限ストリームが用意されていますね。


こんな感じで3件ほど表示してみましょう。

data.stream().limit(3).forEach(System.out::println);


こんなデータができてます。

1M3FX39ONOSZR9HEZT1E
E3X2WPI2F0IHUKIOR52L
BSG6JSN37Q1L9VJUBKRO


さて、では各データの数値以外を取り除いて、文字が残ったもののうち、数字の合計が30より小さいものについて、数値の合計を求めるという処理を書いてみます。

public static long streamSum(List<String> data){
    return data.stream()
            .map(d -> d.replaceAll("[^0-9]", ""))//数字以外を取り除く
            .filter(d -> !d.isEmpty())
            .filter(d -> d.chars().map(ch -> ch - '0').sum() < 30)//数字の合計が30より小さい
            .mapToLong(d -> Long.parseLong(d)).sum();
}

結果は「14795382494128」になります。


これを並行処理が行えるように書き換えるには、streamメソッドの呼び出しをparallelStreamメソッドに置き換えるだけです。

public static long parallelSum(List<String> data){
    return data.parallelStream()
            .map(d -> d.replaceAll("[^0-9]", ""))//数字以外を取り除く
            .filter(d -> !d.isEmpty())
            .filter(d -> d.chars().map(ch -> ch - '0').sum() < 30)//数字の合計が30より小さい
            .mapToLong(d -> Long.parseLong(d)).sum();
}


ついでに、Streamを使わず普通に手続き的に書いたものも用意しておきます。

public static long proceduralSum(List<String> data){
    long result = 0;
    for(String d : data){
        String numOnly = d.replaceAll("[^0-9]", "");
        if(numOnly.isEmpty()) continue;
        int total = 0;
        for(char ch : numOnly.toCharArray()){
            total += ch - '0';
        }
        if(total >= 30) continue;
        long value = Long.parseLong(numOnly);
        result += value;
    }
    return result;
}

なんかほっとしますね。


さて、それでは、実行速度を計るためのメソッドを用意します。

public static Long bench(Function<List<String>, Long> proc, List<String> data){
    long start = System.nanoTime();
    proc.apply(data);
    return System.nanoTime() - start;
}


では、先ほどの処理メソッドをリストにしておきます。ついでに対応する名前のリストも作っておきましょう。

List<Function<List<String>, Long>> procs = 
        Arrays.asList(
            Bench::streamSum, Bench::parallelSum, Bench::proceduralSum);
List<String> names = Arrays.asList("Stream Sum", "Parallel Sum", "Procedural Sum");


それではベンチマーク実行です。10回実行して最初の3回は捨てて残りの平均を秒として表示しています。

zip(procs.stream(), names.stream(), 
    (proc, name) -> String.format("-- %s --%n%.3f",
            name,
            range(0, 10)
                .mapToLong(i -> bench(proc, data))
                .substream(3)//最初の3回は切り捨て
                .average().orElse(0) / 1000 / 1000 / 1000)//ナノ秒を秒に
).forEach(System.out::println);

Core i7 2600K 3.4GHz、メモリ16GBのWindows7で、lambda-8-b86で動かすと、こんな感じの結果になりました。

-- Stream Sum --
1.218
-- Parallel Sum --
0.326
-- Procedural Sum --
1.031

4コア8スレッドで、さすがに処理時間1/4とまではいきませんが近い感じに処理時間が削減されています。


また、手続き型で書いた場合も1割以上処理時間が削減しています。やはりStreamでの記述は、細かいメソッド呼び出しに分割されたり、遅延実行などの仕組みなどが入ったりで、オーバーヘッドがそれなりにあるということでしょう。もちろん、まだ開発版なので性能改善はあると思いますが、おそらく傾向としてはリリース版でもこのような感じになるのではないでしょうか。
パフォーマンスが気になるときにはStreamより手続き型で書いたほうがいいけど、それよりparallelStreamにしようぜ、文句あるならがんばって手書きで並列化しようって感じですね。

Streamでの書きやすさと可読性

さて、それでは改めて、今回のStreamでの記述を見てみましょう。

public static long streamSum(List<String> data){
    return data.stream()
            .map(d -> d.replaceAll("[^0-9]", ""))
            .filter(d -> !d.isEmpty())
            .filter(d -> d.chars().map(ch -> ch - '0').sum() < 30)
            .mapToLong(d -> Long.parseLong(d)).sum();
}

「各データの数値以外を取り除いて、文字が残ったもののうち、数字の合計が30より小さいものについて、数値の合計を求める」という、やりたいことがそのまま、それ以外は文字から数値への変換処理という具合になっています。
この点では、非常に書きやすく、また意図が読みやすいコードになっています。


一方で、手続き的な処理のコードです。

public static long proceduralSum(List<String> data){
    long result = 0;
    for(String d : data){
        String numOnly = d.replaceAll("[^0-9]", "");
        if(numOnly.isEmpty()) continue;
        int total = 0;
        for(char ch : numOnly.toCharArray()){
            total += ch - '0';
        }
        if(total >= 30) continue;
        long value = Long.parseLong(numOnly);
        result += value;
    }
    return result;
}

まず、今回のベンチのためにこのコードを書くとき、はっきりいって面倒でした。書きやすいコードだとは思いません。
一方、読みやすさとしてはどうでしょうか。処理の意図が必ずしも読み取りやすいとはいえません。その意味での可読性は低いです。その代わり、どのような処理が行われるかは正確に読み取ることができます。


Streamでの処理では、実際にどのような処理が行われるか、コードから読み取れません。mapやfilterのたびに新たなStreamが生成されているのか、それともsumの時点まで生成されないのか、先ほどのコードからはわかりません。並行実行してないということも、コードだけからはわからず、「streamメソッドでは逐次実行のStreamが生成される」という仕様から推測されるだけです。もうないでしょうが、仕様がかわって並行実行されるかもしれません。
Stream処理では、実行手順は隠されています。その点で、手続き型で書いたほうが実行手順が読み取りやすいです。


そして、Stream処理には実行手順以外にも隠されているものがあります。
それは、処理の中間状態です。


手続き型の処理を見てみると、変数が6つあります。
このうち、変数dと変数chは、拡張forによって要素が割り当てられる変数で、プログラマが値を割り当てる変数ではありません。値を受け取るための変数であり、引数に近いです。
変数numOnlyと変数valueについては、これは実はなくてもよい変数です。
たとえば変数numOnlyには「d.replaceAll("[^0-9]", "")」が割り当てられていますが、変数numOnlyの部分を「d.replaceAll("[^0-9]", "")」に置き換えても、そのまま処理が通ります。変数valueについても同様です。もちろん、可読性や処理効率の違いはありますが、手続きとしての違いはありません。
ここであげた、変数d・ch・numOnly・valueについては、変数の再代入を行っておらず、finalをつけることができます。Scalaでいえばvalですね。


値を受け取るための変数や、可読性・処理効率のための変数は、Streamでの処理でも使っていました。
問題は変数resultと変数totalです。


これは、0で初期化されていますが、かといってすべての変数resultや変数totalを0に置き換えることはできません。
処理の過程で再代入が行われているからです。
つまり、この2つの変数にはfinalをつけることができません。Scalaであればvarで宣言する必要があります。
もっと見れば、変数の再代入では「+=」演算子を使って、それまでの値に加算が行われています。このようにして、処理の途中経過の値をつくっているのです。
つまり、これは処理の中間状態を扱う変数だといえます。


このような変数は、たとえばListを構築する場合にはListの中間状態、文字列を構築する場合には文字列の中間状態を扱うために必要になります。そして、再代入や、オブジェクトの場合は状態変更が行われます。そのときの状態変更も、そこまでの状態を反映したものになります。
こうして、中間状態を管理するかどうかが、手続き型とStreamでの処理との大きな違いだといえます。


そうすると、手続き型とStreamでの書きやすさ・読みやすさの違いも明らかになります。
Streamで処理が書きやすく、それを手続き型に書き直すのが面倒なのは、Streamでの処理では中間状態の管理が不要だったのが、手続き型では改めて中間状態の管理コードを書く必要があるからです。
また、Streamで処理の意図が読みやすいのは、中間状態が隠されているからだといえます。
そして、手続き型で処理が見やすいというのは、中間状態の遷移が明確であるということです。要件の厳しいコードを書く場合には、例外処理など中間状態の細かな制御も必要になりますが、そのような場合には手続き型のほうが処理も書きやすいでしょう。


このように、手続き型とStreamでの処理には、読みやすさ・書きやすさに違いがあります。このような違いを踏まえることは、たとえ手続き型だけで処理を書くとしても大事なことだと思います。
そして、このような話題を、他の言語を借りずにJavaの文脈でできるようになることが、Java 8 Lambda導入の最大の利点だと思います。

まとめ

はよ仕様確定して


ベンチマークのソースはこれです。
※5/17 仕様がかわったので、java.util.stream.Streams.intRangeメソッドの部分をjava.util.stream.IntStream.rangeメソッドに変更しました。

import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.function.Function;
import static java.util.stream.Collectors.*;
import static java.util.stream.Streams.*;
import static java.util.stream.IntStream.*;

public class Bench {
    
    public static void main(String... args){
        //テスト用データ作成
        Random r = new Random(2111);
        //List<String> data = intRange(0, 1_000_000)
        List<String> data = range(0, 1_000_000)
            .mapToObj(i->
                r.ints().limit(20)
                    .map(n -> Math.abs(n) % 36)
                    .map(code -> (code < 10) ? '0' + code : 'A' + code - 10)
                    .mapToObj(ch -> String.valueOf((char)ch))
                    .collect(toStringBuilder())
                    .toString())
            .collect(toList());
        data.stream().limit(3).forEach(System.out::println);
        //1M3FX39ONOSZR9HEZT1E
        //E3X2WPI2F0IHUKIOR52L
        //BSG6JSN37Q1L9VJUBKRO

        //処理と名前を設定
        List<Function<List<String>, Long>> procs = 
                Arrays.asList(
                    Bench::streamSum, Bench::parallelSum, Bench::proceduralSum);
        List<String> names = Arrays.asList("Stream Sum", "Parallel Sum", "Procedurarl Sum");

        //処理結果を確認
        procs.stream()
                .map(proc -> proc.apply(data))
                .forEach(System.out::println);
        //14795382494128
        
        //何度かまわしておく
        //intRange(0, 3).forEach(i -> {
        range(0, 3).forEach(i -> {
            procs.forEach(proc -> proc.apply(data));
        });
        //ベンチ実行
        zip(procs.stream(), names.stream(), 
            (proc, name) -> String.format("-- %s --%n%.3f",
                    name,
                    //intRange(0, 10)
                    range(0, 10)
                        .mapToLong(i -> bench(proc, data))
                        .substream(3)//最初の3回は切り捨て
                        .average().orElse(0) / 1000 / 1000 / 1000)//ナノ秒を秒に
        ).forEach(System.out::println);
    }
    
    public static Long bench(Function<List<String>, Long> proc, List<String> data){
        long start = System.nanoTime();
        proc.apply(data);
        return System.nanoTime() - start;
    }
    
    public static long streamSum(List<String> data){
        return data.stream()
                .map(d -> d.replaceAll("[^0-9]", ""))//数字以外を取り除く
                .filter(d -> !d.isEmpty())
                .filter(d -> d.chars().map(ch -> ch - '0').sum() < 30)//数字の合計が30より小さい
                .mapToLong(d -> Long.parseLong(d)).sum();
    }

    public static long parallelSum(List<String> data){
        return data.parallelStream()
                .map(d -> d.replaceAll("[^0-9]", ""))//数字以外を取り除く
                .filter(d -> !d.isEmpty())
                .filter(d -> d.chars().map(ch -> ch - '0').sum() < 30)//数字の合計が30より小さい
                .mapToLong(d -> Long.parseLong(d)).sum();
    }

    public static long proceduralSum(List<String> data){
        long result = 0;
        for(final String d : data){
            String numOnly = d.replaceAll("[^0-9]", "");
            if(numOnly.isEmpty()) continue;
            int total = 0;
            for(char ch : numOnly.toCharArray()){
                total += ch - '0';
            }
            if(total >= 30) continue;
            long value = Long.parseLong(numOnly);
            result += value;
        }
        return result;
    }
}