src/share/jaxws_classes/com/sun/xml/internal/ws/api/pipe/Fiber.java

Tue, 09 Apr 2013 14:51:13 +0100

author
alanb
date
Tue, 09 Apr 2013 14:51:13 +0100
changeset 368
0989ad8c0860
parent 286
f50545b5e2f1
child 384
8f2986ff0235
permissions
-rw-r--r--

8010393: Update JAX-WS RI to 2.2.9-b12941
Reviewed-by: alanb, erikj
Contributed-by: miroslav.kos@oracle.com, martin.grebac@oracle.com

ohair@286 1 /*
alanb@368 2 * Copyright (c) 1997, 2013, Oracle and/or its affiliates. All rights reserved.
ohair@286 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
ohair@286 4 *
ohair@286 5 * This code is free software; you can redistribute it and/or modify it
ohair@286 6 * under the terms of the GNU General Public License version 2 only, as
ohair@286 7 * published by the Free Software Foundation. Oracle designates this
ohair@286 8 * particular file as subject to the "Classpath" exception as provided
ohair@286 9 * by Oracle in the LICENSE file that accompanied this code.
ohair@286 10 *
ohair@286 11 * This code is distributed in the hope that it will be useful, but WITHOUT
ohair@286 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
ohair@286 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
ohair@286 14 * version 2 for more details (a copy is included in the LICENSE file that
ohair@286 15 * accompanied this code).
ohair@286 16 *
ohair@286 17 * You should have received a copy of the GNU General Public License version
ohair@286 18 * 2 along with this work; if not, write to the Free Software Foundation,
ohair@286 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
ohair@286 20 *
ohair@286 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
ohair@286 22 * or visit www.oracle.com if you need additional information or have any
ohair@286 23 * questions.
ohair@286 24 */
ohair@286 25
ohair@286 26 package com.sun.xml.internal.ws.api.pipe;
ohair@286 27
ohair@286 28 import com.sun.istack.internal.NotNull;
ohair@286 29 import com.sun.istack.internal.Nullable;
ohair@286 30 import com.sun.xml.internal.ws.api.Cancelable;
ohair@286 31 import com.sun.xml.internal.ws.api.Component;
ohair@286 32 import com.sun.xml.internal.ws.api.ComponentRegistry;
ohair@286 33 import com.sun.xml.internal.ws.api.SOAPVersion;
ohair@286 34 import com.sun.xml.internal.ws.api.addressing.AddressingVersion;
alanb@368 35 import com.sun.xml.internal.ws.api.message.AddressingUtils;
ohair@286 36 import com.sun.xml.internal.ws.api.message.Packet;
ohair@286 37 import com.sun.xml.internal.ws.api.pipe.helper.AbstractFilterTubeImpl;
alanb@368 38 import com.sun.xml.internal.ws.api.pipe.helper.AbstractTubeImpl;
ohair@286 39 import com.sun.xml.internal.ws.api.server.Adapter;
alanb@368 40 import com.sun.xml.internal.ws.api.server.Container;
alanb@368 41 import com.sun.xml.internal.ws.api.server.ContainerResolver;
ohair@286 42
ohair@286 43 import java.util.ArrayList;
ohair@286 44 import java.util.List;
ohair@286 45 import java.util.Set;
ohair@286 46 import java.util.concurrent.CopyOnWriteArraySet;
ohair@286 47 import java.util.concurrent.atomic.AtomicInteger;
alanb@368 48 import java.util.concurrent.locks.Condition;
ohair@286 49 import java.util.concurrent.locks.ReentrantLock;
ohair@286 50 import java.util.logging.Level;
ohair@286 51 import java.util.logging.Logger;
ohair@286 52
alanb@368 53 import javax.xml.ws.Holder;
alanb@368 54 import javax.xml.ws.WebServiceException;
alanb@368 55
ohair@286 56 /**
ohair@286 57 * User-level thread. Represents the execution of one request/response processing.
ohair@286 58 * <p/>
ohair@286 59 * <p/>
ohair@286 60 * JAX-WS RI is capable of running a large number of request/response concurrently by
ohair@286 61 * using a relatively small number of threads. This is made possible by utilizing
ohair@286 62 * a {@link Fiber} &mdash; a user-level thread that gets created for each request/response
ohair@286 63 * processing.
ohair@286 64 * <p/>
ohair@286 65 * <p/>
ohair@286 66 * A fiber remembers where in the pipeline the processing is at, what needs to be
ohair@286 67 * executed on the way out (when processing response), and other additional information
ohair@286 68 * specific to the execution of a particular request/response.
ohair@286 69 * <p/>
ohair@286 70 * <h2>Suspend/Resume</h2>
ohair@286 71 * <p/>
ohair@286 72 * Fiber can be {@link NextAction#suspend() suspended} by a {@link Tube}.
ohair@286 73 * When a fiber is suspended, it will be kept on the side until it is
ohair@286 74 * {@link #resume(Packet) resumed}. This allows threads to go execute
ohair@286 75 * other runnable fibers, allowing efficient utilization of smaller number of
ohair@286 76 * threads.
ohair@286 77 * <p/>
ohair@286 78 * <h2>Context-switch Interception</h2>
ohair@286 79 * <p/>
ohair@286 80 * {@link FiberContextSwitchInterceptor} allows {@link Tube}s and {@link Adapter}s
ohair@286 81 * to perform additional processing every time a thread starts running a fiber
ohair@286 82 * and stops running it.
ohair@286 83 * <p/>
ohair@286 84 * <h2>Context ClassLoader</h2>
ohair@286 85 * <p/>
ohair@286 86 * Just like thread, a fiber has a context class loader (CCL.) A fiber's CCL
ohair@286 87 * becomes the thread's CCL when it's executing the fiber. The original CCL
ohair@286 88 * of the thread will be restored when the thread leaves the fiber execution.
ohair@286 89 * <p/>
ohair@286 90 * <p/>
ohair@286 91 * <h2>Debugging Aid</h2>
ohair@286 92 * <p/>
ohair@286 93 * Because {@link Fiber} doesn't keep much in the call stack, and instead uses
ohair@286 94 * {@link #conts} to store the continuation, debugging fiber related activities
ohair@286 95 * could be harder.
ohair@286 96 * <p/>
ohair@286 97 * <p/>
ohair@286 98 * Setting the {@link #LOGGER} for FINE would give you basic start/stop/resume/suspend
ohair@286 99 * level logging. Using FINER would cause more detailed logging, which includes
ohair@286 100 * what tubes are executed in what order and how they behaved.
ohair@286 101 * <p/>
ohair@286 102 * <p/>
ohair@286 103 * When you debug the server side, consider setting {@link Fiber#serializeExecution}
ohair@286 104 * to true, so that execution of fibers is serialized. Debugging a server
ohair@286 105 * with more than one running thread is very tricky, and this switch will
ohair@286 106 * prevent that. This can also be enabled by setting the system property on.
ohair@286 107 * See the source code.
ohair@286 108 *
ohair@286 109 * @author Kohsuke Kawaguchi
ohair@286 110 * @author Jitendra Kotamraju
ohair@286 111 */
ohair@286 112 public final class Fiber implements Runnable, Cancelable, ComponentRegistry {
ohair@286 113
ohair@286 114 /**
ohair@286 115 * Callback interface for notification of suspend and resume.
ohair@286 116 *
ohair@286 117 * @since 2.2.6
alanb@368 118 * @deprecated Use {@link NextAction#suspend(Runnable)}
ohair@286 119 */
ohair@286 120 public interface Listener {
ohair@286 121 /**
ohair@286 122 * Fiber has been suspended. Implementations of this callback may resume the Fiber.
ohair@286 123 * @param fiber Fiber
ohair@286 124 */
ohair@286 125 public void fiberSuspended(Fiber fiber);
ohair@286 126
ohair@286 127 /**
ohair@286 128 * Fiber has been resumed. Behavior is undefined if implementations of this callback attempt to suspend the Fiber.
ohair@286 129 * @param fiber Fiber
ohair@286 130 */
ohair@286 131 public void fiberResumed(Fiber fiber);
ohair@286 132 }
ohair@286 133
alanb@368 134 private final List<Listener> _listeners = new ArrayList<Listener>();
ohair@286 135
ohair@286 136 /**
ohair@286 137 * Adds suspend/resume callback listener
ohair@286 138 * @param listener Listener
ohair@286 139 * @since 2.2.6
alanb@368 140 * @deprecated
ohair@286 141 */
ohair@286 142 public void addListener(Listener listener) {
ohair@286 143 synchronized(_listeners) {
ohair@286 144 if (!_listeners.contains(listener)) {
ohair@286 145 _listeners.add(listener);
ohair@286 146 }
ohair@286 147 }
ohair@286 148 }
ohair@286 149
ohair@286 150 /**
ohair@286 151 * Removes suspend/resume callback listener
ohair@286 152 * @param listener Listener
ohair@286 153 * @since 2.2.6
alanb@368 154 * @deprecated
ohair@286 155 */
ohair@286 156 public void removeListener(Listener listener) {
ohair@286 157 synchronized(_listeners) {
ohair@286 158 _listeners.remove(listener);
ohair@286 159 }
ohair@286 160 }
ohair@286 161
alanb@368 162 List<Listener> getCurrentListeners() {
ohair@286 163 synchronized(_listeners) {
ohair@286 164 return new ArrayList<Listener>(_listeners);
ohair@286 165 }
ohair@286 166 }
ohair@286 167
ohair@286 168 private void clearListeners() {
ohair@286 169 synchronized(_listeners) {
ohair@286 170 _listeners.clear();
ohair@286 171 }
ohair@286 172 }
ohair@286 173
ohair@286 174 /**
ohair@286 175 * {@link Tube}s whose {@link Tube#processResponse(Packet)} method needs
ohair@286 176 * to be invoked on the way back.
ohair@286 177 */
ohair@286 178 private Tube[] conts = new Tube[16];
ohair@286 179 private int contsSize;
ohair@286 180
ohair@286 181 /**
ohair@286 182 * If this field is non-null, the next instruction to execute is
ohair@286 183 * to call its {@link Tube#processRequest(Packet)}. Otherwise
ohair@286 184 * the instruction is to call {@link #conts}.
ohair@286 185 */
ohair@286 186 private Tube next;
ohair@286 187
ohair@286 188 private Packet packet;
ohair@286 189
ohair@286 190 private Throwable/*but really it's either RuntimeException or Error*/ throwable;
ohair@286 191
ohair@286 192 public final Engine owner;
ohair@286 193
ohair@286 194 /**
ohair@286 195 * Is this thread suspended? 0=not suspended, 1=suspended.
ohair@286 196 * <p/>
ohair@286 197 * <p/>
ohair@286 198 * Logically this is just a boolean, but we need to prepare for the case
ohair@286 199 * where the thread is {@link #resume(Packet) resumed} before we get to the {@link #suspend()}.
ohair@286 200 * This happens when things happen in the following order:
ohair@286 201 * <p/>
ohair@286 202 * <ol>
ohair@286 203 * <li>Tube decides that the fiber needs to be suspended to wait for the external event.
ohair@286 204 * <li>Tube hooks up fiber with some external mechanism (like NIO channel selector)
ohair@286 205 * <li>Tube returns with {@link NextAction#suspend()}.
ohair@286 206 * <li>"External mechanism" becomes signal state and invokes {@link Fiber#resume(Packet)}
ohair@286 207 * to wake up fiber
ohair@286 208 * <li>{@link Fiber#doRun} invokes {@link Fiber#suspend()}.
ohair@286 209 * </ol>
ohair@286 210 * <p/>
ohair@286 211 * <p/>
ohair@286 212 * Using int, this will work OK because {@link #suspendedCount} becomes -1 when
ohair@286 213 * {@link #resume(Packet)} occurs before {@link #suspend()}.
ohair@286 214 * <p/>
ohair@286 215 * <p/>
ohair@286 216 * Increment and decrement is guarded by 'this' object.
ohair@286 217 */
ohair@286 218 private volatile int suspendedCount = 0;
ohair@286 219
ohair@286 220 private volatile boolean isInsideSuspendCallbacks = false;
ohair@286 221
ohair@286 222 /**
ohair@286 223 * Is this {@link Fiber} currently running in the synchronous mode?
ohair@286 224 */
ohair@286 225 private boolean synchronous;
ohair@286 226
ohair@286 227 private boolean interrupted;
ohair@286 228
ohair@286 229 private final int id;
ohair@286 230
ohair@286 231 /**
ohair@286 232 * Active {@link FiberContextSwitchInterceptor}s for this fiber.
ohair@286 233 */
ohair@286 234 private List<FiberContextSwitchInterceptor> interceptors;
ohair@286 235
ohair@286 236 /**
ohair@286 237 * Fiber's context {@link ClassLoader}.
ohair@286 238 */
ohair@286 239 private
ohair@286 240 @Nullable
ohair@286 241 ClassLoader contextClassLoader;
ohair@286 242
ohair@286 243 private
ohair@286 244 @Nullable
ohair@286 245 CompletionCallback completionCallback;
ohair@286 246
alanb@368 247 private boolean isDeliverThrowableInPacket = false;
alanb@368 248
alanb@368 249 public void setDeliverThrowableInPacket(boolean isDeliverThrowableInPacket) {
alanb@368 250 this.isDeliverThrowableInPacket = isDeliverThrowableInPacket;
alanb@368 251 }
alanb@368 252
ohair@286 253 /**
ohair@286 254 * The thread on which this Fiber is currently executing, if applicable.
ohair@286 255 */
ohair@286 256 private Thread currentThread;
ohair@286 257
alanb@368 258 /**
alanb@368 259 * Replace uses of synchronized(this) with this lock so that we can control
alanb@368 260 * unlocking for resume use cases
alanb@368 261 */
alanb@368 262 private final ReentrantLock lock = new ReentrantLock();
alanb@368 263 private final Condition condition = lock.newCondition();
alanb@368 264
ohair@286 265 private volatile boolean isCanceled;
ohair@286 266
ohair@286 267 /**
ohair@286 268 * Set to true if this fiber is started asynchronously, to avoid
ohair@286 269 * doubly-invoking completion code.
ohair@286 270 */
ohair@286 271 private boolean started;
ohair@286 272
ohair@286 273 /**
ohair@286 274 * Set to true if this fiber is started sync but allowed to run async.
ohair@286 275 * This property exists for use cases where the processing model is fundamentally async
ohair@286 276 * but some requirement or feature mandates that part of the tubeline run synchronously. For
ohair@286 277 * instance, WS-ReliableMessaging with non-anonymous addressing is compatible with running
ohair@286 278 * asynchronously, but if in-order message delivery is used then message processing must assign
ohair@286 279 * a message number before the remainder of the processing can be asynchronous.
ohair@286 280 */
ohair@286 281 private boolean startedSync;
ohair@286 282
ohair@286 283 /**
alanb@368 284 * Callback to be invoked when a {@link Fiber} finishes execution.
ohair@286 285 */
ohair@286 286 public interface CompletionCallback {
ohair@286 287 /**
ohair@286 288 * Indicates that the fiber has finished its execution.
ohair@286 289 * <p/>
ohair@286 290 * <p/>
ohair@286 291 * Since the JAX-WS RI runs asynchronously,
ohair@286 292 * this method maybe invoked by a different thread
ohair@286 293 * than any of the threads that started it or run a part of tubeline.
ohair@286 294 */
ohair@286 295 void onCompletion(@NotNull Packet response);
ohair@286 296
ohair@286 297 /**
ohair@286 298 * Indicates that the fiber has finished abnormally, by throwing a given {@link Throwable}.
ohair@286 299 */
ohair@286 300 void onCompletion(@NotNull Throwable error);
ohair@286 301 }
ohair@286 302
ohair@286 303 Fiber(Engine engine) {
ohair@286 304 this.owner = engine;
alanb@368 305 id = iotaGen.incrementAndGet();
ohair@286 306 if (isTraceEnabled()) {
alanb@368 307 LOGGER.log(Level.FINE, "{0} created", getName());
ohair@286 308 }
ohair@286 309
ohair@286 310 // if this is run from another fiber, then we naturally inherit its context classloader,
ohair@286 311 // so this code works for fiber->fiber inheritance just fine.
ohair@286 312 contextClassLoader = Thread.currentThread().getContextClassLoader();
ohair@286 313 }
ohair@286 314
ohair@286 315 /**
ohair@286 316 * Starts the execution of this fiber asynchronously.
ohair@286 317 * <p/>
ohair@286 318 * <p/>
ohair@286 319 * This method works like {@link Thread#start()}.
ohair@286 320 *
ohair@286 321 * @param tubeline The first tube of the tubeline that will act on the packet.
ohair@286 322 * @param request The request packet to be passed to <tt>startPoint.processRequest()</tt>.
ohair@286 323 * @param completionCallback The callback to be invoked when the processing is finished and the
ohair@286 324 * final response packet is available.
ohair@286 325 * @see #runSync(Tube, Packet)
ohair@286 326 */
ohair@286 327 public void start(@NotNull Tube tubeline, @NotNull Packet request, @Nullable CompletionCallback completionCallback) {
ohair@286 328 start(tubeline, request, completionCallback, false);
ohair@286 329 }
ohair@286 330
ohair@286 331 private void dumpFiberContext(String desc) {
ohair@286 332 if(isTraceEnabled()) {
ohair@286 333 String action = null;
ohair@286 334 String msgId = null;
ohair@286 335 if (packet != null) {
ohair@286 336 for (SOAPVersion sv: SOAPVersion.values()) {
ohair@286 337 for (AddressingVersion av: AddressingVersion.values()) {
alanb@368 338 action = packet.getMessage() != null ? AddressingUtils.getAction(packet.getMessage().getHeaders(), av, sv) : null;
alanb@368 339 msgId = packet.getMessage() != null ? AddressingUtils.getMessageID(packet.getMessage().getHeaders(), av, sv) : null;
ohair@286 340 if (action != null || msgId != null) {
ohair@286 341 break;
ohair@286 342 }
ohair@286 343 }
ohair@286 344 if (action != null || msgId != null) {
ohair@286 345 break;
ohair@286 346 }
ohair@286 347 }
ohair@286 348 }
ohair@286 349 String actionAndMsgDesc;
ohair@286 350 if (action == null && msgId == null) {
ohair@286 351 actionAndMsgDesc = "NO ACTION or MSG ID";
ohair@286 352 } else {
ohair@286 353 actionAndMsgDesc = "'" + action + "' and msgId '" + msgId + "'";
ohair@286 354 }
ohair@286 355
ohair@286 356 String tubeDesc;
ohair@286 357 if (next != null) {
ohair@286 358 tubeDesc = next.toString() + ".processRequest()";
ohair@286 359 } else {
ohair@286 360 tubeDesc = peekCont() + ".processResponse()";
ohair@286 361 }
ohair@286 362
alanb@368 363 LOGGER.log(Level.FINE, "{0} {1} with {2} and ''current'' tube {3} from thread {4} with Packet: {5}", new Object[]{getName(), desc, actionAndMsgDesc, tubeDesc, Thread.currentThread().getName(), packet != null ? packet.toShortString() : null});
ohair@286 364 }
ohair@286 365 }
ohair@286 366
ohair@286 367 /**
ohair@286 368 * Starts the execution of this fiber.
ohair@286 369 *
ohair@286 370 * If forceSync is true, then the fiber is started for an ostensibly async invocation,
ohair@286 371 * but allows for some portion of the tubeline to run sync with the calling
ohair@286 372 * client instance (Port/Dispatch instance). This allows tubes that enforce
ohair@286 373 * ordering to see requests in the order they were sent at the point the
ohair@286 374 * client invoked them.
ohair@286 375 * <p>
ohair@286 376 * The forceSync parameter will be true only when the caller (e.g. AsyncInvoker or
ohair@286 377 * SEIStub) knows one or more tubes need to enforce ordering and thus need
ohair@286 378 * to run sync with the client. Such tubes can return
ohair@286 379 * NextAction.INVOKE_ASYNC to indicate that the next tube in the tubeline
ohair@286 380 * should be invoked async to the current thread.
ohair@286 381 *
ohair@286 382 * <p>
ohair@286 383 * This method works like {@link Thread#start()}.
ohair@286 384 *
ohair@286 385 * @param tubeline
ohair@286 386 * The first tube of the tubeline that will act on the packet.
ohair@286 387 * @param request
ohair@286 388 * The request packet to be passed to <tt>startPoint.processRequest()</tt>.
ohair@286 389 * @param completionCallback
ohair@286 390 * The callback to be invoked when the processing is finished and the
ohair@286 391 * final response packet is available.
ohair@286 392 *
ohair@286 393 * @see #start(Tube,Packet,CompletionCallback)
ohair@286 394 * @see #runSync(Tube,Packet)
ohair@286 395 * @since 2.2.6
ohair@286 396 */
ohair@286 397 public void start(@NotNull Tube tubeline, @NotNull Packet request, @Nullable CompletionCallback completionCallback, boolean forceSync) {
ohair@286 398 next = tubeline;
ohair@286 399 this.packet = request;
ohair@286 400 this.completionCallback = completionCallback;
ohair@286 401
ohair@286 402 if (forceSync) {
ohair@286 403 this.startedSync = true;
ohair@286 404 dumpFiberContext("starting (sync)");
ohair@286 405 run();
ohair@286 406 } else {
ohair@286 407 this.started = true;
ohair@286 408 dumpFiberContext("starting (async)");
ohair@286 409 owner.addRunnable(this);
ohair@286 410 }
ohair@286 411 }
ohair@286 412
ohair@286 413 /**
ohair@286 414 * Wakes up a suspended fiber.
ohair@286 415 * <p/>
ohair@286 416 * <p/>
ohair@286 417 * If a fiber was suspended without specifying the next {@link Tube},
ohair@286 418 * then the execution will be resumed in the response processing direction,
ohair@286 419 * by calling the {@link Tube#processResponse(Packet)} method on the next/first
ohair@286 420 * {@link Tube} in the {@link Fiber}'s processing stack with the specified resume
ohair@286 421 * packet as the parameter.
ohair@286 422 * <p/>
ohair@286 423 * <p/>
ohair@286 424 * If a fiber was suspended with specifying the next {@link Tube},
ohair@286 425 * then the execution will be resumed in the request processing direction,
ohair@286 426 * by calling the next tube's {@link Tube#processRequest(Packet)} method with the
ohair@286 427 * specified resume packet as the parameter.
ohair@286 428 * <p/>
ohair@286 429 * <p/>
ohair@286 430 * This method is implemented in a race-free way. Another thread can invoke
ohair@286 431 * this method even before this fiber goes into the suspension mode. So the caller
ohair@286 432 * need not worry about synchronizing {@link NextAction#suspend()} and this method.
ohair@286 433 *
ohair@286 434 * @param resumePacket packet used in the resumed processing
ohair@286 435 */
ohair@286 436 public void resume(@NotNull Packet resumePacket) {
ohair@286 437 resume(resumePacket, false);
ohair@286 438 }
ohair@286 439
ohair@286 440 /**
ohair@286 441 * Similar to resume(Packet) but allowing the Fiber to be resumed
ohair@286 442 * synchronously (in the current Thread). If you want to know when the
ohair@286 443 * fiber completes (not when this method returns) then add/wrap a
ohair@286 444 * CompletionCallback on this Fiber.
ohair@286 445 * For example, an asynchronous response endpoint that supports WS-ReliableMessaging
ohair@286 446 * including in-order message delivery may need to resume the Fiber synchronously
ohair@286 447 * until message order is confirmed prior to returning to asynchronous processing.
ohair@286 448 * @since 2.2.6
ohair@286 449 */
alanb@368 450 public void resume(@NotNull Packet resumePacket,
alanb@368 451 boolean forceSync) {
alanb@368 452 resume(resumePacket, forceSync, null);
ohair@286 453 }
ohair@286 454
ohair@286 455 /**
ohair@286 456 * Similar to resume(Packet, boolean) but allowing the Fiber to be resumed
ohair@286 457 * and at the same time atomically assign a new CompletionCallback to it.
ohair@286 458 * @since 2.2.6
ohair@286 459 */
ohair@286 460 public void resume(@NotNull Packet resumePacket,
ohair@286 461 boolean forceSync,
ohair@286 462 CompletionCallback callback) {
alanb@368 463 lock.lock();
alanb@368 464 try {
ohair@286 465 if (callback != null) {
ohair@286 466 setCompletionCallback(callback);
ohair@286 467 }
ohair@286 468 if(isTraceEnabled())
alanb@368 469 LOGGER.log(Level.FINE, "{0} resuming. Will have suspendedCount={1}", new Object[]{getName(), suspendedCount-1});
alanb@368 470 packet = resumePacket;
alanb@368 471 if( --suspendedCount == 0 ) {
alanb@368 472 if (!isInsideSuspendCallbacks) {
ohair@286 473 List<Listener> listeners = getCurrentListeners();
ohair@286 474 for (Listener listener: listeners) {
ohair@286 475 try {
ohair@286 476 listener.fiberResumed(this);
ohair@286 477 } catch (Throwable e) {
ohair@286 478 if (isTraceEnabled())
alanb@368 479 LOGGER.log(Level.FINE, "Listener {0} threw exception: {1}", new Object[]{listener, e.getMessage()});
ohair@286 480 }
ohair@286 481 }
ohair@286 482
ohair@286 483 if(synchronous) {
alanb@368 484 condition.signalAll();
ohair@286 485 } else if (forceSync || startedSync) {
ohair@286 486 run();
ohair@286 487 } else {
ohair@286 488 dumpFiberContext("resuming (async)");
ohair@286 489 owner.addRunnable(this);
ohair@286 490 }
alanb@368 491 }
alanb@368 492 } else {
alanb@368 493 if (isTraceEnabled()) {
alanb@368 494 LOGGER.log(Level.FINE, "{0} taking no action on resume because suspendedCount != 0: {1}", new Object[]{getName(), suspendedCount});
ohair@286 495 }
ohair@286 496 }
alanb@368 497 } finally {
alanb@368 498 lock.unlock();
alanb@368 499 }
ohair@286 500 }
ohair@286 501
ohair@286 502 /**
ohair@286 503 * Wakes up a suspended fiber and begins response processing.
ohair@286 504 * @since 2.2.6
ohair@286 505 */
alanb@368 506 public void resumeAndReturn(@NotNull Packet resumePacket,
alanb@368 507 boolean forceSync) {
ohair@286 508 if(isTraceEnabled())
alanb@368 509 LOGGER.log(Level.FINE, "{0} resumed with Return Packet", getName());
ohair@286 510 next = null;
ohair@286 511 resume(resumePacket, forceSync);
ohair@286 512 }
ohair@286 513
ohair@286 514 /**
ohair@286 515 * Wakes up a suspended fiber with an exception.
ohair@286 516 * <p/>
ohair@286 517 * <p/>
ohair@286 518 * The execution of the suspended fiber will be resumed in the response
ohair@286 519 * processing direction, by calling the {@link Tube#processException(Throwable)} method
ohair@286 520 * on the next/first {@link Tube} in the {@link Fiber}'s processing stack with
ohair@286 521 * the specified exception as the parameter.
ohair@286 522 * <p/>
ohair@286 523 * <p/>
ohair@286 524 * This method is implemented in a race-free way. Another thread can invoke
ohair@286 525 * this method even before this fiber goes into the suspension mode. So the caller
ohair@286 526 * need not worry about synchronizing {@link NextAction#suspend()} and this method.
ohair@286 527 *
ohair@286 528 * @param throwable exception that is used in the resumed processing
ohair@286 529 */
alanb@368 530 public void resume(@NotNull Throwable throwable) {
alanb@368 531 resume(throwable, packet, false);
alanb@368 532 }
alanb@368 533
alanb@368 534 /**
alanb@368 535 * Wakes up a suspended fiber with an exception.
alanb@368 536 * <p/>
alanb@368 537 * <p/>
alanb@368 538 * The execution of the suspended fiber will be resumed in the response
alanb@368 539 * processing direction, by calling the {@link Tube#processException(Throwable)} method
alanb@368 540 * on the next/first {@link Tube} in the {@link Fiber}'s processing stack with
alanb@368 541 * the specified exception as the parameter.
alanb@368 542 * <p/>
alanb@368 543 * <p/>
alanb@368 544 * This method is implemented in a race-free way. Another thread can invoke
alanb@368 545 * this method even before this fiber goes into the suspension mode. So the caller
alanb@368 546 * need not worry about synchronizing {@link NextAction#suspend()} and this method.
alanb@368 547 *
alanb@368 548 * @param throwable exception that is used in the resumed processing
alanb@368 549 * @param packet Packet that will be visible on the Fiber after the resume
alanb@368 550 * @since 2.2.8
alanb@368 551 */
alanb@368 552 public void resume(@NotNull Throwable throwable, @NotNull Packet packet) {
alanb@368 553 resume(throwable, packet, false);
ohair@286 554 }
ohair@286 555
ohair@286 556 /**
ohair@286 557 * Wakes up a suspend fiber with an exception.
ohair@286 558 *
ohair@286 559 * If forceSync is true, then the suspended fiber will resume with
ohair@286 560 * synchronous processing on the current thread. This will continue
ohair@286 561 * until some Tube indicates that it is safe to switch to asynchronous
ohair@286 562 * processing.
ohair@286 563 *
ohair@286 564 * @param error exception that is used in the resumed processing
ohair@286 565 * @param forceSync if processing begins synchronously
ohair@286 566 * @since 2.2.6
ohair@286 567 */
alanb@368 568 public void resume(@NotNull Throwable error,
alanb@368 569 boolean forceSync) {
alanb@368 570 resume(error, packet, forceSync);
alanb@368 571 }
alanb@368 572
alanb@368 573 /**
alanb@368 574 * Wakes up a suspend fiber with an exception.
alanb@368 575 *
alanb@368 576 * If forceSync is true, then the suspended fiber will resume with
alanb@368 577 * synchronous processing on the current thread. This will continue
alanb@368 578 * until some Tube indicates that it is safe to switch to asynchronous
alanb@368 579 * processing.
alanb@368 580 *
alanb@368 581 * @param error exception that is used in the resumed processing
alanb@368 582 * @param packet Packet that will be visible on the Fiber after the resume
alanb@368 583 * @param forceSync if processing begins synchronously
alanb@368 584 * @since 2.2.8
alanb@368 585 */
alanb@368 586 public void resume(@NotNull Throwable error,
alanb@368 587 @NotNull Packet packet,
alanb@368 588 boolean forceSync) {
ohair@286 589 if(isTraceEnabled())
alanb@368 590 LOGGER.log(Level.FINE, "{0} resumed with Return Throwable", getName());
ohair@286 591 next = null;
ohair@286 592 throwable = error;
ohair@286 593 resume(packet, forceSync);
ohair@286 594 }
ohair@286 595
ohair@286 596 /**
ohair@286 597 * Marks this Fiber as cancelled. A cancelled Fiber will never invoke its completion callback
alanb@368 598 * @param mayInterrupt if cancel should use {@link Thread#interrupt()}
alanb@368 599 * @see java.util.concurrent.Future#cancel(boolean)
ohair@286 600 * @since 2.2.6
ohair@286 601 */
alanb@368 602 @Override
ohair@286 603 public void cancel(boolean mayInterrupt) {
ohair@286 604 isCanceled = true;
ohair@286 605 if (mayInterrupt) {
alanb@368 606 // synchronized(this) is used as Thread running Fiber will be holding lock
alanb@368 607 synchronized(this) {
alanb@368 608 if (currentThread != null)
alanb@368 609 currentThread.interrupt();
alanb@368 610 }
ohair@286 611 }
ohair@286 612 }
ohair@286 613
ohair@286 614 /**
ohair@286 615 * Suspends this fiber's execution until the resume method is invoked.
ohair@286 616 * <p/>
ohair@286 617 * The call returns immediately, and when the fiber is resumed
ohair@286 618 * the execution picks up from the last scheduled continuation.
alanb@368 619 * @param onExitRunnable runnable to be invoked after fiber is marked for suspension
alanb@368 620 * @return if control loop must exit
ohair@286 621 */
alanb@368 622 private boolean suspend(Holder<Boolean> isRequireUnlock, Runnable onExitRunnable) {
alanb@368 623 if(isTraceEnabled()) {
alanb@368 624 LOGGER.log(Level.FINE, "{0} suspending. Will have suspendedCount={1}", new Object[]{getName(), suspendedCount+1});
alanb@368 625 if (suspendedCount > 0) {
alanb@368 626 LOGGER.log(Level.FINE, "WARNING - {0} suspended more than resumed. Will require more than one resume to actually resume this fiber.", getName());
alanb@368 627 }
alanb@368 628 }
ohair@286 629
alanb@368 630 List<Listener> listeners = getCurrentListeners();
alanb@368 631 if (++suspendedCount == 1) {
alanb@368 632 isInsideSuspendCallbacks = true;
alanb@368 633 try {
alanb@368 634 for (Listener listener: listeners) {
alanb@368 635 try {
alanb@368 636 listener.fiberSuspended(this);
alanb@368 637 } catch (Throwable e) {
alanb@368 638 if(isTraceEnabled())
alanb@368 639 LOGGER.log(Level.FINE, "Listener {0} threw exception: {1}", new Object[]{listener, e.getMessage()});
alanb@368 640 }
alanb@368 641 }
alanb@368 642 } finally {
alanb@368 643 isInsideSuspendCallbacks = false;
alanb@368 644 }
alanb@368 645 }
alanb@368 646
alanb@368 647 if (suspendedCount <= 0) {
alanb@368 648 // suspend callback caused fiber to resume
alanb@368 649 for (Listener listener: listeners) {
alanb@368 650 try {
alanb@368 651 listener.fiberResumed(this);
alanb@368 652 } catch (Throwable e) {
alanb@368 653 if(isTraceEnabled())
alanb@368 654 LOGGER.log(Level.FINE, "Listener {0} threw exception: {1}", new Object[]{listener, e.getMessage()});
ohair@286 655 }
ohair@286 656 }
ohair@286 657
alanb@368 658 } else if (onExitRunnable != null) {
alanb@368 659 // synchronous use cases cannot disconnect from the current thread
alanb@368 660 if (!synchronous) {
alanb@368 661 /* INTENTIONALLY UNLOCKING EARLY */
alanb@368 662 synchronized(this) {
alanb@368 663 // currentThread is protected by the monitor for this fiber so
alanb@368 664 // that it is accessible to cancel() even when the lock is held
alanb@368 665 currentThread = null;
alanb@368 666 }
alanb@368 667 lock.unlock();
alanb@368 668 assert(!lock.isHeldByCurrentThread());
alanb@368 669 isRequireUnlock.value = Boolean.FALSE;
alanb@368 670
ohair@286 671 try {
alanb@368 672 onExitRunnable.run();
alanb@368 673 } catch(Throwable t) {
alanb@368 674 throw new OnExitRunnableException(t);
ohair@286 675 }
ohair@286 676
alanb@368 677 return true;
alanb@368 678
alanb@368 679 } else {
alanb@368 680 // for synchronous we will stay with current thread, so do not disconnect
alanb@368 681 if (isTraceEnabled())
alanb@368 682 LOGGER.fine("onExitRunnable used with synchronous Fiber execution -- not exiting current thread");
alanb@368 683 onExitRunnable.run();
ohair@286 684 }
alanb@368 685 }
ohair@286 686
alanb@368 687 return false;
alanb@368 688 }
alanb@368 689
alanb@368 690 private static final class OnExitRunnableException extends RuntimeException {
alanb@368 691 private static final long serialVersionUID = 1L;
alanb@368 692
alanb@368 693 Throwable target;
alanb@368 694
alanb@368 695 public OnExitRunnableException(Throwable target) {
alanb@368 696 super((Throwable)null); // see pattern for InvocationTargetException
alanb@368 697 this.target = target;
ohair@286 698 }
ohair@286 699 }
ohair@286 700
ohair@286 701 /**
ohair@286 702 * Adds a new {@link FiberContextSwitchInterceptor} to this fiber.
ohair@286 703 * <p/>
ohair@286 704 * <p/>
ohair@286 705 * The newly installed fiber will take effect immediately after the current
ohair@286 706 * tube returns from its {@link Tube#processRequest(Packet)} or
ohair@286 707 * {@link Tube#processResponse(Packet)}, before the next tube begins processing.
ohair@286 708 * <p/>
ohair@286 709 * <p/>
ohair@286 710 * So when the tubeline consists of X and Y, and when X installs an interceptor,
ohair@286 711 * the order of execution will be as follows:
ohair@286 712 * <p/>
ohair@286 713 * <ol>
ohair@286 714 * <li>X.processRequest()
ohair@286 715 * <li>interceptor gets installed
ohair@286 716 * <li>interceptor.execute() is invoked
ohair@286 717 * <li>Y.processRequest()
ohair@286 718 * </ol>
ohair@286 719 */
alanb@368 720 public synchronized void addInterceptor(@NotNull FiberContextSwitchInterceptor interceptor) {
ohair@286 721 if (interceptors == null) {
ohair@286 722 interceptors = new ArrayList<FiberContextSwitchInterceptor>();
alanb@368 723 } else {
alanb@368 724 List<FiberContextSwitchInterceptor> l = new ArrayList<FiberContextSwitchInterceptor>();
alanb@368 725 l.addAll(interceptors);
alanb@368 726 interceptors = l;
ohair@286 727 }
ohair@286 728 interceptors.add(interceptor);
ohair@286 729 }
ohair@286 730
ohair@286 731 /**
ohair@286 732 * Removes a {@link FiberContextSwitchInterceptor} from this fiber.
ohair@286 733 * <p/>
ohair@286 734 * <p/>
ohair@286 735 * The removal of the interceptor takes effect immediately after the current
ohair@286 736 * tube returns from its {@link Tube#processRequest(Packet)} or
ohair@286 737 * {@link Tube#processResponse(Packet)}, before the next tube begins processing.
ohair@286 738 * <p/>
ohair@286 739 * <p/>
ohair@286 740 * <p/>
ohair@286 741 * So when the tubeline consists of X and Y, and when Y uninstalls an interceptor
ohair@286 742 * on the way out, then the order of execution will be as follows:
ohair@286 743 * <p/>
ohair@286 744 * <ol>
ohair@286 745 * <li>Y.processResponse() (notice that this happens with interceptor.execute() in the callstack)
ohair@286 746 * <li>interceptor gets uninstalled
ohair@286 747 * <li>interceptor.execute() returns
ohair@286 748 * <li>X.processResponse()
ohair@286 749 * </ol>
ohair@286 750 *
ohair@286 751 * @return true if the specified interceptor was removed. False if
ohair@286 752 * the specified interceptor was not registered with this fiber to begin with.
ohair@286 753 */
alanb@368 754 public synchronized boolean removeInterceptor(@NotNull FiberContextSwitchInterceptor interceptor) {
alanb@368 755 if (interceptors != null) {
alanb@368 756 boolean result = interceptors.remove(interceptor);
alanb@368 757 if (interceptors.isEmpty())
alanb@368 758 interceptors = null;
alanb@368 759 else {
alanb@368 760 List<FiberContextSwitchInterceptor> l = new ArrayList<FiberContextSwitchInterceptor>();
alanb@368 761 l.addAll(interceptors);
alanb@368 762 interceptors = l;
alanb@368 763 }
alanb@368 764 return result;
ohair@286 765 }
ohair@286 766 return false;
ohair@286 767 }
ohair@286 768
ohair@286 769 /**
ohair@286 770 * Gets the context {@link ClassLoader} of this fiber.
ohair@286 771 */
ohair@286 772 public
ohair@286 773 @Nullable
ohair@286 774 ClassLoader getContextClassLoader() {
ohair@286 775 return contextClassLoader;
ohair@286 776 }
ohair@286 777
ohair@286 778 /**
ohair@286 779 * Sets the context {@link ClassLoader} of this fiber.
ohair@286 780 */
ohair@286 781 public ClassLoader setContextClassLoader(@Nullable ClassLoader contextClassLoader) {
ohair@286 782 ClassLoader r = this.contextClassLoader;
ohair@286 783 this.contextClassLoader = contextClassLoader;
ohair@286 784 return r;
ohair@286 785 }
ohair@286 786
ohair@286 787 /**
ohair@286 788 * DO NOT CALL THIS METHOD. This is an implementation detail
ohair@286 789 * of {@link Fiber}.
ohair@286 790 */
ohair@286 791 @Deprecated
alanb@368 792 @Override
ohair@286 793 public void run() {
alanb@368 794 Container old = ContainerResolver.getDefault().enterContainer(owner.getContainer());
alanb@368 795 try {
alanb@368 796 assert !synchronous;
alanb@368 797 // doRun returns true to indicate an early exit from fiber processing
alanb@368 798 if (!doRun()) {
alanb@368 799 if (startedSync && suspendedCount == 0 &&
alanb@368 800 (next != null || contsSize > 0)) {
alanb@368 801 // We bailed out of running this fiber we started as sync, and now
alanb@368 802 // want to finish running it async
alanb@368 803 startedSync = false;
alanb@368 804 // Start back up as an async fiber
alanb@368 805 dumpFiberContext("restarting (async) after startSync");
alanb@368 806 owner.addRunnable(this);
alanb@368 807 } else {
alanb@368 808 completionCheck();
alanb@368 809 }
alanb@368 810 }
alanb@368 811 } finally {
alanb@368 812 ContainerResolver.getDefault().exitContainer(old);
ohair@286 813 }
ohair@286 814 }
ohair@286 815
ohair@286 816 /**
ohair@286 817 * Runs a given {@link Tube} (and everything thereafter) synchronously.
ohair@286 818 * <p/>
ohair@286 819 * <p/>
ohair@286 820 * This method blocks and returns only when all the successive {@link Tube}s
ohair@286 821 * complete their request/response processing. This method can be used
ohair@286 822 * if a {@link Tube} needs to fallback to synchronous processing.
ohair@286 823 * <p/>
ohair@286 824 * <h3>Example:</h3>
ohair@286 825 * <pre>
ohair@286 826 * class FooTube extends {@link AbstractFilterTubeImpl} {
ohair@286 827 * NextAction processRequest(Packet request) {
ohair@286 828 * // run everything synchronously and return with the response packet
ohair@286 829 * return doReturnWith(Fiber.current().runSync(next,request));
ohair@286 830 * }
ohair@286 831 * NextAction processResponse(Packet response) {
ohair@286 832 * // never be invoked
ohair@286 833 * }
ohair@286 834 * }
ohair@286 835 * </pre>
ohair@286 836 *
ohair@286 837 * @param tubeline The first tube of the tubeline that will act on the packet.
ohair@286 838 * @param request The request packet to be passed to <tt>startPoint.processRequest()</tt>.
ohair@286 839 * @return The response packet to the <tt>request</tt>.
ohair@286 840 * @see #start(Tube, Packet, CompletionCallback)
ohair@286 841 */
alanb@368 842 public
ohair@286 843 @NotNull
ohair@286 844 Packet runSync(@NotNull Tube tubeline, @NotNull Packet request) {
alanb@368 845 lock.lock();
alanb@368 846 try {
alanb@368 847 // save the current continuation, so that we return runSync() without executing them.
alanb@368 848 final Tube[] oldCont = conts;
alanb@368 849 final int oldContSize = contsSize;
alanb@368 850 final boolean oldSynchronous = synchronous;
alanb@368 851 final Tube oldNext = next;
ohair@286 852
alanb@368 853 if (oldContSize > 0) {
alanb@368 854 conts = new Tube[16];
alanb@368 855 contsSize = 0;
alanb@368 856 }
ohair@286 857
alanb@368 858 try {
alanb@368 859 synchronous = true;
alanb@368 860 this.packet = request;
alanb@368 861 next = tubeline;
alanb@368 862 doRun();
alanb@368 863 if (throwable != null) {
alanb@368 864 if (isDeliverThrowableInPacket) {
alanb@368 865 packet.addSatellite(new ThrowableContainerPropertySet(throwable));
alanb@368 866 } else {
alanb@368 867 if (throwable instanceof RuntimeException) {
alanb@368 868 throw (RuntimeException) throwable;
alanb@368 869 }
alanb@368 870 if (throwable instanceof Error) {
alanb@368 871 throw (Error) throwable;
alanb@368 872 }
alanb@368 873 // our system is supposed to only accept Error or RuntimeException
alanb@368 874 throw new AssertionError(throwable);
alanb@368 875 }
ohair@286 876 }
alanb@368 877 return this.packet;
alanb@368 878 } finally {
alanb@368 879 conts = oldCont;
alanb@368 880 contsSize = oldContSize;
alanb@368 881 synchronous = oldSynchronous;
alanb@368 882 next = oldNext;
alanb@368 883 if(interrupted) {
alanb@368 884 Thread.currentThread().interrupt();
alanb@368 885 interrupted = false;
ohair@286 886 }
alanb@368 887 if(!started && !startedSync)
alanb@368 888 completionCheck();
ohair@286 889 }
ohair@286 890 } finally {
alanb@368 891 lock.unlock();
ohair@286 892 }
ohair@286 893 }
ohair@286 894
alanb@368 895 private void completionCheck() {
alanb@368 896 lock.lock();
alanb@368 897 try {
alanb@368 898 // Don't trigger completion and callbacks if fiber is suspended
alanb@368 899 if(!isCanceled && contsSize==0 && suspendedCount == 0) {
alanb@368 900 if(isTraceEnabled())
alanb@368 901 LOGGER.log(Level.FINE, "{0} completed", getName());
alanb@368 902 clearListeners();
alanb@368 903 condition.signalAll();
alanb@368 904 if (completionCallback != null) {
alanb@368 905 if (throwable != null) {
alanb@368 906 if (isDeliverThrowableInPacket) {
alanb@368 907 packet.addSatellite(new ThrowableContainerPropertySet(throwable));
alanb@368 908 completionCallback.onCompletion(packet);
alanb@368 909 } else
alanb@368 910 completionCallback.onCompletion(throwable);
alanb@368 911 } else
alanb@368 912 completionCallback.onCompletion(packet);
alanb@368 913 }
ohair@286 914 }
alanb@368 915 } finally {
alanb@368 916 lock.unlock();
ohair@286 917 }
ohair@286 918 }
ohair@286 919
ohair@286 920 /**
ohair@286 921 * Invokes all registered {@link InterceptorHandler}s and then call into
ohair@286 922 * {@link Fiber#__doRun()}.
ohair@286 923 */
ohair@286 924 private class InterceptorHandler implements FiberContextSwitchInterceptor.Work<Tube, Tube> {
alanb@368 925 private final Holder<Boolean> isUnlockRequired;
alanb@368 926 private final List<FiberContextSwitchInterceptor> ints;
alanb@368 927
ohair@286 928 /**
ohair@286 929 * Index in {@link Fiber#interceptors} to invoke next.
ohair@286 930 */
ohair@286 931 private int idx;
ohair@286 932
alanb@368 933 public InterceptorHandler(Holder<Boolean> isUnlockRequired, List<FiberContextSwitchInterceptor> ints) {
alanb@368 934 this.isUnlockRequired = isUnlockRequired;
alanb@368 935 this.ints = ints;
alanb@368 936 }
alanb@368 937
ohair@286 938 /**
ohair@286 939 * Initiate the interception, and eventually invokes {@link Fiber#__doRun()}.
ohair@286 940 */
ohair@286 941 Tube invoke(Tube next) {
ohair@286 942 idx = 0;
ohair@286 943 return execute(next);
ohair@286 944 }
ohair@286 945
alanb@368 946 @Override
ohair@286 947 public Tube execute(Tube next) {
alanb@368 948 if (idx == ints.size()) {
ohair@286 949 Fiber.this.next = next;
alanb@368 950 if (__doRun(isUnlockRequired, ints))
alanb@368 951 return PLACEHOLDER;
ohair@286 952 } else {
alanb@368 953 FiberContextSwitchInterceptor interceptor = ints.get(idx++);
ohair@286 954 return interceptor.execute(Fiber.this, next, this);
ohair@286 955 }
ohair@286 956 return Fiber.this.next;
ohair@286 957 }
ohair@286 958 }
ohair@286 959
alanb@368 960 private static final PlaceholderTube PLACEHOLDER = new PlaceholderTube();
alanb@368 961
alanb@368 962 private static class PlaceholderTube extends AbstractTubeImpl {
alanb@368 963
alanb@368 964 @Override
alanb@368 965 public NextAction processRequest(Packet request) {
alanb@368 966 throw new UnsupportedOperationException();
alanb@368 967 }
alanb@368 968
alanb@368 969 @Override
alanb@368 970 public NextAction processResponse(Packet response) {
alanb@368 971 throw new UnsupportedOperationException();
alanb@368 972 }
alanb@368 973
alanb@368 974 @Override
alanb@368 975 public NextAction processException(Throwable t) {
alanb@368 976 return doThrow(t);
alanb@368 977 }
alanb@368 978
alanb@368 979 @Override
alanb@368 980 public void preDestroy() {
alanb@368 981 }
alanb@368 982
alanb@368 983 @Override
alanb@368 984 public PlaceholderTube copy(TubeCloner cloner) {
alanb@368 985 throw new UnsupportedOperationException();
alanb@368 986 }
alanb@368 987 }
alanb@368 988
ohair@286 989 /**
ohair@286 990 * Executes the fiber as much as possible.
ohair@286 991 *
ohair@286 992 */
alanb@368 993 private boolean doRun() {
ohair@286 994 dumpFiberContext("running");
ohair@286 995
ohair@286 996 if (serializeExecution) {
ohair@286 997 serializedExecutionLock.lock();
ohair@286 998 try {
alanb@368 999 return _doRun(next);
ohair@286 1000 } finally {
ohair@286 1001 serializedExecutionLock.unlock();
ohair@286 1002 }
ohair@286 1003 } else {
alanb@368 1004 return _doRun(next);
ohair@286 1005 }
ohair@286 1006 }
ohair@286 1007
alanb@368 1008 private boolean _doRun(Tube next) {
alanb@368 1009 // isRequireUnlock will contain Boolean.FALSE when lock has already been released in suspend
alanb@368 1010 Holder<Boolean> isRequireUnlock = new Holder<Boolean>(Boolean.TRUE);
alanb@368 1011 lock.lock();
alanb@368 1012 try {
alanb@368 1013 List<FiberContextSwitchInterceptor> ints;
alanb@368 1014 ClassLoader old;
alanb@368 1015 synchronized(this) {
alanb@368 1016 ints = interceptors;
ohair@286 1017
alanb@368 1018 // currentThread is protected by the monitor for this fiber so
alanb@368 1019 // that it is accessible to cancel() even when the lock is held
alanb@368 1020 currentThread = Thread.currentThread();
alanb@368 1021 if (isTraceEnabled()) {
alanb@368 1022 LOGGER.log(Level.FINE, "Thread entering _doRun(): {0}", currentThread);
alanb@368 1023 }
alanb@368 1024
alanb@368 1025 old = currentThread.getContextClassLoader();
alanb@368 1026 currentThread.setContextClassLoader(contextClassLoader);
ohair@286 1027 }
alanb@368 1028
alanb@368 1029 try {
alanb@368 1030 boolean needsToReenter = false;
alanb@368 1031 do {
alanb@368 1032 // if interceptors are set, go through the interceptors.
alanb@368 1033 if (ints == null) {
alanb@368 1034 this.next = next;
alanb@368 1035 if (__doRun(isRequireUnlock, ints)) {
alanb@368 1036 return true;
alanb@368 1037 }
alanb@368 1038 } else {
alanb@368 1039 next = new InterceptorHandler(isRequireUnlock, ints).invoke(next);
alanb@368 1040 if (next == PLACEHOLDER) {
alanb@368 1041 return true;
alanb@368 1042 }
alanb@368 1043 }
alanb@368 1044
alanb@368 1045 synchronized(this) {
alanb@368 1046 needsToReenter = (ints != interceptors);
alanb@368 1047 if (needsToReenter)
alanb@368 1048 ints = interceptors;
alanb@368 1049 }
alanb@368 1050 } while (needsToReenter);
alanb@368 1051 } catch(OnExitRunnableException o) {
alanb@368 1052 // catching this exception indicates onExitRunnable in suspend() threw.
alanb@368 1053 // we must still avoid double unlock
alanb@368 1054 Throwable t = o.target;
alanb@368 1055 if (t instanceof WebServiceException)
alanb@368 1056 throw (WebServiceException) t;
alanb@368 1057 throw new WebServiceException(t);
alanb@368 1058 } finally {
alanb@368 1059 // don't reference currentThread here because fiber processing
alanb@368 1060 // may already be running on a different thread (Note: isAlreadyExited
alanb@368 1061 // tracks this state
alanb@368 1062 Thread thread = Thread.currentThread();
alanb@368 1063 thread.setContextClassLoader(old);
alanb@368 1064 if (isTraceEnabled()) {
alanb@368 1065 LOGGER.log(Level.FINE, "Thread leaving _doRun(): {0}", thread);
alanb@368 1066 }
ohair@286 1067 }
ohair@286 1068
alanb@368 1069 return false;
alanb@368 1070 } finally {
alanb@368 1071 if (isRequireUnlock.value) {
alanb@368 1072 synchronized(this) {
alanb@368 1073 currentThread = null;
ohair@286 1074 }
alanb@368 1075 lock.unlock();
ohair@286 1076 }
ohair@286 1077 }
ohair@286 1078 }
ohair@286 1079
ohair@286 1080 /**
ohair@286 1081 * To be invoked from {@link #doRun()}.
ohair@286 1082 *
ohair@286 1083 * @see #doRun()
ohair@286 1084 */
alanb@368 1085 private boolean __doRun(Holder<Boolean> isRequireUnlock, List<FiberContextSwitchInterceptor> originalInterceptors) {
alanb@368 1086 assert(lock.isHeldByCurrentThread());
alanb@368 1087
ohair@286 1088 final Fiber old = CURRENT_FIBER.get();
ohair@286 1089 CURRENT_FIBER.set(this);
ohair@286 1090
ohair@286 1091 // if true, lots of debug messages to show what's being executed
ohair@286 1092 final boolean traceEnabled = LOGGER.isLoggable(Level.FINER);
ohair@286 1093
ohair@286 1094 try {
ohair@286 1095 boolean abortResponse = false;
alanb@368 1096 while(isReady(originalInterceptors)) {
alanb@368 1097 if (isCanceled) {
alanb@368 1098 next = null;
alanb@368 1099 throwable = null;
alanb@368 1100 contsSize = 0;
alanb@368 1101 break;
alanb@368 1102 }
alanb@368 1103
ohair@286 1104 try {
ohair@286 1105 NextAction na;
ohair@286 1106 Tube last;
ohair@286 1107 if(throwable!=null) {
ohair@286 1108 if(contsSize==0 || abortResponse) {
ohair@286 1109 contsSize = 0; // abortResponse case
ohair@286 1110 // nothing else to execute. we are done.
alanb@368 1111 return false;
ohair@286 1112 }
ohair@286 1113 last = popCont();
ohair@286 1114 if (traceEnabled)
alanb@368 1115 LOGGER.log(Level.FINER, "{0} {1}.processException({2})", new Object[]{getName(), last, throwable});
ohair@286 1116 na = last.processException(throwable);
ohair@286 1117 } else {
ohair@286 1118 if(next!=null) {
ohair@286 1119 if(traceEnabled)
alanb@368 1120 LOGGER.log(Level.FINER, "{0} {1}.processRequest({2})", new Object[]{getName(), next, packet != null ? "Packet@"+Integer.toHexString(packet.hashCode()) : "null"});
ohair@286 1121 na = next.processRequest(packet);
ohair@286 1122 last = next;
ohair@286 1123 } else {
ohair@286 1124 if(contsSize==0 || abortResponse) {
ohair@286 1125 // nothing else to execute. we are done.
ohair@286 1126 contsSize = 0;
alanb@368 1127 return false;
ohair@286 1128 }
ohair@286 1129 last = popCont();
ohair@286 1130 if(traceEnabled)
alanb@368 1131 LOGGER.log(Level.FINER, "{0} {1}.processResponse({2})", new Object[]{getName(), last, packet != null ? "Packet@"+Integer.toHexString(packet.hashCode()) : "null"});
ohair@286 1132 na = last.processResponse(packet);
ohair@286 1133 }
ohair@286 1134 }
ohair@286 1135
ohair@286 1136 if (traceEnabled)
alanb@368 1137 LOGGER.log(Level.FINER, "{0} {1} returned with {2}", new Object[]{getName(), last, na});
ohair@286 1138
ohair@286 1139 // If resume is called before suspend, then make sure
ohair@286 1140 // resume(Packet) is not lost
ohair@286 1141 if (na.kind != NextAction.SUSPEND) {
ohair@286 1142 // preserve in-flight packet so that processException may inspect
ohair@286 1143 if (na.kind != NextAction.THROW &&
ohair@286 1144 na.kind != NextAction.THROW_ABORT_RESPONSE)
ohair@286 1145 packet = na.packet;
ohair@286 1146 throwable = na.throwable;
ohair@286 1147 }
ohair@286 1148
ohair@286 1149 switch(na.kind) {
ohair@286 1150 case NextAction.INVOKE:
ohair@286 1151 case NextAction.INVOKE_ASYNC:
ohair@286 1152 pushCont(last);
ohair@286 1153 // fall through next
ohair@286 1154 case NextAction.INVOKE_AND_FORGET:
ohair@286 1155 next = na.next;
ohair@286 1156 if (na.kind == NextAction.INVOKE_ASYNC
ohair@286 1157 && startedSync) {
ohair@286 1158 // Break out here
alanb@368 1159 return false;
ohair@286 1160 }
ohair@286 1161 break;
ohair@286 1162 case NextAction.THROW_ABORT_RESPONSE:
ohair@286 1163 case NextAction.ABORT_RESPONSE:
ohair@286 1164 abortResponse = true;
alanb@368 1165 if (isTraceEnabled()) {
alanb@368 1166 LOGGER.log(Level.FINE, "Fiber {0} is aborting a response due to exception: {1}", new Object[]{this, na.throwable});
ohair@286 1167 }
ohair@286 1168 case NextAction.RETURN:
ohair@286 1169 case NextAction.THROW:
ohair@286 1170 next = null;
ohair@286 1171 break;
ohair@286 1172 case NextAction.SUSPEND:
ohair@286 1173 if (next != null) {
ohair@286 1174 // Only store the 'last' tube when we're processing
ohair@286 1175 // a request, since conts array is for processResponse
ohair@286 1176 pushCont(last);
ohair@286 1177 }
ohair@286 1178 next = na.next;
alanb@368 1179 if(suspend(isRequireUnlock, na.onExitRunnable))
alanb@368 1180 return true; // explicitly exiting control loop
ohair@286 1181 break;
ohair@286 1182 default:
ohair@286 1183 throw new AssertionError();
ohair@286 1184 }
ohair@286 1185 } catch (RuntimeException t) {
ohair@286 1186 if (traceEnabled)
ohair@286 1187 LOGGER.log(Level.FINER, getName() + " Caught " + t + ". Start stack unwinding", t);
ohair@286 1188 throwable = t;
ohair@286 1189 } catch (Error t) {
ohair@286 1190 if (traceEnabled)
ohair@286 1191 LOGGER.log(Level.FINER, getName() + " Caught " + t + ". Start stack unwinding", t);
ohair@286 1192 throwable = t;
ohair@286 1193 }
ohair@286 1194
ohair@286 1195 dumpFiberContext("After tube execution");
ohair@286 1196 }
ohair@286 1197
ohair@286 1198 // there's nothing we can execute right away.
ohair@286 1199 // we'll be back when this fiber is resumed.
ohair@286 1200
ohair@286 1201 } finally {
ohair@286 1202 CURRENT_FIBER.set(old);
ohair@286 1203 }
alanb@368 1204
alanb@368 1205 return false;
ohair@286 1206 }
ohair@286 1207
ohair@286 1208 private void pushCont(Tube tube) {
ohair@286 1209 conts[contsSize++] = tube;
ohair@286 1210
ohair@286 1211 // expand if needed
ohair@286 1212 int len = conts.length;
ohair@286 1213 if (contsSize == len) {
ohair@286 1214 Tube[] newBuf = new Tube[len * 2];
ohair@286 1215 System.arraycopy(conts, 0, newBuf, 0, len);
ohair@286 1216 conts = newBuf;
ohair@286 1217 }
ohair@286 1218 }
ohair@286 1219
ohair@286 1220 private Tube popCont() {
ohair@286 1221 return conts[--contsSize];
ohair@286 1222 }
ohair@286 1223
ohair@286 1224 private Tube peekCont() {
ohair@286 1225 int index = contsSize - 1;
ohair@286 1226 if (index >= 0 && index < conts.length) {
ohair@286 1227 return conts[index];
ohair@286 1228 } else {
ohair@286 1229 return null;
ohair@286 1230 }
ohair@286 1231 }
ohair@286 1232
ohair@286 1233 /**
ohair@286 1234 * Only to be used by Tubes that manipulate the Fiber to create alternate flows
ohair@286 1235 * @since 2.2.6
ohair@286 1236 */
ohair@286 1237 public void resetCont(Tube[] conts, int contsSize) {
ohair@286 1238 this.conts = conts;
ohair@286 1239 this.contsSize = contsSize;
ohair@286 1240 }
ohair@286 1241
ohair@286 1242 /**
alanb@368 1243 * Returns true if the fiber is ready to execute.
ohair@286 1244 */
alanb@368 1245 private boolean isReady(List<FiberContextSwitchInterceptor> originalInterceptors) {
ohair@286 1246 if (synchronous) {
ohair@286 1247 while (suspendedCount == 1)
ohair@286 1248 try {
ohair@286 1249 if (isTraceEnabled()) {
alanb@368 1250 LOGGER.log(Level.FINE, "{0} is blocking thread {1}", new Object[]{getName(), Thread.currentThread().getName()});
ohair@286 1251 }
alanb@368 1252 condition.await(); // the synchronized block is the whole runSync method.
ohair@286 1253 } catch (InterruptedException e) {
ohair@286 1254 // remember that we are interrupted, but don't respond to it
ohair@286 1255 // right away. This behavior is in line with what happens
ohair@286 1256 // when you are actually running the whole thing synchronously.
ohair@286 1257 interrupted = true;
ohair@286 1258 }
alanb@368 1259
alanb@368 1260 synchronized(this) {
alanb@368 1261 return interceptors == originalInterceptors;
alanb@368 1262 }
ohair@286 1263 }
alanb@368 1264 else {
alanb@368 1265 if (suspendedCount>0)
alanb@368 1266 return false;
alanb@368 1267 synchronized(this) {
alanb@368 1268 return interceptors == originalInterceptors;
alanb@368 1269 }
alanb@368 1270 }
ohair@286 1271 }
ohair@286 1272
ohair@286 1273 private String getName() {
ohair@286 1274 return "engine-" + owner.id + "fiber-" + id;
ohair@286 1275 }
ohair@286 1276
ohair@286 1277 @Override
ohair@286 1278 public String toString() {
ohair@286 1279 return getName();
ohair@286 1280 }
ohair@286 1281
ohair@286 1282 /**
ohair@286 1283 * Gets the current {@link Packet} associated with this fiber.
ohair@286 1284 * <p/>
ohair@286 1285 * <p/>
ohair@286 1286 * This method returns null if no packet has been associated with the fiber yet.
ohair@286 1287 */
ohair@286 1288 public
ohair@286 1289 @Nullable
ohair@286 1290 Packet getPacket() {
ohair@286 1291 return packet;
ohair@286 1292 }
ohair@286 1293
ohair@286 1294 /**
ohair@286 1295 * Returns completion callback associated with this Fiber
ohair@286 1296 * @return Completion callback
ohair@286 1297 * @since 2.2.6
ohair@286 1298 */
ohair@286 1299 public CompletionCallback getCompletionCallback() {
ohair@286 1300 return completionCallback;
ohair@286 1301 }
ohair@286 1302
ohair@286 1303 /**
ohair@286 1304 * Updates completion callback associated with this Fiber
ohair@286 1305 * @param completionCallback Completion callback
ohair@286 1306 * @since 2.2.6
ohair@286 1307 */
ohair@286 1308 public void setCompletionCallback(CompletionCallback completionCallback) {
ohair@286 1309 this.completionCallback = completionCallback;
ohair@286 1310 }
ohair@286 1311
ohair@286 1312 /**
ohair@286 1313 * (ADVANCED) Returns true if the current fiber is being executed synchronously.
ohair@286 1314 * <p/>
ohair@286 1315 * <p/>
ohair@286 1316 * Fiber may run synchronously for various reasons. Perhaps this is
ohair@286 1317 * on client side and application has invoked a synchronous method call.
ohair@286 1318 * Perhaps this is on server side and we have deployed on a synchronous
ohair@286 1319 * transport (like servlet.)
ohair@286 1320 * <p/>
ohair@286 1321 * <p/>
ohair@286 1322 * When a fiber is run synchronously (IOW by {@link #runSync(Tube, Packet)}),
ohair@286 1323 * further invocations to {@link #runSync(Tube, Packet)} can be done
ohair@286 1324 * without degrading the performance.
ohair@286 1325 * <p/>
ohair@286 1326 * <p/>
ohair@286 1327 * So this value can be used as a further optimization hint for
ohair@286 1328 * advanced {@link Tube}s to choose the best strategy to invoke
ohair@286 1329 * the next {@link Tube}. For example, a tube may want to install
ohair@286 1330 * a {@link FiberContextSwitchInterceptor} if running async, yet
ohair@286 1331 * it might find it faster to do {@link #runSync(Tube, Packet)}
ohair@286 1332 * if it's already running synchronously.
ohair@286 1333 */
ohair@286 1334 public static boolean isSynchronous() {
ohair@286 1335 return current().synchronous;
ohair@286 1336 }
ohair@286 1337
ohair@286 1338 /**
ohair@286 1339 * Returns true if the current Fiber on the current thread was started
ohair@286 1340 * synchronously. Note, this is not strictly the same as being synchronous
ohair@286 1341 * because the assumption is that the Fiber will ultimately be dispatched
ohair@286 1342 * asynchronously, possibly have a completion callback associated with it, etc.
ohair@286 1343 * Note, the 'startedSync' flag is cleared once the current Fiber is
ohair@286 1344 * converted to running asynchronously.
ohair@286 1345 * @since 2.2.6
ohair@286 1346 */
ohair@286 1347 public boolean isStartedSync() {
ohair@286 1348 return startedSync;
ohair@286 1349 }
ohair@286 1350
ohair@286 1351 /**
ohair@286 1352 * Gets the current fiber that's running.
ohair@286 1353 * <p/>
ohair@286 1354 * <p/>
ohair@286 1355 * This works like {@link Thread#currentThread()}.
ohair@286 1356 * This method only works when invoked from {@link Tube}.
ohair@286 1357 */
ohair@286 1358 public static
ohair@286 1359 @NotNull
ohair@286 1360 Fiber current() {
ohair@286 1361 Fiber fiber = CURRENT_FIBER.get();
ohair@286 1362 if (fiber == null)
ohair@286 1363 throw new IllegalStateException("Can be only used from fibers");
ohair@286 1364 return fiber;
ohair@286 1365 }
ohair@286 1366
ohair@286 1367 /**
ohair@286 1368 * Gets the current fiber that's running, if set.
ohair@286 1369 */
ohair@286 1370 public static Fiber getCurrentIfSet() {
ohair@286 1371 return CURRENT_FIBER.get();
ohair@286 1372 }
ohair@286 1373
ohair@286 1374 private static final ThreadLocal<Fiber> CURRENT_FIBER = new ThreadLocal<Fiber>();
ohair@286 1375
ohair@286 1376 /**
ohair@286 1377 * Used to allocate unique number for each fiber.
ohair@286 1378 */
ohair@286 1379 private static final AtomicInteger iotaGen = new AtomicInteger();
ohair@286 1380
ohair@286 1381 private static boolean isTraceEnabled() {
ohair@286 1382 return LOGGER.isLoggable(Level.FINE);
ohair@286 1383 }
ohair@286 1384
ohair@286 1385 private static final Logger LOGGER = Logger.getLogger(Fiber.class.getName());
ohair@286 1386
ohair@286 1387
ohair@286 1388 private static final ReentrantLock serializedExecutionLock = new ReentrantLock();
ohair@286 1389
ohair@286 1390 /**
ohair@286 1391 * Set this boolean to true to execute fibers sequentially one by one.
ohair@286 1392 * See class javadoc.
ohair@286 1393 */
ohair@286 1394 public static volatile boolean serializeExecution = Boolean.getBoolean(Fiber.class.getName() + ".serialize");
ohair@286 1395
ohair@286 1396 private final Set<Component> components = new CopyOnWriteArraySet<Component>();
ohair@286 1397
alanb@368 1398 @Override
alanb@368 1399 public <S> S getSPI(Class<S> spiType) {
alanb@368 1400 for (Component c : components) {
alanb@368 1401 S spi = c.getSPI(spiType);
alanb@368 1402 if (spi != null) {
alanb@368 1403 return spi;
alanb@368 1404 }
ohair@286 1405 }
alanb@368 1406 return null;
alanb@368 1407 }
ohair@286 1408
alanb@368 1409 @Override
alanb@368 1410 public Set<Component> getComponents() {
alanb@368 1411 return components;
alanb@368 1412 }
ohair@286 1413 }

mercurial