标签: Selector

Java NIO:Buffer、Channel 和 Selector

转载自:http://www.importnew.com/28007.html

本文将介绍 Java NIO 中三大组件 Buffer、Channel、Selector 的使用。

本来要一起介绍非阻塞 IO 和 JDK7 的异步 IO 的,不过因为之前的文章真的太长了,有点影响读者阅读,所以这里将它们放到另一篇文章中进行介绍。

Buffer

一个 Buffer 本质上是内存中的一块,我们可以将数据写入这块内存,之后从这块内存获取数据。

java.nio 定义了以下几个 Buffer 的实现,这个图读者应该也在不少地方见过了吧。

6

其实核心是*后的 ByteBuffer,前面的一大串类只是包装了一下它而已,我们使用*多的通常也是 ByteBuffer。

我们应该将 Buffer 理解为一个数组,IntBuffer、CharBuffer、DoubleBuffer 等分别对应 int[]、char[]、double[] 等。

MappedByteBuffer 用于实现内存映射文件,也不是本文关注的重点。

我觉得操作 Buffer 和操作数组、类集差不多,只不过大部分时候我们都把它放到了 NIO 的场景里面来使用而已。下面介绍 Buffer 中的几个重要属性和几个重要方法。

position、limit、capacity

就像数组有数组容量,每次访问元素要指定下标,Buffer 中也有几个重要属性:position、limit、capacity。

5

*好理解的当然是 capacity,它代表这个缓冲区的容量,一旦设定就不可以更改。比如 capacity 为 1024 的 IntBuffer,代表其一次可以存放 1024 个 int 类型的值。一旦 Buffer 的容量达到 capacity,需要清空 Buffer,才能重新写入值。

position 和 limit 是变化的,我们分别看下读和写操作下,它们是如何变化的。

position 的初始值是 0,每往 Buffer 中写入一个值,position 就自动加 1,代表下一次的写入位置。读操作的时候也是类似的,每读一个值,position 就自动加 1。

从写操作模式到读操作模式切换的时候(flip),position 都会归零,这样就可以从头开始读写了。

Limit:写操作模式下,limit 代表的是*大能写入的数据,这个时候 limit 等于 capacity。写结束后,切换到读模式,此时的 limit 等于 Buffer 中实际的数据大小,因为 Buffer 不一定被写满了。

7

初始化 Buffer

每个 Buffer 实现类都提供了一个静态方法 allocate(int capacity) 帮助我们快速实例化一个 Buffer。如:

1

2

3

4

ByteBuffer byteBuf = ByteBuffer.allocate(1024);

IntBuffer intBuf = IntBuffer.allocate(1024);

LongBuffer longBuf = LongBuffer.allocate(1024);

// ...

另外,我们经常使用 wrap 方法来初始化一个 Buffer。

1

2

3

public static ByteBuffer wrap(byte[] array) {

    ...

}

填充 Buffer

各个 Buffer 类都提供了一些 put 方法用于将数据填充到 Buffer 中,如 ByteBuffer 中的几个 put 方法:

1

2

3

4

5

6

7

// 填充一个 byte 值

public abstract ByteBuffer put(byte b);

// 在指定位置填充一个 int 值

public abstract ByteBuffer put(int index, byte b);

// 将一个数组中的值填充进去

public final ByteBuffer put(byte[] src) {...}

public ByteBuffer put(byte[] src, int offset, int length) {...}

上述这些方法需要自己控制 Buffer 大小,不能超过 capacity,超过会抛 java.nio.BufferOverflowException 异常。

对于 Buffer 来说,另一个常见的操作中就是,我们要将来自 Channel 的数据填充到 Buffer 中,在系统层面上,这个操作我们称为读操作,因为数据是从外部(文件或网络等)读到内存中。

1 int num = channel.read(buf);

上述方法会返回从 Channel 中读入到 Buffer 的数据大小。

提取 Buffer 中的值

前面介绍了写操作,每写入一个值,position 的值都需要加 1,所以 position *后会指向*后一次写入的位置的后面一个,如果 Buffer 写满了,那么 position 等于 capacity(position 从 0 开始)。

