NIO API

NIO

NIO简介

Java NIO (New IO / Non Blocking IO)
是从Java1.4版本开始引入的一个新的IO API, 可以的替代标准的Java IO API.
NIO与原来的IO有同样的作用和目的, 但是使用的方式完全不同, NIO支持面向缓冲区的,基于通道的IO操作.
NIO将以更加高效的方式进行文件的读写操作.

NIO与IO的主要区别

IO NIO
面向流(Stream Oriented) 面向缓冲区
阻塞IO (Blocking IO) 非阻塞IO (Non Blocking IO)
(无) 选择器(Selectors)

通道和缓冲区

NIO系统的核心在于: 通道(Channel)和缓冲区(Buffer).
通道表示打开到IO设备(例如:文件,套接字)的连接.
若需要使用NIO系统, 需要获取用于连接IO设备的通道一级用于容纳数据的缓冲区.
然后操作缓冲区, 对数据进行处理.

简而言之, Channel负责传输, Buffer负责存储

缓冲区(Buffer):

在JAVA NIO中负责数据的存取. 缓冲区就是数组,用于存储不同数据类型的数据.

根据数据类型不同(boolean除外),提供了相应类型的缓冲区:

  • ByteBuffer
  • CharBuffer
  • ShortBuffer
  • IntBuffer
  • LongBuffer
  • FloatBuffer
  • DoubleBuffer

上述缓冲区的管理方法几乎一致,通过allocate()获取缓冲区

缓冲区存取数据的两个核心方法:

  • put(): 存入数据到缓冲区中
  • get(): 获取缓冲区中的数据

Buffer的核心属性:

  • capacity: 容量,表示缓冲区中最大存储数据的容量. 一旦声明不能改变.
  • limit: 界限,表示缓冲区中可以操作数据的大小. (limit后的数据不能进行读写)
  • position: 位置, 表示缓冲区中正在操作数据的位置.
  • mark: 标记, 表示记录当前position的位置,可以通过reset()恢复到mark的位置

mark <= position <= limit <= capacity

缓冲区的测试代码:

@Test
public void test01(){
    String str = "abcde";
    // 1. 分配一个指定大小的缓冲区
    ByteBuffer allocate = ByteBuffer.allocate(1024);
    System.out.println("---------allocate()------------");
    System.out.println(allocate.position());    // 0
    System.out.println(allocate.limit());       // 1024
    System.out.println(allocate.capacity());    // 1024

    // 2. 利用put()存入数据到缓冲区中
    System.out.println("-------put()--------");
    allocate.put(str.getBytes());
    System.out.println(allocate.position());    // 5
    System.out.println(allocate.limit());       // 1024
    System.out.println(allocate.capacity());    // 1024

    // 3. 切换读取数据模式
    System.out.println("---------flip()------------");
    allocate.flip();
    System.out.println(allocate.position());    // 0
    System.out.println(allocate.limit());       // 5
    System.out.println(allocate.capacity());    // 1024

    // 4. 利用get() 读取缓冲区中的数据

    System.out.println("--------get()------------");
    byte[] dst = new byte[allocate.limit()];    // {0,0,0,0,0}
    allocate.get(dst);
    System.out.println(new String(dst, 0, dst.length));

    System.out.println("---------after get()------------");
    System.out.println(allocate.position());    // 5
    System.out.println(allocate.limit());       // 5
    System.out.println(allocate.capacity());    // 1024

    // 5. rewind() 可重复读数据
    allocate.rewind();
    System.out.println("---------rewind()------------");
    System.out.println(allocate.position());    // 0
    System.out.println(allocate.limit());       // 5
    System.out.println(allocate.capacity());    // 1024

    // 6. 清空缓冲区. 但是缓冲区中的数据依然存在,但是处于"被遗忘"状态
    allocate.clear();
    System.out.println("---------clear()------------");
    System.out.println(allocate.position());    // 0
    System.out.println(allocate.limit());       // 1024
    System.out.println(allocate.capacity());    // 1024

    System.out.println((char)allocate.get());   // 'a'
    System.out.println(allocate.position());    // 1

}

