欢迎光临
我们一直在努力

Java数据报之失序和丢包-JSP教程,Java技巧及代码

建站超值云服务器,限时71元/月

 

习惯了tcp编程,认为udp可以包办这些问题是错误的。一个udp应用程序要承担可靠性方面的全部工作,包括报文的丢失、重复、时延、乱序以及连接失效等问题。

通常我们在可靠性好,传输时延小的局域网上开发测试,一些问题不容易暴露,但在大型互联网上却会出现错误。

udp协议把递送的可靠性责任推到了上层即应用层,下面简单编写了几个类来专门处理两个问题:乱序和丢包。

四个类:datapacket 类,packetheader类,packetbody类 ,dataentry类,位于同一个文件datapacket .java中。

datapacket 类相当于一个门面模式,提供给外部使用,通信数据也在这个类中处理。

package com.skysoft.pcks;

import java.io.*;
import java.net.*;
import java.util.*;

public class  datapacket {
  inputstream is;
  outputstream os;
  packetheader header;
  packetbody body;
  arraylist al;
  public static final int dataswapsize = 64532;

  /**
   * 在接收数据报使用
   */
  public datapacket() {
    header = new packetheader();
    body = new packetbody();
    al = new arraylist();
  }
  /**
   * 在发送数据报时使用,它调用报文分割操作.
   * @param file string  硬盘文件
   */
  public datapacket(string file) {
    this();
    try {
      is = new fileinputstream(file);
      header.calcheaderinfo(is.available());
      this.madebody();
      is.close();
      //this.gereratedata();
    }
    catch (filenotfoundexception ex) {
      ex.printstacktrace();
    }
    catch (ioexception ex1) {
      ex1.printstacktrace();
    }
  }
  /**
   * 在发送数据报时使用,它调用报文分割操作.
   * @param url url url地址
   */
  public datapacket(url url) {
    this();
    try {
      //is = url.openstream();
      urlconnection conn=url.openconnection();
      is=conn.getinputstream();
      int total=conn.getcontentlength();
      header.calcheaderinfo(total);
      this.madebody();
      //system.out.println(total+”:”+total);
      is.close();
    }
    catch (ioexception ex) {
      ex.printstacktrace();
    }
  }
  /**
   * 为发送构造分组,使用packageheader处理了报头格式,并为分组编序号.
   */
  private void madebody() {
    al.clear();
    byte[] buffer;
    dataentry de;
    for (int i = 0; i < header.fragmentcounter; i++) {
      try {
        bytearrayoutputstream bos = new bytearrayoutputstream();
        //is.skip(i * body.body_buffer_size);
        header.arragesort(i);
        de = new dataentry(packetbody.body_buffer_size);
        de.setsn(i);
        de.setstreamsize(header.getstreamsize());
        de.setfragmentcounter(header.getfragmentcounter());
        if (header.iswtailfragment(i)) {
          buffer = new byte[header.getminfragment()];
          is.read(buffer, 0, buffer.length);
          header.setactbytesize(header.getminfragment());
          de.setactbytesize(header.getminfragment());
        }
        else {
          buffer = new byte[body.body_buffer_size];
          is.read(buffer, 0, buffer.length);
        }
        //system.out.println(“length——-“+i+” “+body.getbody().length+” “+header.getminfragment());
        body.setbody(buffer);
        //system.out.println(“length:” + i + ” ” + header.tostring());
        bos.write(header.getbyte(), 0, header.header_buffer_size);
        bos.write(body.getbody(), 0, body.getbody().length);
        de.setbytes(bos.tobytearray());
        al.add(de);
      }
      catch (ioexception ex) {
        ex.printstacktrace();
      }
    }
  }
  /**
   * 为发送构造分组,没有考虑报头格式,也没有为分组编序号.
   */
  private void madebody1() {
    al.clear();
    for (int i = 0; i < header.fragmentcounter; i++) {
      try {
        if (header.iswtailfragment(i))
          is.read(body.getbody(), i * body.body_buffer_size,
                  header.getminfragment());
        else
          is.read(body.getbody(), i * body.body_buffer_size,
                  body.body_buffer_size);
        bytearrayoutputstream bos = new bytearrayoutputstream();
        bos.write(header.getbyte(), 0, header.header_buffer_size);
        bos.write(body.getbody(), header.header_buffer_size,
                  body.getbody().length);
        al.add(bos);
      }
      catch (ioexception ex) {
        ex.printstacktrace();
      }
    }
  }
  /**
   * 在接收到报文后,对此报文执行组装,并处理报文丢失和乱序情况.
   * @param b1 byte[]
   */
  public void add(byte[] b1) {
    byte[] buffer = (byte[]) b1.clone();
    handlertext(buffer);
    dataentry de = new dataentry(buffer, header.getactbytesize());
    de.setsn(header.getsn());
    de.setstreamsize(header.getstreamsize());
    de.setfragmentcounter(header.getfragmentcounter());
    al.add(de);
  }
  private void handlertext(byte[] buffer) {
    bytearrayoutputstream baos = new bytearrayoutputstream();
    baos.write(buffer, 0, header.header_buffer_size);
    byte[] b=new byte[header.header_buffer_size];
    system.arraycopy(buffer,0,b,0,b.length);
    bytearrayinputstream bais = new bytearrayinputstream(baos.tobytearray());
    inputstreamreader isr = new inputstreamreader(bais);
    bufferedreader br = new bufferedreader(isr);
    try {
      header = new packetheader(br.readline());
    }
    catch (exception ex) {
      ex.printstacktrace();
    }
  }
 
