欢迎光临
我们一直在努力

Java网络服务器编程(NIO版)-JSP教程,Java技巧及代码

建站超值云服务器,限时71元/月

从java 1.4开始提供的nio api常用于开发高性能网络服务器,本文演示了如何用这个api开发一个tcp echo server。

 

java网络服务器编程 一文演示了如何使用java的socket api编写一个简单的tcp echo server。其阻塞式io的处理方式虽然简单,但每个客户端都需要一个单独的thread来处理,当服务器需要同时处理大量客户端时,这种做法不再可行。使用nio api可以让一个或有限的几个thread同时处理连接到服务器上的所有客户端。(关于nio api的一些介绍,可以在java nio api详解一文中找到。)

 

nio api允许一个线程通过selector对象同时监控多个selectablechannel来处理多路io,nio应用程序一般按下图所示工作:

figure 1

 

如figure 1 所示,client一直在循环地进行select操作,每次select()返回以后,通过selectedkeys()可以得到需要处理的selectablechannel并对其一一处理。

 

这样做虽然简单但也有个问题,当有不同类型的selectablechannel需要做不同的io处理时,在图中client的代码就需要判断channel的类型然后再作相应的操作,这往往意味着一连串的if else。更糟糕的是,每增加一种新的channel,不但需要增加相应的处理代码,还需要对这一串if else进行维护。(在本文的这个例子中,我们有serversocketchannel和socketchannel这两种channel需要分别被处理。)

 

如果考虑将channel及其需要的io处理进行封装,抽象出一个统一的接口,就可以解决这一问题。在listing 1中的niosession就是这个接口。

 

niosession的channel()方法返回其封装的selectablechannel对象,interestops()返回用于这个channel注册的interestops。registered()是当selectablechannel被注册后调用的回调函数,通过这个回调函数,niosession可以得到channel注册后的selectionkey。process()函数则是niosession接口的核心,这个方法抽象了封装的selectablechannel所需的io处理逻辑。

 

listing 1:

public interface niosession {

 

    public selectablechannel channel();

   

    public int interestops();

   

    public void registered(selectionkey key);

   

    public void process();  

}

 

和niosession一起工作的是nioworker这个类(listing 2),它是niosession的调用者,封装了一个selector对象和figure 1中循环select操作的逻辑。理解这个类可以帮助我们了解该如何使用niosession这个接口。

 

nioworker实现了runnable接口,循环select操作的逻辑就在run()方法中。在nioworker – niosession这个框架中,niosession在channel注册的时候会被作为attachment送入register函数,这样,在每次select()操作的循环中,对于selectedkeys()中的每一个selectionkey,我们都可以通过attachment拿到其相对应的niosession然后调用其process()方法。

 

每次select()循环还有一个任务,就是将通过add()方法加入到这个nioworker的niosession注册到selector上。在listing 2的代码中可以看出,niosession中的channel()被取出并注册在selector上,注册所需的interestops从niosession中取出,niosession本身则作为attachment送入register()函数。注册成功后,niosession的registered()回调函数会被调用。

 

nioworker的add()方法的作用是将一个niosession加入到该nioworker中,并wakeup当前的select操作,这样在下一次的select()调用之前,这个niosession会被注册。stop()方法则是让一个正在run()的nioworker停止。closeallchannels()会关闭当前注册的所有channel,这个方法可在nioworker不再使用时用来释放io资源。

 

listing 2:

public class nioworker implements runnable {

   

    public nioworker(selector sel) {

       _sel = sel;

       _added = new hashset();

    }

   

    public void run() {

       try {

           try {

             

              while (_run) {

                  _sel.select();

                  set selected = _sel.selectedkeys();

                  for (iterator itr = selected.iterator(); itr.hasnext();) {

                     selectionkey key = (selectionkey) itr.next();

                     niosession s = (niosession) key.attachment();

                     s.process();

                     itr.remove();

                  }

                 

                  synchronized (_added) {

                     for (iterator itr = _added.iterator(); itr.hasnext();) {

                         niosession s = (niosession) itr.next();

                         selectionkey key = s.channel().register(_sel, s.interestops(), s);

                         s.registered(key);

                         itr.remove();

                     }

                  }

              }

             

           } finally {

              _sel.close();

           }

       } catch (ioexception ex) {

           throw new error(ex);

       }

    }

   

    public void add(niosession s) {

       synchronized (_added) {

           _added.add(s);

       }

       _sel.wakeup();

    }

   

