001package org.openimaj.picslurper.client; 002 003 004import java.io.ByteArrayInputStream; 005import java.io.UnsupportedEncodingException; 006import java.util.List; 007import java.util.Set; 008 009import org.openimaj.io.IOUtils; 010import org.openimaj.picslurper.output.WriteableImageOutput; 011import org.zeromq.ZMQ.Socket; 012 013 014/** 015 * @author Sina Samangooei (ss@ecs.soton.ac.uk) 016 * 017 */ 018public class ZMQStreamingPicslurperClusterer implements Runnable { 019 private Socket subscriber; 020 private TrendDetector detector; 021 022 private static class CleanupRunner implements Runnable { 023 024 @Override 025 public void run() { 026 027 } 028 029 } 030 031 private static class TrendingRunner implements Runnable { 032 033 034 private TrendDetector trendDet; 035 036 public TrendingRunner(TrendDetector instance) { 037 trendDet = instance; 038 } 039 040 @Override 041 public void run() { 042 while (true) { 043 try { 044 Thread.sleep(5000); 045 } catch (InterruptedException e) { 046 } 047 List<Set<WriteableImageOutput>> trending = trendDet.trending(10); 048 for (Set<WriteableImageOutput> set : trending) { 049 System.out.println(String.format("[%d] %s", set.size(),set.toString())); 050 } 051 } 052 } 053 054 } 055 056 public ZMQStreamingPicslurperClusterer(TrendDetector instance) { 057 detector = instance; 058 } 059 060 /** 061 * @param args 062 * @throws UnsupportedEncodingException 063 */ 064 public static void main(String args[]) throws UnsupportedEncodingException { 065 TrendDetector instance = new TrendDetector(); 066 new Thread(new CleanupRunner()).start(); 067 new Thread(new TrendingRunner(instance)).start(); 068 new Thread(new ZMQStreamingPicslurperClusterer(instance)).start(); 069 } 070 071 @Override 072 public void run() { 073 while (true) { 074 subscriber.recv(0); 075 ByteArrayInputStream stream = new ByteArrayInputStream( 076 subscriber.recv(0)); 077 WriteableImageOutput instance = null; 078 try { 079 instance = IOUtils.read(stream, WriteableImageOutput.class,"UTF-8"); 080 detector.indexImage(instance); 081 082 System.out.println("SUCCESS!"); 083 } catch (Throwable e) { 084 System.err.println("FAILED: "); 085 if(instance != null){ 086 System.err.println("instance.file = " + instance.file); 087 System.err.println("instance.url = " + instance.url); 088 089 } 090 e.printStackTrace(); 091 } 092 } 093 } 094}