package org.openimaj.picslurper.client;


import java.io.ByteArrayInputStream;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Set;

import org.openimaj.io.IOUtils;
import org.openimaj.picslurper.output.WriteableImageOutput;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Socket;

014/**
015 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
016 *
017 */
018public class ZMQStreamingPicslurperClusterer implements Runnable {
019        private Socket subscriber;
020        private TrendDetector detector;
021
022        private static class CleanupRunner implements Runnable {
023
024                @Override
025                public void run() {
026
027                }
028
029        }
030
031        private static class TrendingRunner implements Runnable {
032
033
034                private TrendDetector trendDet;
035
036                public TrendingRunner(TrendDetector instance) {
037                        trendDet = instance;
038                }
039
040                @Override
041                public void run() {
042                        while (true) {
043                                try {
044                                        Thread.sleep(5000);
045                                } catch (InterruptedException e) {
046                                }
047                                List<Set<WriteableImageOutput>> trending = trendDet.trending(10);
048                                for (Set<WriteableImageOutput> set : trending) {
049                                        System.out.println(String.format("[%d] %s", set.size(),set.toString()));
050                                }
051                        }
052                }
053
054        }
055
056        public ZMQStreamingPicslurperClusterer(TrendDetector instance) {
057                detector = instance;
058        }
059
060        /**
061         * @param args
062         * @throws UnsupportedEncodingException
063         */
064        public static void main(String args[]) throws UnsupportedEncodingException {
065                TrendDetector instance = new TrendDetector();
066                new Thread(new CleanupRunner()).start();
067                new Thread(new TrendingRunner(instance)).start();
068                new Thread(new ZMQStreamingPicslurperClusterer(instance)).start();
069        }
070
071        @Override
072        public void run() {
073                while (true) {
074                        subscriber.recv(0);
075                        ByteArrayInputStream stream = new ByteArrayInputStream(
076                                        subscriber.recv(0));
077                        WriteableImageOutput instance = null;
078                        try {
079                                instance = IOUtils.read(stream, WriteableImageOutput.class,"UTF-8");
080                                detector.indexImage(instance);
081
082                                System.out.println("SUCCESS!");
083                        } catch (Throwable e) {
084                                System.err.println("FAILED: ");
085                                if(instance != null){
086                                        System.err.println("instance.file = " + instance.file);
087                                        System.err.println("instance.url = " + instance.url);
088
089                                }
090                                e.printStackTrace();
091                        }
092                }
093        }
094}