src/share/jaxws_classes/com/sun/xml/internal/ws/api/pipe/Fiber.java

Thu, 31 Aug 2017 15:18:52 +0800

author
aoqi
date
Thu, 31 Aug 2017 15:18:52 +0800
changeset 637
9c07ef4934dd
parent 384
8f2986ff0235
parent 0
373ffda63c9a
permissions
-rw-r--r--

merge

     1 /*
     2  * Copyright (c) 1997, 2013, Oracle and/or its affiliates. All rights reserved.
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
     4  *
     5  * This code is free software; you can redistribute it and/or modify it
     6  * under the terms of the GNU General Public License version 2 only, as
     7  * published by the Free Software Foundation.  Oracle designates this
     8  * particular file as subject to the "Classpath" exception as provided
     9  * by Oracle in the LICENSE file that accompanied this code.
    10  *
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
    14  * version 2 for more details (a copy is included in the LICENSE file that
    15  * accompanied this code).
    16  *
    17  * You should have received a copy of the GNU General Public License version
    18  * 2 along with this work; if not, write to the Free Software Foundation,
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
    20  *
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
    22  * or visit www.oracle.com if you need additional information or have any
    23  * questions.
    24  */
    26 package com.sun.xml.internal.ws.api.pipe;
    28 import com.sun.istack.internal.NotNull;
    29 import com.sun.istack.internal.Nullable;
    30 import com.sun.xml.internal.ws.api.Cancelable;
    31 import com.sun.xml.internal.ws.api.Component;
    32 import com.sun.xml.internal.ws.api.ComponentRegistry;
    33 import com.sun.xml.internal.ws.api.SOAPVersion;
    34 import com.sun.xml.internal.ws.api.addressing.AddressingVersion;
    35 import com.sun.xml.internal.ws.api.message.AddressingUtils;
    36 import com.sun.xml.internal.ws.api.message.Packet;
    37 import com.sun.xml.internal.ws.api.pipe.helper.AbstractFilterTubeImpl;
    38 import com.sun.xml.internal.ws.api.pipe.helper.AbstractTubeImpl;
    39 import com.sun.xml.internal.ws.api.server.Adapter;
    40 import com.sun.xml.internal.ws.api.server.Container;
    41 import com.sun.xml.internal.ws.api.server.ContainerResolver;
    43 import java.util.ArrayList;
    44 import java.util.List;
    45 import java.util.Set;
    46 import java.util.concurrent.CopyOnWriteArraySet;
    47 import java.util.concurrent.atomic.AtomicInteger;
    48 import java.util.concurrent.locks.Condition;
    49 import java.util.concurrent.locks.ReentrantLock;
    50 import java.util.logging.Level;
    51 import java.util.logging.Logger;
    53 import javax.xml.ws.Holder;
    54 import javax.xml.ws.WebServiceException;
    56 /**
    57  * User-level thread. Represents the execution of one request/response processing.
    58  * <p/>
    59  * <p/>
    60  * JAX-WS RI is capable of running a large number of request/response concurrently by
    61  * using a relatively small number of threads. This is made possible by utilizing
    62  * a {@link Fiber} &mdash; a user-level thread that gets created for each request/response
    63  * processing.
    64  * <p/>
    65  * <p/>
    66  * A fiber remembers where in the pipeline the processing is at, what needs to be
    67  * executed on the way out (when processing response), and other additional information
    68  * specific to the execution of a particular request/response.
    69  * <p/>
    70  * <h2>Suspend/Resume</h2>
    71  * <p/>
    72  * Fiber can be {@link NextAction#suspend() suspended} by a {@link Tube}.
    73  * When a fiber is suspended, it will be kept on the side until it is
    74  * {@link #resume(Packet) resumed}. This allows threads to go execute
    75  * other runnable fibers, allowing efficient utilization of smaller number of
    76  * threads.
    77  * <p/>
    78  * <h2>Context-switch Interception</h2>
    79  * <p/>
    80  * {@link FiberContextSwitchInterceptor} allows {@link Tube}s and {@link Adapter}s
    81  * to perform additional processing every time a thread starts running a fiber
    82  * and stops running it.
    83  * <p/>
    84  * <h2>Context ClassLoader</h2>
    85  * <p/>
    86  * Just like thread, a fiber has a context class loader (CCL.) A fiber's CCL
    87  * becomes the thread's CCL when it's executing the fiber. The original CCL
    88  * of the thread will be restored when the thread leaves the fiber execution.
    89  * <p/>
    90  * <p/>
    91  * <h2>Debugging Aid</h2>
    92  * <p/>
    93  * Because {@link Fiber} doesn't keep much in the call stack, and instead use
    94  * {@link #conts} to store the continuation, debugging fiber related activities
    95  * could be harder.
    96  * <p/>
    97  * <p/>
    98  * Setting the {@link #LOGGER} for FINE would give you basic start/stop/resume/suspend
    99  * level logging. Using FINER would cause more detailed logging, which includes
   100  * what tubes are executed in what order and how they behaved.
   101  * <p/>
   102  * <p/>
   103  * When you debug the server side, consider setting {@link Fiber#serializeExecution}
   104  * to true, so that execution of fibers are serialized. Debugging a server
   105  * with more than one running threads is very tricky, and this switch will
   106  * prevent that. This can be also enabled by setting the system property on.
   107  * See the source code.
   108  *
   109  * @author Kohsuke Kawaguchi
   110  * @author Jitendra Kotamraju
   111  */
   112 public final class Fiber implements Runnable, Cancelable, ComponentRegistry {
   114         /**
   115          * Callback interface for notification of suspend and resume.
   116          *
   117          * @since 2.2.6
   118          * @deprecated Use {@link NextAction#suspend(Runnable)}
   119          */
   120     public interface Listener {
   121         /**
   122          * Fiber has been suspended.  Implementations of this callback may resume the Fiber.
   123          * @param fiber Fiber
   124          */
   125         public void fiberSuspended(Fiber fiber);
   127         /**
   128          * Fiber has been resumed.  Behavior is undefined if implementations of this callback attempt to suspend the Fiber.
   129          * @param fiber Fiber
   130          */
   131         public void fiberResumed(Fiber fiber);
   132     }
   134     private final List<Listener> _listeners = new ArrayList<Listener>();
   136     /**
   137      * Adds suspend/resume callback listener
   138      * @param listener Listener
   139      * @since 2.2.6
   140      * @deprecated
   141      */
   142     public void addListener(Listener listener) {
   143         synchronized(_listeners) {
   144             if (!_listeners.contains(listener)) {
   145                 _listeners.add(listener);
   146             }
   147         }
   148     }
   150     /**
   151      * Removes suspend/resume callback listener
   152      * @param listener Listener
   153      * @since 2.2.6
   154      * @deprecated
   155      */
   156     public void removeListener(Listener listener) {
   157         synchronized(_listeners) {
   158             _listeners.remove(listener);
   159         }
   160     }
   162     List<Listener> getCurrentListeners() {
   163       synchronized(_listeners) {
   164          return new ArrayList<Listener>(_listeners);
   165       }
   166     }
   168     private void clearListeners() {
   169         synchronized(_listeners) {
   170             _listeners.clear();
   171         }
   172     }
   174     /**
   175      * {@link Tube}s whose {@link Tube#processResponse(Packet)} method needs
   176      * to be invoked on the way back.
   177      */
   178     private Tube[] conts = new Tube[16];
   179     private int contsSize;
   181     /**
   182      * If this field is non-null, the next instruction to execute is
   183      * to call its {@link Tube#processRequest(Packet)}. Otherwise
   184      * the instruction is to call {@link #conts}.
   185      */
   186     private Tube next;
   188     private Packet packet;
   190     private Throwable/*but really it's either RuntimeException or Error*/ throwable;
   192     public final Engine owner;
   194     /**
   195      * Is this thread suspended? 0=not suspended, 1=suspended.
   196      * <p/>
   197      * <p/>
   198      * Logically this is just a boolean, but we need to prepare for the case
   199      * where the thread is {@link #resume(Packet) resumed} before we get to the {@link #suspend()}.
   200      * This happens when things happen in the following order:
   201      * <p/>
   202      * <ol>
   203      * <li>Tube decides that the fiber needs to be suspended to wait for the external event.
   204      * <li>Tube hooks up fiber with some external mechanism (like NIO channel selector)
   205      * <li>Tube returns with {@link NextAction#suspend()}.
   206      * <li>"External mechanism" becomes signal state and invokes {@link Fiber#resume(Packet)}
   207      * to wake up fiber
   208      * <li>{@link Fiber#doRun} invokes {@link Fiber#suspend()}.
   209      * </ol>
   210      * <p/>
   211      * <p/>
   212      * Using int, this will work OK because {@link #suspendedCount} becomes -1 when
   213      * {@link #resume(Packet)} occurs before {@link #suspend()}.
   214      * <p/>
   215      * <p/>
   216      * Increment and decrement is guarded by 'this' object.
   217      */
   218     private volatile int suspendedCount = 0;
   220     private volatile boolean isInsideSuspendCallbacks = false;
   222     /**
   223      * Is this {@link Fiber} currently running in the synchronous mode?
   224      */
   225     private boolean synchronous;
   227     private boolean interrupted;
   229     private final int id;
   231     /**
   232      * Active {@link FiberContextSwitchInterceptor}s for this fiber.
   233      */
   234     private List<FiberContextSwitchInterceptor> interceptors;
   236     /**
   237      * Fiber's context {@link ClassLoader}.
   238      */
   239     private
   240     @Nullable
   241     ClassLoader contextClassLoader;
   243     private
   244     @Nullable
   245     CompletionCallback completionCallback;
   247     private boolean isDeliverThrowableInPacket = false;
   249     public void setDeliverThrowableInPacket(boolean isDeliverThrowableInPacket) {
   250         this.isDeliverThrowableInPacket = isDeliverThrowableInPacket;
   251     }
   253     /**
   254      * The thread on which this Fiber is currently executing, if applicable.
   255      */
   256     private Thread currentThread;
   258     /**
   259      * Replace uses of synchronized(this) with this lock so that we can control
   260      * unlocking for resume use cases
   261      */
   262     private final ReentrantLock lock = new ReentrantLock();
   263     private final Condition condition = lock.newCondition();
   265     private volatile boolean isCanceled;
   267     /**
   268      * Set to true if this fiber is started asynchronously, to avoid
   269      * doubly-invoking completion code.
   270      */
   271     private boolean started;
   273     /**
   274      * Set to true if this fiber is started sync but allowed to run async.
   275      * This property exists for use cases where the processing model is fundamentally async
   276      * but some requirement or feature mandates that part of the tubeline run synchronously.  For
   277      * instance, WS-ReliableMessaging with non-anonymous addressing is compatible with running
   278      * asynchronously, but if in-order message delivery is used then message processing must assign
   279      * a message number before the remainder of the processing can be asynchronous.
   280      */
   281     private boolean startedSync;
   283     /**
   284      * Callback to be invoked when a {@link Fiber} finishes execution.
   285      */
   286     public interface CompletionCallback {
   287         /**
   288          * Indicates that the fiber has finished its execution.
   289          * <p/>
   290          * <p/>
   291          * Since the JAX-WS RI runs asynchronously,
   292          * this method maybe invoked by a different thread
   293          * than any of the threads that started it or run a part of tubeline.
   294          */
   295         void onCompletion(@NotNull Packet response);
   297         /**
   298          * Indicates that the fiber has finished abnormally, by throwing a given {@link Throwable}.
   299          */
   300         void onCompletion(@NotNull Throwable error);
   301     }
   303     Fiber(Engine engine) {
   304         this.owner = engine;
   305         id = iotaGen.incrementAndGet();
   306         if (isTraceEnabled()) {
   307             LOGGER.log(Level.FINE, "{0} created", getName());
   308         }
   310         // if this is run from another fiber, then we naturally inherit its context classloader,
   311         // so this code works for fiber->fiber inheritance just fine.
   312         contextClassLoader = Thread.currentThread().getContextClassLoader();
   313     }
   315     /**
   316      * Starts the execution of this fiber asynchronously.
   317      * <p/>
   318      * <p/>
   319      * This method works like {@link Thread#start()}.
   320      *
   321      * @param tubeline           The first tube of the tubeline that will act on the packet.
   322      * @param request            The request packet to be passed to <tt>startPoint.processRequest()</tt>.
   323      * @param completionCallback The callback to be invoked when the processing is finished and the
   324      *                           final response packet is available.
   325      * @see #runSync(Tube, Packet)
   326      */
   327     public void start(@NotNull Tube tubeline, @NotNull Packet request, @Nullable CompletionCallback completionCallback) {
   328         start(tubeline, request, completionCallback, false);
   329     }
   331     private void dumpFiberContext(String desc) {
   332         if(isTraceEnabled()) {
   333             String action = null;
   334             String msgId = null;
   335             if (packet != null) {
   336                 for (SOAPVersion sv: SOAPVersion.values()) {
   337                     for (AddressingVersion av: AddressingVersion.values()) {
   338                         action = packet.getMessage() != null ? AddressingUtils.getAction(packet.getMessage().getHeaders(), av, sv) : null;
   339                         msgId = packet.getMessage() != null ? AddressingUtils.getMessageID(packet.getMessage().getHeaders(), av, sv) : null;
   340                         if (action != null || msgId != null) {
   341                            break;
   342                         }
   343                     }
   344                     if (action != null || msgId != null) {
   345                         break;
   346                     }
   347                 }
   348             }
   349             String actionAndMsgDesc;
   350             if (action == null && msgId == null) {
   351                 actionAndMsgDesc = "NO ACTION or MSG ID";
   352             } else {
   353                 actionAndMsgDesc = "'" + action + "' and msgId '" + msgId + "'";
   354             }
   356             String tubeDesc;
   357             if (next != null) {
   358                 tubeDesc = next.toString() + ".processRequest()";
   359             } else {
   360                 tubeDesc = peekCont() + ".processResponse()";
   361             }
   363             LOGGER.log(Level.FINE, "{0} {1} with {2} and ''current'' tube {3} from thread {4} with Packet: {5}", new Object[]{getName(), desc, actionAndMsgDesc, tubeDesc, Thread.currentThread().getName(), packet != null ? packet.toShortString() : null});
   364         }
   365     }
   367     /**
   368      * Starts the execution of this fiber.
   369      *
   370      * If forceSync is true, then the fiber is started for an ostensibly async invocation,
   371      * but allows for some portion of the tubeline to run sync with the calling
   372      * client instance (Port/Dispatch instance). This allows tubes that enforce
   373      * ordering to see requests in the order they were sent at the point the
   374      * client invoked them.
   375      * <p>
   376      * The forceSync parameter will be true only when the caller (e.g. AsyncInvoker or
   377      * SEIStub) knows one or more tubes need to enforce ordering and thus need
   378      * to run sync with the client. Such tubes can return
   379      * NextAction.INVOKE_ASYNC to indicate that the next tube in the tubeline
   380      * should be invoked async to the current thread.
   381      *
   382      * <p>
   383      * This method works like {@link Thread#start()}.
   384      *
   385      * @param tubeline
   386      *      The first tube of the tubeline that will act on the packet.
   387      * @param request
   388      *      The request packet to be passed to <tt>startPoint.processRequest()</tt>.
   389      * @param completionCallback
   390      *      The callback to be invoked when the processing is finished and the
   391      *      final response packet is available.
   392      *
   393      * @see #start(Tube,Packet,CompletionCallback)
   394      * @see #runSync(Tube,Packet)
   395      * @since 2.2.6
   396      */
   397     public void start(@NotNull Tube tubeline, @NotNull Packet request, @Nullable CompletionCallback completionCallback, boolean forceSync) {
   398         next = tubeline;
   399         this.packet = request;
   400         this.completionCallback = completionCallback;
   402         if (forceSync) {
   403                 this.startedSync = true;
   404                 dumpFiberContext("starting (sync)");
   405                 run();
   406         } else {
   407                 this.started = true;
   408                 dumpFiberContext("starting (async)");
   409                 owner.addRunnable(this);
   410         }
   411     }
   413     /**
   414      * Wakes up a suspended fiber.
   415      * <p/>
   416      * <p/>
   417      * If a fiber was suspended without specifying the next {@link Tube},
   418      * then the execution will be resumed in the response processing direction,
   419      * by calling the {@link Tube#processResponse(Packet)} method on the next/first
   420      * {@link Tube} in the {@link Fiber}'s processing stack with the specified resume
   421      * packet as the parameter.
   422      * <p/>
   423      * <p/>
   424      * If a fiber was suspended with specifying the next {@link Tube},
   425      * then the execution will be resumed in the request processing direction,
   426      * by calling the next tube's {@link Tube#processRequest(Packet)} method with the
   427      * specified resume packet as the parameter.
   428      * <p/>
   429      * <p/>
   430      * This method is implemented in a race-free way. Another thread can invoke
   431      * this method even before this fiber goes into the suspension mode. So the caller
   432      * need not worry about synchronizing {@link NextAction#suspend()} and this method.
   433      *
   434      * @param resumePacket packet used in the resumed processing
   435      */
   436     public void resume(@NotNull Packet resumePacket) {
   437         resume(resumePacket, false);
   438     }
   440     /**
   441      * Similar to resume(Packet) but allowing the Fiber to be resumed
   442      * synchronously (in the current Thread). If you want to know when the
   443      * fiber completes (not when this method returns) then add/wrap a
   444      * CompletionCallback on this Fiber.
   445      * For example, an asynchronous response endpoint that supports WS-ReliableMessaging
   446      * including in-order message delivery may need to resume the Fiber synchronously
   447      * until message order is confirmed prior to returning to asynchronous processing.
   448      * @since 2.2.6
   449      */
   450     public void resume(@NotNull Packet resumePacket,
   451     boolean forceSync) {
   452         resume(resumePacket, forceSync, null);
   453     }
   455     /**
   456      * Similar to resume(Packet, boolean) but allowing the Fiber to be resumed
   457      * and at the same time atomically assign a new CompletionCallback to it.
   458      * @since 2.2.6
   459      */
   460     public void resume(@NotNull Packet resumePacket,
   461                        boolean forceSync,
   462                        CompletionCallback callback) {
   463        lock.lock();
   464        try {
   465            if (callback != null) {
   466              setCompletionCallback(callback);
   467            }
   468            if(isTraceEnabled())
   469                 LOGGER.log(Level.FINE, "{0} resuming. Will have suspendedCount={1}", new Object[]{getName(), suspendedCount-1});
   470                 packet = resumePacket;
   471                 if( --suspendedCount == 0 ) {
   472                    if (!isInsideSuspendCallbacks) {
   473                         List<Listener> listeners = getCurrentListeners();
   474                         for (Listener listener: listeners) {
   475                             try {
   476                                 listener.fiberResumed(this);
   477                             } catch (Throwable e) {
   478                                 if (isTraceEnabled())
   479                                         LOGGER.log(Level.FINE, "Listener {0} threw exception: {1}", new Object[]{listener, e.getMessage()});
   480                             }
   481                         }
   483                         if(synchronous) {
   484                             condition.signalAll();
   485                         } else if (forceSync || startedSync) {
   486                             run();
   487                         } else {
   488                             dumpFiberContext("resuming (async)");
   489                             owner.addRunnable(this);
   490                         }
   491                    }
   492                 } else {
   493                     if (isTraceEnabled()) {
   494                         LOGGER.log(Level.FINE, "{0} taking no action on resume because suspendedCount != 0: {1}", new Object[]{getName(), suspendedCount});
   495                 }
   496             }
   497        } finally {
   498            lock.unlock();
   499        }
   500     }
   502     /**
   503      * Wakes up a suspended fiber and begins response processing.
   504      * @since 2.2.6
   505      */
   506     public void resumeAndReturn(@NotNull Packet resumePacket,
   507                                 boolean forceSync) {
   508         if(isTraceEnabled())
   509             LOGGER.log(Level.FINE, "{0} resumed with Return Packet", getName());
   510         next = null;
   511         resume(resumePacket, forceSync);
   512     }
   514     /**
   515      * Wakes up a suspended fiber with an exception.
   516      * <p/>
   517      * <p/>
   518      * The execution of the suspended fiber will be resumed in the response
   519      * processing direction, by calling the {@link Tube#processException(Throwable)} method
   520      * on the next/first {@link Tube} in the {@link Fiber}'s processing stack with
   521      * the specified exception as the parameter.
   522      * <p/>
   523      * <p/>
   524      * This method is implemented in a race-free way. Another thread can invoke
   525      * this method even before this fiber goes into the suspension mode. So the caller
   526      * need not worry about synchronizing {@link NextAction#suspend()} and this method.
   527      *
   528      * @param throwable exception that is used in the resumed processing
   529      */
   530     public void resume(@NotNull Throwable throwable) {
   531         resume(throwable, packet, false);
   532     }
   534     /**
   535      * Wakes up a suspended fiber with an exception.
   536      * <p/>
   537      * <p/>
   538      * The execution of the suspended fiber will be resumed in the response
   539      * processing direction, by calling the {@link Tube#processException(Throwable)} method
   540      * on the next/first {@link Tube} in the {@link Fiber}'s processing stack with
   541      * the specified exception as the parameter.
   542      * <p/>
   543      * <p/>
   544      * This method is implemented in a race-free way. Another thread can invoke
   545      * this method even before this fiber goes into the suspension mode. So the caller
   546      * need not worry about synchronizing {@link NextAction#suspend()} and this method.
   547      *
   548      * @param throwable exception that is used in the resumed processing
   549      * @param packet Packet that will be visible on the Fiber after the resume
   550      * @since 2.2.8
   551      */
   552     public void resume(@NotNull Throwable throwable, @NotNull Packet packet) {
   553         resume(throwable, packet, false);
   554     }
   556     /**
   557      * Wakes up a suspend fiber with an exception.
   558      *
   559      * If forceSync is true, then the suspended fiber will resume with
   560      * synchronous processing on the current thread.  This will continue
   561      * until some Tube indicates that it is safe to switch to asynchronous
   562      * processing.
   563      *
   564      * @param error exception that is used in the resumed processing
   565      * @param forceSync if processing begins synchronously
   566      * @since 2.2.6
   567      */
   568     public void resume(@NotNull Throwable error,
   569                        boolean forceSync) {
   570         resume(error, packet, forceSync);
   571     }
   573     /**
   574      * Wakes up a suspend fiber with an exception.
   575      *
   576      * If forceSync is true, then the suspended fiber will resume with
   577      * synchronous processing on the current thread.  This will continue
   578      * until some Tube indicates that it is safe to switch to asynchronous
   579      * processing.
   580      *
   581      * @param error exception that is used in the resumed processing
   582      * @param packet Packet that will be visible on the Fiber after the resume
   583      * @param forceSync if processing begins synchronously
   584      * @since 2.2.8
   585      */
   586     public void resume(@NotNull Throwable error,
   587                        @NotNull Packet packet,
   588                        boolean forceSync) {
   589         if(isTraceEnabled())
   590             LOGGER.log(Level.FINE, "{0} resumed with Return Throwable", getName());
   591         next = null;
   592         throwable = error;
   593         resume(packet, forceSync);
   594     }
   596     /**
   597      * Marks this Fiber as cancelled.  A cancelled Fiber will never invoke its completion callback
   598      * @param mayInterrupt if cancel should use {@link Thread#interrupt()}
   599      * @see java.util.concurrent.Future#cancel(boolean)
   600      * @since 2.2.6
   601      */
   602     @Override
   603     public void cancel(boolean mayInterrupt) {
   604         isCanceled = true;
   605         if (mayInterrupt) {
   606             // synchronized(this) is used as Thread running Fiber will be holding lock
   607             synchronized(this) {
   608                 if (currentThread != null)
   609                     currentThread.interrupt();
   610             }
   611         }
   612     }
   614     /**
   615      * Suspends this fiber's execution until the resume method is invoked.
   616      * <p/>
   617      * The call returns immediately, and when the fiber is resumed
   618      * the execution picks up from the last scheduled continuation.
   619      * @param onExitRunnable runnable to be invoked after fiber is marked for suspension
   620      * @return if control loop must exit
   621      */
   622     private boolean suspend(Holder<Boolean> isRequireUnlock, Runnable onExitRunnable) {
   623         if(isTraceEnabled()) {
   624             LOGGER.log(Level.FINE, "{0} suspending. Will have suspendedCount={1}", new Object[]{getName(), suspendedCount+1});
   625             if (suspendedCount > 0) {
   626                 LOGGER.log(Level.FINE, "WARNING - {0} suspended more than resumed. Will require more than one resume to actually resume this fiber.", getName());
   627             }
   628         }
   630         List<Listener> listeners = getCurrentListeners();
   631         if (++suspendedCount == 1) {
   632             isInsideSuspendCallbacks = true;
   633             try {
   634                 for (Listener listener: listeners) {
   635                     try {
   636                         listener.fiberSuspended(this);
   637                     } catch (Throwable e) {
   638                         if(isTraceEnabled())
   639                             LOGGER.log(Level.FINE, "Listener {0} threw exception: {1}", new Object[]{listener, e.getMessage()});
   640                     }
   641                 }
   642             } finally {
   643                 isInsideSuspendCallbacks = false;
   644             }
   645         }
   647         if (suspendedCount <= 0) {
   648             // suspend callback caused fiber to resume
   649             for (Listener listener: listeners) {
   650                 try {
   651                     listener.fiberResumed(this);
   652                 } catch (Throwable e) {
   653                         if(isTraceEnabled())
   654                                 LOGGER.log(Level.FINE, "Listener {0} threw exception: {1}", new Object[]{listener, e.getMessage()});
   655                 }
   656             }
   658         } else if (onExitRunnable != null) {
   659             // synchronous use cases cannot disconnect from the current thread
   660             if (!synchronous) {
   661                 /* INTENTIONALLY UNLOCKING EARLY */
   662                 synchronized(this) {
   663                     // currentThread is protected by the monitor for this fiber so
   664                     // that it is accessible to cancel() even when the lock is held
   665                     currentThread = null;
   666                 }
   667                 lock.unlock();
   668                 assert(!lock.isHeldByCurrentThread());
   669                 isRequireUnlock.value = Boolean.FALSE;
   671                 try {
   672                     onExitRunnable.run();
   673                 } catch(Throwable t) {
   674                     throw new OnExitRunnableException(t);
   675                 }
   677                 return true;
   679             } else {
   680                 // for synchronous we will stay with current thread, so do not disconnect
   681                 if (isTraceEnabled())
   682                     LOGGER.fine("onExitRunnable used with synchronous Fiber execution -- not exiting current thread");
   683                 onExitRunnable.run();
   684             }
   685         }
   687         return false;
   688     }
   690     private static final class OnExitRunnableException extends RuntimeException {
   691         private static final long serialVersionUID = 1L;
   693         Throwable target;
   695         public OnExitRunnableException(Throwable target) {
   696             super((Throwable)null); // see pattern for InvocationTargetException
   697             this.target = target;
   698         }
   699     }
   701     /**
   702      * Adds a new {@link FiberContextSwitchInterceptor} to this fiber.
   703      * <p/>
   704      * <p/>
   705      * The newly installed fiber will take effect immediately after the current
   706      * tube returns from its {@link Tube#processRequest(Packet)} or
   707      * {@link Tube#processResponse(Packet)}, before the next tube begins processing.
   708      * <p/>
   709      * <p/>
   710      * So when the tubeline consists of X and Y, and when X installs an interceptor,
   711      * the order of execution will be as follows:
   712      * <p/>
   713      * <ol>
   714      * <li>X.processRequest()
   715      * <li>interceptor gets installed
   716      * <li>interceptor.execute() is invoked
   717      * <li>Y.processRequest()
   718      * </ol>
   719      */
   720     public synchronized void addInterceptor(@NotNull FiberContextSwitchInterceptor interceptor) {
   721         if (interceptors == null) {
   722             interceptors = new ArrayList<FiberContextSwitchInterceptor>();
   723         } else {
   724             List<FiberContextSwitchInterceptor> l = new ArrayList<FiberContextSwitchInterceptor>();
   725             l.addAll(interceptors);
   726             interceptors = l;
   727         }
   728         interceptors.add(interceptor);
   729     }
   731     /**
   732      * Removes a {@link FiberContextSwitchInterceptor} from this fiber.
   733      * <p/>
   734      * <p/>
   735      * The removal of the interceptor takes effect immediately after the current
   736      * tube returns from its {@link Tube#processRequest(Packet)} or
   737      * {@link Tube#processResponse(Packet)}, before the next tube begins processing.
   738      * <p/>
   739      * <p/>
   740      * <p/>
   741      * So when the tubeline consists of X and Y, and when Y uninstalls an interceptor
   742      * on the way out, then the order of execution will be as follows:
   743      * <p/>
   744      * <ol>
   745      * <li>Y.processResponse() (notice that this happens with interceptor.execute() in the callstack)
   746      * <li>interceptor gets uninstalled
   747      * <li>interceptor.execute() returns
   748      * <li>X.processResponse()
   749      * </ol>
   750      *
   751      * @return true if the specified interceptor was removed. False if
   752      *         the specified interceptor was not registered with this fiber to begin with.
   753      */
   754     public synchronized boolean removeInterceptor(@NotNull FiberContextSwitchInterceptor interceptor) {
   755         if (interceptors != null) {
   756             boolean result = interceptors.remove(interceptor);
   757             if (interceptors.isEmpty())
   758                 interceptors = null;
   759             else {
   760                 List<FiberContextSwitchInterceptor> l = new ArrayList<FiberContextSwitchInterceptor>();
   761                 l.addAll(interceptors);
   762                 interceptors = l;
   763             }
   764             return result;
   765         }
   766         return false;
   767     }
   769     /**
   770      * Gets the context {@link ClassLoader} of this fiber.
   771      */
   772     public
   773     @Nullable
   774     ClassLoader getContextClassLoader() {
   775         return contextClassLoader;
   776     }
   778     /**
   779      * Sets the context {@link ClassLoader} of this fiber.
   780      */
   781     public ClassLoader setContextClassLoader(@Nullable ClassLoader contextClassLoader) {
   782         ClassLoader r = this.contextClassLoader;
   783         this.contextClassLoader = contextClassLoader;
   784         return r;
   785     }
   787     /**
   788      * DO NOT CALL THIS METHOD. This is an implementation detail
   789      * of {@link Fiber}.
   790      */
   791     @Deprecated
   792     @Override
   793     public void run() {
   794         Container old = ContainerResolver.getDefault().enterContainer(owner.getContainer());
   795         try {
   796             assert !synchronous;
   797             // doRun returns true to indicate an early exit from fiber processing
   798             if (!doRun()) {
   799                 if (startedSync && suspendedCount == 0 &&
   800                     (next != null || contsSize > 0)) {
   801                     // We bailed out of running this fiber we started as sync, and now
   802                     // want to finish running it async
   803                     startedSync = false;
   804                     // Start back up as an async fiber
   805                     dumpFiberContext("restarting (async) after startSync");
   806                     owner.addRunnable(this);
   807                 } else {
   808                     completionCheck();
   809                 }
   810             }
   811         } finally {
   812             ContainerResolver.getDefault().exitContainer(old);
   813         }
   814     }
   816     /**
   817      * Runs a given {@link Tube} (and everything thereafter) synchronously.
   818      * <p/>
   819      * <p/>
   820      * This method blocks and returns only when all the successive {@link Tube}s
   821      * complete their request/response processing. This method can be used
   822      * if a {@link Tube} needs to fallback to synchronous processing.
   823      * <p/>
   824      * <h3>Example:</h3>
   825      * <pre>
   826      * class FooTube extends {@link AbstractFilterTubeImpl} {
   827      *   NextAction processRequest(Packet request) {
   828      *     // run everything synchronously and return with the response packet
   829      *     return doReturnWith(Fiber.current().runSync(next,request));
   830      *   }
   831      *   NextAction processResponse(Packet response) {
   832      *     // never be invoked
   833      *   }
   834      * }
   835      * </pre>
   836      *
   837      * @param tubeline The first tube of the tubeline that will act on the packet.
   838      * @param request  The request packet to be passed to <tt>startPoint.processRequest()</tt>.
   839      * @return The response packet to the <tt>request</tt>.
   840      * @see #start(Tube, Packet, CompletionCallback)
   841      */
   842     public
   843     @NotNull
   844     Packet runSync(@NotNull Tube tubeline, @NotNull Packet request) {
   845         lock.lock();
   846         try {
   847             // save the current continuation, so that we return runSync() without executing them.
   848             final Tube[] oldCont = conts;
   849             final int oldContSize = contsSize;
   850             final boolean oldSynchronous = synchronous;
   851             final Tube oldNext = next;
   853             if (oldContSize > 0) {
   854                 conts = new Tube[16];
   855                 contsSize = 0;
   856             }
   858             try {
   859                 synchronous = true;
   860                 this.packet = request;
   861                 next = tubeline;
   862                 doRun();
   863                 if (throwable != null) {
   864                     if (isDeliverThrowableInPacket) {
   865                         packet.addSatellite(new ThrowableContainerPropertySet(throwable));
   866                     } else {
   867                         if (throwable instanceof RuntimeException) {
   868                             throw (RuntimeException) throwable;
   869                         }
   870                         if (throwable instanceof Error) {
   871                             throw (Error) throwable;
   872                         }
   873                         // our system is supposed to only accept Error or RuntimeException
   874                         throw new AssertionError(throwable);
   875                     }
   876                 }
   877                 return this.packet;
   878             } finally {
   879                 conts = oldCont;
   880                 contsSize = oldContSize;
   881                 synchronous = oldSynchronous;
   882                 next = oldNext;
   883                 if(interrupted) {
   884                     Thread.currentThread().interrupt();
   885                     interrupted = false;
   886                 }
   887                 if(!started && !startedSync)
   888                     completionCheck();
   889             }
   890         } finally {
   891             lock.unlock();
   892         }
   893     }
   895     private void completionCheck() {
   896         lock.lock();
   897         try {
   898             // Don't trigger completion and callbacks if fiber is suspended
   899             if(!isCanceled && contsSize==0 && suspendedCount == 0) {
   900                 if(isTraceEnabled())
   901                     LOGGER.log(Level.FINE, "{0} completed", getName());
   902                 clearListeners();
   903                 condition.signalAll();
   904                 if (completionCallback != null) {
   905                     if (throwable != null) {
   906                         if (isDeliverThrowableInPacket) {
   907                             packet.addSatellite(new ThrowableContainerPropertySet(throwable));
   908                             completionCallback.onCompletion(packet);
   909                         } else
   910                             completionCallback.onCompletion(throwable);
   911                     } else
   912                         completionCallback.onCompletion(packet);
   913                 }
   914             }
   915         } finally {
   916             lock.unlock();
   917         }
   918     }
   920     /**
   921      * Invokes all registered {@link InterceptorHandler}s and then call into
   922      * {@link Fiber#__doRun()}.
   923      */
   924     private class InterceptorHandler implements FiberContextSwitchInterceptor.Work<Tube, Tube> {
   925         private final Holder<Boolean> isUnlockRequired;
   926         private final List<FiberContextSwitchInterceptor> ints;
   928         /**
   929          * Index in {@link Fiber#interceptors} to invoke next.
   930          */
   931         private int idx;
   933         public InterceptorHandler(Holder<Boolean> isUnlockRequired, List<FiberContextSwitchInterceptor> ints) {
   934             this.isUnlockRequired = isUnlockRequired;
   935             this.ints = ints;
   936         }
   938         /**
   939          * Initiate the interception, and eventually invokes {@link Fiber#__doRun()}.
   940          */
   941         Tube invoke(Tube next) {
   942             idx = 0;
   943             return execute(next);
   944         }
   946         @Override
   947         public Tube execute(Tube next) {
   948             if (idx == ints.size()) {
   949                 Fiber.this.next = next;
   950                 if (__doRun(isUnlockRequired, ints))
   951                     return PLACEHOLDER;
   952             } else {
   953                 FiberContextSwitchInterceptor interceptor = ints.get(idx++);
   954                 return interceptor.execute(Fiber.this, next, this);
   955             }
   956             return Fiber.this.next;
   957         }
   958     }
   960     private static final PlaceholderTube PLACEHOLDER = new PlaceholderTube();
   962     private static class PlaceholderTube extends AbstractTubeImpl {
   964         @Override
   965         public NextAction processRequest(Packet request) {
   966             throw new UnsupportedOperationException();
   967         }
   969         @Override
   970         public NextAction processResponse(Packet response) {
   971             throw new UnsupportedOperationException();
   972         }
   974         @Override
   975         public NextAction processException(Throwable t) {
   976             return doThrow(t);
   977         }
   979         @Override
   980         public void preDestroy() {
   981         }
   983         @Override
   984         public PlaceholderTube copy(TubeCloner cloner) {
   985             throw new UnsupportedOperationException();
   986         }
   987     }
   989     /**
   990      * Executes the fiber as much as possible.
   991      *
   992      */
   993     private boolean doRun() {
   994         dumpFiberContext("running");
   996         if (serializeExecution) {
   997             serializedExecutionLock.lock();
   998             try {
   999                 return _doRun(next);
  1000             } finally {
  1001                 serializedExecutionLock.unlock();
  1003         } else {
  1004             return _doRun(next);
  1008     private boolean _doRun(Tube next) {
  1009         // isRequireUnlock will contain Boolean.FALSE when lock has already been released in suspend
  1010         Holder<Boolean> isRequireUnlock = new Holder<Boolean>(Boolean.TRUE);
  1011         lock.lock();
  1012         try {
  1013             List<FiberContextSwitchInterceptor> ints;
  1014             ClassLoader old;
  1015             synchronized(this) {
  1016                 ints = interceptors;
  1018                 // currentThread is protected by the monitor for this fiber so
  1019                 // that it is accessible to cancel() even when the lock is held
  1020                 currentThread = Thread.currentThread();
  1021                 if (isTraceEnabled()) {
  1022                     LOGGER.log(Level.FINE, "Thread entering _doRun(): {0}", currentThread);
  1025                 old = currentThread.getContextClassLoader();
  1026                 currentThread.setContextClassLoader(contextClassLoader);
  1029             try {
  1030                 boolean needsToReenter;
  1031                 do {
  1032                     // if interceptors are set, go through the interceptors.
  1033                     if (ints == null) {
  1034                         this.next = next;
  1035                         if (__doRun(isRequireUnlock, null /*ints*/)) {
  1036                             return true;
  1038                     } else {
  1039                         next = new InterceptorHandler(isRequireUnlock, ints).invoke(next);
  1040                         if (next == PLACEHOLDER) {
  1041                             return true;
  1045                     synchronized(this) {
  1046                         needsToReenter = (ints != interceptors);
  1047                         if (needsToReenter)
  1048                             ints = interceptors;
  1050                 } while (needsToReenter);
  1051             } catch(OnExitRunnableException o) {
  1052                 // catching this exception indicates onExitRunnable in suspend() threw.
  1053                 // we must still avoid double unlock
  1054                 Throwable t = o.target;
  1055                 if (t instanceof WebServiceException)
  1056                     throw (WebServiceException) t;
  1057                 throw new WebServiceException(t);
  1058             } finally {
  1059                 // don't reference currentThread here because fiber processing
  1060                 // may already be running on a different thread (Note: isAlreadyExited
  1061                 // tracks this state
  1062                 Thread thread = Thread.currentThread();
  1063                 thread.setContextClassLoader(old);
  1064                 if (isTraceEnabled()) {
  1065                     LOGGER.log(Level.FINE, "Thread leaving _doRun(): {0}", thread);
  1069             return false;
  1070         } finally {
  1071             if (isRequireUnlock.value) {
  1072                 synchronized(this) {
  1073                     currentThread = null;
  1075                 lock.unlock();
  1080     /**
  1081      * To be invoked from {@link #doRun()}.
  1083      * @see #doRun()
  1084      */
  1085     private boolean __doRun(Holder<Boolean> isRequireUnlock, List<FiberContextSwitchInterceptor> originalInterceptors) {
  1086         assert(lock.isHeldByCurrentThread());
  1088         final Fiber old = CURRENT_FIBER.get();
  1089         CURRENT_FIBER.set(this);
  1091         // if true, lots of debug messages to show what's being executed
  1092         final boolean traceEnabled = LOGGER.isLoggable(Level.FINER);
  1094         try {
  1095             boolean abortResponse = false;
  1096             while(isReady(originalInterceptors)) {
  1097                 if (isCanceled) {
  1098                     next = null;
  1099                     throwable = null;
  1100                     contsSize = 0;
  1101                     break;
  1104                 try {
  1105                     NextAction na;
  1106                     Tube last;
  1107                     if(throwable!=null) {
  1108                         if(contsSize==0 || abortResponse) {
  1109                             contsSize = 0; // abortResponse case
  1110                             // nothing else to execute. we are done.
  1111                             return false;
  1113                         last = popCont();
  1114                         if (traceEnabled)
  1115                             LOGGER.log(Level.FINER, "{0} {1}.processException({2})", new Object[]{getName(), last, throwable});
  1116                         na = last.processException(throwable);
  1117                     } else {
  1118                         if(next!=null) {
  1119                             if(traceEnabled)
  1120                                 LOGGER.log(Level.FINER, "{0} {1}.processRequest({2})", new Object[]{getName(), next, packet != null ? "Packet@"+Integer.toHexString(packet.hashCode()) : "null"});
  1121                             na = next.processRequest(packet);
  1122                             last = next;
  1123                         } else {
  1124                             if(contsSize==0 || abortResponse) {
  1125                                 // nothing else to execute. we are done.
  1126                                 contsSize = 0;
  1127                                 return false;
  1129                             last = popCont();
  1130                             if(traceEnabled)
  1131                                 LOGGER.log(Level.FINER, "{0} {1}.processResponse({2})", new Object[]{getName(), last, packet != null ? "Packet@"+Integer.toHexString(packet.hashCode()) : "null"});
  1132                             na = last.processResponse(packet);
  1136                     if (traceEnabled)
  1137                         LOGGER.log(Level.FINER, "{0} {1} returned with {2}", new Object[]{getName(), last, na});
  1139                     // If resume is called before suspend, then make sure
  1140                     // resume(Packet) is not lost
  1141                     if (na.kind != NextAction.SUSPEND) {
  1142                         // preserve in-flight packet so that processException may inspect
  1143                         if (na.kind != NextAction.THROW &&
  1144                           na.kind != NextAction.THROW_ABORT_RESPONSE)
  1145                                 packet = na.packet;
  1146                         throwable = na.throwable;
  1149                     switch(na.kind) {
  1150                     case NextAction.INVOKE:
  1151                     case NextAction.INVOKE_ASYNC:
  1152                         pushCont(last);
  1153                         // fall through next
  1154                     case NextAction.INVOKE_AND_FORGET:
  1155                         next = na.next;
  1156                         if (na.kind == NextAction.INVOKE_ASYNC
  1157                             && startedSync) {
  1158                           // Break out here
  1159                           return false;
  1161                         break;
  1162                     case NextAction.THROW_ABORT_RESPONSE:
  1163                     case NextAction.ABORT_RESPONSE:
  1164                         abortResponse = true;
  1165                         if (isTraceEnabled()) {
  1166                           LOGGER.log(Level.FINE, "Fiber {0} is aborting a response due to exception: {1}", new Object[]{this, na.throwable});
  1168                     case NextAction.RETURN:
  1169                     case NextAction.THROW:
  1170                         next = null;
  1171                         break;
  1172                     case NextAction.SUSPEND:
  1173                         if (next != null) {
  1174                           // Only store the 'last' tube when we're processing
  1175                           // a request, since conts array is for processResponse
  1176                           pushCont(last);
  1178                         next = na.next;
  1179                         if(suspend(isRequireUnlock, na.onExitRunnable))
  1180                             return true; // explicitly exiting control loop
  1181                         break;
  1182                     default:
  1183                         throw new AssertionError();
  1185                 } catch (RuntimeException t) {
  1186                     if (traceEnabled)
  1187                         LOGGER.log(Level.FINER, getName() + " Caught " + t + ". Start stack unwinding", t);
  1188                     throwable = t;
  1189                 } catch (Error t) {
  1190                     if (traceEnabled)
  1191                         LOGGER.log(Level.FINER, getName() + " Caught " + t + ". Start stack unwinding", t);
  1192                     throwable = t;
  1195                 dumpFiberContext("After tube execution");
  1198             // there's nothing we can execute right away.
  1199             // we'll be back when this fiber is resumed.
  1201         } finally {
  1202             CURRENT_FIBER.set(old);
  1205         return false;
  1208     private void pushCont(Tube tube) {
  1209         conts[contsSize++] = tube;
  1211         // expand if needed
  1212         int len = conts.length;
  1213         if (contsSize == len) {
  1214             Tube[] newBuf = new Tube[len * 2];
  1215             System.arraycopy(conts, 0, newBuf, 0, len);
  1216             conts = newBuf;
  1220     private Tube popCont() {
  1221         return conts[--contsSize];
  1224     private Tube peekCont() {
  1225         int index = contsSize - 1;
  1226         if (index >= 0 && index < conts.length) {
  1227           return conts[index];
  1228         } else {
  1229           return null;
  1233     /**
  1234      * Only to be used by Tubes that manipulate the Fiber to create alternate flows
  1235      * @since 2.2.6
  1236      */
  1237     public void resetCont(Tube[] conts, int contsSize) {
  1238         this.conts = conts;
  1239         this.contsSize = contsSize;
  1242     /**
  1243      * Returns true if the fiber is ready to execute.
  1244      */
  1245     private boolean isReady(List<FiberContextSwitchInterceptor> originalInterceptors) {
  1246         if (synchronous) {
  1247             while (suspendedCount == 1)
  1248                 try {
  1249                     if (isTraceEnabled()) {
  1250                         LOGGER.log(Level.FINE, "{0} is blocking thread {1}", new Object[]{getName(), Thread.currentThread().getName()});
  1252                     condition.await(); // the synchronized block is the whole runSync method.
  1253                 } catch (InterruptedException e) {
  1254                     // remember that we are interrupted, but don't respond to it
  1255                     // right away. This behavior is in line with what happens
  1256                     // when you are actually running the whole thing synchronously.
  1257                     interrupted = true;
  1260             synchronized(this) {
  1261                 return interceptors == originalInterceptors;
  1264         else {
  1265             if (suspendedCount>0)
  1266                 return false;
  1267             synchronized(this) {
  1268                 return interceptors == originalInterceptors;
  1273     private String getName() {
  1274         return "engine-" + owner.id + "fiber-" + id;
  1277     @Override
  1278     public String toString() {
  1279         return getName();
  1282     /**
  1283      * Gets the current {@link Packet} associated with this fiber.
  1284      * <p/>
  1285      * <p/>
  1286      * This method returns null if no packet has been associated with the fiber yet.
  1287      */
  1288     public
  1289     @Nullable
  1290     Packet getPacket() {
  1291         return packet;
  1294     /**
  1295      * Returns completion callback associated with this Fiber
  1296      * @return Completion callback
  1297      * @since 2.2.6
  1298      */
  1299     public CompletionCallback getCompletionCallback() {
  1300         return completionCallback;
  1303     /**
  1304      * Updates completion callback associated with this Fiber
  1305      * @param completionCallback Completion callback
  1306      * @since 2.2.6
  1307      */
  1308     public void setCompletionCallback(CompletionCallback completionCallback) {
  1309         this.completionCallback = completionCallback;
  1312     /**
  1313      * (ADVANCED) Returns true if the current fiber is being executed synchronously.
  1314      * <p/>
  1315      * <p/>
  1316      * Fiber may run synchronously for various reasons. Perhaps this is
  1317      * on client side and application has invoked a synchronous method call.
  1318      * Perhaps this is on server side and we have deployed on a synchronous
  1319      * transport (like servlet.)
  1320      * <p/>
  1321      * <p/>
  1322      * When a fiber is run synchronously (IOW by {@link #runSync(Tube, Packet)}),
  1323      * further invocations to {@link #runSync(Tube, Packet)} can be done
  1324      * without degrading the performance.
  1325      * <p/>
  1326      * <p/>
  1327      * So this value can be used as a further optimization hint for
  1328      * advanced {@link Tube}s to choose the best strategy to invoke
  1329      * the next {@link Tube}. For example, a tube may want to install
  1330      * a {@link FiberContextSwitchInterceptor} if running async, yet
  1331      * it might find it faster to do {@link #runSync(Tube, Packet)}
  1332      * if it's already running synchronously.
  1333      */
  1334     public static boolean isSynchronous() {
  1335         return current().synchronous;
  1338     /**
  1339      * Returns true if the current Fiber on the current thread was started
  1340      * synchronously. Note, this is not strictly the same as being synchronous
  1341      * because the assumption is that the Fiber will ultimately be dispatched
  1342      * asynchronously, possibly have a completion callback associated with it, etc.
  1343      * Note, the 'startedSync' flag is cleared once the current Fiber is
  1344      * converted to running asynchronously.
  1345      * @since 2.2.6
  1346      */
  1347     public boolean isStartedSync() {
  1348         return startedSync;
  1351     /**
  1352      * Gets the current fiber that's running.
  1353      * <p/>
  1354      * <p/>
  1355      * This works like {@link Thread#currentThread()}.
  1356      * This method only works when invoked from {@link Tube}.
  1357      */
  1358     public static
  1359     @NotNull
  1360     @SuppressWarnings({"null", "ConstantConditions"})
  1361     Fiber current() {
  1362         Fiber fiber = CURRENT_FIBER.get();
  1363         if (fiber == null)
  1364             throw new IllegalStateException("Can be only used from fibers");
  1365         return fiber;
  1368     /**
  1369      * Gets the current fiber that's running, if set.
  1370      */
  1371     public static Fiber getCurrentIfSet() {
  1372         return CURRENT_FIBER.get();
  1375     private static final ThreadLocal<Fiber> CURRENT_FIBER = new ThreadLocal<Fiber>();
  1377     /**
  1378      * Used to allocate unique number for each fiber.
  1379      */
  1380     private static final AtomicInteger iotaGen = new AtomicInteger();
  1382     private static boolean isTraceEnabled() {
  1383         return LOGGER.isLoggable(Level.FINE);
  1386     private static final Logger LOGGER = Logger.getLogger(Fiber.class.getName());
  1389     private static final ReentrantLock serializedExecutionLock = new ReentrantLock();
  1391     /**
  1392      * Set this boolean to true to execute fibers sequentially one by one.
  1393      * See class javadoc.
  1394      */
  1395     public static volatile boolean serializeExecution = Boolean.getBoolean(Fiber.class.getName() + ".serialize");
  1397     private final Set<Component> components = new CopyOnWriteArraySet<Component>();
  1399     @Override
  1400     public <S> S getSPI(Class<S> spiType) {
  1401         for (Component c : components) {
  1402             S spi = c.getSPI(spiType);
  1403             if (spi != null) {
  1404                 return spi;
  1407         return null;
  1410     @Override
  1411     public Set<Component> getComponents() {
  1412         return components;

mercurial