/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *
 *   * Redistributions in binary form must reproduce the above copyright notice,
 *     this list of conditions and the following disclaimer in the documentation
 *     and/or other materials provided with the distribution.
 *
 *   * Neither the name of the University of Southampton nor the names of its
 *     contributors may be used to endorse or promote products derived from this
 *     software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */ 030package org.openimaj.hadoop.sequencefile; 031 032import java.io.File; 033import java.io.IOException; 034import java.io.InputStream; 035import java.io.PrintStream; 036import java.io.PrintWriter; 037import java.lang.reflect.ParameterizedType; 038import java.math.BigInteger; 039import java.net.URI; 040import java.security.MessageDigest; 041import java.security.NoSuchAlgorithmException; 042import java.util.ArrayList; 043import java.util.HashMap; 044import java.util.Iterator; 045import java.util.LinkedHashMap; 046import java.util.List; 047import java.util.Map; 048import java.util.Map.Entry; 049import java.util.NoSuchElementException; 050import java.util.UUID; 051import java.util.zip.Deflater; 052import java.util.zip.ZipEntry; 053import java.util.zip.ZipOutputStream; 054 055import org.apache.hadoop.conf.Configuration; 056import org.apache.hadoop.fs.FSDataOutputStream; 057import org.apache.hadoop.fs.FileStatus; 058import org.apache.hadoop.fs.FileSystem; 059import org.apache.hadoop.fs.LocalFileSystem; 060import org.apache.hadoop.fs.Path; 061import org.apache.hadoop.fs.PathFilter; 062import org.apache.hadoop.io.SequenceFile; 063import org.apache.hadoop.io.SequenceFile.CompressionType; 064import org.apache.hadoop.io.SequenceFile.Metadata; 065import org.apache.hadoop.io.SequenceFile.Reader; 066import org.apache.hadoop.io.SequenceFile.Writer; 067import org.apache.hadoop.io.Text; 068import org.apache.hadoop.io.Writable; 069import org.apache.hadoop.io.compress.CompressionCodec; 070import org.apache.hadoop.io.compress.DefaultCodec; 071import org.apache.hadoop.util.ReflectionUtils; 072 073/** 074 * Base class for a utility class that deals with specifically typed sequence 075 * files. 
076 * 077 * @author Jonathon Hare (jsh2@ecs.soton.ac.uk) 078 * @author Sina Samangooei (ss@ecs.soton.ac.uk) 079 * 080 * @param <K> 081 * Key type 082 * @param <V> 083 * Value type 084 */ 085public abstract class SequenceFileUtility<K extends Writable, V extends Writable> implements Iterable<Entry<K, V>> { 086 protected Configuration config = new Configuration(); 087 protected FileSystem fileSystem; 088 protected Path sequenceFilePath; 089 090 protected Writer writer; 091 protected CompressionType compressionType = CompressionType.BLOCK; 092 093 protected boolean isReader; 094 095 protected String uuid; 096 097 public SequenceFileUtility(String uriOrPath, boolean read) throws IOException { 098 setup(convertToURI(uriOrPath), read); 099 } 100 101 public SequenceFileUtility(URI uri, boolean read) throws IOException { 102 setup(uri, read); 103 } 104 105 public SequenceFileUtility(String uriOrPath, CompressionType compressionType) throws IOException { 106 this.compressionType = compressionType; 107 setup(convertToURI(uriOrPath), false); 108 } 109 110 public SequenceFileUtility(URI uri, CompressionType compressionType) throws IOException { 111 this.compressionType = compressionType; 112 setup(uri, false); 113 } 114 115 /** 116 * Get a list of all the reducer outputs in a directory. If the given 117 * path/uri is not a directory, then it is assumed that it is a SequenceFile 118 * and returned directly. 119 * 120 * @param uriOrPath 121 * the path or uri 122 * @return the reducer outputs 123 * @throws IOException 124 */ 125 public static URI[] getReducerFiles(String uriOrPath) throws IOException { 126 return getFiles(uriOrPath, "part-r-"); 127 } 128 129 /** 130 * Get a list of all the sequence files (with a given name prefix) in a 131 * directory. If the given uri is not a directory, then it is assumed that 132 * it is a SequenceFile and returned directly. 
133 * 134 * @param uriOrPath 135 * the path or uri 136 * @param filenamePrefix 137 * the prefix of the file name 138 * @return the matching files 139 * @throws IOException 140 */ 141 public static URI[] getFiles(String uriOrPath, final String filenamePrefix) throws IOException { 142 final Configuration config = new Configuration(); 143 final URI uri = convertToURI(uriOrPath); 144 final FileSystem fs = FileSystem.get(uri, config); 145 final Path path = new Path(uri.toString()); 146 147 if (fs.getFileStatus(path).isDirectory()) { 148 final FileStatus[] files = fs.listStatus(path, new PathFilter() { 149 @Override 150 public boolean accept(Path p) { 151 return p.getName().startsWith(filenamePrefix); 152 } 153 }); 154 155 final URI[] uris = new URI[files.length]; 156 int i = 0; 157 for (final FileStatus status : files) { 158 uris[i++] = status.getPath().toUri(); 159 } 160 return uris; 161 } else { 162 return new URI[] { uri }; 163 } 164 } 165 166 /** 167 * Get a list of all the sequence files (with a given name prefix) in the 168 * set of input paths. If a given uri is not a directory, then it is assumed 169 * that it is a SequenceFile and returned directly. 170 * 171 * @param uriOrPaths 172 * the paths or uris 173 * @param filenamePrefix 174 * the prefix of the file name 175 * @return the list of sequence files 176 * @throws IOException 177 */ 178 public static Path[] getFilePaths(String[] uriOrPaths, String filenamePrefix) throws IOException { 179 final List<Path> pathList = new ArrayList<Path>(); 180 for (final String uriOrPath : uriOrPaths) { 181 final Path[] paths = getFilePaths(uriOrPath, filenamePrefix); 182 for (final Path path : paths) { 183 pathList.add(path); 184 } 185 } 186 return pathList.toArray(new Path[pathList.size()]); 187 } 188 189 /** 190 * Get a list of all the sequence files (with a given name prefix) in the 191 * set of input paths. 
192 * <p> 193 * Optionally a subdirectory can be provided; if provided the subdirectory 194 * is appended to each path (i.e. PATH/subdirectory). 195 * <p> 196 * If the given uri is not a directory, then it is assumed that it is a 197 * single SequenceFile and returned directly. 198 * 199 * @param uriOrPaths 200 * the URI or path to the directory/file 201 * @param subdir 202 * the optional subdirectory (may be null) 203 * @param filenamePrefix 204 * the prefix of the file name 205 * @return the list of sequence files 206 * @throws IOException 207 */ 208 public static Path[] getFilePaths(String[] uriOrPaths, String subdir, String filenamePrefix) throws IOException { 209 final List<Path> pathList = new ArrayList<Path>(); 210 211 for (String uriOrPath : uriOrPaths) { 212 if (subdir != null) 213 uriOrPath += "/" + subdir; 214 215 final Path[] paths = getFilePaths(uriOrPath, filenamePrefix); 216 for (final Path path : paths) { 217 pathList.add(path); 218 } 219 } 220 return pathList.toArray(new Path[pathList.size()]); 221 } 222 223 /** 224 * Get a list of all the sequence files (with a given name prefix) in a 225 * directory. If the given uri is not a directory, then it is assumed that 226 * it is a SequenceFile and returned directly. 
227 * 228 * @param uriOrPath 229 * the path or uri 230 * @param filenamePrefix 231 * the prefix of the file name 232 * @return the list of sequence files 233 * @throws IOException 234 */ 235 public static Path[] getFilePaths(String uriOrPath, final String filenamePrefix) throws IOException { 236 final Configuration config = new Configuration(); 237 final URI uri = convertToURI(uriOrPath); 238 final FileSystem fs = FileSystem.get(uri, config); 239 240 final Path path = new Path(uri); 241 242 if (fs.getFileStatus(path).isDirectory()) { 243 final FileStatus[] files = fs.listStatus(path, new PathFilter() { 244 @Override 245 public boolean accept(Path p) { 246 return p.getName().startsWith(filenamePrefix); 247 } 248 }); 249 250 final Path[] uris = new Path[files.length]; 251 int i = 0; 252 for (final FileStatus status : files) { 253 uris[i++] = status.getPath(); 254 } 255 return uris; 256 } else { 257 return new Path[] { path }; 258 } 259 } 260 261 /** 262 * Get a list of all the sequence files whose names match the given regular 263 * expression in a directory. If the given uri is not a directory, then it 264 * is assumed that it is a SequenceFile and returned directly. 
265 * 266 * @param uriOrPath 267 * the path or uri 268 * @param regex 269 * the regular expression to match 270 * @return a list of files 271 * @throws IOException 272 */ 273 public static URI[] getFilesRegex(String uriOrPath, final String regex) throws IOException { 274 final Configuration config = new Configuration(); 275 final URI uri = convertToURI(uriOrPath); 276 final FileSystem fs = FileSystem.get(uri, config); 277 final Path path = new Path(uri.toString()); 278 279 if (fs.getFileStatus(path).isDirectory()) { 280 final FileStatus[] files = fs.listStatus(path, new PathFilter() { 281 @Override 282 public boolean accept(Path p) { 283 return (regex == null || p.getName().matches(regex)); 284 } 285 }); 286 287 final URI[] uris = new URI[files.length]; 288 int i = 0; 289 for (final FileStatus status : files) { 290 uris[i++] = status.getPath().toUri(); 291 } 292 return uris; 293 } else { 294 return new URI[] { uri }; 295 } 296 } 297 298 /** 299 * Return a list of the keys in the sequence file. Read mode only. 300 * 301 * @return keys. 
302 */ 303 @SuppressWarnings("unchecked") 304 public Map<K, Long> listKeysAndOffsets() { 305 if (!isReader) { 306 throw new UnsupportedOperationException("Cannot read keys in write mode"); 307 } 308 309 Reader reader = null; 310 try { 311 final Map<K, Long> keys = new LinkedHashMap<K, Long>(); 312 313 reader = createReader(); 314 final Class<K> keyClass = (Class<K>) reader.getKeyClass(); 315 K key = ReflectionUtils.newInstance(keyClass, config); 316 final V val = ReflectionUtils.newInstance((Class<V>) reader.getValueClass(), config); 317 long start = 0L; 318 long end = 0L; 319 while (reader.next(key, val)) { 320 final long pos = reader.getPosition(); 321 if (pos != end) { 322 start = end; 323 end = pos; 324 } 325 keys.put(key, start); 326 key = ReflectionUtils.newInstance(keyClass, config); 327 } 328 329 return keys; 330 } catch (final Exception e) { 331 throw new RuntimeException(e); 332 } finally { 333 if (reader != null) 334 try { 335 reader.close(); 336 } catch (final IOException e1) { 337 } 338 } 339 } 340 341 /** 342 * Go through a sequence file, applying each 343 * {@link RecordInformationExtractor} to each key, printing out the results 344 * in order to the provided {@link PrintStream} 345 * 346 * @param extractors 347 * the {@link RecordInformationExtractor}s to apply 348 * @param stream 349 * the stream to write to 350 * @param delim 351 */ 352 @SuppressWarnings("unchecked") 353 public void extract(List<RecordInformationExtractor> extractors, PrintStream stream, String delim) { 354 if (!isReader) { 355 throw new UnsupportedOperationException("Cannot read keys in write mode"); 356 } 357 358 Reader reader = null; 359 try { 360 reader = createReader(); 361 362 final Class<K> keyClass = (Class<K>) reader.getKeyClass(); 363 K key = ReflectionUtils.newInstance(keyClass, config); 364 final V val = ReflectionUtils.newInstance((Class<V>) reader.getValueClass(), config); 365 long start = 0L; 366 long end = 0L; 367 int count = 0; 368 while (reader.next(key, val)) { 
369 final long pos = reader.getPosition(); 370 if (pos != end) { 371 start = end; 372 end = pos; 373 } 374 375 // Apply the filters and print 376 String recordString = ""; 377 for (final RecordInformationExtractor extractor : extractors) { 378 recordString += extractor.extract(key, val, start, sequenceFilePath) + delim; 379 } 380 381 if (recordString.length() >= delim.length()) 382 recordString = recordString.substring(0, recordString.length() - delim.length()); 383 384 stream.println(recordString); 385 count++; 386 387 System.err.printf("\rOutputted: %10d", count); 388 key = ReflectionUtils.newInstance(keyClass, config); 389 } 390 System.err.println(); 391 } catch (final Exception e) { 392 throw new RuntimeException(e); 393 } finally { 394 if (reader != null) 395 try { 396 reader.close(); 397 } catch (final IOException e1) { 398 } 399 } 400 } 401 402 public SequenceFileUtility(String uriOrPath, CompressionType compressionType, Map<String, String> metadata) 403 throws IOException 404 { 405 this.compressionType = compressionType; 406 setup(convertToURI(uriOrPath), false); 407 } 408 409 public SequenceFileUtility(URI uri, CompressionType compressionType, Map<String, String> metadata) 410 throws IOException 411 { 412 this.compressionType = compressionType; 413 setup(uri, false); 414 } 415 416 /** 417 * Converts a string representing a file or uri to a uri object. 
418 * 419 * @param uriOrPath 420 * uri or path to convert 421 * @return uri 422 */ 423 public static URI convertToURI(String uriOrPath) { 424 if (uriOrPath.contains("://")) { 425 return URI.create(uriOrPath); 426 } else { 427 return new File(uriOrPath).toURI(); 428 } 429 } 430 431 private void setup(URI uri, boolean read) throws IOException { 432 setup(uri, read, null); 433 } 434 435 private void setup(URI uri, boolean read, Map<String, String> metadata) throws IOException { 436 fileSystem = getFileSystem(uri); 437 sequenceFilePath = new Path(uri.toString()); 438 439 this.isReader = read; 440 441 if (read) { 442 Reader reader = null; 443 444 try { 445 reader = createReader(); 446 final Text uuidText = reader.getMetadata().get(new Text(MetadataConfiguration.UUID_KEY)); 447 if (uuidText != null) 448 uuid = uuidText.toString(); 449 450 if (!reader.isCompressed()) 451 compressionType = CompressionType.NONE; 452 else if (reader.isBlockCompressed()) 453 compressionType = CompressionType.BLOCK; 454 else 455 compressionType = CompressionType.RECORD; 456 } catch (final Exception e) { 457 throw new RuntimeException(e); 458 } finally { 459 if (reader != null) 460 try { 461 reader.close(); 462 } catch (final IOException e1) { 463 } 464 } 465 } else { 466 if (metadata == null) { 467 metadata = new HashMap<String, String>(); 468 } 469 470 if (!metadata.containsKey(MetadataConfiguration.UUID_KEY)) { 471 uuid = UUID.randomUUID().toString(); 472 metadata.put(MetadataConfiguration.UUID_KEY, uuid); 473 } 474 475 // if the output directory is a directory, then create the file 476 // inside the 477 // directory with the name given by the uuid 478 if (fileSystem.exists(sequenceFilePath) && fileSystem.getFileStatus(sequenceFilePath).isDirectory()) { 479 sequenceFilePath = new Path(sequenceFilePath, uuid + ".seq"); 480 } 481 482 writer = createWriter(metadata); 483 } 484 } 485 486 @SuppressWarnings("unchecked") 487 private Writer createWriter(Map<String, String> metadata) throws 
IOException { 488 final Metadata md = new Metadata(); 489 490 for (final Entry<String, String> e : metadata.entrySet()) { 491 md.set(new Text(e.getKey()), new Text(e.getValue())); 492 } 493 final Class<K> keyClass = (Class<K>) ((ParameterizedType) getClass().getGenericSuperclass()) 494 .getActualTypeArguments()[0]; 495 final Class<V> valueClass = (Class<V>) ((ParameterizedType) getClass().getGenericSuperclass()) 496 .getActualTypeArguments()[1]; 497 498 return SequenceFile.createWriter(fileSystem, config, sequenceFilePath, keyClass, valueClass, compressionType, 499 new DefaultCodec(), null, 500 md); 501 } 502 503 private Reader createReader() throws IOException { 504 // if(this.fileSystem.getFileStatus(sequenceFilePath).isDir()) 505 // sequenceFilePath = new Path(sequenceFilePath,"part-r-00000"); 506 return new Reader(fileSystem, sequenceFilePath, config); 507 } 508 509 /** 510 * Get the UUID of this file 511 * 512 * @return UUID 513 */ 514 public String getUUID() { 515 return uuid; 516 } 517 518 /** 519 * Return the metadata map. Read mode only. 520 * 521 * @return metadata 522 */ 523 public Map<Text, Text> getMetadata() { 524 if (!isReader) { 525 throw new UnsupportedOperationException("Cannot read metadata in write mode"); 526 } 527 528 Reader reader = null; 529 try { 530 reader = createReader(); 531 final Map<Text, Text> metadata = reader.getMetadata().getMetadata(); 532 return metadata; 533 } catch (final Exception e) { 534 throw new RuntimeException(e); 535 } finally { 536 if (reader != null) 537 try { 538 reader.close(); 539 } catch (final IOException e1) { 540 } 541 } 542 } 543 544 /** 545 * Return a list of the keys in the sequence file. Read mode only. 546 * 547 * @return keys. 
548 */ 549 @SuppressWarnings("unchecked") 550 public List<K> listKeys() { 551 if (!isReader) { 552 throw new UnsupportedOperationException("Cannot read keys in write mode"); 553 } 554 555 Reader reader = null; 556 try { 557 final List<K> keys = new ArrayList<K>(); 558 559 reader = createReader(); 560 final Class<K> keyClass = (Class<K>) reader.getKeyClass(); 561 K key = ReflectionUtils.newInstance(keyClass, config); 562 563 while (reader.next(key)) { 564 keys.add(key); 565 key = ReflectionUtils.newInstance(keyClass, config); 566 } 567 568 return keys; 569 } catch (final Exception e) { 570 throw new RuntimeException(e); 571 } finally { 572 if (reader != null) 573 try { 574 reader.close(); 575 } catch (final IOException e1) { 576 } 577 } 578 } 579 580 /** 581 * Extracts file to a directory. Read mode only. 582 * 583 * @param uriOrPath 584 * path or uri to extract to. 585 * @throws IOException 586 */ 587 public void exportData(String uriOrPath) throws IOException { 588 exportData(uriOrPath, NamingStrategy.KEY, new ExtractionState(), false, 0); 589 } 590 591 /** 592 * Extracts file to a directory. Read mode only. 593 * 594 * @param uriOrPath 595 * path or uri to extract to. 596 * @param naming 597 * the naming strategy 598 * @param extrState 599 * the extraction state 600 * @param addExtension 601 * if true, then file extensions are added to each record 602 * automatically 603 * @param offset 604 * offset from which to start. Can be used to reduce number of 605 * files extracted. 
606 * @throws IOException 607 */ 608 public void exportData(String uriOrPath, NamingStrategy naming, ExtractionState extrState, boolean addExtension, 609 long offset) 610 throws IOException 611 { 612 FileSystem fs = null; 613 Path p = null; 614 615 if (uriOrPath != null) { 616 final URI uri = convertToURI(uriOrPath); 617 618 fs = getFileSystem(uri); 619 p = new Path(uri.toString()); 620 } 621 622 exportData(fs, p, naming, extrState, addExtension, offset); 623 } 624 625 public static ZipOutputStream openZipOutputStream(String uriOrPath) throws IOException { 626 final URI uri = convertToURI(uriOrPath); 627 628 final FileSystem fs = getFileSystem(uri, new Configuration()); 629 final Path path = new Path(uri.toString()); 630 631 final ZipOutputStream zos = new ZipOutputStream(fs.create(path)); 632 zos.setLevel(Deflater.BEST_COMPRESSION); 633 return zos; 634 } 635 636 /** 637 * Extracts file to a directory. Read mode only. 638 * 639 * @param uriOrPath 640 * path or uri to extract to. 641 * @param naming 642 * the naming strategy 643 * @param state 644 * the extraction state 645 * @param addExtension 646 * if true, then file extensions are added to each record 647 * automatically 648 * @param offset 649 * offset from which to start. Can be used to reduce number of 650 * files extracted. 651 * @throws IOException 652 */ 653 public void exportDataToZip(String uriOrPath, NamingStrategy naming, ExtractionState state, boolean addExtension, 654 long offset) 655 throws IOException 656 { 657 if (uriOrPath != null) { 658 659 ZipOutputStream zos = null; 660 try { 661 zos = openZipOutputStream(uriOrPath); 662 exportDataToZip(zos, naming, state, addExtension, offset); 663 } finally { 664 if (zos != null) 665 try { 666 zos.close(); 667 } catch (final IOException e) { 668 } 669 ; 670 } 671 } 672 } 673 674 /** 675 * Extracts file to a zip file. Read mode only. 
676 * 677 * @param zos 678 * The {@link ZipOutputStream} to write to 679 * @param naming 680 * The naming strategy 681 * @param extrState 682 * The extration state 683 * @param addExtension 684 * if true, then file extensions are added to each record 685 * automatically 686 * @param offset 687 * offset from which to start. Can be used to reduce number of 688 * files extracted. 689 * @throws IOException 690 */ 691 public void exportDataToZip(ZipOutputStream zos, NamingStrategy naming, ExtractionState extrState, 692 boolean addExtension, long offset) 693 throws IOException 694 { 695 if (!isReader) { 696 throw new UnsupportedOperationException("Cannot read keys in write mode"); 697 } 698 699 Reader reader = null; 700 try { 701 reader = createReader(); 702 if (offset > 0) 703 reader.seek(offset); 704 705 @SuppressWarnings("unchecked") 706 final K key = ReflectionUtils.newInstance((Class<K>) reader.getKeyClass(), config); 707 @SuppressWarnings("unchecked") 708 final V val = ReflectionUtils.newInstance((Class<V>) reader.getValueClass(), config); 709 710 while (reader.next(key)) { 711 712 if (extrState.allowNext()) { 713 reader.getCurrentValue(val); 714 715 String name = naming.getName(key, val, extrState, addExtension); 716 717 while (name.startsWith("/")) 718 name = name.substring(1); 719 720 final ZipEntry ze = new ZipEntry(name); 721 zos.putNextEntry(ze); 722 writeZipData(zos, val); 723 zos.closeEntry(); 724 725 extrState.tick(); 726 } else { 727 extrState.tick(); 728 } 729 if (extrState.isFinished()) 730 break; 731 } 732 } catch (final Exception e) { 733 throw new RuntimeException(e); 734 } finally { 735 if (reader != null) 736 try { 737 reader.close(); 738 } catch (final IOException e1) { 739 } 740 } 741 } 742 743 /** 744 * Extracts file to a directory. Read mode only. 
745 * 746 * @param fs 747 * filesystem of output file 748 * @param dirPath 749 * path to extract to 750 */ 751 public void exportData(FileSystem fs, Path dirPath) { 752 exportData(fs, dirPath, NamingStrategy.KEY, new ExtractionState(), false, 0); 753 } 754 755 /** 756 * Extracts file to a directory. Read mode only. 757 * 758 * @param fs 759 * filesystem of output file 760 * @param dirPath 761 * path to extract to 762 * @param naming 763 * the naming strategy 764 * @param extrState 765 * the extraction state 766 * @param addExtension 767 * if true, then file extensions are added to each record 768 * automatically 769 * @param offset 770 * offset from which to start. Can be used to reduce number of 771 * files extracted. 772 */ 773 @SuppressWarnings("unchecked") 774 public void exportData(FileSystem fs, Path dirPath, NamingStrategy naming, ExtractionState extrState, 775 boolean addExtension, long offset) 776 { 777 if (!isReader) { 778 throw new UnsupportedOperationException("Cannot read keys in write mode"); 779 } 780 781 Reader reader = null; 782 try { 783 if (fs != null) 784 fs.mkdirs(dirPath); 785 786 reader = createReader(); 787 if (offset > 0) 788 reader.seek(offset); 789 790 final K key = ReflectionUtils.newInstance((Class<K>) reader.getKeyClass(), config); 791 final V val = ReflectionUtils.newInstance((Class<V>) reader.getValueClass(), config); 792 793 while (reader.next(key)) { 794 795 if (extrState.allowNext()) { 796 reader.getCurrentValue(val); 797 if (dirPath != null) { 798 String name = naming.getName(key, val, extrState, addExtension); 799 if (name.startsWith("/")) 800 name = "." 
+ name; 801 802 final Path outFilePath = new Path(dirPath, name); 803 // System.out.println("NP: " + np); 804 System.out.println("Path: " + outFilePath); 805 writeFile(fs, outFilePath, val); 806 extrState.tick(); 807 } else { 808 System.out.println(key.toString()); 809 printFile(val); 810 extrState.tick(); 811 } 812 } else { 813 extrState.tick(); 814 } 815 if (extrState.isFinished()) 816 break; 817 } 818 } catch (final Exception e) { 819 throw new RuntimeException(e); 820 } finally { 821 if (reader != null) 822 try { 823 reader.close(); 824 } catch (final IOException e1) { 825 } 826 } 827 } 828 829 @SuppressWarnings("unchecked") 830 public void exportData(NamingStrategy np, ExtractionState nps, long offset, KeyValueDump<K, V> dump) { 831 if (!isReader) { 832 throw new UnsupportedOperationException("Cannot read keys in write mode"); 833 } 834 835 Reader reader = null; 836 try { 837 838 reader = createReader(); 839 if (offset > 0) 840 reader.seek(offset); 841 842 final K key = ReflectionUtils.newInstance((Class<K>) reader.getKeyClass(), config); 843 final V val = ReflectionUtils.newInstance((Class<V>) reader.getValueClass(), config); 844 845 while (reader.next(key)) { 846 847 if (nps.allowNext()) { 848 reader.getCurrentValue(val); 849 dump.dumpValue(key, val); 850 } 851 nps.tick(); 852 if (nps.isFinished()) 853 break; 854 } 855 } catch (final Exception e) { 856 throw new RuntimeException(e); 857 } finally { 858 if (reader != null) 859 try { 860 reader.close(); 861 } catch (final IOException e1) { 862 } 863 } 864 } 865 866 /** 867 * Close the underlying writer. Does nothing in read mode. 868 * 869 * @throws IOException 870 */ 871 public void close() throws IOException { 872 if (writer != null) 873 writer.close(); 874 } 875 876 /** 877 * Get number of records in file. Read mode only. 
878 * 879 * @return number of records 880 */ 881 public long getNumberRecords() { 882 if (!isReader) { 883 throw new UnsupportedOperationException("Cannot read keys in write mode"); 884 } 885 886 Reader reader = null; 887 try { 888 reader = createReader(); 889 890 final Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), config); 891 892 long count = 0; 893 while (reader.next(key)) { 894 count++; 895 } 896 return count; 897 } catch (final Exception e) { 898 throw new RuntimeException(e); 899 } finally { 900 if (reader != null) 901 try { 902 reader.close(); 903 } catch (final IOException e1) { 904 } 905 } 906 } 907 908 /** 909 * @return the compression codec in use for this file. 910 */ 911 public Class<? extends CompressionCodec> getCompressionCodecClass() { 912 if (!isReader) 913 return DefaultCodec.class; 914 915 Reader reader = null; 916 try { 917 reader = createReader(); 918 if (reader.getCompressionCodec() == null) 919 return null; 920 return reader.getCompressionCodec().getClass(); 921 } catch (final Exception e) { 922 throw new RuntimeException(e); 923 } finally { 924 if (reader != null) 925 try { 926 reader.close(); 927 } catch (final IOException e1) { 928 } 929 } 930 } 931 932 /** 933 * @return he compression mode used for this sequence file. 934 */ 935 public CompressionType getCompressionType() { 936 return compressionType; 937 } 938 939 /** 940 * Get the filesystem associated with a uri. 941 * 942 * @param uri 943 * @return the filesystem 944 * @throws IOException 945 */ 946 public FileSystem getFileSystem(URI uri) throws IOException { 947 return getFileSystem(uri, config); 948 } 949 950 /** 951 * Get the filesystem associated with a uri. 
952 * 953 * @param uri 954 * @param config 955 * @return the filesystem 956 * @throws IOException 957 */ 958 public static FileSystem getFileSystem(URI uri, Configuration config) throws IOException { 959 FileSystem fs = FileSystem.get(uri, config); 960 if (fs instanceof LocalFileSystem) 961 fs = ((LocalFileSystem) fs).getRaw(); 962 return fs; 963 } 964 965 /** 966 * Get a path from a uri. 967 * 968 * @param uri 969 * @return the path 970 * @throws IOException 971 */ 972 public Path getPath(URI uri) throws IOException { 973 return new Path(uri.toString()); 974 } 975 976 /** 977 * Get the MD5 sum of a file 978 * 979 * @param fs 980 * @param p 981 * @return the md5sum 982 */ 983 public static String md5sum(FileSystem fs, Path p) { 984 MessageDigest digest; 985 try { 986 digest = MessageDigest.getInstance("MD5"); 987 } catch (final NoSuchAlgorithmException e1) { 988 throw new RuntimeException(e1); 989 } 990 991 InputStream is = null; 992 try { 993 final byte[] buffer = new byte[8192]; 994 int read = 0; 995 996 is = fs.open(p); 997 while ((read = is.read(buffer)) > 0) { 998 digest.update(buffer, 0, read); 999 } 1000 final byte[] md5sum = digest.digest(); 1001 1002 final BigInteger bigInt = new BigInteger(1, md5sum); 1003 return bigInt.toString(16); 1004 } catch (final IOException e) { 1005 throw new RuntimeException("Unable to process file for MD5", e); 1006 } finally { 1007 try { 1008 if (is != null) 1009 is.close(); 1010 } catch (final IOException e) { 1011 } 1012 } 1013 } 1014 1015 protected abstract V readFile(FileSystem fs, Path path) throws IOException; 1016 1017 protected abstract void writeFile(FileSystem fs, Path path, V value) throws IOException; 1018 1019 protected abstract void writeZipData(ZipOutputStream zos, V value) throws IOException; 1020 1021 protected abstract void printFile(V value) throws IOException; 1022 1023 /** 1024 * Append data read from a file to the sequence file. 
1025 * 1026 * @param key 1027 * @param fs 1028 * @param p 1029 * @throws IOException 1030 */ 1031 public void appendFile(K key, FileSystem fs, Path p) throws IOException { 1032 if (isReader) { 1033 throw new UnsupportedOperationException("Cannot write data in read mode"); 1034 } 1035 1036 writer.append(key, readFile(fs, p)); 1037 } 1038 1039 /** 1040 * Append data to a sequence file. 1041 * 1042 * @param key 1043 * @param value 1044 * @throws IOException 1045 */ 1046 public void appendData(K key, V value) throws IOException { 1047 if (isReader) { 1048 throw new UnsupportedOperationException("Cannot write data in read mode"); 1049 } 1050 writer.append(key, value); 1051 } 1052 1053 /** 1054 * Interface for objects that can make a key from a path 1055 * 1056 * @param <K> 1057 */ 1058 public interface KeyProvider<K> { 1059 K getKey(FileSystem fs, Path path); 1060 1061 K getKey(FileSystem fs, Path path, Path base); 1062 } 1063 1064 /** 1065 * A class that provides Text keys by calculating a UUID from the MD5 of a 1066 * file 1067 */ 1068 public static class MD5UUIDKeyProvider implements KeyProvider<Text> { 1069 @Override 1070 public Text getKey(FileSystem fs, Path path) { 1071 final UUID uuid = UUID.nameUUIDFromBytes(SequenceFileUtility.md5sum(fs, path).getBytes()); 1072 return new Text(uuid.toString()); 1073 } 1074 1075 @Override 1076 public Text getKey(FileSystem fs, Path path, Path base) { 1077 return this.getKey(fs, path); 1078 } 1079 } 1080 1081 /** 1082 * A class that provides Text keys from the name of a file 1083 */ 1084 public static class FilenameKeyProvider implements KeyProvider<Text> { 1085 @Override 1086 public Text getKey(FileSystem fs, Path path) { 1087 return new Text(path.getName()); 1088 } 1089 1090 @Override 1091 public Text getKey(FileSystem fs, Path path, Path base) { 1092 return this.getKey(fs, path); 1093 } 1094 } 1095 1096 /** 1097 * A class that provides Text keys from the relative path + name of a file 1098 */ 1099 public static class 
RelativePathFilenameKeyProvider implements KeyProvider<Text> { 1100 @Override 1101 public Text getKey(FileSystem fs, Path path) { 1102 return new Text(path.toUri().getPath()); 1103 } 1104 1105 @Override 1106 public Text getKey(FileSystem fs, Path path, Path base) { 1107 return new Text(path.toUri().getPath().substring(base.toUri().getPath().length())); 1108 } 1109 } 1110 1111 /** 1112 * Append files to a sequenceFile. 1113 * 1114 * @param fs 1115 * The filesystem of the files being added. 1116 * @param path 1117 * The path of the file(s) being added. 1118 * @param recurse 1119 * If true, then subdirectories are also searched 1120 * @param pathFilter 1121 * Filter for omitting files. Can be null. 1122 * @param keyProvider 1123 * Object that can return a key for a given file. 1124 * @return Paths and their respective keys for files that were added. 1125 * @throws IOException 1126 */ 1127 public Map<Path, K> appendFiles(FileSystem fs, Path path, boolean recurse, PathFilter pathFilter, 1128 KeyProvider<K> keyProvider) 1129 throws IOException 1130 { 1131 final LinkedHashMap<Path, K> addedFiles = new LinkedHashMap<Path, K>(); 1132 appendFiles(fs, path, recurse, pathFilter, keyProvider, addedFiles); 1133 return addedFiles; 1134 } 1135 1136 private void appendFiles(final FileSystem fs, Path path, boolean recurse, PathFilter pathFilter, 1137 KeyProvider<K> keyProvider, 1138 Map<Path, K> addedFiles) throws IOException 1139 { 1140 if (fs.isFile(path)) { 1141 if (pathFilter == null || pathFilter.accept(path)) { 1142 final K key = keyProvider.getKey(fs, path); 1143 appendFile(key, fs, path); 1144 addedFiles.put(path, key); 1145 } 1146 } else if (recurse) { 1147 // fs.listStatus(path); 1148 final FileStatus[] status = fs.listStatus(path, new PathFilter() { 1149 1150 @Override 1151 public boolean accept(Path potential) { 1152 try { 1153 fs.getStatus(potential); 1154 return true; 1155 } catch (final IOException e) { 1156 return false; 1157 } 1158 } 1159 1160 }); 1161 for (final 
FileStatus stat : status) { 1162 appendFiles(fs, stat.getPath(), path.getParent(), pathFilter, keyProvider, addedFiles); 1163 } 1164 } 1165 } 1166 1167 private void appendFiles(FileSystem fs, Path path, Path base, PathFilter pathFilter, KeyProvider<K> keyProvider, 1168 Map<Path, K> addedFiles) 1169 throws IOException 1170 { 1171 if (fs.isFile(path)) { 1172 if (pathFilter == null || pathFilter.accept(path)) { 1173 final K key = keyProvider.getKey(fs, path, base); 1174 appendFile(key, fs, path); 1175 addedFiles.put(path, key); 1176 } 1177 } else { 1178 try { 1179 final FileStatus[] status = fs.listStatus(path); 1180 1181 for (final FileStatus stat : status) { 1182 appendFiles(fs, stat.getPath(), base, pathFilter, keyProvider, addedFiles); 1183 } 1184 } catch (final Throwable e) { 1185 System.err.println("Failed listing status on path: " + path); 1186 } 1187 } 1188 } 1189 1190 public void writePathMap(Map<Path, K> map) throws IOException { 1191 final Path p = new Path(sequenceFilePath.getParent(), sequenceFilePath.getName().substring(0, 1192 sequenceFilePath.getName().lastIndexOf(".")) 1193 + "-map.txt"); 1194 FSDataOutputStream dos = null; 1195 PrintWriter pw = null; 1196 1197 try { 1198 dos = fileSystem.create(p); 1199 pw = new PrintWriter(dos); 1200 1201 for (final Entry<Path, K> e : map.entrySet()) { 1202 pw.println(e.getValue() + " " + e.getKey()); 1203 } 1204 } finally { 1205 if (pw != null) 1206 pw.close(); 1207 if (dos != null) 1208 try { 1209 dos.close(); 1210 } catch (final IOException e) { 1211 } 1212 } 1213 } 1214 1215 /** 1216 * Search for the record identified by queryKey. 1217 * 1218 * @param queryKey 1219 * the key. 1220 * @param offset 1221 * the offset from which to commence search 1222 * @return the found value, or null. 
1223 */ 1224 @SuppressWarnings("unchecked") 1225 public V find(K queryKey, long offset) { 1226 if (!isReader) { 1227 throw new UnsupportedOperationException("Cannot find key in write mode"); 1228 } 1229 1230 Reader reader = null; 1231 try { 1232 reader = createReader(); 1233 if (offset > 0) 1234 reader.seek(offset); 1235 1236 final K key = ReflectionUtils.newInstance((Class<K>) reader.getKeyClass(), config); 1237 1238 while (reader.next(key)) { 1239 // System.out.println(key); 1240 if (key.equals(queryKey)) { 1241 final V val = ReflectionUtils.newInstance((Class<V>) reader.getValueClass(), config); 1242 1243 reader.getCurrentValue(val); 1244 1245 return val; 1246 } 1247 } 1248 return null; 1249 } catch (final Exception e) { 1250 throw new RuntimeException(e); 1251 } finally { 1252 if (reader != null) 1253 try { 1254 reader.close(); 1255 } catch (final IOException e1) { 1256 } 1257 } 1258 } 1259 1260 /** 1261 * Search for the record identified by queryKey. Uses a linear search from 1262 * the beginning of the file. 1263 * 1264 * @param queryKey 1265 * @return the found value, or null. 1266 */ 1267 public V find(K queryKey) { 1268 return find(queryKey, 0); 1269 } 1270 1271 /** 1272 * Find a record and write the value to a file. 1273 * 1274 * @param key 1275 * @param uriOrPath 1276 * @param offset 1277 * @return false if record not found, true otherwise. 1278 * @throws IOException 1279 */ 1280 public boolean findAndExport(K key, String uriOrPath, long offset) throws IOException { 1281 FileSystem fs = null; 1282 Path p = null; 1283 1284 if (uriOrPath != null) { 1285 final URI uri = convertToURI(uriOrPath); 1286 1287 fs = getFileSystem(uri); 1288 p = new Path(uri.toString()); 1289 } 1290 1291 return findAndExport(key, fs, p, offset); 1292 } 1293 1294 /** 1295 * Find a record and write the value to a file. 1296 * 1297 * @param key 1298 * @param fs 1299 * @param dirPath 1300 * @param offset 1301 * @return false if record not found, true otherwise. 
1302 * @throws IOException 1303 */ 1304 public boolean findAndExport(K key, FileSystem fs, Path dirPath, long offset) throws IOException { 1305 final V value = find(key, offset); 1306 1307 if (value == null) 1308 return false; 1309 1310 if (fs != null && fs != null) { 1311 final Path outFilePath = new Path(dirPath, key.toString()); 1312 writeFile(fs, outFilePath, value); 1313 } else { 1314 printFile(value); 1315 } 1316 1317 return true; 1318 } 1319 1320 public Path getSequenceFilePath() { 1321 return sequenceFilePath; 1322 } 1323 1324 class SequenceFileEntry implements Entry<K, V> { 1325 K key; 1326 V value; 1327 1328 public SequenceFileEntry(K k, V v) { 1329 key = k; 1330 value = v; 1331 } 1332 1333 @Override 1334 public K getKey() { 1335 return key; 1336 } 1337 1338 @Override 1339 public V getValue() { 1340 return value; 1341 } 1342 1343 @Override 1344 public V setValue(V value) { 1345 this.value = value; 1346 return value; 1347 } 1348 } 1349 1350 class SequenceFileIterator implements Iterator<Entry<K, V>> { 1351 Reader reader = null; 1352 Entry<K, V> next; 1353 boolean shouldMove = true; 1354 1355 @SuppressWarnings("unchecked") 1356 public SequenceFileIterator() { 1357 try { 1358 reader = createReader(); 1359 1360 next = new SequenceFileEntry(ReflectionUtils.newInstance((Class<K>) reader.getKeyClass(), config), 1361 ReflectionUtils.newInstance( 1362 (Class<V>) reader.getValueClass(), config)); 1363 } catch (final IOException e) { 1364 throw new RuntimeException(e); 1365 } 1366 } 1367 1368 @Override 1369 public boolean hasNext() { 1370 tryGetNext(); 1371 return next != null; 1372 } 1373 1374 private void tryGetNext() { 1375 if (next != null && shouldMove) { 1376 shouldMove = false; 1377 try { 1378 if (!reader.next(next.getKey(), next.getValue())) { 1379 next = null; 1380 try { 1381 reader.close(); 1382 } catch (final IOException e1) { 1383 } 1384 } 1385 } catch (final IOException e) { 1386 try { 1387 reader.close(); 1388 } catch (final IOException e1) { 1389 } 
1390 throw new RuntimeException(e); 1391 } 1392 } 1393 } 1394 1395 @Override 1396 public Entry<K, V> next() { 1397 tryGetNext(); 1398 1399 if (next == null) { 1400 throw new NoSuchElementException(); 1401 } 1402 shouldMove = true; 1403 return next; 1404 } 1405 1406 @Override 1407 public void remove() { 1408 } 1409 } 1410 1411 @Override 1412 public Iterator<Entry<K, V>> iterator() { 1413 if (!isReader) { 1414 throw new UnsupportedOperationException("Cannot iterate in write mode"); 1415 } 1416 1417 return new SequenceFileIterator(); 1418 } 1419}