本文共 13070 字,大约阅读时间需要 43 分钟。
Java 1.7升级了NIO类库,升级后的NIO类库被称为NIO 2.0,在NIO2.0中Java提供了异步文件I/O操作,同时提供了与UNIX网络编程事件驱动I/O对应的AIO(Asynchronous I/O),AIO是真正的异步非阻塞I/O,它不需要通过多路复用器(Selector)对注册的通道进行轮询操作即可实现异步读写,从而简化了NIO的编程模型。
目前的AIO与NIO底层都使用了epoll(Linux中),所以二者性能都很好,主要差异在于同步与异步,NIO是同步的,始终只有一个线程在进行结果处理,而AIO的异步回调则是基于多线程的,如果NIO结果处理中引入多线程,个人认为二者性能是相仿的。
epoll是Linux中多路复用IO接口select/poll的增强版本,select/poll模型是忙轮询,即一直不停地轮询看哪些操作已经结束可以获取操作结果了,而epoll则是将已经结束的操作的操作结果放入队列中,然后只需要遍历处理队列中的操作就可以了,避免了CPU的浪费,提升程序运行效率。
更详细具体的可以移步百科了解:
https://baike.baidu.com/item/epoll/10738144?fr=aladdin
AIO中提供了两种方式获取操作结果:
package java.nio.channels;/** * A handler for consuming the result of an asynchronous I/O operation. * *The asynchronous channels defined in this package allow a completion * handler to be specified to consume the result of an asynchronous operation. * The {@link #completed completed} method is invoked when the I/O operation * completes successfully. The {@link #failed failed} method is invoked if the * I/O operations fails. The implementations of these methods should complete * in a timely manner so as to avoid keeping the invoking thread from dispatching * to other completion handlers. * * @param
The result type of the I/O operation * @param The type of the object attached to the I/O operation * * @since 1.7 */public interface CompletionHandler { /** * Invoked when an operation has completed. * * @param result * The result of the I/O operation. * @param attachment * The object attached to the I/O operation when it was initiated. */ void completed(V result, A attachment); /** * Invoked when an operation fails. * * @param exc * The exception to indicate why the I/O operation failed * @param attachment * The object attached to the I/O operation when it was initiated. */ void failed(Throwable exc, A attachment);}
CompletionHandler接口中有两个方法需要实现,分别是completed和failed,当操作完成时,会回调completed,出现异常失败时会回调failed。
操作完成时,回调completed函数,其有result和attachment两个参数:
操作异常时回调failed函数,其有exc和attachment两个参数:
/** * 接收请求的结束动作处理类,当异步socket服务接收到一个请求时,会回调此handler,从而对收到的请求进行处理 */ private class AcceptCompletionHandler implements CompletionHandler{ @Override public void completed(AsynchronousSocketChannel channel, SimpleTimeServer attachment) { ... } @Override public void failed(Throwable exc, SimpleTimeServer attachment) { exc.printStackTrace(); ... } }
package com.dongrui.study.ioserver.aioserver;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.AsynchronousServerSocketChannel;import java.nio.channels.AsynchronousSocketChannel;import java.nio.channels.CompletionHandler;import java.nio.charset.StandardCharsets;import java.text.SimpleDateFormat;import java.util.Date;import java.util.concurrent.CountDownLatch;/** * 使用AIO实现的简单时间服务 */public class SimpleTimeServer implements Runnable { /** * 维持服务线程的门闩 */ private CountDownLatch latch; /** * 异步socket服务通道 */ private AsynchronousServerSocketChannel asynchronousServerSocketChannel; private SimpleTimeServer(int port) throws IOException { //开启异步socket服务 asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open(); //绑定端口 asynchronousServerSocketChannel.bind(new InetSocketAddress(port)); System.out.println("simple time server start in " + port); } @Override public void run() { latch = new CountDownLatch(1); //异步socket服务接收请求 System.out.println("我是监听线程:" + Thread.currentThread()); asynchronousServerSocketChannel.accept(this, new AcceptCompletionHandler()); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } /** * 接收请求的结束动作处理类,当异步socket服务接收到一个请求时,会回调此handler,从而对收到的请求进行处理 */ private class AcceptCompletionHandler implements CompletionHandler{ @Override public void completed(AsynchronousSocketChannel channel, SimpleTimeServer attachment) { try { System.out.println("我是处理线程:" + Thread.currentThread()); //循环监听,进行监听操作的是SimpleTimeServer运行的线程 attachment.asynchronousServerSocketChannel.accept(attachment, this); //这里休眠20秒,可以看到当处理线程没有处理完成时,会启用新的线程来处理后面的请求 Thread.sleep(20); ByteBuffer buffer = ByteBuffer.allocate(1024); //从请求通道中读取数据 channel.read(buffer, buffer, new ReadCompletionHandler(channel)); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public void failed(Throwable exc, SimpleTimeServer attachment) { //接收请求失败,打印异常信息,将门闩减一,服务线程终止 exc.printStackTrace(); attachment.latch.countDown(); } } /** * 读取数据的结束动作处理类,当系统将数据读取到buffer中,会回调此handler */ private class ReadCompletionHandler implements CompletionHandler { private AsynchronousSocketChannel channel; ReadCompletionHandler(AsynchronousSocketChannel channel) { this.channel = channel; } @Override public void completed(Integer byteNum, ByteBuffer readBuffer) { if (byteNum <= 0) return; readBuffer.flip(); byte[] body = new byte[byteNum]; readBuffer.get(body); String req = new String(body, StandardCharsets.UTF_8); System.out.println("the time server received order: " + req); SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String currentTime = "query time order".equalsIgnoreCase(req) ? format.format(new Date()) + "" : "bad order"; doWrite(currentTime); } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { //读取失败,关闭通道 channel.close(); } catch (IOException e) { e.printStackTrace(); } } private void doWrite(String msg) { if (null != msg) { byte[] bytes = msg.getBytes(StandardCharsets.UTF_8); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); channel.write(writeBuffer, writeBuffer, //写操作结束的回调handler new CompletionHandler () { @Override public void completed(Integer result, ByteBuffer buffer) { if (buffer.hasRemaining()) { channel.write(buffer, buffer, this); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } } }); } } } public static void main(String[] args) { try { System.out.println("我是主线程:" + Thread.currentThread()); new SimpleTimeServer(8088).run(); System.out.println("监听线程已挂"); } catch (IOException e) { e.printStackTrace(); } }}
package com.dongrui.study.ioclient.aioclient;import com.google.common.primitives.Bytes;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.AsynchronousSocketChannel;import java.nio.channels.CompletionHandler;import java.nio.channels.InterruptedByTimeoutException;import java.nio.charset.StandardCharsets;import java.util.ArrayList;import java.util.List;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;/** * 基于AIO的简单时间服务客户端 */public class SimpleTimeClient { private String host; private int port; private CountDownLatch latch; /** * 异步socket通道 */ private AsynchronousSocketChannel channel; private SimpleTimeClient(String host, int port) throws IOException { this.host = host; this.port = port; this.latch = new CountDownLatch(1); initChannel(); } private void initChannel() throws IOException { // 打开异步socket通道 channel = AsynchronousSocketChannel.open(); // 异步连接指定地址,连接完成后会回调ConnectionCompletionHandler channel.connect(new InetSocketAddress(host, port), null, new ConnectionCompletionHandler()); } private class ConnectionCompletionHandler implements CompletionHandler{ @Override public void completed(Void result, Void attachment) { System.out.println("connection thread: " + Thread.currentThread()); String msg = "query time order"; ByteBuffer writeBuffer = ByteBuffer.allocate(msg.length()); writeBuffer.put(msg.getBytes(StandardCharsets.UTF_8)).flip(); // 异步写入发送数据,写入完成后会回调WriteCompletionHandler channel.write(writeBuffer, null, new WriteCompletionHandler()); } @Override public void failed(Throwable exc, Void attachment) { exc.printStackTrace(); latch.countDown(); } } /** * 读取到的byte集合和缓存 */ private class BufferAndArr { public BufferAndArr(List bytesArr, ByteBuffer buffer) { this.bytesArr = bytesArr; this.buffer = buffer; } List bytesArr; ByteBuffer buffer; } /** * 写数据完成回调处理类 */ private class WriteCompletionHandler implements CompletionHandler { @Override public void completed(Integer result, Void attachment) { System.out.println("write thread: " + Thread.currentThread()); List rtnBytesArr = new ArrayList<>(); ByteBuffer readBuffer = ByteBuffer.allocate(1024); // 异步读取返回的数据,读取结束后会回调ReadCompletionHandler channel.read(readBuffer, 1000, TimeUnit.MILLISECONDS, new BufferAndArr(rtnBytesArr, readBuffer), new ReadCompletionHandler()); } @Override public void failed(Throwable exc, Void attachment) { exc.printStackTrace(); latch.countDown(); } } /** * 读数据完成回调处理类 */ private class ReadCompletionHandler implements CompletionHandler { @Override public void completed(Integer bytesNum, BufferAndArr attachment) { System.out.println("read thread: " + Thread.currentThread()); int size = attachment.buffer.limit(); attachment.buffer.flip(); byte[] tempBytes = new byte[bytesNum]; attachment.buffer.get(tempBytes); attachment.bytesArr.addAll(Bytes.asList(tempBytes)); // 根据读取到的数据长度与缓存总长度比较,相等则继续读取,否则读取结束 if (bytesNum >= size) { attachment.buffer.clear(); // 继续读取时加入超时时间,如果已经读取完,则会触发超时异常,转到fail中 channel.read(attachment.buffer, 1000, TimeUnit.MILLISECONDS, attachment, new ReadCompletionHandler()); } else { completionAction(attachment.bytesArr); } } @Override public void failed(Throwable exc, BufferAndArr attachment) { // 当没有数据时会超时抛出InterruptedByTimeoutException异常,然后在这里处理读取结果,因为暂时没有发现更好的方法 if (exc instanceof InterruptedByTimeoutException) { completionAction(attachment.bytesArr); } else { exc.printStackTrace(); } } private void completionAction(List bytesArr) { System.out.println("当前时间:" + new String(Bytes.toArray(bytesArr), StandardCharsets.UTF_8)); latch.countDown(); } } public static void main(String[] args) { while (true) { new Thread(() -> { try { System.out.println("time client thread: " + Thread.currentThread()); SimpleTimeClient client = new SimpleTimeClient("localhost", 8088); client.latch.await(); } catch (IOException | InterruptedException e) { e.printStackTrace(); } }).start(); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } }}
转载地址:http://xptlf.baihongyu.com/