/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   *  Redistributions of source code must retain the above copyright notice,
 *      this list of conditions and the following disclaimer.
 *
 *   *  Redistributions in binary form must reproduce the above copyright notice,
 *      this list of conditions and the following disclaimer in the documentation
 *      and/or other materials provided with the distribution.
 *
 *   *  Neither the name of the University of Southampton nor the names of its
 *      contributors may be used to endorse or promote products derived from this
 *      software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
package org.openimaj.util.parallel;

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ThreadPoolExecutor;

import org.openimaj.util.function.Operation;
import org.openimaj.util.parallel.partition.FixedSizeChunkPartitioner;
import org.openimaj.util.parallel.partition.GrowingChunkPartitioner;
import org.openimaj.util.parallel.partition.Partitioner;
import org.openimaj.util.parallel.partition.RangePartitioner;
import org.openimaj.util.stream.Stream;
/**
 * Parallel processing utilities for looping.
 * <p>
 * Inspired by the .NET Task Parallel Library. Allows control over the way data
 * is partitioned using inspiration from <a href=
 * "http://reedcopsey.com/2010/01/26/parallelism-in-net-part-5-partitioning-of-work/"
 * >Reed Copsey's blog</a>.
 *
 * @author Jonathon Hare (jsh2@ecs.soton.ac.uk)
 */
public class Parallel {
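	/**
	 * A {@link Runnable} that drains an iterator, applying the operation to
	 * each element in turn on the worker thread that runs it.
	 */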
	private static class Task<T> implements Runnable {
		private Iterator<T> iterator;
		private Operation<T> op;

		public Task(Iterator<T> iterator, Operation<T> op) {
			this.iterator = iterator;
			this.op = op;
		}

		@Override
		public void run() {
			while (iterator.hasNext()) {
				op.perform(iterator.next());
			}
		}
	}
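	/**
	 * A {@link Runnable} that passes an entire partition (as an iterator) to
	 * the operation in a single call, allowing batch processing.
	 */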
	private static class BatchTask<T> implements Runnable {
		private Iterator<T> iterator;
		private Operation<Iterator<T>> op;

		public BatchTask(Iterator<T> iterator, Operation<Iterator<T>> op) {
			this.iterator = iterator;
			this.op = op;
		}

		@Override
		public void run() {
			op.perform(iterator);
		}
	}

	/**
	 * An integer range with a step size.
	 *
	 * @author Jonathon Hare (jsh2@ecs.soton.ac.uk)
	 */
	public static class IntRange {
		/**
		 * Starting value (inclusive)
		 */
		public final int start;

		/**
		 * Stopping value (exclusive)
		 */
		public final int stop;

		/**
		 * Increment amount
		 */
		public final int incr;

		IntRange(int start, int stop, int incr) {
			this.start = start;
			this.stop = stop;
			this.incr = incr;
		}
	}
	/**
	 * Parallel integer for loop.
	 *
	 * @param start
	 *            starting value (inclusive)
	 * @param stop
	 *            stopping value (exclusive)
	 * @param incr
	 *            increment amount
	 * @param op
	 *            operation to perform
	 * @param pool
	 *            the thread pool.
	 */
	public static void forIndex(final int start, final int stop, final int incr, final Operation<Integer> op,
			final ThreadPoolExecutor pool)
	{
		int loops = pool.getMaximumPoolSize();
		final int ops = (stop - start) / incr;

		final double div = ops / (double) loops;
		int chunksize = (int) div;
		int remainder = (int) ((div - chunksize) * loops);

		if (div < 1) {
			chunksize = 1;
			remainder = 0;
			loops = ops;
		}

		final CountDownLatch latch = new CountDownLatch(loops);

		for (int i = start; i < stop;) {
			final int lo = i;
			i += chunksize * incr;
			if (remainder > 0) {
				i += incr;
				remainder--;
			}

			final int hi = Math.min(i, stop);

			pool.submit(new Runnable() {
				@Override
				public void run() {
					// Count down in a finally block so an exception thrown by
					// the operation cannot deadlock the await below.
					try {
						for (int i = lo; i < hi; i += incr)
							op.perform(i);
					} finally {
						latch.countDown();
					}
				}
			});
		}
		try {
			latch.await();
		} catch (final InterruptedException e) {
			// Preserve the interrupt status for callers rather than
			// swallowing it silently.
			Thread.currentThread().interrupt();
		}
	}

	/**
	 * Parallel integer for loop. Uses the default global thread pool.
	 *
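	 * A minimal usage sketch, printing the squares of 0..9 in parallel:
	 *
	 * <pre>
	 * {@code
	 * Parallel.forIndex(0, 10, 1, new Operation<Integer>() {
	 * 	public void perform(Integer i) {
	 * 		System.out.println(i * i);
	 * 	}
	 * });
	 * }
	 * </pre>
	 *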
	 * @see GlobalExecutorPool#getPool()
	 *
	 * @param start
	 *            starting value (inclusive)
	 * @param stop
	 *            stopping value (exclusive)
	 * @param incr
	 *            increment amount
	 * @param op
	 *            operation to perform
	 */
	public static void forIndex(final int start, final int stop, final int incr, final Operation<Integer> op) {
		forIndex(start, stop, incr, op, GlobalExecutorPool.getPool());
	}

	/**
	 * Parallel integer for loop. Fundamentally this is the same as
	 * {@link #forIndex(int, int, int, Operation)}, but potentially slightly
	 * faster as it avoids auto-boxing/unboxing and results in fewer method
	 * calls. The downside is that users have to write an extra loop to iterate
	 * over the {@link IntRange} object. Uses the default global thread pool.
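	 * <p>
	 * A minimal usage sketch; note the extra inner loop over the range:
	 *
	 * <pre>
	 * {@code
	 * Parallel.forRange(0, 1000000, 1, new Operation<IntRange>() {
	 * 	public void perform(IntRange range) {
	 * 		for (int i = range.start; i < range.stop; i += range.incr) {
	 * 			// process index i without any boxing
	 * 		}
	 * 	}
	 * });
	 * }
	 * </pre>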
	 *
	 * @param start
	 *            starting value (inclusive)
	 * @param stop
	 *            stopping value (exclusive)
	 * @param incr
	 *            increment amount
	 * @param op
	 *            operation to perform
	 */
	public static void forRange(final int start, final int stop, final int incr, final Operation<IntRange> op) {
		forRange(start, stop, incr, op, GlobalExecutorPool.getPool());
	}

	/**
	 * Parallel integer for loop. Fundamentally this is the same as
	 * {@link #forIndex(int, int, int, Operation, ThreadPoolExecutor)}, but
	 * potentially slightly faster as it avoids auto-boxing/unboxing and results
	 * in fewer method calls. The downside is that users have to write an extra
	 * loop to iterate over the {@link IntRange} object.
	 *
	 * @param start
	 *            starting value (inclusive)
	 * @param stop
	 *            stopping value (exclusive)
	 * @param incr
	 *            increment amount
	 * @param op
	 *            operation to perform
	 * @param pool
	 *            the thread pool.
	 */
	public static void forRange(final int start, final int stop, final int incr, final Operation<IntRange> op,
			final ThreadPoolExecutor pool)
	{
		int loops = pool.getMaximumPoolSize();
		final int ops = (stop - start) / incr;

		final double div = ops / (double) loops;
		int chunksize = (int) div;
		int remainder = (int) ((div - chunksize) * loops);

		if (div < 1) {
			chunksize = 1;
			remainder = 0;
			loops = ops;
		}

		final CountDownLatch latch = new CountDownLatch(loops);
		final Thread thread = Thread.currentThread();
		final Throwable[] exception = new Throwable[1];

		for (int i = start; i < stop;) {
			final int lo = i;
			i += chunksize * incr;
			if (remainder > 0) {
				i += incr;
				remainder--;
			}

			final int hi = Math.min(i, stop);

			pool.submit(new Runnable() {
				@Override
				public void run() {
					try {
						op.perform(new IntRange(lo, hi, incr));
					} catch (final Throwable t) {
						// Record the failure and interrupt the caller so it
						// stops waiting on the latch.
						exception[0] = t;
						thread.interrupt();
					} finally {
						latch.countDown();
					}
				}
			});
		}
		try {
			latch.await();
		} catch (final InterruptedException e) {
			if (exception[0] instanceof Error)
				throw (Error) exception[0];
			if (exception[0] instanceof RuntimeException)
				throw (RuntimeException) exception[0];
			if (exception[0] != null)
				throw new RuntimeException(exception[0]);
			// Interrupted externally rather than by a failed worker:
			// preserve the interrupt status.
			Thread.currentThread().interrupt();
		}
	}

	/**
	 * Parallel ForEach loop over {@link Iterable} data. The data is
	 * automatically partitioned; if the data is a {@link List}, then a
	 * {@link RangePartitioner} is used, otherwise a
	 * {@link GrowingChunkPartitioner} is used.
	 *
	 * @param <T>
	 *            type of the data items
	 * @param objects
	 *            the data
	 * @param op
	 *            the operation to apply
	 * @param pool
	 *            the thread pool.
	 */
	public static <T> void forEach(final Iterable<T> objects, final Operation<T> op, final ThreadPoolExecutor pool) {
		Partitioner<T> partitioner;
		if (objects instanceof List) {
			partitioner = new RangePartitioner<T>((List<T>) objects, pool.getMaximumPoolSize());
		} else {
			partitioner = new GrowingChunkPartitioner<T>(objects);
		}
		forEach(partitioner, op, pool);
	}

	/**
	 * Parallel ForEach loop over {@link Iterable} data. Uses the default global
	 * thread pool. The data is automatically partitioned; if the data is a
	 * {@link List}, then a {@link RangePartitioner} is used, otherwise a
	 * {@link GrowingChunkPartitioner} is used.
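	 * <p>
	 * A minimal usage sketch over a list of strings:
	 *
	 * <pre>
	 * {@code
	 * List<String> names = Arrays.asList("a", "b", "c");
	 * Parallel.forEach(names, new Operation<String>() {
	 * 	public void perform(String s) {
	 * 		System.out.println(s.toUpperCase());
	 * 	}
	 * });
	 * }
	 * </pre>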
	 *
	 * @see GlobalExecutorPool#getPool()
	 *
	 * @param <T>
	 *            type of the data items
	 * @param objects
	 *            the data
	 * @param op
	 *            the operation to apply
	 */
	public static <T> void forEach(final Iterable<T> objects, final Operation<T> op) {
		forEach(objects, op, GlobalExecutorPool.getPool());
	}

	/**
	 * Parallel ForEach loop over partitioned data. Uses the default global
	 * thread pool.
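	 * <p>
	 * A minimal usage sketch, assuming {@code names} is an
	 * {@code Iterable<String>} and that {@link FixedSizeChunkPartitioner}
	 * offers a (data, chunk-size) constructor:
	 *
	 * <pre>
	 * {@code
	 * Parallel.forEach(new FixedSizeChunkPartitioner<String>(names, 100),
	 * 		new Operation<String>() {
	 * 			public void perform(String s) {
	 * 				// process each item s
	 * 			}
	 * 		});
	 * }
	 * </pre>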
	 *
	 * @see GlobalExecutorPool#getPool()
	 *
	 * @param <T>
	 *            type of the data items
	 * @param partitioner
	 *            the partitioner applied to the data
	 * @param op
	 *            the operation to apply
	 */
	public static <T> void forEach(final Partitioner<T> partitioner, final Operation<T> op) {
		forEach(partitioner, op, GlobalExecutorPool.getPool());
	}

	/**
	 * Parallel ForEach loop over partitioned data.
	 * <p>
	 * Implementation details:
	 * <ol>
	 * <li>create the partitions enumerator</li>
	 * <li>schedule nprocs partitions</li>
	 * <li>while there are still partitions to process, schedule the next
	 * partition as each one completes</li>
	 * <li>wait for the remaining partitions to complete</li>
	 * </ol>
	 *
	 * @param <T>
	 *            type of the data items
	 * @param partitioner
	 *            the partitioner applied to the data
	 * @param op
	 *            the operation to apply
	 * @param pool
	 *            the thread pool.
	 */
	public static <T> void forEach(final Partitioner<T> partitioner, final Operation<T> op,
			final ThreadPoolExecutor pool)
	{
		final ExecutorCompletionService<Boolean> completion = new ExecutorCompletionService<Boolean>(pool);
		final Iterator<Iterator<T>> partitions = partitioner.getPartitions();
		long submitted = 0;

		for (int i = 0; i < pool.getMaximumPoolSize(); i++) {
			if (!partitions.hasNext())
				break;

			completion.submit(new Task<T>(partitions.next(), op), true);
			submitted++;
		}

		while (partitions.hasNext()) {
			try {
				completion.take().get();
			} catch (final InterruptedException e) {
				e.printStackTrace();
			} catch (final ExecutionException e) {
				e.printStackTrace();
			}
			completion.submit(new Task<T>(partitions.next(), op), true);
		}

		for (int i = 0; i < submitted; i++) {
			try {
				completion.take().get();
			} catch (final InterruptedException e) {
				e.printStackTrace();
			} catch (final ExecutionException e) {
				e.printStackTrace();
			}
		}
	}

	/**
	 * Parallel ForEach loop over unpartitioned data. This is effectively the
	 * same as using a {@link FixedSizeChunkPartitioner} with a chunk size of 1,
	 * but with slightly less overhead. The unpartitioned for-each loop has
	 * slightly lower throughput than a partitioned for-each loop, but exhibits
	 * much less delay in scheduling an item for processing, as a partition does
	 * not have to be populated first. This makes it particularly useful for
	 * processing temporal {@link Stream}s of data.
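	 * <p>
	 * A minimal usage sketch, draining an iterator of lines:
	 *
	 * <pre>
	 * {@code
	 * Iterator<String> lines = ...; // e.g. items arriving from a stream
	 * Parallel.forEachUnpartitioned(lines, new Operation<String>() {
	 * 	public void perform(String line) {
	 * 		// each item is scheduled as soon as a worker is free
	 * 	}
	 * });
	 * }
	 * </pre>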
	 * <p>
	 * Implementation details:
	 * <ol>
	 * <li>schedule nprocs items</li>
	 * <li>while there are still items to process, schedule the next item as
	 * each one completes</li>
	 * <li>wait for the remaining items to complete</li>
	 * </ol>
	 *
	 * @param <T>
	 *            type of the data items
	 * @param data
	 *            the iterator of data items
	 * @param op
	 *            the operation to apply
	 */
	public static <T> void forEachUnpartitioned(final Iterator<T> data, final Operation<T> op) {
		forEachUnpartitioned(data, op, GlobalExecutorPool.getPool());
	}

	/**
	 * Parallel ForEach loop over unpartitioned data. This is effectively the
	 * same as using a {@link FixedSizeChunkPartitioner} with a chunk size of 1,
	 * but with slightly less overhead. The unpartitioned for-each loop has
	 * slightly lower throughput than a partitioned for-each loop, but exhibits
	 * much less delay in scheduling an item for processing, as a partition does
	 * not have to be populated first. This makes it particularly useful for
	 * processing temporal {@link Stream}s of data.
	 * <p>
	 * Implementation details:
	 * <ol>
	 * <li>schedule nprocs items</li>
	 * <li>while there are still items to process, schedule the next item as
	 * each one completes</li>
	 * <li>wait for the remaining items to complete</li>
	 * </ol>
	 *
	 * @param <T>
	 *            type of the data items
	 * @param data
	 *            the iterator of data items
	 * @param op
	 *            the operation to apply
	 * @param pool
	 *            the thread pool.
	 */
	public static <T> void forEachUnpartitioned(final Iterator<T> data, final Operation<T> op,
			final ThreadPoolExecutor pool)
	{
		final ExecutorCompletionService<Boolean> completion = new ExecutorCompletionService<Boolean>(pool);
		long submitted = 0;

		for (int i = 0; i < pool.getMaximumPoolSize(); i++) {
			if (!data.hasNext())
				break;

			final T next = data.next();

			completion.submit(new Runnable() {
				@Override
				public void run() {
					op.perform(next);
				}
			}, true);
			submitted++;
		}

		while (data.hasNext()) {
			final T next = data.next();

			try {
				completion.take().get();
			} catch (final InterruptedException e) {
				e.printStackTrace();
			} catch (final ExecutionException e) {
				e.printStackTrace();
			}
			completion.submit(new Runnable() {
				@Override
				public void run() {
					op.perform(next);
				}
			}, true);
		}

		for (int i = 0; i < submitted; i++) {
			try {
				completion.take().get();
			} catch (final InterruptedException e) {
				e.printStackTrace();
			} catch (final ExecutionException e) {
				e.printStackTrace();
			}
		}
	}

	/**
	 * Parallel ForEach loop over partitioned data, where the operation is
	 * applied to each batch (partition) of data rather than to each item
	 * individually.
	 * <p>
	 * Implementation details:
	 * <ol>
	 * <li>create the partitions enumerator</li>
	 * <li>schedule nprocs partitions</li>
	 * <li>while there are still partitions to process, schedule the next
	 * partition as each one completes</li>
	 * <li>wait for the remaining partitions to complete</li>
	 * </ol>
	 *
	 * @param <T>
	 *            type of the data items
	 * @param partitioner
	 *            the partitioner applied to the data
	 * @param op
	 *            the operation to apply
	 * @param pool
	 *            the thread pool.
	 */
	public static <T> void forEachPartitioned(final Partitioner<T> partitioner, final Operation<Iterator<T>> op,
			final ThreadPoolExecutor pool)
	{
		final ExecutorCompletionService<Boolean> completion = new ExecutorCompletionService<Boolean>(pool);
		final Iterator<Iterator<T>> partitions = partitioner.getPartitions();
		long submitted = 0;

		for (int i = 0; i < pool.getMaximumPoolSize(); i++) {
			if (!partitions.hasNext())
				break;

			completion.submit(new BatchTask<T>(partitions.next(), op), true);
			submitted++;
		}

		while (partitions.hasNext()) {
			try {
				completion.take().get();
			} catch (final InterruptedException e) {
				e.printStackTrace();
			} catch (final ExecutionException e) {
				e.printStackTrace();
			}
			completion.submit(new BatchTask<T>(partitions.next(), op), true);
		}

		for (int i = 0; i < submitted; i++) {
			try {
				completion.take().get();
			} catch (final InterruptedException e) {
				e.printStackTrace();
			} catch (final ExecutionException e) {
				e.printStackTrace();
			}
		}
	}

	/**
	 * Parallel ForEach loop over batched partitioned data. Uses the default
	 * global thread pool.
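	 * <p>
	 * A minimal usage sketch, assuming {@code names} is a {@code List<String>}
	 * and {@code process} is a per-item handler; the operation receives an
	 * entire batch as an {@code Iterator}, which is useful when per-batch
	 * setup is expensive:
	 *
	 * <pre>
	 * {@code
	 * Parallel.forEachPartitioned(new RangePartitioner<String>(names, 4),
	 * 		new Operation<Iterator<String>>() {
	 * 			public void perform(Iterator<String> batch) {
	 * 				// e.g. set up per-batch state once, then drain the batch
	 * 				while (batch.hasNext()) {
	 * 					process(batch.next());
	 * 				}
	 * 			}
	 * 		});
	 * }
	 * </pre>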
	 *
	 * @see GlobalExecutorPool#getPool()
	 *
	 * @param <T>
	 *            type of the data items
	 * @param partitioner
	 *            the partitioner applied to the data
	 * @param op
	 *            the operation to apply
	 */
	public static <T> void forEachPartitioned(final Partitioner<T> partitioner, final Operation<Iterator<T>> op) {
		forEachPartitioned(partitioner, op, GlobalExecutorPool.getPool());
	}
}