如果要读 Buffer 中的值,需要切换模式,从写入模式切换到读出模式。注意,通常在说 NIO 的读操作的时候,我们说的是从 Channel 中读数据到 Buffer 中,对应的是对 Buffer 的写入操作,初学者需要理清楚这个。

调用 Buffer 的 flip() 方法,可以进行模式切换。其实这个方法也就是设置了一下 position 和 limit 值罢了。

1

2

3

4

5

6

public final Buffer flip() {

    limit = position; // 将 limit 设置为实际写入的数据数量

    position = 0// 重置 position 为 0

    mark = -1// mark 之后再说

    return this;

}

对应写入操作的一系列 put 方法,读操作提供了一系列的 get 方法:

1

2

3

4

5

6

// 根据 position 来获取数据

public abstract byte get();

// 获取指定位置的数据

public abstract byte get(int index);

// 将 Buffer 中的数据写入到数组中

public ByteBuffer get(byte[] dst)

附一个经常使用的方法:

1 new String(buffer.array()).trim();

当然了,除了将数据从 Buffer 取出来使用,更常见的操作是将我们写入的数据传输到 Channel 中,如通过 FileChannel 将数据写入到文件中,通过 SocketChannel 将数据写入网络发送到远程机器等。对应的,这种操作,我们称之为写操作。

1 int num = channel.write(buf);

mark() & reset()

除了 position、limit、capacity 这三个基本的属性外,还有一个常用的属性就是 mark。

mark 用于临时保存 position 的值,每次调用 mark() 方法都会将 mark 设值为当前的 position,便于后续需要的时候使用。

1

2

3

4

public final Buffer mark() {

    mark = position;

    return this;

}

那到底什么时候用呢?考虑以下场景,我们在 position 为 5 的时候,先 mark() 一下,然后继续往下读,读到第 10 的时候,我想重新回到 position 为 5 的地方重新来一遍,那只要调一下 reset() 方法,position 就回到 5 了。

1

2

3

4

5

6

7

public final Buffer reset() {

    int m = mark;

    if (m < 0)

        throw new InvalidMarkException();

    position = m;

    return this;

}

rewind() & clear() & compact()

rewind():会重置 position 为 0,通常用于重新从头读写 Buffer。

1

2

3

4

5

public final Buffer rewind() {

    position = 0;

    mark = -1;

    return this;

}

clear():有点重置 Buffer 的意思,相当于重新实例化了一样。

通常,我们会先填充 Buffer,然后从 Buffer 读取数据,之后我们再重新往里填充新的数据,我们一般在重新填充之前先调用 clear()。

1

2

3

4

5

6

public final Buffer clear() {

    position = 0;

    limit = capacity;

    mark = -1;

    return this;

}

compact():和 clear() 一样的是,它们都是在准备往 Buffer 填充新的数据之前调用。

前面说的 clear() 方法会重置几个属性,但是我们要看到,clear() 方法并不会将 Buffer 中的数据清空,只不过后续的写入会覆盖掉原来的数据,也就相当于清空了数据了。

而 compact() 方法有点不一样,调用这个方法以后,会先处理还没有读取的数据,也就是 position 到 limit 之间的数据(还没有读过的数据),先将这些数据移到左边,然后在这个基础上再开始写入。很明显,此时 limit 还是等于 capacity,position 指向原来数据的右边。

Channel

所有的 NIO 操作始于通道,通道是数据来源或数据写入的目的地,主要地,我们将关心 java.nio 包中实现的以下几个 Channel:

8

  • FileChannel:文件通道,用于文件的读和写
  • DatagramChannel:用于 UDP 连接的接收和发送
  • SocketChannel:把它理解为 TCP 连接通道,简单理解就是 TCP 客户端
  • ServerSocketChannel:TCP 对应的服务端,用于监听某个端口进来的请求

这里不是很理解这些也没关系,后面介绍了代码之后就清晰了。还有,我们*应该关注,也是后面将会重点介绍的是 SocketChannel 和 ServerSocketChannel。

Channel 经常翻译为通道,类似 IO 中的流,用于读取和写入。它与前面介绍的 Buffer 打交道,读操作的时候将 Channel 中的数据填充到 Buffer 中,而写操作时将 Buffer 中的数据写入到 Channel 中。

9

10

至少读者应该记住一点,这两个方法都是 channel 实例的方法。

