0%

Java-NIO-选择器基础

一、选择器基础

从基础层面上看,选择器提供了询问通道是否已经准备好执行每个I/O操作的能力。在与SelectableChannel联合使用时,选择器提供了这种服务。就绪选择的真正意义在于潜在的大量的通道可以同时进行就绪准备的检查。调用者可以轻松地决定多个通道的哪一个准备好要运行。
对于选择有两种方法被激发:

  1. selector线程处于休眠状态,等待有一个或多个通道处于就绪状态
  2. 周期性轮询选择器,看在上次之后是否有新就绪的通道

在开发中每次询问每个通道是否就绪的时候需要遍历每一个候选通道并按照顺序进行检查是有问题的。这会使得在检查每个通道是够就绪时都至少进行一次系统调用,这种代价是十分昂贵的,但是主要的问题是,这种检查不是原子性的,可能列表中的某一个通道在被遍历检查后达到就绪状态,但是在下一次轮询之前我们无法感知到变化。而且除了不断地遍历之外没有其他的选择。

这就是为什么传统的监控多个Socket的解决方案就是通过多线程,使得线程可以在read()调用中阻塞直到可用。这就是使用阻塞的线程作为socket监控器,并将java虚拟机的线程调度当作了通知机制。而线程的增长会使得系统的复杂性飙升,性能损耗严重。

1.1 选择器、可选择通道、选择键类

1.1.1 选择器(Selector)

选择器类管理者一个被注册的通道集合的信息和他们的就绪状态。通道是和选择器一起被注册的,并通过选择器来更新通道的就绪状态。当这么做时可以选择将激发的选择器线程挂起,直到有就绪的通道。
下面我们看一下Selector的源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public abstract class Selector implements Closeable {

/**
* Initializes a new instance of this class.
*/
protected Selector() { }

/**
* 创建了一个Selector
*/
public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}

/**
* 返回当前选择器是否打开
*/
public abstract boolean isOpen();

/**
* 返回当前通道的Provider
*/
public abstract SelectorProvider provider();

/**
* 返回当前所有注册在selector中channel的selectionKey
*/
public abstract Set<SelectionKey> keys();

/**
* 返回注册在selector中等待IO操作(及有事件发生)channel的selectionKey。
*/
public abstract Set<SelectionKey> selectedKeys();

/**
* 非阻塞,只要有通道就绪就立刻返回。
*/
public abstract int selectNow() throws IOException;

/**
* 和select()一样,但最长阻塞时间为timeout毫秒。
*/
public abstract int select(long timeout)
throws IOException;

/**
* 阻塞到至少有一个通道在你注册的事件上就绪了。
*/
public abstract int select() throws IOException;

/**
* 过调用Selector对象的wakeup()方法让处在阻塞状态的select()方法立刻返回。该方法使得选择器上的第一个还没有返回的选择操作立即返回。如果当前没有进行中的选择操作,那么下一次对select()方法的一次调用将立即返回。
*/
public abstract Selector wakeup();

/**
* 关闭当前选择器
*/
public abstract void close() throws IOException;

}

1.1.1.1 常见的集中键集合

在一个刚初始化的 Selector 对象中,下面这几种集合都是空的。

  1. 已注册的键的集合(Registered key set)
    返回的是所有与选择器关联的已经注册的键的集合。并不是所有注册过的键都是仍然有效。可以通过keys()方法返回,并且可能是空的。返回的这个键集合不可以直接进行修改,进行修改会抛出java.lang.UnsupportedOperationException

  2. 已选择的键的集合(Selected key set)
    已选择键的集合是已注册键的集合的子集。这个集合返回的是被判断为已经准备好的通道的集合。并且包含于键的interest集合中的操作。这个集合通过selectedKeys()方法返回(并有可能是空的)。键可以直接从这个集合中移除,但是不能添加。添加会抛出java.lang.UnsupportedOperationException

  3. 已Ready的键的集合(Selected Ready key set)
    ready集合是已选择键的子集。每个键都关联一个已经准备好至少一种操作的通道。每个键都有一个内嵌的 ready集合,指示了所关联的通道已经准备好的操作。

  4. 已取消的键的集合(Cancelled key set)
    已取消的键的集合是已注册键的子集。这个集合包含了cancel()方法被调用过的键(这个键已经被无效化),但是他们还没有被注销。这个集合是选择器对象的私有对象,无法直接访问。

    使用内部的已取消的键的集合来延迟注销,是一种防止线程在取消键时阻塞,并防止与正在进行的选择操作冲突的优化。注销通道是一个潜在的代价很高的操作,这可能需要重新分配资源(请记住,键是与通道相关的,并且可能与它们相关的通道对象之间有复杂的交互)。清理已取消的键,并在选择操作之前和之后立即注销通道,可以消除它们可能正好在选择的过程中执行的潜在棘手问题。这是另一个兼顾健壮性的折中方案。

