/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   *  Redistributions of source code must retain the above copyright notice,
 *      this list of conditions and the following disclaimer.
 *
 *   *  Redistributions in binary form must reproduce the above copyright notice,
 *      this list of conditions and the following disclaimer in the documentation
 *      and/or other materials provided with the distribution.
 *
 *   *  Neither the name of the University of Southampton nor the names of its
 *      contributors may be used to endorse or promote products derived from this
 *      software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
package org.openimaj.util.stream;

import java.util.Iterator;
import java.util.concurrent.ThreadPoolExecutor;

import org.openimaj.util.function.Function;
import org.openimaj.util.function.MultiFunction;
import org.openimaj.util.function.Operation;
import org.openimaj.util.function.Predicate;
import org.openimaj.util.parallel.Parallel;

041/**
042 * Interface describing a stream of data items. Streams are sequences of items
043 * supporting both sequential and parallel bulk operations. Streams support lazy
044 * transformative operations (transforming a stream to another stream) such as
045 * {@link #filter(Predicate)} and {@link #map(Function)}, and consuming
046 * operations, such as {@link #forEach(Operation)} and {@link #next()}.
047 * <p>
048 * Streams may be either bounded or infinite in length. Once an item has been
049 * extracted from a stream, it is said to be consumed and is no longer available
050 * for operations on the stream.
051 *
052 * @author Jonathon Hare (jsh2@ecs.soton.ac.uk)
053 *
054 * @param <T>
055 *            The type of data item in the stream
056 */
057public interface Stream<T> extends Iterator<T>, Iterable<T> {
058
059        /**
060         * Apply the given {@link Operation} to each item in the stream. Items are
061         * presented to the {@link Operation} in the order they appear in the
062         * stream.
063         * <p>
064         * Note: for an unbounded stream, this method will never return unless some
065         * form of exception is raised.
066         *
067         * @param op
068         *            the {@link Operation} to apply
069         */
070        public void forEach(Operation<T> op);
071
072        /**
073         * Apply the given {@link Operation} to each item in the stream. Items are
074         * presented to the {@link Operation} in the order they appear in the
075         * stream. The given {@link Predicate} can be used to stop processing of the
076         * stream once some condition is met.
077         * <p>
078         * Note: for an unbounded stream, this method will never return unless some
079         * form of exception is raised or the condition of the
080         * <tt>stopPredicate</tt> is met.
081         *
082         * @param operation
083         *            the {@link Operation} to apply
084         * @param stopPredicate
085         *            a predicate representing a condition that once met causes
086         *            processing to stop
087         */
088        public void forEach(Operation<T> operation, Predicate<T> stopPredicate);
089
090        /**
091         * Apply the given {@link Operation} to each item in the stream. Items are
092         * presented to the {@link Operation} in the order they appear in the
093         * stream. The given {@link Predicate} can be used to stop processing of the
094         * stream once some condition is met.
095         * <p>
096         * Note: for an unbounded stream, this method will never return unless some
097         * form of exception is raised or the condition of the
098         * <tt>stopPredicate</tt> is met.
099         *
100         * @param operation
101         *            the {@link Operation} to apply
102         * @param limit
103         *            the number of items to read from the stream
104         * @return the number of items read
105         */
106        public int forEach(Operation<T> operation, int limit);
107
108        /**
109         * Apply the given {@link Operation} to each item in the stream, making use
110         * of multiple threads. The order in which operations are performed on the
111         * stream is not guaranteed.
112         * <p>
113         * This method is intended to be a shortcut to calling
114         * {@link Parallel#forEachUnpartitioned(Iterator, Operation)}.
115         * <p>
116         * Note: for an unbounded stream, this method will never return unless some
117         * form of exception is raised.
118         *
119         * @param op
120         *            the {@link Operation} to apply
121         */
122        public void parallelForEach(Operation<T> op);
123
124        /**
125         * Apply the given {@link Operation} to each item in the stream, making use
126         * of multiple threads. The order in which operations are performed on the
127         * stream is not guaranteed.
128         * <p>
129         * This method is intended to be a shortcut to calling
130         * {@link Parallel#forEachUnpartitioned(Iterator, Operation, ThreadPoolExecutor)}.
131         * <p>
132         * Note: for an unbounded stream, this method will never return unless some
133         * form of exception is raised.
134         *
135         * @param op
136         *            the {@link Operation} to apply
137         * @param pool
138         *            the thread pool.
139         */
140        public void parallelForEach(Operation<T> op, ThreadPoolExecutor pool);
141
142        /**
143         * Transform the stream by creating a view that consists of only the items
144         * that match the given {@link Predicate}.
145         *
146         * @param filter
147         *            the predicate
148         * @return a new stream consisting of the matched items from this stream
149         */
150        public Stream<T> filter(Predicate<T> filter);
151
152        /**
153         * Transform the stream by creating a new stream that transforms the items
154         * in this stream with the given {@link Function}.
155         *
156         * @param mapper
157         *            the function to apply
158         * @return a new stream with transformed items from this stream
159         */
160        public <R> Stream<R> map(Function<T, R> mapper);
161
162        /**
163         * Transform the stream by creating a new stream that transforms the items
164         * in this stream with the given {@link Function}.
165         *
166         * @param mapper
167         *            the function to apply
168         * @return a new stream with transformed items from this stream
169         */
170        public <R> Stream<R> map(MultiFunction<T, R> mapper);
171
172        /**
173         * Transform the stream using the given function to transform the items in
174         * this stream.
175         *
176         * @param transform
177         *            the transform function
178         * @return a new stream with transformed items from this stream
179         */
180        public <R> Stream<R> transform(Function<Stream<T>, Stream<R>> transform);
181}