src/share/vm/utilities/workgroup.cpp

Sat, 01 Dec 2007 00:00:00 +0000

author
duke
date
Sat, 01 Dec 2007 00:00:00 +0000
changeset 435
a61af66fc99e
child 777
37f87013dfd8
permissions
-rw-r--r--

Initial load

     1 /*
     2  * Copyright 2001-2007 Sun Microsystems, Inc.  All Rights Reserved.
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
     4  *
     5  * This code is free software; you can redistribute it and/or modify it
     6  * under the terms of the GNU General Public License version 2 only, as
     7  * published by the Free Software Foundation.
     8  *
     9  * This code is distributed in the hope that it will be useful, but WITHOUT
    10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
    12  * version 2 for more details (a copy is included in the LICENSE file that
    13  * accompanied this code).
    14  *
    15  * You should have received a copy of the GNU General Public License version
    16  * 2 along with this work; if not, write to the Free Software Foundation,
    17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
    18  *
    19  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
    20  * CA 95054 USA or visit www.sun.com if you need additional information or
    21  * have any questions.
    22  *
    23  */
    25 # include "incls/_precompiled.incl"
    26 # include "incls/_workgroup.cpp.incl"
    28 // Definitions of WorkGang methods.
    30 AbstractWorkGang::AbstractWorkGang(const char* name,
    31                                    bool  are_GC_threads) :
    32   _name(name),
    33   _are_GC_threads(are_GC_threads) {
    34   // Other initialization.
    35   _monitor = new Monitor(/* priority */       Mutex::leaf,
    36                          /* name */           "WorkGroup monitor",
    37                          /* allow_vm_block */ are_GC_threads);
    38   assert(monitor() != NULL, "Failed to allocate monitor");
    39   _terminate = false;
    40   _task = NULL;
    41   _sequence_number = 0;
    42   _started_workers = 0;
    43   _finished_workers = 0;
    44 }
    46 WorkGang::WorkGang(const char* name,
    47                    int           workers,
    48                    bool          are_GC_threads) :
    49   AbstractWorkGang(name, are_GC_threads) {
    50   // Save arguments.
    51   _total_workers = workers;
    52   if (TraceWorkGang) {
    53     tty->print_cr("Constructing work gang %s with %d threads", name, workers);
    54   }
    55   _gang_workers = NEW_C_HEAP_ARRAY(GangWorker*, workers);
    56   assert(gang_workers() != NULL, "Failed to allocate gang workers");
    57   for (int worker = 0; worker < total_workers(); worker += 1) {
    58     GangWorker* new_worker = new GangWorker(this, worker);
    59     assert(new_worker != NULL, "Failed to allocate GangWorker");
    60     _gang_workers[worker] = new_worker;
    61     if (new_worker == NULL || !os::create_thread(new_worker, os::pgc_thread))
    62       vm_exit_out_of_memory(0, "Cannot create worker GC thread. Out of system resources.");
    63     if (!DisableStartThread) {
    64       os::start_thread(new_worker);
    65     }
    66   }
    67 }
    69 AbstractWorkGang::~AbstractWorkGang() {
    70   if (TraceWorkGang) {
    71     tty->print_cr("Destructing work gang %s", name());
    72   }
    73   stop();   // stop all the workers
    74   for (int worker = 0; worker < total_workers(); worker += 1) {
    75     delete gang_worker(worker);
    76   }
    77   delete gang_workers();
    78   delete monitor();
    79 }
    81 GangWorker* AbstractWorkGang::gang_worker(int i) const {
    82   // Array index bounds checking.
    83   GangWorker* result = NULL;
    84   assert(gang_workers() != NULL, "No workers for indexing");
    85   assert(((i >= 0) && (i < total_workers())), "Worker index out of bounds");
    86   result = _gang_workers[i];
    87   assert(result != NULL, "Indexing to null worker");
    88   return result;
    89 }
    91 void WorkGang::run_task(AbstractGangTask* task) {
    92   // This thread is executed by the VM thread which does not block
    93   // on ordinary MutexLocker's.
    94   MutexLockerEx ml(monitor(), Mutex::_no_safepoint_check_flag);
    95   if (TraceWorkGang) {
    96     tty->print_cr("Running work gang %s task %s", name(), task->name());
    97   }
    98   // Tell all the workers to run a task.
    99   assert(task != NULL, "Running a null task");
   100   // Initialize.
   101   _task = task;
   102   _sequence_number += 1;
   103   _started_workers = 0;
   104   _finished_workers = 0;
   105   // Tell the workers to get to work.
   106   monitor()->notify_all();
   107   // Wait for them to be finished
   108   while (finished_workers() < total_workers()) {
   109     if (TraceWorkGang) {
   110       tty->print_cr("Waiting in work gang %s: %d/%d finished sequence %d",
   111                     name(), finished_workers(), total_workers(),
   112                     _sequence_number);
   113     }
   114     monitor()->wait(/* no_safepoint_check */ true);
   115   }
   116   _task = NULL;
   117   if (TraceWorkGang) {
   118     tty->print_cr("/nFinished work gang %s: %d/%d sequence %d",
   119                   name(), finished_workers(), total_workers(),
   120                   _sequence_number);
   121     }
   122 }
   124 void AbstractWorkGang::stop() {
   125   // Tell all workers to terminate, then wait for them to become inactive.
   126   MutexLockerEx ml(monitor(), Mutex::_no_safepoint_check_flag);
   127   if (TraceWorkGang) {
   128     tty->print_cr("Stopping work gang %s task %s", name(), task()->name());
   129   }
   130   _task = NULL;
   131   _terminate = true;
   132   monitor()->notify_all();
   133   while (finished_workers() < total_workers()) {
   134     if (TraceWorkGang) {
   135       tty->print_cr("Waiting in work gang %s: %d/%d finished",
   136                     name(), finished_workers(), total_workers());
   137     }
   138     monitor()->wait(/* no_safepoint_check */ true);
   139   }
   140 }
   142 void AbstractWorkGang::internal_worker_poll(WorkData* data) const {
   143   assert(monitor()->owned_by_self(), "worker_poll is an internal method");
   144   assert(data != NULL, "worker data is null");
   145   data->set_terminate(terminate());
   146   data->set_task(task());
   147   data->set_sequence_number(sequence_number());
   148 }
   150 void AbstractWorkGang::internal_note_start() {
   151   assert(monitor()->owned_by_self(), "note_finish is an internal method");
   152   _started_workers += 1;
   153 }
   155 void AbstractWorkGang::internal_note_finish() {
   156   assert(monitor()->owned_by_self(), "note_finish is an internal method");
   157   _finished_workers += 1;
   158 }
   160 void AbstractWorkGang::print_worker_threads_on(outputStream* st) const {
   161   uint    num_thr = total_workers();
   162   for (uint i = 0; i < num_thr; i++) {
   163     gang_worker(i)->print_on(st);
   164     st->cr();
   165   }
   166 }
   168 void AbstractWorkGang::threads_do(ThreadClosure* tc) const {
   169   assert(tc != NULL, "Null ThreadClosure");
   170   uint num_thr = total_workers();
   171   for (uint i = 0; i < num_thr; i++) {
   172     tc->do_thread(gang_worker(i));
   173   }
   174 }
   176 // GangWorker methods.
   178 GangWorker::GangWorker(AbstractWorkGang* gang, uint id) {
   179   _gang = gang;
   180   set_id(id);
   181   set_name("Gang worker#%d (%s)", id, gang->name());
   182 }
