src/share/vm/utilities/workgroup.cpp

Mercurial changeset metadata: revision 0:f90c822e73f8 (child 6876:710a3c8b516e); diff is shown against the null revision -1:000000000000.
1 /*
2 * Copyright (c) 2001, 2014, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This code is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12 * version 2 for more details (a copy is included in the LICENSE file that
13 * accompanied this code).
14 *
15 * You should have received a copy of the GNU General Public License version
16 * 2 along with this work; if not, write to the Free Software Foundation,
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18 *
19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20 * or visit www.oracle.com if you need additional information or have any
21 * questions.
22 *
23 */
24
25 #include "precompiled.hpp"
26 #include "memory/allocation.hpp"
27 #include "memory/allocation.inline.hpp"
28 #include "runtime/os.hpp"
29 #include "utilities/workgroup.hpp"
30
31 PRAGMA_FORMAT_MUTE_WARNINGS_FOR_GCC
32
33 // Definitions of WorkGang methods.
34
// Base-class construction: record the gang's attributes and create the
// monitor that coordinates the worker threads with the dispatching thread.
AbstractWorkGang::AbstractWorkGang(const char* name,
                                   bool are_GC_task_threads,
                                   bool are_ConcurrentGC_threads) :
  _name(name),
  _are_GC_task_threads(are_GC_task_threads),
  _are_ConcurrentGC_threads(are_ConcurrentGC_threads) {

  // A gang is either stop-the-world (GC task threads) or concurrent,
  // never both.
  assert(!(are_GC_task_threads && are_ConcurrentGC_threads),
         "They cannot both be STW GC and Concurrent threads" );

  // Other initialization.
  // allow_vm_block is set for STW gangs: the VM thread may block on this
  // monitor while coordinating workers during a pause.
  _monitor = new Monitor(/* priority */ Mutex::leaf,
                         /* name */ "WorkGroup monitor",
                         /* allow_vm_block */ are_GC_task_threads);
  assert(monitor() != NULL, "Failed to allocate monitor");
  _terminate = false;
  _task = NULL;
  _sequence_number = 0;
  _started_workers = 0;
  _finished_workers = 0;
}
56
// A WorkGang is an AbstractWorkGang with a fixed worker count.
WorkGang::WorkGang(const char* name,
                   uint workers,
                   bool are_GC_task_threads,
                   bool are_ConcurrentGC_threads) :
  AbstractWorkGang(name, are_GC_task_threads, are_ConcurrentGC_threads) {
  // Only record the size here; the worker threads themselves are created
  // later by initialize_workers().
  _total_workers = workers;
}
64
65 GangWorker* WorkGang::allocate_worker(uint which) {
66 GangWorker* new_worker = new GangWorker(this, which);
67 return new_worker;
68 }
69
70 // The current implementation will exit if the allocation
71 // of any worker fails. Still, return a boolean so that
72 // a future implementation can possibly do a partial
73 // initialization of the workers and report such to the
74 // caller.
75 bool WorkGang::initialize_workers() {
76
77 if (TraceWorkGang) {
78 tty->print_cr("Constructing work gang %s with %d threads",
79 name(),
80 total_workers());
81 }
82 _gang_workers = NEW_C_HEAP_ARRAY(GangWorker*, total_workers(), mtInternal);
83 if (gang_workers() == NULL) {
84 vm_exit_out_of_memory(0, OOM_MALLOC_ERROR, "Cannot create GangWorker array.");
85 return false;
86 }
87 os::ThreadType worker_type;
88 if (are_ConcurrentGC_threads()) {
89 worker_type = os::cgc_thread;
90 } else {
91 worker_type = os::pgc_thread;
92 }
93 for (uint worker = 0; worker < total_workers(); worker += 1) {
94 GangWorker* new_worker = allocate_worker(worker);
95 assert(new_worker != NULL, "Failed to allocate GangWorker");
96 _gang_workers[worker] = new_worker;
97 if (new_worker == NULL || !os::create_thread(new_worker, worker_type)) {
98 vm_exit_out_of_memory(0, OOM_MALLOC_ERROR,
99 "Cannot create worker GC thread. Out of system resources.");
100 return false;
101 }
102 if (!DisableStartThread) {
103 os::start_thread(new_worker);
104 }
105 }
106 return true;
107 }
108
109 AbstractWorkGang::~AbstractWorkGang() {
110 if (TraceWorkGang) {
111 tty->print_cr("Destructing work gang %s", name());
112 }
113 stop(); // stop all the workers
114 for (uint worker = 0; worker < total_workers(); worker += 1) {
115 delete gang_worker(worker);
116 }
117 delete gang_workers();
118 delete monitor();
119 }
120
121 GangWorker* AbstractWorkGang::gang_worker(uint i) const {
122 // Array index bounds checking.
123 GangWorker* result = NULL;
124 assert(gang_workers() != NULL, "No workers for indexing");
125 assert(((i >= 0) && (i < total_workers())), "Worker index out of bounds");
126 result = _gang_workers[i];
127 assert(result != NULL, "Indexing to null worker");
128 return result;
129 }
130
// Run 'task' on every worker in the gang.
void WorkGang::run_task(AbstractGangTask* task) {
  run_task(task, total_workers());
}
134
135 void WorkGang::run_task(AbstractGangTask* task, uint no_of_parallel_workers) {
136 task->set_for_termination(no_of_parallel_workers);
137
138 // This thread is executed by the VM thread which does not block
139 // on ordinary MutexLocker's.
140 MutexLockerEx ml(monitor(), Mutex::_no_safepoint_check_flag);
141 if (TraceWorkGang) {
142 tty->print_cr("Running work gang %s task %s", name(), task->name());
143 }
144 // Tell all the workers to run a task.
145 assert(task != NULL, "Running a null task");
146 // Initialize.
147 _task = task;
148 _sequence_number += 1;
149 _started_workers = 0;
150 _finished_workers = 0;
151 // Tell the workers to get to work.
152 monitor()->notify_all();
153 // Wait for them to be finished
154 while (finished_workers() < no_of_parallel_workers) {
155 if (TraceWorkGang) {
156 tty->print_cr("Waiting in work gang %s: %d/%d finished sequence %d",
157 name(), finished_workers(), no_of_parallel_workers,
158 _sequence_number);
159 }
160 monitor()->wait(/* no_safepoint_check */ true);
161 }
162 _task = NULL;
163 if (TraceWorkGang) {
164 tty->print_cr("\nFinished work gang %s: %d/%d sequence %d",
165 name(), finished_workers(), no_of_parallel_workers,
166 _sequence_number);
167 Thread* me = Thread::current();
168 tty->print_cr(" T: 0x%x VM_thread: %d", me, me->is_VM_thread());
169 }
170 }
171
// Run 'task' using only the currently active subset of workers.
void FlexibleWorkGang::run_task(AbstractGangTask* task) {
  // If active_workers() is passed, _finished_workers
  // must only be incremented for workers that find non_null
  // work (as opposed to all those that just check that the
  // task is not null).
  WorkGang::run_task(task, (uint) active_workers());
}
179
180 void AbstractWorkGang::stop() {
181 // Tell all workers to terminate, then wait for them to become inactive.
182 MutexLockerEx ml(monitor(), Mutex::_no_safepoint_check_flag);
183 if (TraceWorkGang) {
184 tty->print_cr("Stopping work gang %s task %s", name(), task()->name());
185 }
186 _task = NULL;
187 _terminate = true;
188 monitor()->notify_all();
189 while (finished_workers() < active_workers()) {
190 if (TraceWorkGang) {
191 tty->print_cr("Waiting in work gang %s: %d/%d finished",
192 name(), finished_workers(), active_workers());
193 }
194 monitor()->wait(/* no_safepoint_check */ true);
195 }
196 }
197
// Snapshot the gang's dispatch state (terminate flag, current task,
// sequence number) into 'data'.  Must be called with the monitor held.
void AbstractWorkGang::internal_worker_poll(WorkData* data) const {
  assert(monitor()->owned_by_self(), "worker_poll is an internal method");
  assert(data != NULL, "worker data is null");
  data->set_terminate(terminate());
  data->set_task(task());
  data->set_sequence_number(sequence_number());
}
205
206 void AbstractWorkGang::internal_note_start() {
207 assert(monitor()->owned_by_self(), "note_finish is an internal method");
208 _started_workers += 1;
209 }
210
// Record that one more worker has finished the current task.
// Must be called with the monitor held.
void AbstractWorkGang::internal_note_finish() {
  assert(monitor()->owned_by_self(), "note_finish is an internal method");
  _finished_workers += 1;
}
215
216 void AbstractWorkGang::print_worker_threads_on(outputStream* st) const {
217 uint num_thr = total_workers();
218 for (uint i = 0; i < num_thr; i++) {
219 gang_worker(i)->print_on(st);
220 st->cr();
221 }
222 }
223
224 void AbstractWorkGang::threads_do(ThreadClosure* tc) const {
225 assert(tc != NULL, "Null ThreadClosure");
226 uint num_thr = total_workers();
227 for (uint i = 0; i < num_thr; i++) {
228 tc->do_thread(gang_worker(i));
229 }
230 }
231
232 // GangWorker methods.
233
// GangWorker methods.

