并发提速方案
hi,我是阿昌
,这里记录一下并发提速的方案之一,通过线程池
;
在开发的过程中会在部分场景去使用到并发执行的情况。
举例场景:针对任务a、b、c的情况下
任务间独立执行
;a、b、c三个任务执行的逻辑相同或不同,但相互不影响
任务间结果归并
;a、b、c三个任务执行的逻辑相同或不同,但执行的返回结果是一直的,在外层需要归并收集结果,下层需要针对3任务的归并结果继续执行逻辑;
在业务和技术允许的情况下,想要使用并发多线程去提速,那可以使用如下的建议来提速;
那核心的思路是分而治之
,针对不关联的业务进行并发执行,看需求是否需要最后收集结果然后再执行下面业务;
多线程工具类
那这个优化的点还是基于CompletableFuture
去多线程优化;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
| public class ConcurrentExecutor<R> {
public final ExecutorService executorService;
private int permits;
public static final int DEFAULT_PERMITS = 4;
private final Semaphore semaphore;
public static final int SEMAPHORE_TIMEOUT_MILL_SEC = 100;
private List<R> syncExecuteResult = new ArrayList<>();
private List<CompletableFuture<R>> futureList = new ArrayList<>();
public ConcurrentExecutor(int permits, ExecutorService executorService) { this.permits = permits; this.semaphore = new Semaphore(permits); this.executorService = executorService; }
public static <R> ConcurrentExecutor<R> newConService(int permits, ExecutorService executorService) { return new ConcurrentExecutor<>(permits, executorService); }
public static <R> ConcurrentExecutor<R> newConService(ExecutorService executorService) { return new ConcurrentExecutor<>(ConcurrentExecutor.DEFAULT_PERMITS, executorService); }
public ConcurrentExecutor<R> submit(Supplier<R> supplier) { boolean acquire; try { acquire = semaphore.tryAcquire(SEMAPHORE_TIMEOUT_MILL_SEC, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { acquire = false; } if (acquire) { CompletableFuture<R> future = CompletableFuture.supplyAsync(supplier, executorService); future.whenComplete((t, throwable) -> semaphore.release()); futureList.add(future); } else { syncExecuteResult.add(supplier.get()); } return this; }
public CompletableFuture<List<R>> invokeAllFuture() { return CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])) .thenApply(e -> futureList.stream() .map(CompletableFuture::join) .collect(Collectors.toList())) .whenComplete((rs, throwable) -> { if (rs != null && syncExecuteResult != null && syncExecuteResult.size() > 0) { rs.addAll(syncExecuteResult); } }); }
public List<R> invokeAll() { return invokeAllFuture().join(); } }
|
如下调用代码:
1 2 3 4 5 6 7 8 9 10 11
| ExecutorService executorService = getExecutor();
ConcurrentExecutor<Void> executor = new ConcurrentExecutor<>(5, executorService);
executor.submit(() -> { System.out.println("业务执行"); return null; });
List<Void> voids = executor.invokeAll();
|
在如上的代码中可以执行信号量
来控制执行的速率,同时也可以多线程去异步执行任务,在最后也能同意返回结果,或在外层定义容器类统一收集执行结果
CompletionService
案例演示
juc里面已提供好的针对如上场景的解决service;其实跟我们上面的工具类是相似的;
利用 CompletionService 可以也可以实现并发能力,而且还能让代码更简练。
CompletionService
接口的实现类是 ExecutorCompletionService,这个实现类的构造方法有两个,分别是:
- ExecutorCompletionService(Executor executor);
- ExecutorCompletionService(Executor executor, BlockingQueue> completionQueue)。
这两个构造方法都需要传入一个线程池,如果不指定 completionQueue,那么默认会使用无界的 LinkedBlockingQueue。任务执行结果的 Future 对象就是加入到 completionQueue 中。
下面的示例代码完整地展示了如何利用 CompletionService 来实现高性能的询价系统。其中,没有指定 completionQueue,因此默认使用无界的 LinkedBlockingQueue。
之后通过 CompletionService 接口提供的 submit() 方法提交了三个询价操作,这三个询价操作将会被 CompletionService 异步执行。
最后,通过 CompletionService 接口提供的 take() 方法获取一个 Future 对象(之前提到过,加入到阻塞队列中的是任务执行结果的 Future 对象),调用 Future 对象的 get() 方法就能返回询价操作的执行结果了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| ExecutorService executor = Executors.newFixedThreadPool(3);
CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);
cs.submit(()->getPriceByS1());
cs.submit(()->getPriceByS2());
cs.submit(()->getPriceByS3());
for (int i=0; i<3; i++) { Integer r = cs.take().get(); executor.execute(()->save(r)); }
|
接口说明
那介绍一下 CompletionService 接口提供的方法,CompletionService 接口提供的方法有 5 个,这 5 个方法的方法签名如下所示。
1 2 3 4 5
| Future<V> submit(Callable<V> task); Future<V> submit(Runnable task, V result); Future<V> take() throws InterruptedException; Future<V> poll(); Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
|
提交任务:
submit()
相关的方法有两个。
- 一个方法参数是Callable task,前面利用 CompletionService 实现询价系统的示例代码中,提交任务就是用的它。
- 另外一个方法有两个参数,分别是Runnable task和V result,这个方法类似于 ThreadPoolExecutor 的 Future submit(Runnable task, T result)
获取结果:
CompletionService 接口其余的 3 个方法,都是和阻塞队列相关的,take()、poll() 都是从阻塞队列中获取并移除一个元素;
它们的区别在于如果阻塞队列是空的,那么调用 take() 方法的线程会被阻塞,而 poll() 方法会返回 null 值。
poll(long timeout, TimeUnit unit) 方法支持以超时的方式获取并移除阻塞队列头部的一个元素,如果等待了 timeout unit 时间,阻塞队列还是空的,那么该方法会返回 null 值。
小结
自实现的工具类借助CompletableFuture
执行多线程并发处理业务,处理的途中也可以阻塞,同时也支持信号量控制并发度来避免打高并发度情况
CompletionService 将线程池 Executor 和阻塞队列 BlockingQueue 的功能融合在了一起,能够让批量异步任务的管理更简单。
除此之外,CompletionService 能够让异步任务的执行结果有序化,先执行完的先进入阻塞队列,利用这个特性,可以轻松实现后续处理的有序性,避免无谓的等待,同时还可以快速实现诸如 Forking Cluster 这样的需求。
CompletionService 的实现类 ExecutorCompletionService,需要自己创建线程池,虽看上去有些啰嗦,但好处是可以让多个 ExecutorCompletionService 的线程池业务隔离,这种隔离性能避免几个特别耗时的任务拖垮整个应用的风险。
那以上其实都能针对处理并发处理的场景,看业务使用哪一个方式,有相似的地方也有不相似的,具体业务场景使用具体的业务;
感谢您能看到这里!