001/** 002 * Copyright (c) 2011, The University of Southampton and the individual contributors. 003 * All rights reserved. 004 * 005 * Redistribution and use in source and binary forms, with or without modification, 006 * are permitted provided that the following conditions are met: 007 * 008 * * Redistributions of source code must retain the above copyright notice, 009 * this list of conditions and the following disclaimer. 010 * 011 * * Redistributions in binary form must reproduce the above copyright notice, 012 * this list of conditions and the following disclaimer in the documentation 013 * and/or other materials provided with the distribution. 014 * 015 * * Neither the name of the University of Southampton nor the names of its 016 * contributors may be used to endorse or promote products derived from this 017 * software without specific prior written permission. 018 * 019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND 020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR 023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON 026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 029 */ 030package org.openimaj.hadoop.tools.twitter; 031 032import java.io.IOException; 033import java.io.StringReader; 034import java.util.Arrays; 035import java.util.List; 036 037import org.apache.commons.lang.ArrayUtils; 038import org.apache.hadoop.util.ToolRunner; 039import org.kohsuke.args4j.CmdLineException; 040import org.kohsuke.args4j.CmdLineParser; 041import org.kohsuke.args4j.Option; 042import org.kohsuke.args4j.ProxyOptionHandler; 043import org.kohsuke.args4j.util.ArgsUtil; 044import org.openimaj.hadoop.sequencefile.SequenceFileUtility; 045import org.openimaj.hadoop.tools.HadoopToolsUtil; 046import org.openimaj.hadoop.tools.twitter.token.mode.TwitterTokenMode; 047import org.openimaj.hadoop.tools.twitter.token.mode.TwitterTokenModeOption; 048import org.openimaj.hadoop.tools.twitter.token.outputmode.TwitterTokenOutputMode; 049import org.openimaj.hadoop.tools.twitter.token.outputmode.TwitterTokenOutputModeOption; 050import org.openimaj.io.IOUtils; 051import org.openimaj.tools.InOutToolOptions; 052import org.openimaj.tools.twitter.options.StatusType; 053import org.openimaj.twitter.GeneralJSONTwitter; 054import org.openimaj.twitter.USMFStatus; 055 056import com.jayway.jsonpath.JsonPath; 057 058/** 059 * Hadoop specific options for twitter preprocessing 060 * 061 * @author Sina Samangooei (ss@ecs.soton.ac.uk) 062 * 063 */ 064public class HadoopTwitterTokenToolOptions extends InOutToolOptions { 065 @Option( 066 name = "--mode", 067 aliases = "-m", 068 required = false, 069 usage = "How should the tweet tokens should be counted and processed.", 070 handler = ProxyOptionHandler.class, 071 multiValued = true) 072 TwitterTokenModeOption modeOptions = TwitterTokenModeOption.JUST_OUTPUT; 073 TwitterTokenMode modeOptionsOp = TwitterTokenModeOption.JUST_OUTPUT.getOptions(); 074 075 @Option( 076 name = "--output-mode", 077 aliases = "-om", 078 required = false, 079 usage = "How should tokens be outputted.", 080 handler = ProxyOptionHandler.class) 081 private TwitterTokenOutputModeOption outputModeOptions = TwitterTokenOutputModeOption.NONE; 082 TwitterTokenOutputMode outputModeOptionsOp = TwitterTokenOutputModeOption.NONE.getOptions(); 083 084 @Option( 085 name = "--json-path", 086 aliases = "-j", 087 required = false, 088 usage = "A JSONPath query defining the field to find tokens to count", 089 metaVar = "STRING") 090 String tokensJSONPath = "analysis.stemmed"; 091 092 @Option( 093 name = "--json-path-filter", 094 aliases = "-jf", 095 required = false, 096 usage = "Add jsonpath filters, if a given entry passes the filters it is used", 097 multiValued = true) 098 List<String> jsonPathFilters; 099 private JsonPathFilterSet filters; 100 101 @Option( 102 name = "--preprocessing-tool", 103 aliases = "-pp", 104 required = false, 105 usage = "Launch an initial stage where the preprocessing tool is used. The input and output values may be ignored", 106 metaVar = "STRING") 107 private String preprocessingOptions = null; 108 109 @Option( 110 name = "--status-input-type", 111 aliases = "-sit", 112 required = false, 113 usage = "The type of social media message being consumed") 114 StatusType statusType = StatusType.TWITTER; 115 116 private String[] args; 117 118 private boolean beforeMaps; 119 120 private String[] originalArgs; 121 private JsonPath jsonPath; 122 123 /** 124 * The key in which command line arguments are held for each mapper to read 125 * the options instance 126 */ 127 public static final String ARGS_KEY = "TOKEN_ARGS"; 128 129 /** 130 * Initialise the options 131 * 132 * @param args 133 * the arguments after going through the hadoop tool (i.e. minus 134 * the -D hadoop arguments) 135 * @param originalArgs 136 * the original arguments as typed into the command line (useful 137 * for subhadoop tasks launched) 138 * @param beforeMaps 139 * whether this job is occuring before the maps 140 * @throws CmdLineException 141 */ 142 public HadoopTwitterTokenToolOptions(String[] args, String[] originalArgs, boolean beforeMaps) 143 throws CmdLineException 144 { 145 this.args = args; 146 this.originalArgs = originalArgs; 147 this.beforeMaps = beforeMaps; 148 if (this.beforeMaps) 149 this.prepareCL(); 150 else 151 this.prepare(); 152 } 153 154 /** 155 * @param args 156 * Just the arguments (hadoop arguments assumed to be the same) 157 * @throws CmdLineException 158 */ 159 public HadoopTwitterTokenToolOptions(String[] args) throws CmdLineException { 160 this(args, args, false); 161 } 162 163 /** 164 * prepare the tool for running (command line version) 165 */ 166 public void prepareCL() { 167 final CmdLineParser parser = new CmdLineParser(this); 168 try { 169 parser.parseArgument(args); 170 // prepareMultivaluedArgument(modeOptions,TwitterTokenModeOption.JUST_OUTPUT); 171 // prepareMultivaluedArgument(modeOptionsOp,TwitterTokenModeOption.JUST_OUTPUT.getOptions()); 172 this.validate(); 173 } catch (final CmdLineException e) { 174 System.err.println(e.getMessage()); 175 System.err.println("Usage: java -jar JClusterQuantiser.jar [options...] [files...]"); 176 parser.printUsage(System.err); 177 System.exit(1); 178 } 179 } 180 181 /** 182 * @throws CmdLineException 183 */ 184 public void prepare() throws CmdLineException { 185 final CmdLineParser parser = new CmdLineParser(this); 186 parser.parseArgument(args); 187 // prepareMultivaluedArgument(modeOptions,TwitterTokenModeOption.JUST_OUTPUT); 188 // prepareMultivaluedArgument(modeOptionsOp,TwitterTokenModeOption.JUST_OUTPUT.getOptions()); 189 // System.out.println(Arrays.toString(args)); 190 // System.out.println("Number of mode options: " + modeOptions.size()); 191 this.validate(); 192 } 193 194 private void validate() throws CmdLineException { 195 if (this.beforeMaps) 196 { 197 HadoopToolsUtil.validateInput(this); 198 if (!noOutput()) 199 HadoopToolsUtil.validateOutput(this); 200 } 201 jsonPath = JsonPath.compile(getJsonPath()); 202 } 203 204 /** 205 * @return is there any actual output this phase 206 */ 207 public boolean noOutput() { 208 209 return (this.modeOptions == TwitterTokenModeOption.JUST_OUTPUT); 210 } 211 212 // /** 213 // * @return the delta between time windows in minutes 214 // */ 215 // public long getTimeDelta() { 216 // return this.timeDelta; 217 // } 218 219 /** 220 * @return the JSONPath query used to extract tokens 221 */ 222 public String getJsonPath() { 223 return this.tokensJSONPath; 224 } 225 226 /** 227 * @return the original arguments including the hadoop arguments 228 */ 229 public String[] getArgs() { 230 return this.originalArgs; 231 } 232 233 /** 234 * @return the arguments minus the hadoop arguments 235 */ 236 public String[] getNonHadoopArgs() { 237 // return this.args; 238 try { 239 return ArgsUtil.extractArguments(this); 240 } catch (final Exception e) { 241 e.printStackTrace(); 242 return new String[0]; 243 } 244 } 245 246 /** 247 * @param mode 248 * output a completed token mode 249 * @throws Exception 250 */ 251 public void output(TwitterTokenMode mode) throws Exception { 252 this.outputModeOptionsOp.write(this, mode); 253 } 254 255 /** 256 * If there were any preprocessing arguments, perform the preprocessing and 257 * use the preprocessing output as the input to the rest of the process. 258 * 259 * @throws Exception 260 */ 261 public void performPreprocessing() throws Exception { 262 if (noOutput()) 263 return; 264 if (this.preprocessingOptions == null) 265 return; 266 267 final String output = this.getOutput() + "/preprocessing"; 268 final boolean outExists = HadoopToolsUtil.fileExists(output); 269 if (!outExists || // if the file doesn't exist 270 SequenceFileUtility.getFilePaths(output, "part").length == 0 // or 271 // no 272 // part 273 // file 274 // was 275 // found 276 ) 277 { 278 // if the file exists, the part file was not found, remove the file! 279 if (outExists) { 280 HadoopToolsUtil.removeFile(output); 281 } 282 String inputPart = ""; 283 if (this.getInputFile() != null) { 284 inputPart = "-if " + this.getInputFile(); 285 } 286 else { 287 inputPart = "-i " + this.getInput(); 288 } 289 this.preprocessingOptions = inputPart + " -o " + output + " " + preprocessingOptions; 290 final String[] hadoopArgs = Arrays.copyOf(this.originalArgs, this.originalArgs.length - this.args.length); 291 if (this.isForce()) 292 this.preprocessingOptions += " -rm"; 293 String[] preprocessingArgs = this.preprocessingOptions.split(" "); 294 preprocessingArgs = (String[]) ArrayUtils.addAll(hadoopArgs, preprocessingArgs); 295 ToolRunner.run(new HadoopTwitterPreprocessingTool(), preprocessingArgs); 296 } 297 else { 298 System.out.println("Preprocessing exists, using..."); 299 } 300 this.setInput(output); 301 this.statusType = StatusType.USMF; 302 return; 303 304 } 305 306 public JsonPathFilterSet getFilters() { 307 if (this.filters == null) { 308 this.filters = new JsonPathFilterSet(jsonPathFilters); 309 } 310 return this.filters; 311 } 312 313 public USMFStatus readStatus(String svalue) throws IOException { 314 final USMFStatus status = IOUtils.read(new StringReader(svalue), new USMFStatus(GeneralJSONTwitter.class)); 315 // TwitterStatus status = TwitterStatus.fromString(svalue); 316 if (status.isInvalid()) 317 throw new IOException("Invalid tweet"); 318 return status; 319 } 320 321 /** 322 * Read json from text and try to extract the part to the type required 323 * 324 * @param <T> 325 * @param svalue 326 * @return a part of type T 327 * @throws IOException 328 */ 329 @SuppressWarnings("unchecked") 330 public <T> T readStatusPart(String svalue) throws IOException { 331 332 if (this.filters != null && !this.filters.filter(svalue)) 333 return null; 334 final Object tokens = this.jsonPath.read(svalue); 335 if (tokens == null) { 336 return null; 337 } 338 try { 339 return (T) tokens; 340 } catch (final Throwable e) { 341 throw new IOException("Couldn't cast to type"); 342 } 343 } 344 345 public StatusType getStatusType() { 346 return this.statusType; 347 } 348}