Java SE 6時点では、マルチスレッドプログラミングを行う素材として、当初から持つThreadクラスと、Java 5で追加されたjava.util.concurrentパッケージがあります。また、Swing GUI上でスレッドを扱うjavax.swing.SwingWorkerクラスもJava SE 6から追加されています。
|
||||||||||||||||||||||||||||||||
|
Threadクラスを使う方法は、自由度は大きいのですが、リソースの上限を制御することなどの機能は、原始的なAPIを駆使してがかなり低レベルな処理を書かなくてはならないので大変です。それなりにマルチスレッドプログラミングをするのであれば、Executorフレームワークを使うのが適していると思います。なお、Swingのアプリケーションで、少数のスレッドを作るのであれば、SwingWorkerが便利です。
java.util.concurrentパッケージは、いろいろなポリシーで複数のスレッドを実行する仕組みも持つExecutorフレームワークを提供しています。また、マルチスレッドで実行する処理(タスク)を記述する方法として、以下があります。
名前 | 方法 | インタフェース | 処理結果の受け取り |
---|---|---|---|
Runnableタスク | void run()メソッドに記述 | Runnable | なし |
Callableタスク | T call()メソッドに記述 | Collable<T> | call()メソッドの戻り値をFuture経由で取得 |
マルチスレッドで実行させる処理は、従来同様Runnableインタフェースで記述できます。サンプルプログラムとして、一定範囲内の整数の中から素数を見つけだす処理を複数並行に実行させるものを作ってみます。
まず、プログラム全体を示し、主要な部分はその後で抜粋して説明をします。
import java.util.concurrent.Executor; import java.util.concurrent.Executors; public final class PrimeGenerator implements Runnable { private final long from; private final long to; public PrimeGenerator(final long from, final long to) { this.from = from; this.to = to; } public void run() { for (long i = from; i <= to; i++) { if (isPrime(i)) { System.out.print(i + ","); } } } private boolean isPrime(long number) { for (long i = 2; i < number - 1; i++) { if (number % i == 0) { return false; } } return true; } public static final void main(final String[] args) { final int numTasks = 10; Executor executor = Executors.newFixedThreadPool(numTasks); final long range = (long)Math.pow(10, 4); for (int i = 0; i < numTasks; i++) { final long from = i * range + 1; final long to = (i + 1) * range; executor.execute(new PrimeGenerator(from, to)); } } }
マルチスレッドで実行する処理は、Runnableインタフェースを実装するクラスPrimeGeneratorを定義し、run()メソッドに記述します。
public void run() { for (long i = from; i <= to; i++) { if (isPrime(i)) { System.out.print(i + ","); } } }
別スレッドでタスクを実行させるExecutorオブジェクトは、Executorsクラスのstaticメソッドで生成します。スレッドポリシーに応じていくつかのメソッドが提供されています。
|
||||||||||||||
|
このサンプルコードでは、固定数のスレッドプールを使用しています。
Executor executor = Executors.newFixedThreadPool(numTasks);
スレッドの実行には、Executorインタフェースのexecute()メソッドを呼びます。
executor.execute(new PrimeGenerator(from, to));
Executorは、内部でスレッドを保持し続けているため、mainメソッドが終了しても、Javaプロセスは終了しません。終了させるには、後述するExecutorServiceインタフェースを使用します。
Runnableタスクの場合、処理の結果を受け渡す方法が提供されていません。前節のサンプルでは、run()メソッド内でprintするやり方でごまかしていますが、実用的なプログラムでは、結果を受け取りたいケースがあります。そのような場合、Executorフレームワークでは、Callableタスクを用いて結果を受け渡します。
まず、プログラム全体を示します。
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Callable; import java.util.concurrent.Future; import java.util.concurrent.ExecutionException; import java.util.List; import java.util.ArrayList; import java.util.Collections; public final class PrimeGeneratorCallable implements Callable<List<Long>> { private final long from; private final long to; public PrimeGeneratorCallable(final long from, final long to) { this.from = from; this.to = to; } public List<Long> call() { List<Long> primes = new ArrayList<Long>(); for (long i = from; i <= to; i++) { if (isPrime(i)) { primes.add(i); } } return primes; } private boolean isPrime(long number) { for (long i = 2; i < number - 1; i++) { if (number % i == 0) { return false; } } return true; } public static final void main(final String[] args) { final int numTasks = 10; ExecutorService executor = Executors.newFixedThreadPool(numTasks); final long range = (long)Math.pow(10, 4); List<Future<List<Long>>> futures = new ArrayList<Future<List<Long>>>(); for (int i = 0; i < numTasks; i++) { final long from = i * range + 1; final long to = (i + 1) * range; futures.add(executor.submit(new PrimeGeneratorCallable(from, to))); } executor.shutdown(); List<Long> totalPrimes = new ArrayList<Long>(); for (Future<List<Long>> future : futures) { try { List<Long> primes = future.get(); totalPrimes.addAll(primes); } catch (InterruptedException e) { System.err.println("Cannot get result:" + e + ", continue"); Thread.currentThread().interrupt(); continue; } catch (ExecutionException e) { System.err.println("Cannot get result:" + e + ", continue"); continue; } } Collections.sort(totalPrimes); for (Long prime : totalPrimes) { System.out.print(prime + ","); } } }
マルチスレッドで実行し、結果を返す処理は、Collableインタフェースを実装するクラスPrimeGeneratorCallableを定義し、callメソッドに記述しています。
public List<Long> call() { List<Long> primes = new ArrayList<Long>(); for (long i = from; i <= to; i++) { if (isPrime(i)) { primes.add(i); } } return primes; }
結果を戻り値で返しますが、その型は処理によって違いますから、ジェネリックスの型パラメータで戻り値型を指定します。このサンプルでは、素数のリストを返すためList<Long>型を指定しています。
Collableタスクを実行するには、Executorインタフェースではなく、ExecutorのサブインタフェースであるExecutorServiceインタフェースが必要です。なお、表「Executorの種類」で示したExecutor生成メソッドは戻り値型がExecutorServiceでもあるので、以下のように書くだけで取得できます。
ExecutorService executor = Executors.newFixedThreadPool(numTasks);
Collableタスクを実行し、後で結果を取得するために、ここではExecutorServiceインタフェースのsubmitメソッドを呼び出しています。
futures.add(executor.submit(new PrimeGeneratorCallable(from, to)));
submitメソッドは、Future型を戻り値として返します。ここで取得したFutureオブジェクトに対して、結果を取得したりタスクをキャンセルしたりの操作ができます。サンプルコードでは複数のタスクを実行しているので、それぞれのsubmit呼び出しの戻り値をコレクション(List<Future<Integer>>)に格納しています。
Collableタスクを全て実行したあと、ExecutorServiceのshutdown()メソッドを呼び出しています。shutdown()を呼ぶと、それ以降新たなタスクの実行依頼を拒否し、現在実行しているタスクがあればその終了を待ちます。
Executorインタフェースのときは、shutdownを呼ぶことができなかったので、タスクが完了していても、Executorの中で待機中のスレッドが存在し続けるため、mainメソッド終了ではJavaVMが終了しませんでした。ExecutorServiceでは、shutdownメソッドを呼ぶことで終了することが可能となります。
タスクの実行依頼が完了した後は、タスクの処理結果を取得します。
for (Future<List<Long>> future : futures) { try { List<Long> primes = future.get(); totalPrimes.addAll(primes); } catch (InterruptedException e) { System.err.println("Cannot get result:" + e + ", continue"); Thread.currentThread().interrupt(); continue; } catch (ExecutionException e) { System.err.println("Cannot get result:" + e + ", continue"); continue; } }
ExecutorServiceのsubmitメソッドの戻り値であるFutureオブジェクトのget()メソッドで、Callableタスクの実行結果を取得することができます。なお、タスクの実行が完了していなかった場合、getメソッドはその完了を待ちます(ブロッキング)。
getメソッドは、ブロッキングメソッドなので、チェック例外であるInterruptedExceptionおよびExecutionExceptionをスローする仕様です。チェック例外なので、呼出側ではエラー処理を記述します。InterruptedExceptionの対処については、以下記事が詳しいです。
前節のサンプルでは、Callableタスクの起動に際して、必要なスレッド数だけループを回してsubmitメソッドを呼び出していました。ExecutorServiceインタフェースには、複数のタスクを一度に設定して実行するinvokeAllメソッドが提供されているので、これを使うと便利なことがあります。
// Callableタスクを複数生成し、リストに格納する List<Callable<List<Long>>> tasks = new ArrayList<Callable<List<Long>>>(); for (int i = 0; i < numTasks; i++) { final long from = i * range + 1; final long to = (i + 1) * range; tasks.add(new PrimeGeneratorCallable2(from, to)); } // ExecutorServiceを取得し、Callableタスク(複数)を実行する ExecutorService executor = Executors.newFixedThreadPool(numTasks); try { List<Future<List<Long>>> futures = executor.invokeAll(tasks); } catch (InterruptedException e) { // タスク実行は中断し、完了したタスクの結果もとれない }
タスクをあらかじめ生成しておき、ExecutorServiceのinvokeAllメソッドにまとめて入力します。invokeAllは、Futureのリストを返却するので、そのあと順次各Futureからタスク実行結果を取得します。
処理の終わったタスクの結果を順次受け取りさばいていきたい場合があります。前に挙げた方法では、タスクが完了したか途中かによらず、順番にFutureオブジェクトのget()メソッドを呼び出すので、get()メソッドを呼び出したFutureオブジェクトのタスクがまだ実行途中であれば、そのタスクが完了するまでブロックしてしまいます。FutureオブジェクトのisDone()メソッドで完了したことを検査してからget()メソッドを呼ぶこともできますが、ポーリング(ビジーウェイト)になるので弊害があります。
そこで、CompletionServiceを使用します。
ExecutorService executor = Executors.newFixedThreadPool(numTasks); CompletionService<List<Long>> completion = new ExecutorCompletionService<List<Long>>(executor); for (int i = 0; i < numTasks; i++) { final long from = i * range + 1; final long to = (i + 1) * range; completion.submit(new PrimeGeneratorCallable3(from, to)); }
スレッドの実行エンジンは前と同じくExecutorServiceインスタンスで持ちますが、あらたにCompletionExecutorServiceでラップします。CompletionServiceは、実行完了したタスクのFutureを内部のキューに完了順に格納します。
for (int i = 0; i < numTasks; i++) { try { Future<List<Long>> future = completion.take(); List<Long> primes = future.get(); totalPrimes.addAll(primes); } catch (InterruptedException e) { Thread.currentThread().interrupt(); continue; } catch (ExecutionException e) { continue; } }
CompletionServiceインタフェースのtakeメソッドを呼び出し、完了したタスクのFutureオジェクトを取得します。完了したタスクがまだないときは、ブロックします。takeで取り出す順番は、タスクが完了した順番です。