001package org.openimaj.picslurper;
002
003import java.io.File;
004import java.io.IOException;
005import java.util.ArrayList;
006import java.util.Iterator;
007import java.util.List;
008
009import org.apache.log4j.Logger;
010import org.kohsuke.args4j.CmdLineException;
011import org.kohsuke.args4j.CmdLineParser;
012import org.kohsuke.args4j.Option;
013import org.kohsuke.args4j.ProxyOptionHandler;
014import org.openimaj.io.FileUtils;
015import org.openimaj.picslurper.output.OutputListener;
016import org.openimaj.picslurper.output.OutputListenerMode;
017import org.openimaj.text.nlp.TweetTokeniserException;
018import org.openimaj.tools.FileToolsUtil;
019import org.openimaj.tools.InOutToolOptions;
020
021import backtype.storm.Config;
022import backtype.storm.LocalCluster;
023import backtype.storm.topology.TopologyBuilder;
024
025/**
026 * A tool for slurping images off twitter
027 * 
028 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
029 * 
030 */
031public class StormPicSlurper extends InOutToolOptions {
032
033        private static Logger logger = Logger.getLogger(StormPicSlurper.class);
034        String[] args;
035        boolean stdin;
036        List<File> inputFiles;
037        boolean stdout;
038        File outputLocation;
039        File globalStatus;
040        Iterator<File> fileIterator;
041        File inputFile;
042        private static final String STATUS_FILE_NAME = "status.txt";
043        /**
044         * System property name for whether twitter console login should be allowed
045         */
046        public static final String ALLOW_CONSOLE_LOGIN = "twitter.console_login";
047
048        @Option(
049                        name = "--encoding",
050                        aliases = "-e",
051                        required = false,
052                        usage = "The outputstreamwriter's text encoding",
053                        metaVar = "STRING")
054        String encoding = "UTF-8";
055
056        @Option(
057                        name = "--no-stats",
058                        aliases = "-ns",
059                        required = false,
060                        usage = "Don't try to keep stats of the tweets seen",
061                        metaVar = "STRING")
062        boolean stats = true;
063
064        @Option(
065                        name = "--no-threads",
066                        aliases = "-j",
067                        required = false,
068                        usage = "Threads used to download images, defaults to n CPUs",
069                        metaVar = "STRING")
070        int nThreads = Runtime.getRuntime().availableProcessors();
071
072        @Option(
073                        name = "--output-listener",
074                        aliases = "-ol",
075                        required = false,
076                        usage = "Add an output listener which gets told about each image downloaded, its location, tweet and url",
077                        handler = ProxyOptionHandler.class,
078                        multiValued = true)
079        List<OutputListenerMode> outputListenerMode = new ArrayList<OutputListenerMode>();
080        List<OutputListener> outputListenerModeOp = new ArrayList<OutputListener>();
081
082        /**
083         * @param args
084         *            tool arguments
085         */
086        public StormPicSlurper(String[] args) {
087                this.args = args;
088        }
089
090        /**
091         * no args
092         */
093        public StormPicSlurper() {
094                this.args = new String[] {};
095        }
096
097        /**
098         * prepare the tool for running
099         */
100        public void prepare() {
101                final CmdLineParser parser = new CmdLineParser(this);
102                try {
103                        parser.parseArgument(args);
104                        this.validate();
105                } catch (final CmdLineException e) {
106                        System.err.println(e.getMessage());
107                        System.err.println("Usage: java -jar JClusterQuantiser.jar [options...] [files...]");
108                        parser.printUsage(System.err);
109                        System.err.println(this.getExtractUsageInfo());
110                        System.exit(1);
111                }
112
113        }
114
115        String getExtractUsageInfo() {
116                return "Grab some images and some stats";
117        }
118
119        void validate() throws CmdLineException {
120                try {
121                        if (FileToolsUtil.isStdin(this)) {
122                                this.stdin = true;
123                        }
124                        else {
125                                this.inputFiles = FileToolsUtil.validateLocalInput(this);
126                                this.fileIterator = this.inputFiles.iterator();
127                        }
128                        if (FileToolsUtil.isStdout(this)) {
129                                this.stdout = true;
130                        }
131                        else
132                        {
133                                this.outputLocation = validateLocalOutput(this.getOutput(), this.isForce(), !this.isContinue());
134                                this.outputLocation.mkdirs();
135                                this.globalStatus = new File(outputLocation, STATUS_FILE_NAME);
136                                // init the output file
137                                PicSlurperUtils.updateStats(this.globalStatus, new StatusConsumption());
138                        }
139                } catch (final Exception e) {
140                        throw new CmdLineException(null, e.getMessage());
141                }
142        }
143
144        /**
145         * Validate the (local) ouput from an String and return the corresponding
146         * file.
147         * 
148         * @param out
149         *            where the file will go
150         * @param overwrite
151         *            whether to overwrite existing files
152         * @param contin
153         *            whether an existing output should be continued (i.e. ignored
154         *            if it exists)
155         * @return the output file location, deleted if it is allowed to be deleted
156         * @throws IOException
157         *             if the file exists, but can't be deleted
158         */
159        public static File validateLocalOutput(String out, boolean overwrite, boolean contin) throws IOException {
160                if (out == null) {
161                        throw new IOException("No output specified");
162                }
163                final File output = new File(out);
164                if (output.exists()) {
165                        if (overwrite) {
166                                if (!FileUtils.deleteRecursive(output))
167                                        throw new IOException("Couldn't delete existing output");
168                        }
169                        else if (!contin) {
170                                throw new IOException("Output already exists, didn't remove");
171                        }
172                }
173                return output;
174        }
175
176        void start() throws IOException, TweetTokeniserException, InterruptedException {
177                final LocalCluster cluster = new LocalCluster();
178                LocalTweetSpout spout = null;
179                if (this.stdin) {
180                        spout = new StdinSpout();
181                }
182                else {
183                        spout = new LocalFileTweetSpout(this.getAllInputs());
184                }
185                final TopologyBuilder builder = new TopologyBuilder();
186                builder.setSpout("stream_spout", spout);
187                // builder.setBolt("print", new
188                // PrintBolt()).shuffleGrouping("stream_spout");
189                builder.setBolt("download",
190                                new DownloadBolt(this.stats, this.globalStatus, this.outputLocation, this.outputListenerModeOp),
191                                this.nThreads).shuffleGrouping("stream_spout");
192
193                final Config conf = new Config();
194                conf.setDebug(false);
195                cluster.submitTopology("urltop", conf, builder.createTopology());
196                while (!LocalTweetSpout.isFinished()) {
197                        Thread.sleep(10000);
198                }
199                logger.debug("TweetSpout says it is finished, shutting down cluster");
200                cluster.shutdown();
201
202        }
203
204        /**
205         * @param args
206         * @throws IOException
207         * @throws TweetTokeniserException
208         * @throws InterruptedException
209         */
210        public static void main(String[] args) throws IOException, TweetTokeniserException, InterruptedException {
211                // Load the config
212                PicSlurper.loadConfig();
213                final StormPicSlurper slurper = new StormPicSlurper(args);
214                slurper.prepare();
215                slurper.start();
216        }
217
218}