001/** 002 * Copyright (c) 2011, The University of Southampton and the individual contributors. 003 * All rights reserved. 004 * 005 * Redistribution and use in source and binary forms, with or without modification, 006 * are permitted provided that the following conditions are met: 007 * 008 * * Redistributions of source code must retain the above copyright notice, 009 * this list of conditions and the following disclaimer. 010 * 011 * * Redistributions in binary form must reproduce the above copyright notice, 012 * this list of conditions and the following disclaimer in the documentation 013 * and/or other materials provided with the distribution. 014 * 015 * * Neither the name of the University of Southampton nor the names of its 016 * contributors may be used to endorse or promote products derived from this 017 * software without specific prior written permission. 018 * 019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND 020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR 023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON 026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 029 */ 030package org.openimaj.util.concurrent; 031 032import java.util.AbstractQueue; 033import java.util.Collection; 034import java.util.ConcurrentModificationException; 035import java.util.Iterator; 036import java.util.NoSuchElementException; 037import java.util.concurrent.TimeUnit; 038import java.util.concurrent.locks.Condition; 039import java.util.concurrent.locks.ReentrantLock; 040 041/** 042 * A bounded variant of a {@linkplain BlockingDroppingQueue blocking queue} 043 * backed by an array. 044 * 045 * <p> 046 * Elements in the queue are ordered as FIFO (first-in-first-out). The 047 * <em>head</em> of the queue is that element that has been on the queue the 048 * longest time. The <em>tail</em> of the queue is that element that has been on 049 * the queue the shortest time. New elements are inserted at the tail of the 050 * queue, and the queue retrieval operations obtain elements at the head of the 051 * queue. 052 * 053 * <p> 054 * This is a classic "ring buffer", in which a fixed-sized array holds 055 * elements inserted by producers and extracted by consumers. Once created, the 056 * capacity cannot be increased. Attempts to <tt>put</tt> an element into a full 057 * queue will result in the oldest item being removed to make room; attempts to 058 * <tt>take</tt> an element from an empty queue will block. 059 * 060 * <p> 061 * This class supports an optional fairness policy for ordering waiting producer 062 * and consumer threads. By default, this ordering is not guaranteed. However, a 063 * queue constructed with fairness set to <tt>true</tt> grants threads access in 064 * FIFO order. Fairness generally decreases throughput but reduces variability 065 * and avoids starvation. 066 * 067 * <p> 068 * This class and its iterator implement all of the <em>optional</em> methods of 069 * the {@link Collection} and {@link Iterator} interfaces. 070 * 071 * @author Jonathon Hare (jsh2@ecs.soton.ac.uk) 072 * @param <E> 073 * the type of elements held in this collection 074 */ 075public class ArrayBlockingDroppingQueue<E> extends AbstractQueue<E> 076implements BlockingDroppingQueue<E>, java.io.Serializable 077{ 078 private static final long serialVersionUID = -2040484277582233451L; 079 080 private long insertCount; 081 private long dropCount; 082 083 @Override 084 public E put(E e) throws InterruptedException { 085 if (e == null) 086 throw new NullPointerException(); 087 final E[] items = this.items; 088 final ReentrantLock lock = this.lock; 089 lock.lock(); 090 try { 091 E ret = null; 092 if (count == items.length) { 093 // drop an item to make room 094 ret = extract(); 095 ++dropCount; 096 } 097 insert(e); 098 099 return ret; 100 } finally { 101 lock.unlock(); 102 } 103 } 104 105 @Override 106 public long insertCount() { 107 final ReentrantLock lock = this.lock; 108 lock.lock(); 109 try { 110 return insertCount; 111 } finally { 112 lock.unlock(); 113 } 114 } 115 116 @Override 117 public long dropCount() { 118 final ReentrantLock lock = this.lock; 119 lock.lock(); 120 try { 121 return dropCount; 122 } finally { 123 lock.unlock(); 124 } 125 } 126 127 /** 128 * Inserts element at current put position, advances, and signals. Call only 129 * when holding lock. 130 */ 131 private void insert(E x) { 132 items[putIndex] = x; 133 putIndex = inc(putIndex); 134 ++count; 135 ++insertCount; 136 notEmpty.signal(); 137 } 138 139 /************************************************************************************* 140 * EVERYTHING BELOW THIS POINT IS COPIED FROM ArrayBlockingQueue. 141 *************************************************************************************/ 142 /** The queued items */ 143 private final E[] items; 144 145 /** items index for next take, poll or remove */ 146 private int takeIndex; 147 148 /** items index for next put, offer, or add. */ 149 private int putIndex; 150 151 /** Number of items in the queue */ 152 private int count; 153 154 /* 155 * Concurrency control uses the classic two-condition algorithm found in any 156 * textbook. 157 */ 158 159 /** Main lock guarding all access */ 160 private final ReentrantLock lock; 161 /** Condition for waiting takes */ 162 private final Condition notEmpty; 163 /** Condition for waiting puts */ 164 private final Condition notFull; 165 166 // Internal helper methods 167 168 /** 169 * Circularly increment i. 170 */ 171 final int inc(int i) { 172 return (++i == items.length) ? 0 : i; 173 } 174 175 /** 176 * Extracts element at current take position, advances, and signals. Call 177 * only when holding lock. 178 */ 179 private E extract() { 180 final E[] items = this.items; 181 final E x = items[takeIndex]; 182 items[takeIndex] = null; 183 takeIndex = inc(takeIndex); 184 --count; 185 notFull.signal(); 186 return x; 187 } 188 189 /** 190 * Utility for remove and iterator.remove: Delete item at position i. Call 191 * only when holding lock. 192 */ 193 void removeAt(int i) { 194 final E[] items = this.items; 195 // if removing front item, just advance 196 if (i == takeIndex) { 197 items[takeIndex] = null; 198 takeIndex = inc(takeIndex); 199 } else { 200 // slide over all others up through putIndex. 201 for (;;) { 202 final int nexti = inc(i); 203 if (nexti != putIndex) { 204 items[i] = items[nexti]; 205 i = nexti; 206 } else { 207 items[i] = null; 208 putIndex = i; 209 break; 210 } 211 } 212 } 213 --count; 214 notFull.signal(); 215 } 216 217 /** 218 * Creates an <tt>ArrayBlockingDroppingQueue</tt> with the given (fixed) 219 * capacity and default access policy. 220 * 221 * @param capacity 222 * the capacity of this queue 223 * @throws IllegalArgumentException 224 * if <tt>capacity</tt> is less than 1 225 */ 226 public ArrayBlockingDroppingQueue(int capacity) { 227 this(capacity, false); 228 } 229 230 /** 231 * Creates an <tt>ArrayBlockingDroppingQueue</tt> with the given (fixed) 232 * capacity and the specified access policy. 233 * 234 * @param capacity 235 * the capacity of this queue 236 * @param fair 237 * if <tt>true</tt> then queue accesses for threads blocked on 238 * insertion or removal, are processed in FIFO order; if 239 * <tt>false</tt> the access order is unspecified. 240 * @throws IllegalArgumentException 241 * if <tt>capacity</tt> is less than 1 242 */ 243 @SuppressWarnings("unchecked") 244 public ArrayBlockingDroppingQueue(int capacity, boolean fair) { 245 if (capacity <= 0) 246 throw new IllegalArgumentException(); 247 this.items = (E[]) new Object[capacity]; 248 lock = new ReentrantLock(fair); 249 notEmpty = lock.newCondition(); 250 notFull = lock.newCondition(); 251 } 252 253 /** 254 * Creates an <tt>ArrayBlockingDroppingQueue</tt> with the given (fixed) 255 * capacity, the specified access policy and initially containing the 256 * elements of the given collection, added in traversal order of the 257 * collection's iterator. 258 * 259 * @param capacity 260 * the capacity of this queue 261 * @param fair 262 * if <tt>true</tt> then queue accesses for threads blocked on 263 * insertion or removal, are processed in FIFO order; if 264 * <tt>false</tt> the access order is unspecified. 265 * @param c 266 * the collection of elements to initially contain 267 * @throws IllegalArgumentException 268 * if <tt>capacity</tt> is less than <tt>c.size()</tt>, or less 269 * than 1. 270 * @throws NullPointerException 271 * if the specified collection or any of its elements are null 272 */ 273 public ArrayBlockingDroppingQueue(int capacity, boolean fair, 274 Collection<? extends E> c) 275 { 276 this(capacity, fair); 277 if (capacity < c.size()) 278 throw new IllegalArgumentException(); 279 280 for (final Iterator<? extends E> it = c.iterator(); it.hasNext();) 281 add(it.next()); 282 } 283 284 /** 285 * Inserts the specified element at the tail of this queue if it is possible 286 * to do so immediately without exceeding the queue's capacity, returning 287 * <tt>true</tt> upon success and throwing an <tt>IllegalStateException</tt> 288 * if this queue is full. 289 * 290 * @param e 291 * the element to add 292 * @return <tt>true</tt> (as specified by {@link Collection#add}) 293 * @throws IllegalStateException 294 * if this queue is full 295 * @throws NullPointerException 296 * if the specified element is null 297 */ 298 @Override 299 public boolean add(E e) { 300 return super.add(e); 301 } 302 303 @Override 304 public boolean offer(E e) { 305 if (e == null) 306 throw new NullPointerException(); 307 final ReentrantLock lock = this.lock; 308 lock.lock(); 309 try { 310 if (count == items.length) 311 return false; 312 else { 313 insert(e); 314 return true; 315 } 316 } finally { 317 lock.unlock(); 318 } 319 } 320 321 @Override 322 public E poll() { 323 final ReentrantLock lock = this.lock; 324 lock.lock(); 325 try { 326 if (count == 0) 327 return null; 328 final E x = extract(); 329 return x; 330 } finally { 331 lock.unlock(); 332 } 333 } 334 335 @Override 336 public E take() throws InterruptedException { 337 final ReentrantLock lock = this.lock; 338 lock.lockInterruptibly(); 339 try { 340 try { 341 while (count == 0) 342 notEmpty.await(); 343 } catch (final InterruptedException ie) { 344 notEmpty.signal(); // propagate to non-interrupted thread 345 throw ie; 346 } 347 final E x = extract(); 348 return x; 349 } finally { 350 lock.unlock(); 351 } 352 } 353 354 @Override 355 public E poll(long timeout, TimeUnit unit) throws InterruptedException { 356 long nanos = unit.toNanos(timeout); 357 final ReentrantLock lock = this.lock; 358 lock.lockInterruptibly(); 359 try { 360 for (;;) { 361 if (count != 0) { 362 final E x = extract(); 363 return x; 364 } 365 if (nanos <= 0) 366 return null; 367 try { 368 nanos = notEmpty.awaitNanos(nanos); 369 } catch (final InterruptedException ie) { 370 notEmpty.signal(); // propagate to non-interrupted thread 371 throw ie; 372 } 373 374 } 375 } finally { 376 lock.unlock(); 377 } 378 } 379 380 @Override 381 public E peek() { 382 final ReentrantLock lock = this.lock; 383 lock.lock(); 384 try { 385 return (count == 0) ? null : items[takeIndex]; 386 } finally { 387 lock.unlock(); 388 } 389 } 390 391 // this doc comment is overridden to remove the reference to collections 392 // greater in size than Integer.MAX_VALUE 393 /** 394 * Returns the number of elements in this queue. 395 * 396 * @return the number of elements in this queue 397 */ 398 @Override 399 public int size() { 400 final ReentrantLock lock = this.lock; 401 lock.lock(); 402 try { 403 return count; 404 } finally { 405 lock.unlock(); 406 } 407 } 408 409 // this doc comment is a modified copy of the inherited doc comment, 410 // without the reference to unlimited queues. 411 /** 412 * Returns the number of additional elements that this queue can ideally (in 413 * the absence of memory or resource constraints) accept without blocking. 414 * This is always equal to the initial capacity of this queue less the 415 * current <tt>size</tt> of this queue. 416 * 417 * <p> 418 * Note that you <em>cannot</em> always tell if an attempt to insert an 419 * element will succeed by inspecting <tt>remainingCapacity</tt> because it 420 * may be the case that another thread is about to insert or remove an 421 * element. 422 */ 423 @Override 424 public int remainingCapacity() { 425 final ReentrantLock lock = this.lock; 426 lock.lock(); 427 try { 428 return items.length - count; 429 } finally { 430 lock.unlock(); 431 } 432 } 433 434 /** 435 * Removes a single instance of the specified element from this queue, if it 436 * is present. More formally, removes an element <tt>e</tt> such that 437 * <tt>o.equals(e)</tt>, if this queue contains one or more such elements. 438 * Returns <tt>true</tt> if this queue contained the specified element (or 439 * equivalently, if this queue changed as a result of the call). 440 * 441 * @param o 442 * element to be removed from this queue, if present 443 * @return <tt>true</tt> if this queue changed as a result of the call 444 */ 445 @Override 446 public boolean remove(Object o) { 447 if (o == null) 448 return false; 449 final E[] items = this.items; 450 final ReentrantLock lock = this.lock; 451 lock.lock(); 452 try { 453 int i = takeIndex; 454 int k = 0; 455 for (;;) { 456 if (k++ >= count) 457 return false; 458 if (o.equals(items[i])) { 459 removeAt(i); 460 return true; 461 } 462 i = inc(i); 463 } 464 465 } finally { 466 lock.unlock(); 467 } 468 } 469 470 /** 471 * Returns <tt>true</tt> if this queue contains the specified element. More 472 * formally, returns <tt>true</tt> if and only if this queue contains at 473 * least one element <tt>e</tt> such that <tt>o.equals(e)</tt>. 474 * 475 * @param o 476 * object to be checked for containment in this queue 477 * @return <tt>true</tt> if this queue contains the specified element 478 */ 479 @Override 480 public boolean contains(Object o) { 481 if (o == null) 482 return false; 483 final E[] items = this.items; 484 final ReentrantLock lock = this.lock; 485 lock.lock(); 486 try { 487 int i = takeIndex; 488 int k = 0; 489 while (k++ < count) { 490 if (o.equals(items[i])) 491 return true; 492 i = inc(i); 493 } 494 return false; 495 } finally { 496 lock.unlock(); 497 } 498 } 499 500 /** 501 * Returns an array containing all of the elements in this queue, in proper 502 * sequence. 503 * 504 * <p> 505 * The returned array will be "safe" in that no references to it are 506 * maintained by this queue. (In other words, this method must allocate a 507 * new array). The caller is thus free to modify the returned array. 508 * 509 * <p> 510 * This method acts as bridge between array-based and collection-based APIs. 511 * 512 * @return an array containing all of the elements in this queue 513 */ 514 @Override 515 public Object[] toArray() { 516 final E[] items = this.items; 517 final ReentrantLock lock = this.lock; 518 lock.lock(); 519 try { 520 final Object[] a = new Object[count]; 521 int k = 0; 522 int i = takeIndex; 523 while (k < count) { 524 a[k++] = items[i]; 525 i = inc(i); 526 } 527 return a; 528 } finally { 529 lock.unlock(); 530 } 531 } 532 533 /** 534 * Returns an array containing all of the elements in this queue, in proper 535 * sequence; the runtime type of the returned array is that of the specified 536 * array. If the queue fits in the specified array, it is returned therein. 537 * Otherwise, a new array is allocated with the runtime type of the 538 * specified array and the size of this queue. 539 * 540 * <p> 541 * If this queue fits in the specified array with room to spare (i.e., the 542 * array has more elements than this queue), the element in the array 543 * immediately following the end of the queue is set to <tt>null</tt>. 544 * 545 * <p> 546 * Like the {@link #toArray()} method, this method acts as bridge between 547 * array-based and collection-based APIs. Further, this method allows 548 * precise control over the runtime type of the output array, and may, under 549 * certain circumstances, be used to save allocation costs. 550 * 551 * <p> 552 * Suppose <tt>x</tt> is a queue known to contain only strings. The 553 * following code can be used to dump the queue into a newly allocated array 554 * of <tt>String</tt>: 555 * 556 * <pre> 557 * String[] y = x.toArray(new String[0]); 558 * </pre> 559 * 560 * Note that <tt>toArray(new Object[0])</tt> is identical in function to 561 * <tt>toArray()</tt>. 562 * 563 * @param a 564 * the array into which the elements of the queue are to be 565 * stored, if it is big enough; otherwise, a new array of the 566 * same runtime type is allocated for this purpose 567 * @return an array containing all of the elements in this queue 568 * @throws ArrayStoreException 569 * if the runtime type of the specified array is not a supertype 570 * of the runtime type of every element in this queue 571 * @throws NullPointerException 572 * if the specified array is null 573 */ 574 @SuppressWarnings("unchecked") 575 @Override 576 public <T> T[] toArray(T[] a) { 577 final E[] items = this.items; 578 final ReentrantLock lock = this.lock; 579 lock.lock(); 580 try { 581 if (a.length < count) 582 a = (T[]) java.lang.reflect.Array.newInstance(a.getClass().getComponentType(), count); 583 584 int k = 0; 585 int i = takeIndex; 586 while (k < count) { 587 a[k++] = (T) items[i]; 588 i = inc(i); 589 } 590 if (a.length > count) 591 a[count] = null; 592 return a; 593 } finally { 594 lock.unlock(); 595 } 596 } 597 598 @Override 599 public String toString() { 600 final ReentrantLock lock = this.lock; 601 lock.lock(); 602 try { 603 return super.toString(); 604 } finally { 605 lock.unlock(); 606 } 607 } 608 609 /** 610 * Atomically removes all of the elements from this queue. The queue will be 611 * empty after this call returns. 612 */ 613 @Override 614 public void clear() { 615 final E[] items = this.items; 616 final ReentrantLock lock = this.lock; 617 lock.lock(); 618 try { 619 int i = takeIndex; 620 int k = count; 621 while (k-- > 0) { 622 items[i] = null; 623 i = inc(i); 624 } 625 count = 0; 626 putIndex = 0; 627 takeIndex = 0; 628 notFull.signalAll(); 629 } finally { 630 lock.unlock(); 631 } 632 } 633 634 /** 635 * @throws UnsupportedOperationException 636 * {@inheritDoc} 637 * @throws ClassCastException 638 * {@inheritDoc} 639 * @throws NullPointerException 640 * {@inheritDoc} 641 * @throws IllegalArgumentException 642 * {@inheritDoc} 643 */ 644 @Override 645 public int drainTo(Collection<? super E> c) { 646 if (c == null) 647 throw new NullPointerException(); 648 if (c == this) 649 throw new IllegalArgumentException(); 650 final E[] items = this.items; 651 final ReentrantLock lock = this.lock; 652 lock.lock(); 653 try { 654 int i = takeIndex; 655 int n = 0; 656 final int max = count; 657 while (n < max) { 658 c.add(items[i]); 659 items[i] = null; 660 i = inc(i); 661 ++n; 662 } 663 if (n > 0) { 664 count = 0; 665 putIndex = 0; 666 takeIndex = 0; 667 notFull.signalAll(); 668 } 669 return n; 670 } finally { 671 lock.unlock(); 672 } 673 } 674 675 /** 676 * @throws UnsupportedOperationException 677 * {@inheritDoc} 678 * @throws ClassCastException 679 * {@inheritDoc} 680 * @throws NullPointerException 681 * {@inheritDoc} 682 * @throws IllegalArgumentException 683 * {@inheritDoc} 684 */ 685 @Override 686 public int drainTo(Collection<? super E> c, int maxElements) { 687 if (c == null) 688 throw new NullPointerException(); 689 if (c == this) 690 throw new IllegalArgumentException(); 691 if (maxElements <= 0) 692 return 0; 693 final E[] items = this.items; 694 final ReentrantLock lock = this.lock; 695 lock.lock(); 696 try { 697 int i = takeIndex; 698 int n = 0; 699 700 final int max = (maxElements < count) ? maxElements : count; 701 while (n < max) { 702 c.add(items[i]); 703 items[i] = null; 704 i = inc(i); 705 ++n; 706 } 707 if (n > 0) { 708 count -= n; 709 takeIndex = i; 710 notFull.signalAll(); 711 } 712 return n; 713 } finally { 714 lock.unlock(); 715 } 716 } 717 718 /** 719 * Returns an iterator over the elements in this queue in proper sequence. 720 * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that 721 * will never throw {@link ConcurrentModificationException}, and guarantees 722 * to traverse elements as they existed upon construction of the iterator, 723 * and may (but is not guaranteed to) reflect any modifications subsequent 724 * to construction. 725 * 726 * @return an iterator over the elements in this queue in proper sequence 727 */ 728 @Override 729 public Iterator<E> iterator() { 730 final ReentrantLock lock = this.lock; 731 lock.lock(); 732 try { 733 return new Itr(); 734 } finally { 735 lock.unlock(); 736 } 737 } 738 739 /** 740 * Iterator for ArrayBlockingDroppingQueue 741 */ 742 private class Itr implements Iterator<E> { 743 /** 744 * Index of element to be returned by next, or a negative number if no 745 * such. 746 */ 747 private int nextIndex; 748 749 /** 750 * nextItem holds on to item fields because once we claim that an 751 * element exists in hasNext(), we must return it in the following 752 * next() call even if it was in the process of being removed when 753 * hasNext() was called. 754 */ 755 private E nextItem; 756 757 /** 758 * Index of element returned by most recent call to next. Reset to -1 if 759 * this element is deleted by a call to remove. 760 */ 761 private int lastRet; 762 763 Itr() { 764 lastRet = -1; 765 if (count == 0) 766 nextIndex = -1; 767 else { 768 nextIndex = takeIndex; 769 nextItem = items[takeIndex]; 770 } 771 } 772 773 @Override 774 public boolean hasNext() { 775 /* 776 * No sync. We can return true by mistake here only if this iterator 777 * passed across threads, which we don't support anyway. 778 */ 779 return nextIndex >= 0; 780 } 781 782 /** 783 * Checks whether nextIndex is valid; if so setting nextItem. Stops 784 * iterator when either hits putIndex or sees null item. 785 */ 786 private void checkNext() { 787 if (nextIndex == putIndex) { 788 nextIndex = -1; 789 nextItem = null; 790 } else { 791 nextItem = items[nextIndex]; 792 if (nextItem == null) 793 nextIndex = -1; 794 } 795 } 796 797 @Override 798 public E next() { 799 final ReentrantLock lock = ArrayBlockingDroppingQueue.this.lock; 800 lock.lock(); 801 try { 802 if (nextIndex < 0) 803 throw new NoSuchElementException(); 804 lastRet = nextIndex; 805 final E x = nextItem; 806 nextIndex = inc(nextIndex); 807 checkNext(); 808 return x; 809 } finally { 810 lock.unlock(); 811 } 812 } 813 814 @Override 815 public void remove() { 816 final ReentrantLock lock = ArrayBlockingDroppingQueue.this.lock; 817 lock.lock(); 818 try { 819 final int i = lastRet; 820 if (i == -1) 821 throw new IllegalStateException(); 822 lastRet = -1; 823 824 final int ti = takeIndex; 825 removeAt(i); 826 // back up cursor (reset to front if was first element) 827 nextIndex = (i == ti) ? takeIndex : i; 828 checkNext(); 829 } finally { 830 lock.unlock(); 831 } 832 } 833 } 834}