@Test
public void test02(){
    String str = "abcde";
    ByteBuffer allocate = ByteBuffer.allocate(1024);
    // 缓冲区中存入数据
    allocate.put(str.getBytes());

    // 切换读模式
    allocate.flip();

    byte[] bytes = new byte[allocate.limit()];
    // 读两个, 从bytes数组的索引0开始存
    allocate.get(bytes, 0, 2);
    System.out.println(new String(bytes));      // ab

    // 标记当然读到的位置
    allocate.mark();

    // 再读两个,从bytes数组的索引2开始存
    allocate.get(bytes, 2,2);
    System.out.println(new String(bytes));      // abcd

    System.out.println(allocate.position());    // 4
    // 将position,读指针重置到mark处
    allocate.reset();
    System.out.println(allocate.position());    // 2

    System.out.println("-----------");
    // 判断缓冲区中是否还有剩余数据
    if (allocate.hasRemaining()){               // 现在position=2, limit=5
        // 获取缓冲区中还要操作的数量
        System.out.println(allocate.remaining());// limit-position
    }
}

直接缓冲区与非直接缓冲区

非直接缓冲区: 通过allocate()方法分配缓冲区, 将缓冲区建立在JVM的内存中
直接缓冲区: 通过allocateDirect()方法分配直接缓冲区,将缓冲区建立在操作系统的物理内存中. 可以提高效率.

  • 字节缓冲区要么是直接的,要么是非直接的. 如果为直接字节缓冲区,则Java虚拟机会尽最大努力直接在此缓冲区上执行本机IO操作. 也就是说, 在每次调用基础操作系统的一个本机IO操作之前(或之后),虚拟机都会尽量避免将缓冲区的内容复制到中间缓冲区中(或从中间缓冲区中复制内容).
  • 直接字节缓冲区可以通过调用此类的allocateDirect()工厂方法来创建. 此方法返回的缓冲区进行分配和取消分配所需成本通常高于非直接缓冲区. 直接缓冲区的内容可以驻留在常规的垃圾回收堆之外, 因此,他们对应用程序的内存需求量造成的影响可能并不明显. 所以, 建议将直接缓冲区主要分配给哪些易受基础系统的本机IO操作影响的大型,持久的缓冲区. 一般情况下, 最好仅在直接缓冲区能再程序性能方面带来明显好处时分配他们.
  • 直接字节缓冲区还可以通过FileChannel的map()方法将文件区域直接映射到内存中来创建. 该方法返回MappedByteBuffer. Java平台的实现有助于通过JNI从本机代码创建直接字节缓冲区. 如果以上这些缓冲区中的某个缓冲区实例指的是不可访问的内存区域, 则试图访问该区域不会更改该缓冲区的内容, 并且将会在访问期间或稍后的某个时间导致抛出不确定的异常.
  • 字节缓冲区是直接缓冲区还是非直接缓冲区可通过调用其isDirect()方法来确定. 提供此方法是为了能够在性能关键型代码中执行显示缓冲区管理.

非直接缓冲区

直接缓冲区

当应用程序发起一个read请求时, 磁盘中的数据没法直接传到应用程序中.
磁盘中的数据会首先读到内核地址空间中, 然后在内核地址空间的数据会copy到用户地址空间中, 然后数据才会读到应用程序中.
写数据是同样的需要copy一次.
在这个过程中, copy这步操作显得略微多余, 于是NIO中提出了内存映射文件,也叫直接缓冲区.
他在内核地址空间和用户地址空间之间, 在物理内存中直接开辟了一块缓冲区.因此把中间copy的过程省掉了.

但是开辟直接缓冲区的消耗是比较大的. 而且直接缓冲区脱离了JVM的控制, 完全由操作系统控制,

@Test
public void test03(){
    ByteBuffer buf = ByteBuffer.allocateDirect(1024);
    // 判断是否是直接缓冲区
    System.out.println(buf.isDirect()); // true
}

通道(Channel)

  1. 通道: 由java.nio.channels包定义的. Channel表示IO源与目标打开的连接.
    Channel类似于传统的”流”. 只不过Channel本身不能直接访问数据, Channel只能与Buffer进行交互.

  2. 通道的主要实现类
    java.nio.channels.Channel 接口:
    FileChannel
    SocketChannel
    ServerSocketChannel
    DatagramChannel

  3. 获取通道

    • Java针对支持通道的类提供了getChannel()方法
      本地IO:
      FileInputStream/FileOutputStream
      RandomAccessFile

      网络IO:
      Socket
      ServerSocket
      DatagramSocket

    • 在 JDK 1.7 中的NIO.2 针对各个通道提供了静态方法 open()

    • 在 JDK 1.7 中的NIO.2 的Files工具类的newByteChannel()

