Netty分析

Scroll Down

写在前面

Netty是基于NIO的一个网络框架,他封装了NIO的晦涩的底层代码。剥离出来简明的API供人们使用,至于他的好处,网上有很多balabala的。我这里不在累述。

Client
    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .option(ChannelOption.TCP_NODELAY, true)
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                         /*
                          * ChannelInboundHandler按照注册的先后顺序执行,ChannelOutboundHandler按照注册的先后顺序逆序执行。
                          * HttpRequestDecoder、HttpObjectAggregator、HttpHandler为InboundHandler
                          * HttpContentCompressor、HttpResponseEncoder为OutboundHandler
                          * 在使用Handler的过程中,需要注意:
                          * 1、ChannelInboundHandler之间的传递,通过调用 ctx.fireChannelRead(msg) 实现;调用ctx.write(msg) 将传递到ChannelOutboundHandler。
                          * 2、ctx.write()方法执行后,需要调用flush()方法才能令它立即执行。
                          * 3、ChannelOutboundHandler 在注册的时候需要放在最后一个ChannelInboundHandler之前,否则将无法传递到ChannelOutboundHandler。
                          * 4、Handler的消费处理放在最后一个处理。
                          */   
                     p.addLast(new EchoClientHandler());
                 }
             });
        
            ChannelFuture f = b.connect(HOST, PORT).sync();
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

闲言少叙,直接上代码(客户端代码+服务端代码)分析,根据代码 慢慢展开我对于netty的认知。在网络中我们要有个base,socket是tcp的介质,在netty中,channel就是socket的一个抽象。接下逐个分析这两端涉及到的类。
先从客户端开始

ChannelPipeline

netty源码分析之ChannelPipeline

要理解ChannelPipeline,我觉得理解了下面这句话和那个图就理解了

Each channel has its own pipeline and it is created automatically when a new channel is created.

译:每个通道都会自己的管道,当创建一个新的通道时,它会自动创建

下面的图摘自netty源码,我觉得他比网上对于管道的解释完美太多了,所以贴过来。这个图生动描述了(入站IO和出站IO)操作在管道中的流向和匹配handler处理的过程。

PS:出站 ==客户端到服务端==;入站 ==服务端到客户端==

ChannelPipeline

 *                                                 I/O Request
 *                                            via {@link Channel} or
 *                                        {@link ChannelHandlerContext}
 *                                                      |
 *  +---------------------------------------------------+---------------+
 *  |                           ChannelPipeline         |               |
 *  |                                                  \|/              |
 *  |    +---------------------+            +-----------+----------+    |
 *  |    | Inbound Handler  N  |            | Outbound Handler  1  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  |               |                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  .               |
 *  |               .                                   .               |
 *  | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
 *  |        [ method call]                       [method call]         |
 *  |               .                                   .               |
 *  |               .                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  |               |                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler  1  |            | Outbound Handler  M  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  +---------------+-----------------------------------+---------------+
 *                  |                                  \|/
 *  +---------------+-----------------------------------+---------------+
 *  |               |                                   |               |
 *  |       [ Socket.read() ]                    [ Socket.write() ]     |
 *  |                                                                   |
 *  |  Netty Internal I/O Threads (Transport Implementation)            |
 *  +-------------------------------------------------------------------+

image

接着我们继续看这个接口的的继承关系,他继承了ChannelInboundInvoker, ChannelOutboundInvoker, Iterable。继承了上述三个接口中的方法,比如connect方法,还有如建立连接时候的ChannelInboundHandler的channelRegistered方法invoke的fireChannelRegistered方法,ChannelInboundHandler的channelActive方法invoke的fireChannelActive等方法。基本呢这个ChannelPipeline接口的方法覆盖了channel的功能实现。

我们再回到代码上,通过SocketChannel的实例调用pipeline方法来获取的管道。我们继续追踪代码,但是channel是在哪里创建的呢,这就要去Bootstarp中找寻了。

客户端的channel创建则是在执行connect方法时候创建的。这个方法的逻辑是先去进行一系列的校验。先调用其父类AbstractBootstrap的公共校验方法validate,其后针对客户端需要校验远端地址的配置,都通过校验之后去创建channel并连上远端(也就是服务端)

    //Connect a Channel to the remote peer
    //创建通道并且链接到远程地址上
 public ChannelFuture connect() {
        validate();
        SocketAddress remoteAddress = this.remoteAddress;
        if (remoteAddress == null) {
            throw new IllegalStateException("remoteAddress not set");
        }

        return doResolveAndConnect(remoteAddress, config.localAddress());
    }

