传统的Java代码在需要异步处理IO时,往往要自己手写多线程支持。Java 7 的 NIO.2 在 java.nio.channels 包中新增了3个异步通道:
AsynchronousFileChannel 用于文件
AsynchronousSocketChannel 用于套接字,支持超时
AsynchronousServerSocketChannel 用于服务端套接字,负责接受客户端连接
当你希望由主线程发起io操作,并等待结果时,需要使用Future(对,就是java.util.concurrent包下那个接口)来接收结果
相对应的它也提供一个回调的模式来处理结果
下面我尝试用AsynchronousFileChannel来读取一个文本文件的全部内容, 网上大部分的例子都是直接一次性将文件内容读入缓存
这里我们分几次读,因为实际生产环境中你很难碰到一个可以一次性读取的小文件还需要异步处理的
通常这种操作都是面向大文件
文件内容是 1234567890测试中文
package com.nio;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
 * Reads a UTF-8 text file in small chunks through {@link AsynchronousFileChannel}
 * using the {@link Future}-based API, printing each decoded chunk on its own line.
 */
public class TestPath3 {

    /** A single UTF-8 encoded code point occupies at most four bytes. */
    private static final int MIN_BUFFER_SIZE = 4;

    /**
     * Reads the sample file eight bytes at a time and prints every chunk to standard output.
     *
     * @param args ignored
     * @throws IOException          if the file cannot be opened or written out
     * @throws InterruptedException if the thread is interrupted while waiting for a read
     * @throws ExecutionException   if an asynchronous read fails
     */
    public static void main(String[] args) throws IOException, InterruptedException, ExecutionException {
        Path test3 = Paths.get("C:/ttt/test3.txt");
        int bufferSize = 8;
        readChunks(test3, bufferSize, System.out);
    }

    /**
     * Reads {@code file} in pieces of at most {@code bufferSize} bytes, decodes them as
     * UTF-8 and appends each non-empty decoded piece, followed by a line separator, to
     * {@code out}.
     *
     * <p>Bytes of a character that is split across two reads are carried over to the next
     * read instead of being decoded on their own. Malformed input is replaced with the
     * decoder's replacement character.
     *
     * @param file       file to read
     * @param bufferSize size of the read buffer in bytes; at least {@value #MIN_BUFFER_SIZE}
     * @param out        receiver of the decoded text
     * @throws IllegalArgumentException if {@code bufferSize} is too small to hold one character
     * @throws IOException              if the file cannot be opened or {@code out} fails
     * @throws InterruptedException     if the thread is interrupted while waiting for a read
     * @throws ExecutionException       if an asynchronous read fails
     */
    static void readChunks(Path file, int bufferSize, Appendable out)
            throws IOException, InterruptedException, ExecutionException {
        if (bufferSize < MIN_BUFFER_SIZE) {
            throw new IllegalArgumentException(
                    "bufferSize must be at least " + MIN_BUFFER_SIZE + " but was " + bufferSize);
        }
        CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder()
                .onMalformedInput(CodingErrorAction.REPLACE)
                .onUnmappableCharacter(CodingErrorAction.REPLACE);
        ByteBuffer bytes = ByteBuffer.allocate(bufferSize);
        // UTF-8 never yields more chars than input bytes, so this cannot overflow.
        CharBuffer chars = CharBuffer.allocate(bufferSize);

        try (AsynchronousFileChannel fileChannel =
                AsynchronousFileChannel.open(file, StandardOpenOption.READ)) {
            long position = 0;
            while (true) {
                Future<Integer> operation = fileChannel.read(bytes, position);
                // Polling is kept purely to show that this thread is free while the read
                // is in flight; production code would do real work here or just call get().
                while (!operation.isDone()) {
                    System.out.println("do something here");
                }
                int bytesRead = operation.get();
                if (bytesRead < 0) {
                    break; // end of file
                }
                // Advance by what was actually read; a read may return fewer bytes than asked.
                position += bytesRead;
                bytes.flip();
                decoder.decode(bytes, chars, false);
                // Keep the bytes of an incomplete trailing character for the next round.
                bytes.compact();
                emit(chars, out);
            }
            // Anything still pending is an incomplete character at end of file.
            bytes.flip();
            decoder.decode(bytes, chars, true);
            decoder.flush(chars);
            emit(chars, out);
        }
    }

    /** Appends the decoded characters plus a line separator to {@code out}, then resets {@code chars}. */
    private static void emit(CharBuffer chars, Appendable out) throws IOException {
        chars.flip();
        if (chars.hasRemaining()) {
            out.append(chars.toString()).append(System.lineSeparator());
        }
        chars.clear();
    }
}
得到结果
do something here
12345678
do something here
do something here
do something here
do something here
do something here
do something here
90测试
do something here
do something here
do something here
中文
do something here
do something here
do something here
do something here
do something here
do something here
do something here
do something here
do something here
do something here
do something here
do something here
do something here
do something here
do something here
do something here
do something here
do something here
do something here
do something here
do something here
do something here
do something here
do something here

