/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   * 	Redistributions of source code must retain the above copyright notice,
 * 	this list of conditions and the following disclaimer.
 *
 *   *	Redistributions in binary form must reproduce the above copyright notice,
 * 	this list of conditions and the following disclaimer in the documentation
 * 	and/or other materials provided with the distribution.
 *
 *   *	Neither the name of the University of Southampton nor the names of its
 * 	contributors may be used to endorse or promote products derived from this
 * 	software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
package org.openimaj.hadoop.tools.twitter.token.outputmode.jacard;

import java.io.BufferedReader;
import java.io.DataInput;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.LinkedHashMap;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.openimaj.hadoop.mapreduce.MultiStagedJob;
import org.openimaj.hadoop.mapreduce.stage.StageAppender;
import org.openimaj.hadoop.mapreduce.stage.helper.SequenceFileStage;
import org.openimaj.hadoop.mapreduce.stage.helper.SequenceFileTextStage;
import org.openimaj.hadoop.tools.HadoopToolsUtil;
import org.openimaj.hadoop.tools.twitter.utils.WordDFIDF;
import org.openimaj.io.IOUtils;
import org.openimaj.io.wrappers.ReadableListBinary;

/**
 * Count word instances (not occurrences) across times. Allows for investigation of how
 * the vocabulary has changed over time.
 * <p>
 * Two stages are queued by {@link #stage(MultiStagedJob)}: the first flags, for every
 * (word, time) pair, whether the word was seen in that period and/or in an earlier one;
 * the second sums those flags per time period into a {@link JacardIndex}.
 *
 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
 *
 */
public class CumulativeTimeWord extends StageAppender {
	/** Configuration key under which the delta between time periods is stored. */
	protected static final String TIME_DELTA = "org.openimaj.hadoop.tools.twitter.token.time_delta";
	/** Configuration key under which the last time period to emit for is stored. */
	protected static final String TIME_ELDEST = "org.openimaj.hadoop.tools.twitter.token.time_eldest";

	/**
	 * Number of reduce tasks requested for both stages (evaluates to 84).
	 * NOTE(review): presumably 1.75 x nodes x reduce slots per node, a common Hadoop
	 * rule of thumb -- confirm against the target cluster.
	 */
	private static final int NUM_REDUCE_TASKS = (int) (1.75 * 6 * 8);

	private final long timeDelta;
	private final long timeEldest;

	/**
	 * @param timeDelta the delta between times
	 * @param timeEldest the eldest time
	 */
	public CumulativeTimeWord(long timeDelta, long timeEldest) {
		this.timeDelta = timeDelta;
		this.timeEldest = timeEldest;
	}

	/**
	 * For every word occurrence, emit {@code <word-time, false>} for its own time period, and
	 * {@code <word-time, true>} for every time period from {@code timePeriod + delta} up to and
	 * including {@code eldestTime}. The final time period is therefore compared against every
	 * word ever emitted.
	 *
	 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
	 */
	public static class IntersectionUnionMap extends Mapper<Text, BytesWritable, BytesWritable, BooleanWritable> {
		// Shared flag instances; they are only ever read, so one of each per mapper is enough.
		private final BooleanWritable fromPast = new BooleanWritable(true);
		private final BooleanWritable fromPresent = new BooleanWritable(false);

		private long eldestTime;
		private long deltaTime;

		/**
		 * Default constructor (the mapper is instantiated by Hadoop).
		 */
		public IntersectionUnionMap() {
		}

		/**
		 * Reads {@link CumulativeTimeWord#TIME_ELDEST} and {@link CumulativeTimeWord#TIME_DELTA}
		 * from the job configuration.
		 *
		 * @throws IOException if either value is missing/negative, or if the delta is zero
		 */
		@Override
		protected void setup(Mapper<Text, BytesWritable, BytesWritable, BooleanWritable>.Context context)
				throws IOException, InterruptedException
		{
			this.eldestTime = context.getConfiguration().getLong(TIME_ELDEST, -1);
			this.deltaTime = context.getConfiguration().getLong(TIME_DELTA, -1);
			// A zero delta must be rejected as well: the loop in map() advances by deltaTime
			// and would otherwise never terminate.
			if (eldestTime < 0 || deltaTime <= 0) {
				throw new IOException("Couldn't read reasonable time configurations");
			}
		}

