Fri, 04 Apr 2014 14:58:04 +0400
8029073: (corba) New connection reclaimed when number of connection is greater than highwatermark
Reviewed-by: coffeys
1 /*
2 * Copyright (c) 2001, 2013, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation. Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
26 package com.sun.corba.se.impl.transport;
28 import java.io.IOException;
29 import java.net.InetSocketAddress;
30 import java.net.ServerSocket;
31 import java.net.Socket;
32 import java.nio.channels.SelectableChannel;
33 import java.nio.channels.SelectionKey;
34 import java.nio.channels.ServerSocketChannel;
35 import java.nio.channels.SocketChannel;
36 import java.util.Iterator;
38 import com.sun.corba.se.pept.broker.Broker;
39 import com.sun.corba.se.pept.encoding.InputObject;
40 import com.sun.corba.se.pept.encoding.OutputObject;
41 import com.sun.corba.se.pept.protocol.MessageMediator;
42 import com.sun.corba.se.pept.transport.Acceptor;
43 import com.sun.corba.se.pept.transport.Connection;
44 import com.sun.corba.se.pept.transport.ContactInfo;
45 import com.sun.corba.se.pept.transport.EventHandler;
46 import com.sun.corba.se.pept.transport.InboundConnectionCache;
47 import com.sun.corba.se.pept.transport.Selector;
49 import com.sun.corba.se.spi.extension.RequestPartitioningPolicy;
50 import com.sun.corba.se.spi.ior.IORTemplate;
51 import com.sun.corba.se.spi.ior.TaggedProfileTemplate;
52 import com.sun.corba.se.spi.ior.iiop.IIOPAddress ;
53 import com.sun.corba.se.spi.ior.iiop.IIOPFactories;
54 import com.sun.corba.se.spi.ior.iiop.IIOPProfileTemplate ;
55 import com.sun.corba.se.spi.ior.iiop.GIOPVersion ;
56 import com.sun.corba.se.spi.ior.iiop.AlternateIIOPAddressComponent;
57 import com.sun.corba.se.spi.logging.CORBALogDomains;
58 import com.sun.corba.se.spi.orb.ORB;
59 import com.sun.corba.se.spi.orbutil.threadpool.Work;
60 import com.sun.corba.se.spi.protocol.CorbaMessageMediator;
61 import com.sun.corba.se.spi.transport.CorbaAcceptor;
62 import com.sun.corba.se.spi.transport.CorbaConnection;
63 import com.sun.corba.se.spi.transport.SocketInfo;
64 import com.sun.corba.se.spi.transport.SocketOrChannelAcceptor;
66 import com.sun.corba.se.impl.encoding.CDRInputObject;
67 import com.sun.corba.se.impl.encoding.CDROutputObject;
68 import com.sun.corba.se.impl.logging.ORBUtilSystemException;
69 import com.sun.corba.se.impl.oa.poa.Policies; // REVISIT impl/poa specific
70 import com.sun.corba.se.impl.orbutil.ORBConstants;
71 import com.sun.corba.se.impl.orbutil.ORBUtility;
73 // BEGIN Legacy support.
74 import com.sun.corba.se.spi.legacy.connection.LegacyServerSocketEndPointInfo;
75 // END Legacy support.
77 /**
78 * @author Harold Carr
79 */
80 public class SocketOrChannelAcceptorImpl
81 extends
82 EventHandlerBase
83 implements
84 CorbaAcceptor,
85 SocketOrChannelAcceptor,
86 Work,
87 // BEGIN Legacy
88 SocketInfo,
89 LegacyServerSocketEndPointInfo
90 // END Legacy
91 {
92 protected ServerSocketChannel serverSocketChannel;
93 protected ServerSocket serverSocket;
94 protected int port;
95 protected long enqueueTime;
96 protected boolean initialized;
97 protected ORBUtilSystemException wrapper ;
98 protected InboundConnectionCache connectionCache;
100 // BEGIN Legacy
101 protected String type = "";
102 protected String name = "";
103 protected String hostname;
104 protected int locatorPort;
105 // END Legacy
107 public SocketOrChannelAcceptorImpl(ORB orb)
108 {
109 this.orb = orb;
110 wrapper = ORBUtilSystemException.get( orb,
111 CORBALogDomains.RPC_TRANSPORT ) ;
113 setWork(this);
114 initialized = false;
116 // BEGIN Legacy support.
117 this.hostname = orb.getORBData().getORBServerHost();
118 this.name = LegacyServerSocketEndPointInfo.NO_NAME;
119 this.locatorPort = -1;
120 // END Legacy support.
121 }
123 public SocketOrChannelAcceptorImpl(ORB orb, int port)
124 {
125 this(orb);
126 this.port = port;
127 }
129 // BEGIN Legacy support.
130 public SocketOrChannelAcceptorImpl(ORB orb, int port,
131 String name, String type)
132 {
133 this(orb, port);
134 this.name = name;
135 this.type = type;
136 }
137 // END Legacy support.
139 ////////////////////////////////////////////////////
140 //
141 // pept.transport.Acceptor
142 //
144 public boolean initialize()
145 {
146 if (initialized) {
147 return false;
148 }
149 if (orb.transportDebugFlag) {
150 dprint(".initialize: " + this);
151 }
152 InetSocketAddress inetSocketAddress = null;
153 try {
154 if (orb.getORBData().getListenOnAllInterfaces().equals(ORBConstants.LISTEN_ON_ALL_INTERFACES)) {
155 inetSocketAddress = new InetSocketAddress(port);
156 } else {
157 String host = orb.getORBData().getORBServerHost();
158 inetSocketAddress = new InetSocketAddress(host, port);
159 }
160 serverSocket = orb.getORBData().getSocketFactory()
161 .createServerSocket(type, inetSocketAddress);
162 internalInitialize();
163 } catch (Throwable t) {
164 throw wrapper.createListenerFailed( t, Integer.toString(port) ) ;
165 }
166 initialized = true;
167 return true;
168 }
170 protected void internalInitialize()
171 throws Exception
172 {
173 // Determine the listening port (for the IOR).
174 // This is important when using emphemeral ports (i.e.,
175 // when the port value to the constructor is 0).
177 port = serverSocket.getLocalPort();
179 // Register with transport (also sets up monitoring).
181 orb.getCorbaTransportManager().getInboundConnectionCache(this);
183 // Finish configuation.
185 serverSocketChannel = serverSocket.getChannel();
187 if (serverSocketChannel != null) {
188 setUseSelectThreadToWait(
189 orb.getORBData().acceptorSocketUseSelectThreadToWait());
190 serverSocketChannel.configureBlocking(
191 ! orb.getORBData().acceptorSocketUseSelectThreadToWait());
192 } else {
193 // Configure to use listener and reader threads.
194 setUseSelectThreadToWait(false);
195 }
196 setUseWorkerThreadForEvent(
197 orb.getORBData().acceptorSocketUseWorkerThreadForEvent());
199 }
201 public boolean initialized()
202 {
203 return initialized;
204 }
206 public String getConnectionCacheType()
207 {
208 return this.getClass().toString();
209 }
211 public void setConnectionCache(InboundConnectionCache connectionCache)
212 {
213 this.connectionCache = connectionCache;
214 }
216 public InboundConnectionCache getConnectionCache()
217 {
218 return connectionCache;
219 }
221 public boolean shouldRegisterAcceptEvent()
222 {
223 return true;
224 }
226 public void accept()
227 {
228 try {
229 SocketChannel socketChannel = null;
230 Socket socket = null;
231 if (serverSocketChannel == null) {
232 socket = serverSocket.accept();
233 } else {
234 socketChannel = serverSocketChannel.accept();
235 socket = socketChannel.socket();
236 }
237 orb.getORBData().getSocketFactory()
238 .setAcceptedSocketOptions(this, serverSocket, socket);
239 if (orb.transportDebugFlag) {
240 dprint(".accept: " +
241 (serverSocketChannel == null
242 ? serverSocket.toString()
243 : serverSocketChannel.toString()));
244 }
246 CorbaConnection connection =
247 new SocketOrChannelConnectionImpl(orb, this, socket);
248 if (orb.transportDebugFlag) {
249 dprint(".accept: new: " + connection);
250 }
252 // NOTE: The connection MUST be put in the cache BEFORE being
253 // registered with the selector. Otherwise if the bytes
254 // are read on the connection it will attempt a time stamp
255 // but the cache will be null, resulting in NPE.
257 // A connection needs to be timestamped before putting to the cache.
258 // Otherwise the newly created connection (with 0 timestamp) could be
259 // incorrectly reclaimed by concurrent reclaim() call OR if there
260 // will be no events on this connection then it could be reclaimed
261 // by upcoming reclaim() call.
262 getConnectionCache().stampTime(connection);
263 getConnectionCache().put(this, connection);
265 if (connection.shouldRegisterServerReadEvent()) {
266 Selector selector = orb.getTransportManager().getSelector(0);
267 selector.registerForEvent(connection.getEventHandler());
268 }
270 getConnectionCache().reclaim();
272 } catch (IOException e) {
273 if (orb.transportDebugFlag) {
274 dprint(".accept:", e);
275 }
276 orb.getTransportManager().getSelector(0).unregisterForEvent(this);
277 // REVISIT - need to close - recreate - then register new one.
278 orb.getTransportManager().getSelector(0).registerForEvent(this);
279 // NOTE: if register cycling we do not want to shut down ORB
280 // since local beans will still work. Instead one will see
281 // a growing log file to alert admin of problem.
282 }
283 }
285 public void close ()
286 {
287 try {
288 if (orb.transportDebugFlag) {
289 dprint(".close->:");
290 }
291 Selector selector = orb.getTransportManager().getSelector(0);
292 selector.unregisterForEvent(this);
293 if (serverSocketChannel != null) {
294 serverSocketChannel.close();
295 }
296 if (serverSocket != null) {
297 serverSocket.close();
298 }
299 } catch (IOException e) {
300 if (orb.transportDebugFlag) {
301 dprint(".close:", e);
302 }
303 } finally {
304 if (orb.transportDebugFlag) {
305 dprint(".close<-:");
306 }
307 }
308 }
310 public EventHandler getEventHandler()
311 {
312 return this;
313 }
315 ////////////////////////////////////////////////////
316 //
317 // CorbaAcceptor
318 //
320 public String getObjectAdapterId()
321 {
322 return null;
323 }
325 public String getObjectAdapterManagerId()
326 {
327 return null;
328 }
330 public void addToIORTemplate(IORTemplate iorTemplate,
331 Policies policies,
332 String codebase)
333 {
334 Iterator iterator = iorTemplate.iteratorById(
335 org.omg.IOP.TAG_INTERNET_IOP.value);
337 String hostname = orb.getORBData().getORBServerHost();
339 if (iterator.hasNext()) {
340 // REVISIT - how does this play with legacy ORBD port exchange?
341 IIOPAddress iiopAddress =
342 IIOPFactories.makeIIOPAddress(orb, hostname, port);
343 AlternateIIOPAddressComponent iiopAddressComponent =
344 IIOPFactories.makeAlternateIIOPAddressComponent(iiopAddress);
346 while (iterator.hasNext()) {
347 TaggedProfileTemplate taggedProfileTemplate =
348 (TaggedProfileTemplate) iterator.next();
349 taggedProfileTemplate.add(iiopAddressComponent);
350 }
351 } else {
352 GIOPVersion version = orb.getORBData().getGIOPVersion();
353 int templatePort;
354 if (policies.forceZeroPort()) {
355 templatePort = 0;
356 } else if (policies.isTransient()) {
357 templatePort = port;
358 } else {
359 templatePort = orb.getLegacyServerSocketManager()
360 .legacyGetPersistentServerPort(SocketInfo.IIOP_CLEAR_TEXT);
361 }
362 IIOPAddress addr =
363 IIOPFactories.makeIIOPAddress(orb, hostname, templatePort);
364 IIOPProfileTemplate iiopProfile =
365 IIOPFactories.makeIIOPProfileTemplate(orb, version, addr);
366 if (version.supportsIORIIOPProfileComponents()) {
367 iiopProfile.add(IIOPFactories.makeCodeSetsComponent(orb));
368 iiopProfile.add(IIOPFactories.makeMaxStreamFormatVersionComponent());
369 RequestPartitioningPolicy rpPolicy = (RequestPartitioningPolicy)
370 policies.get_effective_policy(
371 ORBConstants.REQUEST_PARTITIONING_POLICY);
372 if (rpPolicy != null) {
373 iiopProfile.add(
374 IIOPFactories.makeRequestPartitioningComponent(
375 rpPolicy.getValue()));
376 }
377 if (codebase != null && codebase != "") {
378 iiopProfile.add(IIOPFactories. makeJavaCodebaseComponent(codebase));
379 }
380 if (orb.getORBData().isJavaSerializationEnabled()) {
381 iiopProfile.add(
382 IIOPFactories.makeJavaSerializationComponent());
383 }
384 }
385 iorTemplate.add(iiopProfile);
386 }
387 }
389 public String getMonitoringName()
390 {
391 return "AcceptedConnections";
392 }
394 ////////////////////////////////////////////////////
395 //
396 // EventHandler methods
397 //
399 public SelectableChannel getChannel()
400 {
401 return serverSocketChannel;
402 }
404 public int getInterestOps()
405 {
406 return SelectionKey.OP_ACCEPT;
407 }
409 public Acceptor getAcceptor()
410 {
411 return this;
412 }
414 public Connection getConnection()
415 {
416 throw new RuntimeException("Should not happen.");
417 }
419 ////////////////////////////////////////////////////
420 //
421 // Work methods.
422 //
424 /* CONFLICT: with legacy below.
425 public String getName()
426 {
427 return this.toString();
428 }
429 */
431 public void doWork()
432 {
433 try {
434 if (orb.transportDebugFlag) {
435 dprint(".doWork->: " + this);
436 }
437 if (selectionKey.isAcceptable()) {
438 accept();
439 } else {
440 if (orb.transportDebugFlag) {
441 dprint(".doWork: ! selectionKey.isAcceptable: " + this);
442 }
443 }
444 } catch (SecurityException se) {
445 if (orb.transportDebugFlag) {
446 dprint(".doWork: ignoring SecurityException: "
447 + se
448 + " " + this);
449 }
450 String permissionStr = ORBUtility.getClassSecurityInfo(getClass());
451 wrapper.securityExceptionInAccept(se, permissionStr);
452 } catch (Exception ex) {
453 if (orb.transportDebugFlag) {
454 dprint(".doWork: ignoring Exception: "
455 + ex
456 + " " + this);
457 }
458 wrapper.exceptionInAccept(ex);
459 } catch (Throwable t) {
460 if (orb.transportDebugFlag) {
461 dprint(".doWork: ignoring Throwable: "
462 + t
463 + " " + this);
464 }
465 } finally {
467 // IMPORTANT: To avoid bug (4953599), we force the
468 // Thread that does the NIO select to also do the
469 // enable/disable of Ops using SelectionKey.interestOps().
470 // Otherwise, the SelectionKey.interestOps() may block
471 // indefinitely.
472 // NOTE: If "acceptorSocketUseWorkerThreadForEvent" is
473 // set to to false in ParserTable.java, then this method,
474 // doWork(), will get executed by the same thread
475 // (SelectorThread) that does the NIO select.
476 // If "acceptorSocketUseWorkerThreadForEvent" is set
477 // to true, a WorkerThread will execute this method,
478 // doWork(). Hence, the registering of the enabling of
479 // the SelectionKey's interestOps is done here instead
480 // of calling SelectionKey.interestOps(<interest op>).
482 Selector selector = orb.getTransportManager().getSelector(0);
483 selector.registerInterestOps(this);
485 if (orb.transportDebugFlag) {
486 dprint(".doWork<-:" + this);
487 }
488 }
489 }
491 public void setEnqueueTime(long timeInMillis)
492 {
493 enqueueTime = timeInMillis;
494 }
496 public long getEnqueueTime()
497 {
498 return enqueueTime;
499 }
502 //
503 // Factory methods.
504 //
506 // REVISIT: refactor into common base or delegate.
507 public MessageMediator createMessageMediator(Broker broker,
508 Connection connection)
509 {
510 // REVISIT - no factoring so cheat to avoid code dup right now.
511 // REVISIT **** COUPLING !!!!
512 ContactInfo contactInfo = new SocketOrChannelContactInfoImpl();
513 return contactInfo.createMessageMediator(broker, connection);
514 }
516 // REVISIT: refactor into common base or delegate.
517 public MessageMediator finishCreatingMessageMediator(Broker broker,
518 Connection connection,
519 MessageMediator messageMediator)
520 {
521 // REVISIT - no factoring so cheat to avoid code dup right now.
522 // REVISIT **** COUPLING !!!!
523 ContactInfo contactInfo = new SocketOrChannelContactInfoImpl();
524 return contactInfo.finishCreatingMessageMediator(broker,
525 connection, messageMediator);
526 }
528 public InputObject createInputObject(Broker broker,
529 MessageMediator messageMediator)
530 {
531 CorbaMessageMediator corbaMessageMediator = (CorbaMessageMediator)
532 messageMediator;
533 return new CDRInputObject((ORB)broker,
534 (CorbaConnection)messageMediator.getConnection(),
535 corbaMessageMediator.getDispatchBuffer(),
536 corbaMessageMediator.getDispatchHeader());
537 }
539 public OutputObject createOutputObject(Broker broker,
540 MessageMediator messageMediator)
541 {
542 CorbaMessageMediator corbaMessageMediator = (CorbaMessageMediator)
543 messageMediator;
544 return sun.corba.OutputStreamFactory.newCDROutputObject((ORB) broker,
545 corbaMessageMediator, corbaMessageMediator.getReplyHeader(),
546 corbaMessageMediator.getStreamFormatVersion());
547 }
549 ////////////////////////////////////////////////////
550 //
551 // SocketOrChannelAcceptor
552 //
554 public ServerSocket getServerSocket()
555 {
556 return serverSocket;
557 }
559 ////////////////////////////////////////////////////
560 //
561 // Implementation.
562 //
564 public String toString()
565 {
566 String sock;
567 if (serverSocketChannel == null) {
568 if (serverSocket == null) {
569 sock = "(not initialized)";
570 } else {
571 sock = serverSocket.toString();
572 }
573 } else {
574 sock = serverSocketChannel.toString();
575 }
577 return
578 toStringName() +
579 "["
580 + sock + " "
581 + type + " "
582 + shouldUseSelectThreadToWait() + " "
583 + shouldUseWorkerThreadForEvent()
584 + "]" ;
585 }
587 protected String toStringName()
588 {
589 return "SocketOrChannelAcceptorImpl";
590 }
592 protected void dprint(String msg)
593 {
594 ORBUtility.dprint(toStringName(), msg);
595 }
597 protected void dprint(String msg, Throwable t)
598 {
599 dprint(msg);
600 t.printStackTrace(System.out);
601 }
603 // BEGIN Legacy support
604 ////////////////////////////////////////////////////
605 //
606 // LegacyServerSocketEndPointInfo and EndPointInfo
607 //
609 public String getType()
610 {
611 return type;
612 }
614 public String getHostName()
615 {
616 return hostname;
617 }
619 public String getHost()
620 {
621 return hostname;
622 }
624 public int getPort()
625 {
626 return port;
627 }
629 public int getLocatorPort()
630 {
631 return locatorPort;
632 }
634 public void setLocatorPort (int port)
635 {
636 locatorPort = port;
637 }
639 public String getName()
640 {
641 // Kluge alert:
642 // Work and Legacy both define getName.
643 // Try to make this behave best for most cases.
644 String result =
645 name.equals(LegacyServerSocketEndPointInfo.NO_NAME) ?
646 this.toString() : name;
647 return result;
648 }
649 // END Legacy support
650 }
652 // End of file.