博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Java AIO知识总结
阅读量:2058 次
发布时间:2019-04-29

本文共 13070 字,大约阅读时间需要 43 分钟。

文章目录

概述

什么是AIO?

Java 1.7升级了NIO类库,升级后的NIO类库被称为NIO 2.0,在NIO2.0中Java提供了异步文件I/O操作,同时提供了与UNIX网络编程事件驱动I/O对应的AIO(Asynchronous I/O),AIO是真正的异步非阻塞I/O,它不需要通过多路复用器(Selector)对注册的通道进行轮询操作即可实现异步读写,从而简化了NIO的编程模型。

AIO与NIO有什么区别?

  1. 按照UNIX网络编程模型区分,NIO是同步非阻塞I/O,AIO是异步非阻塞I/O;
  2. AIO与NIO的操作结果获取方式不同,NIO的操作结束后会将操作就绪的I/O放在队列中,由Selector依次循环获取处理;AIO操作结束后则会直接回调CompletionHandler的实现类的相应函数来进行处理;
  3. 处理操作结果时NIO是单线程,即由Selector依次在当前线程中进行处理,如果需要多线程处理需要自行实现,这也是为什么它是同步而非异步;而AIO在回调处理操作结果时,是多线程的,其底层设有线程池。

AIO性能比NIO更好吗?

目前的AIO与NIO底层都使用了epoll(Linux中),所以二者性能都很好,主要差异在于同步与异步,NIO是同步的,始终只有一个线程在进行结果处理,而AIO的异步回调则是基于多线程的,如果NIO结果处理中引入多线程,个人认为二者性能是相仿的。

什么是epoll?

epoll是Linux中多路复用IO接口select/poll的增强版本,select/poll模型是忙轮询,即一直不停地轮询看哪些操作已经结束可以获取操作结果了,而epoll则是将已经结束的操作的操作结果放入队列中,然后只需要遍历处理队列中的操作就可以了,避免了CPU的浪费,提升程序运行效率。

更详细具体的可以移步百科了解:

https://baike.baidu.com/item/epoll/10738144?fr=aladdin

AIO既然是异步的,那么如何获得操作结果?

AIO中提供了两种方式获取操作结果:

  1. 通过java.util.concurrent.Future类来表示异步操作的结果;
  2. 在执行异步操作时传入一个java.nio.channel,并传入CompletionHandler接口的实现类作为操作完成的回调,CompletionHandler顾名思义就是专门用来处理完成结果的。

Java AIO的API使用

了解CompletionHandler接口

CompletionHandler源码

package java.nio.channels;/** * A handler for consuming the result of an asynchronous I/O operation. * * 

The asynchronous channels defined in this package allow a completion * handler to be specified to consume the result of an asynchronous operation. * The {@link #completed completed} method is invoked when the I/O operation * completes successfully. The {@link #failed failed} method is invoked if the * I/O operations fails. The implementations of these methods should complete * in a timely manner so as to avoid keeping the invoking thread from dispatching * to other completion handlers. * * @param

The result type of the I/O operation * @param
The type of the object attached to the I/O operation * * @since 1.7 */public interface CompletionHandler
{
/** * Invoked when an operation has completed. * * @param result * The result of the I/O operation. * @param attachment * The object attached to the I/O operation when it was initiated. */ void completed(V result, A attachment); /** * Invoked when an operation fails. * * @param exc * The exception to indicate why the I/O operation failed * @param attachment * The object attached to the I/O operation when it was initiated. */ void failed(Throwable exc, A attachment);}

解析

CompletionHandler接口中有两个方法需要实现,分别是completedfailed,当操作完成时,会回调completed,出现异常失败时会回调failed。

completed

操作完成时,回调completed函数,其有result和attachment两个参数:

  • result是操作完成后的操作结果;
  • attachment是在进行回调时可以传入的附件,用于回调内的操作;

failed

操作异常时回调failed函数,其有exc和attachment两个参数:

  • exc即进行操作时出现的异常;
  • attachment和completed中的一致,为在进行回调时传入的附件,用于回调内操作;

Demo

/**     * 接收请求的结束动作处理类,当异步socket服务接收到一个请求时,会回调此handler,从而对收到的请求进行处理     */    private class AcceptCompletionHandler implements CompletionHandler
{
@Override public void completed(AsynchronousSocketChannel channel, SimpleTimeServer attachment) {
... } @Override public void failed(Throwable exc, SimpleTimeServer attachment) {
exc.printStackTrace(); ... } }

基于AIO的简单时间服务

Server端实现代码

