/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   *  Redistributions of source code must retain the above copyright notice,
 *      this list of conditions and the following disclaimer.
 *
 *   *  Redistributions in binary form must reproduce the above copyright notice,
 *      this list of conditions and the following disclaimer in the documentation
 *      and/or other materials provided with the distribution.
 *
 *   *  Neither the name of the University of Southampton nor the names of its
 *      contributors may be used to endorse or promote products derived from this
 *      software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
package org.openimaj.text.nlp.geocode;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.UnsupportedEncodingException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.commons.io.FileUtils;
import org.apache.solr.client.solrj.embedded.EmbeddedSolrServer;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.SolrConfig;
import org.apache.solr.core.SolrCore;
import org.apache.solr.core.SolrResourceLoader;
import org.apache.solr.schema.IndexSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A harvester to parse and index the geo data into an embedded Solr index.
 *
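 * <p>
 * A minimal usage sketch (this assumes the OpenIMAJ classes are on the classpath and
 * that a Geonames dump such as {@code allCountries.txt} has been downloaded; the
 * paths shown are illustrative):
 *
 * <pre>
 * java -Dgeonames.solr.home=/path/to/solr/home \
 *      org.openimaj.text.nlp.geocode.GeonamesIndexGenerator allCountries.txt
 * </pre>
 *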
 * @author Greg Pendlebury
 */
public class GeonamesIndexGenerator {
    /** Logging */
    private static Logger log = LoggerFactory.getLogger(GeonamesIndexGenerator.class);

    /** Geonames uses tab-delimited files */
    private static String DELIMITER = "\t";

    /** Number of rows to read per batch before committing */
    private static int BATCH_SIZE = 20000;

    /** Solr file names */
    private static String SOLR_CONFIG = "solrconfig.xml";
    private static String SOLR_SCHEMA = "schema.xml";

    /** Buffered Reader for line by line */
    private BufferedReader reader;

    /** Basic date formatter */
    private DateFormat dateFormat;

    /** Column mappings */
    private static final Map<String, Integer> columns;
    static {
        columns = new LinkedHashMap<String, Integer>();
        columns.put("id",             0);
        columns.put("utf8_name",      1);
        columns.put("basic_name",     2);
        columns.put("alternames",     3);
        columns.put("latitude",       4);
        columns.put("longitude",      5);
        columns.put("feature_class",  6);
        columns.put("feature_code",   7);
        columns.put("country_code",   8);
        // Skip other Country Codes : 9
        // Skip Admin Codes         : 10-13
        columns.put("population",     14);
        columns.put("elevation",      15);
        columns.put("gtopo30",        16);
        columns.put("timezone",       17);
        columns.put("date_modified",  18);
    }
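
    /*
     * A note on the expected input: the mapping above assumes the standard Geonames
     * dump layout (e.g. allCountries.txt), i.e. one record per line with 19
     * tab-separated columns, of which column 9 (alternate country codes) and
     * columns 10-13 (admin codes) are skipped.
     */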

    /** Solr index */
    private SolrCore solrCore;
    private CoreContainer solrContainer;
    private EmbeddedSolrServer solrServer;

    /**
     * Basic constructor. Instantiate our reader and Solr.
     *
     * @param sourceFile The input file to read
     * @throws Exception if any errors occur
     */
    public GeonamesIndexGenerator(File sourceFile) throws Exception {
        // Variables
        InputStream inStream = null;
        Reader fileReader = null;

        // Open a stream to the file
        try {
            inStream = new FileInputStream(sourceFile);
        } catch (IOException ex) {
            log.error("Error opening file stream!");
            throw new Exception(ex);
        }

        // Instantiate a UTF-8 reader from the stream
        try {
            fileReader = new InputStreamReader(inStream, "UTF-8");
        } catch (UnsupportedEncodingException ex) {
            try {
                inStream.close();
            } catch (IOException ioex) {
                log.error("Failed closing input stream");
            }
            log.error("Error starting file reader!");
            throw new Exception(ex);
        }

        reader = new BufferedReader(fileReader);

        // Time to bring Solr online
        // Find the Solr home
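        // Typically supplied on the command line via a system property, e.g.
        // -Dgeonames.solr.home=/path/to/solr/home (the path here is illustrative).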
        String solrHome = System.getProperty("geonames.solr.home");
        if (solrHome == null) {
            throw new Exception("No 'geonames.solr.home' provided!");
        }
        File solrHomeFile = new File(solrHome);
        if (!solrHomeFile.exists()) {
            solrHomeFile.mkdirs();
            File confDir = new File(solrHomeFile, "conf");
            confDir.mkdirs();
            FileUtils.copyInputStreamToFile(
                    GeonamesIndexGenerator.class.getResourceAsStream(SOLR_CONFIG),
                    new File(confDir, SOLR_CONFIG)
            );
            FileUtils.copyInputStreamToFile(
                    GeonamesIndexGenerator.class.getResourceAsStream(SOLR_SCHEMA),
                    new File(confDir, SOLR_SCHEMA)
            );
        }
        solrServer = startSolr(solrHome);
    }

    /**
     * Start up an embedded Solr server.
     *
     * @param home The path to the Solr home directory
     * @return EmbeddedSolrServer: The instantiated server
     * @throws Exception if any errors occur
     */
    private EmbeddedSolrServer startSolr(String home) throws Exception {
        try {
            SolrConfig solrConfig = new SolrConfig(home, SOLR_CONFIG, null);
            IndexSchema schema = new IndexSchema(solrConfig, SOLR_SCHEMA, null);

            solrContainer = new CoreContainer(new SolrResourceLoader(
                    SolrResourceLoader.locateSolrHome()));
            CoreDescriptor descriptor = new CoreDescriptor(solrContainer, "",
                    solrConfig.getResourceLoader().getInstanceDir());
            descriptor.setConfigName(solrConfig.getResourceName());
            descriptor.setSchemaName(schema.getResourceName());

            solrCore = new SolrCore(null, solrConfig.getDataDir(),
                    solrConfig, schema, descriptor);
            solrContainer.register("cheese", solrCore, false);
            // CoreAdminRequest.create
            return new EmbeddedSolrServer(solrContainer, "cheese");
        } catch (Exception ex) {
            log.error("\nFailed to start Solr server\n");
            throw ex;
        }
    }

    /**
     * Return the current date/time.
     *
     * @return Date: A Date object with the current date/time.
     */
    private Date now() {
        return new Date();
    }

    /**
     * Return a formatted time String of the current time.
     *
     * @return String: The current time String in the format HH:MM:SS
     */
    private String time() {
        return time(now());
    }

    /**
     * Return a formatted time String for the supplied Date.
     *
     * @param date The Date object to format
     * @return String: The formatted time String in the format HH:MM:SS
     */
    private String time(Date date) {
        if (dateFormat == null) {
            dateFormat = new SimpleDateFormat("HH:mm:ss");
        }
        return dateFormat.format(date);
    }

    /**
     * Get the data indicated by the field name, after looking up the index
     * from the columns map.
     *
     * @param data An array of strings containing column data
     * @param field The field name
     * @return String: The data in that field, NULL if the field does not exist
     */
    private String get(String[] data, String field) {
        Integer index = columns.get(field);
        if (index == null) {
            log.error("Field does not exist: {}", field);
            return null;
        }
        return data[index];
    }

    /**
     * Force a commit against the underlying Solr database.
     */
    private void commit() {
        try {
            solrServer.commit();
        } catch (Exception ex) {
            log.error("Failed to commit: ", ex);
        }
    }

    /**
     * Force an optimize call against the underlying Solr database.
     */
    private void optimize() {
        try {
            solrServer.optimize();
        } catch (Exception ex) {
            log.error("Failed to optimize: ", ex);
        }
    }

    /**
     * Main processing loop for the harvester.
     *
     * @param counter The number of rows to read during this loop
     * @param print Debugging flag to print all data processed
     * @return int: The number of rows read this pass
     * @throws Exception if any errors occur
     */
    public int loop(int counter, boolean print) throws Exception {
        String line = null;
        int i = 0;
        try {
            while (i < counter && (line = reader.readLine()) != null) {
                String[] row = line.split(DELIMITER);

                i++;
                if (print) {
                    log.debug("====================");
                    log.debug("Line: {}", i);
                }
                process(row, print);
            }
        } catch (IOException ex) {
            throw new Exception(ex);
        }

        return i;
    }

    /**
     * Trivial test for empty Geonames data. Looks for null, empty strings,
     * or single space characters.
     *
     * @param input The data to test
     * @return boolean: True if the data is considered 'empty', otherwise False
     */
    private boolean empty(String input) {
        if (input == null || input.equals("") || input.equals(" ")) {
            return true;
        }
        return false;
    }

    /**
     * Process the row of data pulled from Geonames.
     *
     * @param row A String array containing the columns of data
     * @param print Debugging flag to print all data processed
     */
    private void process(String[] row, boolean print) {
        if (print) {
            for (String key : columns.keySet()) {
                System.out.format("%17s => %20s\n", key, get(row, key));
            }
        }
        try {
            solrServer.add(createSolrDoc(row));
        } catch (Exception ex) {
            log.error("Failed to add document:");
            for (String key : columns.keySet()) {
                System.out.format("%17s => %20s\n", key, get(row, key));
            }
            log.error("Stack trace: ", ex);
        }
    }

    /**
     * Create a Solr document from the provided Geonames column data.
     *
     * @param row A String array containing the columns of data
     * @return SolrInputDocument: The prepared document
     */
    private SolrInputDocument createSolrDoc(String[] row) {
        float boost = 1.0f;

        SolrInputDocument doc = new SolrInputDocument();
        for (String key : columns.keySet()) {
            String data = get(row, key);
            // Fix dates
            if (key.equals("date_modified")) {
                data += "T00:00:00Z";
            }
            // Sometimes the geonames 'asciiname' is empty
            if (key.equals("basic_name")) {
                if (empty(data)) {
                    data = get(row, "utf8_name");
                    //log.warn("{}: ASCII Name missing," +
                    //       " using UTF-8 version: '{}'", now(), data);
                }
                // We need a 'string' version, and a reversed copy of it
                String string = data.toLowerCase();
                doc.addField("basic_name_str", string);
                String rev = new StringBuffer(string).reverse().toString();
                doc.addField("basic_name_rev", rev);
            }
            if (key.equals("alternames")) {
                String[] parts = data.split(",");
                doc.addField("alternames", parts);
            }
            // Boost populated locations
            if (key.equals("feature_code")) {
                if (data.startsWith("PPL")) {
                    boost *= 2;
                }
            }
            if (!empty(data)) {
                doc.addField(key, data);
            }
        }
        // Place the boost on a field that has the same value in every record,
        // then add 'AND boost:boost' to all queries so the boost is applied.
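        // For example, a caller might search with something like
        //   basic_name_str:london AND boost:boost
        // (a hypothetical query; the clause matches every document, since the
        //  'boost' field holds the same value everywhere, while pulling in the
        //  index-time boost set below).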
        doc.addField("boost", "boost", boost);
        return doc;
    }

    /**
     * Shutdown function for cleaning up instantiated objects.
     */
    public void shutdown() {
        if (reader != null) {
            try {
                reader.close();
            } catch (IOException ex) {
                log.error("Error shutting down the Reader!", ex);
            }
        }
        if (solrContainer != null) {
            solrContainer.shutdown();
        }
    }

    /**
     * Command line entry point.
     *
     * @param args Array of String parameters from the command line
     * @throws IOException
     */
    public static void main(String[] args) throws IOException {
        // Make sure we were given an appropriate parameter
        if (args.length < 1) {
            log.error("ERROR: Usage requires input file!");
            return;
        }

        // Validate it
        File file = new File(args[0]);
        if (!file.exists()) {
            log.error("ERROR: The input file does not exist!");
            return;
        }

        // Get ready to harvest
        GeonamesIndexGenerator harvester = null;
        try {
            harvester = new GeonamesIndexGenerator(file);
        } catch (Exception ex) {
            // A reason for death was logged in the constructor
            log.error("Stack trace: ", ex);
            // Nothing to harvest if startup failed
            return;
        }

        log.debug("\n\n===================\n\n");

        // Tracking variables
        Date start = harvester.now();
        int count = 0;

        // Run the harvest in batches
        try {
            for (int i = 0; i < 500; i++) {
                int read = harvester.loop(BATCH_SIZE, false);
                count += read;
                log.info("{}: Rows read: {}", harvester.time(), count);

                // Commit after each batch
                try {
                    harvester.commit();
                } catch (Exception ex) {
                    log.info("Commit failed: {}", harvester.time());
                    log.error("Stack trace: ", ex);
                }

                // Did we finish?
                if (read != BATCH_SIZE) {
                    break;
                }
            }
        } catch (Exception ex) {
            log.error("ERROR: An error occurred in the processing loop: ", ex);
        }

        // Reporting
        Date finish = harvester.now();
        float duration = (float) (finish.getTime() - start.getTime()) / (float) 1000;
        log.info("\n\nTotal time for execution: {}s", duration);
        log.info("Total records processed: {}", count);
        if (count == 0) {
            log.info("Average records per second: 0");
        } else {
            float speed = (float) count / (float) duration;
            log.info("Average records per second: {}", speed);
        }

        try {
            harvester.commit();
            log.info("\n{}: Index optimize...", harvester.time());
            harvester.optimize();
            log.info("{}: ... completed", harvester.time());
        } catch (Exception ex) {
            log.info("{}: ... failed", harvester.time());
            log.error("Stack trace: ", ex);
        }
        log.info("\n\n===================\n\n");

        harvester.shutdown();
    }
}