FileChannel

我想文件操作对于大家来说应该是*熟悉的,不过我们在说 NIO 的时候,其实 FileChannel 并不是关注的重点。而且后面我们说非阻塞的时候会看到,FileChannel 是不支持非阻塞的。

这里算是简单介绍下常用的操作吧,感兴趣的读者瞄一眼就是了。

初始化:

1

2

FileInputStream inputStream = new FileInputStream(new File("/data.txt"));

FileChannel fileChannel = inputStream.getChannel();

当然了,我们也可以从 RandomAccessFile#getChannel 来得到 FileChannel。

读取文件内容:

1

2

3

ByteBuffer buffer = ByteBuffer.allocate(1024);

 

int num = fileChannel.read(buffer);

前面我们也说了,所有的 Channel 都是和 Buffer 打交道的。

写入文件内容:

1

2

3

4

5

6

7

8

ByteBuffer buffer = ByteBuffer.allocate(1024);

buffer.put("随机写入一些内容到 Buffer 中".getBytes());

// Buffer 切换为读模式

buffer.flip();

while(buffer.hasRemaining()) {

    // 将 Buffer 中的内容写入文件

    fileChannel.write(buffer);

}

SocketChannel

我们前面说了,我们可以将 SocketChannel 理解成一个 TCP 客户端。虽然这么理解有点狭隘,因为我们在介绍 ServerSocketChannel 的时候会看到另一种使用方式。

打开一个 TCP 连接:

1 SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("https://www.javadoop.com"80));

当然了,上面的这行代码等价于下面的两行:

1

2

3

4

// 打开一个通道

SocketChannel socketChannel = SocketChannel.open();

// 发起连接

socketChannel.connect(new InetSocketAddress("https://www.javadoop.com"80));

SocketChannel 的读写和 FileChannel 没什么区别,就是操作缓冲区。

1

2

3

4

5

6

7

// 读取数据

socketChannel.read(buffer);

 

// 写入数据到网络连接中

while(buffer.hasRemaining()) {

    socketChannel.write(buffer);  

}

不要在这里停留太久,先继续往下走。

ServerSocketChannel

之前说 SocketChannel 是 TCP 客户端,这里说的 ServerSocketChannel 就是对应的服务端。

ServerSocketChannel 用于监听机器端口,管理从这个端口进来的 TCP 连接。

1

2

3

4

5

6

7

8

9

// 实例化

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

// 监听 8080 端口

serverSocketChannel.socket().bind(new InetSocketAddress(8080));

 

while (true) {

    // 一旦有一个 TCP 连接进来,就对应创建一个 SocketChannel 进行处理

    SocketChannel socketChannel = serverSocketChannel.accept();

}

这里我们可以看到 SocketChannel 的第二个实例化方式

到这里,我们应该能理解 SocketChannel 了,它不仅仅是 TCP 客户端,它代表的是一个网络通道,可读可写。

ServerSocketChannel 不和 Buffer 打交道了,因为它并不实际处理数据,它一旦接收到请求后,实例化 SocketChannel,之后在这个连接通道上的数据传递它就不管了,因为它需要继续监听端口,等待下一个连接。

DatagramChannel

UDP 和 TCP 不一样,DatagramChannel 一个类处理了服务端和客户端。

科普一下,UDP 是面向无连接的,不需要和对方握手,不需要通知对方,就可以直接将数据包投出去,至于能不能送达,它是不知道的

监听端口:

1

2

3

4

5

6

DatagramChannel channel = DatagramChannel.open();

channel.socket().bind(new InetSocketAddress(9090));

ByteBuffer buf = ByteBuffer.allocate(48);

buf.clear();

 

channel.receive(buf);

发送数据:

1

2

3

4

5

6

7

8

9

String newData = "New String to write to file..."

                    + System.currentTimeMillis();

 

ByteBuffer buf = ByteBuffer.allocate(48);

buf.clear();

buf.put(newData.getBytes());

buf.flip();

 

int bytesSent = channel.send(buf, new InetSocketAddress("jenkov.com"80));

Selector

NIO 三大组件就剩 Selector 了,Selector 建立在非阻塞的基础之上,大家经常听到的 多路复用 在 Java 世界中指的就是它,用于实现一个线程管理多个 Channel。

