001/** 002 * Copyright (c) 2012, The University of Southampton and the individual contributors. 003 * All rights reserved. 004 * 005 * Redistribution and use in source and binary forms, with or without modification, 006 * are permitted provided that the following conditions are met: 007 * 008 * * Redistributions of source code must retain the above copyright notice, 009 * this list of conditions and the following disclaimer. 010 * 011 * * Redistributions in binary form must reproduce the above copyright notice, 012 * this list of conditions and the following disclaimer in the documentation 013 * and/or other materials provided with the distribution. 014 * 015 * * Neither the name of the University of Southampton nor the names of its 016 * contributors may be used to endorse or promote products derived from this 017 * software without specific prior written permission. 018 * 019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND 020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR 023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON 026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 029 */ 030package backtype.storm.spout; 031 032import java.util.ArrayList; 033import java.util.Arrays; 034import java.util.HashSet; 035import java.util.Iterator; 036import java.util.LinkedList; 037import java.util.List; 038import java.util.Map; 039import java.util.Queue; 040 041import net.lag.kestrel.thrift.Item; 042 043import org.apache.log4j.Logger; 044import org.apache.thrift7.TException; 045 046import backtype.storm.Config; 047import backtype.storm.task.TopologyContext; 048import backtype.storm.topology.OutputFieldsDeclarer; 049import backtype.storm.topology.base.BaseRichSpout; 050import backtype.storm.tuple.Fields; 051import backtype.storm.utils.Utils; 052 053/** 054 * This spout can be used to consume messages in a reliable way from a cluster 055 * of Kestrel servers. It is recommended that you set the parallelism hint to a 056 * multiple of the number of Kestrel servers, otherwise the read load will be 057 * higher on some Kestrel servers than others. 058 */ 059@SuppressWarnings("serial") 060public class KestrelThriftSpout extends BaseRichSpout { 061 public static Logger LOG = Logger.getLogger(KestrelThriftSpout.class); 062 063 public static final long BLACKLIST_TIME_MS = 1000 * 60; 064 public static final int BATCH_SIZE = 4000; 065 066 private List<String> _hosts = null; 067 private int _port = -1; 068 private String _queueName = null; 069 private SpoutOutputCollector _collector; 070 private Scheme _scheme; 071 072 private List<KestrelClientInfo> _kestrels; 073 private int _emitIndex; 074 075 private Queue<EmitItem> _emitBuffer = new LinkedList<EmitItem>(); 076 077 private class EmitItem { 078 public KestrelSourceId sourceId; 079 public List<Object> tuple; 080 081 public EmitItem(List<Object> tuple, KestrelSourceId sourceId) { 082 this.tuple = tuple; 083 this.sourceId = sourceId; 084 } 085 } 086 087 private static class KestrelSourceId { 088 public KestrelSourceId(int index, long id) { 089 this.index = index; 090 this.id = id; 091 } 092 093 int index; 094 long id; 095 } 096 097 private static class KestrelClientInfo { 098 public Long blacklistTillTimeMs; 099 public String host; 100 public int port; 101 102 private KestrelThriftClient client; 103 104 public KestrelClientInfo(String host, int port) { 105 this.host = host; 106 this.port = port; 107 this.blacklistTillTimeMs = 0L; 108 this.client = null; 109 } 110 111 public KestrelThriftClient getValidClient() throws TException { 112 if (this.client == null) { // If client was blacklisted, remake it. 113 LOG.info("Attempting reconnect to kestrel " + this.host + ":" + this.port); 114 this.client = new KestrelThriftClient(this.host, this.port); 115 } 116 return this.client; 117 } 118 119 public void closeClient() { 120 if (this.client != null) { 121 this.client.close(); 122 this.client = null; 123 } 124 } 125 } 126 127 public KestrelThriftSpout(List<String> hosts, int port, String queueName, Scheme scheme) { 128 if (hosts.isEmpty()) { 129 throw new IllegalArgumentException("Must configure at least one host"); 130 } 131 _port = port; 132 _hosts = hosts; 133 _queueName = queueName; 134 _scheme = scheme; 135 } 136 137 public KestrelThriftSpout(String hostname, int port, String queueName, Scheme scheme) { 138 this(Arrays.asList(hostname), port, queueName, scheme); 139 } 140 141 public KestrelThriftSpout(String hostname, int port, String queueName) { 142 this(hostname, port, queueName, new RawScheme()); 143 } 144 145 public KestrelThriftSpout(List<String> hosts, int port, String queueName) { 146 this(hosts, port, queueName, new RawScheme()); 147 } 148 149 public Fields getOutputFields() { 150 return _scheme.getOutputFields(); 151 } 152 153 int _messageTimeoutMillis; 154 155 @SuppressWarnings("rawtypes") 156 @Override 157 public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { 158 // TODO: should switch this to maxTopologyMessageTimeout 159 final Number timeout = (Number) conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS); 160 _messageTimeoutMillis = 1000 * timeout.intValue(); 161 _collector = collector; 162 _emitIndex = 0; 163 _kestrels = new ArrayList<KestrelClientInfo>(); 164 final int numTasks = context.getComponentTasks(context.getThisComponentId()).size(); 165 final int myIndex = context.getThisTaskIndex(); 166 final int numHosts = _hosts.size(); 167 if (numTasks < numHosts) { 168 for (final String host : _hosts) { 169 _kestrels.add(new KestrelClientInfo(host, _port)); 170 } 171 } else { 172 final String host = _hosts.get(myIndex % numHosts); 173 _kestrels.add(new KestrelClientInfo(host, _port)); 174 } 175 } 176 177 @Override 178 public void close() { 179 for (final KestrelClientInfo info : _kestrels) 180 info.closeClient(); 181 182 // Closing the client connection causes all the open reliable reads to 183 // be aborted. 184 // Thus, clear our local buffer of these reliable reads. 185 _emitBuffer.clear(); 186 187 _kestrels.clear(); 188 } 189 190 public boolean bufferKestrelGet(int index) { 191 assert _emitBuffer.size() == 0; // JTODO 192 193 final KestrelClientInfo info = _kestrels.get(index); 194 195 final long now = System.currentTimeMillis(); 196 if (now > info.blacklistTillTimeMs) { 197 List<Item> items = null; 198 try { 199 items = info.getValidClient().get(_queueName, BATCH_SIZE, 0, _messageTimeoutMillis); 200 // if(items.size()!=0)LOG.debug("Got this many items: " + 201 // items.size()); 202 } catch (final TException e) { 203 LOG.error("Error reading from client: " + e.getMessage()); 204 blacklist(info, e); 205 return false; 206 } 207 208 assert items.size() <= BATCH_SIZE; 209 // LOG.info("Kestrel batch get fetched " + items.size() + 210 // " items. (batchSize= " + BATCH_SIZE + 211 // " queueName=" + _queueName + ", index=" + index + ", host=" + 212 // info.host + ")"); 213 214 final HashSet<Long> toAck = new HashSet<Long>(); 215 216 for (final Item item : items) { 217 final List<Object> retItems = _scheme.deserialize(item.get_data()); 218 219 if (retItems != null) { 220 final EmitItem emitItem = new EmitItem(retItems, new KestrelSourceId(index, item.get_id())); 221 222 if (!_emitBuffer.offer(emitItem)) { 223 throw new RuntimeException("KestrelThriftSpout's Internal Buffer Enqeueue Failed."); 224 } 225 } else { 226 toAck.add(item.get_id()); 227 } 228 } 229 230 if (toAck.size() > 0) { 231 try { 232 info.client.confirm(_queueName, toAck); 233 } catch (final TException e) { 234 blacklist(info, e); 235 } 236 } 237 238 if (items.size() > 0) 239 return true; 240 } 241 return false; 242 } 243 244 public void tryEachKestrelUntilBufferFilled() { 245 for (int i = 0; i < _kestrels.size(); i++) { 246 final int index = (_emitIndex + i) % _kestrels.size(); 247 if (bufferKestrelGet(index)) { 248 _emitIndex = index; 249 break; 250 } 251 } 252 _emitIndex = (_emitIndex + 1) % _kestrels.size(); 253 } 254 255 int countTriples = 1; 256 int lastEmit = countTriples; 257 int emptyIterations = 0; 258 259 @Override 260 public void nextTuple() { 261 if (_emitBuffer.isEmpty()) 262 tryEachKestrelUntilBufferFilled(); 263 if (countTriples % 1000 == 0 && countTriples != lastEmit) { 264 LOG.debug("Number of triples emitted: " + countTriples); 265 LOG.debug("Number of empty iterations: " + emptyIterations); 266 emptyIterations = 0; 267 lastEmit = countTriples; 268 } 269 final EmitItem item = _emitBuffer.poll(); 270 if (item != null) { 271 countTriples += 1; 272 _collector.emit(item.tuple, item.sourceId); 273 } else { // If buffer is still empty here, then every kestrel Q is also 274 // empty. 275 emptyIterations++; 276 Utils.sleep(10); 277 } 278 } 279 280 private void blacklist(KestrelClientInfo info, Throwable t) { 281 282 // this case can happen when it fails to connect to Kestrel (and so 283 // never stores the connection) 284 info.closeClient(); 285 info.blacklistTillTimeMs = System.currentTimeMillis() + BLACKLIST_TIME_MS; 286 287 final int index = _kestrels.indexOf(info); 288 289 // we just closed the connection, so all open reliable reads will be 290 // aborted. empty buffers. 291 for (final Iterator<EmitItem> i = _emitBuffer.iterator(); i.hasNext();) { 292 final EmitItem item = i.next(); 293 if (item.sourceId.index == index) 294 i.remove(); 295 } 296 } 297 298 @Override 299 public void ack(Object msgId) { 300 final KestrelSourceId sourceId = (KestrelSourceId) msgId; 301 final KestrelClientInfo info = _kestrels.get(sourceId.index); 302 303 // if the transaction didn't exist, it just returns false. so this code 304 // works 305 // even if client gets blacklisted, disconnects, and kestrel puts the 306 // item 307 // back on the queue 308 try { 309 if (info.client != null) { 310 final HashSet<Long> xids = new HashSet<Long>(); 311 xids.add(sourceId.id); 312 info.client.confirm(_queueName, xids); 313 } 314 } catch (final TException e) { 315 blacklist(info, e); 316 } 317 } 318 319 @Override 320 public void fail(Object msgId) { 321 final KestrelSourceId sourceId = (KestrelSourceId) msgId; 322 final KestrelClientInfo info = _kestrels.get(sourceId.index); 323 324 // see not above about why this works with blacklisting strategy 325 try { 326 if (info.client != null) { 327 final HashSet<Long> xids = new HashSet<Long>(); 328 xids.add(sourceId.id); 329 info.client.abort(_queueName, xids); 330 } 331 } catch (final TException e) { 332 blacklist(info, e); 333 } 334 } 335 336 @Override 337 public void declareOutputFields(OutputFieldsDeclarer declarer) { 338 declarer.declare(getOutputFields()); 339 } 340}