001package org.openimaj.picslurper;
002
003import java.io.File;
004import java.util.List;
005import java.util.Map;
006
007import org.openimaj.picslurper.output.OutputListener;
008import org.openimaj.twitter.collection.StreamJSONStatusList.ReadableWritableJSON;
009
010import twitter4j.Status;
011import twitter4j.internal.json.z_T4JInternalJSONImplFactory;
012import twitter4j.internal.org.json.JSONObject;
013import backtype.storm.task.OutputCollector;
014import backtype.storm.task.TopologyContext;
015import backtype.storm.topology.IRichBolt;
016import backtype.storm.topology.OutputFieldsDeclarer;
017import backtype.storm.tuple.Tuple;
018
019/**
020 * A download bolt instantiates {@link StatusConsumer} on {@link Status}
021 * recieved
022 * 
023 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
024 *
025 */
026public class DownloadBolt implements IRichBolt {
027
028        /**
029         *
030         */
031        private static final long serialVersionUID = -6459004768597784620L;
032        private boolean stats;
033        private File globalStats;
034        private File outputLocation;
035        private OutputCollector collector;
036        private z_T4JInternalJSONImplFactory factory;
037        private List<OutputListener> outmodes;
038
039        /**
040         * Information for the {@link StatusConsumer} instances
041         * 
042         * @param stats
043         * @param globalStats
044         * @param outputLocation
045         * @param outmodes
046         */
047        public DownloadBolt(boolean stats, File globalStats, File outputLocation, List<OutputListener> outmodes) {
048                this.stats = stats;
049                this.globalStats = globalStats;
050                this.outputLocation = outputLocation;
051                this.outmodes = outmodes;
052        }
053
054        @SuppressWarnings("rawtypes")
055        @Override
056        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
057                this.collector = collector;
058                factory = new z_T4JInternalJSONImplFactory(null);
059
060        }
061
062        @Override
063        public void execute(Tuple input) {
064                final ReadableWritableJSON json = (ReadableWritableJSON) input.getValue(0);
065                final StatusConsumer consumer = new StatusConsumer(stats, globalStats, outputLocation, outmodes);
066                try {
067                        consumer.consume(factory.createStatus(new JSONObject(json)));
068                        collector.ack(input);
069                } catch (final Exception e) {
070                        collector.fail(input);
071                }
072        }
073
074        @Override
075        public void cleanup() {
076        }
077
078        @Override
079        public void declareOutputFields(OutputFieldsDeclarer declarer) {
080        }
081
082        @Override
083        public Map<String, Object> getComponentConfiguration() {
084                return null;
085        }
086
087}