001/** 002 * Copyright (c) 2011, The University of Southampton and the individual contributors. 003 * All rights reserved. 004 * 005 * Redistribution and use in source and binary forms, with or without modification, 006 * are permitted provided that the following conditions are met: 007 * 008 * * Redistributions of source code must retain the above copyright notice, 009 * this list of conditions and the following disclaimer. 010 * 011 * * Redistributions in binary form must reproduce the above copyright notice, 012 * this list of conditions and the following disclaimer in the documentation 013 * and/or other materials provided with the distribution. 014 * 015 * * Neither the name of the University of Southampton nor the names of its 016 * contributors may be used to endorse or promote products derived from this 017 * software without specific prior written permission. 018 * 019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND 020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR 023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON 026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 029 */ 030package org.openimaj.hadoop.tools.twitter; 031 032import java.io.IOException; 033 034import org.apache.hadoop.fs.Path; 035import org.apache.hadoop.io.LongWritable; 036import org.apache.hadoop.io.NullWritable; 037import org.apache.hadoop.io.Text; 038import org.apache.hadoop.mapreduce.Job; 039import org.apache.hadoop.mapreduce.Mapper; 040import org.kohsuke.args4j.CmdLineException; 041import org.openimaj.hadoop.mapreduce.StageRunner; 042import org.openimaj.hadoop.mapreduce.stage.Stage; 043import org.openimaj.hadoop.mapreduce.stage.helper.TextStage; 044import org.openimaj.hadoop.tools.HadoopToolsUtil; 045 046 047 048/** 049 * A hadoop implementation of twitter preprocessing 050 * 051 * @author Sina Samangooei (ss@ecs.soton.ac.uk) 052 * 053 */ 054public class HadoopTwitterPreprocessingTool extends StageRunner { 055 String[] args; 056 private HadoopTwitterPreprocessingToolOptions options; 057 /** 058 * where arguments are held 059 */ 060 public static final String ARGS_KEY = "twitter.preprocessing.args"; 061 062 063 @Override 064 public Stage<?, ?, ?, ?, ?, ?, ?, ?> stage() { 065 066 return new TextStage(){ 067 @Override 068 public Class<? extends Mapper<LongWritable, Text, NullWritable, Text>> mapper() { 069 return SimpleTwitterPreprocessingMapper.class; 070 } 071 072 @Override 073 public void setup(Job job) { 074 if(options.reducerMode == ReducerModeOption.NULL){ 075 job.setNumReduceTasks(0); 076 } 077 else if(options.reducerMode == ReducerModeOption.IDENTITY){ 078 job.setNumReduceTasks(1); 079 } 080 job.getConfiguration().setStrings(HadoopTwitterPreprocessingTool.ARGS_KEY, HadoopToolsUtil.encodeArgs(args)); 081 } 082 083 @Override 084 public boolean lzoCompress() { 085 return options.lzoCompress; 086 } 087 }; 088 } 089 090 @Override 091 public Path output() { 092 return options.getOutputPath(); 093 } 094 095 @Override 096 public Path[] inputs() throws IOException { 097 return options.getInputPaths(); 098 } 099 100 @Override 101 public void args(String[] args) throws CmdLineException, IOException { 102 this.options = new HadoopTwitterPreprocessingToolOptions(args,true); 103 options.prepare(); 104 this.args = args; 105 } 106 107 @Override 108 public boolean shouldWait() { 109 return !options.returnImmediately; 110 } 111 112 /** 113 * run the tool 114 * @param args 115 * @throws Exception 116 */ 117 public static void main(String[] args) throws Exception { 118 try { 119 new HadoopTwitterPreprocessingTool().runMain(args); 120 } catch (CmdLineException e) { 121 System.err.print(e); 122 } 123 } 124}