src/share/jaxws_classes/com/sun/xml/internal/ws/api/pipe/Fiber.java

changeset 0
373ffda63c9a
child 637
9c07ef4934dd
equal deleted inserted replaced
-1:000000000000 0:373ffda63c9a
1 /*
2 * Copyright (c) 1997, 2013, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation. Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26 package com.sun.xml.internal.ws.api.pipe;
27
28 import com.sun.istack.internal.NotNull;
29 import com.sun.istack.internal.Nullable;
30 import com.sun.xml.internal.ws.api.Cancelable;
31 import com.sun.xml.internal.ws.api.Component;
32 import com.sun.xml.internal.ws.api.ComponentRegistry;
33 import com.sun.xml.internal.ws.api.SOAPVersion;
34 import com.sun.xml.internal.ws.api.addressing.AddressingVersion;
35 import com.sun.xml.internal.ws.api.message.AddressingUtils;
36 import com.sun.xml.internal.ws.api.message.Packet;
37 import com.sun.xml.internal.ws.api.pipe.helper.AbstractFilterTubeImpl;
38 import com.sun.xml.internal.ws.api.pipe.helper.AbstractTubeImpl;
39 import com.sun.xml.internal.ws.api.server.Adapter;
40 import com.sun.xml.internal.ws.api.server.Container;
41 import com.sun.xml.internal.ws.api.server.ContainerResolver;
42
43 import java.util.ArrayList;
44 import java.util.List;
45 import java.util.Set;
46 import java.util.concurrent.CopyOnWriteArraySet;
47 import java.util.concurrent.atomic.AtomicInteger;
48 import java.util.concurrent.locks.Condition;
49 import java.util.concurrent.locks.ReentrantLock;
50 import java.util.logging.Level;
51 import java.util.logging.Logger;
52
53 import javax.xml.ws.Holder;
54 import javax.xml.ws.WebServiceException;
55
56 /**
57 * User-level thread. Represents the execution of one request/response processing.
58 * <p/>
59 * <p/>
60 * JAX-WS RI is capable of running a large number of request/response concurrently by
61 * using a relatively small number of threads. This is made possible by utilizing
62 * a {@link Fiber} &mdash; a user-level thread that gets created for each request/response
63 * processing.
64 * <p/>
65 * <p/>
66 * A fiber remembers where in the pipeline the processing is at, what needs to be
67 * executed on the way out (when processing response), and other additional information
68 * specific to the execution of a particular request/response.
69 * <p/>
70 * <h2>Suspend/Resume</h2>
71 * <p/>
72 * Fiber can be {@link NextAction#suspend() suspended} by a {@link Tube}.
73 * When a fiber is suspended, it will be kept on the side until it is
74 * {@link #resume(Packet) resumed}. This allows threads to go execute
75 * other runnable fibers, allowing efficient utilization of smaller number of
76 * threads.
77 * <p/>
78 * <h2>Context-switch Interception</h2>
79 * <p/>
80 * {@link FiberContextSwitchInterceptor} allows {@link Tube}s and {@link Adapter}s
81 * to perform additional processing every time a thread starts running a fiber
82 * and stops running it.
83 * <p/>
84 * <h2>Context ClassLoader</h2>
85 * <p/>
86 * Just like thread, a fiber has a context class loader (CCL.) A fiber's CCL
87 * becomes the thread's CCL when it's executing the fiber. The original CCL
88 * of the thread will be restored when the thread leaves the fiber execution.
89 * <p/>
90 * <p/>
91 * <h2>Debugging Aid</h2>
92 * <p/>
93 * Because {@link Fiber} doesn't keep much in the call stack, and instead use
94 * {@link #conts} to store the continuation, debugging fiber related activities
95 * could be harder.
96 * <p/>
97 * <p/>
98 * Setting the {@link #LOGGER} for FINE would give you basic start/stop/resume/suspend
99 * level logging. Using FINER would cause more detailed logging, which includes
100 * what tubes are executed in what order and how they behaved.
101 * <p/>
102 * <p/>
103 * When you debug the server side, consider setting {@link Fiber#serializeExecution}
104 * to true, so that execution of fibers are serialized. Debugging a server
105 * with more than one running threads is very tricky, and this switch will
106 * prevent that. This can be also enabled by setting the system property on.
107 * See the source code.
108 *
109 * @author Kohsuke Kawaguchi
110 * @author Jitendra Kotamraju
111 */
112 public final class Fiber implements Runnable, Cancelable, ComponentRegistry {
113
114 /**
115 * Callback interface for notification of suspend and resume.
116 *
117 * @since 2.2.6
118 * @deprecated Use {@link NextAction#suspend(Runnable)}
119 */
120 public interface Listener {
121 /**
122 * Fiber has been suspended. Implementations of this callback may resume the Fiber.
123 * @param fiber Fiber
124 */
125 public void fiberSuspended(Fiber fiber);
126
127 /**
128 * Fiber has been resumed. Behavior is undefined if implementations of this callback attempt to suspend the Fiber.
129 * @param fiber Fiber
130 */
131 public void fiberResumed(Fiber fiber);
132 }
133
134 private final List<Listener> _listeners = new ArrayList<Listener>();
135
136 /**
137 * Adds suspend/resume callback listener
138 * @param listener Listener
139 * @since 2.2.6
140 * @deprecated
141 */
142 public void addListener(Listener listener) {
143 synchronized(_listeners) {
144 if (!_listeners.contains(listener)) {
145 _listeners.add(listener);
146 }
147 }
148 }
149
150 /**
151 * Removes suspend/resume callback listener
152 * @param listener Listener
153 * @since 2.2.6
154 * @deprecated
155 */
156 public void removeListener(Listener listener) {
157 synchronized(_listeners) {
158 _listeners.remove(listener);
159 }
160 }
161
162 List<Listener> getCurrentListeners() {
163 synchronized(_listeners) {
164 return new ArrayList<Listener>(_listeners);
165 }
166 }
167
168 private void clearListeners() {
169 synchronized(_listeners) {
170 _listeners.clear();
171 }
172 }
173
174 /**
175 * {@link Tube}s whose {@link Tube#processResponse(Packet)} method needs
176 * to be invoked on the way back.
177 */
178 private Tube[] conts = new Tube[16];
179 private int contsSize;
180
181 /**
182 * If this field is non-null, the next instruction to execute is
183 * to call its {@link Tube#processRequest(Packet)}. Otherwise
184 * the instruction is to call {@link #conts}.
185 */
186 private Tube next;
187
188 private Packet packet;
189
190 private Throwable/*but really it's either RuntimeException or Error*/ throwable;
191
192 public final Engine owner;
193
194 /**
195 * Is this thread suspended? 0=not suspended, 1=suspended.
196 * <p/>
197 * <p/>
198 * Logically this is just a boolean, but we need to prepare for the case
199 * where the thread is {@link #resume(Packet) resumed} before we get to the {@link #suspend()}.
200 * This happens when things happen in the following order:
201 * <p/>
202 * <ol>
203 * <li>Tube decides that the fiber needs to be suspended to wait for the external event.
204 * <li>Tube hooks up fiber with some external mechanism (like NIO channel selector)
205 * <li>Tube returns with {@link NextAction#suspend()}.
206 * <li>"External mechanism" becomes signal state and invokes {@link Fiber#resume(Packet)}
207 * to wake up fiber
208 * <li>{@link Fiber#doRun} invokes {@link Fiber#suspend()}.
209 * </ol>
210 * <p/>
211 * <p/>
212 * Using int, this will work OK because {@link #suspendedCount} becomes -1 when
213 * {@link #resume(Packet)} occurs before {@link #suspend()}.
214 * <p/>
215 * <p/>
216 * Increment and decrement is guarded by 'this' object.
217 */
218 private volatile int suspendedCount = 0;
219
220 private volatile boolean isInsideSuspendCallbacks = false;
221
222 /**
223 * Is this {@link Fiber} currently running in the synchronous mode?
224 */
225 private boolean synchronous;
226
227 private boolean interrupted;
228
229 private final int id;
230
231 /**
232 * Active {@link FiberContextSwitchInterceptor}s for this fiber.
233 */
234 private List<FiberContextSwitchInterceptor> interceptors;
235
236 /**
237 * Fiber's context {@link ClassLoader}.
238 */
239 private
240 @Nullable
241 ClassLoader contextClassLoader;
242
243 private
244 @Nullable
245 CompletionCallback completionCallback;
246
247 private boolean isDeliverThrowableInPacket = false;
248
249 public void setDeliverThrowableInPacket(boolean isDeliverThrowableInPacket) {
250 this.isDeliverThrowableInPacket = isDeliverThrowableInPacket;
251 }
252
253 /**
254 * The thread on which this Fiber is currently executing, if applicable.
255 */
256 private Thread currentThread;
257
258 /**
259 * Replace uses of synchronized(this) with this lock so that we can control
260 * unlocking for resume use cases
261 */
262 private final ReentrantLock lock = new ReentrantLock();
263 private final Condition condition = lock.newCondition();
264
265 private volatile boolean isCanceled;
266
267 /**
268 * Set to true if this fiber is started asynchronously, to avoid
269 * doubly-invoking completion code.
270 */
271 private boolean started;
272
273 /**
274 * Set to true if this fiber is started sync but allowed to run async.
275 * This property exists for use cases where the processing model is fundamentally async
276 * but some requirement or feature mandates that part of the tubeline run synchronously. For
277 * instance, WS-ReliableMessaging with non-anonymous addressing is compatible with running
278 * asynchronously, but if in-order message delivery is used then message processing must assign
279 * a message number before the remainder of the processing can be asynchronous.
280 */
281 private boolean startedSync;
282
283 /**
284 * Callback to be invoked when a {@link Fiber} finishes execution.
285 */
286 public interface CompletionCallback {
287 /**
288 * Indicates that the fiber has finished its execution.
289 * <p/>
290 * <p/>
291 * Since the JAX-WS RI runs asynchronously,
292 * this method maybe invoked by a different thread
293 * than any of the threads that started it or run a part of tubeline.
294 */
295 void onCompletion(@NotNull Packet response);
296
297 /**
298 * Indicates that the fiber has finished abnormally, by throwing a given {@link Throwable}.
299 */
300 void onCompletion(@NotNull Throwable error);
301 }
302
303 Fiber(Engine engine) {
304 this.owner = engine;
305 id = iotaGen.incrementAndGet();
306 if (isTraceEnabled()) {
307 LOGGER.log(Level.FINE, "{0} created", getName());
308 }
309
310 // if this is run from another fiber, then we naturally inherit its context classloader,
311 // so this code works for fiber->fiber inheritance just fine.
312 contextClassLoader = Thread.currentThread().getContextClassLoader();
313 }
314
315 /**
316 * Starts the execution of this fiber asynchronously.
317 * <p/>
318 * <p/>
319 * This method works like {@link Thread#start()}.
320 *
321 * @param tubeline The first tube of the tubeline that will act on the packet.
322 * @param request The request packet to be passed to <tt>startPoint.processRequest()</tt>.
323 * @param completionCallback The callback to be invoked when the processing is finished and the
324 * final response packet is available.
325 * @see #runSync(Tube, Packet)
326 */
327 public void start(@NotNull Tube tubeline, @NotNull Packet request, @Nullable CompletionCallback completionCallback) {
328 start(tubeline, request, completionCallback, false);
329 }
330
331 private void dumpFiberContext(String desc) {
332 if(isTraceEnabled()) {
333 String action = null;
334 String msgId = null;
335 if (packet != null) {
336 for (SOAPVersion sv: SOAPVersion.values()) {
337 for (AddressingVersion av: AddressingVersion.values()) {
338 action = packet.getMessage() != null ? AddressingUtils.getAction(packet.getMessage().getHeaders(), av, sv) : null;
339 msgId = packet.getMessage() != null ? AddressingUtils.getMessageID(packet.getMessage().getHeaders(), av, sv) : null;
340 if (action != null || msgId != null) {
341 break;
342 }
343 }
344 if (action != null || msgId != null) {
345 break;
346 }
347 }
348 }
349 String actionAndMsgDesc;
350 if (action == null && msgId == null) {
351 actionAndMsgDesc = "NO ACTION or MSG ID";
352 } else {
353 actionAndMsgDesc = "'" + action + "' and msgId '" + msgId + "'";
354 }
355
356 String tubeDesc;
357 if (next != null) {
358 tubeDesc = next.toString() + ".processRequest()";
359 } else {
360 tubeDesc = peekCont() + ".processResponse()";
361 }
362
363 LOGGER.log(Level.FINE, "{0} {1} with {2} and ''current'' tube {3} from thread {4} with Packet: {5}", new Object[]{getName(), desc, actionAndMsgDesc, tubeDesc, Thread.currentThread().getName(), packet != null ? packet.toShortString() : null});
364 }
365 }
366
367 /**
368 * Starts the execution of this fiber.
369 *
370 * If forceSync is true, then the fiber is started for an ostensibly async invocation,
371 * but allows for some portion of the tubeline to run sync with the calling
372 * client instance (Port/Dispatch instance). This allows tubes that enforce
373 * ordering to see requests in the order they were sent at the point the
374 * client invoked them.
375 * <p>
376 * The forceSync parameter will be true only when the caller (e.g. AsyncInvoker or
377 * SEIStub) knows one or more tubes need to enforce ordering and thus need
378 * to run sync with the client. Such tubes can return
379 * NextAction.INVOKE_ASYNC to indicate that the next tube in the tubeline
380 * should be invoked async to the current thread.
381 *
382 * <p>
383 * This method works like {@link Thread#start()}.
384 *
385 * @param tubeline
386 * The first tube of the tubeline that will act on the packet.
387 * @param request
388 * The request packet to be passed to <tt>startPoint.processRequest()</tt>.
389 * @param completionCallback
390 * The callback to be invoked when the processing is finished and the
391 * final response packet is available.
392 *
393 * @see #start(Tube,Packet,CompletionCallback)
394 * @see #runSync(Tube,Packet)
395 * @since 2.2.6
396 */
397 public void start(@NotNull Tube tubeline, @NotNull Packet request, @Nullable CompletionCallback completionCallback, boolean forceSync) {
398 next = tubeline;
399 this.packet = request;
400 this.completionCallback = completionCallback;
401
402 if (forceSync) {
403 this.startedSync = true;
404 dumpFiberContext("starting (sync)");
405 run();
406 } else {
407 this.started = true;
408 dumpFiberContext("starting (async)");
409 owner.addRunnable(this);
410 }
411 }
412
413 /**
414 * Wakes up a suspended fiber.
415 * <p/>
416 * <p/>
417 * If a fiber was suspended without specifying the next {@link Tube},
418 * then the execution will be resumed in the response processing direction,
419 * by calling the {@link Tube#processResponse(Packet)} method on the next/first
420 * {@link Tube} in the {@link Fiber}'s processing stack with the specified resume
421 * packet as the parameter.
422 * <p/>
423 * <p/>
424 * If a fiber was suspended with specifying the next {@link Tube},
425 * then the execution will be resumed in the request processing direction,
426 * by calling the next tube's {@link Tube#processRequest(Packet)} method with the
427 * specified resume packet as the parameter.
428 * <p/>
429 * <p/>
430 * This method is implemented in a race-free way. Another thread can invoke
431 * this method even before this fiber goes into the suspension mode. So the caller
432 * need not worry about synchronizing {@link NextAction#suspend()} and this method.
433 *
434 * @param resumePacket packet used in the resumed processing
435 */
436 public void resume(@NotNull Packet resumePacket) {
437 resume(resumePacket, false);
438 }
439
440 /**
441 * Similar to resume(Packet) but allowing the Fiber to be resumed
442 * synchronously (in the current Thread). If you want to know when the
443 * fiber completes (not when this method returns) then add/wrap a
444 * CompletionCallback on this Fiber.
445 * For example, an asynchronous response endpoint that supports WS-ReliableMessaging
446 * including in-order message delivery may need to resume the Fiber synchronously
447 * until message order is confirmed prior to returning to asynchronous processing.
448 * @since 2.2.6
449 */
450 public void resume(@NotNull Packet resumePacket,
451 boolean forceSync) {
452 resume(resumePacket, forceSync, null);
453 }
454
455 /**
456 * Similar to resume(Packet, boolean) but allowing the Fiber to be resumed
457 * and at the same time atomically assign a new CompletionCallback to it.
458 * @since 2.2.6
459 */
460 public void resume(@NotNull Packet resumePacket,
461 boolean forceSync,
462 CompletionCallback callback) {
463 lock.lock();
464 try {
465 if (callback != null) {
466 setCompletionCallback(callback);
467 }
468 if(isTraceEnabled())
469 LOGGER.log(Level.FINE, "{0} resuming. Will have suspendedCount={1}", new Object[]{getName(), suspendedCount-1});
470 packet = resumePacket;
471 if( --suspendedCount == 0 ) {
472 if (!isInsideSuspendCallbacks) {
473 List<Listener> listeners = getCurrentListeners();
474 for (Listener listener: listeners) {
475 try {
476 listener.fiberResumed(this);
477 } catch (Throwable e) {
478 if (isTraceEnabled())
479 LOGGER.log(Level.FINE, "Listener {0} threw exception: {1}", new Object[]{listener, e.getMessage()});
480 }
481 }
482
483 if(synchronous) {
484 condition.signalAll();
485 } else if (forceSync || startedSync) {
486 run();
487 } else {
488 dumpFiberContext("resuming (async)");
489 owner.addRunnable(this);
490 }
491 }
492 } else {
493 if (isTraceEnabled()) {
494 LOGGER.log(Level.FINE, "{0} taking no action on resume because suspendedCount != 0: {1}", new Object[]{getName(), suspendedCount});
495 }
496 }
497 } finally {
498 lock.unlock();
499 }
500 }
501
502 /**
503 * Wakes up a suspended fiber and begins response processing.
504 * @since 2.2.6
505 */
506 public void resumeAndReturn(@NotNull Packet resumePacket,
507 boolean forceSync) {
508 if(isTraceEnabled())
509 LOGGER.log(Level.FINE, "{0} resumed with Return Packet", getName());
510 next = null;
511 resume(resumePacket, forceSync);
512 }
513
514 /**
515 * Wakes up a suspended fiber with an exception.
516 * <p/>
517 * <p/>
518 * The execution of the suspended fiber will be resumed in the response
519 * processing direction, by calling the {@link Tube#processException(Throwable)} method
520 * on the next/first {@link Tube} in the {@link Fiber}'s processing stack with
521 * the specified exception as the parameter.
522 * <p/>
523 * <p/>
524 * This method is implemented in a race-free way. Another thread can invoke
525 * this method even before this fiber goes into the suspension mode. So the caller
526 * need not worry about synchronizing {@link NextAction#suspend()} and this method.
527 *
528 * @param throwable exception that is used in the resumed processing
529 */
530 public void resume(@NotNull Throwable throwable) {
531 resume(throwable, packet, false);
532 }
533
534 /**
535 * Wakes up a suspended fiber with an exception.
536 * <p/>
537 * <p/>
538 * The execution of the suspended fiber will be resumed in the response
539 * processing direction, by calling the {@link Tube#processException(Throwable)} method
540 * on the next/first {@link Tube} in the {@link Fiber}'s processing stack with
541 * the specified exception as the parameter.
542 * <p/>
543 * <p/>
544 * This method is implemented in a race-free way. Another thread can invoke
545 * this method even before this fiber goes into the suspension mode. So the caller
546 * need not worry about synchronizing {@link NextAction#suspend()} and this method.
547 *
548 * @param throwable exception that is used in the resumed processing
549 * @param packet Packet that will be visible on the Fiber after the resume
550 * @since 2.2.8
551 */
552 public void resume(@NotNull Throwable throwable, @NotNull Packet packet) {
553 resume(throwable, packet, false);
554 }
555
556 /**
557 * Wakes up a suspend fiber with an exception.
558 *
559 * If forceSync is true, then the suspended fiber will resume with
560 * synchronous processing on the current thread. This will continue
561 * until some Tube indicates that it is safe to switch to asynchronous
562 * processing.
563 *
564 * @param error exception that is used in the resumed processing
565 * @param forceSync if processing begins synchronously
566 * @since 2.2.6
567 */
568 public void resume(@NotNull Throwable error,
569 boolean forceSync) {
570 resume(error, packet, forceSync);
571 }
572
573 /**
574 * Wakes up a suspend fiber with an exception.
575 *
576 * If forceSync is true, then the suspended fiber will resume with
577 * synchronous processing on the current thread. This will continue
578 * until some Tube indicates that it is safe to switch to asynchronous
579 * processing.
580 *
581 * @param error exception that is used in the resumed processing
582 * @param packet Packet that will be visible on the Fiber after the resume
583 * @param forceSync if processing begins synchronously
584 * @since 2.2.8
585 */
586 public void resume(@NotNull Throwable error,
587 @NotNull Packet packet,
588 boolean forceSync) {
589 if(isTraceEnabled())
590 LOGGER.log(Level.FINE, "{0} resumed with Return Throwable", getName());
591 next = null;
592 throwable = error;
593 resume(packet, forceSync);
594 }
595
596 /**
597 * Marks this Fiber as cancelled. A cancelled Fiber will never invoke its completion callback
598 * @param mayInterrupt if cancel should use {@link Thread#interrupt()}
599 * @see java.util.concurrent.Future#cancel(boolean)
600 * @since 2.2.6
601 */
602 @Override
603 public void cancel(boolean mayInterrupt) {
604 isCanceled = true;
605 if (mayInterrupt) {
606 // synchronized(this) is used as Thread running Fiber will be holding lock
607 synchronized(this) {
608 if (currentThread != null)
609 currentThread.interrupt();
610 }
611 }
612 }
613
614 /**
615 * Suspends this fiber's execution until the resume method is invoked.
616 * <p/>
617 * The call returns immediately, and when the fiber is resumed
618 * the execution picks up from the last scheduled continuation.
619 * @param onExitRunnable runnable to be invoked after fiber is marked for suspension
620 * @return if control loop must exit
621 */
622 private boolean suspend(Holder<Boolean> isRequireUnlock, Runnable onExitRunnable) {
623 if(isTraceEnabled()) {
624 LOGGER.log(Level.FINE, "{0} suspending. Will have suspendedCount={1}", new Object[]{getName(), suspendedCount+1});
625 if (suspendedCount > 0) {
626 LOGGER.log(Level.FINE, "WARNING - {0} suspended more than resumed. Will require more than one resume to actually resume this fiber.", getName());
627 }
628 }
629
630 List<Listener> listeners = getCurrentListeners();
631 if (++suspendedCount == 1) {
632 isInsideSuspendCallbacks = true;
633 try {
634 for (Listener listener: listeners) {
635 try {
636 listener.fiberSuspended(this);
637 } catch (Throwable e) {
638 if(isTraceEnabled())
639 LOGGER.log(Level.FINE, "Listener {0} threw exception: {1}", new Object[]{listener, e.getMessage()});
640 }
641 }
642 } finally {
643 isInsideSuspendCallbacks = false;
644 }
645 }
646
647 if (suspendedCount <= 0) {
648 // suspend callback caused fiber to resume
649 for (Listener listener: listeners) {
650 try {
651 listener.fiberResumed(this);
652 } catch (Throwable e) {
653 if(isTraceEnabled())
654 LOGGER.log(Level.FINE, "Listener {0} threw exception: {1}", new Object[]{listener, e.getMessage()});
655 }
656 }
657
658 } else if (onExitRunnable != null) {
659 // synchronous use cases cannot disconnect from the current thread
660 if (!synchronous) {
661 /* INTENTIONALLY UNLOCKING EARLY */
662 synchronized(this) {
663 // currentThread is protected by the monitor for this fiber so
664 // that it is accessible to cancel() even when the lock is held
665 currentThread = null;
666 }
667 lock.unlock();
668 assert(!lock.isHeldByCurrentThread());
669 isRequireUnlock.value = Boolean.FALSE;
670
671 try {
672 onExitRunnable.run();
673 } catch(Throwable t) {
674 throw new OnExitRunnableException(t);
675 }
676
677 return true;
678
679 } else {
680 // for synchronous we will stay with current thread, so do not disconnect
681 if (isTraceEnabled())
682 LOGGER.fine("onExitRunnable used with synchronous Fiber execution -- not exiting current thread");
683 onExitRunnable.run();
684 }
685 }
686
687 return false;
688 }
689
690 private static final class OnExitRunnableException extends RuntimeException {
691 private static final long serialVersionUID = 1L;
692
693 Throwable target;
694
695 public OnExitRunnableException(Throwable target) {
696 super((Throwable)null); // see pattern for InvocationTargetException
697 this.target = target;
698 }
699 }
700
701 /**
702 * Adds a new {@link FiberContextSwitchInterceptor} to this fiber.
703 * <p/>
704 * <p/>
705 * The newly installed fiber will take effect immediately after the current
706 * tube returns from its {@link Tube#processRequest(Packet)} or
707 * {@link Tube#processResponse(Packet)}, before the next tube begins processing.
708 * <p/>
709 * <p/>
710 * So when the tubeline consists of X and Y, and when X installs an interceptor,
711 * the order of execution will be as follows:
712 * <p/>
713 * <ol>
714 * <li>X.processRequest()
715 * <li>interceptor gets installed
716 * <li>interceptor.execute() is invoked
717 * <li>Y.processRequest()
718 * </ol>
719 */
720 public synchronized void addInterceptor(@NotNull FiberContextSwitchInterceptor interceptor) {
721 if (interceptors == null) {
722 interceptors = new ArrayList<FiberContextSwitchInterceptor>();
723 } else {
724 List<FiberContextSwitchInterceptor> l = new ArrayList<FiberContextSwitchInterceptor>();
725 l.addAll(interceptors);
726 interceptors = l;
727 }
728 interceptors.add(interceptor);
729 }
730
731 /**
732 * Removes a {@link FiberContextSwitchInterceptor} from this fiber.
733 * <p/>
734 * <p/>
735 * The removal of the interceptor takes effect immediately after the current
736 * tube returns from its {@link Tube#processRequest(Packet)} or
737 * {@link Tube#processResponse(Packet)}, before the next tube begins processing.
738 * <p/>
739 * <p/>
740 * <p/>
741 * So when the tubeline consists of X and Y, and when Y uninstalls an interceptor
742 * on the way out, then the order of execution will be as follows:
743 * <p/>
744 * <ol>
745 * <li>Y.processResponse() (notice that this happens with interceptor.execute() in the callstack)
746 * <li>interceptor gets uninstalled
747 * <li>interceptor.execute() returns
748 * <li>X.processResponse()
749 * </ol>
750 *
751 * @return true if the specified interceptor was removed. False if
752 * the specified interceptor was not registered with this fiber to begin with.
753 */
754 public synchronized boolean removeInterceptor(@NotNull FiberContextSwitchInterceptor interceptor) {
755 if (interceptors != null) {
756 boolean result = interceptors.remove(interceptor);
757 if (interceptors.isEmpty())
758 interceptors = null;
759 else {
760 List<FiberContextSwitchInterceptor> l = new ArrayList<FiberContextSwitchInterceptor>();
761 l.addAll(interceptors);
762 interceptors = l;
763 }
764 return result;
765 }
766 return false;
767 }
768
769 /**
770 * Gets the context {@link ClassLoader} of this fiber.
771 */
772 public
773 @Nullable
774 ClassLoader getContextClassLoader() {
775 return contextClassLoader;
776 }
777
778 /**
779 * Sets the context {@link ClassLoader} of this fiber.
780 */
781 public ClassLoader setContextClassLoader(@Nullable ClassLoader contextClassLoader) {
782 ClassLoader r = this.contextClassLoader;
783 this.contextClassLoader = contextClassLoader;
784 return r;
785 }
786
787 /**
788 * DO NOT CALL THIS METHOD. This is an implementation detail
789 * of {@link Fiber}.
790 */
791 @Deprecated
792 @Override
793 public void run() {
794 Container old = ContainerResolver.getDefault().enterContainer(owner.getContainer());
795 try {
796 assert !synchronous;
797 // doRun returns true to indicate an early exit from fiber processing
798 if (!doRun()) {
799 if (startedSync && suspendedCount == 0 &&
800 (next != null || contsSize > 0)) {
801 // We bailed out of running this fiber we started as sync, and now
802 // want to finish running it async
803 startedSync = false;
804 // Start back up as an async fiber
805 dumpFiberContext("restarting (async) after startSync");
806 owner.addRunnable(this);
807 } else {
808 completionCheck();
809 }
810 }
811 } finally {
812 ContainerResolver.getDefault().exitContainer(old);
813 }
814 }
815
816 /**
817 * Runs a given {@link Tube} (and everything thereafter) synchronously.
818 * <p/>
819 * <p/>
820 * This method blocks and returns only when all the successive {@link Tube}s
821 * complete their request/response processing. This method can be used
822 * if a {@link Tube} needs to fallback to synchronous processing.
823 * <p/>
824 * <h3>Example:</h3>
825 * <pre>
826 * class FooTube extends {@link AbstractFilterTubeImpl} {
827 * NextAction processRequest(Packet request) {
828 * // run everything synchronously and return with the response packet
829 * return doReturnWith(Fiber.current().runSync(next,request));
830 * }
831 * NextAction processResponse(Packet response) {
832 * // never be invoked
833 * }
834 * }
835 * </pre>
836 *
837 * @param tubeline The first tube of the tubeline that will act on the packet.
838 * @param request The request packet to be passed to <tt>startPoint.processRequest()</tt>.
839 * @return The response packet to the <tt>request</tt>.
840 * @see #start(Tube, Packet, CompletionCallback)
841 */
842 public
843 @NotNull
844 Packet runSync(@NotNull Tube tubeline, @NotNull Packet request) {
845 lock.lock();
846 try {
847 // save the current continuation, so that we return runSync() without executing them.
848 final Tube[] oldCont = conts;
849 final int oldContSize = contsSize;
850 final boolean oldSynchronous = synchronous;
851 final Tube oldNext = next;
852
853 if (oldContSize > 0) {
854 conts = new Tube[16];
855 contsSize = 0;
856 }
857
858 try {
859 synchronous = true;
860 this.packet = request;
861 next = tubeline;
862 doRun();
863 if (throwable != null) {
864 if (isDeliverThrowableInPacket) {
865 packet.addSatellite(new ThrowableContainerPropertySet(throwable));
866 } else {
867 if (throwable instanceof RuntimeException) {
868 throw (RuntimeException) throwable;
869 }
870 if (throwable instanceof Error) {
871 throw (Error) throwable;
872 }
873 // our system is supposed to only accept Error or RuntimeException
874 throw new AssertionError(throwable);
875 }
876 }
877 return this.packet;
878 } finally {
879 conts = oldCont;
880 contsSize = oldContSize;
881 synchronous = oldSynchronous;
882 next = oldNext;
883 if(interrupted) {
884 Thread.currentThread().interrupt();
885 interrupted = false;
886 }
887 if(!started && !startedSync)
888 completionCheck();
889 }
890 } finally {
891 lock.unlock();
892 }
893 }
894
895 private void completionCheck() {
896 lock.lock();
897 try {
898 // Don't trigger completion and callbacks if fiber is suspended
899 if(!isCanceled && contsSize==0 && suspendedCount == 0) {
900 if(isTraceEnabled())
901 LOGGER.log(Level.FINE, "{0} completed", getName());
902 clearListeners();
903 condition.signalAll();
904 if (completionCallback != null) {
905 if (throwable != null) {
906 if (isDeliverThrowableInPacket) {
907 packet.addSatellite(new ThrowableContainerPropertySet(throwable));
908 completionCallback.onCompletion(packet);
909 } else
910 completionCallback.onCompletion(throwable);
911 } else
912 completionCallback.onCompletion(packet);
913 }
914 }
915 } finally {
916 lock.unlock();
917 }
918 }
919
920 /**
921 * Invokes all registered {@link InterceptorHandler}s and then call into
922 * {@link Fiber#__doRun()}.
923 */
924 private class InterceptorHandler implements FiberContextSwitchInterceptor.Work<Tube, Tube> {
925 private final Holder<Boolean> isUnlockRequired;
926 private final List<FiberContextSwitchInterceptor> ints;
927
928 /**
929 * Index in {@link Fiber#interceptors} to invoke next.
930 */
931 private int idx;
932
933 public InterceptorHandler(Holder<Boolean> isUnlockRequired, List<FiberContextSwitchInterceptor> ints) {
934 this.isUnlockRequired = isUnlockRequired;
935 this.ints = ints;
936 }
937
938 /**
939 * Initiate the interception, and eventually invokes {@link Fiber#__doRun()}.
940 */
941 Tube invoke(Tube next) {
942 idx = 0;
943 return execute(next);
944 }
945
946 @Override
947 public Tube execute(Tube next) {
948 if (idx == ints.size()) {
949 Fiber.this.next = next;
950 if (__doRun(isUnlockRequired, ints))
951 return PLACEHOLDER;
952 } else {
953 FiberContextSwitchInterceptor interceptor = ints.get(idx++);
954 return interceptor.execute(Fiber.this, next, this);
955 }
956 return Fiber.this.next;
957 }
958 }
959
960 private static final PlaceholderTube PLACEHOLDER = new PlaceholderTube();
961
962 private static class PlaceholderTube extends AbstractTubeImpl {
963
964 @Override
965 public NextAction processRequest(Packet request) {
966 throw new UnsupportedOperationException();
967 }
968
969 @Override
970 public NextAction processResponse(Packet response) {
971 throw new UnsupportedOperationException();
972 }
973
974 @Override
975 public NextAction processException(Throwable t) {
976 return doThrow(t);
977 }
978
979 @Override
980 public void preDestroy() {
981 }
982
983 @Override
984 public PlaceholderTube copy(TubeCloner cloner) {
985 throw new UnsupportedOperationException();
986 }
987 }
988
989 /**
990 * Executes the fiber as much as possible.
991 *
992 */
993 private boolean doRun() {
994 dumpFiberContext("running");
995
996 if (serializeExecution) {
997 serializedExecutionLock.lock();
998 try {
999 return _doRun(next);
1000 } finally {
1001 serializedExecutionLock.unlock();
1002 }
1003 } else {
1004 return _doRun(next);
1005 }
1006 }
1007
1008 private boolean _doRun(Tube next) {
1009 // isRequireUnlock will contain Boolean.FALSE when lock has already been released in suspend
1010 Holder<Boolean> isRequireUnlock = new Holder<Boolean>(Boolean.TRUE);
1011 lock.lock();
1012 try {
1013 List<FiberContextSwitchInterceptor> ints;
1014 ClassLoader old;
1015 synchronized(this) {
1016 ints = interceptors;
1017
1018 // currentThread is protected by the monitor for this fiber so
1019 // that it is accessible to cancel() even when the lock is held
1020 currentThread = Thread.currentThread();
1021 if (isTraceEnabled()) {
1022 LOGGER.log(Level.FINE, "Thread entering _doRun(): {0}", currentThread);
1023 }
1024
1025 old = currentThread.getContextClassLoader();
1026 currentThread.setContextClassLoader(contextClassLoader);
1027 }
1028
1029 try {
1030 boolean needsToReenter;
1031 do {
1032 // if interceptors are set, go through the interceptors.
1033 if (ints == null) {
1034 this.next = next;
1035 if (__doRun(isRequireUnlock, null /*ints*/)) {
1036 return true;
1037 }
1038 } else {
1039 next = new InterceptorHandler(isRequireUnlock, ints).invoke(next);
1040 if (next == PLACEHOLDER) {
1041 return true;
1042 }
1043 }
1044
1045 synchronized(this) {
1046 needsToReenter = (ints != interceptors);
1047 if (needsToReenter)
1048 ints = interceptors;
1049 }
1050 } while (needsToReenter);
1051 } catch(OnExitRunnableException o) {
1052 // catching this exception indicates onExitRunnable in suspend() threw.
1053 // we must still avoid double unlock
1054 Throwable t = o.target;
1055 if (t instanceof WebServiceException)
1056 throw (WebServiceException) t;
1057 throw new WebServiceException(t);
1058 } finally {
1059 // don't reference currentThread here because fiber processing
1060 // may already be running on a different thread (Note: isAlreadyExited
1061 // tracks this state
1062 Thread thread = Thread.currentThread();
1063 thread.setContextClassLoader(old);
1064 if (isTraceEnabled()) {
1065 LOGGER.log(Level.FINE, "Thread leaving _doRun(): {0}", thread);
1066 }
1067 }
1068
1069 return false;
1070 } finally {
1071 if (isRequireUnlock.value) {
1072 synchronized(this) {
1073 currentThread = null;
1074 }
1075 lock.unlock();
1076 }
1077 }
1078 }
1079
1080 /**
1081 * To be invoked from {@link #doRun()}.
1082 *
1083 * @see #doRun()
1084 */
1085 private boolean __doRun(Holder<Boolean> isRequireUnlock, List<FiberContextSwitchInterceptor> originalInterceptors) {
1086 assert(lock.isHeldByCurrentThread());
1087
1088 final Fiber old = CURRENT_FIBER.get();
1089 CURRENT_FIBER.set(this);
1090
1091 // if true, lots of debug messages to show what's being executed
1092 final boolean traceEnabled = LOGGER.isLoggable(Level.FINER);
1093
1094 try {
1095 boolean abortResponse = false;
1096 while(isReady(originalInterceptors)) {
1097 if (isCanceled) {
1098 next = null;
1099 throwable = null;
1100 contsSize = 0;
1101 break;
1102 }
1103
1104 try {
1105 NextAction na;
1106 Tube last;
1107 if(throwable!=null) {
1108 if(contsSize==0 || abortResponse) {
1109 contsSize = 0; // abortResponse case
1110 // nothing else to execute. we are done.
1111 return false;
1112 }
1113 last = popCont();
1114 if (traceEnabled)
1115 LOGGER.log(Level.FINER, "{0} {1}.processException({2})", new Object[]{getName(), last, throwable});
1116 na = last.processException(throwable);
1117 } else {
1118 if(next!=null) {
1119 if(traceEnabled)
1120 LOGGER.log(Level.FINER, "{0} {1}.processRequest({2})", new Object[]{getName(), next, packet != null ? "Packet@"+Integer.toHexString(packet.hashCode()) : "null"});
1121 na = next.processRequest(packet);
1122 last = next;
1123 } else {
1124 if(contsSize==0 || abortResponse) {
1125 // nothing else to execute. we are done.
1126 contsSize = 0;
1127 return false;
1128 }
1129 last = popCont();
1130 if(traceEnabled)
1131 LOGGER.log(Level.FINER, "{0} {1}.processResponse({2})", new Object[]{getName(), last, packet != null ? "Packet@"+Integer.toHexString(packet.hashCode()) : "null"});
1132 na = last.processResponse(packet);
1133 }
1134 }
1135
1136 if (traceEnabled)
1137 LOGGER.log(Level.FINER, "{0} {1} returned with {2}", new Object[]{getName(), last, na});
1138
1139 // If resume is called before suspend, then make sure
1140 // resume(Packet) is not lost
1141 if (na.kind != NextAction.SUSPEND) {
1142 // preserve in-flight packet so that processException may inspect
1143 if (na.kind != NextAction.THROW &&
1144 na.kind != NextAction.THROW_ABORT_RESPONSE)
1145 packet = na.packet;
1146 throwable = na.throwable;
1147 }
1148
1149 switch(na.kind) {
1150 case NextAction.INVOKE:
1151 case NextAction.INVOKE_ASYNC:
1152 pushCont(last);
1153 // fall through next
1154 case NextAction.INVOKE_AND_FORGET:
1155 next = na.next;
1156 if (na.kind == NextAction.INVOKE_ASYNC
1157 && startedSync) {
1158 // Break out here
1159 return false;
1160 }
1161 break;
1162 case NextAction.THROW_ABORT_RESPONSE:
1163 case NextAction.ABORT_RESPONSE:
1164 abortResponse = true;
1165 if (isTraceEnabled()) {
1166 LOGGER.log(Level.FINE, "Fiber {0} is aborting a response due to exception: {1}", new Object[]{this, na.throwable});
1167 }
1168 case NextAction.RETURN:
1169 case NextAction.THROW:
1170 next = null;
1171 break;
1172 case NextAction.SUSPEND:
1173 if (next != null) {
1174 // Only store the 'last' tube when we're processing
1175 // a request, since conts array is for processResponse
1176 pushCont(last);
1177 }
1178 next = na.next;
1179 if(suspend(isRequireUnlock, na.onExitRunnable))
1180 return true; // explicitly exiting control loop
1181 break;
1182 default:
1183 throw new AssertionError();
1184 }
1185 } catch (RuntimeException t) {
1186 if (traceEnabled)
1187 LOGGER.log(Level.FINER, getName() + " Caught " + t + ". Start stack unwinding", t);
1188 throwable = t;
1189 } catch (Error t) {
1190 if (traceEnabled)
1191 LOGGER.log(Level.FINER, getName() + " Caught " + t + ". Start stack unwinding", t);
1192 throwable = t;
1193 }
1194
1195 dumpFiberContext("After tube execution");
1196 }
1197
1198 // there's nothing we can execute right away.
1199 // we'll be back when this fiber is resumed.
1200
1201 } finally {
1202 CURRENT_FIBER.set(old);
1203 }
1204
1205 return false;
1206 }
1207
1208 private void pushCont(Tube tube) {
1209 conts[contsSize++] = tube;
1210
1211 // expand if needed
1212 int len = conts.length;
1213 if (contsSize == len) {
1214 Tube[] newBuf = new Tube[len * 2];
1215 System.arraycopy(conts, 0, newBuf, 0, len);
1216 conts = newBuf;
1217 }
1218 }
1219
1220 private Tube popCont() {
1221 return conts[--contsSize];
1222 }
1223
1224 private Tube peekCont() {
1225 int index = contsSize - 1;
1226 if (index >= 0 && index < conts.length) {
1227 return conts[index];
1228 } else {
1229 return null;
1230 }
1231 }
1232
1233 /**
1234 * Only to be used by Tubes that manipulate the Fiber to create alternate flows
1235 * @since 2.2.6
1236 */
1237 public void resetCont(Tube[] conts, int contsSize) {
1238 this.conts = conts;
1239 this.contsSize = contsSize;
1240 }
1241
1242 /**
1243 * Returns true if the fiber is ready to execute.
1244 */
1245 private boolean isReady(List<FiberContextSwitchInterceptor> originalInterceptors) {
1246 if (synchronous) {
1247 while (suspendedCount == 1)
1248 try {
1249 if (isTraceEnabled()) {
1250 LOGGER.log(Level.FINE, "{0} is blocking thread {1}", new Object[]{getName(), Thread.currentThread().getName()});
1251 }
1252 condition.await(); // the synchronized block is the whole runSync method.
1253 } catch (InterruptedException e) {
1254 // remember that we are interrupted, but don't respond to it
1255 // right away. This behavior is in line with what happens
1256 // when you are actually running the whole thing synchronously.
1257 interrupted = true;
1258 }
1259
1260 synchronized(this) {
1261 return interceptors == originalInterceptors;
1262 }
1263 }
1264 else {
1265 if (suspendedCount>0)
1266 return false;
1267 synchronized(this) {
1268 return interceptors == originalInterceptors;
1269 }
1270 }
1271 }
1272
1273 private String getName() {
1274 return "engine-" + owner.id + "fiber-" + id;
1275 }
1276
1277 @Override
1278 public String toString() {
1279 return getName();
1280 }
1281
1282 /**
1283 * Gets the current {@link Packet} associated with this fiber.
1284 * <p/>
1285 * <p/>
1286 * This method returns null if no packet has been associated with the fiber yet.
1287 */
1288 public
1289 @Nullable
1290 Packet getPacket() {
1291 return packet;
1292 }
1293
1294 /**
1295 * Returns completion callback associated with this Fiber
1296 * @return Completion callback
1297 * @since 2.2.6
1298 */
1299 public CompletionCallback getCompletionCallback() {
1300 return completionCallback;
1301 }
1302
1303 /**
1304 * Updates completion callback associated with this Fiber
1305 * @param completionCallback Completion callback
1306 * @since 2.2.6
1307 */
1308 public void setCompletionCallback(CompletionCallback completionCallback) {
1309 this.completionCallback = completionCallback;
1310 }
1311
1312 /**
1313 * (ADVANCED) Returns true if the current fiber is being executed synchronously.
1314 * <p/>
1315 * <p/>
1316 * Fiber may run synchronously for various reasons. Perhaps this is
1317 * on client side and application has invoked a synchronous method call.
1318 * Perhaps this is on server side and we have deployed on a synchronous
1319 * transport (like servlet.)
1320 * <p/>
1321 * <p/>
1322 * When a fiber is run synchronously (IOW by {@link #runSync(Tube, Packet)}),
1323 * further invocations to {@link #runSync(Tube, Packet)} can be done
1324 * without degrading the performance.
1325 * <p/>
1326 * <p/>
1327 * So this value can be used as a further optimization hint for
1328 * advanced {@link Tube}s to choose the best strategy to invoke
1329 * the next {@link Tube}. For example, a tube may want to install
1330 * a {@link FiberContextSwitchInterceptor} if running async, yet
1331 * it might find it faster to do {@link #runSync(Tube, Packet)}
1332 * if it's already running synchronously.
1333 */
1334 public static boolean isSynchronous() {
1335 return current().synchronous;
1336 }
1337
1338 /**
1339 * Returns true if the current Fiber on the current thread was started
1340 * synchronously. Note, this is not strictly the same as being synchronous
1341 * because the assumption is that the Fiber will ultimately be dispatched
1342 * asynchronously, possibly have a completion callback associated with it, etc.
1343 * Note, the 'startedSync' flag is cleared once the current Fiber is
1344 * converted to running asynchronously.
1345 * @since 2.2.6
1346 */
1347 public boolean isStartedSync() {
1348 return startedSync;
1349 }
1350
1351 /**
1352 * Gets the current fiber that's running.
1353 * <p/>
1354 * <p/>
1355 * This works like {@link Thread#currentThread()}.
1356 * This method only works when invoked from {@link Tube}.
1357 */
1358 public static
1359 @NotNull
1360 @SuppressWarnings({"null", "ConstantConditions"})
1361 Fiber current() {
1362 Fiber fiber = CURRENT_FIBER.get();
1363 if (fiber == null)
1364 throw new IllegalStateException("Can be only used from fibers");
1365 return fiber;
1366 }
1367
1368 /**
1369 * Gets the current fiber that's running, if set.
1370 */
1371 public static Fiber getCurrentIfSet() {
1372 return CURRENT_FIBER.get();
1373 }
1374
1375 private static final ThreadLocal<Fiber> CURRENT_FIBER = new ThreadLocal<Fiber>();
1376
1377 /**
1378 * Used to allocate unique number for each fiber.
1379 */
1380 private static final AtomicInteger iotaGen = new AtomicInteger();
1381
1382 private static boolean isTraceEnabled() {
1383 return LOGGER.isLoggable(Level.FINE);
1384 }
1385
1386 private static final Logger LOGGER = Logger.getLogger(Fiber.class.getName());
1387
1388
1389 private static final ReentrantLock serializedExecutionLock = new ReentrantLock();
1390
1391 /**
1392 * Set this boolean to true to execute fibers sequentially one by one.
1393 * See class javadoc.
1394 */
1395 public static volatile boolean serializeExecution = Boolean.getBoolean(Fiber.class.getName() + ".serialize");
1396
1397 private final Set<Component> components = new CopyOnWriteArraySet<Component>();
1398
1399 @Override
1400 public <S> S getSPI(Class<S> spiType) {
1401 for (Component c : components) {
1402 S spi = c.getSPI(spiType);
1403 if (spi != null) {
1404 return spi;
1405 }
1406 }
1407 return null;
1408 }
1409
1410 @Override
1411 public Set<Component> getComponents() {
1412 return components;
1413 }
1414 }

mercurial