读者在这一节不能消化 Selector 也没关系,因为后续在介绍非阻塞 IO 的时候还得说到这个,这里先介绍一些基本的接口操作。

  • 首先,我们开启一个 Selector。你们爱翻译成选择器也好,多路复用器也好。
1 Selector selector = Selector.open();
  • 将 Channel 注册到 Selector 上。前面我们说了,Selector 建立在非阻塞模式之上,所以注册到 Selector 的 Channel 必须要支持非阻塞模式,FileChannel 不支持非阻塞,我们这里讨论*常见的 SocketChannel 和 ServerSocketChannel。
1

2

3

4

// 将通道设置为非阻塞模式,因为默认都是阻塞模式的

channel.configureBlocking(false);

// 注册

SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

 

register 方法的第二个 int 型参数(使用二进制的标记位)用于表明需要监听哪些感兴趣的事件,共以下四种事件:

  • SelectionKey.OP_READ

    对应 00000001,通道中有数据可以进行读取

  • SelectionKey.OP_WRITE

    对应 00000100,可以往通道中写入数据

  • SelectionKey.OP_CONNECT

    对应 00001000,成功建立 TCP 连接

  • SelectionKey.OP_ACCEPT

    对应 00010000,接受 TCP 连接

我们可以同时监听一个 Channel 中的发生的多个事件,比如我们要监听 ACCEPT 和 READ 事件,那么指定参数为二进制的 00010001 即十进制数值 17 即可。

注册方法返回值是 SelectionKey 实例,它包含了 Channel 和 Selector 信息,也包括了一个叫做 Interest Set 的信息,即我们设置的我们感兴趣的正在监听的事件集合。

  • 调用 select() 方法获取通道信息。用于判断是否有我们感兴趣的事件已经发生了。

Selector 的操作就是以上 3 步,这里来一个简单的示例,大家看一下就好了。之后在介绍非阻塞 IO 的时候,会演示一份可执行的示例代码。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

Selector selector = Selector.open();

 

channel.configureBlocking(false);

 

SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

 

