FAQ
Author: khorgath
Date: Wed Apr 8 17:16:12 2015
New Revision: 1672123

URL: http://svn.apache.org/r1672123
Log:
HIVE-8164 : Adding in a ReplicationTask that converts a Notification Event to actionable tasks (Sushanth Sowmyan, reviewed by Alan Gates)

Added:
     hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/
     hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/Command.java
     hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/HCatReplicationTaskIterator.java
     hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/NoopReplicationTask.java
     hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/ReplicationTask.java
     hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/ReplicationUtils.java
     hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/StagingDirectoryProvider.java
     hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/commands/
     hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/commands/NoopCommand.java
Modified:
     hive/trunk/hcatalog/webhcat/java-client/pom.xml
     hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClient.java
     hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java
     hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatNotificationEvent.java
     hive/trunk/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java

Modified: hive/trunk/hcatalog/webhcat/java-client/pom.xml
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/java-client/pom.xml?rev=1672123&r1=1672122&r2=1672123&view=diff
==============================================================================
--- hive/trunk/hcatalog/webhcat/java-client/pom.xml (original)
+++ hive/trunk/hcatalog/webhcat/java-client/pom.xml Wed Apr 8 17:16:12 2015
@@ -46,6 +46,11 @@
        <version>${project.version}</version>
      </dependency>
      <dependency>
+ <groupId>org.apache.hive.hcatalog</groupId>
+ <artifactId>hive-hcatalog-server-extensions</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>${project.version}</version>

Modified: hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClient.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClient.java?rev=1672123&r1=1672122&r2=1672123&view=diff
==============================================================================
--- hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClient.java (original)
+++ hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClient.java Wed Apr 8 17:16:12 2015
@@ -18,6 +18,7 @@
   */
  package org.apache.hive.hcatalog.api;

+import java.util.Iterator;
  import java.util.List;
  import java.util.Map;

@@ -27,6 +28,7 @@ import org.apache.hadoop.hive.common.cla
  import org.apache.hadoop.hive.metastore.IMetaStoreClient;
  import org.apache.hadoop.hive.metastore.api.PartitionEventType;
  import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hive.hcatalog.api.repl.ReplicationTask;
  import org.apache.hive.hcatalog.common.HCatException;
  import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;

