/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   *  Redistributions of source code must retain the above copyright notice,
 *      this list of conditions and the following disclaimer.
 *
 *   *  Redistributions in binary form must reproduce the above copyright notice,
 *      this list of conditions and the following disclaimer in the documentation
 *      and/or other materials provided with the distribution.
 *
 *   *  Neither the name of the University of Southampton nor the names of its
 *      contributors may be used to endorse or promote products derived from this
 *      software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
package org.openimaj.hadoop.tools.twitter.token.mode.pointwisemi.count;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.openimaj.hadoop.mapreduce.stage.helper.TextByteByteStage;
import org.openimaj.hadoop.tools.HadoopToolsUtil;
import org.openimaj.hadoop.tools.twitter.HadoopTwitterTokenToolOptions;
import org.openimaj.io.IOUtils;

/**
 * A Hadoop stage which counts the occurrences of token pairs per time period,
 * in preparation for the calculation of pointwise mutual information (PMI)
 * between pairs.
 *
 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
 *
 */
public class PairMutualInformation extends TextByteByteStage {
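	/*
	 * A minimal usage sketch (hypothetical paths and arguments; not part of
	 * the original class). The stage is constructed with the tool arguments
	 * and a time delta, then driven via stage(Path[], Path, Configuration):
	 *
	 *   PairMutualInformation pmi = new PairMutualInformation(args, timedelta);
	 *   Job job = pmi.stage(new Path[] { input }, output, new Configuration());
	 *   job.waitForCompletion(true);
	 */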

	/**
	 * The time delta between time periods
	 */
	public static final String TIMEDELTA = "org.openimaj.hadoop.tools.twitter.token.mode.pairwisemi.timedelta";
	/**
	 * The location of the statistics file
	 */
	public static final String PAIR_STATS_FILE = "pairstats";
	/**
	 * The pairMI output directory
	 */
	public static final String PAIRMI_DIR = "pairmi";
	/**
	 * The root directory where timeperiod pair counts will be stored
	 */
	public static final String TIMEPERIOD_COUNT_OUTPUT_ROOT = "org.openimaj.hadoop.tools.twitter.token.mode.pairwisemi.timeoutloc";
	/**
	 * Name of the timeperiod count directory
	 */
	public static final String TIMEPERIOD_OUTPUT_NAME = "timeperiod_counts";
	private String[] nonHadoopArgs;
	private long timedelta;
	private Path actualOutputLocation;

	/**
	 * @param nonHadoopArgs
	 *            the arguments for configuration
	 * @param timedelta
	 *            the time delta between time periods; -1 if there is a single
	 *            time period
	 */
	public PairMutualInformation(String[] nonHadoopArgs, long timedelta) {
		this.nonHadoopArgs = nonHadoopArgs;
		this.timedelta = timedelta;
	}

	@Override
	public void setup(Job job) throws IOException {
		job.getConfiguration().setStrings(HadoopTwitterTokenToolOptions.ARGS_KEY, nonHadoopArgs);
		job.getConfiguration().setLong(TIMEDELTA, timedelta);
		final Path tpcOutRoot = new Path(this.actualOutputLocation, TIMEPERIOD_OUTPUT_NAME);
		job.getConfiguration().set(TIMEPERIOD_COUNT_OUTPUT_ROOT, tpcOutRoot.toString());
		if (timedelta != -1) {
			// if there are multiple time periods, use one reducer (and
			// therefore one output file) per day of the year
			job.setNumReduceTasks(365);
		}

		// Configure a secondary sort: keys are ordered and grouped by the
		// token-pair comparators, and the partitioner routes all values for a
		// pair to the same reducer. The casts rely on the Job being backed by
		// a JobConf instance.
		((JobConf) job.getConfiguration()).setOutputValueGroupingComparator(TokenPairValueGroupingComparator.class);
		((JobConf) job.getConfiguration()).setOutputKeyComparatorClass(TokenPairKeyComparator.class);
		job.setPartitionerClass(TokenPairPartitioner.class);
	}

	@Override
	public Class<PairEmit> mapper() {
		return PairEmit.class;
	}

	@Override
	public Class<? extends Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>> combiner() {
		return PairEmitCombiner.class;
	}

	@Override
	public Job stage(Path[] inputs, Path output, Configuration conf) throws Exception {
		this.actualOutputLocation = output;
		return super.stage(inputs, output, conf);
	}

	@Override
	public Class<? extends Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>> reducer() {
		return PairEmitCounter.class;
	}

	@Override
	public String outname() {
		return PAIRMI_DIR;
	}

	@Override
	public void finished(Job job) {
		final Path out = new Path(actualOutputLocation, PAIR_STATS_FILE);
		try {
			final FileSystem fs = HadoopToolsUtil.getFileSystem(out);
			final FSDataOutputStream os = fs.create(out);
			IOUtils.writeASCII(os, new WritablePairEnum(job.getCounters(), PairEnum.values()));
			os.close();
		} catch (final IOException e) {
			// the stats file could not be written; there is little that can be
			// done at this point, so report the failure and continue
			e.printStackTrace();
		}
	}

	/**
	 * Load the pointwise-PMI stats file from an output location (i.e. the
	 * path: outpath/{@link PairMutualInformation#PAIRMI_DIR}/
	 * {@link PairMutualInformation#PAIR_STATS_FILE})
	 *
	 * @param outpath
	 *            the root output location of the stage
	 * @return a WritablePairEnum instance with the counter values filled
	 * @throws IOException
	 */
	public static WritablePairEnum loadStats(Path outpath) throws IOException {
		Path pmistats = new Path(outpath, PairMutualInformation.PAIRMI_DIR);
		pmistats = new Path(pmistats, PairMutualInformation.PAIR_STATS_FILE);
		final FileSystem fs = HadoopToolsUtil.getFileSystem(pmistats);
		final FSDataInputStream inp = fs.open(pmistats);
		try {
			return IOUtils.read(inp, WritablePairEnum.class);
		} finally {
			inp.close();
		}
	}
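	/*
	 * Example (hypothetical output path): read back the counters written by
	 * finished() once the job has completed.
	 *
	 *   Path output = new Path("hdfs://namenode/twitter/tokens");
	 *   WritablePairEnum stats = PairMutualInformation.loadStats(output);
	 */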

	/**
	 * Load the total pairs seen in every time period from the pairmi location
	 * provided
	 *
	 * @param pairmiloc
	 *            a directory which contains {@link #PAIRMI_DIR}/
	 *            {@link #TIMEPERIOD_OUTPUT_NAME}
	 * @return a map from time period to pair count
	 * @throws IOException
	 */
	public static Map<Long, Long> loadTimeCounts(Path pairmiloc) throws IOException {
		final Path dir = new Path(new Path(pairmiloc, PAIRMI_DIR), TIMEPERIOD_OUTPUT_NAME);
		final FileSystem fs = HadoopToolsUtil.getFileSystem(dir);
		final FileStatus[] timePaths = fs.listStatus(dir);

		final Map<Long, Long> out = new HashMap<Long, Long>();
		for (final FileStatus fileStatus : timePaths) {
			// each file is named after its time period and holds the total
			// pair count for that period on its first line
			final Path fsp = fileStatus.getPath();
			final Long time = Long.parseLong(fsp.getName());
			final BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(fsp)));
			try {
				final Long count = Long.parseLong(reader.readLine());
				out.put(time, count);
			} finally {
				reader.close();
			}
		}
		return out;
	}
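	/*
	 * Example (hypothetical output path): print the total pair count for each
	 * time period.
	 *
	 *   Map<Long, Long> counts = PairMutualInformation.loadTimeCounts(output);
	 *   for (Map.Entry<Long, Long> e : counts.entrySet())
	 *       System.out.println(e.getKey() + ": " + e.getValue());
	 */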

}