1.1.1.2
1.1.2 可选择通道(SelectableChannel)

这个抽象类提供了通道的可选择性需要的方法,继承了这个抽象类的类可以被注册到Selector对象上。同时可以指定对那个对选择器而言,哪些操作是感兴趣的。一个通道可以注册到多个选择器上,这么做的话,在更新 interest 集合为指定的值的同时,将返回与之前相同的选择键。而对于一个选择器,一个通道只能被注册一次。通道在被注册到一个选择器上之前,必须先设置为非阻塞模式(通过调用 configureBlocking(false))。
一个例外的情形是当您试图将一个通道注册到一个相关的键已经被取消的选择器上,而通道仍然处于被注册的状态的时候。通道不会在键被取消的时候立即注销。直到下一次操作发生为止,它们仍然会处于被注册的状态。在这种情况下,未检查的CancelledKeyException将会被抛出。请务必在键可能被取消的情况下检查SelectionKey对象的状态。

下面我们看一下SelectableChannel的源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
public abstract class SelectableChannel
extends AbstractInterruptibleChannel
implements Channel
{

/**
* Initializes a new instance of this class.
*/
protected SelectableChannel() { }

/**
* 返回创建此通道的Provider
*/
public abstract SelectorProvider provider();

/**
* 返回此通道支持的操作
*/
public abstract int validOps();

// Internal state:
// keySet, may be empty but is never null, typ. a tiny array
// boolean isRegistered, protected by key set
// regLock, lock object to prevent duplicate registrations
// boolean isBlocking, protected by regLock

/**
* 告知此通道当前是否已注册到任何选择器。新创建的通道未注册。
* 由于密钥取消和通道取消注册之间存在固有的延迟,在所有密钥被取消之后,通道可能会保持注册一段时间。通道在关闭后还可以保持注册一段时间。
*/
public abstract boolean isRegistered();
//
// sync(keySet) { return isRegistered; }

/**
* 返回注册键
*/
public abstract SelectionKey keyFor(Selector sel);
//
// sync(keySet) { return findKey(sel); }

/**
* 将通道注册到选择器上,并关联一个附件
*/
public abstract SelectionKey register(Selector sel, int ops, Object att)
throws ClosedChannelException;
//
// sync(regLock) {
// sync(keySet) { look for selector }
// if (channel found) { set interest ops -- may block in selector;
// return key; }
// create new key -- may block somewhere in selector;
// sync(keySet) { add key; }
// attach(attachment);
// return key;
// }

/**
* 将通道注册到选择器上,并关联一个附件
*/
public final SelectionKey register(Selector sel, int ops)
throws ClosedChannelException
{
return register(sel, ops, null);
}

/**
* 调整阻塞模式
*/
public abstract SelectableChannel configureBlocking(boolean block)
throws IOException;
//
// sync(regLock) {
// sync(keySet) { throw IBME if block && isRegistered; }
// change mode;
// }

/**
* 查看通道阻塞模式
*/
public abstract boolean isBlocking();

/**
* 获取锁对象
*/
public abstract Object blockingLock();
}
1.1.3 选择键(SelectionKey)

选择键封装了特定的通道和特定选择器的注册关系。选择捡对象被Selectable.register()返回并提供一个表示这种注册关系的标记。同时选择键也存储了当前注册关系关系的通道操作,以及通道已经准备好的操作。

