001package NeoVisionaries.WebSockets;
002
003import java.io.IOException;
004import java.net.Inet4Address;
005import java.net.Inet6Address;
006import java.net.InetAddress;
007import java.net.InetSocketAddress;
008import java.net.Socket;
009import java.net.SocketAddress;
010import java.util.ArrayList;
011import java.util.List;
012import java.util.concurrent.CountDownLatch;
013import java.util.concurrent.TimeUnit;
014
015import javax.net.SocketFactory;
016
017
018/**
019 * Lets multiple sockets race the given IP addresses until one has been
020 * established.
021 * 
022 * <EMBED CLASS='external-html' DATA-FILE-ID=LICENSE><BR /><BR />
023 *
024 * This follows <a href="https://tools.ietf.org/html/rfc6555">RFC 6555 (Happy
025 * Eyeballs)</a>.
026 *
027 * @author Lennart Grahl
028 */
029public class SocketInitiator {
030    /**
031     * A <i>wait</i> signal will be awaited by a {@link SocketRacer} before it
032     * starts to connect.
033     *
034     * When a {@link SocketRacer} <i>A</i> is done, it will unblock the
035     * following racer <i>B</i> by marking <i>B's</i> signal as <i>done</i>.
036     */
037    private class Signal
038    {
039        private final CountDownLatch mLatch;
040        private final int mMaxDelay;
041
042
043        Signal(int maxDelay)
044        {
045            mLatch    = new CountDownLatch(1);
046            mMaxDelay = maxDelay;
047        }
048
049
050        boolean isDone()
051        {
052            return mLatch.getCount() == 0;
053        }
054
055
056        void await() throws InterruptedException
057        {
058            mLatch.await(mMaxDelay, TimeUnit.MILLISECONDS);
059        }
060
061
062        void done()
063        {
064            mLatch.countDown();
065        }
066    }
067
068
069    /**
070     * This thread connects to a socket and notifies a {@link SocketFuture}
071     * shared across all racer threads when it is done. A racer thread is done
072     * when...
073     *
074     * <ul>
075     * <li>it has established a connection, or</li>
076     * <li>when establishing a connection failed with an exception, or</li>
077     * <li>another racer established a connection.</li>
078     * </ul>
079     */
080    private class SocketRacer extends Thread
081    {
082        private final SocketFuture mFuture;
083        private final SocketFactory mSocketFactory;
084        private final SocketAddress mSocketAddress;
085        private String[] mServerNames;
086        private final int mConnectTimeout;
087        private final Signal mStartSignal;
088        private final Signal mDoneSignal;
089
090
091        SocketRacer(
092                SocketFuture future, SocketFactory socketFactory, SocketAddress socketAddress,
093                String[] serverNames, int connectTimeout, Signal startSignal, Signal doneSignal)
094        {
095            mFuture         = future;
096            mSocketFactory  = socketFactory;
097            mSocketAddress  = socketAddress;
098            mServerNames    = serverNames;
099            mConnectTimeout = connectTimeout;
100            mStartSignal    = startSignal;
101            mDoneSignal     = doneSignal;
102        }
103
104
105        public void run() {
106            Socket socket = null;
107            try
108            {
109                // Await start signal.
110                if (mStartSignal != null)
111                {
112                    mStartSignal.await();
113                }
114
115                // Check if a socket has already been established.
116                if (mFuture.hasSocket())
117                {
118                    return;
119                }
120
121                // Let the socket factory create a socket.
122                socket = mSocketFactory.createSocket();
123
124                // Set up server names for SNI as necessary if possible.
125                SNIHelper.setServerNames(socket, mServerNames);
126
127                // Connect to the server (either a proxy or a WebSocket endpoint).
128                socket.connect(mSocketAddress, mConnectTimeout);
129
130                // Socket established.
131                complete(socket);
132            }
133            catch (Exception e)
134            {
135                abort(e);
136
137                if (socket != null)
138                {
139                    try
140                    {
141                        socket.close();
142                    }
143                    catch (IOException ioe)
144                    {
145                        // ignored
146                    }
147                }
148            }
149        }
150
151
152        private void complete(Socket socket)
153        {
154            synchronized (mFuture)
155            {
156                // Check if already completed or aborted.
157                if (mDoneSignal.isDone()) {
158                    return;
159                }
160
161                // Socket established.
162                mFuture.setSocket(this, socket);
163
164                // Socket racer complete.
165                mDoneSignal.done();
166            }
167        }
168
169
170        void abort(Exception exception)
171        {
172            synchronized (mFuture)
173            {
174                // Check if already completed or aborted.
175                if (mDoneSignal.isDone())
176                {
177                    return;
178                }
179
180                // Socket not established.
181                mFuture.setException(exception);
182
183                // Socket racer complete.
184                mDoneSignal.done();
185            }
186        }
187    }
188
189
190    /**
191     * The socket future is shared across all {@link SocketRacer} threads and
192     * aggregates the results. A socket future is considered fulfilled when...
193     *
194     * <ul>
195     * <li>any racer thread has established a socket in which case all
196     *     other racers will be stopped, or</li>
197     * <li>all racer threads returned with an exception, or</li>
198     * <li>there was no racer thread (e.g. in case there is no network
199     *     interface).</li>
200     * </ul>
201     *
202     * In the first case, the socket will be returned. In all other cases, an
203     * exception will be thrown, indicating the failure type.
204     */
205    private class SocketFuture
206    {
207        private CountDownLatch mLatch;
208        private List<SocketRacer> mRacers;
209        private Socket mSocket;
210        private Exception mException;
211
212
213        synchronized boolean hasSocket()
214        {
215            return mSocket != null;
216        }
217
218
219        synchronized void setSocket(SocketRacer current, Socket socket)
220        {
221            // Sanity check.
222            if (mLatch == null || mRacers == null)
223            {
224                throw new IllegalStateException("Cannot set socket before awaiting!");
225            }
226
227            // Set socket if not already set, otherwise close socket.
228            if (mSocket == null)
229            {
230                mSocket = socket;
231
232                // Stop all other racers.
233                for (SocketRacer racer: mRacers)
234                {
235                    // Skip instance that is setting the socket.
236                    if (racer == current)
237                    {
238                        continue;
239                    }
240                    racer.abort(new InterruptedException());
241                    racer.interrupt();
242                }
243            }
244            else
245            {
246                try
247                {
248                    socket.close();
249                }
250                catch (IOException e)
251                {
252                    // ignored
253                }
254            }
255
256            // Racer complete.
257            mLatch.countDown();
258        }
259
260
261        synchronized void setException(Exception exception)
262        {
263            // Sanity check.
264            if (mLatch == null || mRacers == null)
265            {
266                throw new IllegalStateException("Cannot set exception before awaiting!");
267            }
268
269            // Set exception if not already set.
270            if (mException == null)
271            {
272                mException = exception;
273            }
274
275            // Racer complete.
276            mLatch.countDown();
277        }
278
279
280        Socket await(List<SocketRacer> racers) throws Exception
281        {
282            // Store racers.
283            mRacers = racers;
284
285            // Create new latch.
286            mLatch = new CountDownLatch(mRacers.size());
287
288            // Start each racer.
289            for (SocketRacer racer: mRacers)
290            {
291                racer.start();
292            }
293
294            // Wait until all racers complete.
295            mLatch.await();
296
297            // Return the socket, if any, otherwise the first exception raised
298            if (mSocket != null)
299            {
300                return mSocket;
301            }
302            else if (mException != null)
303            {
304                throw mException;
305            }
306            else
307            {
308                throw new WebSocketException(WebSocketError.SOCKET_CONNECT_ERROR,
309                        "No viable interface to connect");
310            }
311        }
312    }
313
314
315    private final SocketFactory mSocketFactory;
316    private final Address mAddress;
317    private final int mConnectTimeout;
318    private final String[] mServerNames;
319    private final DualStackMode mMode;
320    private final int mFallbackDelay;
321
322
323    public SocketInitiator(
324            SocketFactory socketFactory, Address address, int connectTimeout, String[] serverNames,
325            DualStackMode mode, int fallbackDelay)
326    {
327        mSocketFactory  = socketFactory;
328        mAddress        = address;
329        mConnectTimeout = connectTimeout;
330        mServerNames    = serverNames;
331        mMode           = mode;
332        mFallbackDelay  = fallbackDelay;
333    }
334
335
336    public Socket establish(InetAddress[] addresses) throws Exception
337    {
338        // Create socket future.
339        SocketFuture future = new SocketFuture();
340
341        // Create socket racer for each IP address.
342        List<SocketRacer> racers = new ArrayList<SocketRacer>(addresses.length);
343        int delay = 0;
344        Signal startSignal = null;
345        for (InetAddress address: addresses)
346        {
347            // Check if the mode requires us to skip this address.
348            if (mMode == DualStackMode.IPV4_ONLY && !(address instanceof Inet4Address)
349                || mMode == DualStackMode.IPV6_ONLY && !(address instanceof Inet6Address))
350            {
351                continue;
352            }
353
354            // Increase the *happy eyeballs* delay (see RFC 6555, sec 5.5).
355            delay += mFallbackDelay;
356
357            // Create the *done* signal which acts as a *start* signal for the subsequent racer.
358            Signal doneSignal = new Signal(delay);
359
360            // Create racer to establish the socket.
361            SocketAddress socketAddress = new InetSocketAddress(address, mAddress.getPort());
362            SocketRacer racer = new SocketRacer(
363                    future, mSocketFactory, socketAddress, mServerNames, mConnectTimeout,
364                    startSignal, doneSignal);
365            racers.add(racer);
366
367            // Replace *start* signal with this racer's *done* signal.
368            startSignal = doneSignal;
369        }
370
371        // Wait until one of the sockets has been established, or all failed with an exception.
372        return future.await(racers);
373    }
374}