[Java How To Programming] [Home on 246net] [Home on Alles net]
Powered by SmartDoc

java.util.concurrentパッケージを用いたマルチスレッドプログラミングについて

Sun Sep 19 07:46:46 JST 2010
TAKAHASHI, Toru
torutk at gmail.com

目次

はじめに

Java SE 6時点では、マルチスレッドプログラミングを行う素材として、当初から持つThreadクラスと、Java 5で追加されたjava.util.concurrentパッケージがあります。また、Swing GUI上でスレッドを扱うjavax.swing.SwingWorkerクラスもJava SE 6から追加されています。

Java SE 6におけるスレッドの生成方法

スレッド生成方法の比較

スレッド生成方法の特性
項目 Threadクラス Executorインタフェース SwingWorkerクラス
スレッドプールの利用 なし あり あり(*1)
スレッド上限数の制御 なし あり あり(*1)
ロジックの記述 Runnable Runnable/Collable SwingWorker
遅延実行 なし あり なし
周期実行 なし あり なし
スレッドの実行結果の取得 なし あり あり
  • *1)SwingWorkerのソースコード(JDK 6 Update21に付属)を見ると、内部でThreadPoolExecutorクラスを生成し、スレッド数を10に設定している

Threadクラスを使う方法は、自由度は大きいのですが、リソースの上限を制御することなどの機能は、原始的なAPIを駆使してがかなり低レベルな処理を書かなくてはならないので大変です。それなりにマルチスレッドプログラミングをするのであれば、Executorフレームワークを使うのが適していると思います。なお、Swingのアプリケーションで、少数のスレッドを作るのであれば、SwingWorkerが便利です。

Executorフレームワーク

java.util.concurrentパッケージは、いろいろなポリシーで複数のスレッドを実行する仕組みも持つExecutorフレームワークを提供しています。また、マルチスレッドで実行する処理(タスク)を記述する方法として、以下があります。

タスクの記述方法
名前 方法 インタフェース 処理結果の受け取り
Runnableタスク void run()メソッドに記述 Runnable なし
Callableタスク T call()メソッドに記述 Collable<T> call()メソッドの戻り値をFuture経由で取得

ExecutorインタフェースとRunnableタスク

マルチスレッドで実行させる処理は、従来同様Runnableインタフェースで記述できます。サンプルプログラムとして、一定範囲内の整数の中から素数を見つけだす処理を複数並行に実行させるものを作ってみます。

まず、プログラム全体を示し、主要な部分はその後で抜粋して説明をします。

PrimeGenerator.java
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public final class PrimeGenerator implements Runnable {

    private final long from; 
    private final long to;   

    public PrimeGenerator(final long from, final long to) {
        this.from = from;
        this.to = to;
    }

    public void run() {
        for (long i = from; i <= to; i++) {
            if (isPrime(i)) {
                System.out.print(i + ",");
            }
        }
    }

    private boolean isPrime(long number) {
        for (long i = 2; i < number - 1; i++) {
            if (number % i == 0) {
                return false;
            }
        }
        return true;
    }

    public static final void main(final String[] args) {
        final int numTasks = 10;
        Executor executor = Executors.newFixedThreadPool(numTasks);
        final long range = (long)Math.pow(10, 4);
        for (int i = 0; i < numTasks; i++) {
            final long from = i * range + 1;
            final long to = (i + 1) * range;
            executor.execute(new PrimeGenerator(from, to));
        }
    }
}

マルチスレッドで実行する処理は、Runnableインタフェースを実装するクラスPrimeGeneratorを定義し、run()メソッドに記述します。

Runnableの実装
    public void run() {
        for (long i = from; i <= to; i++) {
            if (isPrime(i)) {
                System.out.print(i + ",");
            }
        }
    }

別スレッドでタスクを実行させるExecutorオブジェクトは、Executorsクラスのstaticメソッドで生成します。スレッドポリシーに応じていくつかのメソッドが提供されています。

Executorの種類
Executorsクラスのstaticメソッド ポリシー
newCachedThreadPool() 既作成で余剰スレッドがあれば再利用し、なければスレッドを新規作成する
newFixedThreadPool() 固定数のスレッドをプールし再利用する
newScheduledThreadPool() 遅延実行・周期実行可能なスレッドプール
newSingleThreadExecutor() 単一のワーカースレッドを利用する
newSingleThreadScheduledExecutor() 遅延実行・周期実行可能な単一ワーカースレッドを利用する
  • 各メソッドの戻り値型は、実際にはExecutor型ではなくそのサブインタフェースであるExecutorServiceやScheduledExecutorServiceです。

