メッセージキューを使って分散MapReduceを実装する

さて、JMSでメッセージキューも使えるようになって、HadoopMapReduceも試してみた。そうするとやりたくなるのがメッセージキューを使った分散MapReduceの実装ですね。ということで、JMSを使ってメッセージキューによる分散MapReduceをやってみました。実際にはローカルでしか動かないのですが、コンセプトモデルということで。
メッセージキューで遊びたいのでJMSを試す
HadoopでのMapReduceを気軽に試すサンプル


Hadoopサンプルで作ったのと同じように、クラスがJavaファイル中でimportされている回数を数えてみます。
考え方として、ちょっと強引ですが、GoogleHadoopMapReduce分散ファイルシステム付きメッセージキューといえます。けど小規模につつましくやる分には分散ファイルシステムは必要ないので、MapとReduceを分散することだけ考えてみます。分散ファイルシステムを使わず、自分でファイル分割すればいいかなと。
今回の入力ファイルは別ファイルに分割されているのですが、問題はMapperの出力結果をReducerに渡す中間ファイルの分割です。

Reducerの処理を考えると、中間ファイルを振り分けるとき、同じキーは同じファイルに保存されるようにしておく必要があります。そこで、キー文字列のハッシュ値をとって、その値を20で割った余りによってファイルを振り分けます。実際には、下の桁を使うのは気持ち悪いので、下2桁けずってます。


ということで、プログラムはメッセージ送信プログラム(CountImportSender)と、メッセージを受信してMap/Reduceを実行するプログラム(CountImportMapReduce)に分かれます。ソースは最後に。
CountImportSenderでは、まず入力ファイル名をMap用メッセージキューに入れます。で、そのメッセージが全部処理されるのを待って、こんどは中間ファイル名をReduce用メッセージキューに入れます。そのメッセージが全部処理されれば、Map/Reduce処理が終了というわけです。
CountImportMapReduceは、MapメッセージとReduceメッセージの処理を行います。Mapメッセージの処理では、受け取った名前のファイルからimport文を読み込んで、そのクラス名を適当な中間ファイルに入れます。Reduceメッセージの処理では、受け取った名前の中間ファイルから、クラス名をカウントして結果ファイルに書き込みます。


CountImportMapReduceをいくつか起動しておいてCountImportSenderを実行すると、適当にMap/Reduceの処理が分散して行われます。

ここで、CountImportSenderをたくさん立ち上げれば、適当に処理が振り分けられるのがいいところです。
ネットワーク越しのファイル読み込みを考慮したり中間ファイルの生成先を工夫すると、それなりに面白いことができそうです。


ソースはここから。
ライブラリや実行環境は、JMSのエントリを参考にしてください。
まずはメッセージ送信側のCountImportSender。INPUT_PATH、OUTPUT_PATHは適当に指定してください。

import com.sun.messaging.ConnectionConfiguration;
import com.sun.messaging.QueueConnectionFactory;
import java.io.*;
import java.util.LinkedList;
import javax.jms.*;

public class CountImportSender {
    static final String INPUT_PATH = "C:/jmsmapreduce/src";
    static final String OUTPUT_PATH = "C:/jmsmapreduce/temp";
    static final String MAPQUEUE_NAME = "CountMapperQueue";
    static final String REDUCEQUEUE_NAME = "CountReducerQueue";
    static final int REDUCE_COUNT = 20;

    public static void main(String[] args) throws JMSException, IOException, InterruptedException{
        //入力フォルダの確認
        File in = new File(INPUT_PATH);
        if(!in.exists()){
            System.out.println(in.getCanonicalPath() + "がありません。");
            return;
        }

        //結果フォルダの準備
        File out = new File(OUTPUT_PATH);
        if(!out.exists()) out.mkdirs();
        for(File f : out.listFiles()) f.delete();
        File resultFile = new File(out, "result.txt");

        //メッセージサーバーに接続
        QueueConnectionFactory qcf = new QueueConnectionFactory();
        qcf.setProperty(ConnectionConfiguration.imqAddressList,
                "localhost:7676");
        QueueConnection con = qcf.createQueueConnection();
        QueueSession sess = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

        //Mapメッセージ
        System.out.println("Map");
        Queue qu = sess.createQueue(MAPQUEUE_NAME);
        QueueSender sender = sess.createSender(qu);
        LinkedList<File> dirs = new LinkedList<File>();
        dirs.add(in);
        while(!dirs.isEmpty()){
            for(File f : dirs.pop().listFiles()){
                //各ファイルの処理
                if(f.isDirectory()){
                    dirs.add(f);
                    continue;
                }
                if(!f.getName().endsWith(".java")) continue;
                //Javaソースのファイル名をメッセージに追加
                TextMessage msg = sess.createTextMessage(f.getCanonicalPath());
                msg.setStringProperty("outPath", out.getCanonicalPath());
                msg.setIntProperty("reduceCount", REDUCE_COUNT);
                sender.send(msg);
            }
        }

        //Map終了待ち
        QueueBrowser qb = sess.createBrowser(qu);
        while(qb.getEnumeration().hasMoreElements()){
            Thread.sleep(100);
        }

        //Reduceメッセージ
        System.out.println("Reduce");
        qu = sess.createQueue(REDUCEQUEUE_NAME);
        sender = sess.createSender(qu);
        for(int i = 0; i < REDUCE_COUNT; ++i){
            //中間ファイルをメッセージに追加
            File f = new File(out, String.format("%02d.txt", i));
            TextMessage msg = sess.createTextMessage(f.getCanonicalPath());
            msg.setStringProperty("outFile", resultFile.getCanonicalPath());
            sender.send(msg);
        }

        //Reduce終了待ち
        qb = sess.createBrowser(qu);
        while(qb.getEnumeration().hasMoreElements()){
            Thread.sleep(100);
        }
        //切断
        System.out.println("Finish");
        con.close();
    }
}


