/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *
 *   * Redistributions in binary form must reproduce the above copyright notice,
 *     this list of conditions and the following disclaimer in the documentation
 *     and/or other materials provided with the distribution.
 *
 *   * Neither the name of the University of Southampton nor the names of its
 *     contributors may be used to endorse or promote products derived from this
 *     software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
package org.openimaj.hadoop.tools.twitter.token.outputmode.sparsecsv;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.StringWriter;
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Map.Entry;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.openimaj.hadoop.mapreduce.stage.StageProvider;
import org.openimaj.hadoop.mapreduce.stage.helper.SequenceFileTextStage;
import org.openimaj.hadoop.tools.HadoopToolsUtil;
import org.openimaj.hadoop.tools.twitter.token.mode.dfidf.CountTweetsInTimeperiod;
import org.openimaj.hadoop.tools.twitter.utils.TweetCountWordMap;
import org.openimaj.io.IOUtils;
import org.openimaj.util.pair.IndependentPair;

import com.Ostermiller.util.CSVParser;
import com.Ostermiller.util.CSVPrinter;
import com.jmatio.io.MatFileWriter;
import com.jmatio.types.MLArray;
import com.jmatio.types.MLCell;
import com.jmatio.types.MLDouble;

/**
 * Builds an index of time periods: each time period is written as a CSV line
 * holding the period and the total number of tweets seen in that period.
 */
public class TimeIndex extends StageProvider {

	/**
	 * Emits each time period with the number of tweets seen in that period.
	 *
	 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
	 */
	public static class Map extends Mapper<LongWritable, BytesWritable, LongWritable, LongWritable> {
		public Map() {
			// explicit no-argument constructor for Hadoop's reflective instantiation
		}

		@Override
		public void map(final LongWritable key, BytesWritable value,
				final Mapper<LongWritable, BytesWritable, LongWritable, LongWritable>.Context context)
		{
			try {
				// the value is a serialised TweetCountWordMap for this time period
				final TweetCountWordMap periodCountWordCount =
						IOUtils.read(new ByteArrayInputStream(value.getBytes()), TweetCountWordMap.class);
				// skip the special end-of-time marker record
				if (!key.equals(CountTweetsInTimeperiod.Map.END_TIME)) {
					context.write(key, new LongWritable(periodCountWordCount.getNTweets()));
				}
			} catch (Exception e) {
				System.err.println("Couldn't read timeperiod: " + key);
			}
		}
	}
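
	// Each reduce call below collapses the per-mapper counts for one time period into a
	// single CSV line of the form "<period>,<totalTweets>", e.g. (illustrative values
	// only): 1302264000,5242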
	/**
	 * Writes each time period and its total tweet count as a CSV line.
	 *
	 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
	 */
	public static class Reduce extends Reducer<LongWritable, LongWritable, NullWritable, Text> {
		public Reduce() {
			// explicit no-argument constructor for Hadoop's reflective instantiation
		}

		@Override
		public void reduce(LongWritable timeslot, Iterable<LongWritable> counts,
				Reducer<LongWritable, LongWritable, NullWritable, Text>.Context context)
		{
			try {
				String timeStr = timeslot.toString();
				long total = 0;
				for (LongWritable count : counts) {
					total += count.get();
				}
				// write the time period and its total as a single CSV line
				StringWriter swriter = new StringWriter();
				CSVPrinter writer = new CSVPrinter(swriter);
				writer.write(new String[] { timeStr, total + "" });
				writer.flush();
				context.write(NullWritable.get(), new Text(swriter.toString()));
			} catch (Exception e) {
				System.err.println("Couldn't reduce to final file");
			}
		}
	}

	/**
	 * From a report output path, read the time periods.
	 *
	 * @param path report output path
	 * @return map of time period to a pair containing (tweet count, line index)
	 * @throws IOException
	 */
	public static LinkedHashMap<Long, IndependentPair<Long, Long>> readTimeCountLines(String path) throws IOException {
		String wordPath = path + "/times";
		Path p = HadoopToolsUtil.getInputPaths(wordPath)[0];
		FileSystem fs = HadoopToolsUtil.getFileSystem(p);
		FSDataInputStream toRead = fs.open(p);
		BufferedReader reader = new BufferedReader(new InputStreamReader(toRead));
		CSVParser csvreader = new CSVParser(reader);
		long lineN = 0;
		String[] next = null;
		LinkedHashMap<Long, IndependentPair<Long, Long>> toRet = new LinkedHashMap<Long, IndependentPair<Long, Long>>();
		while ((next = csvreader.getLine()) != null && next.length > 0) {
			toRet.put(Long.parseLong(next[0]), IndependentPair.pair(Long.parseLong(next[1]), lineN));
			lineN++;
		}
		reader.close();
		return toRet;
	}
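
	/*
	 * A minimal usage sketch for readTimeCountLines; the output directory path is
	 * hypothetical and stands in for wherever the "times" stage was written:
	 *
	 *   LinkedHashMap<Long, IndependentPair<Long, Long>> times =
	 *           TimeIndex.readTimeCountLines("hdfs://some/output/dir");
	 *   for (Entry<Long, IndependentPair<Long, Long>> e : times.entrySet()) {
	 *       long period = e.getKey();
	 *       long tweetCount = e.getValue().firstObject();
	 *       long lineIndex = e.getValue().secondObject();
	 *       // ... use period, tweetCount and lineIndex ...
	 *   }
	 */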

	@Override
	public SequenceFileTextStage<LongWritable, BytesWritable, LongWritable, LongWritable, NullWritable, Text> stage() {
		return new SequenceFileTextStage<LongWritable, BytesWritable, LongWritable, LongWritable, NullWritable, Text>() {

			@Override
			public void setup(Job job) {
				// sort by time period and collapse everything into a single "times" output
				job.setSortComparatorClass(LongWritable.Comparator.class);
				job.setNumReduceTasks(1);
			}

			@Override
			public Class<? extends Mapper<LongWritable, BytesWritable, LongWritable, LongWritable>> mapper() {
				return TimeIndex.Map.class;
			}

			@Override
			public Class<? extends Reducer<LongWritable, LongWritable, NullWritable, Text>> reducer() {
				return TimeIndex.Reduce.class;
			}

			@Override
			public String outname() {
				return "times";
			}
		};
	}

	/**
	 * Write a CSV time index to an {@link MLCell} written to a .mat data file.
	 *
	 * @param path report output path
	 * @throws IOException
	 */
	public static void writeToMatlab(String path) throws IOException {
		Path timeMatPath = new Path(path + "/times/timeIndex.mat");
		FileSystem fs = HadoopToolsUtil.getFileSystem(timeMatPath);
		LinkedHashMap<Long, IndependentPair<Long, Long>> timeIndex = readTimeCountLines(path);
		MLCell timeCell = new MLCell("times", new int[] { timeIndex.size(), 2 });

		System.out.println("... reading times");
		for (Entry<Long, IndependentPair<Long, Long>> ent : timeIndex.entrySet()) {
			long time = (long) ent.getKey();
			int timeCellIndex = (int) (long) ent.getValue().secondObject();
			long count = ent.getValue().firstObject();
			// column 0 holds the time period, column 1 the tweet count
			timeCell.set(new MLDouble(null, new double[][] { new double[] { time } }), timeCellIndex, 0);
			timeCell.set(new MLDouble(null, new double[][] { new double[] { count } }), timeCellIndex, 1);
		}
		ArrayList<MLArray> list = new ArrayList<MLArray>();
		list.add(timeCell);
		new MatFileWriter(Channels.newChannel(fs.create(timeMatPath)), list);
	}

}