src/share/classes/com/sun/corba/se/impl/transport/SocketOrChannelConnectionImpl.java

Tue, 28 Dec 2010 15:52:36 -0800

author
ohair
date
Tue, 28 Dec 2010 15:52:36 -0800
changeset 240
f90b3e014e83
parent 226
e0f7ed041196
child 391
18a02ad8dc73
permissions
-rw-r--r--

6962318: Update copyright year
Reviewed-by: xdono

duke@1 1 /*
ohair@240 2 * Copyright (c) 2001, 2010, Oracle and/or its affiliates. All rights reserved.
duke@1 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
duke@1 4 *
duke@1 5 * This code is free software; you can redistribute it and/or modify it
duke@1 6 * under the terms of the GNU General Public License version 2 only, as
ohair@158 7 * published by the Free Software Foundation. Oracle designates this
duke@1 8 * particular file as subject to the "Classpath" exception as provided
ohair@158 9 * by Oracle in the LICENSE file that accompanied this code.
duke@1 10 *
duke@1 11 * This code is distributed in the hope that it will be useful, but WITHOUT
duke@1 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
duke@1 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
duke@1 14 * version 2 for more details (a copy is included in the LICENSE file that
duke@1 15 * accompanied this code).
duke@1 16 *
duke@1 17 * You should have received a copy of the GNU General Public License version
duke@1 18 * 2 along with this work; if not, write to the Free Software Foundation,
duke@1 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
duke@1 20 *
ohair@158 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
ohair@158 22 * or visit www.oracle.com if you need additional information or have any
ohair@158 23 * questions.
duke@1 24 */
duke@1 25
duke@1 26 package com.sun.corba.se.impl.transport;
duke@1 27
duke@1 28 import java.io.IOException;
duke@1 29 import java.net.InetSocketAddress;
duke@1 30 import java.net.Socket;
duke@1 31 import java.nio.ByteBuffer;
duke@1 32 import java.nio.channels.SelectableChannel;
duke@1 33 import java.nio.channels.SelectionKey;
duke@1 34 import java.nio.channels.SocketChannel;
duke@1 35 import java.security.AccessController;
duke@1 36 import java.security.PrivilegedAction;
duke@1 37 import java.util.Collections;
duke@1 38 import java.util.Hashtable;
duke@1 39 import java.util.HashMap;
duke@1 40 import java.util.Map;
duke@1 41
duke@1 42 import org.omg.CORBA.COMM_FAILURE;
duke@1 43 import org.omg.CORBA.CompletionStatus;
duke@1 44 import org.omg.CORBA.DATA_CONVERSION;
duke@1 45 import org.omg.CORBA.INTERNAL;
duke@1 46 import org.omg.CORBA.MARSHAL;
duke@1 47 import org.omg.CORBA.OBJECT_NOT_EXIST;
duke@1 48 import org.omg.CORBA.SystemException;
duke@1 49
duke@1 50 import com.sun.org.omg.SendingContext.CodeBase;
duke@1 51
duke@1 52 import com.sun.corba.se.pept.broker.Broker;
duke@1 53 import com.sun.corba.se.pept.encoding.InputObject;
duke@1 54 import com.sun.corba.se.pept.encoding.OutputObject;
duke@1 55 import com.sun.corba.se.pept.protocol.MessageMediator;
duke@1 56 import com.sun.corba.se.pept.transport.Acceptor;
duke@1 57 import com.sun.corba.se.pept.transport.Connection;
duke@1 58 import com.sun.corba.se.pept.transport.ConnectionCache;
duke@1 59 import com.sun.corba.se.pept.transport.ContactInfo;
duke@1 60 import com.sun.corba.se.pept.transport.EventHandler;
duke@1 61 import com.sun.corba.se.pept.transport.InboundConnectionCache;
duke@1 62 import com.sun.corba.se.pept.transport.OutboundConnectionCache;
duke@1 63 import com.sun.corba.se.pept.transport.ResponseWaitingRoom;
duke@1 64 import com.sun.corba.se.pept.transport.Selector;
duke@1 65
duke@1 66 import com.sun.corba.se.spi.ior.IOR;
duke@1 67 import com.sun.corba.se.spi.ior.iiop.GIOPVersion;
duke@1 68 import com.sun.corba.se.spi.logging.CORBALogDomains;
duke@1 69 import com.sun.corba.se.spi.orb.ORB ;
duke@1 70 import com.sun.corba.se.spi.orbutil.threadpool.NoSuchThreadPoolException;
duke@1 71 import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException;
duke@1 72 import com.sun.corba.se.spi.orbutil.threadpool.Work;
duke@1 73 import com.sun.corba.se.spi.protocol.CorbaMessageMediator;
duke@1 74 import com.sun.corba.se.spi.transport.CorbaContactInfo;
duke@1 75 import com.sun.corba.se.spi.transport.CorbaConnection;
duke@1 76 import com.sun.corba.se.spi.transport.CorbaResponseWaitingRoom;
duke@1 77 import com.sun.corba.se.spi.transport.ReadTimeouts;
duke@1 78
duke@1 79 import com.sun.corba.se.impl.encoding.CachedCodeBase;
duke@1 80 import com.sun.corba.se.impl.encoding.CDRInputStream_1_0;
duke@1 81 import com.sun.corba.se.impl.encoding.CDROutputObject;
duke@1 82 import com.sun.corba.se.impl.encoding.CDROutputStream_1_0;
duke@1 83 import com.sun.corba.se.impl.encoding.CodeSetComponentInfo;
duke@1 84 import com.sun.corba.se.impl.encoding.OSFCodeSetRegistry;
duke@1 85 import com.sun.corba.se.impl.logging.ORBUtilSystemException;
duke@1 86 import com.sun.corba.se.impl.orbutil.ORBConstants;
duke@1 87 import com.sun.corba.se.impl.orbutil.ORBUtility;
duke@1 88 import com.sun.corba.se.impl.protocol.giopmsgheaders.Message;
duke@1 89 import com.sun.corba.se.impl.protocol.giopmsgheaders.MessageBase;
duke@1 90 import com.sun.corba.se.impl.transport.CorbaResponseWaitingRoomImpl;
duke@1 91
duke@1 92 /**
duke@1 93 * @author Harold Carr
duke@1 94 */
duke@1 95 public class SocketOrChannelConnectionImpl
duke@1 96 extends
duke@1 97 EventHandlerBase
duke@1 98 implements
duke@1 99 CorbaConnection,
duke@1 100 Work
duke@1 101 {
// Public, mutable debug switch. Its consumers are not visible in this part
// of the file; presumably it enables write-lock tracing -- TODO confirm.
duke@1 102 public static boolean dprintWriteLocks = false;
duke@1 103
duke@1 104 //
duke@1 105 // New transport.
duke@1 106 //
duke@1 107
// NOTE(review): neither read nor written in the visible part of the file;
// presumably the time this connection was queued as Work -- confirm.
duke@1 108 protected long enqueueTime;
duke@1 109
// NIO channel taken from socket.getChannel() by the client and server
// constructors; stays null when the socket is not channel-backed.
duke@1 110 protected SocketChannel socketChannel;
duke@1 111 public SocketChannel getSocketChannel()
duke@1 112 {
duke@1 113 return socketChannel;
duke@1 114 }
duke@1 115
duke@1 116 // REVISIT:
duke@1 117 // protected for test: genericRPCMSGFramework.IIOPConnection constructor.
// Set by the client constructors; used by readBits()/finishReadingBits() to
// create message mediators on the client side.
duke@1 118 protected CorbaContactInfo contactInfo;
// Set by the server constructors; used instead of contactInfo on the
// server side.
duke@1 119 protected Acceptor acceptor;
// NOTE(review): not assigned in the visible part of the file; presumably
// what getConnectionCache() hands back -- confirm.
duke@1 120 protected ConnectionCache connectionCache;
duke@1 121
duke@1 122 //
duke@1 123 // From iiop.Connection.java
duke@1 124 //
duke@1 125
duke@1 126 protected Socket socket; // The socket used for this connection.
// Value exposed through getTimeStamp()/setTimeStamp().
duke@1 127 protected long timeStamp = 0;
// Switched to true only by the server-side constructor.
duke@1 128 protected boolean isServer = false;
duke@1 129
duke@1 130 // Start at some value other than zero since this is a magic
duke@1 131 // value in some protocols.
duke@1 132 protected int requestId = 5;
// Created in the (ORB) constructor; isBusy() consults its number of
// registered waiters.
duke@1 133 protected CorbaResponseWaitingRoom responseWaitingRoom;
// Connection state: OPENING after the client constructor, ESTABLISHED after
// the server constructor, CLOSE_SENT once close() has run.
duke@1 134 protected int state;
// Monitor that close() synchronizes on and notifies when it changes state.
duke@1 135 protected java.lang.Object stateEvent = new java.lang.Object();
// NOTE(review): not used in the visible part of the file; presumably the
// monitor behind writeLock()/writeUnlock(), together with writeLocked --
// confirm.
duke@1 136 protected java.lang.Object writeEvent = new java.lang.Object();
duke@1 137 protected boolean writeLocked;
// A value greater than zero makes isBusy() report true, which in turn makes
// close() a no-op.
duke@1 138 protected int serverRequestCount = 0;
duke@1 139
duke@1 140 // Server request map: used on the server side of Connection
duke@1 141 // Maps request ID to IIOPInputStream.
// Created as a synchronized HashMap by the server constructor; remains null
// for client connections.
duke@1 142 Map serverRequestMap = null;
duke@1 143
duke@1 144 // This is a flag associated per connection telling us if the
duke@1 145 // initial set of sending contexts were sent to the receiver
duke@1 146 // already...
duke@1 147 protected boolean postInitialContexts = false;
duke@1 148
duke@1 149 // Remote reference to CodeBase server (supplies
duke@1 150 // FullValueDescription, among other things)
duke@1 151 protected IOR codeBaseServerIOR;
duke@1 152
duke@1 153 // CodeBase cache for this connection. This will cache remote operations,
duke@1 154 // handle connecting, and ensure we don't do any remote operations until
duke@1 155 // necessary.
duke@1 156 protected CachedCodeBase cachedCodeBase = new CachedCodeBase(this);
duke@1 157
// Exception/log factory for the RPC_TRANSPORT log domain; assigned in the
// (ORB) constructor.
duke@1 158 protected ORBUtilSystemException wrapper ;
duke@1 159
duke@1 160 // transport read timeout values
// Initialised from orb.getORBData().getTransportTCPReadTimeouts(); the
// readFully() methods take their initial wait and backoff factor from it.
duke@1 161 protected ReadTimeouts readTimeouts;
duke@1 162
duke@1 163 protected boolean shouldReadGiopHeaderOnly;
duke@1 164
duke@1 165 // A message mediator used when shouldReadGiopHeaderOnly is
duke@1 166 // true to maintain request message state across execution in a
duke@1 167 // SelectorThread and WorkerThread.
duke@1 168 protected CorbaMessageMediator partialMessageMediator = null;
duke@1 169
/**
 * Base constructor shared by every other constructor; also used directly
 * in the genericRPCMSGFramework test.
 *
 * Stores the ORB, obtains the RPC_TRANSPORT log wrapper, registers this
 * object as its own unit of work, creates the response waiting room and
 * picks up the ORB's TCP read timeouts. No socket is created here.
 *
 * @param orb the ORB this connection belongs to
 */
protected SocketOrChannelConnectionImpl(ORB orb)
{
    this.orb = orb;
    this.wrapper =
        ORBUtilSystemException.get(orb, CORBALogDomains.RPC_TRANSPORT);

    setWork(this);
    this.responseWaitingRoom = new CorbaResponseWaitingRoomImpl(orb, this);
    setReadTimeouts(orb.getORBData().getTransportTCPReadTimeouts());
}
duke@1 181
/**
 * Constructor common to client and server connections: performs the base
 * initialisation and then records the threading options.
 *
 * @param orb                   the ORB this connection belongs to
 * @param useSelectThreadToWait passed to {@code setUseSelectThreadToWait}
 * @param useWorkerThread       passed to {@code setUseWorkerThreadForEvent}
 */
protected SocketOrChannelConnectionImpl(ORB orb,
                                        boolean useSelectThreadToWait,
                                        boolean useWorkerThread)
{
    this(orb);
    this.setUseSelectThreadToWait(useSelectThreadToWait);
    this.setUseWorkerThreadForEvent(useWorkerThread);
}
duke@1 191
/**
 * Client constructor: opens a socket to {@code hostname:port} through the
 * ORB's socket factory and leaves the connection in the OPENING state.
 *
 * When the socket is channel-backed, the channel is put into blocking mode
 * exactly when the select thread is not used for waiting. When it is not
 * channel-backed, waiting on the select thread is switched off.
 *
 * If anything fails after the socket has been created, the socket is
 * closed before the failure is reported so that the descriptor is not
 * leaked.
 *
 * @param orb                   the ORB this connection belongs to
 * @param contactInfo           client-side contact information
 * @param useSelectThreadToWait whether the select thread waits for input
 * @param useWorkerThread       whether events are handled on worker threads
 * @param socketType            socket type handed to the socket factory
 * @param hostname              remote host
 * @param port                  remote port
 * @throws COMM_FAILURE (via {@code wrapper.connectFailure}) when the
 *         socket cannot be created or configured
 */
public SocketOrChannelConnectionImpl(ORB orb,
                                     CorbaContactInfo contactInfo,
                                     boolean useSelectThreadToWait,
                                     boolean useWorkerThread,
                                     String socketType,
                                     String hostname,
                                     int port)
{
    this(orb, useSelectThreadToWait, useWorkerThread);

    this.contactInfo = contactInfo;

    try {
        socket = orb.getORBData().getSocketFactory()
            .createSocket(socketType,
                          new InetSocketAddress(hostname, port));
        socketChannel = socket.getChannel();

        if (socketChannel != null) {
            boolean isBlocking = !useSelectThreadToWait;
            socketChannel.configureBlocking(isBlocking);
        } else {
            // IMPORTANT: non-channel-backed sockets must use
            // dedicated reader threads.
            setUseSelectThreadToWait(false);
        }
        if (orb.transportDebugFlag) {
            dprint(".initialize: connection created: " + socket);
        }
    } catch (Throwable t) {
        // The constructor is about to fail, so nobody will ever call
        // close() on this object: release the socket here. Closing the
        // socket also closes its associated channel.
        if (socket != null) {
            try {
                socket.close();
            } catch (IOException ignored) {
                // Best effort only; the original failure is what matters.
            }
        }
        throw wrapper.connectFailure(t, socketType, hostname,
                                     Integer.toString(port));
    }
    state = OPENING;
}
duke@1 228
/**
 * Client-side convenience constructor: takes both threading options from
 * the ORB's configuration data and delegates to the full client
 * constructor.
 *
 * @param orb         the ORB this connection belongs to
 * @param contactInfo client-side contact information
 * @param socketType  socket type handed to the socket factory
 * @param hostname    remote host
 * @param port        remote port
 */
public SocketOrChannelConnectionImpl(ORB orb,
                                     CorbaContactInfo contactInfo,
                                     String socketType,
                                     String hostname,
                                     int port)
{
    this(orb,
         contactInfo,
         orb.getORBData().connectionSocketUseSelectThreadToWait(),
         orb.getORBData().connectionSocketUseWorkerThreadForEvent(),
         socketType,
         hostname,
         port);
}
duke@1 241
/**
 * Server-side constructor: wraps an already accepted socket and leaves
 * the connection in the ESTABLISHED state.
 *
 * When the socket is channel-backed, the channel is put into blocking mode
 * exactly when the select thread is not used for waiting.
 *
 * @param orb                   the ORB this connection belongs to
 * @param acceptor              the acceptor that produced the socket
 * @param socket                the accepted socket
 * @param useSelectThreadToWait whether the select thread waits for input
 * @param useWorkerThread       whether events are handled on worker threads
 * @throws RuntimeException with the {@link IOException} as its cause when
 *         the channel's blocking mode cannot be configured
 */
public SocketOrChannelConnectionImpl(ORB orb,
                                     Acceptor acceptor,
                                     Socket socket,
                                     boolean useSelectThreadToWait,
                                     boolean useWorkerThread)
{
    this(orb, useSelectThreadToWait, useWorkerThread);

    this.socket = socket;
    this.socketChannel = socket.getChannel();
    if (this.socketChannel != null) {
        // REVISIT
        try {
            this.socketChannel.configureBlocking(!useSelectThreadToWait);
        } catch (IOException ioe) {
            RuntimeException failure = new RuntimeException();
            failure.initCause(ioe);
            throw failure;
        }
    }
    this.acceptor = acceptor;

    // Server connections keep a thread-safe request map.
    this.serverRequestMap = Collections.synchronizedMap(new HashMap());
    this.isServer = true;

    this.state = ESTABLISHED;
}
duke@1 271
/**
 * Server-side convenience constructor. Both threading options come from
 * the ORB's configuration data, but are forced to {@code false} when the
 * socket is not channel-backed.
 *
 * @param orb      the ORB this connection belongs to
 * @param acceptor the acceptor that produced the socket
 * @param socket   the accepted socket
 */
public SocketOrChannelConnectionImpl(ORB orb,
                                     Acceptor acceptor,
                                     Socket socket)
{
    this(orb, acceptor, socket,
         socket.getChannel() != null
             && orb.getORBData().connectionSocketUseSelectThreadToWait(),
         socket.getChannel() != null
             && orb.getORBData().connectionSocketUseWorkerThreadForEvent());
}
duke@1 285
duke@1 286 ////////////////////////////////////////////////////
duke@1 287 //
duke@1 288 // framework.transport.Connection
duke@1 289 //
duke@1 290
duke@1 291 public boolean shouldRegisterReadEvent()
duke@1 292 {
duke@1 293 return true;
duke@1 294 }
duke@1 295
duke@1 296 public boolean shouldRegisterServerReadEvent()
duke@1 297 {
duke@1 298 return true;
duke@1 299 }
duke@1 300
duke@1 301 public boolean read()
duke@1 302 {
duke@1 303 try {
duke@1 304 if (orb.transportDebugFlag) {
duke@1 305 dprint(".read->: " + this);
duke@1 306 }
duke@1 307 CorbaMessageMediator messageMediator = readBits();
duke@1 308 if (messageMediator != null) {
duke@1 309 // Null can happen when client closes stream
duke@1 310 // causing purgecalls.
duke@1 311 return dispatch(messageMediator);
duke@1 312 }
duke@1 313 return true;
duke@1 314 } finally {
duke@1 315 if (orb.transportDebugFlag) {
duke@1 316 dprint(".read<-: " + this);
duke@1 317 }
duke@1 318 }
duke@1 319 }
duke@1 320
duke@1 321 protected CorbaMessageMediator readBits()
duke@1 322 {
duke@1 323 try {
duke@1 324
duke@1 325 if (orb.transportDebugFlag) {
duke@1 326 dprint(".readBits->: " + this);
duke@1 327 }
duke@1 328
duke@1 329 MessageMediator messageMediator;
duke@1 330 // REVISIT - use common factory base class.
duke@1 331 if (contactInfo != null) {
duke@1 332 messageMediator =
duke@1 333 contactInfo.createMessageMediator(orb, this);
duke@1 334 } else if (acceptor != null) {
duke@1 335 messageMediator = acceptor.createMessageMediator(orb, this);
duke@1 336 } else {
duke@1 337 throw
duke@1 338 new RuntimeException("SocketOrChannelConnectionImpl.readBits");
duke@1 339 }
duke@1 340 return (CorbaMessageMediator) messageMediator;
duke@1 341
duke@1 342 } catch (ThreadDeath td) {
duke@1 343 if (orb.transportDebugFlag) {
duke@1 344 dprint(".readBits: " + this + ": ThreadDeath: " + td, td);
duke@1 345 }
duke@1 346 try {
duke@1 347 purgeCalls(wrapper.connectionAbort(td), false, false);
duke@1 348 } catch (Throwable t) {
duke@1 349 if (orb.transportDebugFlag) {
duke@1 350 dprint(".readBits: " + this + ": purgeCalls: Throwable: " + t, t);
duke@1 351 }
duke@1 352 }
duke@1 353 throw td;
duke@1 354 } catch (Throwable ex) {
duke@1 355 if (orb.transportDebugFlag) {
duke@1 356 dprint(".readBits: " + this + ": Throwable: " + ex, ex);
duke@1 357 }
duke@1 358
duke@1 359 try {
duke@1 360 if (ex instanceof INTERNAL) {
duke@1 361 sendMessageError(GIOPVersion.DEFAULT_VERSION);
duke@1 362 }
duke@1 363 } catch (IOException e) {
duke@1 364 if (orb.transportDebugFlag) {
duke@1 365 dprint(".readBits: " + this +
duke@1 366 ": sendMessageError: IOException: " + e, e);
duke@1 367 }
duke@1 368 }
duke@1 369 // REVISIT - make sure reader thread is killed.
duke@1 370 orb.getTransportManager().getSelector(0).unregisterForEvent(this);
duke@1 371 // Notify anyone waiting.
duke@1 372 purgeCalls(wrapper.connectionAbort(ex), true, false);
duke@1 373 // REVISIT
duke@1 374 //keepRunning = false;
duke@1 375 // REVISIT - if this is called after purgeCalls then
duke@1 376 // the state of the socket is ABORT so the writeLock
duke@1 377 // in close throws an exception. It is ignored but
duke@1 378 // causes IBM (screen scraping) tests to fail.
duke@1 379 //close();
duke@1 380 } finally {
duke@1 381 if (orb.transportDebugFlag) {
duke@1 382 dprint(".readBits<-: " + this);
duke@1 383 }
duke@1 384 }
duke@1 385 return null;
duke@1 386 }
duke@1 387
duke@1 388 protected CorbaMessageMediator finishReadingBits(MessageMediator messageMediator)
duke@1 389 {
duke@1 390 try {
duke@1 391
duke@1 392 if (orb.transportDebugFlag) {
duke@1 393 dprint(".finishReadingBits->: " + this);
duke@1 394 }
duke@1 395
duke@1 396 // REVISIT - use common factory base class.
duke@1 397 if (contactInfo != null) {
duke@1 398 messageMediator =
duke@1 399 contactInfo.finishCreatingMessageMediator(orb, this, messageMediator);
duke@1 400 } else if (acceptor != null) {
duke@1 401 messageMediator =
duke@1 402 acceptor.finishCreatingMessageMediator(orb, this, messageMediator);
duke@1 403 } else {
duke@1 404 throw
duke@1 405 new RuntimeException("SocketOrChannelConnectionImpl.finishReadingBits");
duke@1 406 }
duke@1 407 return (CorbaMessageMediator) messageMediator;
duke@1 408
duke@1 409 } catch (ThreadDeath td) {
duke@1 410 if (orb.transportDebugFlag) {
duke@1 411 dprint(".finishReadingBits: " + this + ": ThreadDeath: " + td, td);
duke@1 412 }
duke@1 413 try {
duke@1 414 purgeCalls(wrapper.connectionAbort(td), false, false);
duke@1 415 } catch (Throwable t) {
duke@1 416 if (orb.transportDebugFlag) {
duke@1 417 dprint(".finishReadingBits: " + this + ": purgeCalls: Throwable: " + t, t);
duke@1 418 }
duke@1 419 }
duke@1 420 throw td;
duke@1 421 } catch (Throwable ex) {
duke@1 422 if (orb.transportDebugFlag) {
duke@1 423 dprint(".finishReadingBits: " + this + ": Throwable: " + ex, ex);
duke@1 424 }
duke@1 425
duke@1 426 try {
duke@1 427 if (ex instanceof INTERNAL) {
duke@1 428 sendMessageError(GIOPVersion.DEFAULT_VERSION);
duke@1 429 }
duke@1 430 } catch (IOException e) {
duke@1 431 if (orb.transportDebugFlag) {
duke@1 432 dprint(".finishReadingBits: " + this +
duke@1 433 ": sendMessageError: IOException: " + e, e);
duke@1 434 }
duke@1 435 }
duke@1 436 // REVISIT - make sure reader thread is killed.
duke@1 437 orb.getTransportManager().getSelector(0).unregisterForEvent(this);
duke@1 438 // Notify anyone waiting.
duke@1 439 purgeCalls(wrapper.connectionAbort(ex), true, false);
duke@1 440 // REVISIT
duke@1 441 //keepRunning = false;
duke@1 442 // REVISIT - if this is called after purgeCalls then
duke@1 443 // the state of the socket is ABORT so the writeLock
duke@1 444 // in close throws an exception. It is ignored but
duke@1 445 // causes IBM (screen scraping) tests to fail.
duke@1 446 //close();
duke@1 447 } finally {
duke@1 448 if (orb.transportDebugFlag) {
duke@1 449 dprint(".finishReadingBits<-: " + this);
duke@1 450 }
duke@1 451 }
duke@1 452 return null;
duke@1 453 }
duke@1 454
duke@1 455 protected boolean dispatch(CorbaMessageMediator messageMediator)
duke@1 456 {
duke@1 457 try {
duke@1 458 if (orb.transportDebugFlag) {
duke@1 459 dprint(".dispatch->: " + this);
duke@1 460 }
duke@1 461
duke@1 462 //
duke@1 463 // NOTE:
duke@1 464 //
duke@1 465 // This call is the transition from the tranport block
duke@1 466 // to the protocol block.
duke@1 467 //
duke@1 468
duke@1 469 boolean result =
duke@1 470 messageMediator.getProtocolHandler()
duke@1 471 .handleRequest(messageMediator);
duke@1 472
duke@1 473 return result;
duke@1 474
duke@1 475 } catch (ThreadDeath td) {
duke@1 476 if (orb.transportDebugFlag) {
duke@1 477 dprint(".dispatch: ThreadDeath", td );
duke@1 478 }
duke@1 479 try {
duke@1 480 purgeCalls(wrapper.connectionAbort(td), false, false);
duke@1 481 } catch (Throwable t) {
duke@1 482 if (orb.transportDebugFlag) {
duke@1 483 dprint(".dispatch: purgeCalls: Throwable", t);
duke@1 484 }
duke@1 485 }
duke@1 486 throw td;
duke@1 487 } catch (Throwable ex) {
duke@1 488 if (orb.transportDebugFlag) {
duke@1 489 dprint(".dispatch: Throwable", ex ) ;
duke@1 490 }
duke@1 491
duke@1 492 try {
duke@1 493 if (ex instanceof INTERNAL) {
duke@1 494 sendMessageError(GIOPVersion.DEFAULT_VERSION);
duke@1 495 }
duke@1 496 } catch (IOException e) {
duke@1 497 if (orb.transportDebugFlag) {
duke@1 498 dprint(".dispatch: sendMessageError: IOException", e);
duke@1 499 }
duke@1 500 }
duke@1 501 purgeCalls(wrapper.connectionAbort(ex), false, false);
duke@1 502 // REVISIT
duke@1 503 //keepRunning = false;
duke@1 504 } finally {
duke@1 505 if (orb.transportDebugFlag) {
duke@1 506 dprint(".dispatch<-: " + this);
duke@1 507 }
duke@1 508 }
duke@1 509
duke@1 510 return true;
duke@1 511 }
duke@1 512
duke@1 513 public boolean shouldUseDirectByteBuffers()
duke@1 514 {
duke@1 515 return getSocketChannel() != null;
duke@1 516 }
duke@1 517
duke@1 518 public ByteBuffer read(int size, int offset, int length, long max_wait_time)
duke@1 519 throws IOException
duke@1 520 {
duke@1 521 if (shouldUseDirectByteBuffers()) {
duke@1 522
duke@1 523 ByteBuffer byteBuffer =
duke@1 524 orb.getByteBufferPool().getByteBuffer(size);
duke@1 525
duke@1 526 if (orb.transportDebugFlag) {
duke@1 527 // print address of ByteBuffer gotten from pool
duke@1 528 int bbAddress = System.identityHashCode(byteBuffer);
duke@1 529 StringBuffer sb = new StringBuffer(80);
duke@1 530 sb.append(".read: got ByteBuffer id (");
duke@1 531 sb.append(bbAddress).append(") from ByteBufferPool.");
duke@1 532 String msgStr = sb.toString();
duke@1 533 dprint(msgStr);
duke@1 534 }
duke@1 535
duke@1 536 byteBuffer.position(offset);
duke@1 537 byteBuffer.limit(size);
duke@1 538
duke@1 539 readFully(byteBuffer, length, max_wait_time);
duke@1 540
duke@1 541 return byteBuffer;
duke@1 542 }
duke@1 543
duke@1 544 byte[] buf = new byte[size];
duke@1 545 readFully(getSocket().getInputStream(), buf,
duke@1 546 offset, length, max_wait_time);
duke@1 547 ByteBuffer byteBuffer = ByteBuffer.wrap(buf);
duke@1 548 byteBuffer.limit(size);
duke@1 549 return byteBuffer;
duke@1 550 }
duke@1 551
duke@1 552 public ByteBuffer read(ByteBuffer byteBuffer, int offset,
duke@1 553 int length, long max_wait_time)
duke@1 554 throws IOException
duke@1 555 {
duke@1 556 int size = offset + length;
duke@1 557 if (shouldUseDirectByteBuffers()) {
duke@1 558
duke@1 559 if (! byteBuffer.isDirect()) {
duke@1 560 throw wrapper.unexpectedNonDirectByteBufferWithChannelSocket();
duke@1 561 }
duke@1 562 if (size > byteBuffer.capacity()) {
duke@1 563 if (orb.transportDebugFlag) {
duke@1 564 // print address of ByteBuffer being released
duke@1 565 int bbAddress = System.identityHashCode(byteBuffer);
duke@1 566 StringBuffer bbsb = new StringBuffer(80);
duke@1 567 bbsb.append(".read: releasing ByteBuffer id (")
duke@1 568 .append(bbAddress).append(") to ByteBufferPool.");
duke@1 569 String bbmsg = bbsb.toString();
duke@1 570 dprint(bbmsg);
duke@1 571 }
duke@1 572 orb.getByteBufferPool().releaseByteBuffer(byteBuffer);
duke@1 573 byteBuffer = orb.getByteBufferPool().getByteBuffer(size);
duke@1 574 }
duke@1 575 byteBuffer.position(offset);
duke@1 576 byteBuffer.limit(size);
duke@1 577 readFully(byteBuffer, length, max_wait_time);
duke@1 578 byteBuffer.position(0);
duke@1 579 byteBuffer.limit(size);
duke@1 580 return byteBuffer;
duke@1 581 }
duke@1 582 if (byteBuffer.isDirect()) {
duke@1 583 throw wrapper.unexpectedDirectByteBufferWithNonChannelSocket();
duke@1 584 }
duke@1 585 byte[] buf = new byte[size];
duke@1 586 readFully(getSocket().getInputStream(), buf,
duke@1 587 offset, length, max_wait_time);
duke@1 588 return ByteBuffer.wrap(buf);
duke@1 589 }
duke@1 590
duke@1 591 public void readFully(ByteBuffer byteBuffer, int size, long max_wait_time)
duke@1 592 throws IOException
duke@1 593 {
duke@1 594 int n = 0;
duke@1 595 int bytecount = 0;
duke@1 596 long time_to_wait = readTimeouts.get_initial_time_to_wait();
duke@1 597 long total_time_in_wait = 0;
duke@1 598
duke@1 599 // The reading of data incorporates a strategy to detect a
duke@1 600 // rogue client. The strategy is implemented as follows. As
duke@1 601 // long as data is being read, at least 1 byte or more, we
duke@1 602 // assume we have a well behaved client. If no data is read,
duke@1 603 // then we sleep for a time to wait, re-calculate a new time to
duke@1 604 // wait which is lengthier than the previous time spent waiting.
duke@1 605 // Then, if the total time spent waiting does not exceed a
duke@1 606 // maximum time we are willing to wait, we attempt another
duke@1 607 // read. If the maximum amount of time we are willing to
duke@1 608 // spend waiting for more data is exceeded, we throw an
duke@1 609 // IOException.
duke@1 610
duke@1 611 // NOTE: Reading of GIOP headers are treated with a smaller
duke@1 612 // maximum time to wait threshold. Based on extensive
duke@1 613 // performance testing, all GIOP headers are being
duke@1 614 // read in 1 read access.
duke@1 615
duke@1 616 do {
duke@1 617 bytecount = getSocketChannel().read(byteBuffer);
duke@1 618
duke@1 619 if (bytecount < 0) {
duke@1 620 throw new IOException("End-of-stream");
duke@1 621 }
duke@1 622 else if (bytecount == 0) {
duke@1 623 try {
duke@1 624 Thread.sleep(time_to_wait);
duke@1 625 total_time_in_wait += time_to_wait;
duke@1 626 time_to_wait =
duke@1 627 (long)(time_to_wait*readTimeouts.get_backoff_factor());
duke@1 628 }
duke@1 629 catch (InterruptedException ie) {
duke@1 630 // ignore exception
duke@1 631 if (orb.transportDebugFlag) {
duke@1 632 dprint("readFully(): unexpected exception "
duke@1 633 + ie.toString());
duke@1 634 }
duke@1 635 }
duke@1 636 }
duke@1 637 else {
duke@1 638 n += bytecount;
duke@1 639 }
duke@1 640 }
duke@1 641 while (n < size && total_time_in_wait < max_wait_time);
duke@1 642
duke@1 643 if (n < size && total_time_in_wait >= max_wait_time)
duke@1 644 {
duke@1 645 // failed to read entire message
duke@1 646 throw wrapper.transportReadTimeoutExceeded(new Integer(size),
duke@1 647 new Integer(n), new Long(max_wait_time),
duke@1 648 new Long(total_time_in_wait));
duke@1 649 }
duke@1 650
duke@1 651 getConnectionCache().stampTime(this);
duke@1 652 }
duke@1 653
duke@1 654 // To support non-channel connections.
duke@1 655 public void readFully(java.io.InputStream is, byte[] buf,
duke@1 656 int offset, int size, long max_wait_time)
duke@1 657 throws IOException
duke@1 658 {
duke@1 659 int n = 0;
duke@1 660 int bytecount = 0;
duke@1 661 long time_to_wait = readTimeouts.get_initial_time_to_wait();
duke@1 662 long total_time_in_wait = 0;
duke@1 663
duke@1 664 // The reading of data incorporates a strategy to detect a
duke@1 665 // rogue client. The strategy is implemented as follows. As
duke@1 666 // long as data is being read, at least 1 byte or more, we
duke@1 667 // assume we have a well behaved client. If no data is read,
duke@1 668 // then we sleep for a time to wait, re-calculate a new time to
duke@1 669 // wait which is lengthier than the previous time spent waiting.
duke@1 670 // Then, if the total time spent waiting does not exceed a
duke@1 671 // maximum time we are willing to wait, we attempt another
duke@1 672 // read. If the maximum amount of time we are willing to
duke@1 673 // spend waiting for more data is exceeded, we throw an
duke@1 674 // IOException.
duke@1 675
duke@1 676 // NOTE: Reading of GIOP headers are treated with a smaller
duke@1 677 // maximum time to wait threshold. Based on extensive
duke@1 678 // performance testing, all GIOP headers are being
duke@1 679 // read in 1 read access.
duke@1 680
duke@1 681 do {
duke@1 682 bytecount = is.read(buf, offset + n, size - n);
duke@1 683 if (bytecount < 0) {
duke@1 684 throw new IOException("End-of-stream");
duke@1 685 }
duke@1 686 else if (bytecount == 0) {
duke@1 687 try {
duke@1 688 Thread.sleep(time_to_wait);
duke@1 689 total_time_in_wait += time_to_wait;
duke@1 690 time_to_wait =
duke@1 691 (long)(time_to_wait*readTimeouts.get_backoff_factor());
duke@1 692 }
duke@1 693 catch (InterruptedException ie) {
duke@1 694 // ignore exception
duke@1 695 if (orb.transportDebugFlag) {
duke@1 696 dprint("readFully(): unexpected exception "
duke@1 697 + ie.toString());
duke@1 698 }
duke@1 699 }
duke@1 700 }
duke@1 701 else {
duke@1 702 n += bytecount;
duke@1 703 }
duke@1 704 }
duke@1 705 while (n < size && total_time_in_wait < max_wait_time);
duke@1 706
duke@1 707 if (n < size && total_time_in_wait >= max_wait_time)
duke@1 708 {
duke@1 709 // failed to read entire message
duke@1 710 throw wrapper.transportReadTimeoutExceeded(new Integer(size),
duke@1 711 new Integer(n), new Long(max_wait_time),
duke@1 712 new Long(total_time_in_wait));
duke@1 713 }
duke@1 714
duke@1 715 getConnectionCache().stampTime(this);
duke@1 716 }
duke@1 717
duke@1 718 public void write(ByteBuffer byteBuffer)
duke@1 719 throws IOException
duke@1 720 {
duke@1 721 if (shouldUseDirectByteBuffers()) {
duke@1 722 /* NOTE: cannot perform this test. If one ask for a
duke@1 723 ByteBuffer from the pool which is bigger than the size
duke@1 724 of ByteBuffers managed by the pool, then the pool will
duke@1 725 return a HeapByteBuffer.
duke@1 726 if (byteBuffer.hasArray()) {
duke@1 727 throw wrapper.unexpectedNonDirectByteBufferWithChannelSocket();
duke@1 728 }
duke@1 729 */
duke@1 730 // IMPORTANT: For non-blocking SocketChannels, there's no guarantee
duke@1 731 // all bytes are written on first write attempt.
duke@1 732 do {
duke@1 733 getSocketChannel().write(byteBuffer);
duke@1 734 }
duke@1 735 while (byteBuffer.hasRemaining());
duke@1 736
duke@1 737 } else {
duke@1 738 if (! byteBuffer.hasArray()) {
duke@1 739 throw wrapper.unexpectedDirectByteBufferWithNonChannelSocket();
duke@1 740 }
duke@1 741 byte[] tmpBuf = byteBuffer.array();
duke@1 742 getSocket().getOutputStream().write(tmpBuf, 0, byteBuffer.limit());
duke@1 743 getSocket().getOutputStream().flush();
duke@1 744 }
duke@1 745
duke@1 746 // TimeStamp connection to indicate it has been used
duke@1 747 // Note granularity of connection usage is assumed for
duke@1 748 // now to be that of a IIOP packet.
duke@1 749 getConnectionCache().stampTime(this);
duke@1 750 }
duke@1 751
duke@1 752 /**
duke@1 753 * Note:it is possible for this to be called more than once
duke@1 754 */
duke@1 755 public synchronized void close()
duke@1 756 {
duke@1 757 try {
duke@1 758 if (orb.transportDebugFlag) {
duke@1 759 dprint(".close->: " + this);
duke@1 760 }
duke@1 761 writeLock();
duke@1 762
duke@1 763 // REVISIT It will be good to have a read lock on the reader thread
duke@1 764 // before we proceed further, to avoid the reader thread (server side)
duke@1 765 // from processing requests. This avoids the risk that a new request
duke@1 766 // will be accepted by ReaderThread while the ListenerThread is
duke@1 767 // attempting to close this connection.
duke@1 768
duke@1 769 if (isBusy()) { // we are busy!
duke@1 770 writeUnlock();
duke@1 771 if (orb.transportDebugFlag) {
duke@1 772 dprint(".close: isBusy so no close: " + this);
duke@1 773 }
duke@1 774 return;
duke@1 775 }
duke@1 776
duke@1 777 try {
duke@1 778 try {
duke@1 779 sendCloseConnection(GIOPVersion.V1_0);
duke@1 780 } catch (Throwable t) {
duke@1 781 wrapper.exceptionWhenSendingCloseConnection(t);
duke@1 782 }
duke@1 783
duke@1 784 synchronized ( stateEvent ){
duke@1 785 state = CLOSE_SENT;
duke@1 786 stateEvent.notifyAll();
duke@1 787 }
duke@1 788
duke@1 789 // stop the reader without causing it to do purgeCalls
duke@1 790 //Exception ex = new Exception();
duke@1 791 //reader.stop(ex); // REVISIT
duke@1 792
duke@1 793 // NOTE: !!!!!!
duke@1 794 // This does writeUnlock().
duke@1 795 purgeCalls(wrapper.connectionRebind(), false, true);
duke@1 796
duke@1 797 } catch (Exception ex) {
duke@1 798 if (orb.transportDebugFlag) {
duke@1 799 dprint(".close: exception: " + this, ex);
duke@1 800 }
duke@1 801 }
duke@1 802 try {
duke@1 803 Selector selector = orb.getTransportManager().getSelector(0);
duke@1 804 selector.unregisterForEvent(this);
duke@1 805 if (socketChannel != null) {
duke@1 806 socketChannel.close();
duke@1 807 }
duke@1 808 socket.close();
duke@1 809 } catch (IOException e) {
duke@1 810 if (orb.transportDebugFlag) {
duke@1 811 dprint(".close: " + this, e);
duke@1 812 }
duke@1 813 }
skoppar@226 814 closeConnectionResources();
duke@1 815 } finally {
duke@1 816 if (orb.transportDebugFlag) {
duke@1 817 dprint(".close<-: " + this);
duke@1 818 }
duke@1 819 }
duke@1 820 }
duke@1 821
skoppar@226 822 public void closeConnectionResources() {
skoppar@226 823 if (orb.transportDebugFlag) {
skoppar@226 824 dprint(".closeConnectionResources->: " + this);
skoppar@226 825 }
skoppar@226 826 Selector selector = orb.getTransportManager().getSelector(0);
skoppar@226 827 selector.unregisterForEvent(this);
skoppar@226 828 try {
skoppar@226 829 if (socketChannel != null)
skoppar@226 830 socketChannel.close() ;
skoppar@226 831 if (socket != null && !socket.isClosed())
skoppar@226 832 socket.close() ;
skoppar@226 833 } catch (IOException e) {
skoppar@226 834 if (orb.transportDebugFlag) {
skoppar@226 835 dprint( ".closeConnectionResources: " + this, e ) ;
skoppar@226 836 }
skoppar@226 837 }
skoppar@226 838 if (orb.transportDebugFlag) {
skoppar@226 839 dprint(".closeConnectionResources<-: " + this);
skoppar@226 840 }
skoppar@226 841 }
skoppar@226 842
skoppar@226 843
duke@1 844 public Acceptor getAcceptor()
duke@1 845 {
duke@1 846 return acceptor;
duke@1 847 }
duke@1 848
duke@1 849 public ContactInfo getContactInfo()
duke@1 850 {
duke@1 851 return contactInfo;
duke@1 852 }
duke@1 853
duke@1 854 public EventHandler getEventHandler()
duke@1 855 {
duke@1 856 return this;
duke@1 857 }
duke@1 858
duke@1 859 public OutputObject createOutputObject(MessageMediator messageMediator)
duke@1 860 {
duke@1 861 // REVISIT - remove this method from Connection and all it subclasses.
duke@1 862 throw new RuntimeException("*****SocketOrChannelConnectionImpl.createOutputObject - should not be called.");
duke@1 863 }
duke@1 864
duke@1 865 // This is used by the GIOPOutputObject in order to
duke@1 866 // throw the correct error when handling code sets.
duke@1 867 // Can we determine if we are on the server side by
duke@1 868 // other means? XREVISIT
duke@1 869 public boolean isServer()
duke@1 870 {
duke@1 871 return isServer;
duke@1 872 }
duke@1 873
duke@1 874 public boolean isBusy()
duke@1 875 {
duke@1 876 if (serverRequestCount > 0 ||
duke@1 877 getResponseWaitingRoom().numberRegistered() > 0)
duke@1 878 {
duke@1 879 return true;
duke@1 880 } else {
duke@1 881 return false;
duke@1 882 }
duke@1 883 }
duke@1 884
duke@1 885 public long getTimeStamp()
duke@1 886 {
duke@1 887 return timeStamp;
duke@1 888 }
duke@1 889
duke@1 890 public void setTimeStamp(long time)
duke@1 891 {
duke@1 892 timeStamp = time;
duke@1 893 }
duke@1 894
duke@1 895 public void setState(String stateString)
duke@1 896 {
duke@1 897 synchronized (stateEvent) {
duke@1 898 if (stateString.equals("ESTABLISHED")) {
duke@1 899 state = ESTABLISHED;
duke@1 900 stateEvent.notifyAll();
duke@1 901 } else {
duke@1 902 // REVISIT: ASSERT
duke@1 903 }
duke@1 904 }
duke@1 905 }
duke@1 906
duke@1 907 /**
duke@1 908 * Sets the writeLock for this connection.
duke@1 909 * If the writeLock is already set by someone else, block till the
duke@1 910 * writeLock is released and can set by us.
duke@1 911 * IMPORTANT: this connection's lock must be acquired before
duke@1 912 * setting the writeLock and must be unlocked after setting the writeLock.
duke@1 913 */
duke@1 914 public void writeLock()
duke@1 915 {
duke@1 916 try {
duke@1 917 if (dprintWriteLocks && orb.transportDebugFlag) {
duke@1 918 dprint(".writeLock->: " + this);
duke@1 919 }
duke@1 920 // Keep looping till we can set the writeLock.
duke@1 921 while ( true ) {
duke@1 922 int localState = state;
duke@1 923 switch ( localState ) {
duke@1 924
duke@1 925 case OPENING:
duke@1 926 synchronized (stateEvent) {
duke@1 927 if (state != OPENING) {
duke@1 928 // somebody has changed 'state' so be careful
duke@1 929 break;
duke@1 930 }
duke@1 931 try {
duke@1 932 stateEvent.wait();
duke@1 933 } catch (InterruptedException ie) {
duke@1 934 if (orb.transportDebugFlag) {
duke@1 935 dprint(".writeLock: OPENING InterruptedException: " + this);
duke@1 936 }
duke@1 937 }
duke@1 938 }
duke@1 939 // Loop back
duke@1 940 break;
duke@1 941
duke@1 942 case ESTABLISHED:
duke@1 943 synchronized (writeEvent) {
duke@1 944 if (!writeLocked) {
duke@1 945 writeLocked = true;
duke@1 946 return;
duke@1 947 }
duke@1 948
duke@1 949 try {
duke@1 950 // do not stay here too long if state != ESTABLISHED
duke@1 951 // Bug 4752117
duke@1 952 while (state == ESTABLISHED && writeLocked) {
duke@1 953 writeEvent.wait(100);
duke@1 954 }
duke@1 955 } catch (InterruptedException ie) {
duke@1 956 if (orb.transportDebugFlag) {
duke@1 957 dprint(".writeLock: ESTABLISHED InterruptedException: " + this);
duke@1 958 }
duke@1 959 }
duke@1 960 }
duke@1 961 // Loop back
duke@1 962 break;
duke@1 963
duke@1 964 //
duke@1 965 // XXX
duke@1 966 // Need to distinguish between client and server roles
duke@1 967 // here probably.
duke@1 968 //
duke@1 969 case ABORT:
duke@1 970 synchronized ( stateEvent ){
duke@1 971 if (state != ABORT) {
duke@1 972 break;
duke@1 973 }
duke@1 974 throw wrapper.writeErrorSend() ;
duke@1 975 }
duke@1 976
duke@1 977 case CLOSE_RECVD:
duke@1 978 // the connection has been closed or closing
duke@1 979 // ==> throw rebind exception
duke@1 980 synchronized ( stateEvent ){
duke@1 981 if (state != CLOSE_RECVD) {
duke@1 982 break;
duke@1 983 }
duke@1 984 throw wrapper.connectionCloseRebind() ;
duke@1 985 }
duke@1 986
duke@1 987 default:
duke@1 988 if (orb.transportDebugFlag) {
duke@1 989 dprint(".writeLock: default: " + this);
duke@1 990 }
duke@1 991 // REVISIT
duke@1 992 throw new RuntimeException(".writeLock: bad state");
duke@1 993 }
duke@1 994 }
duke@1 995 } finally {
duke@1 996 if (dprintWriteLocks && orb.transportDebugFlag) {
duke@1 997 dprint(".writeLock<-: " + this);
duke@1 998 }
duke@1 999 }
duke@1 1000 }
duke@1 1001
duke@1 1002 public void writeUnlock()
duke@1 1003 {
duke@1 1004 try {
duke@1 1005 if (dprintWriteLocks && orb.transportDebugFlag) {
duke@1 1006 dprint(".writeUnlock->: " + this);
duke@1 1007 }
duke@1 1008 synchronized (writeEvent) {
duke@1 1009 writeLocked = false;
duke@1 1010 writeEvent.notify(); // wake up one guy waiting to write
duke@1 1011 }
duke@1 1012 } finally {
duke@1 1013 if (dprintWriteLocks && orb.transportDebugFlag) {
duke@1 1014 dprint(".writeUnlock<-: " + this);
duke@1 1015 }
duke@1 1016 }
duke@1 1017 }
duke@1 1018
duke@1 1019 // Assumes the caller handles writeLock and writeUnlock
duke@1 1020 public void sendWithoutLock(OutputObject outputObject)
duke@1 1021 {
duke@1 1022 // Don't we need to check for CloseConnection
duke@1 1023 // here? REVISIT
duke@1 1024
duke@1 1025 // XREVISIT - Shouldn't the MessageMediator
duke@1 1026 // be the one to handle writing the data here?
duke@1 1027
duke@1 1028 try {
duke@1 1029
duke@1 1030 // Write the fragment/message
duke@1 1031
duke@1 1032 CDROutputObject cdrOutputObject = (CDROutputObject) outputObject;
duke@1 1033 cdrOutputObject.writeTo(this);
duke@1 1034 // REVISIT - no flush?
duke@1 1035 //socket.getOutputStream().flush();
duke@1 1036
duke@1 1037 } catch (IOException e1) {
duke@1 1038
duke@1 1039 /*
duke@1 1040 * ADDED(Ram J) 10/13/2000 In the event of an IOException, try
duke@1 1041 * sending a CancelRequest for regular requests / locate requests
duke@1 1042 */
duke@1 1043
duke@1 1044 // Since IIOPOutputStream's msgheader is set only once, and not
duke@1 1045 // altered during sending multiple fragments, the original
duke@1 1046 // msgheader will always have the requestId.
duke@1 1047 // REVISIT This could be optimized to send a CancelRequest only
duke@1 1048 // if any fragments had been sent already.
duke@1 1049
duke@1 1050 /* REVISIT: MOVE TO SUBCONTRACT
duke@1 1051 Message msg = os.getMessage();
duke@1 1052 if (msg.getType() == Message.GIOPRequest ||
duke@1 1053 msg.getType() == Message.GIOPLocateRequest) {
duke@1 1054 GIOPVersion requestVersion = msg.getGIOPVersion();
duke@1 1055 int requestId = MessageBase.getRequestId(msg);
duke@1 1056 try {
duke@1 1057 sendCancelRequest(requestVersion, requestId);
duke@1 1058 } catch (IOException e2) {
duke@1 1059 // most likely an abortive connection closure.
duke@1 1060 // ignore, since nothing more can be done.
duke@1 1061 if (orb.transportDebugFlag) {
duke@1 1062
duke@1 1063 }
duke@1 1064 }
duke@1 1065 */
duke@1 1066
duke@1 1067 // REVISIT When a send failure happens, purgeCalls() need to be
duke@1 1068 // called to ensure that the connection is properly removed from
duke@1 1069 // further usage (ie., cancelling pending requests with COMM_FAILURE
duke@1 1070 // with an appropriate minor_code CompletionStatus.MAY_BE).
duke@1 1071
duke@1 1072 // Relying on the IIOPOutputStream (as noted below) is not
duke@1 1073 // sufficient as it handles COMM_FAILURE only for the final
duke@1 1074 // fragment (during invoke processing). Note that COMM_FAILURE could
duke@1 1075 // happen while sending the initial fragments.
duke@1 1076 // Also the IIOPOutputStream does not properly close the connection.
duke@1 1077 // It simply removes the connection from the table. An orderly
duke@1 1078 // closure is needed (ie., cancel pending requests on the connection
duke@1 1079 // COMM_FAILURE as well.
duke@1 1080
duke@1 1081 // IIOPOutputStream will cleanup the connection info when it
duke@1 1082 // sees this exception.
tbell@68 1083 SystemException exc = wrapper.writeErrorSend(e1);
tbell@68 1084 purgeCalls(exc, false, true);
tbell@68 1085 throw exc;
duke@1 1086 }
duke@1 1087 }
duke@1 1088
duke@1 1089 public void registerWaiter(MessageMediator messageMediator)
duke@1 1090 {
duke@1 1091 responseWaitingRoom.registerWaiter(messageMediator);
duke@1 1092 }
duke@1 1093
duke@1 1094 public void unregisterWaiter(MessageMediator messageMediator)
duke@1 1095 {
duke@1 1096 responseWaitingRoom.unregisterWaiter(messageMediator);
duke@1 1097 }
duke@1 1098
duke@1 1099 public InputObject waitForResponse(MessageMediator messageMediator)
duke@1 1100 {
duke@1 1101 return responseWaitingRoom.waitForResponse(messageMediator);
duke@1 1102 }
duke@1 1103
duke@1 1104 public void setConnectionCache(ConnectionCache connectionCache)
duke@1 1105 {
duke@1 1106 this.connectionCache = connectionCache;
duke@1 1107 }
duke@1 1108
duke@1 1109 public ConnectionCache getConnectionCache()
duke@1 1110 {
duke@1 1111 return connectionCache;
duke@1 1112 }
duke@1 1113
duke@1 1114 ////////////////////////////////////////////////////
duke@1 1115 //
duke@1 1116 // EventHandler methods
duke@1 1117 //
duke@1 1118
duke@1 1119 public void setUseSelectThreadToWait(boolean x)
duke@1 1120 {
duke@1 1121 useSelectThreadToWait = x;
duke@1 1122 // REVISIT - Reading of a GIOP header only is information
duke@1 1123 // that should be passed into the constructor
duke@1 1124 // from the SocketOrChannelConnection factory.
duke@1 1125 setReadGiopHeaderOnly(shouldUseSelectThreadToWait());
duke@1 1126 }
duke@1 1127
duke@1 1128 public void handleEvent()
duke@1 1129 {
duke@1 1130 if (orb.transportDebugFlag) {
duke@1 1131 dprint(".handleEvent->: " + this);
duke@1 1132 }
duke@1 1133 getSelectionKey().interestOps(getSelectionKey().interestOps() &
duke@1 1134 (~ getInterestOps()));
duke@1 1135
duke@1 1136 if (shouldUseWorkerThreadForEvent()) {
duke@1 1137 Throwable throwable = null;
duke@1 1138 try {
duke@1 1139 int poolToUse = 0;
duke@1 1140 if (shouldReadGiopHeaderOnly()) {
duke@1 1141 partialMessageMediator = readBits();
duke@1 1142 poolToUse =
duke@1 1143 partialMessageMediator.getThreadPoolToUse();
duke@1 1144 }
duke@1 1145
duke@1 1146 if (orb.transportDebugFlag) {
duke@1 1147 dprint(".handleEvent: addWork to pool: " + poolToUse);
duke@1 1148 }
duke@1 1149 orb.getThreadPoolManager().getThreadPool(poolToUse)
duke@1 1150 .getWorkQueue(0).addWork(getWork());
duke@1 1151 } catch (NoSuchThreadPoolException e) {
duke@1 1152 throwable = e;
duke@1 1153 } catch (NoSuchWorkQueueException e) {
duke@1 1154 throwable = e;
duke@1 1155 }
duke@1 1156 // REVISIT: need to close connection.
duke@1 1157 if (throwable != null) {
duke@1 1158 if (orb.transportDebugFlag) {
duke@1 1159 dprint(".handleEvent: " + throwable);
duke@1 1160 }
duke@1 1161 INTERNAL i = new INTERNAL("NoSuchThreadPoolException");
duke@1 1162 i.initCause(throwable);
duke@1 1163 throw i;
duke@1 1164 }
duke@1 1165 } else {
duke@1 1166 if (orb.transportDebugFlag) {
duke@1 1167 dprint(".handleEvent: doWork");
duke@1 1168 }
duke@1 1169 getWork().doWork();
duke@1 1170 }
duke@1 1171 if (orb.transportDebugFlag) {
duke@1 1172 dprint(".handleEvent<-: " + this);
duke@1 1173 }
duke@1 1174 }
duke@1 1175
duke@1 1176 public SelectableChannel getChannel()
duke@1 1177 {
duke@1 1178 return socketChannel;
duke@1 1179 }
duke@1 1180
duke@1 1181 public int getInterestOps()
duke@1 1182 {
duke@1 1183 return SelectionKey.OP_READ;
duke@1 1184 }
duke@1 1185
duke@1 1186 // public Acceptor getAcceptor() - already defined above.
duke@1 1187
duke@1 1188 public Connection getConnection()
duke@1 1189 {
duke@1 1190 return this;
duke@1 1191 }
duke@1 1192
duke@1 1193 ////////////////////////////////////////////////////
duke@1 1194 //
duke@1 1195 // Work methods.
duke@1 1196 //
duke@1 1197
duke@1 1198 public String getName()
duke@1 1199 {
duke@1 1200 return this.toString();
duke@1 1201 }
duke@1 1202
duke@1 1203 public void doWork()
duke@1 1204 {
duke@1 1205 try {
duke@1 1206 if (orb.transportDebugFlag) {
duke@1 1207 dprint(".doWork->: " + this);
duke@1 1208 }
duke@1 1209
duke@1 1210 // IMPORTANT: Sanity checks on SelectionKeys such as
duke@1 1211 // SelectorKey.isValid() should not be done
duke@1 1212 // here.
duke@1 1213 //
duke@1 1214
duke@1 1215 if (!shouldReadGiopHeaderOnly()) {
duke@1 1216 read();
duke@1 1217 }
duke@1 1218 else {
duke@1 1219 // get the partialMessageMediator
duke@1 1220 // created by SelectorThread
duke@1 1221 CorbaMessageMediator messageMediator =
duke@1 1222 this.getPartialMessageMediator();
duke@1 1223
duke@1 1224 // read remaining info needed in a MessageMediator
duke@1 1225 messageMediator = finishReadingBits(messageMediator);
duke@1 1226
duke@1 1227 if (messageMediator != null) {
duke@1 1228 // Null can happen when client closes stream
duke@1 1229 // causing purgecalls.
duke@1 1230 dispatch(messageMediator);
duke@1 1231 }
duke@1 1232 }
duke@1 1233 } catch (Throwable t) {
duke@1 1234 if (orb.transportDebugFlag) {
duke@1 1235 dprint(".doWork: ignoring Throwable: "
duke@1 1236 + t
duke@1 1237 + " " + this);
duke@1 1238 }
duke@1 1239 } finally {
duke@1 1240 if (orb.transportDebugFlag) {
duke@1 1241 dprint(".doWork<-: " + this);
duke@1 1242 }
duke@1 1243 }
duke@1 1244 }
duke@1 1245
duke@1 1246 public void setEnqueueTime(long timeInMillis)
duke@1 1247 {
duke@1 1248 enqueueTime = timeInMillis;
duke@1 1249 }
duke@1 1250
duke@1 1251 public long getEnqueueTime()
duke@1 1252 {
duke@1 1253 return enqueueTime;
duke@1 1254 }
duke@1 1255
duke@1 1256 ////////////////////////////////////////////////////
duke@1 1257 //
duke@1 1258 // spi.transport.CorbaConnection.
duke@1 1259 //
duke@1 1260
duke@1 1261 // IMPORTANT: Reader Threads must NOT read Giop header only.
duke@1 1262 public boolean shouldReadGiopHeaderOnly() {
duke@1 1263 return shouldReadGiopHeaderOnly;
duke@1 1264 }
duke@1 1265
duke@1 1266 protected void setReadGiopHeaderOnly(boolean shouldReadHeaderOnly) {
duke@1 1267 shouldReadGiopHeaderOnly = shouldReadHeaderOnly;
duke@1 1268 }
duke@1 1269
duke@1 1270 public ResponseWaitingRoom getResponseWaitingRoom()
duke@1 1271 {
duke@1 1272 return responseWaitingRoom;
duke@1 1273 }
duke@1 1274
duke@1 1275 // REVISIT - interface defines isServer but already defined in
duke@1 1276 // higher interface.
duke@1 1277
duke@1 1278 public void serverRequestMapPut(int requestId,
duke@1 1279 CorbaMessageMediator messageMediator)
duke@1 1280 {
duke@1 1281 serverRequestMap.put(new Integer(requestId), messageMediator);
duke@1 1282 }
duke@1 1283
duke@1 1284 public CorbaMessageMediator serverRequestMapGet(int requestId)
duke@1 1285 {
duke@1 1286 return (CorbaMessageMediator)
duke@1 1287 serverRequestMap.get(new Integer(requestId));
duke@1 1288 }
duke@1 1289
duke@1 1290 public void serverRequestMapRemove(int requestId)
duke@1 1291 {
duke@1 1292 serverRequestMap.remove(new Integer(requestId));
duke@1 1293 }
duke@1 1294
duke@1 1295
duke@1 1296 // REVISIT: this is also defined in:
duke@1 1297 // com.sun.corba.se.spi.legacy.connection.Connection
duke@1 1298 public java.net.Socket getSocket()
duke@1 1299 {
duke@1 1300 return socket;
duke@1 1301 }
duke@1 1302
duke@1 1303 /** It is possible for a Close Connection to have been
duke@1 1304 ** sent here, but we will not check for this. A "lazy"
duke@1 1305 ** Exception will be thrown in the Worker thread after the
duke@1 1306 ** incoming request has been processed even though the connection
duke@1 1307 ** is closed before the request is processed. This is o.k because
duke@1 1308 ** it is a boundary condition. To prevent it we would have to add
duke@1 1309 ** more locks which would reduce performance in the normal case.
duke@1 1310 **/
duke@1 1311 public synchronized void serverRequestProcessingBegins()
duke@1 1312 {
duke@1 1313 serverRequestCount++;
duke@1 1314 }
duke@1 1315
duke@1 1316 public synchronized void serverRequestProcessingEnds()
duke@1 1317 {
duke@1 1318 serverRequestCount--;
duke@1 1319 }
duke@1 1320
duke@1 1321 //
duke@1 1322 //
duke@1 1323 //
duke@1 1324
duke@1 1325 public synchronized int getNextRequestId()
duke@1 1326 {
duke@1 1327 return requestId++;
duke@1 1328 }
duke@1 1329
duke@1 1330 // Negotiated code sets for char and wchar data
duke@1 1331 protected CodeSetComponentInfo.CodeSetContext codeSetContext = null;
duke@1 1332
duke@1 1333 public ORB getBroker()
duke@1 1334 {
duke@1 1335 return orb;
duke@1 1336 }
duke@1 1337
duke@1 1338 public CodeSetComponentInfo.CodeSetContext getCodeSetContext() {
duke@1 1339 // Needs to be synchronized for the following case when the client
duke@1 1340 // doesn't send the code set context twice, and we have two threads
duke@1 1341 // in ServerRequestDispatcher processCodeSetContext.
duke@1 1342 //
duke@1 1343 // Thread A checks to see if there is a context, there is none, so
duke@1 1344 // it calls setCodeSetContext, getting the synch lock.
duke@1 1345 // Thread B checks to see if there is a context. If we didn't synch,
duke@1 1346 // it might decide to outlaw wchar/wstring.
duke@1 1347 if (codeSetContext == null) {
duke@1 1348 synchronized(this) {
duke@1 1349 return codeSetContext;
duke@1 1350 }
duke@1 1351 }
duke@1 1352
duke@1 1353 return codeSetContext;
duke@1 1354 }
duke@1 1355
duke@1 1356 public synchronized void setCodeSetContext(CodeSetComponentInfo.CodeSetContext csc) {
duke@1 1357 // Double check whether or not we need to do this
duke@1 1358 if (codeSetContext == null) {
duke@1 1359
duke@1 1360 if (OSFCodeSetRegistry.lookupEntry(csc.getCharCodeSet()) == null ||
duke@1 1361 OSFCodeSetRegistry.lookupEntry(csc.getWCharCodeSet()) == null) {
duke@1 1362 // If the client says it's negotiated a code set that
duke@1 1363 // isn't a fallback and we never said we support, then
duke@1 1364 // it has a bug.
duke@1 1365 throw wrapper.badCodesetsFromClient() ;
duke@1 1366 }
duke@1 1367
duke@1 1368 codeSetContext = csc;
duke@1 1369 }
duke@1 1370 }
duke@1 1371
duke@1 1372 //
duke@1 1373 // from iiop.IIOPConnection.java
duke@1 1374 //
duke@1 1375
duke@1 1376 // Map request ID to an InputObject.
duke@1 1377 // This is so the client thread can start unmarshaling
duke@1 1378 // the reply and remove it from the out_calls map while the
duke@1 1379 // ReaderThread can still obtain the input stream to give
duke@1 1380 // new fragments. Only the ReaderThread touches the clientReplyMap,
duke@1 1381 // so it doesn't incur synchronization overhead.
duke@1 1382
duke@1 1383 public MessageMediator clientRequestMapGet(int requestId)
duke@1 1384 {
duke@1 1385 return responseWaitingRoom.getMessageMediator(requestId);
duke@1 1386 }
duke@1 1387
duke@1 1388 protected MessageMediator clientReply_1_1;
duke@1 1389
duke@1 1390 public void clientReply_1_1_Put(MessageMediator x)
duke@1 1391 {
duke@1 1392 clientReply_1_1 = x;
duke@1 1393 }
duke@1 1394
duke@1 1395 public MessageMediator clientReply_1_1_Get()
duke@1 1396 {
duke@1 1397 return clientReply_1_1;
duke@1 1398 }
duke@1 1399
duke@1 1400 public void clientReply_1_1_Remove()
duke@1 1401 {
duke@1 1402 clientReply_1_1 = null;
duke@1 1403 }
duke@1 1404
duke@1 1405 protected MessageMediator serverRequest_1_1;
duke@1 1406
duke@1 1407 public void serverRequest_1_1_Put(MessageMediator x)
duke@1 1408 {
duke@1 1409 serverRequest_1_1 = x;
duke@1 1410 }
duke@1 1411
duke@1 1412 public MessageMediator serverRequest_1_1_Get()
duke@1 1413 {
duke@1 1414 return serverRequest_1_1;
duke@1 1415 }
duke@1 1416
duke@1 1417 public void serverRequest_1_1_Remove()
duke@1 1418 {
duke@1 1419 serverRequest_1_1 = null;
duke@1 1420 }
duke@1 1421
duke@1 1422 protected String getStateString( int state )
duke@1 1423 {
duke@1 1424 synchronized ( stateEvent ){
duke@1 1425 switch (state) {
duke@1 1426 case OPENING : return "OPENING" ;
duke@1 1427 case ESTABLISHED : return "ESTABLISHED" ;
duke@1 1428 case CLOSE_SENT : return "CLOSE_SENT" ;
duke@1 1429 case CLOSE_RECVD : return "CLOSE_RECVD" ;
duke@1 1430 case ABORT : return "ABORT" ;
duke@1 1431 default : return "???" ;
duke@1 1432 }
duke@1 1433 }
duke@1 1434 }
duke@1 1435
duke@1 1436 public synchronized boolean isPostInitialContexts() {
duke@1 1437 return postInitialContexts;
duke@1 1438 }
duke@1 1439
duke@1 1440 // Can never be unset...
duke@1 1441 public synchronized void setPostInitialContexts(){
duke@1 1442 postInitialContexts = true;
duke@1 1443 }
duke@1 1444
duke@1 1445 /**
duke@1 1446 * Wake up the outstanding requests on the connection, and hand them
duke@1 1447 * COMM_FAILURE exception with a given minor code.
duke@1 1448 *
duke@1 1449 * Also, delete connection from connection table and
duke@1 1450 * stop the reader thread.
duke@1 1451
duke@1 1452 * Note that this should only ever be called by the Reader thread for
duke@1 1453 * this connection.
duke@1 1454 *
duke@1 1455 * @param minor_code The minor code for the COMM_FAILURE major code.
duke@1 1456 * @param die Kill the reader thread (this thread) before exiting.
duke@1 1457 */
duke@1 1458 public void purgeCalls(SystemException systemException,
duke@1 1459 boolean die, boolean lockHeld)
duke@1 1460 {
duke@1 1461 int minor_code = systemException.minor;
duke@1 1462
duke@1 1463 try{
duke@1 1464 if (orb.transportDebugFlag) {
duke@1 1465 dprint(".purgeCalls->: "
duke@1 1466 + minor_code + "/" + die + "/" + lockHeld
duke@1 1467 + " " + this);
duke@1 1468 }
duke@1 1469
duke@1 1470 // If this invocation is a result of ThreadDeath caused
duke@1 1471 // by a previous execution of this routine, just exit.
duke@1 1472
duke@1 1473 synchronized ( stateEvent ){
duke@1 1474 if ((state == ABORT) || (state == CLOSE_RECVD)) {
duke@1 1475 if (orb.transportDebugFlag) {
duke@1 1476 dprint(".purgeCalls: exiting since state is: "
duke@1 1477 + getStateString(state)
duke@1 1478 + " " + this);
duke@1 1479 }
duke@1 1480 return;
duke@1 1481 }
duke@1 1482 }
duke@1 1483
duke@1 1484 // Grab the writeLock (freeze the calls)
duke@1 1485 try {
duke@1 1486 if (!lockHeld) {
duke@1 1487 writeLock();
duke@1 1488 }
duke@1 1489 } catch (SystemException ex) {
duke@1 1490 if (orb.transportDebugFlag)
duke@1 1491 dprint(".purgeCalls: SystemException" + ex
duke@1 1492 + "; continuing " + this);
duke@1 1493 }
duke@1 1494
duke@1 1495 // Mark the state of the connection
duke@1 1496 // and determine the request status
duke@1 1497 org.omg.CORBA.CompletionStatus completion_status;
duke@1 1498 synchronized ( stateEvent ){
duke@1 1499 if (minor_code == ORBUtilSystemException.CONNECTION_REBIND) {
duke@1 1500 state = CLOSE_RECVD;
duke@1 1501 systemException.completed = CompletionStatus.COMPLETED_NO;
duke@1 1502 } else {
duke@1 1503 state = ABORT;
duke@1 1504 systemException.completed = CompletionStatus.COMPLETED_MAYBE;
duke@1 1505 }
duke@1 1506 stateEvent.notifyAll();
duke@1 1507 }
duke@1 1508
duke@1 1509 try {
duke@1 1510 socket.getInputStream().close();
duke@1 1511 socket.getOutputStream().close();
duke@1 1512 socket.close();
duke@1 1513 } catch (Exception ex) {
duke@1 1514 if (orb.transportDebugFlag) {
duke@1 1515 dprint(".purgeCalls: Exception closing socket: " + ex
duke@1 1516 + " " + this);
duke@1 1517 }
duke@1 1518 }
duke@1 1519
duke@1 1520 // Signal all threads with outstanding requests on this
duke@1 1521 // connection and give them the SystemException;
duke@1 1522
duke@1 1523 responseWaitingRoom.signalExceptionToAllWaiters(systemException);
duke@1 1524
duke@1 1525 if (contactInfo != null) {
duke@1 1526 ((OutboundConnectionCache)getConnectionCache()).remove(contactInfo);
duke@1 1527 } else if (acceptor != null) {
duke@1 1528 ((InboundConnectionCache)getConnectionCache()).remove(this);
duke@1 1529 }
duke@1 1530
duke@1 1531 //
duke@1 1532 // REVISIT: Stop the reader thread
duke@1 1533 //
duke@1 1534
duke@1 1535 // Signal all the waiters of the writeLock.
duke@1 1536 // There are 4 types of writeLock waiters:
duke@1 1537 // 1. Send waiters:
duke@1 1538 // 2. SendReply waiters:
duke@1 1539 // 3. cleanUp waiters:
duke@1 1540 // 4. purge_call waiters:
duke@1 1541 //
duke@1 1542
duke@1 1543 writeUnlock();
duke@1 1544
duke@1 1545 } finally {
duke@1 1546 if (orb.transportDebugFlag) {
duke@1 1547 dprint(".purgeCalls<-: "
duke@1 1548 + minor_code + "/" + die + "/" + lockHeld
duke@1 1549 + " " + this);
duke@1 1550 }
duke@1 1551 }
duke@1 1552 }
duke@1 1553
duke@1 1554 /*************************************************************************
duke@1 1555 * The following methods are for dealing with Connection cleaning for
duke@1 1556 * better scalability of servers in high network load conditions.
duke@1 1557 **************************************************************************/
duke@1 1558
duke@1 1559 public void sendCloseConnection(GIOPVersion giopVersion)
duke@1 1560 throws IOException
duke@1 1561 {
duke@1 1562 Message msg = MessageBase.createCloseConnection(giopVersion);
duke@1 1563 sendHelper(giopVersion, msg);
duke@1 1564 }
duke@1 1565
duke@1 1566 public void sendMessageError(GIOPVersion giopVersion)
duke@1 1567 throws IOException
duke@1 1568 {
duke@1 1569 Message msg = MessageBase.createMessageError(giopVersion);
duke@1 1570 sendHelper(giopVersion, msg);
duke@1 1571 }
duke@1 1572
duke@1 1573 /**
duke@1 1574 * Send a CancelRequest message. This does not lock the connection, so the
duke@1 1575 * caller needs to ensure this method is called appropriately.
duke@1 1576 * @exception IOException - could be due to abortive connection closure.
duke@1 1577 */
duke@1 1578 public void sendCancelRequest(GIOPVersion giopVersion, int requestId)
duke@1 1579 throws IOException
duke@1 1580 {
duke@1 1581
duke@1 1582 Message msg = MessageBase.createCancelRequest(giopVersion, requestId);
duke@1 1583 sendHelper(giopVersion, msg);
duke@1 1584 }
duke@1 1585
duke@1 1586 protected void sendHelper(GIOPVersion giopVersion, Message msg)
duke@1 1587 throws IOException
duke@1 1588 {
duke@1 1589 // REVISIT: See comments in CDROutputObject constructor.
duke@1 1590 CDROutputObject outputObject =
duke@1 1591 new CDROutputObject((ORB)orb, null, giopVersion, this, msg,
duke@1 1592 ORBConstants.STREAM_FORMAT_VERSION_1);
duke@1 1593 msg.write(outputObject);
duke@1 1594
duke@1 1595 outputObject.writeTo(this);
duke@1 1596 }
duke@1 1597
duke@1 1598 public void sendCancelRequestWithLock(GIOPVersion giopVersion,
duke@1 1599 int requestId)
duke@1 1600 throws IOException
duke@1 1601 {
duke@1 1602 writeLock();
duke@1 1603 try {
duke@1 1604 sendCancelRequest(giopVersion, requestId);
duke@1 1605 } finally {
duke@1 1606 writeUnlock();
duke@1 1607 }
duke@1 1608 }
duke@1 1609
duke@1 1610 // Begin Code Base methods ---------------------------------------
duke@1 1611 //
duke@1 1612 // Set this connection's code base IOR. The IOR comes from the
duke@1 1613 // SendingContext. This is an optional service context, but all
duke@1 1614 // JavaSoft ORBs send it.
duke@1 1615 //
duke@1 1616 // The set and get methods don't need to be synchronized since the
duke@1 1617 // first possible get would occur during reading a valuetype, and
duke@1 1618 // that would be after the set.
duke@1 1619
duke@1 1620 // Sets this connection's code base IOR. This is done after
duke@1 1621 // getting the IOR out of the SendingContext service context.
duke@1 1622 // Our ORBs always send this, but it's optional in CORBA.
duke@1 1623
duke@1 1624 public final void setCodeBaseIOR(IOR ior) {
duke@1 1625 codeBaseServerIOR = ior;
duke@1 1626 }
duke@1 1627
duke@1 1628 public final IOR getCodeBaseIOR() {
duke@1 1629 return codeBaseServerIOR;
duke@1 1630 }
duke@1 1631
duke@1 1632 // Get a CodeBase stub to use in unmarshaling. The CachedCodeBase
duke@1 1633 // won't connect to the remote codebase unless it's necessary.
duke@1 1634 public final CodeBase getCodeBase() {
duke@1 1635 return cachedCodeBase;
duke@1 1636 }
duke@1 1637
duke@1 1638 // End Code Base methods -----------------------------------------
duke@1 1639
duke@1 1640 // set transport read thresholds
duke@1 1641 protected void setReadTimeouts(ReadTimeouts readTimeouts) {
duke@1 1642 this.readTimeouts = readTimeouts;
duke@1 1643 }
duke@1 1644
duke@1 1645 protected void setPartialMessageMediator(CorbaMessageMediator messageMediator) {
duke@1 1646 partialMessageMediator = messageMediator;
duke@1 1647 }
duke@1 1648
duke@1 1649 protected CorbaMessageMediator getPartialMessageMediator() {
duke@1 1650 return partialMessageMediator;
duke@1 1651 }
duke@1 1652
duke@1 1653 public String toString()
duke@1 1654 {
duke@1 1655 synchronized ( stateEvent ){
duke@1 1656 return
duke@1 1657 "SocketOrChannelConnectionImpl[" + " "
duke@1 1658 + (socketChannel == null ?
duke@1 1659 socket.toString() : socketChannel.toString()) + " "
duke@1 1660 + getStateString( state ) + " "
duke@1 1661 + shouldUseSelectThreadToWait() + " "
duke@1 1662 + shouldUseWorkerThreadForEvent() + " "
duke@1 1663 + shouldReadGiopHeaderOnly()
duke@1 1664 + "]" ;
duke@1 1665 }
duke@1 1666 }
duke@1 1667
duke@1 1668 // Must be public - used in encoding.
duke@1 1669 public void dprint(String msg)
duke@1 1670 {
duke@1 1671 ORBUtility.dprint("SocketOrChannelConnectionImpl", msg);
duke@1 1672 }
duke@1 1673
duke@1 1674 protected void dprint(String msg, Throwable t)
duke@1 1675 {
duke@1 1676 dprint(msg);
duke@1 1677 t.printStackTrace(System.out);
duke@1 1678 }
duke@1 1679 }
duke@1 1680
duke@1 1681 // End of file.

mercurial