スレッドで投機的実行を実現する方法を考えてみた

クリスタルデューク

スレッドのロックをする場合、読み込み処理同士は他に影響を与えないのでロックをする必要がない。
Javaでは、ReadWriteLockを使うと、読み込みスレッドは同時にいくつでも動くけど書き込みスレッドは他のスレッドが動いていると処理をまち、書き込み処理中は読み込み処理もできなくなる。

ただ、ReadWriteLockの場合、処理が読み込みなのか書き込みなのかがロック取得時に決まっている必要がある。
なので、あらかじめ読み込み処理か書き込み処理か決めれず、通常は書き込みを行わないけど条件によって書き込むというような場合にはReadWriteLockは使えない。


ということで、書き込みの頻度が少ない場合には、とりあえず処理を行っておいて、書き込みなければそのまま実行、書き込みがあっても他のスレッドによって値が変えられてなければやはりそのまま実行、もし他のスレッドによって書き込みがあれば、処理をキャンセルして新しい値を取得してやりなおし、という風にすると、無駄なロックが発生しにくくていい。
こういうのを投機的実行というけど、これをJavaのスレッドでやるとどんな感じになるか考えてみた。
スレッドごとに値のコピーをもつことになるので、たぶんこういうのをトランザクショナルメモリというんだと思う。ソフトウェアトランザクショナルメモリか。


で、そのソース

    public class TransactionalInteger{
        int value;
        final ThreadLocal<Integer> myValue = new ThreadLocal<Integer>();
        final ThreadLocal<Integer> readVer = new ThreadLocal<Integer>();
        final ThreadLocal<Boolean> modified = new ThreadLocal<Boolean>();
        int version = 0;

        public TransactionalInteger() {
        }

        public TransactionalInteger(int value) {
            this.value = value;
        }

        int get() {
            return myValue.get();
        }

        void set(int o) {
            myValue.set(o);
            modified.set(true);
        }

        void flush(){
            synchronized(TransactionalInteger.this){
                myValue.set(value);
                readVer.set(version);
                modified.set(false);
            }
        }

        void with(final Runnable runner) {
            for(;;) {
                flush();
                runner.run();
                synchronized(TransactionalInteger.this){
                    if(modified.get()){
                        if(version != readVer.get()){
                            //変更されている
                            continue;
                        }
                        value = myValue.get();
                        version++;
                    }
                }
                break;
            }
        }
    }


こんな感じで使う

    TransactionalInteger value = new TransactionalInteger(0);

    new Thread(){
        @Override
        public void run(){
            //なんか処理

            //valueを使う処理を投機的実行
            value.with(new Runnable(){
                @Override
                public void run(){
                    //値を使う
                    System.out.println(value.get());
                    //たまに書き込む
                    if(Math.random() < .3) value.set(value.get() + 1);
                }
            });// 書き込み処理をした場合、他のスレッドによって
               // 値が上書きされてたら、やりなおす
        }
   }.start();


動きが見たい人は、このサンプルで。

ソースはこれ

import java.util.HashSet;
import java.util.Random;
import java.util.concurrent.Semaphore;
import javax.swing.JTextField;

public class TransactionalThreadFrame extends javax.swing.JFrame {
    TransactionalInteger value = new TransactionalInteger(0);
    Semaphore sem = new Semaphore(4);
    /** Creates new form TransactionalThreadFrame */
    public TransactionalThreadFrame() {
        initComponents();

        JTextField[] fs = new JTextField[10];
        for(int i = 0; i < fs.length; ++i){
            fs[i] = new JTextField();
            add(fs[i]);
        }
        Random r = new Random();
        for(JTextField f : fs){
            new Thread(new Runner(f, 0, r.nextInt(3) == 0)).start();
        }


    }

    class Runner implements Runnable{
        JTextField f;
        int idx;
        boolean change;
        HashSet m;
        Integer s;
        public Runner(JTextField f, int idx, boolean change) {
            this.f = f;
            this.idx = idx;
            this.change = change;
            s = (int)(Math.random() * 10);
        }

        public void run() {
            value.with(new Runnable() {
                public void run() {
                    try {
                        String str = (change ? "変更" : "") + 
                            "電車" + value.get();
                        f.setText(str);
                        sem.acquire();
                        value.flush();
                        if(change){
                            value.set(value.get() + 1);
                        }
                        str = (change ? "変更" : "") + "電車" + value.get();

                        for(int i = 0; i < 100; ++i){
                            str = " " + str;
                            f.setText(str);
                            Thread.sleep(45 + s);
                        }
                    } catch (InterruptedException ex) {
                    }finally{
                        sem.release();
                    }
                }
            });
        }
    }


    public static class TransactionalInteger{
        int value;
        final ThreadLocal<Integer> myValue = new ThreadLocal<Integer>();
        final ThreadLocal<Integer> readVer = new ThreadLocal<Integer>();
        final ThreadLocal<Boolean> modified = new ThreadLocal<Boolean>();
        int version = 0;

        public TransactionalInteger() {
        }

        public TransactionalInteger(int value) {
            this.value = value;
        }

        int get() {
            return myValue.get();
        }

        void set(int o) {
            myValue.set(o);
            modified.set(true);
        }

        void flush(){
            synchronized(TransactionalInteger.this){
                myValue.set(value);
                readVer.set(version);
                modified.set(false);
            }
        }

        void with(final Runnable runner) {
            for(;;) {
                flush();
                runner.run();
                synchronized(TransactionalInteger.this){
                    if(modified.get()){
                        if(version != readVer.get()){
                            //変更されている
                            continue;
                        }
                        value = myValue.get();
                        version++;
                    }
                }
                break;
            }
        }
    }

    @SuppressWarnings("unchecked")
    private void initComponents() {

        setDefaultCloseOperation(javax.swing.WindowConstants.EXIT_ON_CLOSE);
        getContentPane().setLayout(new java.awt.GridLayout(0, 1));

        java.awt.Dimension screenSize = 
            java.awt.Toolkit.getDefaultToolkit().getScreenSize();
        setBounds(
            (screenSize.width-416)/2, (screenSize.height-337)/2, 416, 337);
    }

    /**
    * @param args the command line arguments
    */
    public static void main(String args[]) {
        java.awt.EventQueue.invokeLater(new Runnable() {
            public void run() {
                new TransactionalThreadFrame().setVisible(true);
            }
        });
    }

}