1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
| package com.lyloou.component.util.concurrent;
import cn.hutool.core.collection.CollUtil; import cn.hutool.core.date.DateUtil; import cn.hutool.core.date.TimeInterval; import cn.hutool.core.lang.Assert; import cn.hutool.core.thread.ThreadUtil; import lombok.SneakyThrows;
import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.function.BiFunction;
public class ConcurrentExecutor2<K, V, R> {
private final Map<K, V> paramMap;
private final Map<K, R> successResultMap;
private final Map<K, Throwable> errorResultMap;
private final ExecutorService executorService;
private final BiFunction<K, V, R> biFunction;
public ConcurrentExecutor2(int maxThreadNum, Map<K, V> paramMap, BiFunction<K, V, R> biFunction) { Assert.notNull(paramMap, "paramMap不可为空"); Assert.isTrue(maxThreadNum > 0, "maxThreadNum不可小于1");
int maxThreadNum1 = Math.min(maxThreadNum, paramMap.size()); this.paramMap = Collections.synchronizedMap(paramMap); this.biFunction = biFunction; this.executorService = ThreadUtil.newExecutor(maxThreadNum1, maxThreadNum1, Integer.MAX_VALUE); this.successResultMap = new ConcurrentHashMap<>(this.paramMap.size()); this.errorResultMap = new ConcurrentHashMap<>(); }
public void execute() throws InterruptedException { while (CollUtil.isNotEmpty(paramMap)) { List<Callable<R>> callableList = new ArrayList<>(); paramMap.forEach((k, v) -> callableList.add(() -> { try { final R result = biFunction.apply(k, v); successResultMap.put(k, result); } catch (Exception e) { errorResultMap.put(k, e); } finally { paramMap.remove(k); } return null; })); executorService.invokeAll(callableList); executorService.shutdown(); } }
public Map<K, R> getSuccessResultMap() { return successResultMap; }
public Map<K, Throwable> getErrorResultMap() { return errorResultMap; }
public static void main(String[] args) throws InterruptedException { final TimeInterval timer = DateUtil.timer(); for (int i = 0; i < 100; i++) { test(); } System.out.println(timer.intervalMs()); }
private static void test() throws InterruptedException { Map<String, String> paramMap = new LinkedHashMap<>(); for (int i = 0; i < 10; i++) { paramMap.put("key:" + i, "value:" + i); }
final ConcurrentExecutor2<String, String, Integer> executor = new ConcurrentExecutor2<>(5, paramMap, (k, v) -> { ThreadUtil.sleep(10); System.out.println(Thread.currentThread().getName() + "-" + v); final int abs = Math.abs(Objects.hash(v)); if (abs % 3 == 0) { int i = 1 / 0; } return abs; }); executor.execute(); System.out.println("success result: " + executor.getSuccessResultMap()); System.out.println("error result: " + executor.getErrorResultMap()); } }
|