1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/src/share/vm/utilities/workgroup.cpp Sat Dec 01 00:00:00 2007 +0000 1.3 @@ -0,0 +1,444 @@ 1.4 +/* 1.5 + * Copyright 2001-2007 Sun Microsystems, Inc. All Rights Reserved. 1.6 + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 1.7 + * 1.8 + * This code is free software; you can redistribute it and/or modify it 1.9 + * under the terms of the GNU General Public License version 2 only, as 1.10 + * published by the Free Software Foundation. 1.11 + * 1.12 + * This code is distributed in the hope that it will be useful, but WITHOUT 1.13 + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 1.14 + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 1.15 + * version 2 for more details (a copy is included in the LICENSE file that 1.16 + * accompanied this code). 1.17 + * 1.18 + * You should have received a copy of the GNU General Public License version 1.19 + * 2 along with this work; if not, write to the Free Software Foundation, 1.20 + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 1.21 + * 1.22 + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, 1.23 + * CA 95054 USA or visit www.sun.com if you need additional information or 1.24 + * have any questions. 1.25 + * 1.26 + */ 1.27 + 1.28 +# include "incls/_precompiled.incl" 1.29 +# include "incls/_workgroup.cpp.incl" 1.30 + 1.31 +// Definitions of WorkGang methods. 1.32 + 1.33 +AbstractWorkGang::AbstractWorkGang(const char* name, 1.34 + bool are_GC_threads) : 1.35 + _name(name), 1.36 + _are_GC_threads(are_GC_threads) { 1.37 + // Other initialization. 1.38 + _monitor = new Monitor(/* priority */ Mutex::leaf, 1.39 + /* name */ "WorkGroup monitor", 1.40 + /* allow_vm_block */ are_GC_threads); 1.41 + assert(monitor() != NULL, "Failed to allocate monitor"); 1.42 + _terminate = false; 1.43 + _task = NULL; 1.44 + _sequence_number = 0; 1.45 + _started_workers = 0; 1.46 + _finished_workers = 0; 1.47 +} 1.48 + 1.49 +WorkGang::WorkGang(const char* name, 1.50 + int workers, 1.51 + bool are_GC_threads) : 1.52 + AbstractWorkGang(name, are_GC_threads) { 1.53 + // Save arguments. 1.54 + _total_workers = workers; 1.55 + if (TraceWorkGang) { 1.56 + tty->print_cr("Constructing work gang %s with %d threads", name, workers); 1.57 + } 1.58 + _gang_workers = NEW_C_HEAP_ARRAY(GangWorker*, workers); 1.59 + assert(gang_workers() != NULL, "Failed to allocate gang workers"); 1.60 + for (int worker = 0; worker < total_workers(); worker += 1) { 1.61 + GangWorker* new_worker = new GangWorker(this, worker); 1.62 + assert(new_worker != NULL, "Failed to allocate GangWorker"); 1.63 + _gang_workers[worker] = new_worker; 1.64 + if (new_worker == NULL || !os::create_thread(new_worker, os::pgc_thread)) 1.65 + vm_exit_out_of_memory(0, "Cannot create worker GC thread. Out of system resources."); 1.66 + if (!DisableStartThread) { 1.67 + os::start_thread(new_worker); 1.68 + } 1.69 + } 1.70 +} 1.71 + 1.72 +AbstractWorkGang::~AbstractWorkGang() { 1.73 + if (TraceWorkGang) { 1.74 + tty->print_cr("Destructing work gang %s", name()); 1.75 + } 1.76 + stop(); // stop all the workers 1.77 + for (int worker = 0; worker < total_workers(); worker += 1) { 1.78 + delete gang_worker(worker); 1.79 + } 1.80 + delete gang_workers(); 1.81 + delete monitor(); 1.82 +} 1.83 + 1.84 +GangWorker* AbstractWorkGang::gang_worker(int i) const { 1.85 + // Array index bounds checking. 1.86 + GangWorker* result = NULL; 1.87 + assert(gang_workers() != NULL, "No workers for indexing"); 1.88 + assert(((i >= 0) && (i < total_workers())), "Worker index out of bounds"); 1.89 + result = _gang_workers[i]; 1.90 + assert(result != NULL, "Indexing to null worker"); 1.91 + return result; 1.92 +} 1.93 + 1.94 +void WorkGang::run_task(AbstractGangTask* task) { 1.95 + // This thread is executed by the VM thread which does not block 1.96 + // on ordinary MutexLocker's. 1.97 + MutexLockerEx ml(monitor(), Mutex::_no_safepoint_check_flag); 1.98 + if (TraceWorkGang) { 1.99 + tty->print_cr("Running work gang %s task %s", name(), task->name()); 1.100 + } 1.101 + // Tell all the workers to run a task. 1.102 + assert(task != NULL, "Running a null task"); 1.103 + // Initialize. 1.104 + _task = task; 1.105 + _sequence_number += 1; 1.106 + _started_workers = 0; 1.107 + _finished_workers = 0; 1.108 + // Tell the workers to get to work. 1.109 + monitor()->notify_all(); 1.110 + // Wait for them to be finished 1.111 + while (finished_workers() < total_workers()) { 1.112 + if (TraceWorkGang) { 1.113 + tty->print_cr("Waiting in work gang %s: %d/%d finished sequence %d", 1.114 + name(), finished_workers(), total_workers(), 1.115 + _sequence_number); 1.116 + } 1.117 + monitor()->wait(/* no_safepoint_check */ true); 1.118 + } 1.119 + _task = NULL; 1.120 + if (TraceWorkGang) { 1.121 + tty->print_cr("/nFinished work gang %s: %d/%d sequence %d", 1.122 + name(), finished_workers(), total_workers(), 1.123 + _sequence_number); 1.124 + } 1.125 +} 1.126 + 1.127 +void AbstractWorkGang::stop() { 1.128 + // Tell all workers to terminate, then wait for them to become inactive. 1.129 + MutexLockerEx ml(monitor(), Mutex::_no_safepoint_check_flag); 1.130 + if (TraceWorkGang) { 1.131 + tty->print_cr("Stopping work gang %s task %s", name(), task()->name()); 1.132 + } 1.133 + _task = NULL; 1.134 + _terminate = true; 1.135 + monitor()->notify_all(); 1.136 + while (finished_workers() < total_workers()) { 1.137 + if (TraceWorkGang) { 1.138 + tty->print_cr("Waiting in work gang %s: %d/%d finished", 1.139 + name(), finished_workers(), total_workers()); 1.140 + } 1.141 + monitor()->wait(/* no_safepoint_check */ true); 1.142 + } 1.143 +} 1.144 + 1.145 +void AbstractWorkGang::internal_worker_poll(WorkData* data) const { 1.146 + assert(monitor()->owned_by_self(), "worker_poll is an internal method"); 1.147 + assert(data != NULL, "worker data is null"); 1.148 + data->set_terminate(terminate()); 1.149 + data->set_task(task()); 1.150 + data->set_sequence_number(sequence_number()); 1.151 +} 1.152 + 1.153 +void AbstractWorkGang::internal_note_start() { 1.154 + assert(monitor()->owned_by_self(), "note_finish is an internal method"); 1.155 + _started_workers += 1; 1.156 +} 1.157 + 1.158 +void AbstractWorkGang::internal_note_finish() { 1.159 + assert(monitor()->owned_by_self(), "note_finish is an internal method"); 1.160 + _finished_workers += 1; 1.161 +} 1.162 + 1.163 +void AbstractWorkGang::print_worker_threads_on(outputStream* st) const { 1.164 + uint num_thr = total_workers(); 1.165 + for (uint i = 0; i < num_thr; i++) { 1.166 + gang_worker(i)->print_on(st); 1.167 + st->cr(); 1.168 + } 1.169 +} 1.170 + 1.171 +void AbstractWorkGang::threads_do(ThreadClosure* tc) const { 1.172 + assert(tc != NULL, "Null ThreadClosure"); 1.173 + uint num_thr = total_workers(); 1.174 + for (uint i = 0; i < num_thr; i++) { 1.175 + tc->do_thread(gang_worker(i)); 1.176 + } 1.177 +} 1.178 + 1.179 +// GangWorker methods. 1.180 + 1.181 +GangWorker::GangWorker(AbstractWorkGang* gang, uint id) { 1.182 + _gang = gang; 1.183 + set_id(id); 1.184 + set_name("Gang worker#%d (%s)", id, gang->name()); 1.185 +} 1.186 + 1.187 +void GangWorker::run() { 1.188 + initialize(); 1.189 + loop(); 1.190 +} 1.191 + 1.192 +void GangWorker::initialize() { 1.193 + this->initialize_thread_local_storage(); 1.194 + assert(_gang != NULL, "No gang to run in"); 1.195 + os::set_priority(this, NearMaxPriority); 1.196 + if (TraceWorkGang) { 1.197 + tty->print_cr("Running gang worker for gang %s id %d", 1.198 + gang()->name(), id()); 1.199 + } 1.200 + // The VM thread should not execute here because MutexLocker's are used 1.201 + // as (opposed to MutexLockerEx's). 1.202 + assert(!Thread::current()->is_VM_thread(), "VM thread should not be part" 1.203 + " of a work gang"); 1.204 +} 1.205 + 1.206 +void GangWorker::loop() { 1.207 + int previous_sequence_number = 0; 1.208 + Monitor* gang_monitor = gang()->monitor(); 1.209 + for ( ; /* !terminate() */; ) { 1.210 + WorkData data; 1.211 + int part; // Initialized below. 1.212 + { 1.213 + // Grab the gang mutex. 1.214 + MutexLocker ml(gang_monitor); 1.215 + // Wait for something to do. 1.216 + // Polling outside the while { wait } avoids missed notifies 1.217 + // in the outer loop. 1.218 + gang()->internal_worker_poll(&data); 1.219 + if (TraceWorkGang) { 1.220 + tty->print("Polled outside for work in gang %s worker %d", 1.221 + gang()->name(), id()); 1.222 + tty->print(" terminate: %s", 1.223 + data.terminate() ? "true" : "false"); 1.224 + tty->print(" sequence: %d (prev: %d)", 1.225 + data.sequence_number(), previous_sequence_number); 1.226 + if (data.task() != NULL) { 1.227 + tty->print(" task: %s", data.task()->name()); 1.228 + } else { 1.229 + tty->print(" task: NULL"); 1.230 + } 1.231 + tty->cr(); 1.232 + } 1.233 + for ( ; /* break or return */; ) { 1.234 + // Terminate if requested. 1.235 + if (data.terminate()) { 1.236 + gang()->internal_note_finish(); 1.237 + gang_monitor->notify_all(); 1.238 + return; 1.239 + } 1.240 + // Check for new work. 1.241 + if ((data.task() != NULL) && 1.242 + (data.sequence_number() != previous_sequence_number)) { 1.243 + gang()->internal_note_start(); 1.244 + gang_monitor->notify_all(); 1.245 + part = gang()->started_workers() - 1; 1.246 + break; 1.247 + } 1.248 + // Nothing to do. 1.249 + gang_monitor->wait(/* no_safepoint_check */ true); 1.250 + gang()->internal_worker_poll(&data); 1.251 + if (TraceWorkGang) { 1.252 + tty->print("Polled inside for work in gang %s worker %d", 1.253 + gang()->name(), id()); 1.254 + tty->print(" terminate: %s", 1.255 + data.terminate() ? "true" : "false"); 1.256 + tty->print(" sequence: %d (prev: %d)", 1.257 + data.sequence_number(), previous_sequence_number); 1.258 + if (data.task() != NULL) { 1.259 + tty->print(" task: %s", data.task()->name()); 1.260 + } else { 1.261 + tty->print(" task: NULL"); 1.262 + } 1.263 + tty->cr(); 1.264 + } 1.265 + } 1.266 + // Drop gang mutex. 1.267 + } 1.268 + if (TraceWorkGang) { 1.269 + tty->print("Work for work gang %s id %d task %s part %d", 1.270 + gang()->name(), id(), data.task()->name(), part); 1.271 + } 1.272 + assert(data.task() != NULL, "Got null task"); 1.273 + data.task()->work(part); 1.274 + { 1.275 + if (TraceWorkGang) { 1.276 + tty->print("Finish for work gang %s id %d task %s part %d", 1.277 + gang()->name(), id(), data.task()->name(), part); 1.278 + } 1.279 + // Grab the gang mutex. 1.280 + MutexLocker ml(gang_monitor); 1.281 + gang()->internal_note_finish(); 1.282 + // Tell the gang you are done. 1.283 + gang_monitor->notify_all(); 1.284 + // Drop the gang mutex. 1.285 + } 1.286 + previous_sequence_number = data.sequence_number(); 1.287 + } 1.288 +} 1.289 + 1.290 +bool GangWorker::is_GC_task_thread() const { 1.291 + return gang()->are_GC_threads(); 1.292 +} 1.293 + 1.294 +void GangWorker::print_on(outputStream* st) const { 1.295 + st->print("\"%s\" ", name()); 1.296 + Thread::print_on(st); 1.297 + st->cr(); 1.298 +} 1.299 + 1.300 +// Printing methods 1.301 + 1.302 +const char* AbstractWorkGang::name() const { 1.303 + return _name; 1.304 +} 1.305 + 1.306 +#ifndef PRODUCT 1.307 + 1.308 +const char* AbstractGangTask::name() const { 1.309 + return _name; 1.310 +} 1.311 + 1.312 +#endif /* PRODUCT */ 1.313 + 1.314 +// *** WorkGangBarrierSync 1.315 + 1.316 +WorkGangBarrierSync::WorkGangBarrierSync() 1.317 + : _monitor(Mutex::safepoint, "work gang barrier sync", true), 1.318 + _n_workers(0), _n_completed(0) { 1.319 +} 1.320 + 1.321 +WorkGangBarrierSync::WorkGangBarrierSync(int n_workers, const char* name) 1.322 + : _monitor(Mutex::safepoint, name, true), 1.323 + _n_workers(n_workers), _n_completed(0) { 1.324 +} 1.325 + 1.326 +void WorkGangBarrierSync::set_n_workers(int n_workers) { 1.327 + _n_workers = n_workers; 1.328 + _n_completed = 0; 1.329 +} 1.330 + 1.331 +void WorkGangBarrierSync::enter() { 1.332 + MutexLockerEx x(monitor(), Mutex::_no_safepoint_check_flag); 1.333 + inc_completed(); 1.334 + if (n_completed() == n_workers()) { 1.335 + monitor()->notify_all(); 1.336 + } 1.337 + else { 1.338 + while (n_completed() != n_workers()) { 1.339 + monitor()->wait(/* no_safepoint_check */ true); 1.340 + } 1.341 + } 1.342 +} 1.343 + 1.344 +// SubTasksDone functions. 1.345 + 1.346 +SubTasksDone::SubTasksDone(int n) : 1.347 + _n_tasks(n), _n_threads(1), _tasks(NULL) { 1.348 + _tasks = NEW_C_HEAP_ARRAY(jint, n); 1.349 + guarantee(_tasks != NULL, "alloc failure"); 1.350 + clear(); 1.351 +} 1.352 + 1.353 +bool SubTasksDone::valid() { 1.354 + return _tasks != NULL; 1.355 +} 1.356 + 1.357 +void SubTasksDone::set_par_threads(int t) { 1.358 +#ifdef ASSERT 1.359 + assert(_claimed == 0 || _threads_completed == _n_threads, 1.360 + "should not be called while tasks are being processed!"); 1.361 +#endif 1.362 + _n_threads = (t == 0 ? 1 : t); 1.363 +} 1.364 + 1.365 +void SubTasksDone::clear() { 1.366 + for (int i = 0; i < _n_tasks; i++) { 1.367 + _tasks[i] = 0; 1.368 + } 1.369 + _threads_completed = 0; 1.370 +#ifdef ASSERT 1.371 + _claimed = 0; 1.372 +#endif 1.373 +} 1.374 + 1.375 +bool SubTasksDone::is_task_claimed(int t) { 1.376 + assert(0 <= t && t < _n_tasks, "bad task id."); 1.377 + jint old = _tasks[t]; 1.378 + if (old == 0) { 1.379 + old = Atomic::cmpxchg(1, &_tasks[t], 0); 1.380 + } 1.381 + assert(_tasks[t] == 1, "What else?"); 1.382 + bool res = old != 0; 1.383 +#ifdef ASSERT 1.384 + if (!res) { 1.385 + assert(_claimed < _n_tasks, "Too many tasks claimed; missing clear?"); 1.386 + Atomic::inc(&_claimed); 1.387 + } 1.388 +#endif 1.389 + return res; 1.390 +} 1.391 + 1.392 +void SubTasksDone::all_tasks_completed() { 1.393 + jint observed = _threads_completed; 1.394 + jint old; 1.395 + do { 1.396 + old = observed; 1.397 + observed = Atomic::cmpxchg(old+1, &_threads_completed, old); 1.398 + } while (observed != old); 1.399 + // If this was the last thread checking in, clear the tasks. 1.400 + if (observed+1 == _n_threads) clear(); 1.401 +} 1.402 + 1.403 + 1.404 +SubTasksDone::~SubTasksDone() { 1.405 + if (_tasks != NULL) FREE_C_HEAP_ARRAY(jint, _tasks); 1.406 +} 1.407 + 1.408 +// *** SequentialSubTasksDone 1.409 + 1.410 +void SequentialSubTasksDone::clear() { 1.411 + _n_tasks = _n_claimed = 0; 1.412 + _n_threads = _n_completed = 0; 1.413 +} 1.414 + 1.415 +bool SequentialSubTasksDone::valid() { 1.416 + return _n_threads > 0; 1.417 +} 1.418 + 1.419 +bool SequentialSubTasksDone::is_task_claimed(int& t) { 1.420 + jint* n_claimed_ptr = &_n_claimed; 1.421 + t = *n_claimed_ptr; 1.422 + while (t < _n_tasks) { 1.423 + jint res = Atomic::cmpxchg(t+1, n_claimed_ptr, t); 1.424 + if (res == t) { 1.425 + return false; 1.426 + } 1.427 + t = *n_claimed_ptr; 1.428 + } 1.429 + return true; 1.430 +} 1.431 + 1.432 +bool SequentialSubTasksDone::all_tasks_completed() { 1.433 + jint* n_completed_ptr = &_n_completed; 1.434 + jint complete = *n_completed_ptr; 1.435 + while (true) { 1.436 + jint res = Atomic::cmpxchg(complete+1, n_completed_ptr, complete); 1.437 + if (res == complete) { 1.438 + break; 1.439 + } 1.440 + complete = res; 1.441 + } 1.442 + if (complete+1 == _n_threads) { 1.443 + clear(); 1.444 + return true; 1.445 + } 1.446 + return false; 1.447 +}