private final CopyOnWriteArrayList<T> sharedList;
private final ThreadLocal<List<Object>> threadList;
private final AtomicInteger waiters;
private final SynchronousQueue<T> handoffQueue;
sharedList
用来缓存所有的连接,是一个 CopyOnWriteArrayList 结构。threadList
用来缓存某个线程所使用的所有连接,相当于快速引用,是一个ThreadLocal类型的ArrayList。waiters
当前正在获取连接的等待者数量。AtomicInteger,就是一个自增对象。当waiters的数量大于0时候,意味着有线程正在获取资源。handoffQueue
0容量的快速传递队列,SynchronousQueue 类型的队列,非常有用。
ConcurrentBag 里面的元素,为了能够无锁化操作,需要使用一些变量来标识现在处于的状态。抽象的接口如下:
public interface IConcurrentBagEntry{
int STATE_NOT_IN_USE = 0;
int STATE_IN_USE = 1;
int STATE_REMOVED = -1;
int STATE_RESERVED = -2;
boolean compareAndSet(int expectState, int newState);
void setState(int newState);
int getState();
}
有了这些数据结构的支持,我们的 ConcurrentBag
就可以实现它光的宣称了。
public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException
public void requite(final T bagEntry)
{
bagEntry.setState(STATE_NOT_IN_USE);
for (var i = 0; waiters.get() > 0; i++) {
if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {
return;
}
else if ((i & 0xff) == 0xff) {
parkNanos(MICROSECONDS.toNanos(10));
}
else {
Thread.yield();
}
}
final var threadLocalList = threadList.get();
if (threadLocalList.size() < 50) {
threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
}
}
- 使用 ThreadLocal 来缓存本地资源引用,使用线程封闭的资源来减少锁的冲突
- 采用读多写少的线程安全的 CopyOnWriteArrayList 来缓存所有对象,几乎不影响读取效率
- 使用基于 CAS 的 AtomicInteger 来计算等待者的数量,无锁操作使得计算更加快速
- 0容量的交换队列 SynchronousQueue,使得对象传递更加迅速
- 采用 compareAndSet 的 CAS 原语来控制状态的变更,安全且效率高。很多核心代码都是这么设计的
- 在循环中使用 park、yield 等方法,避免死循环占用大量 CPU
- 需要了解并发数据结构中的 offer、poll、peek、put、take、add、remove 方法的区别,并灵活应用
- CAS 在设置状态时,采用了 volatile 关键字修饰,对于 volatile 的使用也是一个常见的优化点
- 需要了解 WeakReference 弱引用在垃圾回收时候的表现