001/**
002 * Copyright (c) 2011, The University of Southampton and the individual contributors.
003 * All rights reserved.
004 *
005 * Redistribution and use in source and binary forms, with or without modification,
006 * are permitted provided that the following conditions are met:
007 *
008 *   *  Redistributions of source code must retain the above copyright notice,
009 *      this list of conditions and the following disclaimer.
010 *
011 *   *  Redistributions in binary form must reproduce the above copyright notice,
012 *      this list of conditions and the following disclaimer in the documentation
013 *      and/or other materials provided with the distribution.
014 *
015 *   *  Neither the name of the University of Southampton nor the names of its
016 *      contributors may be used to endorse or promote products derived from this
017 *      software without specific prior written permission.
018 *
019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */
030package org.openimaj.demos.sandbox.ml.linear.learner.stream;
031
032import java.net.UnknownHostException;
033import java.util.List;
034
035import org.openimaj.util.stream.AbstractStream;
036
037import com.mongodb.BasicDBObject;
038import com.mongodb.DB;
039import com.mongodb.DBCollection;
040import com.mongodb.DBCursor;
041import com.mongodb.DBObject;
042import com.mongodb.MongoClient;
043import com.mongodb.ServerAddress;
044
045/**
046 * Encapsulates a MongoDB {@link DBCursor} instantiated from a {@link DBObject} query
047 *
048 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
049 *
050 * @param <T>
051 */
052public abstract class MongoDBQueryStream<T> extends AbstractStream<T>{
053        private MongoClient mongoClient;
054        private DB db;
055        private DBCollection collection;
056        private DBCursor cursor;
057
058        /**
059         * @throws UnknownHostException
060         *
061         */
062        public MongoDBQueryStream() throws UnknownHostException {
063                setup("localhost");
064                prepareCursor();
065        }
066
067        /**
068         * @param seeds
069         * @throws UnknownHostException
070         *
071         */
072        public MongoDBQueryStream(List<ServerAddress> seeds) throws UnknownHostException {
073                this.mongoClient = new MongoClient(seeds);
074                this.db = mongoClient.getDB( getDBName() );
075                this.collection = db.getCollection(getCollectionName());
076                prepareCursor();
077        }
078
079        private void prepareCursor(){
080                this.cursor = this.collection
081                                        .find(getQuery(),getProjection())
082                                        .sort(getSort())
083                                        .limit(getLimit());
084        }
085
086        /**
087         * @return the limit of the query, defaults to 0
088         */
089        public int getLimit() {
090                return 0;
091        }
092
093        /**
094         * @return the data to project from the query
095         */
096        public DBObject getProjection() {
097                return new BasicDBObject();
098        }
099
100        /**
101         * @return how the query show be sorted
102         */
103        public DBObject getSort() {
104                return new BasicDBObject();
105        }
106
107        /**
108         * By Default returns a new and empty {@link BasicDBObject} meaning this
109         * stream goes through all documents in a given collection. Overwrite to alter
110         * query
111         * @return a {@link DBObject} query
112         */
113        public DBObject getQuery() {
114                return new BasicDBObject();
115        }
116
117        /**
118         * @return the name of the collection to query
119         */
120        public abstract String getCollectionName() ;
121
122        /**
123         * @return the name of the database to query
124         */
125        public abstract String getDBName() ;
126
127        private void setup(String host) throws UnknownHostException {
128                this.mongoClient = new MongoClient( host);
129                this.db = mongoClient.getDB( getDBName() );
130                this.collection = db.getCollection(getCollectionName());
131        }
132
133        @Override
134        public boolean hasNext() {
135                return this.cursor.hasNext();
136        }
137
138        @Override
139        public T next() {
140                return constructObjects(this.cursor.next());
141        }
142
143        /**
144         * @param next
145         * @return construct the stream's object using the next document from the {@link DBCursor}
146         */
147        public abstract T constructObjects(DBObject next) ;
148}