批量发送

通过了解 Netty4的原理我们可以得知,当调用 channel 的 writeAndFlush 方法时,Netty 会判断当前发送请求的线程是否是当前 channel 所绑定的 EventLoop 线程,如果不是 EventLooop 则会构造一个写任务 WriteTask 并将其提交到 EventLoop 中稍后执行,其代码(io.netty.channel.AbstractChannelHandlerContext)如下:

private void write(Object msg, boolean flush, ChannelPromise promise) {
        ObjectUtil.checkNotNull(msg, "msg");
        try {
            if (isNotValidPromise(promise, true)) {
                ReferenceCountUtil.release(msg);
                // cancelled
                return;
            }
        } catch (RuntimeException e) {
            ReferenceCountUtil.release(msg);
            throw e;
        }
 
        final AbstractChannelHandlerContext next = findContextOutbound(flush ?
                (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
	  	//判断当前线程是否是该channel绑定的EventLoop
        if (executor.inEventLoop()) {
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                next.invokeWrite(m, promise);
            }
        } else {
            final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
            //将写任务提交到EventLoop上稍后执行
            if (!safeExecute(executor, task, promise, m, !flush)) {
                // We failed to submit the WriteTask. We need to cancel it so we decrement the pending bytes
                // and put it back in the Recycler for re-use later.
                //
                // See https://github.com/netty/netty/issues/8343.
                task.cancel();
            }
        }
    }

从这部分源码我们可以了解到 Netty 写消息时总是会保证把任务提交到 EventLoop 线程上处理,而每调度一次 EventLoop 线程去执行写任务 WriteTask 却只能写一个消息。其线程模型如下图:

image|500
而改造后的做法是将所有的消息都先提交到一个 WriteQueue 消息写队列上,内部会获取一次 EventLoop 并提交一个任务,该任务的逻辑比较简单,那就是从消息队列上不断的取消息出来并调用 Netty 的 write。其核心源码如下:

@Override
protected void flush(MessageTuple item) {
  prepare(item);
  Object finalMessage = multiMessage;
  if (multiMessage.size() == 1) {
    finalMessage = multiMessage.get(0);
  }
  channel.writeAndFlush(finalMessage).addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
      ChannelPromise cp;
      while ((cp = promises.poll()) != null) {
        if (future.isSuccess()){
          cp.setSuccess();
        } else {
          cp.setFailure(future.cause());
        }
      }
    }
  });
  this.multiMessage.removeMessages();
}

执行该 flush 的逻辑时,是处于 EventLoop 线程的,而从前面的 Netty 源码我们知道,当写动作处于 EventLoop 线程中时是会立即执行写动作的,并不会出现线程切换的行为!那么相较于之前每次都直接在用户线程中调用 writeAndFlush 而言,大幅度的减少了用户线程与 EventLoop 线程的切换次数,也使得一次 WriteTask 写出的消息数量有了大幅度提高,达到批量发包的效果,以此提高 dubbo 协议在 小报文 场景下的性能。改造后的模型如下图:

image|500

总结

参考源码,

 
    public void enqueue(T message, Executor executor) {
        queue.add(message);
        scheduleFlush(executor);
    }
 
    protected void scheduleFlush(Executor executor) {
        if (scheduled.compareAndSet(false, true)) {
            executor.execute(() -> this.run(executor));
        }
    }

核心就是当 当前的线程 执行 send 操作的时候,中间的添加的其他信息,先等待,放到队列中,然后等到上一个执行完毕,一次性执行 n 条。
比较适用于异步长连接的场景,在 A 执行动作后, 等待过程中, B、C、D 都可以继续执行动作,然后等待一起发送。
通过这种方式,减少在 IO 过程中用户与内核的切换开销

参考

Dubbo 性能调优总结文档 · Issue10915 · apache/dubbo · GitHub
issue 10727: optimize performance for `dubbo` protocol by icodening · Pull Request10728 · apache/dubbo · GitHub