具体创建channel的做法,我们需要继续追踪doResolveAndConnect方法。

 /**
     * @see #connect()
     */
    private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();

        if (regFuture.isDone()) {
            if (!regFuture.isSuccess()) {
                return regFuture;
            }
            return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    // Directly obtain the cause and do a null check so we only need one volatile read in case of a
                    // failure.
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();
                        doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }

ok,当我们进入doResolveAndConnect方法之后,我们就知道了channel是如何创建得了。也就是initAndRegister,该方法主要工作就是initAndRegister,针对这个Future是否完成做的后续操作,例如没有完成之时创建一个PendingRegistrationPromise。配合addListener进行回调,没有异常情况时的解析远程地址进行连接执行。

看上述代码构造一个ChannelPromise的两种方式1:PendingRegistrationPromise:2:channel.newPromise(),均是调用DefaultChannelPromise类的构造方法来创建一个ChannelPromise实例,创建了实例是为了进而去调用doResolveAndConnect0方法,最后执行doConnect连接到远程服务端。doConnect执行逻辑为:获取channel的EventLoop提交一个连接远程节点的任务给线程池。然后注册一个连接失败的listener。回调操作为失败后关闭channel。这就是客户端连接服务端的代码流程。

ChannelPromiseChannelFuture的关系有必要在这里说明一下,他是参照了Scala的设计。Promis是Future的子类,可以修改Future的状态。一旦Promise确认改变之后,Future就不会再变了。

image

    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }

        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }

(PS:额扯的远了,还好没走远!!!)我们是要解释ChannelPipeline思绪切回来,我们继续看initAndRegister方法.阿西吧!我们终于找到了channe是哪里创建了,美滋滋!!!,好既然这样我们继续查看一下这个工厂方法,一探究竟。channelFactory.newChannel() shit....找到了根据传入的Class反射创建。得了,在回头看下Client的代码这里channel方法传入的就是NioSocketChannel.class,那么该ChannelFactory就是在这里创建。OK channel怎么创建我们已经明白了,根据Each channel has its own pipeline and it is created automatically when a new channel is created.这句经典之句。我们理所应当就要去追寻NioSocketChannel中pipeline的构建。

public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        }
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }

@Override
    public T newChannel() {
        try {
            return clazz.getConstructor().newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
    }

关于反射创建解释的比较模糊,我还是来一波魔性画图解释下这个调用关系,bootstarp执行channel方法,调用的是ReflectiveChannelFactory去执行newChannel方法,其中的执行逻辑是调用了NioSocketChannel类的无参构造,在NioSocketChannel类中的具体调用呢如下图所示。这里的代码比较关键 应为与多层调用,在第二个构造器中, 会调用newSocket方法 用SelectorProvider打开一个新的SocketChannel,底层呢就是调用Java NIO 开一个socket。用于TCP连接。

 private static SocketChannel newSocket(SelectorProvider provider) {
        try {
            return provider.openSocketChannel();
        } catch (IOException e) {
            throw new ChannelException("Failed to open a socket.", e);
        }
    }

image

NioSocketChannel类的pipeline继承自AbstractChannel,客官往里瞅,小妹服务很nice的。手动滑稽,在AbstractChannel的构造方法中看到了。客官请看 pipeline = newChannelPipeline();,现在有木有理解了那句话:channel的创建必然有pipeline的创建。

客官请低下头瞅瞅newChannelPipeline方法,这个方法究竟干了什么事情呢,纸面上看就是一些校验还有把我们创建的NioSocketChannel赋值给本类的channel。创建两个的AbstractChannelHandlerContext的对象一个的head一个tail,但是我们细致的观察该类,再看下AbstractChannelHandlerContext的类结构就会发现,此处的初始化是实例化了一个存放ChannelHandler的双向链表AbstractChannelHandlerContext,那么headtail就是链表的两个句柄。

 protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }
 protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);
        tail = new TailContext(this);
        head = new HeadContext(this);
        head.next = tail;
        tail.prev = head;
    }

那我们就看看这个两个句柄是何方神圣。从继承结构上来看,HeadContext 继承了AbstractChannelHandlerContext实现了 ChannelOutboundHandler, ChannelInboundHandler.而TailContext只是 继承了AbstractChannelHandlerContext实现了 ChannelOutboundHandler。从这一点上我们就能看出来 HeadContext 可以通知出站操作,也可以实现状态更改的触发的回调,而TailContext只可以通知出站操作。接下来我们再看下对应的二者的构造方法。他们均调用了其父类的构造方法。先看HeadContext,它的传入参数为 inbound = false, outbound = true 而TailContext则和他相反outbound = false, inbound = true。这就说明了TailContext 处理的都是inboundHandler HeadContext处理的都是outgoundHandler,此时在回去看下ChannelPipeline那个经典流向图 是不是有一种恍然大明白的感觉!!!哈哈哈(PS 沈腾魔性的既视感。。)