下面我们看一下SelectionKey的源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
public abstract class SelectionKey {

/**
* 返回对应的通道
*/
public abstract SelectableChannel channel();

/**
* 返回对应的选择器
*/
public abstract Selector selector();

/**
* 返回当前key是否可用
*/
public abstract boolean isValid();

/**
* 取消当前关联关系
*/
public abstract void cancel();

/**
* 返回当前键关心的操作
*/
public abstract int interestOps();

/**
* 设置当前键关心的操作
*/
public abstract SelectionKey interestOps(int ops);

/**
* 返回当前键已经就绪的操作
*/
public abstract int readyOps();


public static final int OP_READ = 1 << 0;
public static final int OP_WRITE = 1 << 2;
public static final int OP_CONNECT = 1 << 3;
public static final int OP_ACCEPT = 1 << 4;

/**
* 判断当前通道是否已经准备好读操作
*/
public final boolean isReadable() {
return (readyOps() & OP_READ) != 0;
}

/**
* 判断当前通道是否已经准备好写操作
*/
public final boolean isWritable() {
return (readyOps() & OP_WRITE) != 0;
}

/**
* 判断当前通道是否已经连接好
*/
public final boolean isConnectable() {
return (readyOps() & OP_CONNECT) != 0;
}

/**
* 判断当前通道是否已经准备好接受socket连接
*/
public final boolean isAcceptable() {
return (readyOps() & OP_ACCEPT) != 0;
}

// 关联的附件
private volatile Object attachment = null;

private static final AtomicReferenceFieldUpdater<SelectionKey,Object>
attachmentUpdater = AtomicReferenceFieldUpdater.newUpdater(
SelectionKey.class, Object.class, "attachment"
);

/**
* 设置关联对象
*/
public final Object attach(Object ob) {
return attachmentUpdater.getAndSet(this, ob);
}

/**
* 获取关联对象
*/
public final Object attachment() {
return attachment;
}
}

二、创建选择器

1
2
3
4
5
6
Selector selector = Selector.open();
channel1.register(selector, SelectionKey.OP_READ);
channel2.register(selector, SelectionKey.OP_WRITE);
channel3.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
// Wait up to 10 seconds for a channel to become ready
readyCount = selector.select(10000);

Selector对象是通过调用静态工厂方法open()来实例化的。选择器不是像通道或流(stream)那样的基本I/O对象:数据从来没有通过它们进行传递。类方法open()向SPI发出请求,通过默认的 SelectorProvider对象获取一个新的实例。通过调用一个自定义的SelectorProvider对象的openSelector()方法来创建一个Selector实例也是可行的。大多数情况下,您不需要关心SPI,只需要调用open()方法来创建新的Selector对象。

三、使用选择器

3.1 选择过程

执行任何一种形式的select()中的任何一种方法,选择器都会执行下面这些步骤

  1. 已取消的键的集合将会被检查。如果他是非空,每个已取消的键的集合中的键将从另外两个中移除,并且相关的通道将会被注销。这个步骤结束后,已取消的键的集合将会是空的。
  2. 已注册的键的集合中的键的interest集合将被检查。这个步骤中的执行过后,对interest集合的改动不会影响剩余的检查过程。一旦就绪条件被定下来,底层操作系统将会进行查询,以确定每个通道所关心的操作的真实就绪状态。依赖于特定的select()方法调用,如果没有通道已经准备好,线程可能会在这是进行阻塞,通常会有一个超时值。直到系统调用完成为止。这个过程可能会使得调用线程睡眠一段时间。然后当前通道的就绪状态将确定下来。对于那些还没准备好的通道将不会执行任何的操作。对于操作系统指示至少已经准备好的interest集合中的一种操作的通道,将执行以下两种操作中的一种:
    a. 如果通道的键还没有处于已选择的键的集合中,那么键的ready集合将被清空,然后表示操作系统发现的当前通道已经准备好的操作的比特掩码将会被设置。
    b. 如果键在已选择的键的集合中。键的ready集合将被表示操作系统发现当前已经准备好的操作的比特掩码会更新。所有之前的已经不再是就绪状态的操作不会被清除,一旦键 被放置于选择器的已选择的键的集合中,它的ready集合将是累积的。比特位只会被设置,不会被清理。
    3.步骤2可能会花费很长时间,特别是所激发的线程处于休眠状态时。与该选择器相关的键可能会同时被取消。当步骤2结束时,步骤1将重新执行,以完成任意一个在选择进行的过程中,键已经被取消的通道的注销。
    4.select操作返回的值是ready集合在步骤2中被修改的键的数量,而不是已选择的键的集合中的通道的总数。返回值不是已准备好的通道的总数,而是从上一个select()调用之后进入就绪状态的通道的数量。之前的调用中就绪的,并且在本次调用中仍然就绪的通道不会被计入,而那些在前一次调用中已经就绪但已经不再处于就绪状态的通道也不会被计入。这些通道可能仍然在已选择的键的集合中,但不会被计入返回值中。返回值可能是0。

