001/**
002 * Copyright (c) 2012, The University of Southampton and the individual contributors.
003 * All rights reserved.
004 *
005 * Redistribution and use in source and binary forms, with or without modification,
006 * are permitted provided that the following conditions are met:
007 *
008 *   *  Redistributions of source code must retain the above copyright notice,
009 *      this list of conditions and the following disclaimer.
010 *
011 *   *  Redistributions in binary form must reproduce the above copyright notice,
012 *      this list of conditions and the following disclaimer in the documentation
013 *      and/or other materials provided with the distribution.
014 *
015 *   *  Neither the name of the University of Southampton nor the names of its
016 *      contributors may be used to endorse or promote products derived from this
017 *      software without specific prior written permission.
018 *
019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */
030package backtype.storm.spout;
031
032import java.util.ArrayList;
033import java.util.HashSet;
034import java.util.Iterator;
035import java.util.LinkedList;
036import java.util.List;
037import java.util.Map;
038import java.util.Queue;
039import java.util.Set;
040
041import net.lag.kestrel.thrift.Item;
042
043import org.apache.log4j.Logger;
044import org.apache.thrift7.TException;
045import org.openimaj.kestrel.KestrelServerSpec;
046import org.openimaj.time.Timer;
047
048import backtype.storm.task.TopologyContext;
049import backtype.storm.topology.OutputFieldsDeclarer;
050import backtype.storm.topology.base.BaseRichSpout;
051import backtype.storm.utils.Utils;
052
053import com.google.gson.Gson;
054import com.google.gson.GsonBuilder;
055
056/**
057 * A version of the {@link KestrelThriftSpout} that is purposefully unreliable
058 * and cuts other corners in an effort for improved speeds
059 * 
060 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
061 * 
062 */
063public class UnreliableKestrelThriftSpout extends BaseRichSpout {
064
065        private class EmitItem {
066                public KestrelSourceId sourceId;
067                public List<Object> tuple;
068
069                public EmitItem(List<Object> tuple, KestrelSourceId sourceId) {
070                        this.tuple = tuple;
071                        this.sourceId = sourceId;
072                }
073        }
074
075        private static class KestrelSourceId {
076                public KestrelSourceId(int index, long id) {
077                        this.index = index;
078                        this.id = id;
079                }
080
081                int index;
082                long id;
083
084                @Override
085                public String toString() {
086                        return String.format("{client:%s,id:%s}", index, id);
087                }
088        }
089
090        /**
091         * 
092         */
093        private static final long serialVersionUID = -3531693744499668571L;
094        private static final int HOLD_ITEMS = 1000;
095        private static final int TIMEOUT = 100;
096        private static final Logger logger = Logger.getLogger(UnreliableKestrelThriftSpout.class);
097        private List<KestrelServerSpec> clients;
098        private Scheme scheme;
099        private Queue<EmitItem> tuples;
100        private String queue;
101        private int MAX_ITEMS_PER_QUEUE;
102        private SpoutOutputCollector collector;
103        private List<String> hosts;
104        private int port;
105        private String ackQueue;
106        private Iterator<KestrelThriftClient> ackIterator;
107
108        /**
109         * @param serverSpecs
110         *            servers to connect to in a round robin fasion
111         * @param scheme
112         *            how items should be read
113         * @param inputQueue
114         *            queue from which to read
115         */
116        public UnreliableKestrelThriftSpout(
117                        List<KestrelServerSpec> serverSpecs,
118                        Scheme scheme,
119                        String inputQueue)
120        {
121                this.scheme = scheme;
122                this.queue = inputQueue;
123                this.port = -1;
124                this.hosts = new ArrayList<String>();
125                for (final KestrelServerSpec kestrelServerSpec : serverSpecs) {
126                        this.hosts.add(kestrelServerSpec.host);
127                        this.port = kestrelServerSpec.port;
128                }
129                this.ackQueue = null;
130        }
131
132        /**
133         * The ackQueue holds statistics about acknowledgement time useful for
134         * measuring topology throughput
135         * 
136         * @param ackQueue
137         */
138        public void setAckQueue(String ackQueue) {
139                this.ackQueue = ackQueue;
140        }
141
142        @Override
143        public void open(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, SpoutOutputCollector collector) {
144                this.collector = collector;
145                this.clients = new ArrayList<KestrelServerSpec>();
146                for (final String specs : this.hosts) {
147                        clients.add(new KestrelServerSpec(specs, this.port));
148                }
149                MAX_ITEMS_PER_QUEUE = HOLD_ITEMS / this.clients.size();
150                this.tuples = new LinkedList<EmitItem>();
151                this.ackIterator = KestrelServerSpec.thriftClientIterator(clients);
152
153        }
154
155        @Override
156        public void close() {
157                for (final KestrelServerSpec client : this.clients)
158                        client.close();
159        }
160
161        @Override
162        public void nextTuple() {
163                getSomeMoreTuples();
164                if (this.tuples.size() == 0)
165                {
166                        Utils.sleep(10);
167                        return;
168                }
169                final EmitItem poll = this.tuples.poll();
170                collector.emit(poll.tuple, poll.sourceId);
171                // collector.emit(poll.tuple);
172        }
173
174        int readTotal = 0;
175
176        private void getSomeMoreTuples() {
177                if (this.tuples.size() > HOLD_ITEMS - MAX_ITEMS_PER_QUEUE / 2)
178                        return;
179                int clientIndex = 0;
180                for (final KestrelServerSpec clientSpec : this.clients) {
181                        try {
182                                final KestrelThriftClient client = clientSpec.getValidClient();
183                                final List<Item> ret = client
184                                                .get(this.queue, MAX_ITEMS_PER_QUEUE, TIMEOUT, TIMEOUT * MAX_ITEMS_PER_QUEUE);
185                                readTotal += ret.size();
186                                logger.debug("Read total: " + readTotal);
187                                final Set<Long> ids = new HashSet<Long>();
188                                for (final Item item : ret) {
189                                        final long kestrelId = item.get_id();
190                                        ids.add(kestrelId);
191                                        final List<Object> deserialize = scheme.deserialize(item.get_data());
192                                        if (deserialize == null)
193                                                continue; // we silently skip null items
194
195                                        final EmitItem e = new EmitItem(deserialize, new KestrelSourceId(clientIndex, kestrelId));
196                                        this.tuples.add(e);
197                                }
198                                // We immediately confirm ALL items. This is the thing that
199                                // makes it unreliable!
200                                client.confirm(this.queue, ids);
201                        } catch (final TException e) {
202                        }
203                        clientIndex++;
204                }
205        }
206
207        @Override
208        public void declareOutputFields(OutputFieldsDeclarer declarer) {
209                declarer.declare(scheme.getOutputFields());
210        }
211
212        private final static Gson gson = new GsonBuilder().create();
213        int acked = 0;
214        private Timer ackTimer;
215
216        @Override
217        public void ack(Object msgId) {
218                if (acked == 0) {
219                        ackTimer = Timer.timer();
220                }
221                acked++;
222                if (acked % 1000 == 0) {
223                        emitToAckQueue();
224                }
225        }
226
227        private void emitToAckQueue() {
228                logger.debug("Acked: " + acked);
229                final float throughput = acked / ((float) ackTimer.duration() / 1000);
230                if (this.ackQueue != null) {
231                        final AckStats stats = new AckStats(throughput);
232                        final KestrelThriftClient client = getNextValidClient();
233                        try {
234                                client.put(this.ackQueue, gson.toJson(stats), 0);
235                        } catch (final TException e) {
236                                logger.error("Failed to write acknowledgement");
237                        }
238                }
239        }
240
241        private KestrelThriftClient getNextValidClient() {
242                return this.ackIterator.next();
243        }
244
245        @Override
246        public void fail(Object msgId) {
247                final KestrelSourceId sourceId = (KestrelSourceId) msgId;
248                logger.debug("Failing: " + sourceId);
249        }
250
251}