/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   * 	Redistributions of source code must retain the above copyright notice,
 * 	this list of conditions and the following disclaimer.
 *
 *   *	Redistributions in binary form must reproduce the above copyright notice,
 * 	this list of conditions and the following disclaimer in the documentation
 * 	and/or other materials provided with the distribution.
 *
 *   *	Neither the name of the University of Southampton nor the names of its
 * 	contributors may be used to endorse or promote products derived from this
 * 	software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
package org.openimaj.util.stream;

import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ThreadPoolExecutor;

import org.openimaj.util.function.Function;
import org.openimaj.util.function.MultiFunction;
import org.openimaj.util.function.Operation;
import org.openimaj.util.function.Predicate;
import org.openimaj.util.parallel.Parallel;

/**
 * Abstract base implementation of a read-only {@link Stream} (i.e.
 * {@link #remove()} is not supported).
 * <p>
 * Subclasses supply {@link #hasNext()} and {@link #next()}; every other
 * operation in this class is built on top of those two methods. The derived
 * streams returned by {@link #filter(Predicate)} and the <code>map</code>
 * methods pull their items from this stream on demand, so consuming a derived
 * stream also consumes this one.
 *
 * @author Jonathon Hare (jsh2@ecs.soton.ac.uk)
 *
 * @param <T>
 *            The type of data item in the stream
 */
public abstract class AbstractStream<T> implements Stream<T> {
	/**
	 * Applies the operation to every remaining item, in the order produced by
	 * {@link #next()}, until {@link #hasNext()} returns <code>false</code>.
	 *
	 * @param op
	 *            the operation to perform on each item
	 */
	@Override
	public void forEach(Operation<T> op) {
		while (hasNext()) {
			op.perform(next());
		}
	}

	/**
	 * Applies the operation to each remaining item until the stream is
	 * exhausted or the stop predicate accepts an item. The item accepted by
	 * the stop predicate is still passed to the operation before iteration
	 * stops.
	 *
	 * @param operation
	 *            the operation to perform on each item
	 * @param stopPredicate
	 *            tested against each item after the operation has been
	 *            performed on it; iteration stops when it returns
	 *            <code>true</code>
	 */
	@Override
	public void forEach(Operation<T> operation, Predicate<T> stopPredicate) {
		while (hasNext()) {
			final T next = next();
			operation.perform(next);
			if (stopPredicate.test(next)) {
				break;
			}
		}
	}

	/**
	 * Applies the operation to at most <code>limit</code> remaining items.
	 * No item is consumed from the stream when <code>limit</code> is zero or
	 * negative.
	 *
	 * @param operation
	 *            the operation to perform on each item
	 * @param limit
	 *            the maximum number of items to process
	 * @return the number of items the operation was performed on
	 */
	@Override
	public int forEach(Operation<T> operation, int limit) {
		int seen = 0;
		// The limit is tested first so that nothing is pulled from the stream
		// (and hasNext() is not called) once the limit has been reached.
		while (seen < limit && hasNext()) {
			operation.perform(next());
			seen++;
		}
		return seen;
	}

	/**
	 * Hands this stream and the operation to
	 * {@link Parallel#forEachUnpartitioned}.
	 *
	 * @param op
	 *            the operation to perform on each item
	 */
	@Override
	public void parallelForEach(Operation<T> op) {
		Parallel.forEachUnpartitioned(this, op);
	}

	/**
	 * Hands this stream, the operation and the given pool to
	 * {@link Parallel#forEachUnpartitioned}.
	 *
	 * @param op
	 *            the operation to perform on each item
	 * @param pool
	 *            the thread pool passed through to {@link Parallel}
	 */
	@Override
	public void parallelForEach(Operation<T> op, ThreadPoolExecutor pool) {
		Parallel.forEachUnpartitioned(this, op, pool);
	}

	/**
	 * A view of the enclosing stream that only yields the items accepted by a
	 * predicate. Items are pulled from the enclosing stream lazily, one
	 * look-ahead item at a time.
	 */
	class FilterStream extends AbstractStream<T> {
		Predicate<T> filter;
		T obj = null;

		/**
		 * Whether {@link #obj} currently holds a look-ahead item that has
		 * passed the filter but has not yet been returned. A separate flag is
		 * needed because <code>null</code> is itself a legal stream item and
		 * so cannot double as the "nothing buffered" marker.
		 */
		private boolean buffered = false;

