java - Simple producer/consumer used to send partition notifications using HCatalog


I am not able to receive notifications from HCatalog using JMS. I wrote a simple producer/consumer program. The Apache ActiveMQ service is running in the background, and I am able to send plain text messages through ActiveMQ. However, markPartitionForEvent() never delivers an event to the consumer's onMessage() callback. I referred to the following link: https://cwiki.apache.org/confluence/display/hive/hcatalog+notification

Please guide me.

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.PartitionEventType;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hcatalog.common.HCatConstants;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.util.*;

// JMS consumer: subscribes to the table's HCatalog topic and prints every
// partition-done message it receives.
class Consumer implements MessageListener {
    public void start() {
        try {
            HiveConf hiveConf;
            ActiveMQConnectionFactory connFac = new ActiveMQConnectionFactory("tcp://localhost:61616");
            Connection conn = connFac.createConnection();
            conn.start();

            hiveConf = new HiveConf(Consumer.class);
            HiveMetaStoreClient msc = new HiveMetaStoreClient(hiveConf);

            Table table = msc.getTable("mydb", "mytbl");
            table.getParameters().put(HCatConstants.HCAT_MSGBUS_TOPIC_NAME, "topic" + ".hcatalog");
            System.out.println("table = " + table.toString());
            Map<String, String> map = table.getParameters();
            System.out.println("map = " + map.toString());
            String fetchTopic = map.get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME);
            System.out.println("fetchTopic = " + fetchTopic);

            String topicName = msc.getTable("mydb", "mytbl")
                    .getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME);
            System.out.println("topicName = " + topicName);

            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            if (session == null)
                System.out.println("null");
            System.out.println(session.toString());

            Destination hcatTopic = session.createTopic(fetchTopic);
            MessageConsumer consumer = session.createConsumer(hcatTopic);
            consumer.setMessageListener(this);
        } catch (Exception e) {
            System.out.println("error");
            e.printStackTrace();
        }
    }

    @Override
    public void onMessage(Message message) {
        try {
            if (message.getStringProperty(HCatConstants.HCAT_EVENT)
                    .equals(HCatConstants.HCAT_PARTITION_DONE_EVENT)) {
                MapMessage mapMsg = (MapMessage) message;
                Enumeration<String> keys = mapMsg.getMapNames();

                while (keys.hasMoreElements()) {
                    String key = keys.nextElement();
                    System.out.println(key + " : " + mapMsg.getString(key));
                }
                System.out.println("message: " + message);
            }
        } catch (Exception e) {
            System.out.println("error");
        }
    }
}

// Producer: marks the partition as LOAD_DONE once a second, which should
// cause the metastore to publish a notification on the table's topic.
class Producer extends Thread {
    private HiveConf hiveConf;

    public Producer() {
    }

    @Override
    public void run() {
        try {
            hiveConf = new HiveConf(this.getClass());
            HiveMetaStoreClient msc = new HiveMetaStoreClient(hiveConf);

            HashMap<String, String> partMap = new HashMap<String, String>();
            // note: the same key is reused, so only the last value survives
            partMap.put("date", "20110711");
            partMap.put("date", "20110712");
            partMap.put("date", "20110714");

            while (true) {
                msc.markPartitionForEvent("mydb", "mytbl", partMap, PartitionEventType.LOAD_DONE);
                Thread.sleep(1000);
            }
        } catch (Exception e) {
            System.out.println("error");
        }
    }
}