このサンプルコードでは、固定数のスレッドプールを使用しています。

Executorの取得
        Executor executor = Executors.newFixedThreadPool(numTasks);

スレッドの実行には、Executorインタフェースのexecute()メソッドを呼びます。

Executorにタスクの実行依頼
            executor.execute(new PrimeGenerator(from, to));

mainメソッドが終了しない

Executorは、内部でスレッドを保持し続けているため、mainメソッドが終了しても、Javaプロセスは終了しません。終了させるには、後述するExecutorServiceインタフェースを使用します。

ExecutorインタフェースとCallableタスク

Runnableタスクの場合、処理の結果を受け渡す方法が提供されていません。前節のサンプルでは、run()メソッド内でprintするやり方でごまかしていますが、実用的なプログラムでは、結果を受け取りたいケースがあります。そのような場合、Executorフレームワークでは、Callableタスクを用いて結果を受け渡します。

まず、プログラム全体を示します。

PrimeGeneratorCallable.java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ExecutionException;
import java.util.List;
import java.util.ArrayList;
import java.util.Collections;

public final class PrimeGeneratorCallable implements Callable<List<Long>> {

    private final long from; 
    private final long to;   

    public PrimeGeneratorCallable(final long from, final long to) {
        this.from = from;
        this.to = to;
    }

    public List<Long> call() {
        List<Long> primes = new ArrayList<Long>();
        for (long i = from; i <= to; i++) {
            if (isPrime(i)) {
                primes.add(i);
            }
        }
        return primes;
    }

    private boolean isPrime(long number) {
        for (long i = 2; i < number - 1; i++) {
            if (number % i == 0) {
                return false;
            }
        }
        return true;
    }

    public static final void main(final String[] args) {
        final int numTasks = 10;
        ExecutorService executor = Executors.newFixedThreadPool(numTasks);
        final long range = (long)Math.pow(10, 4);
        List<Future<List<Long>>> futures = new ArrayList<Future<List<Long>>>();
        for (int i = 0; i < numTasks; i++) {
            final long from = i * range + 1;
            final long to = (i + 1) * range;
            futures.add(executor.submit(new PrimeGeneratorCallable(from, to)));
        }

        executor.shutdown();

        List<Long> totalPrimes = new ArrayList<Long>();
        for (Future<List<Long>> future : futures) {
            try {
                List<Long> primes = future.get();
                totalPrimes.addAll(primes);
            } catch (InterruptedException e) {
                System.err.println("Cannot get result:" + e + ", continue");
                Thread.currentThread().interrupt();
                continue;
            } catch (ExecutionException e) {
                System.err.println("Cannot get result:" + e + ", continue");
                continue;
            }
        }
        Collections.sort(totalPrimes);
        for (Long prime : totalPrimes) {
            System.out.print(prime + ",");
        }
    }
}

マルチスレッドで実行し、結果を返す処理は、Collableインタフェースを実装するクラスPrimeGeneratorCallableを定義し、callメソッドに記述しています。

Callableの実装
    public List<Long> call() {
        List<Long> primes = new ArrayList<Long>();
        for (long i = from; i <= to; i++) {
            if (isPrime(i)) {
                primes.add(i);
            }
        }
        return primes;
    }

結果を戻り値で返しますが、その型は処理によって違いますから、ジェネリックスの型パラメータで戻り値型を指定します。このサンプルでは、素数のリストを返すためList<Long>型を指定しています。

Collableタスクを実行するには、Executorインタフェースではなく、ExecutorのサブインタフェースであるExecutorServiceインタフェースが必要です。なお、表「Executorの種類」で示したExecutor生成メソッドは戻り値型がExecutorServiceでもあるので、以下のように書くだけで取得できます。

Executorの取得
        ExecutorService executor = Executors.newFixedThreadPool(numTasks);

Collableタスクを実行し、後で結果を取得するために、ここではExecutorServiceインタフェースのsubmitメソッドを呼び出しています。

タスクの実行
            futures.add(executor.submit(new PrimeGeneratorCallable(from, to)));

submitメソッドは、Future型を戻り値として返します。ここで取得したFutureオブジェクトに対して、結果を取得したりタスクをキャンセルしたりの操作ができます。サンプルコードでは複数のタスクを実行しているので、それぞれのsubmit呼び出しの戻り値をコレクション(List<Future<Integer>>)に格納しています。

Collableタスクを全て実行したあと、ExecutorServiceのshutdown()メソッドを呼び出しています。shutdown()を呼ぶと、それ以降新たなタスクの実行依頼を拒否し、現在実行しているタスクがあればその終了を待ちます。

