/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 * * Redistributions of source code must retain the above copyright notice,
 *   this list of conditions and the following disclaimer.
 *
 * * Redistributions in binary form must reproduce the above copyright notice,
 *   this list of conditions and the following disclaimer in the documentation
 *   and/or other materials provided with the distribution.
 *
 * * Neither the name of the University of Southampton nor the names of its
 *   contributors may be used to endorse or promote products derived from this
 *   software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
package org.openimaj.hadoop.tools.downloader;

import java.io.IOException;
import java.net.URL;
import java.util.List;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.Logger;
import org.openimaj.hadoop.tools.downloader.InputMode.Parser;
import org.openimaj.io.HttpUtils;
import org.openimaj.util.pair.IndependentPair;
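/*
 * Note on the input contract: the mapper below delegates record parsing to an
 * InputMode.Parser, which turns one input line into a record key plus an
 * ordered list of candidate URLs. The sketch here is illustrative only; it
 * assumes, based solely on how the parser is called in this class, that a
 * parser exposes a parse(String) method returning
 * IndependentPair<String, List<URL>> (null meaning "skip this record") and
 * may throw on malformed input. It is not part of the real InputMode API.
 *
 *   // Hypothetical parser: the whole line is both the key and the only URL.
 *   public IndependentPair<String, List<URL>> parse(String line) throws Exception {
 *       final String trimmed = line.trim();
 *       final URL url = new URL(trimmed);
 *       return new IndependentPair<String, List<URL>>(trimmed,
 *               java.util.Collections.singletonList(url));
 *   }
 */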
/**
 * A Hadoop {@link Mapper} for downloading files. Each input record provides a
 * key and one or more candidate URLs; the bytes of the first URL that
 * downloads successfully are written against the key.
 *
 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
 * @author Jonathon Hare (jsh2@ecs.soton.ac.uk)
 */
public class DownloadMapper extends Mapper<LongWritable, Text, Text, BytesWritable> {
    private static Logger logger = Logger.getLogger(DownloadMapper.class);

    private Parser parser;
    private long sleep;
    private boolean followRedirects;
    private static FSDataOutputStream failureWriter = null;

    protected enum Counters {
        DOWNLOADED,
        FAILED,
        PARSE_ERROR
    }

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        final HadoopDownloaderOptions options = new HadoopDownloaderOptions(context.getConfiguration().getStrings(
                HadoopDownloader.ARGS_KEY));
        options.prepare(false);

        parser = options.getInputParser();
        sleep = options.getSleep();
        followRedirects = options.followRedirects();

        // The failure writer is shared by every mapper instance in this JVM,
        // so create it at most once (only if it doesn't already exist).
        synchronized (DownloadMapper.class) {
            if (options.writeFailures() && failureWriter == null) {
                final String[] taskId = context.getConfiguration().get("mapred.task.id").split("_");
                Path workPath = FileOutputFormat.getWorkOutputPath(context);
                workPath = workPath.suffix("/failures" + "-" + taskId[4].substring(1));
                failureWriter = workPath.getFileSystem(context.getConfiguration()).create(workPath);
            }
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        if (failureWriter != null) {
            failureWriter.close();
            failureWriter = null;
        }

        super.cleanup(context);
    }

    @Override
    public void map(LongWritable index, Text urlLine, Context context) {
        logger.info("Attempting to download: " + urlLine);

        try {
            final IndependentPair<String, List<URL>> urlData = parser.parse(urlLine.toString());

            if (urlData == null) {
                logger.trace("Parser returned null; record skipped.");
                return;
            }

            // Try each candidate URL in turn, stopping at the first success.
            for (final URL potential : urlData.secondObject()) {
                if (tryDownload(urlData.firstObject(), potential, context)) {
                    logger.info("Downloaded: " + potential);
                    context.getCounter(Counters.DOWNLOADED).increment(1);
                    return;
                }

                logger.trace("Not found; trying next URL");
            }

            // None of the candidate URLs could be downloaded.
            logger.info("Failed to download: " + urlLine);
            context.getCounter(Counters.FAILED).increment(1);
            writeFailure(urlLine, context);
        } catch (final Exception e) {
            logger.info("Error parsing: " + urlLine);
            logger.trace(e);
            context.getCounter(Counters.PARSE_ERROR).increment(1);
            writeFailure(urlLine, context);
        }

        // Optionally rate-limit by sleeping between records.
        if (sleep > 0) {
            try {
                logger.trace("Waiting before continuing");
                Thread.sleep(sleep);
            } catch (final InterruptedException e) {
                logger.trace("Wait was interrupted; ignoring");
            }
        }
    }

    /**
     * Record a failed input line in the failure file, if failure writing is
     * enabled.
     */
    private synchronized static void writeFailure(Text urlLine, Context context) {
        if (failureWriter != null) {
            try {
                failureWriter.writeUTF(urlLine + "\n");
            } catch (final IOException e) {
                logger.error(e);
            }
        }
    }

    /**
     * Attempt to download a single URL, emitting the bytes against the given
     * key on success.
     *
     * @return true if the download succeeded; false otherwise.
     */
    private boolean tryDownload(String key, URL url, Context context) throws InterruptedException {
        try {
            final byte[] bytes = HttpUtils.readURLAsBytes(url, followRedirects);

            if (bytes == null)
                return false;

            final BytesWritable bw = new BytesWritable(bytes);
            context.write(new Text(key), bw);
        } catch (final IOException e) {
            logger.trace(e);
            return false;
        }

        return true;
    }
}
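
/*
 * Usage sketch (illustrative, not part of this class): in the real tool the
 * job wiring lives in HadoopDownloader, which also populates ARGS_KEY with the
 * tool's command-line options before the job runs. The snippet below only
 * shows, under that assumption, how a driver could attach this map-only
 * mapper to a job on the Hadoop 2.x API; the input/output paths and the args
 * array are placeholders.
 *
 *   final Configuration conf = new Configuration();
 *   conf.setStrings(HadoopDownloader.ARGS_KEY, args); // read back in setup()
 *
 *   final Job job = Job.getInstance(conf, "download");
 *   job.setJarByClass(DownloadMapper.class);
 *   job.setMapperClass(DownloadMapper.class);
 *   job.setNumReduceTasks(0); // map-only: mappers write their output directly
 *   job.setOutputKeyClass(Text.class);
 *   job.setOutputValueClass(BytesWritable.class);
 *   job.setOutputFormatClass(SequenceFileOutputFormat.class);
 *   FileInputFormat.setInputPaths(job, new Path("urls.txt"));
 *   FileOutputFormat.setOutputPath(job, new Path("downloads"));
 *   job.waitForCompletion(true);
 */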