1. 什么是任务编排
任务编排(Task Orchestration)是指将多个任务按照特定的依赖关系和执行顺序进行组织和管理的过程,以确保各个任务能够按照预定的逻辑顺序高效地执行。
2. 为什么需要任务编排
在复杂的业务场景中,任务间通常存在依赖关系,也就是某个任务会依赖另一个任务的执行结果,在这种情况下,我们需要通过任务编排,来确保任务按照正确的顺序进行执行。
在任务编排中,可以定义任务之间的依赖性、并行执行的策略、错误处理机制等,从而提高整体工作流的效率和可靠性。通过任务编排,可以实现复杂业务流程的自动化管理,优化资源利用,提高系统的灵活性与可维护性。
例如,以下任务的执行顺序:
其中,任务二要等任务一执行完才能执行,而任务四要等任务二和任务三全部执行完才能执行。
3. 任务编排实现
任务编排和控制的主要手段有以下:
- Future
- CompletableFuture
- CountDownLatch
- Semaphore
- CyclicBarrier
但如果是全局线程池,想要实现精准的任务编排,只能使用 Future 或 CompletableFuture。
3.1 Future 任务编排
使用 Future 实现上述 4 个任务的编排(任务二要等任务一执行完才能执行,而任务四要等任务二和任务三全部执行完才能执行)
import java.util.concurrent.*; import java.util.Arrays; public class TaskOrchestrator { public static void main(String[] args) { // 创建一个线程池来执行任务 ExecutorService executor = Executors.newFixedThreadPool(5); // 定义任务一 Future taskOneResult = executor.submit(new Callable() { @Override public String call() throws Exception { Thread.sleep(2000); // 模拟耗时操作 return "Task One Result"; } }); // 定义任务二,依赖任务一 Future taskTwoResult = executor.submit(new Callable() { @Override public String call() throws Exception { String result = taskOneResult.get(); // 阻塞等待任务一完成 Thread.sleep(1000); // 模拟耗时操作 return "Task Two Result, got: " + result; } }); // 定义任务三 Future taskThreeResult = executor.submit(new Callable() { @Override public String call() throws Exception { Thread.sleep(1500); // 模拟耗时操作 return "Task Three Result"; } }); // 定义任务四,依赖任务二和任务三 Future taskFourResult = executor.submit(new Callable() { @Override public String call() throws Exception { String taskTwoOutput = taskTwoResult.get(); // 阻塞等待任务二完成 String taskThreeOutput = taskThreeResult.get(); // 阻塞等待任务三完成 Thread.sleep(500); // 模拟耗时操作 return "Task Four Result, got: " + taskTwoOutput + " and " + taskThreeOutput; } }); // 打印最终结果 try { System.out.println("Final Result: " + taskFourResult.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } }
3.2 CompletableFuture 任务编排
CompletableFutrue 提供的方法有很多,但最常用和最实用的核心方法只有以下几个:
接下来,使用 CompletableFuture 实现上述 4 个任务的编排(任务二要等任务一执行完才能执行,而任务四要等任务二和任务三全部执行完才能执行):
import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; public class CompletableFutureExample { public static void main(String[] args) { // 任务一:返回 "Task 1 result" CompletableFuture task1 = CompletableFuture.supplyAsync(() -> { try { // 模拟耗时操作 Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); } return "Task 1 result"; }); // 任务二:依赖任务一,返回 "Task 2 result" + 任务一的结果 CompletableFuture task2 = task1.handle((result1, throwable) -> { try { // 模拟耗时操作 Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); } return "Task 2 result " + result1; }); // 任务三:和任务一、任务二并行执行,返回 "Task 3 result" CompletableFuture task3 = CompletableFuture.supplyAsync(() -> { try { // 模拟耗时操作 Thread.sleep(800); // 任务三可能比任务二先完成 } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); } return "Task 3 result"; }); // 任务四:依赖任务二和任务三,等待它们都完成后执行,返回 "Task 4 result" + 任务二和任务三的结果 CompletableFuture task4 = CompletableFuture.allOf(task2, task3).handle((res, throwable) -> { try { // 这里不需要显式等待,因为 allOf 已经保证了它们完成 return "Task 4 result with " + task2.get() + " and " + task3.get(); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } }); // 获取任务四的结果并打印 String finalResult = task4.join(); System.out.println(finalResult); } }
到此这篇关于Java 实现线程池任务编排的示例代码的文章就介绍到这了,更多相关Java 线程池任务编排内容请搜索IT俱乐部以前的文章或继续浏览下面的相关文章希望大家以后多多支持IT俱乐部!