Grokbase Groups Hive commits May 2013
FAQ
Author: mithun
Date: Tue May 7 23:14:12 2013
New Revision: 1480128

URL: http://svn.apache.org/r1480128
Log:
HCATALOG-627 - Adding thread-safety to NotificationListener. (amalakar via mithun)

Modified:
     hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java

Modified: hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java?rev=1480128&r1=1480127&r2=1480128&view=diff
==============================================================================
--- hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java (original)
+++ hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java Tue May 7 23:14:12 2013
@@ -24,9 +24,9 @@ import java.util.HashMap;

  import javax.jms.Connection;
  import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
  import javax.jms.Destination;
  import javax.jms.ExceptionListener;
-import javax.jms.IllegalStateException;
  import javax.jms.JMSException;
  import javax.jms.Message;
  import javax.jms.MessageProducer;
@@ -76,17 +76,42 @@ import org.slf4j.LoggerFactory;
  public class NotificationListener extends MetaStoreEventListener {

      private static final Logger LOG = LoggerFactory.getLogger(NotificationListener.class);
- protected Session session;
      protected Connection conn;
      private static MessageFactory messageFactory = MessageFactory.getInstance();
+ public static final int NUM_RETRIES = 1;
+ private static final String HEALTH_CHECK_TOPIC_SUFFIX = "jms_health_check";
+ private static final String HEALTH_CHECK_MSG = "HCAT_JMS_HEALTH_CHECK_MESSAGE";
+
+ protected final ThreadLocal<Session> session = new ThreadLocal<Session>() {
+ @Override
+ protected Session initialValue() {
+ try {
+ return createSession();
+ } catch (Exception e) {
+ LOG.error("Couldn't create JMS Session", e);
+ return null;
+ }
+ }
+
+ @Override
+ public void remove() {
+ if (get() != null) {
+ try {
+ get().close();
+ } catch (Exception e) {
+ LOG.error("Unable to close bad JMS session, ignored error", e);
+ }
+ }
+ super.remove();
+ }
+ };

      /**
       * Create message bus connection and session in constructor.
       */
      public NotificationListener(final Configuration conf) {
-
          super(conf);
- createConnection();
+ testAndCreateConnection();
      }

      private static String getTopicName(Partition partition,
@@ -178,7 +203,7 @@ public class NotificationListener extend
          // Subscriber can get notification about drop of a database in HCAT
          // by listening on a topic named "HCAT" and message selector string
          // as "HCAT_EVENT = HCAT_DROP_DATABASE"
- if (dbEvent.getStatus()) {
+ if (dbEvent.getStatus()) {
              String topicName = getTopicPrefix(dbEvent.getHandler().getHiveConf());
              send(messageFactory.buildDropDatabaseMessage(dbEvent.getDatabase()), topicName);
          }
@@ -216,7 +241,7 @@ public class NotificationListener extend
          }
      }

- private String getTopicPrefix(HiveConf conf) {
+ private String getTopicPrefix(Configuration conf) {
          return conf.get(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX,
              HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX);
      }
@@ -253,86 +278,102 @@ public class NotificationListener extend
       * @param topicName is the name on message broker on which message is sent.
       */
      protected void send(HCatEventMessage hCatEventMessage, String topicName) {
- try {
- if(null == session){
- // this will happen, if we never able to establish a connection.
- createConnection();
- if (null == session){
- // Still not successful, return from here.
- LOG.error("Invalid session. Failed to send message on topic: " +
- topicName + " event: " + hCatEventMessage.getEventType());
- return;
- }
- }
-
- Destination topic = getTopic(topicName);
+ send(hCatEventMessage, topicName, NUM_RETRIES);
+ }

- if (null == topic){
- // Still not successful, return from here.
- LOG.error("Invalid session. Failed to send message on topic: " +
- topicName + " event: " + hCatEventMessage.getEventType());
- return;
+ /**
+ * @param hCatEventMessage The HCatEventMessage being sent over JMS, this method is threadsafe
+ * @param topicName is the name on message broker on which message is sent.
+ * @param retries the number of retry attempts
+ */
+ protected void send(HCatEventMessage hCatEventMessage, String topicName, int retries) {
+ try {
+ if (session.get() == null) {
+ // Need to reconnect
+ throw new JMSException("Invalid JMS session");
              }
-
- MessageProducer producer = session.createProducer(topic);
- Message msg = session.createTextMessage(hCatEventMessage.toString());
+ Destination topic = createTopic(topicName);
+ Message msg = session.get().createTextMessage(hCatEventMessage.toString());

              msg.setStringProperty(HCatConstants.HCAT_EVENT, hCatEventMessage.getEventType().toString());
              msg.setStringProperty(HCatConstants.HCAT_MESSAGE_VERSION, messageFactory.getVersion());
              msg.setStringProperty(HCatConstants.HCAT_MESSAGE_FORMAT, messageFactory.getMessageFormat());
+ MessageProducer producer = createProducer(topic);
              producer.send(msg);
              // Message must be transacted before we return.
- session.commit();
- }
- catch(Exception e){
- // Gobble up the exception. Message delivery is best effort.
- LOG.error("Failed to send message on topic: " + topicName +
- " event: " + hCatEventMessage.getEventType(), e);
+ session.get().commit();
+ } catch (Exception e) {
+ if (retries >= 0) {
+ // this may happen if we were able to establish connection once, but its no longer valid
+ LOG.error("Seems like connection is lost. Will retry. Retries left : " + retries + ". error was:", e);
+ testAndCreateConnection();
+ send(hCatEventMessage, topicName, retries - 1);
+ } else {
+ // Gobble up the exception. Message delivery is best effort.
+ LOG.error("Failed to send message on topic: " + topicName +
+ " event: " + hCatEventMessage.getEventType() + " after retries: " + NUM_RETRIES, e);
+ }
          }
      }

      /**
- * Get the topic object for the topicName, it also tries to reconnect
- * if the connection appears to be broken.
+ * Get the topic object for the topicName
       *
       * @param topicName The String identifying the message-topic.
       * @return A {@link Topic} object corresponding to the specified topicName.
       * @throws JMSException
       */
- protected Topic getTopic(final String topicName) throws JMSException {
- Topic topic;
+ protected Topic createTopic(final String topicName) throws JMSException {
+ return session.get().createTopic(topicName);
+ }
+
+ /**
+ * Does a health check on the connection by sending a dummy message.
+ * Create the connection if the connection is found to be bad
+ * Also recreates the session
+ */
+ protected synchronized void testAndCreateConnection() {
+ if (conn != null) {
+ // This method is reached when error occurs while sending msg, so the session must be bad
+ session.remove();
+ if (!isConnectionHealthy()) {
+ // I am the first thread to detect the error, cleanup old connection & reconnect
+ try {
+ conn.close();
+ } catch (Exception e) {
+ LOG.error("Unable to close bad JMS connection, ignored error", e);
+ }
+ conn = createConnection();
+ }
+ } else {
+ conn = createConnection();
+ }
          try {
- // Topics are created on demand. If it doesn't exist on broker it will
- // be created when broker receives this message.
- topic = session.createTopic(topicName);
- } catch (IllegalStateException ise) {
- // this will happen if we were able to establish connection once, but its no longer valid,
- // ise is thrown, catch it and retry.
- LOG.error("Seems like connection is lost. Retrying", ise);
- createConnection();
- topic = session.createTopic(topicName);
+ session.set(createSession());
+ } catch (JMSException e) {
+ LOG.error("Couldn't create JMS session, ignored the error", e);
          }
- return topic;
      }

- protected void createConnection() {
-
+ /**
+ * Create the JMS connection
+ * @return newly created JMS connection
+ */
+ protected Connection createConnection() {
+ LOG.info("Will create new JMS connection");
          Context jndiCntxt;
+ Connection jmsConnection = null;
          try {
              jndiCntxt = new InitialContext();
- ConnectionFactory connFac = (ConnectionFactory) jndiCntxt
- .lookup("ConnectionFactory");
- Connection conn = connFac.createConnection();
- conn.start();
- conn.setExceptionListener(new ExceptionListener() {
+ ConnectionFactory connFac = (ConnectionFactory) jndiCntxt.lookup("ConnectionFactory");
+ jmsConnection = connFac.createConnection();
+ jmsConnection.start();
+ jmsConnection.setExceptionListener(new ExceptionListener() {
                  @Override
                  public void onException(JMSException jmse) {
- LOG.error(jmse.toString());
+ LOG.error("JMS Exception listener received exception. Ignored the error", jmse);
                  }
              });
- // We want message to be sent when session commits, thus we run in
- // transacted mode.
- session = conn.createSession(true, Session.SESSION_TRANSACTED);
          } catch (NamingException e) {
              LOG.error("JNDI error while setting up Message Bus connection. "
                  + "Please make sure file named 'jndi.properties' is in "
@@ -342,20 +383,54 @@ public class NotificationListener extend
          } catch (Throwable t) {
              LOG.error("Unable to connect to JMS provider", t);
          }
+ return jmsConnection;
+ }
+
+ /**
+ * Send a dummy message to probe if the JMS connection is healthy
+ * @return true if connection is healthy, false otherwise
+ */
+ protected boolean isConnectionHealthy() {
+ try {
+ Topic topic = createTopic(getTopicPrefix(getConf()) + "." + HEALTH_CHECK_TOPIC_SUFFIX);
+ MessageProducer producer = createProducer(topic);
+ Message msg = session.get().createTextMessage(HEALTH_CHECK_MSG);
+ producer.send(msg, DeliveryMode.NON_PERSISTENT, 4, 0);
+ } catch (Exception e) {
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Creates a JMS session
+ * @return newly create JMS session
+ * @throws JMSException
+ */
+ protected Session createSession() throws JMSException {
+ // We want message to be sent when session commits, thus we run in
+ // transacted mode.
+ return conn.createSession(true, Session.SESSION_TRANSACTED);
+ }
+
+ /**
+ * Create a JMS producer
+ * @param topic
+ * @return newly created message producer
+ * @throws JMSException
+ */
+ protected MessageProducer createProducer(Destination topic) throws JMSException {
+ return session.get().createProducer(topic);
      }

      @Override
      protected void finalize() throws Throwable {
- // Close the connection before dying.
- try {
- if (null != session)
- session.close();
- if (conn != null) {
+ if (conn != null) {
+ try {
                  conn.close();
+ } catch (Exception e) {
+ LOG.error("Couldn't close jms connection, ignored the error", e);
              }
-
- } catch (Exception ignore) {
- LOG.info("Failed to close message bus connection.", ignore);
          }
      }

Search Discussions

Related Discussions

Discussion Navigation
viewthread | post
posts ‹ prev | 1 of 1 | next ›
Discussion Overview
groupcommits @
categorieshive, hadoop
postedMay 7, '13 at 11:14p
activeMay 7, '13 at 11:14p
posts1
users1
websitehive.apache.org

1 user in discussion

Mithun: 1 post

People

Translate

site design / logo © 2021 Grokbase