//file: posix.cc //A pseudo omnithread. Use the same name of functions in omni_thread but implement //them by SGI's sproc multi-threading scheme. The purpose of doing this is to make //omniORB compatible with OpenMP. //Created by Song Li 06-06-2003 #include #include #include "omnithread.h" //#include "MemPoolSGI.h" //#define THROW_ERRORS(x) {if ((x) != 1) throw omni_thread_fatal(error); } #define DB(x) #define ERRNO(x) (((x) != 0) ? (errno) : 0) /////////////////////////////////////////////////////////////////////////// // Arena /////////////////////////////////////////////////////////////////////////// inline Arena::Arena(void) { char* arenafile; usconfig(CONF_LOCKTYPE, US_NODEBUG); usconfig(CONF_ARENATYPE, US_SHAREDONLY); arenafile = tmpnam(NULL); arena = usinit(arenafile); } inline Arena::Arena(usptr_t* src) { arena = src; } inline usptr_t* Arena::get_arena() { return(arena); } inline Arena::operator usptr_t*(void) { return(arena); } Arena* omni_mutex::arena; Arena* omni_condition::arena; Arena* omni_semaphore::arena; /////////////////////////////////////////////////////////////////////////// // // Mutex // /////////////////////////////////////////////////////////////////////////// omni_mutex::omni_mutex(void) { this->arena = new Arena(); mutex = usnewlock(this->arena->get_arena()); } omni_mutex::~omni_mutex(void) { usfreelock(mutex, this->arena->get_arena()); delete this->arena; } void omni_mutex::lock(void) { if(ussetlock(mutex) != 1) throw omni_thread_fatal(errno); } void omni_mutex::unlock(void) { if(usunsetlock(mutex) != 0) { throw omni_thread_fatal(errno); } } /////////////////////////////////////////////////////////////////////////// // // Condition variable // /////////////////////////////////////////////////////////////////////////// omni_condition::omni_condition(omni_mutex* m) : mutex(m) { this->arena = new Arena(); cond = usnewpollsema(this->arena->get_arena(),0); sd = usopenpollsema(cond, S_IRUSR | S_IWUSR); } omni_condition::~omni_condition(void) { broadcast(); usclosepollsema(cond); usfreepollsema(cond,this->arena); delete this->arena; } void omni_condition::wait(void) { int result; //mutex->lock(); do { if((result = uspsema(cond)) == 0) { struct pollfd fds; fds.fd =sd; fds.events = POLLIN; fds.revents = 0; mutex->unlock(); switch (poll(&fds, 1, -1)) { case 0: result = ETIMEDOUT;break; case -1: result = -1; break; default: result = 0; break; }; mutex->lock(); }; } while (result == 0); //mutex->unlock(); } int omni_condition::timedwait(unsigned long secs, unsigned long nanosecs) { int result; timespec abs; clock_gettime(CLOCK_REALTIME, &abs); //mutex->lock(); do { struct pollfd fds; fds.fd = sd; fds.events = POLLIN; fds.revents = 0; mutex->unlock(); switch(poll(&fds, 1, secs-abs.tv_sec)) { case 0: result = ETIMEDOUT; break; case 1: result = -1; break; default: result = 0; break; }; mutex->lock(); }while (result == 0); //mutex->unlock(); return result; } void omni_condition::signal(void) { if(ustestsema(cond) < 0) std::cout<arena = new Arena(); sema = usnewsema(this->arena->get_arena(), value); } omni_semaphore::~omni_semaphore(void) { usfreesema(sema, this->arena); delete this->arena; } void omni_semaphore::wait(void) { omni_mutex_lock l(m); if(uspsema(sema) < 0) std::cout<<"omni_semaphore error in omni_semaphore::wait "<_state = STATE_RUNNING; t->posix_thread = getLocalThreadPtr(); DB(cerr << "initial thread " << t->id() << endl); //THROW_ERRORS(pthread_setspecific(self_key, (void*)t)); //setpriority(PRIO_PROCESS,mThreadPID,0); } // // Wrapper for thread creation. // extern "C" void* omni_thread_wrapper(void* ptr) { omni_thread* me = (omni_thread*)ptr; DB(cerr << "omni_thread_wrapper: thread " << me->id() << " started\n"); //THROW_ERRORS(pthread_setspecific(self_key, me)); // // Now invoke the thread function with the given argument. // if (me->fn_void != NULL) { (*me->fn_void)(me->thread_arg); omni_thread::exit(); } if (me->fn_ret != NULL) { void* return_value = (*me->fn_ret)(me->thread_arg); omni_thread::exit(return_value); } if (me->detached) { me->run(me->thread_arg); omni_thread::exit(); } else { void* return_value = me->run_undetached(me->thread_arg); omni_thread::exit(return_value); } // should never get here. return NULL; } // // Constructors for omni_thread - set up the thread object but don't // start it running. // // construct a detached thread running a given function. omni_thread::omni_thread(void (*fn)(void*), void* arg, priority_t pri) { common_constructor(arg, pri, 1); fn_void = fn; fn_ret = NULL; } // construct an undetached thread running a given function. omni_thread::omni_thread(void* (*fn)(void*), void* arg, priority_t pri) { common_constructor(arg, pri, 0); fn_void = NULL; fn_ret = fn; } // construct a thread which will run either run() or run_undetached(). omni_thread::omni_thread(void* arg, priority_t pri) { common_constructor(arg, pri, 1); fn_void = NULL; fn_ret = NULL; } // common part of all constructors. void omni_thread::common_constructor(void* arg, priority_t pri, int det) { _state = STATE_NEW; _priority = pri; next_id_mutex->lock(); _id = next_id++; next_id_mutex->unlock(); thread_arg = arg; detached = det; // may be altered in start_undetached() // posix_thread is set up in initialisation routine or start(). } // // Destructor for omni_thread. // omni_thread::~omni_thread(void) { DB(cerr << "destructor called for thread " << id() << endl); } // // Start the thread // void omni_thread::start(void) { mutex.lock(); if (_state != STATE_NEW) throw omni_thread_invalid(); //mutex.lock(); setLocalThreadPtr(this); prctl(PR_SETEXITSIG, 0); prctl(PR_TERMCHILD); mutex.unlock(); mThreadPID = sproc(thread_func_t(&omni_thread_wrapper),PR_SALL,(void*)this); _state = STATE_RUNNING; } // // Start a thread which will run the member function run_undetached(). // void omni_thread::start_undetached(void) { if ((fn_void != NULL) || (fn_ret != NULL)) throw omni_thread_invalid(); detached = 0; start(); } // // join - simply check error conditions & call pthread_join. // void omni_thread::join(void** arg) { int status; pid_t pid; if ((_state != STATE_RUNNING) && (_state != STATE_TERMINATED)) { mutex.unlock(); throw omni_thread_invalid(); } mutex.unlock(); if (this == self()) throw omni_thread_invalid(); if (detached) throw omni_thread_invalid(); do { pid = ::waitpid(mThreadPID, &status, 0); } while ( WIFSTOPPED(status) != 0 ); if ( pid > -1 ) { if ( WIFEXITED(status) != 0 && arg != NULL ) { **((int**) arg) = WEXITSTATUS(status); } else if ( WIFSIGNALED(status) != 0 && arg != NULL ) { **((int**) arg) = WTERMSIG(status); } } delete this; } // // Change this thread's priority. // void omni_thread::set_priority(priority_t pri) { omni_mutex_lock l(mutex); if (_state != STATE_RUNNING) throw omni_thread_invalid(); _priority = pri; setpriority(PRIO_PROCESS,mThreadPID,pri); } // // create - construct a new thread object and start it running. Returns thread // object if successful, null pointer if not. // omni_thread* omni_thread::create(void (*fn)(void*), void* arg, priority_t pri) { omni_thread* t = new omni_thread(fn, arg, pri); t->start(); return t; } // undetached version omni_thread* omni_thread::create(void* (*fn)(void*), void* arg, priority_t pri) { omni_thread* t = new omni_thread(fn, arg, pri); t->start(); return t; } void omni_thread::exit(void* return_value) { omni_thread* me = self(); if (me) { me->mutex.lock(); me->_state = STATE_TERMINATED; me->mutex.unlock(); DB(cerr << "omni_thread::exit: thread " << me->id() << " detached " << me->detached << " return value " << return_value << endl); if (me->detached) delete me; } else { DB(cerr << "omni_thread::exit: called with a non-omnithread. Exit quietly." << endl); } //pthread_exit(return_value); } omni_thread* omni_thread::self(void) { return getLocalThreadPtr(); } void omni_thread::yield(void) { sginap(0); } void omni_thread::sleep(unsigned long secs, unsigned long nanosecs) { timespec rqts = { secs, nanosecs }; timespec remain; while (nanosleep(&rqts, &remain)) { if (errno == EINTR) { rqts.tv_sec = remain.tv_sec; rqts.tv_nsec = remain.tv_nsec; continue; } else throw omni_thread_fatal(errno); } } void omni_thread::get_time(unsigned long* abs_sec, unsigned long* abs_nsec, unsigned long rel_sec, unsigned long rel_nsec) { timespec abs; clock_gettime(CLOCK_REALTIME, &abs); abs.tv_nsec += rel_nsec; abs.tv_sec += rel_sec + abs.tv_nsec / 1000000000; abs.tv_nsec = abs.tv_nsec % 1000000000; *abs_sec = abs.tv_sec; *abs_nsec = abs.tv_nsec; } int omni_thread::posix_priority(priority_t pri) { return 1; //all normal priority } void omni_thread::stacksize(unsigned long sz) { stack_size = sz; } unsigned long omni_thread::stacksize() { return stack_size; }