001/**
002 * Copyright (c) 2011, The University of Southampton and the individual contributors.
003 * All rights reserved.
004 *
005 * Redistribution and use in source and binary forms, with or without modification,
006 * are permitted provided that the following conditions are met:
007 *
008 *   *  Redistributions of source code must retain the above copyright notice,
009 *      this list of conditions and the following disclaimer.
010 *
011 *   *  Redistributions in binary form must reproduce the above copyright notice,
012 *      this list of conditions and the following disclaimer in the documentation
013 *      and/or other materials provided with the distribution.
014 *
015 *   *  Neither the name of the University of Southampton nor the names of its
016 *      contributors may be used to endorse or promote products derived from this
017 *      software without specific prior written permission.
018 *
019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */
030package org.openimaj.util.stream.combine;
031
032import java.util.concurrent.Callable;
033import java.util.concurrent.Future;
034import java.util.concurrent.ThreadPoolExecutor;
035
036import org.openimaj.util.data.Context;
037import org.openimaj.util.parallel.GlobalExecutorPool;
038import org.openimaj.util.stream.AbstractStream;
039import org.openimaj.util.stream.Stream;
040
041/**
042 * A stream combiner takes two streams of {@link Context} objects and produces a
043 * new stream of {@link Context}s which contain the stream values from both
044 * input streams. To combine contexts, prefix strings are added to all the keys
045 * to ensure that there is no overlap. The key prefixes can be empty if it is
046 * known that the context keys will not collide. The two streams are consumed in
047 * two threads which the {@link #next()} method waits to complete before
048 * returning.
049 * 
050 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
051 */
052public class ContextStreamCombiner extends AbstractStream<Context> {
053        class Starter implements Callable<Context> {
054                private Stream<Context> stream;
055
056                public Starter(Stream<Context> a) {
057                        stream = a;
058                }
059
060                @Override
061                public Context call() throws Exception {
062                        return stream.next();
063                }
064        }
065
066        private ThreadPoolExecutor service;
067        private Stream<Context> b;
068        private Stream<Context> a;
069        private String aprefix;
070        private String bprefix;
071
072        /**
073         * Construct the combiner to consume the two given streams. The keys from
074         * the first {@link Stream} will be prefixed "a" and the second "b".
075         * 
076         * @param a
077         *            the first stream
078         * @param b
079         *            the second stream
080         */
081        public ContextStreamCombiner(Stream<Context> a, Stream<Context> b) {
082                this.a = a;
083                this.b = b;
084                this.aprefix = "a";
085                this.bprefix = "b";
086                this.service = GlobalExecutorPool.getPool();
087
088        }
089
090        /**
091         * Construct the combiner to consume the two given streams, using the given
092         * prefixes to modify the keys from the respective streams.
093         * 
094         * @param a
095         *            the first stream
096         * @param b
097         *            the second stream
098         * @param aprefix
099         *            the first stream key prefix
100         * @param bprefix
101         *            the second stream key prefix
102         */
103        public ContextStreamCombiner(Stream<Context> a, Stream<Context> b, String aprefix, String bprefix) {
104                this.a = a;
105                this.b = b;
106                this.aprefix = aprefix;
107                this.bprefix = bprefix;
108                this.service = GlobalExecutorPool.getPool();
109
110        }
111
112        @Override
113        public boolean hasNext() {
114                return a.hasNext() && b.hasNext();
115        }
116
117        @Override
118        public Context next() {
119                final Future<Context> futurea = this.service.submit(new Starter(a));
120                final Future<Context> futureb = this.service.submit(new Starter(b));
121                try {
122                        final Context a = futurea.get();
123                        final Context b = futureb.get();
124                        return a.combine(b, aprefix, bprefix);
125                } catch (final Exception e) {
126                        e.printStackTrace();
127                }
128                return null;
129
130        }
131
132        /**
133         * Utility method to combine two streams. The keys from the first
134         * {@link Stream} will be prefixed "a" and the second "b".
135         * 
136         * @param a
137         *            the first stream
138         * @param b
139         *            the second stream
140         * @return the combined stream
141         */
142        public static Stream<Context> combine(Stream<Context> a, Stream<Context> b) {
143                return new ContextStreamCombiner(a, b);
144        }
145}