3.2 停止选择过程

在Selector类中提供了三种从被阻塞的select()方法中优雅退出的能力。

请注意这些方法中的任意一个都不会关闭任何一个相关的通道。中断一个选择器与中断一个通道是不一样的

3.2.1 wakeup()

调用Selector对象的wakeup()方法将使得选择器上的第一个还没有返回的选择操作立即返 回。如果当前没有在进行中的选择,那么下一次对select()方法的一种形式的调用将立即返回。后续的选择操作将正常进行。在选择操作之间多次调用wakeup()方法与调用它一次没有什么不同。
有时这种延迟的唤醒行为并不是您想要的。您可能只想唤醒一个睡眠中的线程,而使得后续的选择继续正常地进行。您可以通过在调用wakeup()方法后调用 selectNow()方法来绕过这个问题。
尽管如此,如果您将您的代码构造为合理地关注于返回值和执行选择集合,那么即使下一个select()方法的调用在没有通道就绪时就立即返回,也应该不会有什么不同。不管怎么说,您应该为可能发生的事件做好准备。

3.2.2 close()

如果选择器的close()方法被调用,那么任何一个在选择操作中阻塞的线程都将被唤醒,就像wakeup()方法被调用了一样。与选择器相关的通道将被注销,而键将被取消。

3.2.2 interrupt()

如果睡眠中的线程的interrupt()方法被调用,它的返回状态将被设置。如果被唤醒的线程之后将试图在通道上执行I/O操作,通道将立即关闭,然后线程将捕捉到一个异常。

3.3 使用简单选择器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class SelectSockets {
public static void main(String[] args) throws Exception {
new SelectSockets().run();
}

private void run() throws Exception {
try (final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
final Selector selector = Selector.open()) {
serverSocketChannel.bind(new InetSocketAddress(8080));
serverSocketChannel.configureBlocking(false);
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
final int n = selector.select();
if (n == 0) {
continue;
}

final Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
final SelectionKey key = iterator.next();
if (key.isAcceptable()) {
final ServerSocketChannel server = (ServerSocketChannel) key.channel();
final SocketChannel socketChannel = server.accept();
System.out.println("接收到连接, IP: " + socketChannel.socket().getRemoteSocketAddress().toString());
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
this.readDataFromSocket(key);
}
iterator.remove();
}
}
}
}

private void readDataFromSocket(SelectionKey key) throws Exception {
final SocketChannel channel = (SocketChannel) key.channel();
final ByteBuffer buffer = ByteBuffer.allocate(128);
channel.read(buffer);
buffer.flip();
System.out.println("从 [ " + channel.socket().getRemoteSocketAddress().toString() + " ] 接收到消息: " + new String(buffer.array(), 0, buffer.remaining()));
buffer.clear();
buffer.put("Hi i'm here".getBytes(StandardCharsets.UTF_8));
buffer.flip();
channel.write(buffer);
channel.close();
}
}

在循环的底部,我们通过调用Iterator(迭代器)对象的remove()方法,将键从已选择的键的集合中移除。键可以直接从selectKeys()返回的Set中移除,但同时需要用Iterator来检查集合,您需要使用迭代器的remove()方法来避免破坏迭代器内部的状态。

