001/**
002 * Copyright (c) 2011, The University of Southampton and the individual contributors.
003 * All rights reserved.
004 *
005 * Redistribution and use in source and binary forms, with or without modification,
006 * are permitted provided that the following conditions are met:
007 *
008 *   *  Redistributions of source code must retain the above copyright notice,
009 *      this list of conditions and the following disclaimer.
010 *
011 *   *  Redistributions in binary form must reproduce the above copyright notice,
012 *      this list of conditions and the following disclaimer in the documentation
013 *      and/or other materials provided with the distribution.
014 *
015 *   *  Neither the name of the University of Southampton nor the names of its
016 *      contributors may be used to endorse or promote products derived from this
017 *      software without specific prior written permission.
018 *
019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */
030package org.openimaj.hadoop.tools.twitter.token.mode.pointwisemi.count;
031
032import java.io.IOException;
033import java.io.PrintWriter;
034import java.util.HashMap;
035import java.util.Map;
036
037import org.apache.hadoop.fs.FileSystem;
038import org.apache.hadoop.fs.Path;
039import org.apache.hadoop.io.BytesWritable;
040import org.apache.hadoop.mapreduce.Reducer;
041import org.apache.log4j.Logger;
042import org.openimaj.hadoop.tools.HadoopToolsUtil;
043import org.openimaj.io.IOUtils;
044
/**
 * The input to this reducer is ordered firstly by unary/pairs and then, within each of those sets, by word.
 * Given a particular time period, first read all unary counts, combining the counts for each word.
 * Then, for all pairs, combine the instances of a given pair and emit the combined count once a new pair
 * is seen or the end of the input is reached.
 *
 * Once the first non-unary token is found, pair counting begins.
 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
 *
 */
054public class PairEmitCounter extends Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable> {
055        Map<String,Long> unaryCounts = null;
056        private Path timeperiodCountOutputRoot;
057        Logger logger = Logger.getLogger(PairEmitCounter.class);
058        
059        @Override
060        protected void setup(Reducer<BytesWritable,BytesWritable,BytesWritable,BytesWritable>.Context context) throws IOException ,InterruptedException {
061                this.timeperiodCountOutputRoot = new Path(context.getConfiguration().get(PairMutualInformation.TIMEPERIOD_COUNT_OUTPUT_ROOT));
062                if(!HadoopToolsUtil.getFileSystem(this.timeperiodCountOutputRoot ).mkdirs(this.timeperiodCountOutputRoot)) throw new IOException("Couldn't create: " + this.timeperiodCountOutputRoot);
063        };
064        
065        public PairEmitCounter() {
066                this.unaryCounts = new HashMap<String,Long>();
067        }
068        @Override
069        protected void reduce(BytesWritable timeB, Iterable<BytesWritable> paircounts, Reducer<BytesWritable,BytesWritable,BytesWritable,BytesWritable>.Context context) throws IOException ,InterruptedException {
070                long time = TokenPairCount.timeFromBinaryIdentity(timeB.getBytes());
071                // Prepare the timeperiod output
072                Path timeperiodCountOutput = new Path(timeperiodCountOutputRoot,""+time);
073                
074                long pairsCount = 0;
075                // Start with unary count
076                TokenPairCollector collector = new TokenPairCollector();
077                for (BytesWritable bytesWritable : paircounts) {
078                        TokenPairCount newcount = IOUtils.deserialize(bytesWritable.getBytes(), TokenPairCount.class);
079                        TokenPairCount count = collector.add(newcount);
080                        if(count!=null){
081                                pairsCount += count.paircount;
082                                // this is the combined counts for this unary word in this time period
083                                addUnaryWordCount(count);
084                                // Now check if the current word is a pair, if so next part!
085                                if(collector.isCurrentPair()){
086                                        break;
087                                }
088                        }
089                }
090                
091                writeTimeperiodCount(timeperiodCountOutput,pairsCount);
092                
093                for (BytesWritable bytesWritable : paircounts) {
094                        TokenPairCount newcount = IOUtils.deserialize(bytesWritable.getBytes(), TokenPairCount.class);
095                        if(newcount.isSingle){
096                                // The list was not sorted!
097                                throw new IOException("List of TokenPairCounts was not sorted such that ALL singles appeared before pairs");
098                        }
099                        TokenPairCount count = collector.add(newcount);
100                        if(count != null){
101                                emitPairCount(time,count,context);
102                        }
103                }
104                emitPairCount(time,collector.getCurrent(),context);
105        }
106        private void writeTimeperiodCount(Path timeperiodCountOutput,long pairsCount) throws IOException {
107                FileSystem fs = HadoopToolsUtil.getFileSystem(timeperiodCountOutput);
108                PrintWriter writer = new PrintWriter(fs.create(timeperiodCountOutput));
109                writer.println(pairsCount);
110                writer.flush();
111                writer.close();
112        }
113
114        private void emitPairCount(long time, TokenPairCount currentcount, Reducer<BytesWritable,BytesWritable,BytesWritable,BytesWritable>.Context context) throws IOException, InterruptedException {
115                long tok1count = this.unaryCounts.get(currentcount.firstObject());
116                long tok2count = this.unaryCounts.get(currentcount.secondObject());
117                BytesWritable key = new BytesWritable(currentcount.identifierBinary(time));
118                TokenPairUnaryCount tpuc = new TokenPairUnaryCount(currentcount, tok1count,tok2count);
119                context.write(key, new BytesWritable(IOUtils.serialize(tpuc)));
120        }
121        private void addUnaryWordCount(TokenPairCount currentcount) {
122                this.unaryCounts.put(currentcount.firstObject(), currentcount.paircount);
123        }
124}