/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   *	Redistributions of source code must retain the above copyright notice,
 * 	this list of conditions and the following disclaimer.
 *
 *   *	Redistributions in binary form must reproduce the above copyright notice,
 * 	this list of conditions and the following disclaimer in the documentation
 * 	and/or other materials provided with the distribution.
 *
 *   *	Neither the name of the University of Southampton nor the names of its
 * 	contributors may be used to endorse or promote products derived from this
 * 	software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */ 030package org.openimaj.hadoop.tools.twitter.token.outputmode.sparsecsv; 031 032import java.io.ByteArrayInputStream; 033import java.io.DataInputStream; 034import java.io.IOException; 035import java.io.StringWriter; 036import java.nio.channels.Channels; 037import java.util.ArrayList; 038import java.util.HashMap; 039 040import org.apache.hadoop.fs.FSDataOutputStream; 041import org.apache.hadoop.fs.FileSystem; 042import org.apache.hadoop.fs.Path; 043import org.apache.hadoop.io.BytesWritable; 044import org.apache.hadoop.io.LongWritable; 045import org.apache.hadoop.io.NullWritable; 046import org.apache.hadoop.io.Text; 047import org.apache.hadoop.mapreduce.Reducer; 048import org.openimaj.hadoop.tools.HadoopToolsUtil; 049import org.openimaj.hadoop.tools.twitter.utils.WordDFIDF; 050import org.openimaj.util.pair.IndependentPair; 051 052import com.Ostermiller.util.CSVPrinter; 053import com.jmatio.io.MatFileWriter; 054import com.jmatio.types.MLArray; 055import com.jmatio.types.MLInt64; 056import com.jmatio.types.MLSparse; 057 058/** 059 * Writes each word,count 060 * 061 * @author Sina Samangooei (ss@ecs.soton.ac.uk) 062 * 063 */ 064public class ReduceValuesByTime extends Reducer<LongWritable, BytesWritable, NullWritable, Text> { 065 /** 066 * construct the reduce instance, do nothing 067 */ 068 public ReduceValuesByTime() { 069 // TODO Auto-generated constructor stub 070 } 071 072 @Override 073 public void setup(Reducer<LongWritable, BytesWritable, NullWritable, Text>.Context context) throws IOException, InterruptedException { 074 loadOptions(context); 075 } 076 077 private static String[] options; 078 private static HashMap<String, IndependentPair<Long, Long>> wordIndex; 079 private static HashMap<Long, IndependentPair<Long, Long>> timeIndex; 080 private static String valuesLocation; 081 private static boolean matlabOut; 082 083 protected static synchronized void loadOptions(Reducer<LongWritable, BytesWritable, NullWritable, Text>.Context context) throws IOException 
{ 084 if (options == null) { 085 try { 086 options = context.getConfiguration().getStrings(Values.ARGS_KEY); 087 matlabOut = context.getConfiguration().getBoolean(Values.MATLAB_OUT, false); 088 timeIndex = TimeIndex.readTimeCountLines(options[0]); 089 if (matlabOut) { 090 wordIndex = WordIndex.readWordCountLines(options[0]); 091 valuesLocation = options[0] + "/values/values.%d.mat"; 092 } 093 System.out.println("timeindex loaded: " + timeIndex.size()); 094 } catch (Exception e) { 095 throw new IOException(e); 096 } 097 } 098 } 099 @Override 100 public void reduce(LongWritable timeslot, Iterable<BytesWritable> manylines, Reducer<LongWritable, BytesWritable, NullWritable, Text>.Context context) throws IOException, InterruptedException { 101 try { 102 if (matlabOut) { 103 System.out.println("Creating matlab file for timeslot: " + timeslot); 104 createWriteToMatlab(timeslot, manylines); 105 } 106 else { 107 final StringWriter swriter = new StringWriter(); 108 final CSVPrinter writer = new CSVPrinter(swriter); 109 for (BytesWritable word : manylines) { 110 ByteArrayInputStream bais = new ByteArrayInputStream(word.getBytes()); 111 DataInputStream dis = new DataInputStream(bais); 112 WordDFIDF idf = new WordDFIDF(); 113 idf.readBinary(dis); 114 int timeI = (int) ((long) timeIndex.get(idf.timeperiod).secondObject()); 115 int wordI = dis.readInt(); 116 writer.writeln(new String[] { wordI + "", timeI + "", idf.wf + "", idf.tf + "", idf.Twf + "", idf.Ttf + "" }); 117 writer.flush(); 118 swriter.flush(); 119 } 120 context.write(NullWritable.get(), new Text(swriter.toString())); 121 } 122 123 } catch (Exception e) { 124 e.printStackTrace(); 125 System.err.println("Couldn't reduce to final file"); 126 throw new IOException(e); 127 } 128 } 129 130 private void createWriteToMatlab(LongWritable timeslot, Iterable<BytesWritable> manylines) throws IOException { 131 System.out.println("Creating matlab file for timeslot: " + timeslot); 132 MLSparse matarr = new 
MLSparse(String.format("values_%d", timeslot.get()), new int[] { wordIndex.size(), 2 }, 0, wordIndex.size() * 2); 133 long Ttf = 0; 134 long tf = 0; 135 boolean set = false; 136 for (BytesWritable word : manylines) { 137 ByteArrayInputStream bais = new ByteArrayInputStream(word.getBytes()); 138 DataInputStream dis = new DataInputStream(bais); 139 WordDFIDF idf = new WordDFIDF(); 140 idf.readBinary(dis); 141 int wordI = dis.readInt(); 142 // writer.writeln(new String[]{wordI + "",timeI + "",idf.wf + 143 // "",idf.tf + "",idf.Twf + "", idf.Ttf + ""}); 144 // writer.flush(); 145 // swriter.flush(); 146 if (!set) { 147 tf = idf.tf; 148 Ttf = idf.Ttf; 149 set = true; 150 } 151 else { 152 if (tf != idf.tf) 153 throw new IOException("Error writing matlab file, tf doesn't match"); 154 if (Ttf != idf.Ttf) 155 throw new IOException("Error writing matlab file, Ttf doesn't match"); 156 } 157 matarr.set((double) idf.wf, wordI, 0); 158 matarr.set((double) idf.Twf, wordI, 1); 159 } 160 MLInt64 tfMat = new MLInt64(String.format("tf_%d", timeslot.get()), new long[][] { new long[] { tf } }); 161 MLInt64 TtfMat = new MLInt64(String.format("Ttf_%d", timeslot.get()), new long[][] { new long[] { Ttf } }); 162 ArrayList<MLArray> list = new ArrayList<MLArray>(); 163 list.add(tfMat); 164 list.add(TtfMat); 165 list.add(matarr); 166 Path outLoc = new Path(String.format(valuesLocation, timeslot.get())); 167 FileSystem fs = HadoopToolsUtil.getFileSystem(outLoc); 168 FSDataOutputStream os = fs.create(outLoc); 169 new MatFileWriter(Channels.newChannel(os), list); 170 } 171}