src/share/classes/com/sun/corba/se/impl/transport/SocketOrChannelAcceptorImpl.java

Fri, 04 Apr 2014 14:58:04 +0400

author
aefimov
date
Fri, 04 Apr 2014 14:58:04 +0400
changeset 645
693525eeea85
parent 478
80161c61aa68
child 748
6845b95cba6b
child 1205
803798e13dd5
permissions
-rw-r--r--

8029073: (corba) New connection reclaimed when number of connection is greater than highwatermark
Reviewed-by: coffeys

     1 /*
     2  * Copyright (c) 2001, 2013, Oracle and/or its affiliates. All rights reserved.
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
     4  *
     5  * This code is free software; you can redistribute it and/or modify it
     6  * under the terms of the GNU General Public License version 2 only, as
     7  * published by the Free Software Foundation.  Oracle designates this
     8  * particular file as subject to the "Classpath" exception as provided
     9  * by Oracle in the LICENSE file that accompanied this code.
    10  *
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
    14  * version 2 for more details (a copy is included in the LICENSE file that
    15  * accompanied this code).
    16  *
    17  * You should have received a copy of the GNU General Public License version
    18  * 2 along with this work; if not, write to the Free Software Foundation,
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
    20  *
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
    22  * or visit www.oracle.com if you need additional information or have any
    23  * questions.
    24  */
    26 package com.sun.corba.se.impl.transport;
    28 import java.io.IOException;
    29 import java.net.InetSocketAddress;
    30 import java.net.ServerSocket;
    31 import java.net.Socket;
    32 import java.nio.channels.SelectableChannel;
    33 import java.nio.channels.SelectionKey;
    34 import java.nio.channels.ServerSocketChannel;
    35 import java.nio.channels.SocketChannel;
    36 import java.util.Iterator;
    38 import com.sun.corba.se.pept.broker.Broker;
    39 import com.sun.corba.se.pept.encoding.InputObject;
    40 import com.sun.corba.se.pept.encoding.OutputObject;
    41 import com.sun.corba.se.pept.protocol.MessageMediator;
    42 import com.sun.corba.se.pept.transport.Acceptor;
    43 import com.sun.corba.se.pept.transport.Connection;
    44 import com.sun.corba.se.pept.transport.ContactInfo;
    45 import com.sun.corba.se.pept.transport.EventHandler;
    46 import com.sun.corba.se.pept.transport.InboundConnectionCache;
    47 import com.sun.corba.se.pept.transport.Selector;
    49 import com.sun.corba.se.spi.extension.RequestPartitioningPolicy;
    50 import com.sun.corba.se.spi.ior.IORTemplate;
    51 import com.sun.corba.se.spi.ior.TaggedProfileTemplate;
    52 import com.sun.corba.se.spi.ior.iiop.IIOPAddress ;
    53 import com.sun.corba.se.spi.ior.iiop.IIOPFactories;
    54 import com.sun.corba.se.spi.ior.iiop.IIOPProfileTemplate ;
    55 import com.sun.corba.se.spi.ior.iiop.GIOPVersion ;
    56 import com.sun.corba.se.spi.ior.iiop.AlternateIIOPAddressComponent;
    57 import com.sun.corba.se.spi.logging.CORBALogDomains;
    58 import com.sun.corba.se.spi.orb.ORB;
    59 import com.sun.corba.se.spi.orbutil.threadpool.Work;
    60 import com.sun.corba.se.spi.protocol.CorbaMessageMediator;
    61 import com.sun.corba.se.spi.transport.CorbaAcceptor;
    62 import com.sun.corba.se.spi.transport.CorbaConnection;
    63 import com.sun.corba.se.spi.transport.SocketInfo;
    64 import com.sun.corba.se.spi.transport.SocketOrChannelAcceptor;
    66 import com.sun.corba.se.impl.encoding.CDRInputObject;
    67 import com.sun.corba.se.impl.encoding.CDROutputObject;
    68 import com.sun.corba.se.impl.logging.ORBUtilSystemException;
    69 import com.sun.corba.se.impl.oa.poa.Policies; // REVISIT impl/poa specific
    70 import com.sun.corba.se.impl.orbutil.ORBConstants;
    71 import com.sun.corba.se.impl.orbutil.ORBUtility;
    73 // BEGIN Legacy support.
    74 import com.sun.corba.se.spi.legacy.connection.LegacyServerSocketEndPointInfo;
    75 // END Legacy support.
    77 /**
    78  * @author Harold Carr
    79  */
    80 public class SocketOrChannelAcceptorImpl
    81     extends
    82         EventHandlerBase
    83     implements
    84         CorbaAcceptor,
    85         SocketOrChannelAcceptor,
    86         Work,
    87         // BEGIN Legacy
    88         SocketInfo,
    89         LegacyServerSocketEndPointInfo
    90         // END Legacy
    91 {
    92     protected ServerSocketChannel serverSocketChannel;
    93     protected ServerSocket serverSocket;
    94     protected int port;
    95     protected long enqueueTime;
    96     protected boolean initialized;
    97     protected ORBUtilSystemException wrapper ;
    98     protected InboundConnectionCache connectionCache;
   100     // BEGIN Legacy
   101     protected String type = "";
   102     protected String name = "";
   103     protected String hostname;
   104     protected int locatorPort;
   105     // END Legacy
   107     public SocketOrChannelAcceptorImpl(ORB orb)
   108     {
   109         this.orb = orb;
   110         wrapper = ORBUtilSystemException.get( orb,
   111             CORBALogDomains.RPC_TRANSPORT ) ;
   113         setWork(this);
   114         initialized = false;
   116         // BEGIN Legacy support.
   117         this.hostname = orb.getORBData().getORBServerHost();
   118         this.name = LegacyServerSocketEndPointInfo.NO_NAME;
   119         this.locatorPort = -1;
   120         // END Legacy support.
   121     }
   123     public SocketOrChannelAcceptorImpl(ORB orb, int port)
   124     {
   125         this(orb);
   126         this.port = port;
   127     }
   129     // BEGIN Legacy support.
   130     public SocketOrChannelAcceptorImpl(ORB orb, int port,
   131                                        String name, String type)
   132     {
   133         this(orb, port);
   134         this.name = name;
   135         this.type = type;
   136     }
   137     // END Legacy support.
   139     ////////////////////////////////////////////////////
   140     //
   141     // pept.transport.Acceptor
   142     //
   144     public boolean initialize()
   145     {
   146         if (initialized) {
   147             return false;
   148         }
   149         if (orb.transportDebugFlag) {
   150             dprint(".initialize: " + this);
   151         }
   152         InetSocketAddress inetSocketAddress = null;
   153         try {
   154             if (orb.getORBData().getListenOnAllInterfaces().equals(ORBConstants.LISTEN_ON_ALL_INTERFACES)) {
   155                 inetSocketAddress = new InetSocketAddress(port);
   156             } else {
   157                 String host = orb.getORBData().getORBServerHost();
   158                 inetSocketAddress = new InetSocketAddress(host, port);
   159             }
   160             serverSocket = orb.getORBData().getSocketFactory()
   161                 .createServerSocket(type, inetSocketAddress);
   162             internalInitialize();
   163         } catch (Throwable t) {
   164             throw wrapper.createListenerFailed( t, Integer.toString(port) ) ;
   165         }
   166         initialized = true;
   167         return true;
   168     }
   170     protected void internalInitialize()
   171         throws Exception
   172     {
   173         // Determine the listening port (for the IOR).
   174         // This is important when using emphemeral ports (i.e.,
   175         // when the port value to the constructor is 0).
   177         port = serverSocket.getLocalPort();
   179         // Register with transport (also sets up monitoring).
   181         orb.getCorbaTransportManager().getInboundConnectionCache(this);
   183         // Finish configuation.
   185         serverSocketChannel = serverSocket.getChannel();
   187         if (serverSocketChannel != null) {
   188             setUseSelectThreadToWait(
   189                 orb.getORBData().acceptorSocketUseSelectThreadToWait());
   190             serverSocketChannel.configureBlocking(
   191                 ! orb.getORBData().acceptorSocketUseSelectThreadToWait());
   192         } else {
   193             // Configure to use listener and reader threads.
   194             setUseSelectThreadToWait(false);
   195         }
   196         setUseWorkerThreadForEvent(
   197             orb.getORBData().acceptorSocketUseWorkerThreadForEvent());
   199     }
   201     public boolean initialized()
   202     {
   203         return initialized;
   204     }
   206     public String getConnectionCacheType()
   207     {
   208         return this.getClass().toString();
   209     }
   211     public void setConnectionCache(InboundConnectionCache connectionCache)
   212     {
   213         this.connectionCache = connectionCache;
   214     }
   216     public InboundConnectionCache getConnectionCache()
   217     {
   218         return connectionCache;
   219     }
   221     public boolean shouldRegisterAcceptEvent()
   222     {
   223         return true;
   224     }
   226     public void accept()
   227     {
   228         try {
   229             SocketChannel socketChannel = null;
   230             Socket socket = null;
   231             if (serverSocketChannel == null) {
   232                 socket = serverSocket.accept();
   233             } else {
   234                 socketChannel = serverSocketChannel.accept();
   235                 socket = socketChannel.socket();
   236             }
   237             orb.getORBData().getSocketFactory()
   238                 .setAcceptedSocketOptions(this, serverSocket, socket);
   239             if (orb.transportDebugFlag) {
   240                 dprint(".accept: " +
   241                        (serverSocketChannel == null
   242                         ? serverSocket.toString()
   243                         : serverSocketChannel.toString()));
   244             }
   246             CorbaConnection connection =
   247                 new SocketOrChannelConnectionImpl(orb, this, socket);
   248             if (orb.transportDebugFlag) {
   249                 dprint(".accept: new: " + connection);
   250             }
   252             // NOTE: The connection MUST be put in the cache BEFORE being
   253             // registered with the selector.  Otherwise if the bytes
   254             // are read on the connection it will attempt a time stamp
   255             // but the cache will be null, resulting in NPE.
   257             // A connection needs to be timestamped before putting to the cache.
   258             // Otherwise the newly created connection (with 0 timestamp) could be
   259             // incorrectly reclaimed by concurrent reclaim() call OR if there
   260             // will be no events on this connection then it could be reclaimed
   261             // by upcoming reclaim() call.
   262             getConnectionCache().stampTime(connection);
   263             getConnectionCache().put(this, connection);
   265             if (connection.shouldRegisterServerReadEvent()) {
   266                 Selector selector = orb.getTransportManager().getSelector(0);
   267                 selector.registerForEvent(connection.getEventHandler());
   268             }
   270             getConnectionCache().reclaim();
   272         } catch (IOException e) {
   273             if (orb.transportDebugFlag) {
   274                 dprint(".accept:", e);
   275             }
   276             orb.getTransportManager().getSelector(0).unregisterForEvent(this);
   277             // REVISIT - need to close - recreate - then register new one.
   278             orb.getTransportManager().getSelector(0).registerForEvent(this);
   279             // NOTE: if register cycling we do not want to shut down ORB
   280             // since local beans will still work.  Instead one will see
   281             // a growing log file to alert admin of problem.
   282         }
   283     }
   285     public void close ()
   286     {
   287         try {
   288             if (orb.transportDebugFlag) {
   289                 dprint(".close->:");
   290             }
   291             Selector selector = orb.getTransportManager().getSelector(0);
   292             selector.unregisterForEvent(this);
   293             if (serverSocketChannel != null) {
   294                 serverSocketChannel.close();
   295             }
   296             if (serverSocket != null) {
   297                 serverSocket.close();
   298             }
   299         } catch (IOException e) {
   300             if (orb.transportDebugFlag) {
   301                 dprint(".close:", e);
   302             }
   303         } finally {
   304             if (orb.transportDebugFlag) {
   305                 dprint(".close<-:");
   306             }
   307         }
   308     }
   310     public EventHandler getEventHandler()
   311     {
   312         return this;
   313     }
   315     ////////////////////////////////////////////////////
   316     //
   317     // CorbaAcceptor
   318     //
   320     public String getObjectAdapterId()
   321     {
   322         return null;
   323     }
   325     public String getObjectAdapterManagerId()
   326     {
   327         return null;
   328     }
   330     public void addToIORTemplate(IORTemplate iorTemplate,
   331                                  Policies policies,
   332                                  String codebase)
   333     {
   334         Iterator iterator = iorTemplate.iteratorById(
   335             org.omg.IOP.TAG_INTERNET_IOP.value);
   337         String hostname = orb.getORBData().getORBServerHost();
   339         if (iterator.hasNext()) {
   340             // REVISIT - how does this play with legacy ORBD port exchange?
   341             IIOPAddress iiopAddress =
   342                 IIOPFactories.makeIIOPAddress(orb, hostname, port);
   343             AlternateIIOPAddressComponent iiopAddressComponent =
   344                 IIOPFactories.makeAlternateIIOPAddressComponent(iiopAddress);
   346             while (iterator.hasNext()) {
   347                 TaggedProfileTemplate taggedProfileTemplate =
   348                     (TaggedProfileTemplate) iterator.next();
   349                 taggedProfileTemplate.add(iiopAddressComponent);
   350             }
   351         } else {
   352             GIOPVersion version = orb.getORBData().getGIOPVersion();
   353             int templatePort;
   354             if (policies.forceZeroPort()) {
   355                 templatePort = 0;
   356             } else if (policies.isTransient()) {
   357                 templatePort = port;
   358             } else {
   359                 templatePort = orb.getLegacyServerSocketManager()
   360                    .legacyGetPersistentServerPort(SocketInfo.IIOP_CLEAR_TEXT);
   361             }
   362             IIOPAddress addr =
   363                 IIOPFactories.makeIIOPAddress(orb, hostname, templatePort);
   364             IIOPProfileTemplate iiopProfile =
   365                 IIOPFactories.makeIIOPProfileTemplate(orb, version, addr);
   366             if (version.supportsIORIIOPProfileComponents()) {
   367                 iiopProfile.add(IIOPFactories.makeCodeSetsComponent(orb));
   368                 iiopProfile.add(IIOPFactories.makeMaxStreamFormatVersionComponent());
   369                 RequestPartitioningPolicy rpPolicy = (RequestPartitioningPolicy)
   370                     policies.get_effective_policy(
   371                                       ORBConstants.REQUEST_PARTITIONING_POLICY);
   372                 if (rpPolicy != null) {
   373                     iiopProfile.add(
   374                          IIOPFactories.makeRequestPartitioningComponent(
   375                              rpPolicy.getValue()));
   376                 }
   377                 if (codebase != null && codebase != "") {
   378                     iiopProfile.add(IIOPFactories. makeJavaCodebaseComponent(codebase));
   379                 }
   380                 if (orb.getORBData().isJavaSerializationEnabled()) {
   381                     iiopProfile.add(
   382                            IIOPFactories.makeJavaSerializationComponent());
   383                 }
   384             }
   385             iorTemplate.add(iiopProfile);
   386         }
   387     }
   389     public String getMonitoringName()
   390     {
   391         return "AcceptedConnections";
   392     }
   394     ////////////////////////////////////////////////////
   395     //
   396     // EventHandler methods
   397     //
   399     public SelectableChannel getChannel()
   400     {
   401         return serverSocketChannel;
   402     }
   404     public int getInterestOps()
   405     {
   406         return SelectionKey.OP_ACCEPT;
   407     }
   409     public Acceptor getAcceptor()
   410     {
   411         return this;
   412     }
   414     public Connection getConnection()
   415     {
   416         throw new RuntimeException("Should not happen.");
   417     }
   419     ////////////////////////////////////////////////////
   420     //
   421     // Work methods.
   422     //
   424     /* CONFLICT: with legacy below.
   425     public String getName()
   426     {
   427         return this.toString();
   428     }
   429     */
   431     public void doWork()
   432     {
   433         try {
   434             if (orb.transportDebugFlag) {
   435                 dprint(".doWork->: " + this);
   436             }
   437             if (selectionKey.isAcceptable()) {
   438                         accept();
   439             } else {
   440                 if (orb.transportDebugFlag) {
   441                     dprint(".doWork: ! selectionKey.isAcceptable: " + this);
   442                 }
   443             }
   444         } catch (SecurityException se) {
   445             if (orb.transportDebugFlag) {
   446                 dprint(".doWork: ignoring SecurityException: "
   447                        + se
   448                        + " " + this);
   449             }
   450             String permissionStr = ORBUtility.getClassSecurityInfo(getClass());
   451             wrapper.securityExceptionInAccept(se, permissionStr);
   452         } catch (Exception ex) {
   453             if (orb.transportDebugFlag) {
   454                 dprint(".doWork: ignoring Exception: "
   455                        + ex
   456                        + " " + this);
   457             }
   458             wrapper.exceptionInAccept(ex);
   459         } catch (Throwable t) {
   460             if (orb.transportDebugFlag) {
   461                 dprint(".doWork: ignoring Throwable: "
   462                        + t
   463                        + " " + this);
   464             }
   465         } finally {
   467             // IMPORTANT: To avoid bug (4953599), we force the
   468             // Thread that does the NIO select to also do the
   469             // enable/disable of Ops using SelectionKey.interestOps().
   470             // Otherwise, the SelectionKey.interestOps() may block
   471             // indefinitely.
   472             // NOTE: If "acceptorSocketUseWorkerThreadForEvent" is
   473             // set to to false in ParserTable.java, then this method,
   474             // doWork(), will get executed by the same thread
   475             // (SelectorThread) that does the NIO select.
   476             // If "acceptorSocketUseWorkerThreadForEvent" is set
   477             // to true, a WorkerThread will execute this method,
   478             // doWork(). Hence, the registering of the enabling of
   479             // the SelectionKey's interestOps is done here instead
   480             // of calling SelectionKey.interestOps(<interest op>).
   482             Selector selector = orb.getTransportManager().getSelector(0);
   483             selector.registerInterestOps(this);
   485             if (orb.transportDebugFlag) {
   486                 dprint(".doWork<-:" + this);
   487             }
   488         }
   489     }
   491     public void setEnqueueTime(long timeInMillis)
   492     {
   493         enqueueTime = timeInMillis;
   494     }
   496     public long getEnqueueTime()
   497     {
   498         return enqueueTime;
   499     }
   502     //
   503     // Factory methods.
   504     //
   506     // REVISIT: refactor into common base or delegate.
   507     public MessageMediator createMessageMediator(Broker broker,
   508                                                  Connection connection)
   509     {
   510         // REVISIT - no factoring so cheat to avoid code dup right now.
   511         // REVISIT **** COUPLING !!!!
   512         ContactInfo contactInfo = new SocketOrChannelContactInfoImpl();
   513         return contactInfo.createMessageMediator(broker, connection);
   514     }
   516     // REVISIT: refactor into common base or delegate.
   517     public MessageMediator finishCreatingMessageMediator(Broker broker,
   518                                                          Connection connection,
   519                                                          MessageMediator messageMediator)
   520     {
   521         // REVISIT - no factoring so cheat to avoid code dup right now.
   522         // REVISIT **** COUPLING !!!!
   523         ContactInfo contactInfo = new SocketOrChannelContactInfoImpl();
   524         return contactInfo.finishCreatingMessageMediator(broker,
   525                                           connection, messageMediator);
   526     }
   528     public InputObject createInputObject(Broker broker,
   529                                          MessageMediator messageMediator)
   530     {
   531         CorbaMessageMediator corbaMessageMediator = (CorbaMessageMediator)
   532             messageMediator;
   533         return new CDRInputObject((ORB)broker,
   534                                   (CorbaConnection)messageMediator.getConnection(),
   535                                   corbaMessageMediator.getDispatchBuffer(),
   536                                   corbaMessageMediator.getDispatchHeader());
   537     }
   539     public OutputObject createOutputObject(Broker broker,
   540                                            MessageMediator messageMediator)
   541     {
   542         CorbaMessageMediator corbaMessageMediator = (CorbaMessageMediator)
   543             messageMediator;
   544         return sun.corba.OutputStreamFactory.newCDROutputObject((ORB) broker,
   545                        corbaMessageMediator, corbaMessageMediator.getReplyHeader(),
   546                        corbaMessageMediator.getStreamFormatVersion());
   547     }
   549     ////////////////////////////////////////////////////
   550     //
   551     // SocketOrChannelAcceptor
   552     //
   554     public ServerSocket getServerSocket()
   555     {
   556         return serverSocket;
   557     }
   559     ////////////////////////////////////////////////////
   560     //
   561     // Implementation.
   562     //
   564     public String toString()
   565     {
   566         String sock;
   567         if (serverSocketChannel == null) {
   568             if (serverSocket == null) {
   569                 sock = "(not initialized)";
   570             } else {
   571                 sock = serverSocket.toString();
   572             }
   573         } else {
   574             sock = serverSocketChannel.toString();
   575         }
   577         return
   578             toStringName() +
   579             "["
   580             + sock + " "
   581             + type + " "
   582             + shouldUseSelectThreadToWait() + " "
   583             + shouldUseWorkerThreadForEvent()
   584             + "]" ;
   585     }
   587     protected String toStringName()
   588     {
   589         return "SocketOrChannelAcceptorImpl";
   590     }
   592     protected void dprint(String msg)
   593     {
   594         ORBUtility.dprint(toStringName(), msg);
   595     }
   597     protected void dprint(String msg, Throwable t)
   598     {
   599         dprint(msg);
   600         t.printStackTrace(System.out);
   601     }
   603     // BEGIN Legacy support
   604     ////////////////////////////////////////////////////
   605     //
   606     // LegacyServerSocketEndPointInfo and EndPointInfo
   607     //
   609     public String getType()
   610     {
   611         return type;
   612     }
   614     public String getHostName()
   615     {
   616         return hostname;
   617     }
   619     public String getHost()
   620     {
   621         return hostname;
   622     }
   624     public int getPort()
   625     {
   626         return port;
   627     }
   629     public int getLocatorPort()
   630     {
   631         return locatorPort;
   632     }
   634     public void setLocatorPort (int port)
   635     {
   636         locatorPort = port;
   637     }
   639     public String getName()
   640     {
   641         // Kluge alert:
   642         // Work and Legacy both define getName.
   643         // Try to make this behave best for most cases.
   644         String result =
   645             name.equals(LegacyServerSocketEndPointInfo.NO_NAME) ?
   646             this.toString() : name;
   647         return result;
   648     }
   649     // END Legacy support
   650 }
   652 // End of file.

mercurial