/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *
 *   * Redistributions in binary form must reproduce the above copyright notice,
 *     this list of conditions and the following disclaimer in the documentation
 *     and/or other materials provided with the distribution.
 *
 *   * Neither the name of the University of Southampton nor the names of its
 *     contributors may be used to endorse or promote products derived from this
 *     software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
package org.openimaj.util.parallel;

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ThreadPoolExecutor;

import org.openimaj.util.function.Operation;
import org.openimaj.util.parallel.partition.FixedSizeChunkPartitioner;
import org.openimaj.util.parallel.partition.GrowingChunkPartitioner;
import org.openimaj.util.parallel.partition.Partitioner;
import org.openimaj.util.parallel.partition.RangePartitioner;
import org.openimaj.util.stream.Stream;

/**
 * Parallel processing utilities for looping.
 * <p>
 * Inspired by the .NET Task Parallel Library. Allows control over the way data
 * is partitioned using inspiration from <a href=
 * "http://reedcopsey.com/2010/01/26/parallelism-in-net-part-5-partitioning-of-work/"
 * >Reed Copsey's blog</a>.
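 * <p>
 * A minimal usage sketch (illustrative only; {@code names} is a hypothetical
 * list, and the {@link Operation} callback is applied to each element in
 * parallel):
 *
 * <pre>
 * {@code
 * List<String> names = Arrays.asList("alpha", "beta", "gamma");
 * Parallel.forEach(names, new Operation<String>() {
 * 	public void perform(String name) {
 * 		System.out.println(name);
 * 	}
 * });
 * }
 * </pre>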
 *
 * @author Jonathon Hare (jsh2@ecs.soton.ac.uk)
 */
public class Parallel {
	private static class Task<T> implements Runnable {
		private Iterator<T> iterator;
		private Operation<T> op;

		public Task(Iterator<T> iterator, Operation<T> op) {
			this.iterator = iterator;
			this.op = op;
		}

		@Override
		public void run() {
			while (iterator.hasNext()) {
				op.perform(iterator.next());
			}
		}
	}

	private static class BatchTask<T> implements Runnable {
		private Iterator<T> iterator;
		private Operation<Iterator<T>> op;

		public BatchTask(Iterator<T> iterator, Operation<Iterator<T>> op) {
			this.iterator = iterator;
			this.op = op;
		}

		@Override
		public void run() {
			op.perform(iterator);
		}
	}

	/**
	 * An integer range with a step size.
	 *
	 * @author Jonathon Hare (jsh2@ecs.soton.ac.uk)
	 */
	public static class IntRange {
		/**
		 * Starting value (inclusive)
		 */
		public final int start;

		/**
		 * Stopping value (exclusive)
		 */
		public final int stop;

		/**
		 * Increment amount
		 */
		public final int incr;

		IntRange(int start, int stop, int incr) {
			this.start = start;
			this.stop = stop;
			this.incr = incr;
		}
	}

	/**
	 * Parallel integer for loop.
	 *
	 * @param start
	 *            starting value
	 * @param stop
	 *            stopping value
	 * @param incr
	 *            increment amount
	 * @param op
	 *            operation to perform
	 * @param pool
	 *            the thread pool.
	 */
	public static void forIndex(final int start, final int stop, final int incr, final Operation<Integer> op,
			final ThreadPoolExecutor pool)
	{
		int loops = pool.getMaximumPoolSize();
		final int ops = (stop - start) / incr;

		// divide the index range into roughly equal chunks, one per thread,
		// spreading any remainder across the first few chunks
		final double div = ops / (double) loops;
		int chunksize = (int) div;
		int remainder = (int) ((div - chunksize) * loops);

		if (div < 1) {
			chunksize = 1;
			remainder = 0;
			loops = ops;
		}

		final CountDownLatch latch = new CountDownLatch(loops);

		for (int i = start; i < stop;) {
			final int lo = i;
			i += chunksize * incr;
			if (remainder > 0) {
				i += incr;
				remainder--;
			}

			final int hi = Math.min(i, stop);

			pool.submit(new Runnable() {
				@Override
				public void run() {
					for (int i = lo; i < hi; i += incr)
						op.perform(i);
					latch.countDown();
				}
			});
		}
		try {
			latch.await();
		} catch (final InterruptedException e) {
			// restore the interrupt status rather than silently swallowing it
			Thread.currentThread().interrupt();
		}
	}

	/**
	 * Parallel integer for loop. Uses the default global thread pool.
	 *
	 * @see GlobalExecutorPool#getPool()
	 *
	 * @param start
	 *            starting value
	 * @param stop
	 *            stopping value
	 * @param incr
	 *            increment amount
	 * @param op
	 *            operation to perform
	 */
	public static void forIndex(final int start, final int stop, final int incr, final Operation<Integer> op) {
		forIndex(start, stop, incr, op, GlobalExecutorPool.getPool());
	}

	/**
	 * Parallel integer for loop. Fundamentally this is the same as
	 * {@link #forIndex(int, int, int, Operation)}, but potentially slightly
	 * faster as it avoids auto-boxing/unboxing and results in fewer method
	 * calls. The downside is that users have to write an extra loop to iterate
	 * over the {@link IntRange} object. Uses the default global thread pool.
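	 * <p>
	 * For example, a sketch of the extra inner loop over the {@link IntRange}
	 * (the {@code process} call is a hypothetical stand-in for per-index
	 * work):
	 *
	 * <pre>
	 * {@code
	 * Parallel.forRange(0, 1000, 1, new Operation<IntRange>() {
	 * 	public void perform(IntRange range) {
	 * 		for (int i = range.start; i < range.stop; i += range.incr) {
	 * 			process(i); // hypothetical per-index work
	 * 		}
	 * 	}
	 * });
	 * }
	 * </pre>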
	 *
	 * @param start
	 *            starting value
	 * @param stop
	 *            stopping value
	 * @param incr
	 *            increment amount
	 * @param op
	 *            operation to perform
	 */
	public static void forRange(final int start, final int stop, final int incr, final Operation<IntRange> op) {
		forRange(start, stop, incr, op, GlobalExecutorPool.getPool());
	}

	/**
	 * Parallel integer for loop. Fundamentally this is the same as
	 * {@link #forIndex(int, int, int, Operation, ThreadPoolExecutor)}, but
	 * potentially slightly faster as it avoids auto-boxing/unboxing and
	 * results in fewer method calls. The downside is that users have to write
	 * an extra loop to iterate over the {@link IntRange} object.
	 *
	 * @param start
	 *            starting value
	 * @param stop
	 *            stopping value
	 * @param incr
	 *            increment amount
	 * @param op
	 *            operation to perform
	 * @param pool
	 *            the thread pool.
	 */
	public static void forRange(final int start, final int stop, final int incr, final Operation<IntRange> op,
			final ThreadPoolExecutor pool)
	{
		int loops = pool.getMaximumPoolSize();
		final int ops = (stop - start) / incr;

		final double div = ops / (double) loops;
		int chunksize = (int) div;
		int remainder = (int) ((div - chunksize) * loops);

		if (div < 1) {
			chunksize = 1;
			remainder = 0;
			loops = ops;
		}

		final CountDownLatch latch = new CountDownLatch(loops);
		final Thread thread = Thread.currentThread();
		final Throwable[] exception = new Throwable[1];

		for (int i = start; i < stop;) {
			final int lo = i;
			i += chunksize * incr;
			if (remainder > 0) {
				i += incr;
				remainder--;
			}

			final int hi = Math.min(i, stop);

			pool.submit(new Runnable() {
				@Override
				public void run() {
					try {
						op.perform(new IntRange(lo, hi, incr));
					} catch (final Throwable t) {
						// record the failure and interrupt the caller so it
						// stops waiting on the latch
						exception[0] = t;
						thread.interrupt();
					} finally {
						latch.countDown();
					}
				}
			});
		}
		try {
			latch.await();
		} catch (final InterruptedException e) {
			if (exception[0] instanceof Error)
				throw (Error) exception[0];
			if (exception[0] instanceof RuntimeException)
				throw (RuntimeException) exception[0];
			if (exception[0] != null)
				throw new RuntimeException(exception[0]);
			// interrupted externally rather than by a failed worker
			throw new RuntimeException(e);
		}
	}

	/**
	 * Parallel ForEach loop over {@link Iterable} data. The data is
	 * automatically partitioned; if the data is a {@link List}, then a
	 * {@link RangePartitioner} is used, otherwise a
	 * {@link GrowingChunkPartitioner} is used.
	 *
	 * @param <T>
	 *            type of the data items
	 * @param objects
	 *            the data
	 * @param op
	 *            the operation to apply
	 * @param pool
	 *            the thread pool.
	 */
	public static <T> void forEach(final Iterable<T> objects, final Operation<T> op, final ThreadPoolExecutor pool) {
		Partitioner<T> partitioner;
		if (objects instanceof List) {
			partitioner = new RangePartitioner<T>((List<T>) objects, pool.getMaximumPoolSize());
		} else {
			partitioner = new GrowingChunkPartitioner<T>(objects);
		}
		forEach(partitioner, op, pool);
	}

	/**
	 * Parallel ForEach loop over {@link Iterable} data. Uses the default
	 * global thread pool. The data is automatically partitioned; if the data
	 * is a {@link List}, then a {@link RangePartitioner} is used, otherwise a
	 * {@link GrowingChunkPartitioner} is used.
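	 * <p>
	 * A sketch of overriding the automatic partitioning by supplying a
	 * partitioner directly ({@code words} is a hypothetical list; the
	 * two-argument {@link RangePartitioner} constructor is the one this class
	 * uses internally):
	 *
	 * <pre>
	 * {@code
	 * List<String> words = Arrays.asList("to", "be", "or", "not");
	 * Parallel.forEach(
	 * 		new RangePartitioner<String>(words, GlobalExecutorPool.getPool().getMaximumPoolSize()),
	 * 		new Operation<String>() {
	 * 			public void perform(String word) {
	 * 				System.out.println(word);
	 * 			}
	 * 		});
	 * }
	 * </pre>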
	 *
	 * @see GlobalExecutorPool#getPool()
	 *
	 * @param <T>
	 *            type of the data items
	 * @param objects
	 *            the data
	 * @param op
	 *            the operation to apply
	 */
	public static <T> void forEach(final Iterable<T> objects, final Operation<T> op) {
		forEach(objects, op, GlobalExecutorPool.getPool());
	}

	/**
	 * Parallel ForEach loop over partitioned data. Uses the default global
	 * thread pool.
	 *
	 * @see GlobalExecutorPool#getPool()
	 *
	 * @param <T>
	 *            type of the data items
	 * @param partitioner
	 *            the partitioner applied to the data
	 * @param op
	 *            the operation to apply
	 */
	public static <T> void forEach(final Partitioner<T> partitioner, final Operation<T> op) {
		forEach(partitioner, op, GlobalExecutorPool.getPool());
	}

	/**
	 * Parallel ForEach loop over partitioned data.
	 * <p>
	 * Implementation details:
	 * <ol>
	 * <li>create the partitions enumerator;</li>
	 * <li>schedule nprocs partitions;</li>
	 * <li>while there are still partitions to process, schedule the next
	 * partition as each one completes;</li>
	 * <li>wait for completion of the remaining partitions.</li>
	 * </ol>
	 *
	 * @param <T>
	 *            type of the data items
	 * @param partitioner
	 *            the partitioner applied to the data
	 * @param op
	 *            the operation to apply
	 * @param pool
	 *            the thread pool.
	 */
	public static <T> void forEach(final Partitioner<T> partitioner, final Operation<T> op,
			final ThreadPoolExecutor pool)
	{
		final ExecutorCompletionService<Boolean> completion = new ExecutorCompletionService<Boolean>(pool);
		final Iterator<Iterator<T>> partitions = partitioner.getPartitions();
		long submitted = 0;

		// keep at most one partition per pool thread in flight
		for (int i = 0; i < pool.getMaximumPoolSize(); i++) {
			if (!partitions.hasNext())
				break;

			completion.submit(new Task<T>(partitions.next(), op), true);
			submitted++;
		}

		// as each partition completes, schedule the next one; note that
		// exceptions thrown by the operation are printed and then ignored
		while (partitions.hasNext()) {
			try {
				completion.take().get();
			} catch (final InterruptedException e) {
				e.printStackTrace();
			} catch (final ExecutionException e) {
				e.printStackTrace();
			}
			completion.submit(new Task<T>(partitions.next(), op), true);
		}

		// wait for the remaining partitions to complete
		for (int i = 0; i < submitted; i++) {
			try {
				completion.take().get();
			} catch (final InterruptedException e) {
				e.printStackTrace();
			} catch (final ExecutionException e) {
				e.printStackTrace();
			}
		}
	}

	/**
	 * Parallel ForEach loop over unpartitioned data. This is effectively the
	 * same as using a {@link FixedSizeChunkPartitioner} with a chunk size of
	 * 1, but with slightly less overhead. The unpartitioned for-each loop has
	 * slightly lower throughput than a partitioned for-each loop, but exhibits
	 * much less delay in scheduling an item for processing, as a partition
	 * does not have to be populated first. The unpartitioned for-each loop is
	 * particularly useful for processing temporal {@link Stream}s of data.
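	 * <p>
	 * For example (a sketch; {@code messages} stands in for an iterator over
	 * items arriving from a stream):
	 *
	 * <pre>
	 * {@code
	 * Iterator<String> messages = Arrays.asList("ping", "pong").iterator();
	 * Parallel.forEachUnpartitioned(messages, new Operation<String>() {
	 * 	public void perform(String message) {
	 * 		System.out.println(message);
	 * 	}
	 * });
	 * }
	 * </pre>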
	 * <p>
	 * Implementation details:
	 * <ol>
	 * <li>schedule nprocs items;</li>
	 * <li>while there are still items to process, schedule the next item as
	 * each one completes;</li>
	 * <li>wait for completion of the remaining items.</li>
	 * </ol>
	 *
	 * @param <T>
	 *            type of the data items
	 * @param data
	 *            the iterator of data items
	 * @param op
	 *            the operation to apply
	 */
	public static <T> void forEachUnpartitioned(final Iterator<T> data, final Operation<T> op) {
		forEachUnpartitioned(data, op, GlobalExecutorPool.getPool());
	}

	/**
	 * Parallel ForEach loop over unpartitioned data. This is effectively the
	 * same as using a {@link FixedSizeChunkPartitioner} with a chunk size of
	 * 1, but with slightly less overhead. The unpartitioned for-each loop has
	 * slightly lower throughput than a partitioned for-each loop, but exhibits
	 * much less delay in scheduling an item for processing, as a partition
	 * does not have to be populated first. The unpartitioned for-each loop is
	 * particularly useful for processing temporal {@link Stream}s of data.
	 * <p>
	 * Implementation details:
	 * <ol>
	 * <li>schedule nprocs items;</li>
	 * <li>while there are still items to process, schedule the next item as
	 * each one completes;</li>
	 * <li>wait for completion of the remaining items.</li>
	 * </ol>
	 *
	 * @param <T>
	 *            type of the data items
	 * @param data
	 *            the iterator of data items
	 * @param op
	 *            the operation to apply
	 * @param pool
	 *            the thread pool.
	 */
	public static <T> void forEachUnpartitioned(final Iterator<T> data, final Operation<T> op,
			final ThreadPoolExecutor pool)
	{
		final ExecutorCompletionService<Boolean> completion = new ExecutorCompletionService<Boolean>(pool);
		long submitted = 0;

		// keep at most one item per pool thread in flight
		for (int i = 0; i < pool.getMaximumPoolSize(); i++) {
			if (!data.hasNext())
				break;

			final T next = data.next();

			completion.submit(new Runnable() {
				@Override
				public void run() {
					op.perform(next);
				}
			}, true);
			submitted++;
		}

		// as each item completes, schedule the next one
		while (data.hasNext()) {
			final T next = data.next();

			try {
				completion.take().get();
			} catch (final InterruptedException e) {
				e.printStackTrace();
			} catch (final ExecutionException e) {
				e.printStackTrace();
			}
			completion.submit(new Runnable() {
				@Override
				public void run() {
					op.perform(next);
				}
			}, true);
		}

		// wait for the remaining items to complete
		for (int i = 0; i < submitted; i++) {
			try {
				completion.take().get();
			} catch (final InterruptedException e) {
				e.printStackTrace();
			} catch (final ExecutionException e) {
				e.printStackTrace();
			}
		}
	}

	/**
	 * Parallel ForEach loop over partitioned data with batches of data.
	 * <p>
	 * Implementation details:
	 * <ol>
	 * <li>create the partitions enumerator;</li>
	 * <li>schedule nprocs partitions;</li>
	 * <li>while there are still partitions to process, schedule the next
	 * partition as each one completes;</li>
	 * <li>wait for completion of the remaining partitions.</li>
	 * </ol>
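	 * <p>
	 * For example, a sketch of batch processing ({@code items} is a
	 * hypothetical list, and a chunk-size-taking
	 * {@link FixedSizeChunkPartitioner} constructor is assumed):
	 *
	 * <pre>
	 * {@code
	 * List<String> items = Arrays.asList("a", "b", "c", "d");
	 * Parallel.forEachPartitioned(new FixedSizeChunkPartitioner<String>(items, 2),
	 * 		new Operation<Iterator<String>>() {
	 * 			public void perform(Iterator<String> batch) {
	 * 				while (batch.hasNext())
	 * 					System.out.println(batch.next());
	 * 			}
	 * 		});
	 * }
	 * </pre>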
	 *
	 * @param <T>
	 *            type of the data items
	 * @param partitioner
	 *            the partitioner applied to the data
	 * @param op
	 *            the operation to apply
	 * @param pool
	 *            the thread pool.
	 */
	public static <T> void forEachPartitioned(final Partitioner<T> partitioner, final Operation<Iterator<T>> op,
			final ThreadPoolExecutor pool)
	{
		final ExecutorCompletionService<Boolean> completion = new ExecutorCompletionService<Boolean>(pool);
		final Iterator<Iterator<T>> partitions = partitioner.getPartitions();
		long submitted = 0;

		for (int i = 0; i < pool.getMaximumPoolSize(); i++) {
			if (!partitions.hasNext())
				break;

			completion.submit(new BatchTask<T>(partitions.next(), op), true);
			submitted++;
		}

		while (partitions.hasNext()) {
			try {
				completion.take().get();
			} catch (final InterruptedException e) {
				e.printStackTrace();
			} catch (final ExecutionException e) {
				e.printStackTrace();
			}
			completion.submit(new BatchTask<T>(partitions.next(), op), true);
		}

		for (int i = 0; i < submitted; i++) {
			try {
				completion.take().get();
			} catch (final InterruptedException e) {
				e.printStackTrace();
			} catch (final ExecutionException e) {
				e.printStackTrace();
			}
		}
	}

	/**
	 * Parallel ForEach loop over batched partitioned data. Uses the default
	 * global thread pool.
	 *
	 * @see GlobalExecutorPool#getPool()
	 *
	 * @param <T>
	 *            type of the data items
	 * @param partitioner
	 *            the partitioner applied to the data
	 * @param op
	 *            the operation to apply
	 */
	public static <T> void forEachPartitioned(final Partitioner<T> partitioner, final Operation<Iterator<T>> op) {
		forEachPartitioned(partitioner, op, GlobalExecutorPool.getPool());
	}
}