package com.dongrui.study.ioserver.aioserver;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.AsynchronousServerSocketChannel;import java.nio.channels.AsynchronousSocketChannel;import java.nio.channels.CompletionHandler;import java.nio.charset.StandardCharsets;import java.text.SimpleDateFormat;import java.util.Date;import java.util.concurrent.CountDownLatch;/** * 使用AIO实现的简单时间服务 */public class SimpleTimeServer implements Runnable {
/** * 维持服务线程的门闩 */ private CountDownLatch latch; /** * 异步socket服务通道 */ private AsynchronousServerSocketChannel asynchronousServerSocketChannel; private SimpleTimeServer(int port) throws IOException {
//开启异步socket服务 asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open(); //绑定端口 asynchronousServerSocketChannel.bind(new InetSocketAddress(port)); System.out.println("simple time server start in " + port); } @Override public void run() {
latch = new CountDownLatch(1); //异步socket服务接收请求 System.out.println("我是监听线程:" + Thread.currentThread()); asynchronousServerSocketChannel.accept(this, new AcceptCompletionHandler()); try {
latch.await(); } catch (InterruptedException e) {
e.printStackTrace(); } } /** * 接收请求的结束动作处理类,当异步socket服务接收到一个请求时,会回调此handler,从而对收到的请求进行处理 */ private class AcceptCompletionHandler implements CompletionHandler
{
@Override public void completed(AsynchronousSocketChannel channel, SimpleTimeServer attachment) {
try {
System.out.println("我是处理线程:" + Thread.currentThread()); //循环监听,进行监听操作的是SimpleTimeServer运行的线程 attachment.asynchronousServerSocketChannel.accept(attachment, this); //这里休眠20秒,可以看到当处理线程没有处理完成时,会启用新的线程来处理后面的请求 Thread.sleep(20); ByteBuffer buffer = ByteBuffer.allocate(1024); //从请求通道中读取数据 channel.read(buffer, buffer, new ReadCompletionHandler(channel)); } catch (InterruptedException e) {
e.printStackTrace(); } } @Override public void failed(Throwable exc, SimpleTimeServer attachment) {
//接收请求失败,打印异常信息,将门闩减一,服务线程终止 exc.printStackTrace(); attachment.latch.countDown(); } } /** * 读取数据的结束动作处理类,当系统将数据读取到buffer中,会回调此handler */ private class ReadCompletionHandler implements CompletionHandler
{
private AsynchronousSocketChannel channel; ReadCompletionHandler(AsynchronousSocketChannel channel) {
this.channel = channel; } @Override public void completed(Integer byteNum, ByteBuffer readBuffer) {
if (byteNum <= 0) return; readBuffer.flip(); byte[] body = new byte[byteNum]; readBuffer.get(body); String req = new String(body, StandardCharsets.UTF_8); System.out.println("the time server received order: " + req); SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String currentTime = "query time order".equalsIgnoreCase(req) ? format.format(new Date()) + "" : "bad order"; doWrite(currentTime); } @Override public void failed(Throwable exc, ByteBuffer attachment) {
try {
//读取失败,关闭通道 channel.close(); } catch (IOException e) {
e.printStackTrace(); } } private void doWrite(String msg) {
if (null != msg) {
byte[] bytes = msg.getBytes(StandardCharsets.UTF_8); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); channel.write(writeBuffer, writeBuffer, //写操作结束的回调handler new CompletionHandler
() {
@Override public void completed(Integer result, ByteBuffer buffer) {
if (buffer.hasRemaining()) {
channel.write(buffer, buffer, this); } } @Override public void failed(Throwable exc, ByteBuffer attachment) {
try {
channel.close(); } catch (IOException e) {
e.printStackTrace(); } } }); } } } public static void main(String[] args) {
try {
System.out.println("我是主线程:" + Thread.currentThread()); new SimpleTimeServer(8088).run(); System.out.println("监听线程已挂"); } catch (IOException e) {
e.printStackTrace(); } }}

Client端实现代码