当然这种方式跟正常的处理模式好像正好是颠倒的, 通常情况下我们更希望使用一种回调的模式
子线程结束运行应该主动通知主线程,而不是让主线程使用while进行轮询
下面这种方式是比较正常的
package com.nio;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.ExecutionException;
/**
 * Reads a whole text file with a single asynchronous read and prints it from a
 * {@link CompletionHandler} callback.
 */
public class TestPath5 {

    /**
     * Starts one asynchronous read of the sample file and waits until the callback has run.
     *
     * @param args ignored
     * @throws IOException          if the file cannot be opened
     * @throws InterruptedException if the thread is interrupted while waiting for the callback
     * @throws ExecutionException   if the asynchronous read fails
     */
    public static void main(String[] args) throws IOException, InterruptedException, ExecutionException {
        Path test3 = Paths.get("C:/ttt/test3.txt");
        int bufferSize = 80000;
        long position = 0;
        try (AsynchronousFileChannel fileChannel =
                AsynchronousFileChannel.open(test3, StandardOpenOption.READ)) {
            ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
            CompleteHandlerImpl completeHandlerImpl = new CompleteHandlerImpl();
            fileChannel.read(buffer, position, buffer, completeHandlerImpl);
            // Wait for the callback itself instead of sleeping for a guessed amount of time.
            completeHandlerImpl.awaitCompletion();
        }
    }

    /**
     * Prints the bytes written into {@code buffer} as one line of UTF-8 text and clears
     * the buffer so it can be filled again.
     *
     * @param buffer buffer in write mode; it is flipped, drained and cleared
     */
    public static void printBuffer(ByteBuffer buffer) {
        buffer.flip();
        byte[] data = new byte[buffer.remaining()];
        buffer.get(data);
        // Explicit charset: the platform default differs between machines and JDK versions.
        System.out.println(new String(data, java.nio.charset.StandardCharsets.UTF_8));
        buffer.clear();
    }
}

/**
 * Completion callback that prints the buffer it receives as attachment and lets the
 * initiating thread wait for the outcome through {@link #awaitCompletion()}.
 */
class CompleteHandlerImpl implements CompletionHandler<Integer, ByteBuffer> {

    /** Released once either callback method has run. */
    private final java.util.concurrent.CountDownLatch done = new java.util.concurrent.CountDownLatch(1);

    /** Cause of the failure, or {@code null}; written before the latch is released. */
    private Throwable failure;

    /**
     * Prints the content of {@code attachment} and signals completion.
     *
     * @param result     number of bytes read, as reported by the channel
     * @param attachment buffer the channel read into
     */
    @Override
    public void completed(Integer result, ByteBuffer attachment) {
        try {
            TestPath5.printBuffer(attachment);
        } catch (RuntimeException e) {
            failure = e;
        } finally {
            // Always release the waiter, otherwise a failure here would block it forever.
            done.countDown();
        }
    }

    /**
     * Records and reports the failure, then signals completion.
     *
     * @param exc        reason the read failed
     * @param attachment buffer passed to the read; not used
     */
    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
        failure = exc;
        System.err.println("error: " + exc);
        done.countDown();
    }

    /**
     * Blocks until {@link #completed} or {@link #failed} has finished.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException   wrapping the recorded failure, if there was one
     */
    void awaitCompletion() throws InterruptedException, ExecutionException {
        done.await();
        if (failure != null) {
            throw new ExecutionException(failure);
        }
    }
}
上面这个例子是通过回调一次性读取了全部文件, 这里我们稍做改动,用递归把它写成分次读入处理的
package com.nio;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.ExecutionException;
/**
 * Reads a UTF-8 text file in small chunks with chained asynchronous reads: every
 * completion callback prints what arrived and starts the next read.
 */
public class TestPath5 {

    /**
     * Starts the first read of the sample file and waits until the callback chain has
     * reached the end of the file.
     *
     * @param args ignored
     * @throws IOException          if the file cannot be opened
     * @throws InterruptedException if the thread is interrupted while waiting for the chain
     * @throws ExecutionException   if one of the asynchronous reads fails
     */
    public static void main(String[] args) throws IOException, InterruptedException, ExecutionException {
        Path test3 = Paths.get("C:/ttt/test3.txt");
        int bufferSize = 8;
        long position = 0;
        try (AsynchronousFileChannel fileChannel =
                AsynchronousFileChannel.open(test3, StandardOpenOption.READ)) {
            ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
            CompleteHandlerImpl completeHandlerImpl =
                    new CompleteHandlerImpl(fileChannel, bufferSize, position);
            fileChannel.read(buffer, position, buffer, completeHandlerImpl);
            // Wait for the chain to finish instead of sleeping for a guessed amount of time;
            // the channel must stay open until the last read has completed.
            completeHandlerImpl.awaitCompletion();
        }
    }

