OSubject.cc

Go to the documentation of this file.
00001 /*
00002  * This file is part of openSDK.
00003  *
00004  * Copyright (C) 2006-2007 openSDK team
00005  *
00006  * openSDK is free software; you can redistribute it and/or modify
00007  * it under the terms of the GNU General Public License as published by
00008  * the Free Software Foundation; version 2.
00009  *
00010  * openSDK is distributed in the hope that it will be useful,
00011  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00012  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
00013  * GNU General Public License for more details.
00014  *
00015  * You should have received a copy of the GNU General Public License
00016  * along with openSDK; if not, write to the Free Software
00017  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
00018  *
00019  *     $Id: OSubject.cc,v 1.22 2007/03/25 12:58:16 nuno-lopes Exp $
00020  */
00021 
00022 #include <iostream>
00023 #include <OPENR/OSubject.h>
00024 #include <opensdkAPI.h>
00025 #include <ModuleData.h>
00026 
00027 using namespace std;
00028 
00029 #define FETCH_ITERATOR(ret) \
00030         ObserverConstIterator it = findObserver(id); \
00031         if (it == end()) { return ret; }
00032 
00033 
00034 ObserverIterator OSubject::findObserver(const ObserverID& id)
00035 {
00036         for (ObserverIterator it = observers_.begin(); it != observers_.end(); ++it) {
00037                 if (it->GetObserverID() == id) {
00038                         return it;
00039                 }
00040         }
00041 
00042         return observers_.end(); // invalid iterator
00043 }
00044 
00045 ObserverConstIterator OSubject::findObserver(const ObserverID& id) const
00046 {
00047         for (ObserverConstIterator it = begin(); it != end(); ++it) {
00048                 if (it->GetObserverID() == id) {
00049                         return it;
00050                 }
00051         }
00052 
00053         return end(); // invalid iterator
00054 }
00055 
00056 OSubject::OSubject(void) : maxBufferSize_(DEFAULT_MAX_BUFFER_SIZE), notifyUnitSize_(1)
00057 {
00058         //TODO
00059 }
00060 
00061 OSubject::~OSubject()
00062 {
00063         for (ObserverIterator it = observers_.begin(); it != observers_.end(); ++it) {
00064                 it->DestroyBuffer();
00065                 it->Delete();
00066                 observers_.erase(it);
00067         }
00068 }
00069 
00070 OStatus OSubject::SetReadyEntry(const OServiceEntry& entry)
00071 {
00072         myID_.Set(entry.GetOID(), entry.GetSelector());
00073 
00074         for (std::list< std::pair<int,int> >::iterator it = MOD_DATA(ConnectHandlers).begin(); it != MOD_DATA(ConnectHandlers).end(); ++it) {
00075                 if (it->second == 0) {
00076                         it->second = entry.GetSelector();
00077                         return oSUCCESS;
00078                 }
00079         }
00080         return oFAIL;
00081 
00082 }
00083 
00084 OStatus OSubject::SetBufferSize(size_t size)
00085 {
00086         maxBufferSize_ = size;
00087         return oSUCCESS;
00088 }
00089 
00090 OStatus OSubject::SetNotifyUnitSize(size_t size)
00091 {
00092         if (size > GetBufferSize() || size > MAX_NUM_DATA) {
00093                 return oFAIL;
00094         }
00095 
00096         notifyUnitSize_ = size;
00097         return oSUCCESS;
00098 }
00099 
00100 OStatus OSubject::SetData(const OObserverInfo& info, RCRegion* r)
00101 {
00102         r->AddReference();
00103 
00104         // queue is full, overwrite the oldest
00105         if (info.impl_->d_skip) {
00106                 info.impl_->queue[info.impl_->d_min]->RemoveReference();
00107                 info.impl_->d_min = (info.impl_->d_min + 1) % info.impl_->q_size;
00108                 --(info.impl_->n_data);
00109         }
00110 
00111         info.impl_->queue[info.impl_->d_max] = r;
00112 
00113         info.impl_->d_max = (info.impl_->d_max + 1) % info.impl_->q_size;
00114         if (info.impl_->d_max == info.impl_->d_min) {
00115                 info.impl_->d_skip = 1; // queue became full
00116         }
00117 
00118         ++(info.impl_->n_data);
00119 
00120         return oSUCCESS;
00121 }
00122 
00123 OStatus OSubject::SetData(const ObserverID& id, RCRegion* r)
00124 {
00125         FETCH_ITERATOR(oFAIL)
00126         return SetData(*it, r);
00127 }
00128 
00129 OStatus OSubject::SetData(RCRegion* r)
00130 {
00131         for (ObserverConstIterator it = begin(); it != end(); ++it) {
00132                 OStatus tmp = SetData(*it, r);
00133                 if (tmp != oSUCCESS) {
00134                         return tmp;
00135                 }
00136         }
00137 
00138         return oSUCCESS;
00139 }
00140 
00141 OStatus OSubject::SetData(const ObserverInfo& info, const void* buf, size_t size)
00142 {
00143         RCRegion* rgn = new RCRegion(size);
00144         memcpy(rgn->Base(), buf, size);
00145         OStatus tmp = SetData(info, rgn);
00146 
00147         // remove our reference since each observer will increment the reference counter
00148         rgn->RemoveReference();
00149 
00150         return tmp;
00151 }
00152 
00153 OStatus OSubject::SetData(const ObserverID& id, const void* buf, size_t size)
00154 {
00155         FETCH_ITERATOR(oFAIL)
00156         return SetData(*it, buf, size);
00157 }
00158 
00159 OStatus OSubject::SetData(const void* buf, size_t size)
00160 {
00161         OStatus tmp = oSUCCESS;
00162         RCRegion* rgn = new RCRegion(size);
00163         memcpy(rgn->Base(), buf, size);
00164 
00165         for (ObserverConstIterator it = begin(); it != end(); ++it) {
00166                 tmp = SetData(*it, rgn);
00167                 if (tmp != oSUCCESS) {
00168                         break;
00169                 }
00170         }
00171 
00172         // remove our reference since each observer will increment the reference counter
00173         rgn->RemoveReference();
00174 
00175         return tmp;
00176 }
00177 
00178 OStatus OSubject::NotifyObserver(const OObserverInfo& info)
00179 {
00180         if (IsReady(info)) { //ASSERT-READY state
00181                 ONotifyMessage *msg = new ONotifyMessage();
00182                 msg->subjectID      = GetID();
00183                 msg->numOfData      = 0;
00184                 msg->numOfNotify    = ++(info.impl_->n_notify);
00185 
00186                 for (unsigned int i = 0; info.impl_->n_data; ++i) {
00187                         msg->data[i].srcRCR = info.impl_->queue[info.impl_->d_min];
00188                         info.impl_->d_min = (info.impl_->d_min + 1) % info.impl_->q_size;
00189                         --(info.impl_->n_data);
00190                         msg->numOfData++;
00191                 }
00192 
00193                 info.impl_->d_skip = 0;
00194 
00195                 return _sendMessage(info.GetObserverID(), msg);
00196 
00197         } else if (ReadyStatus(info) == -1) { // DEASSERT-READY state
00198                 ClearBuffer(info);
00199         }
00200 
00201         return oSUCCESS;
00202 }
00203 
00204 OStatus OSubject::NotifyObserver(const ObserverID& id)
00205 {
00206         FETCH_ITERATOR(oFAIL)
00207         return NotifyObserver(*it);
00208 }
00209 
00210 OStatus OSubject::NotifyObservers(void)
00211 {
00212         for (ObserverConstIterator it = begin(); it != end(); ++it) {
00213                 OStatus tmp = NotifyObserver(*it);
00214                 if (tmp != oSUCCESS) {
00215                         return tmp;
00216                 }
00217         }
00218 
00219         return oSUCCESS;
00220 }
00221 
00222 size_t OSubject::RemainBuffer(const OObserverInfo& info) const
00223 {
00224         return (info.impl_->q_size - info.impl_->n_data);
00225 }
00226 
00227 size_t OSubject::RemainBuffer(const ObserverID& id) const
00228 {
00229         FETCH_ITERATOR(0)
00230         return RemainBuffer(*it);
00231 }
00232 
00233 size_t OSubject::RemainBuffer(void) const
00234 {
00235         size_t min = DEFAULT_MAX_BUFFER_SIZE;
00236         for (ObserverConstIterator it = begin(); it != end(); ++it) {
00237                 size_t tmp = RemainBuffer(*it);
00238                 if (tmp < min) {
00239                         min = tmp;
00240                 }
00241         }
00242 
00243         return min;
00244 }
00245 
00246 
00247 OStatus OSubject::ClearBuffer(const OObserverInfo& info)
00248 {
00249         while (info.impl_->n_data) {
00250                 info.impl_->queue[info.impl_->d_min]->RemoveReference();
00251                 info.impl_->d_min = (info.impl_->d_min + 1) % info.impl_->q_size;
00252                 --(info.impl_->n_data);
00253         }
00254 
00255         info.impl_->d_min  = 0;
00256         info.impl_->d_max  = 0;
00257         info.impl_->d_skip = 0;
00258         return oSUCCESS;
00259 }
00260 
00261 OStatus OSubject::ClearBuffer(const ObserverID& id)
00262 {
00263         FETCH_ITERATOR(oFAIL)
00264         return ClearBuffer(*it);
00265 }
00266 
00267 OStatus OSubject::ClearBuffer()
00268 {
00269         for (ObserverConstIterator it = begin(); it != end(); ++it) {
00270                 OStatus tmp = ClearBuffer(*it);
00271                 if (tmp != oSUCCESS) {
00272                         return tmp;
00273                 }
00274         }
00275 
00276         return oSUCCESS;
00277 }
00278 
00279 OStatus OSubject::ControlHandler(const OControlMessage& msg, OStatus status)
00280 {
00281         if (status != oSUCCESS) {
00282                 return status;
00283         }
00284 
00285         if (msg.msgType != omsgCONTROL && msg.msgType != omsgCONTROL_V21) {
00286                 OSYSLOG1((osyslogERROR, "Unknown message type received in ControlHandler: %d (Entry: %d/%d)", msg.msgType, (size_t)myID_.GetOID().GetAddress(), (int)myID_.GetSelector()));
00287                 return oFAIL;
00288         }
00289 
00290         switch (msg.command) {
00291                 case objcommCMD_ADDOBSERVER:
00292                 case objcommCMD_ADDOBSERVER_SELF:
00293                 case objcommCMD_ADDOBSERVER_V22:
00294                         {
00295                         OObserverInfo info;
00296                         info.impl_ = new _OObserverInfo(msg.observerID, 0, 0, 0, objcommVERSION22);
00297                         info.CreateBuffer(GetBufferSize());
00298                         observers_.push_back(info);
00299                         }
00300                         break;
00301 
00302                 case objcommCMD_REMOVEOBSERVER:
00303                 case objcommCMD_REMOVEOBSERVER_SELF:
00304                 case objcommCMD_REMOVEOBSERVER_V22:
00305                         for (ObserverIterator it = observers_.begin(); it != observers_.end(); ++it) {
00306                                 if (it->GetObserverID().GetOID() == msg.observerID.GetOID() &&
00307                                 it->GetObserverID().GetSelector() == msg.observerID.GetSelector()) {
00308                                         it->DestroyBuffer();
00309                                         it->Delete();
00310                                         observers_.erase(it);
00311                                         break;
00312                                 }
00313                         }
00314                         break;
00315 
00316                 default:
00317                         OSYSLOG1((osyslogERROR, "Unknown message command received in ControlHandler: %d", msg.command));
00318                         return oFAIL;
00319         }
00320 
00321         return oSUCCESS;
00322 }
00323 
00324 OStatus OSubject::ReadyHandler(const OReadyMessage& msg)
00325 {
00326         if (msg.msgType != omsgREADY && msg.msgType != omsgREADY_V21 && msg.msgType != omsgREADY_V22) {
00327                 OSYSLOG1((osyslogERROR, "Unknown message type received in ReadyHandler: %d (Entry: %d/%d)", msg.msgType, (size_t)myID_.GetOID().GetAddress(), (int)myID_.GetSelector()));
00328                 return oFAIL;
00329         }
00330 
00331         switch (msg.command) {
00332                 case objcommCMD_ASSERT_READY:
00333                         for (ObserverIterator it = observers_.begin(); it != observers_.end(); ++it) {
00334                                 if (it->GetObserverID().GetOID() == msg.observerID.GetOID() &&
00335                                 it->GetObserverID().GetSelector() == msg.observerID.GetSelector()) {
00336                                         it->impl_->ready_flag = 1;
00337                                         break;
00338                                 }
00339                         }
00340                         break;
00341 
00342                 case objcommCMD_DEASSERT_READY:
00343                         for (ObserverIterator it = observers_.begin(); it != observers_.end(); ++it) {
00344                                 if (it->GetObserverID().GetOID() == msg.observerID.GetOID() &&
00345                                 it->GetObserverID().GetSelector() == msg.observerID.GetSelector()) {
00346                                         it->impl_->ready_flag = -1;
00347                                         break;
00348                                 }
00349                         }
00350                         break;
00351 
00352                 default:
00353                         OSYSLOG1((osyslogERROR, "Unknown message command received in ReadyHandler: %d", msg.command));
00354                         return oFAIL;
00355         }
00356 
00357         return oSUCCESS;
00358 }
00359 
00360 int OSubject::NumberOfObservers(void) const
00361 {
00362         return observers_.size();
00363 }
00364 
00365 int OSubject::ReadyStatus(const ObserverID& id) const
00366 {
00367         FETCH_ITERATOR(0)
00368         return ReadyStatus(*it);
00369 }
00370 
00371 int OSubject::IsAllReady(void) const
00372 {
00373         for (ObserverConstIterator it = begin(); it != end(); ++it) {
00374                 if (ReadyStatus(*it) == 0) { // sent state
00375                         return 0;
00376                 }
00377         }
00378 
00379         return 1;
00380 }
00381 
00382 int OSubject::IsAnyReady(void) const
00383 {
00384         for (ObserverConstIterator it = begin(); it != end(); ++it) {
00385                 if (IsReady(*it)) { // assert-ready
00386                         return 1;
00387                 }
00388         }
00389 
00390         return 0;
00391 }
00392 
00393 int OSubject::IsReady(const ObserverID& id) const
00394 {
00395         FETCH_ITERATOR(0)
00396         return IsReady(*it);
00397 }

Generated on Sun Dec 2 23:04:30 2007 for openSDK by  doxygen 1.3.9.1