while(true) {

  // 判断是否有事件准备好

  int readyChannels = selector.select();

  if(readyChannels == 0continue;

 

  // 遍历

  Set<SelectionKey> selectedKeys = selector.selectedKeys();

  Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

  while(keyIterator.hasNext()) {

    SelectionKey key = keyIterator.next();

 

    if(key.isAcceptable()) {

        // a connection was accepted by a ServerSocketChannel.

 

    else if (key.isConnectable()) {

        // a connection was established with a remote server.

 

    else if (key.isReadable()) {

        // a channel is ready for reading

 

    else if (key.isWritable()) {

        // a channel is ready for writing

    }

 

    keyIterator.remove();

  }

}

小结

到此为止,介绍了 Buffer、Channel 和 Selector 的常见接口。

Buffer 和数组差不多,它有 position、limit、capacity 几个重要属性。put() 一下数据、flip() 切换到读模式、然后用 get() 获取数据、clear() 一下清空数据、重新回到 put() 写入数据。

Channel 基本上只和 Buffer 打交道,*重要的接口就是 channel.read(buffer) 和 channel.write(buffer)。

Selector 用于实现非阻塞 IO,这里仅仅介绍接口使用。

深入浅出NIO之Selector实现原理

转载自:https://www.jianshu.com/p/0d497fe5484a

前言

Java NIO 由以下几个核心部分组成:
1、Buffer
2、Channel
3、Selector

Buffer和Channel在深入浅出NIO之Channel、Buffer一文中已经介绍过,本文主要讲解NIO的Selector实现原理。

之前进行socket编程时,accept方法会一直阻塞,直到有客户端请求的到来,并返回socket进行相应的处理。整个过程是流水线的,处理完一个请求,才能去获取并处理后面的请求,当然也可以把获取socket和处理socket的过程分开,一个线程负责accept,一个线程池负责处理请求。

但NIO提供了更好的解决方案,采用选择器(Selector)返回已经准备好的socket,并按顺序处理,基于通道(Channel)和缓冲区(Buffer)来进行数据的传输。

Selector

这里出来一个新概念,selector,具体是一个什么样的东西?

想想一个场景:在一个养鸡场,有这么一个人,每天的工作就是不停检查几个特殊的鸡笼,如果有鸡进来,有鸡出去,有鸡生蛋,有鸡生病等等,就把相应的情况记录下来,如果鸡场的负责人想知道情况,只需要询问那个人即可。

在这里,这个人就相当Selector,每个鸡笼相当于一个SocketChannel,每个线程通过一个Selector可以管理多个SocketChannel。

%title插图%num

为了实现Selector管理多个SocketChannel,必须将具体的SocketChannel对象注册到Selector,并声明需要监听的事件(这样Selector才知道需要记录什么数据),一共有4种事件:

1、connect:客户端连接服务端事件,对应值为SelectionKey.OP_CONNECT(8)
2、accept:服务端接收客户端连接事件,对应值为SelectionKey.OP_ACCEPT(16)
3、read:读事件,对应值为SelectionKey.OP_READ(1)
4、write:写事件,对应值为SelectionKey.OP_WRITE(4)

这个很好理解,每次请求到达服务器,都是从connect开始,connect成功后,服务端开始准备accept,准备就绪,开始读数据,并处理,*后写回数据返回。

所以,当SocketChannel有对应的事件发生时,Selector都可以观察到,并进行相应的处理。

服务端代码

为了更好的理解,先看一段服务端的示例代码

  1. ServerSocketChannel serverChannel = ServerSocketChannel.open();
  2. serverChannel.configureBlocking(false);
  3. serverChannel.socket().bind(new InetSocketAddress(port));
  4. Selector selector = Selector.open();
  5. serverChannel.register(selector, SelectionKey.OP_ACCEPT);
  6. while(true){
  7. int n = selector.select();
  8. if (n == 0) continue;
  9. Iterator ite = this.selector.selectedKeys().iterator();
  10. while(ite.hasNext()){
  11. SelectionKey key = (SelectionKey)ite.next();
  12. if (key.isAcceptable()){
  13. SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept();
  14. clntChan.configureBlocking(false);
  15. //将选择器注册到连接到的客户端信道,
  16. //并指定该信道key值的属性为OP_READ,
  17. //同时为该信道指定关联的附件
  18. clntChan.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufSize));
  19. }
  20. if (key.isReadable()){
  21. handleRead(key);
  22. }
  23. if (key.isWritable() && key.isValid()){
  24. handleWrite(key);
  25. }
  26. if (key.isConnectable()){
  27. System.out.println(“isConnectable = true”);
  28. }
  29. ite.remove();
  30. }
  31. }

服务端操作过程

1、创建ServerSocketChannel实例,并绑定指定端口;
2、创建Selector实例;
3、将serverSocketChannel注册到selector,并指定事件OP_ACCEPT,*底层的socket通过channel和selector建立关联;
4、如果没有准备好的socket,select方法会被阻塞一段时间并返回0;
5、如果底层有socket已经准备好,selector的select方法会返回socket的个数,而且selectedKeys方法会返回socket对应的事件(connect、accept、read or write);
6、根据事件类型,进行不同的处理逻辑;

在步骤3中,selector只注册了serverSocketChannel的OP_ACCEPT事件
1、如果有客户端A连接服务,执行select方法时,可以通过serverSocketChannel获取客户端A的socketChannel,并在selector上注册socketChannel的OP_READ事件。
2、如果客户端A发送数据,会触发read事件,这样下次轮询调用select方法时,就能通过socketChannel读取数据,同时在selector上注册该socketChannel的OP_WRITE事件,实现服务器往客户端写数据。

Selector实现原理

SocketChannel、ServerSocketChannel和Selector的实例初始化都通过SelectorProvider类实现,其中Selector是整个NIO Socket的核心实现。

  1. public static SelectorProvider provider() {
  2. synchronized (lock) {
  3. if (provider != null)
  4. return provider;
  5. return AccessController.doPrivileged(
  6. new PrivilegedAction<SelectorProvider>() {
  7. public SelectorProvider run() {
  8. if (loadProviderFromProperty())
  9. return provider;
  10. if (loadProviderAsService())
  11. return provider;
  12. provider = sun.nio.ch.DefaultSelectorProvider.create();
  13. return provider;
  14. }
  15. });
  16. }
  17. }

SelectorProvider在windows和linux下有不同的实现,provider方法会返回对应的实现。

这里不禁要问,Selector是如何做到同时管理多个socket?

下面我们看看Selector的具体实现,Selector初始化时,会实例化PollWrapper、SelectionKeyImpl数组和Pipe。

  1. WindowsSelectorImpl(SelectorProvider sp) throws IOException {
  2. super(sp);
  3. pollWrapper = new PollArrayWrapper(INIT_CAP);
  4. wakeupPipe = Pipe.open();
  5. wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();
  6. // Disable the Nagle algorithm so that the wakeup is more immediate
  7. SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
  8. (sink.sc).socket().setTcpNoDelay(true);
  9. wakeupSinkFd = ((SelChImpl)sink).getFDVal();
  10. pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
  11. }

pollWrapper用Unsafe类申请一块物理内存pollfd,存放socket句柄fdVal和events,其中pollfd共8位,0-3位保存socket句柄,4-7位保存events。

%title插图%num

%title插图%num

 

pollWrapper提供了fdVal和event数据的相应操作,如添加操作通过Unsafe的putInt和putShort实现。

  1. void putDescriptor(int i, int fd) {
  2. pollArray.putInt(SIZE_POLLFD * i + FD_OFFSET, fd);
  3. }
  4. void putEventOps(int i, int event) {
  5. pollArray.putShort(SIZE_POLLFD * i + EVENT_OFFSET, (short)event);
  6. }

先看看serverChannel.register(selector, SelectionKey.OP_ACCEPT)是如何实现的

  1. public final SelectionKey register(Selector sel, int ops, Object att)
  2. throws ClosedChannelException {
  3. synchronized (regLock) {
  4. SelectionKey k = findKey(sel);
  5. if (k != null) {
  6. k.interestOps(ops);
  7. k.attach(att);
  8. }
  9. if (k == null) {
  10. // New registration
  11. synchronized (keyLock) {
  12. if (!isOpen())
  13. throw new ClosedChannelException();
  14. k = ((AbstractSelector)sel).register(this, ops, att);
  15. addKey(k);
  16. }
  17. }
  18. return k;
  19. }
  20. }
  1. 如果该channel和selector已经注册过,则直接添加事件和附件。
  2. 否则通过selector实现注册过程。
  1. protected final SelectionKey register(AbstractSelectableChannel ch,
  2. int ops, Object attachment) {
  3. if (!(ch instanceof SelChImpl))
  4. throw new IllegalSelectorException();
  5. SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
  6. k.attach(attachment);
  7. synchronized (publicKeys) {
  8. implRegister(k);
  9. }
  10. k.interestOps(ops);
  11. return k;
  12. }
  13. protected void implRegister(SelectionKeyImpl ski) {
  14. synchronized (closeLock) {
  15. if (pollWrapper == null)
  16. throw new ClosedSelectorException();
  17. growIfNeeded();
  18. channelArray[totalChannels] = ski;
  19. ski.setIndex(totalChannels);
  20. fdMap.put(ski);
  21. keys.add(ski);
  22. pollWrapper.addEntry(totalChannels, ski);
  23. totalChannels++;
  24. }
  25. }

1、以当前channel和selector为参数,初始化SelectionKeyImpl 对象selectionKeyImpl ,并添加附件attachment。
2、如果当前channel的数量totalChannels等于SelectionKeyImpl数组大小,对SelectionKeyImpl数组和pollWrapper进行扩容操作。
3、如果totalChannels % MAX_SELECTABLE_FDS == 0,则多开一个线程处理selector。
4、pollWrapper.addEntry将把selectionKeyImpl中的socket句柄添加到对应的pollfd。
5、k.interestOps(ops)方法*终也会把event添加到对应的pollfd。

所以,不管serverSocketChannel,还是socketChannel,在selector注册的事件,*终都保存在pollArray中。

接着,再来看看selector中的select是如何实现一次获取多个有事件发生的channel的,底层由selector实现类的doSelect方法实现,如下:

  1. protected int doSelect(long timeout) throws IOException {
  2. if (channelArray == null)
  3. throw new ClosedSelectorException();
  4. this.timeout = timeout; // set selector timeout
  5. processDeregisterQueue();
  6. if (interruptTriggered) {
  7. resetWakeupSocket();
  8. return 0;
  9. }
  10. // Calculate number of helper threads needed for poll. If necessary
  11. // threads are created here and start waiting on startLock
  12. adjustThreadsCount();
  13. finishLock.reset(); // reset finishLock
  14. // Wakeup helper threads, waiting on startLock, so they start polling.
  15. // Redundant threads will exit here after wakeup.
  16. startLock.startThreads();
  17. // do polling in the main thread. Main thread is responsible for
  18. // first MAX_SELECTABLE_FDS entries in pollArray.
  19. try {
  20. begin();
  21. try {
  22. subSelector.poll();
  23. } catch (IOException e) {
  24. finishLock.setException(e); // Save this exception
  25. }
  26. // Main thread is out of poll(). Wakeup others and wait for them
  27. if (threads.size() > 0)
  28. finishLock.waitForHelperThreads();
  29. } finally {
  30. end();
  31. }
  32. // Done with poll(). Set wakeupSocket to nonsignaled for the next run.
  33. finishLock.checkForException();
  34. processDeregisterQueue();
  35. int updated = updateSelectedKeys();
  36. // Done with poll(). Set wakeupSocket to nonsignaled for the next run.
  37. resetWakeupSocket();
  38. return updated;
  39. }

其中 subSelector.poll() 是select的核心,由native函数poll0实现,readFds、writeFds 和exceptFds数组用来保存底层select的结果,数组的*个位置都是存放发生事件的socket的总数,其余位置存放发生事件的socket句柄fd。

  1. private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];
  2. private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1];
  3. private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1];
  4. private int poll() throws IOException{ // poll for the main thread
  5. return poll0(pollWrapper.pollArrayAddress,
  6. Math.min(totalChannels, MAX_SELECTABLE_FDS),
  7. readFds, writeFds, exceptFds, timeout);
  8. }

