1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30 package org.openimaj.hadoop.tools.globalfeature;
31
32 import java.io.ByteArrayInputStream;
33 import java.io.ByteArrayOutputStream;
34 import java.util.ArrayList;
35 import java.util.Arrays;
36 import java.util.HashMap;
37 import java.util.List;
38 import java.util.Map;
39
40 import org.apache.hadoop.conf.Configured;
41 import org.apache.hadoop.fs.Path;
42 import org.apache.hadoop.io.BytesWritable;
43 import org.apache.hadoop.io.SequenceFile;
44 import org.apache.hadoop.io.Text;
45 import org.apache.hadoop.mapreduce.Job;
46 import org.apache.hadoop.mapreduce.Mapper;
47 import org.apache.hadoop.util.Tool;
48 import org.apache.hadoop.util.ToolRunner;
49 import org.apache.log4j.Logger;
50 import org.openimaj.feature.FeatureVector;
51 import org.openimaj.hadoop.mapreduce.TextBytesJobUtil;
52 import org.openimaj.hadoop.sequencefile.MetadataConfiguration;
53 import org.openimaj.hadoop.tools.HadoopToolsUtil;
54 import org.openimaj.image.ImageUtilities;
55 import org.openimaj.image.MBFImage;
56 import org.openimaj.io.IOUtils;
57
58
59
60
61
62
63
64
65 public class HadoopGlobalFeaturesTool extends Configured implements Tool
66 {
67 private static final String ARGS_KEY = "globalfeatures.args";
68 private static Logger logger = Logger.getLogger(HadoopGlobalFeaturesTool.class);
69
70 static class GlobalFeaturesMapper extends Mapper<Text, BytesWritable, Text, BytesWritable> {
71 private HadoopGlobalFeaturesOptions options;
72
73 public GlobalFeaturesMapper() {
74 }
75
76 @Override
77 protected void setup(Mapper<Text, BytesWritable, Text, BytesWritable>.Context context) {
78 options = new HadoopGlobalFeaturesOptions(context.getConfiguration().getStrings(ARGS_KEY));
79 }
80
81 @Override
82 protected void
83 map(Text key, BytesWritable value, Mapper<Text, BytesWritable, Text, BytesWritable>.Context context)
84 throws InterruptedException
85 {
86 try {
87 final MBFImage img = ImageUtilities.readMBF(new ByteArrayInputStream(value.getBytes()));
88 final FeatureVector fv = options.featureOp.extract(img);
89
90 final ByteArrayOutputStream baos = new ByteArrayOutputStream();
91 if (options.binary)
92 IOUtils.writeBinary(baos, fv);
93 else
94 IOUtils.writeASCII(baos, fv);
95
96 context.write(key, new BytesWritable(baos.toByteArray()));
97 } catch (final Exception e) {
98 logger.warn("Problem processing image " + key + " (" + e + ")");
99 }
100 }
101 }
102
103 @Override
104 public int run(String[] args) throws Exception {
105 final HadoopGlobalFeaturesOptions options = new HadoopGlobalFeaturesOptions(args, true);
106
107 final Map<String, String> metadata = new HashMap<String, String>();
108 metadata.put(MetadataConfiguration.CONTENT_TYPE_KEY, "application/globalfeature-" + options.feature + "-"
109 + (options.binary ? "bin" : "ascii"));
110
111 metadata.put("clusterquantiser.filetype", (options.binary ? "bin" : "ascii"));
112
113 final List<Path> allPaths = new ArrayList<Path>();
114 for (final String p : options.input) {
115 allPaths.addAll(Arrays.asList(HadoopToolsUtil.getInputPaths(p)));
116 }
117
118 final Job job = TextBytesJobUtil.createJob(allPaths, new Path(options.output), metadata, this.getConf());
119 job.setJarByClass(this.getClass());
120 job.setMapperClass(GlobalFeaturesMapper.class);
121 job.getConfiguration().setStrings(ARGS_KEY, args);
122 job.setNumReduceTasks(0);
123
124 job.waitForCompletion(true);
125
126 return 0;
127 }
128
129
130
131
132
133
134
135
136
137 public static void main(String[] args) throws Exception
138 {
139 ToolRunner.run(new HadoopGlobalFeaturesTool(), args);
140 }
141 }