メッセージを受信してMap/Reduce処理を行うCountImportMapReduce。中間ファイルがなかった場合の処理を書いてなかったので、適当に追加したほうがいいかも。

import com.sun.messaging.ConnectionConfiguration;
import com.sun.messaging.QueueConnectionFactory;
import java.io.*;
import java.util.*;
import javax.jms.*;
import javax.jms.Queue;
import javax.swing.*;

public class CountImportMapReduce{
    public static void main(String[] args) throws JMSException, InterruptedException{
        //ウィンドウ準備
        JFrame frame = new JFrame("受信");
        frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
        final JTextArea textarea = new JTextArea();
        //textarea.setColumns(20);
        textarea.setRows(15);
        frame.add(textarea);
        frame.pack();
        frame.setVisible(true);

        //接続
        QueueConnectionFactory qcf = new QueueConnectionFactory();
        qcf.setProperty(ConnectionConfiguration.imqAddressList,
                "localhost:7676");
        QueueConnection con = qcf.createQueueConnection();
        QueueSession sess = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

        //Mapper準備
        Queue qu = sess.createQueue(CountImportSender.MAPQUEUE_NAME);
        QueueReceiver receive = sess.createReceiver(qu);
        receive.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message rmess) {
                try {
                    //Map処理
                    TextMessage mess = (TextMessage) rmess;
                    String filename = mess.getText();
                    String outPath = mess.getStringProperty("outPath");
                    File outDir = new File(outPath);
                    int reduceCount = mess.getIntProperty("reduceCount");
                    textarea.append("Map:" + filename + "\n");

                    FileReader fr = new FileReader(filename);
                    BufferedReader buf = new BufferedReader(fr);
                    for(String line; (line = buf.readLine()) != null;){
                        if(!line.startsWith("import ")) continue;
                        String className = line.substring(7, line.length() - 1);
                        //ハッシュコードによってファイルを振り分ける
                        int hash = className.hashCode();
                        if(hash < 0) hash = -hash;
                        int num = (hash / 100) % reduceCount;
                        //ファイルに書き込む
                        File outFile = new File(outDir, String.format("%02d.txt", num));
                        FileWriter fw = new FileWriter(outFile, true);
                        fw.append(className + "\n");
                        fw.close();
                    }
                    buf.close();
                    fr.close();
                } catch (IOException ex) {
                } catch (JMSException e) {
                }
            }
        });

        //Reduce準備
        qu = sess.createQueue(CountImportSender.REDUCEQUEUE_NAME);
        receive = sess.createReceiver(qu);
        receive.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message rmess) {
                try {
                    //Reduce処理
                    TextMessage mess = (TextMessage) rmess;
                    String filename = mess.getText();
                    String outFileName = mess.getStringProperty("outFile");
                    textarea.append("Redulce:" + filename + "\n");

                    //クラス数のカウント
                    Map<String, Integer> counter = new HashMap<String, Integer>();
                    FileReader fr = new FileReader(filename);
                    BufferedReader buf = new BufferedReader(fr);
                    for(String line; (line = buf.readLine()) != null;){
                        if(counter.containsKey(line)){
                            counter.put(line, counter.get(line) + 1);
                        }else{
                            counter.put(line, 1);
                        }
                    }
                    buf.close();
                    fr.close();
                    //出力
                    FileWriter fw = new FileWriter(outFileName, true);
                    for(Map.Entry<String, Integer> me : counter.entrySet()){
                        fw.append(String.format("%s\t%d\n", me.getKey(), me.getValue()));
                    }
                    fw.close();
                } catch (IOException ex) {
                } catch (JMSException ex) {
                }
            }
        });

        //メッセージ受信開始
        con.start();
        for(;;){
            Thread.sleep(1000);
        }
    }
}