Java 7の並列ライブラリをためしてみる

と言っても1メソッド呼び出してるだけですけど。

ソースは最後においていますが、整数のリストの合計を取得してます。
次のようなCallableインタフェースを実装したクラスを用意して

SummaryTask implements Callable<Integer>


そのクラスのリストを用意して

List<SummaryTask> summaryTasks = new ArrayList<>();


一気に実行開始

List<Future<Integer>> results = new ForkJoinPool().invokeAll(summaryTasks);


最後に結果を集計という感じです。

for(Future<Integer> future : results){
    result += future.get();
}


実行結果としては、ぼくのマシンはデュアルコアなので、だいたい2倍の速度になってます。

processors:2
24771919
24771919
seq:15681 para:7564 diff:2.073109


なんか、2倍以上に速くなってるんですけど、テレビとかブラウザとか他のタスクに邪魔されにくくなったってことでしょうかね。
今回データ数は50万件でやってますが、10万件だと1.3倍程度にしか速くならなくて、これはデータが少ないとスレッド起動のオーバーヘッドがバカにならなくなるということじゃないかと思います。


ところで、50万回のループには次のように指定して桁数を見やすくしています。

for(int i = 0; i < 500_000; ++i){


これはJava7で拡張された構文を使っているわけですが、他にも次のように菱形指定でGenericsの型指定を省略したり

List<Integer> data = new ArrayList<>();


次のようにして複数の例外を取得したりしてます。

catch(InterruptedException | ExecutionException ex){


他の構文拡張としては、switchで文字列が使えるようになったとか、自動リソース管理とかあるわけですが、やっぱどうも地味でJava 5の構文拡張でのGenericsとかオートボクシングに比べると素敵感はあまりないですね。


というわけで、ソースはこちら

import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

/**
 *
 * @author naoki
 */
public class ParallelSum {
    public static void main(String[] args){
        //プロセッサ数の表示
        int procs = ManagementFactory.getOperatingSystemMXBean().getAvailableProcessors();
        System.out.printf("processors:%d%n", procs);
        
        //データの準備
        List<Integer> data = new ArrayList<>();
        Random rand = new Random();
        for(int i = 0; i < 500_000; ++i){
            data.add(rand.nextInt(100));
        }
        
        //逐次合計
        System.out.println(sequencialSum(data));
        
        //とりあえず時間計測前に何度か呼び出しておく
        for(int i = 0; i < 10; ++i){
            sequencialSum(data);
        }
        //時間の計測
        long time;
        time = System.currentTimeMillis();
        for(int i = 0; i < 1000; ++i){
            sequencialSum(data);
        }
        long resultS = System.currentTimeMillis() - time;
        
        //並列合計
        System.out.println(parallelSum(data));
        //とりあえず時間計測前に何度か呼び出しておく
        for(int i = 0; i < 10; ++i){
            parallelSum(data);
        }
        //時間の計測
        time = System.currentTimeMillis();
        for(int i = 0; i < 1000; ++i){
            parallelSum(data);
        }
        long resultP = System.currentTimeMillis() - time;
        
        //結果表示
        System.out.printf("seq:%d para:%d diff:%f%n", resultS, resultP, (double)resultS / resultP);
    }
    
    /** 逐次合計の処理
     * @param data データ
     * @return 合計
     */
    public static int sequencialSum(List<Integer> data){
        int sum = 0;
        for(int i = 0; i < data.size(); ++i){
            sum += data.get(i);
        }
        return sum;
    }
    
    /** 逐次合計の処理
     * @param data データ
     * @return 合計
     */
    public static int parallelSum(List<Integer> data){
        //プロセッサ数を得る
        int procs = ManagementFactory.getOperatingSystemMXBean().getAvailableProcessors();
        //データをタスクに分解する
        List<SummaryTask> summaryTasks = new ArrayList<>();
        for(int i = 0; i < procs; ++i){
            summaryTasks.add(new SummaryTask(data, data.size() * i / procs, data.size() * (i + 1) / procs));
        }
        
        //タスクの実行
        List<Future<Integer>> results = new ForkJoinPool().invokeAll(summaryTasks);
        
        //各タスクでの結果を合計する
        int result = 0;
        try{
            for(Future<Integer> future : results){
                result += future.get();
            }
        }catch(InterruptedException | ExecutionException ex){
            //なんもしない
        }
        return result;
    }
    
    /** 並列合計での各部分の処理を行う */
    static class SummaryTask implements Callable<Integer> {
        List<Integer> data;
        int start;
        int end;

        public SummaryTask(List<Integer> data, int start, int end) {
            this.data = data;
            this.start = start;
            this.end = end;
        }
        
        @Override
        public Integer call() throws Exception {
            int sum = 0;
            for(int i = start; i < end; ++i){
                sum += data.get(i);
            }
            return sum;
        }
    }
    
}