    /**
     * Prints all bytes written into {@code buffer} as one line of UTF-8 text and clears
     * the buffer so it can be filled again.
     *
     * @param buffer buffer in write mode; it is flipped, drained and cleared
     */
    public static void printBuffer(ByteBuffer buffer) {
        buffer.flip();
        byte[] data = new byte[buffer.remaining()];
        buffer.get(data);
        // Explicit charset: the platform default differs between machines and JDK versions.
        System.out.println(new String(data, java.nio.charset.StandardCharsets.UTF_8));
        buffer.clear();
    }

    /**
     * Prints the complete UTF-8 characters written into {@code buffer} as one line and
     * leaves the bytes of an incomplete trailing character at the start of the buffer,
     * so the next read can append the rest of that character. Nothing is printed when
     * no complete character is available yet.
     *
     * @param buffer buffer in write mode; it is flipped, partially drained and compacted
     */
    static void printCompleteChars(ByteBuffer buffer) {
        buffer.flip();
        // UTF-8 never yields more chars than input bytes, so this cannot overflow.
        java.nio.CharBuffer chars = java.nio.CharBuffer.allocate(buffer.remaining());
        java.nio.charset.StandardCharsets.UTF_8.newDecoder()
                .onMalformedInput(java.nio.charset.CodingErrorAction.REPLACE)
                .onUnmappableCharacter(java.nio.charset.CodingErrorAction.REPLACE)
                .decode(buffer, chars, false);
        chars.flip();
        if (chars.hasRemaining()) {
            System.out.println(chars.toString());
        }
        buffer.compact();
    }
}

/**
 * Completion callback that prints each chunk and issues the next read itself, until
 * the channel reports the end of the file. The initiating thread can wait for the
 * whole chain through {@link #awaitCompletion()}.
 */
class CompleteHandlerImpl implements CompletionHandler<Integer, ByteBuffer> {
    AsynchronousFileChannel fileChannel;
    /** Kept for callers that pass it in; the file offset now advances by the bytes actually read. */
    int bufferSize;
    /** File offset of the next read. */
    long position;

    /** Released once the chain has ended, successfully or not. */
    private final java.util.concurrent.CountDownLatch done = new java.util.concurrent.CountDownLatch(1);

    /** Cause of the failure, or {@code null}; written before the latch is released. */
    private Throwable failure;

    /**
     * @param fileChannel channel to issue follow-up reads on
     * @param bufferSize  capacity of the buffer used for the reads
     * @param position    file offset of the read this handler is first attached to
     */
    public CompleteHandlerImpl(AsynchronousFileChannel fileChannel, int bufferSize, long position) {
        this.fileChannel = fileChannel;
        this.bufferSize = bufferSize;
        this.position = position;
    }

    /**
     * Prints the chunk that was read and starts the next read, or ends the chain when
     * nothing more was read.
     *
     * @param result     number of bytes read, or -1 at end of file
     * @param attachment buffer the channel read into
     */
    @Override
    public void completed(Integer result, ByteBuffer attachment) {
        try {
            int bytesRead = result;
            if (bytesRead <= 0) {
                // End of file: whatever is still buffered is an incomplete character.
                if (attachment.position() > 0) {
                    TestPath5.printBuffer(attachment);
                }
                done.countDown();
                return;
            }
            TestPath5.printCompleteChars(attachment);
            if (!attachment.hasRemaining()) {
                // Buffer too small to ever complete the pending character; flush it as is.
                TestPath5.printBuffer(attachment);
            }
            // Advance by what was actually read; a read may return fewer bytes than asked.
            position += bytesRead;
            // This handler carries the running offset, so it is reused for the next read.
            fileChannel.read(attachment, position, attachment, this);
        } catch (RuntimeException e) {
            // Release the waiter, otherwise a failure here would block it forever.
            failure = e;
            done.countDown();
        }
    }

    /**
     * Records and reports the failure, then ends the chain.
     *
     * @param exc        reason the read failed
     * @param attachment buffer passed to the read; not used
     */
    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
        failure = exc;
        System.err.println("error: " + exc);
        done.countDown();
    }

    /**
     * Blocks until the chain of reads has ended.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException   wrapping the recorded failure, if there was one
     */
    void awaitCompletion() throws InterruptedException, ExecutionException {
        done.await();
        if (failure != null) {
            throw new ExecutionException(failure);
        }
    }
}