利用通道完成文件的复制

@Test
public void test01() throws IOException {
    File file = new File("1.txt");
    System.out.println(file.getAbsolutePath());
    FileInputStream in = new FileInputStream("1.txt");
    FileOutputStream out = new FileOutputStream("2.txt");

    // 1. 获取通道
    FileChannel inChannel = in.getChannel();
    FileChannel outChannel = out.getChannel();

    // 2. 分配指定大小的缓冲区
    ByteBuffer buf = ByteBuffer.allocate(1024);

    // 3. 将通道中的数据存到缓冲区中
    while (inChannel.read(buf) != -1){
        // 切换成读数据模式
        buf.flip(); 
        // 4. 将缓冲区的数据写入通道
        outChannel.write(buf);
        buf.clear();// 清空缓冲区
    }

    // close要放在try块的finally里确保关闭
    outChannel.close();
    inChannel.close();
    out.close();
    in.close();

}

使用直接缓冲区完成文件复制(内存映射文件)

@Test
public void test02() throws IOException {
    FileChannel inChannel = FileChannel.open(Paths.get("1.txt"), StandardOpenOption.READ);
    FileChannel outChannel = FileChannel.open(Paths.get("2.txt"), StandardOpenOption.WRITE, StandardOpenOption.CREATE, StandardOpenOption.READ);

    // 内存映射文件
    MappedByteBuffer inMapBuffer = inChannel.map(FileChannel.MapMode.READ_ONLY, 0, inChannel.size());
    MappedByteBuffer outMapBuffer = outChannel.map(FileChannel.MapMode.READ_WRITE, 0, inChannel.size());

    // 直接对缓冲区进行数据的读写操作
    byte[] dst = new byte[inMapBuffer.limit()];
    inMapBuffer.get(dst);
    outMapBuffer.put(dst);

    inChannel.close();
    outChannel.close();
}
  1. 通道之间的数据传输
    transferFrom()
    transferTo()
@Test
public void test03() throws IOException {
    FileChannel inChannel = FileChannel.open(Paths.get("1.txt"), StandardOpenOption.READ);
    FileChannel outChannel = FileChannel.open(Paths.get("2.txt"), StandardOpenOption.WRITE, StandardOpenOption.CREATE, StandardOpenOption.READ);

//  inChannel.transferTo(0, inChannel.size(), outChannel);
    outChannel.transferFrom(inChannel, 0, inChannel.size());

    inChannel.close();
    outChannel.close();
}

分散(Scatter)聚集(Gather)
分散读取(Scattering Reads): 将通道中的数据分散到多核缓冲区中
聚集写入(Gathering Writes): 将多个缓冲区中的数据聚集到通道中


// 分散和聚集
@Test
public void test04() throws IOException {
    RandomAccessFile raf1 = new RandomAccessFile("1.txt", "rw");
    // 1. 获取通道
    FileChannel channel1 = raf1.getChannel();

    // 2. 分配指定大小的缓冲区
    ByteBuffer buf1 = ByteBuffer.allocate(100);
    ByteBuffer buf2 = ByteBuffer.allocate(1024);

    ByteBuffer[] bufs = {buf1, buf2};
    // 3. 分散读取
    channel1.read(bufs);

    for (ByteBuffer byteBuffer : bufs) {
        byteBuffer.flip();
    }

    System.out.println(new String(bufs[0].array(), 0, bufs[0].limit()));
    System.out.println("---------------");
    System.out.println(new String(bufs[1].array(), 0, bufs[1].limit()));


    RandomAccessFile raf2 = new RandomAccessFile("2.txt", "rw");
    FileChannel channel2 = raf2.getChannel();
    channel2.write(bufs);

    channel2.close();
    channel1.close();

}

**字符集: Charset
编码: 字符串 -> 字节数组
解码: 字节数组 -> 字符串

