习惯了 TCP 编程的人,容易误以为 UDP 也会替应用程序包办可靠性问题,这是错误的。一个 UDP 应用程序要自行承担可靠性方面的全部工作,包括处理报文的丢失、重复、时延、乱序以及连接失效等问题。
通常我们在可靠性好,传输时延小的局域网上开发测试,一些问题不容易暴露,但在大型互联网上却会出现错误。
UDP 协议把递送的可靠性责任推给了上层,即应用层。下面简单编写了几个类,专门处理其中的两个问题:乱序和丢包。
共四个类:datapacket 类、packetheader 类、packetbody 类和 dataentry 类,位于同一个文件 datapacket.java 中。
datapacket 类相当于门面(Facade)模式中的门面,提供给外部使用,通信数据也在这个类中处理。
package com.skysoft.pcks;
import java.io.*;
import java.net.*;
import java.util.*;
public class datapacket {
inputstream is;
outputstream os;
packetheader header;
packetbody body;
arraylist al;
public static final int dataswapsize = 64532;
/**
* 在接收数据报使用
*/
public datapacket() {
header = new packetheader();
body = new packetbody();
al = new arraylist();
}
/**
* 在发送数据报时使用,它调用报文分割操作.
* @param file string 硬盘文件
*/
public datapacket(string file) {
this();
try {
is = new fileinputstream(file);
header.calcheaderinfo(is.available());
this.madebody();
is.close();
//this.gereratedata();
}
catch (filenotfoundexception ex) {
ex.printstacktrace();
}
catch (ioexception ex1) {
ex1.printstacktrace();
}
}
/**
* 在发送数据报时使用,它调用报文分割操作.
* @param url url url地址
*/
public datapacket(url url) {
this();
try {
//is = url.openstream();
urlconnection conn=url.openconnection();
is=conn.getinputstream();
int total=conn.getcontentlength();
header.calcheaderinfo(total);
this.madebody();
//system.out.println(total+”:”+total);
is.close();
}
catch (ioexception ex) {
ex.printstacktrace();
}
}
/**
* 为发送构造分组,使用packageheader处理了报头格式,并为分组编序号.
*/
private void madebody() {
al.clear();
byte[] buffer;
dataentry de;
for (int i = 0; i < header.fragmentcounter; i++) {
try {
bytearrayoutputstream bos = new bytearrayoutputstream();
//is.skip(i * body.body_buffer_size);
header.arragesort(i);
de = new dataentry(packetbody.body_buffer_size);
de.setsn(i);
de.setstreamsize(header.getstreamsize());
de.setfragmentcounter(header.getfragmentcounter());
if (header.iswtailfragment(i)) {
buffer = new byte[header.getminfragment()];
is.read(buffer, 0, buffer.length);
header.setactbytesize(header.getminfragment());
de.setactbytesize(header.getminfragment());
}
else {
buffer = new byte[body.body_buffer_size];
is.read(buffer, 0, buffer.length);
}
//system.out.println(“length——-“+i+” “+body.getbody().length+” “+header.getminfragment());
body.setbody(buffer);
//system.out.println(“length:” + i + ” ” + header.tostring());
bos.write(header.getbyte(), 0, header.header_buffer_size);
bos.write(body.getbody(), 0, body.getbody().length);
de.setbytes(bos.tobytearray());
al.add(de);
}
catch (ioexception ex) {
ex.printstacktrace();
}
}
}
/**
* 为发送构造分组,没有考虑报头格式,也没有为分组编序号.
*/
private void madebody1() {
al.clear();
for (int i = 0; i < header.fragmentcounter; i++) {
try {
if (header.iswtailfragment(i))
is.read(body.getbody(), i * body.body_buffer_size,
header.getminfragment());
else
is.read(body.getbody(), i * body.body_buffer_size,
body.body_buffer_size);
bytearrayoutputstream bos = new bytearrayoutputstream();
bos.write(header.getbyte(), 0, header.header_buffer_size);
bos.write(body.getbody(), header.header_buffer_size,
body.getbody().length);
al.add(bos);
}
catch (ioexception ex) {
ex.printstacktrace();
}
}
}
/**
* 在接收到报文后,对此报文执行组装,并处理报文丢失和乱序情况.
* @param b1 byte[]
*/
public void add(byte[] b1) {
byte[] buffer = (byte[]) b1.clone();
handlertext(buffer);
dataentry de = new dataentry(buffer, header.getactbytesize());
de.setsn(header.getsn());
de.setstreamsize(header.getstreamsize());
de.setfragmentcounter(header.getfragmentcounter());
al.add(de);
}
private void handlertext(byte[] buffer) {
bytearrayoutputstream baos = new bytearrayoutputstream();
baos.write(buffer, 0, header.header_buffer_size);
byte[] b=new byte[header.header_buffer_size];
system.arraycopy(buffer,0,b,0,b.length);
bytearrayinputstream bais = new bytearrayinputstream(baos.tobytearray());
inputstreamreader isr = new inputstreamreader(bais);
bufferedreader br = new bufferedreader(isr);
try {
header = new packetheader(br.readline());
}
catch (exception ex) {
ex.printstacktrace();
}
}
private string calfilesize(int size) {
return size / 1024 + “k”;
}
public arraylist getdatapackets() {
return al;
}
/**
* 是否接收完毕,通过序号是否等于最大段数来判断,这也许有问题,比如,正好是最后一个段丢失了,这样
* 这个包整个就丢失了.
* @return
*/
public boolean isfull() {
return this.header.getsn() == this.header.getfragmentcounter() – 1 ? true : false;
}
/**
* 判断是否只有一个段.
* @return
*/
public boolean iszero() {
return this.header.getsn() == 0 ? true : false;
}
/**
* 该函数执行报文组装,不考虑丢失的报文.
* @return
*/
private bytearrayoutputstream fetchdatapackets() {
bytearrayoutputstream bos = new bytearrayoutputstream();
byte[] buffer = null;
dataentry de;
for (int i = 0; i < al.size(); i++) {
try {
de = this.getsndata(i);
buffer = de.getbyte();
if (header.getstreamsize() == de.getstreamsize()) {
bos.write(de.getbyte(), header.header_buffer_size, de.getactbytesize());
system.out.println(de.tostring() + ” — fetchdatapackets”);
}
}
catch (exception ex) {
ex.printstacktrace();
}
}
return bos;
}
/**
* 该函数执行报文组装,对于丢失的报文,写入空报文.
* @return bytearrayoutputstream
*/
private bytearrayoutputstream fetchdatapackets_sn() {
bytearrayoutputstream bos = new bytearrayoutputstream();
byte[] buffer;
dataentry de;
for (int i = 0; i < header.getfragmentcounter(); i++) {
try {
de = this.getsndata(i);
if (de == null) {
de = seachdedata(i);
}
buffer = de.getbyte();
//system.out.println(de.getsn() + “:” + i);
//handlertext(buffer);
//bos.write(buffer, header.header_buffer_size,
// buffer.length – header.header_buffer_size);
if (header.getstreamsize() == de.getstreamsize()) {
bos.write(de.getbyte(), header.header_buffer_size,
de.getactbytesize());
//system.out.println(de.tostring());
}
}
catch (exception ex) {
ex.printstacktrace();
}
}
return bos;
}
/**
* 对缓冲的数据包进行排序处理,即按顺序提取同一帧的数据,如果没有找到该序号的帧,则返回空值.
* @param sn int 要找的帧序号.
* @return dataentry
*/
private dataentry getsndata(int sn) {
dataentry de = null;
for (int i = 0; i < al.size(); i++) {
de = (dataentry) al.get(i);
if (header.getstreamsize() == de.getstreamsize()) {
if (sn == de.getsn())
break;
else
de = null;
}
}
return de;
}
/**
* 按序号开始向前或者是向后寻找最近的帧片段,日后可以增加请求重发功能,通过开一个通信连接.
* @param sn int
* @return dataentry
*/
private dataentry seachdedata(int sn) {
dataentry de = null;
int initvalue, minvalue = 10000;
dataentry back, fore = null;
for (int i = 0; i < al.size(); i++) {
de = (dataentry) al.get(i);
if (header.getstreamsize() == de.getstreamsize()) {
initvalue = math.abs(de.getsn() – sn);
if (de.getfragmentcounter() != de.getsn() && initvalue < minvalue) {
minvalue = initvalue;
fore = de;
}
}
}
return fore;
}
/**
* 除去最后一帧外,随机抽取一帧.
* @return dataentry
*/
private dataentry seachdedata() {
dataentry de = null;
for (int i = 0; i < al.size(); i++) {
de = (dataentry) al.get(i);
system.out.println(“sky ::::” + de.getfragmentcounter() + “:” + de.getsn() +
“:” + i);
if (header.getstreamsize() == de.getstreamsize()) {
if (de.getfragmentcounter() != de.getsn()) {
break;
}
}
}
return de;
}
/**
* 生成组装完的结果数据.因为用图像来做测试,所以令其返回图像.
* @return image
*/
public java.awt.image gereratedata() {
bytearrayinputstream bis;
java.awt.image.bufferedimage bimage = null;
try {
byte[] b = fetchdatapackets_sn().tobytearray();
//fetchdatapackets_old1()
bis = new bytearrayinputstream(b);
bimage = javax.imageio.imageio.read(bis);
}
catch (exception ex1) {
ex1.printstacktrace();
}
return bimage;
}
public static void main(string args[]) {
datapacket dp = new datapacket(“e:\\nature\\14.jpg”);
}
}
/**
 * Data entity: a temporary holder for one fragment's raw bytes together with the header
 * values that describe it (stream size, fragment count, sequence number, payload length).
 */
