001package org.openimaj.picslurper; 002 003import java.io.File; 004import java.io.IOException; 005import java.util.ArrayList; 006import java.util.Iterator; 007import java.util.List; 008 009import org.apache.log4j.Logger; 010import org.kohsuke.args4j.CmdLineException; 011import org.kohsuke.args4j.CmdLineParser; 012import org.kohsuke.args4j.Option; 013import org.kohsuke.args4j.ProxyOptionHandler; 014import org.openimaj.io.FileUtils; 015import org.openimaj.picslurper.output.OutputListener; 016import org.openimaj.picslurper.output.OutputListenerMode; 017import org.openimaj.text.nlp.TweetTokeniserException; 018import org.openimaj.tools.FileToolsUtil; 019import org.openimaj.tools.InOutToolOptions; 020 021import backtype.storm.Config; 022import backtype.storm.LocalCluster; 023import backtype.storm.topology.TopologyBuilder; 024 025/** 026 * A tool for slurping images off twitter 027 * 028 * @author Sina Samangooei (ss@ecs.soton.ac.uk) 029 * 030 */ 031public class StormPicSlurper extends InOutToolOptions { 032 033 private static Logger logger = Logger.getLogger(StormPicSlurper.class); 034 String[] args; 035 boolean stdin; 036 List<File> inputFiles; 037 boolean stdout; 038 File outputLocation; 039 File globalStatus; 040 Iterator<File> fileIterator; 041 File inputFile; 042 private static final String STATUS_FILE_NAME = "status.txt"; 043 /** 044 * System property name for whether twitter console login should be allowed 045 */ 046 public static final String ALLOW_CONSOLE_LOGIN = "twitter.console_login"; 047 048 @Option( 049 name = "--encoding", 050 aliases = "-e", 051 required = false, 052 usage = "The outputstreamwriter's text encoding", 053 metaVar = "STRING") 054 String encoding = "UTF-8"; 055 056 @Option( 057 name = "--no-stats", 058 aliases = "-ns", 059 required = false, 060 usage = "Don't try to keep stats of the tweets seen", 061 metaVar = "STRING") 062 boolean stats = true; 063 064 @Option( 065 name = "--no-threads", 066 aliases = "-j", 067 required = false, 068 usage = "Threads used to download images, defaults to n CPUs", 069 metaVar = "STRING") 070 int nThreads = Runtime.getRuntime().availableProcessors(); 071 072 @Option( 073 name = "--output-listener", 074 aliases = "-ol", 075 required = false, 076 usage = "Add an output listener which gets told about each image downloaded, its location, tweet and url", 077 handler = ProxyOptionHandler.class, 078 multiValued = true) 079 List<OutputListenerMode> outputListenerMode = new ArrayList<OutputListenerMode>(); 080 List<OutputListener> outputListenerModeOp = new ArrayList<OutputListener>(); 081 082 /** 083 * @param args 084 * tool arguments 085 */ 086 public StormPicSlurper(String[] args) { 087 this.args = args; 088 } 089 090 /** 091 * no args 092 */ 093 public StormPicSlurper() { 094 this.args = new String[] {}; 095 } 096 097 /** 098 * prepare the tool for running 099 */ 100 public void prepare() { 101 final CmdLineParser parser = new CmdLineParser(this); 102 try { 103 parser.parseArgument(args); 104 this.validate(); 105 } catch (final CmdLineException e) { 106 System.err.println(e.getMessage()); 107 System.err.println("Usage: java -jar JClusterQuantiser.jar [options...] [files...]"); 108 parser.printUsage(System.err); 109 System.err.println(this.getExtractUsageInfo()); 110 System.exit(1); 111 } 112 113 } 114 115 String getExtractUsageInfo() { 116 return "Grab some images and some stats"; 117 } 118 119 void validate() throws CmdLineException { 120 try { 121 if (FileToolsUtil.isStdin(this)) { 122 this.stdin = true; 123 } 124 else { 125 this.inputFiles = FileToolsUtil.validateLocalInput(this); 126 this.fileIterator = this.inputFiles.iterator(); 127 } 128 if (FileToolsUtil.isStdout(this)) { 129 this.stdout = true; 130 } 131 else 132 { 133 this.outputLocation = validateLocalOutput(this.getOutput(), this.isForce(), !this.isContinue()); 134 this.outputLocation.mkdirs(); 135 this.globalStatus = new File(outputLocation, STATUS_FILE_NAME); 136 // init the output file 137 PicSlurperUtils.updateStats(this.globalStatus, new StatusConsumption()); 138 } 139 } catch (final Exception e) { 140 throw new CmdLineException(null, e.getMessage()); 141 } 142 } 143 144 /** 145 * Validate the (local) ouput from an String and return the corresponding 146 * file. 147 * 148 * @param out 149 * where the file will go 150 * @param overwrite 151 * whether to overwrite existing files 152 * @param contin 153 * whether an existing output should be continued (i.e. ignored 154 * if it exists) 155 * @return the output file location, deleted if it is allowed to be deleted 156 * @throws IOException 157 * if the file exists, but can't be deleted 158 */ 159 public static File validateLocalOutput(String out, boolean overwrite, boolean contin) throws IOException { 160 if (out == null) { 161 throw new IOException("No output specified"); 162 } 163 final File output = new File(out); 164 if (output.exists()) { 165 if (overwrite) { 166 if (!FileUtils.deleteRecursive(output)) 167 throw new IOException("Couldn't delete existing output"); 168 } 169 else if (!contin) { 170 throw new IOException("Output already exists, didn't remove"); 171 } 172 } 173 return output; 174 } 175 176 void start() throws IOException, TweetTokeniserException, InterruptedException { 177 final LocalCluster cluster = new LocalCluster(); 178 LocalTweetSpout spout = null; 179 if (this.stdin) { 180 spout = new StdinSpout(); 181 } 182 else { 183 spout = new LocalFileTweetSpout(this.getAllInputs()); 184 } 185 final TopologyBuilder builder = new TopologyBuilder(); 186 builder.setSpout("stream_spout", spout); 187 // builder.setBolt("print", new 188 // PrintBolt()).shuffleGrouping("stream_spout"); 189 builder.setBolt("download", 190 new DownloadBolt(this.stats, this.globalStatus, this.outputLocation, this.outputListenerModeOp), 191 this.nThreads).shuffleGrouping("stream_spout"); 192 193 final Config conf = new Config(); 194 conf.setDebug(false); 195 cluster.submitTopology("urltop", conf, builder.createTopology()); 196 while (!LocalTweetSpout.isFinished()) { 197 Thread.sleep(10000); 198 } 199 logger.debug("TweetSpout says it is finished, shutting down cluster"); 200 cluster.shutdown(); 201 202 } 203 204 /** 205 * @param args 206 * @throws IOException 207 * @throws TweetTokeniserException 208 * @throws InterruptedException 209 */ 210 public static void main(String[] args) throws IOException, TweetTokeniserException, InterruptedException { 211 // Load the config 212 PicSlurper.loadConfig(); 213 final StormPicSlurper slurper = new StormPicSlurper(args); 214 slurper.prepare(); 215 slurper.start(); 216 } 217 218}