/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   *  Redistributions of source code must retain the above copyright notice,
 *      this list of conditions and the following disclaimer.
 *
 *   *  Redistributions in binary form must reproduce the above copyright notice,
 *      this list of conditions and the following disclaimer in the documentation
 *      and/or other materials provided with the distribution.
 *
 *   *  Neither the name of the University of Southampton nor the names of its
 *      contributors may be used to endorse or promote products derived from this
 *      software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */ 030package org.openimaj.hadoop.tools.fastkmeans; 031 032import java.io.File; 033import java.io.IOException; 034 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.fs.Path; 037import org.apache.hadoop.io.BytesWritable; 038import org.apache.hadoop.io.IntWritable; 039import org.apache.hadoop.mapreduce.Job; 040import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; 041import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; 042import org.openimaj.hadoop.sequencefile.SequenceFileUtility; 043 044 045public class SequenceFileByteDataSelector { 046 047 /** 048 * 049 */ 050 private static final long serialVersionUID = -5976796322589912944L; 051 private String sequenceFilePath; 052 private String fileType; 053 054 public SequenceFileByteDataSelector(String sequenceFilePath, String fileType) throws IOException, InterruptedException, ClassNotFoundException{ 055 056 this.sequenceFilePath = sequenceFilePath; 057 this.fileType = fileType; 058 059// File tmpFile = File.createTempFile("feature",".count"); 060// tmpFile.delete(); 061// Path outpath = new Path(SequenceFileUtility.convertToURI(tmpFile.getAbsolutePath()).toString()); 062// System.out.println("It is all going to: " + outpath); 063// 064// 065// Path[] sequenceFiles = SequenceFileUtility.getFilePaths(sequenceFilePath, "part"); 066// Configuration conf = new Configuration(); 067// 068// Job job = new Job(conf, "featurecount"); 069// job.setNumReduceTasks(1); 070// job.setJarByClass(SequenceFileByteDataSelector.class); 071// job.setOutputKeyClass(Text.class); 072// job.setOutputValueClass(IntWritable.class); 073// job.setMapperClass(FeatureCount.Map.class); 074// job.setCombinerClass(FeatureCount.Reduce.class); 075// job.setReducerClass(FeatureCount.Reduce.class); 076// 077// job.setInputFormatClass(SequenceFileInputFormat.class); 078// job.setOutputFormatClass(TextOutputFormat.class); 079// 080// job.getConfiguration().setStrings(FeatureCount.FILETYPE_KEY, new 
String[]{fileType}); 081// 082// SequenceFileInputFormat.setInputPaths(job, sequenceFiles); 083// FileOutputFormat.setOutputPath(job, outpath); 084// 085// job.waitForCompletion(true); 086// totalRecords = getTotalFromFile(tmpFile); 087// System.out.println("... Total records was: " + totalRecords); 088 } 089 090// private int getTotalFromFile(File tmpFile) throws NumberFormatException, IOException { 091// // Get the part file 092// File[] fs = tmpFile.listFiles(new FileFilter(){ @Override public boolean accept(File arg0) {return arg0.getName().startsWith("part");} }); 093// File f = fs[0]; 094// BufferedReader reader = new BufferedReader(new FileReader(f)); 095// return Integer.parseInt(reader.readLine().split("\t")[1]); 096// } 097 098 public Path getRandomRows(int k) throws IOException, InterruptedException, ClassNotFoundException { 099 // Create the output path 100 File tmpFile = File.createTempFile("feature",".select"); 101 tmpFile.delete(); 102 Path outpath = new Path(SequenceFileUtility.convertToURI(tmpFile.getAbsolutePath()).toString()); 103 System.out.println("It is all going to: " + outpath); 104 105 Path[] sequenceFiles = SequenceFileUtility.getFilePaths(sequenceFilePath, "part"); 106 Configuration conf = new Configuration(); 107 108 Job job = new Job(conf, "featureselect"); 109 job.setNumReduceTasks(1); 110 job.setJarByClass(SequenceFileByteDataSelector.class); 111 job.setOutputKeyClass(IntWritable.class); 112 job.setOutputValueClass(BytesWritable.class); 113 114 job.setMapperClass(FeatureSelect.Map.class); 115// job.setCombinerClass(FeatureSelect.Reduce.class); 116 job.setReducerClass(FeatureSelect.Reduce.class); 117 118 job.setInputFormatClass(SequenceFileInputFormat.class); 119 job.setOutputFormatClass(SequenceFileOutputFormat.class); 120// job.setOutputFormatClass(TextOutputFormat.class); 121 122 job.getConfiguration().setStrings(FeatureSelect.FILETYPE_KEY, new String[]{fileType}); 123 job.getConfiguration().setStrings(FeatureSelect.NFEATURE_KEY, 
new String[]{"" + k}); 124 125 SequenceFileInputFormat.setInputPaths(job, sequenceFiles); 126 SequenceFileOutputFormat.setOutputPath(job, outpath); 127 SequenceFileOutputFormat.setCompressOutput(job, false); 128// FileOutputFormat.setOutputPath(job, outpath); 129 job.waitForCompletion(true); 130 return outpath; 131 132 } 133}