// Construct a worker belonging to 'gang' with index 'id'; the thread
// name embeds both the id and the gang's name for diagnostics.
GangWorker::GangWorker(AbstractWorkGang* gang, uint id) {
  _gang = gang;
  set_id(id);
  set_name("Gang worker#%d (%s)", id, gang->name());
}
239
// Thread entry point: one-time setup, then the task-execution loop
// (which only returns when the gang is being stopped).
void GangWorker::run() {
  initialize();
  loop();
}
244
// Per-thread setup performed once, on the worker thread itself,
// before entering loop().
void GangWorker::initialize() {
  this->initialize_thread_local_storage();
  this->record_stack_base_and_size();
  assert(_gang != NULL, "No gang to run in");
  os::set_priority(this, NearMaxPriority);
  if (TraceWorkGang) {
    tty->print_cr("Running gang worker for gang %s id %d",
                  gang()->name(), id());
  }
  // The VM thread should not execute here because MutexLocker's are used
  // as (opposed to MutexLockerEx's).
  assert(!Thread::current()->is_VM_thread(), "VM thread should not be part"
         " of a work gang");
}
259
// Main worker loop: repeatedly wait (under the gang monitor) for either a
// new task or a terminate request; run one "part" of each dispatched task;
// notify the coordinator when starting and finishing.  Returns only on
// terminate.
void GangWorker::loop() {
  // Sequence number of the last task this worker ran; a task is "new"
  // when the gang's sequence number differs from this.
  int previous_sequence_number = 0;
  Monitor* gang_monitor = gang()->monitor();
  for ( ; /* !terminate() */; ) {
    WorkData data;   // Local snapshot of gang state (task/sequence/terminate).
    int part;  // Initialized below.
    {
      // Grab the gang mutex.
      MutexLocker ml(gang_monitor);
      // Wait for something to do.
      // Polling outside the while { wait } avoids missed notifies
      // in the outer loop.
      gang()->internal_worker_poll(&data);
      if (TraceWorkGang) {
        tty->print("Polled outside for work in gang %s worker %d",
                   gang()->name(), id());
        tty->print(" terminate: %s",
                   data.terminate() ? "true" : "false");
        tty->print(" sequence: %d (prev: %d)",
                   data.sequence_number(), previous_sequence_number);
        if (data.task() != NULL) {
          tty->print(" task: %s", data.task()->name());
        } else {
          tty->print(" task: NULL");
        }
        tty->cr();
      }
      for ( ; /* break or return */; ) {
        // Terminate if requested.
        if (data.terminate()) {
          // Count this worker as finished so stop() can complete, then exit.
          gang()->internal_note_finish();
          gang_monitor->notify_all();
          return;
        }
        // Check for new work.
        if ((data.task() != NULL) &&
            (data.sequence_number() != previous_sequence_number)) {
          if (gang()->needs_more_workers()) {
            gang()->internal_note_start();
            gang_monitor->notify_all();
            // Claim a unique part index based on start order (0-based).
            part = gang()->started_workers() - 1;
            break;
          }
        }
        // Nothing to do.
        gang_monitor->wait(/* no_safepoint_check */ true);
        // Re-poll after every wakeup; the state may have changed.
        gang()->internal_worker_poll(&data);
        if (TraceWorkGang) {
          tty->print("Polled inside for work in gang %s worker %d",
                     gang()->name(), id());
          tty->print(" terminate: %s",
                     data.terminate() ? "true" : "false");
          tty->print(" sequence: %d (prev: %d)",
                     data.sequence_number(), previous_sequence_number);
          if (data.task() != NULL) {
            tty->print(" task: %s", data.task()->name());
          } else {
            tty->print(" task: NULL");
          }
          tty->cr();
        }
      }
      // Drop gang mutex.
    }
    if (TraceWorkGang) {
      tty->print("Work for work gang %s id %d task %s part %d",
                 gang()->name(), id(), data.task()->name(), part);
    }
    assert(data.task() != NULL, "Got null task");
    // Run this worker's part of the task outside the monitor.
    data.task()->work(part);
    {
      if (TraceWorkGang) {
        tty->print("Finish for work gang %s id %d task %s part %d",
                   gang()->name(), id(), data.task()->name(), part);
      }
      // Grab the gang mutex.
      MutexLocker ml(gang_monitor);
      gang()->internal_note_finish();
      // Tell the gang you are done.
      gang_monitor->notify_all();
      // Drop the gang mutex.
    }
    previous_sequence_number = data.sequence_number();
  }
}
345
346 bool GangWorker::is_GC_task_thread() const {
347 return gang()->are_GC_task_threads();
348 }
349
350 bool GangWorker::is_ConcurrentGC_thread() const {
351 return gang()->are_ConcurrentGC_threads();
352 }
353
// Print this worker's name followed by the standard Thread state.
void GangWorker::print_on(outputStream* st) const {
  st->print("\"%s\" ", name());
  Thread::print_on(st);
  st->cr();
}
359
360 // Printing methods
361
// The gang's name, as passed to the constructor.
const char* AbstractWorkGang::name() const {
  return _name;
}
365
366 #ifndef PRODUCT
367
// Task name, kept for diagnostics only (hence non-PRODUCT).
const char* AbstractGangTask::name() const {
  return _name;
}
371
372 #endif /* PRODUCT */
373
374 // FlexibleWorkGang
375
376
377 // *** WorkGangBarrierSync
378
// Default-construct an empty barrier; set_n_workers() must be called
// before use.
WorkGangBarrierSync::WorkGangBarrierSync()
  : _monitor(Mutex::safepoint, "work gang barrier sync", true),
    _n_workers(0), _n_completed(0), _should_reset(false), _aborted(false) {
}
383
// Construct a barrier for 'n_workers' participants, with a named monitor.
WorkGangBarrierSync::WorkGangBarrierSync(uint n_workers, const char* name)
  : _monitor(Mutex::safepoint, name, true),
    _n_workers(n_workers), _n_completed(0), _should_reset(false), _aborted(false) {
}
388
389 void WorkGangBarrierSync::set_n_workers(uint n_workers) {
390 _n_workers = n_workers;
391 _n_completed = 0;
392 _should_reset = false;
393 _aborted = false;
394 }
395
// Block until all n_workers() participants have entered the barrier (or
// it is aborted).  Returns false iff the barrier was aborted.
bool WorkGangBarrierSync::enter() {
  MutexLockerEx x(monitor(), Mutex::_no_safepoint_check_flag);
  if (should_reset()) {
    // The should_reset() was set and we are the first worker to enter
    // the sync barrier. We will zero the n_completed() count which
    // effectively resets the barrier.
    zero_completed();
    set_should_reset(false);
  }
  inc_completed();
  if (n_completed() == n_workers()) {
    // At this point we would like to reset the barrier to be ready in
    // case it is used again. However, we cannot set n_completed() to
    // 0, even after the notify_all(), given that some other workers
    // might still be waiting for n_completed() to become ==
    // n_workers(). So, if we set n_completed() to 0, those workers
    // will get stuck (as they will wake up, see that n_completed() !=
    // n_workers() and go back to sleep). Instead, we raise the
    // should_reset() flag and the barrier will be reset the first
    // time a worker enters it again.
    set_should_reset(true);
    monitor()->notify_all();
  } else {
    // Not the last arrival: sleep until the count fills or abort() fires.
    while (n_completed() != n_workers() && !aborted()) {
      monitor()->wait(/* no_safepoint_check */ true);
    }
  }
  return !aborted();
}
425
// Abort the barrier: wake every waiter; enter() will then return false.
void WorkGangBarrierSync::abort() {
  MutexLockerEx x(monitor(), Mutex::_no_safepoint_check_flag);
  set_aborted();
  monitor()->notify_all();
}
431
432 // SubTasksDone functions.
433
// SubTasksDone functions.

