/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *
 *   * Redistributions in binary form must reproduce the above copyright notice,
 *     this list of conditions and the following disclaimer in the documentation
 *     and/or other materials provided with the distribution.
 *
 *   * Neither the name of the University of Southampton nor the names of its
 *     contributors may be used to endorse or promote products derived from this
 *     software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
package org.openimaj.hadoop.tools.twitter.token.mode.pointwisemi.count;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.openimaj.hadoop.mapreduce.stage.helper.TextByteByteStage;
import org.openimaj.hadoop.tools.HadoopToolsUtil;
import org.openimaj.hadoop.tools.twitter.HadoopTwitterTokenToolOptions;
import org.openimaj.io.IOUtils;

/**
 * A Hadoop stage which counts token pair occurrences in each time period; the
 * counting step of a pointwise mutual information (PMI) calculation.
 *
 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
 *
 */
public class PairMutualInformation extends TextByteByteStage {

    /**
     * The time delta between time periods
     */
    public static final String TIMEDELTA = "org.openimaj.hadoop.tools.twitter.token.mode.pairwisemi.timedelta";
    /**
     * The location of the statistics file
     */
    public static final String PAIR_STATS_FILE = "pairstats";
    /**
     * The pairMI output directory
     */
    public static final String PAIRMI_DIR = "pairmi";
    /**
     * The root directory where time-period pair counts will be stored
     */
    public static final String TIMEPERIOD_COUNT_OUTPUT_ROOT = "org.openimaj.hadoop.tools.twitter.token.mode.pairwisemi.timeoutloc";
    /**
     * Name of the time-period count directory
     */
    public static final String TIMEPERIOD_OUTPUT_NAME = "timeperiod_counts";

    private String[] nonHadoopArgs;
    private long timedelta;
    private Path actualOutputLocation;

    /**
     * @param nonHadoopArgs
     *            the arguments for configuration
     * @param timedelta
     *            the time delta between time periods; -1 if all input should
     *            be treated as a single time period
     */
    public PairMutualInformation(String[] nonHadoopArgs, long timedelta) {
        this.nonHadoopArgs = nonHadoopArgs;
        this.timedelta = timedelta;
    }

    @Override
    public void setup(Job job) throws IOException {
        job.getConfiguration().setStrings(HadoopTwitterTokenToolOptions.ARGS_KEY, nonHadoopArgs);
        job.getConfiguration().setLong(TIMEDELTA, timedelta);
        final Path tpcOutRoot = new Path(this.actualOutputLocation, TIMEPERIOD_OUTPUT_NAME);
        job.getConfiguration().set(TIMEPERIOD_COUNT_OUTPUT_ROOT, tpcOutRoot.toString());
        if (timedelta != -1) {
            // if there are multiple time periods, split the output into
            // roughly one file per day
            job.setNumReduceTasks(365);
        }

        ((JobConf) job.getConfiguration()).setOutputValueGroupingComparator(TokenPairValueGroupingComparator.class);
        ((JobConf) job.getConfiguration()).setOutputKeyComparatorClass(TokenPairKeyComparator.class);
        job.setPartitionerClass(TokenPairPartitioner.class);
    }
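
    /*
     * A descriptive note on the wiring above, in terms of standard Hadoop
     * mechanics (the exact ordering imposed is defined in the TokenPair*
     * sources, not here): the partitioner chooses which reducer receives a
     * key, the output key comparator fixes the order in which keys reach that
     * reducer, and the value grouping comparator decides which consecutive
     * keys share a single reduce() call. Configuring all three together is
     * Hadoop's usual "secondary sort" pattern.
     */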

    @Override
    public Class<PairEmit> mapper() {
        return PairEmit.class;
    }

    @Override
    public Class<? extends Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>> combiner() {
        return PairEmitCombiner.class;
    }

    @Override
    public Job stage(Path[] inputs, Path output, Configuration conf) throws Exception {
        // remember the output location; setup(Job) and finished(Job) need it
        this.actualOutputLocation = output;
        return super.stage(inputs, output, conf);
    }

    @Override
    public Class<? extends Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>> reducer() {
        return PairEmitCounter.class;
    }

    @Override
    public String outname() {
        return PAIRMI_DIR;
    }

    @Override
    public void finished(Job job) {
        final Path out = new Path(actualOutputLocation, PAIR_STATS_FILE);
        try {
            final FileSystem fs = HadoopToolsUtil.getFileSystem(out);
            final FSDataOutputStream os = fs.create(out);
            try {
                IOUtils.writeASCII(os, new WritablePairEnum(job.getCounters(), PairEnum.values()));
            } finally {
                os.close();
            }
        } catch (final IOException e) {
            // don't swallow the failure silently; loadStats depends on this
            // file existing
            System.err.println("Failed to write pair statistics file to " + out + ": " + e);
        }
    }

    /**
     * Load the pair statistics file from an output location (Path: outpath/
     * {@link PairMutualInformation#PAIRMI_DIR}/
     * {@link PairMutualInformation#PAIR_STATS_FILE})
     *
     * @param outpath
     *            the output location of a {@link PairMutualInformation} stage
     * @return a WritablePairEnum instance with the counter values filled
     * @throws IOException
     */
    public static WritablePairEnum loadStats(Path outpath) throws IOException {
        Path pmistats = new Path(outpath, PairMutualInformation.PAIRMI_DIR);
        pmistats = new Path(pmistats, PairMutualInformation.PAIR_STATS_FILE);
        final FileSystem fs = HadoopToolsUtil.getFileSystem(pmistats);
        final FSDataInputStream inp = fs.open(pmistats);
        try {
            return IOUtils.read(inp, WritablePairEnum.class);
        } finally {
            inp.close();
        }
    }
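
    /*
     * A minimal usage sketch for loadStats, assuming the stage has already
     * run with "base" as its output root (the path, the getValue accessor and
     * the PairEnum.PAIR counter name are illustrative assumptions, not part
     * of this class):
     *
     *   WritablePairEnum stats = PairMutualInformation.loadStats(new Path("base"));
     *   long totalPairs = stats.getValue(PairEnum.PAIR); // hypothetical accessor and counter
     */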

    /**
     * Load the total pairs seen in every time period from the pairmi location
     * provided
     *
     * @param pairmiloc
     *            a directory which contains {@link #PAIRMI_DIR}/
     *            {@link #TIMEPERIOD_OUTPUT_NAME}
     * @return map of a time period to a count
     * @throws IOException
     */
    public static Map<Long, Long> loadTimeCounts(Path pairmiloc) throws IOException {
        final Path dir = new Path(new Path(pairmiloc, PAIRMI_DIR), TIMEPERIOD_OUTPUT_NAME);
        final FileSystem fs = HadoopToolsUtil.getFileSystem(dir);
        final FileStatus[] timePaths = fs.listStatus(dir);

        final Map<Long, Long> out = new HashMap<Long, Long>();
        for (final FileStatus fileStatus : timePaths) {
            // each file is named after its time period and holds that
            // period's total pair count on its first line
            final Path fsp = fileStatus.getPath();
            final long time = Long.parseLong(fsp.getName());
            final BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(fsp)));
            try {
                final long count = Long.parseLong(reader.readLine());
                out.put(time, count);
            } finally {
                reader.close(); // previously leaked; close each period's file
            }
        }
        return out;
    }

}
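
/*
 * Reading the per-period totals back (a sketch; "base" is illustrative and
 * must be the directory passed to the stage as its output root):
 *
 *   Map<Long, Long> counts = PairMutualInformation.loadTimeCounts(new Path("base"));
 *   for (Map.Entry<Long, Long> e : counts.entrySet())
 *       System.out.println("period " + e.getKey() + " -> " + e.getValue() + " pairs");
 */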