src/share/vm/utilities/workgroup.cpp

Mon, 09 Aug 2010 17:51:56 -0700

author
never
date
Mon, 09 Aug 2010 17:51:56 -0700
changeset 2044
f4f596978298
parent 1907
c18cbe5936b8
child 2188
8b10f48633dc
permissions
-rw-r--r--

Merge

     1 /*
     2  * Copyright (c) 2001, 2007, Oracle and/or its affiliates. All rights reserved.
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
     4  *
     5  * This code is free software; you can redistribute it and/or modify it
     6  * under the terms of the GNU General Public License version 2 only, as
     7  * published by the Free Software Foundation.
     8  *
     9  * This code is distributed in the hope that it will be useful, but WITHOUT
    10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
    12  * version 2 for more details (a copy is included in the LICENSE file that
    13  * accompanied this code).
    14  *
    15  * You should have received a copy of the GNU General Public License version
    16  * 2 along with this work; if not, write to the Free Software Foundation,
    17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
    18  *
    19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
    20  * or visit www.oracle.com if you need additional information or have any
    21  * questions.
    22  *
    23  */
    25 # include "incls/_precompiled.incl"
    26 # include "incls/_workgroup.cpp.incl"
    28 // Definitions of WorkGang methods.
    30 AbstractWorkGang::AbstractWorkGang(const char* name,
    31                                    bool  are_GC_task_threads,
    32                                    bool  are_ConcurrentGC_threads) :
    33   _name(name),
    34   _are_GC_task_threads(are_GC_task_threads),
    35   _are_ConcurrentGC_threads(are_ConcurrentGC_threads) {
    37   assert(!(are_GC_task_threads && are_ConcurrentGC_threads),
    38          "They cannot both be STW GC and Concurrent threads" );
    40   // Other initialization.
    41   _monitor = new Monitor(/* priority */       Mutex::leaf,
    42                          /* name */           "WorkGroup monitor",
    43                          /* allow_vm_block */ are_GC_task_threads);
    44   assert(monitor() != NULL, "Failed to allocate monitor");
    45   _terminate = false;
    46   _task = NULL;
    47   _sequence_number = 0;
    48   _started_workers = 0;
    49   _finished_workers = 0;
    50 }
    52 WorkGang::WorkGang(const char* name,
    53                    int         workers,
    54                    bool        are_GC_task_threads,
    55                    bool        are_ConcurrentGC_threads) :
    56   AbstractWorkGang(name, are_GC_task_threads, are_ConcurrentGC_threads)
    57 {
    58   // Save arguments.
    59   _total_workers = workers;
    61   if (TraceWorkGang) {
    62     tty->print_cr("Constructing work gang %s with %d threads", name, workers);
    63   }
    64   _gang_workers = NEW_C_HEAP_ARRAY(GangWorker*, workers);
    65   if (gang_workers() == NULL) {
    66     vm_exit_out_of_memory(0, "Cannot create GangWorker array.");
    67   }
    68   for (int worker = 0; worker < total_workers(); worker += 1) {
    69     GangWorker* new_worker = new GangWorker(this, worker);
    70     assert(new_worker != NULL, "Failed to allocate GangWorker");
    71     _gang_workers[worker] = new_worker;
    72     if (new_worker == NULL || !os::create_thread(new_worker, os::pgc_thread))
    73       vm_exit_out_of_memory(0, "Cannot create worker GC thread. Out of system resources.");
    74     if (!DisableStartThread) {
    75       os::start_thread(new_worker);
    76     }
    77   }
    78 }
    80 AbstractWorkGang::~AbstractWorkGang() {
    81   if (TraceWorkGang) {
    82     tty->print_cr("Destructing work gang %s", name());
    83   }
    84   stop();   // stop all the workers
    85   for (int worker = 0; worker < total_workers(); worker += 1) {
    86     delete gang_worker(worker);
    87   }
    88   delete gang_workers();
    89   delete monitor();
    90 }
    92 GangWorker* AbstractWorkGang::gang_worker(int i) const {
    93   // Array index bounds checking.
    94   GangWorker* result = NULL;
    95   assert(gang_workers() != NULL, "No workers for indexing");
    96   assert(((i >= 0) && (i < total_workers())), "Worker index out of bounds");
    97   result = _gang_workers[i];
    98   assert(result != NULL, "Indexing to null worker");
    99   return result;
   100 }
   102 void WorkGang::run_task(AbstractGangTask* task) {
   103   // This thread is executed by the VM thread which does not block
   104   // on ordinary MutexLocker's.
   105   MutexLockerEx ml(monitor(), Mutex::_no_safepoint_check_flag);
   106   if (TraceWorkGang) {
   107     tty->print_cr("Running work gang %s task %s", name(), task->name());
   108   }
   109   // Tell all the workers to run a task.
   110   assert(task != NULL, "Running a null task");
   111   // Initialize.
   112   _task = task;
   113   _sequence_number += 1;
   114   _started_workers = 0;
   115   _finished_workers = 0;
   116   // Tell the workers to get to work.
   117   monitor()->notify_all();
   118   // Wait for them to be finished
   119   while (finished_workers() < total_workers()) {
   120     if (TraceWorkGang) {
   121       tty->print_cr("Waiting in work gang %s: %d/%d finished sequence %d",
   122                     name(), finished_workers(), total_workers(),
   123                     _sequence_number);
   124     }
   125     monitor()->wait(/* no_safepoint_check */ true);
   126   }
   127   _task = NULL;
   128   if (TraceWorkGang) {
   129     tty->print_cr("/nFinished work gang %s: %d/%d sequence %d",
   130                   name(), finished_workers(), total_workers(),
   131                   _sequence_number);
   132     }
   133 }
   135 void AbstractWorkGang::stop() {
   136   // Tell all workers to terminate, then wait for them to become inactive.
   137   MutexLockerEx ml(monitor(), Mutex::_no_safepoint_check_flag);
   138   if (TraceWorkGang) {
   139     tty->print_cr("Stopping work gang %s task %s", name(), task()->name());
   140   }
   141   _task = NULL;
   142   _terminate = true;
   143   monitor()->notify_all();
   144   while (finished_workers() < total_workers()) {
   145     if (TraceWorkGang) {
   146       tty->print_cr("Waiting in work gang %s: %d/%d finished",
   147                     name(), finished_workers(), total_workers());
   148     }
   149     monitor()->wait(/* no_safepoint_check */ true);
   150   }
   151 }
   153 void AbstractWorkGang::internal_worker_poll(WorkData* data) const {
   154   assert(monitor()->owned_by_self(), "worker_poll is an internal method");
   155   assert(data != NULL, "worker data is null");
   156   data->set_terminate(terminate());
   157   data->set_task(task());
   158   data->set_sequence_number(sequence_number());
   159 }
   161 void AbstractWorkGang::internal_note_start() {
   162   assert(monitor()->owned_by_self(), "note_finish is an internal method");
   163   _started_workers += 1;
   164 }
   166 void AbstractWorkGang::internal_note_finish() {
   167   assert(monitor()->owned_by_self(), "note_finish is an internal method");
   168   _finished_workers += 1;
   169 }
   171 void AbstractWorkGang::print_worker_threads_on(outputStream* st) const {
   172   uint    num_thr = total_workers();
   173   for (uint i = 0; i < num_thr; i++) {
   174     gang_worker(i)->print_on(st);
   175     st->cr();
   176   }
   177 }
   179 void AbstractWorkGang::threads_do(ThreadClosure* tc) const {
   180   assert(tc != NULL, "Null ThreadClosure");
   181   uint num_thr = total_workers();
   182   for (uint i = 0; i < num_thr; i++) {
   183     tc->do_thread(gang_worker(i));
   184   }
   185 }
   187 // GangWorker methods.
   189 GangWorker::GangWorker(AbstractWorkGang* gang, uint id) {
   190   _gang = gang;
   191   set_id(id);
   192   set_name("Gang worker#%d (%s)", id, gang->name());
   193 }
   195 void GangWorker::run() {
   196   initialize();
   197   loop();
   198 }
   200 void GangWorker::initialize() {
   201   this->initialize_thread_local_storage();
   202   assert(_gang != NULL, "No gang to run in");
   203   os::set_priority(this, NearMaxPriority);
   204   if (TraceWorkGang) {
   205     tty->print_cr("Running gang worker for gang %s id %d",
   206                   gang()->name(), id());
   207   }
   208   // The VM thread should not execute here because MutexLocker's are used
   209   // as (opposed to MutexLockerEx's).
   210   assert(!Thread::current()->is_VM_thread(), "VM thread should not be part"
   211          " of a work gang");
   212 }
   214 void GangWorker::loop() {
   215   int previous_sequence_number = 0;
   216   Monitor* gang_monitor = gang()->monitor();
   217   for ( ; /* !terminate() */; ) {
   218     WorkData data;
   219     int part;  // Initialized below.
   220     {
   221       // Grab the gang mutex.
   222       MutexLocker ml(gang_monitor);
   223       // Wait for something to do.
   224       // Polling outside the while { wait } avoids missed notifies
   225       // in the outer loop.
   226       gang()->internal_worker_poll(&data);
   227       if (TraceWorkGang) {
   228         tty->print("Polled outside for work in gang %s worker %d",
   229                    gang()->name(), id());
   230         tty->print("  terminate: %s",
   231                    data.terminate() ? "true" : "false");
   232         tty->print("  sequence: %d (prev: %d)",
   233                    data.sequence_number(), previous_sequence_number);
   234         if (data.task() != NULL) {
   235           tty->print("  task: %s", data.task()->name());
   236         } else {
   237           tty->print("  task: NULL");
   238         }
   239         tty->cr();
   240       }
   241       for ( ; /* break or return */; ) {
   242         // Terminate if requested.
   243         if (data.terminate()) {
   244           gang()->internal_note_finish();
   245           gang_monitor->notify_all();
   246           return;
   247         }
   248         // Check for new work.
   249         if ((data.task() != NULL) &&
   250             (data.sequence_number() != previous_sequence_number)) {
   251           gang()->internal_note_start();
   252           gang_monitor->notify_all();
   253           part = gang()->started_workers() - 1;
   254           break;
   255         }
   256         // Nothing to do.
   257         gang_monitor->wait(/* no_safepoint_check */ true);
   258         gang()->internal_worker_poll(&data);
   259         if (TraceWorkGang) {
   260           tty->print("Polled inside for work in gang %s worker %d",
   261                      gang()->name(), id());
   262           tty->print("  terminate: %s",
   263                      data.terminate() ? "true" : "false");
   264           tty->print("  sequence: %d (prev: %d)",
   265                      data.sequence_number(), previous_sequence_number);
   266           if (data.task() != NULL) {
   267             tty->print("  task: %s", data.task()->name());
   268           } else {
   269             tty->print("  task: NULL");
   270           }
   271           tty->cr();
   272         }
   273       }
   274       // Drop gang mutex.
   275     }
   276     if (TraceWorkGang) {
   277       tty->print("Work for work gang %s id %d task %s part %d",
   278                  gang()->name(), id(), data.task()->name(), part);
   279     }
   280     assert(data.task() != NULL, "Got null task");
   281     data.task()->work(part);
   282     {
   283       if (TraceWorkGang) {
   284         tty->print("Finish for work gang %s id %d task %s part %d",
   285                    gang()->name(), id(), data.task()->name(), part);
   286       }
   287       // Grab the gang mutex.
   288       MutexLocker ml(gang_monitor);
   289       gang()->internal_note_finish();
   290       // Tell the gang you are done.
   291       gang_monitor->notify_all();
   292       // Drop the gang mutex.
   293     }
   294     previous_sequence_number = data.sequence_number();
   295   }
   296 }
   298 bool GangWorker::is_GC_task_thread() const {
   299   return gang()->are_GC_task_threads();
   300 }
   302 bool GangWorker::is_ConcurrentGC_thread() const {
   303   return gang()->are_ConcurrentGC_threads();
   304 }
   306 void GangWorker::print_on(outputStream* st) const {
   307   st->print("\"%s\" ", name());
   308   Thread::print_on(st);
   309   st->cr();
   310 }
   312 // Printing methods
   314 const char* AbstractWorkGang::name() const {
   315   return _name;
   316 }
   318 #ifndef PRODUCT
   320 const char* AbstractGangTask::name() const {
   321   return _name;
   322 }
   324 #endif /* PRODUCT */
   326 // *** WorkGangBarrierSync
   328 WorkGangBarrierSync::WorkGangBarrierSync()
   329   : _monitor(Mutex::safepoint, "work gang barrier sync", true),
   330     _n_workers(0), _n_completed(0), _should_reset(false) {
   331 }
   333 WorkGangBarrierSync::WorkGangBarrierSync(int n_workers, const char* name)
   334   : _monitor(Mutex::safepoint, name, true),
   335     _n_workers(n_workers), _n_completed(0), _should_reset(false) {
   336 }
   338 void WorkGangBarrierSync::set_n_workers(int n_workers) {
   339   _n_workers   = n_workers;
   340   _n_completed = 0;
   341   _should_reset = false;
   342 }
   344 void WorkGangBarrierSync::enter() {
   345   MutexLockerEx x(monitor(), Mutex::_no_safepoint_check_flag);
   346   if (should_reset()) {
   347     // The should_reset() was set and we are the first worker to enter
   348     // the sync barrier. We will zero the n_completed() count which
   349     // effectively resets the barrier.
   350     zero_completed();
   351     set_should_reset(false);
   352   }
   353   inc_completed();
   354   if (n_completed() == n_workers()) {
   355     // At this point we would like to reset the barrier to be ready in
   356     // case it is used again. However, we cannot set n_completed() to
   357     // 0, even after the notify_all(), given that some other workers
   358     // might still be waiting for n_completed() to become ==
   359     // n_workers(). So, if we set n_completed() to 0, those workers
   360     // will get stuck (as they will wake up, see that n_completed() !=
   361     // n_workers() and go back to sleep). Instead, we raise the
   362     // should_reset() flag and the barrier will be reset the first
   363     // time a worker enters it again.
   364     set_should_reset(true);
   365     monitor()->notify_all();
   366   } else {
   367     while (n_completed() != n_workers()) {
   368       monitor()->wait(/* no_safepoint_check */ true);
   369     }
   370   }
   371 }
   373 // SubTasksDone functions.
   375 SubTasksDone::SubTasksDone(int n) :
   376   _n_tasks(n), _n_threads(1), _tasks(NULL) {
   377   _tasks = NEW_C_HEAP_ARRAY(jint, n);
   378   guarantee(_tasks != NULL, "alloc failure");
   379   clear();
   380 }
   382 bool SubTasksDone::valid() {
   383   return _tasks != NULL;
   384 }
   386 void SubTasksDone::set_par_threads(int t) {
   387 #ifdef ASSERT
   388   assert(_claimed == 0 || _threads_completed == _n_threads,
   389          "should not be called while tasks are being processed!");
   390 #endif
   391   _n_threads = (t == 0 ? 1 : t);
   392 }
   394 void SubTasksDone::clear() {
   395   for (int i = 0; i < _n_tasks; i++) {
   396     _tasks[i] = 0;
   397   }
   398   _threads_completed = 0;
   399 #ifdef ASSERT
   400   _claimed = 0;
   401 #endif
   402 }
   404 bool SubTasksDone::is_task_claimed(int t) {
   405   assert(0 <= t && t < _n_tasks, "bad task id.");
   406   jint old = _tasks[t];
   407   if (old == 0) {
   408     old = Atomic::cmpxchg(1, &_tasks[t], 0);
   409   }
   410   assert(_tasks[t] == 1, "What else?");
   411   bool res = old != 0;
   412 #ifdef ASSERT
   413   if (!res) {
   414     assert(_claimed < _n_tasks, "Too many tasks claimed; missing clear?");
   415     Atomic::inc(&_claimed);
   416   }
   417 #endif
   418   return res;
   419 }
   421 void SubTasksDone::all_tasks_completed() {
   422   jint observed = _threads_completed;
   423   jint old;
   424   do {
   425     old = observed;
   426     observed = Atomic::cmpxchg(old+1, &_threads_completed, old);
   427   } while (observed != old);
   428   // If this was the last thread checking in, clear the tasks.
   429   if (observed+1 == _n_threads) clear();
   430 }
   433 SubTasksDone::~SubTasksDone() {
   434   if (_tasks != NULL) FREE_C_HEAP_ARRAY(jint, _tasks);
   435 }
   437 // *** SequentialSubTasksDone
   439 void SequentialSubTasksDone::clear() {
   440   _n_tasks   = _n_claimed   = 0;
   441   _n_threads = _n_completed = 0;
   442 }
   444 bool SequentialSubTasksDone::valid() {
   445   return _n_threads > 0;
   446 }
   448 bool SequentialSubTasksDone::is_task_claimed(int& t) {
   449   jint* n_claimed_ptr = &_n_claimed;
   450   t = *n_claimed_ptr;
   451   while (t < _n_tasks) {
   452     jint res = Atomic::cmpxchg(t+1, n_claimed_ptr, t);
   453     if (res == t) {
   454       return false;
   455     }
   456     t = *n_claimed_ptr;
   457   }
   458   return true;
   459 }
   461 bool SequentialSubTasksDone::all_tasks_completed() {
   462   jint* n_completed_ptr = &_n_completed;
   463   jint  complete        = *n_completed_ptr;
   464   while (true) {
   465     jint res = Atomic::cmpxchg(complete+1, n_completed_ptr, complete);
   466     if (res == complete) {
   467       break;
   468     }
   469     complete = res;
   470   }
   471   if (complete+1 == _n_threads) {
   472     clear();
   473     return true;
   474   }
   475   return false;
   476 }
   478 bool FreeIdSet::_stat_init = false;
   479 FreeIdSet* FreeIdSet::_sets[NSets];
   480 bool FreeIdSet::_safepoint;
   482 FreeIdSet::FreeIdSet(int sz, Monitor* mon) :
   483   _sz(sz), _mon(mon), _hd(0), _waiters(0), _index(-1), _claimed(0)
   484 {
   485   _ids = new int[sz];
   486   for (int i = 0; i < sz; i++) _ids[i] = i+1;
   487   _ids[sz-1] = end_of_list; // end of list.
   488   if (_stat_init) {
   489     for (int j = 0; j < NSets; j++) _sets[j] = NULL;
   490     _stat_init = true;
   491   }
   492   // Add to sets.  (This should happen while the system is still single-threaded.)
   493   for (int j = 0; j < NSets; j++) {
   494     if (_sets[j] == NULL) {
   495       _sets[j] = this;
   496       _index = j;
   497       break;
   498     }
   499   }
   500   guarantee(_index != -1, "Too many FreeIdSets in use!");
   501 }
   503 FreeIdSet::~FreeIdSet() {
   504   _sets[_index] = NULL;
   505 }
   507 void FreeIdSet::set_safepoint(bool b) {
   508   _safepoint = b;
   509   if (b) {
   510     for (int j = 0; j < NSets; j++) {
   511       if (_sets[j] != NULL && _sets[j]->_waiters > 0) {
   512         Monitor* mon = _sets[j]->_mon;
   513         mon->lock_without_safepoint_check();
   514         mon->notify_all();
   515         mon->unlock();
   516       }
   517     }
   518   }
   519 }
   521 #define FID_STATS 0
   523 int FreeIdSet::claim_par_id() {
   524 #if FID_STATS
   525   thread_t tslf = thr_self();
   526   tty->print("claim_par_id[%d]: sz = %d, claimed = %d\n", tslf, _sz, _claimed);
   527 #endif
   528   MutexLockerEx x(_mon, Mutex::_no_safepoint_check_flag);
   529   while (!_safepoint && _hd == end_of_list) {
   530     _waiters++;
   531 #if FID_STATS
   532     if (_waiters > 5) {
   533       tty->print("claim_par_id waiting[%d]: %d waiters, %d claimed.\n",
   534                  tslf, _waiters, _claimed);
   535     }
   536 #endif
   537     _mon->wait(Mutex::_no_safepoint_check_flag);
   538     _waiters--;
   539   }
   540   if (_hd == end_of_list) {
   541 #if FID_STATS
   542     tty->print("claim_par_id[%d]: returning EOL.\n", tslf);
   543 #endif
   544     return -1;
   545   } else {
   546     int res = _hd;
   547     _hd = _ids[res];
   548     _ids[res] = claimed;  // For debugging.
   549     _claimed++;
   550 #if FID_STATS
   551     tty->print("claim_par_id[%d]: returning %d, claimed = %d.\n",
   552                tslf, res, _claimed);
   553 #endif
   554     return res;
   555   }
   556 }
   558 bool FreeIdSet::claim_perm_id(int i) {
   559   assert(0 <= i && i < _sz, "Out of range.");
   560   MutexLockerEx x(_mon, Mutex::_no_safepoint_check_flag);
   561   int prev = end_of_list;
   562   int cur = _hd;
   563   while (cur != end_of_list) {
   564     if (cur == i) {
   565       if (prev == end_of_list) {
   566         _hd = _ids[cur];
   567       } else {
   568         _ids[prev] = _ids[cur];
   569       }
   570       _ids[cur] = claimed;
   571       _claimed++;
   572       return true;
   573     } else {
   574       prev = cur;
   575       cur = _ids[cur];
   576     }
   577   }
   578   return false;
   580 }
   582 void FreeIdSet::release_par_id(int id) {
   583   MutexLockerEx x(_mon, Mutex::_no_safepoint_check_flag);
   584   assert(_ids[id] == claimed, "Precondition.");
   585   _ids[id] = _hd;
   586   _hd = id;
   587   _claimed--;
   588 #if FID_STATS
   589   tty->print("[%d] release_par_id(%d), waiters =%d,  claimed = %d.\n",
   590              thr_self(), id, _waiters, _claimed);
   591 #endif
   592   if (_waiters > 0)
   593     // Notify all would be safer, but this is OK, right?
   594     _mon->notify_all();
   595 }

mercurial