执行 selector.select() ,poll0函数把指向socket句柄和事件的内存地址传给底层函数。
1、如果之前没有发生事件,程序就阻塞在select处,当然不会一直阻塞,因为epoll在timeout时间内如果没有事件,也会返回;
2、一旦有对应的事件发生,poll0方法就会返回;
3、processDeregisterQueue方法会清理那些已经cancelled的SelectionKey;
4、updateSelectedKeys方法统计有事件发生的SelectionKey数量,并把符合条件发生事件的SelectionKey添加到selectedKeys哈希表中,提供给后续使用。

在早期的JDK1.4和1.5 update10版本之前,Selector基于select/poll模型实现,是基于IO复用技术的非阻塞IO,不是异步IO。在JDK1.5 update10和linux core2.6以上版本,sun优化了Selctor的实现,底层使用epoll替换了select/poll。

read实现

通过遍历selector中的SelectionKeyImpl数组,获取发生事件的socketChannel对象,其中保存了对应的socket,实现如下

  1. public int read(ByteBuffer buf) throws IOException {
  2. if (buf == null)
  3. throw new NullPointerException();
  4. synchronized (readLock) {
  5. if (!ensureReadOpen())
  6. return1;
  7. int n = 0;
  8. try {
  9. begin();
  10. synchronized (stateLock) {
  11. if (!isOpen()) {
  12. return 0;
  13. }
  14. readerThread = NativeThread.current();
  15. }
  16. for (;;) {
  17. n = IOUtil.read(fd, buf, –1, nd);
  18. if ((n == IOStatus.INTERRUPTED) && isOpen()) {
  19. // The system call was interrupted but the channel
  20. // is still open, so retry
  21. continue;
  22. }
  23. return IOStatus.normalize(n);
  24. }
  25. } finally {
  26. readerCleanup(); // Clear reader thread
  27. // The end method, which
  28. end(n > 0 || (n == IOStatus.UNAVAILABLE));
  29. // Extra case for socket channels: Asynchronous shutdown
  30. //
  31. synchronized (stateLock) {
  32. if ((n <= 0) && (!isInputOpen))
  33. return IOStatus.EOF;
  34. }
  35. assert IOStatus.check(n);
  36. }
  37. }
  38. }