@@ -467,6 +469,23 @@ public abstract class HCatClient {
     */
    public abstract String getMessageBusTopicName(String dbName, String tableName) throws HCatException;

+
+ /**
+ * Get an iterator that iterates over a list of replication tasks needed to replicate all the
+ * events that have taken place for a given db/table.
+ * @param lastEventId : The last event id that was processed for this reader. The returned
+ * replication tasks will start from this point forward
+ * @param maxEvents : Maximum number of events to consider for generating the
+ * replication tasks. If < 1, then all available events will be considered.
+ * @param dbName : The database name whose events we're interested in.
+ * @param tableName : The table name whose events we're interested in - if null,
+ * then this function will behave as if it were running at a db level.
+ * @return an iterator over a list of replication events that can be processed one by one.
+ * @throws HCatException
+ */
+ public abstract Iterator<ReplicationTask> getReplicationTasks(
+ long lastEventId, int maxEvents, String dbName, String tableName) throws HCatException;
+
    /**
     * Get a list of notifications
     * @param lastEventId The last event id that was consumed by this reader. The returned

Modified: hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java?rev=1672123&r1=1672122&r2=1672123&view=diff
==============================================================================
--- hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java (original)
+++ hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java Wed Apr 8 17:16:12 2015
@@ -21,9 +21,11 @@ package org.apache.hive.hcatalog.api;
  import java.io.IOException;
  import java.util.ArrayList;
  import java.util.Arrays;
+import java.util.Iterator;
  import java.util.List;
  import java.util.Map;

+import com.google.common.base.Function;
  import com.google.common.collect.Lists;
  import com.google.common.collect.Maps;
  import org.apache.commons.lang.StringUtils;
@@ -63,6 +65,8 @@ import org.apache.hadoop.hive.serde2.obj
  import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
  import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
  import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hive.hcatalog.api.repl.HCatReplicationTaskIterator;
+import org.apache.hive.hcatalog.api.repl.ReplicationTask;
  import org.apache.hive.hcatalog.common.HCatConstants;
  import org.apache.hive.hcatalog.common.HCatException;
  import org.apache.hive.hcatalog.common.HCatUtil;
@@ -72,6 +76,8 @@ import org.apache.thrift.TException;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;

+import javax.annotation.Nullable;
+
  /**
   * The HCatClientHMSImpl is the Hive Metastore client based implementation of
   * HCatClient.
@@ -965,18 +971,27 @@ public class HCatClientHMSImpl extends H
    }

    @Override
+ public Iterator<ReplicationTask> getReplicationTasks(
+ long lastEventId, int maxEvents, String dbName, String tableName) throws HCatException {
+ return new HCatReplicationTaskIterator(this,lastEventId,maxEvents,dbName,tableName);
+ }
+
+ @Override
    public List<HCatNotificationEvent> getNextNotification(long lastEventId, int maxEvents,
                                                           IMetaStoreClient.NotificationFilter filter)
        throws HCatException {
      try {
- List<HCatNotificationEvent> events = new ArrayList<HCatNotificationEvent>();
        NotificationEventResponse rsp = hmsClient.getNextNotification(lastEventId, maxEvents, filter);
        if (rsp != null && rsp.getEvents() != null) {
- for (NotificationEvent event : rsp.getEvents()) {
- events.add(new HCatNotificationEvent(event));
- }
+ return Lists.transform(rsp.getEvents(), new Function<NotificationEvent, HCatNotificationEvent>() {
+ @Override
+ public HCatNotificationEvent apply(@Nullable NotificationEvent notificationEvent) {
+ return new HCatNotificationEvent(notificationEvent);
+ }
+ });
+ } else {
+ return new ArrayList<HCatNotificationEvent>();
        }
- return events;
      } catch (TException e) {
        throw new ConnectionFailureException("TException while getting notifications", e);
      }

Modified: hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatNotificationEvent.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatNotificationEvent.java?rev=1672123&r1=1672122&r2=1672123&view=diff
==============================================================================
--- hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatNotificationEvent.java (original)
+++ hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatNotificationEvent.java Wed Apr 8 17:16:12 2015
@@ -32,6 +32,8 @@ public class HCatNotificationEvent {
    private String tableName;
    private String message;

+ public enum Scope { DB, TABLE, UNKNOWN };
+
    HCatNotificationEvent(NotificationEvent event) {
      eventId = event.getEventId();
      eventTime = event.getEventTime();
@@ -45,6 +47,20 @@ public class HCatNotificationEvent {
      return eventId;
    }

+ public Scope getEventScope() {
+ // Eventually, we want this to be a richer description of having
+ // a DB, TABLE, ROLE, etc scope. For now, we have a trivial impl
+ // of having only DB and TABLE scopes, as determined by whether
+ // or not the tableName is null.
+ if (dbName != null){
+ if (tableName != null){
+ return Scope.TABLE;
+ }
+ return Scope.DB;
+ }
+ return Scope.UNKNOWN;
+ }
+
    public int getEventTime() {
      return eventTime;
    }

Added: hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/Command.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/Command.java?rev=1672123&view=auto
==============================================================================
--- hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/Command.java (added)
+++ hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/Command.java Wed Apr 8 17:16:12 2015
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.api.repl;
+
+import org.apache.hadoop.io.Writable;
+
+import java.util.List;
+
+/**
+ * Interface that abstracts the notion of one atomic command to execute.
+ * If the command does not execute and raises some exception, then Command
+ * provides a conditional to check if the operation is intended to be
+ * retriable - i.e. whether the command is considered idempotent. If it is,
+ * then the user could attempt to redo the particular command they were
+ * running. If not, then they can check another conditional to check
+ * if their action is undo-able. If undoable, then they can then attempt
+ * to undo the action by asking the command how to undo it. If not, they
+ * can then in turn act upon the exception in whatever manner they see
+ * fit (typically by raising an error).
+ *
+ * We also have two more methods that help cleanup of temporary locations
+ * used by this Command. cleanupLocationsPerRetry() provides a list of
+ * directories that are intended to be cleaned up every time this Command
+ * needs to be retried. cleanupLocationsAfterEvent() provides a list of
+ * directories that should be cleaned up after the event for which this
+ * Command is generated is successfully processed.
+ */
+public interface Command extends Writable {
+ List<String> get();
+ boolean isRetriable();
+ boolean isUndoable();
+ List<String> getUndo();
+ List<String> cleanupLocationsPerRetry();
+ List<String> cleanupLocationsAfterEvent();
+ long getEventId();
+}

Added: hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/HCatReplicationTaskIterator.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/HCatReplicationTaskIterator.java?rev=1672123&view=auto
==============================================================================
--- hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/HCatReplicationTaskIterator.java (added)
+++ hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/HCatReplicationTaskIterator.java Wed Apr 8 17:16:12 2015
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.api.repl;
+
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hive.hcatalog.api.HCatClient;
+import org.apache.hive.hcatalog.api.HCatNotificationEvent;
+import org.apache.hive.hcatalog.common.HCatException;
+
+import java.util.Iterator;
+
+public class HCatReplicationTaskIterator implements Iterator<ReplicationTask>{
+ private Iterator<HCatNotificationEvent> notifIter = null;
+
+ private class HCatReplicationTaskIteratorNotificationFilter implements IMetaStoreClient.NotificationFilter {
+
+ private String dbName;
+ private String tableName;
+ public HCatReplicationTaskIteratorNotificationFilter(String dbName, String tableName){
+ this.dbName = dbName;
+ this.tableName = tableName;
+ }
+ @Override
+ public boolean accept(NotificationEvent event) {
+ if (event == null){
+ return false; // get rid of trivial case first, so that we can safely assume non-null
+ }
+ if (this.dbName == null){
+ return true; // if our dbName is null, we're interested in all wh events
+ }
+ if (this.dbName.equalsIgnoreCase(event.getDbName())){
+ if (
+ (this.tableName == null)
+ // if our dbName matches and our tableName is null, we're interested in all events for this db
+ || (this.tableName.equalsIgnoreCase(event.getTableName()))
+ // table level event that matches us
+ ){
+ return true;
+ }
+ }
+ return false;
+ }
+ }
+
+ public HCatReplicationTaskIterator(
+ HCatClient hcatClient, long eventFrom, int maxEvents, String dbName, String tableName) throws HCatException {
+
+ init(hcatClient,eventFrom,maxEvents, new HCatReplicationTaskIteratorNotificationFilter(dbName,tableName));
+ }
+
+ public HCatReplicationTaskIterator(
+ HCatClient hcatClient, long eventFrom, int maxEvents,
+ IMetaStoreClient.NotificationFilter filter) throws HCatException{
+ init(hcatClient,eventFrom,maxEvents,filter);
+ }
+ private void init(HCatClient hcatClient, long eventFrom, int maxEvents, IMetaStoreClient.NotificationFilter filter) throws HCatException {
+ // Simple implementation for now, this will later expand to do DAG evaluation.
+ this.notifIter = hcatClient.getNextNotification(eventFrom, maxEvents,filter).iterator();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return notifIter.hasNext();
+ }
+
+ @Override
+ public ReplicationTask next() {
+ return ReplicationTask.create(notifIter.next());
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("remove() not supported on HCatReplicationTaskIterator");
+ }
+
+
+
+}
+
+
+

Added: hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/NoopReplicationTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/NoopReplicationTask.java?rev=1672123&view=auto
==============================================================================
--- hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/NoopReplicationTask.java (added)
+++ hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/NoopReplicationTask.java Wed Apr 8 17:16:12 2015
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.api.repl;
+
+import org.apache.hive.hcatalog.api.HCatNotificationEvent;
+import org.apache.hive.hcatalog.api.repl.commands.NoopCommand;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This class exists to help testing and initial development,
+ * and will be the default Replication Task for under-development replication
+ * tasks to override.
+ *
+ * This is not intended to be a permanent class, and will likely move to the test
+ * package after initial implementation.
+ */
+
+public class NoopReplicationTask extends ReplicationTask {
+
+ List<Command> noopReturn = null;
+
+ public NoopReplicationTask(HCatNotificationEvent event) {
+ super(event);
+ noopReturn = new ArrayList<Command>();
+ noopReturn.add(new NoopCommand(event.getEventId()));
+ }
+
+ @Override
+ public boolean needsStagingDirs() {
+ return false;
+ }
+
+ @Override
+ public boolean isActionable(){
+ return true;
+ }
+
+ /**
+ * Returns a list of commands to send to a hive driver on the source warehouse
+ * @return a list of commands to send to a hive driver on the source warehouse
+ */
+ @Override
+ public Iterable<? extends Command> getSrcWhCommands() {
+ verifyActionable();
+ return noopReturn;
+ }
+
+ /**
+ * Returns a list of commands to send to a hive driver on the dest warehouse
+ * @return a list of commands to send to a hive driver on the dest warehouse
+ */
+ @Override
+ public Iterable<? extends Command> getDstWhCommands() {
+ verifyActionable();
+ return noopReturn;
+ }
+
+}
+

Added: hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/ReplicationTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/ReplicationTask.java?rev=1672123&view=auto
==============================================================================
--- hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/ReplicationTask.java (added)
+++ hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/ReplicationTask.java Wed Apr 8 17:16:12 2015
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hive.hcatalog.api.repl;
+
+import com.google.common.base.Function;
+import org.apache.hive.hcatalog.api.HCatNotificationEvent;
+import org.apache.hive.hcatalog.common.HCatConstants;
+import org.apache.hive.hcatalog.messaging.MessageFactory;
+
+
+/**
+ * ReplicationTask captures the concept of what it'd take to replicate changes from
+ * one warehouse to another given a notification event that captures what changed.
+ */
+public abstract class ReplicationTask {
+ protected HCatNotificationEvent event;
+ protected StagingDirectoryProvider srcStagingDirProvider = null;
+ protected StagingDirectoryProvider dstStagingDirProvider = null;
+ protected Function<String,String> tableNameMapping = null;
+ protected Function<String,String> dbNameMapping = null;
+
+ protected static MessageFactory messageFactory = MessageFactory.getInstance();
+
+ public interface Factory {
+ public ReplicationTask create(HCatNotificationEvent event);
+ }
+
+ /**
+ * Dummy NoopFactory for testing, returns a NoopReplicationTask for all recognized events.
+ * Warning : this will eventually go away or move to the test section - it's intended only
+ * for integration testing purposes.
+ */
+ public static class NoopFactory implements Factory {
+ @Override
+ public ReplicationTask create(HCatNotificationEvent event) {
+ // TODO : Java 1.7+ support using String with switches, but IDEs don't all seem to know that.
+ // If casing is fine for now. But we should eventually remove this. Also, I didn't want to
+ // create another enum just for this.
+ String eventType = event.getEventType();
+ if (eventType.equals(HCatConstants.HCAT_CREATE_DATABASE_EVENT)) {
+ return new NoopReplicationTask(event);
+ } else if (eventType.equals(HCatConstants.HCAT_DROP_DATABASE_EVENT)) {
+ return new NoopReplicationTask(event);
+ } else if (eventType.equals(HCatConstants.HCAT_CREATE_TABLE_EVENT)) {
+ return new NoopReplicationTask(event);
+ } else if (eventType.equals(HCatConstants.HCAT_DROP_TABLE_EVENT)) {
+ return new NoopReplicationTask(event);
+ } else if (eventType.equals(HCatConstants.HCAT_ADD_PARTITION_EVENT)) {
+ return new NoopReplicationTask(event);
+ } else if (eventType.equals(HCatConstants.HCAT_DROP_PARTITION_EVENT)) {
+ return new NoopReplicationTask(event);
+ } else if (eventType.equals(HCatConstants.HCAT_ALTER_TABLE_EVENT)) {
+ return new NoopReplicationTask(event);
+ } else if (eventType.equals(HCatConstants.HCAT_ALTER_PARTITION_EVENT)) {
+ return new NoopReplicationTask(event);
+ } else if (eventType.equals(HCatConstants.HCAT_INSERT_EVENT)) {
+ return new NoopReplicationTask(event);
+ } else {
+ throw new IllegalStateException("Unrecognized Event type, no replication task available");
+ }
+ }
+ }
+
+ private static Factory factoryInstance = null;
+ private static Factory getFactoryInstance() {
+ if (factoryInstance == null){
+ // TODO: Eventually, we'll have a bit here that looks at a config param to instantiate
+ // the appropriate factory, with EXIMFactory being the default - that allows
+ // others to implement their own ReplicationTask.Factory for other replication
+ // implementations.
+ // That addition will be brought in by the EXIMFactory patch.
+ factoryInstance = new NoopFactory();
+ }
+ return factoryInstance;
+ }
+
+ /**
+ * Factory method to return appropriate subtype of ReplicationTask for given event
+ * @param event HCatNotificationEvent returned by the notification subsystem
+ * @return corresponding ReplicationTask
+ */
+ public static ReplicationTask create(HCatNotificationEvent event){
+ if (event == null){
+ throw new IllegalArgumentException("event should not be null");
+ }
+ return getFactoryInstance().create(event);
+ }
+
+ // Primary entry point is a factory method instead of ctor
+ // to allow for future ctor mutability in design
+ protected ReplicationTask(HCatNotificationEvent event) {
+ this.event = event;
+ }
+
+ /**
+ * Returns the event that this ReplicationTask is attempting to replicate
+ * @return underlying event
+ */
+ public HCatNotificationEvent getEvent(){
+ return this.event;
+ }
+
+ /**
+ * Returns true if the replication task in question needs to create staging
+ * directories to complete its operation. This will mean that you will need
+ * to copy these directories over to the destination warehouse for each
+ * source-destination warehouse pair.
+ * If this is true, you will need to call .withSrcStagingDirProvider(...)
+ * and .withDstStagingDirProvider(...) before this ReplicationTask is usable
+ */
+ public abstract boolean needsStagingDirs();
+
+ /**
+ * Returns true if this ReplicationTask is prepared with all info it needs, and is
+ * ready to be used
+ */
+ public boolean isActionable(){
+ if (! this.needsStagingDirs()) {
+ return true;
+ }
+ if ((srcStagingDirProvider != null) && (dstStagingDirProvider != null)){
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * See {@link org.apache.hive.hcatalog.api.repl.StagingDirectoryProvider}
+ * @param srcStagingDirProvider Staging Directory Provider for the source warehouse
+ * @return this
+ */
+ public ReplicationTask withSrcStagingDirProvider(StagingDirectoryProvider srcStagingDirProvider){
+ this.srcStagingDirProvider = srcStagingDirProvider;
+ return this;
+ }
+
+ /**
+ * See {@link org.apache.hive.hcatalog.api.repl.StagingDirectoryProvider}
+ * @param dstStagingDirProvider Staging Directory Provider for the destination warehouse
+ * @return this replication task
+ */
+ public ReplicationTask withDstStagingDirProvider(StagingDirectoryProvider dstStagingDirProvider){
+ this.dstStagingDirProvider = dstStagingDirProvider;
+ return this;
+ }
+
+ /**
+ * Allows a user to specify a table name mapping, where the function provided maps the name of
+ * the table in the source warehouse to the name of the table in the dest warehouse. It is expected
+ * that if the mapping does not exist, it should return the same name sent in. Or, if the function
+ * throws an IllegalArgumentException as well, a ReplicationTask will use the same key sent in.
+ * That way, the default will then be that the destination table name is the same as the src table name
+ *
+ * If you want to use a Map<String,String> mapping instead of a Function<String,String>,
+ * simply call this function as .withTableNameMapping(com.google.common.base.Functions.forMap(tableMap))
+ * @param tableNameMapping
+ * @return this replication task
+ */
+ public ReplicationTask withTableNameMapping(Function<String,String> tableNameMapping){
+ this.tableNameMapping = tableNameMapping;
+ return this;
+ }
+
+ /**
+ * Allows a user to specify a db name mapping, where the function provided maps the name of
+ * the db in the source warehouse to the name of the db in the dest warehouse. It is expected
+ * that if the mapping does not exist, it should return the same name sent in. Or, if the function
+ * throws an IllegalArgumentException as well, a ReplicationTask will use the same key sent in.
+ * That way, the default will then be that the destination db name is the same as the src db name
+ *
+ * If you want to use a Map<String,String> mapping instead of a Function<String,String>,
+ * simply call this function as .withDbNameMapping(com.google.common.base.Functions.forMap(dbMap))
+ * @param dbNameMapping
+ * @return this replication task
+ */
+ public ReplicationTask withDbNameMapping(Function<String,String> dbNameMapping){
+ this.dbNameMapping = dbNameMapping;
+ return this;
+ }
+
+ protected void verifyActionable() {
+ if (!this.isActionable()){
+ throw new IllegalStateException("actionable command on task called when ReplicationTask is still not actionable.");
+ }
+ }
+
+ /**
+ * Returns an Iterable<Command> to send to a hive driver on the source warehouse
+ *
+ * If you *need* a List<Command> instead, you can use guava's
+ * ImmutableList.copyOf(iterable) or Lists.newArrayList(iterable) to
+ * get the underlying list, but this defeats the purpose of making this
+ * interface an Iterable rather than a List, since it is very likely
+ * that the number of Commands returned here will cause your process
+ * to run OOM.
+ */
+ abstract public Iterable<? extends Command> getSrcWhCommands();
+
+ /**
+ * Returns an Iterable<Command> to send to a hive driver on the dest warehouse
+ *
+ * If you *need* a List<Command> instead, you can use guava's
+ * ImmutableList.copyOf(iterable) or Lists.newArrayList(iterable) to
+ * get the underlying list, but this defeats the purpose of making this
+ * interface an Iterable rather than a List, since it is very likely
+ * that the number of Commands returned here will cause your process
+ * to run OOM.
+ */
+ abstract public Iterable<? extends Command> getDstWhCommands();
+
+ protected void validateEventType(HCatNotificationEvent event, String allowedEventType) {
+ if (event == null || !allowedEventType.equals(event.getEventType())){
+ throw new IllegalStateException(this.getClass().getName() + " valid only for " +
+ allowedEventType + " events.");
+ }
+ }
+}
+

Added: hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/ReplicationUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/ReplicationUtils.java?rev=1672123&view=auto
==============================================================================
--- hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/ReplicationUtils.java (added)
+++ hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/ReplicationUtils.java Wed Apr 8 17:16:12 2015
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.api.repl;
+
+import com.google.common.base.Function;
+import com.google.common.base.Objects;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.io.IOExceptionWithCause;
+import org.apache.hive.hcatalog.api.HCatDatabase;
+import org.apache.hive.hcatalog.api.HCatPartition;
+import org.apache.hive.hcatalog.api.HCatTable;
+import org.apache.hive.hcatalog.data.ReaderWriter;
+
+import javax.annotation.Nullable;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Map;
+
+public class ReplicationUtils {
+
+ private final static String REPL_STATE_ID = "repl.last.id"; // TODO : define in ReplicationSpec, and point this to that once that's patched in.
+
+ private ReplicationUtils(){
+ // dummy private constructor, since this class is a collection of static utility methods.
+ }
+
+ /**
+ * Gets the last known replication state of this db. This is
+ * applicable only if it is the destination of a replication
+ * and has had data replicated into it via imports previously.
+ * Defaults to 0.
+ */
+ public static long getLastReplicationId(HCatDatabase db){
+ Map<String, String> props = db.getProperties();
+ if (props != null){
+ if (props.containsKey(REPL_STATE_ID)){
+ return Long.parseLong(props.get(REPL_STATE_ID));
+ }
+ }
+ return 0l; // default is to return earliest possible state.
+ }
+
+
+ /**
+ * Gets the last known replication state of the provided table. This
+ * is applicable only if it is the destination of a replication
+ * and has had data replicated into it via imports previously.
+ * Defaults to 0.
+ */
+ public static long getLastReplicationId(HCatTable tbl) {
+ Map<String, String> tblProps = tbl.getTblProps();
+ if (tblProps != null){
+ if (tblProps.containsKey(REPL_STATE_ID)){
+ return Long.parseLong(tblProps.get(REPL_STATE_ID));
+ }
+ }
+ return 0l; // default is to return earliest possible state.
+ }
+
+ /**
+ * Gets the last known replication state of the provided partition.
+ * This is applicable only if it is the destination of a replication
+ * and has had data replicated into it via imports previously.
+ * If that is not available, but parent table is provided,
+ * defaults to parent table's replication state. If that is also
+ * unknown, defaults to 0.
+ */
+ public static long getLastReplicationId(HCatPartition ptn, @Nullable HCatTable parentTable) {
+ Map<String,String> parameters = ptn.getParameters();
+ if (parameters != null){
+ if (parameters.containsKey(REPL_STATE_ID)){
+ return Long.parseLong(parameters.get(REPL_STATE_ID));
+ }
+ }
+
+ if (parentTable != null){
+ return getLastReplicationId(parentTable);
+ }
+ return 0l; // default is to return earliest possible state.
+ }
+
+ /**
+ * Used to generate a unique key for a combination of given event id, dbname,
+ * tablename and partition keyvalues. This is used to feed in a name for creating
+ * staging directories for exports and imports. This should be idempotent given
+ * the same values, i.e. hashcode-like, but at the same time, be guaranteed to be
+ * different for every possible partition, while being "readable-ish". Basically,
+ * we concat the alphanumberic versions of all of the above, along with a hashcode
+ * of the db, tablename and ptn key-value pairs
+ */
+ public static String getUniqueKey(long eventId, String db, String table, Map<String, String> ptnDesc) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(eventId);
+ sb.append('.');
+ sb.append(toStringWordCharsOnly(db));
+ sb.append('.');
+ sb.append(toStringWordCharsOnly(table));
+ sb.append('.');
+ sb.append(toStringWordCharsOnly(ptnDesc));
+ sb.append('.');
+ sb.append(Objects.hashCode(db, table, ptnDesc));
+ return sb.toString();
+ }
+
+ /**
+ * Return alphanumeric(and '_') representation of a Map<String,String>
+ *
+ */
+ private static String toStringWordCharsOnly(Map<String, String> map) {
+ if (map == null){
+ return "null";
+ }
+ StringBuilder sb = new StringBuilder();
+ boolean first = true;
+ for (Map.Entry<String,String> e : map.entrySet()){
+ if (!first){
+ sb.append(',');
+ }
+ sb.append(toStringWordCharsOnly(e.getKey()));
+ sb.append('=');
+ sb.append(toStringWordCharsOnly(e.getValue()));
+ first = false;
+ }
+ return sb.toString();
+ }
+
+ /**
+ * Return alphanumeric(and '_') chars only of a string, lowercased
+ */
+ public static String toStringWordCharsOnly(String s){
+ return (s == null) ? "null" : s.replaceAll("[\\W]", "").toLowerCase();
+ }
+
+ /**
+ * Return a mapping from a given map function if available, and the key itself if not.
+ */
+ public static String mapIfMapAvailable(String s, Function<String, String> mapping){
+ try {
+ return mapping.apply(s);
+ } catch (IllegalArgumentException iae){
+ // The key wasn't present in the mapping, return the key itself, since no mapping was available
+ return s;
+ }
+ }
+
+ public static String partitionDescriptor(Map<String,String> ptnDesc) {
+ StringBuilder sb = new StringBuilder();
+ if ((ptnDesc != null) && (!ptnDesc.isEmpty())){
+ boolean first = true;
+ sb.append(" PARTITION (");
+ for (Map.Entry e : ptnDesc.entrySet()){
+ if (!first){
+ sb.append(", ");
+ } else {
+ first = false;
+ }
+ sb.append(e.getKey()); // TODO : verify if any quoting is needed for keys
+ sb.append('=');
+ sb.append('"');
+ sb.append(e.getValue()); // TODO : verify if any escaping is needed for values
+ sb.append('"');
+ }
+ sb.append(')');
+ }
+ return sb.toString();
+ }
+
+ /**
+ * Command implements Writable, but that's not terribly easy to use compared
+ * to String, even if it plugs in easily into the rest of Hadoop. Provide
+ * utility methods to easily serialize and deserialize Commands
+ *
+ * serializeCommand returns a base64 String representation of given command
+ */
+ public static String serializeCommand(Command command) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutput dataOutput = new DataOutputStream(baos);
+ ReaderWriter.writeDatum(dataOutput,command.getClass().getName());
+ command.write(dataOutput);
+ return Base64.encodeBase64URLSafeString(baos.toByteArray());
+ }
+
+ /**
+ * Command implements Writable, but that's not terribly easy to use compared
+ * to String, even if it plugs in easily into the rest of Hadoop. Provide
+ * utility methods to easily serialize and deserialize Commands
+ *
+ * deserializeCommand instantiates a concrete Command and initializes it,
+ * given a base64 String representation of it.
+ */
+ public static Command deserializeCommand(String s) throws IOException {
+ DataInput dataInput = new DataInputStream(new ByteArrayInputStream(Base64.decodeBase64(s)));
+ String clazz = (String) ReaderWriter.readDatum(dataInput);
+ Command cmd;
+ try {
+ cmd = (Command)Class.forName(clazz).newInstance();
+ } catch (Exception e) {
+ throw new IOExceptionWithCause("Error instantiating class "+clazz,e);
+ }
+ cmd.readFields(dataInput);
+ return cmd;
+ }
+
+}

Added: hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/StagingDirectoryProvider.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/StagingDirectoryProvider.java?rev=1672123&view=auto
==============================================================================
--- hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/StagingDirectoryProvider.java (added)
+++ hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/StagingDirectoryProvider.java Wed Apr 8 17:16:12 2015
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hive.hcatalog.api.repl;
+
+/**
+ * Interface for a client to provide a Staging Directory specification
+ */
/**
 * Interface for a client to provide a Staging Directory specification
 */
public interface StagingDirectoryProvider {

  /**
   * Return a temporary staging directory for a given key
   * @param key key for the directory, usually a name of a partition
   * Note that when overriding this method, no guarantees are made about the
   * contents of the key, other than that is unique per partition.
   * @return A path specification to use as a temporary staging directory
   */
  String getStagingDirectory(String key);

  /**
   * Trivial implementation of this interface - creates a subdirectory
   * of a fixed base directory per key.
   */
  public class TrivialImpl implements StagingDirectoryProvider {

    // base directory plus trailing separator; each key is appended to this
    final String prefix;

    /**
     * Trivial implementation of StagingDirectoryProvider which takes a temporary directory
     * and creates directories inside that for each key. Note that this is intended as a
     * trivial implementation, and if any further "advanced" behaviour is desired,
     * it is better that the user roll their own.
     *
     * @param base temp directory inside which other tmp dirs are created
     * @param separator path separator. Usually should be "/"
     */
    public TrivialImpl(String base, String separator){
      this.prefix = base + separator;
    }

    @Override
    public String getStagingDirectory(String key) {
      return prefix + key;
    }
  }
}

Added: hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/commands/NoopCommand.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/commands/NoopCommand.java?rev=1672123&view=auto
==============================================================================
--- hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/commands/NoopCommand.java (added)
+++ hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/commands/NoopCommand.java Wed Apr 8 17:16:12 2015
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.api.repl.commands;
+
+
+import org.apache.hive.hcatalog.api.repl.Command;
+import org.apache.hive.hcatalog.data.ReaderWriter;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
/**
 * This class is there to help testing, and to help initial development
 * and will be the default Command for NoopReplicationTask
 *
 * This is not intended to be a permanent class, and will likely move to the test
 * package after initial implementation.
 */

public class NoopCommand implements Command {
  // the notification event id this command stands in for; the only serialized state
  private long eventId;

  public NoopCommand(){
    // trivial ctor to support Writable reflections instantiation
    // do not expect to use this object as-is, unless you call
    // readFields after using this ctor
  }

  public NoopCommand(long eventId){
    this.eventId = eventId;
  }

  // No-op: there are no statements to run for this command.
  @Override
  public List<String> get() {
    return new ArrayList<String>();
  }

  // Safe to retry, since running it does nothing.
  @Override
  public boolean isRetriable() {
    return true;
  }

  // Safe to undo, since there is nothing to undo.
  @Override
  public boolean isUndoable() {
    return true;
  }

  // No-op: there are no undo statements either.
  @Override
  public List<String> getUndo() {
    return new ArrayList<String>();
  }

  // No staging locations to clean up between retries.
  @Override
  public List<String> cleanupLocationsPerRetry() {
    return new ArrayList<String>();
  }

  // No staging locations to clean up after the event completes.
  @Override
  public List<String> cleanupLocationsAfterEvent() {
    return new ArrayList<String>();
  }

  @Override
  public long getEventId() {
    return eventId;
  }

  // Wire format: a single ReaderWriter-encoded Long (the event id).
  // write/readFields must stay in lock-step; do not reorder.
  @Override
  public void write(DataOutput dataOutput) throws IOException {
    ReaderWriter.writeDatum(dataOutput, Long.valueOf(eventId));
  }

  @Override
  public void readFields(DataInput dataInput) throws IOException {
    eventId = ((Long)ReaderWriter.readDatum(dataInput)).longValue();
  }
}
+

Modified: hive/trunk/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java?rev=1672123&r1=1672122&r2=1672123&view=diff
==============================================================================
--- hive/trunk/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java (original)
+++ hive/trunk/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java Wed Apr 8 17:16:12 2015
@@ -18,18 +18,24 @@
   */
  package org.apache.hive.hcatalog.api;

+import java.io.IOException;
  import java.math.BigInteger;
  import java.util.ArrayList;
  import java.util.Arrays;
  import java.util.EnumSet;
  import java.util.HashMap;
+import java.util.Iterator;
  import java.util.List;
  import java.util.Map;
  import java.util.Random;

+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hadoop.hive.metastore.HiveMetaStore;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
  import org.apache.hadoop.hive.metastore.api.PartitionEventType;
  import org.apache.hadoop.hive.ql.WindowsPathUtil;
  import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
@@ -42,12 +48,17 @@ import org.apache.hadoop.hive.ql.metadat
  import org.apache.hadoop.hive.serde.serdeConstants;
  import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;
  import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hive.hcatalog.api.repl.Command;
+import org.apache.hive.hcatalog.api.repl.ReplicationTask;
+import org.apache.hive.hcatalog.api.repl.ReplicationUtils;
+import org.apache.hive.hcatalog.api.repl.StagingDirectoryProvider;
  import org.apache.hive.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
  import org.apache.hive.hcatalog.common.HCatConstants;
  import org.apache.hive.hcatalog.common.HCatException;
  import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
  import org.apache.hive.hcatalog.data.schema.HCatFieldSchema.Type;
  import org.apache.hive.hcatalog.NoExitSecurityManager;
+import org.apache.hive.hcatalog.listener.DbNotificationListener;
  import org.junit.AfterClass;
  import org.junit.BeforeClass;
  import org.junit.Test;
@@ -63,6 +74,8 @@ import static org.junit.Assert.assertArr

  import org.apache.hadoop.util.Shell;

+import javax.annotation.Nullable;
+
  public class TestHCatClient {
    private static final Logger LOG = LoggerFactory.getLogger(TestHCatClient.class);
    private static final String msPort = "20101";
@@ -113,6 +126,8 @@ public class TestHCatClient {
        WindowsPathUtil.convertPathsFromWindowsToHdfs(hcatConf);
      }

+ System.setProperty(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS.varname,
+ DbNotificationListener.class.getName()); // turn on db notification listener on metastore
      Thread t = new Thread(new RunMS(msPort));
      t.start();
      Thread.sleep(10000);
@@ -792,6 +807,113 @@ public class TestHCatClient {
      }
    }

+ /**
+ * Test for event-based replication scenario
+ *
+ * Does not test if replication actually happened, merely tests if we're able to consume a repl task
+ * iter appropriately, calling all the functions expected of the interface, without errors.
+ */
+ @Test
+ public void testReplicationTaskIter() throws Exception {
+
+ HCatClient sourceMetastore = HCatClient.create(new Configuration(hcatConf));
+
+ List<HCatNotificationEvent> notifs = sourceMetastore.getNextNotification(
+ 0, 0, new IMetaStoreClient.NotificationFilter() {
+ @Override
+ public boolean accept(NotificationEvent event) {
+ return true;
+ }
+ });
+ for(HCatNotificationEvent n : notifs){
+ LOG.info("notif from dblistener:" + n.getEventId()
+ + ":" + n.getEventTime() + ",t:" + n.getEventType() + ",o:" + n.getDbName() + "." + n.getTableName());
+ }
+
+ Iterator<ReplicationTask> taskIter = sourceMetastore.getReplicationTasks(0, 0, "mydb", null);
+ while(taskIter.hasNext()){
+ ReplicationTask task = taskIter.next();
+ HCatNotificationEvent n = task.getEvent();
+ LOG.info("notif from tasks:" + n.getEventId()
+ + ":" + n.getEventTime() + ",t:" + n.getEventType() + ",o:" + n.getDbName() + "." + n.getTableName()
+ + ",s:" + n.getEventScope());
+ LOG.info("task :" + task.getClass().getName());
+ if (task.needsStagingDirs()){
+ StagingDirectoryProvider provider = new StagingDirectoryProvider() {
+ @Override
+ public String getStagingDirectory(String key) {
+ LOG.info("getStagingDirectory(" + key + ") called!");
+ return "/tmp/" + key.replaceAll(" ","_");
+ }
+ };
+ task
+ .withSrcStagingDirProvider(provider)
+ .withDstStagingDirProvider(provider);
+ }
+ if (task.isActionable()){
+ LOG.info("task was actionable!");
+ Function<Command, String> commandDebugPrinter = new Function<Command, String>() {
+ @Override
+ public String apply(@Nullable Command cmd) {
+ StringBuilder sb = new StringBuilder();
+ String serializedCmd = null;
+ try {
+ serializedCmd = ReplicationUtils.serializeCommand(cmd);
+ } catch (IOException e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ sb.append("SERIALIZED:"+serializedCmd+"\n");
+ Command command = null;
+ try {
+ command = ReplicationUtils.deserializeCommand(serializedCmd);
+ } catch (IOException e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ sb.append("CMD:[" + command.getClass().getName() + "]\n");
+ sb.append("EVENTID:[" +command.getEventId()+"]\n");
+ for (String s : command.get()) {
+ sb.append("CMD:" + s);
+ sb.append("\n");
+ }
+ sb.append("Retriable:" + command.isRetriable() + "\n");
+ sb.append("Undoable:" + command.isUndoable() + "\n");
+ if (command.isUndoable()) {
+ for (String s : command.getUndo()) {
+ sb.append("UNDO:" + s);
+ sb.append("\n");
+ }
+ }
+ List<String> locns = command.cleanupLocationsPerRetry();
+ sb.append("cleanupLocationsPerRetry entries :" + locns.size());
+ for (String s : locns){
+ sb.append("RETRY_CLEANUP:"+s);
+ sb.append("\n");
+ }
+ locns = command.cleanupLocationsAfterEvent();
+ sb.append("cleanupLocationsAfterEvent entries :" + locns.size());
+ for (String s : locns){
+ sb.append("AFTER_EVENT_CLEANUP:"+s);
+ sb.append("\n");
+ }
+ return sb.toString();
+ }
+ };
+ LOG.info("On src:");
+ for (String s : Iterables.transform(task.getSrcWhCommands(), commandDebugPrinter)){
+ LOG.info(s);
+ }
+ LOG.info("On dest:");
+ for (String s : Iterables.transform(task.getDstWhCommands(), commandDebugPrinter)){
+ LOG.info(s);
+ }
+ } else {
+ LOG.info("task was not actionable.");
+ }
+ }
+ }
+
    /**
     * Test for detecting schema-changes for an HCatalog table, across 2 different HCat instances.
     * A table is created with the same schema on 2 HCat instances. The table-schema is modified on the source HCat

Search Discussions

Related Discussions

Discussion Navigation
viewthread | post
Discussion Overview
groupcommits @
categorieshive, hadoop
postedApr 8, '15 at 5:16p
activeApr 8, '15 at 5:16p
posts1
users1
websitehive.apache.org

1 user in discussion

Khorgath: 1 post

People

Translate

site design / logo © 2022 Grokbase