Wed, 27 Apr 2016 01:21:28 +0800
Initial load
http://hg.openjdk.java.net/jdk8u/jdk8u/corba/
changeset: 765:f46df0af2ca8
tag: jdk8u25-b17
1 /*
2 * Copyright (c) 2000, 2009, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation. Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25 package com.sun.corba.se.impl.encoding;
27 import java.nio.ByteBuffer;
28 import com.sun.corba.se.pept.transport.ByteBufferPool;
29 import com.sun.corba.se.spi.logging.CORBALogDomains;
30 import com.sun.corba.se.spi.orb.ORB;
31 import com.sun.corba.se.impl.logging.ORBUtilSystemException;
32 import com.sun.corba.se.impl.orbutil.ORBUtility;
33 import com.sun.corba.se.impl.protocol.RequestCanceledException;
34 import com.sun.corba.se.impl.protocol.giopmsgheaders.FragmentMessage;
35 import com.sun.corba.se.impl.protocol.giopmsgheaders.Message;
36 import java.util.*;
38 public class BufferManagerReadStream
39 implements BufferManagerRead, MarkAndResetHandler
40 {
41 private boolean receivedCancel = false;
42 private int cancelReqId = 0;
44 // We should convert endOfStream to a final static dummy end node
45 private boolean endOfStream = true;
46 private BufferQueue fragmentQueue = new BufferQueue();
47 private long FRAGMENT_TIMEOUT = 60000;
49 // REVISIT - This should go in BufferManagerRead. But, since
50 // BufferManagerRead is an interface. BufferManagerRead
51 // might ought to be an abstract class instead of an
52 // interface.
53 private ORB orb ;
54 private ORBUtilSystemException wrapper ;
55 private boolean debug = false;
57 BufferManagerReadStream( ORB orb )
58 {
59 this.orb = orb ;
60 this.wrapper = ORBUtilSystemException.get( orb,
61 CORBALogDomains.RPC_ENCODING ) ;
62 debug = orb.transportDebugFlag;
63 }
65 public void cancelProcessing(int requestId) {
66 synchronized(fragmentQueue) {
67 receivedCancel = true;
68 cancelReqId = requestId;
69 fragmentQueue.notify();
70 }
71 }
73 public void processFragment(ByteBuffer byteBuffer, FragmentMessage msg)
74 {
75 ByteBufferWithInfo bbwi =
76 new ByteBufferWithInfo(orb, byteBuffer, msg.getHeaderLength());
78 synchronized (fragmentQueue) {
79 if (debug)
80 {
81 // print address of ByteBuffer being queued
82 int bbAddress = System.identityHashCode(byteBuffer);
83 StringBuffer sb = new StringBuffer(80);
84 sb.append("processFragment() - queueing ByteBuffer id (");
85 sb.append(bbAddress).append(") to fragment queue.");
86 String strMsg = sb.toString();
87 dprint(strMsg);
88 }
89 fragmentQueue.enqueue(bbwi);
90 endOfStream = !msg.moreFragmentsToFollow();
91 fragmentQueue.notify();
92 }
93 }
95 public ByteBufferWithInfo underflow (ByteBufferWithInfo bbwi)
96 {
98 ByteBufferWithInfo result = null;
100 try {
101 //System.out.println("ENTER underflow");
103 synchronized (fragmentQueue) {
105 if (receivedCancel) {
106 throw new RequestCanceledException(cancelReqId);
107 }
109 while (fragmentQueue.size() == 0) {
111 if (endOfStream) {
112 throw wrapper.endOfStream() ;
113 }
115 boolean interrupted = false;
116 try {
117 fragmentQueue.wait(FRAGMENT_TIMEOUT);
118 } catch (InterruptedException e) {
119 interrupted = true;
120 }
122 if (!interrupted && fragmentQueue.size() == 0) {
123 throw wrapper.bufferReadManagerTimeout();
124 }
126 if (receivedCancel) {
127 throw new RequestCanceledException(cancelReqId);
128 }
129 }
131 result = fragmentQueue.dequeue();
132 result.fragmented = true;
134 if (debug)
135 {
136 // print address of ByteBuffer being dequeued
137 int bbAddr = System.identityHashCode(result.byteBuffer);
138 StringBuffer sb1 = new StringBuffer(80);
139 sb1.append("underflow() - dequeued ByteBuffer id (");
140 sb1.append(bbAddr).append(") from fragment queue.");
141 String msg1 = sb1.toString();
142 dprint(msg1);
143 }
145 // VERY IMPORTANT
146 // Release bbwi.byteBuffer to the ByteBufferPool only if
147 // this BufferManagerStream is not marked for potential restore.
148 if (markEngaged == false && bbwi != null && bbwi.byteBuffer != null)
149 {
150 ByteBufferPool byteBufferPool = getByteBufferPool();
152 if (debug)
153 {
154 // print address of ByteBuffer being released
155 int bbAddress = System.identityHashCode(bbwi.byteBuffer);
156 StringBuffer sb = new StringBuffer(80);
157 sb.append("underflow() - releasing ByteBuffer id (");
158 sb.append(bbAddress).append(") to ByteBufferPool.");
159 String msg = sb.toString();
160 dprint(msg);
161 }
163 byteBufferPool.releaseByteBuffer(bbwi.byteBuffer);
164 bbwi.byteBuffer = null;
165 bbwi = null;
166 }
167 }
168 return result;
169 } finally {
170 //System.out.println("EXIT underflow");
171 }
172 }
174 public void init(Message msg) {
175 if (msg != null)
176 endOfStream = !msg.moreFragmentsToFollow();
177 }
179 // Release any queued ByteBufferWithInfo's byteBuffers to the
180 // ByteBufferPoool
181 public void close(ByteBufferWithInfo bbwi)
182 {
183 int inputBbAddress = 0;
185 // release ByteBuffers on fragmentQueue
186 if (fragmentQueue != null)
187 {
188 synchronized (fragmentQueue)
189 {
190 // IMPORTANT: The fragment queue may have one ByteBuffer
191 // on it that's also on the CDRInputStream if
192 // this method is called when the stream is 'marked'.
193 // Thus, we'll compare the ByteBuffer passed
194 // in (from a CDRInputStream) with all ByteBuffers
195 // on the stack. If one is found to equal, it will
196 // not be released to the ByteBufferPool.
197 if (bbwi != null)
198 {
199 inputBbAddress = System.identityHashCode(bbwi.byteBuffer);
200 }
202 ByteBufferWithInfo abbwi = null;
203 ByteBufferPool byteBufferPool = getByteBufferPool();
204 while (fragmentQueue.size() != 0)
205 {
206 abbwi = fragmentQueue.dequeue();
207 if (abbwi != null && abbwi.byteBuffer != null)
208 {
209 int bbAddress = System.identityHashCode(abbwi.byteBuffer);
210 if (inputBbAddress != bbAddress)
211 {
212 if (debug)
213 {
214 // print address of ByteBuffer released
215 StringBuffer sb = new StringBuffer(80);
216 sb.append("close() - fragmentQueue is ")
217 .append("releasing ByteBuffer id (")
218 .append(bbAddress).append(") to ")
219 .append("ByteBufferPool.");
220 String msg = sb.toString();
221 dprint(msg);
222 }
223 }
224 byteBufferPool.releaseByteBuffer(abbwi.byteBuffer);
225 }
226 }
227 }
228 fragmentQueue = null;
229 }
231 // release ByteBuffers on fragmentStack
232 if (fragmentStack != null && fragmentStack.size() != 0)
233 {
234 // IMPORTANT: The fragment stack may have one ByteBuffer
235 // on it that's also on the CDRInputStream if
236 // this method is called when the stream is 'marked'.
237 // Thus, we'll compare the ByteBuffer passed
238 // in (from a CDRInputStream) with all ByteBuffers
239 // on the stack. If one is found to equal, it will
240 // not be released to the ByteBufferPool.
241 if (bbwi != null)
242 {
243 inputBbAddress = System.identityHashCode(bbwi.byteBuffer);
244 }
246 ByteBufferWithInfo abbwi = null;
247 ByteBufferPool byteBufferPool = getByteBufferPool();
248 ListIterator itr = fragmentStack.listIterator();
249 while (itr.hasNext())
250 {
251 abbwi = (ByteBufferWithInfo)itr.next();
253 if (abbwi != null && abbwi.byteBuffer != null)
254 {
255 int bbAddress = System.identityHashCode(abbwi.byteBuffer);
256 if (inputBbAddress != bbAddress)
257 {
258 if (debug)
259 {
260 // print address of ByteBuffer being released
261 StringBuffer sb = new StringBuffer(80);
262 sb.append("close() - fragmentStack - releasing ")
263 .append("ByteBuffer id (" + bbAddress + ") to ")
264 .append("ByteBufferPool.");
265 String msg = sb.toString();
266 dprint(msg);
267 }
268 byteBufferPool.releaseByteBuffer(abbwi.byteBuffer);
269 }
270 }
271 }
272 fragmentStack = null;
273 }
275 }
277 protected ByteBufferPool getByteBufferPool()
278 {
279 return orb.getByteBufferPool();
280 }
282 private void dprint(String msg)
283 {
284 ORBUtility.dprint("BufferManagerReadStream", msg);
285 }
287 // Mark and reset handler ----------------------------------------
289 private boolean markEngaged = false;
291 // List of fragment ByteBufferWithInfos received since
292 // the mark was engaged.
293 private LinkedList fragmentStack = null;
294 private RestorableInputStream inputStream = null;
296 // Original state of the stream
297 private Object streamMemento = null;
299 public void mark(RestorableInputStream inputStream)
300 {
301 this.inputStream = inputStream;
302 markEngaged = true;
304 // Get the magic Object that the stream will use to
305 // reconstruct it's state when reset is called
306 streamMemento = inputStream.createStreamMemento();
308 if (fragmentStack != null) {
309 fragmentStack.clear();
310 }
311 }
313 // Collects fragments received since the mark was engaged.
314 public void fragmentationOccured(ByteBufferWithInfo newFragment)
315 {
316 if (!markEngaged)
317 return;
319 if (fragmentStack == null)
320 fragmentStack = new LinkedList();
322 fragmentStack.addFirst(new ByteBufferWithInfo(newFragment));
323 }
325 public void reset()
326 {
327 if (!markEngaged) {
328 // REVISIT - call to reset without call to mark
329 return;
330 }
332 markEngaged = false;
334 // If we actually did peek across fragments, we need
335 // to push those fragments onto the front of the
336 // buffer queue.
337 if (fragmentStack != null && fragmentStack.size() != 0) {
338 ListIterator iter = fragmentStack.listIterator();
340 synchronized(fragmentQueue) {
341 while (iter.hasNext()) {
342 fragmentQueue.push((ByteBufferWithInfo)iter.next());
343 }
344 }
346 fragmentStack.clear();
347 }
349 // Give the stream the magic Object to restore
350 // it's state.
351 inputStream.restoreInternalState(streamMemento);
352 }
354 public MarkAndResetHandler getMarkAndResetHandler() {
355 return this;
356 }
357 }