线程启动后,会执行run函数
void run()
{
ProcessMessages(kForever);
}
bool ProcessMessages(int cms)
{
........
while(true)
{
Message msg;
if (!Get(&msg,cmsNext))
return !IsQuiting();
Dispatch(&msg); //分发消息给相应的任务处理器处理
}
........
}
当线程获取一个消息(任务)后,会将任务分发给相应的处理器处理,Dispatch就是将消息分发给任务处理器的。如果没有消息可以处理,线程将在Get函数处休眠
Get的实现流程:
bool Get(Message *pmsg, int cmsWait, bool process_io)
{
while(true)
{
....
while(!msgq_.empty())
{
*pmsg = msgq_.front();
msgq_.pop_front();
return true;
}
.....
// Wait and multiplex in the meantime
if (!ss_->Wait(cmsNext, process_io))
return false;
}
return false;
}
如果消息队列里有消息,就直接取出消息,并且函数返回true;如果消息队列中没有,则使用多路复用技术等待一个消息,
ss_->Wait()会使线程睡眠,直到一个消息事件被触发。
我们重点说说Wait函数的实现:
bool Wait(int cmsWait, bool process_io)
{
......
fWait = true;
while(fWait)
{
std::vector<WSAEVENT> events;
std::vector<Dispatcher *> event_owners;
events.push_back(socket_ev_); //
....
size_t i = 0;
while(i < dispatchers_.size()) //可以看作是线程负责的事件对象集,有个默认 线程退出事件对象 会在ss_对象构造的时候进行创建,调用WakeUp()后会被触发
{
Dispatcher* disp = dispatchers_[i++];
if (!process_io && (disp != signal_wakeup_))
continue;
SOCKET s = disp->GetSocket();
if (disp->CheckSignalClose()) {
// We just signalled close, don't poll this socket
} else if (s != INVALID_SOCKET) {
WSAEventSelect(s,
events[0],
FlagsToEvents(disp->GetRequestedEvents()));
} else {
events.push_back(disp->GetWSAEvent());
event_owners.push_back(disp);
}
.......
}
......
DWORD dw = WSAWaitForMultipleEvents(static_cast<DWORD>(events.size()),
&events[0],
false,
cmsNext,
false);
if (dw == WSA_WAIT_FAILED) {
// Failed?
// TODO: need a better strategy than this!
int error = WSAGetLastError();
ASSERT(false);
return false;
} else if (dw == WSA_WAIT_TIMEOUT) {
// Timeout?
return true;
} else {
int index = dw - WSA_WAIT_EVENT_0;
if (index > 0) {
--index; // The first event is the socket event
event_owners[index]->OnPreEvent(0); //
event_owners[index]->OnEvent(0, 0); //这里会将fWait置为false
} else if (process_io) {
//处理socket
}
}
if (!fWait_)
break; //跳出while循环
}
return true;
}
此函数将线程负责的事件对象放到vector容器中(events),并且调用函数WSAWaitForMultipleEvents异步等待某个事件的触发。
当ss_实例被创建的时候,有个默认的事件对象被创建:
signal_wakeup_ = new Signaler(this, &fWait_);
并且signal_wakeup_会被保存在ss_对象的dispatchers_容器中。
在Wait函数中,会将dispatchers_中的事件对象放到临时的 events 容器中,当线程中没有消息或是等待消息的时候,线程会在WSAWaitForMultipleEvents中进行休眠,直到有消息到达。
当我们调用Thread::Post()接口的时候,会将post的消息放到线程的 msgq_ 容器中,并且触发ss_->WakeUp()函数。
void WakeUp()
{
signal_wakeup_->Signal();
}
WakeUp函数将触发signal_wakeup_事件,WSAWaitForMultipleEvents 函数返回,Wait函数将调用Signaler:: OnEvent ()将fWait置为false。Wait函数返回true