// Thread entry point: one-time setup, then the work loop.  loop()
// returns only when the gang asks the worker to terminate.
void GangWorker::run() {
  initialize();
  loop();
}
// Per-thread setup, run once before loop().  Thread-local storage is
// initialized first so that Thread::current() (used in the assert
// below) is usable.
void GangWorker::initialize() {
  this->initialize_thread_local_storage();
  assert(_gang != NULL, "No gang to run in");
  os::set_priority(this, NearMaxPriority);
  if (TraceWorkGang) {
    tty->print_cr("Running gang worker for gang %s id %d",
                  gang()->name(), id());
  }
  // The VM thread should not execute here because MutexLocker's are used
  // as (opposed to MutexLockerEx's).
  assert(!Thread::current()->is_VM_thread(), "VM thread should not be part"
         " of a work gang");
}
// The worker's main loop.  Under the gang monitor, poll for either a
// terminate request or a new task (recognized by a sequence number
// different from the last one this worker ran); execute the task's
// work(part) outside the lock; then re-acquire the lock to report
// completion.  Returns only when termination is requested.
void GangWorker::loop() {
  int previous_sequence_number = 0;
  Monitor* gang_monitor = gang()->monitor();
  for ( ; /* !terminate() */; ) {
    WorkData data;
    int part;  // Initialized below.
    {
      // Grab the gang mutex.
      MutexLocker ml(gang_monitor);
      // Wait for something to do.
      // Polling outside the while { wait } avoids missed notifies
      // in the outer loop.
      gang()->internal_worker_poll(&data);
      if (TraceWorkGang) {
        tty->print("Polled outside for work in gang %s worker %d",
                   gang()->name(), id());
        tty->print("  terminate: %s",
                   data.terminate() ? "true" : "false");
        tty->print("  sequence: %d (prev: %d)",
                   data.sequence_number(), previous_sequence_number);
        if (data.task() != NULL) {
          tty->print("  task: %s", data.task()->name());
        } else {
          tty->print("  task: NULL");
        }
        tty->cr();
      }
      for ( ; /* break or return */; ) {
        // Terminate if requested.
        if (data.terminate()) {
          gang()->internal_note_finish();
          gang_monitor->notify_all();
          return;
        }
        // Check for new work.
        if ((data.task() != NULL) &&
            (data.sequence_number() != previous_sequence_number)) {
          gang()->internal_note_start();
          gang_monitor->notify_all();
          // Workers number themselves in start order: the k-th worker
          // to start this task takes part k.
          part = gang()->started_workers() - 1;
          break;
        }
        // Nothing to do.
        gang_monitor->wait(/* no_safepoint_check */ true);
        gang()->internal_worker_poll(&data);
        if (TraceWorkGang) {
          tty->print("Polled inside for work in gang %s worker %d",
                     gang()->name(), id());
          tty->print("  terminate: %s",
                     data.terminate() ? "true" : "false");
          tty->print("  sequence: %d (prev: %d)",
                     data.sequence_number(), previous_sequence_number);
          if (data.task() != NULL) {
            tty->print("  task: %s", data.task()->name());
          } else {
            tty->print("  task: NULL");
          }
          tty->cr();
        }
      }
      // Drop gang mutex.
    }
    if (TraceWorkGang) {
      tty->print("Work for work gang %s id %d task %s part %d",
                 gang()->name(), id(), data.task()->name(), part);
    }
    assert(data.task() != NULL, "Got null task");
    // Run the task body without holding the gang monitor.
    data.task()->work(part);
    {
      if (TraceWorkGang) {
        tty->print("Finish for work gang %s id %d task %s part %d",
                   gang()->name(), id(), data.task()->name(), part);
      }
      // Grab the gang mutex.
      MutexLocker ml(gang_monitor);
      gang()->internal_note_finish();
      // Tell the gang you are done.
      gang_monitor->notify_all();
      // Drop the gang mutex.
    }
    // Remember this task's sequence number so it isn't run twice.
    previous_sequence_number = data.sequence_number();
  }
}
// A worker is a GC task thread iff its gang was constructed with
// are_GC_threads = true.
bool GangWorker::is_GC_task_thread() const {
  return gang()->are_GC_threads();
}
// Print the worker's quoted name followed by the generic Thread state.
void GangWorker::print_on(outputStream* st) const {
  st->print("\"%s\" ", name());
  Thread::print_on(st);
  st->cr();
}
   297 // Printing methods
