并发提速方案
阿昌 Java小菜鸡

并发提速方案

hi,我是阿昌,这里记录一下并发提速的方案之一,通过线程池

在开发的过程中会在部分场景去使用到并发执行的情况。

举例场景:针对任务a、b、c的情况下

  1. 任务间独立执行;a、b、c三个任务执行的逻辑相同或不同,但相互不影响
  2. 任务间结果归并;a、b、c三个任务执行的逻辑相同或不同,但执行的返回结果是一直的,在外层需要归并收集结果,下层需要针对3任务的归并结果继续执行逻辑;

在业务和技术允许的情况下,想要使用并发多线程去提速,那可以使用如下的建议来提速;

那核心的思路是分而治之,针对不关联的业务进行并发执行,看需求是否需要最后收集结果然后再执行下面业务;

多线程工具类

那这个优化的点还是基于CompletableFuture去多线程优化;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
public class ConcurrentExecutor<R> {

public final ExecutorService executorService;

/**
* 信号量
*/
private int permits;

/**
* 默认信号量大小
*/
public static final int DEFAULT_PERMITS = 4;

/**
* 信号量实例
*/
private final Semaphore semaphore;

public static final int SEMAPHORE_TIMEOUT_MILL_SEC = 100;
/**
* 需要异步执行的任务
*/
private List<R> syncExecuteResult = new ArrayList<>();

private List<CompletableFuture<R>> futureList = new ArrayList<>();

public ConcurrentExecutor(int permits, ExecutorService executorService) {
this.permits = permits;
this.semaphore = new Semaphore(permits);//并发数;控制
this.executorService = executorService;
}

public static <R> ConcurrentExecutor<R> newConService(int permits, ExecutorService executorService) {
return new ConcurrentExecutor<>(permits, executorService);
}

public static <R> ConcurrentExecutor<R> newConService(ExecutorService executorService) {
return new ConcurrentExecutor<>(ConcurrentExecutor.DEFAULT_PERMITS, executorService);
}

/**
* 提交单个任务
*
* @return
*/
public ConcurrentExecutor<R> submit(Supplier<R> supplier) {
boolean acquire;
try {
acquire = semaphore.tryAcquire(SEMAPHORE_TIMEOUT_MILL_SEC, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
acquire = false;
}
if (acquire) {
CompletableFuture<R> future = CompletableFuture.supplyAsync(supplier, executorService);
future.whenComplete((t, throwable) -> semaphore.release());
futureList.add(future);
} else {
syncExecuteResult.add(supplier.get());
}
return this;
}

/**
* 等待所有任务结束,无超时
*
* @return CompletableFuture<List < R>>
*/
public CompletableFuture<List<R>> invokeAllFuture() {
return CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]))
.thenApply(e -> futureList.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()))
.whenComplete((rs, throwable) -> {
if (rs != null && syncExecuteResult != null && syncExecuteResult.size() > 0) {
rs.addAll(syncExecuteResult);
}
});
}

public List<R> invokeAll() {
return invokeAllFuture().join();
}
}

如下调用代码:

1
2
3
4
5
6
7
8
9
10
11
//获取线程池
ExecutorService executorService = getExecutor();
//创建并发执行工具类
ConcurrentExecutor<Void> executor = new ConcurrentExecutor<>(5, executorService);
//提交任务
executor.submit(() -> {
System.out.println("业务执行");
return null;
});
//阻塞并等待提交任务执行完毕
List<Void> voids = executor.invokeAll();

在如上的代码中可以执行信号量来控制执行的速率,同时也可以多线程去异步执行任务,在最后也能同意返回结果,或在外层定义容器类统一收集执行结果


CompletionService

案例演示

juc里面已提供好的针对如上场景的解决service;其实跟我们上面的工具类是相似的;

利用 CompletionService 可以也可以实现并发能力,而且还能让代码更简练。

CompletionService 接口的实现类是 ExecutorCompletionService,这个实现类的构造方法有两个,分别是:

  • ExecutorCompletionService(Executor executor);
  • ExecutorCompletionService(Executor executor, BlockingQueue> completionQueue)。

这两个构造方法都需要传入一个线程池,如果不指定 completionQueue,那么默认会使用无界的 LinkedBlockingQueue。任务执行结果的 Future 对象就是加入到 completionQueue 中。

下面的示例代码完整地展示了如何利用 CompletionService 来实现高性能的询价系统。其中,没有指定 completionQueue,因此默认使用无界的 LinkedBlockingQueue。

之后通过 CompletionService 接口提供的 submit() 方法提交了三个询价操作,这三个询价操作将会被 CompletionService 异步执行。

最后,通过 CompletionService 接口提供的 take() 方法获取一个 Future 对象(之前提到过,加入到阻塞队列中的是任务执行结果的 Future 对象),调用 Future 对象的 get() 方法就能返回询价操作的执行结果了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 创建CompletionService
CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);
// 异步向S1询价
cs.submit(()->getPriceByS1());
// 异步向S2询价
cs.submit(()->getPriceByS2());
// 异步向S3询价
cs.submit(()->getPriceByS3());
// 将询价结果异步保存到数据库
for (int i=0; i<3; i++) {
Integer r = cs.take().get();
executor.execute(()->save(r));
}

接口说明

那介绍一下 CompletionService 接口提供的方法,CompletionService 接口提供的方法有 5 个,这 5 个方法的方法签名如下所示。

1
2
3
4
5
Future<V> submit(Callable<V> task);
Future<V> submit(Runnable task, V result);
Future<V> take() throws InterruptedException;
Future<V> poll();
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;

提交任务:

submit() 相关的方法有两个。

  • 一个方法参数是Callable task,前面利用 CompletionService 实现询价系统的示例代码中,提交任务就是用的它。
  • 另外一个方法有两个参数,分别是Runnable task和V result,这个方法类似于 ThreadPoolExecutor 的 Future submit(Runnable task, T result)

获取结果:

CompletionService 接口其余的 3 个方法,都是和阻塞队列相关的,take()、poll() 都是从阻塞队列中获取并移除一个元素;

它们的区别在于如果阻塞队列是空的,那么调用 take() 方法的线程会被阻塞,而 poll() 方法会返回 null 值。

poll(long timeout, TimeUnit unit) 方法支持以超时的方式获取并移除阻塞队列头部的一个元素,如果等待了 timeout unit 时间,阻塞队列还是空的,那么该方法会返回 null 值。


小结

  • 自实现的工具类借助CompletableFuture执行多线程并发处理业务,处理的途中也可以阻塞,同时也支持信号量控制并发度来避免打高并发度情况

  • CompletionService 将线程池 Executor 和阻塞队列 BlockingQueue 的功能融合在了一起,能够让批量异步任务的管理更简单。

    除此之外,CompletionService 能够让异步任务的执行结果有序化,先执行完的先进入阻塞队列,利用这个特性,可以轻松实现后续处理的有序性,避免无谓的等待,同时还可以快速实现诸如 Forking Cluster 这样的需求。

    CompletionService 的实现类 ExecutorCompletionService,需要自己创建线程池,虽看上去有些啰嗦,但好处是可以让多个 ExecutorCompletionService 的线程池业务隔离,这种隔离性能避免几个特别耗时的任务拖垮整个应用的风险。

那以上其实都能针对处理并发处理的场景,看业务使用哪一个方式,有相似的地方也有不相似的,具体业务场景使用具体的业务;

感谢您能看到这里!

 请作者喝咖啡