@Test
public void test06() throws CharacterCodingException {
    Charset cs1 = Charset.forName("GBK");

    // 获取编码器
    CharsetEncoder ce = cs1.newEncoder();
    // 获取解码器
    CharsetDecoder cd = cs1.newDecoder();

    CharBuffer cBuf = CharBuffer.allocate(1024);
    cBuf.put("安抚无法发泄出");
    cBuf.flip();

    // 编码
    ByteBuffer bBuf = ce.encode(cBuf);

    for (int i = 0; i < bBuf.limit(); i++) {
        System.out.println(bBuf.get());
    }

    // 解码
    bBuf.flip();
    CharBuffer cBuf2 = cd.decode(bBuf);
    System.out.println(cBuf2.toString());

}

阻塞与非阻塞

一个阻塞socket

// 客户端向服务端发送一个文件
@Test
public void client() throws IOException {
    // 1. 获取通道
    SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9898));

    FileChannel inChannel = FileChannel.open(Paths.get("1.txt"), StandardOpenOption.READ);

    // 2. 分配指定大小的缓冲区
    ByteBuffer buf = ByteBuffer.allocate(1024);

    // 3. 读取本地文件并发送到服务端
    while (inChannel.read(buf) != -1){
        buf.flip();
        socketChannel.write(buf);
        buf.clear();
    }

    // 4. 关闭通道
    inChannel.close();
    socketChannel.close();
}

// 服务端接受文件并写到硬盘上
@Test
public void server() throws IOException {
    // 1. 获取通道
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    // 2. 绑定连接
    serverSocketChannel.bind(new InetSocketAddress(9898));

    FileChannel inChannel = FileChannel.open(Paths.get("2.txt"), StandardOpenOption.WRITE, StandardOpenOption.CREATE);
    // 3. 获取客户端连接的通道
    SocketChannel socketChannel = serverSocketChannel.accept();

    // 4. 分配一个指定大小的缓冲区
    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

    // 4. 接收客户端的数据,并保存到本地.
    while (socketChannel.read(byteBuffer) != -1){
        byteBuffer.flip();
        inChannel.write(byteBuffer);
        byteBuffer.clear();
    }

    inChannel.close();
    socketChannel.close();
    serverSocketChannel.close();

}

阻塞式,带回应socket


@Test
public void client() throws IOException {

    SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9898));

    FileChannel inChannel = FileChannel.open(Paths.get("1.txt"), StandardOpenOption.READ);

    ByteBuffer buf = ByteBuffer.allocate(1024);

    while (inChannel.read(buf) != -1){
        buf.flip();
        sChannel.write(buf);
        buf.clear();
    }

    // 关闭输出流, 告诉服务端数据已经发送完了
    sChannel.shutdownOutput();

    int len = 0;
    while ((len = sChannel.read(buf)) != -1){
        buf.flip();
        System.out.println(new String(buf.array(), 0, len));
        buf.clear();
    }

    inChannel.close();
    sChannel.close();

}

@Test
public void server() throws IOException {

    ServerSocketChannel ssChannel = ServerSocketChannel.open();

    FileChannel outChannel = FileChannel.open(Paths.get("2.txt"), StandardOpenOption.WRITE, StandardOpenOption.CREATE);

    ssChannel.bind(new InetSocketAddress(9898));

    SocketChannel sChannel = ssChannel.accept();

    ByteBuffer buf = ByteBuffer.allocate(1024);


    while (sChannel.read(buf) != -1){
        buf.flip();
        outChannel.write(buf);
        buf.clear();
    }


    //发送反馈给客户端
    buf.put("服务端接收数据成功".getBytes());
    buf.flip();
    sChannel.write(buf);

    sChannel.shutdownOutput();

    sChannel.close();
    outChannel.close();
    ssChannel.close();
}

非阻塞模式

选择器(Selector): 选择器是SelectableChannel对象的多路复用器, Selector可以同时监控多个SelectableChannel的IO状况, 也就是说, 利用Selector可使一个单独的线程管理多个Channel. Selector是非阻塞IO的核心.

选择器的应用:

创建Selector: 通过调用Selector.open()方法创建一个Selector.
向选择器注册通道: SelectableChannel.register(Selector sel, int ops);
当调用register(Selector sel, int ops)将通道注册选择器时, 选择器对通道的监听事件, 需要通过第二个参数ops指定.
可以监听的事件类型(可使用SelectionKey的四个常量表示):
- 读: SelectionKey.OP_READ(1)
- 写: SelectionKey.OP_WRITE(4)
- 连接: SelectionKey.OP_CONNECT(8)
- 接收: SelectionKey.OP_ACCEPT(16)
若注册时不止监听一个事件, 则可以使用”位或”操作符连接
> int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;