HeadContext(DefaultChannelPipeline pipeline) {
        super(pipeline, null, HEAD_NAME, false, true);
        unsafe = pipeline.channel().unsafe();
        setAddComplete();
    }
TailContext(DefaultChannelPipeline pipeline) {
        super(pipeline, null, TAIL_NAME, true, false);
        setAddComplete();
    }
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
                              boolean inbound, boolean outbound) {
    this.name = ObjectUtil.checkNotNull(name, "name");
    this.pipeline = pipeline;
    this.executor = executor;
    this.inbound = inbound;
    this.outbound = outbound;
    // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
    ordered = executor == null || executor instanceof OrderedEventExecutor;
}

ok 到这里我们就算是了解了一丢丢netty的ChannelPipeline来龙去脉了,但是这远远是不够的低。作为netty的大动脉,必须还得深入研究一波。接下来带我去解开NioSocketChannel的腰带。我呸!!! 是面纱。蹭波热度强推一波良心国产电影

unsafe

我们在探寻创建ChannelPipeline的时候,看到在AbstractChannel 创建了一个Channel的内部接口Unsafe ,我靠,这是个什么的接口,创建channel实现它干鸡毛啊。我还寻思JDK的后门Unsafe类呢。细看其中的方法,便发现了,接口中的那些方法都是个java和socket的交互,换句话来讲,这些方法面向的是java底层进而和Netty交互。

 protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
     
 }

 interface Unsafe {
        RecvByteBufAllocator.Handle recvBufAllocHandle();
        SocketAddress localAddress();
        SocketAddress remoteAddress();
        void register(EventLoop eventLoop, ChannelPromise promise);
        void bind(SocketAddress localAddress, ChannelPromise promise);
        void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
        void disconnect(ChannelPromise promise);
        void close(ChannelPromise promise);
        void closeForcibly();
        void deregister(ChannelPromise promise);
        void beginRead();
        void write(Object msg, ChannelPromise promise);
        void flush();
        ChannelPromise voidPromise();
        ChannelOutboundBuffer outboundBuffer();
    }

NioSocketChannel

NioSocketChannel类的继承关系

image

在上面的ChannelPipeline中我们涉及到了NioSocketChannel的一些影踪,我们再这里就二探其究。

NioEventLoopGroup

NioEventLoopGroup类的继承关系

image

要详细了解NioEventLoopGroup,我们需要先了解一下Reactor模式

Reactor模式

Ractor模式我们以Linux的C/S通信来开始讲述下。

Linux的C/S通信之select

首先呢,在没有epoll技术之前的时候,Linux的网络通信采用的是select的方式。也就是内核轮询FD(文件描述符)的形式来实现。当随着文件描述符打开的越来越多的时候,那么到那个时候 轮询的fd数目越多,轮询的时间成本无疑是非常大。效率自然就低下了。每个select轮询的FD的大小在linux/posix_types.h头文件中定义了一个宏。

#define__FD_SETSIZE   1024

这个宏表明Select最多同时监听1024个fd,当然,可以通过修改头文件再重编译内核来扩大这个数目。先不说有没有人可以编译内核成功。就算改大了之后也当FD达到一个数量级别还是会有同样的问题。

内核空间和用户空间内存拷贝问题,select需要复制大量的句柄数据结构,产生巨大的开销;

没有对应关系返回的是句柄数组,那么应用程序访问的呢当然也需要遍历句柄数组。

当一个程序拿到FD(就绪状态)之后,没有对这个FD做任何操作,那么其余的轮询依旧会把那个FD包含在其中。这既是所谓的水平触发

Linux的C/S通信之poll方式。

相比select,poll使用链表保存文件描述符,因此没有了监视文件数量的限制。其余的缺点还是有的。
基于select和poll这样的通信方式。这里就要也引出一个C10K问题了也就是单机百万并发问题。

C10K问题

C10K问题,这是简书上的回答。供诸君参考。
我通俗的说一下这个C10K问题吧。就是单机支持100万访问。就Select通信方式来说。默认支持的FD的个数为1024个。这样就要有1K个线程才能满足100万的访问对吧。试想一下1K个进程的,姑且先不说开进程的开销。就算开了1K个。那么进程上下文切换的代价,内核空间和用户空间的来回拷贝,句柄数组的轮询,这么多问题单机系统是无法驾驭的。没有财力和人力的小公司,就会被业务的并发压死。在这中模式下就是加机器加机器。没点财力物力,搞不起的,也就是国外的googel之流的能搞起。腾讯tm是基于原始的UDP协议自己封装的,巧妙地避过了这个问题。

