001/** 002 * Copyright (c) 2012, The University of Southampton and the individual contributors. 003 * All rights reserved. 004 * 005 * Redistribution and use in source and binary forms, with or without modification, 006 * are permitted provided that the following conditions are met: 007 * 008 * * Redistributions of source code must retain the above copyright notice, 009 * this list of conditions and the following disclaimer. 010 * 011 * * Redistributions in binary form must reproduce the above copyright notice, 012 * this list of conditions and the following disclaimer in the documentation 013 * and/or other materials provided with the distribution. 014 * 015 * * Neither the name of the University of Southampton nor the names of its 016 * contributors may be used to endorse or promote products derived from this 017 * software without specific prior written permission. 018 * 019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND 020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR 023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON 026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 029 */ 030package org.openimaj.kestrel; 031 032import java.util.ArrayList; 033import java.util.Iterator; 034import java.util.List; 035 036import org.apache.commons.lang.StringUtils; 037import org.apache.log4j.Logger; 038import org.apache.thrift7.TException; 039 040import backtype.storm.spout.KestrelThriftClient; 041 042/** 043 * Define a connection to a single or set of Kestrel servers 044 * 045 * @author Sina Samangooei (ss@ecs.soton.ac.uk) 046 * 047 */ 048public class KestrelServerSpec { 049 050 /** 051 * the default kestrel memcached port 052 */ 053 public static final int DEFAULT_KESTREL_MEMCACHED_PORT = 22133; 054 055 /** 056 * the default kestrel thrift port 057 */ 058 public static final int DEFAULT_KESTREL_THRIFT_PORT = 2229; 059 060 /** 061 * the default kestrel text protocol port 062 */ 063 public static final int DEFAULT_KESTREL_TEXT_PORT = 2222; 064 065 /** 066 * the localhost 067 */ 068 public static final String LOCALHOST = "127.0.0.1"; 069 070 private static final Logger logger = Logger.getLogger(KestrelServerSpec.class); 071 /** 072 * the kestrel host 073 */ 074 public String host; 075 /** 076 * the kestrel host port 077 */ 078 public int port; 079 080 private KestrelThriftClient client; 081 082 /** 083 * A single kestrel host 084 * 085 * @param kestrelHost 086 * @param port 087 */ 088 public KestrelServerSpec(String kestrelHost, int port) { 089 this.host = kestrelHost; 090 this.port = port; 091 } 092 093 private KestrelServerSpec() { 094 this.host = LOCALHOST; 095 } 096 097 /** 098 * @return a local server spec using memcached 099 */ 100 public static KestrelServerSpec localMemcached() { 101 final KestrelServerSpec ret = new KestrelServerSpec(); 102 ret.port = DEFAULT_KESTREL_MEMCACHED_PORT; 103 return ret; 104 } 105 106 /** 107 * @return a local server spec using thrift 108 */ 109 public static KestrelServerSpec localThrift() { 110 final KestrelServerSpec ret = new KestrelServerSpec(); 111 ret.port = DEFAULT_KESTREL_THRIFT_PORT; 112 return ret; 113 } 114 115 /** 116 * @return a local server spec using text 117 */ 118 public static KestrelServerSpec localText() { 119 final KestrelServerSpec ret = new KestrelServerSpec(); 120 ret.port = DEFAULT_KESTREL_TEXT_PORT; 121 return ret; 122 } 123 124 /** 125 * Parse a list of strings in the format: host:port. If either host or port 126 * is left blank then the default is used 127 * 128 * @param kestrelHosts 129 * @return all server specs 130 */ 131 public static List<KestrelServerSpec> parseKestrelAddressList(List<String> kestrelHosts) { 132 final List<KestrelServerSpec> ret = new ArrayList<KestrelServerSpec>(); 133 for (final String hostport : kestrelHosts) { 134 String host = ""; 135 String port = ""; 136 if (hostport.contains(":")) { 137 final int split = hostport.lastIndexOf(":"); 138 host = hostport.substring(0, split); 139 port = hostport.substring(split + 1); 140 } 141 else { 142 host = hostport; 143 } 144 if (host.length() == 0) 145 host = KestrelServerSpec.LOCALHOST; 146 if (port.length() == 0) 147 port = "" + KestrelServerSpec.DEFAULT_KESTREL_THRIFT_PORT; 148 ret.add(new KestrelServerSpec(host, Integer.parseInt(port))); 149 } 150 return ret; 151 } 152 153 /** 154 * Construct a string that looks like this: "host1:port1 host2:port2" from 155 * the list of {@link KestrelServerSpec} 156 * 157 * @param kestrelSpecList 158 * @param port 159 * @return a string that looks like this: "host1:port1 host2:port2" 160 */ 161 public static String kestrelAddressListAsString(List<KestrelServerSpec> kestrelSpecList, int port) { 162 final List<String> retList = new ArrayList<String>(); 163 for (final KestrelServerSpec kestrelServerSpec : kestrelSpecList) { 164 retList.add(String.format("%s:%s", kestrelServerSpec.host, port)); 165 } 166 return StringUtils.join(retList, " "); 167 } 168 169 /** 170 * Get a valid Kestrel client, reconnecting if necessary 171 * 172 * @return a valid client 173 * @throws TException 174 */ 175 public KestrelThriftClient getValidClient() throws TException { 176 if (this.client == null) { // If client was blacklisted, remake it. 177 logger.info("Attempting reconnect to kestrel " + this.host + ":" + this.port); 178 this.client = new KestrelThriftClient(this.host, this.port); 179 } 180 return this.client; 181 } 182 183 /** 184 * An iterator to access a list of {@link KestrelServerSpec} in a round 185 * robin fasion. This iterator will always return a next as long as there 186 * are {@link KestrelServerSpec} in the provided list. 187 * 188 * @param kestrelSpecList 189 * @return the iterator 190 */ 191 public static Iterator<KestrelThriftClient> thriftClientIterator(final List<KestrelServerSpec> kestrelSpecList) { 192 return new Iterator<KestrelThriftClient>() { 193 int index = 0; 194 195 @Override 196 public void remove() { 197 throw new UnsupportedOperationException(); 198 } 199 200 @Override 201 public KestrelThriftClient next() { 202 final int startIndex = index; 203 do { 204 final KestrelServerSpec toRet = kestrelSpecList.get(index); 205 index++; 206 if (index >= kestrelSpecList.size()) { 207 index = 0; 208 } 209 try { 210 return toRet.getValidClient(); 211 } catch (final TException e) { 212 } 213 } while (index != startIndex); 214 throw new RuntimeException("Couldn't find valid client"); 215 } 216 217 @Override 218 public boolean hasNext() { 219 return kestrelSpecList.size() > 0; 220 } 221 }; 222 } 223 224 public void close() { 225 this.client.close(); 226 } 227 228}