*终通过Buffer的方式读取socket的数据。

wakeup实现

  1. public Selector wakeup() {
  2. synchronized (interruptLock) {
  3. if (!interruptTriggered) {
  4. setWakeupSocket();
  5. interruptTriggered = true;
  6. }
  7. }
  8. return this;
  9. }
  10. // Sets Windows wakeup socket to a signaled state.
  11. private void setWakeupSocket() {
  12. setWakeupSocket0(wakeupSinkFd);
  13. }
  14. private native void setWakeupSocket0(int wakeupSinkFd);

看来wakeupSinkFd这个变量是为wakeup方法使用的。
其中interruptTriggered为中断已触发标志,当pollWrapper.interrupt()之后,该标志即为true了;因为这个标志,连续两次wakeup,只会有一次效果。

epoll原理

epoll是Linux下的一种IO多路复用技术,可以非常高效的处理数以百万计的socket句柄。

三个epoll相关的系统调用:

  • int epoll_create(int size)
    epoll_create建立一个epoll对象。参数size是内核保证能够正确处理的*大句柄数,多于这个*大数时内核可不保证效果。
  • int epoll_ctl(int epfd, int op, int fd, struct epoll_event event)
    epoll_ctl可以操作epoll_create创建的epoll,如将socket句柄加入到epoll中让其监控,或把epoll正在监控的某个socket句柄移出epoll。
  • int epoll_wait(int epfd, struct epoll_event events,int maxevents, int timeout)
    epoll_wait在调用时,在给定的timeout时间内,所监控的句柄中有事件发生时,就返回用户态的进程。

