001/**
002 * Copyright (c) 2011, The University of Southampton and the individual contributors.
003 * All rights reserved.
004 *
005 * Redistribution and use in source and binary forms, with or without modification,
006 * are permitted provided that the following conditions are met:
007 *
008 *   *  Redistributions of source code must retain the above copyright notice,
009 *      this list of conditions and the following disclaimer.
010 *
011 *   *  Redistributions in binary form must reproduce the above copyright notice,
012 *      this list of conditions and the following disclaimer in the documentation
013 *      and/or other materials provided with the distribution.
014 *
015 *   *  Neither the name of the University of Southampton nor the names of its
016 *      contributors may be used to endorse or promote products derived from this
017 *      software without specific prior written permission.
018 *
019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */
030package org.openimaj.hadoop.tools.twitter.token.mode.match;
031
032import java.io.IOException;
033import java.util.ArrayList;
034import java.util.List;
035import java.util.Map;
036import java.util.regex.Pattern;
037
038import org.apache.hadoop.io.LongWritable;
039import org.apache.hadoop.io.NullWritable;
040import org.apache.hadoop.io.Text;
041import org.apache.hadoop.mapreduce.JobContext;
042import org.apache.hadoop.mapreduce.Mapper;
043import org.kohsuke.args4j.CmdLineException;
044import org.openimaj.hadoop.tools.twitter.HadoopTwitterTokenToolOptions;
045import org.openimaj.hadoop.tools.twitter.JsonPathFilterSet;
046
047import com.jayway.jsonpath.JsonPath;
048
049/**
050 * For each tweet match each token against each regex. if the tweet matches at
051 * all, emit the tweet.
052 * 
053 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
054 * 
055 */
056public class TokenRegexMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
057        /**
058                 * 
059                 */
060        public TokenRegexMapper() {
061        }
062
063        private static ArrayList<Pattern> regexes;
064        private static HadoopTwitterTokenToolOptions options;
065        private static JsonPath jsonPath;
066        private static JsonPathFilterSet filters;
067
068        @Override
069        protected void setup(Mapper<LongWritable, Text, NullWritable, Text>.Context context) throws java.io.IOException,
070                        InterruptedException
071        {
072                load(context);
073        };
074
075        private static synchronized void load(JobContext context) throws IOException {
076                if (regexes == null) {
077                        try {
078                                regexes = new ArrayList<Pattern>();
079                                final String[] rstrings = context.getConfiguration().getStrings(TokenRegexStage.REGEX_KEY);
080                                for (final String regex : rstrings) {
081                                        regexes.add(Pattern.compile(regex));
082                                }
083                                options = new HadoopTwitterTokenToolOptions(context.getConfiguration().getStrings(
084                                                HadoopTwitterTokenToolOptions.ARGS_KEY));
085                                options.prepare();
086                                jsonPath = JsonPath.compile(options.getJsonPath());
087                                filters = options.getFilters();
088                        } catch (final CmdLineException e) {
089                                throw new IOException(e);
090                        } catch (final Exception e) {
091                                throw new IOException(e);
092                        }
093                }
094        }
095
096        @Override
097        protected void cleanup(org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, NullWritable, Text>.Context context)
098                        throws IOException, InterruptedException
099        {
100                regexes = null;
101        };
102
103        @SuppressWarnings("unchecked")
104        @Override
105        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, NullWritable, Text>.Context context)
106                        throws java.io.IOException, InterruptedException
107        {
108                List<String> tokens = null;
109                try {
110                        final String svalue = value.toString();
111                        if (!filters.filter(svalue))
112                                return;
113                        final Object found = jsonPath.read(svalue);
114                        if (found == null) {
115                                // System.err.println("Couldn't read the tokens from the tweet");
116                                return;
117                        }
118                        if (found instanceof String) {
119                                tokens = new ArrayList<String>();
120                                tokens.add((String) found);
121                        }
122                        else if (found instanceof List) {
123                                tokens = (List<String>) found;
124                        }
125                        else if (found instanceof Map) {
126                                final Map<String, Object> things = (Map<String, Object>) found;
127                                tokens = new ArrayList<String>();
128                                for (final Object v : things.values()) {
129                                        tokens.add(v.toString());
130                                }
131                        }
132                        if (tokens.size() == 0) {
133                                return; // Quietly quit, value exists but was empty
134                        }
135
136                } catch (final Exception e) {
137                        System.out.println("Couldn't get tokens from:\n" + value + "\nwith jsonpath:\n" + jsonPath);
138                        return;
139                }
140                boolean found = false;
141                for (final String token : tokens) {
142                        for (final Pattern regex : regexes) {
143                                found = regex.matcher(token).find();
144                                if (found)
145                                        break;
146                        }
147                        if (found)
148                                break;
149                }
150                if (found) {
151                        context.write(NullWritable.get(), value);
152                }
153        };
154}