001/** 002 * Copyright (c) 2011, The University of Southampton and the individual contributors. 003 * All rights reserved. 004 * 005 * Redistribution and use in source and binary forms, with or without modification, 006 * are permitted provided that the following conditions are met: 007 * 008 * * Redistributions of source code must retain the above copyright notice, 009 * this list of conditions and the following disclaimer. 010 * 011 * * Redistributions in binary form must reproduce the above copyright notice, 012 * this list of conditions and the following disclaimer in the documentation 013 * and/or other materials provided with the distribution. 014 * 015 * * Neither the name of the University of Southampton nor the names of its 016 * contributors may be used to endorse or promote products derived from this 017 * software without specific prior written permission. 018 * 019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND 020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR 023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON 026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
*/
package org.openimaj.hadoop.tools.twitter.token.mode.pointwisemi.sort;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.openimaj.hadoop.mapreduce.stage.helper.SequenceFileTextStage;
import org.openimaj.util.pair.IndependentPair;

/**
 * A map-reduce stage that sorts token pairs by their PMI value within each
 * time period. Pairs whose PMI falls below a configurable minimum, or whose
 * occurrence count falls below a configurable minimum, can be filtered by the
 * mapper/reducer implementations that read this stage's configuration keys.
 *
 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
 */
public class PMIPairSort extends SequenceFileTextStage<BytesWritable, BytesWritable, BytesWritable, BytesWritable, NullWritable, Text> {
    /**
     * Configuration key under which the minimum PMI threshold is stored.
     * NOTE(review): the key string ends in ".maxp" although it holds a
     * minimum value; it is kept byte-identical because other components look
     * this key up by its exact string.
     */
    public static final String MINP_KEY = "org.openimaj.hadoop.tools.twitter.token.outputmode.pointwisemi.maxp";

    /**
     * The name of this stage's output directory.
     */
    public static final String PMI_NAME = "pmi_sort";

    /**
     * Configuration key under which the minimum pair count is stored.
     */
    public static final String MINPAIRCOUNT_KEY = "org.openimaj.hadoop.tools.twitter.token.outputmode.pointwisemi.minpaircount";

    /**
     * Configuration key under which the location of the pair-MI data is stored.
     */
    public static final String PAIRMI_LOC = "org.openimaj.hadoop.tools.twitter.token.outputmode.pointwisemi.location";

    /** Minimum PMI a pair must reach to be emitted. */
    private double minp;
    /** Location of the PMIStats file, propagated to the job configuration. */
    private Path outpath;
    /** Minimum number of occurrences a pair must reach; defaults to 0. */
    private int minPairCount;

    /**
     * Construct with a PMI threshold only; the minimum pair count defaults
     * to 0 (no count-based filtering).
     *
     * @param minp the minimum PMI value
     * @param outpath for loading the PMIStats file
     */
    public PMIPairSort(double minp, Path outpath) {
        this.minp = minp;
        this.outpath = outpath;
    }

    /**
     * Construct with both a PMI threshold and a minimum pair count.
     *
     * @param minp the minimum PMI value
     * @param minPairCount the minimum number of pairs to emit
     * @param outpath for loading the PMIStats file
     */
    public PMIPairSort(double minp, int minPairCount, Path outpath) {
        this.minp = minp;
        this.outpath = outpath;
        this.minPairCount = minPairCount;
    }

    @Override
    public Class<? extends Mapper<BytesWritable, BytesWritable, BytesWritable, BytesWritable>> mapper() {
        return PMISortMapper.class;
    }

    @Override
    public Class<? extends Reducer<BytesWritable, BytesWritable, NullWritable, Text>> reducer() {
        return PMISortReducer.class;
    }

    @Override
    public String outname() {
        return PMI_NAME;
    }

    /**
     * Store the thresholds and the pair-MI location in the job configuration,
     * and install the custom grouping comparator, sort comparator and
     * partitioner used to order pairs by PMI within each time period.
     *
     * @param job the job being configured
     */
    @Override
    public void setup(Job job) {
        job.getConfiguration().setFloat(MINP_KEY, (float) this.minp);
        job.getConfiguration().setInt(MINPAIRCOUNT_KEY, this.minPairCount);
        job.getConfiguration().set(PAIRMI_LOC, this.outpath.toString());
        // Use the new-API Job setters rather than casting the Configuration
        // to JobConf: the cast only works because Job happens to wrap its
        // configuration in a JobConf, and would throw ClassCastException
        // otherwise. These setters write the same underlying properties.
        job.setGroupingComparatorClass(PMISortValueGroupingComparator.class);
        job.setSortComparatorClass(PMISortKeyComparator.class);
        job.setPartitionerClass(PMISortPartitioner.class);
    }

    /**
     * Write a time and a PMI value to a byte array: 8 bytes of big-endian
     * long followed by 8 bytes of big-endian double (DataOutputStream
     * encoding).
     *
     * @param timet the time period
     * @param pmi the PMI value
     * @return a byte array encoding of time and pmi
     * @throws IOException
     */
    public static byte[] timePMIBinary(long timet, double pmi) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(baos);
        dos.writeLong(timet);
        dos.writeDouble(pmi);
        dos.flush();
        dos.close();
        return baos.toByteArray();
    }

    /**
     * Read time and pmi from a byte array. Calls
     * {@link PMIPairSort#parseTimeBinary(byte[], int, int)} with
     * start = 0 and len = bytes.length.
     *
     * @param bytes the bytes to parse
     * @return time and pmi pair
     * @throws IOException
     */
    public static IndependentPair<Long, Double> parseTimeBinary(byte[] bytes) throws IOException {
        return parseTimeBinary(bytes, 0, bytes.length);
    }

    /**
     * Use a {@link ByteArrayInputStream} and a {@link DataInputStream} to
     * read a time/PMI pair from a region of a byte[] (the inverse of
     * {@link PMIPairSort#timePMIBinary(long, double)}).
     *
     * @param bytes
     * @param start offset into bytes
     * @param len length to read
     * @return the time pmi pair
     * @throws IOException
     */
    public static IndependentPair<Long, Double> parseTimeBinary(byte[] bytes, int start, int len) throws IOException {
        ByteArrayInputStream bais = new ByteArrayInputStream(bytes, start, len);
        DataInputStream dis = new DataInputStream(bais);
        return IndependentPair.pair(dis.readLong(), dis.readDouble());
    }
}