00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include <iostream>
00023 #include <stack>
00024 #include <vector>
00025
00026 #include <cstring>
00027 #include <cstdlib>
00028 #include <cstdio>
00029 #include <time.h>
00030 #include <unistd.h>
00031 #include <dlfcn.h>
00032 #include <pthread.h>
00033 #include <sys/types.h>
00034 #include <sys/stat.h>
00035 #include <fcntl.h>
00036 #include <sys/socket.h>
00037 #include <sys/un.h>
00038 #include <signal.h>
00039
00040 #include <Platform.h>
00041 #include <ModuleData.h>
00042 #include <OpenSdkThread.h>
00043 #include <utils.h>
00044 #include "main.h"
00045 #include "helper.h"
00046 #include "gunzip.h"
00047 #include "OVirtualRobotComm.h"
00048 #include "opensdkAPI.h"
00049
00050 #include <OPENR/ObjcommMessages.h>
00051 #include <OPENR/OPENRAPI.h>
00052
00053 using namespace std;
00054
00055 #define CHECK_SYSCALL(val, msg) if ((val) == -1) { perror(msg); exit(4); }
00056 #define CLOSE_SOCKET(sock) if (sock != -1) { close(sock); sock = -1; }
00057 #define CREATE_SOCKET_THREAD(sock) \
00058 { \
00059 pthread_t myThread; \
00060 if (pthread_create(&myThread, NULL, listen_socket, sock)) { \
00061 cerr << "Couldn't create a new thread to load servers" << endl; \
00062 return 5; \
00063 } \
00064 skThreadsList.push(myThread); \
00065 }
00066
00067 static stack<void*> dlopenHandlers;
00068 vector<perThreadStruct*> ThreadsList;
00069 static stack<pthread_t> skThreadsList;
00070
00072 socketList_t socketList[]= {
00073 {-1, -1, NULL, ACTUATORS_SERVER_SOCKET},
00074 {-1, -1, NULL, SENSORS_SERVER_SOCKET},
00075 {-1, -1, NULL, IMAGE_SERVER_SOCKET},
00076 };
00077
00079 pthread_key_t perThreadKey;
00080
00082 bool __in_shutdown = false;
00083
00085 bool __DoInitCompleted = false;
00086
00088 bool __DoStopCompleted = false;
00089
00091 map<string, OServiceEntry> _objectsEntryList;
00092
00094 static list< pair<string, string> > connectionStrings;
00095
00096
00098 static bool load_module(const char* name)
00099 {
00100 char *path = resolve_case_insensitive_path(name);
00101
00102 string tmp_mod_name = path;
00103 tmp_mod_name += ".tmp";
00104
00105 int ret = gunzip_module(path, tmp_mod_name.c_str());
00106 free(path);
00107
00108 if (ret != Z_OK) {
00109 cerr << "Couldn't gunzip the module properly. Error " << ret << ": " << zError(ret) << endl;
00110 return false;
00111 }
00112
00113
00114 char cwd[PATH_MAX];
00115 if (getcwd(cwd, sizeof(cwd))) {
00116 tmp_mod_name = cwd + string("/") + tmp_mod_name;
00117 }
00118
00119 size_t id = ThreadsList.size();
00120 perThreadStruct* data = new perThreadStruct(id);
00121 executorArg* arg = new executorArg(tmp_mod_name, data);
00122
00123 if (pthread_create(&(data->threadId), NULL, module_executor, arg)) {
00124 cerr << "Couldn't create a new thread to load the module" << endl;
00125 return false;
00126 }
00127
00128 cout << "Loaded module: " << name << " (thread id: " << data->threadId << "::" << id << ")" << endl;
00129
00130
00131
00132 ThreadsList.push_back(data);
00133
00134 return true;
00135 }
00136
00137
00139 static bool parse_object_cfg(void)
00140 {
00141 char *path = resolve_case_insensitive_path("MS/OPEN-R/MW/CONF/OBJECT.CFG");
00142 fstream objectsFile(path);
00143 free(path);
00144
00145 if (!objectsFile) {
00146 cerr << "Couldn't open the OBJECT.CFG file" << endl;
00147 return false;
00148 }
00149
00150
00151 string line;
00152 bool mySection = true;
00153
00154 while(getNonEmptyLine(objectsFile, line, true, &mySection)) {
00155 if (!load_module(line.c_str() + 1)) {
00156 return false;
00157 }
00158 }
00159 return true;
00160 }
00161
00162
00164 static bool parse_connect_cfg(void)
00165 {
00166 char *path = resolve_case_insensitive_path("MS/OPEN-R/MW/CONF/CONNECT.CFG");
00167 fstream file(path);
00168 free(path);
00169
00170 if (!file) {
00171
00172 return true;
00173 }
00174
00175 string line;
00176 while(getNonEmptyLine(file, line)) {
00177 size_t i = line.find_first_of(" \t");
00178 size_t j = line.find_first_not_of(" \t", i);
00179 if (i == string::npos || j == string::npos) {
00180 cerr << "malformed CONNECT.CFG file" << endl;
00181 return false;
00182 }
00183
00184 string subject(line, 0, i);
00185 string observer(line, j);
00186 cout << "connect: " << subject << " --> " << observer << endl;
00187 connectionStrings.push_back(pair<string,string>(subject, observer));
00188 }
00189
00190 return true;
00191 }
00192
00193
00195 static void sendConnectMessages(void)
00196 {
00197 for(list< pair<string, string> >::iterator it = connectionStrings.begin(); it != connectionStrings.end(); ++it) {
00198
00199 if (_objectsEntryList.find(it->first) == _objectsEntryList.end() ||
00200 _objectsEntryList.find(it->second) == _objectsEntryList.end()) {
00201 continue;
00202 }
00203
00204 OServiceEntry subject = _objectsEntryList[it->first];
00205 OServiceEntry observer = _objectsEntryList[it->second];
00206 OServiceEntry subjectFrom = subject;
00207 OServiceEntry observerFrom = observer;
00208
00209 list< std::pair<int,int> >& l = ThreadsList[(size_t)subject.GetOID().GetAddress()]->ConnectHandlers;
00210 list< std::pair<int,int> >& l2 = ThreadsList[(size_t)observer.GetOID().GetAddress()]->ConnectHandlers;
00211
00212
00213 for (list< std::pair<int,int> >::iterator it2 = l.begin(); it2 != l.end(); ++it2) {
00214 if (it2->first == (int)subject.GetSelector()) {
00215 subjectFrom.selector = it2->second;
00216 break;
00217 }
00218 }
00219
00220 for (list< std::pair<int,int> >::iterator it2 = l2.begin(); it2 != l2.end(); ++it2) {
00221 if (it2->first == (int)observer.GetSelector()) {
00222 observerFrom.selector = it2->second;
00223 break;
00224 }
00225 }
00226
00227
00228
00229 OControlMessage *msg1 = new OControlMessage(objcommCMD_ADDOBSERVER_V22, observerFrom, 0, 1, 1, it->first.c_str());
00230 strcpy(msg1->subName, it->first.c_str());
00231 _sendMessage(subject, msg1);
00232
00233
00234 OConnectMessage *msg2 = new OConnectMessage(oconnectcmdCONNECT);
00235 msg2->entry = subjectFrom;
00236 strcpy(msg2->subName, it->second.c_str());
00237 _sendMessage(observer, msg2);
00238 }
00239 }
00240
00241
00243 static void* listen_socket(void* _sock)
00244 {
00245 BLOCK_SIGNALS()
00246
00247 int *fd_main = &(((socketList_t*)_sock)->fd_main);
00248 int *fd_client = &(((socketList_t*)_sock)->fd_client);
00249 FILE **fp_client = &(((socketList_t*)_sock)->fp_client);
00250 const char *name = ((socketList_t*)_sock)->name;
00251
00252 CHECK_SYSCALL(*fd_main = socket(AF_UNIX, SOCK_STREAM, 0), "socket() failed")
00253
00254 struct sockaddr_un addr;
00255 socklen_t sizeof_addr = sizeof(addr);
00256 addr.sun_family = AF_UNIX;
00257 strcpy(addr.sun_path, name);
00258
00259 unlink(name);
00260 CHECK_SYSCALL(bind(*fd_main, (const struct sockaddr*)&addr, sizeof(addr)), "bind() failed")
00261 CHECK_SYSCALL(listen(*fd_main, 1), "listen() failed")
00262
00263
00264 do {
00265 int tmp = *fd_client;
00266 CHECK_SYSCALL(*fd_client = accept(*fd_main, (struct sockaddr*)&addr, &sizeof_addr), "accept() failed")
00267 if (*fp_client) fclose(*fp_client);
00268 CLOSE_SOCKET(tmp)
00269 *fp_client = fdopen(*fd_client, "r+");
00270 } while(1);
00271
00272 return NULL;
00273 }
00274
00275
00277 static void connect_to_actuators_server()
00278 {
00279 struct sockaddr_un addr;
00280 addr.sun_family = AF_UNIX;
00281 strcpy(addr.sun_path, ACTUATORS_SERVER_SOCKET);
00282
00283 int s = socket(AF_UNIX, SOCK_STREAM, 0);
00284
00285 if (connect(s, (const sockaddr*)&addr, sizeof(addr)) < 0) {
00286 cerr << "not connected to the actuators server" << endl;
00287 } else {
00288 socketList[ACTUATORS].fd_client = s;
00289 cout << "connected to the actuators server" << endl;
00290 }
00291 }
00292
00293
00295 static void cleanup(void)
00296 {
00297 cout << "Start Shutdown procedures..." << endl;
00298
00299 __in_shutdown = true;
00300 __DoInitCompleted = true;
00301
00302
00303 OPENR::Shutdown(OBootCondition(obcbPAUSE_SW));
00304
00305
00306 for(vector<perThreadStruct*>::iterator it = ThreadsList.begin(); it != ThreadsList.end(); ++it) {
00307 sem_post(&((*it)->sem));
00308 }
00309
00310
00311 for(vector<perThreadStruct*>::iterator it = ThreadsList.begin(); it != ThreadsList.end(); ++it) {
00312 while ((*it)->DoStopCompleted == false && (*it)->exited == false) {
00313 opensdk_yield();
00314 }
00315 }
00316 __DoStopCompleted = true;
00317
00318
00319 struct timespec maxTime;
00320 maxTime.tv_sec = time(NULL) + MAX_SHUTDOWN_TIME;
00321 maxTime.tv_nsec = 0;
00322
00323 for(vector<perThreadStruct*>::iterator it = ThreadsList.begin(); it != ThreadsList.end(); ++it) {
00324 (*it)->exited or pthread_cond_timedwait(&((*it)->cond), &((*it)->cond_mutex), &maxTime);
00325
00326
00327 if (pthread_cancel((*it)->threadId) && pthread_join((*it)->threadId, NULL)) {
00328 cerr << "failed to stop module thread " << (*it)->threadId << endl;
00329 }
00330 delete (*it);
00331 }
00332
00333
00334 OpenSdkThread::cleanThreadPool();
00335
00336
00337 while(!skThreadsList.empty()) {
00338 if (pthread_cancel(skThreadsList.top())) {
00339 cerr << "failed to stop socket thread " << (void*)skThreadsList.top() << endl;
00340 }
00341 skThreadsList.pop();
00342 }
00343
00344
00345 while(!dlopenHandlers.empty()) {
00346
00347
00348
00349
00350
00351 dlopenHandlers.pop();
00352 }
00353
00354 for (unsigned int i=0; i<sizeof(socketList)/sizeof(*socketList); ++i) {
00355 CLOSE_SOCKET(socketList[i].fd_main)
00356 CLOSE_SOCKET(socketList[i].fd_client)
00357 unlink(socketList[i].name);
00358 }
00359 }
00360
00361
00363 static void sig_handler(int sig)
00364 {
00365
00366 int exit_code;
00367 switch(sig) {
00368 case SIGINT:
00369 case SIGQUIT:
00370 exit_code = 0;
00371 break;
00372 default:
00373 exit_code = sig + 100;
00374 }
00375 exit(exit_code);
00376 }
00377
00378
00379 int main(int argc, char *argv[])
00380 {
00381 bool run_with_chroot = true;
00382
00383 while (--argc) {
00384 ++argv;
00385
00386 if (!strcmp(*argv, "--help") || !strcmp(*argv, "-h")) {
00387 cout << "Usage: ./OPENRloader [options] [Directory Root]" << endl << endl
00388 << "Available options:" << endl
00389 << "--chroot=[yes|no]\tRun with or without chroot. defaults to yes" << endl
00390 << endl;
00391 return 0;
00392 }
00393
00394 if (!strncmp(*argv, "--chroot=", sizeof("--chroot=")-1)) {
00395 run_with_chroot = (argv[0][sizeof("--chroot=")-1] == 'y');
00396
00397 } else if (chdir(*argv) == -1) {
00398 perror("Couldn't change the working directory");
00399 return 1;
00400 }
00401 }
00402
00403 if (run_with_chroot) {
00404
00405 if (chroot(".") == 0) {
00406 cout << "Running with chroot()" << endl;
00407 } else {
00408 cerr << "chroot failed" << endl;
00409 }
00410 } else {
00411 cout << "not running with chroot()" << endl;
00412 }
00413
00414
00415 cout << "Starting boot procedures for " ROBOTDESIGN "..." << endl << endl;
00416
00417
00418 atexit(cleanup);
00419 signal(SIGINT, sig_handler);
00420 signal(SIGQUIT, sig_handler);
00421 signal(SIGTERM, sig_handler);
00422
00423 BLOCK_SIGNAL(SIGALRM);
00424
00425
00426 OpenSdkThread::initThreadPool();
00427
00428
00429 if (pthread_key_create(&perThreadKey, NULL)) {
00430 cerr << "pthread_key_create() failed" << endl;
00431 return 2;
00432 }
00433
00434
00435 CREATE_SOCKET_THREAD((void*)&socketList[SENSORS])
00436 CREATE_SOCKET_THREAD((void*)&socketList[IMAGE])
00437 connect_to_actuators_server();
00438 opensdk_yield();
00439
00440 if (!parse_connect_cfg()) {
00441 return 3;
00442 }
00443
00444
00445 init_virtual_robot_comm();
00446
00447 if (!parse_object_cfg()) {
00448 return 4;
00449 }
00450
00451
00452 for(vector<perThreadStruct*>::iterator it = ThreadsList.begin(); it != ThreadsList.end(); ++it) {
00453 while ((*it)->DoInitCompleted == false && (*it)->exited == false) {
00454 opensdk_yield();
00455 }
00456 }
00457
00458
00459 sendConnectMessages();
00460
00461
00462 __DoInitCompleted = true;
00463
00464
00465 pthread_mutex_t dummy = PTHREAD_MUTEX_INITIALIZER;
00466 pthread_mutex_lock(&dummy);
00467 pthread_mutex_lock(&dummy);
00468
00469 return 0;
00470 }