并发线程工具类分享
阿昌 Java小菜鸡

hi,我是阿昌,今天分享一下并发线程工具类分享可以更好的在实际项目中进行使用,废话不多说,上来先分享java类

特点

  1. 支持信号量控制并发数
  2. 支持自定义线程
  3. 简单易用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
@Slf4j
@Setter@Getter
public class ConcurrentExecutor<R> {


public final ExecutorService executorService;

/**
* 信号量
*/
private int permits;

/**
* 默认信号量大小
*/
public static final int DEFAULT_PERMITS = 4;

/**
* 信号量实例
*/
private final Semaphore semaphore;

public static final int SEMAPHORE_TIMEOUT_MILL_SEC = 100;
/**
* 需要异步执行的任务
*/
private List<R> syncExecuteResult = new ArrayList<>();

private List<CompletableFuture<R>> futureList = new ArrayList<>();

public ConcurrentExecutor(int permits, ExecutorService executorService) {
this.permits = permits;
this.semaphore = new Semaphore(permits);//并发数;控制
this.executorService = executorService;
}

public static <R> ConcurrentExecutor<R> newConService(int permits, ExecutorService executorService) {
return new ConcurrentExecutor<>(permits, executorService);
}

public static <R> ConcurrentExecutor<R> newConService(ExecutorService executorService) {
return new ConcurrentExecutor<>(ConcurrentExecutor.DEFAULT_PERMITS, executorService);
}

/**
* 提交单个任务
*/
public ConcurrentExecutor<R> submit(Supplier<R> supplier) {
boolean acquire;
try {
acquire = semaphore.tryAcquire(SEMAPHORE_TIMEOUT_MILL_SEC, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
acquire = false;
}
if (acquire) {
CompletableFuture<R> future = CompletableFuture.supplyAsync(supplier, executorService);
future.whenComplete((t, throwable) -> semaphore.release());
futureList.add(future);
} else {
syncExecuteResult.add(supplier.get());
}
return this;
}

/**
* 等待所有任务结束,无超时
*/
public CompletableFuture<List<R>> invokeAllFuture() {
return CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]))
.thenApply(e -> futureList.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()))
.whenComplete((rs, throwable) -> {
if (rs != null && syncExecuteResult != null && syncExecuteResult.size() > 0) {
rs.addAll(syncExecuteResult);
}
});
}

public List<R> invokeAll() {
return invokeAllFuture().join();
}
}

这个工具类可以使用自定义线程池,并设置信号量各种来限制并发执行的线程数量,同时用invokeAllFuture()来让主线程阻塞,等待所有异步线程执行完毕后再执行;

使用案例:比如在多线程调用ai模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//多线程调用ai识别
ConcurrentExecutor<String> executor = new ConcurrentExecutor<>(3, ItemConstant.aiThreadPoolExecutor);
for (List<String> partList : Lists.partition(skuNameList, 5)) {
executor.submit(() -> {
String str = AiStrategyFactory.chatCompletions(partList);
if (StrUtil.isNotBlank(str)) {
System.out.println("ai调用执行结果:" + str);
}
return str;
});
}
//阻塞主线程,直到所有任务完成
executor.invokeAll();
//获取到所有线程执行结果,进行结果处理
List<String> syncExecuteResult = executor.getSyncExecuteResult();
 请作者喝咖啡