2021-07-06

深入学习Netty(2)——传统NIO编程

前言

  学习Netty编程,避免不了从了解Java 的NIO编程开始,这样才能通过比较让我们对Netty有更深的了解,才能知道Netty大大的好处。传统的NIO编程code起来比较麻烦,甚至有遗留Bug,但其中最基本的思想是一致的。

  参考资料《Netty In Action》、《Netty权威指南》(有需要的小伙伴可以评论或者私信我)

  博文中所有的代码都已上传到Github,欢迎Star、Fork

 


 

一、NIO 核心组件

  NIO,有人称之为New I/O,这是官方叫法。但是由于之前老的I/O类库是阻塞I/O,所以此时的NIO也可以是非阻塞I/O(Non-block I/O)

  与Socket类和ServerSocket类相对应,NIO提供了SocketChannel和ServerSocketChannel不同的套接字通道实现,可以支持阻塞和非阻塞两种模式

  NIO库是JDK 1.4中引入的,弥补了原来同步阻塞I/O的不足。这是因为提供了高速处理、面向块的I/O,主要包括:缓冲区Buffer、通道Channel、多路复用器Selector。

1.缓冲区Buffer

  在NIO库中,所有的数据都是缓冲区处理的,读取数据时直接读取缓冲区;在写入数据时,写入到缓冲区。在任何时候访问NIO中的数据,都是通过缓冲区进行操作。实际上缓冲区是一个数组,有不同类型的数组,通常是字节数组(ByteBuffer),但它不仅仅是一个数组,缓冲区提供对数据的结构化访问以及维护读写位置(limit)等信息。

   

2.通道Channel

  网络数据通过Channel双向读取和写入(全双工),这点不同于Stream(InputStream/OutputStream或者其子类)一个方向上移动。

  Channel可以分类两个大类:用于网络读写的SelectableChannel和用于文件操作的FileChannel。

  ServerSocketChannel和SocketChannel都是SelectableChannel的子类。

  

3.多路复用器Selector

  多路复用器提供选择已经就绪的任务的能力,具体来说:Selector会不断地轮询注册在其上的Channel,如果某个Channel上面发生读写事件,就表明这个Channel处于就绪状态,会被Selector轮询出来,通过SelectionKey可以获取就绪的Channel的集合,进行后续的I/O操作。这样就意味着只需要一个线程负责Selector轮询,就可以接入成千上万的客户端。

  多路复用器Selector是最核心的组件,在Netty编程中也是尤为重要的,但是这里不具体展开,到时候分析Netty源码的时候会具体介绍。

二、NIO服务端

1.服务端序列图

先放出如下的NIO服务端序列图,结合序列图给具体的步骤如下,之后的示例代码中也会有详细注释

  

 

第一步:打开ServerSocketChannel,用于监听客户端的连接,是所有客户端连接的父管道。

第二步:绑定监听端口,设置连接为非阻塞模式

第三步:创建Reactor线程,创建多路复用器并启动线程

第四步:将ServerSocketChannel注册到Reactor线程的多路复用器Selector上,监听ACCPET事件。

第五步:多路复用器在线程run方法在无线循环体内轮询准备就绪的Key。

第六步:多路复用器监听到有新的客户端接入,处理新的接入请求,完成TCP三次握手,建立物理链路。

第七步:设置客户端链路为非阻塞模式

第八步:将新接入的客户端注册到Reactor线程的多路复用器上,监听读操作,读取客户端发送的网络消息。

第九步:异步读取客户端请求消息到缓冲区

第十步:对ByteBuffer进行编解码,如果有半包消息指针reset,继续读取后续的报文,将解码成功的消息封装成Task,交给业务线程池中,进行业务处理

第十一步:将对象encode成ByteBuffer,调用SocketChannel的异步write接口,将消息异步发送给客户端

2.服务端代码示例

(1)多路复用服务MultiplexerTimeServer