Executorインタフェースのときは、shutdownを呼ぶことができなかったので、タスクが完了していても、Executorの中で待機中のスレッドが存在し続けるため、mainメソッド終了ではJavaVMが終了しませんでした。ExecutorServiceでは、shutdownメソッドを呼ぶことで終了することが可能となります。

タスクの実行依頼が完了した後は、タスクの処理結果を取得します。

タスクの実行結果取得
        for (Future<List<Long>> future : futures) {
            try {
                List<Long> primes = future.get();
                totalPrimes.addAll(primes);
            } catch (InterruptedException e) {
                System.err.println("Cannot get result:" + e + ", continue");
                Thread.currentThread().interrupt();
                continue;
            } catch (ExecutionException e) {
                System.err.println("Cannot get result:" + e + ", continue");
                continue;
            }
        }

ExecutorServiceのsubmitメソッドの戻り値であるFutureオブジェクトのget()メソッドで、Callableタスクの実行結果を取得することができます。なお、タスクの実行が完了していなかった場合、getメソッドはその完了を待ちます(ブロッキング)。

getメソッドは、ブロッキングメソッドなので、チェック例外であるInterruptedExceptionおよびExecutionExceptionをスローする仕様です。チェック例外なので、呼出側ではエラー処理を記述します。InterruptedExceptionの対処については、以下記事が詳しいです。

まとめてCallableタスクを起動する

前節のサンプルでは、Callableタスクの起動に際して、必要なスレッド数だけループを回してsubmitメソッドを呼び出していました。ExecutorServiceインタフェースには、複数のタスクを一度に設定して実行するinvokeAllメソッドが提供されているので、これを使うと便利なことがあります。

複数Callableタスクをまとめて実行
    // Callableタスクを複数生成し、リストに格納する
    List<Callable<List<Long>>> tasks = new ArrayList<Callable<List<Long>>>();
    for (int i = 0; i < numTasks; i++) {
        final long from = i * range + 1;
        final long to = (i + 1) * range;
        tasks.add(new PrimeGeneratorCallable2(from, to));
    }

    // ExecutorServiceを取得し、Callableタスク(複数)を実行する
    ExecutorService executor = Executors.newFixedThreadPool(numTasks);
    try {
        List<Future<List<Long>>> futures = executor.invokeAll(tasks);
    } catch (InterruptedException e) {
        // タスク実行は中断し、完了したタスクの結果もとれない
    }

タスクをあらかじめ生成しておき、ExecutorServiceのinvokeAllメソッドにまとめて入力します。invokeAllは、Futureのリストを返却するので、そのあと順次各Futureからタスク実行結果を取得します。

複数Callableタスクを実行し、先に完了したタスクの結果を順次取得する

処理の終わったタスクの結果を順次受け取りさばいていきたい場合があります。前に挙げた方法では、タスクが完了したか途中かによらず、順番にFutureオブジェクトのget()メソッドを呼び出すので、get()メソッドを呼び出したFutureオブジェクトのタスクがまだ実行途中であれば、そのタスクが完了するまでブロックしてしまいます。FutureオブジェクトのisDone()メソッドで完了したことを検査してからget()メソッドを呼ぶこともできますが、ポーリング(ビジーウェイト)になるので弊害があります。

そこで、CompletionServiceを使用します。

複数Callableタスクを実行し、完了した結果から取得する
    ExecutorService executor = Executors.newFixedThreadPool(numTasks);
    CompletionService<List<Long>> completion = new ExecutorCompletionService<List<Long>>(executor);
    for (int i = 0; i < numTasks; i++) {
        final long from = i * range + 1;
        final long to = (i + 1) * range;
        completion.submit(new PrimeGeneratorCallable3(from, to));
    }

スレッドの実行エンジンは前と同じくExecutorServiceインスタンスで持ちますが、あらたにCompletionExecutorServiceでラップします。CompletionServiceは、実行完了したタスクのFutureを内部のキューに完了順に格納します。

複数Callableタスクを実行し、完了した結果から取得する
    for (int i = 0; i < numTasks; i++) {
        try {
            Future<List<Long>> future = completion.take();
            List<Long> primes = future.get();
            totalPrimes.addAll(primes);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            continue;
        } catch (ExecutionException e) {
            continue;
        }
    }

CompletionServiceインタフェースのtakeメソッドを呼び出し、完了したタスクのFutureオジェクトを取得します。完了したタスクがまだないときは、ブロックします。takeで取り出す順番は、タスクが完了した順番です。