FAQ
Author: gates
Date: Fri Jan 24 20:37:50 2014
New Revision: 1561158

URL: http://svn.apache.org/r1561158
Log:
HIVE-6248 HCatReader/Writer should hide Hadoop and Hive classes

Added:
     hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/ReaderContextImpl.java
     hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/WriterContextImpl.java
Modified:
     hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/DataTransferFactory.java
     hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/HCatReader.java
     hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/ReaderContext.java
     hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/WriterContext.java
     hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatInputFormatReader.java
     hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java
     hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/TestReaderWriter.java

Modified: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/DataTransferFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/DataTransferFactory.java?rev=1561158&r1=1561157&r2=1561158&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/DataTransferFactory.java (original)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/DataTransferFactory.java Fri Jan 24 20:37:50 2014
@@ -21,8 +21,6 @@ package org.apache.hive.hcatalog.data.tr

  import java.util.Map;

-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
  import org.apache.hive.hcatalog.data.transfer.impl.HCatInputFormatReader;
  import org.apache.hive.hcatalog.data.transfer.impl.HCatOutputFormatWriter;
  import org.apache.hive.hcatalog.data.transfer.state.DefaultStateProvider;
@@ -56,16 +54,16 @@ public class DataTransferFactory {
     * This should only be called once from every slave node to obtain an instance
     * of {@link HCatReader}.
     *
- * @param split
- * input split obtained at master node
- * @param config
- * configuration obtained at master node
+ * @param context
+ * reader context obtained at the master node
+ * @param slaveNumber
+ * which slave this is, determines which part of the read is done
     * @return {@link HCatReader}
     */
- public static HCatReader getHCatReader(final InputSplit split,
- final Configuration config) {
+ public static HCatReader getHCatReader(final ReaderContext context,
+ int slaveNumber) {
      // In future, this may examine config to return appropriate HCatReader
- return getHCatReader(split, config, DefaultStateProvider.get());
+ return getHCatReader(context, slaveNumber, DefaultStateProvider.get());
    }

    /**
@@ -73,18 +71,19 @@ public class DataTransferFactory {
     * of {@link HCatReader}. This should be called if an external system has some
     * state to provide to HCatalog.
     *
- * @param split
- * input split obtained at master node
- * @param config
- * configuration obtained at master node
+ * @param context
+ * reader context obtained at the master node
+ * @param slaveNumber
+ * which slave this is, determines which part of the read is done
     * @param sp
     * {@link StateProvider}
     * @return {@link HCatReader}
     */
- public static HCatReader getHCatReader(final InputSplit split,
- final Configuration config, StateProvider sp) {
+ public static HCatReader getHCatReader(final ReaderContext context,
+ int slaveNumber,
+ StateProvider sp) {
      // In future, this may examine config to return appropriate HCatReader
- return new HCatInputFormatReader(split, config, sp);
+ return new HCatInputFormatReader(context, slaveNumber, sp);
    }

    /**
@@ -131,6 +130,6 @@ public class DataTransferFactory {
    public static HCatWriter getHCatWriter(final WriterContext cntxt,
                         final StateProvider sp) {
      // In future, this may examine context to return appropriate HCatWriter
- return new HCatOutputFormatWriter(cntxt.getConf(), sp);
+ return new HCatOutputFormatWriter(cntxt, sp);
    }
  }

Modified: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/HCatReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/HCatReader.java?rev=1561158&r1=1561157&r2=1561158&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/HCatReader.java (original)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/HCatReader.java Fri Jan 24 20:37:50 2014
@@ -95,11 +95,4 @@ public abstract class HCatReader {
      this.conf = conf;
    }

- public Configuration getConf() {
- if (null == conf) {
- throw new IllegalStateException(
- "HCatReader is not constructed correctly.");
- }
- return conf;
- }
  }

Modified: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/ReaderContext.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/ReaderContext.java?rev=1561158&r1=1561157&r2=1561158&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/ReaderContext.java (original)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/ReaderContext.java Fri Jan 24 20:37:50 2014
@@ -20,70 +20,22 @@
  package org.apache.hive.hcatalog.data.transfer;

  import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.ArrayList;
-import java.util.List;

-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hive.hcatalog.mapreduce.HCatSplit;

  /**
- * This class will contain information of different {@link InputSplit} obtained
- * at master node and configuration. This class implements
- * {@link Externalizable} so it can be serialized using standard java
- * mechanisms.
+ * This read context is obtained by the master node and should be distributed
+ * to the slaves. The contents of the class are opaque to the client. This
+ * interface extends {@link java.io.Externalizable} so that implementing
+ * classes can be serialized using standard Java mechanisms.
   */
-public class ReaderContext implements Externalizable, Configurable {
+public interface ReaderContext extends Externalizable {

- private static final long serialVersionUID = -2656468331739574367L;
- private List<InputSplit> splits;
- private Configuration conf;
+ /**
+ * Determine the number of splits available in this {@link ReaderContext}.
+ * The client is not required to have this many slave nodes,
+ * as one slave can be used to read multiple splits.
+ * @return number of splits
+ */
+ public int numSplits();

- public ReaderContext() {
- this.splits = new ArrayList<InputSplit>();
- this.conf = new Configuration();
- }
-
- public void setInputSplits(final List<InputSplit> splits) {
- this.splits = splits;
- }
-
- public List<InputSplit> getSplits() {
- return splits;
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- @Override
- public void setConf(final Configuration config) {
- conf = config;
- }
-
- @Override
- public void writeExternal(ObjectOutput out) throws IOException {
- conf.write(out);
- out.writeInt(splits.size());
- for (InputSplit split : splits) {
- ((HCatSplit) split).write(out);
- }
- }
-
- @Override
- public void readExternal(ObjectInput in) throws IOException,
- ClassNotFoundException {
- conf.readFields(in);
- int numOfSplits = in.readInt();
- for (int i = 0; i < numOfSplits; i++) {
- HCatSplit split = new HCatSplit();
- split.readFields(in);
- splits.add(split);
- }
- }
  }

Modified: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/WriterContext.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/WriterContext.java?rev=1561158&r1=1561157&r2=1561158&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/WriterContext.java (original)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/WriterContext.java Fri Jan 24 20:37:50 2014
@@ -20,46 +20,14 @@
  package org.apache.hive.hcatalog.data.transfer;

  import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;

  /**
- * This contains information obtained at master node to help prepare slave nodes
- * for writer. This class implements {@link Externalizable} so it can be
- * serialized using standard java mechanisms. Master should serialize it and
+ * This contains information obtained at master node to be distributed to
+ * slaves nodes that will do the writing.
+ * This class implements {@link Externalizable} so it can be
+ * serialized using standard Java mechanisms. Master should serialize it and
   * make it available to slaves to prepare for writes.
   */
-public class WriterContext implements Externalizable, Configurable {
-
- private static final long serialVersionUID = -5899374262971611840L;
- private Configuration conf;
-
- public WriterContext() {
- conf = new Configuration();
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- @Override
- public void setConf(final Configuration config) {
- this.conf = config;
- }
-
- @Override
- public void writeExternal(ObjectOutput out) throws IOException {
- conf.write(out);
- }
+public interface WriterContext extends Externalizable {

- @Override
- public void readExternal(ObjectInput in) throws IOException,
- ClassNotFoundException {
- conf.readFields(in);
- }
  }

Modified: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatInputFormatReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatInputFormatReader.java?rev=1561158&r1=1561157&r2=1561158&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatInputFormatReader.java (original)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatInputFormatReader.java Fri Jan 24 20:37:50 2014
@@ -48,10 +48,10 @@ public class HCatInputFormatReader exten

    private InputSplit split;

- public HCatInputFormatReader(InputSplit split, Configuration config,
- StateProvider sp) {
- super(config, sp);
- this.split = split;
+ public HCatInputFormatReader(ReaderContext context, int slaveNumber,
+ StateProvider sp) {
+ super(((ReaderContextImpl)context).getConf(), sp);
+ this.split = ((ReaderContextImpl)context).getSplits().get(slaveNumber);
    }

    public HCatInputFormatReader(ReadEntity info, Map<String, String> config) {
@@ -64,7 +64,7 @@ public class HCatInputFormatReader exten
        Job job = new Job(conf);
        HCatInputFormat hcif = HCatInputFormat.setInput(
          job, re.getDbName(), re.getTableName(), re.getFilterString());
- ReaderContext cntxt = new ReaderContext();
+ ReaderContextImpl cntxt = new ReaderContextImpl();
        cntxt.setInputSplits(hcif.getSplits(
            ShimLoader.getHadoopShims().getHCatShim().createJobContext(job.getConfiguration(), null)));
        cntxt.setConf(job.getConfiguration());

Modified: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java?rev=1561158&r1=1561157&r2=1561158&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java (original)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java Fri Jan 24 20:37:50 2014
@@ -52,8 +52,8 @@ public class HCatOutputFormatWriter exte
      super(we, config);
    }

- public HCatOutputFormatWriter(Configuration config, StateProvider sp) {
- super(config, sp);
+ public HCatOutputFormatWriter(WriterContext cntxt, StateProvider sp) {
+ super(((WriterContextImpl)cntxt).getConf(), sp);
    }

    @Override
@@ -74,7 +74,7 @@ public class HCatOutputFormatWriter exte
      } catch (InterruptedException e) {
        throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
      }
- WriterContext cntxt = new WriterContext();
+ WriterContextImpl cntxt = new WriterContextImpl();
      cntxt.setConf(job.getConfiguration());
      return cntxt;
    }
@@ -124,10 +124,14 @@ public class HCatOutputFormatWriter exte

    @Override
    public void commit(WriterContext context) throws HCatException {
+ WriterContextImpl cntxtImpl = (WriterContextImpl)context;
      try {
- new HCatOutputFormat().getOutputCommitter(ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptContext(
- context.getConf(), ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptID()))
- .commitJob(ShimLoader.getHadoopShims().getHCatShim().createJobContext(context.getConf(), null));
+ new HCatOutputFormat().getOutputCommitter(
+ ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptContext(
+ cntxtImpl.getConf(),
+ ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptID()))
+ .commitJob(ShimLoader.getHadoopShims().getHCatShim().createJobContext(
+ cntxtImpl.getConf(), null));
      } catch (IOException e) {
        throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
      } catch (InterruptedException e) {
@@ -137,11 +141,14 @@ public class HCatOutputFormatWriter exte

    @Override
    public void abort(WriterContext context) throws HCatException {
+ WriterContextImpl cntxtImpl = (WriterContextImpl)context;
      try {
- new HCatOutputFormat().getOutputCommitter(ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptContext(
- context.getConf(), ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptID()))
- .abortJob(ShimLoader.getHadoopShims().getHCatShim().createJobContext(
- context.getConf(), null), State.FAILED);
+ new HCatOutputFormat().getOutputCommitter(
+ ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptContext(
+ cntxtImpl.getConf(),
+ ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptID()))
+ .abortJob(ShimLoader.getHadoopShims().getHCatShim().createJobContext(
+ cntxtImpl.getConf(), null), State.FAILED);
      } catch (IOException e) {
        throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
      } catch (InterruptedException e) {

Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/ReaderContextImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/ReaderContextImpl.java?rev=1561158&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/ReaderContextImpl.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/ReaderContextImpl.java Fri Jan 24 20:37:50 2014
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.data.transfer.impl;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hive.hcatalog.data.transfer.ReaderContext;
+import org.apache.hive.hcatalog.mapreduce.HCatSplit;
+
+/**
+ * This class contains the list of {@link InputSplit}s obtained
+ * at master node and the configuration.
+ */
+class ReaderContextImpl implements ReaderContext, Configurable {
+
+ private static final long serialVersionUID = -2656468331739574367L;
+ private List<InputSplit> splits;
+ private Configuration conf;
+
+ public ReaderContextImpl() {
+ this.splits = new ArrayList<InputSplit>();
+ this.conf = new Configuration();
+ }
+
+ void setInputSplits(final List<InputSplit> splits) {
+ this.splits = splits;
+ }
+
+ List<InputSplit> getSplits() {
+ return splits;
+ }
+
+ @Override
+ public int numSplits() {
+ return splits.size();
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(final Configuration config) {
+ conf = config;
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ conf.write(out);
+ out.writeInt(splits.size());
+ for (InputSplit split : splits) {
+ ((HCatSplit) split).write(out);
+ }
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException,
+ ClassNotFoundException {
+ conf.readFields(in);
+ int numOfSplits = in.readInt();
+ for (int i = 0; i < numOfSplits; i++) {
+ HCatSplit split = new HCatSplit();
+ split.readFields(in);
+ splits.add(split);
+ }
+ }
+}

Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/WriterContextImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/WriterContextImpl.java?rev=1561158&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/WriterContextImpl.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/WriterContextImpl.java Fri Jan 24 20:37:50 2014
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.data.transfer.impl;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hive.hcatalog.data.transfer.WriterContext;
+
+/**
+ * This contains information obtained at master node to help prepare slave nodes
+ * for writer. This class implements {@link Externalizable} so it can be
+ * serialized using standard java mechanisms. Master should serialize it and
+ * make it available to slaves to prepare for writes.
+ */
+class WriterContextImpl implements WriterContext, Configurable {
+
+ private static final long serialVersionUID = -5899374262971611840L;
+ private Configuration conf;
+
+ public WriterContextImpl() {
+ conf = new Configuration();
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(final Configuration config) {
+ this.conf = config;
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ conf.write(out);
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException,
+ ClassNotFoundException {
+ conf.readFields(in);
+ }
+}

Modified: hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/TestReaderWriter.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/TestReaderWriter.java?rev=1561158&r1=1561157&r2=1561158&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/TestReaderWriter.java (original)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/TestReaderWriter.java Fri Jan 24 20:37:50 2014
@@ -95,8 +95,8 @@ public class TestReaderWriter extends HC
      readCntxt = (ReaderContext) ois.readObject();
      ois.close();

- for (InputSplit split : readCntxt.getSplits()) {
- runsInSlave(split, readCntxt.getConf());
+ for (int i = 0; i < readCntxt.numSplits(); i++) {
+ runsInSlave(readCntxt, i);
      }
    }

@@ -117,9 +117,9 @@ public class TestReaderWriter extends HC
      return cntxt;
    }

- private void runsInSlave(InputSplit split, Configuration config) throws HCatException {
+ private void runsInSlave(ReaderContext cntxt, int slaveNum) throws HCatException {

- HCatReader reader = DataTransferFactory.getHCatReader(split, config);
+ HCatReader reader = DataTransferFactory.getHCatReader(cntxt, slaveNum);
      Iterator<HCatRecord> itr = reader.read();
      int i = 1;
      while (itr.hasNext()) {

Search Discussions

Related Discussions

Discussion Navigation
viewthread | post
Discussion Overview
groupcommits @
categorieshive, hadoop
postedJan 24, '14 at 8:38p
activeJan 24, '14 at 8:38p
posts1
users1
websitehive.apache.org

1 user in discussion

Gates: 1 post

People

Translate

site design / logo © 2021 Grokbase