HIVE-12049: HiveServer2: Provide an option to write serialized thrift objects in final tasks (Rohit Dholakia reviewed by Ashutosh Chauhan, Gopal Vijayaraghavan, Lefty Leverenz, Vaibhav Gumashta)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/fb230f9d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/fb230f9d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/fb230f9d

Branch: refs/heads/master
Commit: fb230f9df5b7c990c80326671d9975a6f05e1600
Parents: 145e253
Author: Vaibhav Gumashta <vgumashta@hortonworks.com>
Authored: Fri Apr 22 12:22:55 2016 -0700
Committer: Vaibhav Gumashta <vgumashta@hortonworks.com>
Committed: Fri Apr 22 12:22:55 2016 -0700

----------------------------------------------------------------------
  .../java/org/apache/hive/beeline/Commands.java | 2 +-
  .../org/apache/hadoop/hive/conf/HiveConf.java | 11 +
  .../apache/hive/jdbc/TestJdbcWithMiniHS2.java | 108 ++++-
  .../org/apache/hive/jdbc/HiveBaseResultSet.java | 2 +-
  .../apache/hive/jdbc/HiveResultSetMetaData.java | 2 +-
  .../org/apache/hive/jdbc/HiveStatement.java | 16 +-
  .../java/org/apache/hive/jdbc/JdbcColumn.java | 2 +-
  ql/pom.xml | 6 +
  .../java/org/apache/hadoop/hive/ql/Driver.java | 19 +
  .../hive/ql/exec/DefaultFetchFormatter.java | 77 ----
  .../hadoop/hive/ql/exec/FetchFormatter.java | 71 ---
  .../apache/hadoop/hive/ql/exec/FetchTask.java | 3 +-
  .../hadoop/hive/ql/exec/FileSinkOperator.java | 27 +-
  .../hadoop/hive/ql/exec/ListSinkOperator.java | 11 +-
  .../org/apache/hadoop/hive/ql/parse/QB.java | 5 +
  .../hadoop/hive/ql/parse/SemanticAnalyzer.java | 22 +-
  .../hadoop/hive/ql/parse/TaskCompiler.java | 62 ++-
  .../apache/hadoop/hive/ql/plan/FetchWork.java | 10 +
  .../hadoop/hive/ql/plan/FileSinkDesc.java | 9 +
  .../apache/hadoop/hive/ql/plan/PlanUtils.java | 10 +-
  serde/pom.xml | 5 +
  .../hive/serde2/DefaultFetchFormatter.java | 73 +++
  .../hadoop/hive/serde2/FetchFormatter.java | 37 ++
  .../hadoop/hive/serde2/NoOpFetchFormatter.java | 48 ++
  .../apache/hadoop/hive/serde2/SerDeUtils.java | 3 +-
  .../hadoop/hive/serde2/thrift/ColumnBuffer.java | 439 +++++++++++++++++++
  .../hive/serde2/thrift/ThriftFormatter.java | 40 ++
  .../serde2/thrift/ThriftJDBCBinarySerDe.java | 178 ++++++++
  .../apache/hadoop/hive/serde2/thrift/Type.java | 438 ++++++++++++++++++
  .../hadoop/hive/serde2/typeinfo/TypeInfo.java | 14 +-
  service-rpc/if/TCLIService.thrift | 2 +
  .../gen/thrift/gen-cpp/TCLIService_types.cpp | 44 ++
  .../src/gen/thrift/gen-cpp/TCLIService_types.h | 20 +-
  .../apache/hive/service/rpc/thrift/TRowSet.java | 222 +++++++++-
  service-rpc/src/gen/thrift/gen-php/Types.php | 46 ++
  .../src/gen/thrift/gen-py/TCLIService/ttypes.py | 28 +-
  .../gen/thrift/gen-rb/t_c_l_i_service_types.rb | 6 +-
  .../org/apache/hive/service/cli/Column.java | 434 ------------------
  .../apache/hive/service/cli/ColumnBasedSet.java | 84 +++-
  .../hive/service/cli/ColumnDescriptor.java | 12 +-
  .../apache/hive/service/cli/ColumnValue.java | 1 +
  .../apache/hive/service/cli/RowSetFactory.java | 17 +-
  .../apache/hive/service/cli/TableSchema.java | 4 +-
  .../java/org/apache/hive/service/cli/Type.java | 348 ---------------
  .../apache/hive/service/cli/TypeDescriptor.java | 1 +
  .../cli/operation/GetCatalogsOperation.java | 2 +-
  .../cli/operation/GetColumnsOperation.java | 4 +-
  .../cli/operation/GetFunctionsOperation.java | 8 +-
  .../cli/operation/GetSchemasOperation.java | 5 +-
  .../cli/operation/GetTableTypesOperation.java | 9 +-
  .../cli/operation/GetTablesOperation.java | 2 +-
  .../cli/operation/GetTypeInfoOperation.java | 4 +-
  .../cli/operation/HiveCommandOperation.java | 4 +-
  .../service/cli/operation/OperationManager.java | 12 +-
  .../service/cli/operation/SQLOperation.java | 20 +-
  .../service/cli/session/HiveSessionImpl.java | 8 +-
  .../service/cli/thrift/ThriftCLIService.java | 2 +-
  .../apache/hive/service/cli/CLIServiceTest.java | 2 +-
  .../org/apache/hive/service/cli/TestColumn.java | 14 +-
  59 files changed, 2066 insertions(+), 1049 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/beeline/src/java/org/apache/hive/beeline/Commands.java
----------------------------------------------------------------------
diff --git a/beeline/src/java/org/apache/hive/beeline/Commands.java b/beeline/src/java/org/apache/hive/beeline/Commands.java
index 22e2066..0178333 100644
--- a/beeline/src/java/org/apache/hive/beeline/Commands.java
+++ b/beeline/src/java/org/apache/hive/beeline/Commands.java
@@ -1179,7 +1179,7 @@ public class Commands {
    private void showRemainingLogsIfAny(Statement statement) {
      if (statement instanceof HiveStatement) {
        HiveStatement hiveStatement = (HiveStatement) statement;
- List<String> logs;
+ List<String> logs = null;
        do {
          try {
            logs = hiveStatement.getQueryLog();

http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 5cf1609..c52b9d9 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2037,6 +2037,7 @@ public class HiveConf extends Configuration {
          new TimeValidator(TimeUnit.SECONDS),
          "Number of seconds a request will wait to acquire the compile lock before giving up. " +
          "Setting it to 0s disables the timeout."),
+
      // HiveServer2 WebUI
      HIVE_SERVER2_WEBUI_BIND_HOST("hive.server2.webui.host", "0.0.0.0", "The host address the HiveServer2 WebUI will listen on"),
      HIVE_SERVER2_WEBUI_PORT("hive.server2.webui.port", 10002, "The port the HiveServer2 WebUI will listen on. This can be"
@@ -2167,6 +2168,7 @@ public class HiveConf extends Configuration {
          new TimeValidator(TimeUnit.SECONDS),
          "Keepalive time (in seconds) for an idle worker thread. When the number of workers exceeds min workers, " +
          "excessive threads are killed after this time interval."),
+
      // Configuration for async thread pool in SessionManager
      HIVE_SERVER2_ASYNC_EXEC_THREADS("hive.server2.async.exec.threads", 100,
          "Number of threads in the async thread pool for HiveServer2"),
@@ -2330,6 +2332,14 @@ public class HiveConf extends Configuration {
      HIVE_SERVER2_THRIFT_CLIENT_PASSWORD("hive.server2.thrift.client.password", "anonymous","Password to use against " +
        "thrift client"),

+ // ResultSet serialization settings
+ HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS("hive.server2.thrift.resultset.serialize.in.tasks", false,
+ "Whether we should serialize the Thrift structures used in JDBC ResultSet RPC in task nodes.\n " +
+ "We use SequenceFile and ThriftJDBCBinarySerDe to read and write the final results if this is true."),
+ // TODO: Make use of this config to configure fetch size
+ HIVE_SERVER2_THRIFT_RESULTSET_MAX_FETCH_SIZE("hive.server2.thrift.resultset.max.fetch.size", 1000,
+ "Max number of rows sent in one Fetch RPC call by the server to the client."),
+
      HIVE_SECURITY_COMMAND_WHITELIST("hive.security.command.whitelist", "set,reset,dfs,add,list,delete,reload,compile",
          "Comma separated list of non-SQL Hive commands users are authorized to execute"),

@@ -3646,6 +3656,7 @@ public class HiveConf extends Configuration {
      ConfVars.HIVE_RESULTSET_USE_UNIQUE_COLUMN_NAMES.varname,
      ConfVars.HIVE_STATS_COLLECT_PART_LEVEL_STATS.varname,
      ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LEVEL.varname,
+ ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS.varname,
      ConfVars.HIVE_SUPPORT_SQL11_RESERVED_KEYWORDS.varname,
      ConfVars.JOB_DEBUG_CAPTURE_STACKTRACES.varname,
      ConfVars.JOB_DEBUG_TIMEOUT.varname,
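
[Editor's note] The two options added above can also be toggled programmatically on a HiveConf before starting HiveServer2, mirroring setSerializeInTasksInConf() in the test changes below. A minimal sketch (assumes hive-common on the classpath; ConfVars names come from this patch):

    import org.apache.hadoop.hive.conf.HiveConf;

    public class ResultSetSerializationConfigSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Have final tasks write results as serialized Thrift row batches
        // (SequenceFile + ThriftJDBCBinarySerDe), per the new config description above.
        conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS, true);
        // Cap on the number of rows sent per Fetch RPC from server to client.
        conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_MAX_FETCH_SIZE, 1000);
        System.out.println(
            conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS));
      }
    }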

http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
index 10c8ff2..815ccfa 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
@@ -431,6 +431,112 @@ public class TestJdbcWithMiniHS2 {
      }
    }

+ private void setSerializeInTasksInConf(HiveConf conf) {
+ conf.setBoolean("hive.server2.thrift.resultset.serialize.in.tasks", true);
+ conf.setInt("hive.server2.thrift.resultset.max.fetch.size", 1000);
+ }
+
+ @Test
+ public void testMetadataQueriesWithSerializeThriftInTasks() throws Exception {
+ //stop HiveServer2
+ if (miniHS2.isStarted()) {
+ miniHS2.stop();
+ }
+
+ HiveConf conf = new HiveConf();
+ String userName;
+ setSerializeInTasksInConf(conf);
+ miniHS2 = new MiniHS2(conf);
+ Map<String, String> confOverlay = new HashMap<String, String>();
+ miniHS2.start(confOverlay);
+
+ userName = System.getProperty("user.name");
+ hs2Conn = getConnection(miniHS2.getJdbcURL(), userName, "password");
+ Statement stmt = hs2Conn.createStatement();
+ stmt.execute("drop table if exists testThriftSerializeShow");
+ stmt.execute("create table testThriftSerializeShow (a int)");
+ ResultSet rs = stmt.executeQuery("show tables");
+ assertTrue(rs.next());
+ stmt.execute("describe testThriftSerializeShow");
+ stmt.execute("explain select a from testThriftSerializeShow");
+ stmt.execute("drop table testThriftSerializeShow");
+ stmt.close();
+ }
+
+ @Test
+ public void testSelectThriftSerializeInTasks() throws Exception {
+ if (miniHS2.isStarted()) {
+ miniHS2.stop();
+ }
+
+ HiveConf conf = new HiveConf();
+ String userName;
+ setSerializeInTasksInConf(conf);
+ miniHS2 = new MiniHS2(conf);
+ Map<String, String> confOverlay = new HashMap<String, String>();
+ miniHS2.start(confOverlay);
+
+ userName = System.getProperty("user.name");
+ hs2Conn = getConnection(miniHS2.getJdbcURL(), userName, "password");
+ Statement stmt = hs2Conn.createStatement();
+
+ stmt.execute("drop table if exists testSelectThriftOrders");
+ stmt.execute("drop table if exists testSelectThriftCustomers");
+ stmt.execute("create table testSelectThriftOrders (orderid int, orderdate string, customerid int)");
+ stmt.execute("create table testSelectThriftCustomers (customerid int, customername string, customercountry string)");
+ stmt.execute("insert into testSelectThriftOrders values (1, '2015-09-09', 123), (2, '2015-10-10', 246), (3, '2015-11-11', 356)");
+ stmt.execute("insert into testSelectThriftCustomers values (123, 'David', 'America'), (246, 'John', 'Canada'), (356, 'Mary', 'CostaRica')");
+ ResultSet countOrders = stmt.executeQuery("select count(*) from testSelectThriftOrders");
+ while (countOrders.next()) {
+ assertEquals(3, countOrders.getInt(1));
+ }
+ ResultSet maxOrders = stmt.executeQuery("select max(customerid) from testSelectThriftCustomers");
+ while (maxOrders.next()) {
+ assertEquals(356, maxOrders.getInt(1));
+ }
+ stmt.execute("drop table testSelectThriftOrders");
+ stmt.execute("drop table testSelectThriftCustomers");
+ stmt.close();
+ }
+
+ @Test
+ public void testJoinThriftSerializeInTasks() throws Exception {
+ //stop HiveServer2
+ if (miniHS2.isStarted()) {
+ miniHS2.stop();
+ }
+ HiveConf conf = new HiveConf();
+ String userName;
+
+ setSerializeInTasksInConf(conf);
+
+ miniHS2 = new MiniHS2(conf);
+ Map<String, String> confOverlay = new HashMap<String, String>();
+ miniHS2.start(confOverlay);
+
+ userName = System.getProperty("user.name");
+ hs2Conn = getConnection(miniHS2.getJdbcURL(), userName, "password");
+ Statement stmt = hs2Conn.createStatement();
+ stmt.execute("drop table if exists testThriftJoinOrders");
+ stmt.execute("drop table if exists testThriftJoinCustomers");
+ stmt.execute("create table testThriftJoinOrders (orderid int, orderdate string, customerid int)");
+ stmt.execute("create table testThriftJoinCustomers (customerid int, customername string, customercountry string)");
+ stmt.execute("insert into testThriftJoinOrders values (1, '2015-09-09', 123), (2, '2015-10-10', 246), (3, '2015-11-11', 356)");
+ stmt.execute("insert into testThriftJoinCustomers values (123, 'David', 'America'), (246, 'John', 'Canada'), (356, 'Mary', 'CostaRica')");
+ ResultSet joinResultSet = stmt.executeQuery("select testThriftJoinOrders.orderid, testThriftJoinCustomers.customername from testThriftJoinOrders inner join testThriftJoinCustomers where testThriftJoinOrders.customerid=testThriftJoinCustomers.customerid");
+ Map<Integer, String> expectedResult = new HashMap<Integer, String>();
+ expectedResult.put(1, "David");
+ expectedResult.put(2, "John");
+ expectedResult.put(3, "Mary");
+ for (int i = 1; i < 4; i++) {
+ assertTrue(joinResultSet.next());
+ assertEquals(joinResultSet.getString(2), expectedResult.get(i));
+ }
+ stmt.execute("drop table testThriftJoinOrders");
+ stmt.execute("drop table testThriftJoinCustomers");
+ stmt.close();
+ }
+
    /**
     * Tests the creation of the 3 scratch dirs: hdfs, local, downloaded resources (which is also local).
     * 1. Test with doAs=false: open a new JDBC session and verify the presence of directories/permissions
@@ -810,4 +916,4 @@ public class TestJdbcWithMiniHS2 {
      }
      return -1;
    }
-}
\ No newline at end of file
+}
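
[Editor's note] Task-side serialization is transparent to JDBC clients: the fetch path in the tests above works unchanged whether or not the option is enabled. A minimal standalone sketch (assumes a HiveServer2 listening on the default port 10000 and a table such as the ones created by these tests; credentials follow the test's convention):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class SimpleHs2ClientSketch {
      public static void main(String[] args) throws Exception {
        // Optional with JDBC 4 driver auto-loading; kept for older setups.
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        try (Connection conn = DriverManager.getConnection(
                "jdbc:hive2://localhost:10000/default",
                System.getProperty("user.name"), "password");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("select count(*) from testSelectThriftOrders")) {
          while (rs.next()) {
            System.out.println(rs.getInt(1));  // row count, regardless of the server-side wire format
          }
        }
      }
    }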

http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/jdbc/src/java/org/apache/hive/jdbc/HiveBaseResultSet.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveBaseResultSet.java b/jdbc/src/java/org/apache/hive/jdbc/HiveBaseResultSet.java
index 88ba853..93f093f 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/HiveBaseResultSet.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/HiveBaseResultSet.java
@@ -46,8 +46,8 @@ import java.util.Map;

  import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
  import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth;
+import org.apache.hadoop.hive.serde2.thrift.Type;
  import org.apache.hive.service.cli.TableSchema;
-import org.apache.hive.service.cli.Type;

  /**
   * Data independent base class which implements the common part of

http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/jdbc/src/java/org/apache/hive/jdbc/HiveResultSetMetaData.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveResultSetMetaData.java b/jdbc/src/java/org/apache/hive/jdbc/HiveResultSetMetaData.java
index 16a0894..f6c38d8 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/HiveResultSetMetaData.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/HiveResultSetMetaData.java
@@ -22,7 +22,7 @@ import java.sql.ResultSetMetaData;
  import java.sql.SQLException;
  import java.util.List;

-import org.apache.hive.service.cli.Type;
+import org.apache.hadoop.hive.serde2.thrift.Type;

  /**
   * HiveResultSetMetaData.

http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
index f5b9672..3cc6b74 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
@@ -35,6 +35,7 @@ import org.apache.hive.service.rpc.thrift.TGetOperationStatusReq;
  import org.apache.hive.service.rpc.thrift.TGetOperationStatusResp;
  import org.apache.hive.service.rpc.thrift.TOperationHandle;
  import org.apache.hive.service.rpc.thrift.TSessionHandle;
+import org.apache.thrift.TException;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;

@@ -881,15 +882,22 @@ public class HiveStatement implements java.sql.Statement {
        }
      } catch (SQLException e) {
        throw e;
+ } catch (TException e) {
+ throw new SQLException("Error when getting query log: " + e, e);
      } catch (Exception e) {
        throw new SQLException("Error when getting query log: " + e, e);
      }

- RowSet rowSet = RowSetFactory.create(tFetchResultsResp.getResults(),
- connection.getProtocol());
- for (Object[] row : rowSet) {
- logs.add(String.valueOf(row[0]));
+ try {
+ RowSet rowSet;
+ rowSet = RowSetFactory.create(tFetchResultsResp.getResults(), connection.getProtocol());
+ for (Object[] row : rowSet) {
+ logs.add(String.valueOf(row[0]));
+ }
+ } catch (TException e) {
+ throw new SQLException("Error building result set for query log: " + e, e);
      }
+
      return logs;
    }


http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/jdbc/src/java/org/apache/hive/jdbc/JdbcColumn.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/JdbcColumn.java b/jdbc/src/java/org/apache/hive/jdbc/JdbcColumn.java
index 691fd0e..5aed679 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/JdbcColumn.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/JdbcColumn.java
@@ -27,7 +27,7 @@ import java.sql.Types;
  import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
  import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth;
  import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hive.service.cli.Type;
+import org.apache.hadoop.hive.serde2.thrift.Type;


  /**

http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/pom.xml
----------------------------------------------------------------------
diff --git a/ql/pom.xml b/ql/pom.xml
index ebb9599..aaa3271 100644
--- a/ql/pom.xml
+++ b/ql/pom.xml
@@ -70,6 +70,11 @@
      </dependency>
      <dependency>
        <groupId>org.apache.hive</groupId>
+ <artifactId>hive-service-rpc</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
        <artifactId>hive-llap-client</artifactId>
        <version>${project.version}</version>
      </dependency>
@@ -803,6 +808,7 @@
                    <include>org.apache.hive:hive-serde</include>
                    <include>org.apache.hive:hive-llap-client</include>
                    <include>org.apache.hive:hive-metastore</include>
+ <include>org.apache.hive:hive-service-rpc</include>
                    <include>com.esotericsoftware:kryo-shaded</include>
      <include>com.esotericsoftware:minlog</include>
      <include>org.objenesis:objenesis</include>

http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 65744ac..48fb060 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -108,6 +108,7 @@ import org.apache.hadoop.hive.ql.session.OperationLog.LoggingLevel;
  import org.apache.hadoop.hive.ql.session.SessionState;
  import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
  import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.thrift.ThriftJDBCBinarySerDe;
  import org.apache.hadoop.hive.shims.Utils;
  import org.apache.hadoop.mapred.ClusterStatus;
  import org.apache.hadoop.mapred.JobClient;
@@ -932,6 +933,13 @@ public class Driver implements CommandProcessor {
      return plan;
    }

+ /**
+ * @return The current FetchTask associated with the Driver's plan, if any.
+ */
+ public FetchTask getFetchTask() {
+ return fetchTask;
+ }
+
    // Write the current set of valid transactions into the conf file so that it can be read by
    // the input format.
    private void recordValidTxns() throws LockException {
@@ -1880,6 +1888,17 @@ public class Driver implements CommandProcessor {
        throw new IOException("FAILED: Operation cancelled");
      }
      if (isFetchingTable()) {
+ /**
+ * If resultset serialization to thrift object is enabled, and if the destination table is
+ * indeed written using ThriftJDBCBinarySerDe, read one row from the output sequence file,
+ * since it is a blob of row batches.
+ */
+ if (fetchTask.getWork().isHiveServerQuery() && HiveConf.getBoolVar(conf,
+ HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS)
+ && (fetchTask.getTblDesc().getSerdeClassName()
+ .equalsIgnoreCase(ThriftJDBCBinarySerDe.class.getName()))) {
+ maxRows = 1;
+ }
        fetchTask.setMaxRows(maxRows);
        return fetchTask.fetch(res);
      }

http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/src/java/org/apache/hadoop/hive/ql/exec/DefaultFetchFormatter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DefaultFetchFormatter.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DefaultFetchFormatter.java
deleted file mode 100644
index b8be3a5..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DefaultFetchFormatter.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.exec;
-
-import java.io.IOException;
-import java.util.Properties;
-
-import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT;
-import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_NULL_FORMAT;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.common.JavaUtils;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.serde2.DelimitedJSONSerDe;
-import org.apache.hadoop.hive.serde2.SerDe;
-import org.apache.hadoop.hive.serde2.SerDeUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hive.common.util.ReflectionUtil;
-
-/**
- * serialize row by user specified serde and call toString() to make string type result
- */
-public class DefaultFetchFormatter<T> implements FetchFormatter<String> {
-
- private SerDe mSerde;
-
- @Override
- public void initialize(Configuration hconf, Properties props) throws HiveException {
- try {
- mSerde = initializeSerde(hconf, props);
- } catch (Exception e) {
- throw new HiveException(e);
- }
- }
-
- private SerDe initializeSerde(Configuration conf, Properties props) throws Exception {
- String serdeName = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEFETCHOUTPUTSERDE);
- Class<? extends SerDe> serdeClass = Class.forName(serdeName, true,
- Utilities.getSessionSpecifiedClassLoader()).asSubclass(SerDe.class);
- // cast only needed for Hadoop 0.17 compatibility
- SerDe serde = ReflectionUtil.newInstance(serdeClass, null);
-
- Properties serdeProps = new Properties();
- if (serde instanceof DelimitedJSONSerDe) {
- serdeProps.put(SERIALIZATION_FORMAT, props.getProperty(SERIALIZATION_FORMAT));
- serdeProps.put(SERIALIZATION_NULL_FORMAT, props.getProperty(SERIALIZATION_NULL_FORMAT));
- }
- SerDeUtils.initializeSerDe(serde, conf, serdeProps, null);
- return serde;
- }
-
- @Override
- public String convert(Object row, ObjectInspector rowOI) throws Exception {
- return mSerde.serialize(row, rowOI).toString();
- }
-
- @Override
- public void close() throws IOException {
- }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchFormatter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchFormatter.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchFormatter.java
deleted file mode 100644
index c2ed0d6..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchFormatter.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.exec;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.serde2.SerDeUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-
-/**
- * internal-use only
- *
- * Used in ListSinkOperator for formatting final output
- */
-public interface FetchFormatter<T> extends Closeable {
-
- void initialize(Configuration hconf, Properties props) throws Exception;
-
- T convert(Object row, ObjectInspector rowOI) throws Exception;
-
- public static class ThriftFormatter implements FetchFormatter<Object> {
-
- int protocol;
-
- @Override
- public void initialize(Configuration hconf, Properties props) throws Exception {
- protocol = hconf.getInt(ListSinkOperator.OUTPUT_PROTOCOL, 0);
- }
-
- @Override
- public Object convert(Object row, ObjectInspector rowOI) throws Exception {
- StructObjectInspector structOI = (StructObjectInspector) rowOI;
- List<? extends StructField> fields = structOI.getAllStructFieldRefs();
-
- Object[] converted = new Object[fields.size()];
- for (int i = 0 ; i < converted.length; i++) {
- StructField fieldRef = fields.get(i);
- Object field = structOI.getStructFieldData(row, fieldRef);
- converted[i] = field == null ? null :
- SerDeUtils.toThriftPayload(field, fieldRef.getFieldObjectInspector(), protocol);
- }
- return converted;
- }
-
- @Override
- public void close() throws IOException {
- }
- }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
index 0b0c336..b96ea04 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
@@ -48,12 +48,10 @@ import org.apache.hadoop.util.StringUtils;
   **/
  public class FetchTask extends Task<FetchWork> implements Serializable {
    private static final long serialVersionUID = 1L;
-
    private int maxRows = 100;
    private FetchOperator fetch;
    private ListSinkOperator sink;
    private int totalRows;
-
    private static transient final Logger LOG = LoggerFactory.getLogger(FetchTask.class);

    public FetchTask() {
@@ -186,4 +184,5 @@ public class FetchTask extends Task<FetchWork> implements Serializable {
        fetch.clearFetchContext();
      }
    }
+
  }

http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index ec6381b..3ec63ee 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -71,6 +71,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructField;
  import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
  import org.apache.hadoop.hive.serde2.objectinspector.SubStructObjectInspector;
  import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import org.apache.hadoop.hive.serde2.thrift.ThriftJDBCBinarySerDe;
  import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyShim;
  import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyValue;
  import org.apache.hadoop.hive.shims.ShimLoader;
@@ -122,7 +123,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
    protected transient long numRows = 0;
    protected transient long cntr = 1;
    protected transient long logEveryNRows = 0;
-
+ protected transient int rowIndex = 0;
    /**
     * Counters.
     */
@@ -374,7 +375,6 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
        // half of the script.timeout but less than script.timeout, we will still
        // be able to report progress.
        timeOut = hconf.getInt("mapred.healthChecker.script.timeout", 600000) / 2;
-
        if (hconf instanceof JobConf) {
          jc = (JobConf) hconf;
        } else {
@@ -656,6 +656,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements

    protected Writable recordValue;

+
    @Override
    public void process(Object row, int tag) throws HiveException {
      /* Create list bucketing sub-directory only if stored-as-directories is on. */
@@ -717,8 +718,12 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
          } else {
            fpaths = fsp;
          }
- // use SerDe to serialize r, and write it out
          recordValue = serializer.serialize(row, inputObjInspectors[0]);
+ // if serializer is ThriftJDBCBinarySerDe, then recordValue is null if the buffer is not full (the size of buffer
+ // is kept track of in the SerDe)
+ if (recordValue == null) {
+ return;
+ }
        }

        rowOutWriters = fpaths.outWriters;
@@ -745,6 +750,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
          LOG.info(toString() + ": records written - " + numRows);
        }

+ // This should always be 0 for the final result file
        int writerOffset = findWriterOffset(row);
        // This if/else chain looks ugly in the inner loop, but given that it will be 100% the same
        // for a given operator branch prediction should work quite nicely on it.
@@ -1012,9 +1018,22 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements

      lastProgressReport = System.currentTimeMillis();
      if (!abort) {
+ // If serializer is ThriftJDBCBinarySerDe, then it buffers rows to a certain limit (hive.server2.thrift.resultset.max.fetch.size)
+ // and serializes the whole batch when the buffer is full. The serialize returns null if the buffer is not full
+ // (the size of buffer is kept track of in the ThriftJDBCBinarySerDe).
+ if (conf.isHiveServerQuery() && HiveConf.getBoolVar(hconf,
+ HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS) &&
+ serializer.getClass().getName().equalsIgnoreCase(ThriftJDBCBinarySerDe.class.getName())) {
+ try {
+ recordValue = serializer.serialize(null, inputObjInspectors[0]);
+ rowOutWriters = fpaths.outWriters;
+ rowOutWriters[0].write(recordValue);
+ } catch (SerDeException | IOException e) {
+ throw new HiveException(e);
+ }
+ }
        for (FSPaths fsp : valToPaths.values()) {
          fsp.closeWriters(abort);
-
          // before closing the operator check if statistics gathering is requested
          // and is provided by record writer. this is different from the statistics
          // gathering done in processOp(). In processOp(), for each row added
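
[Editor's note] The process()/closeOp() changes above rely on a simple batching contract: serialize() buffers rows and returns null until the batch reaches hive.server2.thrift.resultset.max.fetch.size, and one final serialize(null, ...) call flushes the partial batch before the writers are closed. A self-contained sketch of that contract (hypothetical BatchingSerializer, plain strings standing in for Writables; not the actual ThriftJDBCBinarySerDe API):

    import java.util.ArrayList;
    import java.util.List;

    public class BatchingSinkSketch {

      /** Buffers rows and emits one concatenated record per full batch. */
      static class BatchingSerializer {
        private final int maxBatchSize;
        private final List<String> buffer = new ArrayList<>();

        BatchingSerializer(int maxBatchSize) { this.maxBatchSize = maxBatchSize; }

        /** Returns a serialized batch when full, otherwise null (row is only buffered). */
        String serialize(String row) {
          if (row != null) {
            buffer.add(row);
          }
          // A null row is the close-time flush signal (compare serializer.serialize(null, oi) above).
          if (row == null || buffer.size() >= maxBatchSize) {
            if (buffer.isEmpty()) {
              return null;
            }
            String batch = String.join("|", buffer);
            buffer.clear();
            return batch;
          }
          return null;
        }
      }

      public static void main(String[] args) {
        BatchingSerializer serializer = new BatchingSerializer(3);
        List<String> writtenRecords = new ArrayList<>();  // stands in for the sequence-file writer

        for (int i = 1; i <= 7; i++) {
          String record = serializer.serialize("row-" + i);
          if (record == null) {
            continue;                                     // buffer not full yet, nothing to write
          }
          writtenRecords.add(record);
        }
        // closeOp-style flush of the remaining partial batch.
        String tail = serializer.serialize(null);
        if (tail != null) {
          writtenRecords.add(tail);
        }
        System.out.println(writtenRecords);  // [row-1|row-2|row-3, row-4|row-5|row-6, row-7]
      }
    }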

http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java
index b081cd0..9bf363c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java
@@ -27,6 +27,9 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
  import org.apache.hadoop.hive.ql.plan.ListSinkDesc;
  import org.apache.hadoop.hive.ql.plan.api.OperatorType;
  import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.DefaultFetchFormatter;
+import org.apache.hadoop.hive.serde2.FetchFormatter;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
  import org.apache.hadoop.util.ReflectionUtils;

  /**
@@ -34,10 +37,6 @@ import org.apache.hadoop.util.ReflectionUtils;
   * and finally arrives to this operator.
   */
  public class ListSinkOperator extends Operator<ListSinkDesc> {
-
- public static final String OUTPUT_FORMATTER = "output.formatter";
- public static final String OUTPUT_PROTOCOL = "output.protocol";
-
    private transient List res;
    private transient FetchFormatter fetcher;
    private transient int numRows;
@@ -62,7 +61,7 @@ public class ListSinkOperator extends Operator<ListSinkDesc> {
    }

    private FetchFormatter initializeFetcher(Configuration conf) throws Exception {
- String formatterName = conf.get(OUTPUT_FORMATTER);
+ String formatterName = conf.get(SerDeUtils.LIST_SINK_OUTPUT_FORMATTER);
      FetchFormatter fetcher;
      if (formatterName != null && !formatterName.isEmpty()) {
        Class<? extends FetchFormatter> fetcherClass = Class.forName(formatterName, true,
@@ -71,12 +70,10 @@ public class ListSinkOperator extends Operator<ListSinkDesc> {
      } else {
        fetcher = new DefaultFetchFormatter();
      }
-
      // selectively used by fetch formatter
      Properties props = new Properties();
      props.put(serdeConstants.SERIALIZATION_FORMAT, "" + Utilities.tabCode);
      props.put(serdeConstants.SERIALIZATION_NULL_FORMAT, getConf().getSerializationNullFormat());
-
      fetcher.initialize(conf, props);
      return fetcher;
    }

http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java
index cf3bbf0..de7b151 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java
@@ -263,6 +263,11 @@ public class QB {
      this.isQuery = isQuery;
    }

+ /**
+ * Set to true in SemanticAnalyzer.getMetadataForDestFile,
+ * if destination is a file and query is not CTAS
+ * @return
+ */
    public boolean getIsQuery() {
      return isQuery;
    }

http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 96df189..005b53f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -203,6 +203,7 @@ import org.apache.hadoop.hive.ql.util.ResourceDownloader;
  import org.apache.hadoop.hive.serde.serdeConstants;
  import org.apache.hadoop.hive.serde2.Deserializer;
  import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe;
+import org.apache.hadoop.hive.serde2.NoOpFetchFormatter;
  import org.apache.hadoop.hive.serde2.NullStructSerDe;
  import org.apache.hadoop.hive.serde2.SerDeException;
  import org.apache.hadoop.hive.serde2.SerDeUtils;
@@ -214,6 +215,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
  import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
  import org.apache.hadoop.hive.serde2.objectinspector.StructField;
  import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.thrift.ThriftJDBCBinarySerDe;
  import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
  import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
  import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
@@ -6834,8 +6836,23 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {

        if (tblDesc == null) {
          if (qb.getIsQuery()) {
- String fileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT);
- table_desc = PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, fileFormat);
+ String fileFormat;
+ if (SessionState.get().isHiveServerQuery() &&
+ conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS)) {
+ fileFormat = "SequenceFile";
+ HiveConf.setVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT, fileFormat);
+ table_desc=
+ PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, fileFormat,
+ ThriftJDBCBinarySerDe.class);
+ // Set the fetch formatter to be a no-op for the ListSinkOperator, since we'll
+ // write out formatted thrift objects to SequenceFile
+ conf.set(SerDeUtils.LIST_SINK_OUTPUT_FORMATTER, NoOpFetchFormatter.class.getName());
+ } else {
+ fileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT);
+ table_desc =
+ PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, fileFormat,
+ LazySimpleSerDe.class);
+ }
          } else {
            table_desc = PlanUtils.getDefaultTableDesc(qb.getDirectoryDesc(), cols, colTypes);
          }
@@ -6907,6 +6924,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
        dpCtx,
        dest_path);

+ fileSinkDesc.setHiveServerQuery(SessionState.get().isHiveServerQuery());
      // If this is an insert, update, or delete on an ACID table then mark that so the
      // FileSinkOperator knows how to properly write to it.
      if (destTableIsAcid) {

http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
index f7d7a40..75ca5f9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
@@ -58,7 +58,15 @@ import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
  import org.apache.hadoop.hive.ql.plan.MoveWork;
  import org.apache.hadoop.hive.ql.plan.PlanUtils;
  import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.session.SessionState;
  import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.DefaultFetchFormatter;
+import org.apache.hadoop.hive.serde2.NoOpFetchFormatter;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.thrift.ThriftFormatter;
+import org.apache.hadoop.hive.serde2.thrift.ThriftJDBCBinarySerDe;

  import com.google.common.collect.Interner;
  import com.google.common.collect.Interners;
@@ -97,6 +105,20 @@ public abstract class TaskCompiler {
      int outerQueryLimit = pCtx.getQueryProperties().getOuterQueryLimit();

      if (pCtx.getFetchTask() != null) {
+ if (pCtx.getFetchTask().getTblDesc() == null) {
+ return;
+ }
+ pCtx.getFetchTask().getWork().setHiveServerQuery(SessionState.get().isHiveServerQuery());
+ TableDesc resultTab = pCtx.getFetchTask().getTblDesc();
+ // If the serializer is ThriftJDBCBinarySerDe, then it requires that NoOpFetchFormatter be used. But when it isn't,
+ // then either the ThriftFormatter or the DefaultFetchFormatter should be used.
+ if (!resultTab.getSerdeClassName().equalsIgnoreCase(ThriftJDBCBinarySerDe.class.getName())) {
+ if (SessionState.get().isHiveServerQuery()) {
+ conf.set(SerDeUtils.LIST_SINK_OUTPUT_FORMATTER,ThriftFormatter.class.getName());
+ } else {
+ conf.set(SerDeUtils.LIST_SINK_OUTPUT_FORMATTER, DefaultFetchFormatter.class.getName());
+ }
+ }
        return;
      }

@@ -117,13 +139,34 @@ public abstract class TaskCompiler {
        String cols = loadFileDesc.getColumns();
        String colTypes = loadFileDesc.getColumnTypes();

+ String resFileFormat;
        TableDesc resultTab = pCtx.getFetchTableDesc();
        if (resultTab == null) {
- String resFileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT);
- resultTab = PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat);
+ resFileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT);
+ if (SessionState.get().isHiveServerQuery() && (conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS))
+ && (resFileFormat.equalsIgnoreCase("SequenceFile"))) {
+ resultTab =
+ PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat,
+ ThriftJDBCBinarySerDe.class);
+ // Set the fetch formatter to be a no-op for the ListSinkOperator, since we'll
+ // read formatted thrift objects from the output SequenceFile written by Tasks.
+ conf.set(SerDeUtils.LIST_SINK_OUTPUT_FORMATTER, NoOpFetchFormatter.class.getName());
+ } else {
+ resultTab =
+ PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat,
+ LazySimpleSerDe.class);
+ }
+ } else {
+ if (resultTab.getProperties().getProperty(serdeConstants.SERIALIZATION_LIB)
+ .equalsIgnoreCase(ThriftJDBCBinarySerDe.class.getName())) {
+ // Set the fetch formatter to be a no-op for the ListSinkOperator, since we'll
+ // read formatted thrift objects from the output SequenceFile written by Tasks.
+ conf.set(SerDeUtils.LIST_SINK_OUTPUT_FORMATTER, NoOpFetchFormatter.class.getName());
+ }
        }

        FetchWork fetch = new FetchWork(loadFileDesc.getSourcePath(), resultTab, outerQueryLimit);
+ fetch.setHiveServerQuery(SessionState.get().isHiveServerQuery());
        fetch.setSource(pCtx.getFetchSource());
        fetch.setSink(pCtx.getFetchSink());

@@ -322,8 +365,19 @@ public abstract class TaskCompiler {
      String cols = loadFileWork.get(0).getColumns();
      String colTypes = loadFileWork.get(0).getColumnTypes();

- String resFileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT);
- TableDesc resultTab = PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat);
+ String resFileFormat;
+ TableDesc resultTab;
+ if (SessionState.get().isHiveServerQuery() && conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS)) {
+ resFileFormat = "SequenceFile";
+ resultTab =
+ PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat,
+ ThriftJDBCBinarySerDe.class);
+ } else {
+ resFileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT);
+ resultTab =
+ PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat,
+ LazySimpleSerDe.class);
+ }

      fetch = new FetchWork(loadFileWork.get(0).getSourcePath(), resultTab, outerQueryLimit);


http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java
index d68c64c..8ea6440 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java
@@ -63,6 +63,16 @@ public class FetchWork implements Serializable {
     */
    private String serializationNullFormat = "NULL";

+ private boolean isHiveServerQuery;
+
+ public boolean isHiveServerQuery() {
+ return isHiveServerQuery;
+ }
+
+ public void setHiveServerQuery(boolean isHiveServerQuery) {
+ this.isHiveServerQuery = isHiveServerQuery;
+ }
+
    public FetchWork() {
    }


http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
index 07fd2dc..0064fca 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
@@ -97,6 +97,7 @@ public class FileSinkDesc extends AbstractOperatorDesc {

    private transient Table table;
    private Path destPath;
+ private boolean isHiveServerQuery;

    public FileSinkDesc() {
    }
@@ -160,6 +161,14 @@ public class FileSinkDesc extends AbstractOperatorDesc {
      return ret;
    }

+ public boolean isHiveServerQuery() {
+ return this.isHiveServerQuery;
+ }
+
+ public void setHiveServerQuery(boolean isHiveServerQuery) {
+ this.isHiveServerQuery = isHiveServerQuery;
+ }
+
    @Explain(displayName = "directory", explainLevels = { Level.EXTENDED })
    public Path getDirName() {
      return dirName;

http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
index 2992568..c39a46f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
@@ -280,13 +280,13 @@ public final class PlanUtils {
    }

    public static TableDesc getDefaultQueryOutputTableDesc(String cols, String colTypes,
- String fileFormat) {
- TableDesc tblDesc = getTableDesc(LazySimpleSerDe.class, "" + Utilities.ctrlaCode, cols, colTypes,
- false, false, fileFormat);
- //enable escaping
+ String fileFormat, Class<? extends Deserializer> serdeClass) {
+ TableDesc tblDesc =
+ getTableDesc(serdeClass, "" + Utilities.ctrlaCode, cols, colTypes, false, false, fileFormat);
+ // enable escaping
      tblDesc.getProperties().setProperty(serdeConstants.ESCAPE_CHAR, "\\");
      tblDesc.getProperties().setProperty(serdeConstants.SERIALIZATION_ESCAPE_CRLF, "true");
- //enable extended nesting levels
+ // enable extended nesting levels
      tblDesc.getProperties().setProperty(
          LazySerDeParameters.SERIALIZATION_EXTEND_ADDITIONAL_NESTING_LEVELS, "true");
      return tblDesc;

http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/serde/pom.xml
----------------------------------------------------------------------
diff --git a/serde/pom.xml b/serde/pom.xml
index cea7fce..9f50764 100644
--- a/serde/pom.xml
+++ b/serde/pom.xml
@@ -41,6 +41,11 @@
      </dependency>
      <dependency>
        <groupId>org.apache.hive</groupId>
+ <artifactId>hive-service-rpc</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
        <artifactId>hive-shims</artifactId>
        <version>${project.version}</version>
      </dependency>

http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/serde/src/java/org/apache/hadoop/hive/serde2/DefaultFetchFormatter.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/DefaultFetchFormatter.java b/serde/src/java/org/apache/hadoop/hive/serde2/DefaultFetchFormatter.java
new file mode 100644
index 0000000..3038037
--- /dev/null
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/DefaultFetchFormatter.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT;
+import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_NULL_FORMAT;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hive.common.util.ReflectionUtil;
+
+/**
+ * serialize row by user specified serde and call toString() to make string type result
+ */
+public class DefaultFetchFormatter<T> implements FetchFormatter<String> {
+
+ private SerDe mSerde;
+
+ @Override
+ public void initialize(Configuration hconf, Properties props) throws SerDeException {
+ mSerde = initializeSerde(hconf, props);
+ }
+
+ private SerDe initializeSerde(Configuration conf, Properties props) throws SerDeException {
+ String serdeName = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEFETCHOUTPUTSERDE);
+ Class<? extends SerDe> serdeClass;
+ try {
+ serdeClass =
+ Class.forName(serdeName, true, JavaUtils.getClassLoader()).asSubclass(SerDe.class);
+ } catch (ClassNotFoundException e) {
+ throw new SerDeException(e);
+ }
+ // cast only needed for Hadoop 0.17 compatibility
+ SerDe serde = ReflectionUtil.newInstance(serdeClass, null);
+ Properties serdeProps = new Properties();
+ if (serde instanceof DelimitedJSONSerDe) {
+ serdeProps.put(SERIALIZATION_FORMAT, props.getProperty(SERIALIZATION_FORMAT));
+ serdeProps.put(SERIALIZATION_NULL_FORMAT, props.getProperty(SERIALIZATION_NULL_FORMAT));
+ }
+ SerDeUtils.initializeSerDe(serde, conf, serdeProps, null);
+ return serde;
+ }
+
+ @Override
+ public String convert(Object row, ObjectInspector rowOI) throws Exception {
+ return mSerde.serialize(row, rowOI).toString();
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/serde/src/java/org/apache/hadoop/hive/serde2/FetchFormatter.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/FetchFormatter.java b/serde/src/java/org/apache/hadoop/hive/serde2/FetchFormatter.java
new file mode 100644
index 0000000..5cc93aa
--- /dev/null
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/FetchFormatter.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2;
+
+import java.io.Closeable;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+/**
+ * internal-use only
+ *
+ * Used in ListSinkOperator for formatting final output
+ */
+public interface FetchFormatter<T> extends Closeable {
+
+ void initialize(Configuration hconf, Properties props) throws Exception;
+
+ T convert(Object row, ObjectInspector rowOI) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/serde/src/java/org/apache/hadoop/hive/serde2/NoOpFetchFormatter.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/NoOpFetchFormatter.java b/serde/src/java/org/apache/hadoop/hive/serde2/NoOpFetchFormatter.java
new file mode 100644
index 0000000..91929f1
--- /dev/null
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/NoOpFetchFormatter.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+/**
+ * A No-op fetch formatter.
+ * ListSinkOperator uses this when reading from the destination table which has data serialized by
+ * ThriftJDBCBinarySerDe to a SequenceFile.
+ */
+public class NoOpFetchFormatter<T> implements FetchFormatter<Object> {
+
+ @Override
+ public void initialize(Configuration hconf, Properties props) throws SerDeException {
+ }
+
+ // this returns the row as is because this formatter is only called when
+ // the ThriftJDBCBinarySerDe was used to serialize the rows to thrift-able objects.
+ @Override
+ public Object convert(Object row, ObjectInspector rowOI) throws Exception {
+ return new Object[] { row };
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/serde/src/java/org/apache/hadoop/hive/serde2/SerDeUtils.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/SerDeUtils.java b/serde/src/java/org/apache/hadoop/hive/serde2/SerDeUtils.java
index 90439a2..6e08dfd 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/SerDeUtils.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/SerDeUtils.java
@@ -71,7 +71,8 @@ public final class SerDeUtils {

    // lower case null is used within json objects
    private static final String JSON_NULL = "null";
-
+ public static final String LIST_SINK_OUTPUT_FORMATTER = "list.sink.output.formatter";
+ public static final String LIST_SINK_OUTPUT_PROTOCOL = "list.sink.output.protocol";
    public static final Logger LOG = LoggerFactory.getLogger(SerDeUtils.class.getName());

    /**

http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ColumnBuffer.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ColumnBuffer.java b/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ColumnBuffer.java
new file mode 100644
index 0000000..929c405
--- /dev/null
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ColumnBuffer.java
@@ -0,0 +1,439 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2.thrift;
+
+import java.nio.ByteBuffer;
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.hive.service.rpc.thrift.TBinaryColumn;
+import org.apache.hive.service.rpc.thrift.TBoolColumn;
+import org.apache.hive.service.rpc.thrift.TByteColumn;
+import org.apache.hive.service.rpc.thrift.TColumn;
+import org.apache.hive.service.rpc.thrift.TDoubleColumn;
+import org.apache.hive.service.rpc.thrift.TI16Column;
+import org.apache.hive.service.rpc.thrift.TI32Column;
+import org.apache.hive.service.rpc.thrift.TI64Column;
+import org.apache.hive.service.rpc.thrift.TStringColumn;
+
+import com.google.common.primitives.Booleans;
+import com.google.common.primitives.Bytes;
+import com.google.common.primitives.Doubles;
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
+import com.google.common.primitives.Shorts;
+
+/**
+ * ColumnBuffer
+ */
+public class ColumnBuffer extends AbstractList {
+
+ private static final int DEFAULT_SIZE = 100;
+
+ private final Type type;
+
+ private BitSet nulls;
+
+ private int size;
+ private boolean[] boolVars;
+ private byte[] byteVars;
+ private short[] shortVars;
+ private int[] intVars;
+ private long[] longVars;
+ private double[] doubleVars;
+ private List<String> stringVars;
+ private List<ByteBuffer> binaryVars;
+
+ public ColumnBuffer(Type type, BitSet nulls, Object values) {
+ this.type = type;
+ this.nulls = nulls;
+ if (type == Type.BOOLEAN_TYPE) {
+ boolVars = (boolean[]) values;
+ size = boolVars.length;
+ } else if (type == Type.TINYINT_TYPE) {
+ byteVars = (byte[]) values;
+ size = byteVars.length;
+ } else if (type == Type.SMALLINT_TYPE) {
+ shortVars = (short[]) values;
+ size = shortVars.length;
+ } else if (type == Type.INT_TYPE) {
+ intVars = (int[]) values;
+ size = intVars.length;
+ } else if (type == Type.BIGINT_TYPE) {
+ longVars = (long[]) values;
+ size = longVars.length;
+ } else if (type == Type.DOUBLE_TYPE) {
+ doubleVars = (double[]) values;
+ size = doubleVars.length;
+ } else if (type == Type.BINARY_TYPE) {
+ binaryVars = (List<ByteBuffer>) values;
+ size = binaryVars.size();
+ } else if (type == Type.STRING_TYPE) {
+ stringVars = (List<String>) values;
+ size = stringVars.size();
+ } else {
+ throw new IllegalStateException("invalid union object");
+ }
+ }
+
+ public ColumnBuffer(Type type) {
+ nulls = new BitSet();
+ switch (type) {
+ case BOOLEAN_TYPE:
+ boolVars = new boolean[DEFAULT_SIZE];
+ break;
+ case TINYINT_TYPE:
+ byteVars = new byte[DEFAULT_SIZE];
+ break;
+ case SMALLINT_TYPE:
+ shortVars = new short[DEFAULT_SIZE];
+ break;
+ case INT_TYPE:
+ intVars = new int[DEFAULT_SIZE];
+ break;
+ case BIGINT_TYPE:
+ longVars = new long[DEFAULT_SIZE];
+ break;
+ case FLOAT_TYPE:
+ case DOUBLE_TYPE:
+ type = Type.DOUBLE_TYPE;
+ doubleVars = new double[DEFAULT_SIZE];
+ break;
+ case BINARY_TYPE:
+ binaryVars = new ArrayList<ByteBuffer>();
+ break;
+ default:
+ type = Type.STRING_TYPE;
+ stringVars = new ArrayList<String>();
+ }
+ this.type = type;
+ }
+
+ public ColumnBuffer(TColumn colValues) {
+ if (colValues.isSetBoolVal()) {
+ type = Type.BOOLEAN_TYPE;
+ nulls = toBitset(colValues.getBoolVal().getNulls());
+ boolVars = Booleans.toArray(colValues.getBoolVal().getValues());
+ size = boolVars.length;
+ } else if (colValues.isSetByteVal()) {
+ type = Type.TINYINT_TYPE;
+ nulls = toBitset(colValues.getByteVal().getNulls());
+ byteVars = Bytes.toArray(colValues.getByteVal().getValues());
+ size = byteVars.length;
+ } else if (colValues.isSetI16Val()) {
+ type = Type.SMALLINT_TYPE;
+ nulls = toBitset(colValues.getI16Val().getNulls());
+ shortVars = Shorts.toArray(colValues.getI16Val().getValues());
+ size = shortVars.length;
+ } else if (colValues.isSetI32Val()) {
+ type = Type.INT_TYPE;
+ nulls = toBitset(colValues.getI32Val().getNulls());
+ intVars = Ints.toArray(colValues.getI32Val().getValues());
+ size = intVars.length;
+ } else if (colValues.isSetI64Val()) {
+ type = Type.BIGINT_TYPE;
+ nulls = toBitset(colValues.getI64Val().getNulls());
+ longVars = Longs.toArray(colValues.getI64Val().getValues());
+ size = longVars.length;
+ } else if (colValues.isSetDoubleVal()) {
+ type = Type.DOUBLE_TYPE;
+ nulls = toBitset(colValues.getDoubleVal().getNulls());
+ doubleVars = Doubles.toArray(colValues.getDoubleVal().getValues());
+ size = doubleVars.length;
+ } else if (colValues.isSetBinaryVal()) {
+ type = Type.BINARY_TYPE;
+ nulls = toBitset(colValues.getBinaryVal().getNulls());
+ binaryVars = colValues.getBinaryVal().getValues();
+ size = binaryVars.size();
+ } else if (colValues.isSetStringVal()) {
+ type = Type.STRING_TYPE;
+ nulls = toBitset(colValues.getStringVal().getNulls());
+ stringVars = colValues.getStringVal().getValues();
+ size = stringVars.size();
+ } else {
+ throw new IllegalStateException("invalid union object");
+ }
+ }
+
+ public ColumnBuffer extractSubset(int start, int end) {
+ BitSet subNulls = nulls.get(start, end);
+ if (type == Type.BOOLEAN_TYPE) {
+ ColumnBuffer subset =
+ new ColumnBuffer(type, subNulls, Arrays.copyOfRange(boolVars, start, end));
+ boolVars = Arrays.copyOfRange(boolVars, end, size);
+ nulls = nulls.get(start, size);
+ size = boolVars.length;
+ return subset;
+ }
+ if (type == Type.TINYINT_TYPE) {
+ ColumnBuffer subset =
+ new ColumnBuffer(type, subNulls, Arrays.copyOfRange(byteVars, start, end));
+ byteVars = Arrays.copyOfRange(byteVars, end, size);
+ nulls = nulls.get(start, size);
+ size = byteVars.length;
+ return subset;
+ }
+ if (type == Type.SMALLINT_TYPE) {
+ ColumnBuffer subset =
+ new ColumnBuffer(type, subNulls, Arrays.copyOfRange(shortVars, start, end));
+ shortVars = Arrays.copyOfRange(shortVars, end, size);
+ nulls = nulls.get(start, size);
+ size = shortVars.length;
+ return subset;
+ }
+ if (type == Type.INT_TYPE) {
+ ColumnBuffer subset =
+ new ColumnBuffer(type, subNulls, Arrays.copyOfRange(intVars, start, end));
+ intVars = Arrays.copyOfRange(intVars, end, size);
+ nulls = nulls.get(start, size);
+ size = intVars.length;
+ return subset;
+ }
+ if (type == Type.BIGINT_TYPE) {
+ ColumnBuffer subset =
+ new ColumnBuffer(type, subNulls, Arrays.copyOfRange(longVars, start, end));
+ longVars = Arrays.copyOfRange(longVars, end, size);
+ nulls = nulls.get(start, size);
+ size = longVars.length;
+ return subset;
+ }
+ if (type == Type.DOUBLE_TYPE) {
+ ColumnBuffer subset =
+ new ColumnBuffer(type, subNulls, Arrays.copyOfRange(doubleVars, start, end));
+ doubleVars = Arrays.copyOfRange(doubleVars, end, size);
+ nulls = nulls.get(start, size);
+ size = doubleVars.length;
+ return subset;
+ }
+ if (type == Type.BINARY_TYPE) {
+ ColumnBuffer subset = new ColumnBuffer(type, subNulls, binaryVars.subList(start, end));
+ binaryVars = binaryVars.subList(end, binaryVars.size());
+ nulls = nulls.get(start, size);
+ size = binaryVars.size();
+ return subset;
+ }
+ if (type == Type.STRING_TYPE) {
+ ColumnBuffer subset = new ColumnBuffer(type, subNulls, stringVars.subList(start, end));
+ stringVars = stringVars.subList(end, stringVars.size());
+ nulls = nulls.get(start, size);
+ size = stringVars.size();
+ return subset;
+ }
+ throw new IllegalStateException("invalid union object");
+ }
+
+ private static final byte[] MASKS = new byte[] {
+ 0x01, 0x02, 0x04, 0x08, 0x10, 0x20, 0x40, (byte)0x80
+ };
+
+ private static BitSet toBitset(byte[] nulls) {
+ BitSet bitset = new BitSet();
+ int bits = nulls.length * 8;
+ for (int i = 0; i < bits; i++) {
+ bitset.set(i, (nulls[i / 8] & MASKS[i % 8]) != 0);
+ }
+ return bitset;
+ }
+
+ private static byte[] toBinary(BitSet bitset) {
+ byte[] nulls = new byte[1 + (bitset.length() / 8)];
+ for (int i = 0; i < bitset.length(); i++) {
+ nulls[i / 8] |= bitset.get(i) ? MASKS[i % 8] : 0;
+ }
+ return nulls;
+ }
+
+ public Type getType() {
+ return type;
+ }
+
+ @Override
+ public Object get(int index) {
+ if (nulls.get(index)) {
+ return null;
+ }
+ switch (type) {
+ case BOOLEAN_TYPE:
+ return boolVars[index];
+ case TINYINT_TYPE:
+ return byteVars[index];
+ case SMALLINT_TYPE:
+ return shortVars[index];
+ case INT_TYPE:
+ return intVars[index];
+ case BIGINT_TYPE:
+ return longVars[index];
+ case DOUBLE_TYPE:
+ return doubleVars[index];
+ case STRING_TYPE:
+ return stringVars.get(index);
+ case BINARY_TYPE:
+ return binaryVars.get(index).array();
+ }
+ return null;
+ }
+
+ @Override
+ public int size() {
+ return size;
+ }
+
+ public TColumn toTColumn() {
+ TColumn value = new TColumn();
+ ByteBuffer nullMasks = ByteBuffer.wrap(toBinary(nulls));
+ switch (type) {
+ case BOOLEAN_TYPE:
+ value.setBoolVal(new TBoolColumn(Booleans.asList(Arrays.copyOfRange(boolVars, 0, size)),
+ nullMasks));
+ break;
+ case TINYINT_TYPE:
+ value.setByteVal(new TByteColumn(Bytes.asList(Arrays.copyOfRange(byteVars, 0, size)),
+ nullMasks));
+ break;
+ case SMALLINT_TYPE:
+ value.setI16Val(new TI16Column(Shorts.asList(Arrays.copyOfRange(shortVars, 0, size)),
+ nullMasks));
+ break;
+ case INT_TYPE:
+ value.setI32Val(new TI32Column(Ints.asList(Arrays.copyOfRange(intVars, 0, size)), nullMasks));
+ break;
+ case BIGINT_TYPE:
+ value
+ .setI64Val(new TI64Column(Longs.asList(Arrays.copyOfRange(longVars, 0, size)), nullMasks));
+ break;
+ case DOUBLE_TYPE:
+ value.setDoubleVal(new TDoubleColumn(Doubles.asList(Arrays.copyOfRange(doubleVars, 0, size)),
+ nullMasks));
+ break;
+ case STRING_TYPE:
+ value.setStringVal(new TStringColumn(stringVars, nullMasks));
+ break;
+ case BINARY_TYPE:
+ value.setBinaryVal(new TBinaryColumn(binaryVars, nullMasks));
+ break;
+ }
+ return value;
+ }
+
+ private static final ByteBuffer EMPTY_BINARY = ByteBuffer.allocate(0);
+ private static final String EMPTY_STRING = "";
+
+ public void addValue(Object field) {
+ addValue(this.type, field);
+ }
+
+ public void addValue(Type type, Object field) {
+ switch (type) {
+ case BOOLEAN_TYPE:
+ nulls.set(size, field == null);
+ boolVars()[size] = field == null ? true : (Boolean) field;
+ break;
+ case TINYINT_TYPE:
+ nulls.set(size, field == null);
+ byteVars()[size] = field == null ? 0 : (Byte) field;
+ break;
+ case SMALLINT_TYPE:
+ nulls.set(size, field == null);
+ shortVars()[size] = field == null ? 0 : (Short) field;
+ break;
+ case INT_TYPE:
+ nulls.set(size, field == null);
+ intVars()[size] = field == null ? 0 : (Integer) field;
+ break;
+ case BIGINT_TYPE:
+ nulls.set(size, field == null);
+ longVars()[size] = field == null ? 0 : (Long) field;
+ break;
+ case FLOAT_TYPE:
+ nulls.set(size, field == null);
+ doubleVars()[size] = field == null ? 0 : new Double(field.toString());
+ break;
+ case DOUBLE_TYPE:
+ nulls.set(size, field == null);
+ doubleVars()[size] = field == null ? 0 : (Double) field;
+ break;
+ case BINARY_TYPE:
+ nulls.set(binaryVars.size(), field == null);
+ binaryVars.add(field == null ? EMPTY_BINARY : ByteBuffer.wrap((byte[]) field));
+ break;
+ default:
+ nulls.set(stringVars.size(), field == null);
+ stringVars.add(field == null ? EMPTY_STRING : String.valueOf(field));
+ break;
+ }
+ size++;
+ }
+
+ private boolean[] boolVars() {
+ if (boolVars.length == size) {
+ boolean[] newVars = new boolean[size << 1];
+ System.arraycopy(boolVars, 0, newVars, 0, size);
+ return boolVars = newVars;
+ }
+ return boolVars;
+ }
+
+ private byte[] byteVars() {
+ if (byteVars.length == size) {
+ byte[] newVars = new byte[size << 1];
+ System.arraycopy(byteVars, 0, newVars, 0, size);
+ return byteVars = newVars;
+ }
+ return byteVars;
+ }
+
+ private short[] shortVars() {
+ if (shortVars.length == size) {
+ short[] newVars = new short[size << 1];
+ System.arraycopy(shortVars, 0, newVars, 0, size);
+ return shortVars = newVars;
+ }
+ return shortVars;
+ }
+
+ private int[] intVars() {
+ if (intVars.length == size) {
+ int[] newVars = new int[size << 1];
+ System.arraycopy(intVars, 0, newVars, 0, size);
+ return intVars = newVars;
+ }
+ return intVars;
+ }
+
+ private long[] longVars() {
+ if (longVars.length == size) {
+ long[] newVars = new long[size << 1];
+ System.arraycopy(longVars, 0, newVars, 0, size);
+ return longVars = newVars;
+ }
+ return longVars;
+ }
+
+ private double[] doubleVars() {
+ if (doubleVars.length == size) {
+ double[] newVars = new double[size << 1];
+ System.arraycopy(doubleVars, 0, newVars, 0, size);
+ return doubleVars = newVars;
+ }
+ return doubleVars;
+ }
+}
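
To make the buffering and null-mask handling above concrete, here is a small hedged sketch that fills a ColumnBuffer, reads a null back, converts to the Thrift union, and splits off a subset; the values are arbitrary examples, not taken from the patch.

import org.apache.hadoop.hive.serde2.thrift.ColumnBuffer;
import org.apache.hadoop.hive.serde2.thrift.Type;
import org.apache.hive.service.rpc.thrift.TColumn;

public class ColumnBufferSketch {
  public static void main(String[] args) {
    // Buffer an INT column; nulls are tracked in a BitSet and stored as 0 in the backing array.
    ColumnBuffer ints = new ColumnBuffer(Type.INT_TYPE);
    ints.addValue(1);
    ints.addValue(null); // recorded in the null mask
    ints.addValue(3);

    System.out.println(ints.size()); // 3
    System.out.println(ints.get(1)); // null, because the corresponding mask bit is set

    // toTColumn() packs the values plus a byte[] null mask (one bit per row) into the Thrift union.
    TColumn tColumn = ints.toTColumn();
    System.out.println(tColumn.isSetI32Val()); // true

    // extractSubset(0, 2) hands back rows [0, 2) and keeps only the remaining row in this buffer.
    ColumnBuffer first = ints.extractSubset(0, 2);
    System.out.println(first.size() + " / " + ints.size()); // 2 / 1
  }
}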

http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftFormatter.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftFormatter.java b/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftFormatter.java
new file mode 100644
index 0000000..a4c120e
--- /dev/null
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftFormatter.java
@@ -0,0 +1,40 @@
+package org.apache.hadoop.hive.serde2.thrift;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde2.FetchFormatter;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+
+public class ThriftFormatter implements FetchFormatter<Object> {
+
+ int protocol;
+
+ @Override
+ public void initialize(Configuration hconf, Properties props) throws Exception {
+ protocol = hconf.getInt(SerDeUtils.LIST_SINK_OUTPUT_PROTOCOL, 0);
+ }
+
+ @Override
+ public Object convert(Object row, ObjectInspector rowOI) throws Exception {
+ StructObjectInspector structOI = (StructObjectInspector) rowOI;
+ List<? extends StructField> fields = structOI.getAllStructFieldRefs();
+ Object[] converted = new Object[fields.size()];
+ for (int i = 0 ; i < converted.length; i++) {
+ StructField fieldRef = fields.get(i);
+ Object field = structOI.getStructFieldData(row, fieldRef);
+ converted[i] = field == null ? null :
+ SerDeUtils.toThriftPayload(field, fieldRef.getFieldObjectInspector(), protocol);
+ }
+ return converted;
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+}
\ No newline at end of file
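
A hedged usage sketch for the formatter above: it converts a row described by a standard struct ObjectInspector into an Object[] of thrift-friendly field values. The two-column schema and row below are made up for illustration; the formatter reads list.sink.output.protocol from the Configuration (0 here, since it is unset).

import java.util.Arrays;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.thrift.ThriftFormatter;

public class ThriftFormatterSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical two-column schema (id int, name string) using standard Java object inspectors.
    StructObjectInspector rowOI = ObjectInspectorFactory.getStandardStructObjectInspector(
        Arrays.asList("id", "name"),
        Arrays.<ObjectInspector>asList(
            PrimitiveObjectInspectorFactory.javaIntObjectInspector,
            PrimitiveObjectInspectorFactory.javaStringObjectInspector));

    ThriftFormatter formatter = new ThriftFormatter();
    formatter.initialize(new Configuration(), new Properties());

    // Under a standard struct OI a row is a plain Object[]; each field goes through
    // SerDeUtils.toThriftPayload before being handed back.
    Object[] converted = (Object[]) formatter.convert(new Object[] {42, "hive"}, rowOI);
    System.out.println(Arrays.toString(converted));

    formatter.close();
  }
}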

http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftJDBCBinarySerDe.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftJDBCBinarySerDe.java b/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftJDBCBinarySerDe.java
new file mode 100644
index 0000000..5c31974
--- /dev/null
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftJDBCBinarySerDe.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2.thrift;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hive.service.rpc.thrift.TColumn;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TIOStreamTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This SerDe serializes the final output directly into thrift-able objects; use it only for final
+ * output result sets. It is used when HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS is set to
+ * true. Rows arriving from FileSink are buffered until the (configurable) maximum buffer size is
+ * reached, or until all rows are finished and FileSink.closeOp() is called.
+ */
+public class ThriftJDBCBinarySerDe extends AbstractSerDe {
+ public static final Logger LOG = LoggerFactory.getLogger(ThriftJDBCBinarySerDe.class.getName());
+ private List<String> columnNames;
+ private List<TypeInfo> columnTypes;
+ private ColumnBuffer[] columnBuffers;
+ private TypeInfo rowTypeInfo;
+ private ArrayList<Object> row;
+ private BytesWritable serializedBytesWritable = new BytesWritable();
+ private ByteStream.Output output = new ByteStream.Output();
+ private TProtocol protocol = new TCompactProtocol(new TIOStreamTransport(output));
+ private ThriftFormatter thriftFormatter = new ThriftFormatter();
+ private int MAX_BUFFERED_ROWS;
+ private int count;
+ private StructObjectInspector rowObjectInspector;
+
+
+ @Override
+ public void initialize(Configuration conf, Properties tbl) throws SerDeException {
+ // Get column names
+ MAX_BUFFERED_ROWS = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_MAX_FETCH_SIZE);
+ String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS);
+ String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
+ if (columnNameProperty.length() == 0) {
+ columnNames = new ArrayList<String>();
+ } else {
+ columnNames = Arrays.asList(columnNameProperty.split(","));
+ }
+ if (columnTypeProperty.length() == 0) {
+ columnTypes = new ArrayList<TypeInfo>();
+ } else {
+ columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
+ }
+ rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
+ rowObjectInspector =
+ (StructObjectInspector) TypeInfoUtils
+ .getStandardWritableObjectInspectorFromTypeInfo(rowTypeInfo);
+
+ initializeRowAndColumns();
+ try {
+ thriftFormatter.initialize(conf, tbl);
+ } catch (Exception e) {
+ throw new SerDeException(e);
+ }
+ }
+
+ @Override
+ public Class<? extends Writable> getSerializedClass() {
+ return BytesWritable.class;
+ }
+
+ private Writable serializeBatch() throws SerDeException {
+ output.reset();
+ for (int i = 0; i < columnBuffers.length; i++) {
+ TColumn tColumn = columnBuffers[i].toTColumn();
+ try {
+ tColumn.write(protocol);
+ } catch(TException e) {
+ throw new SerDeException(e);
+ }
+ }
+ initializeRowAndColumns();
+ serializedBytesWritable.set(output.getData(), 0, output.getLength());
+ return serializedBytesWritable;
+ }
+
+ // Use columnNames to initialize the reusable row object and the columnBuffers. This is also done
+ // after every flush of a full buffer; otherwise rows already emitted would be emitted again when
+ // closeOp() is called at the end.
+ private void initializeRowAndColumns() {
+ row = new ArrayList<Object>(columnNames.size());
+ for (int i = 0; i < columnNames.size(); i++) {
+ row.add(null);
+ }
+ // Initialize column buffers
+ columnBuffers = new ColumnBuffer[columnNames.size()];
+ for (int i = 0; i < columnBuffers.length; i++) {
+ columnBuffers[i] = new ColumnBuffer(Type.getType(columnTypes.get(i)));
+ }
+ }
+
+ /**
+ * Write TColumn objects to the underlying stream of TProtocol
+ */
+ @Override
+ public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
+ // A null row means there are no more rows (closeOp() was called); the other flush trigger, a
+ // full buffer, is handled after the row is added below.
+ if (obj == null)
+ return serializeBatch();
+ count += 1;
+ StructObjectInspector soi = (StructObjectInspector) objInspector;
+ List<? extends StructField> fields = soi.getAllStructFieldRefs();
+ try {
+ Object[] formattedRow = (Object[]) thriftFormatter.convert(obj, objInspector);
+ for (int i = 0; i < columnNames.size(); i++) {
+ columnBuffers[i].addValue(formattedRow[i]);
+ }
+ } catch (Exception e) {
+ throw new SerDeException(e);
+ }
+ if (count == MAX_BUFFERED_ROWS) {
+ count = 0;
+ return serializeBatch();
+ }
+ return null;
+ }
+
+ @Override
+ public SerDeStats getSerDeStats() {
+ return null;
+ }
+
+ /**
+ * Return the bytes from this writable blob.
+ * Eventually the client of this method will interpret the bytes using the Thrift protocol.
+ */
+ @Override
+ public Object deserialize(Writable blob) throws SerDeException {
+ return ((BytesWritable) blob).getBytes();
+ }
+
+ @Override
+ public ObjectInspector getObjectInspector() throws SerDeException {
+ return rowObjectInspector;
+ }
+
+}
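
Finally, a hedged end-to-end sketch of the buffering contract described in the class comment: serialize() returns null while it buffers, emits a BytesWritable batch once the (assumed) fetch-size limit is hit, and a null row, which is what FileSinkOperator passes at closeOp(), flushes whatever remains. The fetch size, schema, and rows are illustrative; string columns are used so the sketch does not depend on how individual field types are converted.

import java.util.Arrays;
import java.util.Properties;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.thrift.ThriftJDBCBinarySerDe;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;

public class ThriftJDBCBinarySerDeSketch {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    // Assumed small batch size so the flush-on-full path is easy to observe.
    conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_MAX_FETCH_SIZE, 2);

    Properties tbl = new Properties();
    tbl.setProperty(serdeConstants.LIST_COLUMNS, "name,city");
    tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES, "string,string");

    ThriftJDBCBinarySerDe serde = new ThriftJDBCBinarySerDe();
    serde.initialize(conf, tbl);

    // Describes the rows we hand in: plain Object[]s of Java strings.
    StructObjectInspector rowOI = ObjectInspectorFactory.getStandardStructObjectInspector(
        Arrays.asList("name", "city"),
        Arrays.<ObjectInspector>asList(
            PrimitiveObjectInspectorFactory.javaStringObjectInspector,
            PrimitiveObjectInspectorFactory.javaStringObjectInspector));

    Writable first = serde.serialize(new Object[] {"ann", "oslo"}, rowOI);
    Writable second = serde.serialize(new Object[] {"bob", "pune"}, rowOI);
    System.out.println(first == null);                   // true: still buffering
    System.out.println(second instanceof BytesWritable); // true: batch of 2 flushed

    // A null row (what closeOp() passes) flushes the remaining, here empty, batch.
    Writable tail = serde.serialize(null, rowOI);
    System.out.println(tail instanceof BytesWritable);   // true
  }
}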