// Construct a claim table for 'n' subtasks; defaults to one claiming
// thread until set_n_threads() is called.
SubTasksDone::SubTasksDone(uint n) :
  _n_tasks(n), _n_threads(1), _tasks(NULL) {
  _tasks = NEW_C_HEAP_ARRAY(uint, n, mtInternal);
  guarantee(_tasks != NULL, "alloc failure");
  clear();
}
440
// True iff construction succeeded (the claim table was allocated).
bool SubTasksDone::valid() {
  return _tasks != NULL;
}
444
// Set the number of threads that will claim subtasks; 0 is treated as 1.
// Must not be called while a round of claiming is in progress.
void SubTasksDone::set_n_threads(uint t) {
  assert(_claimed == 0 || _threads_completed == _n_threads,
         "should not be called while tasks are being processed!");
  _n_threads = (t == 0 ? 1 : t);
}
450
451 void SubTasksDone::clear() {
452 for (uint i = 0; i < _n_tasks; i++) {
453 _tasks[i] = 0;
454 }
455 _threads_completed = 0;
456 #ifdef ASSERT
457 _claimed = 0;
458 #endif
459 }
460
461 bool SubTasksDone::is_task_claimed(uint t) {
462 assert(0 <= t && t < _n_tasks, "bad task id.");
463 uint old = _tasks[t];
464 if (old == 0) {
465 old = Atomic::cmpxchg(1, &_tasks[t], 0);
466 }
467 assert(_tasks[t] == 1, "What else?");
468 bool res = old != 0;
469 #ifdef ASSERT
470 if (!res) {
471 assert(_claimed < _n_tasks, "Too many tasks claimed; missing clear?");
472 Atomic::inc((volatile jint*) &_claimed);
473 }
474 #endif
475 return res;
476 }
477
// Called by each thread when it has finished claiming; the last of the
// _n_threads callers resets the claim table for reuse.
void SubTasksDone::all_tasks_completed() {
  // CAS loop: atomically increment _threads_completed.
  jint observed = _threads_completed;
  jint old;
  do {
    old = observed;
    observed = Atomic::cmpxchg(old+1, &_threads_completed, old);
  } while (observed != old);
  // If this was the last thread checking in, clear the tasks.
  if (observed+1 == (jint)_n_threads) clear();
}
488
489
490 SubTasksDone::~SubTasksDone() {
491 if (_tasks != NULL) FREE_C_HEAP_ARRAY(jint, _tasks, mtInternal);
492 }
493
494 // *** SequentialSubTasksDone
495
496 void SequentialSubTasksDone::clear() {
497 _n_tasks = _n_claimed = 0;
498 _n_threads = _n_completed = 0;
499 }
500
// Usable only once a positive thread count has been configured.
bool SequentialSubTasksDone::valid() {
  return _n_threads > 0;
}
504
// Atomically claim the next unclaimed task index into 't'.
// Returns false when a task was claimed (t holds its index), true when
// all _n_tasks have already been handed out.
bool SequentialSubTasksDone::is_task_claimed(uint& t) {
  uint* n_claimed_ptr = &_n_claimed;
  t = *n_claimed_ptr;
  while (t < _n_tasks) {
    // Try to advance the claim counter from t to t+1; success means we
    // own task t.
    jint res = Atomic::cmpxchg(t+1, n_claimed_ptr, t);
    if (res == (jint)t) {
      return false;
    }
    // Lost the race; re-read and retry.
    t = *n_claimed_ptr;
  }
  return true;
}
517
// Called by each participating thread when it is done claiming.
// Returns true for the last thread to check in (which also resets the
// counters), false for everyone else.
bool SequentialSubTasksDone::all_tasks_completed() {
  uint* n_completed_ptr = &_n_completed;
  uint complete = *n_completed_ptr;
  // CAS loop: atomically increment the completion count.
  while (true) {
    uint res = Atomic::cmpxchg(complete+1, n_completed_ptr, complete);
    if (res == complete) {
      break;
    }
    complete = res;
  }
  if (complete+1 == _n_threads) {
    clear();
    return true;
  }
  return false;
}
534
// Static registry state shared by all FreeIdSet instances.
bool FreeIdSet::_stat_init = false;   // whether _sets[] has been initialized
FreeIdSet* FreeIdSet::_sets[NSets];   // registry of live sets (zero-initialized)
bool FreeIdSet::_safepoint;           // true while a safepoint is in progress
538
539 FreeIdSet::FreeIdSet(int sz, Monitor* mon) :
540 _sz(sz), _mon(mon), _hd(0), _waiters(0), _index(-1), _claimed(0)
541 {
542 _ids = NEW_C_HEAP_ARRAY(int, sz, mtInternal);
543 for (int i = 0; i < sz; i++) _ids[i] = i+1;
544 _ids[sz-1] = end_of_list; // end of list.
545 if (_stat_init) {
546 for (int j = 0; j < NSets; j++) _sets[j] = NULL;
547 _stat_init = true;
548 }
549 // Add to sets. (This should happen while the system is still single-threaded.)
550 for (int j = 0; j < NSets; j++) {
551 if (_sets[j] == NULL) {
552 _sets[j] = this;
553 _index = j;
554 break;
555 }
556 }
557 guarantee(_index != -1, "Too many FreeIdSets in use!");
558 }
559
// Unregister from the global table and release the id array.
FreeIdSet::~FreeIdSet() {
  _sets[_index] = NULL;
  FREE_C_HEAP_ARRAY(int, _ids, mtInternal);
}
564
// Flip the global safepoint flag.  On entry to a safepoint, wake any
// threads blocked in claim_par_id() so they can observe the flag and
// bail out (claim_par_id returns -1 while _safepoint is set).
void FreeIdSet::set_safepoint(bool b) {
  _safepoint = b;
  if (b) {
    for (int j = 0; j < NSets; j++) {
      if (_sets[j] != NULL && _sets[j]->_waiters > 0) {
        Monitor* mon = _sets[j]->_mon;
        mon->lock_without_safepoint_check();
        mon->notify_all();
        mon->unlock();
      }
    }
  }
}
578
579 #define FID_STATS 0
580
// Claim an id from the free list, blocking while the list is empty.
// Returns -1 if a safepoint begins while waiting (or the list is still
// empty when woken for the safepoint).
int FreeIdSet::claim_par_id() {
#if FID_STATS
  thread_t tslf = thr_self();
  tty->print("claim_par_id[%d]: sz = %d, claimed = %d\n", tslf, _sz, _claimed);
#endif
  MutexLockerEx x(_mon, Mutex::_no_safepoint_check_flag);
  // Wait for a free id, unless a safepoint is pending (set_safepoint
  // notifies this monitor so waiters can observe _safepoint).
  while (!_safepoint && _hd == end_of_list) {
    _waiters++;
#if FID_STATS
    if (_waiters > 5) {
      tty->print("claim_par_id waiting[%d]: %d waiters, %d claimed.\n",
                 tslf, _waiters, _claimed);
    }
#endif
    _mon->wait(Mutex::_no_safepoint_check_flag);
    _waiters--;
  }
  if (_hd == end_of_list) {
#if FID_STATS
    tty->print("claim_par_id[%d]: returning EOL.\n", tslf);
#endif
    return -1;
  } else {
    // Pop the head of the free list and mark the slot claimed.
    int res = _hd;
    _hd = _ids[res];
    _ids[res] = claimed;  // For debugging.
    _claimed++;
#if FID_STATS
    tty->print("claim_par_id[%d]: returning %d, claimed = %d.\n",
               tslf, res, _claimed);
#endif
    return res;
  }
}
615
// Permanently claim the specific id 'i' by unlinking it from the free
// list.  Returns true on success, false if 'i' is not currently free.
bool FreeIdSet::claim_perm_id(int i) {
  assert(0 <= i && i < _sz, "Out of range.");
  MutexLockerEx x(_mon, Mutex::_no_safepoint_check_flag);
  // Walk the singly-linked free list looking for 'i', tracking the
  // predecessor so the node can be unlinked.
  int prev = end_of_list;
  int cur = _hd;
  while (cur != end_of_list) {
    if (cur == i) {
      if (prev == end_of_list) {
        // 'i' is the head of the list.
        _hd = _ids[cur];
      } else {
        _ids[prev] = _ids[cur];
      }
      _ids[cur] = claimed;
      _claimed++;
      return true;
    } else {
      prev = cur;
      cur = _ids[cur];
    }
  }
  // 'i' was not on the free list (already claimed).
  return false;

}
639
// Return a previously claimed id to the free list and wake one waiter,
// if any, blocked in claim_par_id().
void FreeIdSet::release_par_id(int id) {
  MutexLockerEx x(_mon, Mutex::_no_safepoint_check_flag);
  assert(_ids[id] == claimed, "Precondition.");
  // Push 'id' back on the head of the free list.
  _ids[id] = _hd;
  _hd = id;
  _claimed--;
#if FID_STATS
  tty->print("[%d] release_par_id(%d), waiters =%d, claimed = %d.\n",
             thr_self(), id, _waiters, _claimed);
#endif
  if (_waiters > 0)
    // Notify all would be safer, but this is OK, right?
    _mon->notify_all();
}

mercurial