@Test
public void client() throws IOException {
    // 1. 获取通道
    SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9898));

    // 2. 切换到非阻塞模式
    sChannel.configureBlocking(false);

    // 3. 分配指定大小的缓冲区
    ByteBuffer buf = ByteBuffer.allocate(1024);

    // 4. 发送数据给服务端
    buf.put(LocalDateTime.now().toString().getBytes());
    buf.flip();
    sChannel.write(buf);
    buf.clear();

    // 关闭连接
    sChannel.close();

}

@Test
public void server() throws IOException {
    // 1. 获取通道
    ServerSocketChannel ssChannel = ServerSocketChannel.open();
    // 2. 绑定端口
    ssChannel.bind(new InetSocketAddress(9898));

    // 3. 切换到非阻塞模式
    ssChannel.configureBlocking(false);

    // 4. 获取选择器
    Selector selector = Selector.open();

    // 5. 将通道注册到选择器上, 并且指定监听事件
    ssChannel.register(selector, SelectionKey.OP_ACCEPT);

    // 6. 轮询式的获取选择器上已经"准备就绪"的事件
    while (selector.select() > 0){
        // 7. 获取当前选择器中所有注册的"选择键(已就绪的监听事件)"
        Iterator<SelectionKey> it = selector.selectedKeys().iterator();

        while (it.hasNext()){
            // 8. 获取准备就绪的事件
            SelectionKey selectionKey = it.next();

            // 9. 判断具体是什么事件准备就绪
            if (selectionKey.isAcceptable()){
                // 10. 若"接收就绪", 获取客户端连接
                SocketChannel sChannel = ssChannel.accept();

                // 11. 将客户端连接切换非阻塞模式
                sChannel.configureBlocking(false);

                // 12. 将该通道注册到选择器上
                sChannel.register(selector, SelectionKey.OP_READ);
            }else if (selectionKey.isReadable()){
                // 13. 获取当前选择器上"读就绪"状态的通道
                SocketChannel sChannel = (SocketChannel) selectionKey.channel();

                // 14. 读取数据
                ByteBuffer buf = ByteBuffer.allocate(1024);

                int len = 0;
                while ((len = sChannel.read(buf))> 0){
                    buf.flip();
                    System.out.println(new String(buf.array(), 0, len));
                    buf.clear();
                }
            }

            // 15. 取消选择键
            it.remove();
        }
    }
}

管道(Pipe):

Java NIO管道是两个线程之间的单向数据连接. Pipe有一个source通道和一个sink通道. 数据会被写到sink通道, 从source通道读取.

@Test
public void test1() throws IOException {
    // 1. 获取管道
    Pipe pipe = Pipe.open();

    // 2. 将缓冲区的数据写入管道
    Pipe.SinkChannel sinkChannel = pipe.sink();

    ByteBuffer buf = ByteBuffer.allocate(1024);
    buf.put("通过单向管道发送数据".getBytes());
    buf.flip();

    sinkChannel.write(buf);
    buf.clear();

    // 3. 读取缓冲区中的数据
    Pipe.SourceChannel sourceChannel = pipe.source();
    int len = sourceChannel.read(buf);
    buf.flip();
    System.out.println(new String(buf.array(), 0, len));
}

   转载规则


《NIO API》 echi1995 采用 知识共享署名 4.0 国际许可协议 进行许可。
 上一篇
JDK动态代理和CGLIB动态代理 JDK动态代理和CGLIB动态代理
JDK动态代理和CGLIB动态代理代理代理模式是一种常用的设计模式, 代表性的有Spring中的AOP. 代理模式就像是一个中介一样, 以租房为例. 房东作为被代理类, 租客作为调用方, 中介就是租客和房东中间的代理. 在中介的操作下, 租
下一篇 
J.U.C (一) J.U.C (一)
整体认识J.U.C, 之前也写过AQS相关的博客, 那时候听了一节课就匆匆忙忙的记笔记写博客, 对J.U.C也没有一个系统的学习. 最近翻到一篇博客写的挺全的,想拿来学习一下. 深入浅出Java Concurrcy 整体认识首先要对J.U.
  目录