		/**
		 * @param predicate
		 *            the test an item must pass to appear in this stream
		 */
		FilterStream(Predicate<T> predicate) {
			this.filter = predicate;
		}

		/**
		 * Advances the enclosing stream until an item passes the filter (which
		 * is then buffered) or the enclosing stream is exhausted.
		 *
		 * @return <code>true</code> if an accepted item is available
		 */
		@Override
		public boolean hasNext() {
			if (buffered) {
				return true;
			}

			while (AbstractStream.this.hasNext()) {
				final T candidate = AbstractStream.this.next();
				if (filter.test(candidate)) {
					obj = candidate;
					buffered = true;
					return true;
				}
			}

			return false;
		}

		/**
		 * @return the next item accepted by the filter
		 * @throws NoSuchElementException
		 *             if no further item passes the filter
		 */
		@Override
		public T next() {
			if (!hasNext()) {
				throw new NoSuchElementException("iteration has no more elements");
			}

			final T toRet = obj;
			// Drop the reference so the returned item is not retained here.
			obj = null;
			buffered = false;

			return toRet;
		}
	}

	/**
	 * Creates a stream containing only the items of this stream that the
	 * filter accepts.
	 *
	 * @param filter
	 *            the test an item must pass to be included
	 * @return a lazily evaluated, filtered view of this stream
	 */
	@Override
	public Stream<T> filter(Predicate<T> filter) {
		return new FilterStream(filter);
	}

	/**
	 * Applies the given function to this stream as a whole.
	 *
	 * @param transform
	 *            the function that receives this stream
	 * @return whatever stream the function returns
	 */
	@Override
	public <R> Stream<R> transform(Function<Stream<T>, Stream<R>> transform) {
		return transform.apply(this);
	}

	/**
	 * Creates a stream whose items are the result of applying the mapper to
	 * each item of this stream. The mapper is invoked each time
	 * <code>next()</code> is called on the returned stream.
	 *
	 * @param mapper
	 *            the function applied to each item
	 * @return a lazily evaluated, mapped view of this stream
	 */
	@Override
	public <R> Stream<R> map(final Function<T, R> mapper) {
		return new AbstractStream<R>() {
			@Override
			public boolean hasNext() {
				return AbstractStream.this.hasNext();
			}

			@Override
			public R next() {
				return mapper.apply(AbstractStream.this.next());
			}
		};
	}

	/**
	 * Creates a stream that flattens the lists produced by applying the mapper
	 * to each item of this stream. Items for which the mapper returns
	 * <code>null</code> or an empty list contribute nothing to the result.
	 *
	 * @param mapper
	 *            the function producing a list of outputs for each item
	 * @return a lazily evaluated, flattened view of the mapped items
	 */
	@Override
	public <R> Stream<R> map(final MultiFunction<T, R> mapper) {
		return new AbstractStream<R>() {
			/** The list currently being drained, or null if there is none. */
			List<R> current;
			/** Index in {@link #current} of the next item to return. */
			int currentIndex;

			@Override
			public boolean hasNext() {
				// Discard the current list once every item in it has been
				// returned.
				if (current != null && currentIndex >= current.size()) {
					current = null;
					currentIndex = 0;
				}

				if (current != null) {
					return true;
				}

				// Pull from the enclosing stream until the mapper yields a
				// non-empty list or the enclosing stream runs out.
				while (AbstractStream.this.hasNext()) {
					final List<R> list = mapper.apply(AbstractStream.this.next());

					if (list != null && !list.isEmpty()) {
						current = list;
						currentIndex = 0;
						return true;
					}
				}

				return false;
			}

			@Override
			public R next() {
				if (!hasNext()) {
					throw new NoSuchElementException();
				}

				final R ret = current.get(currentIndex);
				currentIndex++;

				return ret;
			}
		};
	}

	/**
	 * Always throws an {@link UnsupportedOperationException}, as streams are
	 * read-only.
	 *
	 * @throws UnsupportedOperationException
	 *             always
	 */
	@Override
	public void remove() {
		throw new UnsupportedOperationException("Remove is not supported");
	}

	/**
	 * Returns this stream itself, so iterating via the returned iterator
	 * consumes the stream.
	 *
	 * @return this stream
	 */
	@Override
	public Iterator<T> iterator() {
		return this;
	}
}