西瓜の

-人类是因为记录才进入文明-

2020/05/12
20:03
technology

java BIO和NIO

BIO: blocking I/O 。

  • BIO就是传统I/O,其相关的类和接口在 java.io 包下
  • BIO 是 同步阻塞 IO ,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,可以通过线程池机制改善(实现多个客户连接服务器)

BIO问题:

  • 每个请求都需要创建独立的线程,与对应的客户端进行数据 Read,业务处理,数据 Write 。
  • 当并发数较大时,需要创建大量线程来处理连接,系统资源占用较大。
  • 连接建立后,如果当前线程暂时没有数据可读,则线程就阻塞在 Read 操作上,造成线程资源浪费

使用场景:BIO方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4以前的唯一选择,程序简单易理解

一个简单的BIO 服务端代码示例:(客户端可使用 telnet 模拟)

ps: 如何开启win10的telnet服务?

package com.wmelon.bio;

import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 启动 cmd
 * 执行 telnet 127.0.0.1 6666
 * 快捷键  ctrl+]
 * 发消息 send hello
 * @author watermelon
 * @time 2020/4/29
 */
public class BIOServer {
    public static void main(String[] args) {
        int co =0;
        //线程池
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        try {
            //启动一个服务端
            ServerSocket serverSocket = new ServerSocket(6666);
            while (true) {
                //监听,等待客户端连接
                System.out.println("主线程id:" + Thread.currentThread().getId() + ",主线程name:" + Thread.currentThread().getName());
                System.out.println("等待连接。。。");
                final Socket socket = serverSocket.accept();
                System.out.println("连接到第"+(++co)+"个客户端");
                //给每个客户端开启一个线程 与之交互
                newCachedThreadPool.execute(() -> {
                    handler(socket);
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    public static void handler(Socket socket) {

        try {
            while (true) {
                System.out.println("线程id:" + Thread.currentThread().getId() + ",线程name:" + Thread.currentThread().getName());
                //指定一次最大能读取多少个字节
//                byte[] bytes = new byte[4];
                byte[] bytes = new byte[1024];
                InputStream inputStream = socket.getInputStream();
                System.out.println("read。。。");
                int read = inputStream.read(bytes);
                if (read != -1) {
                    //读取bytes中的有效字节(0 - read 这个区间为有效字节)
                    System.out.println(new String(bytes,0,read));
                } else {
                    break;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                socket.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

NIO: non-blocking IO,(也称 New IO)。

  • NIO是 同步非阻塞 IO,其相关类和接口放在 java.nio 包下
  • NIO 有三大核心部分:Channel(通道)Buffer(缓冲区), Selector(选择器)
  • NIO是 面向缓冲区 ,或者面向块 编程的。数据读取到一个它稍后处理的缓冲区,需要时可在缓冲区中前后移动,这就增加了处理过程中的灵活性,使用它可以提供非阻塞式的高伸缩性网络
  • Java NIO的非阻塞模式,使一个线程从某通道发送请求或者读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取,而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。 非阻塞写也是如此,一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做别的事情

NIO和BIO 比较:

  • BIO 以流的方式处理数据,而 NIO 以块的方式处理数据,块 I/O 的效率比流 I/O 高很多
  • BIO 是阻塞的,NIO 则是非阻塞的
  • BIO基于字节流和字符流进行操作,而 NIO 基于 Channel(通道)和 Buffer(缓冲区)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector(选择器)用于监听多个通道的事件(比如:连接请求,数据到达等),因此使用单个线程就可以监听多个客户端通道

一个简单地NIO 服务端和客户端代码示例:

服务端:

package com.wmelon.nio.groupChat;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;

/**
 * @author watermelon
 * @time 2020/5/11
 */
public class GroupChatServer {
    //Selector 能够检测多个注册的通道上是否有事件发生,是NIO不阻塞的关键
    private Selector selector;

    //服务端通道  在服务器端监听新的客户端 Socket
    private ServerSocketChannel serverSocketChannel;

    //端口
    private int port = 6677;

    public GroupChatServer() {
        //在构造方法中初始化Selector ,初始化ServerSocketChannel并绑定端口,然后将serverSocketChannel 注册到 Selector 并监听 OP_ACCEPT事件
        try {
            selector = Selector.open();

            serverSocketChannel = ServerSocketChannel.open();

            serverSocketChannel.socket().bind(new InetSocketAddress(port));

            //设置 serverSocketChannel 为非阻塞,默认为 true
            serverSocketChannel.configureBlocking(false);

            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 监听
     */
    private void listen() {

        try {
            while (true) {
                if (selector.select() > 0) {
                    //有连接进来
                    //获取所有 有事件发生的 SelectionKey
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectionKeys.iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        //连接请求
                        if (key.isAcceptable()) {
                            //获取到 SocketChannel  并将   socketChannel 注册到 selector,监听  OP_READ 事件
                            SocketChannel socketChannel = serverSocketChannel.accept();
                            socketChannel.configureBlocking(false);
                            socketChannel.register(selector, SelectionKey.OP_READ);
                            System.out.println(socketChannel.getRemoteAddress() + "上线");
                        }
                        //消息读取请求
                        if (key.isReadable()) {
                            //读取消息
                            readData(key);
                        }
                        //从 selectionKeys 中移除 已处理的事件,以免重复处理
                        iterator.remove();
                    }

                } else {
                    //没有连接
                    System.out.println("");
                }

            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 读取数据
     *
     * @param sourceKey 发送消息的客户端  SelectionKey
     */
    private void readData(SelectionKey sourceKey) {
        SocketChannel channel = null;
        try {
            channel = (SocketChannel) sourceKey.channel();
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            int read = channel.read(buffer);
            //read 表示读取的数据大小 大于0 则表示 有数据被读取
            if (read > 0) {
                String msg = new String(buffer.array());
                System.out.println(msg);
                sendToClients(msg, sourceKey);
            }
        } catch (Exception e) {
            sourceKey.cancel();
            try {
                System.out.println(channel.getRemoteAddress() + "离线了");
                channel.close();
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }

    /**
     * 转发消息给除了自己的其他客户端
     *
     * @param msg  消息内容
     * @param sourceKey 发送消息的客户端  SelectionKey
     * @throws IOException
     */
    private void sendToClients(String msg, SelectionKey sourceKey) throws IOException {
        //获取所有注册到 selector 中的客户端
        Set<SelectionKey> keys = selector.keys();
        Iterator<SelectionKey> iterator = keys.iterator();
        while (iterator.hasNext()) {
            SelectionKey targetKey = iterator.next();
            //通过 SelectionKey  获取对应通道
            Channel channel = targetKey.channel();
            //排除自己
            if (channel instanceof SocketChannel && sourceKey != targetKey) {
                SocketChannel channel1 = (SocketChannel) channel;
                channel1.write(ByteBuffer.wrap(msg.getBytes()));
            }
        }
    }

    public static void main(String[] args) {
        GroupChatServer server = new GroupChatServer();
        server.listen();
    }
}

客户端:

package com.wmelon.nio.groupChat;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;

/**
 * @author watermelon
 * @time 2020/5/11
 */
public class GroupChatClient {

    //host
    private String host = "127.0.0.1";
    //ip
    private int port = 6677;
    // 网络 IO 通道,具体负责进行读写操作。NIO 把缓冲区的数据写入通道,或者把通道里的数据读到缓冲区
    private SocketChannel socketChannel;


    public GroupChatClient() {
        //初始化  socketChannel,指定连接的服务端地址
        try {
            socketChannel = SocketChannel.open(new InetSocketAddress(host, port));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }


    /**
     * 接收消息
     * @throws IOException
     */
    private void receiveMsg() throws IOException {
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        int read = socketChannel.read(byteBuffer);
        if (read > 0) {
            System.out.println(new String(byteBuffer.array()));
        }
    }


    /**
     * 发送消息
     * @param msg
     */
    private void sendMsg(String msg) {
        try {
            msg = socketChannel.getLocalAddress() + "说 :" + msg;
        } catch (IOException e) {
            e.printStackTrace();
        }

        try {
            socketChannel.write(ByteBuffer.wrap(msg.getBytes()));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        GroupChatClient groupChatClient = new GroupChatClient();
        //新开一个线程接收消息
        new Thread(() -> {
            try {
                while (true){
                    groupChatClient.receiveMsg();
                    Thread.sleep(1000);
                }
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        // 可在 控制台 输入并发送消息
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String msg = scanner.next();
            groupChatClient.sendMsg(msg);
    }
    }
}

ps:

SelectionKey:表示 Selector 和网络通道的注册关系,有4种

OP_ACCEPT:有新的网络连接可以 accept,值为 16
OP_CONNECT:代表连接已经建立,值为 8
OP_READ:代表读操作,值为 1
OP_WRITE:代表写操作,值为 4