001/**
002 * Copyright (c) 2012, The University of Southampton and the individual contributors.
003 * All rights reserved.
004 *
005 * Redistribution and use in source and binary forms, with or without modification,
006 * are permitted provided that the following conditions are met:
007 *
008 *   *  Redistributions of source code must retain the above copyright notice,
009 *      this list of conditions and the following disclaimer.
010 *
011 *   *  Redistributions in binary form must reproduce the above copyright notice,
012 *      this list of conditions and the following disclaimer in the documentation
013 *      and/or other materials provided with the distribution.
014 *
015 *   *  Neither the name of the University of Southampton nor the names of its
016 *      contributors may be used to endorse or promote products derived from this
017 *      software without specific prior written permission.
018 *
019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */
030package backtype.storm.spout;
031
032import java.util.ArrayList;
033import java.util.Arrays;
034import java.util.HashSet;
035import java.util.Iterator;
036import java.util.LinkedList;
037import java.util.List;
038import java.util.Map;
039import java.util.Queue;
040
041import net.lag.kestrel.thrift.Item;
042
043import org.apache.log4j.Logger;
044import org.apache.thrift7.TException;
045
046import backtype.storm.Config;
047import backtype.storm.task.TopologyContext;
048import backtype.storm.topology.OutputFieldsDeclarer;
049import backtype.storm.topology.base.BaseRichSpout;
050import backtype.storm.tuple.Fields;
051import backtype.storm.utils.Utils;
052
053/**
054 * This spout can be used to consume messages in a reliable way from a cluster
055 * of Kestrel servers. It is recommended that you set the parallelism hint to a
056 * multiple of the number of Kestrel servers, otherwise the read load will be
057 * higher on some Kestrel servers than others.
058 */
059@SuppressWarnings("serial")
060public class KestrelThriftSpout extends BaseRichSpout {
061        public static Logger LOG = Logger.getLogger(KestrelThriftSpout.class);
062
063        public static final long BLACKLIST_TIME_MS = 1000 * 60;
064        public static final int BATCH_SIZE = 4000;
065
066        private List<String> _hosts = null;
067        private int _port = -1;
068        private String _queueName = null;
069        private SpoutOutputCollector _collector;
070        private Scheme _scheme;
071
072        private List<KestrelClientInfo> _kestrels;
073        private int _emitIndex;
074
075        private Queue<EmitItem> _emitBuffer = new LinkedList<EmitItem>();
076
077        private class EmitItem {
078                public KestrelSourceId sourceId;
079                public List<Object> tuple;
080
081                public EmitItem(List<Object> tuple, KestrelSourceId sourceId) {
082                        this.tuple = tuple;
083                        this.sourceId = sourceId;
084                }
085        }
086
087        private static class KestrelSourceId {
088                public KestrelSourceId(int index, long id) {
089                        this.index = index;
090                        this.id = id;
091                }
092
093                int index;
094                long id;
095        }
096
097        private static class KestrelClientInfo {
098                public Long blacklistTillTimeMs;
099                public String host;
100                public int port;
101
102                private KestrelThriftClient client;
103
104                public KestrelClientInfo(String host, int port) {
105                        this.host = host;
106                        this.port = port;
107                        this.blacklistTillTimeMs = 0L;
108                        this.client = null;
109                }
110
111                public KestrelThriftClient getValidClient() throws TException {
112                        if (this.client == null) { // If client was blacklisted, remake it.
113                                LOG.info("Attempting reconnect to kestrel " + this.host + ":" + this.port);
114                                this.client = new KestrelThriftClient(this.host, this.port);
115                        }
116                        return this.client;
117                }
118
119                public void closeClient() {
120                        if (this.client != null) {
121                                this.client.close();
122                                this.client = null;
123                        }
124                }
125        }
126
127        public KestrelThriftSpout(List<String> hosts, int port, String queueName, Scheme scheme) {
128                if (hosts.isEmpty()) {
129                        throw new IllegalArgumentException("Must configure at least one host");
130                }
131                _port = port;
132                _hosts = hosts;
133                _queueName = queueName;
134                _scheme = scheme;
135        }
136
137        public KestrelThriftSpout(String hostname, int port, String queueName, Scheme scheme) {
138                this(Arrays.asList(hostname), port, queueName, scheme);
139        }
140
141        public KestrelThriftSpout(String hostname, int port, String queueName) {
142                this(hostname, port, queueName, new RawScheme());
143        }
144
145        public KestrelThriftSpout(List<String> hosts, int port, String queueName) {
146                this(hosts, port, queueName, new RawScheme());
147        }
148
149        public Fields getOutputFields() {
150                return _scheme.getOutputFields();
151        }
152
153        int _messageTimeoutMillis;
154
155        @SuppressWarnings("rawtypes")
156        @Override
157        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
158                // TODO: should switch this to maxTopologyMessageTimeout
159                final Number timeout = (Number) conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS);
160                _messageTimeoutMillis = 1000 * timeout.intValue();
161                _collector = collector;
162                _emitIndex = 0;
163                _kestrels = new ArrayList<KestrelClientInfo>();
164                final int numTasks = context.getComponentTasks(context.getThisComponentId()).size();
165                final int myIndex = context.getThisTaskIndex();
166                final int numHosts = _hosts.size();
167                if (numTasks < numHosts) {
168                        for (final String host : _hosts) {
169                                _kestrels.add(new KestrelClientInfo(host, _port));
170                        }
171                } else {
172                        final String host = _hosts.get(myIndex % numHosts);
173                        _kestrels.add(new KestrelClientInfo(host, _port));
174                }
175        }
176
177        @Override
178        public void close() {
179                for (final KestrelClientInfo info : _kestrels)
180                        info.closeClient();
181
182                // Closing the client connection causes all the open reliable reads to
183                // be aborted.
184                // Thus, clear our local buffer of these reliable reads.
185                _emitBuffer.clear();
186
187                _kestrels.clear();
188        }
189
190        public boolean bufferKestrelGet(int index) {
191                assert _emitBuffer.size() == 0; // JTODO
192
193                final KestrelClientInfo info = _kestrels.get(index);
194
195                final long now = System.currentTimeMillis();
196                if (now > info.blacklistTillTimeMs) {
197                        List<Item> items = null;
198                        try {
199                                items = info.getValidClient().get(_queueName, BATCH_SIZE, 0, _messageTimeoutMillis);
200                                // if(items.size()!=0)LOG.debug("Got this many items: " +
201                                // items.size());
202                        } catch (final TException e) {
203                                LOG.error("Error reading from client: " + e.getMessage());
204                                blacklist(info, e);
205                                return false;
206                        }
207
208                        assert items.size() <= BATCH_SIZE;
209                        // LOG.info("Kestrel batch get fetched " + items.size() +
210                        // " items. (batchSize= " + BATCH_SIZE +
211                        // " queueName=" + _queueName + ", index=" + index + ", host=" +
212                        // info.host + ")");
213
214                        final HashSet<Long> toAck = new HashSet<Long>();
215
216                        for (final Item item : items) {
217                                final List<Object> retItems = _scheme.deserialize(item.get_data());
218
219                                if (retItems != null) {
220                                        final EmitItem emitItem = new EmitItem(retItems, new KestrelSourceId(index, item.get_id()));
221
222                                        if (!_emitBuffer.offer(emitItem)) {
223                                                throw new RuntimeException("KestrelThriftSpout's Internal Buffer Enqeueue Failed.");
224                                        }
225                                } else {
226                                        toAck.add(item.get_id());
227                                }
228                        }
229
230                        if (toAck.size() > 0) {
231                                try {
232                                        info.client.confirm(_queueName, toAck);
233                                } catch (final TException e) {
234                                        blacklist(info, e);
235                                }
236                        }
237
238                        if (items.size() > 0)
239                                return true;
240                }
241                return false;
242        }
243
244        public void tryEachKestrelUntilBufferFilled() {
245                for (int i = 0; i < _kestrels.size(); i++) {
246                        final int index = (_emitIndex + i) % _kestrels.size();
247                        if (bufferKestrelGet(index)) {
248                                _emitIndex = index;
249                                break;
250                        }
251                }
252                _emitIndex = (_emitIndex + 1) % _kestrels.size();
253        }
254
255        int countTriples = 1;
256        int lastEmit = countTriples;
257        int emptyIterations = 0;
258
259        @Override
260        public void nextTuple() {
261                if (_emitBuffer.isEmpty())
262                        tryEachKestrelUntilBufferFilled();
263                if (countTriples % 1000 == 0 && countTriples != lastEmit) {
264                        LOG.debug("Number of triples emitted: " + countTriples);
265                        LOG.debug("Number of empty iterations: " + emptyIterations);
266                        emptyIterations = 0;
267                        lastEmit = countTriples;
268                }
269                final EmitItem item = _emitBuffer.poll();
270                if (item != null) {
271                        countTriples += 1;
272                        _collector.emit(item.tuple, item.sourceId);
273                } else { // If buffer is still empty here, then every kestrel Q is also
274                                        // empty.
275                        emptyIterations++;
276                        Utils.sleep(10);
277                }
278        }
279
280        private void blacklist(KestrelClientInfo info, Throwable t) {
281
282                // this case can happen when it fails to connect to Kestrel (and so
283                // never stores the connection)
284                info.closeClient();
285                info.blacklistTillTimeMs = System.currentTimeMillis() + BLACKLIST_TIME_MS;
286
287                final int index = _kestrels.indexOf(info);
288
289                // we just closed the connection, so all open reliable reads will be
290                // aborted. empty buffers.
291                for (final Iterator<EmitItem> i = _emitBuffer.iterator(); i.hasNext();) {
292                        final EmitItem item = i.next();
293                        if (item.sourceId.index == index)
294                                i.remove();
295                }
296        }
297
298        @Override
299        public void ack(Object msgId) {
300                final KestrelSourceId sourceId = (KestrelSourceId) msgId;
301                final KestrelClientInfo info = _kestrels.get(sourceId.index);
302
303                // if the transaction didn't exist, it just returns false. so this code
304                // works
305                // even if client gets blacklisted, disconnects, and kestrel puts the
306                // item
307                // back on the queue
308                try {
309                        if (info.client != null) {
310                                final HashSet<Long> xids = new HashSet<Long>();
311                                xids.add(sourceId.id);
312                                info.client.confirm(_queueName, xids);
313                        }
314                } catch (final TException e) {
315                        blacklist(info, e);
316                }
317        }
318
319        @Override
320        public void fail(Object msgId) {
321                final KestrelSourceId sourceId = (KestrelSourceId) msgId;
322                final KestrelClientInfo info = _kestrels.get(sourceId.index);
323
324                // see not above about why this works with blacklisting strategy
325                try {
326                        if (info.client != null) {
327                                final HashSet<Long> xids = new HashSet<Long>();
328                                xids.add(sourceId.id);
329                                info.client.abort(_queueName, xids);
330                        }
331                } catch (final TException e) {
332                        blacklist(info, e);
333                }
334        }
335
336        @Override
337        public void declareOutputFields(OutputFieldsDeclarer declarer) {
338                declarer.declare(getOutputFields());
339        }
340}