001/**
002 * Copyright (c) 2011, The University of Southampton and the individual contributors.
003 * All rights reserved.
004 *
005 * Redistribution and use in source and binary forms, with or without modification,
006 * are permitted provided that the following conditions are met:
007 *
008 *   *  Redistributions of source code must retain the above copyright notice,
009 *      this list of conditions and the following disclaimer.
010 *
011 *   *  Redistributions in binary form must reproduce the above copyright notice,
012 *      this list of conditions and the following disclaimer in the documentation
013 *      and/or other materials provided with the distribution.
014 *
015 *   *  Neither the name of the University of Southampton nor the names of its
016 *      contributors may be used to endorse or promote products derived from this
017 *      software without specific prior written permission.
018 *
019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */
030package org.openimaj.hadoop.tools.twitter.token.mode.pointwisemi.sort;
031
032import java.io.ByteArrayInputStream;
033import java.io.ByteArrayOutputStream;
034import java.io.DataInputStream;
035import java.io.DataOutputStream;
036import java.io.IOException;
037
038import org.apache.hadoop.fs.Path;
039import org.apache.hadoop.io.BytesWritable;
040import org.apache.hadoop.io.NullWritable;
041import org.apache.hadoop.io.Text;
042import org.apache.hadoop.mapred.JobConf;
043import org.apache.hadoop.mapreduce.Job;
044import org.apache.hadoop.mapreduce.Mapper;
045import org.apache.hadoop.mapreduce.Reducer;
046import org.openimaj.hadoop.mapreduce.stage.helper.SequenceFileTextStage;
047import org.openimaj.util.pair.IndependentPair;
048
049/**
050 * Sort pairs by PMI within timeperiods
051 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
052 *
053 */
054public class PMIPairSort extends SequenceFileTextStage<BytesWritable,BytesWritable,BytesWritable,BytesWritable,NullWritable,Text>{
055        /**
056         * The minimum PMI
057         */
058        public static final String MINP_KEY = "org.openimaj.hadoop.tools.twitter.token.outputmode.pointwisemi.maxp";
059        /**
060         * the output name
061         */
062        public static final String PMI_NAME = "pmi_sort";
063        /**
064         * The minimum number of pairs
065         */
066        public static final String MINPAIRCOUNT_KEY = "org.openimaj.hadoop.tools.twitter.token.outputmode.pointwisemi.minpaircount";
067        /**
068         * The location of the pairmi
069         */
070        public static final String PAIRMI_LOC = "org.openimaj.hadoop.tools.twitter.token.outputmode.pointwisemi.location";
071        private double minp;
072        private Path outpath;
073        private int minPairCount;
074
075        /**
076         * @param minp the minimum PMI value
077         * @param outpath for loading the PMIStats file
078         */
079        public PMIPairSort(double minp,Path outpath) {
080                this.minp = minp;
081                this.outpath = outpath;
082        }
083        
084        /**
085         * @param minp the minimum PMI value
086         * @param minPairCount the minimum number of pairs to emit
087         * @param outpath for loading the PMIStats file
088         */
089        public PMIPairSort(double minp, int minPairCount,Path outpath) {
090                this.minp = minp;
091                this.outpath = outpath;
092                this.minPairCount = minPairCount;
093        }
094
095        @Override
096        public Class<? extends Mapper<BytesWritable, BytesWritable, BytesWritable, BytesWritable>> mapper() {
097                return PMISortMapper.class;
098        }
099        
100        @Override
101        public Class<? extends Reducer<BytesWritable, BytesWritable, NullWritable,Text>> reducer() {
102                return PMISortReducer.class;
103        }
104        @Override
105        public String outname() {
106                return PMI_NAME;
107        }
108        @Override
109        public void setup(Job job) {
110                job.getConfiguration().setFloat(MINP_KEY, (float) this.minp);
111                job.getConfiguration().setInt(MINPAIRCOUNT_KEY, this.minPairCount);
112                job.getConfiguration().set(PAIRMI_LOC,this.outpath.toString());
113                ((JobConf)job.getConfiguration()).setOutputValueGroupingComparator(PMISortValueGroupingComparator.class);
114                ((JobConf)job.getConfiguration()).setOutputKeyComparatorClass(PMISortKeyComparator.class);
115                job.setPartitionerClass(PMISortPartitioner.class);
116        }
117
118        /**
119         * write time pmi to a byte array
120         * @param timet
121         * @param pmi
122         * @return a byte array encoding of time and pmi
123         * @throws IOException
124         */
125        public static byte[] timePMIBinary(long timet, double pmi) throws IOException {
126                ByteArrayOutputStream baos = new ByteArrayOutputStream();
127                DataOutputStream dos = new DataOutputStream(baos);
128                dos.writeLong(timet);
129                dos.writeDouble(pmi);
130                dos.flush();
131                dos.close();
132                return baos.toByteArray();
133        }
134        
135        /**
136         * read time and pmi from a byte array. class {@link PMIPairSort#parseTimeBinary(byte[], int, int)} with 
137         * start = 0 and len = bytes.length
138         * @param bytes the bytes to parse
139         * @return time and pmi pair
140         * @throws IOException
141         */
142        public static IndependentPair<Long, Double> parseTimeBinary(byte[] bytes) throws IOException {
143                return parseTimeBinary(bytes,0,bytes.length);
144        }
145        
146        /**
147         * use a {@link ByteArrayInputStream} and a {@link DataInputStream} to read a byte[] 
148         * @param bytes
149         * @param start offset into bytes
150         * @param len length to read
151         * @return the time pmi pair
152         * @throws IOException
153         */
154        public static IndependentPair<Long, Double> parseTimeBinary(byte[] bytes,int start, int len) throws IOException {
155                ByteArrayInputStream bais = new ByteArrayInputStream(bytes,start,len);
156                DataInputStream dis = new DataInputStream(bais);
157                return IndependentPair.pair(dis.readLong(), dis.readDouble());
158        }
159}