		/**
		 * Deserializes the list of {@link WordDFIDF} entries held in {@code value} and, for each
		 * entry, emits the word's own period flagged {@code false} and every later period
		 * flagged {@code true}.
		 *
		 * @param word the word the entries belong to
		 * @param value serialized list of {@link WordDFIDF} entries
		 * @param context the context to write (word, time) pairs to
		 */
		@Override
		protected void map(final Text word, BytesWritable value,
				final Mapper<Text, BytesWritable, BytesWritable, BooleanWritable>.Context context)
				throws IOException, InterruptedException
		{
			// NOTE(review): BytesWritable.getBytes() returns the backing array, which can be longer
			// than getLength(); this relies on the reader ignoring trailing bytes -- confirm.
			IOUtils.deserialize(value.getBytes(), new ReadableListBinary<Object>(new ArrayList<Object>()) {
				@Override
				protected Object readValue(DataInput in) throws IOException {
					final WordDFIDF idf = new WordDFIDF();
					idf.readBinary(in);

					final String currentword = word.toString();
					try {
						// The word as seen in its own time period.
						final ReadWritableStringLong timeWordPair = new ReadWritableStringLong(currentword, idf.timeperiod);
						context.write(new BytesWritable(IOUtils.serialize(timeWordPair)), fromPresent);

						// The same word, marked as "from the past", for every subsequent period.
						for (long futureTime = idf.timeperiod + deltaTime; futureTime <= eldestTime; futureTime += deltaTime) {
							final ReadWritableStringLong futurePair = new ReadWritableStringLong(currentword, futureTime);
							context.write(new BytesWritable(IOUtils.serialize(futurePair)), fromPast);
						}
					} catch (InterruptedException e) {
						// readValue may only throw IOException: keep the interrupt status and the cause.
						Thread.currentThread().interrupt();
						throw new IOException("Interrupted while emitting word-time pairs for '" + currentword + "'", e);
					}
					// The list contents are never used; a placeholder satisfies the reader.
					return new Object();
				}
			});
		}
	}

	/**
	 * Receive every word-time, either from the current time period (flag {@code false}) or from
	 * past time periods (flag {@code true}).
	 * <ul>
	 * <li>Has this word appeared both in the past and now? intersection == true</li>
	 * <li>Has this word appeared either in the past or now? union == true</li>
	 * </ul>
	 * Emits the time period together with the (intersection, union) flag pair; the per-period
	 * totals are accumulated later by {@link JacardReduce}.
	 *
	 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
	 *
	 */
	public static class IntersectionUnionReduce extends Reducer<BytesWritable, BooleanWritable, LongWritable, BytesWritable> {
		/**
		 * Default constructor (the reducer is instantiated by Hadoop).
		 */
		public IntersectionUnionReduce() {
		}

		/**
		 * @param wordtimeb serialized {@link ReadWritableStringLong} holding the word and time period
		 * @param wordBools one flag per emission: {@code true} if it came from an earlier period
		 * @param context receives the time period and the serialized (intersection, union) pair
		 */
		@Override
		protected void reduce(BytesWritable wordtimeb, Iterable<BooleanWritable> wordBools,
				Reducer<BytesWritable, BooleanWritable, LongWritable, BytesWritable>.Context context)
				throws IOException, InterruptedException
		{
			// NOTE(review): see the remark on BytesWritable.getBytes() padding in the mapper.
			final ReadWritableStringLong wordtime = IOUtils.deserialize(wordtimeb.getBytes(), ReadWritableStringLong.class);
			final long time = wordtime.secondObject();

			boolean seenInPresent = false;
			boolean seenInPast = false;
			for (BooleanWritable isfrompast : wordBools) {
				if (isfrompast.get()) {
					seenInPast = true;
				} else {
					seenInPresent = true;
				}
				if (seenInPast && seenInPresent) {
					// Both flags are set and cannot change any more, so the remaining
					// values need not be read. MASSIVE SAVINGS HERE
					break;
				}
			}

			final ReadWritableBooleanBoolean intersectionUnion = new ReadWritableBooleanBoolean(
					seenInPast && seenInPresent, // intersection
					seenInPast || seenInPresent // union
			);
			context.write(new LongWritable(time), new BytesWritable(IOUtils.serialize(intersectionUnion)));
		}
	}

	/**
	 * For each time period, count how many words had the intersection flag set and how many had
	 * the union flag set, and write the resulting {@link JacardIndex} as a line of text.
	 *
	 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
	 *
	 */
	public static class JacardReduce extends Reducer<LongWritable, BytesWritable, NullWritable, Text> {
		/**
		 * Default constructor (the reducer is instantiated by Hadoop).
		 */
		public JacardReduce() {
		}

