001/**
002 * Copyright (c) 2011, The University of Southampton and the individual contributors.
003 * All rights reserved.
004 *
005 * Redistribution and use in source and binary forms, with or without modification,
006 * are permitted provided that the following conditions are met:
007 *
008 *   *  Redistributions of source code must retain the above copyright notice,
009 *      this list of conditions and the following disclaimer.
010 *
011 *   *  Redistributions in binary form must reproduce the above copyright notice,
012 *      this list of conditions and the following disclaimer in the documentation
013 *      and/or other materials provided with the distribution.
014 *
015 *   *  Neither the name of the University of Southampton nor the names of its
016 *      contributors may be used to endorse or promote products derived from this
017 *      software without specific prior written permission.
018 *
019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */
030package org.openimaj.hadoop.tools.twitter;
031
032import java.io.IOException;
033import java.io.StringReader;
034import java.util.Arrays;
035import java.util.List;
036
037import org.apache.commons.lang.ArrayUtils;
038import org.apache.hadoop.util.ToolRunner;
039import org.kohsuke.args4j.CmdLineException;
040import org.kohsuke.args4j.CmdLineParser;
041import org.kohsuke.args4j.Option;
042import org.kohsuke.args4j.ProxyOptionHandler;
043import org.kohsuke.args4j.util.ArgsUtil;
044import org.openimaj.hadoop.sequencefile.SequenceFileUtility;
045import org.openimaj.hadoop.tools.HadoopToolsUtil;
046import org.openimaj.hadoop.tools.twitter.token.mode.TwitterTokenMode;
047import org.openimaj.hadoop.tools.twitter.token.mode.TwitterTokenModeOption;
048import org.openimaj.hadoop.tools.twitter.token.outputmode.TwitterTokenOutputMode;
049import org.openimaj.hadoop.tools.twitter.token.outputmode.TwitterTokenOutputModeOption;
050import org.openimaj.io.IOUtils;
051import org.openimaj.tools.InOutToolOptions;
052import org.openimaj.tools.twitter.options.StatusType;
053import org.openimaj.twitter.GeneralJSONTwitter;
054import org.openimaj.twitter.USMFStatus;
055
056import com.jayway.jsonpath.JsonPath;
057
058/**
059 * Hadoop specific options for twitter preprocessing
060 * 
061 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
062 * 
063 */
064public class HadoopTwitterTokenToolOptions extends InOutToolOptions {
065        @Option(
066                        name = "--mode",
067                        aliases = "-m",
068                        required = false,
069                        usage = "How should the tweet tokens should be counted and processed.",
070                        handler = ProxyOptionHandler.class,
071                        multiValued = true)
072        TwitterTokenModeOption modeOptions = TwitterTokenModeOption.JUST_OUTPUT;
073        TwitterTokenMode modeOptionsOp = TwitterTokenModeOption.JUST_OUTPUT.getOptions();
074
075        @Option(
076                        name = "--output-mode",
077                        aliases = "-om",
078                        required = false,
079                        usage = "How should tokens be outputted.",
080                        handler = ProxyOptionHandler.class)
081        private TwitterTokenOutputModeOption outputModeOptions = TwitterTokenOutputModeOption.NONE;
082        TwitterTokenOutputMode outputModeOptionsOp = TwitterTokenOutputModeOption.NONE.getOptions();
083
084        @Option(
085                        name = "--json-path",
086                        aliases = "-j",
087                        required = false,
088                        usage = "A JSONPath query defining the field to find tokens to count",
089                        metaVar = "STRING")
090        String tokensJSONPath = "analysis.stemmed";
091
092        @Option(
093                        name = "--json-path-filter",
094                        aliases = "-jf",
095                        required = false,
096                        usage = "Add jsonpath filters, if a given entry passes the filters it is used",
097                        multiValued = true)
098        List<String> jsonPathFilters;
099        private JsonPathFilterSet filters;
100
101        @Option(
102                        name = "--preprocessing-tool",
103                        aliases = "-pp",
104                        required = false,
105                        usage = "Launch an initial stage where the preprocessing tool is used. The input and output values may be ignored",
106                        metaVar = "STRING")
107        private String preprocessingOptions = null;
108
109        @Option(
110                        name = "--status-input-type",
111                        aliases = "-sit",
112                        required = false,
113                        usage = "The type of social media message being consumed")
114        StatusType statusType = StatusType.TWITTER;
115
116        private String[] args;
117
118        private boolean beforeMaps;
119
120        private String[] originalArgs;
121        private JsonPath jsonPath;
122
123        /**
124         * The key in which command line arguments are held for each mapper to read
125         * the options instance
126         */
127        public static final String ARGS_KEY = "TOKEN_ARGS";
128
129        /**
130         * Initialise the options
131         * 
132         * @param args
133         *            the arguments after going through the hadoop tool (i.e. minus
134         *            the -D hadoop arguments)
135         * @param originalArgs
136         *            the original arguments as typed into the command line (useful
137         *            for subhadoop tasks launched)
138         * @param beforeMaps
139         *            whether this job is occuring before the maps
140         * @throws CmdLineException
141         */
142        public HadoopTwitterTokenToolOptions(String[] args, String[] originalArgs, boolean beforeMaps)
143                        throws CmdLineException
144        {
145                this.args = args;
146                this.originalArgs = originalArgs;
147                this.beforeMaps = beforeMaps;
148                if (this.beforeMaps)
149                        this.prepareCL();
150                else
151                        this.prepare();
152        }
153
154        /**
155         * @param args
156         *            Just the arguments (hadoop arguments assumed to be the same)
157         * @throws CmdLineException
158         */
159        public HadoopTwitterTokenToolOptions(String[] args) throws CmdLineException {
160                this(args, args, false);
161        }
162
163        /**
164         * prepare the tool for running (command line version)
165         */
166        public void prepareCL() {
167                final CmdLineParser parser = new CmdLineParser(this);
168                try {
169                        parser.parseArgument(args);
170                        // prepareMultivaluedArgument(modeOptions,TwitterTokenModeOption.JUST_OUTPUT);
171                        // prepareMultivaluedArgument(modeOptionsOp,TwitterTokenModeOption.JUST_OUTPUT.getOptions());
172                        this.validate();
173                } catch (final CmdLineException e) {
174                        System.err.println(e.getMessage());
175                        System.err.println("Usage: java -jar JClusterQuantiser.jar [options...] [files...]");
176                        parser.printUsage(System.err);
177                        System.exit(1);
178                }
179        }
180
181        /**
182         * @throws CmdLineException
183         */
184        public void prepare() throws CmdLineException {
185                final CmdLineParser parser = new CmdLineParser(this);
186                parser.parseArgument(args);
187                // prepareMultivaluedArgument(modeOptions,TwitterTokenModeOption.JUST_OUTPUT);
188                // prepareMultivaluedArgument(modeOptionsOp,TwitterTokenModeOption.JUST_OUTPUT.getOptions());
189                // System.out.println(Arrays.toString(args));
190                // System.out.println("Number of mode options: " + modeOptions.size());
191                this.validate();
192        }
193
194        private void validate() throws CmdLineException {
195                if (this.beforeMaps)
196                {
197                        HadoopToolsUtil.validateInput(this);
198                        if (!noOutput())
199                                HadoopToolsUtil.validateOutput(this);
200                }
201                jsonPath = JsonPath.compile(getJsonPath());
202        }
203
204        /**
205         * @return is there any actual output this phase
206         */
207        public boolean noOutput() {
208
209                return (this.modeOptions == TwitterTokenModeOption.JUST_OUTPUT);
210        }
211
212        // /**
213        // * @return the delta between time windows in minutes
214        // */
215        // public long getTimeDelta() {
216        // return this.timeDelta;
217        // }
218
219        /**
220         * @return the JSONPath query used to extract tokens
221         */
222        public String getJsonPath() {
223                return this.tokensJSONPath;
224        }
225
226        /**
227         * @return the original arguments including the hadoop arguments
228         */
229        public String[] getArgs() {
230                return this.originalArgs;
231        }
232
233        /**
234         * @return the arguments minus the hadoop arguments
235         */
236        public String[] getNonHadoopArgs() {
237                // return this.args;
238                try {
239                        return ArgsUtil.extractArguments(this);
240                } catch (final Exception e) {
241                        e.printStackTrace();
242                        return new String[0];
243                }
244        }
245
246        /**
247         * @param mode
248         *            output a completed token mode
249         * @throws Exception
250         */
251        public void output(TwitterTokenMode mode) throws Exception {
252                this.outputModeOptionsOp.write(this, mode);
253        }
254
255        /**
256         * If there were any preprocessing arguments, perform the preprocessing and
257         * use the preprocessing output as the input to the rest of the process.
258         * 
259         * @throws Exception
260         */
261        public void performPreprocessing() throws Exception {
262                if (noOutput())
263                        return;
264                if (this.preprocessingOptions == null)
265                        return;
266
267                final String output = this.getOutput() + "/preprocessing";
268                final boolean outExists = HadoopToolsUtil.fileExists(output);
269                if (!outExists || // if the file doesn't exist
270                                SequenceFileUtility.getFilePaths(output, "part").length == 0 // or
271                                                                                                                                                                // no
272                                                                                                                                                                // part
273                                                                                                                                                                // file
274                                                                                                                                                                // was
275                                                                                                                                                                // found
276                )
277                {
278                        // if the file exists, the part file was not found, remove the file!
279                        if (outExists) {
280                                HadoopToolsUtil.removeFile(output);
281                        }
282                        String inputPart = "";
283                        if (this.getInputFile() != null) {
284                                inputPart = "-if " + this.getInputFile();
285                        }
286                        else {
287                                inputPart = "-i " + this.getInput();
288                        }
289                        this.preprocessingOptions = inputPart + " -o " + output + " " + preprocessingOptions;
290                        final String[] hadoopArgs = Arrays.copyOf(this.originalArgs, this.originalArgs.length - this.args.length);
291                        if (this.isForce())
292                                this.preprocessingOptions += " -rm";
293                        String[] preprocessingArgs = this.preprocessingOptions.split(" ");
294                        preprocessingArgs = (String[]) ArrayUtils.addAll(hadoopArgs, preprocessingArgs);
295                        ToolRunner.run(new HadoopTwitterPreprocessingTool(), preprocessingArgs);
296                }
297                else {
298                        System.out.println("Preprocessing exists, using...");
299                }
300                this.setInput(output);
301                this.statusType = StatusType.USMF;
302                return;
303
304        }
305
306        public JsonPathFilterSet getFilters() {
307                if (this.filters == null) {
308                        this.filters = new JsonPathFilterSet(jsonPathFilters);
309                }
310                return this.filters;
311        }
312
313        public USMFStatus readStatus(String svalue) throws IOException {
314                final USMFStatus status = IOUtils.read(new StringReader(svalue), new USMFStatus(GeneralJSONTwitter.class));
315                // TwitterStatus status = TwitterStatus.fromString(svalue);
316                if (status.isInvalid())
317                        throw new IOException("Invalid tweet");
318                return status;
319        }
320
321        /**
322         * Read json from text and try to extract the part to the type required
323         * 
324         * @param <T>
325         * @param svalue
326         * @return a part of type T
327         * @throws IOException
328         */
329        @SuppressWarnings("unchecked")
330        public <T> T readStatusPart(String svalue) throws IOException {
331
332                if (this.filters != null && !this.filters.filter(svalue))
333                        return null;
334                final Object tokens = this.jsonPath.read(svalue);
335                if (tokens == null) {
336                        return null;
337                }
338                try {
339                        return (T) tokens;
340                } catch (final Throwable e) {
341                        throw new IOException("Couldn't cast to type");
342                }
343        }
344
345        public StatusType getStatusType() {
346                return this.statusType;
347        }
348}