class dataentry {
  /** Raw fragment bytes as stored by the creator of this entry. */
  byte[] bytes;
  int fragmentcounter, sn, actbytesize;
  long streamsize;
  /** Never assigned in this class, so {@link #getminfragment()} returns 0. */
  int minfragment;

  public dataentry() {
  }

  /**
   * @param size payload length of the fragment
   */
  public dataentry(int size) {
    this.actbytesize = size;
  }

  /**
   * @param b raw fragment bytes (stored by reference, not copied)
   * @param i payload length of the fragment
   */
  public dataentry(byte[] b, int i) {
    this.bytes = b;
    this.actbytesize = i;
  }

  /**
   * @return the stored byte array itself, or {@code null} when none was set
   */
  public byte[] getbyte() {
    return this.bytes;
  }

  public void setbytes(byte[] b) {
    this.bytes = b;
  }

  public void setstreamsize(long size) {
    this.streamsize = size;
  }

  public long getstreamsize() {
    return this.streamsize;
  }

  public int getminfragment() {
    return minfragment;
  }

  public synchronized void setsn(int i) {
    this.sn = i;
  }

  public synchronized int getsn() {
    return sn;
  }

  public synchronized int getfragmentcounter() {
    return fragmentcounter;
  }

  public synchronized void setfragmentcounter(int c) {
    this.fragmentcounter = c;
  }

