在jdk 1.4以前,java的io操作集中在java.io这个包中,是基于流的同步(blocking)api。对于大多数应用来说,这样的api使用很方便,然而,一些对性能要求较高的应用,尤其是服务端应用,往往需要一个更为有效的方式来处理io。从jdk 1.4起,nio api作为一个基于缓冲区,并能提供异步(non-blocking)io操作的api被引入。本文对其进行深入的介绍。
nio api主要集中在java.nio和它的subpackages中:
java.nio
定义了buffer及其数据类型相关的子类。其中被java.nio.channels中的类用来进行io操作的bytebuffer的作用非常重要。
java.nio.channels
定义了一系列处理io的channel接口以及这些接口在文件系统和网络通讯上的实现。通过selector这个类,还提供了进行异步io操作的办法。这个包可以说是nio api的核心。
java.nio.channels.spi
定义了可用来实现channel和selector api的抽象类。
java.nio.charset
定义了处理字符编码和解码的类。
java.nio.charset.spi
定义了可用来实现charset api的抽象类。
java.nio.channels.spi和java.nio.charset.spi这两个包主要被用来对现有nio api进行扩展,在实际的使用中,我们一般只和另外的3个包打交道。下面将对这3个包一一介绍。
package java.nio
这个包主要定义了buffer及其子类。buffer定义了一个线性存放primitive type数据的容器接口。对于除boolean以外的其他primitive type,都有一个相应的buffer子类,bytebuffer是其中最重要的一个子类。
下面这张uml类图描述了java.nio中的类的关系:
buffer
定义了一个可以线性存放primitive type数据的容器接口。buffer主要包含了与类型(byte, char…)无关的功能。值得注意的是buffer及其子类都不是线程安全的。
每个buffer都有以下的属性:
capacity
这个buffer最多能放多少数据。capacity一般在buffer被创建的时候指定。
limit
在buffer上进行的读写操作都不能越过这个下标。当写数据到buffer中时,limit一般和capacity相等,当读数据时,limit代表buffer中有效数据的长度。
position
读/写操作的当前下标。当使用buffer的相对位置进行读/写操作时,读/写会从这个下标进行,并在操作完成后,buffer会更新下标的值。
mark
一个临时存放的位置下标。调用mark()会将mark设为当前的position的值,以后调用reset()会将position属性设置为mark的值。mark的值总是小于等于position的值,如果将position的值设的比mark小,当前的mark值会被抛弃掉。
这些属性总是满足以下条件:
0 <= mark <= position <= limit <= capacity
limit和position的值除了通过limit()和position()函数来设置,也可以通过下面这些函数来改变:
buffer clear()
把position设为0,把limit设为capacity,一般在把数据写入buffer前调用。
buffer flip()
把limit设为当前position,把position设为0,一般在从buffer读出数据前调用。
buffer rewind()
把position设为0,limit不变,一般在把数据重写入buffer前调用。
buffer对象有可能是只读的,这时,任何对该对象的写操作都会触发一个readonlybufferexception。isreadonly()方法可以用来判断一个buffer是否只读。
bytebuffer
在buffer的子类中,bytebuffer是一个地位较为特殊的类,因为在java.io.channels中定义的各种channel的io操作基本上都是围绕bytebuffer展开的。
bytebuffer定义了4个static方法来做创建工作:
bytebuffer allocate(int capacity)
创建一个指定capacity的bytebuffer。
bytebuffer allocatedirect(int capacity)
创建一个direct的bytebuffer,这样的bytebuffer在参与io操作时性能会更好(很有可能是在底层的实现使用了dma技术),相应的,创建和回收direct的bytebuffer的代价也会高一些。isdirect()方法可以检查一个buffer是否是direct的。
bytebuffer wrap(byte [] array)
bytebuffer wrap(byte [] array, int offset, int length)
把一个byte数组或byte数组的一部分包装成bytebuffer。
bytebuffer定义了一系列get和put操作来从中读写byte数据,如下面几个:
byte get()
bytebuffer get(byte [] dst)
byte get(int index)
bytebuffer put(byte b)
bytebuffer put(byte [] src)
bytebuffer put(int index, byte b)
这些操作可分为绝对定位和相对定为两种,相对定位的读写操作依靠position来定位buffer中的位置,并在操作完成后会更新position的值。
在其它类型的buffer中,也定义了相同的函数来读写数据,唯一不同的就是一些参数和返回值的类型。
除了读写byte类型数据的函数,bytebuffer的一个特别之处是它还定义了读写其它primitive数据的方法,如:
int getint()
从bytebuffer中读出一个int值。
bytebuffer putint(int value)
写入一个int值到bytebuffer中。
读写其它类型的数据牵涉到字节序问题,bytebuffer会按其字节序(大字节序或小字节序)写入或读出一个其它类型的数据(int,long…)。字节序可以用order方法来取得和设置:
byteorder order()
返回bytebuffer的字节序。
bytebuffer order(byteorder bo)
设置bytebuffer的字节序。
bytebuffer另一个特别的地方是可以在它的基础上得到其它类型的buffer。如:
charbuffer ascharbuffer()
为当前的bytebuffer创建一个charbuffer的视图。在该视图buffer中的读写操作会按照bytebuffer的字节序作用到bytebuffer中的数据上。
用这类方法创建出来的buffer会从bytebuffer的position位置开始到limit位置结束,可以看作是这段数据的视图。视图buffer的readonly属性和direct属性与bytebuffer的一致,而且也只有通过这种方法,才可以得到其他数据类型的direct buffer。
byteorder
用来表示bytebuffer字节序的类,可将其看成java中的enum类型。主要定义了下面几个static方法和属性:
byteorder big_endian
代表大字节序的byteorder。
byteorder little_endian
代表小字节序的byteorder。
byteorder nativeorder()
返回当前硬件平台的字节序。
mappedbytebuffer
bytebuffer的子类,是文件内容在内存中的映射。这个类的实例需要通过filechannel的map()方法来创建。
接下来看看一个使用bytebuffer的例子,这个例子从标准输入不停地读入字符,当读满一行后,将收集的字符写到标准输出:
public static void main(string [] args)
throws ioexception
{
// 创建一个capacity为256的bytebuffer
bytebuffer buf = bytebuffer.allocate(256);
while (true) {
// 从标准输入流读入一个字符
int c = system.in.read();
// 当读到输入流结束时,退出循环
if (c == -1)
break;
// 把读入的字符写入bytebuffer中
buf.put((byte) c);
// 当读完一行时,输出收集的字符
if (c == \n) {
// 调用flip()使limit变为当前的position的值,position变为0,
// 为接下来从bytebuffer读取做准备
buf.flip();
// 构建一个byte数组
byte [] content = new byte[buf.limit()];
// 从bytebuffer中读取数据到byte数组中
buf.get(content);
// 把byte数组的内容写到标准输出
system.out.print(new string(content));
// 调用clear()使position变为0,limit变为capacity的值,
// 为接下来写入数据到bytebuffer中做准备
buf.clear();
}
}
}
package java.nio.channels
这个包定义了channel的概念,channel表现了一个可以进行io操作的通道(比如,通过filechannel,我们可以对文件进行读写操作)。java.nio.channels包含了文件系统和网络通讯相关的channel类。这个包通过selector和selectablechannel这两个类,还定义了一个进行异步(non-blocking)io操作的api,这对需要高性能io的应用非常重要。
下面这张uml类图描述了java.nio.channels中interface的关系:
channel
channel表现了一个可以进行io操作的通道,该interface定义了以下方法:
boolean isopen()
该channel是否是打开的。
void close()
关闭这个channel,相关的资源会被释放。
readablebytechannel
定义了一个可从中读取byte数据的channel interface。
int read(bytebuffer dst)
从channel中读取byte数据并写到bytebuffer中。返回读取的byte数。
writablebytechannel
定义了一个可向其写byte数据的channel interface。
int write(bytebuffer src)
从bytebuffer中读取byte数据并写到channel中。返回写出的byte数。
bytechannel
bytechannel并没有定义新的方法,它的作用只是把readablebytechannel和writablebytechannel合并在一起。
scatteringbytechannel
继承了readablebytechannel并提供了同时往几个bytebuffer中写数据的能力。
gatheringbytechannel
继承了writablebytechannel并提供了同时从几个bytebuffer中读数据的能力。
interruptiblechannel
用来表现一个可以被异步关闭的channel。这表现在两方面:
1. 当一个interruptiblechannel的close()方法被调用时,其它block在这个interruptiblechannel的io操作上的线程会接收到一个asynchronouscloseexception。
2. 当一个线程block在interruptiblechannel的io操作上时,另一个线程调用该线程的interrupt()方法会导致channel被关闭,该线程收到一个closedbyinterruptexception,同时线程的interrupt状态会被设置。
接下来的这张uml类图描述了java.nio.channels中类的关系:
异步io
异步io的支持可以算是nio api中最重要的功能,异步io允许应用程序同时监控多个channel以提高性能,这一功能是通过selector,selectablechannel和selectionkey这3个类来实现的。
selectablechannel代表了可以支持异步io操作的channel,可以将其注册在selector上,这种注册的关系由selectionkey这个类来表现(见uml图)。selector这个类通过select()函数,给应用程序提供了一个可以同时监控多个io channel的方法:
应用程序通过调用select()函数,让selector监控注册在其上的多个selectablechannel,当有channel的io操作可以进行时,select()方法就会返回以让应用程序检查channel的状态,并作相应的处理。
下面是jdk 1.4中异步io的一个例子,这段code使用了异步io实现了一个time server:
private static void acceptconnections(int port) throws exception {
// 打开一个selector
selector acceptselector =
selectorprovider.provider().openselector();
// 创建一个serversocketchannel,这是一个selectablechannel的子类
serversocketchannel ssc = serversocketchannel.open();
// 将其设为non-blocking状态,这样才能进行异步io操作
ssc.configureblocking(false);
// 给serversocketchannel对应的socket绑定ip和端口
inetaddress lh = inetaddress.getlocalhost();
inetsocketaddress isa = new inetsocketaddress(lh, port);
ssc.socket().bind(isa);
// 将serversocketchannel注册到selector上,返回对应的selectionkey
selectionkey acceptkey =
ssc.register(acceptselector, selectionkey.op_accept);
int keysadded = 0;
// 用select()函数来监控注册在selector上的selectablechannel
// 返回值代表了有多少channel可以进行io操作 (ready for io)
while ((keysadded = acceptselector.select()) > 0) {
// selectedkeys()返回一个selectionkey的集合,
// 其中每个selectionkey代表了一个可以进行io操作的channel。
// 一个serversocketchannel可以进行io操作意味着有新的tcp连接连入了
set readykeys = acceptselector.selectedkeys();
iterator i = readykeys.iterator();
while (i.hasnext()) {
selectionkey sk = (selectionkey) i.next();
// 需要将处理过的key从selectedkeys这个集合中删除
i.remove();
// 从selectionkey得到对应的channel
serversocketchannel nextready =
(serversocketchannel) sk.channel();
// 接受新的tcp连接
socket s = nextready.accept().socket();
// 把当前的时间写到这个新的tcp连接中
printwriter out =
new printwriter(s.getoutputstream(), true);
date now = new date();
out.println(now);
// 关闭连接
out.close();
}
}
}
这是个纯粹用于演示的例子,因为只有一个serversocketchannel需要监控,所以其实并不真的需要使用到异步io。不过正因为它的简单,可以很容易地看清楚异步io是如何工作的。
selectablechannel
这个抽象类是所有支持异步io操作的channel(如datagramchannel、socketchannel)的父类。selectablechannel可以注册到一个或多个selector上以进行异步io操作。
selectablechannel可以是blocking和non-blocking模式(所有channel创建的时候都是blocking模式),只有non-blocking的selectablechannel才可以参与异步io操作。
selectablechannel configureblocking(boolean block)
设置blocking模式。
boolean isblocking()
返回blocking模式。
通过register()方法,selectablechannel可以注册到selector上。
int validops()
返回一个bit mask,表示这个channel上支持的io操作。当前在selectionkey中,用静态常量定义了4种io操作的bit值:op_accept,op_connect,op_read和op_write。
selectionkey register(selector sel, int ops)
将当前channel注册到一个selector上并返回对应的selectionkey。在这以后,通过调用selector的select()函数就可以监控这个channel。ops这个参数是一个bit mask,代表了需要监控的io操作。
selectionkey register(selector sel, int ops, object att)
这个函数和上一个的意义一样,多出来的att参数会作为attachment被存放在返回的selectionkey中,这在需要存放一些session state的时候非常有用。
boolean isregistered()
该channel是否已注册在一个或多个selector上。
selectablechannel还提供了得到对应selectionkey的方法:
selectionkey keyfor(selector sel)
返回该channe在selector上的注册关系所对应的selectionkey。若无注册关系,返回null。
selector
selector可以同时监控多个selectablechannel的io状况,是异步io的核心。
selector open()
selector的一个静态方法,用于创建实例。
在一个selector中,有3个selectionkey的集合:
1. key set代表了所有注册在这个selector上的channel,这个集合可以通过keys()方法拿到。
2. selected-key set代表了所有通过select()方法监测到可以进行io操作的channel,这个集合可以通过selectedkeys()拿到。
3. cancelled-key set代表了已经cancel了注册关系的channel,在下一个select()操作中,这些channel对应的selectionkey会从key set和cancelled-key set中移走。这个集合无法直接访问。
以下是select()相关方法的说明:
int select()
监控所有注册的channel,当其中有注册的io操作可以进行时,该函数返回,并将对应的selectionkey加入selected-key set。
int select(long timeout)
可以设置超时的select()操作。
int selectnow()
进行一个立即返回的select()操作。
selector wakeup()
使一个还未返回的select()操作立刻返回。
selectionkey
代表了selector和selectablechannel的注册关系。
selector定义了4个静态常量来表示4种io操作,这些常量可以进行位操作组合成一个bit mask。
int op_accept
有新的网络连接可以accept,serversocketchannel支持这一异步io。
int op_connect
代表连接已经建立(或出错),socketchannel支持这一异步io。
int op_read
int op_write
代表了读、写操作。
以下是其主要方法:
object attachment()
返回selectionkey的attachment,attachment可以在注册channel的时候指定。
object attach(object ob)
设置selectionkey的attachment。
selectablechannel channel()
返回该selectionkey对应的channel。
selector selector()
返回该selectionkey对应的selector。
void cancel()
cancel这个selectionkey所对应的注册关系。
int interestops()
返回代表需要selector监控的io操作的bit mask。
selectionkey interestops(int ops)
设置interestops。
int readyops()
返回一个bit mask,代表在相应channel上可以进行的io操作。
serversocketchannel
支持异步操作,对应于java.net.serversocket这个类,提供了tcp协议io接口,支持op_accept操作。
serversocket socket()
返回对应的serversocket对象。
socketchannel accept()
接受一个连接,返回代表这个连接的socketchannel对象。
socketchannel
支持异步操作,对应于java.net.socket这个类,提供了tcp协议io接口,支持op_connect,op_read和op_write操作。这个类还实现了bytechannel,scatteringbytechannel和gatheringbytechannel接口。
datagramchannel和这个类比较相似,其对应于java.net.datagramsocket,提供了udp协议io接口。
socket socket()
返回对应的socket对象。
boolean connect(socketaddress remote)
boolean finishconnect()
connect()进行一个连接操作。如果当前socketchannel是blocking模式,这个函数会等到连接操作完成或错误发生才返回。如果当前socketchannel是non-blocking模式,函数在连接能立刻被建立时返回true,否则函数返回false,应用程序需要在以后用finishconnect()方法来完成连接操作。
pipe
包含了一个读和一个写的channel(pipe.sourcechannel和pipe.sinkchannel),这对channel可以用于进程中的通讯。
filechannel
用于对文件的读、写、映射、锁定等操作。和映射操作相关的类有filechannel.mapmode,和锁定操作相关的类有filelock。值得注意的是filechannel并不支持异步操作。
channels
这个类提供了一系列static方法来支持stream类和channel类之间的互操作。这些方法可以将channel类包装为stream类,比如,将readablebytechannel包装为inputstream或reader;也可以将stream类包装为channel类,比如,将outputstream包装为writablebytechannel。
package java.nio.charset
这个包定义了charset及相应的encoder和decoder。下面这张uml类图描述了这个包中类的关系,可以将其中charset,charsetdecoder和charsetencoder理解成一个abstract factory模式的实现:
charset
代表了一个字符集,同时提供了factory method来构建相应的charsetdecoder和charsetencoder。
charset提供了以下static的方法:
sortedmap availablecharsets()
返回当前系统支持的所有charset对象,用charset的名字作为set的key。
boolean issupported(string charsetname)
判断该名字对应的字符集是否被当前系统支持。
charset forname(string charsetname)
返回该名字对应的charset对象。
charset中比较重要的方法有:
string name()
返回该字符集的规范名。
set aliases()
返回该字符集的所有别名。
charsetdecoder newdecoder()
创建一个对应于这个charset的decoder。
charsetencoder newencoder()
创建一个对应于这个charset的encoder。
charsetdecoder
将按某种字符集编码的字节流解码为unicode字符数据的引擎。
charsetdecoder的输入是bytebuffer,输出是charbuffer。进行decode操作时一般按如下步骤进行:
1. 调用charsetdecoder的reset()方法。(第一次使用时可不调用)
2. 调用decode()方法0到n次,将endofinput参数设为false,告诉decoder有可能还有新的数据送入。
3. 调用decode()方法最后一次,将endofinput参数设为true,告诉decoder所有数据都已经送入。
4. 调用decoder的flush()方法。让decoder有机会把一些内部状态写到输出的charbuffer中。
charsetdecoder reset()
重置decoder,并清除decoder中的一些内部状态。
coderresult decode(bytebuffer in, charbuffer out, boolean endofinput)
从bytebuffer类型的输入中decode尽可能多的字节,并将结果写到charbuffer类型的输出中。根据decode的结果,可能返回3种coderresult:coderresult.underflow表示已经没有输入可以decode;coderresult.overflow表示输出已满;其它的coderresult表示decode过程中有错误发生。根据返回的结果,应用程序可以采取相应的措施,比如,增加输入,清除输出等等,然后再次调用decode()方法。
coderresult flush(charbuffer out)
有些decoder会在decode的过程中保留一些内部状态,调用这个方法让这些decoder有机会将这些内部状态写到输出的charbuffer中。调用成功返回coderresult.underflow。如果输出的空间不够,该函数返回coderresult.overflow,这时应用程序应该扩大输出charbuffer的空间,然后再次调用该方法。
charbuffer decode(bytebuffer in)
一个便捷的方法把bytebuffer中的内容decode到一个新创建的charbuffer中。在这个方法中包括了前面提到的4个步骤,所以不能和前3个函数一起使用。
decode过程中的错误有两种:malformed-input coderresult表示输入中数据有误;unmappable-character coderresult表示输入中有数据无法被解码成unicode的字符。如何处理decode过程中的错误取决于decoder的设置。对于这两种错误,decoder可以通过codingerroraction设置成:
1. 忽略错误
2. 报告错误。(这会导致错误发生时,decode()方法返回一个表示该错误的coderresult。)
3. 替换错误,用decoder中的替换字串替换掉有错误的部分。
codingerroraction malformedinputaction()
返回malformed-input的出错处理。
charsetdecoder onmalformedinput(codingerroraction newaction)
设置malformed-input的出错处理。
codingerroraction unmappablecharacteraction()
返回unmappable-character的出错处理。
charsetdecoder onunmappablecharacter(codingerroraction newaction)
设置unmappable-character的出错处理。
string replacement()
返回decoder的替换字串。
charsetdecoder replacewith(string newreplacement)
设置decoder的替换字串。
charsetencoder
将unicode字符数据编码为特定字符集的字节流的引擎。其接口和charsetdecoder相类似。
coderresult
描述encode/decode操作结果的类。
coderesult包含两个static成员:
coderresult overflow
表示输出已满
coderresult underflow
表示输入已无数据可用。
其主要的成员函数有:
boolean iserror()
boolean ismalformed()
boolean isunmappable()
boolean isoverflow()
boolean isunderflow()
用于判断该coderresult描述的错误。
int length()
返回错误的长度,比如,无法被转换成unicode的字节长度。
void throwexception()
抛出一个和这个coderresult相对应的exception。
codingerroraction
表示encoder/decoder中错误处理方法的类。可将其看成一个enum类型。有以下static属性:
codingerroraction ignore
忽略错误。
codingerroraction replace
用替换字串替换有错误的部分。
codingerroraction report
报告错误,对于不同的函数,有可能是返回一个和错误有关的coderresult,也有可能是抛出一个charactercodingexception。
参考文献
david flanagan – java in a nutshell
