001/**
002 * Copyright (c) 2011, The University of Southampton and the individual contributors.
003 * All rights reserved.
004 *
005 * Redistribution and use in source and binary forms, with or without modification,
006 * are permitted provided that the following conditions are met:
007 *
008 *   *  Redistributions of source code must retain the above copyright notice,
009 *      this list of conditions and the following disclaimer.
010 *
011 *   *  Redistributions in binary form must reproduce the above copyright notice,
012 *      this list of conditions and the following disclaimer in the documentation
013 *      and/or other materials provided with the distribution.
014 *
015 *   *  Neither the name of the University of Southampton nor the names of its
016 *      contributors may be used to endorse or promote products derived from this
017 *      software without specific prior written permission.
018 *
019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */
030package org.openimaj.hadoop.tools.twitter.token.outputmode.jacard;
031
032import java.io.BufferedReader;
033import java.io.DataInput;
034import java.io.IOException;
035import java.io.InputStreamReader;
036import java.io.StringWriter;
037import java.util.ArrayList;
038import java.util.LinkedHashMap;
039import org.apache.hadoop.fs.FSDataInputStream;
040import org.apache.hadoop.fs.FileSystem;
041import org.apache.hadoop.fs.Path;
042import org.apache.hadoop.io.BooleanWritable;
043import org.apache.hadoop.io.BytesWritable;
044import org.apache.hadoop.io.LongWritable;
045import org.apache.hadoop.io.NullWritable;
046import org.apache.hadoop.io.Text;
047import org.apache.hadoop.mapreduce.Job;
048import org.apache.hadoop.mapreduce.Mapper;
049import org.apache.hadoop.mapreduce.Reducer;
050import org.openimaj.hadoop.mapreduce.MultiStagedJob;
051import org.openimaj.hadoop.mapreduce.stage.StageAppender;
052import org.openimaj.hadoop.mapreduce.stage.helper.SequenceFileStage;
053import org.openimaj.hadoop.mapreduce.stage.helper.SequenceFileTextStage;
054import org.openimaj.hadoop.tools.HadoopToolsUtil;
055import org.openimaj.hadoop.tools.twitter.utils.WordDFIDF;
056import org.openimaj.io.IOUtils;
057import org.openimaj.io.wrappers.ReadableListBinary;
058
059/**
 * Count word instances (not occurrences) across times. Allows for investigation of how
 * the vocabulary has changed over time.
062 * 
063 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
064 *
065 */
066public class CumulativeTimeWord extends StageAppender{
067        private long timeDelta;
068        
069        /**
070         * @param timeDelta the delta between times
071         * @param timeEldest the eldest time
072         */
073        public CumulativeTimeWord(long timeDelta, long timeEldest) {
074                this.timeDelta = timeDelta;
075                this.timeEldest = timeEldest;
076        }
077        private long timeEldest;
078        /**
079         * For every word occurrence, emit <word-time,false> for its time period, and <word-time,true> for every time period from
080         * timePeriod + delta until eldestTime. The final time period should be comparing itself to every word ever emitted.
081         * 
082         * @author Sina Samangooei (ss@ecs.soton.ac.uk)
083         */
084        public static class IntersectionUnionMap extends Mapper<Text,BytesWritable,BytesWritable,BooleanWritable>{
085                private long eldestTime;
086                private long deltaTime;
087                
088                public IntersectionUnionMap() { }
089                @Override
090                protected void setup(Mapper<Text,BytesWritable,BytesWritable,BooleanWritable>.Context context) throws IOException ,InterruptedException {
091                        this.eldestTime = context.getConfiguration().getLong(TIME_ELDEST, -1);
092                        this.deltaTime = context.getConfiguration().getLong(TIME_DELTA, -1);
093                        if(eldestTime < 0 || deltaTime < 0){
094                                throw new IOException("Couldn't read reasonable time configurations");
095                        }
096                };
097                @Override
098                protected void map(final Text word, BytesWritable value, final Mapper<Text,BytesWritable,BytesWritable,BooleanWritable>.Context context) throws java.io.IOException ,InterruptedException {
099                        IOUtils.deserialize(value.getBytes(), new ReadableListBinary<Object>(new ArrayList<Object>()){
100                                private BooleanWritable TRUE_WRITEABLE = new BooleanWritable(true);
101                                private BooleanWritable FALSE_WRITEABLE = new BooleanWritable(false);
102
103                                @Override
104                                protected Object readValue(DataInput in) throws IOException {
105                                        WordDFIDF idf = new WordDFIDF();
106                                        idf.readBinary(in);
107                                        try {
108                                                String currentword = word.toString();
109                                                ReadWritableStringLong timeWordPair = new ReadWritableStringLong(currentword, idf.timeperiod);
110                                                context.write(new BytesWritable(IOUtils.serialize(timeWordPair)),FALSE_WRITEABLE );
111                                                for (long futureTime = idf.timeperiod + deltaTime; futureTime <= eldestTime; futureTime+=deltaTime) {
112                                                        ReadWritableStringLong futurePair = new ReadWritableStringLong(currentword, futureTime);
113                                                        context.write(new BytesWritable(IOUtils.serialize(futurePair)),TRUE_WRITEABLE );
114                                                }
115                                        } catch (InterruptedException e) {
116                                                throw new IOException("");
117                                        }
118                                        return new Object();
119                                }
120                        });
121                };
122        }
123        
124        /**
125         * Recieve every word-time either from the current time period or from past time periods.
126         * Has this word appeared either in the past and now? intersection == 1
127         * Has this word appeared both in the past and now? union == 1
128         * 
129         * emit the time period with the length of the union set, the length of the intersection set and the ratio of these two (The Jacard Index)
130         * 
131         * @author Sina Samangooei (ss@ecs.soton.ac.uk)
132         *
133         */
134        public static class IntersectionUnionReduce extends Reducer<BytesWritable,BooleanWritable,LongWritable,BytesWritable>{
135                public IntersectionUnionReduce() {}
136                @Override
137                protected void reduce(BytesWritable wordtimeb, Iterable<BooleanWritable> wordBools, Reducer<BytesWritable,BooleanWritable,LongWritable,BytesWritable>.Context context) throws IOException ,InterruptedException {
138                        ReadWritableStringLong wordtime = IOUtils.deserialize(wordtimeb.getBytes(), ReadWritableStringLong.class);
139                        long time = wordtime.secondObject();
140                        boolean seenInPresent = false;
141                        boolean seenInPast = false;
142                        for (BooleanWritable isfrompast: wordBools) {
143                                boolean frompast = isfrompast.get();
144                                seenInPresent |= !frompast;
145                                seenInPast |= frompast;
146                                if(seenInPast && seenInPresent){
147                                        // then we've seen all the ones from this time if we were to see them, so we can break early. MASSIVE SAVINGS HERE
148                                        break;
149                                }
150                        }
151                        ReadWritableBooleanBoolean intersectionUnion = new ReadWritableBooleanBoolean(seenInPast && seenInPresent,seenInPast || seenInPresent);
152                        context.write(new LongWritable(time), new BytesWritable(IOUtils.serialize(intersectionUnion)));
153                };
154        }
155        
156        /**
157         * 
158         * 
159         * @author Sina Samangooei (ss@ecs.soton.ac.uk)
160         *
161         */
162        public static class JacardReduce extends Reducer<LongWritable,BytesWritable,NullWritable,Text>{
163                public JacardReduce () {}
164                @Override
165                protected void reduce(LongWritable time, Iterable<BytesWritable> inersectionUnionBs, Reducer<LongWritable,BytesWritable,NullWritable,Text>.Context context) throws IOException ,InterruptedException {
166                        long intersection = 0;
167                        long union = 0;
168                        for (BytesWritable intersectionUnionb : inersectionUnionBs) {                           
169                                ReadWritableBooleanBoolean intersectionUnion = IOUtils.deserialize(intersectionUnionb.getBytes(), ReadWritableBooleanBoolean.class);
170                                intersection += intersectionUnion.firstObject() ? 1 : 0;
171                                union += intersectionUnion.secondObject() ? 1 : 0;
172                        }
173                        JacardIndex jind = new JacardIndex(time.get(),intersection,union);
174                        StringWriter writer = new StringWriter();
175                        IOUtils.writeASCII(writer, jind);
176                        context.write(NullWritable.get(), new Text(writer.toString()));
177                };
178        }
179        
180        protected static final String TIME_DELTA = "org.openimaj.hadoop.tools.twitter.token.time_delta";
181        protected static final String TIME_ELDEST = "org.openimaj.hadoop.tools.twitter.token.time_eldest";
182        @Override
183        public void stage(MultiStagedJob stages) {
184                SequenceFileStage<Text, BytesWritable, BytesWritable, BooleanWritable, LongWritable, BytesWritable> intersectionunion = 
185                new SequenceFileStage<Text, BytesWritable, BytesWritable, BooleanWritable, LongWritable, BytesWritable>() {
186                        @Override
187                        public void setup(Job job) {
188                                job.getConfiguration().setLong(CumulativeTimeWord.TIME_DELTA, timeDelta);
189                                job.getConfiguration().setLong(CumulativeTimeWord.TIME_ELDEST, timeEldest);
190                                job.setNumReduceTasks((int) (1.75 * 6 * 8));
191                        }
192                        @Override
193                        public java.lang.Class<? extends org.apache.hadoop.mapreduce.Mapper<Text,BytesWritable,BytesWritable,BooleanWritable>> mapper() {
194                                return CumulativeTimeWord.IntersectionUnionMap.class;
195                        };
196                        @Override
197                        public Class<? extends Reducer<BytesWritable, BooleanWritable, LongWritable, BytesWritable>> reducer() {
198                                return CumulativeTimeWord.IntersectionUnionReduce.class;
199                        }
200                
201                        @Override
202                        public String outname() {
203                                return "intersectionunion";
204                        }
205                };
206                stages.queueStage(intersectionunion);
207                SequenceFileTextStage<LongWritable, BytesWritable, LongWritable, BytesWritable, NullWritable, Text> s =
208                new SequenceFileTextStage<LongWritable, BytesWritable, LongWritable, BytesWritable, NullWritable, Text>() {
209                        @Override
210                        public void setup(Job job) {
211                                job.setNumReduceTasks((int) (1.75 * 6 * 8));
212                        }
213                        
214                        @Override
215                        public Class<? extends Reducer<LongWritable, BytesWritable, NullWritable, Text>> reducer() {
216                                return CumulativeTimeWord.JacardReduce.class;
217                        }
218                        
219                        @Override
220                        public String outname() {
221                                return "jacardindex";
222                        }
223                };
224                stages.queueStage(s);
225        }
226        
227        /**
228         * from a report output path get the words
229         * @param path report output path
230         * @return map of time to an a pair containing <count, JacardIndex> 
231         * @throws IOException 
232         */
233        public static LinkedHashMap<Long, JacardIndex> readTimeCountLines(String path) throws IOException {
234                String wordPath = path + "/jacardindex";
235                Path p = HadoopToolsUtil.getInputPaths(wordPath)[0];
236                FileSystem fs = HadoopToolsUtil.getFileSystem(p);
237                FSDataInputStream toRead = fs.open(p);
238                BufferedReader reader = new BufferedReader(new InputStreamReader(toRead));
239                LinkedHashMap<Long, JacardIndex> toRet = new LinkedHashMap<Long, JacardIndex>();
240                String next = null;
241                while((next = reader.readLine())!=null){
242                        JacardIndex jindex = JacardIndex.fromString(next);
243                        toRet.put(jindex.time, jindex);
244                }
245                return toRet;
246        }
247
248}