/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *
 *   * Redistributions in binary form must reproduce the above copyright notice,
 *     this list of conditions and the following disclaimer in the documentation
 *     and/or other materials provided with the distribution.
 *
 *   * Neither the name of the University of Southampton nor the names of its
 *     contributors may be used to endorse or promote products derived from this
 *     software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
package org.openimaj.hadoop.tools.twitter.token.outputmode.sparsecsv;

import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.openimaj.hadoop.tools.twitter.utils.WordDFIDF;
import org.openimaj.io.IOUtils;
import org.openimaj.io.wrappers.ReadableListBinary;
import org.openimaj.util.pair.IndependentPair;

/**
 * Emits each indexed word's {@link WordDFIDF} records keyed by time period, so
 * that all values for a given time period arrive at the same reducer. Each
 * emitted value holds the serialised {@link WordDFIDF} followed by the word's
 * index.
 *
 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
 */
public class MapValuesByTime extends Mapper<Text, BytesWritable, LongWritable, BytesWritable> {
	/**
	 * Construct the map instance (does nothing).
	 */
	public MapValuesByTime() {
		// no initialisation required
	}

	@Override
	public void setup(Mapper<Text, BytesWritable, LongWritable, BytesWritable>.Context context) throws IOException, InterruptedException {
		loadOptions(context);
	}

	private static String[] options;
	private static HashMap<String, IndependentPair<Long, Long>> wordIndex;

	/**
	 * Load the tool arguments and the word index exactly once per JVM. The
	 * index maps each word to a pair whose second element is the word's index.
	 */
	protected static synchronized void loadOptions(Mapper<Text, BytesWritable, LongWritable, BytesWritable>.Context context) throws IOException {
		if (options == null) {
			try {
				options = context.getConfiguration().getStrings(Values.ARGS_KEY);
				wordIndex = WordIndex.readWordCountLines(options[0]);
				System.out.println("Wordindex loaded: " + wordIndex.size());
			} catch (final Exception e) {
				throw new IOException(e);
			}
		}
	}

	@Override
	public void map(final Text key, BytesWritable value, final Mapper<Text, BytesWritable, LongWritable, BytesWritable>.Context context) throws IOException, InterruptedException {
		try {
			// skip words that are not in the word index
			if (!wordIndex.containsKey(key.toString()))
				return;
			System.out.println("Mapping values for word: " + key);
			final int wordI = (int) ((long) (wordIndex.get(key.toString()).secondObject()));
			// the value is a serialised list of WordDFIDF records; re-emit each
			// record keyed by its time period, with the word index appended
			IOUtils.deserialize(value.getBytes(), new ReadableListBinary<Object>(new ArrayList<Object>()) {
				@Override
				protected Object readValue(DataInput in) throws IOException {
					final WordDFIDF idf = new WordDFIDF();
					idf.readBinary(in);
					System.out.println("... Found (" + key + ") at time: " + idf.timeperiod);
					final ByteArrayOutputStream baos = new ByteArrayOutputStream();
					final DataOutputStream dos = new DataOutputStream(baos);
					idf.writeBinary(dos);
					dos.writeInt(wordI);
					dos.flush();
					dos.close();
					try {
						context.write(new LongWritable(idf.timeperiod), new BytesWritable(baos.toByteArray()));
					} catch (final InterruptedException e) {
						throw new IOException(e);
					}
					// the deserialised list itself is never used; return a placeholder
					return new Object();
				}
			});
		} catch (final IOException e) {
			e.printStackTrace();
			System.err.println("Couldn't read word or time period for word: " + key);
		}
	}
}
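/*
 * A minimal decoding sketch (an assumption, not part of the original tool): a
 * reducer consuming this mapper's output could unpack each value by mirroring
 * the writes performed in readValue(...) above, i.e. the WordDFIDF record
 * first, then the word index:
 *
 *   DataInputStream dis = new DataInputStream(new ByteArrayInputStream(value.getBytes()));
 *   WordDFIDF idf = new WordDFIDF();
 *   idf.readBinary(dis);           // DF-IDF statistics for one (word, time) pair
 *   int wordIndex = dis.readInt(); // the word's position in the word index
 */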