package com.dongrui.study.ioclient.aioclient;import com.google.common.primitives.Bytes;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.AsynchronousSocketChannel;import java.nio.channels.CompletionHandler;import java.nio.channels.InterruptedByTimeoutException;import java.nio.charset.StandardCharsets;import java.util.ArrayList;import java.util.List;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;/** * 基于AIO的简单时间服务客户端 */public class SimpleTimeClient {
private String host; private int port; private CountDownLatch latch; /** * 异步socket通道 */ private AsynchronousSocketChannel channel; private SimpleTimeClient(String host, int port) throws IOException {
this.host = host; this.port = port; this.latch = new CountDownLatch(1); initChannel(); } private void initChannel() throws IOException {
// 打开异步socket通道 channel = AsynchronousSocketChannel.open(); // 异步连接指定地址,连接完成后会回调ConnectionCompletionHandler channel.connect(new InetSocketAddress(host, port), null, new ConnectionCompletionHandler()); } private class ConnectionCompletionHandler implements CompletionHandler
{
@Override public void completed(Void result, Void attachment) {
System.out.println("connection thread: " + Thread.currentThread()); String msg = "query time order"; ByteBuffer writeBuffer = ByteBuffer.allocate(msg.length()); writeBuffer.put(msg.getBytes(StandardCharsets.UTF_8)).flip(); // 异步写入发送数据,写入完成后会回调WriteCompletionHandler channel.write(writeBuffer, null, new WriteCompletionHandler()); } @Override public void failed(Throwable exc, Void attachment) {
exc.printStackTrace(); latch.countDown(); } } /** * 读取到的byte集合和缓存 */ private class BufferAndArr {
public BufferAndArr(List
bytesArr, ByteBuffer buffer) {
this.bytesArr = bytesArr; this.buffer = buffer; } List
bytesArr; ByteBuffer buffer; } /** * 写数据完成回调处理类 */ private class WriteCompletionHandler implements CompletionHandler
{
@Override public void completed(Integer result, Void attachment) {
System.out.println("write thread: " + Thread.currentThread()); List
rtnBytesArr = new ArrayList<>(); ByteBuffer readBuffer = ByteBuffer.allocate(1024); // 异步读取返回的数据,读取结束后会回调ReadCompletionHandler channel.read(readBuffer, 1000, TimeUnit.MILLISECONDS, new BufferAndArr(rtnBytesArr, readBuffer), new ReadCompletionHandler()); } @Override public void failed(Throwable exc, Void attachment) {
exc.printStackTrace(); latch.countDown(); } } /** * 读数据完成回调处理类 */ private class ReadCompletionHandler implements CompletionHandler
{ @Override public void completed(Integer bytesNum, BufferAndArr attachment) { System.out.println("read thread: " + Thread.currentThread()); int size = attachment.buffer.limit(); attachment.buffer.flip(); byte[] tempBytes = new byte[bytesNum]; attachment.buffer.get(tempBytes); attachment.bytesArr.addAll(Bytes.asList(tempBytes)); // 根据读取到的数据长度与缓存总长度比较,相等则继续读取,否则读取结束 if (bytesNum >= size) { attachment.buffer.clear(); // 继续读取时加入超时时间,如果已经读取完,则会触发超时异常,转到fail中 channel.read(attachment.buffer, 1000, TimeUnit.MILLISECONDS, attachment, new ReadCompletionHandler()); } else { completionAction(attachment.bytesArr); } } @Override public void failed(Throwable exc, BufferAndArr attachment) { // 当没有数据时会超时抛出InterruptedByTimeoutException异常,然后在这里处理读取结果,因为暂时没有发现更好的方法 if (exc instanceof InterruptedByTimeoutException) { completionAction(attachment.bytesArr); } else { exc.printStackTrace(); } } private void completionAction(List
bytesArr) { System.out.println("当前时间:" + new String(Bytes.toArray(bytesArr), StandardCharsets.UTF_8)); latch.countDown(); } } public static void main(String[] args) { while (true) { new Thread(() -> { try { System.out.println("time client thread: " + Thread.currentThread()); SimpleTimeClient client = new SimpleTimeClient("localhost", 8088); client.latch.await(); } catch (IOException | InterruptedException e) { e.printStackTrace(); } }).start(); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } }}

转载地址:http://xptlf.baihongyu.com/

你可能感兴趣的文章
解决 Kubernetes 部署 Metrics Server 无法访问 Apiserver 问题
查看>>
AWS 容器三大新品:K8s 发行版,免费镜像库和 “Game Changer”AWS Proton
查看>>
多平台容器镜像构建就看这一篇
查看>>
macOS Big Sur 使用全新虚拟化框架创建超轻量虚拟机!
查看>>
16 岁高中生成功在 iPhone 7 上安装 Ubuntu 20.04 桌面!
查看>>
两个程序都要用同一个端口,怎么解?
查看>>
有了这款图形管理界面,一分钟内配置 10 个 WireGuard 客户端不是梦
查看>>
Containerd镜像lazy-pulling解读
查看>>
Grafana 教程 - 构建你的第一个仪表盘
查看>>
由 OOM 引发的 ext4 文件系统卡死
查看>>
什么?WireGuard 可以让躲在 NAT 后面的客户端之间直连了??
查看>>
k8s集群内的节点,可能没你想象的那么健壮!(磁盘管理篇)
查看>>
利用 ebpf sockmap/redirection 提升 socket 性能(2020)
查看>>
春节前最后一波福利,Alibaba Java 训练营]5天突破面向对象编程限时免费报名!...
查看>>
我就要在容器里写文件!?
查看>>
支付宝集五福最全攻略,五分钟集齐五福!
查看>>
Runc 容器初始化和容器逃逸
查看>>
使用 GDB + Qemu 调试 Linux 内核
查看>>
介绍一个小工具:SSL-exporter
查看>>
深入理解 tc ebpf 的 direct-action (da) 模式(2020)
查看>>