  private string calfilesize(int size) {
    return size / 1024 + “k”;
  }

  public arraylist getdatapackets() {
    return al;
  }
/**
 * 是否接收完毕,通过序号是否等于最大段数来判断,这也许有问题,比如,正好是最后一个段丢失了,这样
 * 这个包整个就丢失了.
 * @return
 */
  public boolean isfull() {
    return this.header.getsn() == this.header.getfragmentcounter() – 1 ? true : false;
  }
/**
 * 判断是否只有一个段.
 * @return
 */
  public boolean iszero() {
    return this.header.getsn() == 0 ? true : false;
  }
/**
 * 该函数执行报文组装,不考虑丢失的报文.
 * @return
 */
  private bytearrayoutputstream fetchdatapackets() {
    bytearrayoutputstream bos = new bytearrayoutputstream();
    byte[] buffer = null;
    dataentry de;
    for (int i = 0; i < al.size(); i++) {
      try {
        de = this.getsndata(i);
        buffer = de.getbyte();
        if (header.getstreamsize() == de.getstreamsize()) {
          bos.write(de.getbyte(), header.header_buffer_size, de.getactbytesize());
          system.out.println(de.tostring() + ” — fetchdatapackets”);
        }
      }
      catch (exception ex) {
        ex.printstacktrace();
      }
    }
    return bos;
  }

  /**
   * 该函数执行报文组装,对于丢失的报文,写入空报文.
   * @return bytearrayoutputstream
   */
  private bytearrayoutputstream fetchdatapackets_sn() {
    bytearrayoutputstream bos = new bytearrayoutputstream();
    byte[] buffer;
    dataentry de;
    for (int i = 0; i < header.getfragmentcounter(); i++) {
      try {
        de = this.getsndata(i);
        if (de == null) {
          de = seachdedata(i);
        }
        buffer = de.getbyte();
        //system.out.println(de.getsn() + “:” + i);
        //handlertext(buffer);
        //bos.write(buffer, header.header_buffer_size,
        //          buffer.length – header.header_buffer_size);
        if (header.getstreamsize() == de.getstreamsize()) {
          bos.write(de.getbyte(), header.header_buffer_size,
                    de.getactbytesize());
          //system.out.println(de.tostring());
        }
      }
      catch (exception ex) {
        ex.printstacktrace();
      }
    }
    return bos;
  }

