001/**
002 * Copyright (c) 2011, The University of Southampton and the individual contributors.
003 * All rights reserved.
004 *
005 * Redistribution and use in source and binary forms, with or without modification,
006 * are permitted provided that the following conditions are met:
007 *
008 *   *  Redistributions of source code must retain the above copyright notice,
009 *      this list of conditions and the following disclaimer.
010 *
011 *   *  Redistributions in binary form must reproduce the above copyright notice,
012 *      this list of conditions and the following disclaimer in the documentation
013 *      and/or other materials provided with the distribution.
014 *
015 *   *  Neither the name of the University of Southampton nor the names of its
016 *      contributors may be used to endorse or promote products derived from this
017 *      software without specific prior written permission.
018 *
019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */
030package org.openimaj.util.concurrent;
031
032import java.util.AbstractQueue;
033import java.util.Collection;
034import java.util.ConcurrentModificationException;
035import java.util.Iterator;
036import java.util.NoSuchElementException;
037import java.util.concurrent.TimeUnit;
038import java.util.concurrent.locks.Condition;
039import java.util.concurrent.locks.ReentrantLock;
040
041/**
042 * A bounded variant of a {@linkplain BlockingDroppingQueue blocking queue}
043 * backed by an array.
044 *
045 * <p>
046 * Elements in the queue are ordered as FIFO (first-in-first-out). The
047 * <em>head</em> of the queue is that element that has been on the queue the
048 * longest time. The <em>tail</em> of the queue is that element that has been on
049 * the queue the shortest time. New elements are inserted at the tail of the
050 * queue, and the queue retrieval operations obtain elements at the head of the
051 * queue.
052 *
053 * <p>
054 * This is a classic &quot;ring buffer&quot;, in which a fixed-sized array holds
055 * elements inserted by producers and extracted by consumers. Once created, the
056 * capacity cannot be increased. Attempts to <tt>put</tt> an element into a full
057 * queue will result in the oldest item being removed to make room; attempts to
058 * <tt>take</tt> an element from an empty queue will block.
059 *
060 * <p>
061 * This class supports an optional fairness policy for ordering waiting producer
062 * and consumer threads. By default, this ordering is not guaranteed. However, a
063 * queue constructed with fairness set to <tt>true</tt> grants threads access in
064 * FIFO order. Fairness generally decreases throughput but reduces variability
065 * and avoids starvation.
066 *
067 * <p>
068 * This class and its iterator implement all of the <em>optional</em> methods of
069 * the {@link Collection} and {@link Iterator} interfaces.
070 *
071 * @author Jonathon Hare (jsh2@ecs.soton.ac.uk)
072 * @param <E>
073 *            the type of elements held in this collection
074 */
075public class ArrayBlockingDroppingQueue<E> extends AbstractQueue<E>
076implements BlockingDroppingQueue<E>, java.io.Serializable
077{
078        private static final long serialVersionUID = -2040484277582233451L;
079
080        private long insertCount;
081        private long dropCount;
082
083        @Override
084        public E put(E e) throws InterruptedException {
085                if (e == null)
086                        throw new NullPointerException();
087                final E[] items = this.items;
088                final ReentrantLock lock = this.lock;
089                lock.lock();
090                try {
091                        E ret = null;
092                        if (count == items.length) {
093                                // drop an item to make room
094                                ret = extract();
095                                ++dropCount;
096                        }
097                        insert(e);
098
099                        return ret;
100                } finally {
101                        lock.unlock();
102                }
103        }
104
105        @Override
106        public long insertCount() {
107                final ReentrantLock lock = this.lock;
108                lock.lock();
109                try {
110                        return insertCount;
111                } finally {
112                        lock.unlock();
113                }
114        }
115
116        @Override
117        public long dropCount() {
118                final ReentrantLock lock = this.lock;
119                lock.lock();
120                try {
121                        return dropCount;
122                } finally {
123                        lock.unlock();
124                }
125        }
126
127        /**
128         * Inserts element at current put position, advances, and signals. Call only
129         * when holding lock.
130         */
131        private void insert(E x) {
132                items[putIndex] = x;
133                putIndex = inc(putIndex);
134                ++count;
135                ++insertCount;
136                notEmpty.signal();
137        }
138
139        /*************************************************************************************
140         * EVERYTHING BELOW THIS POINT IS COPIED FROM ArrayBlockingQueue.
141         *************************************************************************************/
142        /** The queued items */
143        private final E[] items;
144
145        /** items index for next take, poll or remove */
146        private int takeIndex;
147
148        /** items index for next put, offer, or add. */
149        private int putIndex;
150
151        /** Number of items in the queue */
152        private int count;
153
154        /*
155         * Concurrency control uses the classic two-condition algorithm found in any
156         * textbook.
157         */
158
159        /** Main lock guarding all access */
160        private final ReentrantLock lock;
161        /** Condition for waiting takes */
162        private final Condition notEmpty;
163        /** Condition for waiting puts */
164        private final Condition notFull;
165
166        // Internal helper methods
167
168        /**
169         * Circularly increment i.
170         */
171        final int inc(int i) {
172                return (++i == items.length) ? 0 : i;
173        }
174
175        /**
176         * Extracts element at current take position, advances, and signals. Call
177         * only when holding lock.
178         */
179        private E extract() {
180                final E[] items = this.items;
181                final E x = items[takeIndex];
182                items[takeIndex] = null;
183                takeIndex = inc(takeIndex);
184                --count;
185                notFull.signal();
186                return x;
187        }
188
189        /**
190         * Utility for remove and iterator.remove: Delete item at position i. Call
191         * only when holding lock.
192         */
193        void removeAt(int i) {
194                final E[] items = this.items;
195                // if removing front item, just advance
196                if (i == takeIndex) {
197                        items[takeIndex] = null;
198                        takeIndex = inc(takeIndex);
199                } else {
200                        // slide over all others up through putIndex.
201                        for (;;) {
202                                final int nexti = inc(i);
203                                if (nexti != putIndex) {
204                                        items[i] = items[nexti];
205                                        i = nexti;
206                                } else {
207                                        items[i] = null;
208                                        putIndex = i;
209                                        break;
210                                }
211                        }
212                }
213                --count;
214                notFull.signal();
215        }
216
217        /**
218         * Creates an <tt>ArrayBlockingDroppingQueue</tt> with the given (fixed)
219         * capacity and default access policy.
220         *
221         * @param capacity
222         *            the capacity of this queue
223         * @throws IllegalArgumentException
224         *             if <tt>capacity</tt> is less than 1
225         */
226        public ArrayBlockingDroppingQueue(int capacity) {
227                this(capacity, false);
228        }
229
230        /**
231         * Creates an <tt>ArrayBlockingDroppingQueue</tt> with the given (fixed)
232         * capacity and the specified access policy.
233         *
234         * @param capacity
235         *            the capacity of this queue
236         * @param fair
237         *            if <tt>true</tt> then queue accesses for threads blocked on
238         *            insertion or removal, are processed in FIFO order; if
239         *            <tt>false</tt> the access order is unspecified.
240         * @throws IllegalArgumentException
241         *             if <tt>capacity</tt> is less than 1
242         */
243        @SuppressWarnings("unchecked")
244        public ArrayBlockingDroppingQueue(int capacity, boolean fair) {
245                if (capacity <= 0)
246                        throw new IllegalArgumentException();
247                this.items = (E[]) new Object[capacity];
248                lock = new ReentrantLock(fair);
249                notEmpty = lock.newCondition();
250                notFull = lock.newCondition();
251        }
252
253        /**
254         * Creates an <tt>ArrayBlockingDroppingQueue</tt> with the given (fixed)
255         * capacity, the specified access policy and initially containing the
256         * elements of the given collection, added in traversal order of the
257         * collection's iterator.
258         *
259         * @param capacity
260         *            the capacity of this queue
261         * @param fair
262         *            if <tt>true</tt> then queue accesses for threads blocked on
263         *            insertion or removal, are processed in FIFO order; if
264         *            <tt>false</tt> the access order is unspecified.
265         * @param c
266         *            the collection of elements to initially contain
267         * @throws IllegalArgumentException
268         *             if <tt>capacity</tt> is less than <tt>c.size()</tt>, or less
269         *             than 1.
270         * @throws NullPointerException
271         *             if the specified collection or any of its elements are null
272         */
273        public ArrayBlockingDroppingQueue(int capacity, boolean fair,
274                        Collection<? extends E> c)
275        {
276                this(capacity, fair);
277                if (capacity < c.size())
278                        throw new IllegalArgumentException();
279
280                for (final Iterator<? extends E> it = c.iterator(); it.hasNext();)
281                        add(it.next());
282        }
283
284        /**
285         * Inserts the specified element at the tail of this queue if it is possible
286         * to do so immediately without exceeding the queue's capacity, returning
287         * <tt>true</tt> upon success and throwing an <tt>IllegalStateException</tt>
288         * if this queue is full.
289         *
290         * @param e
291         *            the element to add
292         * @return <tt>true</tt> (as specified by {@link Collection#add})
293         * @throws IllegalStateException
294         *             if this queue is full
295         * @throws NullPointerException
296         *             if the specified element is null
297         */
298        @Override
299        public boolean add(E e) {
300                return super.add(e);
301        }
302
303        @Override
304        public boolean offer(E e) {
305                if (e == null)
306                        throw new NullPointerException();
307                final ReentrantLock lock = this.lock;
308                lock.lock();
309                try {
310                        if (count == items.length)
311                                return false;
312                        else {
313                                insert(e);
314                                return true;
315                        }
316                } finally {
317                        lock.unlock();
318                }
319        }
320
321        @Override
322        public E poll() {
323                final ReentrantLock lock = this.lock;
324                lock.lock();
325                try {
326                        if (count == 0)
327                                return null;
328                        final E x = extract();
329                        return x;
330                } finally {
331                        lock.unlock();
332                }
333        }
334
335        @Override
336        public E take() throws InterruptedException {
337                final ReentrantLock lock = this.lock;
338                lock.lockInterruptibly();
339                try {
340                        try {
341                                while (count == 0)
342                                        notEmpty.await();
343                        } catch (final InterruptedException ie) {
344                                notEmpty.signal(); // propagate to non-interrupted thread
345                                throw ie;
346                        }
347                        final E x = extract();
348                        return x;
349                } finally {
350                        lock.unlock();
351                }
352        }
353
354        @Override
355        public E poll(long timeout, TimeUnit unit) throws InterruptedException {
356                long nanos = unit.toNanos(timeout);
357                final ReentrantLock lock = this.lock;
358                lock.lockInterruptibly();
359                try {
360                        for (;;) {
361                                if (count != 0) {
362                                        final E x = extract();
363                                        return x;
364                                }
365                                if (nanos <= 0)
366                                        return null;
367                                try {
368                                        nanos = notEmpty.awaitNanos(nanos);
369                                } catch (final InterruptedException ie) {
370                                        notEmpty.signal(); // propagate to non-interrupted thread
371                                        throw ie;
372                                }
373
374                        }
375                } finally {
376                        lock.unlock();
377                }
378        }
379
380        @Override
381        public E peek() {
382                final ReentrantLock lock = this.lock;
383                lock.lock();
384                try {
385                        return (count == 0) ? null : items[takeIndex];
386                } finally {
387                        lock.unlock();
388                }
389        }
390
391        // this doc comment is overridden to remove the reference to collections
392        // greater in size than Integer.MAX_VALUE
393        /**
394         * Returns the number of elements in this queue.
395         *
396         * @return the number of elements in this queue
397         */
398        @Override
399        public int size() {
400                final ReentrantLock lock = this.lock;
401                lock.lock();
402                try {
403                        return count;
404                } finally {
405                        lock.unlock();
406                }
407        }
408
409        // this doc comment is a modified copy of the inherited doc comment,
410        // without the reference to unlimited queues.
411        /**
412         * Returns the number of additional elements that this queue can ideally (in
413         * the absence of memory or resource constraints) accept without blocking.
414         * This is always equal to the initial capacity of this queue less the
415         * current <tt>size</tt> of this queue.
416         *
417         * <p>
418         * Note that you <em>cannot</em> always tell if an attempt to insert an
419         * element will succeed by inspecting <tt>remainingCapacity</tt> because it
420         * may be the case that another thread is about to insert or remove an
421         * element.
422         */
423        @Override
424        public int remainingCapacity() {
425                final ReentrantLock lock = this.lock;
426                lock.lock();
427                try {
428                        return items.length - count;
429                } finally {
430                        lock.unlock();
431                }
432        }
433
434        /**
435         * Removes a single instance of the specified element from this queue, if it
436         * is present. More formally, removes an element <tt>e</tt> such that
437         * <tt>o.equals(e)</tt>, if this queue contains one or more such elements.
438         * Returns <tt>true</tt> if this queue contained the specified element (or
439         * equivalently, if this queue changed as a result of the call).
440         *
441         * @param o
442         *            element to be removed from this queue, if present
443         * @return <tt>true</tt> if this queue changed as a result of the call
444         */
445        @Override
446        public boolean remove(Object o) {
447                if (o == null)
448                        return false;
449                final E[] items = this.items;
450                final ReentrantLock lock = this.lock;
451                lock.lock();
452                try {
453                        int i = takeIndex;
454                        int k = 0;
455                        for (;;) {
456                                if (k++ >= count)
457                                        return false;
458                                if (o.equals(items[i])) {
459                                        removeAt(i);
460                                        return true;
461                                }
462                                i = inc(i);
463                        }
464
465                } finally {
466                        lock.unlock();
467                }
468        }
469
470        /**
471         * Returns <tt>true</tt> if this queue contains the specified element. More
472         * formally, returns <tt>true</tt> if and only if this queue contains at
473         * least one element <tt>e</tt> such that <tt>o.equals(e)</tt>.
474         *
475         * @param o
476         *            object to be checked for containment in this queue
477         * @return <tt>true</tt> if this queue contains the specified element
478         */
479        @Override
480        public boolean contains(Object o) {
481                if (o == null)
482                        return false;
483                final E[] items = this.items;
484                final ReentrantLock lock = this.lock;
485                lock.lock();
486                try {
487                        int i = takeIndex;
488                        int k = 0;
489                        while (k++ < count) {
490                                if (o.equals(items[i]))
491                                        return true;
492                                i = inc(i);
493                        }
494                        return false;
495                } finally {
496                        lock.unlock();
497                }
498        }
499
500        /**
501         * Returns an array containing all of the elements in this queue, in proper
502         * sequence.
503         *
504         * <p>
505         * The returned array will be "safe" in that no references to it are
506         * maintained by this queue. (In other words, this method must allocate a
507         * new array). The caller is thus free to modify the returned array.
508         *
509         * <p>
510         * This method acts as bridge between array-based and collection-based APIs.
511         *
512         * @return an array containing all of the elements in this queue
513         */
514        @Override
515        public Object[] toArray() {
516                final E[] items = this.items;
517                final ReentrantLock lock = this.lock;
518                lock.lock();
519                try {
520                        final Object[] a = new Object[count];
521                        int k = 0;
522                        int i = takeIndex;
523                        while (k < count) {
524                                a[k++] = items[i];
525                                i = inc(i);
526                        }
527                        return a;
528                } finally {
529                        lock.unlock();
530                }
531        }
532
533        /**
534         * Returns an array containing all of the elements in this queue, in proper
535         * sequence; the runtime type of the returned array is that of the specified
536         * array. If the queue fits in the specified array, it is returned therein.
537         * Otherwise, a new array is allocated with the runtime type of the
538         * specified array and the size of this queue.
539         *
540         * <p>
541         * If this queue fits in the specified array with room to spare (i.e., the
542         * array has more elements than this queue), the element in the array
543         * immediately following the end of the queue is set to <tt>null</tt>.
544         *
545         * <p>
546         * Like the {@link #toArray()} method, this method acts as bridge between
547         * array-based and collection-based APIs. Further, this method allows
548         * precise control over the runtime type of the output array, and may, under
549         * certain circumstances, be used to save allocation costs.
550         *
551         * <p>
552         * Suppose <tt>x</tt> is a queue known to contain only strings. The
553         * following code can be used to dump the queue into a newly allocated array
554         * of <tt>String</tt>:
555         *
556         * <pre>
557         * String[] y = x.toArray(new String[0]);
558         * </pre>
559         *
560         * Note that <tt>toArray(new Object[0])</tt> is identical in function to
561         * <tt>toArray()</tt>.
562         *
563         * @param a
564         *            the array into which the elements of the queue are to be
565         *            stored, if it is big enough; otherwise, a new array of the
566         *            same runtime type is allocated for this purpose
567         * @return an array containing all of the elements in this queue
568         * @throws ArrayStoreException
569         *             if the runtime type of the specified array is not a supertype
570         *             of the runtime type of every element in this queue
571         * @throws NullPointerException
572         *             if the specified array is null
573         */
574        @SuppressWarnings("unchecked")
575        @Override
576        public <T> T[] toArray(T[] a) {
577                final E[] items = this.items;
578                final ReentrantLock lock = this.lock;
579                lock.lock();
580                try {
581                        if (a.length < count)
582                                a = (T[]) java.lang.reflect.Array.newInstance(a.getClass().getComponentType(), count);
583
584                        int k = 0;
585                        int i = takeIndex;
586                        while (k < count) {
587                                a[k++] = (T) items[i];
588                                i = inc(i);
589                        }
590                        if (a.length > count)
591                                a[count] = null;
592                        return a;
593                } finally {
594                        lock.unlock();
595                }
596        }
597
598        @Override
599        public String toString() {
600                final ReentrantLock lock = this.lock;
601                lock.lock();
602                try {
603                        return super.toString();
604                } finally {
605                        lock.unlock();
606                }
607        }
608
609        /**
610         * Atomically removes all of the elements from this queue. The queue will be
611         * empty after this call returns.
612         */
613        @Override
614        public void clear() {
615                final E[] items = this.items;
616                final ReentrantLock lock = this.lock;
617                lock.lock();
618                try {
619                        int i = takeIndex;
620                        int k = count;
621                        while (k-- > 0) {
622                                items[i] = null;
623                                i = inc(i);
624                        }
625                        count = 0;
626                        putIndex = 0;
627                        takeIndex = 0;
628                        notFull.signalAll();
629                } finally {
630                        lock.unlock();
631                }
632        }
633
634        /**
635         * @throws UnsupportedOperationException
636         *             {@inheritDoc}
637         * @throws ClassCastException
638         *             {@inheritDoc}
639         * @throws NullPointerException
640         *             {@inheritDoc}
641         * @throws IllegalArgumentException
642         *             {@inheritDoc}
643         */
644        @Override
645        public int drainTo(Collection<? super E> c) {
646                if (c == null)
647                        throw new NullPointerException();
648                if (c == this)
649                        throw new IllegalArgumentException();
650                final E[] items = this.items;
651                final ReentrantLock lock = this.lock;
652                lock.lock();
653                try {
654                        int i = takeIndex;
655                        int n = 0;
656                        final int max = count;
657                        while (n < max) {
658                                c.add(items[i]);
659                                items[i] = null;
660                                i = inc(i);
661                                ++n;
662                        }
663                        if (n > 0) {
664                                count = 0;
665                                putIndex = 0;
666                                takeIndex = 0;
667                                notFull.signalAll();
668                        }
669                        return n;
670                } finally {
671                        lock.unlock();
672                }
673        }
674
675        /**
676         * @throws UnsupportedOperationException
677         *             {@inheritDoc}
678         * @throws ClassCastException
679         *             {@inheritDoc}
680         * @throws NullPointerException
681         *             {@inheritDoc}
682         * @throws IllegalArgumentException
683         *             {@inheritDoc}
684         */
685        @Override
686        public int drainTo(Collection<? super E> c, int maxElements) {
687                if (c == null)
688                        throw new NullPointerException();
689                if (c == this)
690                        throw new IllegalArgumentException();
691                if (maxElements <= 0)
692                        return 0;
693                final E[] items = this.items;
694                final ReentrantLock lock = this.lock;
695                lock.lock();
696                try {
697                        int i = takeIndex;
698                        int n = 0;
699
700                        final int max = (maxElements < count) ? maxElements : count;
701                        while (n < max) {
702                                c.add(items[i]);
703                                items[i] = null;
704                                i = inc(i);
705                                ++n;
706                        }
707                        if (n > 0) {
708                                count -= n;
709                                takeIndex = i;
710                                notFull.signalAll();
711                        }
712                        return n;
713                } finally {
714                        lock.unlock();
715                }
716        }
717
718        /**
719         * Returns an iterator over the elements in this queue in proper sequence.
720         * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
721         * will never throw {@link ConcurrentModificationException}, and guarantees
722         * to traverse elements as they existed upon construction of the iterator,
723         * and may (but is not guaranteed to) reflect any modifications subsequent
724         * to construction.
725         *
726         * @return an iterator over the elements in this queue in proper sequence
727         */
728        @Override
729        public Iterator<E> iterator() {
730                final ReentrantLock lock = this.lock;
731                lock.lock();
732                try {
733                        return new Itr();
734                } finally {
735                        lock.unlock();
736                }
737        }
738
739        /**
740         * Iterator for ArrayBlockingDroppingQueue
741         */
742        private class Itr implements Iterator<E> {
743                /**
744                 * Index of element to be returned by next, or a negative number if no
745                 * such.
746                 */
747                private int nextIndex;
748
749                /**
750                 * nextItem holds on to item fields because once we claim that an
751                 * element exists in hasNext(), we must return it in the following
752                 * next() call even if it was in the process of being removed when
753                 * hasNext() was called.
754                 */
755                private E nextItem;
756
757                /**
758                 * Index of element returned by most recent call to next. Reset to -1 if
759                 * this element is deleted by a call to remove.
760                 */
761                private int lastRet;
762
763                Itr() {
764                        lastRet = -1;
765                        if (count == 0)
766                                nextIndex = -1;
767                        else {
768                                nextIndex = takeIndex;
769                                nextItem = items[takeIndex];
770                        }
771                }
772
773                @Override
774                public boolean hasNext() {
775                        /*
776                         * No sync. We can return true by mistake here only if this iterator
777                         * passed across threads, which we don't support anyway.
778                         */
779                        return nextIndex >= 0;
780                }
781
782                /**
783                 * Checks whether nextIndex is valid; if so setting nextItem. Stops
784                 * iterator when either hits putIndex or sees null item.
785                 */
786                private void checkNext() {
787                        if (nextIndex == putIndex) {
788                                nextIndex = -1;
789                                nextItem = null;
790                        } else {
791                                nextItem = items[nextIndex];
792                                if (nextItem == null)
793                                        nextIndex = -1;
794                        }
795                }
796
797                @Override
798                public E next() {
799                        final ReentrantLock lock = ArrayBlockingDroppingQueue.this.lock;
800                        lock.lock();
801                        try {
802                                if (nextIndex < 0)
803                                        throw new NoSuchElementException();
804                                lastRet = nextIndex;
805                                final E x = nextItem;
806                                nextIndex = inc(nextIndex);
807                                checkNext();
808                                return x;
809                        } finally {
810                                lock.unlock();
811                        }
812                }
813
814                @Override
815                public void remove() {
816                        final ReentrantLock lock = ArrayBlockingDroppingQueue.this.lock;
817                        lock.lock();
818                        try {
819                                final int i = lastRet;
820                                if (i == -1)
821                                        throw new IllegalStateException();
822                                lastRet = -1;
823
824                                final int ti = takeIndex;
825                                removeAt(i);
826                                // back up cursor (reset to front if was first element)
827                                nextIndex = (i == ti) ? takeIndex : i;
828                                checkNext();
829                        } finally {
830                                lock.unlock();
831                        }
832                }
833        }
834}