src/share/classes/com/sun/corba/se/impl/encoding/BufferManagerReadStream.java

Wed, 27 Apr 2016 01:21:28 +0800

author
aoqi
date
Wed, 27 Apr 2016 01:21:28 +0800
changeset 0
7ef37b2cdcad
child 748
6845b95cba6b
permissions
-rw-r--r--

Initial load
http://hg.openjdk.java.net/jdk8u/jdk8u/corba/
changeset: 765:f46df0af2ca8
tag: jdk8u25-b17

     1 /*
     2  * Copyright (c) 2000, 2009, Oracle and/or its affiliates. All rights reserved.
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
     4  *
     5  * This code is free software; you can redistribute it and/or modify it
     6  * under the terms of the GNU General Public License version 2 only, as
     7  * published by the Free Software Foundation.  Oracle designates this
     8  * particular file as subject to the "Classpath" exception as provided
     9  * by Oracle in the LICENSE file that accompanied this code.
    10  *
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
    14  * version 2 for more details (a copy is included in the LICENSE file that
    15  * accompanied this code).
    16  *
    17  * You should have received a copy of the GNU General Public License version
    18  * 2 along with this work; if not, write to the Free Software Foundation,
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
    20  *
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
    22  * or visit www.oracle.com if you need additional information or have any
    23  * questions.
    24  */
    25 package com.sun.corba.se.impl.encoding;
    27 import java.nio.ByteBuffer;
    28 import com.sun.corba.se.pept.transport.ByteBufferPool;
    29 import com.sun.corba.se.spi.logging.CORBALogDomains;
    30 import com.sun.corba.se.spi.orb.ORB;
    31 import com.sun.corba.se.impl.logging.ORBUtilSystemException;
    32 import com.sun.corba.se.impl.orbutil.ORBUtility;
    33 import com.sun.corba.se.impl.protocol.RequestCanceledException;
    34 import com.sun.corba.se.impl.protocol.giopmsgheaders.FragmentMessage;
    35 import com.sun.corba.se.impl.protocol.giopmsgheaders.Message;
    36 import java.util.*;
    38 public class BufferManagerReadStream
    39     implements BufferManagerRead, MarkAndResetHandler
    40 {
    41     private boolean receivedCancel = false;
    42     private int cancelReqId = 0;
    44     // We should convert endOfStream to a final static dummy end node
    45     private boolean endOfStream = true;
    46     private BufferQueue fragmentQueue = new BufferQueue();
    47     private long FRAGMENT_TIMEOUT = 60000;
    49     // REVISIT - This should go in BufferManagerRead. But, since
    50     //           BufferManagerRead is an interface. BufferManagerRead
    51     //           might ought to be an abstract class instead of an
    52     //           interface.
    53     private ORB orb ;
    54     private ORBUtilSystemException wrapper ;
    55     private boolean debug = false;
    57     BufferManagerReadStream( ORB orb )
    58     {
    59         this.orb = orb ;
    60         this.wrapper = ORBUtilSystemException.get( orb,
    61             CORBALogDomains.RPC_ENCODING ) ;
    62         debug = orb.transportDebugFlag;
    63     }
    65     public void cancelProcessing(int requestId) {
    66         synchronized(fragmentQueue) {
    67             receivedCancel = true;
    68             cancelReqId = requestId;
    69             fragmentQueue.notify();
    70         }
    71     }
    73     public void processFragment(ByteBuffer byteBuffer, FragmentMessage msg)
    74     {
    75         ByteBufferWithInfo bbwi =
    76             new ByteBufferWithInfo(orb, byteBuffer, msg.getHeaderLength());
    78         synchronized (fragmentQueue) {
    79             if (debug)
    80             {
    81                 // print address of ByteBuffer being queued
    82                 int bbAddress = System.identityHashCode(byteBuffer);
    83                 StringBuffer sb = new StringBuffer(80);
    84                 sb.append("processFragment() - queueing ByteBuffer id (");
    85                 sb.append(bbAddress).append(") to fragment queue.");
    86                 String strMsg = sb.toString();
    87                 dprint(strMsg);
    88             }
    89             fragmentQueue.enqueue(bbwi);
    90             endOfStream = !msg.moreFragmentsToFollow();
    91             fragmentQueue.notify();
    92         }
    93     }
    95     public ByteBufferWithInfo underflow (ByteBufferWithInfo bbwi)
    96     {
    98       ByteBufferWithInfo result = null;
   100       try {
   101           //System.out.println("ENTER underflow");
   103         synchronized (fragmentQueue) {
   105             if (receivedCancel) {
   106                 throw new RequestCanceledException(cancelReqId);
   107             }
   109             while (fragmentQueue.size() == 0) {
   111                 if (endOfStream) {
   112                     throw wrapper.endOfStream() ;
   113                 }
   115                 boolean interrupted = false;
   116                 try {
   117                     fragmentQueue.wait(FRAGMENT_TIMEOUT);
   118                 } catch (InterruptedException e) {
   119                     interrupted = true;
   120                 }
   122                 if (!interrupted && fragmentQueue.size() == 0) {
   123                     throw wrapper.bufferReadManagerTimeout();
   124                 }
   126                 if (receivedCancel) {
   127                     throw new RequestCanceledException(cancelReqId);
   128                 }
   129             }
   131             result = fragmentQueue.dequeue();
   132             result.fragmented = true;
   134             if (debug)
   135             {
   136                 // print address of ByteBuffer being dequeued
   137                 int bbAddr = System.identityHashCode(result.byteBuffer);
   138                 StringBuffer sb1 = new StringBuffer(80);
   139                 sb1.append("underflow() - dequeued ByteBuffer id (");
   140                 sb1.append(bbAddr).append(") from fragment queue.");
   141                 String msg1 = sb1.toString();
   142                 dprint(msg1);
   143             }
   145             // VERY IMPORTANT
   146             // Release bbwi.byteBuffer to the ByteBufferPool only if
   147             // this BufferManagerStream is not marked for potential restore.
   148             if (markEngaged == false && bbwi != null && bbwi.byteBuffer != null)
   149             {
   150                 ByteBufferPool byteBufferPool = getByteBufferPool();
   152                 if (debug)
   153                 {
   154                     // print address of ByteBuffer being released
   155                     int bbAddress = System.identityHashCode(bbwi.byteBuffer);
   156                     StringBuffer sb = new StringBuffer(80);
   157                     sb.append("underflow() - releasing ByteBuffer id (");
   158                     sb.append(bbAddress).append(") to ByteBufferPool.");
   159                     String msg = sb.toString();
   160                     dprint(msg);
   161                 }
   163                 byteBufferPool.releaseByteBuffer(bbwi.byteBuffer);
   164                 bbwi.byteBuffer = null;
   165                 bbwi = null;
   166             }
   167         }
   168         return result;
   169       } finally {
   170           //System.out.println("EXIT underflow");
   171       }
   172     }
   174     public void init(Message msg) {
   175         if (msg != null)
   176             endOfStream = !msg.moreFragmentsToFollow();
   177     }
   179     // Release any queued ByteBufferWithInfo's byteBuffers to the
   180     // ByteBufferPoool
   181     public void close(ByteBufferWithInfo bbwi)
   182     {
   183         int inputBbAddress = 0;
   185         // release ByteBuffers on fragmentQueue
   186         if (fragmentQueue != null)
   187         {
   188             synchronized (fragmentQueue)
   189             {
   190                 // IMPORTANT: The fragment queue may have one ByteBuffer
   191                 //            on it that's also on the CDRInputStream if
   192                 //            this method is called when the stream is 'marked'.
   193                 //            Thus, we'll compare the ByteBuffer passed
   194                 //            in (from a CDRInputStream) with all ByteBuffers
   195                 //            on the stack. If one is found to equal, it will
   196                 //            not be released to the ByteBufferPool.
   197                 if (bbwi != null)
   198                 {
   199                     inputBbAddress = System.identityHashCode(bbwi.byteBuffer);
   200                 }
   202                 ByteBufferWithInfo abbwi = null;
   203                 ByteBufferPool byteBufferPool = getByteBufferPool();
   204                 while (fragmentQueue.size() != 0)
   205                 {
   206                     abbwi = fragmentQueue.dequeue();
   207                     if (abbwi != null && abbwi.byteBuffer != null)
   208                     {
   209                         int bbAddress = System.identityHashCode(abbwi.byteBuffer);
   210                         if (inputBbAddress != bbAddress)
   211                         {
   212                             if (debug)
   213                             {
   214                                  // print address of ByteBuffer released
   215                                  StringBuffer sb = new StringBuffer(80);
   216                                  sb.append("close() - fragmentQueue is ")
   217                                    .append("releasing ByteBuffer id (")
   218                                    .append(bbAddress).append(") to ")
   219                                    .append("ByteBufferPool.");
   220                                  String msg = sb.toString();
   221                                  dprint(msg);
   222                             }
   223                         }
   224                         byteBufferPool.releaseByteBuffer(abbwi.byteBuffer);
   225                     }
   226                 }
   227             }
   228             fragmentQueue = null;
   229         }
   231         // release ByteBuffers on fragmentStack
   232         if (fragmentStack != null && fragmentStack.size() != 0)
   233         {
   234             // IMPORTANT: The fragment stack may have one ByteBuffer
   235             //            on it that's also on the CDRInputStream if
   236             //            this method is called when the stream is 'marked'.
   237             //            Thus, we'll compare the ByteBuffer passed
   238             //            in (from a CDRInputStream) with all ByteBuffers
   239             //            on the stack. If one is found to equal, it will
   240             //            not be released to the ByteBufferPool.
   241             if (bbwi != null)
   242             {
   243                 inputBbAddress = System.identityHashCode(bbwi.byteBuffer);
   244             }
   246             ByteBufferWithInfo abbwi = null;
   247             ByteBufferPool byteBufferPool = getByteBufferPool();
   248             ListIterator itr = fragmentStack.listIterator();
   249             while (itr.hasNext())
   250             {
   251                 abbwi = (ByteBufferWithInfo)itr.next();
   253                 if (abbwi != null && abbwi.byteBuffer != null)
   254                 {
   255                    int bbAddress = System.identityHashCode(abbwi.byteBuffer);
   256                    if (inputBbAddress != bbAddress)
   257                    {
   258                        if (debug)
   259                        {
   260                             // print address of ByteBuffer being released
   261                             StringBuffer sb = new StringBuffer(80);
   262                             sb.append("close() - fragmentStack - releasing ")
   263                               .append("ByteBuffer id (" + bbAddress + ") to ")
   264                               .append("ByteBufferPool.");
   265                             String msg = sb.toString();
   266                             dprint(msg);
   267                        }
   268                        byteBufferPool.releaseByteBuffer(abbwi.byteBuffer);
   269                    }
   270                 }
   271             }
   272             fragmentStack = null;
   273         }
   275     }
   277     protected ByteBufferPool getByteBufferPool()
   278     {
   279         return orb.getByteBufferPool();
   280     }
   282     private void dprint(String msg)
   283     {
   284         ORBUtility.dprint("BufferManagerReadStream", msg);
   285     }
   287     // Mark and reset handler ----------------------------------------
   289     private boolean markEngaged = false;
   291     // List of fragment ByteBufferWithInfos received since
   292     // the mark was engaged.
   293     private LinkedList fragmentStack = null;
   294     private RestorableInputStream inputStream = null;
   296     // Original state of the stream
   297     private Object streamMemento = null;
   299     public void mark(RestorableInputStream inputStream)
   300     {
   301         this.inputStream = inputStream;
   302         markEngaged = true;
   304         // Get the magic Object that the stream will use to
   305         // reconstruct it's state when reset is called
   306         streamMemento = inputStream.createStreamMemento();
   308         if (fragmentStack != null) {
   309             fragmentStack.clear();
   310         }
   311     }
   313     // Collects fragments received since the mark was engaged.
   314     public void fragmentationOccured(ByteBufferWithInfo newFragment)
   315     {
   316         if (!markEngaged)
   317             return;
   319         if (fragmentStack == null)
   320             fragmentStack = new LinkedList();
   322         fragmentStack.addFirst(new ByteBufferWithInfo(newFragment));
   323     }
   325     public void reset()
   326     {
   327         if (!markEngaged) {
   328             // REVISIT - call to reset without call to mark
   329             return;
   330         }
   332         markEngaged = false;
   334         // If we actually did peek across fragments, we need
   335         // to push those fragments onto the front of the
   336         // buffer queue.
   337         if (fragmentStack != null && fragmentStack.size() != 0) {
   338             ListIterator iter = fragmentStack.listIterator();
   340             synchronized(fragmentQueue) {
   341                 while (iter.hasNext()) {
   342                     fragmentQueue.push((ByteBufferWithInfo)iter.next());
   343                 }
   344             }
   346             fragmentStack.clear();
   347         }
   349         // Give the stream the magic Object to restore
   350         // it's state.
   351         inputStream.restoreInternalState(streamMemento);
   352     }
   354     public MarkAndResetHandler getMarkAndResetHandler() {
   355         return this;
   356     }
   357 }

mercurial