src/share/vm/utilities/workgroup.cpp

Thu, 17 Mar 2011 10:32:46 -0700

author
ysr
date
Thu, 17 Mar 2011 10:32:46 -0700
changeset 2651
92da084fefc9
parent 2314
f95d63e2154a
child 3294
bca17e38de00
permissions
-rw-r--r--

6668573: CMS: reference processing crash if ParallelCMSThreads > ParallelGCThreads
Summary: Use _max_num_q = max(discovery_degree, processing_degree), and let balance_queues() redistribute from discovery_degree to processing_degree of queues. This should also allow a more dynamic and flexible parallelism policy in the future.
Reviewed-by: jmasa, johnc

     1 /*
     2  * Copyright (c) 2001, 2011, Oracle and/or its affiliates. All rights reserved.
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
     4  *
     5  * This code is free software; you can redistribute it and/or modify it
     6  * under the terms of the GNU General Public License version 2 only, as
     7  * published by the Free Software Foundation.
     8  *
     9  * This code is distributed in the hope that it will be useful, but WITHOUT
    10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
    12  * version 2 for more details (a copy is included in the LICENSE file that
    13  * accompanied this code).
    14  *
    15  * You should have received a copy of the GNU General Public License version
    16  * 2 along with this work; if not, write to the Free Software Foundation,
    17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
    18  *
    19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
    20  * or visit www.oracle.com if you need additional information or have any
    21  * questions.
    22  *
    23  */
    25 #include "precompiled.hpp"
    26 #include "memory/allocation.hpp"
    27 #include "memory/allocation.inline.hpp"
    28 #include "runtime/os.hpp"
    29 #include "utilities/workgroup.hpp"
    31 // Definitions of WorkGang methods.
    33 AbstractWorkGang::AbstractWorkGang(const char* name,
    34                                    bool  are_GC_task_threads,
    35                                    bool  are_ConcurrentGC_threads) :
    36   _name(name),
    37   _are_GC_task_threads(are_GC_task_threads),
    38   _are_ConcurrentGC_threads(are_ConcurrentGC_threads) {
    40   assert(!(are_GC_task_threads && are_ConcurrentGC_threads),
    41          "They cannot both be STW GC and Concurrent threads" );
    43   // Other initialization.
    44   _monitor = new Monitor(/* priority */       Mutex::leaf,
    45                          /* name */           "WorkGroup monitor",
    46                          /* allow_vm_block */ are_GC_task_threads);
    47   assert(monitor() != NULL, "Failed to allocate monitor");
    48   _terminate = false;
    49   _task = NULL;
    50   _sequence_number = 0;
    51   _started_workers = 0;
    52   _finished_workers = 0;
    53 }
    55 WorkGang::WorkGang(const char* name,
    56                    int         workers,
    57                    bool        are_GC_task_threads,
    58                    bool        are_ConcurrentGC_threads) :
    59   AbstractWorkGang(name, are_GC_task_threads, are_ConcurrentGC_threads) {
    60   // Save arguments.
    61   _total_workers = workers;
    62 }
    64 GangWorker* WorkGang::allocate_worker(int which) {
    65   GangWorker* new_worker = new GangWorker(this, which);
    66   return new_worker;
    67 }
    69 // The current implementation will exit if the allocation
    70 // of any worker fails.  Still, return a boolean so that
    71 // a future implementation can possibly do a partial
    72 // initialization of the workers and report such to the
    73 // caller.
    74 bool WorkGang::initialize_workers() {
    76   if (TraceWorkGang) {
    77     tty->print_cr("Constructing work gang %s with %d threads",
    78                   name(),
    79                   total_workers());
    80   }
    81   _gang_workers = NEW_C_HEAP_ARRAY(GangWorker*, total_workers());
    82   if (gang_workers() == NULL) {
    83     vm_exit_out_of_memory(0, "Cannot create GangWorker array.");
    84     return false;
    85   }
    86   os::ThreadType worker_type;
    87   if (are_ConcurrentGC_threads()) {
    88     worker_type = os::cgc_thread;
    89   } else {
    90     worker_type = os::pgc_thread;
    91   }
    92   for (int worker = 0; worker < total_workers(); worker += 1) {
    93     GangWorker* new_worker = allocate_worker(worker);
    94     assert(new_worker != NULL, "Failed to allocate GangWorker");
    95     _gang_workers[worker] = new_worker;
    96     if (new_worker == NULL || !os::create_thread(new_worker, worker_type)) {
    97       vm_exit_out_of_memory(0, "Cannot create worker GC thread. Out of system resources.");
    98       return false;
    99     }
   100     if (!DisableStartThread) {
   101       os::start_thread(new_worker);
   102     }
   103   }
   104   return true;
   105 }
   107 AbstractWorkGang::~AbstractWorkGang() {
   108   if (TraceWorkGang) {
   109     tty->print_cr("Destructing work gang %s", name());
   110   }
   111   stop();   // stop all the workers
   112   for (int worker = 0; worker < total_workers(); worker += 1) {
   113     delete gang_worker(worker);
   114   }
   115   delete gang_workers();
   116   delete monitor();
   117 }
   119 GangWorker* AbstractWorkGang::gang_worker(int i) const {
   120   // Array index bounds checking.
   121   GangWorker* result = NULL;
   122   assert(gang_workers() != NULL, "No workers for indexing");
   123   assert(((i >= 0) && (i < total_workers())), "Worker index out of bounds");
   124   result = _gang_workers[i];
   125   assert(result != NULL, "Indexing to null worker");
   126   return result;
   127 }
   129 void WorkGang::run_task(AbstractGangTask* task) {
   130   // This thread is executed by the VM thread which does not block
   131   // on ordinary MutexLocker's.
   132   MutexLockerEx ml(monitor(), Mutex::_no_safepoint_check_flag);
   133   if (TraceWorkGang) {
   134     tty->print_cr("Running work gang %s task %s", name(), task->name());
   135   }
   136   // Tell all the workers to run a task.
   137   assert(task != NULL, "Running a null task");
   138   // Initialize.
   139   _task = task;
   140   _sequence_number += 1;
   141   _started_workers = 0;
   142   _finished_workers = 0;
   143   // Tell the workers to get to work.
   144   monitor()->notify_all();
   145   // Wait for them to be finished
   146   while (finished_workers() < total_workers()) {
   147     if (TraceWorkGang) {
   148       tty->print_cr("Waiting in work gang %s: %d/%d finished sequence %d",
   149                     name(), finished_workers(), total_workers(),
   150                     _sequence_number);
   151     }
   152     monitor()->wait(/* no_safepoint_check */ true);
   153   }
   154   _task = NULL;
   155   if (TraceWorkGang) {
   156     tty->print_cr("/nFinished work gang %s: %d/%d sequence %d",
   157                   name(), finished_workers(), total_workers(),
   158                   _sequence_number);
   159   }
   160 }
   162 void AbstractWorkGang::stop() {
   163   // Tell all workers to terminate, then wait for them to become inactive.
   164   MutexLockerEx ml(monitor(), Mutex::_no_safepoint_check_flag);
   165   if (TraceWorkGang) {
   166     tty->print_cr("Stopping work gang %s task %s", name(), task()->name());
   167   }
   168   _task = NULL;
   169   _terminate = true;
   170   monitor()->notify_all();
   171   while (finished_workers() < total_workers()) {
   172     if (TraceWorkGang) {
   173       tty->print_cr("Waiting in work gang %s: %d/%d finished",
   174                     name(), finished_workers(), total_workers());
   175     }
   176     monitor()->wait(/* no_safepoint_check */ true);
   177   }
   178 }
   180 void AbstractWorkGang::internal_worker_poll(WorkData* data) const {
   181   assert(monitor()->owned_by_self(), "worker_poll is an internal method");
   182   assert(data != NULL, "worker data is null");
   183   data->set_terminate(terminate());
   184   data->set_task(task());
   185   data->set_sequence_number(sequence_number());
   186 }
   188 void AbstractWorkGang::internal_note_start() {
   189   assert(monitor()->owned_by_self(), "note_finish is an internal method");
   190   _started_workers += 1;
   191 }
   193 void AbstractWorkGang::internal_note_finish() {
   194   assert(monitor()->owned_by_self(), "note_finish is an internal method");
   195   _finished_workers += 1;
   196 }
   198 void AbstractWorkGang::print_worker_threads_on(outputStream* st) const {
   199   uint    num_thr = total_workers();
   200   for (uint i = 0; i < num_thr; i++) {
   201     gang_worker(i)->print_on(st);
   202     st->cr();
   203   }
   204 }
   206 void AbstractWorkGang::threads_do(ThreadClosure* tc) const {
   207   assert(tc != NULL, "Null ThreadClosure");
   208   uint num_thr = total_workers();
   209   for (uint i = 0; i < num_thr; i++) {
   210     tc->do_thread(gang_worker(i));
   211   }
   212 }
   214 // GangWorker methods.
   216 GangWorker::GangWorker(AbstractWorkGang* gang, uint id) {
   217   _gang = gang;
   218   set_id(id);
   219   set_name("Gang worker#%d (%s)", id, gang->name());
   220 }
   222 void GangWorker::run() {
   223   initialize();
   224   loop();
   225 }
   227 void GangWorker::initialize() {
   228   this->initialize_thread_local_storage();
   229   assert(_gang != NULL, "No gang to run in");
   230   os::set_priority(this, NearMaxPriority);
   231   if (TraceWorkGang) {
   232     tty->print_cr("Running gang worker for gang %s id %d",
   233                   gang()->name(), id());
   234   }
   235   // The VM thread should not execute here because MutexLocker's are used
   236   // as (opposed to MutexLockerEx's).
   237   assert(!Thread::current()->is_VM_thread(), "VM thread should not be part"
   238          " of a work gang");
   239 }
   241 void GangWorker::loop() {
   242   int previous_sequence_number = 0;
   243   Monitor* gang_monitor = gang()->monitor();
   244   for ( ; /* !terminate() */; ) {
   245     WorkData data;
   246     int part;  // Initialized below.
   247     {
   248       // Grab the gang mutex.
   249       MutexLocker ml(gang_monitor);
   250       // Wait for something to do.
   251       // Polling outside the while { wait } avoids missed notifies
   252       // in the outer loop.
   253       gang()->internal_worker_poll(&data);
   254       if (TraceWorkGang) {
   255         tty->print("Polled outside for work in gang %s worker %d",
   256                    gang()->name(), id());
   257         tty->print("  terminate: %s",
   258                    data.terminate() ? "true" : "false");
   259         tty->print("  sequence: %d (prev: %d)",
   260                    data.sequence_number(), previous_sequence_number);
   261         if (data.task() != NULL) {
   262           tty->print("  task: %s", data.task()->name());
   263         } else {
   264           tty->print("  task: NULL");
   265         }
   266         tty->cr();
   267       }
   268       for ( ; /* break or return */; ) {
   269         // Terminate if requested.
   270         if (data.terminate()) {
   271           gang()->internal_note_finish();
   272           gang_monitor->notify_all();
   273           return;
   274         }
   275         // Check for new work.
   276         if ((data.task() != NULL) &&
   277             (data.sequence_number() != previous_sequence_number)) {
   278           gang()->internal_note_start();
   279           gang_monitor->notify_all();
   280           part = gang()->started_workers() - 1;
   281           break;
   282         }
   283         // Nothing to do.
   284         gang_monitor->wait(/* no_safepoint_check */ true);
   285         gang()->internal_worker_poll(&data);
   286         if (TraceWorkGang) {
   287           tty->print("Polled inside for work in gang %s worker %d",
   288                      gang()->name(), id());
   289           tty->print("  terminate: %s",
   290                      data.terminate() ? "true" : "false");
   291           tty->print("  sequence: %d (prev: %d)",
   292                      data.sequence_number(), previous_sequence_number);
   293           if (data.task() != NULL) {
   294             tty->print("  task: %s", data.task()->name());
   295           } else {
   296             tty->print("  task: NULL");
   297           }
   298           tty->cr();
   299         }
   300       }
   301       // Drop gang mutex.
   302     }
   303     if (TraceWorkGang) {
   304       tty->print("Work for work gang %s id %d task %s part %d",
   305                  gang()->name(), id(), data.task()->name(), part);
   306     }
   307     assert(data.task() != NULL, "Got null task");
   308     data.task()->work(part);
   309     {
   310       if (TraceWorkGang) {
   311         tty->print("Finish for work gang %s id %d task %s part %d",
   312                    gang()->name(), id(), data.task()->name(), part);
   313       }
   314       // Grab the gang mutex.
   315       MutexLocker ml(gang_monitor);
   316       gang()->internal_note_finish();
   317       // Tell the gang you are done.
   318       gang_monitor->notify_all();
   319       // Drop the gang mutex.
   320     }
   321     previous_sequence_number = data.sequence_number();
   322   }
   323 }
   325 bool GangWorker::is_GC_task_thread() const {
   326   return gang()->are_GC_task_threads();
   327 }
   329 bool GangWorker::is_ConcurrentGC_thread() const {
   330   return gang()->are_ConcurrentGC_threads();
   331 }
   333 void GangWorker::print_on(outputStream* st) const {
   334   st->print("\"%s\" ", name());
   335   Thread::print_on(st);
   336   st->cr();
   337 }
   339 // Printing methods
   341 const char* AbstractWorkGang::name() const {
   342   return _name;
   343 }
   345 #ifndef PRODUCT
   347 const char* AbstractGangTask::name() const {
   348   return _name;
   349 }
   351 #endif /* PRODUCT */
   353 // *** WorkGangBarrierSync
   355 WorkGangBarrierSync::WorkGangBarrierSync()
   356   : _monitor(Mutex::safepoint, "work gang barrier sync", true),
   357     _n_workers(0), _n_completed(0), _should_reset(false) {
   358 }
   360 WorkGangBarrierSync::WorkGangBarrierSync(int n_workers, const char* name)
   361   : _monitor(Mutex::safepoint, name, true),
   362     _n_workers(n_workers), _n_completed(0), _should_reset(false) {
   363 }
   365 void WorkGangBarrierSync::set_n_workers(int n_workers) {
   366   _n_workers   = n_workers;
   367   _n_completed = 0;
   368   _should_reset = false;
   369 }
   371 void WorkGangBarrierSync::enter() {
   372   MutexLockerEx x(monitor(), Mutex::_no_safepoint_check_flag);
   373   if (should_reset()) {
   374     // The should_reset() was set and we are the first worker to enter
   375     // the sync barrier. We will zero the n_completed() count which
   376     // effectively resets the barrier.
   377     zero_completed();
   378     set_should_reset(false);
   379   }
   380   inc_completed();
   381   if (n_completed() == n_workers()) {
   382     // At this point we would like to reset the barrier to be ready in
   383     // case it is used again. However, we cannot set n_completed() to
   384     // 0, even after the notify_all(), given that some other workers
   385     // might still be waiting for n_completed() to become ==
   386     // n_workers(). So, if we set n_completed() to 0, those workers
   387     // will get stuck (as they will wake up, see that n_completed() !=
   388     // n_workers() and go back to sleep). Instead, we raise the
   389     // should_reset() flag and the barrier will be reset the first
   390     // time a worker enters it again.
   391     set_should_reset(true);
   392     monitor()->notify_all();
   393   } else {
   394     while (n_completed() != n_workers()) {
   395       monitor()->wait(/* no_safepoint_check */ true);
   396     }
   397   }
   398 }
   400 // SubTasksDone functions.
   402 SubTasksDone::SubTasksDone(int n) :
   403   _n_tasks(n), _n_threads(1), _tasks(NULL) {
   404   _tasks = NEW_C_HEAP_ARRAY(jint, n);
   405   guarantee(_tasks != NULL, "alloc failure");
   406   clear();
   407 }
   409 bool SubTasksDone::valid() {
   410   return _tasks != NULL;
   411 }
   413 void SubTasksDone::set_n_threads(int t) {
   414 #ifdef ASSERT
   415   assert(_claimed == 0 || _threads_completed == _n_threads,
   416          "should not be called while tasks are being processed!");
   417 #endif
   418   _n_threads = (t == 0 ? 1 : t);
   419 }
   421 void SubTasksDone::clear() {
   422   for (int i = 0; i < _n_tasks; i++) {
   423     _tasks[i] = 0;
   424   }
   425   _threads_completed = 0;
   426 #ifdef ASSERT
   427   _claimed = 0;
   428 #endif
   429 }
   431 bool SubTasksDone::is_task_claimed(int t) {
   432   assert(0 <= t && t < _n_tasks, "bad task id.");
   433   jint old = _tasks[t];
   434   if (old == 0) {
   435     old = Atomic::cmpxchg(1, &_tasks[t], 0);
   436   }
   437   assert(_tasks[t] == 1, "What else?");
   438   bool res = old != 0;
   439 #ifdef ASSERT
   440   if (!res) {
   441     assert(_claimed < _n_tasks, "Too many tasks claimed; missing clear?");
   442     Atomic::inc(&_claimed);
   443   }
   444 #endif
   445   return res;
   446 }
   448 void SubTasksDone::all_tasks_completed() {
   449   jint observed = _threads_completed;
   450   jint old;
   451   do {
   452     old = observed;
   453     observed = Atomic::cmpxchg(old+1, &_threads_completed, old);
   454   } while (observed != old);
   455   // If this was the last thread checking in, clear the tasks.
   456   if (observed+1 == _n_threads) clear();
   457 }
   460 SubTasksDone::~SubTasksDone() {
   461   if (_tasks != NULL) FREE_C_HEAP_ARRAY(jint, _tasks);
   462 }
   464 // *** SequentialSubTasksDone
   466 void SequentialSubTasksDone::clear() {
   467   _n_tasks   = _n_claimed   = 0;
   468   _n_threads = _n_completed = 0;
   469 }
   471 bool SequentialSubTasksDone::valid() {
   472   return _n_threads > 0;
   473 }
   475 bool SequentialSubTasksDone::is_task_claimed(int& t) {
   476   jint* n_claimed_ptr = &_n_claimed;
   477   t = *n_claimed_ptr;
   478   while (t < _n_tasks) {
   479     jint res = Atomic::cmpxchg(t+1, n_claimed_ptr, t);
   480     if (res == t) {
   481       return false;
   482     }
   483     t = *n_claimed_ptr;
   484   }
   485   return true;
   486 }
   488 bool SequentialSubTasksDone::all_tasks_completed() {
   489   jint* n_completed_ptr = &_n_completed;
   490   jint  complete        = *n_completed_ptr;
   491   while (true) {
   492     jint res = Atomic::cmpxchg(complete+1, n_completed_ptr, complete);
   493     if (res == complete) {
   494       break;
   495     }
   496     complete = res;
   497   }
   498   if (complete+1 == _n_threads) {
   499     clear();
   500     return true;
   501   }
   502   return false;
   503 }
   505 bool FreeIdSet::_stat_init = false;
   506 FreeIdSet* FreeIdSet::_sets[NSets];
   507 bool FreeIdSet::_safepoint;
   509 FreeIdSet::FreeIdSet(int sz, Monitor* mon) :
   510   _sz(sz), _mon(mon), _hd(0), _waiters(0), _index(-1), _claimed(0)
   511 {
   512   _ids = new int[sz];
   513   for (int i = 0; i < sz; i++) _ids[i] = i+1;
   514   _ids[sz-1] = end_of_list; // end of list.
   515   if (_stat_init) {
   516     for (int j = 0; j < NSets; j++) _sets[j] = NULL;
   517     _stat_init = true;
   518   }
   519   // Add to sets.  (This should happen while the system is still single-threaded.)
   520   for (int j = 0; j < NSets; j++) {
   521     if (_sets[j] == NULL) {
   522       _sets[j] = this;
   523       _index = j;
   524       break;
   525     }
   526   }
   527   guarantee(_index != -1, "Too many FreeIdSets in use!");
   528 }
   530 FreeIdSet::~FreeIdSet() {
   531   _sets[_index] = NULL;
   532 }
   534 void FreeIdSet::set_safepoint(bool b) {
   535   _safepoint = b;
   536   if (b) {
   537     for (int j = 0; j < NSets; j++) {
   538       if (_sets[j] != NULL && _sets[j]->_waiters > 0) {
   539         Monitor* mon = _sets[j]->_mon;
   540         mon->lock_without_safepoint_check();
   541         mon->notify_all();
   542         mon->unlock();
   543       }
   544     }
   545   }
   546 }
   548 #define FID_STATS 0
   550 int FreeIdSet::claim_par_id() {
   551 #if FID_STATS
   552   thread_t tslf = thr_self();
   553   tty->print("claim_par_id[%d]: sz = %d, claimed = %d\n", tslf, _sz, _claimed);
   554 #endif
   555   MutexLockerEx x(_mon, Mutex::_no_safepoint_check_flag);
   556   while (!_safepoint && _hd == end_of_list) {
   557     _waiters++;
   558 #if FID_STATS
   559     if (_waiters > 5) {
   560       tty->print("claim_par_id waiting[%d]: %d waiters, %d claimed.\n",
   561                  tslf, _waiters, _claimed);
   562     }
   563 #endif
   564     _mon->wait(Mutex::_no_safepoint_check_flag);
   565     _waiters--;
   566   }
   567   if (_hd == end_of_list) {
   568 #if FID_STATS
   569     tty->print("claim_par_id[%d]: returning EOL.\n", tslf);
   570 #endif
   571     return -1;
   572   } else {
   573     int res = _hd;
   574     _hd = _ids[res];
   575     _ids[res] = claimed;  // For debugging.
   576     _claimed++;
   577 #if FID_STATS
   578     tty->print("claim_par_id[%d]: returning %d, claimed = %d.\n",
   579                tslf, res, _claimed);
   580 #endif
   581     return res;
   582   }
   583 }
   585 bool FreeIdSet::claim_perm_id(int i) {
   586   assert(0 <= i && i < _sz, "Out of range.");
   587   MutexLockerEx x(_mon, Mutex::_no_safepoint_check_flag);
   588   int prev = end_of_list;
   589   int cur = _hd;
   590   while (cur != end_of_list) {
   591     if (cur == i) {
   592       if (prev == end_of_list) {
   593         _hd = _ids[cur];
   594       } else {
   595         _ids[prev] = _ids[cur];
   596       }
   597       _ids[cur] = claimed;
   598       _claimed++;
   599       return true;
   600     } else {
   601       prev = cur;
   602       cur = _ids[cur];
   603     }
   604   }
   605   return false;
   607 }
   609 void FreeIdSet::release_par_id(int id) {
   610   MutexLockerEx x(_mon, Mutex::_no_safepoint_check_flag);
   611   assert(_ids[id] == claimed, "Precondition.");
   612   _ids[id] = _hd;
   613   _hd = id;
   614   _claimed--;
   615 #if FID_STATS
   616   tty->print("[%d] release_par_id(%d), waiters =%d,  claimed = %d.\n",
   617              thr_self(), id, _waiters, _claimed);
   618 #endif
   619   if (_waiters > 0)
   620     // Notify all would be safer, but this is OK, right?
   621     _mon->notify_all();
   622 }

mercurial