    public synchronized void stop() {

       _run = false;

       _sel.wakeup();

    }

   

    public void closeallchannels() {

       for (iterator itr = _sel.keys().iterator(); itr.hasnext();) {

           selectionkey key = (selectionkey) itr.next();

           try {        

              key.channel().close();

           } catch (ioexception ex) {}

       }

    }

   

    protected selector _sel = null;

    protected collection _added = null;

    protected volatile boolean _run = true;

}

 

在echo server这个例子中,我们需要一个serversocketchannel来接受新的tcp连接,对于每个tcp连接,我们还需要一个socketchannel来处理这个tcp连接上的io操作。把这两种channel和上面的nioworker – niosession结构整合在一起,可以得到nioserversession和nioechosession这两个类,它们分别封装了serversocketchannel和socketchannel及其对应的io操作。下面这个uml类图描述了这4个类的关系:

figure 2

 

可以看到nioworker和niosession对新加入的两个类没有任何依赖性,nioserversession和nioechosession通过实现niosession这个接口为系统加入了新的功能。这样的一个体系架构符合了open-close原则,新的功能可以通过实现niosession被加入而无需对原有的模块进行修改,这体现了面向对象设计的强大威力。

 

nioserversession的实现(listing 3)相对比较简单,其封装了一个serversocketchannel以及从这个channel上接受新的tcp连接的逻辑。nioserversession还需要一个nioworker的引用,这样每接受一个新的tcp连接,nioserversession就为其创建一个nioechosession的对象,并将这个对象加入到nioworker中。

 

listing 3:

public class nioserversession implements niosession {

   

    public nioserversession(serversocketchannel channel, nioworker worker) {

       _channel = channel;

       _worker = worker;

    }

   

    public void registered(selectionkey key) {}

   

    public void process() {

       try {

           socketchannel c = _channel.accept();

           if (c != null) {

              c.configureblocking(false);

              nioechosession s = new nioechosession(c);

              _worker.add(s);

           }

       } catch (ioexception ex) {

           throw new error(ex);

       }

    }

   

    public selectablechannel channel() {

       return _channel;

    }

   

    public int interestops(){

       return selectionkey.op_accept;

    }

   

    protected serversocketchannel _channel;

    protected nioworker _worker;

}

 

nioechosession的行为要复杂一些,nioechosession会先从tcp连接中读取数据,再将这些数据用同一个连接写回去,并重复这个步骤直到客户端把连接关闭为止。我们可以把“reading”和“writing”看作nioechosession的两个状态,这样可以用一个有限状态机来描述它的行为,如下图所示:

figure 3

 

接下来的工作就是如何实现这个有限状态机了。在这个例子中,我们使用state模式来实现它。下面这张uml类图描述了nioechosession的设计细节。

figure 4

 

nioechosession所处的状态由echostate这个抽象类来表现,其两个子类分别对应了“reading”和“writing”这两个状态。nioechosession会将process()和interestops()这两个方法delegate给echostate来处理,这样,当nioechosession处于不同的状态时,就会有不同的行为。

 

listing 4是echostate的实现。echostate定义了process()和interestops()这两个抽象的方法来让子类实现。nioechosession中的process()方法会被delegate到其当前echostate的process()方法,nioechosession本身也会作为一个描述context的参数被送入echostate的process()方法中。echostate定义的interestops()方法则会在nioechosession注册和转变state的时候被用到。

 

echostate还定义了两个静态的方法来返回预先创建好的readstate和writestate,这样做的好处是可以避免在nioechosession转换state的时候创建一些不必要的对象从而影响性能。然而,这样做要求state类必须是无状态的,状态需要保存在context类,也就是nioechosession中。

 

listing 4:

public abstract class echostate {

   

    public abstract void process(nioechosession s) throws ioexception;

   

    public abstract int interestops();

   

    public static echostate readstate() {

       return _read;

    }

   

    public static echostate writestate() {

       return _write;

    }

   

    protected static echostate _read = new readstate();

    protected static echostate _write = new writestate();

}

 

listing 5是nioechosession的实现。nioechosession包含有一个socketchannel,这个channel注册后得到的selectionkey,一个用于存放数据的bytebuffer和一个记录当前state的echostate对象。在初始化时,echostate被初始化为一个readstate。nioechosession把process()方法和interestops()方法都delegate到当前的echostate中。其setstate()方法用于切换当前state,在切换state后,nioechosession会通过selectionkey更新注册的interestops。close()方法用于关闭这个nioechosession对象。

 