  /**
   * 对缓冲的数据包进行排序处理,即按顺序提取同一帧的数据,如果没有找到该序号的帧,则返回空值.
   * @param sn int 要找的帧序号.
   * @return dataentry
   */
  private dataentry getsndata(int sn) {
    dataentry de = null;
    for (int i = 0; i < al.size(); i++) {
      de = (dataentry) al.get(i);
      if (header.getstreamsize() == de.getstreamsize()) {
        if (sn == de.getsn())
          break;
        else
          de = null;
      }
    }
    return de;
  }

  /**
   * 按序号开始向前或者是向后寻找最近的帧片段,日后可以增加请求重发功能,通过开一个通信连接.
   * @param sn int
   * @return dataentry
   */
  private dataentry seachdedata(int sn) {
    dataentry de = null;
    int initvalue, minvalue = 10000;
    dataentry back, fore = null;
    for (int i = 0; i < al.size(); i++) {
      de = (dataentry) al.get(i);
      if (header.getstreamsize() == de.getstreamsize()) {
        initvalue = math.abs(de.getsn() – sn);
        if (de.getfragmentcounter() != de.getsn() && initvalue < minvalue) {
          minvalue = initvalue;
          fore = de;
        }
      }
    }
    return fore;
  }

  /**
   * 除去最后一帧外,随机抽取一帧.
   * @return dataentry
   */
  private dataentry seachdedata() {
    dataentry de = null;
    for (int i = 0; i < al.size(); i++) {
      de = (dataentry) al.get(i);
      system.out.println(“sky ::::” + de.getfragmentcounter() + “:” + de.getsn() +
                         “:” + i);
      if (header.getstreamsize() == de.getstreamsize()) {
        if (de.getfragmentcounter() != de.getsn()) {
          break;
        }
      }
    }
    return de;
  }
  /**
   * 生成组装完的结果数据.因为用图像来做测试,所以令其返回图像.
   * @return image
   */
  public java.awt.image gereratedata() {
     bytearrayinputstream bis;
     java.awt.image.bufferedimage bimage = null;
     try {
       byte[] b = fetchdatapackets_sn().tobytearray();
       //fetchdatapackets_old1()
       bis = new bytearrayinputstream(b);
       bimage = javax.imageio.imageio.read(bis);

     }
     catch (exception ex1) {
       ex1.printstacktrace();
     }
     return bimage;
  }

  public static void main(string args[]) {
    datapacket dp = new datapacket(“e:\\nature\\14.jpg”);
  }
}
/**
 * 数据实体,充当临时处理场所.
 * @author administrator
 *
 */
class dataentry {
  byte[] bytes;
  int fragmentcounter, sn, actbytesize;
  long streamsize;
  int minfragment;

  public dataentry() {

  }

  public dataentry(int size) {
    this.actbytesize = size;
  }

  public dataentry(byte[] b, int i) {
    this.bytes = b;
    this.actbytesize = i;
  }

  public byte[] getbyte() {
    return this.bytes;
  }

  public void setbytes(byte[] b) {
    this.bytes = b;
  }

  public void setstreamsize(long size) {
    this.streamsize = size;
  }

  public long getstreamsize() {
    return this.streamsize;
  }

  public int getminfragment() {
    return minfragment;
  }

  public synchronized void setsn(int i) {
    this.sn = i;
  }

  public synchronized int getsn() {
    return sn;
  }

  public synchronized int getfragmentcounter() {
    return fragmentcounter;
  }

  public synchronized void setfragmentcounter(int c) {
    this.fragmentcounter = c;
  }

  public void setactbytesize(int size) {
    actbytesize = size;
  }

  public int getactbytesize() {
    return actbytesize;
  }

  public string tostring() {
    return this.streamsize + “::” + this.fragmentcounter + “::” + this.sn +
        “::” + this.actbytesize + ” recv dataentry”;
  }
}
/**
 * 报头,处理报头格式
 * @author administrator
 *
 */
class packetheader implements serializable{
  public static final int header_buffer_size = 1024;
  int fragmentcounter, sn;
  int actbytesize = packetbody.body_buffer_size;
  byte[] header; //= new byte[header_buffer_size];
  long streamsize;
  int minfragment;

  public packetheader() {

  }

  public packetheader(long l) {
    this.setstreamsize(l);

  }

