`
airu
  • 浏览: 267506 次
  • 性别: Icon_minigender_1
  • 来自: 云南
社区版块
存档分类
最新评论

netty4.0 之 EventLoop

 
阅读更多
在此之前,我们回顾下传统的网络编程。也就是socket。
socket编程中,客户端是发起连接的,服务端呢就监听某一个端口。
一档客户端连接被监听到,就在客户端和服务端建立一个连接,于是他们就可以传数据了。
NIO利用操作系统中的select,epoll等系统特性,避开了线程的开销和限制,由操作系统在数据到来时通知Selector, 当然NIO使用起来,还是需要自己去写一些框架性质的东西,不能专注于业务开发。netty 正是这样一个网络框架,使用它来获取的不仅仅是性能高效,更多的是业务的便捷开发。扯远了。现在就来看看EventLoop
public interface EventLoop extends EventExecutor, EventLoopGroup {
    @Override
    EventLoopGroup parent();
}

这个很无语。继续:
EventLoopGroup又继承了EventExecutorGroup, 我们从这个接口开始:
/**
 * The {@link EventExecutorGroup} is responsible to provide {@link EventExecutor}'s to use via its
 * {@link #next()} method. Beside this it also is responsible to handle their live-cycle and allows
 * to shut them down in a global fashion.
 *
 */
public interface EventExecutorGroup {

    /**
     * Returns one of the {@link EventExecutor}s that belong to this group.
     */
    EventExecutor next();

    /**
     * Shuts down all {@link EventExecutor}s managed by this group.
     *
     * @see ExecutorService#shutdown()
     */
    void shutdown();

    /**
     * Returns {@code true} if and only if {@link #shutdown()} has been called.
     *
     * @see ExecutorService#isShutdown()
     */
    boolean isShutdown();

    /**
     * Returns {@code true} if and only if {@link #shutdown()} has been called and all
     * {@link EventExecutor}s managed by this group has been terminated completely.
     *
     * @see ExecutorService#isTerminated()
     */
    boolean isTerminated();

    /**
     * Waits until {@link #isTerminated()} returns {@code true} or the specified amount of time
     * passes.
     *
     * @see ExecutorService#awaitTermination(long, TimeUnit)
     */
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
}


文档说了。这就是通过next提供事件执行器的(EventExecutor)。从shutdown的文档可以知道,这个应该是管理了多个EventExecutor所以他是一组事件的管理者。

再来看具体的执行器(EventExecutor),发现她就是EventExecutorGroup的子接口,并且也继承了ScheduledExecutorService,这下基本明白了大半了,这些都是基于concurrent包中的ExecutorService啊。也就是说,神秘的EventLoop和线程的调度器有关。
/**
 * The {@link EventExecutor} is a special {@link ScheduledExecutorService} which comes
 * with some handy methods to see if a {@link Thread} is executed in a event loop.
 * Beside this it also extends the {@link EventExecutorGroup} to allow a generic way to
 * access methods.
 *
 */
public interface EventExecutor extends EventExecutorGroup, ScheduledExecutorService {

    /**
     * Returns a reference to itself.
     */
    @Override
    EventExecutor next();

    /**
     * Return the {@link EventExecutorGroup} which is the parent of this {@link EventExecutor},
     * or {@code null} if it has no parent

     */
    EventExecutorGroup parent();

    /**
     * Calls {@link #inEventLoop(Thread)} with {@link Thread#currentThread()} as argument
     */
    boolean inEventLoop();

    /**
     * Return {@code true} if the given {@link Thread} is executed in the event loop,
     * {@code false} otherwise.
     */
    boolean inEventLoop(Thread thread);
}


需要注意的接口是 boolean inEventLoop() 和 boolean inEventLoop(Thread thread)
这个是此接口的独特功能(比起ScheduledExecutorService)
判断一个线程是否在此调度器(ScheduledExecutorService)中。

再对EventExecutor和EventExecutorGroup了解后,再来看EventLoop和EventLoopGroup就容易多了,换成Loop后就多了几个接口,请看
/**
 * Special {@link EventExecutorGroup} which allows to register {@link Channel}'s that get
 * processed for later selection during the event loop.
 *
 */
public interface EventLoopGroup extends EventExecutorGroup {
    /**
     * Return the next {@link EventLoop} to use
     */
    @Override
    EventLoop next();

    /**
     * Register a {@link Channel} with this {@link EventLoop}. The returned {@link ChannelFuture}
     * will get notified once the registration was complete.
     */
    ChannelFuture register(Channel channel);

    /**
     * Register a {@link Channel} with this {@link EventLoop}. The passed {@link ChannelFuture}
     * will get notified once the registration was complete and also will get returned.
     */
    ChannelFuture register(Channel channel, ChannelFuture future);
}

这里多了两个方法。register和register 他们都是把通道注册到选择器(请看NIO中的channel的register)不同的Channel有不同的注册方式以及他们感兴趣的集合等等。这里可以找几个实现类便知。
再回到EventLoop, 则知道这是管理Channel和与Channel有关线程的类。
接口说完,就看看实现类。

EventLoop下就有EmbededEventLoop和抽象类SingleThreadEventLoop
SingleThreadEventLoop根据字面意思,可以看出管理event的线程是单一的。他的子类有AioEventLoop,LocalEventLoop,NioEventLoop,OioEventLoop,SingleThreadEventLoopImpl等。
这里的区别就是如何管理Channel和线程的关系。
Nio自然是使用selector,Aio则是使用了Future这样的特性,Oio一个线程对应一个Channel,local之类的就是使用了Channel自身内部的一个Unsafe类来实现。

用AioEventLoop来说吧。
首先定义一个ChannelFutureListener
 private final ChannelFutureListener registrationListener = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (!future.isSuccess()) {
                return;
            }

            Channel ch = future.channel();
            channels.add(ch);
            ch.closeFuture().addListener(deregistrationListener);
        }
    };


注意完成时关闭注册监听器并且加入注销监听器。channels是一个Set
 private final Set<Channel> channels = Collections.newSetFromMap(new IdentityHashMap<Channel, Boolean>());

这个我也不知道为什么要如此费力。值得研究。经过网络搜索,弄明白了,这是从一个Map中得到一个Set,因为IdentityHashMap是一个特殊的Map,key的比较是比较是否是同一个对象。所以即使看起来都是一样的对象,实际上也不一样。(String s1 = new String("abc") 与 String s2 = new String("abc") 其实不是一个对象。)
下面是register的override
@Override
    public ChannelFuture register(Channel channel) {
        return super.register(channel).addListener(registrationListener);
    }


这里的super,就是他们的父类, 抽象类 SingleThreadEventLoop,他的register是如何实现的呢?直接上代码:
 @Override
    public ChannelFuture register(final Channel channel, final ChannelFuture future) {
        if (isShutdown()) {
            channel.unsafe().closeForcibly();
            future.setFailure(new EventLoopException("cannot register a channel to a shut down loop"));
            return future;
        }

        if (inEventLoop()) {
            channel.unsafe().register(this, future);
        } else {
            execute(new Runnable() {
                @Override
                public void run() {
                    channel.unsafe().register(SingleThreadEventLoop.this, future);
                }
            });
        }
        return future;
    }


对于NioEventLoop,则是覆写了这个方法。它使用Nio自身的select来处理线程和Channel的关系。看:
/**
     * Registers an arbitrary {@link SelectableChannel}, not necessarily created by Netty, to the {@link Selector}
     * of this event loop.  Once the specified {@link SelectableChannel} is registered, the specified {@code task} will
     * be executed by this event loop when the {@link SelectableChannel} is ready.
     */
    public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task) {
        if (ch == null) {
            throw new NullPointerException("ch");
        }
        if (interestOps == 0) {
            throw new IllegalArgumentException("interestOps must be non-zero.");
        }
        if ((interestOps & ~ch.validOps()) != 0) {
            throw new IllegalArgumentException(
                    "invalid interestOps: " + interestOps + "(validOps: " + ch.validOps() + ')');
        }
        if (task == null) {
            throw new NullPointerException("task");
        }

        if (isShutdown()) {
            throw new IllegalStateException("event loop shut down");
        }

        try {
            ch.register(selector, interestOps, task);
        } catch (Exception e) {
            throw new EventLoopException("failed to register a channel", e);
        }
    }


如果忘记了NIO的Selector的,可以自己去复习下。
这些EventLoop中的execute方法添加任务,在SingleEventLoop中的run方法则是消耗这些task,注意是for(;;)。
如果是NIO,则是处理SelectionKey
这就是SingleThreadEventLoop的一些东西。除此之外,自然还有MultiThreadEventLoop这样的,下一次再说。
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics