OVirtualRobotComm.cc

Go to the documentation of this file.
00001 /*
00002  * This file is part of openSDK.
00003  *
00004  * Copyright (C) 2006-2007 openSDK team
00005  *
00006  * openSDK is free software; you can redistribute it and/or modify
00007  * it under the terms of the GNU General Public License as published by
00008  * the Free Software Foundation; version 2.
00009  *
00010  * openSDK is distributed in the hope that it will be useful,
00011  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00012  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
00013  * GNU General Public License for more details.
00014  *
00015  * You should have received a copy of the GNU General Public License
00016  * along with openSDK; if not, write to the Free Software
00017  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
00018  *
00019  *     $Id: OVirtualRobotComm.cc,v 1.30 2007/07/15 15:32:07 nuno-lopes Exp $
00020  */
00021 
00022 #include <iostream>
00023 #include <sys/time.h>
00024 #include <unistd.h>
00025 #include <errno.h>
00026 #include <OPENR/ObjcommMessages.h>
00027 #include <OPENR/OObserver.h>
00028 #include <OPENR/OSubject.h>
00029 #include <ModuleData.h>
00030 #include <Platform.h>
00031 #include <Primitives.h>
00032 #include "main.h"
00033 #include "helper.h"
00034 #include "opensdkAPI.h"
00035 
00036 using namespace std;
00037 
00038 static void* VirtualRobotComm(void* PTS);
00039 static void* VirtualRobotComm_sensor(void* PTS);
00040 static void* VirtualRobotComm_image(void* PTS);
00041 static void sigalrm_handler(int);
00042 static void* process_sensor_readings(void*);
00043 static void* process_images(void*);
00044 static void processVectorData(OCommandInfo *info, OCommandData *data);
00045 
00046 #define NEW_SYSTEM_OBJ(func) \
00047         id = ThreadsList.size(); \
00048         data = new perThreadStruct(id); /* deleted in cleanup() */ \
00049         \
00050         if (pthread_create(&(data->threadId), NULL, func, data)) { \
00051                 cerr << "Couldn't create a new thread for the " << #func << " system object" << endl; \
00052                 exit(0x77); \
00053         } \
00054         ThreadsList.push_back(data);
00055 
00056 
00058 void init_virtual_robot_comm(void)
00059 {
00060         size_t id;
00061         perThreadStruct *data;
00062 
00063         NEW_SYSTEM_OBJ(VirtualRobotComm)
00064         NEW_SYSTEM_OBJ(VirtualRobotComm_sensor)
00065         NEW_SYSTEM_OBJ(VirtualRobotComm_image);
00066 
00067         pthread_t thread;
00068         if (pthread_create(&thread, NULL, process_sensor_readings, NULL)) {
00069                 cerr << "Couldn't create a new thread to connect to the sensor socket" << endl;
00070                 exit(0x78);
00071         }
00072 
00073         if (pthread_create(&thread, NULL, process_images, NULL)) {
00074                 cerr << "Couldn't create a new thread to connect to the images socket" << endl;
00075                 exit(0x79);
00076         }
00077 }
00078 
00079 #undef MOD_DATA
00080 #define MOD_DATA(x) (PTS->x)
00081 
00083 static void* VirtualRobotComm(void *data)
00084 {
00085         BLOCK_SIGNALS()
00086         pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
00087 
00088         perThreadStruct *PTS = (perThreadStruct*)data;
00089         pthread_setspecific(perThreadKey, data);
00090 
00091         // register this as a standard OPEN-R object
00092         OObserver observer;
00093         {
00094         OServiceEntry entry((void*)MOD_DATA(OID), 0);
00095         const char name[]="OVirtualRobotComm.Effector.OCommandVectorData.O";
00096         RegisterServiceEntry(entry, name);
00097         observer.SetNotifyEntry(entry);
00098         }
00099 
00100         MOD_DATA(DoInitCompleted) = true;
00101 
00102         // DoStart can only be called after all DoInit()s
00103         while (__DoInitCompleted == false) {
00104                 opensdk_yield();
00105         }
00106 
00107         // process connect messages
00108         pthread_mutex_lock(&MOD_DATA(queue_mutex));
00109         while (!MOD_DATA(queue).empty()) {
00110                 sem_wait(&MOD_DATA(sem)); // just to decrement the counter
00111 
00112                 if (__in_shutdown) {
00113                         MOD_DATA(DoStopCompleted) = true;
00114                         goto exit_and_signal;
00115                 }
00116 
00117                 struct message m = MOD_DATA(queue).front();
00118                 MOD_DATA(queue).pop();
00119 
00120                 OConnectMessage *msg = (OConnectMessage*)m.msg;
00121                 observer.ConnectHandler(*msg);
00122 
00123                 delete msg;
00124         }
00125         pthread_mutex_unlock(&MOD_DATA(queue_mutex));
00126 
00127         observer.AssertReady();
00128 
00129         // proccess messages
00130         while(1) {
00131                 sem_wait(&MOD_DATA(sem));
00132 
00133                 if (__in_shutdown) {
00134                         MOD_DATA(DoStopCompleted) = true;
00135                         break;
00136                 }
00137 
00138                 pthread_mutex_lock(&MOD_DATA(queue_mutex));
00139                 struct message m = MOD_DATA(queue).front();
00140                 MOD_DATA(queue).pop();
00141                 pthread_mutex_unlock(&MOD_DATA(queue_mutex));
00142 
00143                 ONotifyMessage *msg = (ONotifyMessage*)m.msg;
00144 
00145                 // sanity check
00146                 if (msg->msgType != omsgNOTIFY_V22) {
00147                         cerr << "strange message received in OVirtualRobotComm(2). type=" << msg->msgType << endl;
00148                         delete msg;
00149                         continue;
00150                 }
00151 
00152                 for (int i=0; i < msg->numOfData; ++i) {
00153                         RCRegion *RCR = msg->data[i].srcRCR;
00154                         OCommandVectorData *data = (OCommandVectorData*)RCR->Base();
00155 
00156                         for (unsigned int j=0; j < data->vectorInfo.numData; ++j) {
00157                                 processVectorData(data->GetInfo(j), data->GetData(j));
00158                         }
00159 
00160                         RCR->RemoveReference();
00161                 }
00162 
00163                 usleep(USECONDS_TICK/64);
00164 
00165                 observer.AssertReady(msg->subjectID);
00166 
00167                 delete msg;
00168         }
00169 
00170 exit_and_signal:
00171         MOD_DATA(exited) = true;
00172         pthread_cond_signal(&MOD_DATA(cond));
00173         return NULL;
00174 }
00175 
00176 
00177 static OSubject *subject; 
00178 static long frameNumber = 0; 
00179 static unsigned int tick_number = 0; 
00180 static OSensorFrameVectorData *SensorFrameData;
00181 static pthread_mutex_t sensorFrameData_mutex = PTHREAD_MUTEX_INITIALIZER;
00182 static const unsigned int numVectorData = num_of_primitives;
00183 static const size_t SensorFrameData_size = sizeof(OSensorFrameVectorData) + (sizeof(OSensorFrameInfo) + sizeof(OSensorFrameData)) * numVectorData;
00184 
00186 slongword VirtualRobotComm_get_sensor_reading(int id)
00187 {
00188         pthread_mutex_lock(&sensorFrameData_mutex);
00189         slongword tmp = SensorFrameData->GetData(id)->frame[tick_number].value;
00190         pthread_mutex_unlock(&sensorFrameData_mutex);
00191         return tmp;
00192 }
00193 
00195 static void* VirtualRobotComm_sensor(void *data)
00196 {
00197         BLOCK_SIGNALS_EXCEPT(SIGALRM)
00198         pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
00199 
00200         perThreadStruct *PTS = (perThreadStruct*)data;
00201         pthread_setspecific(perThreadKey, data);
00202 
00203         subject = new OSubject;
00204 
00205         // create the data that will be sent to observers
00206         SensorFrameData = (OSensorFrameVectorData*)calloc(1, SensorFrameData_size);
00207         SensorFrameData->SetNumData(numVectorData);
00208 
00209         for (unsigned int i=0; i < numVectorData; ++i) {
00210                 OSensorFrameInfo *info = SensorFrameData->GetInfo(i);
00211                 info->primitiveID = i;
00212                 info->numFrames   = 4;
00213                 info->frameSize   = 16; //all frames are 16 bytes long
00214                 info->dataOffset = sizeof(ODataVectorInfo) + sizeof(OSensorFrameInfo)*numVectorData + sizeof(OSensorFrameData)*i;
00215         }
00216 
00217         // register this as a standard OPEN-R object
00218         {
00219         OServiceEntry entry((void*)MOD_DATA(OID), 0);
00220         const char name[]="OVirtualRobotComm.Sensor.OSensorFrameVectorData.S";
00221         RegisterServiceEntry(entry, name);
00222         subject->SetReadyEntry(entry);
00223         }
00224 
00225         MOD_DATA(DoInitCompleted) = true;
00226 
00227         {
00228         struct itimerval timer = {
00229                 {0, USECONDS_TICK}, {0, USECONDS_TICK}
00230         };
00231         signal(SIGALRM, sigalrm_handler);
00232 
00233         if (setitimer(ITIMER_REAL, &timer, NULL) < 0) {
00234                 perror("setitimer() failed");
00235                 exit(0x41);
00236         }
00237         }
00238 
00239         // DoStart can only be called after all DoInit()s
00240         while (__DoInitCompleted == false) {
00241                 opensdk_yield();
00242         }
00243 
00244         // proccess messages
00245         while(1) {
00246                 if (sem_wait(&MOD_DATA(sem)) < 0) continue;
00247 
00248                 if (__in_shutdown) {
00249                         MOD_DATA(DoStopCompleted) = true;
00250                         break;
00251                 }
00252 
00253                 pthread_mutex_lock(&MOD_DATA(queue_mutex));
00254                 struct message m = MOD_DATA(queue).front();
00255                 MOD_DATA(queue).pop();
00256                 pthread_mutex_unlock(&MOD_DATA(queue_mutex));
00257 
00258                 OReadyMessage *msg = (OReadyMessage*)m.msg;
00259 
00260                 if (msg->msgType == omsgCONTROL || msg->msgType == omsgCONTROL_V21) {
00261                         OControlMessage *msg = (OControlMessage*)m.msg;
00262                         subject->ControlHandler(*msg);
00263                 } else {
00264                         subject->ReadyHandler(*msg);
00265                 }
00266 
00267                 delete msg;
00268         }
00269 
00270         MOD_DATA(exited) = true;
00271         pthread_cond_signal(&MOD_DATA(cond));
00272         return NULL;
00273 }
00274 
00275 
00277 static void sigalrm_handler(int)
00278 {
00279         pthread_mutex_lock(&sensorFrameData_mutex);
00280         unsigned int old_frame = tick_number;
00281 
00282         if (++tick_number == NUM_TICKS_TO_SEND_DATA) {
00283                 tick_number = 0;
00284 
00285                 for (unsigned int i=0; i < numVectorData; ++i) {
00286                         SensorFrameData->GetInfo(i)->frameNumber = frameNumber;
00287                 }
00288 
00289                 frameNumber = (frameNumber + NUM_TICKS_TO_SEND_DATA) % oframeMAX_NUMBER;
00290 
00291                 subject->SetData(SensorFrameData, SensorFrameData_size);
00292                 subject->NotifyObservers();
00293         }
00294 
00295         // copy the data from the previous frame to the new one, so that it can be used
00296         // if the simulator doesn't send anything in the short sensor frame period
00297         for (unsigned int i=0; i < numVectorData; ++i) {
00298                 SensorFrameData->GetData(i)->frame[tick_number].value = SensorFrameData->GetData(i)->frame[old_frame].value;
00299         }
00300 
00301         pthread_mutex_unlock(&sensorFrameData_mutex);
00302 }
00303 
00304 
00305 #define GET_DOUBLE(str) n = strtod(buf+sizeof(str)-1, NULL)
00306 #define SET_VALUE(id, val) { \
00307                 pthread_mutex_lock(&sensorFrameData_mutex); \
00308                 SensorFrameData->GetData(id)->frame[tick_number].value = (slongword)(val * 1000000.0); \
00309                 pthread_mutex_unlock(&sensorFrameData_mutex); \
00310         }
00311 
00313 static void* process_sensor_readings(void*)
00314 {
00315         FILE **fp = &socketList[SENSORS].fp_client;
00316         char buf[2048];
00317 
00318         BLOCK_SIGNALS()
00319 
00320         while (true) {
00321                 if (!*fp) {
00322                         sleep(2);
00323                         continue;
00324                 }
00325 
00326                 if (!fgets(buf, sizeof(buf), *fp)) {
00327                         continue;
00328                 }
00329 
00330                 int joint_id;
00331                 double n;
00332 
00333                 if (!strncmp(buf, "AX:", sizeof("AX:")-1)) {
00334                         GET_DOUBLE("AX:");
00335                         SET_VALUE(1, n)
00336 
00337                 } else if (!strncmp(buf, "AY:", sizeof("AY:")-1)) {
00338                         GET_DOUBLE("AY:");
00339                         SET_VALUE(0, n)
00340 
00341                 } else if (!strncmp(buf, "AZ:", sizeof("AZ:")-1)) {
00342                         GET_DOUBLE("AZ:");
00343                         SET_VALUE(2, n)
00344 
00345                 } else if (!strncmp(buf, "EDG:", sizeof("EDG:")-1)) {
00346                         GET_DOUBLE("EDG:");
00347                         SET_VALUE(10, n)
00348 
00349                 } else if (!strncmp(buf, "IRF:", sizeof("IRF:")-1)) {
00350                         GET_DOUBLE("IRF:");
00351                         SET_VALUE(40, n)
00352 
00353                 } else if (!strncmp(buf, "IRN:", sizeof("IRN:")-1)) {
00354                         GET_DOUBLE("IRN:");
00355                         SET_VALUE(39, n)
00356 
00357                 } else if (sscanf(buf, "J%d:%lf", &joint_id, &n) == 2) { // joint values
00358                         SET_VALUE(joint_id, n)
00359 
00360                 } else {
00361                         cerr << "bogus message received from the sensor socket: '" << buf << "'" << endl;
00362                 }
00363         }
00364 
00365         return NULL;
00366 }
00367 
00368 
00370 static void send_server(char *str)
00371 {
00372         int sock = socketList[ACTUATORS].fd_client;
00373         if (sock < 1) return;
00374 
00375         ssize_t ret;
00376         do {
00377                 ret = write(sock, str, strlen(str));
00378         } while (ret < 0 && errno == EINTR);
00379 }
00380 
00381 
00383 static OSubject *img_subject;
00384 static longword img_frame_number = 0;
00385 
00386 
00388 static void* VirtualRobotComm_image(void *data)
00389 {
00390         BLOCK_SIGNALS()
00391         pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
00392 
00393         perThreadStruct *PTS = (perThreadStruct*)data;
00394         pthread_setspecific(perThreadKey, data);
00395 
00396         img_subject = new OSubject();
00397 
00398         // register this as a standard OPEN-R object
00399         {
00400         OServiceEntry entry((void*)MOD_DATA(OID), 0);
00401         const char name[]="OVirtualRobotComm.FbkImageSensor.OFbkImageVectorData.S";
00402         RegisterServiceEntry(entry, name);
00403         img_subject->SetReadyEntry(entry);
00404         }
00405 
00406         MOD_DATA(DoInitCompleted) = true;
00407 
00408         // DoStart can only be called after all DoInit()s
00409         while (__DoInitCompleted == false) {
00410                 opensdk_yield();
00411         }
00412 
00413         // proccess messages
00414         while(1) {
00415                 sem_wait(&MOD_DATA(sem));
00416 
00417                 if (__in_shutdown) {
00418                         MOD_DATA(DoStopCompleted) = true;
00419                         break;
00420                 }
00421 
00422                 pthread_mutex_lock(&MOD_DATA(queue_mutex));
00423                 struct message m = MOD_DATA(queue).front();
00424                 MOD_DATA(queue).pop();
00425                 pthread_mutex_unlock(&MOD_DATA(queue_mutex));
00426 
00427                 OReadyMessage *msg = (OReadyMessage*)m.msg;
00428 
00429                 if (msg->msgType == omsgCONTROL || msg->msgType == omsgCONTROL_V21) {
00430                         OControlMessage *msg = (OControlMessage*)m.msg;
00431                         img_subject->ControlHandler(*msg);
00432                 } else {
00433                         img_subject->ReadyHandler(*msg);
00434                 }
00435 
00436                 delete msg;
00437         }
00438 
00439         MOD_DATA(exited) = true;
00440         pthread_cond_signal(&MOD_DATA(cond));
00441         return NULL;
00442 }
00443 
00444 
00445 
00446 #define FILL_INFO(idx,s,sprev) \
00447         { \
00448         OFbkImageInfo *info = data->GetInfo(idx); \
00449         info->primitiveID = 18; \
00450         info->type        = odataFBK_YCrCb; \
00451         info->dataSize    = IMG_SIZE/((s)*(s)); \
00452         info->width       = IMG_WIDTH/(s); \
00453         info->height      = IMG_HEIGHT/(s); \
00454         info->dataOffset  = sizeof(ODataVectorInfo) + sizeof(OFbkImageInfo)*IMG_LAYERS + (sprev); \
00455         }
00456 
00458 static void* process_images(void*)
00459 {
00460         FILE **fp = &socketList[IMAGE].fp_client;
00461         const size_t data_size = sizeof(OFbkImageVectorData) + sizeof(OFbkImageInfo)*IMG_LAYERS + (IMG_SIZE+IMG_SIZE/4+IMG_SIZE/16+IMG_SIZE);
00462         OFbkImageVectorData *data;
00463 
00464         BLOCK_SIGNALS()
00465 
00466         data = (OFbkImageVectorData*)malloc(data_size);
00467 
00468         data->vectorInfo.numData = IMG_LAYERS;
00469 
00470 
00471         FILL_INFO(ofbkimageLAYER_H, 1, 0)
00472         FILL_INFO(ofbkimageLAYER_M, 2, IMG_SIZE)
00473         FILL_INFO(ofbkimageLAYER_L, 4, IMG_SIZE+IMG_SIZE/4)
00474         FILL_INFO(ofbkimageLAYER_C, 1, IMG_SIZE+IMG_SIZE/4+IMG_SIZE/16)
00475 
00476         while (true) {
00477                 if (!*fp) {
00478                         sleep(2);
00479                         continue;
00480                 }
00481 
00482                 if (!fread(data->GetData(ofbkimageLAYER_H), IMG_SIZE, 1, *fp) ||
00483                 !fread(data->GetData(ofbkimageLAYER_M), IMG_SIZE/4, 1, *fp) ||
00484                 !fread(data->GetData(ofbkimageLAYER_L), IMG_SIZE/16, 1, *fp)
00485                 ) {
00486                         continue;
00487                 }
00488 
00489                 for (unsigned int i=0; i < IMG_LAYERS; ++i) {
00490                         data->GetInfo(i)->frameNumber = img_frame_number;
00491                 }
00492 
00493                 ++img_frame_number;
00494 
00495                 // TODO: copy the image to other buffer with the necessary transformations
00496 
00497                 // send the imgs to observers
00498                 img_subject->SetData(data, data_size);
00499                 img_subject->NotifyObservers();
00500 
00501         }
00502 
00503         return NULL;
00504 }
00505 
00506 
00508 static void processVectorData(OCommandInfo *info, OCommandData *data_)
00509 {
00510         OPrimitiveID primitiveID = info->primitiveID;
00511         size_t numFrames = info->numFrames;
00512         char buffer[2048], *str = buffer;
00513 
00514         str += sprintf(str, "%ld", primitiveID);
00515 
00516         switch (info->type) {
00517                 case odataLED_COMMAND2:
00518                         {
00519                         const OLEDCommandValue2 *data = (const OLEDCommandValue2*)data_->value;
00520                         //TODO
00521                         break;
00522                         }
00523 
00524                 case odataLED_COMMAND3:
00525                         {
00526                         const OLEDCommandValue3 *data = (const OLEDCommandValue3*)data_->value;
00527                         //TODO
00528                         break;
00529                         }
00530 
00531                 case odataJOINT_COMMAND2:
00532                         {
00533                         const OJointCommandValue2 *data = (const OJointCommandValue2*)data_->value;
00534                         for (unsigned int i=0; i < numFrames; ++i) {
00535                                 double val = data[i].value; // 10^-6 rad
00536                                 val /= 1000000.0; // rad
00537 
00538                                 str += sprintf(str, " %lf", val);
00539                         }
00540 
00541                         sprintf(str, "\n");
00542                         send_server(buffer);
00543                         break;
00544                         }
00545 
00546                 case odataJOINT_COMMAND4:
00547                         {
00548                         const OJointCommandValue4 *data = (const OJointCommandValue4*)data_->value;
00549                         //TODO
00550                         break;
00551                         }
00552 
00553                 default:
00554                         cerr << "processVectorData: structure format not implemented: " << info->type << endl;
00555         }
00556 }

Generated on Sun Dec 2 23:04:30 2007 for openSDK by  doxygen 1.3.9.1