// The gang's name, as passed to the constructor.
const char* AbstractWorkGang::name() const {
  return _name;
}
#ifndef PRODUCT

// Task names are only available in non-product builds.
const char* AbstractGangTask::name() const {
  return _name;
}

#endif /* PRODUCT */
   311 // *** WorkGangBarrierSync
// Default construction: zero workers, so set_n_workers() must be
// called before the barrier is entered.  The monitor allows VM
// blocking (third argument true).
WorkGangBarrierSync::WorkGangBarrierSync()
  : _monitor(Mutex::safepoint, "work gang barrier sync", true),
    _n_workers(0), _n_completed(0) {
}
// Construct a barrier for "n_workers" entrants; "name" labels the
// underlying monitor.
WorkGangBarrierSync::WorkGangBarrierSync(int n_workers, const char* name)
  : _monitor(Mutex::safepoint, name, true),
    _n_workers(n_workers), _n_completed(0) {
}
   323 void WorkGangBarrierSync::set_n_workers(int n_workers) {
   324   _n_workers   = n_workers;
   325   _n_completed = 0;
   326 }
   328 void WorkGangBarrierSync::enter() {
   329   MutexLockerEx x(monitor(), Mutex::_no_safepoint_check_flag);
   330   inc_completed();
   331   if (n_completed() == n_workers()) {
   332     monitor()->notify_all();
   333   }
   334   else {
   335     while (n_completed() != n_workers()) {
   336       monitor()->wait(/* no_safepoint_check */ true);
   337     }
   338   }
   339 }
   341 // SubTasksDone functions.