  public void setactbytesize(int size) {
    actbytesize = size;
  }

  public int getactbytesize() {
    return actbytesize;
  }

  /**
   * @return {@code streamsize::fragmentcounter::sn::actbytesize} followed by
   *         {@code " recv dataentry"}
   */
  public String tostring() {
    return this.streamsize + "::" + this.fragmentcounter + "::" + this.sn
        + "::" + this.actbytesize + " recv dataentry";
  }

  /**
   * Same text as {@link #tostring()}, so the entry also prints usefully through the
   * standard {@code Object} API.
   */
  public String toString() {
    return tostring();
  }
}
/**
* 报头,处理报头格式
* @author administrator
*
*/
class packetheader implements serializable{
public static final int header_buffer_size = 1024;
int fragmentcounter, sn;
int actbytesize = packetbody.body_buffer_size;
byte[] header; //= new byte[header_buffer_size];
long streamsize;
int minfragment;
public packetheader() {
}
public packetheader(long l) {
this.setstreamsize(l);
}
public packetheader(string s) {
string[] tm = s.split(“::”);
this.setactbytesize(integer.parseint(tm[3]));
this.setsn(integer.parseint(tm[2]));
this.setfragmentcounter(integer.parseint(tm[1]));
this.setstreamsize(long.parselong(tm[0]));
}
/**
* 根据文件的段的顺序生成数据头.
* @param sn 文件序列
*/
public void arragesort(int sn) {
this.setsn(sn);
this.setbyte();
}
public void calcheaderinfo(long l) {
this.setstreamsize(l);
calcheaderinfo();
}
/**
* 计算流要被分成的片段数量,并得出最小片段余量.
*/
public void calcheaderinfo() {
fragmentcounter = math.round( (float) streamsize /
packetbody.body_buffer_size);
float critical = (float) streamsize / packetbody.body_buffer_size;
if (critical – fragmentcounter < 0.5 && critical – fragmentcounter > 0)
fragmentcounter++;
minfragment = (int) (streamsize % packetbody.body_buffer_size);
}
public byte[] getheader() {
long it = new long(this.streamsize);
return new byte[] {it.bytevalue()};
}
public byte[] getbyte() {
return header; //this.tostring().getbytes();
}
/**
* 生成报头字节,首先取得数据包头 流尺寸::段片数::段顺序::段实际尺寸 的字节形式,
* 然后加入回车换行符号,对于1024字节中剩余的部分一律写入元素为0的字节数组.
*/
public void setbyte() {
bytearrayoutputstream bos = new bytearrayoutputstream();
byte[] buffer = this.tobyte();
try {
bos.write(buffer);
bos.write(“\r\n”.getbytes());
bos.write(new byte[packetheader.header_buffer_size – buffer.length], 0,
packetheader.header_buffer_size – buffer.length);
header = bos.tobytearray();
}
catch (ioexception ex) {
ex.printstacktrace();
}
}
public void setstreamsize(long size) {
this.streamsize = size;
}
public long getstreamsize() {
return this.streamsize;
}
public int getminfragment() {
return minfragment;
}
public synchronized void setsn(int i) {
this.sn = i;
}
public int getsn() {
return sn;
}
public int getfragmentcounter() {
return fragmentcounter;
}
public synchronized void setfragmentcounter(int c) {
this.fragmentcounter = c;
}
public void setactbytesize(int size) {
actbytesize = size;
setbyte();
}
public int getactbytesize() {
return actbytesize;
}
/**
* 数据包头的格式为:流尺寸::段片数::段顺序::段实际尺寸
* 报头字节长度是可变化的,比如,可以加入流的具体信息如:流所属文件的名称,文件类型以及一些其他信息.
* @return string
*/
public string tostring() {
return streamsize + “::” + this.fragmentcounter + “::” + this.getsn() +
“::” + this.getactbytesize();
}
public byte[] tobyte() {
return this.tostring().getbytes();
}
/**
* 是否为尾段
* @param i int
* @return boolean
*/
public boolean iswtailfragment(int i) {
return (i == fragmentcounter – 1) ? true : false;
}
}
/**
 * User-data area of a packet: a plain holder for the payload byte array.
 */
class packetbody implements Serializable {
  /** Payload bytes carried by a full fragment. (The original source notes 65508 beside this value.) */
  public static final int body_buffer_size = 63508; //65508
  /** Current payload; {@code null} until {@link #setbody(byte[])} is called. */
  byte[] body;

  public packetbody() {
  }

  /**
   * @param b payload bytes (stored by reference, not copied)
   */
  public void setbody(byte[] b) {
    this.body = b;
  }

  /**
   * @return the stored payload array itself, or {@code null} when none was set
   */
  public byte[] getbody() {
    return body;
  }
}
这个数据处理类,将在接下来使用。