001/** 002 * Copyright (c) 2011, The University of Southampton and the individual contributors. 003 * All rights reserved. 004 * 005 * Redistribution and use in source and binary forms, with or without modification, 006 * are permitted provided that the following conditions are met: 007 * 008 * * Redistributions of source code must retain the above copyright notice, 009 * this list of conditions and the following disclaimer. 010 * 011 * * Redistributions in binary form must reproduce the above copyright notice, 012 * this list of conditions and the following disclaimer in the documentation 013 * and/or other materials provided with the distribution. 014 * 015 * * Neither the name of the University of Southampton nor the names of its 016 * contributors may be used to endorse or promote products derived from this 017 * software without specific prior written permission. 018 * 019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND 020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR 023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON 026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 029 */ 030package org.openimaj.util.stream.window; 031 032import java.util.concurrent.Callable; 033import java.util.concurrent.Future; 034import java.util.concurrent.ThreadPoolExecutor; 035 036import org.openimaj.util.pair.IndependentPair; 037import org.openimaj.util.parallel.GlobalExecutorPool; 038import org.openimaj.util.stream.AbstractStream; 039import org.openimaj.util.stream.Stream; 040 041 042/** 043 * A stream combiner takes two streams and produces a new stream of syncrhonised 044 * pairs of the stream values. The two streams are consumed in two threads which 045 * the {@link #next()} method waits to complete before returning 046 * 047 * @author Sina Samangooei (ss@ecs.soton.ac.uk) 048 * @param <AP> 049 * @param <AM> 050 * @param <BP> 051 * @param <BM> 052 * 053 */ 054public class MetaPayloadStreamCombiner<AP,AM,BP,BM> extends AbstractStream<MetaPayload<IndependentPair<AP, BP>,IndependentPair<AM,BM>>>{ 055 056 private Stream<? extends MetaPayload<BP,BM>> b; 057 private Stream<? extends MetaPayload<AP,AM>> a; 058 private Starter<MetaPayload<AP, AM>> astart; 059 private Starter<MetaPayload<BP, BM>> bstart; 060 private ThreadPoolExecutor service; 061 062 class Starter<T> implements Callable<T>{ 063 064 private Stream<? extends T> stream; 065 066 public Starter(Stream<? extends T> a) { 067 stream = a; 068 } 069 070 @Override 071 public T call() throws Exception { 072 return stream.next(); 073 } 074 075 } 076 /** 077 * @param a 078 * @param b 079 */ 080 public <A extends MetaPayload<AP,AM>> MetaPayloadStreamCombiner( Stream<A> a, Stream<? extends MetaPayload<BP,BM>> b) { 081 082 this.a = a; 083 this.b = b; 084 this.astart = new Starter<MetaPayload<AP, AM>>(this.a); 085 this.bstart = new Starter<MetaPayload<BP, BM>>(this.b); 086 this.service = GlobalExecutorPool.getPool(); 087 088 } 089 @Override 090 public boolean hasNext() { 091 return a.hasNext() && b.hasNext(); 092 } 093 094 @Override 095 public MetaPayload<IndependentPair<AP, BP>,IndependentPair<AM,BM>> next() { 096 Future<MetaPayload<AP, AM>> futurea = this.service.submit(astart); 097 Future<MetaPayload<BP, BM>> futureb = this.service.submit(bstart); 098 try { 099 MetaPayload<AP, AM> ai = futurea.get(); 100 MetaPayload<BP, BM> bi = futureb.get(); 101 IndependentPair<AP, BP> payloads = IndependentPair.pair(ai.getPayload(), bi.getPayload()); 102 IndependentPair<AM, BM> metas = IndependentPair.pair(ai.getMeta(), bi.getMeta()); 103 return new MetaPayload<IndependentPair<AP,BP>, IndependentPair<AM,BM>>(payloads, metas); 104 } catch (Exception e) { 105 e.printStackTrace(); 106 } 107 return null; 108 109 } 110 /** 111 * @param a 112 * @param b 113 * @return an aggregation of the two streams 114 */ 115 public static <AP,AM,BP,BM> MetaPayloadStreamCombiner<AP,AM, BP,BM> combine(Stream<? extends MetaPayload<AP,AM>> a, Stream<? extends MetaPayload<BP,BM>> b) { 116 return new MetaPayloadStreamCombiner<AP,AM,BP,BM>(a, b); 117 } 118 119}