Java 8 引入了 CompletableFuture 类,这是 Java 异步编程的一个重要进展。
CompletableFuture 提供了一种基于未来结果的异步编程模型,允许你以更加直观和易于理解的方式编写非阻塞代码。
CompletableFuture 主要用于:
特点:
使用示例:
1 2 3 4 5 6 |
public static void main(String[] args) { Thread thread = new Thread(() -> { System.out.println(Thread.currentThread().getName() + " is running..."); }); thread.start(); } |
特点:
使用示例:
有返回值:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService executor = Executors.newFixedThreadPool(2); // 创建线程池
Callable<Integer> task = () -> { Thread.sleep(1000); return 42; };
Future<Integer> result = executor.submit(task); // 提交任务并获得 Future 对象 System.out.println("Task result: " + result.get()); // 获取结果
executor.shutdown(); // 关闭线程池 } |
无返回值:
1 2 3 4 5 6 7 8 9 10 11 |
public static void main(String[] args) throws InterruptedException { ExecutorService executor = Executors.newFixedThreadPool(2); // 创建线程池
Runnable task = () -> { System.out.println(Thread.currentThread().getName() + " is running..."); };
executor.execute(task); // 提交任务
executor.shutdown(); // 关闭线程池 } |
特点:
使用示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
public static void main(String[] args) throws InterruptedException { int totalThreads = 3; CountDownLatch latch = new CountDownLatch(totalThreads); // 初始化计数器为3
Runnable task = () -> { try { Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + " finished."); } catch (InterruptedException e) { e.printStackTrace(); } finally { latch.countDown(); // 每个线程完成后减少计数器 } };
// 启动多个线程 for (int i = 0; i < totalThreads; i++) { new Thread(task).start(); }
latch.await(); // 等待计数器归零 System.out.println("All tasks are finished."); } |
特点:
使用示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
public static void main(String[] args) throws InterruptedException { int totalThreads = 3; CyclicBarrier barrier = new CyclicBarrier(totalThreads, () -> { System.out.println("All threads reached the barrier point, proceeding..."); });
Runnable task = () -> { try { Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + " reached the barrier."); barrier.await(); // 等待其他线程到达屏障点 } catch (Exception e) { e.printStackTrace(); } };
// 启动多个线程 for (int i = 0; i < totalThreads; i++) { new Thread(task).start(); } } |
特点:
使用示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
import java.util.concurrent.*;
public class ForkJoinPoolExample { public static void main(String[] args) { ForkJoinPool pool = new ForkJoinPool(); // 创建 ForkJoinPool int[] array = {1, 2, 3, 4, 5, 6, 7, 8}; RecursiveTask<Integer> task = new SumTask(array, 0, array.length); int result = pool.invoke(task); // 执行任务并获取结果 System.out.println("Sum is: " + result); } }
class SumTask extends RecursiveTask<Integer> { private int[] array; private int start, end;
public SumTask(int[] array, int start, int end) { this.array = array; this.start = start; this.end = end; }
@Override protected Integer compute() { if (end - start <= 2) { // 基础情况 int sum = 0; for (int i = start; i < end; i++) { sum += array[i]; } return sum; } else { int mid = (start + end) / 2; SumTask task1 = new SumTask(array, start, mid); SumTask task2 = new SumTask(array, mid, end); task1.fork(); // 异步执行 task2.fork(); return task1.join() + task2.join(); // 合并结果 } } } |
特点:
使用示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return 42; });
// 链式调用,处理结果 CompletableFuture<Integer> result = future.thenApplyAsync(value -> value * 2);
System.out.println("Result: " + result.get()); // 输出结果 } |
并发方式 | 特点 | 优点 | 缺点 |
---|---|---|---|
Thread | - 最基本的线程创建方式- 通过继承Thread 或实现Runnable 接口创建任务 | - 简单直观 | - 需要手动管理线程,容易资源浪费或死锁- 无法直接返回任务结果- 对复杂任务协调不便 |
ExecutorService | - 通过线程池管理线程- 提供任务的调度、执行、生命周期管理 | - 提供线程池避免手动创建和销毁线程,减少资源浪费- 支持任务的结果返回 | - 任务间依赖和组合较复杂-get() 方法阻塞线程,难以实现非阻塞 |
CountDownLatch | - 用于等待多个任务完成后执行后续操作- 使用计数器控制任务执行 | - 可以控制任务同步,确保多个任务完成后继续执行 | - 只适用于等待任务完成,无法处理任务的依赖关系- 只能使用一次 |
CyclicBarrier | - 用于多个线程在某一点上等待- 可重复使用,适合同步多任务 | - 可重复使用,适合多次任务同步 | - 不如CompletableFuture 灵活- 仅适合特定的同步场景 |
ForkJoinPool | - 专为递归分治任务设计的线程池- 支持任务拆分和合并 | - 高效利用多核处理器,适合分治算法- 支持任务拆分和合并 | - 对于非递归任务不适合- 异常处理不如CompletableFuture 灵活 |
CompletableFuture | - 基于Future 设计的异步编程API- 支持非阻塞的任务组合和回调处理 | - 支持链式调用,异步任务组合,避免阻塞- 可以处理异常,支持并行处理和同步等待- 支持thenApply、thenAccept 等多种处理方式,简化代码 | - 复杂任务时调试困难- 异常处理仍较为复杂- 比ExecutorService 稍显复杂 |
CompletableFuture实现了Future接口和CompletionStage接口。
相关接口 | 描述 |
---|---|
Future | 是一个表示异步计算结果的接口。它提供了方法来检查异步计算是否完成、获取计算的结果以及取消计算。 |
CompletionStage | 是一个表示异步计算结果的接口,提供了处理计算结果的非阻塞操作。与 Future 不同,CompletionStage 采用链式调用,可以更灵活地组合多个异步操作。 |
Future接口是JDK 5引入的,该接口属于java.util.concurrent包。
Future接口的目的是表示异步计算的结果,它允许你提交一个任务给一个 Executor(执行器),并在稍后获取任务的结果。
主要方法:
方法 | 描述 |
---|---|
get() | 阻塞当前线程,直到异步计算完成,并返回计算结果 |
get(long timeout, TimeUnit unit) | 阻塞当前线程,直到异步计算完成或超时,并返回计算结果 |
isDone() | 检查异步计算是否完成 |
cancel(boolean mayInterruptIfRunning) | 尝试取消异步计算 |
isCancelled() | 检查异步计算是否被取消。 |
CompletionStage 接口是 Java 8 引入的一个重要接口,用于描述异步计算的生命周期和结果。
CompletionStage 提供了一套方法,用于处理异步计算的结果、组合多个计算、处理异常等。
主要方法:
方法 | 描述 |
---|---|
thenApply | 在当前阶段完成后,应用给定的 Function,并返回一个新的 CompletionStage。 |
thenAcceptAsync | 异步地执行指定的 Consumer,并返回一个新的 CompletionStage,该阶段没有结果。 |
thenComposeAsync | 异步地将当前阶段的结果应用于一个返回 CompletionStage 的函数,并返回一个新的 CompletionStage。 |
thenCombine | 在两个 CompletionStage 都完成后,使用给定的 BiFunction 合并它们的结果,并返回一个新的 CompletionStage。 |
runAfterEitherAsync | 在任意一个给定的两个 CompletionStage 完成后,异步地执行指定的 Runnable。 |
thenAccept | 在当前阶段完成后,执行指定的 Consumer,并返回一个新的 CompletionStage,该阶段没有结果。 |
runAfterEither | 在任意一个给定的两个 CompletionStage 完成后,执行指定的 Runnable。 |
thenCombineAsync | 在两个 CompletionStage 都完成后,异步地使用给定的 BiFunction 合并它们的结果,并返回一个新的 CompletionStage。 |
thenAcceptBothAsync | 在两个 CompletionStage 都完成后,异步地执行指定的 BiConsumer,并返回一个新的 CompletionStage。 |
applyToEither | 在两个 CompletionStage 中任意一个完成后,应用给定的 Function,并返回一个新的 CompletionStage。 |
applyToEitherAsync | 在两个 CompletionStage 中任意一个完成后,异步地应用给定的 Function,并返回一个新的 CompletionStage。 |
runAfterBothAsync | 在两个 CompletionStage 都完成后,异步地执行指定的 Runnable,并返回一个新的 CompletionStage。 |
thenAcceptBothAsync | 在两个 CompletionStage 都完成后,异步地执行指定的 BiConsumer。 |
acceptEitherAsync | 在两个 CompletionStage 中任意一个完成后,异步地执行指定的 Consumer,并返回一个新的 CompletionStage。 |
handleAsync | 异步地处理当前阶段的结果或异常,应用给定的 BiFunction,并返回一个新的 CompletionStage。 |
thenComposeAsync | 同 thenCompose,但异步地应用给定的函数,并返回一个新的 CompletionStage。 |
thenCombineAsync | 同 thenCombine,但异步地使用给定的 BiFunction 合并两个 CompletionStage 的结果。 |
exceptionally | 如果当前阶段以异常完成,则应用指定的 Function 处理该异常,并返回一个新的 CompletionStage。 |
acceptEither | 在两个 CompletionStage 中任意一个完成后,执行指定的 Consumer。 |
thenCompose | 将当前阶段的结果应用于一个返回 CompletionStage 的函数,并返回一个新的 CompletionStage。 |
handle | 处理当前阶段的结果或异常,应用给定的 BiFunction,并返回一个新的 CompletionStage。 |
thenAcceptBoth | 在两个 CompletionStage 都完成后,执行指定的 BiConsumer。 |
thenApplyAsync | 异步地应用给定的 Function,并返回一个新的 CompletionStage。 |
whenCompleteAsync | 异步地执行指定的 BiConsumer,无论结果如何,并返回一个新的 CompletionStage。 |
applyToEitherAsync | 同 applyToEither,但异步地应用给定的 Function,并返回一个新的 CompletionStage。 |
acceptEitherAsync | 同 acceptEither,但异步地执行指定的 Consumer,并返回一个新的 CompletionStage。 |
runAfterEitherAsync | 同 runAfterEither,但异步地执行指定的 Runnable,并返回一个新的 CompletionStage。 |
thenRunAsync | 异步地执行指定的 Runnable,并返回一个新的 CompletionStage,该阶段没有结果。 |
runAfterBoth | 在两个 CompletionStage 都完成后,执行指定的 Runnable。 |
whenComplete | 在当前阶段完成后,无论结果如何,执行指定的 BiConsumer,并返回一个新的 CompletionStage。 |
thenRunAsync | 异步地执行指定的 Runnable,并返回一个新的 CompletionStage,该阶段没有结果。 |
方法 | 描述 |
---|---|
supplyAsync() | 异步地运行一个带返回值的任务。 |
runAsync() | 异步地运行一个无返回值的任务。 |
thenApply() | 当 CompletableFuture 任务完成时执行某个操作,并返回新的结果。 |
thenAccept() | 当任务完成时执行某个操作,但不返回结果。 |
thenRun() | 当任务完成时执行某个操作,无需返回结果。 |
exceptionally() | 用于处理任务执行中发生的异常。 |
handle() | 处理任务执行中的正常结果或异常结果。 |
allOf() | 等待多个 CompletableFuture 全部完成,返回一个新的 CompletableFuture。 |
anyOf() | 等待多个 CompletableFuture 中的任意一个完成。 |
CompletableFuture.supplyAsync() 和 CompletableFuture.runAsync() 是最常用的启动异步任务的方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
public static void main(String[] args) throws ExecutionException, InterruptedException { // 带返回值的异步任务 CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); // 模拟耗时任务 } catch (InterruptedException e) { e.printStackTrace(); } return 42; // 返回结果 });
// 获取异步任务的结果 Integer result = future.get(); // 阻塞,直到任务完成 System.out.println("Result: " + result); } |
通过 thenApply(), thenAccept(), thenRun() 等方法,可以将多个异步任务串联在一起。
1 2 3 4 5 6 7 8 9 10 11 12 13 |
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { return 42; // 返回结果 });
// 链式调用,先处理结果,再转换 CompletableFuture<Integer> resultFuture = future .thenApply(value -> value * 2) // 将值乘以2 .thenApply(value -> value + 10); // 再加10
Integer result = resultFuture.get(); // 获取最终结果 System.out.println("Final Result: " + result); // 输出 94 } |
使用 thenCombine()、thenCompose()、allOf() 和 anyOf() 等方法可以组合多个异步任务,执行复杂的操作。
示例1:组合两个异步任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> { return 10; });
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> { return 20; });
// 合并两个任务的结果 CompletableFuture<Integer> combinedFuture = future1 .thenCombine(future2, (result1, result2) -> result1 + result2); // 将两个结果相加
Integer result = combinedFuture.get(); // 获取最终结果 System.out.println("Combined Result: " + result); // 输出 30 } |
示例2:使用 allOf() 等待多个任务完成
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> { try { Thread.sleep(1000); System.out.println("Task 1 completed"); } catch (InterruptedException e) { e.printStackTrace(); } });
CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> { try { Thread.sleep(1500); System.out.println("Task 2 completed"); } catch (InterruptedException e) { e.printStackTrace(); } });
// 等待多个任务全部完成 CompletableFuture.allOf(future1, future2).join();
System.out.println("All tasks are completed."); } |
在异步任务中,异常可能会发生。CompletableFuture 提供了 exceptionally() 和 handle() 方法来处理异常。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { if (true) { throw new RuntimeException("Something went wrong!"); } return 42; });
// 使用 exceptionally 处理异常并提供默认值 CompletableFuture<Integer> resultFuture = future.exceptionally(ex -> { System.out.println("Exception occurred: " + ex.getMessage()); return -1; // 返回默认值 });
Integer result = resultFuture.get(); // 获取结果 System.out.println("Result: " + result); // 输出 -1 } |
使用 CompletableFuture.supplyAsync() 或 runAsync() 来并行执行多个任务,然后使用 allOf() 或 anyOf() 等方法等待这些任务的完成。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); return 1; } catch (InterruptedException e) { return 0; } });
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(500); return 2; } catch (InterruptedException e) { return 0; } });
// 等待所有任务完成并合并结果 CompletableFuture<Integer> result = future1 .thenCombine(future2, (res1, res2) -> res1 + res2); // 将两个结果相加
System.out.println("Combined result: " + result.get()); // 输出 3 } |
通过 thenApply() 等方法可以对异步任务的结果进行转换处理。
1 2 3 4 5 6 7 8 |
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 10);
// 转换结果:将值乘以2 CompletableFuture<Integer> transformedFuture = future.thenApply(value -> value * 2);
System.out.println("Transformed Result: " + transformedFuture.get()); // 输出 20 } |