001/** 002 * Copyright (c) 2011, The University of Southampton and the individual contributors. 003 * All rights reserved. 004 * 005 * Redistribution and use in source and binary forms, with or without modification, 006 * are permitted provided that the following conditions are met: 007 * 008 * * Redistributions of source code must retain the above copyright notice, 009 * this list of conditions and the following disclaimer. 010 * 011 * * Redistributions in binary form must reproduce the above copyright notice, 012 * this list of conditions and the following disclaimer in the documentation 013 * and/or other materials provided with the distribution. 014 * 015 * * Neither the name of the University of Southampton nor the names of its 016 * contributors may be used to endorse or promote products derived from this 017 * software without specific prior written permission. 018 * 019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND 020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR 023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON 026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 029 */ 030package org.openimaj.util.stream; 031 032import org.openimaj.util.concurrent.BlockingDroppingQueue; 033 034/** 035 * Base for a {@link Stream} with an internal buffer based on a 036 * {@link BlockingDroppingQueue}. The use of the {@link BlockingDroppingQueue} 037 * allows the stream to potentially drop items that are not consumed at a fast 038 * enough rate (although this depends on the actual 039 * {@link BlockingDroppingQueue}). 040 * <p> 041 * This class is intended to be used to build {@link Stream} implementations 042 * that are connected to external, live data-sources that can potentially 043 * produce data at a rate which exceeds the rate at which the stream can be 044 * processed or consumed. 045 * 046 * @author Jonathon Hare (jsh2@ecs.soton.ac.uk) 047 * 048 * @param <T> 049 * The type of data item in the stream 050 */ 051public class BlockingDroppingBufferedStream<T> extends AbstractStream<T> { 052 BlockingDroppingQueue<T> buffer; 053 private boolean isClosed = false; 054 055 /** 056 * Construct with the given backing queue 057 * 058 * @param buffer 059 * the backing buffer 060 */ 061 public BlockingDroppingBufferedStream(BlockingDroppingQueue<T> buffer) { 062 this.buffer = buffer; 063 } 064 065 protected void register(T obj) throws InterruptedException { 066 buffer.offer(obj); 067 } 068 069 @Override 070 public boolean hasNext() { 071 return !isClosed; 072 } 073 074 /** 075 * Close the stream (make hasNext return false) 076 */ 077 public void close() { 078 this.isClosed = true; 079 } 080 081 @Override 082 public T next() { 083 try { 084 return buffer.take(); 085 } catch (final InterruptedException e) { 086 return next(); 087 } 088 } 089 090 /** 091 * Get the underlying {@link BlockingDroppingQueue} that is used as the 092 * internal buffer. 093 * 094 * @return the buffer 095 */ 096 public BlockingDroppingQueue<T> getBuffer() { 097 return buffer; 098 } 099}