		/**
		 * @param time the time period
		 * @param inersectionUnionBs serialized {@link ReadWritableBooleanBoolean} (intersection, union)
		 *            pairs, one per word
		 * @param context receives the ASCII form of the {@link JacardIndex} for this period
		 */
		@Override
		protected void reduce(LongWritable time, Iterable<BytesWritable> inersectionUnionBs,
				Reducer<LongWritable, BytesWritable, NullWritable, Text>.Context context)
				throws IOException, InterruptedException
		{
			long intersection = 0;
			long union = 0;
			for (BytesWritable intersectionUnionb : inersectionUnionBs) {
				final ReadWritableBooleanBoolean intersectionUnion = IOUtils.deserialize(intersectionUnionb.getBytes(), ReadWritableBooleanBoolean.class);
				if (intersectionUnion.firstObject()) {
					intersection++;
				}
				if (intersectionUnion.secondObject()) {
					union++;
				}
			}

			final JacardIndex jind = new JacardIndex(time.get(), intersection, union);
			final StringWriter writer = new StringWriter();
			IOUtils.writeASCII(writer, jind);
			context.write(NullWritable.get(), new Text(writer.toString()));
		}
	}

	/**
	 * Queue the two stages: "intersectionunion" ({@link IntersectionUnionMap} followed by
	 * {@link IntersectionUnionReduce}) and then "jacardindex" ({@link JacardReduce}).
	 *
	 * @param stages the job to append the stages to
	 */
	@Override
	public void stage(MultiStagedJob stages) {
		final SequenceFileStage<Text, BytesWritable, BytesWritable, BooleanWritable, LongWritable, BytesWritable> intersectionunion =
				new SequenceFileStage<Text, BytesWritable, BytesWritable, BooleanWritable, LongWritable, BytesWritable>() {
					@Override
					public void setup(Job job) {
						// Hand the time settings to the mapper through the job configuration.
						job.getConfiguration().setLong(CumulativeTimeWord.TIME_DELTA, timeDelta);
						job.getConfiguration().setLong(CumulativeTimeWord.TIME_ELDEST, timeEldest);
						job.setNumReduceTasks(NUM_REDUCE_TASKS);
					}

					@Override
					public Class<? extends Mapper<Text, BytesWritable, BytesWritable, BooleanWritable>> mapper() {
						return CumulativeTimeWord.IntersectionUnionMap.class;
					}

					@Override
					public Class<? extends Reducer<BytesWritable, BooleanWritable, LongWritable, BytesWritable>> reducer() {
						return CumulativeTimeWord.IntersectionUnionReduce.class;
					}

					@Override
					public String outname() {
						return "intersectionunion";
					}
				};
		stages.queueStage(intersectionunion);

		final SequenceFileTextStage<LongWritable, BytesWritable, LongWritable, BytesWritable, NullWritable, Text> s =
				new SequenceFileTextStage<LongWritable, BytesWritable, LongWritable, BytesWritable, NullWritable, Text>() {
					@Override
					public void setup(Job job) {
						job.setNumReduceTasks(NUM_REDUCE_TASKS);
					}

					@Override
					public Class<? extends Reducer<LongWritable, BytesWritable, NullWritable, Text>> reducer() {
						return CumulativeTimeWord.JacardReduce.class;
					}

					@Override
					public String outname() {
						return "jacardindex";
					}
				};
		stages.queueStage(s);
	}

	/**
	 * From a report output path, read the {@link JacardIndex} lines written by the
	 * "jacardindex" stage.
	 *
	 * @param path report output path
	 * @return map of time to the {@link JacardIndex} read for that time, in the order read
	 * @throws IOException if the output cannot be opened or read
	 */
	public static LinkedHashMap<Long, JacardIndex> readTimeCountLines(String path) throws IOException {
		final String wordPath = path + "/jacardindex";
		// NOTE(review): only the first path is read, although the stage asks for several reduce
		// tasks -- confirm whether the remaining paths should be read too.
		final Path p = HadoopToolsUtil.getInputPaths(wordPath)[0];
		final FileSystem fs = HadoopToolsUtil.getFileSystem(p);

		final LinkedHashMap<Long, JacardIndex> toRet = new LinkedHashMap<Long, JacardIndex>();
		final FSDataInputStream toRead = fs.open(p);
		try {
			// Decode explicitly as UTF-8 rather than relying on the platform default charset.
			final BufferedReader reader = new BufferedReader(new InputStreamReader(toRead, "UTF-8"));
			String next;
			while ((next = reader.readLine()) != null) {
				final JacardIndex jindex = JacardIndex.fromString(next);
				toRet.put(jindex.time, jindex);
			}
		} finally {
			// Always release the underlying stream, even if parsing fails part-way.
			toRead.close();
		}
		return toRet;
	}

}