【Java】基于线程池的独立任务并发执行器

目的:

对于多个独立的任务,可以以并发的方式执行任务,以提高 CPU 利用率,提高处理效率。

思路

在一个线程池中,开启指定数量的线程,每个线程从任务队列中获取任务执行。

执行的过程中,判断当前线程是否在执行任务的状态,如果没有执行任务,取一条任务执行,如果正在执行,则跳过,下轮再判断。

在所有任务执行完后,关闭线程池。

需要注意的是数据结构的选择,须选择并发类的数据结构,不然可能出现阻塞,死锁等情况。

(具体逻辑参考源码)

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
* 并发执行器示例
*/
public class ConcurrentExecutorTest {

/**
* 测试
*/
public static void main(String[] args) {
for (int i = 0; i < 100; i++) {
test();
}
}

private static void test() {
Map<String, String> paramMap = new LinkedHashMap<>();
for (int i = 0; i < 10; i++) {
paramMap.put("key:" + i, "value:" + i);
}

final ConcurrentExecutor<String, String, Integer> executor = new ConcurrentExecutor<>(5, paramMap,
(k, v) -> {
ThreadUtil.sleep(10);
System.out.println(Thread.currentThread().getName() + "-" + v);
final int abs = Math.abs(Objects.hash(v));
if (abs % 3 == 0) {
int i = 1 / 0;
}
return abs;
});
executor.execute();
System.out.println("success result: " + executor.getSuccessResultMap());
System.out.println("error result: " + executor.getErrorResultMap());
}
}

测试结果

1
2
3
4
5
6
7
8
9
10
11
12
pool-1-thread-1-value:0
pool-1-thread-2-value:1
pool-1-thread-4-value:3
pool-1-thread-3-value:2
pool-1-thread-3-value:8
pool-1-thread-1-value:5
pool-1-thread-5-value:4
pool-1-thread-2-value:6
pool-1-thread-4-value:7
pool-1-thread-3-value:9
success result: {key:2=231604360, key:0=231604358, key:6=231604364, key:5=231604363, key:3=231604361, key:9=231604367, key:8=231604366}
error result: {key:1=java.lang.ArithmeticException: / by zero, key:4=java.lang.ArithmeticException: / by zero, key:7=java.lang.ArithmeticException: / by zero}

源码(方案 1)

基于自定义的 queue 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.collection.ConcurrentHashSet;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.BooleanUtil;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.function.BiFunction;

/**
* 并发执行器
* <p>
* 适用场景:每个任务是独立的,不耦合的
*
* @author lilou
* @since 2022/6/9 9:05
*/
public class ConcurrentExecutor<K, V, R> {
/**
* 任务参数映射(K:key的类型,V:值的类型)
*/
private final Map<K, V> paramMap;
/**
* 成功的任务结果映射(R:结果类型)
*/
private final Map<K, R> successResultMap;

/**
* 失败的任务结果映射
*/
private final Map<K, Throwable> errorResultMap;

/**
* 当前运行中的key集合
*/
private final Set<K> runningKeySet;

/**
* 候选任务key队列
*/
private final Queue<K> candidateKeyQueue;

/**
* 同时运行的最大线程数量
*/
private final int maxThreadNum;

/**
* 执行器
*/
private final ExecutorService executorService;

/**
* 具体任务策略
*/
private final BiFunction<K, V, R> biFunction;

/**
* 当前index线程的运行状态,可依据此状态,判断是否立刻从任务参数中获取任务执行
*/
private final Map<Integer, Boolean> currentIndexThreadRunningStatusMap;

public ConcurrentExecutor(int maxThreadNum, Map<K, V> paramMap, BiFunction<K, V, R> biFunction) {
Assert.notNull(paramMap, "paramMap不可为空");
Assert.isTrue(maxThreadNum > 0, "maxThreadNum不可小于1");

final int paramSize = paramMap.size();
this.maxThreadNum = Math.min(maxThreadNum, paramSize);
// tips: 须转换成同步类的map数据结构,如果错误地使用 this.paramMap = paramMap; 且外部使用了HashMap 或 LinkedHashMap,多测试几遍会发现,偶尔会陷入了阻塞
this.paramMap = Collections.synchronizedMap(paramMap);
this.candidateKeyQueue = new ConcurrentLinkedQueue<>(paramMap.keySet());
this.runningKeySet = new ConcurrentHashSet<>(paramSize);
this.biFunction = biFunction;
this.executorService = ThreadUtil.newExecutor(this.maxThreadNum, this.maxThreadNum, Integer.MAX_VALUE);
this.currentIndexThreadRunningStatusMap = new ConcurrentHashMap<>(this.maxThreadNum);
this.successResultMap = new ConcurrentHashMap<>(this.paramMap.size());
this.errorResultMap = new ConcurrentHashMap<>();
}


public void execute() {
while (CollUtil.isNotEmpty(paramMap)) {

// 最多同时有 maxRunningThreadNumber 同时消费 taskMap 中的数据
for (int i = 0; i < this.maxThreadNum; i++) {
int currentIndex = i;

// 当前线程上次还未执行完,暂时跳过
final Boolean isRunning = currentIndexThreadRunningStatusMap.getOrDefault(currentIndex, false);
if (BooleanUtil.isTrue(isRunning)) {
continue;
}

// 选择一个候选key
final K candidateKey = pickCandidateKey();
// 当前没有对应key的任务
if (Objects.isNull(candidateKey)) {
continue;
}

// 在线程池中运行任务
executorService.submit(() -> {
try {
currentIndexThreadRunningStatusMap.put(currentIndex, true);
final V data = paramMap.get(candidateKey);

// 开始执行任务
final R result = biFunction.apply(candidateKey, data);

// 存入正常结果
successResultMap.put(candidateKey, result);
} catch (Exception e) {
// 存入异常结果
errorResultMap.put(candidateKey, e);
} finally {
paramMap.remove(candidateKey);
candidateKeyQueue.remove(candidateKey);
currentIndexThreadRunningStatusMap.remove(currentIndex);
}
});
}
}
executorService.shutdown();
}


/**
* 从候选任务key队列中选择一个任务key
*/
private K pickCandidateKey() {
for (K candidateKey : candidateKeyQueue) {
if (!runningKeySet.contains(candidateKey)) {
runningKeySet.add(candidateKey);
return candidateKey;
}
}
return null;
}

public Map<K, R> getSuccessResultMap() {
return successResultMap;
}

public Map<K, Throwable> getErrorResultMap() {
return errorResultMap;
}
}

