/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *
 *   * Redistributions in binary form must reproduce the above copyright notice,
 *     this list of conditions and the following disclaimer in the documentation
 *     and/or other materials provided with the distribution.
 *
 *   * Neither the name of the University of Southampton nor the names of its
 *     contributors may be used to endorse or promote products derived from this
 *     software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package org.openimaj.hadoop.tools.twitter.token.mode.pointwisemi.count;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;
import org.kohsuke.args4j.CmdLineException;
import org.openimaj.hadoop.tools.twitter.HadoopTwitterTokenToolOptions;
import org.openimaj.io.IOUtils;
import org.openimaj.twitter.USMFStatus;

/**
 * Mapper which, for every status it can read, emits one {@link TokenPairCount}
 * per pair of tokens in that status (pair count 1) and one unary
 * {@link TokenPairCount} per token (pair count = number of other tokens in the
 * status). Keys are produced by {@link TokenPairCount#identifierBinary(long)}
 * using the time index of the status; values are the serialised counts.
 *
 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
 *
 */
public class PairEmit extends Mapper<LongWritable, Text, BytesWritable, BytesWritable> {

	/**
	 * Sentinel used both as the default for the configured time delta and as
	 * the time index handed to {@link TokenPairCount#identifierBinary(long)}
	 * when no positive time delta is in effect.
	 */
	private static final long DEFAULT_TIME = -1;
	/** Tool options, loaded once and shared by every mapper instance in this JVM. */
	private static HadoopTwitterTokenToolOptions options;
	/**
	 * Width of a time bucket in milliseconds: the configured
	 * {@link PairMutualInformation#TIMEDELTA} value multiplied by 60 * 1000
	 * (so the configured value is presumably in minutes). Values that are not
	 * positive disable time bucketing.
	 */
	private static long timeDeltaMillis = DEFAULT_TIME;
	Logger logger = Logger.getLogger(PairEmit.class);

	/**
	 * Load the tool options and the time delta from the job configuration, if
	 * this has not already been done successfully.
	 * <p>
	 * The static state is only published once the options have been fully
	 * prepared, so a failure here never leaves a half-initialised options
	 * object behind for later calls to pick up.
	 *
	 * @param context the mapper context whose configuration holds the tool arguments
	 * @throws IOException if the options cannot be constructed or prepared; the
	 *             underlying failure is kept as the cause
	 */
	protected static synchronized void loadOptions(Mapper<LongWritable, Text, BytesWritable, BytesWritable>.Context context) throws IOException {
		if (options != null) {
			return;
		}
		try {
			HadoopTwitterTokenToolOptions loaded = new HadoopTwitterTokenToolOptions(
					context.getConfiguration().getStrings(HadoopTwitterTokenToolOptions.ARGS_KEY));
			loaded.prepare();
			long deltaMillis = context.getConfiguration().getLong(PairMutualInformation.TIMEDELTA, DEFAULT_TIME) * 60 * 1000;

			// Publish only after everything above succeeded.
			timeDeltaMillis = deltaMillis;
			options = loaded;
		} catch (CmdLineException e) {
			throw new IOException("Could not load the tool options from the job configuration", e);
		} catch (Exception e) {
			throw new IOException(e);
		}
	}

	@Override
	protected void setup(Mapper<LongWritable, Text, BytesWritable, BytesWritable>.Context context) throws IOException, InterruptedException {
		loadOptions(context);
	}

	/**
	 * Emit pair and unary counts for the tokens of a single status.
	 * <p>
	 * Records which cannot be read, which have no creation time, or which
	 * yield no tokens are skipped without failing the task.
	 *
	 * @param key the input key (unused)
	 * @param value the raw status text
	 * @param context the context to which counts are written and on which the
	 *            {@link PairEnum#PAIR} and {@link PairEnum#UNARY} counters are incremented
	 */
	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, BytesWritable, BytesWritable>.Context context) throws IOException, InterruptedException {
		String raw = value.toString();
		List<String> tokens;
		DateTime time;
		try {
			USMFStatus status = options.readStatus(raw);
			time = status.createdAt();
			if (time == null) {
				return;
			}
			tokens = options.readStatusPart(raw);
		} catch (Exception e) {
			// Best effort: a malformed record is dropped, but leave a trace for debugging.
			logger.debug("Skipping a status which could not be read", e);
			return;
		}
		if (tokens == null || tokens.isEmpty()) {
			return;
		}

		// Round the creation time down to the start of its time bucket, if bucketing is enabled.
		long timeIndex = DEFAULT_TIME;
		if (timeDeltaMillis > 0) {
			timeIndex = (time.getMillis() / timeDeltaMillis) * timeDeltaMillis;
		}

		int tokenCount = tokens.size();
		for (int i = 0; i < tokenCount; i++) {
			String first = tokens.get(i);
			for (int j = i + 1; j < tokenCount; j++) {
				String second = tokens.get(j);
				// Build the pair in lexicographic order so it does not depend on token position.
				TokenPairCount pair = first.compareTo(second) > 0
						? new TokenPairCount(second, first)
						: new TokenPairCount(first, second);
				pair.paircount = 1;
				write(context, pair, timeIndex);
				context.getCounter(PairEnum.PAIR).increment(1);
			}
			// Unary record: the number of pairs within this status that involve this token occurrence.
			TokenPairCount unary = new TokenPairCount(first);
			unary.paircount = tokenCount - 1;
			write(context, unary, timeIndex);
			context.getCounter(PairEnum.UNARY).increment(1);
		}
	}

	/**
	 * Write a count keyed by its binary identifier for the given time index.
	 */
	private void write(Mapper<LongWritable, Text, BytesWritable, BytesWritable>.Context context, TokenPairCount count, long timeIndex) throws IOException, InterruptedException {
		BytesWritable outKey = new BytesWritable(count.identifierBinary(timeIndex));
		context.write(outKey, new BytesWritable(IOUtils.serialize(count)));
	}
}