/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   *	Redistributions of source code must retain the above copyright notice,
 * 	this list of conditions and the following disclaimer.
 *
 *   *	Redistributions in binary form must reproduce the above copyright notice,
 * 	this list of conditions and the following disclaimer in the documentation
 * 	and/or other materials provided with the distribution.
 *
 *   *	Neither the name of the University of Southampton nor the names of its
 * 	contributors may be used to endorse or promote products derived from this
 * 	software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
package org.openimaj.hadoop.tools.twitter;

import java.util.HashMap;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.openimaj.hadoop.mapreduce.TextBytesJobUtil;
import org.openimaj.hadoop.sequencefile.TextBytesSequenceFileUtility;
import org.openimaj.hadoop.tools.HadoopToolsUtil;

/**
 * Counters incremented by the mapper and read back in the reducer.
 */
enum CounterEnum {
	CHEESE, FLEES;
}

/**
 * A small test {@link Tool} that runs an identity map/reduce over a
 * {@link Text}/{@link BytesWritable} sequence file, exercising the Hadoop
 * counter API along the way.
 */
public class HadoopCounterTest extends Configured implements Tool {

	/**
	 * Identity mapper that also increments the two test counters for every
	 * record it sees.
	 */
	public static class CounterMapper extends Mapper<Text, BytesWritable, Text, BytesWritable> {
		@Override
		protected void map(Text key, BytesWritable value, Mapper<Text, BytesWritable, Text, BytesWritable>.Context context)
				throws java.io.IOException, InterruptedException
		{
			context.getCounter(CounterEnum.CHEESE).increment(10);
			context.getCounter(CounterEnum.FLEES).increment(20);
			context.write(key, value);
		}
	}

	/**
	 * Reducer that reads the counters back and prints their values. The
	 * values printed here are those visible to the reduce task, not
	 * necessarily the final aggregated totals reported by the job.
	 */
	public static class CounterReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {
		@Override
		protected void reduce(Text key, Iterable<BytesWritable> values, Context context) {
			Counter cheeseCounter = context.getCounter(CounterEnum.CHEESE);
			Counter fleesCounter = context.getCounter(CounterEnum.FLEES);
			System.out.println(cheeseCounter.getName() + ": " + cheeseCounter.getValue());
			System.out.println(fleesCounter.getName() + ": " + fleesCounter.getValue());
		}
	}

	@Override
	public int run(String[] args) throws Exception {
		Path[] paths = new Path[] { new Path(args[0]) };
		// Opening the input here acts as a sanity check that args[0] is a
		// readable sequence file; the utility is not otherwise used.
		TextBytesSequenceFileUtility util = new TextBytesSequenceFileUtility(paths[0].toUri(), true);
		// Clear any previous output so the job can write to args[1]
		HadoopToolsUtil.removeFile(args[1]);

		Job job = TextBytesJobUtil.createJob(paths, new Path(args[1]), new HashMap<String, String>(), this.getConf());
		job.setJarByClass(this.getClass());
		job.setMapperClass(CounterMapper.class);
		job.setReducerClass(CounterReducer.class);

		SequenceFileOutputFormat.setCompressOutput(job, false);

		long start, end;
		start = System.currentTimeMillis();
		job.waitForCompletion(true);
		end = System.currentTimeMillis();
		System.out.println("Took: " + (end - start) + "ms");
		return 0;
	}

	public static void main(String[] args) throws Exception {
		ToolRunner.run(new HadoopCounterTest(), args);
	}
}
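// A minimal sketch of how this tool might be launched; the jar name and the
// HDFS paths below are hypothetical and only illustrate the expected
// arguments (args[0] = input sequence file, args[1] = output path):
//
//   hadoop jar openimaj-hadoop-tools.jar \
//       org.openimaj.hadoop.tools.twitter.HadoopCounterTest \
//       hdfs:///input/tweets.seq hdfs:///output/counter-test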