熟悉ACE的朋友进来看一看?
我写了一端程序为什么接收完数据后显示 ERROR :ACCESS VIOLATION 的错误?
//acceptor.h
#ifndef __ACCEPTOR__H
#define __ACCEPTOR__H
#include "header.h"
#include "BufferManager.h"
#include "ReadData.h"
class Acceptor : public ACE_Event_Handler
{
public:
Acceptor( BufferManager * );
virtual ~Acceptor();
void open( ACE_INET_Addr & );
virtual int handle_input( ACE_HANDLE );
virtual int handle_close( ACE_HANDLE, ACE_Reactor_Mask );
virtual ACE_HANDLE get_handle() const;
ACE_SOCK_Acceptor & GetAcceptor();
private:
ACE_SOCK_Acceptor _acceptor;
BufferManager * _pBM;
};
Acceptor::Acceptor( BufferManager * pBM )
{
_pBM = pBM;
}
Acceptor::~Acceptor()
{
//
}
void Acceptor::open( ACE_INET_Addr & addr )
{
_acceptor.open( addr );
reactor()->register_handler( this, ACE_Event_Handler::ACCEPT_MASK );
}
ACE_HANDLE Acceptor::get_handle() const
{
return _acceptor.get_handle();
}
int Acceptor::handle_input( ACE_HANDLE handle )
{
ReadData * peer = new ReadData( _pBM );
_acceptor.accept( peer->GetStream() );
ACE_Reactor::instance()->register_handler( peer,
ACE_Event_Handler::READ_MASK );
return 0;
}
ACE_SOCK_Acceptor & Acceptor::GetAcceptor()
{
return _acceptor;
}
int Acceptor::handle_close( ACE_HANDLE = ACE_INVALID_HANDLE,
ACE_Reactor_Mask = 0 )
{
_acceptor.close ();
//delete this;
return 0;
}
#endif
//buffer.h
#ifndef __BUFFER_MANAGER__H
#define __BUFFER_MANAGER__H
#include "header.h"
class Full{};
class Empty{};
class BufferManager
{
public:
BufferManager( unsigned int );
virtual ~BufferManager();
void Push( CellBuf * );
void Pop( CellBuf * );
void Down( ACE_Thread_Mutex & mutex );
void Down( ACE_Semaphore & sem );
void Up( ACE_Thread_Mutex & mutex );
void Up( ACE_Semaphore & sem );
ACE_Semaphore & GetFull();
ACE_Semaphore & GetEmpty();
ACE_Thread_Mutex & GetMutex();
bool IsEmpty() const;
bool IsFull() const;
private:
CellBuf *_pCB;
int _iFront;
int _iRear;
unsigned int _unCellBufCount;
ACE_Thread_Mutex _mutex;
ACE_Semaphore _full;
ACE_Semaphore _empty;
};
BufferManager::BufferManager( unsigned int unCellBufCount )
: _full( 0 ), _empty( unCellBufCount )
{
_unCellBufCount = unCellBufCount;
_pCB = new CellBuf[ _unCellBufCount ];
_iFront = _iRear = 0;
}
BufferManager::~BufferManager()
{
delete[] _pCB;
}
bool BufferManager::IsEmpty() const
{
return ( _iFront == _iRear ) ? true : false;
}
bool BufferManager::IsFull() const
{
return ( ( _iRear + 1 ) % _unCellBufCount == _iFront ) ? true : false;
}
void BufferManager::Push( CellBuf * pBuf )
{
if( IsFull() )
throw Full();
_iRear = ( _iRear + 1 ) % _unCellBufCount;
memcpy( _pCB[ _iRear ].buf, pBuf->buf, pBuf->length );
_pCB[ _iRear ].length = pBuf->length;
}
void BufferManager::Pop( CellBuf * pBuf )
{
if( IsEmpty() )
throw Empty();
_iFront = ( _iFront + 1 ) % _unCellBufCount;
memcpy( pBuf->buf, _pCB[ _iFront ].buf, _pCB[ _iFront ].length );
pBuf->length = _pCB[ _iFront ].length;
}
void BufferManager::Down( ACE_Thread_Mutex & mutex )
{
mutex.acquire();
}
void BufferManager::Down( ACE_Semaphore & sem )
{
sem.acquire();
}
void BufferManager::Up( ACE_Thread_Mutex & mutex )
{
mutex.release();
}
void BufferManager::Up( ACE_Semaphore & sem )
{
sem.release();
}
ACE_Semaphore & BufferManager::GetFull()
{
return _full;
}
ACE_Semaphore & BufferManager::GetEmpty()
{
return _empty;
}
ACE_Thread_Mutex & BufferManager::GetMutex()
{
return _mutex;
}
#endif
//header.h
#ifndef __HEADER__H
#define __HEADER__H
//引入ACE库文件
#include "ace/Reactor.h"
#include "ace/Svc_Handler.h"
#include "ace/Acceptor.h"
#include "ace/Synch.h"
#include "ace/SOCK_Acceptor.h"
#include "ace/Thread.h"
//引入STL库文件
#include <iostream>
#include <string>
using namespace std;
//全局数据结构
const unsigned short int CELL_BUFFER_SIZE = 4096;
typedef struct
{
char buf[ CELL_BUFFER_SIZE ];
unsigned short int length;
}CellBuf;
#endif
//readdata.h
#ifndef __READ__DATA__H
#define __READ__DATA__H
#include "header.h"
#include "BufferManager.h"
//从客户机接收数据
class ReadData : public ACE_Event_Handler
{
public:
ReadData( BufferManager * );
virtual ~ReadData();
virtual int handle_input( ACE_HANDLE );
virtual ACE_HANDLE get_handle() const;
virtual int handle_close( ACE_HANDLE, ACE_Reactor_Mask );
ACE_SOCK_Stream & GetStream();
private:
ACE_SOCK_Stream _peer;
BufferManager * _pBM;
CellBuf * _pCBM;
};
ReadData::ReadData( BufferManager * pBM )
{
_pBM = pBM;
_pCBM = new CellBuf;
}
ReadData::~ReadData()
{
delete _pCBM;
}
ACE_HANDLE ReadData::get_handle() const
{
return _peer.get_handle();
}
ACE_SOCK_Stream & ReadData::GetStream()
{
return _peer;
}
int ReadData::handle_input( ACE_HANDLE handle )
{
int length;
//使用生产者算法进行缓冲区同步互斥访问
_pBM->Down( _pBM->GetEmpty() );
_pBM->Down( _pBM->GetMutex() );
length = _peer.recv( _pCBM->buf, CELL_BUFFER_SIZE );
if( length != 0 )
{
_pCBM->length = length;
_pBM->Push( _pCBM );
/**********************************************************************/
short int shortIntSize = sizeof( short int );
short int x, y;
for( int i = 0; i < length; i += ( 2 * shortIntSize ) )
{
::memcpy( &x, &_pCBM->buf[ i ], shortIntSize );
::memcpy( &y, &_pCBM->buf[ i + shortIntSize ], shortIntSize );
cout << x << " " << y << " ";
}
/**************************************************************************/
}
_pBM->Up( _pBM->GetMutex() );
_pBM->Up( _pBM->GetFull() );
return 0;
}
int ReadData::handle_close( ACE_HANDLE handle, ACE_Reactor_Mask mask )
{
delete this;
return 0;
}
#endif
//main.cpp
#include "ace/TP_Reactor.h"
#include "ace/Thread_Manager.h"
#include "header.h"
#include "Acceptor.h"
#include "ReadData.h"
#include "WriteData.h"
#include "WriteThread.h"
static ACE_THR_FUNC_RETURN event_loop( void *arg )
{
ACE_Reactor *reactor = ACE_static_cast( ACE_Reactor *, arg );
reactor->owner( ACE_OS::thr_self() );
reactor->run_reactor_event_loop();
return 0;
}
int main( int argc, char *argv[] )
{
const unsigned int N_THREADS = 4;
BufferManager *pBM = new BufferManager( 1024 );
ACE_TP_Reactor tpReactor;
ACE_INET_Addr addr( 2000 );
ACE_Reactor reactor( &tpReactor );
Acceptor *pAC = new Acceptor( pBM );
pAC->reactor( ACE_Reactor::instance() );
pAC->open( addr );
/*string strHost = "myhost";
WriteThread wt( strHost, 2001, pBM );
wt.open( NULL );
*/
ACE_Thread_Manager::instance()->spawn_n( N_THREADS, event_loop,
ACE_Reactor::instance() );
ACE_Thread_Manager::instance()->wait();
//wt.wait();
delete pAC;
return 0;
}

