001/**
002 * Copyright (c) 2011, The University of Southampton and the individual contributors.
003 * All rights reserved.
004 *
005 * Redistribution and use in source and binary forms, with or without modification,
006 * are permitted provided that the following conditions are met:
007 *
008 *   *  Redistributions of source code must retain the above copyright notice,
009 *      this list of conditions and the following disclaimer.
010 *
011 *   *  Redistributions in binary form must reproduce the above copyright notice,
012 *      this list of conditions and the following disclaimer in the documentation
013 *      and/or other materials provided with the distribution.
014 *
015 *   *  Neither the name of the University of Southampton nor the names of its
016 *      contributors may be used to endorse or promote products derived from this
017 *      software without specific prior written permission.
018 *
019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */
030package org.openimaj.util.stream.window;
031
032import java.util.concurrent.Callable;
033import java.util.concurrent.Future;
034import java.util.concurrent.ThreadPoolExecutor;
035
036import org.openimaj.util.pair.IndependentPair;
037import org.openimaj.util.parallel.GlobalExecutorPool;
038import org.openimaj.util.stream.AbstractStream;
039import org.openimaj.util.stream.Stream;
040
041
042/**
043 * A stream combiner takes two streams and produces a new stream of syncrhonised
044 * pairs of the stream values. The two streams are consumed in two threads which
045 * the {@link #next()} method waits to complete before returning
046 *
047 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
048 * @param <AP>
049 * @param <AM>
050 * @param <BP>
051 * @param <BM>
052 *
053 */
054public class MetaPayloadStreamCombiner<AP,AM,BP,BM> extends AbstractStream<MetaPayload<IndependentPair<AP, BP>,IndependentPair<AM,BM>>>{
055
056        private Stream<? extends MetaPayload<BP,BM>> b;
057        private Stream<? extends MetaPayload<AP,AM>> a;
058        private Starter<MetaPayload<AP, AM>> astart;
059        private Starter<MetaPayload<BP, BM>> bstart;
060        private ThreadPoolExecutor service;
061
062        class Starter<T> implements Callable<T>{
063
064                private Stream<? extends T> stream;
065
066                public Starter(Stream<? extends T> a) {
067                        stream = a;
068                }
069
070                @Override
071                public T call() throws Exception {
072                        return stream.next();
073                }
074
075        }
076        /**
077         * @param a
078         * @param b
079         */
080        public <A extends MetaPayload<AP,AM>> MetaPayloadStreamCombiner( Stream<A> a, Stream<? extends MetaPayload<BP,BM>> b) {
081
082                this.a = a;
083                this.b = b;
084                this.astart = new Starter<MetaPayload<AP, AM>>(this.a);
085                this.bstart = new Starter<MetaPayload<BP, BM>>(this.b);
086                this.service = GlobalExecutorPool.getPool();
087
088        }
089        @Override
090        public boolean hasNext() {
091                return a.hasNext() && b.hasNext();
092        }
093
094        @Override
095        public MetaPayload<IndependentPair<AP, BP>,IndependentPair<AM,BM>> next() {
096                Future<MetaPayload<AP, AM>> futurea = this.service.submit(astart);
097                Future<MetaPayload<BP, BM>> futureb = this.service.submit(bstart);
098                try {
099                        MetaPayload<AP, AM> ai = futurea.get();
100                        MetaPayload<BP, BM> bi = futureb.get();
101                        IndependentPair<AP, BP> payloads = IndependentPair.pair(ai.getPayload(), bi.getPayload());
102                        IndependentPair<AM, BM> metas = IndependentPair.pair(ai.getMeta(), bi.getMeta());
103                        return new MetaPayload<IndependentPair<AP,BP>, IndependentPair<AM,BM>>(payloads, metas);
104                } catch (Exception e) {
105                        e.printStackTrace();
106                }
107                return null;
108
109        }
110        /**
111         * @param a
112         * @param b
113         * @return an aggregation of the two streams
114         */
115        public static <AP,AM,BP,BM> MetaPayloadStreamCombiner<AP,AM, BP,BM> combine(Stream<? extends MetaPayload<AP,AM>> a, Stream<? extends MetaPayload<BP,BM>> b) {
116                return new MetaPayloadStreamCombiner<AP,AM,BP,BM>(a, b);
117        }
118
119}