001/**
002 * Copyright (c) 2012, The University of Southampton and the individual contributors.
003 * All rights reserved.
004 *
005 * Redistribution and use in source and binary forms, with or without modification,
006 * are permitted provided that the following conditions are met:
007 *
008 *   *  Redistributions of source code must retain the above copyright notice,
009 *      this list of conditions and the following disclaimer.
010 *
011 *   *  Redistributions in binary form must reproduce the above copyright notice,
012 *      this list of conditions and the following disclaimer in the documentation
013 *      and/or other materials provided with the distribution.
014 *
015 *   *  Neither the name of the University of Southampton nor the names of its
016 *      contributors may be used to endorse or promote products derived from this
017 *      software without specific prior written permission.
018 *
019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */
030package org.openimaj.kestrel;
031
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.apache.thrift7.TException;

import backtype.storm.spout.KestrelThriftClient;
041
042/**
 * Defines a connection to a single Kestrel server, with static helpers for
 * working with a set of Kestrel servers
044 * 
045 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
046 * 
047 */
048public class KestrelServerSpec {
049
050        /**
051         * the default kestrel memcached port
052         */
053        public static final int DEFAULT_KESTREL_MEMCACHED_PORT = 22133;
054
055        /**
056         * the default kestrel thrift port
057         */
058        public static final int DEFAULT_KESTREL_THRIFT_PORT = 2229;
059
060        /**
061         * the default kestrel text protocol port
062         */
063        public static final int DEFAULT_KESTREL_TEXT_PORT = 2222;
064
065        /**
066         * the localhost
067         */
068        public static final String LOCALHOST = "127.0.0.1";
069
070        private static final Logger logger = Logger.getLogger(KestrelServerSpec.class);
071        /**
072         * the kestrel host
073         */
074        public String host;
075        /**
076         * the kestrel host port
077         */
078        public int port;
079
080        private KestrelThriftClient client;
081
082        /**
083         * A single kestrel host
084         * 
085         * @param kestrelHost
086         * @param port
087         */
088        public KestrelServerSpec(String kestrelHost, int port) {
089                this.host = kestrelHost;
090                this.port = port;
091        }
092
093        private KestrelServerSpec() {
094                this.host = LOCALHOST;
095        }
096
097        /**
098         * @return a local server spec using memcached
099         */
100        public static KestrelServerSpec localMemcached() {
101                final KestrelServerSpec ret = new KestrelServerSpec();
102                ret.port = DEFAULT_KESTREL_MEMCACHED_PORT;
103                return ret;
104        }
105
106        /**
107         * @return a local server spec using thrift
108         */
109        public static KestrelServerSpec localThrift() {
110                final KestrelServerSpec ret = new KestrelServerSpec();
111                ret.port = DEFAULT_KESTREL_THRIFT_PORT;
112                return ret;
113        }
114
115        /**
116         * @return a local server spec using text
117         */
118        public static KestrelServerSpec localText() {
119                final KestrelServerSpec ret = new KestrelServerSpec();
120                ret.port = DEFAULT_KESTREL_TEXT_PORT;
121                return ret;
122        }
123
124        /**
125         * Parse a list of strings in the format: host:port. If either host or port
126         * is left blank then the default is used
127         * 
128         * @param kestrelHosts
129         * @return all server specs
130         */
131        public static List<KestrelServerSpec> parseKestrelAddressList(List<String> kestrelHosts) {
132                final List<KestrelServerSpec> ret = new ArrayList<KestrelServerSpec>();
133                for (final String hostport : kestrelHosts) {
134                        String host = "";
135                        String port = "";
136                        if (hostport.contains(":")) {
137                                final int split = hostport.lastIndexOf(":");
138                                host = hostport.substring(0, split);
139                                port = hostport.substring(split + 1);
140                        }
141                        else {
142                                host = hostport;
143                        }
144                        if (host.length() == 0)
145                                host = KestrelServerSpec.LOCALHOST;
146                        if (port.length() == 0)
147                                port = "" + KestrelServerSpec.DEFAULT_KESTREL_THRIFT_PORT;
148                        ret.add(new KestrelServerSpec(host, Integer.parseInt(port)));
149                }
150                return ret;
151        }
152
153        /**
154         * Construct a string that looks like this: "host1:port1 host2:port2" from
155         * the list of {@link KestrelServerSpec}
156         * 
157         * @param kestrelSpecList
158         * @param port
159         * @return a string that looks like this: "host1:port1 host2:port2"
160         */
161        public static String kestrelAddressListAsString(List<KestrelServerSpec> kestrelSpecList, int port) {
162                final List<String> retList = new ArrayList<String>();
163                for (final KestrelServerSpec kestrelServerSpec : kestrelSpecList) {
164                        retList.add(String.format("%s:%s", kestrelServerSpec.host, port));
165                }
166                return StringUtils.join(retList, " ");
167        }
168
169        /**
170         * Get a valid Kestrel client, reconnecting if necessary
171         * 
172         * @return a valid client
173         * @throws TException
174         */
175        public KestrelThriftClient getValidClient() throws TException {
176                if (this.client == null) { // If client was blacklisted, remake it.
177                        logger.info("Attempting reconnect to kestrel " + this.host + ":" + this.port);
178                        this.client = new KestrelThriftClient(this.host, this.port);
179                }
180                return this.client;
181        }
182
183        /**
184         * An iterator to access a list of {@link KestrelServerSpec} in a round
185         * robin fasion. This iterator will always return a next as long as there
186         * are {@link KestrelServerSpec} in the provided list.
187         * 
188         * @param kestrelSpecList
189         * @return the iterator
190         */
191        public static Iterator<KestrelThriftClient> thriftClientIterator(final List<KestrelServerSpec> kestrelSpecList) {
192                return new Iterator<KestrelThriftClient>() {
193                        int index = 0;
194
195                        @Override
196                        public void remove() {
197                                throw new UnsupportedOperationException();
198                        }
199
200                        @Override
201                        public KestrelThriftClient next() {
202                                final int startIndex = index;
203                                do {
204                                        final KestrelServerSpec toRet = kestrelSpecList.get(index);
205                                        index++;
206                                        if (index >= kestrelSpecList.size()) {
207                                                index = 0;
208                                        }
209                                        try {
210                                                return toRet.getValidClient();
211                                        } catch (final TException e) {
212                                        }
213                                } while (index != startIndex);
214                                throw new RuntimeException("Couldn't find valid client");
215                        }
216
217                        @Override
218                        public boolean hasNext() {
219                                return kestrelSpecList.size() > 0;
220                        }
221                };
222        }
223
224        public void close() {
225                this.client.close();
226        }
227
228}