引子
在之前的文章中有提到BootStrap启动类中绑定端口的内部实现如下:
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
之前只提到了 channel.bind方法绑定端口,但是未提到channel.eventLoop().execute()方法的内部实现。在运行过程中,channel.eventLoop()实际就是NioEventLoop对象,我们首先就来看它是如何启动线程的。
NioEventLoop.execute()
它的execute方法实现如下:
public void execute(Runnable task) {
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
方法中inEventLoop方法判断当前调用线程是否为NioEventLoop线程,由于当前是main主线程,并且NioEventLoop线程还未创建,所以inEventLoop()方法返回false,会执行startThread方法。这个startThread方法最终实现如下:
private void doStartThread() {
executor.execute(new Runnable() {
@Override
public void run() {
......
SingleThreadEventExecutor.this.run();
......
}
}
可以看出这个方法做的就是调用线程池对象的execute方法创建线程,源码中的executor对象就是在分析NioEventLoopGroup那篇文章中说到的ThreadPerTaskExecutor线程池对象。
另外运行过程中SingleThreadEventExecutor.this实际就是NioEventLoop对象,它的run方法里面封装了启动过程,这部分内容将在下面继续阐述。
NioEventLoop.run()
在这个run方法中主要做了下面几件事:
- select() 检查是否有IO事件
- processSelectedKeys() 处理IO事件
- runAllTasks() 处理异步任务队列
下面就这几个方面来看下源码是如何实现的。
select() 检查是否有IO事件:
@Override
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
......
可以看出run方法中首先写了个死循环进行轮询。在调试过程中发现switch默认走的是SelectStrategy.SELECT
分支,即执行select(wakenUp.getAndSet(false));
,这个方法实现分为下面几个方面:
- 设置deadline超时时间
- 阻塞式select
- 避免jdk空轮询的bug
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow(); // part: 1
selectCnt = 1;
}
break;
}
......
int selectedKeys = selector.select(timeoutMillis); // part: 2
selectCnt ++;
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
break;
}
在select方法内部又是一个没有条件的for循环。首先计算select操作允许的超时时间timeoutMillis和select操作的截止时间selectDeadLineNanos。在for循环中,如果当前循环的时间已经超过截止时间,则执行选择器的selector.selectNow();
方法,selectNow方法是非阻塞的,并且直接跳出for循环,此次select操作就结束了,这就是第一部分:设置deadline超时时间,并进行非阻塞select调用。
第二部分就是当前时间没有超时的情况下执行选择器的select方法进行阻塞调用。select方法返回selectedKeys不为0的时候表示选择器已经选好了通道,跳出循环,本次select操作也就结束了。
关于第三部分netty是如何避免jdk空轮询的bug的,还是不懂,这里就先预留了个问题了。。。
processSelectedKeys() 处理IO事件
处理IO事件主要分为两个部分:selected keySet优化和processSelectedKeys()方法的执行。
- selected keySet优化
在我们实例化EventLoop对象,执行构造函数的时候,调用了selector = openSelector();
,在这个方法内部创建了一个selectedKeySet 对象:final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
那这个selectedKeySet是种什么类型呢:
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
private SelectionKey[] keysA;
private int keysASize;
SelectedSelectionKeySet() {
keysA = new SelectionKey[1024];
}
......
查看源码可知,这个继承了AbstractSet的对象,实际并不是用Set类型存放SelectionKey,而是用数组来实现的。这样做的目的是为了降低时间复杂度。
- 执行processSelectedKeys()方法
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized(selectedKeys.flip());
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
上述方法中,当selectedKeys不为空时,执行的processSelectedKeysOptimized方法代码如下:
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
for (int i = 0;; i ++) {
final SelectionKey k = selectedKeys[i];
if (k == null) {
break;
}
selectedKeys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
}
......
}
}
可以看到在这个方法中开始遍历selectedKeys这个数组,并通过SelectionKey获取绑定的attachment属性,而这个属性实际就是NioSocketChannel对象,所以接着执行processSelectedKey()这个方法:
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
return;
}
}
}
可以看到在这个函数内部最终实现的就是处理各种IO事件的逻辑。代码中的readyOps这个准备好的操作与
SelectionKey.OP_WRITE进行与操作,即位运算,这个在单片机编程中比较常见,最终使用NioSocketChannel的unsafe()方法获取unsafe对象,数据读写都是通过这个unsafe对象来实现的。可以想象这个unsafe对象肯定实现了最终写到缓冲区ButeBuf的操作,并且还和pipeline进行有关联,方便事件和数据在各个ChannelHandler之间进行传递。关于这个unsafe对象的分析,我们以后继续。
runAllTasks() 处理异步任务队列
- task的分类和添加
在Netty内部有两个任务队列,一个是普通队列,另外一个是定时任务队列。在NioEventLoop的构造函数中:
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
......
}
调用了父类SingleThreadEventLoop的构造函数:
protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, threadFactory, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
tailTasks = newTaskQueue(maxPendingTasks);
}
构造函数中执行newTaskQueue()方法创建一个LinkedBlockingQueue任务队列:
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
}
分析到这可以得知在NioEventLoop对象初始化的时候创建了一个LinkedBlockingQueue类型的任务队列,下面再回过头来看下NioEventLoop执行execute方法的时候是如何处理这个队列的,查看addTask(task)方法代码:
protected void addTask(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
if (!offerTask(task)) {
reject(task);
}
}
final boolean offerTask(Runnable task) {
if (isShutdown()) {
reject();
}
return taskQueue.offer(task);
}
可以看出执行execute方法的时候,就是将传入的task加到任务队列里面去。那这个传入的task就是我们之前分析的绑定端口时执行NioEventLoop的execute方法时传入的Runnable接口实现,即将绑定端口这个任务加到任务队列中。
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
分析到这可知NioEventLoop的execute方法执行过程中做了启动线程和将添加绑定任务到任务队列这两步,在启动线程后执行的NioEventLoop的run方法这个死循环中不断的执行runAllTasks()方法,这里面做的就是执行任务的具体过程。
- 任务的聚合
我们接着来分析这个runAllTasks()方法:
protected boolean runAllTasks() {
assert inEventLoop();
boolean fetchedAll;
boolean ranAtLeastOne = false;
do {
fetchedAll = fetchFromScheduledTaskQueue();
if (runAllTasksFrom(taskQueue)) {
ranAtLeastOne = true;
}
} while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
if (ranAtLeastOne) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
}
afterRunningAllTasks();
return ranAtLeastOne;
}
看到这个运行所有任务的方法中首先执行的是fetchFromScheduledTaskQueue(),里面做的是从定时任务队列里面获取第一条任务,并将这条定时任务放到taskQueue里面。如果由于taskQueue任务队列空间不足,则重新放回到定时任务队列中;如果空间足够的话,就循环将所有到当前时间要执行的定时任务全部放到普通任务队列中。所以这就实现了普通任务队列和定时任务队列的聚合。
private boolean fetchFromScheduledTaskQueue() {
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
Runnable scheduledTask = pollScheduledTask(nanoTime);
while (scheduledTask != null) {
if (!taskQueue.offer(scheduledTask)) {
// No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
scheduledTask = pollScheduledTask(nanoTime);
}
return true;
}
- 任务的执行
分析了任务的聚合后,我们再看runAllTasks()方法中的runAllTasksFrom(taskQueue)
方法实现:
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
Runnable task = pollTaskFrom(taskQueue);
if (task == null) {
return false;
}
for (;;) {
safeExecute(task);
task = pollTaskFrom(taskQueue);
if (task == null) {
return true;
}
}
}
这个方法的逻辑就是从任务队列中循环取出所有任务并执行,safeExecute()
方法实现:
protected static void safeExecute(Runnable task) {
try {
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception. Task: {}", task, t);
}
}
可以看到最终就是执行task.run()方法来执行任务。
网友评论