1 回答

TA貢獻2021條經(jīng)驗 獲得超8個贊
我們先看一個服務小例子,并比較Reactor框架
#include <iostream>
#include "ace/auto_ptr.h"
#include "ace/log_msg.h"
#include "ace/inet_addr.h"
#include "ace/sock_acceptor.h"
#include "ace/reactor.h"
#include "ace/acceptor.h"
#include "ace/Connector.h"
#include "ace/Reactor_Notification_Strategy.h"
#include "ace/Select_Reactor.h"
#include "ace/Singleton.h"
#include "ace/svc_handler.h"
#include "ace/Message_Block.h"
#include "ace/Message_Queue.h"
#include "ace/SOCK_Stream.h"
#include "ace/Null_Mutex.h"
#include "ace/Null_Condition.h"
using namespace std;
//服務客戶
class ClientService:public ACE_Svc_Handler<ACE_SOCK_STREAM,ACE_NULL_SYNCH>
{
public:
int open(void *p);
virtual int handle_input(ACE_HANDLE fd=ACE_INVALID_HANDLE);
virtual int handle_output(ACE_HANDLE fd=ACE_INVALID_HANDLE);
virtual int handle_close(ACE_HANDLE handle,ACE_Reactor_Mask close_mask);
protected:
ACE_SOCK_Stream sock_;
ACE_Message_Queue<ACE_NULL_SYNCH> output_queue_;
};
int ClientService::open(void *p)
{
if(ACE_Svc_Handler::open(p)==-1)
return -1;
ACE_TCHAR peer_name[512];
ACE_INET_Addr peer_addr;
if(this->peer().get_remote_addr(peer_addr)==0&&peer_addr.addr_to_string(peer_name,512)==0)
cout<<" connection from "<<peer_name<<endl;
return 0;
}
int ClientService::handle_input(ACE_HANDLE)
{
const size_t INPUT_SIZE=4096;
char buffer[INPUT_SIZE];
ssize_t recv_cnt,send_cnt;
if((recv_cnt=this->peer().recv(buffer,sizeof(buffer)))<=0)
{
ACE_DEBUG((LM_DEBUG,ACE_TEXT("(%P|%t) connection closed/n")));
return -1;
}
send_cnt=this->peer().send(buffer,ACE_static_cast(size_t,recv_cnt));
if(send_cnt==recv_cnt)
return 0;
if(send_cnt==-1&&ACE_OS::last_error()!=EWOULDBLOCK)
ACE_ERROR_RETURN((LM_ERROR,ACE_TEXT("%P|%t) %p/n"),ACE_TEXT("send")),0);
if(send_cnt==-1)
send_cnt=0;
ACE_Message_Block *mb;
size_t remaining=ACE_static_cast(size_t,(recv_cnt-send_cnt));
ACE_NEW_RETURN(mb,ACE_Message_Block(&buffer[send_cnt],remaining),-1);
int output_off=this->msg_queue()->is_empty();
ACE_Time_Value nowait(ACE_OS::gettimeofday());
if(this->putq(mb,&nowait)==-1)
{
ACE_ERROR((LM_ERROR,ACE_TEXT("(%P|%t)%P;d
不同Reactor框架,此框架無需寫ClientAcceptor類,即無需設置acceptor對象的ACE_Reactor實例,只需typedef
typedef ACE_Acceptor<ClientService,ACE_SOCK_ACCEPTOR> ClientAcceptor;
我們繼續(xù)考察ClientService類
class ClientService:public ACE_Svc_Handler<ACE_SOCK_STREAM,ACE_NULL_SYNCH>
ACE_Svc_Handler允許你指定流類型和加鎖類型,與ACE_Acceptor一樣,ACE_Svc_Handler需要使用流的地址trait,所以我們使用了ACE_SOCK_STREAM宏,使這些代碼在支持或不支持模板trait類型的系統(tǒng)上都能編譯
之所以要加鎖類型,是因為ACE_Svc_Handler是派生自ACE_Task的,后者含有一個ACE_Message_Queue成員,你必須為這個成員提供同步類型。
在此我們不會使用ACE_Task提供的線程能力,但我們要使用繼承而得的ACE_Message_Queue成員,所以我們移除了先前例子的ACE_Message_Queue成員。
另外,我們只使用一個線程,所以用ACE_NULL_SYNCH
注意:ACE_Svc_Handler按照我們通常所需的方式實現(xiàn)了get()_handle()方法,所以此方法也不見了
接著,再看handle_input(ACE_HANDLE)方法,與前一個版本不同的是:
1.我們使用ACE_Svc_Handler::peer()方法是訪問底層的ACE_SOCK_Stream
2.我們通過繼承的ACE_Task::msg_queue()方法訪問繼承的ACE_Message_Queue
3.為了把數(shù)據(jù)塊放入隊列中,我們可以繼承而得的ACE_Task::putq()方法。同樣,我們新的handle_output也類似的,但他利用了繼承而得的方法
最后,默認的handle_close()方法會移除所有的反應器登記信息,取消所有的定時器,并刪除該處理器
ACE_Svc_Handler的使用極大地簡化了我們的服務處理器,使得我們能完全專注于需要的解決的問題,而不需要為所有那些連接管理問題而分心
接著,我們再看ACE_Connector
#include <iostream>
#include "ace/reactor.h"
#include "ace/inet_addr.h"
#include "ace/sock_stream.h"
#include "ace/sock_connector.h"
#include "ace/connector.h"
#include "ace/svc_handler.h"
#include "ace/reactor_notification_strategy.h"
#include <conio.h>
using namespace std;
typedef ACE_Svc_Handler<ACE_SOCK_STREAM,ACE_NULL_SYNCH> super;
class Client:public ACE_Svc_Handler<ACE_SOCK_STREAM,ACE_NULL_SYNCH>
{
public:
Client():notifier_(0,this,ACE_Event_Handler::WRITE_MASK)
{
}
virtual int open(void *p=0);
virtual int handle_input(ACE_HANDLE fd=ACE_INVALID_HANDLE);
virtual int handle_output(ACE_HANDLE fd=ACE_INVALID_HANDLE);
virtual int handle_timeout(const ACE_Time_Value ¤t_time,const void *act=0);
private:
enum{ITERATIONS=5};
int iterations_;
ACE_Reactor_Notification_Strategy notifier_;
};
int Client::open(void *p)
{
ACE_Time_Value iter_delay(2);
if(ACE_Svc_Handler::open(p)==-1)
return -1;
this->notifier_.reactor(this->reactor());
this->msg_queue()->notification_strategy(this->notifier_);
return this->reactor()->schedule_timer(this,0,ACE_Time_Value::zero,iter_delay);
}
int Client::handle_input(ACE_HANDLE)
{
char buf[64];
ssize_t recv_cnt=this->peer().recv(buf,sizeof(buf)-1);
if(recv_cnt>0)
{
}
if(recv_cnt==0||ACE_OS::last_error()!=EWOULDBLOCK)
從程序中我們看到有一個新類,ACE_Reactor_Notification_Strategy是一個策略類,實現(xiàn)了Strategy模式,它允許你定制另一個類的行為,且無需改變受影響的類
在例子中也可以看到構造的時候先初始化了notifier_對象,設置正確的ACE_Reactor指針,目的是通過ACE_Reactor_Notification_Strategy這樣的對象使ACE_Message_Queue策略化。如果ACE_Message_Queue擁有一個策略對象,無論何時有ACE_Message_Block對象進入隊列,ACE_Message_Queue都會調用該策略對象的notify()方法。因為我們已經(jīng)設置了notifier_,他會在Client的反應器上發(fā)出notify()調用,把一個通知放入隊列,通知的目標是我們的client對象的handle_output()方法。要把ACE_Message_Queue的入隊操作結合進反應器的事件循環(huán)
handle_timeout方法:如果我們把預定數(shù)目的串發(fā)給服務器,就是用close_writer()方法關閉我們這一端的TCP/IP socket。
注意,對于我們想要發(fā)往服務器的每一個串,我們都把它插入一個ACE_Message_Block,并把這個塊放入隊列,這將使消息隊列用notifier_對象把通知放入反應器的隊列。當反應器處理該通知時,他會調用我們的handle_output方法,然后我們從隊列中取出數(shù)據(jù),直到隊列變空。
在handle_output方法中,有cancel_wakeup()和schedule_wakeup(),前者是從這個處理器的反應器登記信息中移除指定的掩碼位,而schedule_wakeup()則增加指定的掩碼。也就是說不會造成handle_close()被調用。因此,這一次也無需實現(xiàn)handle_close(),以專門處理被取消的WRITE掩碼,我們復用了ACE_Svc_Handler的默認handle_close()方法,所以無需編寫代碼
添加回答
舉報