public class MultiplexerTimeServer implements Runnable { private Selector selector; private ServerSocketChannel servChannel; private volatile boolean stop; /**  * 初始化多路复用器、绑定监听端口  *  * @param port  */ public MultiplexerTimeServer(int port) {  try {   // 1. 打开ServerSocketChannel,监听客户端连接   servChannel = ServerSocketChannel.open();   // 2. 绑定监听端口,设置连接为非阻塞模式   servChannel.socket().bind(new InetSocketAddress(port), 1024);   servChannel.configureBlocking(false);   // 3. 创建Reactor线程,创建多路复用并启动线程   selector = Selector.open();   // 4. 将ServerSocketChannel注册到Reactor线程的多路了复用器Selector,监听ACCEPT事件   servChannel.register(selector, SelectionKey.OP_ACCEPT);   System.out.println("The time server is start in port : " + port);  } catch (IOException e) {   e.printStackTrace();   System.exit(1);  } } public void stop() {  this.stop = true; } @Override public void run() {  while (!stop) {   try {    selector.select(1000);    Set<SelectionKey> selectedKeys = selector.selectedKeys();    Iterator<SelectionKey> it = selectedKeys.iterator();    SelectionKey key = null;    // 循环轮询准备就绪的Key    while (it.hasNext()) {     key = it.next();     it.remove();     try {      // deal with I/O event      handleInput(key);     } catch (Exception e) {      if (key != null) {       key.cancel();       if (key.channel() != null) {        key.channel().close();       }      }     }    }   } catch (Throwable t) {    t.printStackTrace();   }  }  // 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源  if (selector != null) {   try {    selector.close();   } catch (IOException e) {    e.printStackTrace();   }  } } private void handleInput(SelectionKey key) throws IOException {  if (key.isValid()) {   // 处理新接入的请求消息   if (key.isAcceptable()) {    // a connection was accepted by a ServerSocketChannel    ServerSocketChannel ssc = (ServerSocketChannel) key.channel();    // 6. 监听到新的客户端接入,处理新的接入请求我,完成TCP三次握手-->建立链路    SocketChannel sc = ssc.accept();    // 7. 设置客户端链路为非阻塞模式    sc.configureBlocking(false);    sc.socket().setReuseAddress(true);    // 8. 将新接入的客户端连接注册到Reactor线程的多路复用器上,监听读操作,读取客户端发送的消息    sc.register(selector, SelectionKey.OP_READ);   }   if (key.isReadable()) {    // a channel is ready for reading    SocketChannel sc = (SocketChannel) key.channel();    ByteBuffer readBuffer = ByteBuffer.allocate(1024);    // 9. 异步读取客户端请求消息到缓冲区    int readBytes = sc.read(readBuffer);    if (readBytes > 0) {     readBuffer.flip();     // 10. 读取解码报文     byte[] bytes = new byte[readBuffer.remaining()];     readBuffer.get(bytes);     String body = new String(bytes, "UTF-8");     System.out.println("The time server receive order : " + body);     String currentTime = "QUERY TIME ORDER"       .equalsIgnoreCase(body) ? new java.util.Date(       System.currentTimeMillis()).toString()       : "BAD ORDER";     doWrite(sc, currentTime);    } else if (readBytes < 0) {     // 对端链路关闭     key.cancel();     sc.close();    } else {     // 读到0字节,忽略    }   }  } } private void doWrite(SocketChannel channel, String response)   throws IOException {  if (response != null && response.trim().length() > 0) {   byte[] bytes = response.getBytes();   ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);   writeBuffer.put(bytes);   writeBuffer.flip();   channel.write(writeBuffer);  } }}

(2)NIO服务TimeServer

public class TimeServer { public static void main(String[] args) {  int port = 8084;  MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);  new Thread(timeServer, "NIO-TimeServer").start(); }}

(3)开启服务端

运行TimeServer:

使用netstat命令查看是否对8084端口开启监听

三、NIO客户端

1.客户端序列图

第一步:打开SocketChannel,绑定客户端本地地址(可选,默认系统会随机会分配一个可用的本地地址)

第二步:设置SocketChannel为非阻塞模式,同时设置客户端连接的TCP参数

第三步:异步连接服务端

第四步:判断是否连接成功,如果连接成功则直接注册读状态位到多路复用中。如果没有当前没有连接成功(异步连接,返回false,说明客户端已经发送sync包,服务端没有返回ack包,物理链路还没建立)

第五步:向Reactor线程的多路复用OP_CONNECT状态位,监听服务端的TCP ACK应答

第六步:创建Reactor线程,创建多路复用器并启动线程。

第七步:多路复用在线程run方法无线循环体内轮询准备就绪的Key

第八步:接收connect事件进行处理

第九步:判断连接结果,如果连接成功,注册读事件到多路复用器,

第十步:注册读事件到多路复用器

第十一步:异步读客户端请求消息到缓冲区

第十二步:对ByteBuffer进行编解码

第十三步:将POJO对象encode成ByteBuffer,调用SocketChannel的异步write接口,将消息异步发送给客户端。

2.客户端示例代码

(1)客户端处理TimeClientHandle

public class TimeClientHandle implements Runnable { private String host; private int port; private Selector selector; private SocketChannel socketChannel; private volatile boolean stop; public TimeClientHandle(String host, int port) {  this.host = host == null ? "127.0.0.1" : host;  this.port = port;  try {   // 创建多路复用器并打开   selector = Selector.open();   // 1.打开SocketChannel,   socketChannel = SocketChannel.open();   // 2.设置SocketChannel非阻塞模式, 这里不设置TCP参数   socketChannel.configureBlocking(false);  } catch (IOException e) {   e.printStackTrace();   System.exit(1);  } } @Override public void run() {  try {   // 连接服务端   doConnect();  } catch (IOException e) {   e.printStackTrace();   System.exit(1);  }  while (!stop) {   try {    // 6. 多路复用器在线程run方法的无限循环体内轮询准备就绪的Key    selector.select(1000);    Set<SelectionKey> selectedKeys = selector.selectedKeys();    Iterator<SelectionKey> it = selectedKeys.iterator();    SelectionKey key = null;    while (it.hasNext()) {     key = it.next();     it.remove();     try {      handleInput(key);     } catch (Exception e) {      if (key != null) {       key.cancel();       if (key.channel() != null) {        key.channel().close();       }      }     }    }   } catch (Exception e) {    e.printStackTrace();    System.exit(1);   }  }  // 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源  if (selector != null) {   try {    selector.close();   } catch (IOException e) {    e.printStackTrace();   }  } } /**  * 处理客户端输入  *  * @param key  * @throws IOException  */ private void handleInput(SelectionKey key) throws IOException {  if (key.isValid()) {   // 判断是否连接成功   SocketChannel sc = (SocketChannel) key.channel();   // 7. 接收connect事件进行处理   if (key.isConnectable()) {    // 8. 如果连接完成则注册读事件到多路复用器    if (sc.finishConnect()) {     sc.register(selector, SelectionKey.OP_READ);     doWrite(sc);    } else {     System.exit(1);// 连接失败,进程退出    }   }   if (key.isReadable()) {    ByteBuffer readBuffer = ByteBuffer.allocate(1024);    // 9. 异步读客户端请求消息到缓冲区    int readBytes = sc.read(readBuffer);    if (readBytes > 0) {     readBuffer.flip();     byte[] bytes = new byte[readBuffer.remaining()];     readBuffer.get(byt......

原文转载:http://www.shaoqun.com/a/849309.html

跨境电商:https://www.ikjzd.com/

3suisses:https://www.ikjzd.com/w/412

贝恩投资公司:https://www.ikjzd.com/w/1336

eprice:https://www.ikjzd.com/w/1325


前言  学习Netty编程,避免不了从了解Java的NIO编程开始,这样才能通过比较让我们对Netty有更深的了解,才能知道Netty大大的好处。传统的NIO编程code起来比较麻烦,甚至有遗留Bug,但其中最基本的思想是一致的。  参考资料《NettyInAction》、《Netty权威指南》(有需要的小伙伴可以评论或者私信我)  博文中所有的代码都已上传到Github,欢迎Star、Fork一
沃尔玛:https://www.ikjzd.com/w/220
友家速递:https://www.ikjzd.com/w/1341
上海跨境通:https://www.ikjzd.com/w/1329
近百位商家账户被盗!亚马逊遭黑客钓鱼技术攻击:https://www.ikjzd.com/articles/91348
2019年myMall平台数据分析:热销品类,潜力产品:https://www.ikjzd.com/articles/91349
20个listing,卖出300万!爆款流量都是"磨"出来的!:https://www.ikjzd.com/articles/91350
Instagram广告类型及高效推广营销策略:https://www.ikjzd.com/articles/91352
口述:夫妻交换的性爱故事(3/3):http://www.30bags.com/m/a/249635.html
速卖通物流发往国外是怎么操作的?速卖通线上发货又是怎样的?:https://www.ikjzd.com/articles/146381
今日亚马逊"姐夫"时代结束,新任CEO掌权!卖家前路未知:https://www.ikjzd.com/articles/146377
女人想火一样的四个场景,看你有没有。:http://lady.shaoqun.com/a/403812.html
无意中发现老板办公室被美女下属"欺骗":http://lady.shaoqun.com/a/403813.html

No comments:

Post a Comment