
using system;

using system.threading;

using system.runtime.interopservices;

namespace iocpthreading



{

[structlayout(layoutkind.sequential, charset=charset.auto)]

public sealed class iocpthreadpool


{

// ---------------------------------------------------------------------------
// kernel32 entry points used by the pool.
//
// NOTE(review): every handle is declared as a 32-bit uint32, while Win32 HANDLE
// values are pointer-sized. The declarations are kept as they were because the
// rest of the class stores the handle in a uint32 field; confirm whether this
// code is ever expected to run in a 64-bit process.
//
// setlasterror is enabled so that a failing call leaves its Win32 error code
// available through marshal.getlastwin32error().
// ---------------------------------------------------------------------------

/// <summary>
/// Creates an I/O completion port (the constructor calls this with
/// invalid_handle_value, 0 and a null key to create a stand-alone port).
/// </summary>
/// <returns>The port handle, or 0 when the call fails.</returns>
[dllimport("kernel32", charset=charset.auto, setlasterror=true)]
private unsafe static extern uint32 createiocompletionport(uint32 hfile, uint32 hexistingcompletionport, uint32* puicompletionkey, uint32 uinumberofconcurrentthreads);

/// <summary>Closes a kernel object handle; used by dispose() on the port handle.</summary>
[dllimport("kernel32", charset=charset.auto, setlasterror=true)]
private unsafe static extern boolean closehandle(uint32 hobject);

/// <summary>
/// Queues a completion packet on the port. The pool passes the user value (or the
/// shutdown marker) in puiuserarg as a pointer-sized integer, not as a real pointer.
/// </summary>
/// <returns>false when the packet could not be queued.</returns>
[dllimport("kernel32", charset=charset.auto, setlasterror=true)]
private unsafe static extern boolean postqueuedcompletionstatus(uint32 hcompletionport, uint32 uisizeofargument, uint32* puiuserarg, system.threading.nativeoverlapped* poverlapped);

/// <summary>
/// Blocks until a completion packet is available on the port or the timeout expires,
/// then writes the packet's size, key and overlapped pointer through the out pointers.
/// </summary>
/// <returns>false when no packet was successfully dequeued.</returns>
[dllimport("kernel32", charset=charset.auto, setlasterror=true)]
private unsafe static extern boolean getqueuedcompletionstatus(uint32 hcompletionport, uint32* psizeofargument, uint32* puiuserarg, system.threading.nativeoverlapped** ppoverlapped, uint32 uimilliseconds);

// Passed as the file handle to createiocompletionport by the constructor; per the
// Win32 API this requests a port that is not associated with any file.
private const uint32 invalid_handle_value = 0xffffffff;

// Timeout passed to getqueuedcompletionstatus by the worker loop (wait without limit).
// NOTE: the identifier is a misspelling of "infinite"; it is referenced by the worker
// loop under this name, so it is left unchanged here.
private const uint32 inifinite = 0xffffffff;

// Reserved completion-key value: dispose() posts it once per pool thread, and a worker
// that dequeues it leaves its loop. A caller posting this same value through
// postevent(int) therefore also stops one worker.
private const int32 shutdown_iocpthread = 0x7fffffff;

/// <summary>
/// Callback signature for the work function supplied to the constructor. Worker
/// threads invoke it once per dequeued event, passing the posted integer value.
/// </summary>
public delegate void user_function(int ivalue);

/// <summary>Backing storage for the I/O completion port handle.</summary>
private uint32 m_hhandle;

/// <summary>
/// Reads or replaces the I/O completion port handle. The constructor stores the
/// value returned by createiocompletionport here.
/// </summary>
private uint32 gethandle
{
    get
    {
        return this.m_hhandle;
    }
    set
    {
        this.m_hhandle = value;
    }
}

/// <summary>Backing storage for the port's concurrency setting.</summary>
private int32 m_uimaxconcurrency;

/// <summary>
/// Reads or replaces the concurrency value that the constructor hands to
/// createiocompletionport as the number of concurrent threads.
/// </summary>
private int32 getmaxconcurrency
{
    get
    {
        return this.m_uimaxconcurrency;
    }
    set
    {
        this.m_uimaxconcurrency = value;
    }
}

/// <summary>Backing storage for the minimum pool size.</summary>
private int32 m_iminthreadsinpool;

/// <summary>
/// Reads or replaces the minimum pool size, i.e. how many worker threads the
/// constructor starts up front.
/// </summary>
private int32 getminthreadsinpool
{
    get
    {
        return this.m_iminthreadsinpool;
    }
    set
    {
        this.m_iminthreadsinpool = value;
    }
}

/// <summary>Backing storage for the maximum pool size.</summary>
private int32 m_imaxthreadsinpool;

/// <summary>
/// Reads or replaces the maximum pool size. The growth checks only start another
/// worker while the current thread count is below this value.
/// </summary>
private int32 getmaxthreadsinpool
{
    get
    {
        return this.m_imaxthreadsinpool;
    }
    set
    {
        this.m_imaxthreadsinpool = value;
    }
}

/// <summary>Backing storage for the monitor object.</summary>
private object m_pcriticalsection;

/// <summary>
/// Reads or replaces the object the pool locks on (via monitor) while deciding
/// whether to start an additional worker thread.
/// </summary>
private object getcriticalsection
{
    get
    {
        return this.m_pcriticalsection;
    }
    set
    {
        this.m_pcriticalsection = value;
    }
}

/// <summary>Backing storage for the user-supplied work callback.</summary>
private user_function m_pfnuserfunction;

/// <summary>
/// Reads or replaces the callback that worker threads invoke for every event
/// they dequeue.
/// </summary>
private user_function getuserfunction
{
    get
    {
        return this.m_pfnuserfunction;
    }
    set
    {
        this.m_pfnuserfunction = value;
    }
}

/// <summary>Backing storage for the disposing flag.</summary>
private boolean m_bdisposeflag;

/// <summary>
/// Flag indicating that the pool is being (or has been) disposed. While it is set,
/// no new events are posted and no new worker threads are started.
/// </summary>
private boolean isdisposed
{
    get
    {
        return this.m_bdisposeflag;
    }
    set
    {
        this.m_bdisposeflag = value;
    }
}

/// <summary>Backing storage for the number of threads currently in the pool.</summary>
private int32 m_icurthreadsinpool;

/// <summary>
/// The current number of threads in the thread pool. The setter overwrites the
/// counter with a plain assignment; the pool itself adjusts it through the
/// interlocked increment/decrement helpers.
/// </summary>
public int32 getcurthreadsinpool
{
    get
    {
        return this.m_icurthreadsinpool;
    }
    set
    {
        this.m_icurthreadsinpool = value;
    }
}


/// <summary>Atomically adds one to the number of threads in the thread pool.</summary>
/// <returns>The thread count after the increment.</returns>
private int32 inccurthreadsinpool()
{
    int32 iupdated = interlocked.increment(ref this.m_icurthreadsinpool);
    return iupdated;
}


/// <summary>Atomically subtracts one from the number of threads in the thread pool.</summary>
/// <returns>The thread count after the decrement.</returns>
private int32 deccurthreadsinpool()
{
    int32 iupdated = interlocked.decrement(ref this.m_icurthreadsinpool);
    return iupdated;
}

/// <summary>Backing storage for the number of active (busy) threads.</summary>
private int32 m_iactthreadsinpool;

/// <summary>
/// The current number of active threads in the thread pool, i.e. workers that are
/// between dequeuing an event and finishing its processing. The setter overwrites
/// the counter with a plain assignment.
/// </summary>
public int32 getactthreadsinpool
{
    get
    {
        return this.m_iactthreadsinpool;
    }
    set
    {
        this.m_iactthreadsinpool = value;
    }
}


/// <summary>Atomically adds one to the number of active threads in the thread pool.</summary>
/// <returns>The active-thread count after the increment.</returns>
private int32 incactthreadsinpool()
{
    int32 iupdated = interlocked.increment(ref this.m_iactthreadsinpool);
    return iupdated;
}


/// <summary>Atomically subtracts one from the number of active threads in the thread pool.</summary>
/// <returns>The active-thread count after the decrement.</returns>
private int32 decactthreadsinpool()
{
    int32 iupdated = interlocked.decrement(ref this.m_iactthreadsinpool);
    return iupdated;
}

/// <summary>Backing storage for the number of posted work items.</summary>
private int32 m_icurworkinpool;

/// <summary>
/// The current number of work items posted to the thread pool and not yet picked
/// up by a worker. The setter overwrites the counter with a plain assignment.
/// </summary>
public int32 getcurworkinpool
{
    get
    {
        return this.m_icurworkinpool;
    }
    set
    {
        this.m_icurworkinpool = value;
    }
}


/// <summary>Atomically adds one to the number of work items posted in the thread pool.</summary>
/// <returns>The work-item count after the increment.</returns>
private int32 inccurworkinpool()
{
    int32 iupdated = interlocked.increment(ref this.m_icurworkinpool);
    return iupdated;
}


/// <summary>Atomically subtracts one from the number of work items posted in the thread pool.</summary>
/// <returns>The work-item count after the decrement.</returns>
private int32 deccurworkinpool()
{
    int32 iupdated = interlocked.decrement(ref this.m_icurworkinpool);
    return iupdated;
}

/// <summary>
/// Creates the I/O completion port and starts the minimum number of worker threads.
/// </summary>
/// <param name="imaxconcurrency">Concurrency value passed to createiocompletionport.</param>
/// <param name="iminthreadsinpool">Number of worker threads started immediately.</param>
/// <param name="imaxthreadsinpool">Upper bound used by the pool's growth checks.</param>
/// <param name="pfnuserfunction">Callback invoked by the workers for every posted event.</param>
/// <exception cref="exception">
/// Thrown with the message "unhandled exception" when the port cannot be created or a
/// worker thread cannot be started. The original failure is attached as the inner
/// exception so that the real cause is not lost.
/// </exception>
public iocpthreadpool(int32 imaxconcurrency, int32 iminthreadsinpool, int32 imaxthreadsinpool, user_function pfnuserfunction)
{
    try
    {
        // set initial class state
        getmaxconcurrency = imaxconcurrency;
        getminthreadsinpool = iminthreadsinpool;
        getmaxthreadsinpool = imaxthreadsinpool;
        getuserfunction = pfnuserfunction;

        // init the thread counters
        getcurthreadsinpool = 0;
        getactthreadsinpool = 0;
        getcurworkinpool = 0;

        // initialize the monitor object
        getcriticalsection = new object();

        // set the disposing flag to false
        isdisposed = false;

        unsafe
        {
            // create an io completion port for thread pool use
            gethandle = createiocompletionport(invalid_handle_value, 0, null, (uint32) getmaxconcurrency);
        }

        // test to make sure the io completion port was created
        if (gethandle == 0)
            throw new exception("unable to create io completion port");

        // allocate and start the minimum number of threads specified
        threadstart tsthread = new threadstart(iocpfunction);

        for (int32 ithread = 0; ithread < getminthreadsinpool; ++ithread)
        {
            // create a thread and start it
            thread ththread = new thread(tsthread);
            ththread.name = "iocp " + ththread.gethashcode();
            ththread.start();

            // increment the thread pool count
            inccurthreadsinpool();
        }
    }
    catch (exception ex)
    {
        // Keep the historical message callers may match on, but preserve the cause.
        throw new exception("unhandled exception", ex);
    }
}

/// <summary>
/// Finalizer: shuts the pool down through dispose() when the owner never disposed
/// it explicitly.
/// </summary>
~iocpthreadpool()
{
    if (isdisposed)
    {
        return;
    }

    dispose();
}

/// <summary>
/// Shuts the pool down: marks it as disposing, posts one shutdown packet per worker
/// thread, waits until every worker has exited, and closes the completion port.
/// </summary>
/// <remarks>
/// Safe to call more than once; only the first call does any work. Failures are
/// swallowed (best effort), as before. The method blocks until the thread count
/// reaches zero, so it should not be called from inside the user callback on a
/// worker thread.
/// </remarks>
public void dispose()
{
    try
    {
        int32 ithreadstostop;

        // The flag is set and the thread count is sampled under the same lock the
        // growth logic uses. Growth checks isdisposed while holding this lock, so no
        // worker can be added after the sample is taken; otherwise such a worker
        // would never receive a shutdown packet and the wait below would never end.
        lock (getcriticalsection)
        {
            // A repeated call must not close the handle a second time.
            if (isdisposed)
                return;

            // flag that we are disposing this object
            isdisposed = true;

            // get the current number of threads in the pool
            ithreadstostop = getcurthreadsinpool;
        }

        // shutdown all threads in the pool: one packet per worker
        for (int32 ithread = 0; ithread < ithreadstostop; ++ithread)
        {
            unsafe
            {
                // NOTE(review): a failed post is not detected here; the worker that
                // should have received it would keep the wait below from finishing.
                postqueuedcompletionstatus(gethandle, 4, (uint32*) shutdown_iocpthread, null);
            }
        }

        // wait here until all the threads are gone
        while (getcurthreadsinpool != 0)
            thread.sleep(100);

        // close the iocp handle (0 means the port was never created)
        if (gethandle != 0)
        {
            unsafe
            {
                closehandle(gethandle);
            }

            gethandle = 0;
        }
    }
    catch
    {
        // Deliberately best effort: disposal must not throw, in particular when it
        // runs on the finalizer thread.
    }

    // Everything the finalizer would do has been done (or attempted).
    gc.suppressfinalize(this);
}

/// <summary>
/// Worker thread body. Repeatedly waits for a completion packet, runs the user
/// callback with the posted value, and starts one more worker when every thread is
/// busy and the pool is below its maximum size. The loop ends when the shutdown
/// marker is dequeued, when the wait on the port fails, or when the user callback
/// throws; in every case the pool's thread count is decremented on the way out.
/// </summary>
private void iocpfunction()
{
    try
    {
        while (true)
        {
            uint32 uinumberofbytes = 0;

            // The native API writes a pointer-sized completion key. Receiving it in
            // an 8-byte local keeps a 64-bit process from writing past a 4-byte
            // variable; in a 32-bit process only the low half is written.
            ulong ulcompletionkey = 0;

            boolean bdequeued;

            unsafe
            {
                system.threading.nativeoverlapped* pov = null;

                // wait for an event
                bdequeued = getqueuedcompletionstatus(gethandle, &uinumberofbytes, (uint32*) &ulcompletionkey, &pov, inifinite);
            }

            // A failed wait (for example because the port was closed) delivers no
            // packet; stop instead of running the callback with stale data.
            if (!bdequeued)
                break;

            int32 ivalue = unchecked((int32) ulcompletionkey);

            // was this thread told to shutdown
            // Checked before touching the work counter: dispose() does not count its
            // shutdown packets as work, so they must not be subtracted either.
            // (A caller that posts this reserved value as work also ends up here.)
            if (ivalue == shutdown_iocpthread)
                break;

            // decrement the number of events in queue
            deccurworkinpool();

            // increment the number of active threads
            incactthreadsinpool();

            try
            {
                // call the user function; an exception still ends this worker, as
                // before, but it now propagates with its original stack trace
                getuserfunction(ivalue);

                lock (getcriticalsection)
                {
                    try
                    {
                        // add a thread when the pool is below its maximum, every
                        // thread is busy, and the pool is not being disposed
                        if (getcurthreadsinpool < getmaxthreadsinpool
                            && getactthreadsinpool == getcurthreadsinpool
                            && isdisposed == false)
                        {
                            // create a thread and start it
                            threadstart tsthread = new threadstart(iocpfunction);
                            thread ththread = new thread(tsthread);
                            ththread.name = "iocp " + ththread.gethashcode();
                            ththread.start();

                            // increment the thread pool count
                            inccurthreadsinpool();
                        }
                    }
                    catch
                    {
                        // Growing the pool is best effort; failing to start an
                        // extra thread must not take this worker down.
                    }
                }
            }
            finally
            {
                // decrement the number of active threads, also when the callback
                // throws, so the "all threads busy" check stays accurate
                decactthreadsinpool();
            }
        }
    }
    catch (exception)
    {
        // An exception on a worker ends that worker only; there is no channel to
        // report it to the owner of the pool.
    }
    finally
    {
        // decrement the thread pool count on every exit path so that dispose()
        // cannot wait forever for a thread that is already gone
        deccurthreadsinpool();
    }
}

//public void postevent(int32 ivalue

/// <summary>
/// Queues one work item carrying <paramref name="ivalue"/>; a worker thread later
/// passes that value to the user callback. Does nothing once the pool is disposing.
/// After queuing, one more worker is started when every thread is busy and the pool
/// is below its maximum size.
/// </summary>
/// <param name="ivalue">
/// Value handed to the user callback. The value 0x7fffffff is reserved: a worker
/// that dequeues it exits instead of calling the callback.
/// </param>
/// <exception cref="exception">Thrown when the event cannot be queued on the port.</exception>
public void postevent(int ivalue)
{
    // only add work if we are not disposing
    if (isdisposed)
        return;

    // Count the item before it becomes visible to the workers, so a fast worker
    // cannot decrement the counter ahead of this increment.
    inccurworkinpool();

    boolean bposted;

    unsafe
    {
        // post an event into the iocp thread pool
        bposted = postqueuedcompletionstatus(gethandle, 4, (uint32*) ivalue, null);
    }

    if (!bposted)
    {
        // Nothing was queued: undo the count and tell the caller the work is lost.
        deccurworkinpool();
        throw new exception("unable to post event to io completion port");
    }

    lock (getcriticalsection)
    {
        try
        {
            // add a thread when the pool is below its maximum, every thread is
            // busy, and the pool is not being disposed
            if (getcurthreadsinpool < getmaxthreadsinpool
                && getactthreadsinpool == getcurthreadsinpool
                && isdisposed == false)
            {
                // create a thread and start it
                threadstart tsthread = new threadstart(iocpfunction);
                thread ththread = new thread(tsthread);
                ththread.name = "iocp " + ththread.gethashcode();
                ththread.start();

                // increment the thread pool count
                inccurthreadsinpool();
            }
        }
        catch
        {
            // Growing the pool is best effort; the event itself is already queued.
        }
    }
}

/// <summary>
/// Queues one work item without a value (a null completion key, which the worker
/// passes to the user callback as 0). Does nothing once the pool is disposing.
/// After queuing, one more worker is started when every thread is busy and the pool
/// is below its maximum size.
/// </summary>
/// <exception cref="exception">
/// Thrown with the message "unhandled exception" when posting fails; the original
/// failure is attached as the inner exception.
/// </exception>
public void postevent()
{
    try
    {
        // only add work if we are not disposing
        if (isdisposed)
            return;

        // Count the item before it becomes visible to the workers, so a fast
        // worker cannot decrement the counter ahead of this increment.
        inccurworkinpool();

        boolean bposted;

        unsafe
        {
            // post an event into the iocp thread pool
            bposted = postqueuedcompletionstatus(gethandle, 0, null, null);
        }

        if (!bposted)
        {
            // Nothing was queued: undo the count and report the loss.
            deccurworkinpool();
            throw new exception("unable to post event to io completion port");
        }

        lock (getcriticalsection)
        {
            try
            {
                // add a thread when the pool is below its maximum, every thread
                // is busy, and the pool is not being disposed
                if (getcurthreadsinpool < getmaxthreadsinpool
                    && getactthreadsinpool == getcurthreadsinpool
                    && isdisposed == false)
                {
                    // create a thread and start it
                    threadstart tsthread = new threadstart(iocpfunction);
                    thread ththread = new thread(tsthread);
                    ththread.name = "iocp " + ththread.gethashcode();
                    ththread.start();

                    // increment the thread pool count
                    inccurthreadsinpool();
                }
            }
            catch
            {
                // Growing the pool is best effort; the event is already queued.
            }
        }
    }
    catch (exception ex)
    {
        // Keep the historical message callers may match on, but preserve the cause.
        throw new exception("unhandled exception", ex);
    }
}

}

}