listing 5:

public class nioechosession implements niosession {

   

    public nioechosession(socketchannel c) {

       _channel = c;

       _buf = bytebuffer.allocate(128);

       _state = echostate.readstate();

    }

   

    public void registered(selectionkey key) {

       _key = key;

    }

   

    public void process() {

       try {

           _state.process(this);

       } catch (ioexception ex) {

           close();

           throw new error(ex);

       }

    }

   

    public selectablechannel channel() {

       return _channel;

    }

   

    public int interestops() {

       return _state.interestops();

    }

   

    public void setstate(echostate state) {

       _state = state;

       _key.interestops(interestops());

    }

   

    public void close() {

       try {

           _channel.close();

       } catch (ioexception ex) {

           throw new error(ex);

       }

    }

   

    protected socketchannel _channel = null;

    protected selectionkey _key;

    protected bytebuffer _buf = null;

    protected echostate _state = null;

}

 

listing 6和listing 7分别是readstate和writestate的实现。readstate在process()中会先从nioechosession的channel中读取数据,如果未能读到数据,nioechosession会继续留在readstate;如果读取出错,nioechosession会被关闭;如果读取成功,nioechosession会被切换到writestate。writestate则负责将nioechosession中已经读取的数据写回到channel中,全部写完后,nioechosession会被切换回readstate。

 

listing 6:

public class readstate extends echostate {

   

    public void process(nioechosession s)

       throws ioexception

    {

       socketchannel channel = s._channel;

       bytebuffer buf = s._buf;

       int count = channel.read(buf);

 

       if (count == 0) {

           return;

       }

 

       if (count == -1) {

           s.close();

           return;

       }

 

       buf.flip();

       s.setstate(echostate.writestate());

    }

   

    public int interestops() {

       return selectionkey.op_read;

    }

}

 

listing 7:

public class writestate extends echostate {

   

    public void process(nioechosession s)

       throws ioexception

    {

       socketchannel channel = s._channel;

       bytebuffer buf = s._buf;

       channel.write(buf);

       if (buf.remaining() == 0) {

           buf.clear();

           s.setstate(echostate.readstate());

       }

    }

   

    public int interestops() {

       return selectionkey.op_write;

    }

}

 

nioechoserver(listing 8)被用来启动和关闭一个tcp echo server,这个类实现了runnable接口,调用其run()方法就启动了echo server。其shutdown()方法被用来关闭这个echo server,注意shutdown()和run()的finally block中的同步代码确保了只有当echo server被关闭后,shutdown()方法才会返回。

 

listing 8:

public class nioechoserver implements runnable {

   

    public void run() {

       try {

           serversocketchannel serv = serversocketchannel.open();

           try {

              serv.socket().bind(new inetsocketaddress(7));

              serv.configureblocking(false);

              _worker = new nioworker(selector.open());

              nioserversession s = new nioserversession(serv, _worker);

              _worker.add(s);

              _worker.run();

           } finally {

              _worker.closeallchannels();

              synchronized (this) {

                  notify();

              }

           }

       } catch (ioexception ex) {

           throw new error(ex);

       }

    }

   

    public synchronized void shutdown() {

       _worker.stop();

       try {

           wait();

       } catch (interruptedexception ex) {

           throw new error(ex);

       }

    }

   

    protected nioworker _worker = null;

}

 

最后,通过一个简单的main()函数(listing 9),我们就可以运行这个echo server了。

 

listing 9:

    public static void main(string [] args) {

       new nioechoserver().run();

    }

 

我们可以通过telnet程序来检验这个程序的运行状况:

1. 打开一个命令行,输入 telnet localhost 7 来运行一个telnet程序并连接到echo server上。

2. 在telnet程序中输入字符,可以看到输入的字符被显示在屏幕上。(这是因为echo server将收到的字符写回到客户端)

3. 多打开几个telnet程序进行测试,可以看到echo server能通过nio api用一个thread服务多个客户端。

作者:daijialin                       

mailto:woodydai@gmail.com

http://blog.csdn.net/daijialin

赞(0)
版权申明:本站文章部分自网络,如有侵权,请联系:west999com@outlook.com 特别注意:本站所有转载文章言论不代表本站观点! 本站所提供的图片等素材,版权归原作者所有,如需使用,请与原作者联系。未经允许不得转载:IDC资讯中心 » Java网络服务器编程(NIO版)-JSP教程,Java技巧及代码
分享到: 更多 (0)

相关推荐

  • 暂无文章