/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *
 *   * Redistributions in binary form must reproduce the above copyright notice,
 *     this list of conditions and the following disclaimer in the documentation
 *     and/or other materials provided with the distribution.
 *
 *   * Neither the name of the University of Southampton nor the names of its
 *     contributors may be used to endorse or promote products derived from this
 *     software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
package org.openimaj.util.stream;

import java.util.Iterator;
import java.util.concurrent.ThreadPoolExecutor;

import org.openimaj.util.function.Function;
import org.openimaj.util.function.MultiFunction;
import org.openimaj.util.function.Operation;
import org.openimaj.util.function.Predicate;
import org.openimaj.util.parallel.Parallel;

/**
 * Interface describing a stream of data items. Streams are sequences of items
 * supporting both sequential and parallel bulk operations. Streams support lazy
 * transformative operations (transforming a stream to another stream) such as
 * {@link #filter(Predicate)} and {@link #map(Function)}, and consuming
 * operations, such as {@link #forEach(Operation)} and {@link #next()}.
 * <p>
 * Streams may be either bounded or infinite in length. Once an item has been
 * extracted from a stream, it is said to be consumed and is no longer available
 * for operations on the stream.
 *
 * @author Jonathon Hare (jsh2@ecs.soton.ac.uk)
 *
 * @param <T>
 *            The type of data item in the stream
 */
public interface Stream<T> extends Iterator<T>, Iterable<T> {

    /**
     * Apply the given {@link Operation} to each item in the stream. Items are
     * presented to the {@link Operation} in the order they appear in the
     * stream.
     * <p>
     * Note: for an unbounded stream, this method will never return unless some
     * form of exception is raised.
     *
     * @param op
     *            the {@link Operation} to apply
     */
    void forEach(Operation<T> op);

    /**
     * Apply the given {@link Operation} to each item in the stream. Items are
     * presented to the {@link Operation} in the order they appear in the
     * stream. The given {@link Predicate} can be used to stop processing of the
     * stream once some condition is met.
     * <p>
     * Note: for an unbounded stream, this method will never return unless some
     * form of exception is raised or the condition of the
     * {@code stopPredicate} is met.
     *
     * @param operation
     *            the {@link Operation} to apply
     * @param stopPredicate
     *            a predicate representing a condition that once met causes
     *            processing to stop
     */
    void forEach(Operation<T> operation, Predicate<T> stopPredicate);

    /**
     * Apply the given {@link Operation} to items in the stream until a given
     * number of items has been read. Items are presented to the
     * {@link Operation} in the order they appear in the stream.
     * <p>
     * Note: unlike {@link #forEach(Operation)}, processing is bounded by
     * {@code limit} rather than by the length of the stream.
     *
     * @param operation
     *            the {@link Operation} to apply
     * @param limit
     *            the number of items to read from the stream
     * @return the number of items read
     */
    int forEach(Operation<T> operation, int limit);

    /**
     * Apply the given {@link Operation} to each item in the stream, making use
     * of multiple threads. The order in which operations are performed on the
     * stream is not guaranteed.
     * <p>
     * This method is intended to be a shortcut to calling
     * {@link Parallel#forEachUnpartitioned(Iterator, Operation)}.
     * <p>
     * Note: for an unbounded stream, this method will never return unless some
     * form of exception is raised.
     *
     * @param op
     *            the {@link Operation} to apply
     */
    void parallelForEach(Operation<T> op);

    /**
     * Apply the given {@link Operation} to each item in the stream, making use
     * of multiple threads taken from the given pool. The order in which
     * operations are performed on the stream is not guaranteed.
     * <p>
     * This method is intended to be a shortcut to calling
     * {@link Parallel#forEachUnpartitioned(Iterator, Operation, ThreadPoolExecutor)}.
     * <p>
     * Note: for an unbounded stream, this method will never return unless some
     * form of exception is raised.
     *
     * @param op
     *            the {@link Operation} to apply
     * @param pool
     *            the thread pool.
     */
    void parallelForEach(Operation<T> op, ThreadPoolExecutor pool);

    /**
     * Transform the stream by creating a view that consists of only the items
     * that match the given {@link Predicate}.
     *
     * @param filter
     *            the predicate
     * @return a new stream consisting of the matched items from this stream
     */
    Stream<T> filter(Predicate<T> filter);

    /**
     * Transform the stream by creating a new stream that transforms the items
     * in this stream with the given {@link Function}.
     *
     * @param <R>
     *            The type of data item in the resulting stream
     * @param mapper
     *            the function to apply
     * @return a new stream with transformed items from this stream
     */
    <R> Stream<R> map(Function<T, R> mapper);

    /**
     * Transform the stream by creating a new stream that transforms the items
     * in this stream with the given {@link MultiFunction}.
     *
     * @param <R>
     *            The type of data item in the resulting stream
     * @param mapper
     *            the multi-function to apply
     * @return a new stream with transformed items from this stream
     */
    <R> Stream<R> map(MultiFunction<T, R> mapper);

    /**
     * Transform the stream using the given stream-to-stream function to
     * transform the items in this stream.
     *
     * @param <R>
     *            The type of data item in the resulting stream
     * @param transform
     *            the transform function
     * @return a new stream with transformed items from this stream
     */
    <R> Stream<R> transform(Function<Stream<T>, Stream<R>> transform);
}