View Javadoc

1   /**
2    * Copyright (c) 2012, The University of Southampton and the individual contributors.
3    * All rights reserved.
4    *
5    * Redistribution and use in source and binary forms, with or without modification,
6    * are permitted provided that the following conditions are met:
7    *
8    *   * 	Redistributions of source code must retain the above copyright notice,
9    * 	this list of conditions and the following disclaimer.
10   *
11   *   *	Redistributions in binary form must reproduce the above copyright notice,
12   * 	this list of conditions and the following disclaimer in the documentation
13   * 	and/or other materials provided with the distribution.
14   *
15   *   *	Neither the name of the University of Southampton nor the names of its
16   * 	contributors may be used to endorse or promote products derived from this
17   * 	software without specific prior written permission.
18   *
19   * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
20   * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21   * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22   * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
23   * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25   * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
26   * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27   * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28   * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29   */
30  package backtype.storm.spout;
31  
32  import java.util.ArrayList;
33  import java.util.Arrays;
34  import java.util.HashSet;
35  import java.util.Iterator;
36  import java.util.LinkedList;
37  import java.util.List;
38  import java.util.Map;
39  import java.util.Queue;
40  
41  import net.lag.kestrel.thrift.Item;
42  
43  import org.apache.log4j.Logger;
44  import org.apache.thrift7.TException;
45  
46  import backtype.storm.Config;
47  import backtype.storm.task.TopologyContext;
48  import backtype.storm.topology.OutputFieldsDeclarer;
49  import backtype.storm.topology.base.BaseRichSpout;
50  import backtype.storm.tuple.Fields;
51  import backtype.storm.utils.Utils;
52  
53  /**
54   * This spout can be used to consume messages in a reliable way from a cluster
55   * of Kestrel servers. It is recommended that you set the parallelism hint to a
56   * multiple of the number of Kestrel servers, otherwise the read load will be
57   * higher on some Kestrel servers than others.
58   */
59  @SuppressWarnings("serial")
60  public class KestrelThriftSpout extends BaseRichSpout {
61  	public static Logger LOG = Logger.getLogger(KestrelThriftSpout.class);
62  
63  	public static final long BLACKLIST_TIME_MS = 1000 * 60;
64  	public static final int BATCH_SIZE = 4000;
65  
66  	private List<String> _hosts = null;
67  	private int _port = -1;
68  	private String _queueName = null;
69  	private SpoutOutputCollector _collector;
70  	private Scheme _scheme;
71  
72  	private List<KestrelClientInfo> _kestrels;
73  	private int _emitIndex;
74  
75  	private Queue<EmitItem> _emitBuffer = new LinkedList<EmitItem>();
76  
77  	private class EmitItem {
78  		public KestrelSourceId sourceId;
79  		public List<Object> tuple;
80  
81  		public EmitItem(List<Object> tuple, KestrelSourceId sourceId) {
82  			this.tuple = tuple;
83  			this.sourceId = sourceId;
84  		}
85  	}
86  
87  	private static class KestrelSourceId {
88  		public KestrelSourceId(int index, long id) {
89  			this.index = index;
90  			this.id = id;
91  		}
92  
93  		int index;
94  		long id;
95  	}
96  
97  	private static class KestrelClientInfo {
98  		public Long blacklistTillTimeMs;
99  		public String host;
100 		public int port;
101 
102 		private KestrelThriftClient client;
103 
104 		public KestrelClientInfo(String host, int port) {
105 			this.host = host;
106 			this.port = port;
107 			this.blacklistTillTimeMs = 0L;
108 			this.client = null;
109 		}
110 
111 		public KestrelThriftClient getValidClient() throws TException {
112 			if (this.client == null) { // If client was blacklisted, remake it.
113 				LOG.info("Attempting reconnect to kestrel " + this.host + ":" + this.port);
114 				this.client = new KestrelThriftClient(this.host, this.port);
115 			}
116 			return this.client;
117 		}
118 
119 		public void closeClient() {
120 			if (this.client != null) {
121 				this.client.close();
122 				this.client = null;
123 			}
124 		}
125 	}
126 
127 	public KestrelThriftSpout(List<String> hosts, int port, String queueName, Scheme scheme) {
128 		if (hosts.isEmpty()) {
129 			throw new IllegalArgumentException("Must configure at least one host");
130 		}
131 		_port = port;
132 		_hosts = hosts;
133 		_queueName = queueName;
134 		_scheme = scheme;
135 	}
136 
137 	public KestrelThriftSpout(String hostname, int port, String queueName, Scheme scheme) {
138 		this(Arrays.asList(hostname), port, queueName, scheme);
139 	}
140 
141 	public KestrelThriftSpout(String hostname, int port, String queueName) {
142 		this(hostname, port, queueName, new RawScheme());
143 	}
144 
145 	public KestrelThriftSpout(List<String> hosts, int port, String queueName) {
146 		this(hosts, port, queueName, new RawScheme());
147 	}
148 
149 	public Fields getOutputFields() {
150 		return _scheme.getOutputFields();
151 	}
152 
153 	int _messageTimeoutMillis;
154 
155 	@SuppressWarnings("rawtypes")
156 	@Override
157 	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
158 		// TODO: should switch this to maxTopologyMessageTimeout
159 		final Number timeout = (Number) conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS);
160 		_messageTimeoutMillis = 1000 * timeout.intValue();
161 		_collector = collector;
162 		_emitIndex = 0;
163 		_kestrels = new ArrayList<KestrelClientInfo>();
164 		final int numTasks = context.getComponentTasks(context.getThisComponentId()).size();
165 		final int myIndex = context.getThisTaskIndex();
166 		final int numHosts = _hosts.size();
167 		if (numTasks < numHosts) {
168 			for (final String host : _hosts) {
169 				_kestrels.add(new KestrelClientInfo(host, _port));
170 			}
171 		} else {
172 			final String host = _hosts.get(myIndex % numHosts);
173 			_kestrels.add(new KestrelClientInfo(host, _port));
174 		}
175 	}
176 
177 	@Override
178 	public void close() {
179 		for (final KestrelClientInfo info : _kestrels)
180 			info.closeClient();
181 
182 		// Closing the client connection causes all the open reliable reads to
183 		// be aborted.
184 		// Thus, clear our local buffer of these reliable reads.
185 		_emitBuffer.clear();
186 
187 		_kestrels.clear();
188 	}
189 
190 	public boolean bufferKestrelGet(int index) {
191 		assert _emitBuffer.size() == 0; // JTODO
192 
193 		final KestrelClientInfo info = _kestrels.get(index);
194 
195 		final long now = System.currentTimeMillis();
196 		if (now > info.blacklistTillTimeMs) {
197 			List<Item> items = null;
198 			try {
199 				items = info.getValidClient().get(_queueName, BATCH_SIZE, 0, _messageTimeoutMillis);
200 				// if(items.size()!=0)LOG.debug("Got this many items: " +
201 				// items.size());
202 			} catch (final TException e) {
203 				LOG.error("Error reading from client: " + e.getMessage());
204 				blacklist(info, e);
205 				return false;
206 			}
207 
208 			assert items.size() <= BATCH_SIZE;
209 			// LOG.info("Kestrel batch get fetched " + items.size() +
210 			// " items. (batchSize= " + BATCH_SIZE +
211 			// " queueName=" + _queueName + ", index=" + index + ", host=" +
212 			// info.host + ")");
213 
214 			final HashSet<Long> toAck = new HashSet<Long>();
215 
216 			for (final Item item : items) {
217 				final List<Object> retItems = _scheme.deserialize(item.get_data());
218 
219 				if (retItems != null) {
220 					final EmitItem emitItem = new EmitItem(retItems, new KestrelSourceId(index, item.get_id()));
221 
222 					if (!_emitBuffer.offer(emitItem)) {
223 						throw new RuntimeException("KestrelThriftSpout's Internal Buffer Enqeueue Failed.");
224 					}
225 				} else {
226 					toAck.add(item.get_id());
227 				}
228 			}
229 
230 			if (toAck.size() > 0) {
231 				try {
232 					info.client.confirm(_queueName, toAck);
233 				} catch (final TException e) {
234 					blacklist(info, e);
235 				}
236 			}
237 
238 			if (items.size() > 0)
239 				return true;
240 		}
241 		return false;
242 	}
243 
244 	public void tryEachKestrelUntilBufferFilled() {
245 		for (int i = 0; i < _kestrels.size(); i++) {
246 			final int index = (_emitIndex + i) % _kestrels.size();
247 			if (bufferKestrelGet(index)) {
248 				_emitIndex = index;
249 				break;
250 			}
251 		}
252 		_emitIndex = (_emitIndex + 1) % _kestrels.size();
253 	}
254 
255 	int countTriples = 1;
256 	int lastEmit = countTriples;
257 	int emptyIterations = 0;
258 
259 	@Override
260 	public void nextTuple() {
261 		if (_emitBuffer.isEmpty())
262 			tryEachKestrelUntilBufferFilled();
263 		if (countTriples % 1000 == 0 && countTriples != lastEmit) {
264 			LOG.debug("Number of triples emitted: " + countTriples);
265 			LOG.debug("Number of empty iterations: " + emptyIterations);
266 			emptyIterations = 0;
267 			lastEmit = countTriples;
268 		}
269 		final EmitItem item = _emitBuffer.poll();
270 		if (item != null) {
271 			countTriples += 1;
272 			_collector.emit(item.tuple, item.sourceId);
273 		} else { // If buffer is still empty here, then every kestrel Q is also
274 					// empty.
275 			emptyIterations++;
276 			Utils.sleep(10);
277 		}
278 	}
279 
280 	private void blacklist(KestrelClientInfo info, Throwable t) {
281 
282 		// this case can happen when it fails to connect to Kestrel (and so
283 		// never stores the connection)
284 		info.closeClient();
285 		info.blacklistTillTimeMs = System.currentTimeMillis() + BLACKLIST_TIME_MS;
286 
287 		final int index = _kestrels.indexOf(info);
288 
289 		// we just closed the connection, so all open reliable reads will be
290 		// aborted. empty buffers.
291 		for (final Iterator<EmitItem> i = _emitBuffer.iterator(); i.hasNext();) {
292 			final EmitItem item = i.next();
293 			if (item.sourceId.index == index)
294 				i.remove();
295 		}
296 	}
297 
298 	@Override
299 	public void ack(Object msgId) {
300 		final KestrelSourceId sourceId = (KestrelSourceId) msgId;
301 		final KestrelClientInfo info = _kestrels.get(sourceId.index);
302 
303 		// if the transaction didn't exist, it just returns false. so this code
304 		// works
305 		// even if client gets blacklisted, disconnects, and kestrel puts the
306 		// item
307 		// back on the queue
308 		try {
309 			if (info.client != null) {
310 				final HashSet<Long> xids = new HashSet<Long>();
311 				xids.add(sourceId.id);
312 				info.client.confirm(_queueName, xids);
313 			}
314 		} catch (final TException e) {
315 			blacklist(info, e);
316 		}
317 	}
318 
319 	@Override
320 	public void fail(Object msgId) {
321 		final KestrelSourceId sourceId = (KestrelSourceId) msgId;
322 		final KestrelClientInfo info = _kestrels.get(sourceId.index);
323 
324 		// see not above about why this works with blacklisting strategy
325 		try {
326 			if (info.client != null) {
327 				final HashSet<Long> xids = new HashSet<Long>();
328 				xids.add(sourceId.id);
329 				info.client.abort(_queueName, xids);
330 			}
331 		} catch (final TException e) {
332 			blacklist(info, e);
333 		}
334 	}
335 
336 	@Override
337 	public void declareOutputFields(OutputFieldsDeclarer declarer) {
338 		declarer.declare(getOutputFields());
339 	}
340 }