  public packetheader(string s) {
    string[] tm = s.split(“::”);
    this.setactbytesize(integer.parseint(tm[3]));
    this.setsn(integer.parseint(tm[2]));
    this.setfragmentcounter(integer.parseint(tm[1]));
    this.setstreamsize(long.parselong(tm[0]));
  }

  /**
   * 根据文件的段的顺序生成数据头.
   * @param sn 文件序列
   */
  public void arragesort(int sn) {
    this.setsn(sn);
    this.setbyte();
  }

  public void calcheaderinfo(long l) {
    this.setstreamsize(l);
    calcheaderinfo();
  }
  /**
   * 计算流要被分成的片段数量,并得出最小片段余量.
   */
  public void calcheaderinfo() {
    fragmentcounter = math.round( (float) streamsize /
                                 packetbody.body_buffer_size);
    float critical = (float) streamsize / packetbody.body_buffer_size;
    if (critical – fragmentcounter < 0.5 && critical – fragmentcounter > 0)
      fragmentcounter++;
    minfragment = (int) (streamsize % packetbody.body_buffer_size);
  }

  public byte[] getheader() {
    long it = new long(this.streamsize);
    return new byte[] {it.bytevalue()};
  }

  public byte[] getbyte() {
    return header; //this.tostring().getbytes();
  }
  /**
   * 生成报头字节,首先取得数据包头 流尺寸::段片数::段顺序::段实际尺寸 的字节形式,
   * 然后加入回车换行符号,对于1024字节中剩余的部分一律写入元素为0的字节数组.
   */
  public void setbyte() {
    bytearrayoutputstream bos = new bytearrayoutputstream();
    byte[] buffer = this.tobyte();
    try {
      bos.write(buffer);
      bos.write(“\r\n”.getbytes());
      bos.write(new byte[packetheader.header_buffer_size – buffer.length], 0,
                packetheader.header_buffer_size – buffer.length);
      header = bos.tobytearray();
    }
    catch (ioexception ex) {
      ex.printstacktrace();
    }
  }

  public void setstreamsize(long size) {
    this.streamsize = size;
  }

  public long getstreamsize() {
    return this.streamsize;
  }

  public int getminfragment() {
    return minfragment;
  }

  public synchronized void setsn(int i) {
    this.sn = i;
  }

  public int getsn() {
    return sn;
  }

  public int getfragmentcounter() {
    return fragmentcounter;
  }

  public synchronized void setfragmentcounter(int c) {
    this.fragmentcounter = c;
  }

  public void setactbytesize(int size) {
    actbytesize = size;
    setbyte();
  }

  public int getactbytesize() {
    return actbytesize;
  }
  /**
   * 数据包头的格式为:流尺寸::段片数::段顺序::段实际尺寸
   * 报头字节长度是可变化的,比如,可以加入流的具体信息如:流所属文件的名称,文件类型以及一些其他信息.
   * @return string
   */
  public string tostring() {
    return streamsize + “::” + this.fragmentcounter + “::” + this.getsn() +
        “::” + this.getactbytesize();
  }

  public byte[] tobyte() {
    return this.tostring().getbytes();
  }
  /**
   * 是否为尾段
   * @param i int
   * @return boolean
   */
  public boolean iswtailfragment(int i) {
    return (i == fragmentcounter – 1) ? true : false;
  }

}
/**
 * 用户数据区
 * @author administrator
 *
 */
class packetbody implements serializable{
  public static final int body_buffer_size = 63508; //65508
  byte[] body;

  public packetbody() {
  }

  public void setbody(byte[] b) {
    this.body = b;
  }

  public byte[] getbody() {
    return body;
  }
}

这个数据处理类,将在接下来使用。

赞(0)
版权申明:本站文章部分自网络,如有侵权,请联系:west999com@outlook.com 特别注意:本站所有转载文章言论不代表本站观点! 本站所提供的图片等素材,版权归原作者所有,如需使用,请与原作者联系。未经允许不得转载:IDC资讯中心 » Java数据报之失序和丢包-JSP教程,Java技巧及代码
分享到: 更多 (0)