1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30 package backtype.storm.spout;
31
32 import java.util.ArrayList;
33 import java.util.Arrays;
34 import java.util.HashSet;
35 import java.util.Iterator;
36 import java.util.LinkedList;
37 import java.util.List;
38 import java.util.Map;
39 import java.util.Queue;
40
41 import net.lag.kestrel.thrift.Item;
42
43 import org.apache.log4j.Logger;
44 import org.apache.thrift7.TException;
45
46 import backtype.storm.Config;
47 import backtype.storm.task.TopologyContext;
48 import backtype.storm.topology.OutputFieldsDeclarer;
49 import backtype.storm.topology.base.BaseRichSpout;
50 import backtype.storm.tuple.Fields;
51 import backtype.storm.utils.Utils;
52
53
54
55
56
57
58
59 @SuppressWarnings("serial")
60 public class KestrelThriftSpout extends BaseRichSpout {
61 public static Logger LOG = Logger.getLogger(KestrelThriftSpout.class);
62
63 public static final long BLACKLIST_TIME_MS = 1000 * 60;
64 public static final int BATCH_SIZE = 4000;
65
66 private List<String> _hosts = null;
67 private int _port = -1;
68 private String _queueName = null;
69 private SpoutOutputCollector _collector;
70 private Scheme _scheme;
71
72 private List<KestrelClientInfo> _kestrels;
73 private int _emitIndex;
74
75 private Queue<EmitItem> _emitBuffer = new LinkedList<EmitItem>();
76
77 private class EmitItem {
78 public KestrelSourceId sourceId;
79 public List<Object> tuple;
80
81 public EmitItem(List<Object> tuple, KestrelSourceId sourceId) {
82 this.tuple = tuple;
83 this.sourceId = sourceId;
84 }
85 }
86
87 private static class KestrelSourceId {
88 public KestrelSourceId(int index, long id) {
89 this.index = index;
90 this.id = id;
91 }
92
93 int index;
94 long id;
95 }
96
97 private static class KestrelClientInfo {
98 public Long blacklistTillTimeMs;
99 public String host;
100 public int port;
101
102 private KestrelThriftClient client;
103
104 public KestrelClientInfo(String host, int port) {
105 this.host = host;
106 this.port = port;
107 this.blacklistTillTimeMs = 0L;
108 this.client = null;
109 }
110
111 public KestrelThriftClient getValidClient() throws TException {
112 if (this.client == null) {
113 LOG.info("Attempting reconnect to kestrel " + this.host + ":" + this.port);
114 this.client = new KestrelThriftClient(this.host, this.port);
115 }
116 return this.client;
117 }
118
119 public void closeClient() {
120 if (this.client != null) {
121 this.client.close();
122 this.client = null;
123 }
124 }
125 }
126
127 public KestrelThriftSpout(List<String> hosts, int port, String queueName, Scheme scheme) {
128 if (hosts.isEmpty()) {
129 throw new IllegalArgumentException("Must configure at least one host");
130 }
131 _port = port;
132 _hosts = hosts;
133 _queueName = queueName;
134 _scheme = scheme;
135 }
136
137 public KestrelThriftSpout(String hostname, int port, String queueName, Scheme scheme) {
138 this(Arrays.asList(hostname), port, queueName, scheme);
139 }
140
141 public KestrelThriftSpout(String hostname, int port, String queueName) {
142 this(hostname, port, queueName, new RawScheme());
143 }
144
145 public KestrelThriftSpout(List<String> hosts, int port, String queueName) {
146 this(hosts, port, queueName, new RawScheme());
147 }
148
149 public Fields getOutputFields() {
150 return _scheme.getOutputFields();
151 }
152
153 int _messageTimeoutMillis;
154
155 @SuppressWarnings("rawtypes")
156 @Override
157 public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
158
159 final Number timeout = (Number) conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS);
160 _messageTimeoutMillis = 1000 * timeout.intValue();
161 _collector = collector;
162 _emitIndex = 0;
163 _kestrels = new ArrayList<KestrelClientInfo>();
164 final int numTasks = context.getComponentTasks(context.getThisComponentId()).size();
165 final int myIndex = context.getThisTaskIndex();
166 final int numHosts = _hosts.size();
167 if (numTasks < numHosts) {
168 for (final String host : _hosts) {
169 _kestrels.add(new KestrelClientInfo(host, _port));
170 }
171 } else {
172 final String host = _hosts.get(myIndex % numHosts);
173 _kestrels.add(new KestrelClientInfo(host, _port));
174 }
175 }
176
177 @Override
178 public void close() {
179 for (final KestrelClientInfo info : _kestrels)
180 info.closeClient();
181
182
183
184
185 _emitBuffer.clear();
186
187 _kestrels.clear();
188 }
189
190 public boolean bufferKestrelGet(int index) {
191 assert _emitBuffer.size() == 0;
192
193 final KestrelClientInfo info = _kestrels.get(index);
194
195 final long now = System.currentTimeMillis();
196 if (now > info.blacklistTillTimeMs) {
197 List<Item> items = null;
198 try {
199 items = info.getValidClient().get(_queueName, BATCH_SIZE, 0, _messageTimeoutMillis);
200
201
202 } catch (final TException e) {
203 LOG.error("Error reading from client: " + e.getMessage());
204 blacklist(info, e);
205 return false;
206 }
207
208 assert items.size() <= BATCH_SIZE;
209
210
211
212
213
214 final HashSet<Long> toAck = new HashSet<Long>();
215
216 for (final Item item : items) {
217 final List<Object> retItems = _scheme.deserialize(item.get_data());
218
219 if (retItems != null) {
220 final EmitItem emitItem = new EmitItem(retItems, new KestrelSourceId(index, item.get_id()));
221
222 if (!_emitBuffer.offer(emitItem)) {
223 throw new RuntimeException("KestrelThriftSpout's Internal Buffer Enqeueue Failed.");
224 }
225 } else {
226 toAck.add(item.get_id());
227 }
228 }
229
230 if (toAck.size() > 0) {
231 try {
232 info.client.confirm(_queueName, toAck);
233 } catch (final TException e) {
234 blacklist(info, e);
235 }
236 }
237
238 if (items.size() > 0)
239 return true;
240 }
241 return false;
242 }
243
244 public void tryEachKestrelUntilBufferFilled() {
245 for (int i = 0; i < _kestrels.size(); i++) {
246 final int index = (_emitIndex + i) % _kestrels.size();
247 if (bufferKestrelGet(index)) {
248 _emitIndex = index;
249 break;
250 }
251 }
252 _emitIndex = (_emitIndex + 1) % _kestrels.size();
253 }
254
255 int countTriples = 1;
256 int lastEmit = countTriples;
257 int emptyIterations = 0;
258
259 @Override
260 public void nextTuple() {
261 if (_emitBuffer.isEmpty())
262 tryEachKestrelUntilBufferFilled();
263 if (countTriples % 1000 == 0 && countTriples != lastEmit) {
264 LOG.debug("Number of triples emitted: " + countTriples);
265 LOG.debug("Number of empty iterations: " + emptyIterations);
266 emptyIterations = 0;
267 lastEmit = countTriples;
268 }
269 final EmitItem item = _emitBuffer.poll();
270 if (item != null) {
271 countTriples += 1;
272 _collector.emit(item.tuple, item.sourceId);
273 } else {
274
275 emptyIterations++;
276 Utils.sleep(10);
277 }
278 }
279
280 private void blacklist(KestrelClientInfo info, Throwable t) {
281
282
283
284 info.closeClient();
285 info.blacklistTillTimeMs = System.currentTimeMillis() + BLACKLIST_TIME_MS;
286
287 final int index = _kestrels.indexOf(info);
288
289
290
291 for (final Iterator<EmitItem> i = _emitBuffer.iterator(); i.hasNext();) {
292 final EmitItem item = i.next();
293 if (item.sourceId.index == index)
294 i.remove();
295 }
296 }
297
298 @Override
299 public void ack(Object msgId) {
300 final KestrelSourceId sourceId = (KestrelSourceId) msgId;
301 final KestrelClientInfo info = _kestrels.get(sourceId.index);
302
303
304
305
306
307
308 try {
309 if (info.client != null) {
310 final HashSet<Long> xids = new HashSet<Long>();
311 xids.add(sourceId.id);
312 info.client.confirm(_queueName, xids);
313 }
314 } catch (final TException e) {
315 blacklist(info, e);
316 }
317 }
318
319 @Override
320 public void fail(Object msgId) {
321 final KestrelSourceId sourceId = (KestrelSourceId) msgId;
322 final KestrelClientInfo info = _kestrels.get(sourceId.index);
323
324
325 try {
326 if (info.client != null) {
327 final HashSet<Long> xids = new HashSet<Long>();
328 xids.add(sourceId.id);
329 info.client.abort(_queueName, xids);
330 }
331 } catch (final TException e) {
332 blacklist(info, e);
333 }
334 }
335
336 @Override
337 public void declareOutputFields(OutputFieldsDeclarer declarer) {
338 declarer.declare(getOutputFields());
339 }
340 }