001/**
002 * Copyright (c) 2011, The University of Southampton and the individual contributors.
003 * All rights reserved.
004 *
005 * Redistribution and use in source and binary forms, with or without modification,
006 * are permitted provided that the following conditions are met:
007 *
008 *   *  Redistributions of source code must retain the above copyright notice,
009 *      this list of conditions and the following disclaimer.
010 *
011 *   *  Redistributions in binary form must reproduce the above copyright notice,
012 *      this list of conditions and the following disclaimer in the documentation
013 *      and/or other materials provided with the distribution.
014 *
015 *   *  Neither the name of the University of Southampton nor the names of its
016 *      contributors may be used to endorse or promote products derived from this
017 *      software without specific prior written permission.
018 *
019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */
030package org.openimaj.hadoop.sequencefile;
031
032import java.io.File;
033import java.io.IOException;
034import java.io.InputStream;
035import java.io.PrintStream;
036import java.io.PrintWriter;
037import java.lang.reflect.ParameterizedType;
038import java.math.BigInteger;
039import java.net.URI;
040import java.security.MessageDigest;
041import java.security.NoSuchAlgorithmException;
042import java.util.ArrayList;
043import java.util.HashMap;
044import java.util.Iterator;
045import java.util.LinkedHashMap;
046import java.util.List;
047import java.util.Map;
048import java.util.Map.Entry;
049import java.util.NoSuchElementException;
050import java.util.UUID;
051import java.util.zip.Deflater;
052import java.util.zip.ZipEntry;
053import java.util.zip.ZipOutputStream;
054
055import org.apache.hadoop.conf.Configuration;
056import org.apache.hadoop.fs.FSDataOutputStream;
057import org.apache.hadoop.fs.FileStatus;
058import org.apache.hadoop.fs.FileSystem;
059import org.apache.hadoop.fs.LocalFileSystem;
060import org.apache.hadoop.fs.Path;
061import org.apache.hadoop.fs.PathFilter;
062import org.apache.hadoop.io.SequenceFile;
063import org.apache.hadoop.io.SequenceFile.CompressionType;
064import org.apache.hadoop.io.SequenceFile.Metadata;
065import org.apache.hadoop.io.SequenceFile.Reader;
066import org.apache.hadoop.io.SequenceFile.Writer;
067import org.apache.hadoop.io.Text;
068import org.apache.hadoop.io.Writable;
069import org.apache.hadoop.io.compress.CompressionCodec;
070import org.apache.hadoop.io.compress.DefaultCodec;
071import org.apache.hadoop.util.ReflectionUtils;
072
073/**
074 * Base class for a utility class that deals with specifically typed sequence
075 * files.
076 *
077 * @author Jonathon Hare (jsh2@ecs.soton.ac.uk)
078 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
079 *
080 * @param <K>
081 *            Key type
082 * @param <V>
083 *            Value type
084 */
085public abstract class SequenceFileUtility<K extends Writable, V extends Writable> implements Iterable<Entry<K, V>> {
086        protected Configuration config = new Configuration(); // Hadoop configuration handed to the readers and writers created by this class
087        protected FileSystem fileSystem; // file system holding the sequence file; assigned in setup()
088        protected Path sequenceFilePath; // location of the sequence file; in write mode an existing directory target becomes <dir>/<uuid>.seq
089
090        protected Writer writer; // only created by setup() when opened for writing
091        protected CompressionType compressionType = CompressionType.BLOCK; // BLOCK by default; in read mode setup() replaces it with the file's actual compression
092
093        protected boolean isReader; // true when opened for reading; the read-only operations check this flag
094
095        protected String uuid; // read from the file metadata in read mode, generated in write mode
096
097        public SequenceFileUtility(String uriOrPath, boolean read) throws IOException {
098                setup(convertToURI(uriOrPath), read);
099        }
100
/**
 * Open the sequence file at the given URI, either for reading or for
 * writing.
 *
 * @param uri
 *            uri of the sequence file
 * @param read
 *            true to open in read mode; false to open in write mode
 * @throws IOException
 *             if setting up the file system or the writer fails
 */
public SequenceFileUtility(URI uri, boolean read) throws IOException {
    // This constructor carries no user metadata, hence the null map.
    setup(uri, read, null);
}
104
105        public SequenceFileUtility(String uriOrPath, CompressionType compressionType) throws IOException {
106                this.compressionType = compressionType;
107                setup(convertToURI(uriOrPath), false);
108        }
109
/**
 * Open the sequence file at the given URI for writing with the given
 * compression type.
 *
 * @param uri
 *            uri of the sequence file
 * @param compressionType
 *            the compression type to write with
 * @throws IOException
 *             if setting up the file system or the writer fails
 */
public SequenceFileUtility(URI uri, CompressionType compressionType) throws IOException {
    // The compression type must be in place before setup() creates the writer.
    this.compressionType = compressionType;
    setup(uri, false, null);
}
114
115        /**
116         * Get a list of all the reducer outputs in a directory. If the given
117         * path/uri is not a directory, then it is assumed that it is a SequenceFile
118         * and returned directly.
119         *
120         * @param uriOrPath
121         *            the path or uri
122         * @return the reducer outputs
123         * @throws IOException
124         */
125        public static URI[] getReducerFiles(String uriOrPath) throws IOException {
126                return getFiles(uriOrPath, "part-r-");
127        }
128
129        /**
130         * Get a list of all the sequence files (with a given name prefix) in a
131         * directory. If the given uri is not a directory, then it is assumed that
132         * it is a SequenceFile and returned directly.
133         *
134         * @param uriOrPath
135         *            the path or uri
136         * @param filenamePrefix
137         *            the prefix of the file name
138         * @return the matching files
139         * @throws IOException
140         */
141        public static URI[] getFiles(String uriOrPath, final String filenamePrefix) throws IOException {
142                final Configuration config = new Configuration();
143                final URI uri = convertToURI(uriOrPath);
144                final FileSystem fs = FileSystem.get(uri, config);
145                final Path path = new Path(uri.toString());
146
147                if (fs.getFileStatus(path).isDirectory()) {
148                        final FileStatus[] files = fs.listStatus(path, new PathFilter() {
149                                @Override
150                                public boolean accept(Path p) {
151                                        return p.getName().startsWith(filenamePrefix);
152                                }
153                        });
154
155                        final URI[] uris = new URI[files.length];
156                        int i = 0;
157                        for (final FileStatus status : files) {
158                                uris[i++] = status.getPath().toUri();
159                        }
160                        return uris;
161                } else {
162                        return new URI[] { uri };
163                }
164        }
165
166        /**
167         * Get a list of all the sequence files (with a given name prefix) in the
168         * set of input paths. If a given uri is not a directory, then it is assumed
169         * that it is a SequenceFile and returned directly.
170         *
171         * @param uriOrPaths
172         *            the paths or uris
173         * @param filenamePrefix
174         *            the prefix of the file name
175         * @return the list of sequence files
176         * @throws IOException
177         */
178        public static Path[] getFilePaths(String[] uriOrPaths, String filenamePrefix) throws IOException {
179                final List<Path> pathList = new ArrayList<Path>();
180                for (final String uriOrPath : uriOrPaths) {
181                        final Path[] paths = getFilePaths(uriOrPath, filenamePrefix);
182                        for (final Path path : paths) {
183                                pathList.add(path);
184                        }
185                }
186                return pathList.toArray(new Path[pathList.size()]);
187        }
188
189        /**
190         * Get a list of all the sequence files (with a given name prefix) in the
191         * set of input paths.
192         * <p>
193         * Optionally a subdirectory can be provided; if provided the subdirectory
194         * is appended to each path (i.e. PATH/subdirectory).
195         * <p>
196         * If the given uri is not a directory, then it is assumed that it is a
197         * single SequenceFile and returned directly.
198         *
199         * @param uriOrPaths
200         *            the URI or path to the directory/file
201         * @param subdir
202         *            the optional subdirectory (may be null)
203         * @param filenamePrefix
204         *            the prefix of the file name
205         * @return the list of sequence files
206         * @throws IOException
207         */
208        public static Path[] getFilePaths(String[] uriOrPaths, String subdir, String filenamePrefix) throws IOException {
209                final List<Path> pathList = new ArrayList<Path>();
210
211                for (String uriOrPath : uriOrPaths) {
212                        if (subdir != null)
213                                uriOrPath += "/" + subdir;
214
215                        final Path[] paths = getFilePaths(uriOrPath, filenamePrefix);
216                        for (final Path path : paths) {
217                                pathList.add(path);
218                        }
219                }
220                return pathList.toArray(new Path[pathList.size()]);
221        }
222
223        /**
224         * Get a list of all the sequence files (with a given name prefix) in a
225         * directory. If the given uri is not a directory, then it is assumed that
226         * it is a SequenceFile and returned directly.
227         *
228         * @param uriOrPath
229         *            the path or uri
230         * @param filenamePrefix
231         *            the prefix of the file name
232         * @return the list of sequence files
233         * @throws IOException
234         */
235        public static Path[] getFilePaths(String uriOrPath, final String filenamePrefix) throws IOException {
236                final Configuration config = new Configuration();
237                final URI uri = convertToURI(uriOrPath);
238                final FileSystem fs = FileSystem.get(uri, config);
239
240                final Path path = new Path(uri);
241
242                if (fs.getFileStatus(path).isDirectory()) {
243                        final FileStatus[] files = fs.listStatus(path, new PathFilter() {
244                                @Override
245                                public boolean accept(Path p) {
246                                        return p.getName().startsWith(filenamePrefix);
247                                }
248                        });
249
250                        final Path[] uris = new Path[files.length];
251                        int i = 0;
252                        for (final FileStatus status : files) {
253                                uris[i++] = status.getPath();
254                        }
255                        return uris;
256                } else {
257                        return new Path[] { path };
258                }
259        }
260
261        /**
262         * Get a list of all the sequence files whose names match the given regular
263         * expression in a directory. If the given uri is not a directory, then it
264         * is assumed that it is a SequenceFile and returned directly.
265         *
266         * @param uriOrPath
267         *            the path or uri
268         * @param regex
269         *            the regular expression to match
270         * @return a list of files
271         * @throws IOException
272         */
273        public static URI[] getFilesRegex(String uriOrPath, final String regex) throws IOException {
274                final Configuration config = new Configuration();
275                final URI uri = convertToURI(uriOrPath);
276                final FileSystem fs = FileSystem.get(uri, config);
277                final Path path = new Path(uri.toString());
278
279                if (fs.getFileStatus(path).isDirectory()) {
280                        final FileStatus[] files = fs.listStatus(path, new PathFilter() {
281                                @Override
282                                public boolean accept(Path p) {
283                                        return (regex == null || p.getName().matches(regex));
284                                }
285                        });
286
287                        final URI[] uris = new URI[files.length];
288                        int i = 0;
289                        for (final FileStatus status : files) {
290                                uris[i++] = status.getPath().toUri();
291                        }
292                        return uris;
293                } else {
294                        return new URI[] { uri };
295                }
296        }
297
298        /**
299         * Return a map of each key in the sequence file to its offset. Read mode only.
300         *
301         * @return keys mapped to offsets, in the order they were read.
302         */
303        @SuppressWarnings("unchecked")
304        public Map<K, Long> listKeysAndOffsets() {
305                if (!isReader) {
306                        throw new UnsupportedOperationException("Cannot read keys in write mode");
307                }
308
309                Reader reader = null;
310                try {
311                        final Map<K, Long> keys = new LinkedHashMap<K, Long>();
312
313                        reader = createReader();
314                        final Class<K> keyClass = (Class<K>) reader.getKeyClass();
315                        K key = ReflectionUtils.newInstance(keyClass, config);
316                        final V val = ReflectionUtils.newInstance((Class<V>) reader.getValueClass(), config);
317                        long start = 0L;
318                        long end = 0L;
319                        while (reader.next(key, val)) {
320                                final long pos = reader.getPosition();
321                                if (pos != end) {
322                                        start = end;
323                                        end = pos;
324                                }
325                                keys.put(key, start);
326                                key = ReflectionUtils.newInstance(keyClass, config);
327                        }
328
329                        return keys;
330                } catch (final Exception e) {
331                        throw new RuntimeException(e);
332                } finally {
333                        if (reader != null)
334                                try {
335                                        reader.close();
336                                } catch (final IOException e1) {
337                                }
338                }
339        }
340
341        /**
342         * Go through a sequence file, applying each
343         * {@link RecordInformationExtractor} to each key, printing out the results
344         * in order to the provided {@link PrintStream}
345         *
346         * @param extractors
347         *            the {@link RecordInformationExtractor}s to apply
348         * @param stream
349         *            the stream to write to
350         * @param delim the delimiter placed between the extractor outputs on each line
351         */
352        @SuppressWarnings("unchecked")
353        public void extract(List<RecordInformationExtractor> extractors, PrintStream stream, String delim) {
354                if (!isReader) {
355                        throw new UnsupportedOperationException("Cannot read keys in write mode");
356                }
357
358                Reader reader = null;
359                try {
360                        reader = createReader();
361
362                        final Class<K> keyClass = (Class<K>) reader.getKeyClass();
363                        K key = ReflectionUtils.newInstance(keyClass, config);
364                        final V val = ReflectionUtils.newInstance((Class<V>) reader.getValueClass(), config);
365                        long start = 0L;
366                        long end = 0L;
367                        int count = 0;
368                        while (reader.next(key, val)) {
369                                final long pos = reader.getPosition();
370                                if (pos != end) {
371                                        start = end;
372                                        end = pos;
373                                }
374
375                                // Apply the filters and print
376                                String recordString = "";
377                                for (final RecordInformationExtractor extractor : extractors) {
378                                        recordString += extractor.extract(key, val, start, sequenceFilePath) + delim;
379                                }
380
381                                if (recordString.length() >= delim.length())
382                                        recordString = recordString.substring(0, recordString.length() - delim.length());
383
384                                stream.println(recordString);
385                                count++;
386
387                                System.err.printf("\rOutputted: %10d", count);
388                                key = ReflectionUtils.newInstance(keyClass, config);
389                        }
390                        System.err.println();
391                } catch (final Exception e) {
392                        throw new RuntimeException(e);
393                } finally {
394                        if (reader != null)
395                                try {
396                                        reader.close();
397                                } catch (final IOException e1) {
398                                }
399                }
400        }
401
402        public SequenceFileUtility(String uriOrPath, CompressionType compressionType, Map<String, String> metadata)
403                        throws IOException
404        {
405                this.compressionType = compressionType;
406                setup(convertToURI(uriOrPath), false);
407        }
408
409        public SequenceFileUtility(URI uri, CompressionType compressionType, Map<String, String> metadata)
410                        throws IOException
411        {
412                this.compressionType = compressionType;
413                setup(uri, false);
414        }
415
416        /**
417         * Converts a string representing a file or uri to a uri object.
418         *
419         * @param uriOrPath
420         *            uri or path to convert
421         * @return uri
422         */
423        public static URI convertToURI(String uriOrPath) {
424                if (uriOrPath.contains("://")) {
425                        return URI.create(uriOrPath);
426                } else {
427                        return new File(uriOrPath).toURI();
428                }
429        }
430
431        private void setup(URI uri, boolean read) throws IOException {
432                setup(uri, read, null);
433        }
434
435        private void setup(URI uri, boolean read, Map<String, String> metadata) throws IOException {
436                fileSystem = getFileSystem(uri);
437                sequenceFilePath = new Path(uri.toString());
438
439                this.isReader = read;
440
441                if (read) {
442                        Reader reader = null;
443
444                        try {
445                                reader = createReader();
446                                final Text uuidText = reader.getMetadata().get(new Text(MetadataConfiguration.UUID_KEY));
447                                if (uuidText != null)
448                                        uuid = uuidText.toString();
449
450                                if (!reader.isCompressed())
451                                        compressionType = CompressionType.NONE;
452                                else if (reader.isBlockCompressed())
453                                        compressionType = CompressionType.BLOCK;
454                                else
455                                        compressionType = CompressionType.RECORD;
456                        } catch (final Exception e) {
457                                throw new RuntimeException(e);
458                        } finally {
459                                if (reader != null)
460                                        try {
461                                                reader.close();
462                                        } catch (final IOException e1) {
463                                        }
464                        }
465                } else {
466                        if (metadata == null) {
467                                metadata = new HashMap<String, String>();
468                        }
469
470                        if (!metadata.containsKey(MetadataConfiguration.UUID_KEY)) {
471                                uuid = UUID.randomUUID().toString();
472                                metadata.put(MetadataConfiguration.UUID_KEY, uuid);
473                        }
474
475                        // if the output directory is a directory, then create the file
476                        // inside the
477                        // directory with the name given by the uuid
478                        if (fileSystem.exists(sequenceFilePath) && fileSystem.getFileStatus(sequenceFilePath).isDirectory()) {
479                                sequenceFilePath = new Path(sequenceFilePath, uuid + ".seq");
480                        }
481
482                        writer = createWriter(metadata);
483                }
484        }
485
486        @SuppressWarnings("unchecked")
487        private Writer createWriter(Map<String, String> metadata) throws IOException {
488                final Metadata md = new Metadata();
489
490                for (final Entry<String, String> e : metadata.entrySet()) {
491                        md.set(new Text(e.getKey()), new Text(e.getValue()));
492                }
493                final Class<K> keyClass = (Class<K>) ((ParameterizedType) getClass().getGenericSuperclass())
494                                .getActualTypeArguments()[0];
495                final Class<V> valueClass = (Class<V>) ((ParameterizedType) getClass().getGenericSuperclass())
496                                .getActualTypeArguments()[1];
497
498                return SequenceFile.createWriter(fileSystem, config, sequenceFilePath, keyClass, valueClass, compressionType,
499                                new DefaultCodec(), null,
500                                md);
501        }
502
503        private Reader createReader() throws IOException {
504                // if(this.fileSystem.getFileStatus(sequenceFilePath).isDir())
505                // sequenceFilePath = new Path(sequenceFilePath,"part-r-00000");
506                return new Reader(fileSystem, sequenceFilePath, config);
507        }
508
509        /**
510         * Get the UUID of this file
511         *
512         * @return UUID
513         */
514        public String getUUID() {
515                return uuid;
516        }
517
518        /**
519         * Return the metadata map. Read mode only.
520         *
521         * @return metadata
522         */
523        public Map<Text, Text> getMetadata() {
524                if (!isReader) {
525                        throw new UnsupportedOperationException("Cannot read metadata in write mode");
526                }
527
528                Reader reader = null;
529                try {
530                        reader = createReader();
531                        final Map<Text, Text> metadata = reader.getMetadata().getMetadata();
532                        return metadata;
533                } catch (final Exception e) {
534                        throw new RuntimeException(e);
535                } finally {
536                        if (reader != null)
537                                try {
538                                        reader.close();
539                                } catch (final IOException e1) {
540                                }
541                }
542        }
543
544        /**
545         * Return a list of the keys in the sequence file. Read mode only.
546         *
547         * @return keys.
548         */
549        @SuppressWarnings("unchecked")
550        public List<K> listKeys() {
551                if (!isReader) {
552                        throw new UnsupportedOperationException("Cannot read keys in write mode");
553                }
554
555                Reader reader = null;
556                try {
557                        final List<K> keys = new ArrayList<K>();
558
559                        reader = createReader();
560                        final Class<K> keyClass = (Class<K>) reader.getKeyClass();
561                        K key = ReflectionUtils.newInstance(keyClass, config);
562
563                        while (reader.next(key)) {
564                                keys.add(key);
565                                key = ReflectionUtils.newInstance(keyClass, config);
566                        }
567
568                        return keys;
569                } catch (final Exception e) {
570                        throw new RuntimeException(e);
571                } finally {
572                        if (reader != null)
573                                try {
574                                        reader.close();
575                                } catch (final IOException e1) {
576                                }
577                }
578        }
579
580        /**
581         * Extracts file to a directory. Read mode only.
582         *
583         * @param uriOrPath
584         *            path or uri to extract to.
585         * @throws IOException
586         */
587        public void exportData(String uriOrPath) throws IOException {
588                exportData(uriOrPath, NamingStrategy.KEY, new ExtractionState(), false, 0);
589        }
590
591        /**
592         * Extracts file to a directory. Read mode only.
593         *
594         * @param uriOrPath
595         *            path or uri to extract to.
596         * @param naming
597         *            the naming strategy
598         * @param extrState
599         *            the extraction state
600         * @param addExtension
601         *            if true, then file extensions are added to each record
602         *            automatically
603         * @param offset
604         *            offset from which to start. Can be used to reduce number of
605         *            files extracted.
606         * @throws IOException
607         */
608        public void exportData(String uriOrPath, NamingStrategy naming, ExtractionState extrState, boolean addExtension,
609                        long offset)
610                        throws IOException
611        {
612                FileSystem fs = null;
613                Path p = null;
614
615                if (uriOrPath != null) {
616                        final URI uri = convertToURI(uriOrPath);
617
618                        fs = getFileSystem(uri);
619                        p = new Path(uri.toString());
620                }
621
622                exportData(fs, p, naming, extrState, addExtension, offset);
623        }
624
625        public static ZipOutputStream openZipOutputStream(String uriOrPath) throws IOException {
626                final URI uri = convertToURI(uriOrPath);
627
628                final FileSystem fs = getFileSystem(uri, new Configuration());
629                final Path path = new Path(uri.toString());
630
631                final ZipOutputStream zos = new ZipOutputStream(fs.create(path));
632                zos.setLevel(Deflater.BEST_COMPRESSION);
633                return zos;
634        }
635
636        /**
637         * Extracts file to a zip file. Read mode only.
638         *
639         * @param uriOrPath
640         *            path or uri to extract to.
641         * @param naming
642         *            the naming strategy
643         * @param state
644         *            the extraction state
645         * @param addExtension
646         *            if true, then file extensions are added to each record
647         *            automatically
648         * @param offset
649         *            offset from which to start. Can be used to reduce number of
650         *            files extracted.
651         * @throws IOException
652         */
653        public void exportDataToZip(String uriOrPath, NamingStrategy naming, ExtractionState state, boolean addExtension,
654                        long offset)
655                        throws IOException
656        {
657                if (uriOrPath != null) {
658
659                        ZipOutputStream zos = null;
660                        try {
661                                zos = openZipOutputStream(uriOrPath);
662                                exportDataToZip(zos, naming, state, addExtension, offset);
663                        } finally {
664                                if (zos != null)
665                                        try {
666                                                zos.close();
667                                        } catch (final IOException e) {
668                                        }
669                                ;
670                        }
671                }
672        }
673
674        /**
675         * Extracts file to a zip file. Read mode only.
676         *
677         * @param zos
678         *            The {@link ZipOutputStream} to write to
679         * @param naming
680         *            The naming strategy
681         * @param extrState
682         *            The extraction state
683         * @param addExtension
684         *            if true, then file extensions are added to each record
685         *            automatically
686         * @param offset
687         *            offset from which to start. Can be used to reduce number of
688         *            files extracted.
689         * @throws IOException
690         */
691        public void exportDataToZip(ZipOutputStream zos, NamingStrategy naming, ExtractionState extrState,
692                        boolean addExtension, long offset)
693                        throws IOException
694        {
695                if (!isReader) {
696                        throw new UnsupportedOperationException("Cannot read keys in write mode");
697                }
698
699                Reader reader = null;
700                try {
701                        reader = createReader();
702                        if (offset > 0)
703                                reader.seek(offset);
704
705                        @SuppressWarnings("unchecked")
706                        final K key = ReflectionUtils.newInstance((Class<K>) reader.getKeyClass(), config);
707                        @SuppressWarnings("unchecked")
708                        final V val = ReflectionUtils.newInstance((Class<V>) reader.getValueClass(), config);
709
710                        while (reader.next(key)) {
711
712                                if (extrState.allowNext()) {
713                                        reader.getCurrentValue(val);
714
715                                        String name = naming.getName(key, val, extrState, addExtension);
716
717                                        while (name.startsWith("/"))
718                                                name = name.substring(1);
719
720                                        final ZipEntry ze = new ZipEntry(name);
721                                        zos.putNextEntry(ze);
722                                        writeZipData(zos, val);
723                                        zos.closeEntry();
724
725                                        extrState.tick();
726                                } else {
727                                        extrState.tick();
728                                }
729                                if (extrState.isFinished())
730                                        break;
731                        }
732                } catch (final Exception e) {
733                        throw new RuntimeException(e);
734                } finally {
735                        if (reader != null)
736                                try {
737                                        reader.close();
738                                } catch (final IOException e1) {
739                                }
740                }
741        }
742
743        /**
744         * Extracts file to a directory. Read mode only.
745         *
746         * @param fs
747         *            filesystem of output file
748         * @param dirPath
749         *            path to extract to
750         */
751        public void exportData(FileSystem fs, Path dirPath) {
752                exportData(fs, dirPath, NamingStrategy.KEY, new ExtractionState(), false, 0);
753        }
754
755        /**
756         * Extracts file to a directory. Read mode only.
757         *
758         * @param fs
759         *            filesystem of output file
760         * @param dirPath
761         *            path to extract to
762         * @param naming
763         *            the naming strategy
764         * @param extrState
765         *            the extraction state
766         * @param addExtension
767         *            if true, then file extensions are added to each record
768         *            automatically
769         * @param offset
770         *            offset from which to start. Can be used to reduce number of
771         *            files extracted.
772         */
773        @SuppressWarnings("unchecked")
774        public void exportData(FileSystem fs, Path dirPath, NamingStrategy naming, ExtractionState extrState,
775                        boolean addExtension, long offset)
776        {
777                if (!isReader) {
778                        throw new UnsupportedOperationException("Cannot read keys in write mode");
779                }
780
781                Reader reader = null;
782                try {
783                        if (fs != null)
784                                fs.mkdirs(dirPath);
785
786                        reader = createReader();
787                        if (offset > 0)
788                                reader.seek(offset);
789
790                        final K key = ReflectionUtils.newInstance((Class<K>) reader.getKeyClass(), config);
791                        final V val = ReflectionUtils.newInstance((Class<V>) reader.getValueClass(), config);
792
793                        while (reader.next(key)) {
794
795                                if (extrState.allowNext()) {
796                                        reader.getCurrentValue(val);
797                                        if (dirPath != null) {
798                                                String name = naming.getName(key, val, extrState, addExtension);
799                                                if (name.startsWith("/"))
800                                                        name = "." + name;
801
802                                                final Path outFilePath = new Path(dirPath, name);
803                                                // System.out.println("NP: " + np);
804                                                System.out.println("Path: " + outFilePath);
805                                                writeFile(fs, outFilePath, val);
806                                                extrState.tick();
807                                        } else {
808                                                System.out.println(key.toString());
809                                                printFile(val);
810                                                extrState.tick();
811                                        }
812                                } else {
813                                        extrState.tick();
814                                }
815                                if (extrState.isFinished())
816                                        break;
817                        }
818                } catch (final Exception e) {
819                        throw new RuntimeException(e);
820                } finally {
821                        if (reader != null)
822                                try {
823                                        reader.close();
824                                } catch (final IOException e1) {
825                                }
826                }
827        }
828
829        @SuppressWarnings("unchecked")
830        public void exportData(NamingStrategy np, ExtractionState nps, long offset, KeyValueDump<K, V> dump) {
831                if (!isReader) {
832                        throw new UnsupportedOperationException("Cannot read keys in write mode");
833                }
834
835                Reader reader = null;
836                try {
837
838                        reader = createReader();
839                        if (offset > 0)
840                                reader.seek(offset);
841
842                        final K key = ReflectionUtils.newInstance((Class<K>) reader.getKeyClass(), config);
843                        final V val = ReflectionUtils.newInstance((Class<V>) reader.getValueClass(), config);
844
845                        while (reader.next(key)) {
846
847                                if (nps.allowNext()) {
848                                        reader.getCurrentValue(val);
849                                        dump.dumpValue(key, val);
850                                }
851                                nps.tick();
852                                if (nps.isFinished())
853                                        break;
854                        }
855                } catch (final Exception e) {
856                        throw new RuntimeException(e);
857                } finally {
858                        if (reader != null)
859                                try {
860                                        reader.close();
861                                } catch (final IOException e1) {
862                                }
863                }
864        }
865
866        /**
867         * Close the underlying writer. Does nothing in read mode.
868         *
869         * @throws IOException
870         */
871        public void close() throws IOException {
872                if (writer != null)
873                        writer.close();
874        }
875
876        /**
877         * Get number of records in file. Read mode only.
878         *
879         * @return number of records
880         */
881        public long getNumberRecords() {
882                if (!isReader) {
883                        throw new UnsupportedOperationException("Cannot read keys in write mode");
884                }
885
886                Reader reader = null;
887                try {
888                        reader = createReader();
889
890                        final Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), config);
891
892                        long count = 0;
893                        while (reader.next(key)) {
894                                count++;
895                        }
896                        return count;
897                } catch (final Exception e) {
898                        throw new RuntimeException(e);
899                } finally {
900                        if (reader != null)
901                                try {
902                                        reader.close();
903                                } catch (final IOException e1) {
904                                }
905                }
906        }
907
908        /**
909         * @return the compression codec in use for this file.
910         */
911        public Class<? extends CompressionCodec> getCompressionCodecClass() {
912                if (!isReader)
913                        return DefaultCodec.class;
914
915                Reader reader = null;
916                try {
917                        reader = createReader();
918                        if (reader.getCompressionCodec() == null)
919                                return null;
920                        return reader.getCompressionCodec().getClass();
921                } catch (final Exception e) {
922                        throw new RuntimeException(e);
923                } finally {
924                        if (reader != null)
925                                try {
926                                        reader.close();
927                                } catch (final IOException e1) {
928                                }
929                }
930        }
931
932        /**
933         * @return he compression mode used for this sequence file.
934         */
935        public CompressionType getCompressionType() {
936                return compressionType;
937        }
938
939        /**
940         * Get the filesystem associated with a uri.
941         *
942         * @param uri
943         * @return the filesystem
944         * @throws IOException
945         */
946        public FileSystem getFileSystem(URI uri) throws IOException {
947                return getFileSystem(uri, config);
948        }
949
950        /**
951         * Get the filesystem associated with a uri.
952         *
953         * @param uri
954         * @param config
955         * @return the filesystem
956         * @throws IOException
957         */
958        public static FileSystem getFileSystem(URI uri, Configuration config) throws IOException {
959                FileSystem fs = FileSystem.get(uri, config);
960                if (fs instanceof LocalFileSystem)
961                        fs = ((LocalFileSystem) fs).getRaw();
962                return fs;
963        }
964
965        /**
966         * Get a path from a uri.
967         *
968         * @param uri
969         * @return the path
970         * @throws IOException
971         */
972        public Path getPath(URI uri) throws IOException {
973                return new Path(uri.toString());
974        }
975
976        /**
977         * Get the MD5 sum of a file
978         *
979         * @param fs
980         * @param p
981         * @return the md5sum
982         */
983        public static String md5sum(FileSystem fs, Path p) {
984                MessageDigest digest;
985                try {
986                        digest = MessageDigest.getInstance("MD5");
987                } catch (final NoSuchAlgorithmException e1) {
988                        throw new RuntimeException(e1);
989                }
990
991                InputStream is = null;
992                try {
993                        final byte[] buffer = new byte[8192];
994                        int read = 0;
995
996                        is = fs.open(p);
997                        while ((read = is.read(buffer)) > 0) {
998                                digest.update(buffer, 0, read);
999                        }
1000                        final byte[] md5sum = digest.digest();
1001
1002                        final BigInteger bigInt = new BigInteger(1, md5sum);
1003                        return bigInt.toString(16);
1004                } catch (final IOException e) {
1005                        throw new RuntimeException("Unable to process file for MD5", e);
1006                } finally {
1007                        try {
1008                                if (is != null)
1009                                        is.close();
1010                        } catch (final IOException e) {
1011                        }
1012                }
1013        }
1014
        /**
         * Read the file at the given path into a value. {@code appendFile} calls
         * this to obtain the value it appends to the sequence file.
         *
         * @param fs
         *            the filesystem supplied by the caller, presumably the one
         *            holding {@code path}
         * @param path
         *            the path of the file to read
         * @return the value read from the file
         * @throws IOException
         *             if the file cannot be read
         */
