ThreadPool

自定义线程池

线程池的意义:

  • 降低资源消耗,复用已创建的线程,降低开销、控制最大并发数;
  • 隔离线程环境,可以配置独立线程池,将较慢的线程与较快的隔离开,避免相互影响;
  • 实现任务线程队列缓冲策略和拒绝策略;
  • 实现某些与时间相关的功能,如定时执行和周期执行等。

结构图

image-20230616154309438

自定义阻塞队列

阻塞队列使用一个双端队列实现,方便进行操作

参数

  • 任务队列 ArrayDeque

  • 锁 ReentrantLock

  • 生产者、消费者条件变量

    image-20230616154626046

  • 最大容量 Capcity

具体实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
//阻塞队列
class BlockingQueue<T> {
// 任务队列
private Deque<T> deque = new ArrayDeque<>();

//锁
private ReentrantLock lock = new ReentrantLock();

//生产者条件变量
private Condition fullWaitSet = lock.newCondition();
//消费者条件变量
private Condition entryWaitSet = lock.newCondition();

//最大容量
private int capcity;


public BlockingQueue(int capcity) {
this.capcity = capcity;
}

public T poll(long timeout, TimeUnit unit) {
lock.lock();
try {
long nanos = unit.toNanos(timeout);
while (deque.isEmpty()) {
try {
//超时,返回空结果
if (nanos <= 0) return null;
//每次循环,nanos时每次的剩余时间
nanos = entryWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = deque.remove();
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}
//阻塞获取
public T take() {
lock.lock();
try {
//队列为是空时
while (deque.isEmpty()) {
try {
//将线程进入等待队列
entryWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//不为空,返回队头元素
T t = deque.removeFirst();
//唤醒线程
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}

//阻塞添加
public void put(T e) {
lock.lock();
try {
//队满
while (deque.size() == capcity) {
try {
fullWaitSet.await();
System.out.println("等待加入任务队列" + e);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
//put元素到队尾
deque.addLast(e);
System.out.println("加入任务队列执行" + e);
entryWaitSet.signal();
} finally {
lock.unlock();
}
}

//超时阻塞添加
public boolean offer(T task, long timeout, TimeUnit unit) {
lock.lock();
try {
long nanos = unit.toNanos(timeout);
while (deque.size() == capcity) {
try {
if (nanos <= 0) return false;
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
deque.addLast(task);
entryWaitSet.signal();
return true;
} finally {
lock.unlock();
}
}

public int size() {
lock.lock();
try {
return deque.size();
} finally {
lock.unlock();
}
}

public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
lock.lock();
try {
if (deque.size() == capcity){
rejectPolicy.reject(this, task);
}else {
System.out.println("加入队列"+task);
deque.addLast(task);
entryWaitSet.signal();
}
}finally {
lock.unlock();
}
}
}

自定义线程池

参数

  • 阻塞队列,用于保存任务

  • 阻塞队列,最大长度,最大任务数

  • 线程集合

  • 核心线程数

  • 超时时间

  • 时间单位

  • 拒绝策略

    image-20230616155051758

具体实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
class ThreadPool {
//任务队列
private BlockingQueue<Runnable> taskQueue;
//线程集合
private HashSet<Worker> workers = new HashSet<>();
//核心线程数
private int coreSize;
//超时时间
private long timeout;

private TimeUnit timeUnit;

private RejectPolicy<Runnable> rejectPolicy;


public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity,RejectPolicy<Runnable> rejectPolicy) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new BlockingQueue<Runnable>(queueCapcity);
this.rejectPolicy = rejectPolicy;
}

//任务执行器
public void execute(Runnable task) {
//任务数小于核心数,直接交给worker执行
synchronized (this) {
if (workers.size() < coreSize) {
Worker worker = new Worker(task);
System.out.println("新增worker" + worker + "新增task" + task);
workers.add(worker);
worker.start();
} else {
//如果任务数超过核心数,加入任务队列暂存
taskQueue.tryPut(rejectPolicy,task);
}
}
}

//工作线程
class Worker extends Thread {
private Runnable task;

public Worker(Runnable runnable) {
this.task = runnable;
}

@Override
public void run() {
//1、task不为空,执行任务
//2、task执行完毕,接着去任务队列去寻找任务获取并执行
while (task != null || (task = taskQueue.poll(1000, TimeUnit.MILLISECONDS)) != null) {
try {
System.out.println("正在执行" + task);
task.run();
} catch (Exception e) {

} finally {
task = null;
}
}
//将该任务移除出工作队列
synchronized (workers) {
System.out.println("移除" + this);
workers.remove(this);
}

}
}
}

拒绝策略

1
2
3
interface RejectPolicy<T> {
void reject(BlockingQueue<T> queue, T task);
}