3.4 选择器的并发性分析

  1. 选择器对象是线程安全的,但是他们包含的键集合不是。
  2. 通过keys()和selectKeys()返回的键的集合是Selector对象内部的私有的Set对象集合的直接引用。这些集合可能在任意时间被改变。
  3. 可注册键的集合是只读的,修改会抛出java.lang.UnsupportedOperationException。查看过程中如果底层的set被修改,则会抛出java.util.ConcurrentModificationException
  4. 如果想避免并发访问选择器的键的集合的问题,可以现在Selector上进行同步,再在已注册键的集合上进行同步,最后是已选择键的集合。这样就可以合理的进行同步访问了。在多线程的场景下,如果您需要对任何一个键的集合进行修改,都需要按照这种顺序。锁的过程非常重要。如果竞争的线程没有以相同的顺序请求锁,就会有死锁的潜在隐患。
  5. Selector类的close()和select()方法的同步方式是一样的,因此也有一直阻塞的可能性。在选择过程还在进行的过程中,所有对close()调用都会被阻塞,直到选择过程结束,或者执行选择的线程进入睡眠。
  6. 执行选择的线程将会在执行关闭的线程获得锁时立即被唤醒并关闭选择器。
  7. 在不进行同步的情况下,任何时候都有可能关闭一个通道或者取消一个键,然后导致键的状态以及相关通道发生意料之外的改变。一个特定的键的集合中的一个键的存在并不保证键仍然是有效的,或者它相关的通道仍然是打开的。
  8. 关闭通道不应该是一个耗时的操作。NIO设计者们特别想要组织这种可能性:一个线程在关闭一个处于选择操作中的通道时,被阻塞与无限期的等待。

3.5 使用多线程来实现选择器

在单cpu的系统中,只有一个线程运行可以消除线程间的上下文切换带来的系统额外开销。但是在多cpu的系统中。只有一个线程轮流处理时,就会有n-1个cpu处于空闲状态。一个更好的策略是对所有的可选择通道使用一个选择器,并将对就绪通道的服务委托给其他线程。您只用一个线程监控通道的就绪状态并使用一个协调好的工作线程池来处理共接收到的数据。根据部署的条件,线程池的大小是可以调整的(或者它自己进行动态的调整)。

3.5.1 定义线程池

首先我们定义一个线程池,通过这个线程池来获取可用线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class WorkerThreadPool {
private final List<WorkerThread> idle = new LinkedList<>();

public WorkerThreadPool(int poolSize) {
for (int i = 0; i < poolSize; i++) {
final WorkerThread thread = new WorkerThread(this);
thread.setName("Worker " + i);
thread.start();
idle.add(thread);
}
}

public WorkerThread getWorker() {
WorkerThread thread = null;
synchronized (idle) {
if (idle.size() > 0) {
thread = idle.remove(0);
}
return thread;
}
}

public void returnWorker(WorkerThread thread) {
synchronized (idle) {
idle.add(thread);
}
}
}

3.5.2 定义执行线程类

定义实际的线程执行类,用来执行实际的通道读操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
class WorkerThread extends Thread {
private final ByteBuffer buffer = ByteBuffer.allocate(1024);
private final WorkerThreadPool pool;
private SelectionKey key;

public WorkerThread(WorkerThreadPool pool) {
this.pool = pool;
}

@Override
public synchronized void run() {
System.out.println(this.getName() + " is ready");
while (true) {
try {
// 等待线程被唤醒然后读取数据
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
this.interrupt();
}
if (key == null) {
continue;
}
System.out.println(this.getName() + " has bean awakened");
try {
this.drainChannel(key);
} catch (IOException e) {
System.out.println("Caught '" + e + "' closing channel");
// Close channel and nudge selector
try {
key.channel().close();
} catch (IOException ex) {
ex.printStackTrace();
}
key.selector().wakeup();
}
// 读取完成后将key重新置空
key = null;
// 将工作线程重新放入线程池
this.pool.returnWorker(this);
}
}

/**
* 因为在serviceChannel方法之前已经将OP_READ忽略,所以在完成读操作后重新启用OP_READ,并调用选择器的wakeup方法,这样选择器就会继续监控当前通道
*/
private void drainChannel(SelectionKey key) throws IOException {
final SocketChannel channel = (SocketChannel) key.channel();
int count;
buffer.clear();
while ((count = channel.read(buffer)) > 0) {
buffer.flip();
System.out.println("从 [ " + channel.socket().getRemoteSocketAddress().toString() + " ] 接收到消息: " + new String(buffer.array(), 0, buffer.remaining()));
}
channel.write(ByteBuffer.wrap("Hi i'm here".getBytes(StandardCharsets.UTF_8)));
if (count < 0) {
channel.close();
return;
}
// 完成读操作后,再将read操作作为关注操作
key.interestOps(key.interestOps() | SelectionKey.OP_READ);
key.selector().wakeup();
}


/**
* 当前方法和run方法都是同步的,所以在同一时间只有有一个key被线程执行。
* 在唤醒工作线程之前,我们先将key的interest集合中的read操作移除。使选择器忽略当前通道的度准备状态。避免被重复读取
*/
public synchronized void serviceChannel(SelectionKey key) {
this.key = key;
// 避免选择器再循环时再一次关注到read变化
key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));
// 唤醒阻塞的线程
this.notify();
}
}