epoll内部实现大概如下:

  1. epoll初始化时,会向内核注册一个文件系统,用于存储被监控的句柄文件,调用epoll_create时,会在这个文件系统中创建一个file节点。同时epoll会开辟自己的内核高速缓存区,以红黑树的结构保存句柄,以支持快速的查找、插入、删除。还会再建立一个list链表,用于存储准备就绪的事件。
  2. 当执行epoll_ctl时,除了把socket句柄放到epoll文件系统里file对象对应的红黑树上之外,还会给内核中断处理程序注册一个回调函数,告诉内核,如果这个句柄的中断到了,就把它放到准备就绪list链表里。所以,当一个socket上有数据到了,内核在把网卡上的数据copy到内核中后,就把socket插入到就绪链表里。
  3. 当epoll_wait调用时,仅仅观察就绪链表里有没有数据,如果有数据就返回,否则就sleep,超时时立刻返回。
友情链接: SITEMAP | 旋风加速器官网 | 旋风软件中心 | textarea | 黑洞加速器 | jiaohess | 老王加速器 | 烧饼哥加速器 | 小蓝鸟 | tiktok加速器 | 旋风加速度器 | 旋风加速 | quickq加速器 | 飞驰加速器 | 飞鸟加速器 | 狗急加速器 | hammer加速器 | trafficace | 原子加速器 | 葫芦加速器 | 麦旋风 | 油管加速器 | anycastly | INS加速器 | INS加速器免费版 | 免费vqn加速外网 | 旋风加速器 | 快橙加速器 | 啊哈加速器 | 迷雾通 | 优途加速器 | 海外播 | 坚果加速器 | 海外vqn加速 | 蘑菇加速器 | 毛豆加速器 | 接码平台 | 接码S | 西柚加速器 | 快柠檬加速器 | 黑洞加速 | falemon | 快橙加速器 | anycast加速器 | ibaidu | moneytreeblog | 坚果加速器 | 派币加速器 | 飞鸟加速器 | 毛豆APP | PIKPAK | 安卓vqn免费 | 一元机场加速器 | 一元机场 | 老王加速器 | 黑洞加速器 | 白石山 | 小牛加速器 | 黑洞加速 | 迷雾通官网 | 迷雾通 | 迷雾通加速器 | 十大免费加速神器 | 猎豹加速器 | 蚂蚁加速器 | 坚果加速器 | 黑洞加速 | 银河加速器 | 猎豹加速器 | 海鸥加速器 | 芒果加速器 | 小牛加速器 | 极光加速器 | 黑洞加速 | movabletype中文网 | 猎豹加速器官网 | 烧饼哥加速器官网 | 旋风加速器度器 | 哔咔漫画 | PicACG | 雷霆加速