写在前面
Netty是基于NIO的一个网络框架,他封装了NIO的晦涩的底层代码。剥离出来简明的API供人们使用,至于他的好处,网上有很多balabala的。我这里不在累述。
Client
public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
/*
* ChannelInboundHandler按照注册的先后顺序执行,ChannelOutboundHandler按照注册的先后顺序逆序执行。
* HttpRequestDecoder、HttpObjectAggregator、HttpHandler为InboundHandler
* HttpContentCompressor、HttpResponseEncoder为OutboundHandler
* 在使用Handler的过程中,需要注意:
* 1、ChannelInboundHandler之间的传递,通过调用 ctx.fireChannelRead(msg) 实现;调用ctx.write(msg) 将传递到ChannelOutboundHandler。
* 2、ctx.write()方法执行后,需要调用flush()方法才能令它立即执行。
* 3、ChannelOutboundHandler 在注册的时候需要放在最后一个ChannelInboundHandler之前,否则将无法传递到ChannelOutboundHandler。
* 4、Handler的消费处理放在最后一个处理。
*/
p.addLast(new EchoClientHandler());
}
});
ChannelFuture f = b.connect(HOST, PORT).sync();
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
闲言少叙,直接上代码(客户端代码+服务端代码)分析,根据代码 慢慢展开我对于netty的认知。在网络中我们要有个base,socket是tcp的介质,在netty中,channel就是socket的一个抽象。接下逐个分析这两端涉及到的类。
先从客户端开始
ChannelPipeline
netty源码分析之ChannelPipeline
要理解ChannelPipeline,我觉得理解了下面这句话和那个图就理解了
Each channel has its own pipeline and it is created automatically when a new channel is created.
译:每个通道都会自己的管道,当创建一个新的通道时,它会自动创建
下面的图摘自netty源码,我觉得他比网上对于管道的解释完美太多了,所以贴过来。这个图生动描述了(入站IO和出站IO)操作在管道中的流向和匹配handler处理的过程。
PS:出站 ==客户端到服务端==;入站 ==服务端到客户端==
* I/O Request
* via {@link Channel} or
* {@link ChannelHandlerContext}
* |
* +---------------------------------------------------+---------------+
* | ChannelPipeline | |
* | \|/ |
* | +---------------------+ +-----------+----------+ |
* | | Inbound Handler N | | Outbound Handler 1 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* | | \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler N-1 | | Outbound Handler 2 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ . |
* | . . |
* | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
* | [ method call] [method call] |
* | . . |
* | . \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler 2 | | Outbound Handler M-1 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* | | \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler 1 | | Outbound Handler M | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* +---------------+-----------------------------------+---------------+
* | \|/
* +---------------+-----------------------------------+---------------+
* | | | |
* | [ Socket.read() ] [ Socket.write() ] |
* | |
* | Netty Internal I/O Threads (Transport Implementation) |
* +-------------------------------------------------------------------+
接着我们继续看这个接口的的继承关系,他继承了ChannelInboundInvoker, ChannelOutboundInvoker, Iterable。继承了上述三个接口中的方法,比如
connect
方法,还有如建立连接时候的ChannelInboundHandler的channelRegistered
方法invoke的fireChannelRegistered
方法,ChannelInboundHandler的channelActive方法invoke的fireChannelActive
等方法。基本呢这个ChannelPipeline接口的方法覆盖了channel的功能实现。
我们再回到代码上,通过SocketChannel的实例调用
pipeline
方法来获取的管道。我们继续追踪代码,但是channel是在哪里创建的呢,这就要去Bootstarp中找寻了。
客户端的channel创建则是在执行
connect
方法时候创建的。这个方法的逻辑是先去进行一系列的校验。先调用其父类AbstractBootstrap的公共校验方法validate
,其后针对客户端需要校验远端地址的配置,都通过校验之后去创建channel并连上远端(也就是服务端)
//Connect a Channel to the remote peer
//创建通道并且链接到远程地址上
public ChannelFuture connect() {
validate();
SocketAddress remoteAddress = this.remoteAddress;
if (remoteAddress == null) {
throw new IllegalStateException("remoteAddress not set");
}
return doResolveAndConnect(remoteAddress, config.localAddress());
}
具体创建channel的做法,我们需要继续追踪
doResolveAndConnect
方法。
/**
* @see #connect()
*/
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.isDone()) {
if (!regFuture.isSuccess()) {
return regFuture;
}
return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// Directly obtain the cause and do a null check so we only need one volatile read in case of a
// failure.
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
}
}
});
return promise;
}
}
ok,当我们进入
doResolveAndConnect
方法之后,我们就知道了channel是如何创建得了。也就是initAndRegister
,该方法主要工作就是initAndRegister
,针对这个Future是否完成做的后续操作,例如没有完成之时创建一个PendingRegistrationPromise
。配合addListener
进行回调,没有异常情况时的解析远程地址进行连接执行。
看上述代码构造一个
ChannelPromise
的两种方式1:PendingRegistrationPromise
:2:channel.newPromise()
,均是调用DefaultChannelPromise
类的构造方法来创建一个ChannelPromise
实例,创建了实例是为了进而去调用doResolveAndConnect0
方法,最后执行doConnect
连接到远程服务端。doConnect
执行逻辑为:获取channel的EventLoop提交一个连接远程节点的任务给线程池。然后注册一个连接失败的listener。回调操作为失败后关闭channel。这就是客户端连接服务端的代码流程。
ChannelPromise
和ChannelFuture
的关系有必要在这里说明一下,他是参照了Scala的设计。Promis是Future的子类,可以修改Future的状态。一旦Promise确认改变之后,Future就不会再变了。
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
(PS:额扯的远了,还好没走远!!!)我们是要解释
ChannelPipeline
思绪切回来,我们继续看initAndRegister
方法.阿西吧!我们终于找到了channe是哪里创建了,美滋滋!!!,好既然这样我们继续查看一下这个工厂方法,一探究竟。channelFactory.newChannel()
shit....找到了根据传入的Class反射创建。得了,在回头看下Client的代码这里channel
方法传入的就是NioSocketChannel.class
,那么该ChannelFactory
就是在这里创建。OK channel怎么创建我们已经明白了,根据Each channel has its own pipeline and it is created automatically when a new channel is created.这句经典之句。我们理所应当就要去追寻NioSocketChannel
中pipeline的构建。
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
@Override
public T newChannel() {
try {
return clazz.getConstructor().newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
关于反射创建解释的比较模糊,我还是来一波魔性画图解释下这个调用关系,bootstarp执行channel方法,调用的是
ReflectiveChannelFactory
去执行newChannel
方法,其中的执行逻辑是调用了NioSocketChannel
类的无参构造,在NioSocketChannel
类中的具体调用呢如下图所示。这里的代码比较关键 应为与多层调用,在第二个构造器中, 会调用newSocket
方法 用SelectorProvider
打开一个新的SocketChannel,底层呢就是调用Java NIO 开一个socket。用于TCP连接。
private static SocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openSocketChannel();
} catch (IOException e) {
throw new ChannelException("Failed to open a socket.", e);
}
}
NioSocketChannel
类的pipeline
继承自AbstractChannel
,客官往里瞅,小妹服务很nice的。手动滑稽,在AbstractChannel
的构造方法中看到了。客官请看pipeline = newChannelPipeline();
,现在有木有理解了那句话:channel的创建必然有pipeline的创建。
客官请低下头瞅瞅
newChannelPipeline
方法,这个方法究竟干了什么事情呢,纸面上看就是一些校验还有把我们创建的NioSocketChannel
赋值给本类的channel。创建两个的AbstractChannelHandlerContext
的对象一个的head
一个tail
,但是我们细致的观察该类,再看下AbstractChannelHandlerContext
的类结构就会发现,此处的初始化是实例化了一个存放ChannelHandler的双向链表AbstractChannelHandlerContext,那么head
和tail
就是链表的两个句柄。
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
那我们就看看这个两个句柄是何方神圣。从继承结构上来看,
HeadContext
继承了AbstractChannelHandlerContext
实现了ChannelOutboundHandler
,ChannelInboundHandler
.而TailContext
只是 继承了AbstractChannelHandlerContext
实现了ChannelOutboundHandler
。从这一点上我们就能看出来HeadContext
可以通知出站操作,也可以实现状态更改的触发的回调,而TailContext
只可以通知出站操作。接下来我们再看下对应的二者的构造方法。他们均调用了其父类的构造方法。先看HeadContext
,它的传入参数为 inbound = false, outbound = true 而TailContext
则和他相反outbound = false, inbound = true。这就说明了TailContext
处理的都是inboundHandlerHeadContext
处理的都是outgoundHandler,此时在回去看下ChannelPipeline那个经典流向图 是不是有一种恍然大明白的感觉!!!哈哈哈(PS 沈腾魔性的既视感。。)
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, false, true);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}
TailContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, TAIL_NAME, true, false);
setAddComplete();
}
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
boolean inbound, boolean outbound) {
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
this.executor = executor;
this.inbound = inbound;
this.outbound = outbound;
// Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
ordered = executor == null || executor instanceof OrderedEventExecutor;
}
ok 到这里我们就算是了解了一丢丢netty的ChannelPipeline来龙去脉了,但是这远远是不够的低。作为netty的大动脉,必须还得深入研究一波。接下来带我去解开NioSocketChannel的腰带。我呸!!! 是面纱。蹭波热度强推一波良心国产电影
unsafe
我们在探寻创建
ChannelPipeline
的时候,看到在AbstractChannel
创建了一个Channel
的内部接口Unsafe
,我靠,这是个什么的接口,创建channel实现它干鸡毛啊。我还寻思JDK的后门Unsafe类呢。细看其中的方法,便发现了,接口中的那些方法都是个java和socket的交互,换句话来讲,这些方法面向的是java底层进而和Netty交互。
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
interface Unsafe {
RecvByteBufAllocator.Handle recvBufAllocHandle();
SocketAddress localAddress();
SocketAddress remoteAddress();
void register(EventLoop eventLoop, ChannelPromise promise);
void bind(SocketAddress localAddress, ChannelPromise promise);
void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
void disconnect(ChannelPromise promise);
void close(ChannelPromise promise);
void closeForcibly();
void deregister(ChannelPromise promise);
void beginRead();
void write(Object msg, ChannelPromise promise);
void flush();
ChannelPromise voidPromise();
ChannelOutboundBuffer outboundBuffer();
}
NioSocketChannel
NioSocketChannel
类的继承关系
在上面的ChannelPipeline中我们涉及到了NioSocketChannel的一些影踪,我们再这里就二探其究。
NioEventLoopGroup
NioEventLoopGroup
类的继承关系
要详细了解
NioEventLoopGroup
,我们需要先了解一下Reactor模式
Reactor模式
Ractor模式我们以Linux的C/S通信来开始讲述下。
Linux的C/S通信之select
首先呢,在没有epoll技术之前的时候,Linux的网络通信采用的是select的方式。也就是内核轮询FD(文件描述符)的形式来实现。当随着文件描述符打开的越来越多的时候,那么到那个时候 轮询的fd数目越多,轮询的时间成本无疑是非常大。效率自然就低下了。每个select轮询的FD的大小在linux/posix_types.h头文件中定义了一个宏。
#define__FD_SETSIZE 1024
这个宏表明Select最多同时监听1024个fd,当然,可以通过修改头文件再重编译内核来扩大这个数目。先不说有没有人可以编译内核成功。就算改大了之后也当FD达到一个数量级别还是会有同样的问题。
内核空间和用户空间内存拷贝问题,select需要复制大量的句柄数据结构,产生巨大的开销;
没有对应关系返回的是句柄数组,那么应用程序访问的呢当然也需要遍历句柄数组。
当一个程序拿到FD(就绪状态)之后,没有对这个FD做任何操作,那么其余的轮询依旧会把那个FD包含在其中。这既是所谓的水平触发
Linux的C/S通信之poll方式。
相比select,poll使用链表保存文件描述符,因此没有了监视文件数量的限制。其余的缺点还是有的。
基于select和poll这样的通信方式。这里就要也引出一个C10K问题了也就是单机百万并发问题。
C10K问题
C10K问题,这是简书上的回答。供诸君参考。
我通俗的说一下这个C10K问题吧。就是单机支持100万访问。就Select通信方式来说。默认支持的FD的个数为1024个。这样就要有1K个线程才能满足100万的访问对吧。试想一下1K个进程的,姑且先不说开进程的开销。就算开了1K个。那么进程上下文切换的代价,内核空间和用户空间的来回拷贝,句柄数组的轮询,这么多问题单机系统是无法驾驭的。没有财力和人力的小公司,就会被业务的并发压死。在这中模式下就是加机器加机器。没点财力物力,搞不起的,也就是国外的googel之流的能搞起。腾讯tm是基于原始的UDP协议自己封装的,巧妙地避过了这个问题。
怎么解决这个问题呢。就技术扯皮而言不考虑机器性能的情况
- 职责单一原则一个进程处理一个连接,不用考虑上下文切换。需要的资源不是一般的大,而且啊这个可拓展新不大,算是理想环境下了。
- IO多路复用技术,单个个进程处理多个连接。
IO多路复用技术实现细节
- 基于select模式,没有任何scoket等待文件描述符变为ready,循环处理多个连接,处理完了就释放,进而别的连接就复用了原来的线程,但是一旦某一个FD没有ready那就阻塞了整个应用。
- 基于1的实现方式,当有连接过来的时候先去进行一下判断当前持有的FD是否ready,如果没有就放弃当前,去处理新的。把原来未就绪的FD和原来的线程绑定关系的存放起来,然后循环这个未就绪的,当文件句柄发生改变之后,再给这个FD绑定上连接再去切到该线程上去处理原来的。这种实现 同样受限于规模
- poll的方式之前有说过是select的进阶版,他到时没有FD上线的限制。但是效率依旧不怎么样,还是有逐个排查所有文件句柄状态效率不高的问题。
- epoll方式 既然poll方式而已,还是回逐个遍历所有文件句柄,那么只要返回有句柄变化的不就节省了很多工作?。epoll就是采取的这种方式,不过epoll采取的是全新的数据结构。,但是依赖于只能在linux平台使用。epoll技术的编程模型就是异步非阻塞回调,也可以叫做Reactor,事件驱动,事件轮循(EventLoop)。Nginx,libevent,node.js(单线程模式下的Reactor模式)这些就是Epoll时代的产物!
Linux的C/S通信之epoll方式。(2002年以后出现的)
epoll方式和原来的select/poll方式截然不同,epoll的设计思路,是把select/poll单个的操作拆分为1个epoll_create+多个epoll_ctrl+一个epoll_wait。构建了一个eventpoll的文件系统(数据结构为B+Tree),这样的设计也符合linux一切皆文件的宗旨。
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);
//eventpoll结构体
struct eventpoll {
/*自旋转锁用于唤醒*/
spinlock_t lock;
/*原子锁用于执行epoll_ctl方法时候,防止并发问题*/
struct mutex mtx;
/*调用 sys_epoll_wait() 之后添加的阻塞队列 */
wait_queue_head_t wq;
/* 调用 file->poll() 之后的添加的阻塞队列*/
wait_queue_head_t poll_wait;
/* 就绪状态的的FD 列表的头结点*/
struct list_head rdllist;
/* 红黑树的根节点*/
struct rb_root rbr;
struct epitem *ovflist;
struct user_struct *user;
};
struct epitem {
/* RB tree node 红黑树节点 根节点为在eventpoll的rbr */
struct rb_node rbn;
/* 就绪状态的的FD链表,也就是ready了的是时间过来之后存放在这里 该链表的头结点存放在eventpoll的rdllist中*/
struct list_head rdllink;
/*
* Works together "struct eventpoll"->ovflist in keeping the
* single linked chain of items.
*/
struct epitem *next;
/* The file descriptor information this item refers to */
struct epoll_filefd ffd;
/* Number of active wait queue attached to poll operations */
int nwait;
/* List containing poll wait queues */
struct list_head pwqlist;
/* The "container" of this item */
struct eventpoll *ep;
/* List header used to link this item to the "struct file" items list */
struct list_head fllink;
/* The structure that describe the interested events and the source fd */
struct epoll_event event;
};
调用
epoll_create
方法会创建一个eventpoll结构体如上面代码所示的结构,并且返回在该eventpoll文件系统的FD;
调用epoll_ctl
方法用来在eventpoll中添加,删除以及修改epitem监听项。
调用epoll_wait
方法收集在epoll文件系统中已经ready的事件。也就是返回eventpoll结构体中的rdllist中的句柄。
OK !现在我们来分析与一下这个
eventpoll
结构体,至于为什么要两个锁呢,其实很好理解,一个原子锁也就是独占锁,是为了并发访问共享资源eventpoll,全局就一个。
Bootstrap
Server
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new EchoServerHandler());
}
});
ChannelFuture f = b.bind(PORT).sync();
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}