/**
 * Copyright (c) 2012, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   *  Redistributions of source code must retain the above copyright notice,
 *      this list of conditions and the following disclaimer.
 *
 *   *  Redistributions in binary form must reproduce the above copyright notice,
 *      this list of conditions and the following disclaimer in the documentation
 *      and/or other materials provided with the distribution.
 *
 *   *  Neither the name of the University of Southampton nor the names of its
 *      contributors may be used to endorse or promote products derived from this
 *      software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */ 030package backtype.storm.spout; 031 032import java.util.ArrayList; 033import java.util.HashSet; 034import java.util.Iterator; 035import java.util.LinkedList; 036import java.util.List; 037import java.util.Map; 038import java.util.Queue; 039import java.util.Set; 040 041import net.lag.kestrel.thrift.Item; 042 043import org.apache.log4j.Logger; 044import org.apache.thrift7.TException; 045import org.openimaj.kestrel.KestrelServerSpec; 046import org.openimaj.time.Timer; 047 048import backtype.storm.task.TopologyContext; 049import backtype.storm.topology.OutputFieldsDeclarer; 050import backtype.storm.topology.base.BaseRichSpout; 051import backtype.storm.utils.Utils; 052 053import com.google.gson.Gson; 054import com.google.gson.GsonBuilder; 055 056/** 057 * A version of the {@link KestrelThriftSpout} that is purposefully unreliable 058 * and cuts other corners in an effort for improved speeds 059 * 060 * @author Sina Samangooei (ss@ecs.soton.ac.uk) 061 * 062 */ 063public class UnreliableKestrelThriftSpout extends BaseRichSpout { 064 065 private class EmitItem { 066 public KestrelSourceId sourceId; 067 public List<Object> tuple; 068 069 public EmitItem(List<Object> tuple, KestrelSourceId sourceId) { 070 this.tuple = tuple; 071 this.sourceId = sourceId; 072 } 073 } 074 075 private static class KestrelSourceId { 076 public KestrelSourceId(int index, long id) { 077 this.index = index; 078 this.id = id; 079 } 080 081 int index; 082 long id; 083 084 @Override 085 public String toString() { 086 return String.format("{client:%s,id:%s}", index, id); 087 } 088 } 089 090 /** 091 * 092 */ 093 private static final long serialVersionUID = -3531693744499668571L; 094 private static final int HOLD_ITEMS = 1000; 095 private static final int TIMEOUT = 100; 096 private static final Logger logger = Logger.getLogger(UnreliableKestrelThriftSpout.class); 097 private List<KestrelServerSpec> clients; 098 private Scheme scheme; 099 private Queue<EmitItem> tuples; 100 private String queue; 
101 private int MAX_ITEMS_PER_QUEUE; 102 private SpoutOutputCollector collector; 103 private List<String> hosts; 104 private int port; 105 private String ackQueue; 106 private Iterator<KestrelThriftClient> ackIterator; 107 108 /** 109 * @param serverSpecs 110 * servers to connect to in a round robin fasion 111 * @param scheme 112 * how items should be read 113 * @param inputQueue 114 * queue from which to read 115 */ 116 public UnreliableKestrelThriftSpout( 117 List<KestrelServerSpec> serverSpecs, 118 Scheme scheme, 119 String inputQueue) 120 { 121 this.scheme = scheme; 122 this.queue = inputQueue; 123 this.port = -1; 124 this.hosts = new ArrayList<String>(); 125 for (final KestrelServerSpec kestrelServerSpec : serverSpecs) { 126 this.hosts.add(kestrelServerSpec.host); 127 this.port = kestrelServerSpec.port; 128 } 129 this.ackQueue = null; 130 } 131 132 /** 133 * The ackQueue holds statistics about acknowledgement time useful for 134 * measuring topology throughput 135 * 136 * @param ackQueue 137 */ 138 public void setAckQueue(String ackQueue) { 139 this.ackQueue = ackQueue; 140 } 141 142 @Override 143 public void open(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, SpoutOutputCollector collector) { 144 this.collector = collector; 145 this.clients = new ArrayList<KestrelServerSpec>(); 146 for (final String specs : this.hosts) { 147 clients.add(new KestrelServerSpec(specs, this.port)); 148 } 149 MAX_ITEMS_PER_QUEUE = HOLD_ITEMS / this.clients.size(); 150 this.tuples = new LinkedList<EmitItem>(); 151 this.ackIterator = KestrelServerSpec.thriftClientIterator(clients); 152 153 } 154 155 @Override 156 public void close() { 157 for (final KestrelServerSpec client : this.clients) 158 client.close(); 159 } 160 161 @Override 162 public void nextTuple() { 163 getSomeMoreTuples(); 164 if (this.tuples.size() == 0) 165 { 166 Utils.sleep(10); 167 return; 168 } 169 final EmitItem poll = this.tuples.poll(); 170 collector.emit(poll.tuple, poll.sourceId); 171 // 
collector.emit(poll.tuple); 172 } 173 174 int readTotal = 0; 175 176 private void getSomeMoreTuples() { 177 if (this.tuples.size() > HOLD_ITEMS - MAX_ITEMS_PER_QUEUE / 2) 178 return; 179 int clientIndex = 0; 180 for (final KestrelServerSpec clientSpec : this.clients) { 181 try { 182 final KestrelThriftClient client = clientSpec.getValidClient(); 183 final List<Item> ret = client 184 .get(this.queue, MAX_ITEMS_PER_QUEUE, TIMEOUT, TIMEOUT * MAX_ITEMS_PER_QUEUE); 185 readTotal += ret.size(); 186 logger.debug("Read total: " + readTotal); 187 final Set<Long> ids = new HashSet<Long>(); 188 for (final Item item : ret) { 189 final long kestrelId = item.get_id(); 190 ids.add(kestrelId); 191 final List<Object> deserialize = scheme.deserialize(item.get_data()); 192 if (deserialize == null) 193 continue; // we silently skip null items 194 195 final EmitItem e = new EmitItem(deserialize, new KestrelSourceId(clientIndex, kestrelId)); 196 this.tuples.add(e); 197 } 198 // We immediately confirm ALL items. This is the thing that 199 // makes it unreliable! 
200 client.confirm(this.queue, ids); 201 } catch (final TException e) { 202 } 203 clientIndex++; 204 } 205 } 206 207 @Override 208 public void declareOutputFields(OutputFieldsDeclarer declarer) { 209 declarer.declare(scheme.getOutputFields()); 210 } 211 212 private final static Gson gson = new GsonBuilder().create(); 213 int acked = 0; 214 private Timer ackTimer; 215 216 @Override 217 public void ack(Object msgId) { 218 if (acked == 0) { 219 ackTimer = Timer.timer(); 220 } 221 acked++; 222 if (acked % 1000 == 0) { 223 emitToAckQueue(); 224 } 225 } 226 227 private void emitToAckQueue() { 228 logger.debug("Acked: " + acked); 229 final float throughput = acked / ((float) ackTimer.duration() / 1000); 230 if (this.ackQueue != null) { 231 final AckStats stats = new AckStats(throughput); 232 final KestrelThriftClient client = getNextValidClient(); 233 try { 234 client.put(this.ackQueue, gson.toJson(stats), 0); 235 } catch (final TException e) { 236 logger.error("Failed to write acknowledgement"); 237 } 238 } 239 } 240 241 private KestrelThriftClient getNextValidClient() { 242 return this.ackIterator.next(); 243 } 244 245 @Override 246 public void fail(Object msgId) { 247 final KestrelSourceId sourceId = (KestrelSourceId) msgId; 248 logger.debug("Failing: " + sourceId); 249 } 250 251}