怎么解决这个问题呢。就技术扯皮而言不考虑机器性能的情况

  1. 职责单一原则一个进程处理一个连接,不用考虑上下文切换。需要的资源不是一般的大,而且啊这个可拓展新不大,算是理想环境下了。
  2. IO多路复用技术,单个个进程处理多个连接。
IO多路复用技术实现细节
  1. 基于select模式,没有任何scoket等待文件描述符变为ready,循环处理多个连接,处理完了就释放,进而别的连接就复用了原来的线程,但是一旦某一个FD没有ready那就阻塞了整个应用。
  2. 基于1的实现方式,当有连接过来的时候先去进行一下判断当前持有的FD是否ready,如果没有就放弃当前,去处理新的。把原来未就绪的FD和原来的线程绑定关系的存放起来,然后循环这个未就绪的,当文件句柄发生改变之后,再给这个FD绑定上连接再去切到该线程上去处理原来的。这种实现 同样受限于规模
  3. poll的方式之前有说过是select的进阶版,他到时没有FD上线的限制。但是效率依旧不怎么样,还是有逐个排查所有文件句柄状态效率不高的问题。
  4. epoll方式 既然poll方式而已,还是回逐个遍历所有文件句柄,那么只要返回有句柄变化的不就节省了很多工作?。epoll就是采取的这种方式,不过epoll采取的是全新的数据结构。,但是依赖于只能在linux平台使用。epoll技术的编程模型就是异步非阻塞回调,也可以叫做Reactor,事件驱动,事件轮循(EventLoop)。Nginx,libevent,node.js(单线程模式下的Reactor模式)这些就是Epoll时代的产物!
Linux的C/S通信之epoll方式。(2002年以后出现的)

epoll方式和原来的select/poll方式截然不同,epoll的设计思路,是把select/poll单个的操作拆分为1个epoll_create+多个epoll_ctrl+一个epoll_wait。构建了一个eventpoll的文件系统(数据结构为B+Tree),这样的设计也符合linux一切皆文件的宗旨。

 int epoll_create(int size);
 int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
 int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);
 
 //eventpoll结构体
 struct eventpoll {  
    /*自旋转锁用于唤醒*/
    spinlock_t lock;  
    /*原子锁用于执行epoll_ctl方法时候,防止并发问题*/
    struct mutex mtx;  
    /*调用 sys_epoll_wait() 之后添加的阻塞队列 */  
    wait_queue_head_t wq;  
    /* 调用 file->poll() 之后的添加的阻塞队列*/  
    wait_queue_head_t poll_wait;  
    /* 就绪状态的的FD 列表的头结点*/  
    struct list_head rdllist;  
    /* 红黑树的根节点*/  
    struct rb_root rbr;  
    struct epitem *ovflist;  
    struct user_struct *user;  
};  

struct epitem {  
    /* RB tree node 红黑树节点 根节点为在eventpoll的rbr */  
    struct rb_node rbn;  
    /* 就绪状态的的FD链表,也就是ready了的是时间过来之后存放在这里 该链表的头结点存放在eventpoll的rdllist中*/  
    struct list_head rdllink;  
    /* 
     * Works together "struct eventpoll"->ovflist in keeping the 
     * single linked chain of items. 
     */  
    struct epitem *next;  
    /* The file descriptor information this item refers to */  
    struct epoll_filefd ffd;  
    /* Number of active wait queue attached to poll operations */  
    int nwait;  
    /* List containing poll wait queues */  
    struct list_head pwqlist;  
    /* The "container" of this item */  
    struct eventpoll *ep;  
    /* List header used to link this item to the "struct file" items list */  
    struct list_head fllink;  
    /* The structure that describe the interested events and the source fd */  
    struct epoll_event event;  
};  

调用epoll_create方法会创建一个eventpoll结构体如上面代码所示的结构,并且返回在该eventpoll文件系统的FD;
调用epoll_ctl方法用来在eventpoll中添加,删除以及修改epitem监听项。
调用epoll_wait方法收集在epoll文件系统中已经ready的事件。也就是返回eventpoll结构体中的rdllist中的句柄。

OK !现在我们来分析与一下这个eventpoll结构体,至于为什么要两个锁呢,其实很好理解,一个原子锁也就是独占锁,是为了并发访问共享资源eventpoll,全局就一个。

Bootstrap

Server
    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 100)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     p.addLast(new EchoServerHandler());
                 }
             });

            ChannelFuture f = b.bind(PORT).sync();
            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
   }