Thu, 31 Aug 2017 18:10:36 +0800
merge
aoqi@0 | 1 | /* |
aoqi@0 | 2 | * Copyright (c) 2001, 2002, Oracle and/or its affiliates. All rights reserved. |
aoqi@0 | 3 | * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
aoqi@0 | 4 | * |
aoqi@0 | 5 | * This code is free software; you can redistribute it and/or modify it |
aoqi@0 | 6 | * under the terms of the GNU General Public License version 2 only, as |
aoqi@0 | 7 | * published by the Free Software Foundation. Oracle designates this |
aoqi@0 | 8 | * particular file as subject to the "Classpath" exception as provided |
aoqi@0 | 9 | * by Oracle in the LICENSE file that accompanied this code. |
aoqi@0 | 10 | * |
aoqi@0 | 11 | * This code is distributed in the hope that it will be useful, but WITHOUT |
aoqi@0 | 12 | * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
aoqi@0 | 13 | * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
aoqi@0 | 14 | * version 2 for more details (a copy is included in the LICENSE file that |
aoqi@0 | 15 | * accompanied this code). |
aoqi@0 | 16 | * |
aoqi@0 | 17 | * You should have received a copy of the GNU General Public License version |
aoqi@0 | 18 | * 2 along with this work; if not, write to the Free Software Foundation, |
aoqi@0 | 19 | * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. |
aoqi@0 | 20 | * |
aoqi@0 | 21 | * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA |
aoqi@0 | 22 | * or visit www.oracle.com if you need additional information or have any |
aoqi@0 | 23 | * questions. |
aoqi@0 | 24 | */ |
aoqi@0 | 25 | |
aoqi@0 | 26 | /* |
aoqi@0 | 27 | File: ConditionVariable.java |
aoqi@0 | 28 | |
aoqi@0 | 29 | Originally written by Doug Lea and released into the public domain. |
aoqi@0 | 30 | This may be used for any purposes whatsoever without acknowledgment. |
aoqi@0 | 31 | Thanks for the assistance and support of Sun Microsystems Labs, |
aoqi@0 | 32 | and everyone contributing, testing, and using this code. |
aoqi@0 | 33 | |
aoqi@0 | 34 | History: |
aoqi@0 | 35 | Date Who What |
aoqi@0 | 36 | 11Jun1998 dl Create public version |
aoqi@0 | 37 | 08dec2001 kmc Added support for Reentrant Mutexes |
aoqi@0 | 38 | */ |
aoqi@0 | 39 | |
aoqi@0 | 40 | package com.sun.corba.se.impl.orbutil.concurrent; |
aoqi@0 | 41 | |
aoqi@0 | 42 | import com.sun.corba.se.impl.orbutil.ORBUtility ; |
aoqi@0 | 43 | |
aoqi@0 | 44 | /** |
aoqi@0 | 45 | * This class is designed for fans of POSIX pthreads programming. |
aoqi@0 | 46 | * If you restrict yourself to Mutexes and CondVars, you can |
aoqi@0 | 47 | * use most of your favorite constructions. Don't randomly mix them |
aoqi@0 | 48 | * with synchronized methods or blocks though. |
aoqi@0 | 49 | * <p> |
aoqi@0 | 50 | * Method names and behavior are as close as is reasonable to |
aoqi@0 | 51 | * those in POSIX. |
aoqi@0 | 52 | * <p> |
aoqi@0 | 53 | * <b>Sample Usage.</b> Here is a full version of a bounded buffer |
aoqi@0 | 54 | * that implements the BoundedChannel interface, written in |
aoqi@0 | 55 | * a style reminscent of that in POSIX programming books. |
aoqi@0 | 56 | * <pre> |
aoqi@0 | 57 | * class CVBuffer implements BoundedChannel { |
aoqi@0 | 58 | * private final Mutex mutex; |
aoqi@0 | 59 | * private final CondVar notFull; |
aoqi@0 | 60 | * private final CondVar notEmpty; |
aoqi@0 | 61 | * private int count = 0; |
aoqi@0 | 62 | * private int takePtr = 0; |
aoqi@0 | 63 | * private int putPtr = 0; |
aoqi@0 | 64 | * private final Object[] array; |
aoqi@0 | 65 | * |
aoqi@0 | 66 | * public CVBuffer(int capacity) { |
aoqi@0 | 67 | * array = new Object[capacity]; |
aoqi@0 | 68 | * mutex = new Mutex(); |
aoqi@0 | 69 | * notFull = new CondVar(mutex); |
aoqi@0 | 70 | * notEmpty = new CondVar(mutex); |
aoqi@0 | 71 | * } |
aoqi@0 | 72 | * |
aoqi@0 | 73 | * public int capacity() { return array.length; } |
aoqi@0 | 74 | * |
aoqi@0 | 75 | * public void put(Object x) throws InterruptedException { |
aoqi@0 | 76 | * mutex.acquire(); |
aoqi@0 | 77 | * try { |
aoqi@0 | 78 | * while (count == array.length) { |
aoqi@0 | 79 | * notFull.await(); |
aoqi@0 | 80 | * } |
aoqi@0 | 81 | * array[putPtr] = x; |
aoqi@0 | 82 | * putPtr = (putPtr + 1) % array.length; |
aoqi@0 | 83 | * ++count; |
aoqi@0 | 84 | * notEmpty.signal(); |
aoqi@0 | 85 | * } |
aoqi@0 | 86 | * finally { |
aoqi@0 | 87 | * mutex.release(); |
aoqi@0 | 88 | * } |
aoqi@0 | 89 | * } |
aoqi@0 | 90 | * |
aoqi@0 | 91 | * public Object take() throws InterruptedException { |
aoqi@0 | 92 | * Object x = null; |
aoqi@0 | 93 | * mutex.acquire(); |
aoqi@0 | 94 | * try { |
aoqi@0 | 95 | * while (count == 0) { |
aoqi@0 | 96 | * notEmpty.await(); |
aoqi@0 | 97 | * } |
aoqi@0 | 98 | * x = array[takePtr]; |
aoqi@0 | 99 | * array[takePtr] = null; |
aoqi@0 | 100 | * takePtr = (takePtr + 1) % array.length; |
aoqi@0 | 101 | * --count; |
aoqi@0 | 102 | * notFull.signal(); |
aoqi@0 | 103 | * } |
aoqi@0 | 104 | * finally { |
aoqi@0 | 105 | * mutex.release(); |
aoqi@0 | 106 | * } |
aoqi@0 | 107 | * return x; |
aoqi@0 | 108 | * } |
aoqi@0 | 109 | * |
aoqi@0 | 110 | * public boolean offer(Object x, long msecs) throws InterruptedException { |
aoqi@0 | 111 | * mutex.acquire(); |
aoqi@0 | 112 | * try { |
aoqi@0 | 113 | * if (count == array.length) { |
aoqi@0 | 114 | * notFull.timedwait(msecs); |
aoqi@0 | 115 | * if (count == array.length) |
aoqi@0 | 116 | * return false; |
aoqi@0 | 117 | * } |
aoqi@0 | 118 | * array[putPtr] = x; |
aoqi@0 | 119 | * putPtr = (putPtr + 1) % array.length; |
aoqi@0 | 120 | * ++count; |
aoqi@0 | 121 | * notEmpty.signal(); |
aoqi@0 | 122 | * return true; |
aoqi@0 | 123 | * } |
aoqi@0 | 124 | * finally { |
aoqi@0 | 125 | * mutex.release(); |
aoqi@0 | 126 | * } |
aoqi@0 | 127 | * } |
aoqi@0 | 128 | * |
aoqi@0 | 129 | * public Object poll(long msecs) throws InterruptedException { |
aoqi@0 | 130 | * Object x = null; |
aoqi@0 | 131 | * mutex.acquire(); |
aoqi@0 | 132 | * try { |
aoqi@0 | 133 | * if (count == 0) { |
aoqi@0 | 134 | * notEmpty.timedwait(msecs); |
aoqi@0 | 135 | * if (count == 0) |
aoqi@0 | 136 | * return null; |
aoqi@0 | 137 | * } |
aoqi@0 | 138 | * x = array[takePtr]; |
aoqi@0 | 139 | * array[takePtr] = null; |
aoqi@0 | 140 | * takePtr = (takePtr + 1) % array.length; |
aoqi@0 | 141 | * --count; |
aoqi@0 | 142 | * notFull.signal(); |
aoqi@0 | 143 | * } |
aoqi@0 | 144 | * finally { |
aoqi@0 | 145 | * mutex.release(); |
aoqi@0 | 146 | * } |
aoqi@0 | 147 | * return x; |
aoqi@0 | 148 | * } |
aoqi@0 | 149 | * } |
aoqi@0 | 150 | * |
aoqi@0 | 151 | * </pre> |
aoqi@0 | 152 | * @see Mutex |
aoqi@0 | 153 | * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] |
aoqi@0 | 154 | |
aoqi@0 | 155 | **/ |
aoqi@0 | 156 | |
aoqi@0 | 157 | public class CondVar { |
aoqi@0 | 158 | |
aoqi@0 | 159 | protected boolean debug_ ; |
aoqi@0 | 160 | |
aoqi@0 | 161 | /** The mutex **/ |
aoqi@0 | 162 | protected final Sync mutex_; |
aoqi@0 | 163 | protected final ReentrantMutex remutex_; |
aoqi@0 | 164 | |
aoqi@0 | 165 | private int releaseMutex() |
aoqi@0 | 166 | { |
aoqi@0 | 167 | int count = 1 ; |
aoqi@0 | 168 | |
aoqi@0 | 169 | if (remutex_!=null) |
aoqi@0 | 170 | count = remutex_.releaseAll() ; |
aoqi@0 | 171 | else |
aoqi@0 | 172 | mutex_.release() ; |
aoqi@0 | 173 | |
aoqi@0 | 174 | return count ; |
aoqi@0 | 175 | } |
aoqi@0 | 176 | |
aoqi@0 | 177 | private void acquireMutex( int count ) throws InterruptedException |
aoqi@0 | 178 | { |
aoqi@0 | 179 | if (remutex_!=null) |
aoqi@0 | 180 | remutex_.acquireAll( count ) ; |
aoqi@0 | 181 | else |
aoqi@0 | 182 | mutex_.acquire() ; |
aoqi@0 | 183 | } |
aoqi@0 | 184 | |
aoqi@0 | 185 | /** |
aoqi@0 | 186 | * Create a new CondVar that relies on the given mutual |
aoqi@0 | 187 | * exclusion lock. |
aoqi@0 | 188 | * @param mutex A mutual exclusion lock which must either be non-reentrant, |
aoqi@0 | 189 | * or else be ReentrantMutex. |
aoqi@0 | 190 | * Standard usage is to supply an instance of <code>Mutex</code>, |
aoqi@0 | 191 | * but, for example, a Semaphore initialized to 1 also works. |
aoqi@0 | 192 | * On the other hand, many other Sync implementations would not |
aoqi@0 | 193 | * work here, so some care is required to supply a sensible |
aoqi@0 | 194 | * synchronization object. |
aoqi@0 | 195 | * In normal use, the mutex should be one that is used for <em>all</em> |
aoqi@0 | 196 | * synchronization of the object using the CondVar. Generally, |
aoqi@0 | 197 | * to prevent nested monitor lockouts, this |
aoqi@0 | 198 | * object should not use any native Java synchronized blocks. |
aoqi@0 | 199 | **/ |
aoqi@0 | 200 | |
aoqi@0 | 201 | public CondVar(Sync mutex, boolean debug) { |
aoqi@0 | 202 | debug_ = debug ; |
aoqi@0 | 203 | mutex_ = mutex; |
aoqi@0 | 204 | if (mutex instanceof ReentrantMutex) |
aoqi@0 | 205 | remutex_ = (ReentrantMutex)mutex; |
aoqi@0 | 206 | else |
aoqi@0 | 207 | remutex_ = null; |
aoqi@0 | 208 | } |
aoqi@0 | 209 | |
aoqi@0 | 210 | public CondVar( Sync mutex ) { |
aoqi@0 | 211 | this( mutex, false ) ; |
aoqi@0 | 212 | } |
aoqi@0 | 213 | |
aoqi@0 | 214 | /** |
aoqi@0 | 215 | * Wait for notification. This operation at least momentarily |
aoqi@0 | 216 | * releases the mutex. The mutex is always held upon return, |
aoqi@0 | 217 | * even if interrupted. |
aoqi@0 | 218 | * @exception InterruptedException if the thread was interrupted |
aoqi@0 | 219 | * before or during the wait. However, if the thread is interrupted |
aoqi@0 | 220 | * after the wait but during mutex re-acquisition, the interruption |
aoqi@0 | 221 | * is ignored, while still ensuring |
aoqi@0 | 222 | * that the currentThread's interruption state stays true, so can |
aoqi@0 | 223 | * be probed by callers. |
aoqi@0 | 224 | **/ |
aoqi@0 | 225 | public void await() throws InterruptedException { |
aoqi@0 | 226 | int count = 0 ; |
aoqi@0 | 227 | if (Thread.interrupted()) |
aoqi@0 | 228 | throw new InterruptedException(); |
aoqi@0 | 229 | |
aoqi@0 | 230 | try { |
aoqi@0 | 231 | if (debug_) |
aoqi@0 | 232 | ORBUtility.dprintTrace( this, "await enter" ) ; |
aoqi@0 | 233 | |
aoqi@0 | 234 | synchronized(this) { |
aoqi@0 | 235 | count = releaseMutex() ; |
aoqi@0 | 236 | try { |
aoqi@0 | 237 | wait(); |
aoqi@0 | 238 | } catch (InterruptedException ex) { |
aoqi@0 | 239 | notify(); |
aoqi@0 | 240 | throw ex; |
aoqi@0 | 241 | } |
aoqi@0 | 242 | } |
aoqi@0 | 243 | } finally { |
aoqi@0 | 244 | // Must ignore interrupt on re-acquire |
aoqi@0 | 245 | boolean interrupted = false; |
aoqi@0 | 246 | for (;;) { |
aoqi@0 | 247 | try { |
aoqi@0 | 248 | acquireMutex( count ); |
aoqi@0 | 249 | break; |
aoqi@0 | 250 | } catch (InterruptedException ex) { |
aoqi@0 | 251 | interrupted = true; |
aoqi@0 | 252 | } |
aoqi@0 | 253 | } |
aoqi@0 | 254 | |
aoqi@0 | 255 | if (interrupted) { |
aoqi@0 | 256 | Thread.currentThread().interrupt(); |
aoqi@0 | 257 | } |
aoqi@0 | 258 | |
aoqi@0 | 259 | if (debug_) |
aoqi@0 | 260 | ORBUtility.dprintTrace( this, "await exit" ) ; |
aoqi@0 | 261 | } |
aoqi@0 | 262 | } |
aoqi@0 | 263 | |
aoqi@0 | 264 | /** |
aoqi@0 | 265 | * Wait for at most msecs for notification. |
aoqi@0 | 266 | * This operation at least momentarily |
aoqi@0 | 267 | * releases the mutex. The mutex is always held upon return, |
aoqi@0 | 268 | * even if interrupted. |
aoqi@0 | 269 | * @param msecs The time to wait. A value less than or equal to zero |
aoqi@0 | 270 | * causes a momentarily release |
aoqi@0 | 271 | * and re-acquire of the mutex, and always returns false. |
aoqi@0 | 272 | * @return false if at least msecs have elapsed |
aoqi@0 | 273 | * upon resumption; else true. A |
aoqi@0 | 274 | * false return does NOT necessarily imply that the thread was |
aoqi@0 | 275 | * not notified. For example, it might have been notified |
aoqi@0 | 276 | * after the time elapsed but just before resuming. |
aoqi@0 | 277 | * @exception InterruptedException if the thread was interrupted |
aoqi@0 | 278 | * before or during the wait. |
aoqi@0 | 279 | **/ |
aoqi@0 | 280 | |
aoqi@0 | 281 | public boolean timedwait(long msecs) throws InterruptedException { |
aoqi@0 | 282 | |
aoqi@0 | 283 | if (Thread.interrupted()) |
aoqi@0 | 284 | throw new InterruptedException(); |
aoqi@0 | 285 | |
aoqi@0 | 286 | boolean success = false; |
aoqi@0 | 287 | int count = 0; |
aoqi@0 | 288 | |
aoqi@0 | 289 | try { |
aoqi@0 | 290 | if (debug_) |
aoqi@0 | 291 | ORBUtility.dprintTrace( this, "timedwait enter" ) ; |
aoqi@0 | 292 | |
aoqi@0 | 293 | synchronized(this) { |
aoqi@0 | 294 | count = releaseMutex() ; |
aoqi@0 | 295 | try { |
aoqi@0 | 296 | if (msecs > 0) { |
aoqi@0 | 297 | long start = System.currentTimeMillis(); |
aoqi@0 | 298 | wait(msecs); |
aoqi@0 | 299 | success = System.currentTimeMillis() - start <= msecs; |
aoqi@0 | 300 | } |
aoqi@0 | 301 | } catch (InterruptedException ex) { |
aoqi@0 | 302 | notify(); |
aoqi@0 | 303 | throw ex; |
aoqi@0 | 304 | } |
aoqi@0 | 305 | } |
aoqi@0 | 306 | } finally { |
aoqi@0 | 307 | // Must ignore interrupt on re-acquire |
aoqi@0 | 308 | boolean interrupted = false; |
aoqi@0 | 309 | for (;;) { |
aoqi@0 | 310 | try { |
aoqi@0 | 311 | acquireMutex( count ) ; |
aoqi@0 | 312 | break; |
aoqi@0 | 313 | } catch (InterruptedException ex) { |
aoqi@0 | 314 | interrupted = true; |
aoqi@0 | 315 | } |
aoqi@0 | 316 | } |
aoqi@0 | 317 | |
aoqi@0 | 318 | if (interrupted) { |
aoqi@0 | 319 | Thread.currentThread().interrupt(); |
aoqi@0 | 320 | } |
aoqi@0 | 321 | |
aoqi@0 | 322 | if (debug_) |
aoqi@0 | 323 | ORBUtility.dprintTrace( this, "timedwait exit" ) ; |
aoqi@0 | 324 | } |
aoqi@0 | 325 | return success; |
aoqi@0 | 326 | } |
aoqi@0 | 327 | |
aoqi@0 | 328 | /** |
aoqi@0 | 329 | * Notify a waiting thread. |
aoqi@0 | 330 | * If one exists, a non-interrupted thread will return |
aoqi@0 | 331 | * normally (i.e., not via InterruptedException) from await or timedwait. |
aoqi@0 | 332 | **/ |
aoqi@0 | 333 | public synchronized void signal() { |
aoqi@0 | 334 | notify(); |
aoqi@0 | 335 | } |
aoqi@0 | 336 | |
aoqi@0 | 337 | /** Notify all waiting threads **/ |
aoqi@0 | 338 | public synchronized void broadcast() { |
aoqi@0 | 339 | notifyAll(); |
aoqi@0 | 340 | } |
aoqi@0 | 341 | } |