001package org.openimaj.picslurper; 002 003import java.io.File; 004import java.util.List; 005import java.util.Map; 006 007import org.openimaj.picslurper.output.OutputListener; 008import org.openimaj.twitter.collection.StreamJSONStatusList.ReadableWritableJSON; 009 010import twitter4j.Status; 011import twitter4j.internal.json.z_T4JInternalJSONImplFactory; 012import twitter4j.internal.org.json.JSONObject; 013import backtype.storm.task.OutputCollector; 014import backtype.storm.task.TopologyContext; 015import backtype.storm.topology.IRichBolt; 016import backtype.storm.topology.OutputFieldsDeclarer; 017import backtype.storm.tuple.Tuple; 018 019/** 020 * A download bolt instantiates {@link StatusConsumer} on {@link Status} 021 * recieved 022 * 023 * @author Sina Samangooei (ss@ecs.soton.ac.uk) 024 * 025 */ 026public class DownloadBolt implements IRichBolt { 027 028 /** 029 * 030 */ 031 private static final long serialVersionUID = -6459004768597784620L; 032 private boolean stats; 033 private File globalStats; 034 private File outputLocation; 035 private OutputCollector collector; 036 private z_T4JInternalJSONImplFactory factory; 037 private List<OutputListener> outmodes; 038 039 /** 040 * Information for the {@link StatusConsumer} instances 041 * 042 * @param stats 043 * @param globalStats 044 * @param outputLocation 045 * @param outmodes 046 */ 047 public DownloadBolt(boolean stats, File globalStats, File outputLocation, List<OutputListener> outmodes) { 048 this.stats = stats; 049 this.globalStats = globalStats; 050 this.outputLocation = outputLocation; 051 this.outmodes = outmodes; 052 } 053 054 @SuppressWarnings("rawtypes") 055 @Override 056 public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { 057 this.collector = collector; 058 factory = new z_T4JInternalJSONImplFactory(null); 059 060 } 061 062 @Override 063 public void execute(Tuple input) { 064 final ReadableWritableJSON json = (ReadableWritableJSON) input.getValue(0); 065 final StatusConsumer consumer = new StatusConsumer(stats, globalStats, outputLocation, outmodes); 066 try { 067 consumer.consume(factory.createStatus(new JSONObject(json))); 068 collector.ack(input); 069 } catch (final Exception e) { 070 collector.fail(input); 071 } 072 } 073 074 @Override 075 public void cleanup() { 076 } 077 078 @Override 079 public void declareOutputFields(OutputFieldsDeclarer declarer) { 080 } 081 082 @Override 083 public Map<String, Object> getComponentConfiguration() { 084 return null; 085 } 086 087}