src/share/vm/utilities/workgroup.cpp

Sat, 01 Dec 2007 00:00:00 +0000

author
duke
date
Sat, 01 Dec 2007 00:00:00 +0000
changeset 435
a61af66fc99e
child 777
37f87013dfd8
permissions
-rw-r--r--

Initial load

duke@435 1 /*
duke@435 2 * Copyright 2001-2007 Sun Microsystems, Inc. All Rights Reserved.
duke@435 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
duke@435 4 *
duke@435 5 * This code is free software; you can redistribute it and/or modify it
duke@435 6 * under the terms of the GNU General Public License version 2 only, as
duke@435 7 * published by the Free Software Foundation.
duke@435 8 *
duke@435 9 * This code is distributed in the hope that it will be useful, but WITHOUT
duke@435 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
duke@435 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
duke@435 12 * version 2 for more details (a copy is included in the LICENSE file that
duke@435 13 * accompanied this code).
duke@435 14 *
duke@435 15 * You should have received a copy of the GNU General Public License version
duke@435 16 * 2 along with this work; if not, write to the Free Software Foundation,
duke@435 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
duke@435 18 *
duke@435 19 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
duke@435 20 * CA 95054 USA or visit www.sun.com if you need additional information or
duke@435 21 * have any questions.
duke@435 22 *
duke@435 23 */
duke@435 24
duke@435 25 # include "incls/_precompiled.incl"
duke@435 26 # include "incls/_workgroup.cpp.incl"
duke@435 27
duke@435 28 // Definitions of WorkGang methods.
duke@435 29
duke@435 30 AbstractWorkGang::AbstractWorkGang(const char* name,
duke@435 31 bool are_GC_threads) :
duke@435 32 _name(name),
duke@435 33 _are_GC_threads(are_GC_threads) {
duke@435 34 // Other initialization.
duke@435 35 _monitor = new Monitor(/* priority */ Mutex::leaf,
duke@435 36 /* name */ "WorkGroup monitor",
duke@435 37 /* allow_vm_block */ are_GC_threads);
duke@435 38 assert(monitor() != NULL, "Failed to allocate monitor");
duke@435 39 _terminate = false;
duke@435 40 _task = NULL;
duke@435 41 _sequence_number = 0;
duke@435 42 _started_workers = 0;
duke@435 43 _finished_workers = 0;
duke@435 44 }
duke@435 45
duke@435 46 WorkGang::WorkGang(const char* name,
duke@435 47 int workers,
duke@435 48 bool are_GC_threads) :
duke@435 49 AbstractWorkGang(name, are_GC_threads) {
duke@435 50 // Save arguments.
duke@435 51 _total_workers = workers;
duke@435 52 if (TraceWorkGang) {
duke@435 53 tty->print_cr("Constructing work gang %s with %d threads", name, workers);
duke@435 54 }
duke@435 55 _gang_workers = NEW_C_HEAP_ARRAY(GangWorker*, workers);
duke@435 56 assert(gang_workers() != NULL, "Failed to allocate gang workers");
duke@435 57 for (int worker = 0; worker < total_workers(); worker += 1) {
duke@435 58 GangWorker* new_worker = new GangWorker(this, worker);
duke@435 59 assert(new_worker != NULL, "Failed to allocate GangWorker");
duke@435 60 _gang_workers[worker] = new_worker;
duke@435 61 if (new_worker == NULL || !os::create_thread(new_worker, os::pgc_thread))
duke@435 62 vm_exit_out_of_memory(0, "Cannot create worker GC thread. Out of system resources.");
duke@435 63 if (!DisableStartThread) {
duke@435 64 os::start_thread(new_worker);
duke@435 65 }
duke@435 66 }
duke@435 67 }
duke@435 68
duke@435 69 AbstractWorkGang::~AbstractWorkGang() {
duke@435 70 if (TraceWorkGang) {
duke@435 71 tty->print_cr("Destructing work gang %s", name());
duke@435 72 }
duke@435 73 stop(); // stop all the workers
duke@435 74 for (int worker = 0; worker < total_workers(); worker += 1) {
duke@435 75 delete gang_worker(worker);
duke@435 76 }
duke@435 77 delete gang_workers();
duke@435 78 delete monitor();
duke@435 79 }
duke@435 80
duke@435 81 GangWorker* AbstractWorkGang::gang_worker(int i) const {
duke@435 82 // Array index bounds checking.
duke@435 83 GangWorker* result = NULL;
duke@435 84 assert(gang_workers() != NULL, "No workers for indexing");
duke@435 85 assert(((i >= 0) && (i < total_workers())), "Worker index out of bounds");
duke@435 86 result = _gang_workers[i];
duke@435 87 assert(result != NULL, "Indexing to null worker");
duke@435 88 return result;
duke@435 89 }
duke@435 90
duke@435 91 void WorkGang::run_task(AbstractGangTask* task) {
duke@435 92 // This thread is executed by the VM thread which does not block
duke@435 93 // on ordinary MutexLocker's.
duke@435 94 MutexLockerEx ml(monitor(), Mutex::_no_safepoint_check_flag);
duke@435 95 if (TraceWorkGang) {
duke@435 96 tty->print_cr("Running work gang %s task %s", name(), task->name());
duke@435 97 }
duke@435 98 // Tell all the workers to run a task.
duke@435 99 assert(task != NULL, "Running a null task");
duke@435 100 // Initialize.
duke@435 101 _task = task;
duke@435 102 _sequence_number += 1;
duke@435 103 _started_workers = 0;
duke@435 104 _finished_workers = 0;
duke@435 105 // Tell the workers to get to work.
duke@435 106 monitor()->notify_all();
duke@435 107 // Wait for them to be finished
duke@435 108 while (finished_workers() < total_workers()) {
duke@435 109 if (TraceWorkGang) {
duke@435 110 tty->print_cr("Waiting in work gang %s: %d/%d finished sequence %d",
duke@435 111 name(), finished_workers(), total_workers(),
duke@435 112 _sequence_number);
duke@435 113 }
duke@435 114 monitor()->wait(/* no_safepoint_check */ true);
duke@435 115 }
duke@435 116 _task = NULL;
duke@435 117 if (TraceWorkGang) {
duke@435 118 tty->print_cr("/nFinished work gang %s: %d/%d sequence %d",
duke@435 119 name(), finished_workers(), total_workers(),
duke@435 120 _sequence_number);
duke@435 121 }
duke@435 122 }
duke@435 123
duke@435 124 void AbstractWorkGang::stop() {
duke@435 125 // Tell all workers to terminate, then wait for them to become inactive.
duke@435 126 MutexLockerEx ml(monitor(), Mutex::_no_safepoint_check_flag);
duke@435 127 if (TraceWorkGang) {
duke@435 128 tty->print_cr("Stopping work gang %s task %s", name(), task()->name());
duke@435 129 }
duke@435 130 _task = NULL;
duke@435 131 _terminate = true;
duke@435 132 monitor()->notify_all();
duke@435 133 while (finished_workers() < total_workers()) {
duke@435 134 if (TraceWorkGang) {
duke@435 135 tty->print_cr("Waiting in work gang %s: %d/%d finished",
duke@435 136 name(), finished_workers(), total_workers());
duke@435 137 }
duke@435 138 monitor()->wait(/* no_safepoint_check */ true);
duke@435 139 }
duke@435 140 }
duke@435 141
duke@435 142 void AbstractWorkGang::internal_worker_poll(WorkData* data) const {
duke@435 143 assert(monitor()->owned_by_self(), "worker_poll is an internal method");
duke@435 144 assert(data != NULL, "worker data is null");
duke@435 145 data->set_terminate(terminate());
duke@435 146 data->set_task(task());
duke@435 147 data->set_sequence_number(sequence_number());
duke@435 148 }
duke@435 149
duke@435 150 void AbstractWorkGang::internal_note_start() {
duke@435 151 assert(monitor()->owned_by_self(), "note_finish is an internal method");
duke@435 152 _started_workers += 1;
duke@435 153 }
duke@435 154
duke@435 155 void AbstractWorkGang::internal_note_finish() {
duke@435 156 assert(monitor()->owned_by_self(), "note_finish is an internal method");
duke@435 157 _finished_workers += 1;
duke@435 158 }
duke@435 159
duke@435 160 void AbstractWorkGang::print_worker_threads_on(outputStream* st) const {
duke@435 161 uint num_thr = total_workers();
duke@435 162 for (uint i = 0; i < num_thr; i++) {
duke@435 163 gang_worker(i)->print_on(st);
duke@435 164 st->cr();
duke@435 165 }
duke@435 166 }
duke@435 167
duke@435 168 void AbstractWorkGang::threads_do(ThreadClosure* tc) const {
duke@435 169 assert(tc != NULL, "Null ThreadClosure");
duke@435 170 uint num_thr = total_workers();
duke@435 171 for (uint i = 0; i < num_thr; i++) {
duke@435 172 tc->do_thread(gang_worker(i));
duke@435 173 }
duke@435 174 }
duke@435 175
duke@435 176 // GangWorker methods.
duke@435 177
duke@435 178 GangWorker::GangWorker(AbstractWorkGang* gang, uint id) {
duke@435 179 _gang = gang;
duke@435 180 set_id(id);
duke@435 181 set_name("Gang worker#%d (%s)", id, gang->name());
duke@435 182 }
duke@435 183
duke@435 184 void GangWorker::run() {
duke@435 185 initialize();
duke@435 186 loop();
duke@435 187 }
duke@435 188
duke@435 189 void GangWorker::initialize() {
duke@435 190 this->initialize_thread_local_storage();
duke@435 191 assert(_gang != NULL, "No gang to run in");
duke@435 192 os::set_priority(this, NearMaxPriority);
duke@435 193 if (TraceWorkGang) {
duke@435 194 tty->print_cr("Running gang worker for gang %s id %d",
duke@435 195 gang()->name(), id());
duke@435 196 }
duke@435 197 // The VM thread should not execute here because MutexLocker's are used
duke@435 198 // as (opposed to MutexLockerEx's).
duke@435 199 assert(!Thread::current()->is_VM_thread(), "VM thread should not be part"
duke@435 200 " of a work gang");
duke@435 201 }
duke@435 202
duke@435 203 void GangWorker::loop() {
duke@435 204 int previous_sequence_number = 0;
duke@435 205 Monitor* gang_monitor = gang()->monitor();
duke@435 206 for ( ; /* !terminate() */; ) {
duke@435 207 WorkData data;
duke@435 208 int part; // Initialized below.
duke@435 209 {
duke@435 210 // Grab the gang mutex.
duke@435 211 MutexLocker ml(gang_monitor);
duke@435 212 // Wait for something to do.
duke@435 213 // Polling outside the while { wait } avoids missed notifies
duke@435 214 // in the outer loop.
duke@435 215 gang()->internal_worker_poll(&data);
duke@435 216 if (TraceWorkGang) {
duke@435 217 tty->print("Polled outside for work in gang %s worker %d",
duke@435 218 gang()->name(), id());
duke@435 219 tty->print(" terminate: %s",
duke@435 220 data.terminate() ? "true" : "false");
duke@435 221 tty->print(" sequence: %d (prev: %d)",
duke@435 222 data.sequence_number(), previous_sequence_number);
duke@435 223 if (data.task() != NULL) {
duke@435 224 tty->print(" task: %s", data.task()->name());
duke@435 225 } else {
duke@435 226 tty->print(" task: NULL");
duke@435 227 }
duke@435 228 tty->cr();
duke@435 229 }
duke@435 230 for ( ; /* break or return */; ) {
duke@435 231 // Terminate if requested.
duke@435 232 if (data.terminate()) {
duke@435 233 gang()->internal_note_finish();
duke@435 234 gang_monitor->notify_all();
duke@435 235 return;
duke@435 236 }
duke@435 237 // Check for new work.
duke@435 238 if ((data.task() != NULL) &&
duke@435 239 (data.sequence_number() != previous_sequence_number)) {
duke@435 240 gang()->internal_note_start();
duke@435 241 gang_monitor->notify_all();
duke@435 242 part = gang()->started_workers() - 1;
duke@435 243 break;
duke@435 244 }
duke@435 245 // Nothing to do.
duke@435 246 gang_monitor->wait(/* no_safepoint_check */ true);
duke@435 247 gang()->internal_worker_poll(&data);
duke@435 248 if (TraceWorkGang) {
duke@435 249 tty->print("Polled inside for work in gang %s worker %d",
duke@435 250 gang()->name(), id());
duke@435 251 tty->print(" terminate: %s",
duke@435 252 data.terminate() ? "true" : "false");
duke@435 253 tty->print(" sequence: %d (prev: %d)",
duke@435 254 data.sequence_number(), previous_sequence_number);
duke@435 255 if (data.task() != NULL) {
duke@435 256 tty->print(" task: %s", data.task()->name());
duke@435 257 } else {
duke@435 258 tty->print(" task: NULL");
duke@435 259 }
duke@435 260 tty->cr();
duke@435 261 }
duke@435 262 }
duke@435 263 // Drop gang mutex.
duke@435 264 }
duke@435 265 if (TraceWorkGang) {
duke@435 266 tty->print("Work for work gang %s id %d task %s part %d",
duke@435 267 gang()->name(), id(), data.task()->name(), part);
duke@435 268 }
duke@435 269 assert(data.task() != NULL, "Got null task");
duke@435 270 data.task()->work(part);
duke@435 271 {
duke@435 272 if (TraceWorkGang) {
duke@435 273 tty->print("Finish for work gang %s id %d task %s part %d",
duke@435 274 gang()->name(), id(), data.task()->name(), part);
duke@435 275 }
duke@435 276 // Grab the gang mutex.
duke@435 277 MutexLocker ml(gang_monitor);
duke@435 278 gang()->internal_note_finish();
duke@435 279 // Tell the gang you are done.
duke@435 280 gang_monitor->notify_all();
duke@435 281 // Drop the gang mutex.
duke@435 282 }
duke@435 283 previous_sequence_number = data.sequence_number();
duke@435 284 }
duke@435 285 }
duke@435 286
duke@435 287 bool GangWorker::is_GC_task_thread() const {
duke@435 288 return gang()->are_GC_threads();
duke@435 289 }
duke@435 290
duke@435 291 void GangWorker::print_on(outputStream* st) const {
duke@435 292 st->print("\"%s\" ", name());
duke@435 293 Thread::print_on(st);
duke@435 294 st->cr();
duke@435 295 }
duke@435 296
duke@435 297 // Printing methods
duke@435 298
duke@435 299 const char* AbstractWorkGang::name() const {
duke@435 300 return _name;
duke@435 301 }
duke@435 302
duke@435 303 #ifndef PRODUCT
duke@435 304
duke@435 305 const char* AbstractGangTask::name() const {
duke@435 306 return _name;
duke@435 307 }
duke@435 308
duke@435 309 #endif /* PRODUCT */
duke@435 310
duke@435 311 // *** WorkGangBarrierSync
duke@435 312
duke@435 313 WorkGangBarrierSync::WorkGangBarrierSync()
duke@435 314 : _monitor(Mutex::safepoint, "work gang barrier sync", true),
duke@435 315 _n_workers(0), _n_completed(0) {
duke@435 316 }
duke@435 317
duke@435 318 WorkGangBarrierSync::WorkGangBarrierSync(int n_workers, const char* name)
duke@435 319 : _monitor(Mutex::safepoint, name, true),
duke@435 320 _n_workers(n_workers), _n_completed(0) {
duke@435 321 }
duke@435 322
duke@435 323 void WorkGangBarrierSync::set_n_workers(int n_workers) {
duke@435 324 _n_workers = n_workers;
duke@435 325 _n_completed = 0;
duke@435 326 }
duke@435 327
duke@435 328 void WorkGangBarrierSync::enter() {
duke@435 329 MutexLockerEx x(monitor(), Mutex::_no_safepoint_check_flag);
duke@435 330 inc_completed();
duke@435 331 if (n_completed() == n_workers()) {
duke@435 332 monitor()->notify_all();
duke@435 333 }
duke@435 334 else {
duke@435 335 while (n_completed() != n_workers()) {
duke@435 336 monitor()->wait(/* no_safepoint_check */ true);
duke@435 337 }
duke@435 338 }
duke@435 339 }
duke@435 340
duke@435 341 // SubTasksDone functions.
duke@435 342
duke@435 343 SubTasksDone::SubTasksDone(int n) :
duke@435 344 _n_tasks(n), _n_threads(1), _tasks(NULL) {
duke@435 345 _tasks = NEW_C_HEAP_ARRAY(jint, n);
duke@435 346 guarantee(_tasks != NULL, "alloc failure");
duke@435 347 clear();
duke@435 348 }
duke@435 349
duke@435 350 bool SubTasksDone::valid() {
duke@435 351 return _tasks != NULL;
duke@435 352 }
duke@435 353
duke@435 354 void SubTasksDone::set_par_threads(int t) {
duke@435 355 #ifdef ASSERT
duke@435 356 assert(_claimed == 0 || _threads_completed == _n_threads,
duke@435 357 "should not be called while tasks are being processed!");
duke@435 358 #endif
duke@435 359 _n_threads = (t == 0 ? 1 : t);
duke@435 360 }
duke@435 361
duke@435 362 void SubTasksDone::clear() {
duke@435 363 for (int i = 0; i < _n_tasks; i++) {
duke@435 364 _tasks[i] = 0;
duke@435 365 }
duke@435 366 _threads_completed = 0;
duke@435 367 #ifdef ASSERT
duke@435 368 _claimed = 0;
duke@435 369 #endif
duke@435 370 }
duke@435 371
duke@435 372 bool SubTasksDone::is_task_claimed(int t) {
duke@435 373 assert(0 <= t && t < _n_tasks, "bad task id.");
duke@435 374 jint old = _tasks[t];
duke@435 375 if (old == 0) {
duke@435 376 old = Atomic::cmpxchg(1, &_tasks[t], 0);
duke@435 377 }
duke@435 378 assert(_tasks[t] == 1, "What else?");
duke@435 379 bool res = old != 0;
duke@435 380 #ifdef ASSERT
duke@435 381 if (!res) {
duke@435 382 assert(_claimed < _n_tasks, "Too many tasks claimed; missing clear?");
duke@435 383 Atomic::inc(&_claimed);
duke@435 384 }
duke@435 385 #endif
duke@435 386 return res;
duke@435 387 }
duke@435 388
duke@435 389 void SubTasksDone::all_tasks_completed() {
duke@435 390 jint observed = _threads_completed;
duke@435 391 jint old;
duke@435 392 do {
duke@435 393 old = observed;
duke@435 394 observed = Atomic::cmpxchg(old+1, &_threads_completed, old);
duke@435 395 } while (observed != old);
duke@435 396 // If this was the last thread checking in, clear the tasks.
duke@435 397 if (observed+1 == _n_threads) clear();
duke@435 398 }
duke@435 399
duke@435 400
duke@435 401 SubTasksDone::~SubTasksDone() {
duke@435 402 if (_tasks != NULL) FREE_C_HEAP_ARRAY(jint, _tasks);
duke@435 403 }
duke@435 404
duke@435 405 // *** SequentialSubTasksDone
duke@435 406
duke@435 407 void SequentialSubTasksDone::clear() {
duke@435 408 _n_tasks = _n_claimed = 0;
duke@435 409 _n_threads = _n_completed = 0;
duke@435 410 }
duke@435 411
duke@435 412 bool SequentialSubTasksDone::valid() {
duke@435 413 return _n_threads > 0;
duke@435 414 }
duke@435 415
duke@435 416 bool SequentialSubTasksDone::is_task_claimed(int& t) {
duke@435 417 jint* n_claimed_ptr = &_n_claimed;
duke@435 418 t = *n_claimed_ptr;
duke@435 419 while (t < _n_tasks) {
duke@435 420 jint res = Atomic::cmpxchg(t+1, n_claimed_ptr, t);
duke@435 421 if (res == t) {
duke@435 422 return false;
duke@435 423 }
duke@435 424 t = *n_claimed_ptr;
duke@435 425 }
duke@435 426 return true;
duke@435 427 }
duke@435 428
duke@435 429 bool SequentialSubTasksDone::all_tasks_completed() {
duke@435 430 jint* n_completed_ptr = &_n_completed;
duke@435 431 jint complete = *n_completed_ptr;
duke@435 432 while (true) {
duke@435 433 jint res = Atomic::cmpxchg(complete+1, n_completed_ptr, complete);
duke@435 434 if (res == complete) {
duke@435 435 break;
duke@435 436 }
duke@435 437 complete = res;
duke@435 438 }
duke@435 439 if (complete+1 == _n_threads) {
duke@435 440 clear();
duke@435 441 return true;
duke@435 442 }
duke@435 443 return false;
duke@435 444 }

mercurial