001package org.apache.commons.jcs3.utils.discovery;
002
003/*
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *   http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied.  See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 */
021
022import java.io.IOException;
023import java.net.Inet6Address;
024import java.net.InetAddress;
025import java.net.InetSocketAddress;
026import java.net.NetworkInterface;
027import java.net.StandardProtocolFamily;
028import java.net.StandardSocketOptions;
029import java.nio.ByteBuffer;
030import java.nio.channels.DatagramChannel;
031import java.nio.channels.MembershipKey;
032import java.nio.channels.SelectionKey;
033import java.nio.channels.Selector;
034import java.util.Iterator;
035import java.util.concurrent.ArrayBlockingQueue;
036import java.util.concurrent.ExecutorService;
037import java.util.concurrent.atomic.AtomicBoolean;
038import java.util.concurrent.atomic.AtomicInteger;
039
040import org.apache.commons.jcs3.engine.CacheInfo;
041import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
042import org.apache.commons.jcs3.engine.behavior.IShutdownObserver;
043import org.apache.commons.jcs3.log.Log;
044import org.apache.commons.jcs3.log.LogManager;
045import org.apache.commons.jcs3.utils.net.HostNameUtil;
046import org.apache.commons.jcs3.utils.threadpool.PoolConfiguration;
047import org.apache.commons.jcs3.utils.threadpool.PoolConfiguration.WhenBlockedPolicy;
048import org.apache.commons.jcs3.utils.threadpool.ThreadPoolManager;
049
050/** Receives UDP Discovery messages. */
051public class UDPDiscoveryReceiver
052    implements Runnable, IShutdownObserver
053{
054    /** The log factory */
055    private static final Log log = LogManager.getLog( UDPDiscoveryReceiver.class );
056
057    /** The channel used for communication. */
058    private DatagramChannel multicastChannel;
059
060    /** The group membership key. */
061    private MembershipKey multicastGroupKey;
062
063    /** The selector. */
064    private Selector selector;
065
066    /**
067     * TODO: Consider using the threadpool manager to get this thread pool. For now place a tight
068     * restriction on the pool size
069     */
070    private static final int maxPoolSize = 2;
071
072    /** The processor */
073    private final ExecutorService pooledExecutor;
074
075    /** number of messages received. For debugging and testing. */
076    private final AtomicInteger cnt = new AtomicInteger(0);
077
078    /** Service to get cache names and handle request broadcasts */
079    private final UDPDiscoveryService service;
080
081    /** Serializer */
082    private IElementSerializer serializer;
083
084    /** Is it shutdown. */
085    private final AtomicBoolean shutdown = new AtomicBoolean(false);
086
087    /**
088     * Constructor for the UDPDiscoveryReceiver object.
089     * <p>
090     * We determine our own host using InetAddress
091     *<p>
092     * @param service
093     * @param multicastInterfaceString
094     * @param multicastAddressString
095     * @param multicastPort
096     * @throws IOException
097     */
098    public UDPDiscoveryReceiver( final UDPDiscoveryService service,
099            final String multicastInterfaceString,
100            final String multicastAddressString,
101            final int multicastPort )
102        throws IOException
103    {
104        this(service, multicastInterfaceString,
105                InetAddress.getByName( multicastAddressString ),
106                multicastPort);
107    }
108
109    /**
110     * Constructor for the UDPDiscoveryReceiver object.
111     * <p>
112     * @param service
113     * @param multicastInterfaceString
114     * @param multicastAddress
115     * @param multicastPort
116     * @throws IOException
117     * @since 3.1
118     */
119    public UDPDiscoveryReceiver( final UDPDiscoveryService service,
120            final String multicastInterfaceString,
121            final InetAddress multicastAddress,
122            final int multicastPort )
123        throws IOException
124    {
125        this.service = service;
126        if (service != null)
127        {
128            this.serializer = service.getSerializer();
129        }
130
131        // create a small thread pool to handle a barrage
132        this.pooledExecutor = ThreadPoolManager.getInstance().createPool(
133                new PoolConfiguration(false, 0, maxPoolSize, maxPoolSize, 0,
134                        WhenBlockedPolicy.DISCARDOLDEST, maxPoolSize),
135                "JCS-UDPDiscoveryReceiver-", Thread.MIN_PRIORITY);
136
137        log.info( "Constructing listener, [{0}:{1}]", multicastAddress, multicastPort );
138        createSocket( multicastInterfaceString, multicastAddress, multicastPort );
139    }
140
141    /**
142     * Creates the socket for this class.
143     * <p>
144     * @param multicastInterfaceString
145     * @param multicastAddress
146     * @param multicastPort
147     * @throws IOException
148     */
149    private void createSocket( final String multicastInterfaceString, final InetAddress multicastAddress,
150            final int multicastPort )
151        throws IOException
152    {
153        try
154        {
155            // Use dedicated interface if specified
156            NetworkInterface multicastInterface = null;
157            if (multicastInterfaceString != null)
158            {
159                multicastInterface = NetworkInterface.getByName(multicastInterfaceString);
160            }
161            else
162            {
163                multicastInterface = HostNameUtil.getMulticastNetworkInterface();
164            }
165            if (multicastInterface != null)
166            {
167                log.info("Using network interface {0}", multicastInterface::getDisplayName);
168            }
169
170            multicastChannel = DatagramChannel.open(
171                    multicastAddress instanceof Inet6Address ?
172                            StandardProtocolFamily.INET6 : StandardProtocolFamily.INET)
173                    .setOption(StandardSocketOptions.SO_REUSEADDR, true)
174                    .setOption(StandardSocketOptions.IP_MULTICAST_IF, multicastInterface)
175                    .bind(new InetSocketAddress(multicastPort));
176            multicastChannel.configureBlocking(false);
177
178            log.info("Joining Group: [{0}] on {1}", multicastAddress, multicastInterface);
179            multicastGroupKey = multicastChannel.join(multicastAddress, multicastInterface);
180
181            selector = Selector.open();
182            multicastChannel.register(selector, SelectionKey.OP_READ);
183        }
184        catch ( final IOException e )
185        {
186            log.error( "Could not bind to multicast address [{0}:{1}]", multicastAddress,
187                    multicastPort, e );
188            throw e;
189        }
190    }
191
192    private final ArrayBlockingQueue<UDPDiscoveryMessage> msgQueue =
193            new ArrayBlockingQueue<>(maxPoolSize);
194
195    /**
196     * Wait for multicast message
197     *
198     * @return the object message
199     * @throws IOException
200     * @deprecated no longer used
201     */
202    @Deprecated
203    public Object waitForMessage()
204        throws IOException
205    {
206        try
207        {
208            return msgQueue.take();
209        }
210        catch (InterruptedException e)
211        {
212            throw new IOException("Interrupted waiting for message", e);
213        }
214    }
215
216    /** Main processing method for the UDPDiscoveryReceiver object */
217    @Override
218    public void run()
219    {
220        try
221        {
222            log.debug( "Waiting for message." );
223
224            while (!shutdown.get())
225            {
226                int activeKeys = selector.select();
227                if (activeKeys == 0)
228                {
229                    continue;
230                }
231
232                for (Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext();)
233                {
234                    if (shutdown.get())
235                    {
236                        break;
237                    }
238
239                    SelectionKey key = i.next();
240                    i.remove();
241
242                    if (!key.isValid())
243                    {
244                        continue;
245                    }
246
247                    if (key.isReadable())
248                    {
249                        cnt.incrementAndGet();
250                        log.debug( "{0} messages received.", this::getCnt );
251
252                        DatagramChannel mc = (DatagramChannel) key.channel();
253
254                        ByteBuffer byteBuffer = ByteBuffer.allocate(65536);
255                        InetSocketAddress sourceAddress =
256                                (InetSocketAddress) mc.receive(byteBuffer);
257                        byteBuffer.flip();
258
259                        try
260                        {
261                            log.debug("Received packet from address [{0}]", sourceAddress);
262                            byte[] bytes = new byte[byteBuffer.limit()];
263                            byteBuffer.get(bytes);
264                            Object obj = serializer.deSerialize(bytes, null);
265
266                            if (obj instanceof UDPDiscoveryMessage)
267                            {
268                                // Ensure that the address we're supposed to send to is, indeed, the address
269                                // of the machine on the other end of this connection.  This guards against
270                                // instances where we don't exactly get the right local host address
271                                final UDPDiscoveryMessage msg = (UDPDiscoveryMessage) obj;
272                                msg.setHost(sourceAddress.getHostString());
273
274                                log.debug( "Read object from address [{0}], object=[{1}]",
275                                        sourceAddress, obj );
276
277                                // Just to keep the functionality of the deprecated waitForMessage method
278                                synchronized (msgQueue)
279                                {
280                                    // Check if queue full already?
281                                    if (msgQueue.remainingCapacity() == 0)
282                                    {
283                                        // remove oldest element from queue
284                                        msgQueue.remove();
285                                    }
286
287                                    msgQueue.add(msg);
288                                }
289
290                                pooledExecutor.execute(() -> handleMessage(msg));
291                                log.debug( "Passed handler to executor." );
292                            }
293                        }
294                        catch ( final IOException | ClassNotFoundException e )
295                        {
296                            log.error( "Error receiving multicast packet", e );
297                        }
298                    }
299                }
300            } // end while
301        }
302        catch ( final IOException e )
303        {
304            log.error( "Unexpected exception in UDP receiver.", e );
305        }
306    }
307
308    /**
309     * @param cnt The cnt to set.
310     */
311    public void setCnt( final int cnt )
312    {
313        this.cnt.set(cnt);
314    }
315
316    /**
317     * @return Returns the cnt.
318     */
319    public int getCnt()
320    {
321        return cnt.get();
322    }
323
324    /**
325     * For testing
326     *
327     * @param serializer the serializer to set
328     * @since 3.1
329     */
330    protected void setSerializer(IElementSerializer serializer)
331    {
332        this.serializer = serializer;
333    }
334
335    /**
336     * Separate thread run when a command comes into the UDPDiscoveryReceiver.
337     * @deprecated No longer used
338     */
339    @Deprecated
340    public class MessageHandler
341        implements Runnable
342    {
343        /** The message to handle. Passed in during construction. */
344        private final UDPDiscoveryMessage message;
345
346        /**
347         * @param message
348         */
349        public MessageHandler( final UDPDiscoveryMessage message )
350        {
351            this.message = message;
352        }
353
354        /**
355         * Process the message.
356         */
357        @Override
358        public void run()
359        {
360            handleMessage(message);
361        }
362    }
363
364    /**
365     * Separate thread run when a command comes into the UDPDiscoveryReceiver.
366     */
367    private void handleMessage(UDPDiscoveryMessage message)
368    {
369        // consider comparing ports here instead.
370        if ( message.getRequesterId() == CacheInfo.listenerId )
371        {
372            log.debug( "Ignoring message sent from self" );
373        }
374        else
375        {
376            log.debug( "Process message sent from another" );
377            log.debug( "Message = {0}", message );
378
379            if ( message.getHost() == null || message.getCacheNames() == null || message.getCacheNames().isEmpty() )
380            {
381                log.debug( "Ignoring invalid message: {0}", message );
382            }
383            else
384            {
385                processMessage(message);
386            }
387        }
388    }
389
390    /**
391     * Process the incoming message.
392     */
393    private void processMessage(UDPDiscoveryMessage message)
394    {
395        final DiscoveredService discoveredService = new DiscoveredService(message);
396
397        switch (message.getMessageType())
398        {
399            case REMOVE:
400                log.debug( "Removing service from set {0}", discoveredService );
401                service.removeDiscoveredService( discoveredService );
402                break;
403            case REQUEST:
404                // if this is a request message, have the service handle it and
405                // return
406                log.debug( "Message is a Request Broadcast, will have the service handle it." );
407                service.serviceRequestBroadcast();
408                break;
409            case PASSIVE:
410            default:
411                log.debug( "Adding or updating service to set {0}", discoveredService );
412                service.addOrUpdateService( discoveredService );
413                break;
414        }
415    }
416
417    /** Shuts down the socket. */
418    @Override
419    public void shutdown()
420    {
421        if (shutdown.compareAndSet(false, true))
422        {
423            try
424            {
425                selector.close();
426                multicastGroupKey.drop();
427                multicastChannel.close();
428            }
429            catch ( final IOException e )
430            {
431                log.error( "Problem closing socket" );
432            }
433        }
434    }
435}