// Create a claim tracker for "n" subtasks.  Defaults to one thread
// until set_par_threads() is called.  Allocation failure is fatal
// (guarantee).  clear() must run after _tasks is allocated.
SubTasksDone::SubTasksDone(int n) :
  _n_tasks(n), _n_threads(1), _tasks(NULL) {
  _tasks = NEW_C_HEAP_ARRAY(jint, n);
  guarantee(_tasks != NULL, "alloc failure");
  clear();
}
   350 bool SubTasksDone::valid() {
   351   return _tasks != NULL;
   352 }
   354 void SubTasksDone::set_par_threads(int t) {
   355 #ifdef ASSERT
   356   assert(_claimed == 0 || _threads_completed == _n_threads,
   357          "should not be called while tasks are being processed!");
   358 #endif
   359   _n_threads = (t == 0 ? 1 : t);
   360 }
   362 void SubTasksDone::clear() {
   363   for (int i = 0; i < _n_tasks; i++) {
   364     _tasks[i] = 0;
   365   }
   366   _threads_completed = 0;
   367 #ifdef ASSERT
   368   _claimed = 0;
   369 #endif
   370 }
// Try to claim task "t" for the calling thread.  Returns false if this
// call claimed it (the caller should perform the task) and true if some
// other thread already had.
bool SubTasksDone::is_task_claimed(int t) {
  assert(0 <= t && t < _n_tasks, "bad task id.");
  jint old = _tasks[t];
  if (old == 0) {
    // Race to claim: only one thread's cmpxchg(1, ..., 0) sees 0.
    old = Atomic::cmpxchg(1, &_tasks[t], 0);
  }
  assert(_tasks[t] == 1, "What else?");
  bool res = old != 0;
#ifdef ASSERT
  if (!res) {
    // We claimed it: count claims to catch a missing clear().
    assert(_claimed < _n_tasks, "Too many tasks claimed; missing clear?");
    Atomic::inc(&_claimed);
  }
#endif
  return res;
}
// Note that the calling thread has finished all of its claimed tasks.
// The last of the _n_threads threads to check in resets the instance
// for the next round.
void SubTasksDone::all_tasks_completed() {
  // Atomically increment _threads_completed via a CAS retry loop.
  jint observed = _threads_completed;
  jint old;
  do {
    old = observed;
    observed = Atomic::cmpxchg(old+1, &_threads_completed, old);
  } while (observed != old);
  // If this was the last thread checking in, clear the tasks.
  if (observed+1 == _n_threads) clear();
}
   401 SubTasksDone::~SubTasksDone() {
   402   if (_tasks != NULL) FREE_C_HEAP_ARRAY(jint, _tasks);
   403 }
   405 // *** SequentialSubTasksDone
   407 void SequentialSubTasksDone::clear() {
   408   _n_tasks   = _n_claimed   = 0;
   409   _n_threads = _n_completed = 0;
   410 }
// Usable once the number of participating threads is positive.
bool SequentialSubTasksDone::valid() {
  return _n_threads > 0;
}
// Claim the next unclaimed task index for the caller.  On return "t"
// holds the index examined last.  Returns false if the caller claimed
// task t; true if all _n_tasks tasks are already taken.
bool SequentialSubTasksDone::is_task_claimed(int& t) {
  jint* n_claimed_ptr = &_n_claimed;
  t = *n_claimed_ptr;
  while (t < _n_tasks) {
    // Advance the claim counter from t to t+1; success means we own t.
    jint res = Atomic::cmpxchg(t+1, n_claimed_ptr, t);
    if (res == t) {
      return false;
    }
    // Lost the race to another thread: reload and retry.
    t = *n_claimed_ptr;
  }
  return true;
}
// Note that the calling thread is done with its share of the tasks.
// Returns true for exactly one caller — the last of the _n_threads
// threads to check in — which also resets the instance via clear().
bool SequentialSubTasksDone::all_tasks_completed() {
  jint* n_completed_ptr = &_n_completed;
  jint  complete        = *n_completed_ptr;
  // Atomically increment the completion count via a CAS retry loop.
  while (true) {
    jint res = Atomic::cmpxchg(complete+1, n_completed_ptr, complete);
    if (res == complete) {
      break;
    }
    complete = res;
  }
  if (complete+1 == _n_threads) {
    clear();
    return true;
  }
  return false;
}

mercurial