Author: gunther
Date: Fri Oct 3 19:36:09 2014
New Revision: 1629299

URL: http://svn.apache.org/r1629299
Log:
Revert HIVE-7957 - checkin incorrectly included other files. (Gunther Hagleitner)

Modified:
     hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java
     hive/trunk/pom.xml
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java

Modified: hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java
URL: http://svn.apache.org/viewvc/hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java?rev=1629299&r1=1629298&r2=1629299&view=diff
==============================================================================
--- hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java (original)
+++ hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java Fri Oct 3 19:36:09 2014
@@ -47,7 +47,6 @@ import org.apache.hadoop.security.SaslRp
  import org.apache.hadoop.security.UserGroupInformation;
  import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
  import org.apache.hadoop.security.authorize.AuthorizationException;
-import org.apache.hadoop.security.authorize.DefaultImpersonationProvider;
  import org.apache.hadoop.security.authorize.ProxyUsers;
  import org.apache.hadoop.security.token.SecretManager.InvalidToken;
  import org.apache.hadoop.security.token.Token;
@@ -130,7 +129,7 @@ public class TestHadoop20SAuthBridge ext
      }
      builder.append("127.0.1.1,");
      builder.append(InetAddress.getLocalHost().getCanonicalHostName());
-    conf.setStrings(DefaultImpersonationProvider.getProxySuperuserIpConfKey(superUserShortName),
+    conf.setStrings(ProxyUsers.getProxySuperuserIpConfKey(superUserShortName),
          builder.toString());
    }

@@ -293,7 +292,7 @@ public class TestHadoop20SAuthBridge ext
    private void setGroupsInConf(String[] groupNames, String proxyUserName)
    throws IOException {
     conf.set(
-        DefaultImpersonationProvider.getProxySuperuserGroupConfKey(proxyUserName),
+        ProxyUsers.getProxySuperuserGroupConfKey(proxyUserName),
        StringUtils.join(",", Arrays.asList(groupNames)));
      configureSuperUserIPAddresses(conf, proxyUserName);
      ProxyUsers.refreshSuperUserGroupsConfiguration(conf);

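Background on the hunks above: Hadoop 2.5 moved the proxy-user key helpers from ProxyUsers to DefaultImpersonationProvider, so reverting the pom below to hadoop-23.version 2.4.0 means going back to the ProxyUsers variants here. A minimal sketch of what the restored calls do, assuming Hadoop 2.4.x on the classpath; the "hive" user name and the addresses are made up:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.authorize.ProxyUsers;

public class ProxyUserConfSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Resolves to "hadoop.proxyuser.hive.hosts": the addresses the
    // superuser may impersonate from.
    conf.setStrings(ProxyUsers.getProxySuperuserIpConfKey("hive"), "127.0.0.1,127.0.1.1");
    // Resolves to "hadoop.proxyuser.hive.groups": the groups whose members
    // may be impersonated.
    conf.setStrings(ProxyUsers.getProxySuperuserGroupConfKey("hive"), "users");
    // Reload the proxy-user settings, as the test does in setGroupsInConf().
    ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
  }
}
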
Modified: hive/trunk/pom.xml
URL: http://svn.apache.org/viewvc/hive/trunk/pom.xml?rev=1629299&r1=1629298&r2=1629299&view=diff
==============================================================================
--- hive/trunk/pom.xml (original)
+++ hive/trunk/pom.xml Fri Oct 3 19:36:09 2014
@@ -115,7 +115,7 @@
      <groovy.version>2.1.6</groovy.version>
      <hadoop-20.version>0.20.2</hadoop-20.version>
      <hadoop-20S.version>1.2.1</hadoop-20S.version>
-    <hadoop-23.version>2.5.0</hadoop-23.version>
+    <hadoop-23.version>2.4.0</hadoop-23.version>
      <hadoop.bin.path>${basedir}/${hive.path.to.root}/testutils/hadoop</hadoop.bin.path>
      <hbase.hadoop1.version>0.98.3-hadoop1</hbase.hadoop1.version>
      <hbase.hadoop2.version>0.98.3-hadoop2</hbase.hadoop2.version>
@@ -151,7 +151,7 @@
      <stax.version>1.0.1</stax.version>
      <slf4j.version>1.7.5</slf4j.version>
      <ST4.version>4.0.4</ST4.version>
-    <tez.version>0.5.1</tez.version>
+    <tez.version>0.5.0</tez.version>
      <super-csv.version>2.2.0</super-csv.version>
      <tempus-fugit.version>1.1</tempus-fugit.version>
      <snappy.version>0.2</snappy.version>

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java?rev=1629299&r1=1629298&r2=1629299&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java Fri Oct 3 19:36:09 2014
@@ -60,6 +60,9 @@ public class AppMasterEventOperator exte
    protected void initDataBuffer(boolean skipPruning) throws HiveException {
      buffer = new DataOutputBuffer();
      try {
+      // where does this go to?
+      buffer.writeUTF(((TezContext) TezContext.get()).getTezProcessorContext().getTaskVertexName());
+
        // add any other header info
        getConf().writeEventHeader(buffer);


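The restored writeUTF(...) is the producer half of the event payload format; the consumer half comes back in DynamicPartitionPruner.processPayload below, where a matching in.readUTF() recovers the source vertex name ahead of the column name and skip flag. A round-trip sketch under that assumption; the vertex and column names are made up:

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import org.apache.hadoop.io.DataOutputBuffer;

public class EventPayloadSketch {
  public static void main(String[] args) throws Exception {
    DataOutputBuffer buffer = new DataOutputBuffer();
    buffer.writeUTF("Map 1");      // producer: source vertex name first
    buffer.writeUTF("part_col");   // then the header fields (cf. writeEventHeader)
    buffer.writeBoolean(false);    // "skip pruning" flag

    DataInputStream in = new DataInputStream(
        new ByteArrayInputStream(buffer.getData(), 0, buffer.getLength()));
    String sourceName = in.readUTF();  // consumer: vertex name, as in processPayload
    String columnName = in.readUTF();
    boolean skip = in.readBoolean();
    System.out.println(sourceName + " / " + columnName + " / " + skip);
  }
}
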
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java?rev=1629299&r1=1629298&r2=1629299&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java Fri Oct 3 19:36:09 2014
@@ -31,7 +31,6 @@ import java.util.List;
  import java.util.Map;
  import java.util.Set;
  import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentSkipListSet;
  import java.util.concurrent.LinkedBlockingQueue;
  import java.util.concurrent.atomic.AtomicBoolean;

@@ -60,7 +59,6 @@ import org.apache.hadoop.hive.serde2.typ
  import org.apache.hadoop.io.BytesWritable;
  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.dag.api.event.VertexState;
  import org.apache.tez.runtime.api.InputInitializerContext;
  import org.apache.tez.runtime.api.events.InputInitializerEvent;

@@ -79,13 +77,12 @@ public class DynamicPartitionPruner {

    private final BytesWritable writable = new BytesWritable();

-  private final BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>();
-
-  private final Set<String> sourcesWaitingForEvents = new HashSet<String>();
+  private final BlockingQueue<InputInitializerEvent> queue =
+      new LinkedBlockingQueue<InputInitializerEvent>();

    private int sourceInfoCount = 0;

-  private final Object endOfEvents = new Object();
+  private InputInitializerContext context;

    public DynamicPartitionPruner() {
    }
@@ -94,21 +91,8 @@ public class DynamicPartitionPruner {
        throws SerDeException, IOException,
        InterruptedException, HiveException {

-    synchronized(sourcesWaitingForEvents) {
-      initialize(work, jobConf);
-
-      if (sourcesWaitingForEvents.isEmpty()) {
-        return;
-      }
-
-      Set<VertexState> states = Collections.singleton(VertexState.SUCCEEDED);
-      for (String source : sourcesWaitingForEvents) {
-        // we need to get state transition updates for the vertices that will send
-        // events to us. once we have received all events and a vertex has succeeded,
-        // we can move to do the pruning.
-        context.registerForVertexStateUpdates(source, states);
-      }
-    }
+    this.context = context;
+    this.initialize(work, jobConf);

      LOG.info("Waiting for events (" + sourceInfoCount + " items) ...");
      // synchronous event processing loop. Won't return until all events have
@@ -118,7 +102,7 @@ public class DynamicPartitionPruner {
      LOG.info("Ok to proceed.");
    }

-  public BlockingQueue<Object> getQueue() {
+  public BlockingQueue<InputInitializerEvent> getQueue() {
      return queue;
    }

@@ -127,14 +111,11 @@ public class DynamicPartitionPruner {
      sourceInfoCount = 0;
    }

-  public void initialize(MapWork work, JobConf jobConf) throws SerDeException {
+  private void initialize(MapWork work, JobConf jobConf) throws SerDeException {
      this.clear();
      Map<String, SourceInfo> columnMap = new HashMap<String, SourceInfo>();
-    Set<String> sources = work.getEventSourceTableDescMap().keySet();
-
-    sourcesWaitingForEvents.addAll(sources);

-    for (String s : sources) {
+    for (String s : work.getEventSourceTableDescMap().keySet()) {
        List<TableDesc> tables = work.getEventSourceTableDescMap().get(s);
        List<String> columnNames = work.getEventSourceColumnNameMap().get(s);
        List<ExprNodeDesc> partKeyExprs = work.getEventSourcePartKeyExprMap().get(s);
@@ -296,30 +277,46 @@ public class DynamicPartitionPruner {

    private void processEvents() throws SerDeException, IOException, InterruptedException {
      int eventCount = 0;
+    int neededEvents = getExpectedNumberOfEvents();

-    while (true) {
-      Object element = queue.take();
-
-      if (element == endOfEvents) {
-        // we're done processing events
-        break;
-      }
-
-      InputInitializerEvent event = (InputInitializerEvent) element;
-
+    while (neededEvents > eventCount) {
+      InputInitializerEvent event = queue.take();
       LOG.info("Input event: " + event.getTargetInputName() + ", " + event.getTargetVertexName()
           + ", " + (event.getUserPayload().limit() - event.getUserPayload().position()));
-      processPayload(event.getUserPayload(), event.getSourceVertexName());
+      processPayload(event.getUserPayload());
       eventCount += 1;
+      neededEvents = getExpectedNumberOfEvents();
+      LOG.info("Needed events: " + neededEvents + ", received events: " + eventCount);
     }
-    LOG.info("Received events: " + eventCount);
    }

-  @SuppressWarnings("deprecation")
-  private String processPayload(ByteBuffer payload, String sourceName) throws SerDeException,
-      IOException {
+  private int getExpectedNumberOfEvents() throws InterruptedException {
+    int neededEvents = 0;
+
+    boolean notInitialized;
+    do {
+      neededEvents = 0;
+      notInitialized = false;
+      for (String s : sourceInfoMap.keySet()) {
+        int multiplier = sourceInfoMap.get(s).size();
+        int taskNum = context.getVertexNumTasks(s);
+        LOG.info("Vertex " + s + " has " + taskNum + " events.");
+        if (taskNum < 0) {
+          notInitialized = true;
+          Thread.sleep(10);
+          continue;
+        }
+        neededEvents += (taskNum * multiplier);
+      }
+    } while (notInitialized);
+
+    return neededEvents;
+  }

+  @SuppressWarnings("deprecation")
+  private String processPayload(ByteBuffer payload) throws SerDeException, IOException {
     DataInputStream in = new DataInputStream(new ByteBufferBackedInputStream(payload));
+    String sourceName = in.readUTF();
      String columnName = in.readUTF();
      boolean skip = in.readBoolean();

@@ -393,26 +390,4 @@ public class DynamicPartitionPruner {
      }
    }

-  public void addEvent(InputInitializerEvent event) {
-    synchronized(sourcesWaitingForEvents) {
-      if (sourcesWaitingForEvents.contains(event.getSourceVertexName())) {
-        queue.offer(event);
-      }
-    }
-  }
-
-  public void processVertex(String name) {
-    LOG.info("Vertex succeeded: " + name);
-
-    synchronized(sourcesWaitingForEvents) {
-      sourcesWaitingForEvents.remove(name);
-
-      if (sourcesWaitingForEvents.isEmpty()) {
-        // we've got what we need; mark the queue
-        queue.offer(endOfEvents);
-      } else {
-        LOG.info("Waiting for " + sourcesWaitingForEvents.size() + " events.");
-      }
-    }
-  }
  }
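
The net effect of the revert in this file: the sentinel-object protocol (addEvent/processVertex pushing endOfEvents once every source vertex has succeeded) goes away, and the older count-based drain returns: poll each source vertex's task count until it is known, then take exactly that many events off the queue. A standalone sketch of the restored strategy with hypothetical names; the real class also multiplies each task count by the number of pruning columns per source:

import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class CountBasedDrainSketch {
  private final BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>();
  private final Map<String, Integer> tasksPerSource; // -1 until a vertex is initialized

  CountBasedDrainSketch(Map<String, Integer> tasksPerSource) {
    this.tasksPerSource = tasksPerSource;
  }

  int expectedEvents() throws InterruptedException {
    while (true) {
      int needed = 0;
      boolean ready = true;
      for (int taskNum : tasksPerSource.values()) {
        if (taskNum < 0) { ready = false; break; } // task count not known yet
        needed += taskNum;
      }
      if (ready) {
        return needed;
      }
      Thread.sleep(10); // same 10 ms busy-wait the restored code uses
    }
  }

  void drain() throws InterruptedException {
    int received = 0;
    while (expectedEvents() > received) {
      queue.take(); // block for one event per expected task
      received++;
    }
  }
}

The trade-off the revert accepts: the sentinel version never busy-waits, while the count-based version sleeps in 10 ms steps until every source vertex reports its task count.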

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java?rev=1629299&r1=1629298&r2=1629299&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java Fri Oct 3 19:36:09 2014
@@ -38,9 +38,8 @@ import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper;
  import org.apache.hadoop.util.ReflectionUtils;
  import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.TaskLocationHint;
  import org.apache.tez.dag.api.VertexLocationHint;
-import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.dag.api.TaskLocationHint;
  import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem;
  import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
  import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
@@ -244,14 +243,9 @@ public class HiveSplitGenerator extends
    }

    @Override
-  public void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
-    pruner.processVertex(stateUpdate.getVertexName());
-  }
-
-  @Override
   public void handleInputInitializerEvent(List<InputInitializerEvent> events) throws Exception {
     for (InputInitializerEvent e : events) {
-      pruner.addEvent(e);
+      pruner.getQueue().put(e);
      }
    }
  }
