charteechartee12-05 13:37

[转]Netty进阶之路--消息发送机制、消息接收时的内存申请机制

《Netty进阶之路》第5章、第6章分别提出两个问题:消息发送时没有限流造成消息积压,内存飙升;消息接收时没有注意正确的内存申请,导致内存和CPU占用不稳定。

1 消息发送机制
??Netty的消息发送前判断当前线程是否Netty NIOEventLoopLoop线程,如果是则直接执行写,否则封装成WriteTask添加到NIOEventLoopLoop的任务队列中,由NIOEventLoopLoop线程执行WriteTask写任务(这样做的好处是把IO操作统一交给NIOEventLoopLoop线程,防止因为IO而阻塞业务线程),接下来所有待发送消息进入ChannelOutboundBuffer链表中排队等待真正的写出,因此Netty的消息发送是线程安全的。

    //AbstractChannelHandlerContext
    private void write(Object msg, boolean flush, ChannelPromise promise) {
        ObjectUtil.checkNotNull(msg, "msg");
        // 省略参数校验

        final AbstractChannelHandlerContext next = findContextOutbound(flush ?
                (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        // 如果是EventLoop线程则直接写
        if (executor.inEventLoop()) {
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                next.invokeWrite(m, promise);
            }
        } else {
        // 否则封装成WriteTask,加入EventLoop的任务队列
            final AbstractWriteTask task;
            if (flush) {
                task = WriteAndFlushTask.newInstance(next, m, promise);
            }  else {
                task = WriteTask.newInstance(next, m, promise);
            }
            if (!safeExecute(executor, task, promise, m)) {
                // 加入任务队列失败会取消,等后面重新天剑
                task.cancel();
            }
        }
    }
    // 安全启动
    private static boolean safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) {
        try {
            //启动任务
            executor.execute(runnable);
            return true;
        } catch (Throwable cause) { ... }
    }


// SingleThreadEventExecutor
    @Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        boolean inEventLoop = inEventLoop();
        addTask(task);
        if (!inEventLoop) {
            // 检查eventLoop线程是否启动,没有启动则立即启动
            startThread();
    }
    // 启动线程
    private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
            // 省略非重点
                    try {
                        // 循环执行队列中的所有任务,包括了写任务WriteTask
                        for (;;) {
                            if (confirmShutdown()) {
                                break;
                            }
                        }
                    } finally { ... }

    // 确认线程没有中断,
    protected boolean confirmShutdown() {
        if (!isShuttingDown()) {
            return false;
        }
        runAllTasks();
    ....
    }
    //最后在safeExecute()方法中执行
    Runnable task = pollTaskFrom(taskQueue);
    task.run();


最后殊途同归,写操作流程回到AbstractChannelHandlerContext,下图红框:
 

        // 这里是AbstractChannelHandlerContext$WriteAndFlushTask类
        @Override
        public void write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
            //加入待发送队列
            super.write(ctx, msg, promise);
            //发送
            ctx.invokeFlush();
        }


    // 这里是ChannelOutboundBuffer类
    // 由上述super.write(ctx, msg, promise);调用,作用是把消息加入待发送队列,用单向链表管理
    public void addMessage(Object msg, int size, ChannelPromise promise) {
        Entry entry = Entry.newInstance(msg, size, total(msg), promise);
        if (tailEntry == null) {
            flushedEntry = null;
        } else {
            Entry tail = tailEntry;
            tail.next = entry;
        }
        tailEntry = entry;
        if (unflushedEntry == null) {
            unflushedEntry = entry;
        }
        // 增加待发送消息的总字节数
        incrementPendingOutboundBytes(entry.pendingSize, false);
    }

    // 这里是NioSocketChannel
    // 由ctx.invokeFlush()调用
    @Override
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        ... ...
        // 从ChannelOutboundBuffer获取链表第一个待写消息
        // 真正的消息发送位置
        final int localWrittenBytes = ch.write(buffer);
        // 清除ChannelOutboundBuffer链表的已写消息
        ... ...


完整流程如下图:

??¨è??é???????¥?????????è?°
2 消息高低水位
??ChannelOutBoundBuffer不仅维护待发送消息链表,还提供高低水位机制防止消息积压导致的内存不足,其原理是判断链表消息总字节数是否高于设定的高水位,如果是则设置Channel的状态为不可写,此外还会在ChannelPipline发送通知事件,若ChannelPipline中有监听器则可以感知到Channel的状态变化。当消息总长度降低到设定的低水位则恢复可写状态。

3 Netty消息接收时的内存申请机制
??Netty底层使用java.nio.channels.SocketChannel读取消息,读取时传入ByteBuf,SocketChannel会把网卡缓冲区的数据拷贝到ByteBuf(可以是堆内存,也可以是堆外直接内存)。这时涉及到ByteBuf内存分配,分配方式如下:
?? 默认使用RecvByteBufAllocator分配内存,它有记忆功能,初次内存分配1024byte,后续根据容量的实际使用情况调整每次的容量分配值。要分配的容量值使用一个Table记录“index-容量”:例如初次内存分配1024byte,若实际使用大于1024byte,下次分配容量扩充为2048byte,再不够则扩充为4096;若实际使用小于1024byte,下次容量缩减为512,若仍然有多余,则缩减为496byte。

        // AbstractNioByteChannel类
        @Override
        public final void read() {
            final ChannelConfig config = config();
            if (shouldBreakReadReady(config)) {
                clearReadPending();
                return;
            }
            final ChannelPipeline pipeline = pipeline();
            final ByteBufAllocator allocator = config.getAllocator();
            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
            allocHandle.reset(config);

            ByteBuf byteBuf = null;
            boolean close = false;
            try {
                do {
                    byteBuf = allocHandle.allocate(allocator);
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));


??本文总结了《Netty进阶之路》第5、6章消息发送和接收的原理,消息发送时的高低水位控制内存占用。
————————————————
版权声明:本文为CSDN博主「旧城灯火」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/qq_32717909/article/details/93376764

 

程序之家二维码

小额赞赏

000
评论