src/share/jaxws_classes/com/sun/xml/internal/ws/api/pipe/Fiber.java

Thu, 31 Aug 2017 15:18:52 +0800

author
aoqi
date
Thu, 31 Aug 2017 15:18:52 +0800
changeset 637
9c07ef4934dd
parent 384
8f2986ff0235
parent 0
373ffda63c9a
permissions
-rw-r--r--

merge

aoqi@0 1 /*
aoqi@0 2 * Copyright (c) 1997, 2013, Oracle and/or its affiliates. All rights reserved.
aoqi@0 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
aoqi@0 4 *
aoqi@0 5 * This code is free software; you can redistribute it and/or modify it
aoqi@0 6 * under the terms of the GNU General Public License version 2 only, as
aoqi@0 7 * published by the Free Software Foundation. Oracle designates this
aoqi@0 8 * particular file as subject to the "Classpath" exception as provided
aoqi@0 9 * by Oracle in the LICENSE file that accompanied this code.
aoqi@0 10 *
aoqi@0 11 * This code is distributed in the hope that it will be useful, but WITHOUT
aoqi@0 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
aoqi@0 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
aoqi@0 14 * version 2 for more details (a copy is included in the LICENSE file that
aoqi@0 15 * accompanied this code).
aoqi@0 16 *
aoqi@0 17 * You should have received a copy of the GNU General Public License version
aoqi@0 18 * 2 along with this work; if not, write to the Free Software Foundation,
aoqi@0 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
aoqi@0 20 *
aoqi@0 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
aoqi@0 22 * or visit www.oracle.com if you need additional information or have any
aoqi@0 23 * questions.
aoqi@0 24 */
aoqi@0 25
aoqi@0 26 package com.sun.xml.internal.ws.api.pipe;
aoqi@0 27
aoqi@0 28 import com.sun.istack.internal.NotNull;
aoqi@0 29 import com.sun.istack.internal.Nullable;
aoqi@0 30 import com.sun.xml.internal.ws.api.Cancelable;
aoqi@0 31 import com.sun.xml.internal.ws.api.Component;
aoqi@0 32 import com.sun.xml.internal.ws.api.ComponentRegistry;
aoqi@0 33 import com.sun.xml.internal.ws.api.SOAPVersion;
aoqi@0 34 import com.sun.xml.internal.ws.api.addressing.AddressingVersion;
aoqi@0 35 import com.sun.xml.internal.ws.api.message.AddressingUtils;
aoqi@0 36 import com.sun.xml.internal.ws.api.message.Packet;
aoqi@0 37 import com.sun.xml.internal.ws.api.pipe.helper.AbstractFilterTubeImpl;
aoqi@0 38 import com.sun.xml.internal.ws.api.pipe.helper.AbstractTubeImpl;
aoqi@0 39 import com.sun.xml.internal.ws.api.server.Adapter;
aoqi@0 40 import com.sun.xml.internal.ws.api.server.Container;
aoqi@0 41 import com.sun.xml.internal.ws.api.server.ContainerResolver;
aoqi@0 42
aoqi@0 43 import java.util.ArrayList;
aoqi@0 44 import java.util.List;
aoqi@0 45 import java.util.Set;
aoqi@0 46 import java.util.concurrent.CopyOnWriteArraySet;
aoqi@0 47 import java.util.concurrent.atomic.AtomicInteger;
aoqi@0 48 import java.util.concurrent.locks.Condition;
aoqi@0 49 import java.util.concurrent.locks.ReentrantLock;
aoqi@0 50 import java.util.logging.Level;
aoqi@0 51 import java.util.logging.Logger;
aoqi@0 52
aoqi@0 53 import javax.xml.ws.Holder;
aoqi@0 54 import javax.xml.ws.WebServiceException;
aoqi@0 55
aoqi@0 56 /**
aoqi@0 57 * User-level thread. Represents the execution of one request/response processing.
aoqi@0 58 * <p/>
aoqi@0 59 * <p/>
aoqi@0 60 * JAX-WS RI is capable of running a large number of request/response concurrently by
aoqi@0 61 * using a relatively small number of threads. This is made possible by utilizing
aoqi@0 62 * a {@link Fiber} &mdash; a user-level thread that gets created for each request/response
aoqi@0 63 * processing.
aoqi@0 64 * <p/>
aoqi@0 65 * <p/>
aoqi@0 66 * A fiber remembers where in the pipeline the processing is at, what needs to be
aoqi@0 67 * executed on the way out (when processing response), and other additional information
aoqi@0 68 * specific to the execution of a particular request/response.
aoqi@0 69 * <p/>
aoqi@0 70 * <h2>Suspend/Resume</h2>
aoqi@0 71 * <p/>
aoqi@0 72 * Fiber can be {@link NextAction#suspend() suspended} by a {@link Tube}.
aoqi@0 73 * When a fiber is suspended, it will be kept on the side until it is
aoqi@0 74 * {@link #resume(Packet) resumed}. This allows threads to go execute
aoqi@0 75 * other runnable fibers, allowing efficient utilization of smaller number of
aoqi@0 76 * threads.
aoqi@0 77 * <p/>
aoqi@0 78 * <h2>Context-switch Interception</h2>
aoqi@0 79 * <p/>
aoqi@0 80 * {@link FiberContextSwitchInterceptor} allows {@link Tube}s and {@link Adapter}s
aoqi@0 81 * to perform additional processing every time a thread starts running a fiber
aoqi@0 82 * and stops running it.
aoqi@0 83 * <p/>
aoqi@0 84 * <h2>Context ClassLoader</h2>
aoqi@0 85 * <p/>
aoqi@0 86 * Just like thread, a fiber has a context class loader (CCL.) A fiber's CCL
aoqi@0 87 * becomes the thread's CCL when it's executing the fiber. The original CCL
aoqi@0 88 * of the thread will be restored when the thread leaves the fiber execution.
aoqi@0 89 * <p/>
aoqi@0 90 * <p/>
aoqi@0 91 * <h2>Debugging Aid</h2>
aoqi@0 92 * <p/>
aoqi@0 93 * Because {@link Fiber} doesn't keep much in the call stack, and instead use
aoqi@0 94 * {@link #conts} to store the continuation, debugging fiber related activities
aoqi@0 95 * could be harder.
aoqi@0 96 * <p/>
aoqi@0 97 * <p/>
aoqi@0 98 * Setting the {@link #LOGGER} for FINE would give you basic start/stop/resume/suspend
aoqi@0 99 * level logging. Using FINER would cause more detailed logging, which includes
aoqi@0 100 * what tubes are executed in what order and how they behaved.
aoqi@0 101 * <p/>
aoqi@0 102 * <p/>
aoqi@0 103 * When you debug the server side, consider setting {@link Fiber#serializeExecution}
aoqi@0 104 * to true, so that execution of fibers are serialized. Debugging a server
aoqi@0 105 * with more than one running threads is very tricky, and this switch will
aoqi@0 106 * prevent that. This can be also enabled by setting the system property on.
aoqi@0 107 * See the source code.
aoqi@0 108 *
aoqi@0 109 * @author Kohsuke Kawaguchi
aoqi@0 110 * @author Jitendra Kotamraju
aoqi@0 111 */
aoqi@0 112 public final class Fiber implements Runnable, Cancelable, ComponentRegistry {
aoqi@0 113
aoqi@0 114 /**
aoqi@0 115 * Callback interface for notification of suspend and resume.
aoqi@0 116 *
aoqi@0 117 * @since 2.2.6
aoqi@0 118 * @deprecated Use {@link NextAction#suspend(Runnable)}
aoqi@0 119 */
aoqi@0 120 public interface Listener {
aoqi@0 121 /**
aoqi@0 122 * Fiber has been suspended. Implementations of this callback may resume the Fiber.
aoqi@0 123 * @param fiber Fiber
aoqi@0 124 */
aoqi@0 125 public void fiberSuspended(Fiber fiber);
aoqi@0 126
aoqi@0 127 /**
aoqi@0 128 * Fiber has been resumed. Behavior is undefined if implementations of this callback attempt to suspend the Fiber.
aoqi@0 129 * @param fiber Fiber
aoqi@0 130 */
aoqi@0 131 public void fiberResumed(Fiber fiber);
aoqi@0 132 }
aoqi@0 133
aoqi@0 134 private final List<Listener> _listeners = new ArrayList<Listener>();
aoqi@0 135
aoqi@0 136 /**
aoqi@0 137 * Adds suspend/resume callback listener
aoqi@0 138 * @param listener Listener
aoqi@0 139 * @since 2.2.6
aoqi@0 140 * @deprecated
aoqi@0 141 */
aoqi@0 142 public void addListener(Listener listener) {
aoqi@0 143 synchronized(_listeners) {
aoqi@0 144 if (!_listeners.contains(listener)) {
aoqi@0 145 _listeners.add(listener);
aoqi@0 146 }
aoqi@0 147 }
aoqi@0 148 }
aoqi@0 149
aoqi@0 150 /**
aoqi@0 151 * Removes suspend/resume callback listener
aoqi@0 152 * @param listener Listener
aoqi@0 153 * @since 2.2.6
aoqi@0 154 * @deprecated
aoqi@0 155 */
aoqi@0 156 public void removeListener(Listener listener) {
aoqi@0 157 synchronized(_listeners) {
aoqi@0 158 _listeners.remove(listener);
aoqi@0 159 }
aoqi@0 160 }
aoqi@0 161
aoqi@0 162 List<Listener> getCurrentListeners() {
aoqi@0 163 synchronized(_listeners) {
aoqi@0 164 return new ArrayList<Listener>(_listeners);
aoqi@0 165 }
aoqi@0 166 }
aoqi@0 167
aoqi@0 168 private void clearListeners() {
aoqi@0 169 synchronized(_listeners) {
aoqi@0 170 _listeners.clear();
aoqi@0 171 }
aoqi@0 172 }
aoqi@0 173
aoqi@0 174 /**
aoqi@0 175 * {@link Tube}s whose {@link Tube#processResponse(Packet)} method needs
aoqi@0 176 * to be invoked on the way back.
aoqi@0 177 */
aoqi@0 178 private Tube[] conts = new Tube[16];
aoqi@0 179 private int contsSize;
aoqi@0 180
aoqi@0 181 /**
aoqi@0 182 * If this field is non-null, the next instruction to execute is
aoqi@0 183 * to call its {@link Tube#processRequest(Packet)}. Otherwise
aoqi@0 184 * the instruction is to call {@link #conts}.
aoqi@0 185 */
aoqi@0 186 private Tube next;
aoqi@0 187
aoqi@0 188 private Packet packet;
aoqi@0 189
aoqi@0 190 private Throwable/*but really it's either RuntimeException or Error*/ throwable;
aoqi@0 191
aoqi@0 192 public final Engine owner;
aoqi@0 193
aoqi@0 194 /**
aoqi@0 195 * Is this thread suspended? 0=not suspended, 1=suspended.
aoqi@0 196 * <p/>
aoqi@0 197 * <p/>
aoqi@0 198 * Logically this is just a boolean, but we need to prepare for the case
aoqi@0 199 * where the thread is {@link #resume(Packet) resumed} before we get to the {@link #suspend()}.
aoqi@0 200 * This happens when things happen in the following order:
aoqi@0 201 * <p/>
aoqi@0 202 * <ol>
aoqi@0 203 * <li>Tube decides that the fiber needs to be suspended to wait for the external event.
aoqi@0 204 * <li>Tube hooks up fiber with some external mechanism (like NIO channel selector)
aoqi@0 205 * <li>Tube returns with {@link NextAction#suspend()}.
aoqi@0 206 * <li>"External mechanism" becomes signal state and invokes {@link Fiber#resume(Packet)}
aoqi@0 207 * to wake up fiber
aoqi@0 208 * <li>{@link Fiber#doRun} invokes {@link Fiber#suspend()}.
aoqi@0 209 * </ol>
aoqi@0 210 * <p/>
aoqi@0 211 * <p/>
aoqi@0 212 * Using int, this will work OK because {@link #suspendedCount} becomes -1 when
aoqi@0 213 * {@link #resume(Packet)} occurs before {@link #suspend()}.
aoqi@0 214 * <p/>
aoqi@0 215 * <p/>
aoqi@0 216 * Increment and decrement is guarded by 'this' object.
aoqi@0 217 */
aoqi@0 218 private volatile int suspendedCount = 0;
aoqi@0 219
aoqi@0 220 private volatile boolean isInsideSuspendCallbacks = false;
aoqi@0 221
aoqi@0 222 /**
aoqi@0 223 * Is this {@link Fiber} currently running in the synchronous mode?
aoqi@0 224 */
aoqi@0 225 private boolean synchronous;
aoqi@0 226
aoqi@0 227 private boolean interrupted;
aoqi@0 228
aoqi@0 229 private final int id;
aoqi@0 230
aoqi@0 231 /**
aoqi@0 232 * Active {@link FiberContextSwitchInterceptor}s for this fiber.
aoqi@0 233 */
aoqi@0 234 private List<FiberContextSwitchInterceptor> interceptors;
aoqi@0 235
aoqi@0 236 /**
aoqi@0 237 * Fiber's context {@link ClassLoader}.
aoqi@0 238 */
aoqi@0 239 private
aoqi@0 240 @Nullable
aoqi@0 241 ClassLoader contextClassLoader;
aoqi@0 242
aoqi@0 243 private
aoqi@0 244 @Nullable
aoqi@0 245 CompletionCallback completionCallback;
aoqi@0 246
aoqi@0 247 private boolean isDeliverThrowableInPacket = false;
aoqi@0 248
aoqi@0 249 public void setDeliverThrowableInPacket(boolean isDeliverThrowableInPacket) {
aoqi@0 250 this.isDeliverThrowableInPacket = isDeliverThrowableInPacket;
aoqi@0 251 }
aoqi@0 252
aoqi@0 253 /**
aoqi@0 254 * The thread on which this Fiber is currently executing, if applicable.
aoqi@0 255 */
aoqi@0 256 private Thread currentThread;
aoqi@0 257
aoqi@0 258 /**
aoqi@0 259 * Replace uses of synchronized(this) with this lock so that we can control
aoqi@0 260 * unlocking for resume use cases
aoqi@0 261 */
aoqi@0 262 private final ReentrantLock lock = new ReentrantLock();
aoqi@0 263 private final Condition condition = lock.newCondition();
aoqi@0 264
aoqi@0 265 private volatile boolean isCanceled;
aoqi@0 266
aoqi@0 267 /**
aoqi@0 268 * Set to true if this fiber is started asynchronously, to avoid
aoqi@0 269 * doubly-invoking completion code.
aoqi@0 270 */
aoqi@0 271 private boolean started;
aoqi@0 272
aoqi@0 273 /**
aoqi@0 274 * Set to true if this fiber is started sync but allowed to run async.
aoqi@0 275 * This property exists for use cases where the processing model is fundamentally async
aoqi@0 276 * but some requirement or feature mandates that part of the tubeline run synchronously. For
aoqi@0 277 * instance, WS-ReliableMessaging with non-anonymous addressing is compatible with running
aoqi@0 278 * asynchronously, but if in-order message delivery is used then message processing must assign
aoqi@0 279 * a message number before the remainder of the processing can be asynchronous.
aoqi@0 280 */
aoqi@0 281 private boolean startedSync;
aoqi@0 282
aoqi@0 283 /**
aoqi@0 284 * Callback to be invoked when a {@link Fiber} finishes execution.
aoqi@0 285 */
aoqi@0 286 public interface CompletionCallback {
aoqi@0 287 /**
aoqi@0 288 * Indicates that the fiber has finished its execution.
aoqi@0 289 * <p/>
aoqi@0 290 * <p/>
aoqi@0 291 * Since the JAX-WS RI runs asynchronously,
aoqi@0 292 * this method maybe invoked by a different thread
aoqi@0 293 * than any of the threads that started it or run a part of tubeline.
aoqi@0 294 */
aoqi@0 295 void onCompletion(@NotNull Packet response);
aoqi@0 296
aoqi@0 297 /**
aoqi@0 298 * Indicates that the fiber has finished abnormally, by throwing a given {@link Throwable}.
aoqi@0 299 */
aoqi@0 300 void onCompletion(@NotNull Throwable error);
aoqi@0 301 }
aoqi@0 302
aoqi@0 303 Fiber(Engine engine) {
aoqi@0 304 this.owner = engine;
aoqi@0 305 id = iotaGen.incrementAndGet();
aoqi@0 306 if (isTraceEnabled()) {
aoqi@0 307 LOGGER.log(Level.FINE, "{0} created", getName());
aoqi@0 308 }
aoqi@0 309
aoqi@0 310 // if this is run from another fiber, then we naturally inherit its context classloader,
aoqi@0 311 // so this code works for fiber->fiber inheritance just fine.
aoqi@0 312 contextClassLoader = Thread.currentThread().getContextClassLoader();
aoqi@0 313 }
aoqi@0 314
aoqi@0 315 /**
aoqi@0 316 * Starts the execution of this fiber asynchronously.
aoqi@0 317 * <p/>
aoqi@0 318 * <p/>
aoqi@0 319 * This method works like {@link Thread#start()}.
aoqi@0 320 *
aoqi@0 321 * @param tubeline The first tube of the tubeline that will act on the packet.
aoqi@0 322 * @param request The request packet to be passed to <tt>startPoint.processRequest()</tt>.
aoqi@0 323 * @param completionCallback The callback to be invoked when the processing is finished and the
aoqi@0 324 * final response packet is available.
aoqi@0 325 * @see #runSync(Tube, Packet)
aoqi@0 326 */
aoqi@0 327 public void start(@NotNull Tube tubeline, @NotNull Packet request, @Nullable CompletionCallback completionCallback) {
aoqi@0 328 start(tubeline, request, completionCallback, false);
aoqi@0 329 }
aoqi@0 330
aoqi@0 331 private void dumpFiberContext(String desc) {
aoqi@0 332 if(isTraceEnabled()) {
aoqi@0 333 String action = null;
aoqi@0 334 String msgId = null;
aoqi@0 335 if (packet != null) {
aoqi@0 336 for (SOAPVersion sv: SOAPVersion.values()) {
aoqi@0 337 for (AddressingVersion av: AddressingVersion.values()) {
aoqi@0 338 action = packet.getMessage() != null ? AddressingUtils.getAction(packet.getMessage().getHeaders(), av, sv) : null;
aoqi@0 339 msgId = packet.getMessage() != null ? AddressingUtils.getMessageID(packet.getMessage().getHeaders(), av, sv) : null;
aoqi@0 340 if (action != null || msgId != null) {
aoqi@0 341 break;
aoqi@0 342 }
aoqi@0 343 }
aoqi@0 344 if (action != null || msgId != null) {
aoqi@0 345 break;
aoqi@0 346 }
aoqi@0 347 }
aoqi@0 348 }
aoqi@0 349 String actionAndMsgDesc;
aoqi@0 350 if (action == null && msgId == null) {
aoqi@0 351 actionAndMsgDesc = "NO ACTION or MSG ID";
aoqi@0 352 } else {
aoqi@0 353 actionAndMsgDesc = "'" + action + "' and msgId '" + msgId + "'";
aoqi@0 354 }
aoqi@0 355
aoqi@0 356 String tubeDesc;
aoqi@0 357 if (next != null) {
aoqi@0 358 tubeDesc = next.toString() + ".processRequest()";
aoqi@0 359 } else {
aoqi@0 360 tubeDesc = peekCont() + ".processResponse()";
aoqi@0 361 }
aoqi@0 362
aoqi@0 363 LOGGER.log(Level.FINE, "{0} {1} with {2} and ''current'' tube {3} from thread {4} with Packet: {5}", new Object[]{getName(), desc, actionAndMsgDesc, tubeDesc, Thread.currentThread().getName(), packet != null ? packet.toShortString() : null});
aoqi@0 364 }
aoqi@0 365 }
aoqi@0 366
aoqi@0 367 /**
aoqi@0 368 * Starts the execution of this fiber.
aoqi@0 369 *
aoqi@0 370 * If forceSync is true, then the fiber is started for an ostensibly async invocation,
aoqi@0 371 * but allows for some portion of the tubeline to run sync with the calling
aoqi@0 372 * client instance (Port/Dispatch instance). This allows tubes that enforce
aoqi@0 373 * ordering to see requests in the order they were sent at the point the
aoqi@0 374 * client invoked them.
aoqi@0 375 * <p>
aoqi@0 376 * The forceSync parameter will be true only when the caller (e.g. AsyncInvoker or
aoqi@0 377 * SEIStub) knows one or more tubes need to enforce ordering and thus need
aoqi@0 378 * to run sync with the client. Such tubes can return
aoqi@0 379 * NextAction.INVOKE_ASYNC to indicate that the next tube in the tubeline
aoqi@0 380 * should be invoked async to the current thread.
aoqi@0 381 *
aoqi@0 382 * <p>
aoqi@0 383 * This method works like {@link Thread#start()}.
aoqi@0 384 *
aoqi@0 385 * @param tubeline
aoqi@0 386 * The first tube of the tubeline that will act on the packet.
aoqi@0 387 * @param request
aoqi@0 388 * The request packet to be passed to <tt>startPoint.processRequest()</tt>.
aoqi@0 389 * @param completionCallback
aoqi@0 390 * The callback to be invoked when the processing is finished and the
aoqi@0 391 * final response packet is available.
aoqi@0 392 *
aoqi@0 393 * @see #start(Tube,Packet,CompletionCallback)
aoqi@0 394 * @see #runSync(Tube,Packet)
aoqi@0 395 * @since 2.2.6
aoqi@0 396 */
aoqi@0 397 public void start(@NotNull Tube tubeline, @NotNull Packet request, @Nullable CompletionCallback completionCallback, boolean forceSync) {
aoqi@0 398 next = tubeline;
aoqi@0 399 this.packet = request;
aoqi@0 400 this.completionCallback = completionCallback;
aoqi@0 401
aoqi@0 402 if (forceSync) {
aoqi@0 403 this.startedSync = true;
aoqi@0 404 dumpFiberContext("starting (sync)");
aoqi@0 405 run();
aoqi@0 406 } else {
aoqi@0 407 this.started = true;
aoqi@0 408 dumpFiberContext("starting (async)");
aoqi@0 409 owner.addRunnable(this);
aoqi@0 410 }
aoqi@0 411 }
aoqi@0 412
aoqi@0 413 /**
aoqi@0 414 * Wakes up a suspended fiber.
aoqi@0 415 * <p/>
aoqi@0 416 * <p/>
aoqi@0 417 * If a fiber was suspended without specifying the next {@link Tube},
aoqi@0 418 * then the execution will be resumed in the response processing direction,
aoqi@0 419 * by calling the {@link Tube#processResponse(Packet)} method on the next/first
aoqi@0 420 * {@link Tube} in the {@link Fiber}'s processing stack with the specified resume
aoqi@0 421 * packet as the parameter.
aoqi@0 422 * <p/>
aoqi@0 423 * <p/>
aoqi@0 424 * If a fiber was suspended with specifying the next {@link Tube},
aoqi@0 425 * then the execution will be resumed in the request processing direction,
aoqi@0 426 * by calling the next tube's {@link Tube#processRequest(Packet)} method with the
aoqi@0 427 * specified resume packet as the parameter.
aoqi@0 428 * <p/>
aoqi@0 429 * <p/>
aoqi@0 430 * This method is implemented in a race-free way. Another thread can invoke
aoqi@0 431 * this method even before this fiber goes into the suspension mode. So the caller
aoqi@0 432 * need not worry about synchronizing {@link NextAction#suspend()} and this method.
aoqi@0 433 *
aoqi@0 434 * @param resumePacket packet used in the resumed processing
aoqi@0 435 */
aoqi@0 436 public void resume(@NotNull Packet resumePacket) {
aoqi@0 437 resume(resumePacket, false);
aoqi@0 438 }
aoqi@0 439
aoqi@0 440 /**
aoqi@0 441 * Similar to resume(Packet) but allowing the Fiber to be resumed
aoqi@0 442 * synchronously (in the current Thread). If you want to know when the
aoqi@0 443 * fiber completes (not when this method returns) then add/wrap a
aoqi@0 444 * CompletionCallback on this Fiber.
aoqi@0 445 * For example, an asynchronous response endpoint that supports WS-ReliableMessaging
aoqi@0 446 * including in-order message delivery may need to resume the Fiber synchronously
aoqi@0 447 * until message order is confirmed prior to returning to asynchronous processing.
aoqi@0 448 * @since 2.2.6
aoqi@0 449 */
aoqi@0 450 public void resume(@NotNull Packet resumePacket,
aoqi@0 451 boolean forceSync) {
aoqi@0 452 resume(resumePacket, forceSync, null);
aoqi@0 453 }
aoqi@0 454
aoqi@0 455 /**
aoqi@0 456 * Similar to resume(Packet, boolean) but allowing the Fiber to be resumed
aoqi@0 457 * and at the same time atomically assign a new CompletionCallback to it.
aoqi@0 458 * @since 2.2.6
aoqi@0 459 */
aoqi@0 460 public void resume(@NotNull Packet resumePacket,
aoqi@0 461 boolean forceSync,
aoqi@0 462 CompletionCallback callback) {
aoqi@0 463 lock.lock();
aoqi@0 464 try {
aoqi@0 465 if (callback != null) {
aoqi@0 466 setCompletionCallback(callback);
aoqi@0 467 }
aoqi@0 468 if(isTraceEnabled())
aoqi@0 469 LOGGER.log(Level.FINE, "{0} resuming. Will have suspendedCount={1}", new Object[]{getName(), suspendedCount-1});
aoqi@0 470 packet = resumePacket;
aoqi@0 471 if( --suspendedCount == 0 ) {
aoqi@0 472 if (!isInsideSuspendCallbacks) {
aoqi@0 473 List<Listener> listeners = getCurrentListeners();
aoqi@0 474 for (Listener listener: listeners) {
aoqi@0 475 try {
aoqi@0 476 listener.fiberResumed(this);
aoqi@0 477 } catch (Throwable e) {
aoqi@0 478 if (isTraceEnabled())
aoqi@0 479 LOGGER.log(Level.FINE, "Listener {0} threw exception: {1}", new Object[]{listener, e.getMessage()});
aoqi@0 480 }
aoqi@0 481 }
aoqi@0 482
aoqi@0 483 if(synchronous) {
aoqi@0 484 condition.signalAll();
aoqi@0 485 } else if (forceSync || startedSync) {
aoqi@0 486 run();
aoqi@0 487 } else {
aoqi@0 488 dumpFiberContext("resuming (async)");
aoqi@0 489 owner.addRunnable(this);
aoqi@0 490 }
aoqi@0 491 }
aoqi@0 492 } else {
aoqi@0 493 if (isTraceEnabled()) {
aoqi@0 494 LOGGER.log(Level.FINE, "{0} taking no action on resume because suspendedCount != 0: {1}", new Object[]{getName(), suspendedCount});
aoqi@0 495 }
aoqi@0 496 }
aoqi@0 497 } finally {
aoqi@0 498 lock.unlock();
aoqi@0 499 }
aoqi@0 500 }
aoqi@0 501
aoqi@0 502 /**
aoqi@0 503 * Wakes up a suspended fiber and begins response processing.
aoqi@0 504 * @since 2.2.6
aoqi@0 505 */
aoqi@0 506 public void resumeAndReturn(@NotNull Packet resumePacket,
aoqi@0 507 boolean forceSync) {
aoqi@0 508 if(isTraceEnabled())
aoqi@0 509 LOGGER.log(Level.FINE, "{0} resumed with Return Packet", getName());
aoqi@0 510 next = null;
aoqi@0 511 resume(resumePacket, forceSync);
aoqi@0 512 }
aoqi@0 513
aoqi@0 514 /**
aoqi@0 515 * Wakes up a suspended fiber with an exception.
aoqi@0 516 * <p/>
aoqi@0 517 * <p/>
aoqi@0 518 * The execution of the suspended fiber will be resumed in the response
aoqi@0 519 * processing direction, by calling the {@link Tube#processException(Throwable)} method
aoqi@0 520 * on the next/first {@link Tube} in the {@link Fiber}'s processing stack with
aoqi@0 521 * the specified exception as the parameter.
aoqi@0 522 * <p/>
aoqi@0 523 * <p/>
aoqi@0 524 * This method is implemented in a race-free way. Another thread can invoke
aoqi@0 525 * this method even before this fiber goes into the suspension mode. So the caller
aoqi@0 526 * need not worry about synchronizing {@link NextAction#suspend()} and this method.
aoqi@0 527 *
aoqi@0 528 * @param throwable exception that is used in the resumed processing
aoqi@0 529 */
aoqi@0 530 public void resume(@NotNull Throwable throwable) {
aoqi@0 531 resume(throwable, packet, false);
aoqi@0 532 }
aoqi@0 533
aoqi@0 534 /**
aoqi@0 535 * Wakes up a suspended fiber with an exception.
aoqi@0 536 * <p/>
aoqi@0 537 * <p/>
aoqi@0 538 * The execution of the suspended fiber will be resumed in the response
aoqi@0 539 * processing direction, by calling the {@link Tube#processException(Throwable)} method
aoqi@0 540 * on the next/first {@link Tube} in the {@link Fiber}'s processing stack with
aoqi@0 541 * the specified exception as the parameter.
aoqi@0 542 * <p/>
aoqi@0 543 * <p/>
aoqi@0 544 * This method is implemented in a race-free way. Another thread can invoke
aoqi@0 545 * this method even before this fiber goes into the suspension mode. So the caller
aoqi@0 546 * need not worry about synchronizing {@link NextAction#suspend()} and this method.
aoqi@0 547 *
aoqi@0 548 * @param throwable exception that is used in the resumed processing
aoqi@0 549 * @param packet Packet that will be visible on the Fiber after the resume
aoqi@0 550 * @since 2.2.8
aoqi@0 551 */
aoqi@0 552 public void resume(@NotNull Throwable throwable, @NotNull Packet packet) {
aoqi@0 553 resume(throwable, packet, false);
aoqi@0 554 }
aoqi@0 555
aoqi@0 556 /**
aoqi@0 557 * Wakes up a suspend fiber with an exception.
aoqi@0 558 *
aoqi@0 559 * If forceSync is true, then the suspended fiber will resume with
aoqi@0 560 * synchronous processing on the current thread. This will continue
aoqi@0 561 * until some Tube indicates that it is safe to switch to asynchronous
aoqi@0 562 * processing.
aoqi@0 563 *
aoqi@0 564 * @param error exception that is used in the resumed processing
aoqi@0 565 * @param forceSync if processing begins synchronously
aoqi@0 566 * @since 2.2.6
aoqi@0 567 */
aoqi@0 568 public void resume(@NotNull Throwable error,
aoqi@0 569 boolean forceSync) {
aoqi@0 570 resume(error, packet, forceSync);
aoqi@0 571 }
aoqi@0 572
aoqi@0 573 /**
aoqi@0 574 * Wakes up a suspend fiber with an exception.
aoqi@0 575 *
aoqi@0 576 * If forceSync is true, then the suspended fiber will resume with
aoqi@0 577 * synchronous processing on the current thread. This will continue
aoqi@0 578 * until some Tube indicates that it is safe to switch to asynchronous
aoqi@0 579 * processing.
aoqi@0 580 *
aoqi@0 581 * @param error exception that is used in the resumed processing
aoqi@0 582 * @param packet Packet that will be visible on the Fiber after the resume
aoqi@0 583 * @param forceSync if processing begins synchronously
aoqi@0 584 * @since 2.2.8
aoqi@0 585 */
aoqi@0 586 public void resume(@NotNull Throwable error,
aoqi@0 587 @NotNull Packet packet,
aoqi@0 588 boolean forceSync) {
aoqi@0 589 if(isTraceEnabled())
aoqi@0 590 LOGGER.log(Level.FINE, "{0} resumed with Return Throwable", getName());
aoqi@0 591 next = null;
aoqi@0 592 throwable = error;
aoqi@0 593 resume(packet, forceSync);
aoqi@0 594 }
aoqi@0 595
aoqi@0 596 /**
aoqi@0 597 * Marks this Fiber as cancelled. A cancelled Fiber will never invoke its completion callback
aoqi@0 598 * @param mayInterrupt if cancel should use {@link Thread#interrupt()}
aoqi@0 599 * @see java.util.concurrent.Future#cancel(boolean)
aoqi@0 600 * @since 2.2.6
aoqi@0 601 */
aoqi@0 602 @Override
aoqi@0 603 public void cancel(boolean mayInterrupt) {
aoqi@0 604 isCanceled = true;
aoqi@0 605 if (mayInterrupt) {
aoqi@0 606 // synchronized(this) is used as Thread running Fiber will be holding lock
aoqi@0 607 synchronized(this) {
aoqi@0 608 if (currentThread != null)
aoqi@0 609 currentThread.interrupt();
aoqi@0 610 }
aoqi@0 611 }
aoqi@0 612 }
aoqi@0 613
aoqi@0 614 /**
aoqi@0 615 * Suspends this fiber's execution until the resume method is invoked.
aoqi@0 616 * <p/>
aoqi@0 617 * The call returns immediately, and when the fiber is resumed
aoqi@0 618 * the execution picks up from the last scheduled continuation.
aoqi@0 619 * @param onExitRunnable runnable to be invoked after fiber is marked for suspension
aoqi@0 620 * @return if control loop must exit
aoqi@0 621 */
aoqi@0 622 private boolean suspend(Holder<Boolean> isRequireUnlock, Runnable onExitRunnable) {
aoqi@0 623 if(isTraceEnabled()) {
aoqi@0 624 LOGGER.log(Level.FINE, "{0} suspending. Will have suspendedCount={1}", new Object[]{getName(), suspendedCount+1});
aoqi@0 625 if (suspendedCount > 0) {
aoqi@0 626 LOGGER.log(Level.FINE, "WARNING - {0} suspended more than resumed. Will require more than one resume to actually resume this fiber.", getName());
aoqi@0 627 }
aoqi@0 628 }
aoqi@0 629
aoqi@0 630 List<Listener> listeners = getCurrentListeners();
aoqi@0 631 if (++suspendedCount == 1) {
aoqi@0 632 isInsideSuspendCallbacks = true;
aoqi@0 633 try {
aoqi@0 634 for (Listener listener: listeners) {
aoqi@0 635 try {
aoqi@0 636 listener.fiberSuspended(this);
aoqi@0 637 } catch (Throwable e) {
aoqi@0 638 if(isTraceEnabled())
aoqi@0 639 LOGGER.log(Level.FINE, "Listener {0} threw exception: {1}", new Object[]{listener, e.getMessage()});
aoqi@0 640 }
aoqi@0 641 }
aoqi@0 642 } finally {
aoqi@0 643 isInsideSuspendCallbacks = false;
aoqi@0 644 }
aoqi@0 645 }
aoqi@0 646
aoqi@0 647 if (suspendedCount <= 0) {
aoqi@0 648 // suspend callback caused fiber to resume
aoqi@0 649 for (Listener listener: listeners) {
aoqi@0 650 try {
aoqi@0 651 listener.fiberResumed(this);
aoqi@0 652 } catch (Throwable e) {
aoqi@0 653 if(isTraceEnabled())
aoqi@0 654 LOGGER.log(Level.FINE, "Listener {0} threw exception: {1}", new Object[]{listener, e.getMessage()});
aoqi@0 655 }
aoqi@0 656 }
aoqi@0 657
aoqi@0 658 } else if (onExitRunnable != null) {
aoqi@0 659 // synchronous use cases cannot disconnect from the current thread
aoqi@0 660 if (!synchronous) {
aoqi@0 661 /* INTENTIONALLY UNLOCKING EARLY */
aoqi@0 662 synchronized(this) {
aoqi@0 663 // currentThread is protected by the monitor for this fiber so
aoqi@0 664 // that it is accessible to cancel() even when the lock is held
aoqi@0 665 currentThread = null;
aoqi@0 666 }
aoqi@0 667 lock.unlock();
aoqi@0 668 assert(!lock.isHeldByCurrentThread());
aoqi@0 669 isRequireUnlock.value = Boolean.FALSE;
aoqi@0 670
aoqi@0 671 try {
aoqi@0 672 onExitRunnable.run();
aoqi@0 673 } catch(Throwable t) {
aoqi@0 674 throw new OnExitRunnableException(t);
aoqi@0 675 }
aoqi@0 676
aoqi@0 677 return true;
aoqi@0 678
aoqi@0 679 } else {
aoqi@0 680 // for synchronous we will stay with current thread, so do not disconnect
aoqi@0 681 if (isTraceEnabled())
aoqi@0 682 LOGGER.fine("onExitRunnable used with synchronous Fiber execution -- not exiting current thread");
aoqi@0 683 onExitRunnable.run();
aoqi@0 684 }
aoqi@0 685 }
aoqi@0 686
aoqi@0 687 return false;
aoqi@0 688 }
aoqi@0 689
aoqi@0 690 private static final class OnExitRunnableException extends RuntimeException {
aoqi@0 691 private static final long serialVersionUID = 1L;
aoqi@0 692
aoqi@0 693 Throwable target;
aoqi@0 694
aoqi@0 695 public OnExitRunnableException(Throwable target) {
aoqi@0 696 super((Throwable)null); // see pattern for InvocationTargetException
aoqi@0 697 this.target = target;
aoqi@0 698 }
aoqi@0 699 }
aoqi@0 700
aoqi@0 701 /**
aoqi@0 702 * Adds a new {@link FiberContextSwitchInterceptor} to this fiber.
aoqi@0 703 * <p/>
aoqi@0 704 * <p/>
aoqi@0 705 * The newly installed fiber will take effect immediately after the current
aoqi@0 706 * tube returns from its {@link Tube#processRequest(Packet)} or
aoqi@0 707 * {@link Tube#processResponse(Packet)}, before the next tube begins processing.
aoqi@0 708 * <p/>
aoqi@0 709 * <p/>
aoqi@0 710 * So when the tubeline consists of X and Y, and when X installs an interceptor,
aoqi@0 711 * the order of execution will be as follows:
aoqi@0 712 * <p/>
aoqi@0 713 * <ol>
aoqi@0 714 * <li>X.processRequest()
aoqi@0 715 * <li>interceptor gets installed
aoqi@0 716 * <li>interceptor.execute() is invoked
aoqi@0 717 * <li>Y.processRequest()
aoqi@0 718 * </ol>
aoqi@0 719 */
aoqi@0 720 public synchronized void addInterceptor(@NotNull FiberContextSwitchInterceptor interceptor) {
aoqi@0 721 if (interceptors == null) {
aoqi@0 722 interceptors = new ArrayList<FiberContextSwitchInterceptor>();
aoqi@0 723 } else {
aoqi@0 724 List<FiberContextSwitchInterceptor> l = new ArrayList<FiberContextSwitchInterceptor>();
aoqi@0 725 l.addAll(interceptors);
aoqi@0 726 interceptors = l;
aoqi@0 727 }
aoqi@0 728 interceptors.add(interceptor);
aoqi@0 729 }
aoqi@0 730
aoqi@0 731 /**
aoqi@0 732 * Removes a {@link FiberContextSwitchInterceptor} from this fiber.
aoqi@0 733 * <p/>
aoqi@0 734 * <p/>
aoqi@0 735 * The removal of the interceptor takes effect immediately after the current
aoqi@0 736 * tube returns from its {@link Tube#processRequest(Packet)} or
aoqi@0 737 * {@link Tube#processResponse(Packet)}, before the next tube begins processing.
aoqi@0 738 * <p/>
aoqi@0 739 * <p/>
aoqi@0 740 * <p/>
aoqi@0 741 * So when the tubeline consists of X and Y, and when Y uninstalls an interceptor
aoqi@0 742 * on the way out, then the order of execution will be as follows:
aoqi@0 743 * <p/>
aoqi@0 744 * <ol>
aoqi@0 745 * <li>Y.processResponse() (notice that this happens with interceptor.execute() in the callstack)
aoqi@0 746 * <li>interceptor gets uninstalled
aoqi@0 747 * <li>interceptor.execute() returns
aoqi@0 748 * <li>X.processResponse()
aoqi@0 749 * </ol>
aoqi@0 750 *
aoqi@0 751 * @return true if the specified interceptor was removed. False if
aoqi@0 752 * the specified interceptor was not registered with this fiber to begin with.
aoqi@0 753 */
aoqi@0 754 public synchronized boolean removeInterceptor(@NotNull FiberContextSwitchInterceptor interceptor) {
aoqi@0 755 if (interceptors != null) {
aoqi@0 756 boolean result = interceptors.remove(interceptor);
aoqi@0 757 if (interceptors.isEmpty())
aoqi@0 758 interceptors = null;
aoqi@0 759 else {
aoqi@0 760 List<FiberContextSwitchInterceptor> l = new ArrayList<FiberContextSwitchInterceptor>();
aoqi@0 761 l.addAll(interceptors);
aoqi@0 762 interceptors = l;
aoqi@0 763 }
aoqi@0 764 return result;
aoqi@0 765 }
aoqi@0 766 return false;
aoqi@0 767 }
aoqi@0 768
aoqi@0 769 /**
aoqi@0 770 * Gets the context {@link ClassLoader} of this fiber.
aoqi@0 771 */
aoqi@0 772 public
aoqi@0 773 @Nullable
aoqi@0 774 ClassLoader getContextClassLoader() {
aoqi@0 775 return contextClassLoader;
aoqi@0 776 }
aoqi@0 777
aoqi@0 778 /**
aoqi@0 779 * Sets the context {@link ClassLoader} of this fiber.
aoqi@0 780 */
aoqi@0 781 public ClassLoader setContextClassLoader(@Nullable ClassLoader contextClassLoader) {
aoqi@0 782 ClassLoader r = this.contextClassLoader;
aoqi@0 783 this.contextClassLoader = contextClassLoader;
aoqi@0 784 return r;
aoqi@0 785 }
aoqi@0 786
aoqi@0 787 /**
aoqi@0 788 * DO NOT CALL THIS METHOD. This is an implementation detail
aoqi@0 789 * of {@link Fiber}.
aoqi@0 790 */
aoqi@0 791 @Deprecated
aoqi@0 792 @Override
aoqi@0 793 public void run() {
aoqi@0 794 Container old = ContainerResolver.getDefault().enterContainer(owner.getContainer());
aoqi@0 795 try {
aoqi@0 796 assert !synchronous;
aoqi@0 797 // doRun returns true to indicate an early exit from fiber processing
aoqi@0 798 if (!doRun()) {
aoqi@0 799 if (startedSync && suspendedCount == 0 &&
aoqi@0 800 (next != null || contsSize > 0)) {
aoqi@0 801 // We bailed out of running this fiber we started as sync, and now
aoqi@0 802 // want to finish running it async
aoqi@0 803 startedSync = false;
aoqi@0 804 // Start back up as an async fiber
aoqi@0 805 dumpFiberContext("restarting (async) after startSync");
aoqi@0 806 owner.addRunnable(this);
aoqi@0 807 } else {
aoqi@0 808 completionCheck();
aoqi@0 809 }
aoqi@0 810 }
aoqi@0 811 } finally {
aoqi@0 812 ContainerResolver.getDefault().exitContainer(old);
aoqi@0 813 }
aoqi@0 814 }
aoqi@0 815
aoqi@0 816 /**
aoqi@0 817 * Runs a given {@link Tube} (and everything thereafter) synchronously.
aoqi@0 818 * <p/>
aoqi@0 819 * <p/>
aoqi@0 820 * This method blocks and returns only when all the successive {@link Tube}s
aoqi@0 821 * complete their request/response processing. This method can be used
aoqi@0 822 * if a {@link Tube} needs to fallback to synchronous processing.
aoqi@0 823 * <p/>
aoqi@0 824 * <h3>Example:</h3>
aoqi@0 825 * <pre>
aoqi@0 826 * class FooTube extends {@link AbstractFilterTubeImpl} {
aoqi@0 827 * NextAction processRequest(Packet request) {
aoqi@0 828 * // run everything synchronously and return with the response packet
aoqi@0 829 * return doReturnWith(Fiber.current().runSync(next,request));
aoqi@0 830 * }
aoqi@0 831 * NextAction processResponse(Packet response) {
aoqi@0 832 * // never be invoked
aoqi@0 833 * }
aoqi@0 834 * }
aoqi@0 835 * </pre>
aoqi@0 836 *
aoqi@0 837 * @param tubeline The first tube of the tubeline that will act on the packet.
aoqi@0 838 * @param request The request packet to be passed to <tt>startPoint.processRequest()</tt>.
aoqi@0 839 * @return The response packet to the <tt>request</tt>.
aoqi@0 840 * @see #start(Tube, Packet, CompletionCallback)
aoqi@0 841 */
aoqi@0 842 public
aoqi@0 843 @NotNull
aoqi@0 844 Packet runSync(@NotNull Tube tubeline, @NotNull Packet request) {
aoqi@0 845 lock.lock();
aoqi@0 846 try {
aoqi@0 847 // save the current continuation, so that we return runSync() without executing them.
aoqi@0 848 final Tube[] oldCont = conts;
aoqi@0 849 final int oldContSize = contsSize;
aoqi@0 850 final boolean oldSynchronous = synchronous;
aoqi@0 851 final Tube oldNext = next;
aoqi@0 852
aoqi@0 853 if (oldContSize > 0) {
aoqi@0 854 conts = new Tube[16];
aoqi@0 855 contsSize = 0;
aoqi@0 856 }
aoqi@0 857
aoqi@0 858 try {
aoqi@0 859 synchronous = true;
aoqi@0 860 this.packet = request;
aoqi@0 861 next = tubeline;
aoqi@0 862 doRun();
aoqi@0 863 if (throwable != null) {
aoqi@0 864 if (isDeliverThrowableInPacket) {
aoqi@0 865 packet.addSatellite(new ThrowableContainerPropertySet(throwable));
aoqi@0 866 } else {
aoqi@0 867 if (throwable instanceof RuntimeException) {
aoqi@0 868 throw (RuntimeException) throwable;
aoqi@0 869 }
aoqi@0 870 if (throwable instanceof Error) {
aoqi@0 871 throw (Error) throwable;
aoqi@0 872 }
aoqi@0 873 // our system is supposed to only accept Error or RuntimeException
aoqi@0 874 throw new AssertionError(throwable);
aoqi@0 875 }
aoqi@0 876 }
aoqi@0 877 return this.packet;
aoqi@0 878 } finally {
aoqi@0 879 conts = oldCont;
aoqi@0 880 contsSize = oldContSize;
aoqi@0 881 synchronous = oldSynchronous;
aoqi@0 882 next = oldNext;
aoqi@0 883 if(interrupted) {
aoqi@0 884 Thread.currentThread().interrupt();
aoqi@0 885 interrupted = false;
aoqi@0 886 }
aoqi@0 887 if(!started && !startedSync)
aoqi@0 888 completionCheck();
aoqi@0 889 }
aoqi@0 890 } finally {
aoqi@0 891 lock.unlock();
aoqi@0 892 }
aoqi@0 893 }
aoqi@0 894
aoqi@0 895 private void completionCheck() {
aoqi@0 896 lock.lock();
aoqi@0 897 try {
aoqi@0 898 // Don't trigger completion and callbacks if fiber is suspended
aoqi@0 899 if(!isCanceled && contsSize==0 && suspendedCount == 0) {
aoqi@0 900 if(isTraceEnabled())
aoqi@0 901 LOGGER.log(Level.FINE, "{0} completed", getName());
aoqi@0 902 clearListeners();
aoqi@0 903 condition.signalAll();
aoqi@0 904 if (completionCallback != null) {
aoqi@0 905 if (throwable != null) {
aoqi@0 906 if (isDeliverThrowableInPacket) {
aoqi@0 907 packet.addSatellite(new ThrowableContainerPropertySet(throwable));
aoqi@0 908 completionCallback.onCompletion(packet);
aoqi@0 909 } else
aoqi@0 910 completionCallback.onCompletion(throwable);
aoqi@0 911 } else
aoqi@0 912 completionCallback.onCompletion(packet);
aoqi@0 913 }
aoqi@0 914 }
aoqi@0 915 } finally {
aoqi@0 916 lock.unlock();
aoqi@0 917 }
aoqi@0 918 }
aoqi@0 919
aoqi@0 920 /**
aoqi@0 921 * Invokes all registered {@link InterceptorHandler}s and then call into
aoqi@0 922 * {@link Fiber#__doRun()}.
aoqi@0 923 */
aoqi@0 924 private class InterceptorHandler implements FiberContextSwitchInterceptor.Work<Tube, Tube> {
aoqi@0 925 private final Holder<Boolean> isUnlockRequired;
aoqi@0 926 private final List<FiberContextSwitchInterceptor> ints;
aoqi@0 927
aoqi@0 928 /**
aoqi@0 929 * Index in {@link Fiber#interceptors} to invoke next.
aoqi@0 930 */
aoqi@0 931 private int idx;
aoqi@0 932
aoqi@0 933 public InterceptorHandler(Holder<Boolean> isUnlockRequired, List<FiberContextSwitchInterceptor> ints) {
aoqi@0 934 this.isUnlockRequired = isUnlockRequired;
aoqi@0 935 this.ints = ints;
aoqi@0 936 }
aoqi@0 937
aoqi@0 938 /**
aoqi@0 939 * Initiate the interception, and eventually invokes {@link Fiber#__doRun()}.
aoqi@0 940 */
aoqi@0 941 Tube invoke(Tube next) {
aoqi@0 942 idx = 0;
aoqi@0 943 return execute(next);
aoqi@0 944 }
aoqi@0 945
aoqi@0 946 @Override
aoqi@0 947 public Tube execute(Tube next) {
aoqi@0 948 if (idx == ints.size()) {
aoqi@0 949 Fiber.this.next = next;
aoqi@0 950 if (__doRun(isUnlockRequired, ints))
aoqi@0 951 return PLACEHOLDER;
aoqi@0 952 } else {
aoqi@0 953 FiberContextSwitchInterceptor interceptor = ints.get(idx++);
aoqi@0 954 return interceptor.execute(Fiber.this, next, this);
aoqi@0 955 }
aoqi@0 956 return Fiber.this.next;
aoqi@0 957 }
aoqi@0 958 }
aoqi@0 959
aoqi@0 960 private static final PlaceholderTube PLACEHOLDER = new PlaceholderTube();
aoqi@0 961
aoqi@0 962 private static class PlaceholderTube extends AbstractTubeImpl {
aoqi@0 963
aoqi@0 964 @Override
aoqi@0 965 public NextAction processRequest(Packet request) {
aoqi@0 966 throw new UnsupportedOperationException();
aoqi@0 967 }
aoqi@0 968
aoqi@0 969 @Override
aoqi@0 970 public NextAction processResponse(Packet response) {
aoqi@0 971 throw new UnsupportedOperationException();
aoqi@0 972 }
aoqi@0 973
aoqi@0 974 @Override
aoqi@0 975 public NextAction processException(Throwable t) {
aoqi@0 976 return doThrow(t);
aoqi@0 977 }
aoqi@0 978
aoqi@0 979 @Override
aoqi@0 980 public void preDestroy() {
aoqi@0 981 }
aoqi@0 982
aoqi@0 983 @Override
aoqi@0 984 public PlaceholderTube copy(TubeCloner cloner) {
aoqi@0 985 throw new UnsupportedOperationException();
aoqi@0 986 }
aoqi@0 987 }
aoqi@0 988
aoqi@0 989 /**
aoqi@0 990 * Executes the fiber as much as possible.
aoqi@0 991 *
aoqi@0 992 */
aoqi@0 993 private boolean doRun() {
aoqi@0 994 dumpFiberContext("running");
aoqi@0 995
aoqi@0 996 if (serializeExecution) {
aoqi@0 997 serializedExecutionLock.lock();
aoqi@0 998 try {
aoqi@0 999 return _doRun(next);
aoqi@0 1000 } finally {
aoqi@0 1001 serializedExecutionLock.unlock();
aoqi@0 1002 }
aoqi@0 1003 } else {
aoqi@0 1004 return _doRun(next);
aoqi@0 1005 }
aoqi@0 1006 }
aoqi@0 1007
aoqi@0 1008 private boolean _doRun(Tube next) {
aoqi@0 1009 // isRequireUnlock will contain Boolean.FALSE when lock has already been released in suspend
aoqi@0 1010 Holder<Boolean> isRequireUnlock = new Holder<Boolean>(Boolean.TRUE);
aoqi@0 1011 lock.lock();
aoqi@0 1012 try {
aoqi@0 1013 List<FiberContextSwitchInterceptor> ints;
aoqi@0 1014 ClassLoader old;
aoqi@0 1015 synchronized(this) {
aoqi@0 1016 ints = interceptors;
aoqi@0 1017
aoqi@0 1018 // currentThread is protected by the monitor for this fiber so
aoqi@0 1019 // that it is accessible to cancel() even when the lock is held
aoqi@0 1020 currentThread = Thread.currentThread();
aoqi@0 1021 if (isTraceEnabled()) {
aoqi@0 1022 LOGGER.log(Level.FINE, "Thread entering _doRun(): {0}", currentThread);
aoqi@0 1023 }
aoqi@0 1024
aoqi@0 1025 old = currentThread.getContextClassLoader();
aoqi@0 1026 currentThread.setContextClassLoader(contextClassLoader);
aoqi@0 1027 }
aoqi@0 1028
aoqi@0 1029 try {
aoqi@0 1030 boolean needsToReenter;
aoqi@0 1031 do {
aoqi@0 1032 // if interceptors are set, go through the interceptors.
aoqi@0 1033 if (ints == null) {
aoqi@0 1034 this.next = next;
aoqi@0 1035 if (__doRun(isRequireUnlock, null /*ints*/)) {
aoqi@0 1036 return true;
aoqi@0 1037 }
aoqi@0 1038 } else {
aoqi@0 1039 next = new InterceptorHandler(isRequireUnlock, ints).invoke(next);
aoqi@0 1040 if (next == PLACEHOLDER) {
aoqi@0 1041 return true;
aoqi@0 1042 }
aoqi@0 1043 }
aoqi@0 1044
aoqi@0 1045 synchronized(this) {
aoqi@0 1046 needsToReenter = (ints != interceptors);
aoqi@0 1047 if (needsToReenter)
aoqi@0 1048 ints = interceptors;
aoqi@0 1049 }
aoqi@0 1050 } while (needsToReenter);
aoqi@0 1051 } catch(OnExitRunnableException o) {
aoqi@0 1052 // catching this exception indicates onExitRunnable in suspend() threw.
aoqi@0 1053 // we must still avoid double unlock
aoqi@0 1054 Throwable t = o.target;
aoqi@0 1055 if (t instanceof WebServiceException)
aoqi@0 1056 throw (WebServiceException) t;
aoqi@0 1057 throw new WebServiceException(t);
aoqi@0 1058 } finally {
aoqi@0 1059 // don't reference currentThread here because fiber processing
aoqi@0 1060 // may already be running on a different thread (Note: isAlreadyExited
aoqi@0 1061 // tracks this state
aoqi@0 1062 Thread thread = Thread.currentThread();
aoqi@0 1063 thread.setContextClassLoader(old);
aoqi@0 1064 if (isTraceEnabled()) {
aoqi@0 1065 LOGGER.log(Level.FINE, "Thread leaving _doRun(): {0}", thread);
aoqi@0 1066 }
aoqi@0 1067 }
aoqi@0 1068
aoqi@0 1069 return false;
aoqi@0 1070 } finally {
aoqi@0 1071 if (isRequireUnlock.value) {
aoqi@0 1072 synchronized(this) {
aoqi@0 1073 currentThread = null;
aoqi@0 1074 }
aoqi@0 1075 lock.unlock();
aoqi@0 1076 }
aoqi@0 1077 }
aoqi@0 1078 }
aoqi@0 1079
aoqi@0 1080 /**
aoqi@0 1081 * To be invoked from {@link #doRun()}.
aoqi@0 1082 *
aoqi@0 1083 * @see #doRun()
aoqi@0 1084 */
aoqi@0 1085 private boolean __doRun(Holder<Boolean> isRequireUnlock, List<FiberContextSwitchInterceptor> originalInterceptors) {
aoqi@0 1086 assert(lock.isHeldByCurrentThread());
aoqi@0 1087
aoqi@0 1088 final Fiber old = CURRENT_FIBER.get();
aoqi@0 1089 CURRENT_FIBER.set(this);
aoqi@0 1090
aoqi@0 1091 // if true, lots of debug messages to show what's being executed
aoqi@0 1092 final boolean traceEnabled = LOGGER.isLoggable(Level.FINER);
aoqi@0 1093
aoqi@0 1094 try {
aoqi@0 1095 boolean abortResponse = false;
aoqi@0 1096 while(isReady(originalInterceptors)) {
aoqi@0 1097 if (isCanceled) {
aoqi@0 1098 next = null;
aoqi@0 1099 throwable = null;
aoqi@0 1100 contsSize = 0;
aoqi@0 1101 break;
aoqi@0 1102 }
aoqi@0 1103
aoqi@0 1104 try {
aoqi@0 1105 NextAction na;
aoqi@0 1106 Tube last;
aoqi@0 1107 if(throwable!=null) {
aoqi@0 1108 if(contsSize==0 || abortResponse) {
aoqi@0 1109 contsSize = 0; // abortResponse case
aoqi@0 1110 // nothing else to execute. we are done.
aoqi@0 1111 return false;
aoqi@0 1112 }
aoqi@0 1113 last = popCont();
aoqi@0 1114 if (traceEnabled)
aoqi@0 1115 LOGGER.log(Level.FINER, "{0} {1}.processException({2})", new Object[]{getName(), last, throwable});
aoqi@0 1116 na = last.processException(throwable);
aoqi@0 1117 } else {
aoqi@0 1118 if(next!=null) {
aoqi@0 1119 if(traceEnabled)
aoqi@0 1120 LOGGER.log(Level.FINER, "{0} {1}.processRequest({2})", new Object[]{getName(), next, packet != null ? "Packet@"+Integer.toHexString(packet.hashCode()) : "null"});
aoqi@0 1121 na = next.processRequest(packet);
aoqi@0 1122 last = next;
aoqi@0 1123 } else {
aoqi@0 1124 if(contsSize==0 || abortResponse) {
aoqi@0 1125 // nothing else to execute. we are done.
aoqi@0 1126 contsSize = 0;
aoqi@0 1127 return false;
aoqi@0 1128 }
aoqi@0 1129 last = popCont();
aoqi@0 1130 if(traceEnabled)
aoqi@0 1131 LOGGER.log(Level.FINER, "{0} {1}.processResponse({2})", new Object[]{getName(), last, packet != null ? "Packet@"+Integer.toHexString(packet.hashCode()) : "null"});
aoqi@0 1132 na = last.processResponse(packet);
aoqi@0 1133 }
aoqi@0 1134 }
aoqi@0 1135
aoqi@0 1136 if (traceEnabled)
aoqi@0 1137 LOGGER.log(Level.FINER, "{0} {1} returned with {2}", new Object[]{getName(), last, na});
aoqi@0 1138
aoqi@0 1139 // If resume is called before suspend, then make sure
aoqi@0 1140 // resume(Packet) is not lost
aoqi@0 1141 if (na.kind != NextAction.SUSPEND) {
aoqi@0 1142 // preserve in-flight packet so that processException may inspect
aoqi@0 1143 if (na.kind != NextAction.THROW &&
aoqi@0 1144 na.kind != NextAction.THROW_ABORT_RESPONSE)
aoqi@0 1145 packet = na.packet;
aoqi@0 1146 throwable = na.throwable;
aoqi@0 1147 }
aoqi@0 1148
aoqi@0 1149 switch(na.kind) {
aoqi@0 1150 case NextAction.INVOKE:
aoqi@0 1151 case NextAction.INVOKE_ASYNC:
aoqi@0 1152 pushCont(last);
aoqi@0 1153 // fall through next
aoqi@0 1154 case NextAction.INVOKE_AND_FORGET:
aoqi@0 1155 next = na.next;
aoqi@0 1156 if (na.kind == NextAction.INVOKE_ASYNC
aoqi@0 1157 && startedSync) {
aoqi@0 1158 // Break out here
aoqi@0 1159 return false;
aoqi@0 1160 }
aoqi@0 1161 break;
aoqi@0 1162 case NextAction.THROW_ABORT_RESPONSE:
aoqi@0 1163 case NextAction.ABORT_RESPONSE:
aoqi@0 1164 abortResponse = true;
aoqi@0 1165 if (isTraceEnabled()) {
aoqi@0 1166 LOGGER.log(Level.FINE, "Fiber {0} is aborting a response due to exception: {1}", new Object[]{this, na.throwable});
aoqi@0 1167 }
aoqi@0 1168 case NextAction.RETURN:
aoqi@0 1169 case NextAction.THROW:
aoqi@0 1170 next = null;
aoqi@0 1171 break;
aoqi@0 1172 case NextAction.SUSPEND:
aoqi@0 1173 if (next != null) {
aoqi@0 1174 // Only store the 'last' tube when we're processing
aoqi@0 1175 // a request, since conts array is for processResponse
aoqi@0 1176 pushCont(last);
aoqi@0 1177 }
aoqi@0 1178 next = na.next;
aoqi@0 1179 if(suspend(isRequireUnlock, na.onExitRunnable))
aoqi@0 1180 return true; // explicitly exiting control loop
aoqi@0 1181 break;
aoqi@0 1182 default:
aoqi@0 1183 throw new AssertionError();
aoqi@0 1184 }
aoqi@0 1185 } catch (RuntimeException t) {
aoqi@0 1186 if (traceEnabled)
aoqi@0 1187 LOGGER.log(Level.FINER, getName() + " Caught " + t + ". Start stack unwinding", t);
aoqi@0 1188 throwable = t;
aoqi@0 1189 } catch (Error t) {
aoqi@0 1190 if (traceEnabled)
aoqi@0 1191 LOGGER.log(Level.FINER, getName() + " Caught " + t + ". Start stack unwinding", t);
aoqi@0 1192 throwable = t;
aoqi@0 1193 }
aoqi@0 1194
aoqi@0 1195 dumpFiberContext("After tube execution");
aoqi@0 1196 }
aoqi@0 1197
aoqi@0 1198 // there's nothing we can execute right away.
aoqi@0 1199 // we'll be back when this fiber is resumed.
aoqi@0 1200
aoqi@0 1201 } finally {
aoqi@0 1202 CURRENT_FIBER.set(old);
aoqi@0 1203 }
aoqi@0 1204
aoqi@0 1205 return false;
aoqi@0 1206 }
aoqi@0 1207
aoqi@0 1208 private void pushCont(Tube tube) {
aoqi@0 1209 conts[contsSize++] = tube;
aoqi@0 1210
aoqi@0 1211 // expand if needed
aoqi@0 1212 int len = conts.length;
aoqi@0 1213 if (contsSize == len) {
aoqi@0 1214 Tube[] newBuf = new Tube[len * 2];
aoqi@0 1215 System.arraycopy(conts, 0, newBuf, 0, len);
aoqi@0 1216 conts = newBuf;
aoqi@0 1217 }
aoqi@0 1218 }
aoqi@0 1219
aoqi@0 1220 private Tube popCont() {
aoqi@0 1221 return conts[--contsSize];
aoqi@0 1222 }
aoqi@0 1223
aoqi@0 1224 private Tube peekCont() {
aoqi@0 1225 int index = contsSize - 1;
aoqi@0 1226 if (index >= 0 && index < conts.length) {
aoqi@0 1227 return conts[index];
aoqi@0 1228 } else {
aoqi@0 1229 return null;
aoqi@0 1230 }
aoqi@0 1231 }
aoqi@0 1232
aoqi@0 1233 /**
aoqi@0 1234 * Only to be used by Tubes that manipulate the Fiber to create alternate flows
aoqi@0 1235 * @since 2.2.6
aoqi@0 1236 */
aoqi@0 1237 public void resetCont(Tube[] conts, int contsSize) {
aoqi@0 1238 this.conts = conts;
aoqi@0 1239 this.contsSize = contsSize;
aoqi@0 1240 }
aoqi@0 1241
aoqi@0 1242 /**
aoqi@0 1243 * Returns true if the fiber is ready to execute.
aoqi@0 1244 */
aoqi@0 1245 private boolean isReady(List<FiberContextSwitchInterceptor> originalInterceptors) {
aoqi@0 1246 if (synchronous) {
aoqi@0 1247 while (suspendedCount == 1)
aoqi@0 1248 try {
aoqi@0 1249 if (isTraceEnabled()) {
aoqi@0 1250 LOGGER.log(Level.FINE, "{0} is blocking thread {1}", new Object[]{getName(), Thread.currentThread().getName()});
aoqi@0 1251 }
aoqi@0 1252 condition.await(); // the synchronized block is the whole runSync method.
aoqi@0 1253 } catch (InterruptedException e) {
aoqi@0 1254 // remember that we are interrupted, but don't respond to it
aoqi@0 1255 // right away. This behavior is in line with what happens
aoqi@0 1256 // when you are actually running the whole thing synchronously.
aoqi@0 1257 interrupted = true;
aoqi@0 1258 }
aoqi@0 1259
aoqi@0 1260 synchronized(this) {
aoqi@0 1261 return interceptors == originalInterceptors;
aoqi@0 1262 }
aoqi@0 1263 }
aoqi@0 1264 else {
aoqi@0 1265 if (suspendedCount>0)
aoqi@0 1266 return false;
aoqi@0 1267 synchronized(this) {
aoqi@0 1268 return interceptors == originalInterceptors;
aoqi@0 1269 }
aoqi@0 1270 }
aoqi@0 1271 }
aoqi@0 1272
aoqi@0 1273 private String getName() {
aoqi@0 1274 return "engine-" + owner.id + "fiber-" + id;
aoqi@0 1275 }
aoqi@0 1276
aoqi@0 1277 @Override
aoqi@0 1278 public String toString() {
aoqi@0 1279 return getName();
aoqi@0 1280 }
aoqi@0 1281
aoqi@0 1282 /**
aoqi@0 1283 * Gets the current {@link Packet} associated with this fiber.
aoqi@0 1284 * <p/>
aoqi@0 1285 * <p/>
aoqi@0 1286 * This method returns null if no packet has been associated with the fiber yet.
aoqi@0 1287 */
aoqi@0 1288 public
aoqi@0 1289 @Nullable
aoqi@0 1290 Packet getPacket() {
aoqi@0 1291 return packet;
aoqi@0 1292 }
aoqi@0 1293
aoqi@0 1294 /**
aoqi@0 1295 * Returns completion callback associated with this Fiber
aoqi@0 1296 * @return Completion callback
aoqi@0 1297 * @since 2.2.6
aoqi@0 1298 */
aoqi@0 1299 public CompletionCallback getCompletionCallback() {
aoqi@0 1300 return completionCallback;
aoqi@0 1301 }
aoqi@0 1302
aoqi@0 1303 /**
aoqi@0 1304 * Updates completion callback associated with this Fiber
aoqi@0 1305 * @param completionCallback Completion callback
aoqi@0 1306 * @since 2.2.6
aoqi@0 1307 */
aoqi@0 1308 public void setCompletionCallback(CompletionCallback completionCallback) {
aoqi@0 1309 this.completionCallback = completionCallback;
aoqi@0 1310 }
aoqi@0 1311
aoqi@0 1312 /**
aoqi@0 1313 * (ADVANCED) Returns true if the current fiber is being executed synchronously.
aoqi@0 1314 * <p/>
aoqi@0 1315 * <p/>
aoqi@0 1316 * Fiber may run synchronously for various reasons. Perhaps this is
aoqi@0 1317 * on client side and application has invoked a synchronous method call.
aoqi@0 1318 * Perhaps this is on server side and we have deployed on a synchronous
aoqi@0 1319 * transport (like servlet.)
aoqi@0 1320 * <p/>
aoqi@0 1321 * <p/>
aoqi@0 1322 * When a fiber is run synchronously (IOW by {@link #runSync(Tube, Packet)}),
aoqi@0 1323 * further invocations to {@link #runSync(Tube, Packet)} can be done
aoqi@0 1324 * without degrading the performance.
aoqi@0 1325 * <p/>
aoqi@0 1326 * <p/>
aoqi@0 1327 * So this value can be used as a further optimization hint for
aoqi@0 1328 * advanced {@link Tube}s to choose the best strategy to invoke
aoqi@0 1329 * the next {@link Tube}. For example, a tube may want to install
aoqi@0 1330 * a {@link FiberContextSwitchInterceptor} if running async, yet
aoqi@0 1331 * it might find it faster to do {@link #runSync(Tube, Packet)}
aoqi@0 1332 * if it's already running synchronously.
aoqi@0 1333 */
aoqi@0 1334 public static boolean isSynchronous() {
aoqi@0 1335 return current().synchronous;
aoqi@0 1336 }
aoqi@0 1337
aoqi@0 1338 /**
aoqi@0 1339 * Returns true if the current Fiber on the current thread was started
aoqi@0 1340 * synchronously. Note, this is not strictly the same as being synchronous
aoqi@0 1341 * because the assumption is that the Fiber will ultimately be dispatched
aoqi@0 1342 * asynchronously, possibly have a completion callback associated with it, etc.
aoqi@0 1343 * Note, the 'startedSync' flag is cleared once the current Fiber is
aoqi@0 1344 * converted to running asynchronously.
aoqi@0 1345 * @since 2.2.6
aoqi@0 1346 */
aoqi@0 1347 public boolean isStartedSync() {
aoqi@0 1348 return startedSync;
aoqi@0 1349 }
aoqi@0 1350
aoqi@0 1351 /**
aoqi@0 1352 * Gets the current fiber that's running.
aoqi@0 1353 * <p/>
aoqi@0 1354 * <p/>
aoqi@0 1355 * This works like {@link Thread#currentThread()}.
aoqi@0 1356 * This method only works when invoked from {@link Tube}.
aoqi@0 1357 */
aoqi@0 1358 public static
aoqi@0 1359 @NotNull
aoqi@0 1360 @SuppressWarnings({"null", "ConstantConditions"})
aoqi@0 1361 Fiber current() {
aoqi@0 1362 Fiber fiber = CURRENT_FIBER.get();
aoqi@0 1363 if (fiber == null)
aoqi@0 1364 throw new IllegalStateException("Can be only used from fibers");
aoqi@0 1365 return fiber;
aoqi@0 1366 }
aoqi@0 1367
aoqi@0 1368 /**
aoqi@0 1369 * Gets the current fiber that's running, if set.
aoqi@0 1370 */
aoqi@0 1371 public static Fiber getCurrentIfSet() {
aoqi@0 1372 return CURRENT_FIBER.get();
aoqi@0 1373 }
aoqi@0 1374
aoqi@0 1375 private static final ThreadLocal<Fiber> CURRENT_FIBER = new ThreadLocal<Fiber>();
aoqi@0 1376
aoqi@0 1377 /**
aoqi@0 1378 * Used to allocate unique number for each fiber.
aoqi@0 1379 */
aoqi@0 1380 private static final AtomicInteger iotaGen = new AtomicInteger();
aoqi@0 1381
aoqi@0 1382 private static boolean isTraceEnabled() {
aoqi@0 1383 return LOGGER.isLoggable(Level.FINE);
aoqi@0 1384 }
aoqi@0 1385
aoqi@0 1386 private static final Logger LOGGER = Logger.getLogger(Fiber.class.getName());
aoqi@0 1387
aoqi@0 1388
aoqi@0 1389 private static final ReentrantLock serializedExecutionLock = new ReentrantLock();
aoqi@0 1390
aoqi@0 1391 /**
aoqi@0 1392 * Set this boolean to true to execute fibers sequentially one by one.
aoqi@0 1393 * See class javadoc.
aoqi@0 1394 */
aoqi@0 1395 public static volatile boolean serializeExecution = Boolean.getBoolean(Fiber.class.getName() + ".serialize");
aoqi@0 1396
aoqi@0 1397 private final Set<Component> components = new CopyOnWriteArraySet<Component>();
aoqi@0 1398
aoqi@0 1399 @Override
aoqi@0 1400 public <S> S getSPI(Class<S> spiType) {
aoqi@0 1401 for (Component c : components) {
aoqi@0 1402 S spi = c.getSPI(spiType);
aoqi@0 1403 if (spi != null) {
aoqi@0 1404 return spi;
aoqi@0 1405 }
aoqi@0 1406 }
aoqi@0 1407 return null;
aoqi@0 1408 }
aoqi@0 1409
aoqi@0 1410 @Override
aoqi@0 1411 public Set<Component> getComponents() {
aoqi@0 1412 return components;
aoqi@0 1413 }
aoqi@0 1414 }

mercurial