private final CopyOnWriteArrayList<T> sharedList;  
private final ThreadLocal<List<Object>> threadList;  
private final AtomicInteger waiters;  
private final SynchronousQueue<T> handoffQueue;
  • sharedList 用来缓存所有的连接,是一个 CopyOnWriteArrayList 结构。
  • threadList 用来缓存某个线程所使用的所有连接,相当于快速引用,是一个ThreadLocal类型的ArrayList。
  • waiters 当前正在获取连接的等待者数量。AtomicInteger,就是一个自增对象。当waiters的数量大于0时候,意味着有线程正在获取资源。
  • handoffQueue 0容量的快速传递队列,SynchronousQueue 类型的队列,非常有用。

ConcurrentBag 里面的元素,为了能够无锁化操作,需要使用一些变量来标识现在处于的状态。抽象的接口如下:

public interface IConcurrentBagEntry{  
    int STATE_NOT_IN_USE = 0;  
    int STATE_IN_USE = 1;  
    int STATE_REMOVED = -1;  
    int STATE_RESERVED = -2;  
    boolean compareAndSet(int expectState, int newState);  
    void setState(int newState);  
    int getState();  
}

有了这些数据结构的支持,我们的 ConcurrentBag 就可以实现它光的宣称了。

public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException
public void requite(final T bagEntry)  
{  
   bagEntry.setState(STATE_NOT_IN_USE);  
   for (var i = 0; waiters.get() > 0; i++) {  
      if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {  
         return;  
      }  
      else if ((i & 0xff) == 0xff) {  
         parkNanos(MICROSECONDS.toNanos(10));  
      }  
      else {  
         Thread.yield();  
      }  
   }  
   final var threadLocalList = threadList.get();  
   if (threadLocalList.size() < 50) {  
      threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);  
   }  
}
  1. 使用 ThreadLocal 来缓存本地资源引用,使用线程封闭的资源来减少锁的冲突
  2. 采用读多写少的线程安全的 CopyOnWriteArrayList 来缓存所有对象,几乎不影响读取效率
  3. 使用基于 CAS 的 AtomicInteger 来计算等待者的数量,无锁操作使得计算更加快速
  4. 0容量的交换队列 SynchronousQueue,使得对象传递更加迅速
  5. 采用 compareAndSet 的 CAS 原语来控制状态的变更,安全且效率高。很多核心代码都是这么设计的
  6. 在循环中使用 park、yield 等方法,避免死循环占用大量 CPU
  7. 需要了解并发数据结构中的 offer、poll、peek、put、take、add、remove 方法的区别,并灵活应用
  8. CAS 在设置状态时,采用了 volatile 关键字修饰,对于 volatile 的使用也是一个常见的优化点
  9. 需要了解 WeakReference 弱引用在垃圾回收时候的表现