3.5.3 修改代码读函数实现多线程

接下来我们对读操作的函数进行修改,使用多线程来读操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class SelectSocketThread extends SelectSockets {

private final WorkerThreadPool pool = new WorkerThreadPool(5);
public static void main(String[] argv) throws Exception {
new SelectSocketThread().go(argv);
}

@Override
protected void readDataFromSocket(SelectionKey key) throws Exception {
final WorkerThread worker = pool.getWorker();

if (worker == null) {
// 没有可用线程,直接返回,等待选择器循环调用到此方法,直到有可用线程
return;
}

//调用this将唤醒工作线程,然后返回
worker.serviceChannel(key);
}
}

3.6 实现单线程Reactor模式

之前我们实现的都是简单的多路复用网络,下面我们通过reactor模式来优化服务端的设计。
首先我们需要抽象出来三种组件

  1. Readtor线程: 负责响应IO时间并将实际的处理分发给Handler或者Acceptor处理器。
  2. Handler处理器: 负责执行读写操作
  3. Acceptor处理器: 负责执行客户端连接的建立

img

3.6.1 Handler组件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Handler implements Runnable {
private final SocketChannel channel;

public Handler(SocketChannel channel) {
this.channel = channel;
}

@Override
public void run() {
try {
final ByteBuffer buffer = ByteBuffer.allocate(128);
channel.read(buffer);
buffer.flip();
System.out.println("从 [ " + channel.socket().getRemoteSocketAddress().toString() + " ] 接收到消息: " + new String(buffer.array(), 0, buffer.remaining()));
channel.write(ByteBuffer.wrap("Hi i'm reactor server".getBytes(StandardCharsets.UTF_8)));
} catch (IOException e) {
e.printStackTrace();
}
}
}
3.6.2 Acceptor组件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class Acceptor implements Runnable {

private final ServerSocketChannel serverSocketChannel;
private final Selector selector;

public Acceptor(ServerSocketChannel serverSocketChannel, Selector selector) {
this.serverSocketChannel = serverSocketChannel;
this.selector = selector;
}

@Override
public void run() {
try {
final SocketChannel channel = serverSocketChannel.accept();
System.out.println("客户端已连接,IP地址为:" + channel.getRemoteAddress());
channel.configureBlocking(false);
// 这里在注册时,创建好对应的Handler,这样在Reactor中分发的时候就可以直接调用Handler了
channel.register(selector, SelectionKey.OP_READ, new Handler(channel));
} catch (IOException e) {
e.printStackTrace();
}

}
}
3.6.3 Reactor组件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class Reactor implements Closeable, Runnable {

private final ServerSocketChannel serverSocketChannel;
private final Selector selector;

public Reactor() throws IOException {
this.serverSocketChannel = ServerSocketChannel.open();
this.selector = Selector.open();
}

public static void main(String[] args) {
try(final Reactor reactor = new Reactor()) {
reactor.run();
} catch (IOException e) {
e.printStackTrace();
}
}

@Override
public void run() {
try {
serverSocketChannel.bind(new InetSocketAddress(8080));
serverSocketChannel.configureBlocking(false);
// 这里在注册时,创建好对应的Acceptor,这样在Reactor中分发的时候就可以直接调用Acceptor了
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, new Acceptor(serverSocketChannel, selector));
while (true) {
final int count = selector.select();
System.out.println("监听到 " + count + " 个对象");
final Set<SelectionKey> selectionKeys = selector.selectedKeys();
final Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
this.dispatch(iterator.next());
iterator.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}

private void dispatch(SelectionKey key) {
final Object attach = key.attachment();
if (attach instanceof Runnable) {
// 由于Handler和Acceptor都实现自Runnable接口,这里就统一调用一下
((Runnable) attach).run();
}
}

@Override
public void close() throws IOException {
serverSocketChannel.close();
selector.close();
}
}
3.6.4 单线程Reactor总结

通过Reactor模式我们成功将Selector选择,连接建立,读写操作分离成不同的组件。使得逻辑代码分工更加清晰方便维护。但是单线程始终没有办法应对大量的请求,如果请求量上去了,单线程还是不够用。下面我们对代码进行修改,实现多线程处理。

3.7 实现多线程Reactor模式

3.7.1 单Reactor模式

首先我们看一下多线程情况下的架构
img

对于这种改动我们的修改很简单,就是稍微修改一下Handler类就可以

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class Handler implements Runnable {
private static final ExecutorService POOL = Executors.newFixedThreadPool(10);
private final SocketChannel channel;

public Handler(SocketChannel channel) {
this.channel = channel;
}

@Override
public void run() {
try {
final ByteBuffer buffer = ByteBuffer.allocate(128);
channel.read(buffer);
buffer.flip();
POOL.submit(() -> {
try {
System.out.println("从 [ " + channel.socket().getRemoteSocketAddress().toString() + " ] 接收到消息: " + new String(buffer.array(), 0, buffer.remaining()));
channel.write(ByteBuffer.wrap("Hi i'm reactor server".getBytes(StandardCharsets.UTF_8)));
} catch (IOException e) {
e.printStackTrace();
}
});

} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

3.7.2 一主多从Reactor模式

我们还可以继续进行细分,现在是一个Reactor处理所有的操作请求,我们可以将其设计成Reactor的一主多从,让主Reactor处理Accept操作,而从Reactor处理其他的操作。

img

作为主Reactor的代码不需要进行修改。
我们需要创建一个从Reactor类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public class SubReactor implements Closeable, Runnable {
private final Selector selector;
private static final Integer POOL_SIZE = 4;

/**
* 线程池大小为4,表示从Reactor的大小为4
*/
private static final ExecutorService POOL = Executors.newFixedThreadPool(POOL_SIZE);
private static final SubReactor[] reactors = new SubReactor[POOL_SIZE];
private static int selectedIndex = 0;

public SubReactor() throws IOException {
this.selector = Selector.open();
}

static {
try {
for (Integer i = 0; i < POOL_SIZE; i++) {
final SubReactor subReactor;
subReactor = new SubReactor();
reactors[i] = subReactor;
POOL.submit(subReactor);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}

public synchronized static Selector nextSelector() {
final Selector selector = reactors[selectedIndex++].selector;
selectedIndex = selectedIndex & POOL_SIZE;
return selector;
}

@Override
public void run() {
try {
while (true) {
final int count = selector.select();
System.out.println("监听到 " + count + " 个对象");
final Set<SelectionKey> selectionKeys = selector.selectedKeys();
final Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
this.dispatch(iterator.next());
iterator.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}

private void dispatch(SelectionKey key){
Object att = key.attachment();
if(att instanceof Runnable) {
((Runnable) att).run();
}
}

@Override
public void close() throws IOException {
selector.close();
}
}

然后我们修改一下Acceptor类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class Acceptor implements Runnable {

private final ServerSocketChannel serverSocketChannel;

public Acceptor(ServerSocketChannel serverSocketChannel) {
this.serverSocketChannel = serverSocketChannel;
}

@Override
public void run() {
try {
final SocketChannel channel = serverSocketChannel.accept();
System.out.println("客户端已连接,IP地址为:" + channel.getRemoteAddress());
channel.configureBlocking(false);
final Selector selector = SubReactor.nextSelector();
// 在注册前唤醒,防止卡死
selector.wakeup();
// 这里在注册时,创建好对应的Handler,这样在Reactor中分发的时候就可以直接调用Handler了
channel.register(selector, SelectionKey.OP_READ, new Handler(channel));
} catch (IOException e) {
e.printStackTrace();
}
}
}