メッセージキューで遊びたいのでJMSを試す

これからはメッセージキューが大切ということで、JavaのメッセージングAPIであるJMS(Java Messaging Service)を試してみます。


JMS試すにはメッセージングサーバーが必要で、Open MQとかを使います。けど、わざわざOpen MQとかをインストールして起動させるのもめんどいので、Glassfishを使います。GlassfishにはOpen MQが入ってて、そのまま使える状態になっています。NetBeans使いならGlassfish入ってますよね。
今回はv2.1を使いました。v3 preludeにはJMS入ってないみたい。
https://glassfish.dev.java.net/


本来ならGlassfishで使う場合にはJNDI経由でJMSサーバーを取得するのですが、今回はめんどうなので、直接つなぎます。
ということで、メッセージ送信側。プロバイダーというらしいですが、センダーでいいじゃん。
コンパイル・実行には、GLASSFISH_HOME\imq\lib\ims.jarとGLASSFISH_HOME\imq\lib\imq.jarをクラスパスに含めておきます。

import com.sun.messaging.ConnectionConfiguration;
import com.sun.messaging.QueueConnectionFactory;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;

public class Sender {
    public static void main(String[] a) throws Exception{
        //接続
        QueueConnectionFactory qcf = new QueueConnectionFactory();
        qcf.setProperty(ConnectionConfiguration.imqAddressList,
                "localhost:7676");
        QueueConnection con = qcf.createQueueConnection();
        //送信準備
        QueueSession sess = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue qu = sess.createQueue("MyQueue");
        QueueSender sender = sess.createSender(qu);

        //送信
        for(int i = 0; i < 10; ++i){
            TextMessage mess = sess.createTextMessage();
            mess.setText("yeahaa" + i);
            mess.setStringProperty("propertyName", "varr");
            sender.send(mess);
        }
        //切断
        con.close();

        System.out.println("connected!");
    }
}


QueueConnectionFactoryがjavax.jmsではなくcom.sun.messagingになってるのは、JNDI使ってGlassfishで設定したものを取ってくるわけじゃないから。
今回は10回送信させてます。Glassfishを起動させておいで、実行させます。
「connected!」と表示されればおっけ。
もしうまくいかないときは、Glassfishのログを見て、ちゃんとJMSが起動できているか確認してください。もし変な例外出てるときは、Glassfishのセットアップに失敗してる可能性があります。ここの手順どおり、ant -f setup.xmlを行ってみてください。
https://glassfish.dev.java.net/downloads/v2.1-b60e.html


で、メッセージ送信できたら、こんどは受信側。
NetBeansは標準出力を複数同時に見れないので、ウィンドウを出します。

import com.sun.messaging.ConnectionConfiguration;
import com.sun.messaging.QueueConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.swing.JFrame;
import javax.swing.JTextArea;

public class Receiver {
    public static void main(String[] args) throws Exception{
        //ウィンドウ準備
        JFrame frame = new JFrame("受信");
        frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
        final JTextArea textarea = new JTextArea();
        textarea.setRows(15);
        frame.add(textarea);
        frame.pack();
        frame.setVisible(true);

        //接続(送信と一緒)
        QueueConnectionFactory qcf = new QueueConnectionFactory();
        qcf.setProperty(ConnectionConfiguration.imqAddressList,
                "localhost:7676");
        QueueConnection con = qcf.createQueueConnection();

        //受信準備
        QueueSession sess = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue qu = sess.createQueue("MyQueue");
        QueueReceiver receive = sess.createReceiver(qu);

        //メッセージを受け取ったときの処理を登録
        receive.setMessageListener(new MessageListener() {
            /** メッセージ受け取ると呼ばれる */
            @Override
            public void onMessage(Message rmess) {
                try {
                    TextMessage mess = (TextMessage) rmess;
                    String text = mess.getText();
                    textarea.append(text + "\n");

                    String prop = mess.getStringProperty("propertyName");
                    textarea.append(prop + "\n");
                    Thread.sleep(500);
                } catch (JMSException e) {
                } catch (InterruptedException e){
                }
            }
        });
        
        //受信開始
        con.start();
        for(;;){
            Thread.sleep(1000);
        }
    }
}


実行すると、メッセージキューからメッセージを受け取って表示します。ここでふたつ実行させると、適当にメッセージを奪い合います。これ使えば、分散処理がいい感じにできそう。


通常の通信プログラムだと、受信プログラムは必ず先に立ち上げておいて受信状態を作らないといけないのですが、JMSサーバーが仲立ちしてくれるので、送信プログラムを起動してメッセージ投げたあとで受信プログラムを起動すれば大丈夫です。
送信側はとりあえずメッセージ投げれば終わり。受信側は適当に起動して適当にメッセージさばく。
この疎結合な感じがいいです。


ただ、メッセージ受信の最後で無限ループしてたり、なんだか美しくないってツッコミも入りそうです。
という人には、これ。EJBのメッセージドリブンBeanがオススメ。
こんな感じでJMSの受信処理が書けます。

import javax.ejb.MessageDriven;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

@MessageDriven(mappedName = "jms/MyQueue")
public class ReceiverBean implements MessageListener {
    public void onMessage(Message message) {
        try {
            TextMessage mess = (TextMessage) message;
            String text = mess.getText();
            System.out.println(text);
            String prop = mess.getStringProperty("propertyName");
            System.out.println(prop);
        } catch (JMSException ex) {
        }
    }
}


こいつをEJBとして登録しておけばいいってことです。Java EEえらい!