源码(方案 2)

基于 executorService.invokeAll

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package com.lyloou.component.util.concurrent;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.date.TimeInterval;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.thread.ThreadUtil;
import lombok.SneakyThrows;

import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.function.BiFunction;

/**
* 并发执行器
* <p>
* 适用场景:每个任务是独立的,不耦合的
*
* @author lilou
* @since 2022/6/9 9:05
*/
public class ConcurrentExecutor2<K, V, R> {
/**
* 任务参数映射(K:key的类型,V:值的类型)
*/
private final Map<K, V> paramMap;
/**
* 成功的任务结果映射
*/
private final Map<K, R> successResultMap;

/**
* 失败的任务结果映射
*/
private final Map<K, Throwable> errorResultMap;

/**
* 执行器
*/
private final ExecutorService executorService;

/**
* 具体任务策略
*/
private final BiFunction<K, V, R> biFunction;


public ConcurrentExecutor2(int maxThreadNum, Map<K, V> paramMap, BiFunction<K, V, R> biFunction) {
Assert.notNull(paramMap, "paramMap不可为空");
Assert.isTrue(maxThreadNum > 0, "maxThreadNum不可小于1");

// 同时运行的最大线程数量
int maxThreadNum1 = Math.min(maxThreadNum, paramMap.size());
// tips: 如果错误地使用 this.paramMap = paramMap; 多测试几遍会发现,偶尔会陷入了阻塞
this.paramMap = Collections.synchronizedMap(paramMap);
this.biFunction = biFunction;
this.executorService = ThreadUtil.newExecutor(maxThreadNum1, maxThreadNum1, Integer.MAX_VALUE);
this.successResultMap = new ConcurrentHashMap<>(this.paramMap.size());
this.errorResultMap = new ConcurrentHashMap<>();
}


public void execute() throws InterruptedException {
while (CollUtil.isNotEmpty(paramMap)) {
List<Callable<R>> callableList = new ArrayList<>();
paramMap.forEach((k, v) -> callableList.add(() -> {
try {
final R result = biFunction.apply(k, v);
successResultMap.put(k, result);
} catch (Exception e) {
errorResultMap.put(k, e);
} finally {
paramMap.remove(k);
}
return null;
}));
// 在线程池中运行任务
executorService.invokeAll(callableList);
executorService.shutdown();
}
}

public Map<K, R> getSuccessResultMap() {
return successResultMap;
}

public Map<K, Throwable> getErrorResultMap() {
return errorResultMap;
}

/**
* 测试
*/
public static void main(String[] args) throws InterruptedException {
final TimeInterval timer = DateUtil.timer();
for (int i = 0; i < 100; i++) {
test();
}
System.out.println(timer.intervalMs());
}

private static void test() throws InterruptedException {
Map<String, String> paramMap = new LinkedHashMap<>();
for (int i = 0; i < 10; i++) {
paramMap.put("key:" + i, "value:" + i);
}

final ConcurrentExecutor2<String, String, Integer> executor = new ConcurrentExecutor2<>(5, paramMap,
(k, v) -> {
ThreadUtil.sleep(10);
System.out.println(Thread.currentThread().getName() + "-" + v);
final int abs = Math.abs(Objects.hash(v));
if (abs % 3 == 0) {
int i = 1 / 0;
}
return abs;
});
executor.execute();
System.out.println("success result: " + executor.getSuccessResultMap());
System.out.println("error result: " + executor.getErrorResultMap());
}
}