1015        protected abstract V readFile(FileSystem fs, Path path) throws IOException;
1016
        /**
         * Write a value to the file at the given path. The export methods call
         * this once for every record they extract to a filesystem.
         *
         * @param fs
         *            the filesystem supplied by the caller, presumably the one on
         *            which {@code path} is to be created
         * @param path
         *            the path of the file to write
         * @param value
         *            the value to write
         * @throws IOException
         *             if the file cannot be written
         */
1017        protected abstract void writeFile(FileSystem fs, Path path, V value) throws IOException;
1018
        /**
         * Write a value to a zip stream. {@code exportDataToZip} calls this after
         * opening a zip entry for the record and closes the entry itself
         * afterwards.
         *
         * @param zos
         *            the zip stream to write to
         * @param value
         *            the value to write
         * @throws IOException
         *             if the data cannot be written
         */
1019        protected abstract void writeZipData(ZipOutputStream zos, V value) throws IOException;
1020
        /**
         * Print a value. The export methods call this instead of
         * {@code writeFile} when no output location was given; where the value is
         * printed is up to the implementation (presumably standard output --
         * confirm in subclasses).
         *
         * @param value
         *            the value to print
         * @throws IOException
         *             if the value cannot be printed
         */
1021        protected abstract void printFile(V value) throws IOException;
1022
1023        /**
1024         * Append data read from a file to the sequence file.
1025         *
1026         * @param key
1027         * @param fs
1028         * @param p
1029         * @throws IOException
1030         */
1031        public void appendFile(K key, FileSystem fs, Path p) throws IOException {
1032                if (isReader) {
1033                        throw new UnsupportedOperationException("Cannot write data in read mode");
1034                }
1035
1036                writer.append(key, readFile(fs, p));
1037        }
1038
1039        /**
1040         * Append data to a sequence file.
1041         *
1042         * @param key
1043         * @param value
1044         * @throws IOException
1045         */
1046        public void appendData(K key, V value) throws IOException {
1047                if (isReader) {
1048                        throw new UnsupportedOperationException("Cannot write data in read mode");
1049                }
1050                writer.append(key, value);
1051        }
1052
1053        /**
1054         * Interface for objects that can make a key from a path
1055         *
1056         * @param <K>
1057         */
1058        public interface KeyProvider<K> {
                /**
                 * Make a key for the file at the given path.
                 *
                 * @param fs
                 *            the filesystem of the file
                 * @param path
                 *            the path of the file
                 * @return the key for the file
                 */
1059                K getKey(FileSystem fs, Path path);
1060
                /**
                 * Make a key for the file at the given path, with a base path
                 * available to the implementation. The providers in this file
                 * either ignore the base or use it to shorten the key to the part
                 * of the path that follows it.
                 *
                 * @param fs
                 *            the filesystem of the file
                 * @param path
                 *            the path of the file
                 * @param base
                 *            the base path
                 * @return the key for the file
                 */
1061                K getKey(FileSystem fs, Path path, Path base);
1062        }
1063
1064        /**
1065         * A class that provides Text keys by calculating a UUID from the MD5 of a
1066         * file
1067         */
1068        public static class MD5UUIDKeyProvider implements KeyProvider<Text> {
1069                @Override
1070                public Text getKey(FileSystem fs, Path path) {
1071                        final UUID uuid = UUID.nameUUIDFromBytes(SequenceFileUtility.md5sum(fs, path).getBytes());
1072                        return new Text(uuid.toString());
1073                }
1074
1075                @Override
1076                public Text getKey(FileSystem fs, Path path, Path base) {
1077                        return this.getKey(fs, path);
1078                }
1079        }
1080
1081        /**
1082         * A class that provides Text keys from the name of a file
1083         */
1084        public static class FilenameKeyProvider implements KeyProvider<Text> {
1085                @Override
1086                public Text getKey(FileSystem fs, Path path) {
1087                        return new Text(path.getName());
1088                }
1089
1090                @Override
1091                public Text getKey(FileSystem fs, Path path, Path base) {
1092                        return this.getKey(fs, path);
1093                }
1094        }
1095
1096        /**
1097         * A class that provides Text keys from the relative path + name of a file
1098         */
1099        public static class RelativePathFilenameKeyProvider implements KeyProvider<Text> {
1100                @Override
1101                public Text getKey(FileSystem fs, Path path) {
1102                        return new Text(path.toUri().getPath());
1103                }
1104
1105                @Override
1106                public Text getKey(FileSystem fs, Path path, Path base) {
1107                        return new Text(path.toUri().getPath().substring(base.toUri().getPath().length()));
1108                }
1109        }
1110
1111        /**
1112         * Append files to a sequenceFile.
1113         *
1114         * @param fs
1115         *            The filesystem of the files being added.
1116         * @param path
1117         *            The path of the file(s) being added.
1118         * @param recurse
1119         *            If true, then subdirectories are also searched
1120         * @param pathFilter
1121         *            Filter for omitting files. Can be null.
1122         * @param keyProvider
1123         *            Object that can return a key for a given file.
1124         * @return Paths and their respective keys for files that were added.
1125         * @throws IOException
1126         */
1127        public Map<Path, K> appendFiles(FileSystem fs, Path path, boolean recurse, PathFilter pathFilter,
1128                        KeyProvider<K> keyProvider)
1129                        throws IOException
1130        {
1131                final LinkedHashMap<Path, K> addedFiles = new LinkedHashMap<Path, K>();
1132                appendFiles(fs, path, recurse, pathFilter, keyProvider, addedFiles);
1133                return addedFiles;
1134        }
1135
1136        private void appendFiles(final FileSystem fs, Path path, boolean recurse, PathFilter pathFilter,
1137                        KeyProvider<K> keyProvider,
1138                        Map<Path, K> addedFiles) throws IOException
1139        {
1140                if (fs.isFile(path)) {
1141                        if (pathFilter == null || pathFilter.accept(path)) {
1142                                final K key = keyProvider.getKey(fs, path);
1143                                appendFile(key, fs, path);
1144                                addedFiles.put(path, key);
1145                        }
1146                } else if (recurse) {
1147                        // fs.listStatus(path);
1148                        final FileStatus[] status = fs.listStatus(path, new PathFilter() {
1149
1150                                @Override
1151                                public boolean accept(Path potential) {
1152                                        try {
1153                                                fs.getStatus(potential);
1154                                                return true;
1155                                        } catch (final IOException e) {
1156                                                return false;
1157                                        }
1158                                }
1159
1160                        });
1161                        for (final FileStatus stat : status) {
1162                                appendFiles(fs, stat.getPath(), path.getParent(), pathFilter, keyProvider, addedFiles);
1163                        }
1164                }
1165        }
1166
1167        private void appendFiles(FileSystem fs, Path path, Path base, PathFilter pathFilter, KeyProvider<K> keyProvider,
1168                        Map<Path, K> addedFiles)
1169                        throws IOException
1170        {
1171                if (fs.isFile(path)) {
1172                        if (pathFilter == null || pathFilter.accept(path)) {
1173                                final K key = keyProvider.getKey(fs, path, base);
1174                                appendFile(key, fs, path);
1175                                addedFiles.put(path, key);
1176                        }
1177                } else {
1178                        try {
1179                                final FileStatus[] status = fs.listStatus(path);
1180
1181                                for (final FileStatus stat : status) {
1182                                        appendFiles(fs, stat.getPath(), base, pathFilter, keyProvider, addedFiles);
1183                                }
1184                        } catch (final Throwable e) {
1185                                System.err.println("Failed listing status on path: " + path);
1186                        }
1187                }
1188        }
1189
1190        public void writePathMap(Map<Path, K> map) throws IOException {
1191                final Path p = new Path(sequenceFilePath.getParent(), sequenceFilePath.getName().substring(0,
1192                                sequenceFilePath.getName().lastIndexOf("."))
1193                                + "-map.txt");
1194                FSDataOutputStream dos = null;
1195                PrintWriter pw = null;
1196
1197                try {
1198                        dos = fileSystem.create(p);
1199                        pw = new PrintWriter(dos);
1200
1201                        for (final Entry<Path, K> e : map.entrySet()) {
1202                                pw.println(e.getValue() + " " + e.getKey());
1203                        }
1204                } finally {
1205                        if (pw != null)
1206                                pw.close();
1207                        if (dos != null)
1208                                try {
1209                                        dos.close();
1210                                } catch (final IOException e) {
1211                                }
1212                }
1213        }
1214
1215        /**
1216         * Search for the record identified by queryKey.
1217         *
1218         * @param queryKey
1219         *            the key.
1220         * @param offset
1221         *            the offset from which to commence search
1222         * @return the found value, or null.
1223         */
1224        @SuppressWarnings("unchecked")
1225        public V find(K queryKey, long offset) {
1226                if (!isReader) {
1227                        throw new UnsupportedOperationException("Cannot find key in write mode");
1228                }
1229
1230                Reader reader = null;
1231                try {
1232                        reader = createReader();
1233                        if (offset > 0)
1234                                reader.seek(offset);
1235
1236                        final K key = ReflectionUtils.newInstance((Class<K>) reader.getKeyClass(), config);
1237
1238                        while (reader.next(key)) {
1239                                // System.out.println(key);
1240                                if (key.equals(queryKey)) {
1241                                        final V val = ReflectionUtils.newInstance((Class<V>) reader.getValueClass(), config);
1242
1243                                        reader.getCurrentValue(val);
1244
1245                                        return val;
1246                                }
1247                        }
1248                        return null;
1249                } catch (final Exception e) {
1250                        throw new RuntimeException(e);
1251                } finally {
1252                        if (reader != null)
1253                                try {
1254                                        reader.close();
1255                                } catch (final IOException e1) {
1256                                }
1257                }
1258        }
1259
1260        /**
1261         * Search for the record identified by queryKey. Uses a linear search from
1262         * the beginning of the file.
1263         *
1264         * @param queryKey
1265         * @return the found value, or null.
1266         */
1267        public V find(K queryKey) {
1268                return find(queryKey, 0);
1269        }
1270
1271        /**
1272         * Find a record and write the value to a file.
1273         *
1274         * @param key
1275         * @param uriOrPath
1276         * @param offset
1277         * @return false if record not found, true otherwise.
1278         * @throws IOException
1279         */
1280        public boolean findAndExport(K key, String uriOrPath, long offset) throws IOException {
1281                FileSystem fs = null;
1282                Path p = null;
1283
1284                if (uriOrPath != null) {
1285                        final URI uri = convertToURI(uriOrPath);
1286
1287                        fs = getFileSystem(uri);
1288                        p = new Path(uri.toString());
1289                }
1290
1291                return findAndExport(key, fs, p, offset);
1292        }
1293
1294        /**
1295         * Find a record and write the value to a file.
1296         *
1297         * @param key
1298         * @param fs
1299         * @param dirPath
1300         * @param offset
1301         * @return false if record not found, true otherwise.
1302         * @throws IOException
1303         */
1304        public boolean findAndExport(K key, FileSystem fs, Path dirPath, long offset) throws IOException {
1305                final V value = find(key, offset);
1306
1307                if (value == null)
1308                        return false;
1309
1310                if (fs != null && fs != null) {
1311                        final Path outFilePath = new Path(dirPath, key.toString());
1312                        writeFile(fs, outFilePath, value);
1313                } else {
1314                        printFile(value);
1315                }
1316
1317                return true;
1318        }
1319
1320        public Path getSequenceFilePath() {
1321                return sequenceFilePath;
1322        }
1323
1324        class SequenceFileEntry implements Entry<K, V> {
1325                K key;
1326                V value;
1327
1328                public SequenceFileEntry(K k, V v) {
1329                        key = k;
1330                        value = v;
1331                }
1332
1333                @Override
1334                public K getKey() {
1335                        return key;
1336                }
1337
1338                @Override
1339                public V getValue() {
1340                        return value;
1341                }
1342
1343                @Override
1344                public V setValue(V value) {
1345                        this.value = value;
1346                        return value;
1347                }
1348        }
1349
1350        class SequenceFileIterator implements Iterator<Entry<K, V>> {
1351                Reader reader = null;
1352                Entry<K, V> next;
1353                boolean shouldMove = true;
1354
1355                @SuppressWarnings("unchecked")
1356                public SequenceFileIterator() {
1357                        try {
1358                                reader = createReader();
1359
1360                                next = new SequenceFileEntry(ReflectionUtils.newInstance((Class<K>) reader.getKeyClass(), config),
1361                                                ReflectionUtils.newInstance(
1362                                                                (Class<V>) reader.getValueClass(), config));
1363                        } catch (final IOException e) {
1364                                throw new RuntimeException(e);
1365                        }
1366                }
1367
1368                @Override
1369                public boolean hasNext() {
1370                        tryGetNext();
1371                        return next != null;
1372                }
1373
1374                private void tryGetNext() {
1375                        if (next != null && shouldMove) {
1376                                shouldMove = false;
1377                                try {
1378                                        if (!reader.next(next.getKey(), next.getValue())) {
1379                                                next = null;
1380                                                try {
1381                                                        reader.close();
1382                                                } catch (final IOException e1) {
1383                                                }
1384                                        }
1385                                } catch (final IOException e) {
1386                                        try {
1387                                                reader.close();
1388                                        } catch (final IOException e1) {
1389                                        }
1390                                        throw new RuntimeException(e);
1391                                }
1392                        }
1393                }
1394
1395                @Override
1396                public Entry<K, V> next() {
1397                        tryGetNext();
1398
1399                        if (next == null) {
1400                                throw new NoSuchElementException();
1401                        }
1402                        shouldMove = true;
1403                        return next;
1404                }
1405
1406                @Override
1407                public void remove() {
1408                }
1409        }
1410
1411        @Override
1412        public Iterator<Entry<K, V>> iterator() {
1413                if (!isReader) {
1414                        throw new UnsupportedOperationException("Cannot iterate in write mode");
1415                }
1416
1417                return new SequenceFileIterator();
1418        }
1419}