00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include <iostream>
00023 #include <OPENR/OSubject.h>
00024 #include <opensdkAPI.h>
00025 #include <ModuleData.h>
00026
00027 using namespace std;
00028
// Helper for the ObserverID-based overloads below.
// Expands to a lookup of the observer named by the enclosing function's `id`
// parameter, leaving the result in a local const iterator called `it`.
// If no such observer is registered, the enclosing function returns `ret`.
#define FETCH_ITERATOR(ret)                          \
    ObserverConstIterator it = findObserver(id);     \
    if (it == end()) return ret;
00032
00033
00034 ObserverIterator OSubject::findObserver(const ObserverID& id)
00035 {
00036 for (ObserverIterator it = observers_.begin(); it != observers_.end(); ++it) {
00037 if (it->GetObserverID() == id) {
00038 return it;
00039 }
00040 }
00041
00042 return observers_.end();
00043 }
00044
00045 ObserverConstIterator OSubject::findObserver(const ObserverID& id) const
00046 {
00047 for (ObserverConstIterator it = begin(); it != end(); ++it) {
00048 if (it->GetObserverID() == id) {
00049 return it;
00050 }
00051 }
00052
00053 return end();
00054 }
00055
00056 OSubject::OSubject(void) : maxBufferSize_(DEFAULT_MAX_BUFFER_SIZE), notifyUnitSize_(1)
00057 {
00058
00059 }
00060
00061 OSubject::~OSubject()
00062 {
00063 for (ObserverIterator it = observers_.begin(); it != observers_.end(); ++it) {
00064 it->DestroyBuffer();
00065 it->Delete();
00066 observers_.erase(it);
00067 }
00068 }
00069
00070 OStatus OSubject::SetReadyEntry(const OServiceEntry& entry)
00071 {
00072 myID_.Set(entry.GetOID(), entry.GetSelector());
00073
00074 for (std::list< std::pair<int,int> >::iterator it = MOD_DATA(ConnectHandlers).begin(); it != MOD_DATA(ConnectHandlers).end(); ++it) {
00075 if (it->second == 0) {
00076 it->second = entry.GetSelector();
00077 return oSUCCESS;
00078 }
00079 }
00080 return oFAIL;
00081
00082 }
00083
00084 OStatus OSubject::SetBufferSize(size_t size)
00085 {
00086 maxBufferSize_ = size;
00087 return oSUCCESS;
00088 }
00089
00090 OStatus OSubject::SetNotifyUnitSize(size_t size)
00091 {
00092 if (size > GetBufferSize() || size > MAX_NUM_DATA) {
00093 return oFAIL;
00094 }
00095
00096 notifyUnitSize_ = size;
00097 return oSUCCESS;
00098 }
00099
00100 OStatus OSubject::SetData(const OObserverInfo& info, RCRegion* r)
00101 {
00102 r->AddReference();
00103
00104
00105 if (info.impl_->d_skip) {
00106 info.impl_->queue[info.impl_->d_min]->RemoveReference();
00107 info.impl_->d_min = (info.impl_->d_min + 1) % info.impl_->q_size;
00108 --(info.impl_->n_data);
00109 }
00110
00111 info.impl_->queue[info.impl_->d_max] = r;
00112
00113 info.impl_->d_max = (info.impl_->d_max + 1) % info.impl_->q_size;
00114 if (info.impl_->d_max == info.impl_->d_min) {
00115 info.impl_->d_skip = 1;
00116 }
00117
00118 ++(info.impl_->n_data);
00119
00120 return oSUCCESS;
00121 }
00122
00123 OStatus OSubject::SetData(const ObserverID& id, RCRegion* r)
00124 {
00125 FETCH_ITERATOR(oFAIL)
00126 return SetData(*it, r);
00127 }
00128
00129 OStatus OSubject::SetData(RCRegion* r)
00130 {
00131 for (ObserverConstIterator it = begin(); it != end(); ++it) {
00132 OStatus tmp = SetData(*it, r);
00133 if (tmp != oSUCCESS) {
00134 return tmp;
00135 }
00136 }
00137
00138 return oSUCCESS;
00139 }
00140
00141 OStatus OSubject::SetData(const ObserverInfo& info, const void* buf, size_t size)
00142 {
00143 RCRegion* rgn = new RCRegion(size);
00144 memcpy(rgn->Base(), buf, size);
00145 OStatus tmp = SetData(info, rgn);
00146
00147
00148 rgn->RemoveReference();
00149
00150 return tmp;
00151 }
00152
00153 OStatus OSubject::SetData(const ObserverID& id, const void* buf, size_t size)
00154 {
00155 FETCH_ITERATOR(oFAIL)
00156 return SetData(*it, buf, size);
00157 }
00158
00159 OStatus OSubject::SetData(const void* buf, size_t size)
00160 {
00161 OStatus tmp = oSUCCESS;
00162 RCRegion* rgn = new RCRegion(size);
00163 memcpy(rgn->Base(), buf, size);
00164
00165 for (ObserverConstIterator it = begin(); it != end(); ++it) {
00166 tmp = SetData(*it, rgn);
00167 if (tmp != oSUCCESS) {
00168 break;
00169 }
00170 }
00171
00172
00173 rgn->RemoveReference();
00174
00175 return tmp;
00176 }
00177
00178 OStatus OSubject::NotifyObserver(const OObserverInfo& info)
00179 {
00180 if (IsReady(info)) {
00181 ONotifyMessage *msg = new ONotifyMessage();
00182 msg->subjectID = GetID();
00183 msg->numOfData = 0;
00184 msg->numOfNotify = ++(info.impl_->n_notify);
00185
00186 for (unsigned int i = 0; info.impl_->n_data; ++i) {
00187 msg->data[i].srcRCR = info.impl_->queue[info.impl_->d_min];
00188 info.impl_->d_min = (info.impl_->d_min + 1) % info.impl_->q_size;
00189 --(info.impl_->n_data);
00190 msg->numOfData++;
00191 }
00192
00193 info.impl_->d_skip = 0;
00194
00195 return _sendMessage(info.GetObserverID(), msg);
00196
00197 } else if (ReadyStatus(info) == -1) {
00198 ClearBuffer(info);
00199 }
00200
00201 return oSUCCESS;
00202 }
00203
00204 OStatus OSubject::NotifyObserver(const ObserverID& id)
00205 {
00206 FETCH_ITERATOR(oFAIL)
00207 return NotifyObserver(*it);
00208 }
00209
00210 OStatus OSubject::NotifyObservers(void)
00211 {
00212 for (ObserverConstIterator it = begin(); it != end(); ++it) {
00213 OStatus tmp = NotifyObserver(*it);
00214 if (tmp != oSUCCESS) {
00215 return tmp;
00216 }
00217 }
00218
00219 return oSUCCESS;
00220 }
00221
00222 size_t OSubject::RemainBuffer(const OObserverInfo& info) const
00223 {
00224 return (info.impl_->q_size - info.impl_->n_data);
00225 }
00226
00227 size_t OSubject::RemainBuffer(const ObserverID& id) const
00228 {
00229 FETCH_ITERATOR(0)
00230 return RemainBuffer(*it);
00231 }
00232
00233 size_t OSubject::RemainBuffer(void) const
00234 {
00235 size_t min = DEFAULT_MAX_BUFFER_SIZE;
00236 for (ObserverConstIterator it = begin(); it != end(); ++it) {
00237 size_t tmp = RemainBuffer(*it);
00238 if (tmp < min) {
00239 min = tmp;
00240 }
00241 }
00242
00243 return min;
00244 }
00245
00246
00247 OStatus OSubject::ClearBuffer(const OObserverInfo& info)
00248 {
00249 while (info.impl_->n_data) {
00250 info.impl_->queue[info.impl_->d_min]->RemoveReference();
00251 info.impl_->d_min = (info.impl_->d_min + 1) % info.impl_->q_size;
00252 --(info.impl_->n_data);
00253 }
00254
00255 info.impl_->d_min = 0;
00256 info.impl_->d_max = 0;
00257 info.impl_->d_skip = 0;
00258 return oSUCCESS;
00259 }
00260
00261 OStatus OSubject::ClearBuffer(const ObserverID& id)
00262 {
00263 FETCH_ITERATOR(oFAIL)
00264 return ClearBuffer(*it);
00265 }
00266
00267 OStatus OSubject::ClearBuffer()
00268 {
00269 for (ObserverConstIterator it = begin(); it != end(); ++it) {
00270 OStatus tmp = ClearBuffer(*it);
00271 if (tmp != oSUCCESS) {
00272 return tmp;
00273 }
00274 }
00275
00276 return oSUCCESS;
00277 }
00278
00279 OStatus OSubject::ControlHandler(const OControlMessage& msg, OStatus status)
00280 {
00281 if (status != oSUCCESS) {
00282 return status;
00283 }
00284
00285 if (msg.msgType != omsgCONTROL && msg.msgType != omsgCONTROL_V21) {
00286 OSYSLOG1((osyslogERROR, "Unknown message type received in ControlHandler: %d (Entry: %d/%d)", msg.msgType, (size_t)myID_.GetOID().GetAddress(), (int)myID_.GetSelector()));
00287 return oFAIL;
00288 }
00289
00290 switch (msg.command) {
00291 case objcommCMD_ADDOBSERVER:
00292 case objcommCMD_ADDOBSERVER_SELF:
00293 case objcommCMD_ADDOBSERVER_V22:
00294 {
00295 OObserverInfo info;
00296 info.impl_ = new _OObserverInfo(msg.observerID, 0, 0, 0, objcommVERSION22);
00297 info.CreateBuffer(GetBufferSize());
00298 observers_.push_back(info);
00299 }
00300 break;
00301
00302 case objcommCMD_REMOVEOBSERVER:
00303 case objcommCMD_REMOVEOBSERVER_SELF:
00304 case objcommCMD_REMOVEOBSERVER_V22:
00305 for (ObserverIterator it = observers_.begin(); it != observers_.end(); ++it) {
00306 if (it->GetObserverID().GetOID() == msg.observerID.GetOID() &&
00307 it->GetObserverID().GetSelector() == msg.observerID.GetSelector()) {
00308 it->DestroyBuffer();
00309 it->Delete();
00310 observers_.erase(it);
00311 break;
00312 }
00313 }
00314 break;
00315
00316 default:
00317 OSYSLOG1((osyslogERROR, "Unknown message command received in ControlHandler: %d", msg.command));
00318 return oFAIL;
00319 }
00320
00321 return oSUCCESS;
00322 }
00323
00324 OStatus OSubject::ReadyHandler(const OReadyMessage& msg)
00325 {
00326 if (msg.msgType != omsgREADY && msg.msgType != omsgREADY_V21 && msg.msgType != omsgREADY_V22) {
00327 OSYSLOG1((osyslogERROR, "Unknown message type received in ReadyHandler: %d (Entry: %d/%d)", msg.msgType, (size_t)myID_.GetOID().GetAddress(), (int)myID_.GetSelector()));
00328 return oFAIL;
00329 }
00330
00331 switch (msg.command) {
00332 case objcommCMD_ASSERT_READY:
00333 for (ObserverIterator it = observers_.begin(); it != observers_.end(); ++it) {
00334 if (it->GetObserverID().GetOID() == msg.observerID.GetOID() &&
00335 it->GetObserverID().GetSelector() == msg.observerID.GetSelector()) {
00336 it->impl_->ready_flag = 1;
00337 break;
00338 }
00339 }
00340 break;
00341
00342 case objcommCMD_DEASSERT_READY:
00343 for (ObserverIterator it = observers_.begin(); it != observers_.end(); ++it) {
00344 if (it->GetObserverID().GetOID() == msg.observerID.GetOID() &&
00345 it->GetObserverID().GetSelector() == msg.observerID.GetSelector()) {
00346 it->impl_->ready_flag = -1;
00347 break;
00348 }
00349 }
00350 break;
00351
00352 default:
00353 OSYSLOG1((osyslogERROR, "Unknown message command received in ReadyHandler: %d", msg.command));
00354 return oFAIL;
00355 }
00356
00357 return oSUCCESS;
00358 }
00359
00360 int OSubject::NumberOfObservers(void) const
00361 {
00362 return observers_.size();
00363 }
00364
00365 int OSubject::ReadyStatus(const ObserverID& id) const
00366 {
00367 FETCH_ITERATOR(0)
00368 return ReadyStatus(*it);
00369 }
00370
00371 int OSubject::IsAllReady(void) const
00372 {
00373 for (ObserverConstIterator it = begin(); it != end(); ++it) {
00374 if (ReadyStatus(*it) == 0) {
00375 return 0;
00376 }
00377 }
00378
00379 return 1;
00380 }
00381
00382 int OSubject::IsAnyReady(void) const
00383 {
00384 for (ObserverConstIterator it = begin(); it != end(); ++it) {
00385 if (IsReady(*it)) {
00386 return 1;
00387 }
00388 }
00389
00390 return 0;
00391 }
00392
00393 int OSubject::IsReady(const ObserverID& id) const
00394 {
00395 FETCH_ITERATOR(0)
00396 return IsReady(*it);
00397 }