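Hi, I'm 阿昌. Today I'm sharing a small concurrency utility class that is handy in real projects. Without further ado, here is the Java class.
Features:
- Limits concurrency with a semaphore
- Supports a custom thread pool
- Simple to use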
```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Setter
@Getter
public class ConcurrentExecutor<R> {

    public final ExecutorService executorService;
    private int permits;
    public static final int DEFAULT_PERMITS = 4;
    private final Semaphore semaphore;
    public static final int SEMAPHORE_TIMEOUT_MILL_SEC = 100;
    // Results of tasks that ran on the calling thread because no permit was free in time
    private List<R> syncExecuteResult = new ArrayList<>();
    // Futures of tasks that were handed to the thread pool
    private List<CompletableFuture<R>> futureList = new ArrayList<>();

    public ConcurrentExecutor(int permits, ExecutorService executorService) {
        this.permits = permits;
        this.semaphore = new Semaphore(permits);
        this.executorService = executorService;
    }

    public static <R> ConcurrentExecutor<R> newConService(int permits, ExecutorService executorService) {
        return new ConcurrentExecutor<>(permits, executorService);
    }

    public static <R> ConcurrentExecutor<R> newConService(ExecutorService executorService) {
        return new ConcurrentExecutor<>(ConcurrentExecutor.DEFAULT_PERMITS, executorService);
    }

    public ConcurrentExecutor<R> submit(Supplier<R> supplier) {
        boolean acquire;
        try {
            acquire = semaphore.tryAcquire(SEMAPHORE_TIMEOUT_MILL_SEC, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag visible to the caller
            acquire = false;
        }
        if (acquire) {
            // Permit obtained: run on the pool and release the permit when the task finishes
            CompletableFuture<R> future = CompletableFuture.supplyAsync(supplier, executorService);
            future.whenComplete((t, throwable) -> semaphore.release());
            futureList.add(future);
        } else {
            // No permit within the timeout: run the task on the calling thread
            syncExecuteResult.add(supplier.get());
        }
        return this;
    }

    public CompletableFuture<List<R>> invokeAllFuture() {
        return CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]))
                .thenApply(e -> futureList.stream()
                        .map(CompletableFuture::join)
                        // Collectors.toList() does not guarantee a mutable list, so ask for an ArrayList
                        .collect(Collectors.toCollection(ArrayList::new)))
                .whenComplete((rs, throwable) -> {
                    // Append the results of tasks that ran synchronously
                    if (rs != null && syncExecuteResult != null && syncExecuteResult.size() > 0) {
                        rs.addAll(syncExecuteResult);
                    }
                });
    }

    public List<R> invokeAll() {
        return invokeAllFuture().join();
    }
}
```
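This utility takes a custom thread pool and uses a semaphore to cap how many tasks run on it concurrently. If no permit becomes available within SEMAPHORE_TIMEOUT_MILL_SEC (100 ms), submit() runs the task synchronously on the calling thread instead. invokeAll() blocks the calling thread until every asynchronous task has finished and then returns all results; invokeAllFuture() exposes the same result as a CompletableFuture without blocking.
Usage example: calling an AI model from multiple threads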
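To make the permit-or-fallback behaviour concrete, here is a minimal standalone sketch of the same idea, without Lombok or the class above. The class name `PermitFallbackDemo`, the pool size of 8, and the 50 ms simulated work are assumptions made for illustration only. It counts how many tasks are running on the pool at the same moment, which should never exceed the number of permits.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PermitFallbackDemo {

    /** Runs taskCount short tasks; returns the most tasks seen running on the pool at once. */
    static int peakAsyncConcurrency(int permits, int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(8); // deliberately larger than permits
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        try {
            for (int i = 0; i < taskCount; i++) {
                if (tryAcquire(semaphore, 100)) {
                    // Permit obtained: run on the pool and record the concurrency level
                    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                        peak.accumulateAndGet(running.incrementAndGet(), Math::max);
                        try {
                            work();
                        } finally {
                            running.decrementAndGet();
                        }
                    }, pool);
                    // The permit is returned only after the task has completed
                    future.whenComplete((v, t) -> semaphore.release());
                    futures.add(future);
                } else {
                    // No permit within the timeout: run on the calling thread
                    work();
                }
            }
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    static boolean tryAcquire(Semaphore semaphore, long timeoutMillis) {
        try {
            return semaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Simulated work; stands in for a slow remote call. */
    static void work() {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("peak async concurrency: " + peakAsyncConcurrency(2, 6));
    }
}
```

Note that the fallback path adds the calling thread as one more worker, so the total number of tasks in flight can briefly be the permit count plus one.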
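```java
ConcurrentExecutor<String> executor = new ConcurrentExecutor<>(3, ItemConstant.aiThreadPoolExecutor);
for (List<String> partList : Lists.partition(skuNameList, 5)) {
    executor.submit(() -> {
        String str = AiStrategyFactory.chatCompletions(partList);
        if (StrUtil.isNotBlank(str)) {
            System.out.println("AI call result: " + str);
        }
        return str;
    });
}
// invokeAll() blocks until every task is done and returns all results
// (async results first, followed by those that ran on the calling thread)
List<String> results = executor.invokeAll();
// getSyncExecuteResult() holds only the results of tasks that fell back to the calling thread
List<String> syncExecuteResult = executor.getSyncExecuteResult();
```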
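One thing the usage above leaves open is what happens when a task throws, for example when a remote model call fails. `CompletableFuture.allOf(...)` completes exceptionally if any of its futures does, so a blocking `join()` on it throws a `CompletionException`. The sketch below shows one possible way to deal with that, by giving each future a per-task fallback with `exceptionally`. The class `FailureHandlingDemo`, the `callModel` stand-in, and the `"failed:"` marker are hypothetical and not part of the original utility.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

class FailureHandlingDemo {

    /** Hypothetical stand-in for a remote call; fails for the input "bad". */
    static String callModel(String input) {
        if ("bad".equals(input)) {
            throw new IllegalStateException("model call failed for " + input);
        }
        return "ok:" + input;
    }

    /** Runs one task per input and joins them all, optionally with a per-task fallback. */
    static List<String> run(List<String> inputs, boolean withFallback) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<CompletableFuture<String>> futures = inputs.stream()
                    .map(in -> {
                        CompletableFuture<String> f =
                                CompletableFuture.supplyAsync(() -> callModel(in), pool);
                        // With the fallback, a failed task yields a marker value instead of an exception
                        return withFallback ? f.exceptionally(t -> "failed:" + in) : f;
                    })
                    .collect(Collectors.toList());
            return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                    .thenApply(v -> futures.stream()
                            .map(CompletableFuture::join)
                            .collect(Collectors.toList()))
                    .join(); // throws CompletionException if any future failed
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("a", "bad", "c"), true));
        try {
            run(Arrays.asList("a", "bad", "c"), false);
        } catch (CompletionException e) {
            System.out.println("batch failed: " + e.getCause().getMessage());
        }
    }
}
```

Whether a failed task should produce a marker value, be skipped, or abort the whole batch depends on the caller, so treat the fallback here as one option rather than a recommendation.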