loader/main.cc

Go to the documentation of this file.
00001 /*
00002  * This file is part of openSDK.
00003  *
00004  * Copyright (C) 2006-2007 openSDK team
00005  *
00006  * openSDK is free software; you can redistribute it and/or modify
00007  * it under the terms of the GNU General Public License as published by
00008  * the Free Software Foundation; version 2.
00009  *
00010  * openSDK is distributed in the hope that it will be useful,
00011  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00012  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
00013  * GNU General Public License for more details.
00014  *
00015  * You should have received a copy of the GNU General Public License
00016  * along with openSDK; if not, write to the Free Software
00017  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
00018  *
00019  *     $Id: main.cc,v 1.56 2007/07/16 13:09:00 nuno-lopes Exp $
00020  */
00021 
00022 #include <iostream>
00023 #include <stack>
00024 #include <vector>
00025 
00026 #include <cstring>
00027 #include <cstdlib>
00028 #include <cstdio>
00029 #include <time.h>
00030 #include <unistd.h>
00031 #include <dlfcn.h>
00032 #include <pthread.h>
00033 #include <sys/types.h>
00034 #include <sys/stat.h>
00035 #include <fcntl.h>
00036 #include <sys/socket.h>
00037 #include <sys/un.h>
00038 #include <signal.h>
00039 
00040 #include <Platform.h>
00041 #include <ModuleData.h>
00042 #include <OpenSdkThread.h>
00043 #include <utils.h>
00044 #include "main.h"
00045 #include "helper.h"
00046 #include "gunzip.h"
00047 #include "OVirtualRobotComm.h"
00048 #include "opensdkAPI.h"
00049 
00050 #include <OPENR/ObjcommMessages.h>
00051 #include <OPENR/OPENRAPI.h>
00052 
00053 using namespace std;
00054 
00055 #define CHECK_SYSCALL(val, msg) if ((val) == -1) { perror(msg); exit(4); }
00056 #define CLOSE_SOCKET(sock) if (sock != -1) { close(sock); sock = -1; }
00057 #define CREATE_SOCKET_THREAD(sock) \
00058         { \
00059                 pthread_t myThread; \
00060                 if (pthread_create(&myThread, NULL, listen_socket, sock)) { \
00061                         cerr << "Couldn't create a new thread to load servers" << endl; \
00062                         return 5; \
00063                 } \
00064                 skThreadsList.push(myThread); \
00065         }
00066 
00067 static stack<void*> dlopenHandlers;     // dlopen()'ed librarys
00068 vector<perThreadStruct*> ThreadsList; // modules threads
00069 static stack<pthread_t> skThreadsList;  // socket related threads
00070 
00072 socketList_t socketList[]= {
00073         {-1, -1, NULL, ACTUATORS_SERVER_SOCKET},
00074         {-1, -1, NULL, SENSORS_SERVER_SOCKET},
00075         {-1, -1, NULL, IMAGE_SERVER_SOCKET},
00076 };
00077 
00079 pthread_key_t perThreadKey;
00080 
00082 bool __in_shutdown = false;
00083 
00085 bool __DoInitCompleted = false;
00086 
00088 bool __DoStopCompleted = false;
00089 
00091 map<string, OServiceEntry> _objectsEntryList;
00092 
00094 static list< pair<string, string> > connectionStrings;
00095 
00096 
00098 static bool load_module(const char* name)
00099 {
00100         char *path = resolve_case_insensitive_path(name);
00101 
00102         string tmp_mod_name = path;
00103         tmp_mod_name += ".tmp";
00104 
00105         int ret = gunzip_module(path, tmp_mod_name.c_str());
00106         free(path);
00107 
00108         if (ret != Z_OK) {
00109                 cerr << "Couldn't gunzip the module properly. Error " << ret << ": " << zError(ret) << endl;
00110                 return false;
00111         }
00112 
00113         // compatibility with GDB: prepend the current working dir so that it is able to load the debug symbols
00114         char cwd[PATH_MAX];
00115         if (getcwd(cwd, sizeof(cwd))) {
00116                 tmp_mod_name = cwd + string("/") + tmp_mod_name;
00117         }
00118 
00119         size_t id = ThreadsList.size();
00120         perThreadStruct* data = new perThreadStruct(id); // deleted in cleanup()
00121         executorArg* arg      = new executorArg(tmp_mod_name, data); // deleted in helper.cc
00122 
00123         if (pthread_create(&(data->threadId), NULL, module_executor, arg)) {
00124                 cerr << "Couldn't create a new thread to load the module" << endl;
00125                 return false;
00126         }
00127 
00128         cout << "Loaded module: " << name << " (thread id: " << data->threadId << "::" << id << ")" << endl;
00129 
00130         // for future cleanup
00131 //      dlopenHandlers.push(handle);
00132         ThreadsList.push_back(data);
00133 
00134         return true;
00135 }
00136 
00137 
00139 static bool parse_object_cfg(void)
00140 {
00141         char *path = resolve_case_insensitive_path("MS/OPEN-R/MW/CONF/OBJECT.CFG");
00142         fstream objectsFile(path);
00143         free(path);
00144 
00145         if (!objectsFile) {
00146                 cerr << "Couldn't open the OBJECT.CFG file" << endl;
00147                 return false;
00148         }
00149 
00150         //parse the file and load the objects
00151         string line;
00152         bool mySection = true; // starts on a top level section
00153 
00154         while(getNonEmptyLine(objectsFile, line, true, &mySection)) {
00155                 if (!load_module(line.c_str() + 1)) {
00156                         return false;
00157                 }
00158         }
00159         return true;
00160 }
00161 
00162 
00164 static bool parse_connect_cfg(void)
00165 {
00166         char *path = resolve_case_insensitive_path("MS/OPEN-R/MW/CONF/CONNECT.CFG");
00167         fstream file(path);
00168         free(path);
00169 
00170         if (!file) {
00171                 // the file is optional
00172                 return true;
00173         }
00174 
00175         string line;
00176         while(getNonEmptyLine(file, line)) {
00177                 size_t i = line.find_first_of(" \t");
00178                 size_t j = line.find_first_not_of(" \t", i);
00179                 if (i == string::npos || j == string::npos) {
00180                         cerr << "malformed CONNECT.CFG file" << endl;
00181                         return false;
00182                 }
00183 
00184                 string subject(line, 0, i);
00185                 string observer(line, j);
00186                 cout << "connect: " << subject << " --> " << observer << endl;
00187                 connectionStrings.push_back(pair<string,string>(subject, observer));
00188         }
00189 
00190         return true;
00191 }
00192 
00193 
00195 static void sendConnectMessages(void)
00196 {
00197         for(list< pair<string, string> >::iterator it = connectionStrings.begin(); it != connectionStrings.end(); ++it) {
00198 
00199                 if (_objectsEntryList.find(it->first) == _objectsEntryList.end() ||
00200                 _objectsEntryList.find(it->second) == _objectsEntryList.end()) {
00201                         continue;
00202                 }
00203 
00204                 OServiceEntry subject      = _objectsEntryList[it->first];
00205                 OServiceEntry observer     = _objectsEntryList[it->second];
00206                 OServiceEntry subjectFrom  = subject;
00207                 OServiceEntry observerFrom = observer;
00208 
00209                 list< std::pair<int,int> >& l = ThreadsList[(size_t)subject.GetOID().GetAddress()]->ConnectHandlers;
00210                 list< std::pair<int,int> >& l2 = ThreadsList[(size_t)observer.GetOID().GetAddress()]->ConnectHandlers;
00211 
00212                 // find the right selectors
00213                 for (list< std::pair<int,int> >::iterator it2 = l.begin(); it2 != l.end(); ++it2) {
00214                         if (it2->first == (int)subject.GetSelector()) {
00215                                 subjectFrom.selector = it2->second;
00216                                 break;
00217                         }
00218                 }
00219 
00220                 for (list< std::pair<int,int> >::iterator it2 = l2.begin(); it2 != l2.end(); ++it2) {
00221                         if (it2->first == (int)observer.GetSelector()) {
00222                                 observerFrom.selector = it2->second;
00223                                 break;
00224                         }
00225                 }
00226 
00227 
00228                 // send msg to the subject
00229                 OControlMessage *msg1 = new OControlMessage(objcommCMD_ADDOBSERVER_V22, observerFrom, 0, 1, 1, it->first.c_str()); // TODO: fix min/max/skip
00230                 strcpy(msg1->subName, it->first.c_str());
00231                 _sendMessage(subject, msg1);
00232 
00233                 // send msg to the observer
00234                 OConnectMessage *msg2 = new OConnectMessage(oconnectcmdCONNECT);
00235                 msg2->entry = subjectFrom;
00236                 strcpy(msg2->subName, it->second.c_str());
00237                 _sendMessage(observer, msg2);
00238         }
00239 }
00240 
00241 
00243 static void* listen_socket(void* _sock)
00244 {
00245         BLOCK_SIGNALS()
00246 
00247         int *fd_main = &(((socketList_t*)_sock)->fd_main);
00248         int *fd_client = &(((socketList_t*)_sock)->fd_client);
00249         FILE **fp_client = &(((socketList_t*)_sock)->fp_client);
00250         const char *name = ((socketList_t*)_sock)->name;
00251 
00252         CHECK_SYSCALL(*fd_main = socket(AF_UNIX, SOCK_STREAM, 0), "socket() failed")
00253 
00254         struct sockaddr_un addr;
00255         socklen_t sizeof_addr = sizeof(addr);
00256         addr.sun_family = AF_UNIX;
00257         strcpy(addr.sun_path, name);
00258 
00259         unlink(name);
00260         CHECK_SYSCALL(bind(*fd_main, (const struct sockaddr*)&addr, sizeof(addr)), "bind() failed")
00261         CHECK_SYSCALL(listen(*fd_main, 1), "listen() failed")
00262 
00263         // we only use one socket at a time. so we close the existing socket and keep the most recent
00264         do {
00265                 int tmp = *fd_client;
00266                 CHECK_SYSCALL(*fd_client = accept(*fd_main, (struct sockaddr*)&addr, &sizeof_addr), "accept() failed")
00267                 if (*fp_client) fclose(*fp_client);
00268                 CLOSE_SOCKET(tmp)
00269                 *fp_client = fdopen(*fd_client, "r+");
00270         } while(1);
00271 
00272         return NULL;
00273 }
00274 
00275 
00277 static void connect_to_actuators_server()
00278 {
00279         struct sockaddr_un addr;
00280         addr.sun_family = AF_UNIX;
00281         strcpy(addr.sun_path, ACTUATORS_SERVER_SOCKET);
00282 
00283         int s = socket(AF_UNIX, SOCK_STREAM, 0);
00284 
00285         if (connect(s, (const sockaddr*)&addr, sizeof(addr)) < 0) {
00286                 cerr << "not connected to the actuators server" << endl;
00287         } else {
00288                 socketList[ACTUATORS].fd_client = s;
00289                 cout << "connected to the actuators server" << endl;
00290         }
00291 }
00292 
00293 
00295 static void cleanup(void)
00296 {
00297         cout << "Start Shutdown procedures..." << endl;
00298 
00299         __in_shutdown = true;
00300         __DoInitCompleted = true; // fake it if it didn't complete
00301 
00302         // update the robot boot condition
00303         OPENR::Shutdown(OBootCondition(obcbPAUSE_SW));
00304 
00305         // let modules know that they should start exiting
00306         for(vector<perThreadStruct*>::iterator it = ThreadsList.begin(); it != ThreadsList.end(); ++it) {
00307                 sem_post(&((*it)->sem));
00308         }
00309 
00310         // check if all modules have already run its DoInit() method
00311         for(vector<perThreadStruct*>::iterator it = ThreadsList.begin(); it != ThreadsList.end(); ++it) {
00312                 while ((*it)->DoStopCompleted == false && (*it)->exited == false) {
00313                         opensdk_yield();
00314                 }
00315         }
00316         __DoStopCompleted = true;
00317 
00318         // shutdown modules (max 30 seconds)
00319         struct timespec maxTime;
00320         maxTime.tv_sec  = time(NULL) + MAX_SHUTDOWN_TIME;
00321         maxTime.tv_nsec = 0;
00322 
00323         for(vector<perThreadStruct*>::iterator it = ThreadsList.begin(); it != ThreadsList.end(); ++it) {
00324                 (*it)->exited or pthread_cond_timedwait(&((*it)->cond), &((*it)->cond_mutex), &maxTime);
00325 
00326                 // we call both _cancel and _join, because _cancel will fail if the thread has already exited
00327                 if (pthread_cancel((*it)->threadId) && pthread_join((*it)->threadId, NULL)) {
00328                         cerr << "failed to stop module thread " << (*it)->threadId << endl;
00329                 }
00330                 delete (*it);
00331         }
00332 
00333         // ask each one of the threads of the pool to quit
00334         OpenSdkThread::cleanThreadPool();
00335 
00336         // cleanup socket threads resources
00337         while(!skThreadsList.empty()) {
00338                 if (pthread_cancel(skThreadsList.top())) {
00339                         cerr << "failed to stop socket thread " << (void*)skThreadsList.top() << endl;
00340                 }
00341                 skThreadsList.pop();
00342         }
00343 
00344         // unload modules
00345         while(!dlopenHandlers.empty()) {
00346                 // compatibility with valgrind: don't close dlopen'ed libraries so that it is able to generate
00347                 // useful memory leakage reports
00348                 /* if (dlclose(dlopenHandlers.top())) {
00349                         cerr << "failed to close library " << dlopenHandlers.top() << endl;
00350                 } */
00351                 dlopenHandlers.pop();
00352         }
00353 
00354         for (unsigned int i=0; i<sizeof(socketList)/sizeof(*socketList); ++i) {
00355                 CLOSE_SOCKET(socketList[i].fd_main)
00356                 CLOSE_SOCKET(socketList[i].fd_client)
00357                 unlink(socketList[i].name);
00358         }
00359 }
00360 
00361 
00363 static void sig_handler(int sig)
00364 {
00365         //cleanup();
00366         int exit_code;
00367         switch(sig) {
00368                 case SIGINT:
00369                 case SIGQUIT:
00370                         exit_code = 0;
00371                         break;
00372                 default:
00373                         exit_code = sig + 100;
00374         }
00375         exit(exit_code);
00376 }
00377 
00378 
00379 int main(int argc, char *argv[])
00380 {
00381         bool run_with_chroot = true;
00382 
00383         while (--argc) {
00384                 ++argv;
00385 
00386                 if (!strcmp(*argv, "--help") || !strcmp(*argv, "-h")) {
00387                         cout << "Usage: ./OPENRloader [options] [Directory Root]" << endl << endl
00388                         << "Available options:" << endl
00389                         << "--chroot=[yes|no]\tRun with or without chroot. defaults to yes" << endl
00390                         << endl;
00391                         return 0;
00392                 }
00393 
00394                 if (!strncmp(*argv, "--chroot=", sizeof("--chroot=")-1)) {
00395                         run_with_chroot = (argv[0][sizeof("--chroot=")-1] == 'y');
00396 
00397                 } else if (chdir(*argv) == -1) {
00398                         perror("Couldn't change the working directory");
00399                         return 1;
00400                 }
00401         }
00402 
00403         if (run_with_chroot) {
00404                 // try to chroot, so that fopen() calls in the client code work correctly
00405                 if (chroot(".") == 0) {
00406                         cout << "Running with chroot()" << endl;
00407                 } else {
00408                         cerr << "chroot failed" << endl;
00409                 }
00410         } else {
00411                 cout << "not running with chroot()" << endl;
00412         }
00413 
00414         // now we are ready to start loading the files
00415         cout << "Starting boot procedures for " ROBOTDESIGN "..." << endl << endl;
00416 
00417         // register handler to cleanup the mess as shutdown
00418         atexit(cleanup);
00419         signal(SIGINT,  sig_handler);
00420         signal(SIGQUIT, sig_handler);
00421         signal(SIGTERM, sig_handler);
00422 
00423         BLOCK_SIGNAL(SIGALRM);
00424 
00425         //Initialize OpenSdkThread pool
00426         OpenSdkThread::initThreadPool();
00427 
00428         // create per-thread data
00429         if (pthread_key_create(&perThreadKey, NULL)) {
00430                 cerr << "pthread_key_create() failed" << endl;
00431                 return 2;
00432         }
00433 
00434         // create the sockets for communications with servers
00435         CREATE_SOCKET_THREAD((void*)&socketList[SENSORS])
00436         CREATE_SOCKET_THREAD((void*)&socketList[IMAGE])
00437         connect_to_actuators_server();
00438         opensdk_yield(); // really create the servers before loading modules
00439 
00440         if (!parse_connect_cfg()) {
00441                 return 3;
00442         }
00443 
00444         // init the thread that implements the OVirtualRobotComm system object
00445         init_virtual_robot_comm();
00446 
00447         if (!parse_object_cfg()) {
00448                 return 4;
00449         }
00450 
00451         // check if all modules have already run its DoInit() method
00452         for(vector<perThreadStruct*>::iterator it = ThreadsList.begin(); it != ThreadsList.end(); ++it) {
00453                 while ((*it)->DoInitCompleted == false && (*it)->exited == false) {
00454                         opensdk_yield();
00455                 }
00456         }
00457 
00458         // connect objects
00459         sendConnectMessages();
00460 
00461         // now, objects can run DoStart() safely
00462         __DoInitCompleted = true;
00463 
00464         // lock this thread, as it isn't needed anymore
00465         pthread_mutex_t dummy = PTHREAD_MUTEX_INITIALIZER;
00466         pthread_mutex_lock(&dummy);
00467         pthread_mutex_lock(&dummy);
00468 
00469         return 0;
00470 }

Generated on Sun Dec 2 23:04:29 2007 for openSDK by  doxygen 1.3.9.1