Repository: hive
Updated Branches:
   refs/heads/llap fc7343dd1 -> 7b9096a92


http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/llap-common/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java
deleted file mode 100644
index 4e000ff..0000000
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java
+++ /dev/null
@@ -1,155 +0,0 @@
-package org.apache.hadoop.hive.llap;
-
-import com.google.common.base.Preconditions;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.JobConf;
-
-import org.apache.hadoop.hive.llap.Row;
-import org.apache.hadoop.hive.llap.FieldDesc;
-import org.apache.hadoop.hive.llap.Schema;
-import org.apache.hadoop.hive.llap.TypeDesc;
-import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.SerDe;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
-import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
-import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class LlapRowRecordReader implements RecordReader<NullWritable, Row> {
-
- private static final Logger LOG = LoggerFactory.getLogger(LlapRowRecordReader.class);
-
- Configuration conf;
- RecordReader<NullWritable, Text> reader;
- Schema schema;
- SerDe serde;
- final Text textData = new Text();
-
- public LlapRowRecordReader(Configuration conf, Schema schema, RecordReader<NullWritable, Text> reader) {
- this.conf = conf;
- this.schema = schema;
- this.reader = reader;
- }
-
- @Override
- public void close() throws IOException {
- reader.close();
- }
-
- @Override
- public NullWritable createKey() {
- return NullWritable.get();
- }
-
- @Override
- public Row createValue() {
- return new Row(schema);
- }
-
- @Override
- public long getPos() throws IOException {
- return 0;
- }
-
- @Override
- public float getProgress() throws IOException {
- return 0;
- }
-
- @Override
- public boolean next(NullWritable key, Row value) throws IOException {
- Preconditions.checkArgument(value != null);
-
- if (serde == null) {
- try {
- serde = initSerDe(conf);
- } catch (SerDeException err) {
- throw new IOException(err);
- }
- }
-
- boolean hasNext = reader.next(key, textData);
- if (hasNext) {
- // Deserialize Text to column values, and populate the row record
- Object rowObj;
- try {
- StructObjectInspector rowOI = (StructObjectInspector) serde.getObjectInspector();
- rowObj = serde.deserialize(textData);
- List<? extends StructField> colFields = rowOI.getAllStructFieldRefs();
- for (int idx = 0; idx < colFields.size(); ++idx) {
- StructField field = colFields.get(idx);
- Object colValue = rowOI.getStructFieldData(rowObj, field);
- Preconditions.checkState(field.getFieldObjectInspector().getCategory() == Category.PRIMITIVE,
- "Cannot handle non-primitive column type " + field.getFieldObjectInspector().getTypeName());
-
- PrimitiveObjectInspector poi = (PrimitiveObjectInspector) field.getFieldObjectInspector();
- // char/varchar special cased here since the row record handles them using Text
- switch (poi.getPrimitiveCategory()) {
- case CHAR:
- value.setValue(idx, ((HiveCharWritable) poi.getPrimitiveWritableObject(colValue)).getPaddedValue());
- break;
- case VARCHAR:
- value.setValue(idx, ((HiveVarcharWritable) poi.getPrimitiveWritableObject(colValue)).getTextValue());
- break;
- default:
- value.setValue(idx, (Writable) poi.getPrimitiveWritableObject(colValue));
- break;
- }
- }
- } catch (SerDeException err) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Error deserializing row from text: " + textData);
- }
- throw new IOException("Error deserializing row data", err);
- }
- }
-
- return hasNext;
- }
-
- public Schema getSchema() {
- return schema;
- }
-
- protected SerDe initSerDe(Configuration conf) throws SerDeException {
- Properties props = new Properties();
- StringBuffer columnsBuffer = new StringBuffer();
- StringBuffer typesBuffer = new StringBuffer();
- boolean isFirst = true;
- for (FieldDesc colDesc : schema.getColumns()) {
- if (!isFirst) {
- columnsBuffer.append(',');
- typesBuffer.append(',');
- }
- columnsBuffer.append(colDesc.getName());
- typesBuffer.append(colDesc.getTypeDesc().toString());
- isFirst = false;
- }
- String columns = columnsBuffer.toString();
- String types = typesBuffer.toString();
- props.put(serdeConstants.LIST_COLUMNS, columns);
- props.put(serdeConstants.LIST_COLUMN_TYPES, types);
- SerDe serde = new LazySimpleSerDe();
- serde.initialize(conf, props);
-
- return serde;
- }
-}
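
For reference, a minimal sketch of how this reader is driven; it assumes a Text-level reader and its Schema are already available (for example from LlapBaseRecordReader.getSchema()), and the wrapper class name RowReaderSketch is illustrative only:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.llap.LlapRowRecordReader;
import org.apache.hadoop.hive.llap.Row;
import org.apache.hadoop.hive.llap.Schema;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.RecordReader;

public class RowReaderSketch {
  // Drains the wrapped text reader, deserializing each Text record into a reused Row.
  public static long countRows(Configuration conf, Schema schema,
      RecordReader<NullWritable, Text> textReader) throws Exception {
    LlapRowRecordReader rowReader = new LlapRowRecordReader(conf, schema, textReader);
    long count = 0;
    try {
      NullWritable key = rowReader.createKey();
      Row row = rowReader.createValue();   // one Row instance, refilled by each next() call
      while (rowReader.next(key, row)) {
        count++;
      }
    } finally {
      rowReader.close();                   // also closes the underlying text reader
    }
    return count;
  }
}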

http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/llap-ext-client/pom.xml
----------------------------------------------------------------------
diff --git a/llap-ext-client/pom.xml b/llap-ext-client/pom.xml
new file mode 100644
index 0000000..5a7e385
--- /dev/null
+++ b/llap-ext-client/pom.xml
@@ -0,0 +1,140 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive</artifactId>
+ <version>2.1.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>hive-llap-ext-client</artifactId>
+ <packaging>jar</packaging>
+ <name>Hive Llap External Client</name>
+
+ <properties>
+ <hive.path.to.root>..</hive.path.to.root>
+ </properties>
+
+ <dependencies>
+ <!-- dependencies are always listed in sorted order by groupId, artifactId -->
+ <!-- intra-project -->
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-jdbc</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-llap-client</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <!-- inter-project -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ <optional>true</optional>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>${hadoop.version}</version>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-registry</artifactId>
+ <version>${hadoop.version}</version>
+ <optional>true</optional>
+ </dependency>
+ <!-- test inter-project -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${junit.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>${commons-lang3.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <version>${mockito-all.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+ <build>
+ <sourceDirectory>${basedir}/src/java</sourceDirectory>
+ <testSourceDirectory>${basedir}/src/test</testSourceDirectory>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/gen/protobuf/gen-java</source>
+ <source>src/gen/thrift/gen-javabean</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
new file mode 100644
index 0000000..61eb2ea
--- /dev/null
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import java.sql.SQLException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.sql.DriverManager;
+
+import java.io.IOException;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataInputStream;
+import java.io.ByteArrayInputStream;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.InputSplitWithLocationInfo;
+import org.apache.hadoop.mapred.SplitLocationInfo;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hive.llap.ext.LlapInputSplit;
+
+import com.google.common.base.Preconditions;
+
+public class LlapBaseInputFormat<V extends WritableComparable> implements InputFormat<NullWritable, V> {
+
+ private static String driverName = "org.apache.hive.jdbc.HiveDriver";
+ private String url; // "jdbc:hive2://localhost:10000/default"
+ private String user; // "hive",
+ private String pwd; // ""
+ private String query;
+
+ public static final String URL_KEY = "llap.if.hs2.connection";
+ public static final String QUERY_KEY = "llap.if.query";
+ public static final String USER_KEY = "llap.if.user";
+ public static final String PWD_KEY = "llap.if.pwd";
+
+ public final String SPLIT_QUERY = "select get_splits(\"%s\",%d)";
+
+ private Connection con;
+ private Statement stmt;
+
+ public LlapBaseInputFormat(String url, String user, String pwd, String query) {
+ this.url = url;
+ this.user = user;
+ this.pwd = pwd;
+ this.query = query;
+ }
+
+ public LlapBaseInputFormat() {}
+
+
+ @Override
+ public RecordReader<NullWritable, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
+ LlapInputSplit llapSplit = (LlapInputSplit) split;
+ return llapSplit.getInputFormat().getRecordReader(llapSplit.getSplit(), job, reporter);
+ }
+
+ @Override
+ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+ List<InputSplit> ins = new ArrayList<InputSplit>();
+
+ if (url == null) url = job.get(URL_KEY);
+ if (query == null) query = job.get(QUERY_KEY);
+ if (user == null) user = job.get(USER_KEY);
+ if (pwd == null) pwd = job.get(PWD_KEY);
+
+ if (url == null || query == null) {
+ throw new IllegalStateException();
+ }
+
+ try {
+ Class.forName(driverName);
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ }
+
+ try {
+ con = DriverManager.getConnection(url,user,pwd);
+ stmt = con.createStatement();
+ String sql = String.format(SPLIT_QUERY, query, numSplits);
+ ResultSet res = stmt.executeQuery(sql);
+ while (res.next()) {
+ // deserialize split
+ DataInput in = new DataInputStream(res.getBinaryStream(3));
+ InputSplitWithLocationInfo is = (InputSplitWithLocationInfo)Class.forName(res.getString(2)).newInstance();
+ is.readFields(in);
+ ins.add(new LlapInputSplit(is, res.getString(1)));
+ }
+
+ res.close();
+ stmt.close();
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ return ins.toArray(new InputSplit[ins.size()]);
+ }
+
+ public void close() {
+ try {
+ con.close();
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+}
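
A usage sketch for the new InputFormat (assumed driver code, not part of this patch; the URL, user and query values below are placeholders): the HS2 connection and query can be supplied through the JobConf keys defined above, getSplits() then issues the get_splits() call over JDBC, and getRecordReader() streams the result rows, mirroring what LlapDump does.

import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;

public class BaseInputFormatSketch {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf();
    job.set(LlapBaseInputFormat.URL_KEY, "jdbc:hive2://localhost:10000/default");
    job.set(LlapBaseInputFormat.USER_KEY, "hive");
    job.set(LlapBaseInputFormat.PWD_KEY, "");
    job.set(LlapBaseInputFormat.QUERY_KEY, "select * from test");

    LlapBaseInputFormat<Text> format = new LlapBaseInputFormat<Text>();
    InputSplit[] splits = format.getSplits(job, 1);   // runs select get_splits("...", 1) via JDBC
    for (InputSplit split : splits) {
      RecordReader<NullWritable, Text> reader = format.getRecordReader(split, job, null);
      Text value = reader.createValue();
      while (reader.next(NullWritable.get(), value)) {
        System.out.println(value);
      }
      reader.close();
    }
    format.close();                                   // closes the JDBC connection
  }
}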

http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapDump.java
----------------------------------------------------------------------
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapDump.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapDump.java
new file mode 100644
index 0000000..ce419af
--- /dev/null
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapDump.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap;
+
+import java.io.OutputStream;
+import java.io.InputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.FileInputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.io.RCFile.Reader;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
+import org.apache.hadoop.hive.llap.LlapBaseRecordReader;
+import org.apache.hadoop.hive.llap.Schema;
+
+import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
+
+public class LlapDump {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LlapDump.class);
+
+ private static String url = "jdbc:hive2://localhost:10000/default";
+ private static String user = "hive";
+ private static String pwd = "";
+ private static String query = "select * from test";
+ private static String numSplits = "1";
+
+ public static void main(String[] args) throws Exception {
+ Options opts = createOptions();
+ CommandLine cli = new GnuParser().parse(opts, args);
+
+ if (cli.hasOption('h')) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("orcfiledump", opts);
+ return;
+ }
+
+ if (cli.hasOption('l')) {
+ url = cli.getOptionValue("l");
+ }
+
+ if (cli.hasOption('u')) {
+ user = cli.getOptionValue("u");
+ }
+
+ if (cli.hasOption('p')) {
+ pwd = cli.getOptionValue("p");
+ }
+
+ if (cli.hasOption('n')) {
+ numSplits = cli.getOptionValue("n");
+ }
+
+ if (cli.getArgs().length > 0) {
+ query = cli.getArgs()[0];
+ }
+
+ System.out.println("url: "+url);
+ System.out.println("user: "+user);
+ System.out.println("query: "+query);
+
+ LlapBaseInputFormat format = new LlapBaseInputFormat(url, user, pwd, query);
+ JobConf job = new JobConf();
+
+ InputSplit[] splits = format.getSplits(job, Integer.parseInt(numSplits));
+
+ if (splits.length == 0) {
+ System.out.println("No splits returned - empty scan");
+ System.out.println("Results: ");
+ } else {
+ boolean first = true;
+
+ for (InputSplit s: splits) {
+ LOG.info("Processing input split s from " + Arrays.toString(s.getLocations()));
+ RecordReader<NullWritable, Text> reader = format.getRecordReader(s, job, null);
+
+ if (reader instanceof LlapBaseRecordReader && first) {
+ Schema schema = ((LlapBaseRecordReader)reader).getSchema();
+ System.out.println(""+schema);
+ }
+
+ if (first) {
+ System.out.println("Results: ");
+ System.out.println("");
+ first = false;
+ }
+
+ Text value = reader.createValue();
+ while (reader.next(NullWritable.get(), value)) {
+ System.out.println(value);
+ }
+ }
+ System.exit(0);
+ }
+ }
+
+ static Options createOptions() {
+ Options result = new Options();
+
+ result.addOption(OptionBuilder
+ .withLongOpt("location")
+ .withDescription("HS2 url")
+ .hasArg()
+ .create('l'));
+
+ result.addOption(OptionBuilder
+ .withLongOpt("user")
+ .withDescription("user name")
+ .hasArg()
+ .create('u'));
+
+ result.addOption(OptionBuilder
+ .withLongOpt("pwd")
+ .withDescription("password")
+ .hasArg()
+ .create('p'));
+
+ result.addOption(OptionBuilder
+ .withLongOpt("num")
+ .withDescription("number of splits")
+ .hasArg()
+ .create('n'));
+
+ return result;
+ }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapRowInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapRowInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapRowInputFormat.java
new file mode 100644
index 0000000..6ecb0f9
--- /dev/null
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapRowInputFormat.java
@@ -0,0 +1,36 @@
+package org.apache.hadoop.hive.llap;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.llap.LlapBaseRecordReader;
+import org.apache.hadoop.hive.llap.LlapRowRecordReader;
+import org.apache.hadoop.hive.llap.Row;
+import org.apache.hadoop.hive.llap.Schema;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hive.llap.ext.LlapInputSplit;
+
+
+public class LlapRowInputFormat implements InputFormat<NullWritable, Row> {
+ LlapBaseInputFormat<Text> baseInputFormat = new LlapBaseInputFormat<Text>();
+
+ @Override
+ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+ return baseInputFormat.getSplits(job, numSplits);
+ }
+
+ @Override
+ public RecordReader<NullWritable, Row> getRecordReader(InputSplit split, JobConf job, Reporter reporter)
+ throws IOException {
+ LlapInputSplit<Text> llapSplit = (LlapInputSplit<Text>) split;
+ LlapBaseRecordReader<Text> reader = (LlapBaseRecordReader<Text>) baseInputFormat.getRecordReader(llapSplit, job, reporter);
+ return new LlapRowRecordReader(job, reader.getSchema(), reader);
+ }
+}
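
A hedged sketch of the row-level path (assuming the same JobConf keys used by LlapBaseInputFormat are already set; the helper class name is illustrative): each record reader yields typed Row objects built from the schema that accompanies the split's output.

import org.apache.hadoop.hive.llap.LlapRowInputFormat;
import org.apache.hadoop.hive.llap.Row;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;

public class RowInputFormatSketch {
  public static long readAll(JobConf job, int numSplits) throws Exception {
    LlapRowInputFormat format = new LlapRowInputFormat();
    long rows = 0;
    for (InputSplit split : format.getSplits(job, numSplits)) {
      RecordReader<NullWritable, Row> reader = format.getRecordReader(split, job, null);
      Row row = reader.createValue();
      while (reader.next(NullWritable.get(), row)) {
        rows++;                       // row now holds the deserialized column values
      }
      reader.close();
    }
    return rows;
  }
}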

http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/llap-ext-client/src/java/org/apache/hive/llap/ext/LlapInputSplit.java
----------------------------------------------------------------------
diff --git a/llap-ext-client/src/java/org/apache/hive/llap/ext/LlapInputSplit.java b/llap-ext-client/src/java/org/apache/hive/llap/ext/LlapInputSplit.java
new file mode 100644
index 0000000..d8881c4
--- /dev/null
+++ b/llap-ext-client/src/java/org/apache/hive/llap/ext/LlapInputSplit.java
@@ -0,0 +1,73 @@
+package org.apache.hive.llap.ext;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.InputSplitWithLocationInfo;
+import org.apache.hadoop.mapred.SplitLocationInfo;
+
+
+public class LlapInputSplit<V extends WritableComparable> implements InputSplitWithLocationInfo {
+ InputSplitWithLocationInfo nativeSplit;
+ String inputFormatClassName;
+
+ public LlapInputSplit() {
+ }
+
+ public LlapInputSplit(InputSplitWithLocationInfo nativeSplit, String inputFormatClassName) {
+ this.nativeSplit = nativeSplit;
+ this.inputFormatClassName = inputFormatClassName;
+ }
+
+ @Override
+ public long getLength() throws IOException {
+ return nativeSplit.getLength();
+ }
+
+ @Override
+ public String[] getLocations() throws IOException {
+ return nativeSplit.getLocations();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeUTF(inputFormatClassName);
+ out.writeUTF(nativeSplit.getClass().getName());
+ nativeSplit.write(out);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ inputFormatClassName = in.readUTF();
+ String splitClass = in.readUTF();
+ try {
+ nativeSplit = (InputSplitWithLocationInfo)Class.forName(splitClass).newInstance();
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ nativeSplit.readFields(in);
+ }
+
+ @Override
+ public SplitLocationInfo[] getLocationInfo() throws IOException {
+ return nativeSplit.getLocationInfo();
+ }
+
+ public InputSplit getSplit() {
+ return nativeSplit;
+ }
+
+ public InputFormat<NullWritable, V> getInputFormat() throws IOException {
+ try {
+ return (InputFormat<NullWritable, V>) Class.forName(inputFormatClassName)
+ .newInstance();
+ } catch(Exception e) {
+ throw new IOException(e);
+ }
+ }
+}
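
A small round-trip sketch for the wrapper split's wire format (assumption: the caller already has some InputSplitWithLocationInfo and the name of the InputFormat that produced it): write() records the input format class, the split class, and the split body, and readFields() reverses that, so the wrapped split class needs a no-arg constructor.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import org.apache.hadoop.mapred.InputSplitWithLocationInfo;
import org.apache.hive.llap.ext.LlapInputSplit;

public class SplitRoundTripSketch {
  public static LlapInputSplit roundTrip(InputSplitWithLocationInfo nativeSplit,
      String inputFormatClassName) throws Exception {
    LlapInputSplit original = new LlapInputSplit(nativeSplit, inputFormatClassName);

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    original.write(new DataOutputStream(bytes));     // class names first, then the split body

    LlapInputSplit restored = new LlapInputSplit();
    restored.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    return restored;                                 // restored.getSplit() is a fresh instance
  }
}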

http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
deleted file mode 100644
index 0930d60..0000000
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
+++ /dev/null
@@ -1,392 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.llap;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
-
-import org.apache.commons.collections4.ListUtils;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.llap.LlapBaseRecordReader.ReaderEvent;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
-import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient;
-import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient.LlapTaskUmbilicalExternalResponder;
-import org.apache.hadoop.hive.llap.registry.ServiceInstance;
-import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
-import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
-import org.apache.hadoop.hive.llap.tez.Converters;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.tez.common.security.JobTokenIdentifier;
-import org.apache.tez.common.security.TokenCache;
-import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
-import org.apache.tez.runtime.api.impl.TaskSpec;
-import org.apache.tez.runtime.api.impl.TezEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.runtime.api.impl.EventType;
-import org.apache.tez.runtime.api.impl.TezEvent;
-import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
-import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
-
-
-public class LlapInputFormat<V extends WritableComparable> implements InputFormat<NullWritable, V> {
-
- private static final Logger LOG = LoggerFactory.getLogger(LlapInputFormat.class);
-
- public LlapInputFormat() {
- }
-
- /*
- * This proxy record reader has the duty of establishing a connected socket with LLAP, then fire
- * off the work in the split to LLAP and finally return the connected socket back in an
- * LlapRecordReader. The LlapRecordReader class reads the results from the socket.
- */
- @Override
- public RecordReader<NullWritable, V> getRecordReader(InputSplit split, JobConf job,
- Reporter reporter) throws IOException {
-
- LlapInputSplit llapSplit = (LlapInputSplit) split;
-
- // Set conf to use LLAP user rather than current user for LLAP Zk registry.
- HiveConf.setVar(job, HiveConf.ConfVars.LLAP_ZK_REGISTRY_USER, llapSplit.getLlapUser());
- SubmitWorkInfo submitWorkInfo = SubmitWorkInfo.fromBytes(llapSplit.getPlanBytes());
-
- ServiceInstance serviceInstance = getServiceInstance(job, llapSplit);
- String host = serviceInstance.getHost();
- int llapSubmitPort = serviceInstance.getRpcPort();
-
- LOG.info("Found service instance for host " + host + " with rpc port " + llapSubmitPort
- + " and outputformat port " + serviceInstance.getOutputFormatPort());
-
- LlapRecordReaderTaskUmbilicalExternalResponder umbilicalResponder =
- new LlapRecordReaderTaskUmbilicalExternalResponder();
- LlapTaskUmbilicalExternalClient llapClient =
- new LlapTaskUmbilicalExternalClient(job, submitWorkInfo.getTokenIdentifier(),
- submitWorkInfo.getToken(), umbilicalResponder);
- llapClient.init(job);
- llapClient.start();
-
- SubmitWorkRequestProto submitWorkRequestProto =
- constructSubmitWorkRequestProto(submitWorkInfo, llapSplit.getSplitNum(),
- llapClient.getAddress(), submitWorkInfo.getToken());
-
- TezEvent tezEvent = new TezEvent();
- DataInputBuffer dib = new DataInputBuffer();
- dib.reset(llapSplit.getFragmentBytes(), 0, llapSplit.getFragmentBytes().length);
- tezEvent.readFields(dib);
- List<TezEvent> tezEventList = Lists.newArrayList();
- tezEventList.add(tezEvent);
-
- llapClient.submitWork(submitWorkRequestProto, host, llapSubmitPort, tezEventList);
-
- String id = HiveConf.getVar(job, HiveConf.ConfVars.HIVEQUERYID) + "_" + llapSplit.getSplitNum();
-
- HiveConf conf = new HiveConf();
- Socket socket = new Socket(host,
- serviceInstance.getOutputFormatPort());
-
- LOG.debug("Socket connected");
-
- socket.getOutputStream().write(id.getBytes());
- socket.getOutputStream().write(0);
- socket.getOutputStream().flush();
-
- LOG.info("Registered id: " + id);
-
- LlapBaseRecordReader recordReader = new LlapBaseRecordReader(socket.getInputStream(), llapSplit.getSchema(), Text.class);
- umbilicalResponder.setRecordReader(recordReader);
- return recordReader;
- }
-
- @Override
- public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
- throw new IOException("These are not the splits you are looking for.");
- }
-
- private ServiceInstance getServiceInstance(JobConf job, LlapInputSplit llapSplit) throws IOException {
- LlapRegistryService registryService = LlapRegistryService.getClient(job);
- String host = llapSplit.getLocations()[0];
-
- ServiceInstance serviceInstance = getServiceInstanceForHost(registryService, host);
- if (serviceInstance == null) {
- throw new IOException("No service instances found for " + host + " in registry");
- }
-
- return serviceInstance;
- }
-
- private ServiceInstance getServiceInstanceForHost(LlapRegistryService registryService, String host) throws IOException {
- InetAddress address = InetAddress.getByName(host);
- ServiceInstanceSet instanceSet = registryService.getInstances();
- ServiceInstance serviceInstance = null;
-
- // The name used in the service registry may not match the host name we're using.
- // Try hostname/canonical hostname/host address
-
- String name = address.getHostName();
- LOG.info("Searching service instance by hostname " + name);
- serviceInstance = selectServiceInstance(instanceSet.getByHost(name));
- if (serviceInstance != null) {
- return serviceInstance;
- }
-
- name = address.getCanonicalHostName();
- LOG.info("Searching service instance by canonical hostname " + name);
- serviceInstance = selectServiceInstance(instanceSet.getByHost(name));
- if (serviceInstance != null) {
- return serviceInstance;
- }
-
- name = address.getHostAddress();
- LOG.info("Searching service instance by address " + name);
- serviceInstance = selectServiceInstance(instanceSet.getByHost(name));
- if (serviceInstance != null) {
- return serviceInstance;
- }
-
- return serviceInstance;
- }
-
- private ServiceInstance selectServiceInstance(Set<ServiceInstance> serviceInstances) {
- if (serviceInstances == null || serviceInstances.isEmpty()) {
- return null;
- }
-
- // Get the first live service instance
- for (ServiceInstance serviceInstance : serviceInstances) {
- if (serviceInstance.isAlive()) {
- return serviceInstance;
- }
- }
-
- LOG.info("No live service instances were found");
- return null;
- }
-
- private SubmitWorkRequestProto constructSubmitWorkRequestProto(SubmitWorkInfo submitWorkInfo,
- int taskNum,
- InetSocketAddress address,
- Token<JobTokenIdentifier> token) throws
- IOException {
- TaskSpec taskSpec = submitWorkInfo.getTaskSpec();
- ApplicationId appId = submitWorkInfo.getFakeAppId();
-
- SubmitWorkRequestProto.Builder builder = SubmitWorkRequestProto.newBuilder();
- // This works, assuming the executor is running within YARN.
- LOG.info("Setting user in submitWorkRequest to: " +
- System.getenv(ApplicationConstants.Environment.USER.name()));
- builder.setUser(System.getenv(ApplicationConstants.Environment.USER.name()));
- builder.setApplicationIdString(appId.toString());
- builder.setAppAttemptNumber(0);
- builder.setTokenIdentifier(appId.toString());
-
- ContainerId containerId =
- ContainerId.newInstance(ApplicationAttemptId.newInstance(appId, 0), taskNum);
- builder.setContainerIdString(containerId.toString());
-
- builder.setAmHost(address.getHostName());
- builder.setAmPort(address.getPort());
- Credentials taskCredentials = new Credentials();
- // Credentials can change across DAGs. Ideally construct only once per DAG.
- // TODO Figure out where credentials will come from. Normally Hive sets up
- // URLs on the tez dag, for which Tez acquires credentials.
-
- // taskCredentials.addAll(getContext().getCredentials());
-
- // Preconditions.checkState(currentQueryIdentifierProto.getDagIdentifier() ==
- // taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId());
- // ByteBuffer credentialsBinary = credentialMap.get(currentQueryIdentifierProto);
- // if (credentialsBinary == null) {
- // credentialsBinary = serializeCredentials(getContext().getCredentials());
- // credentialMap.putIfAbsent(currentQueryIdentifierProto, credentialsBinary.duplicate());
- // } else {
- // credentialsBinary = credentialsBinary.duplicate();
- // }
- // builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary));
- Credentials credentials = new Credentials();
- TokenCache.setSessionToken(token, credentials);
- ByteBuffer credentialsBinary = serializeCredentials(credentials);
- builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary));
-
-
- builder.setFragmentSpec(Converters.convertTaskSpecToProto(taskSpec));
-
- FragmentRuntimeInfo.Builder runtimeInfo = FragmentRuntimeInfo.newBuilder();
- runtimeInfo.setCurrentAttemptStartTime(System.currentTimeMillis());
- runtimeInfo.setWithinDagPriority(0);
- runtimeInfo.setDagStartTime(submitWorkInfo.getCreationTime());
- runtimeInfo.setFirstAttemptStartTime(submitWorkInfo.getCreationTime());
- runtimeInfo.setNumSelfAndUpstreamTasks(taskSpec.getVertexParallelism());
- runtimeInfo.setNumSelfAndUpstreamCompletedTasks(0);
-
-
- builder.setUsingTezAm(false);
- builder.setFragmentRuntimeInfo(runtimeInfo.build());
- return builder.build();
- }
-
- private ByteBuffer serializeCredentials(Credentials credentials) throws IOException {
- Credentials containerCredentials = new Credentials();
- containerCredentials.addAll(credentials);
- DataOutputBuffer containerTokens_dob = new DataOutputBuffer();
- containerCredentials.writeTokenStorageToStream(containerTokens_dob);
- return ByteBuffer.wrap(containerTokens_dob.getData(), 0, containerTokens_dob.getLength());
- }
-
- private static class LlapRecordReaderTaskUmbilicalExternalResponder implements LlapTaskUmbilicalExternalResponder {
- protected LlapBaseRecordReader recordReader = null;
- protected LinkedBlockingQueue<ReaderEvent> queuedEvents = new LinkedBlockingQueue<ReaderEvent>();
-
- public LlapRecordReaderTaskUmbilicalExternalResponder() {
- }
-
- @Override
- public void submissionFailed(String fragmentId, Throwable throwable) {
- try {
- sendOrQueueEvent(ReaderEvent.errorEvent(
- "Received submission failed event for fragment ID " + fragmentId));
- } catch (Exception err) {
- LOG.error("Error during heartbeat responder:", err);
- }
- }
-
- @Override
- public void heartbeat(TezHeartbeatRequest request) {
- TezTaskAttemptID taskAttemptId = request.getCurrentTaskAttemptID();
- List<TezEvent> inEvents = request.getEvents();
- for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) {
- EventType eventType = tezEvent.getEventType();
- try {
- switch (eventType) {
- case TASK_ATTEMPT_COMPLETED_EVENT:
- sendOrQueueEvent(ReaderEvent.doneEvent());
- break;
- case TASK_ATTEMPT_FAILED_EVENT:
- TaskAttemptFailedEvent taskFailedEvent = (TaskAttemptFailedEvent) tezEvent.getEvent();
- sendOrQueueEvent(ReaderEvent.errorEvent(taskFailedEvent.getDiagnostics()));
- break;
- case TASK_STATUS_UPDATE_EVENT:
- // If we want to handle counters
- break;
- default:
- LOG.warn("Unhandled event type " + eventType);
- break;
- }
- } catch (Exception err) {
- LOG.error("Error during heartbeat responder:", err);
- }
- }
- }
-
- @Override
- public void taskKilled(TezTaskAttemptID taskAttemptId) {
- try {
- sendOrQueueEvent(ReaderEvent.errorEvent(
- "Received task killed event for task ID " + taskAttemptId));
- } catch (Exception err) {
- LOG.error("Error during heartbeat responder:", err);
- }
- }
-
- @Override
- public void heartbeatTimeout(String taskAttemptId) {
- try {
- sendOrQueueEvent(ReaderEvent.errorEvent(
- "Timed out waiting for heartbeat for task ID " + taskAttemptId));
- } catch (Exception err) {
- LOG.error("Error during heartbeat responder:", err);
- }
- }
-
- public synchronized LlapBaseRecordReader getRecordReader() {
- return recordReader;
- }
-
- public synchronized void setRecordReader(LlapBaseRecordReader recordReader) {
- this.recordReader = recordReader;
-
- if (recordReader == null) {
- return;
- }
-
- // If any events were queued by the responder, give them to the record reader now.
- while (!queuedEvents.isEmpty()) {
- ReaderEvent readerEvent = queuedEvents.poll();
- LOG.debug("Sending queued event to record reader: " + readerEvent.getEventType());
- recordReader.handleEvent(readerEvent);
- }
- }
-
- /**
- * Send the ReaderEvents to the record reader, if it is registered to this responder.
- * If there is no registered record reader, add them to a list of pending reader events
- * since we don't want to drop these events.
- * @param readerEvent
- */
- protected synchronized void sendOrQueueEvent(ReaderEvent readerEvent) {
- LlapBaseRecordReader recordReader = getRecordReader();
- if (recordReader != null) {
- recordReader.handleEvent(readerEvent);
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("No registered record reader, queueing event " + readerEvent.getEventType()
- + " with message " + readerEvent.getMessage());
- }
-
- try {
- queuedEvents.put(readerEvent);
- } catch (Exception err) {
- throw new RuntimeException("Unexpected exception while queueing reader event", err);
- }
- }
- }
-
- /**
- * Clear the list of queued reader events if we are not interested in sending any pending events to any registering record reader.
- */
- public void clearQueuedEvents() {
- queuedEvents.clear();
- }
- }
-}
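
The class comment above describes the proxy flow; the sketch below isolates just the output-side handshake it performs (host, port and fragment id are assumed to come from the service registry and the split, as in the removed code):

import java.io.OutputStream;
import java.net.Socket;

public class OutputHandshakeSketch {
  // Mirrors the handshake in getRecordReader(): send the fragment id, terminate it
  // with a single zero byte, then read row data back on the same socket.
  public static Socket openResultStream(String host, int outputFormatPort, String fragmentId)
      throws Exception {
    Socket socket = new Socket(host, outputFormatPort);
    OutputStream out = socket.getOutputStream();
    out.write(fragmentId.getBytes());
    out.write(0);
    out.flush();
    return socket;   // socket.getInputStream() is what LlapBaseRecordReader consumes
  }
}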

http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java b/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
deleted file mode 100644
index 7d06637..0000000
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
+++ /dev/null
@@ -1,415 +0,0 @@
-package org.apache.hadoop.hive.llap.ext;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.collections4.ListUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
-import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
-import org.apache.hadoop.hive.llap.tez.LlapProtocolClientProxy;
-import org.apache.hadoop.hive.llap.tezplugins.helpers.LlapTaskUmbilicalServer;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.ipc.ProtocolSignature;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.tez.common.security.JobTokenIdentifier;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.impl.EventType;
-import org.apache.tez.runtime.api.impl.TezEvent;
-import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
-import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class LlapTaskUmbilicalExternalClient extends AbstractService {
-
- private static final Logger LOG = LoggerFactory.getLogger(LlapTaskUmbilicalExternalClient.class);
-
- private final LlapProtocolClientProxy communicator;
- private volatile LlapTaskUmbilicalServer llapTaskUmbilicalServer;
- private final Configuration conf;
- private final LlapTaskUmbilicalProtocol umbilical;
-
- protected final String tokenIdentifier;
- protected final Token<JobTokenIdentifier> sessionToken;
-
- private final ConcurrentMap<String, PendingEventData> pendingEvents = new ConcurrentHashMap<>();
- private final ConcurrentMap<String, TaskHeartbeatInfo> registeredTasks= new ConcurrentHashMap<String, TaskHeartbeatInfo>();
- private LlapTaskUmbilicalExternalResponder responder = null;
- private final ScheduledThreadPoolExecutor timer;
- private final long connectionTimeout;
-
- private static class TaskHeartbeatInfo {
- final String taskAttemptId;
- final String hostname;
- final int port;
- final AtomicLong lastHeartbeat = new AtomicLong();
-
- public TaskHeartbeatInfo(String taskAttemptId, String hostname, int port) {
- this.taskAttemptId = taskAttemptId;
- this.hostname = hostname;
- this.port = port;
- this.lastHeartbeat.set(System.currentTimeMillis());
- }
- }
-
- private static class PendingEventData {
- final TaskHeartbeatInfo heartbeatInfo;
- final List<TezEvent> tezEvents;
-
- public PendingEventData(TaskHeartbeatInfo heartbeatInfo, List<TezEvent> tezEvents) {
- this.heartbeatInfo = heartbeatInfo;
- this.tezEvents = tezEvents;
- }
- }
-
- // TODO KKK Work out the details of the tokenIdentifier, and the session token.
- // It may just be possible to create one here - since Shuffle is not involved, and this is only used
- // for communication from LLAP-Daemons to the server. It will need to be sent in as part
- // of the job submission request.
- public LlapTaskUmbilicalExternalClient(Configuration conf, String tokenIdentifier,
- Token<JobTokenIdentifier> sessionToken, LlapTaskUmbilicalExternalResponder responder) {
- super(LlapTaskUmbilicalExternalClient.class.getName());
- this.conf = conf;
- this.umbilical = new LlapTaskUmbilicalExternalImpl();
- this.tokenIdentifier = tokenIdentifier;
- this.sessionToken = sessionToken;
- this.responder = responder;
- this.timer = new ScheduledThreadPoolExecutor(1);
- this.connectionTimeout = HiveConf.getTimeVar(conf,
- HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
- // TODO. No support for the LLAP token yet. Add support for configurable threads, however 1 should always be enough.
- this.communicator = new LlapProtocolClientProxy(1, conf, null);
- this.communicator.init(conf);
- }
-
- @Override
- public void serviceStart() throws IOException {
- int numHandlers = HiveConf.getIntVar(conf,
- HiveConf.ConfVars.LLAP_TMP_EXT_CLIENT_NUM_SERVER_HANDLERS);
- llapTaskUmbilicalServer = new LlapTaskUmbilicalServer(conf, umbilical, numHandlers, tokenIdentifier, sessionToken);
- communicator.start();
- }
-
- @Override
- public void serviceStop() {
- llapTaskUmbilicalServer.shutdownServer();
- timer.shutdown();
- if (this.communicator != null) {
- this.communicator.stop();
- }
- }
-
- public InetSocketAddress getAddress() {
- return llapTaskUmbilicalServer.getAddress();
- }
-
-
- /**
- * Submit the work for actual execution. This should always have the usingTezAm flag disabled
- * @param submitWorkRequestProto
- */
- public void submitWork(final SubmitWorkRequestProto submitWorkRequestProto, String llapHost, int llapPort, List<TezEvent> tezEvents) {
- Preconditions.checkArgument(submitWorkRequestProto.getUsingTezAm() == false);
-
- // Register the pending events to be sent for this spec.
- String fragmentId = submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString();
- PendingEventData pendingEventData = new PendingEventData(
- new TaskHeartbeatInfo(fragmentId, llapHost, llapPort),
- tezEvents);
- pendingEvents.putIfAbsent(fragmentId, pendingEventData);
-
- // Set up timer task to check for heartbeat timeouts
- timer.scheduleAtFixedRate(new HeartbeatCheckTask(),
- connectionTimeout, connectionTimeout, TimeUnit.MILLISECONDS);
-
- // Send out the actual SubmitWorkRequest
- communicator.sendSubmitWork(submitWorkRequestProto, llapHost, llapPort,
- new LlapProtocolClientProxy.ExecuteRequestCallback<LlapDaemonProtocolProtos.SubmitWorkResponseProto>() {
-
- @Override
- public void setResponse(LlapDaemonProtocolProtos.SubmitWorkResponseProto response) {
- if (response.hasSubmissionState()) {
- if (response.getSubmissionState().equals(LlapDaemonProtocolProtos.SubmissionStateProto.REJECTED)) {
- String msg = "Fragment: " + submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString() + " rejected. Server Busy.";
- LOG.info(msg);
- if (responder != null) {
- Throwable err = new RuntimeException(msg);
- responder.submissionFailed(submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString(), err);
- }
- return;
- }
- }
- }
-
- @Override
- public void indicateError(Throwable t) {
- String msg = "Failed to submit: " + submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString();
- LOG.error(msg, t);
- Throwable err = new RuntimeException(msg, t);
- responder.submissionFailed(submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString(), err);
- }
- });
-
-
-
-
-
-// // TODO Also send out information saying that the fragment is finishable - if that is not already included in the main fragment.
-// // This entire call is only required if we're doing more than scans. MRInput has no dependencies and is always finishable
-// QueryIdentifierProto queryIdentifier = QueryIdentifierProto
-// .newBuilder()
-// .setAppIdentifier(submitWorkRequestProto.getApplicationIdString()).setDagIdentifier(submitWorkRequestProto.getFragmentSpec().getDagId())
-// .build();
-// LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto sourceStateUpdatedRequest =
-// LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto.newBuilder().setQueryIdentifier(queryIdentifier).setState(
-// LlapDaemonProtocolProtos.SourceStateProto.S_SUCCEEDED).
-// setSrcName(TODO)
-// communicator.sendSourceStateUpdate(LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto.newBuilder().setQueryIdentifier(submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString()).set);
-
-
- }
-
- private void updateHeartbeatInfo(String taskAttemptId) {
- int updateCount = 0;
-
- PendingEventData pendingEventData = pendingEvents.get(taskAttemptId);
- if (pendingEventData != null) {
- pendingEventData.heartbeatInfo.lastHeartbeat.set(System.currentTimeMillis());
- updateCount++;
- }
-
- TaskHeartbeatInfo heartbeatInfo = registeredTasks.get(taskAttemptId);
- if (heartbeatInfo != null) {
- heartbeatInfo.lastHeartbeat.set(System.currentTimeMillis());
- updateCount++;
- }
-
- if (updateCount == 0) {
- LOG.warn("No tasks found for heartbeat from taskAttemptId " + taskAttemptId);
- }
- }
-
- private void updateHeartbeatInfo(String hostname, int port) {
- int updateCount = 0;
-
- for (String key : pendingEvents.keySet()) {
- PendingEventData pendingEventData = pendingEvents.get(key);
- if (pendingEventData != null) {
- if (pendingEventData.heartbeatInfo.hostname.equals(hostname)
- && pendingEventData.heartbeatInfo.port == port) {
- pendingEventData.heartbeatInfo.lastHeartbeat.set(System.currentTimeMillis());
- updateCount++;
- }
- }
- }
-
- for (String key : registeredTasks.keySet()) {
- TaskHeartbeatInfo heartbeatInfo = registeredTasks.get(key);
- if (heartbeatInfo != null) {
- if (heartbeatInfo.hostname.equals(hostname)
- && heartbeatInfo.port == port) {
- heartbeatInfo.lastHeartbeat.set(System.currentTimeMillis());
- updateCount++;
- }
- }
- }
-
- if (updateCount == 0) {
- LOG.info("No tasks found for heartbeat from hostname " + hostname + ", port " + port);
- }
- }
-
- private class HeartbeatCheckTask implements Runnable {
- public void run() {
- long currentTime = System.currentTimeMillis();
- List<String> timedOutTasks = new ArrayList<String>();
-
- // Check both pending and registered tasks for timeouts
- for (String key : pendingEvents.keySet()) {
- PendingEventData pendingEventData = pendingEvents.get(key);
- if (pendingEventData != null) {
- if (currentTime - pendingEventData.heartbeatInfo.lastHeartbeat.get() >= connectionTimeout) {
- timedOutTasks.add(key);
- }
- }
- }
- for (String timedOutTask : timedOutTasks) {
- LOG.info("Pending taskAttemptId " + timedOutTask + " timed out");
- responder.heartbeatTimeout(timedOutTask);
- pendingEvents.remove(timedOutTask);
- // TODO: Do we need to tell the LLAP daemon we are no longer interested in this task?
- }
-
- timedOutTasks.clear();
- for (String key : registeredTasks.keySet()) {
- TaskHeartbeatInfo heartbeatInfo = registeredTasks.get(key);
- if (heartbeatInfo != null) {
- if (currentTime - heartbeatInfo.lastHeartbeat.get() >= connectionTimeout) {
- timedOutTasks.add(key);
- }
- }
- }
- for (String timedOutTask : timedOutTasks) {
- LOG.info("Running taskAttemptId " + timedOutTask + " timed out");
- responder.heartbeatTimeout(timedOutTask);
- registeredTasks.remove(timedOutTask);
- // TODO: Do we need to tell the LLAP daemon we are no longer interested in this task?
- }
- }
- }
-
- public interface LlapTaskUmbilicalExternalResponder {
- void submissionFailed(String fragmentId, Throwable throwable);
- void heartbeat(TezHeartbeatRequest request);
- void taskKilled(TezTaskAttemptID taskAttemptId);
- void heartbeatTimeout(String fragmentId);
- }
-
-
-
- // TODO Ideally, the server should be shared across all client sessions running on the same node.
- private class LlapTaskUmbilicalExternalImpl implements LlapTaskUmbilicalProtocol {
-
- @Override
- public boolean canCommit(TezTaskAttemptID taskid) throws IOException {
- // Expecting only a single instance of a task to be running.
- return true;
- }
-
- @Override
- public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOException,
- TezException {
- // Keep-alive information. The client should be informed and will have to take care of re-submitting the work.
- // Some parts of fault tolerance go here.
-
- // This also provides completion information, and a possible notification when task actually starts running (first heartbeat)
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Received heartbeat from container, request=" + request);
- }
-
- // Incoming events can be ignored until the point when shuffle needs to be handled, instead of just scans.
- TezHeartbeatResponse response = new TezHeartbeatResponse();
-
- response.setLastRequestId(request.getRequestId());
- // Assuming TaskAttemptId and FragmentIdentifierString are the same. Verify this.
- TezTaskAttemptID taskAttemptId = request.getCurrentTaskAttemptID();
- String taskAttemptIdString = taskAttemptId.toString();
-
- updateHeartbeatInfo(taskAttemptIdString);
-
- List<TezEvent> tezEvents = null;
- PendingEventData pendingEventData = pendingEvents.remove(taskAttemptIdString);
- if (pendingEventData == null) {
- tezEvents = Collections.emptyList();
-
- // If this heartbeat was not from a pending event and it's not in our list of registered tasks,
- if (!registeredTasks.containsKey(taskAttemptIdString)) {
- LOG.info("Unexpected heartbeat from " + taskAttemptIdString);
- response.setShouldDie(); // Do any of the other fields need to be set?
- return response;
- }
- } else {
- tezEvents = pendingEventData.tezEvents;
- // Tasks removed from the pending list should then be added to the registered list.
- registeredTasks.put(taskAttemptIdString, pendingEventData.heartbeatInfo);
- }
-
- response.setLastRequestId(request.getRequestId());
- // Irrelevant from eventIds. This can be tracked in the AM itself, instead of polluting the task.
- // Also since we have all the MRInput events here - they'll all be sent in together.
- response.setNextFromEventId(0); // Irrelevant. See comment above.
- response.setNextPreRoutedEventId(0); //Irrelevant. See comment above.
- response.setEvents(tezEvents);
-
- List<TezEvent> inEvents = request.getEvents();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Heartbeat from " + taskAttemptIdString +
- " events: " + (inEvents != null ? inEvents.size() : -1));
- }
- for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) {
- EventType eventType = tezEvent.getEventType();
- switch (eventType) {
- case TASK_ATTEMPT_COMPLETED_EVENT:
- LOG.debug("Task completed event for " + taskAttemptIdString);
- registeredTasks.remove(taskAttemptIdString);
- break;
- case TASK_ATTEMPT_FAILED_EVENT:
- LOG.debug("Task failed event for " + taskAttemptIdString);
- registeredTasks.remove(taskAttemptIdString);
- break;
- case TASK_STATUS_UPDATE_EVENT:
- // If we want to handle counters
- LOG.debug("Task update event for " + taskAttemptIdString);
- break;
- default:
- LOG.warn("Unhandled event type " + eventType);
- break;
- }
- }
-
- // Pass the request on to the responder
- try {
- if (responder != null) {
- responder.heartbeat(request);
- }
- } catch (Exception err) {
- LOG.error("Error during responder execution", err);
- }
-
- return response;
- }
-
- @Override
- public void nodeHeartbeat(Text hostname, int port) throws IOException {
- updateHeartbeatInfo(hostname.toString(), port);
- // No need to propagate to this to the responder
- }
-
- @Override
- public void taskKilled(TezTaskAttemptID taskAttemptId) throws IOException {
- String taskAttemptIdString = taskAttemptId.toString();
- LOG.error("Task killed - " + taskAttemptIdString);
- registeredTasks.remove(taskAttemptIdString);
-
- try {
- if (responder != null) {
- responder.taskKilled(taskAttemptId);
- }
- } catch (Exception err) {
- LOG.error("Error during responder execution", err);
- }
- }
-
- @Override
- public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
- return 0;
- }
-
- @Override
- public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,
- int clientMethodsHash) throws IOException {
- return ProtocolSignature.getProtocolSignature(this, protocol,
- clientVersion, clientMethodsHash);
- }
- }
-
-}
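
For readers following the moved code, the heartbeat handling in LlapTaskUmbilicalExternalClient above reduces to a small piece of bookkeeping: a fragment starts out in the pendingEvents map, is promoted to registeredTasks on its first heartbeat, and an attempt found in neither map is told to die. A minimal standalone sketch of that pattern follows; the map types and the HeartbeatInfo/PendingEventData stand-ins are illustrative assumptions, not the exact fields of the class.

import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class HeartbeatBookkeepingSketch {
  // Stand-ins for the client's internal state (names assumed for illustration).
  static class HeartbeatInfo { volatile long lastHeartbeat = System.currentTimeMillis(); }
  static class PendingEventData {
    final HeartbeatInfo heartbeatInfo = new HeartbeatInfo();
    final List<String> tezEvents = Collections.emptyList(); // the real class carries TezEvent objects
  }

  private final ConcurrentMap<String, PendingEventData> pendingEvents = new ConcurrentHashMap<>();
  private final ConcurrentMap<String, HeartbeatInfo> registeredTasks = new ConcurrentHashMap<>();

  /** Returns the events to send back, or null to signal that the fragment should die. */
  List<String> onHeartbeat(String taskAttemptId) {
    PendingEventData pending = pendingEvents.remove(taskAttemptId);
    if (pending == null) {
      // Not pending and not registered: this attempt is unknown, so the caller sets shouldDie.
      if (!registeredTasks.containsKey(taskAttemptId)) {
        return null;
      }
      return Collections.emptyList();
    }
    // First heartbeat for a pending fragment: promote it to the registered list.
    registeredTasks.put(taskAttemptId, pending.heartbeatInfo);
    return pending.tezEvents;
  }

  void onTaskCompletedOrFailed(String taskAttemptId) {
    registeredTasks.remove(taskAttemptId);
  }
}

The completed/failed events in the switch above drive the same cleanup as onTaskCompletedOrFailed here, which is why a late heartbeat from a finished task falls into the shouldDie branch.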

http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java
deleted file mode 100644
index dbd591a..0000000
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java
+++ /dev/null
@@ -1,57 +0,0 @@
-package org.apache.hadoop.hive.llap.tezplugins.helpers;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.token.Token;
-import org.apache.tez.common.security.JobTokenIdentifier;
-import org.apache.tez.common.security.JobTokenSecretManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class LlapTaskUmbilicalServer {
-
- private static final Logger LOG = LoggerFactory.getLogger(LlapTaskUmbilicalServer.class);
-
- protected volatile Server server;
- private final InetSocketAddress address;
- private final AtomicBoolean started = new AtomicBoolean(true);
-
- public LlapTaskUmbilicalServer(Configuration conf, LlapTaskUmbilicalProtocol umbilical, int numHandlers, String tokenIdentifier, Token<JobTokenIdentifier> token) throws
- IOException {
- JobTokenSecretManager jobTokenSecretManager =
- new JobTokenSecretManager();
- jobTokenSecretManager.addTokenForJob(tokenIdentifier, token);
-
- server = new RPC.Builder(conf)
- .setProtocol(LlapTaskUmbilicalProtocol.class)
- .setBindAddress("0.0.0.0")
- .setPort(0)
- .setInstance(umbilical)
- .setNumHandlers(numHandlers)
- .setSecretManager(jobTokenSecretManager).build();
-
- server.start();
- this.address = NetUtils.getConnectAddress(server);
- LOG.info(
- "Started TaskUmbilicalServer: " + umbilical.getClass().getName() + " at address: " + address +
- " with numHandlers=" + numHandlers);
- }
-
- public InetSocketAddress getAddress() {
- return this.address;
- }
-
- public void shutdownServer() {
- if (started.get()) { // Primarily to avoid multiple shutdowns.
- started.set(false);
- server.stop();
- }
- }
-}
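
LlapTaskUmbilicalServer itself is a thin wrapper over a standard Hadoop RPC server plus a JobTokenSecretManager; it relocates into llap-client in this commit. A hedged usage sketch, assuming an LlapTaskUmbilicalProtocol implementation and a job token created elsewhere (the umbilical and jobToken parameters are placeholders, and the import below uses the pre-move package shown in the deleted file):

import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
import org.apache.hadoop.hive.llap.tezplugins.helpers.LlapTaskUmbilicalServer;
import org.apache.hadoop.security.token.Token;
import org.apache.tez.common.security.JobTokenIdentifier;

public class UmbilicalServerUsageSketch {
  public static void run(Configuration conf, LlapTaskUmbilicalProtocol umbilical,
      String tokenIdentifier, Token<JobTokenIdentifier> jobToken) throws IOException {
    // Binds 0.0.0.0 on an ephemeral port; the job token secures the umbilical connection.
    LlapTaskUmbilicalServer server =
        new LlapTaskUmbilicalServer(conf, umbilical, 1, tokenIdentifier, jobToken);
    InetSocketAddress addr = server.getAddress();
    // ... hand addr to the submitted fragments so they can heartbeat back, then eventually:
    server.shutdownServer();
  }
}

Binding port 0 and reading the actual address back via NetUtils.getConnectAddress is what lets several external clients run side by side without port coordination.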

http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2337e89..f773d2f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -50,6 +50,7 @@
      <module>service</module>
      <module>llap-common</module>
      <module>llap-client</module>
+ <module>llap-ext-client</module>
      <module>llap-tez</module>
      <module>llap-server</module>
      <module>shims</module>

http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/ql/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
deleted file mode 100644
index 7073280..0000000
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.DataInputStream;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.llap.Schema;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.JobConf;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class LlapBaseRecordReader<V extends WritableComparable> implements RecordReader<NullWritable, V> {
- private static final Logger LOG = LoggerFactory.getLogger(LlapBaseRecordReader.class);
-
- DataInputStream din;
- Schema schema;
- Class<V> clazz;
-
-
- protected Thread readerThread = null;
- protected LinkedBlockingQueue<ReaderEvent> readerEvents = new LinkedBlockingQueue<ReaderEvent>();
-
- public LlapBaseRecordReader(InputStream in, Schema schema, Class<V> clazz) {
- din = new DataInputStream(in);
- this.schema = schema;
- this.clazz = clazz;
- this.readerThread = Thread.currentThread();
- }
-
- public Schema getSchema() {
- return schema;
- }
-
- @Override
- public void close() throws IOException {
- din.close();
- }
-
- @Override
- public long getPos() { return 0; }
-
- @Override
- public float getProgress() { return 0f; }
-
- @Override
- public NullWritable createKey() {
- return NullWritable.get();
- }
-
- @Override
- public V createValue() {
- try {
- return clazz.newInstance();
- } catch (Exception e) {
- return null;
- }
- }
-
- @Override
- public boolean next(NullWritable key, V value) throws IOException {
- try {
- // Need a way to know what thread to interrupt, since this is a blocking thread.
- setReaderThread(Thread.currentThread());
-
- value.readFields(din);
- return true;
- } catch (EOFException eof) {
- // End of input. There should be a reader event available, or coming soon, so okay to be blocking call.
- ReaderEvent event = getReaderEvent();
- switch (event.getEventType()) {
- case DONE:
- break;
- default:
- throw new IOException("Expected reader event with done status, but got "
- + event.getEventType() + " with message " + event.getMessage());
- }
- return false;
- } catch (IOException io) {
- if (Thread.interrupted()) {
- // Either we were interrupted by one of:
- // 1. handleEvent(), in which case there is a reader event waiting for us in the queue
- // 2. Some other unrelated cause which interrupted us, in which case there may not be a reader event coming.
- // Either way we should not try to block trying to read the reader events queue.
- if (readerEvents.isEmpty()) {
- // Case 2.
- throw io;
- } else {
- // Case 1. Fail the reader, sending back the error we received from the reader event.
- ReaderEvent event = getReaderEvent();
- switch (event.getEventType()) {
- case ERROR:
- throw new IOException("Received reader event error: " + event.getMessage());
- default:
- throw new IOException("Got reader event type " + event.getEventType() + ", expected error event");
- }
- }
- } else {
- // If we weren't interrupted, just propagate the error
- throw io;
- }
- }
- }
-
- /**
- * Define success/error events which are passed to the reader from a different thread.
- * The reader will check for these events on end of input and interruption of the reader thread.
- */
- public static class ReaderEvent {
- public enum EventType {
- DONE,
- ERROR
- }
-
- protected final EventType eventType;
- protected final String message;
-
- protected ReaderEvent(EventType type, String message) {
- this.eventType = type;
- this.message = message;
- }
-
- public static ReaderEvent doneEvent() {
- return new ReaderEvent(EventType.DONE, "");
- }
-
- public static ReaderEvent errorEvent(String message) {
- return new ReaderEvent(EventType.ERROR, message);
- }
-
- public EventType getEventType() {
- return eventType;
- }
-
- public String getMessage() {
- return message;
- }
- }
-
- public void handleEvent(ReaderEvent event) {
- switch (event.getEventType()) {
- case DONE:
- // Reader will check for the event queue upon the end of the input stream - no need to interrupt.
- readerEvents.add(event);
- break;
- case ERROR:
- readerEvents.add(event);
- if (readerThread == null) {
- throw new RuntimeException("Reader thread is unexpectedly null, during ReaderEvent error " + event.getMessage());
- }
- // Reader is using a blocking socket .. interrupt it.
- if (LOG.isDebugEnabled()) {
- LOG.debug("Interrupting reader thread due to reader event with error " + event.getMessage());
- }
- getReaderThread().interrupt();
- break;
- default:
- throw new RuntimeException("Unhandled ReaderEvent type " + event.getEventType() + " with message " + event.getMessage());
- }
- }
-
- protected ReaderEvent getReaderEvent() {
- try {
- ReaderEvent event = readerEvents.take();
- return event;
- } catch (InterruptedException ie) {
- throw new RuntimeException("Interrupted while getting readerEvents, not expected", ie);
- }
- }
-
- protected synchronized void setReaderThread(Thread readerThread) {
- this.readerThread = readerThread;
- }
-
- protected synchronized Thread getReaderThread() {
- return readerThread;
- }
-}
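
The ReaderEvent plumbing in LlapBaseRecordReader is easiest to see from both sides at once: a consumer loops on next() until end of input, while another thread reports completion or failure through handleEvent(), interrupting the reader only in the error case. A minimal, hedged sketch using Text values over an in-memory pipe; the pipe and the single hard-coded row are illustrative assumptions, not how LLAP feeds the reader in practice.

import java.io.DataOutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

import org.apache.hadoop.hive.llap.LlapBaseRecordReader;
import org.apache.hadoop.hive.llap.LlapBaseRecordReader.ReaderEvent;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

public class ReaderEventUsageSketch {
  public static void main(String[] args) throws Exception {
    PipedOutputStream sink = new PipedOutputStream();
    PipedInputStream source = new PipedInputStream(sink);

    // The schema is only surfaced through getSchema(), so null is tolerable for this sketch.
    LlapBaseRecordReader<Text> reader = new LlapBaseRecordReader<>(source, null, Text.class);

    // Producer side: write one row, signal DONE, then close the stream to trigger EOF.
    Thread producer = new Thread(() -> {
      try (DataOutputStream out = new DataOutputStream(sink)) {
        new Text("hello").write(out);
        reader.handleEvent(ReaderEvent.doneEvent());
      } catch (Exception e) {
        reader.handleEvent(ReaderEvent.errorEvent(String.valueOf(e)));
      }
    });
    producer.start();

    // Consumer side: next() returns true per row, then false once EOF plus the DONE event arrive.
    Text value = reader.createValue();
    while (reader.next(NullWritable.get(), value)) {
      System.out.println(value);
    }
    reader.close();
    producer.join();
  }
}

Queuing the event before interrupting is what lets next() tell a deliberate error apart from a stray interrupt: it only drains the queue when it is non-empty.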

http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java
deleted file mode 100644
index 02aedfd..0000000
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.llap;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.hive.llap.Schema;
-import org.apache.hadoop.mapred.InputSplitWithLocationInfo;
-import org.apache.hadoop.mapred.SplitLocationInfo;
-import org.apache.thrift.TDeserializer;
-import org.apache.thrift.TSerializer;
-
-public class LlapInputSplit implements InputSplitWithLocationInfo {
-
- int splitNum;
- byte[] planBytes;
- byte[] fragmentBytes;
- SplitLocationInfo[] locations;
- Schema schema;
- String llapUser;
-
- public LlapInputSplit() {
- }
-
- public LlapInputSplit(int splitNum, byte[] planBytes, byte[] fragmentBytes, SplitLocationInfo[] locations, Schema schema, String llapUser) {
- this.planBytes = planBytes;
- this.fragmentBytes = fragmentBytes;
- this.locations = locations;
- this.schema = schema;
- this.splitNum = splitNum;
- this.llapUser = llapUser;
- }
-
- public Schema getSchema() {
- return schema;
- }
-
- @Override
- public long getLength() throws IOException {
- return 0;
- }
-
- @Override
- public String[] getLocations() throws IOException {
- String[] locs = new String[locations.length];
- for (int i = 0; i < locations.length; ++i) {
- locs[i] = locations[i].getLocation();
- }
- return locs;
- }
-
- public int getSplitNum() {
- return splitNum;
- }
-
- public byte[] getPlanBytes() {
- return planBytes;
- }
-
- public byte[] getFragmentBytes() {
- return fragmentBytes;
- }
-
-
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeInt(splitNum);
- out.writeInt(planBytes.length);
- out.write(planBytes);
-
- out.writeInt(fragmentBytes.length);
- out.write(fragmentBytes);
-
- out.writeInt(locations.length);
- for (int i = 0; i < locations.length; ++i) {
- out.writeUTF(locations[i].getLocation());
- }
-
- schema.write(out);
- out.writeUTF(llapUser);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- splitNum = in.readInt();
- int length = in.readInt();
- planBytes = new byte[length];
- in.readFully(planBytes);
-
- length = in.readInt();
- fragmentBytes = new byte[length];
- in.readFully(fragmentBytes);
-
- length = in.readInt();
- locations = new SplitLocationInfo[length];
-
- for (int i = 0; i < length; ++i) {
- locations[i] = new SplitLocationInfo(in.readUTF(), false);
- }
-
- schema = new Schema();
- schema.readFields(in);
- llapUser = in.readUTF();
- }
-
- @Override
- public SplitLocationInfo[] getLocationInfo() throws IOException {
- return locations;
- }
-
- public String getLlapUser() {
- return llapUser;
- }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/ql/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java b/ql/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java
deleted file mode 100644
index 83149ab..0000000
--- a/ql/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java
+++ /dev/null
@@ -1,103 +0,0 @@
-package org.apache.hadoop.hive.llap;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.tez.common.security.JobTokenIdentifier;
-import org.apache.tez.common.security.JobTokenSecretManager;
-import org.apache.tez.runtime.api.impl.TaskSpec;
-
-public class SubmitWorkInfo implements Writable {
-
- private TaskSpec taskSpec;
- private ApplicationId fakeAppId;
- private long creationTime;
-
- // This is used to communicate over the LlapUmbilicalProtocol. Not related to tokens used to
- // talk to LLAP daemons itself via the securit work.
- private Token<JobTokenIdentifier> token;
-
- public SubmitWorkInfo(TaskSpec taskSpec, ApplicationId fakeAppId, long creationTime) {
- this.taskSpec = taskSpec;
- this.fakeAppId = fakeAppId;
- this.token = createJobToken();
- this.creationTime = creationTime;
- }
-
- // Empty constructor for writable etc.
- public SubmitWorkInfo() {
- }
-
- public TaskSpec getTaskSpec() {
- return taskSpec;
- }
-
- public ApplicationId getFakeAppId() {
- return fakeAppId;
- }
-
- public String getTokenIdentifier() {
- return fakeAppId.toString();
- }
-
- public Token<JobTokenIdentifier> getToken() {
- return token;
- }
-
- public long getCreationTime() {
- return creationTime;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- taskSpec.write(out);
- out.writeLong(fakeAppId.getClusterTimestamp());
- out.writeInt(fakeAppId.getId());
- token.write(out);
- out.writeLong(creationTime);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- taskSpec = new TaskSpec();
- taskSpec.readFields(in);
- long appIdTs = in.readLong();
- int appIdId = in.readInt();
- fakeAppId = ApplicationId.newInstance(appIdTs, appIdId);
- token = new Token<>();
- token.readFields(in);
- creationTime = in.readLong();
- }
-
- public static byte[] toBytes(SubmitWorkInfo submitWorkInfo) throws IOException {
- DataOutputBuffer dob = new DataOutputBuffer();
- submitWorkInfo.write(dob);
- return dob.getData();
- }
-
- public static SubmitWorkInfo fromBytes(byte[] submitWorkInfoBytes) throws IOException {
- DataInputBuffer dib = new DataInputBuffer();
- dib.reset(submitWorkInfoBytes, 0, submitWorkInfoBytes.length);
- SubmitWorkInfo submitWorkInfo = new SubmitWorkInfo();
- submitWorkInfo.readFields(dib);
- return submitWorkInfo;
- }
-
-
- private Token<JobTokenIdentifier> createJobToken() {
- String tokenIdentifier = fakeAppId.toString();
- JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(
- tokenIdentifier));
- Token<JobTokenIdentifier> sessionToken = new Token<JobTokenIdentifier>(identifier,
- new JobTokenSecretManager());
- sessionToken.setService(identifier.getJobId());
- return sessionToken;
- }
-}
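
SubmitWorkInfo travels as an opaque byte array inside the work submission, and the static toBytes/fromBytes helpers above are the intended entry points. A hedged round-trip sketch, assuming a TaskSpec built elsewhere (taskSpec is a placeholder; the fake ApplicationId doubles as the job-token identifier, as getTokenIdentifier() shows):

import java.io.IOException;

import org.apache.hadoop.hive.llap.SubmitWorkInfo;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.runtime.api.impl.TaskSpec;

public class SubmitWorkInfoRoundTripSketch {
  public static byte[] pack(TaskSpec taskSpec) throws IOException {
    ApplicationId fakeAppId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
    SubmitWorkInfo info = new SubmitWorkInfo(taskSpec, fakeAppId, System.currentTimeMillis());
    return SubmitWorkInfo.toBytes(info); // serialized task spec, app id, job token, creation time
  }

  public static SubmitWorkInfo unpack(byte[] payload) throws IOException {
    // Reconstructs the spec, the fake app id and the umbilical job token on the receiving side.
    return SubmitWorkInfo.fromBytes(payload);
  }
}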


  • Jdere at Apr 15, 2016 at 11:45 pm
    HIVE-13529: Move around some of the classes created during llap branch work


    Project: http://git-wip-us.apache.org/repos/asf/hive/repo
    Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7b9096a9
    Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7b9096a9
    Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7b9096a9

    Branch: refs/heads/llap
    Commit: 7b9096a922f9706909ba0e52d8188d182a79612f
    Parents: fc7343d
    Author: Jason Dere <jdere@hortonworks.com>
    Authored: Fri Apr 15 16:45:32 2016 -0700
    Committer: Jason Dere <jdere@hortonworks.com>
    Committed: Fri Apr 15 16:45:32 2016 -0700

    ----------------------------------------------------------------------
      itests/hive-unit/pom.xml | 5 +
      .../hadoop/hive/jdbc/TestLlapInputSplit.java | 100 -----
      .../hive/llap/ext/TestLlapInputSplit.java | 100 +++++
      .../apache/hive/jdbc/TestJdbcWithMiniLlap.java | 4 +-
      .../apache/hive/jdbc/LlapBaseInputFormat.java | 135 ------
      .../src/java/org/apache/hive/jdbc/LlapDump.java | 164 --------
      .../org/apache/hive/jdbc/LlapInputSplit.java | 73 ----
      .../apache/hive/jdbc/LlapRowInputFormat.java | 34 --
      llap-client/pom.xml | 32 ++
      .../hadoop/hive/llap/LlapBaseRecordReader.java | 205 +++++++++
      .../hadoop/hive/llap/LlapInputFormat.java | 392 ++++++++++++++++++
      .../apache/hadoop/hive/llap/LlapInputSplit.java | 131 ++++++
      .../hadoop/hive/llap/LlapRowRecordReader.java | 155 +++++++
      .../apache/hadoop/hive/llap/SubmitWorkInfo.java | 103 +++++
      .../ext/LlapTaskUmbilicalExternalClient.java | 415 +++++++++++++++++++
      .../helpers/LlapTaskUmbilicalServer.java | 57 +++
      .../hadoop/hive/llap/LlapRowRecordReader.java | 155 -------
      llap-ext-client/pom.xml | 140 +++++++
      .../hadoop/hive/llap/LlapBaseInputFormat.java | 136 ++++++
      .../org/apache/hadoop/hive/llap/LlapDump.java | 165 ++++++++
      .../hadoop/hive/llap/LlapRowInputFormat.java | 36 ++
      .../apache/hive/llap/ext/LlapInputSplit.java | 73 ++++
      .../hadoop/hive/llap/LlapInputFormat.java | 392 ------------------
      .../ext/LlapTaskUmbilicalExternalClient.java | 415 -------------------
      .../helpers/LlapTaskUmbilicalServer.java | 57 ---
      pom.xml | 1 +
      .../hadoop/hive/llap/LlapBaseRecordReader.java | 205 ---------
      .../apache/hadoop/hive/llap/LlapInputSplit.java | 131 ------
      .../apache/hadoop/hive/llap/SubmitWorkInfo.java | 103 -----
      29 files changed, 2148 insertions(+), 1966 deletions(-)
    ----------------------------------------------------------------------


    http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/itests/hive-unit/pom.xml
    ----------------------------------------------------------------------
    diff --git a/itests/hive-unit/pom.xml b/itests/hive-unit/pom.xml
    index ae231de..b248673 100644
    --- a/itests/hive-unit/pom.xml
    +++ b/itests/hive-unit/pom.xml
    @@ -61,6 +61,11 @@
          </dependency>
          <dependency>
            <groupId>org.apache.hive</groupId>
    + <artifactId>hive-llap-ext-client</artifactId>
    + <version>${project.version}</version>
    + </dependency>
    + <dependency>
    + <groupId>org.apache.hive</groupId>
            <artifactId>hive-llap-server</artifactId>
            <version>${project.version}</version>
            <type>test-jar</type>

    http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/itests/hive-unit/src/test/java/org/apache/hadoop/hive/jdbc/TestLlapInputSplit.java
    ----------------------------------------------------------------------
    diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/jdbc/TestLlapInputSplit.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/jdbc/TestLlapInputSplit.java
    deleted file mode 100644
    index 366e326..0000000
    --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/jdbc/TestLlapInputSplit.java
    +++ /dev/null
    @@ -1,100 +0,0 @@
    -package org.apache.hive.jdbc;
    -
    -import java.io.ByteArrayInputStream;
    -import java.io.ByteArrayOutputStream;
    -import java.io.DataInputStream;
    -import java.io.DataOutput;
    -import java.io.DataOutputStream;
    -import java.util.ArrayList;
    -import java.util.HashMap;
    -
    -import org.apache.hadoop.io.Text;
    -
    -import org.apache.hadoop.hive.llap.Schema;
    -import org.apache.hadoop.hive.llap.FieldDesc;
    -import org.apache.hadoop.hive.llap.TypeDesc;
    -
    -import org.apache.hadoop.mapred.SplitLocationInfo;
    -import org.junit.After;
    -import org.junit.Before;
    -import org.junit.BeforeClass;
    -import org.junit.Rule;
    -import org.junit.Test;
    -import org.junit.rules.ExpectedException;
    -import static org.junit.Assert.*;
    -
    -public class TestLlapInputSplit {
    -
    - @Test
    - public void testWritable() throws Exception {
    - int splitNum = 88;
    - byte[] planBytes = "0123456789987654321".getBytes();
    - byte[] fragmentBytes = "abcdefghijklmnopqrstuvwxyz".getBytes();
    - SplitLocationInfo[] locations = {
    - new SplitLocationInfo("location1", false),
    - new SplitLocationInfo("location2", false),
    - };
    -
    - ArrayList<FieldDesc> colDescs = new ArrayList<FieldDesc>();
    - colDescs.add(new FieldDesc("col1", new TypeDesc(TypeDesc.Type.STRING)));
    - colDescs.add(new FieldDesc("col2", new TypeDesc(TypeDesc.Type.INT)));
    - Schema schema = new Schema(colDescs);
    -
    - org.apache.hadoop.hive.llap.LlapInputSplit split1 = new org.apache.hadoop.hive.llap.LlapInputSplit(
    - splitNum,
    - planBytes,
    - fragmentBytes,
    - locations,
    - schema,
    - "hive");
    - ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
    - DataOutputStream dataOut = new DataOutputStream(byteOutStream);
    - split1.write(dataOut);
    - ByteArrayInputStream byteInStream = new ByteArrayInputStream(byteOutStream.toByteArray());
    - DataInputStream dataIn = new DataInputStream(byteInStream);
    - org.apache.hadoop.hive.llap.LlapInputSplit split2 = new org.apache.hadoop.hive.llap.LlapInputSplit();
    - split2.readFields(dataIn);
    -
    - // Did we read all the data?
    - assertEquals(0, byteInStream.available());
    -
    - checkLlapSplits(split1, split2);
    -
    - // Try JDBC LlapInputSplits
    - org.apache.hive.jdbc.LlapInputSplit<Text> jdbcSplit1 =
    - new org.apache.hive.jdbc.LlapInputSplit<Text>(split1, "org.apache.hadoop.hive.llap.LlapInputFormat");
    - byteOutStream.reset();
    - jdbcSplit1.write(dataOut);
    - byteInStream = new ByteArrayInputStream(byteOutStream.toByteArray());
    - dataIn = new DataInputStream(byteInStream);
    - org.apache.hive.jdbc.LlapInputSplit<Text> jdbcSplit2 = new org.apache.hive.jdbc.LlapInputSplit<Text>();
    - jdbcSplit2.readFields(dataIn);
    -
    - assertEquals(0, byteInStream.available());
    -
    - checkLlapSplits(
    - (org.apache.hadoop.hive.llap.LlapInputSplit) jdbcSplit1.getSplit(),
    - (org.apache.hadoop.hive.llap.LlapInputSplit) jdbcSplit2.getSplit());
    - assertEquals(jdbcSplit1.getInputFormat().getClass(), jdbcSplit2.getInputFormat().getClass());
    - }
    -
    - static void checkLlapSplits(
    - org.apache.hadoop.hive.llap.LlapInputSplit split1,
    - org.apache.hadoop.hive.llap.LlapInputSplit split2) throws Exception {
    -
    - assertEquals(split1.getSplitNum(), split2.getSplitNum());
    - assertArrayEquals(split1.getPlanBytes(), split2.getPlanBytes());
    - assertArrayEquals(split1.getFragmentBytes(), split2.getFragmentBytes());
    - SplitLocationInfo[] locationInfo1 = split1.getLocationInfo();
    - SplitLocationInfo[] locationInfo2 = split2.getLocationInfo();
    - for (int idx = 0; idx < locationInfo1.length; ++idx) {
    - assertEquals(locationInfo1[idx].getLocation(), locationInfo2[idx].getLocation());
    - assertEquals(locationInfo1[idx].isInMemory(), locationInfo2[idx].isInMemory());
    - assertEquals(locationInfo1[idx].isOnDisk(), locationInfo2[idx].isOnDisk());
    - }
    - assertArrayEquals(split1.getLocations(), split2.getLocations());
    - assertEquals(split1.getSchema().toString(), split2.getSchema().toString());
    - assertEquals(split1.getLlapUser(), split2.getLlapUser());
    - }
    -
    -}

    http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/itests/hive-unit/src/test/java/org/apache/hadoop/hive/llap/ext/TestLlapInputSplit.java
    ----------------------------------------------------------------------
    diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/llap/ext/TestLlapInputSplit.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/llap/ext/TestLlapInputSplit.java
    new file mode 100644
    index 0000000..04da17e
    --- /dev/null
    +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/llap/ext/TestLlapInputSplit.java
    @@ -0,0 +1,100 @@
    +package org.apache.hadoop.hive.llap.ext;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.DataInputStream;
    +import java.io.DataOutput;
    +import java.io.DataOutputStream;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +
    +import org.apache.hadoop.io.Text;
    +
    +import org.apache.hadoop.hive.llap.Schema;
    +import org.apache.hadoop.hive.llap.FieldDesc;
    +import org.apache.hadoop.hive.llap.TypeDesc;
    +
    +import org.apache.hadoop.mapred.SplitLocationInfo;
    +import org.junit.After;
    +import org.junit.Before;
    +import org.junit.BeforeClass;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.ExpectedException;
    +import static org.junit.Assert.*;
    +
    +public class TestLlapInputSplit {
    +
    + @Test
    + public void testWritable() throws Exception {
    + int splitNum = 88;
    + byte[] planBytes = "0123456789987654321".getBytes();
    + byte[] fragmentBytes = "abcdefghijklmnopqrstuvwxyz".getBytes();
    + SplitLocationInfo[] locations = {
    + new SplitLocationInfo("location1", false),
    + new SplitLocationInfo("location2", false),
    + };
    +
    + ArrayList<FieldDesc> colDescs = new ArrayList<FieldDesc>();
    + colDescs.add(new FieldDesc("col1", new TypeDesc(TypeDesc.Type.STRING)));
    + colDescs.add(new FieldDesc("col2", new TypeDesc(TypeDesc.Type.INT)));
    + Schema schema = new Schema(colDescs);
    +
    + org.apache.hadoop.hive.llap.LlapInputSplit split1 = new org.apache.hadoop.hive.llap.LlapInputSplit(
    + splitNum,
    + planBytes,
    + fragmentBytes,
    + locations,
    + schema,
    + "hive");
    + ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
    + DataOutputStream dataOut = new DataOutputStream(byteOutStream);
    + split1.write(dataOut);
    + ByteArrayInputStream byteInStream = new ByteArrayInputStream(byteOutStream.toByteArray());
    + DataInputStream dataIn = new DataInputStream(byteInStream);
    + org.apache.hadoop.hive.llap.LlapInputSplit split2 = new org.apache.hadoop.hive.llap.LlapInputSplit();
    + split2.readFields(dataIn);
    +
    + // Did we read all the data?
    + assertEquals(0, byteInStream.available());
    +
    + checkLlapSplits(split1, split2);
    +
    + // Try JDBC LlapInputSplits
    + org.apache.hive.llap.ext.LlapInputSplit<Text> jdbcSplit1 =
    + new org.apache.hive.llap.ext.LlapInputSplit<Text>(split1, "org.apache.hadoop.hive.llap.LlapInputFormat");
    + byteOutStream.reset();
    + jdbcSplit1.write(dataOut);
    + byteInStream = new ByteArrayInputStream(byteOutStream.toByteArray());
    + dataIn = new DataInputStream(byteInStream);
    + org.apache.hive.llap.ext.LlapInputSplit<Text> jdbcSplit2 = new org.apache.hive.llap.ext.LlapInputSplit<Text>();
    + jdbcSplit2.readFields(dataIn);
    +
    + assertEquals(0, byteInStream.available());
    +
    + checkLlapSplits(
    + (org.apache.hadoop.hive.llap.LlapInputSplit) jdbcSplit1.getSplit(),
    + (org.apache.hadoop.hive.llap.LlapInputSplit) jdbcSplit2.getSplit());
    + assertEquals(jdbcSplit1.getInputFormat().getClass(), jdbcSplit2.getInputFormat().getClass());
    + }
    +
    + static void checkLlapSplits(
    + org.apache.hadoop.hive.llap.LlapInputSplit split1,
    + org.apache.hadoop.hive.llap.LlapInputSplit split2) throws Exception {
    +
    + assertEquals(split1.getSplitNum(), split2.getSplitNum());
    + assertArrayEquals(split1.getPlanBytes(), split2.getPlanBytes());
    + assertArrayEquals(split1.getFragmentBytes(), split2.getFragmentBytes());
    + SplitLocationInfo[] locationInfo1 = split1.getLocationInfo();
    + SplitLocationInfo[] locationInfo2 = split2.getLocationInfo();
    + for (int idx = 0; idx < locationInfo1.length; ++idx) {
    + assertEquals(locationInfo1[idx].getLocation(), locationInfo2[idx].getLocation());
    + assertEquals(locationInfo1[idx].isInMemory(), locationInfo2[idx].isInMemory());
    + assertEquals(locationInfo1[idx].isOnDisk(), locationInfo2[idx].isOnDisk());
    + }
    + assertArrayEquals(split1.getLocations(), split2.getLocations());
    + assertEquals(split1.getSchema().toString(), split2.getSchema().toString());
    + assertEquals(split1.getLlapUser(), split2.getLlapUser());
    + }
    +
    +}

    http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
    ----------------------------------------------------------------------
    diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
    index deeac2e..5b4ba49 100644
    --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
    +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
    @@ -68,8 +68,8 @@ import org.apache.hadoop.io.Text;

      import org.apache.hive.jdbc.miniHS2.MiniHS2;
      import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType;
    -import org.apache.hive.jdbc.LlapBaseInputFormat;
    -import org.apache.hive.jdbc.LlapRowInputFormat;
    +import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
    +import org.apache.hadoop.hive.llap.LlapRowInputFormat;

      import org.datanucleus.ClassLoaderResolver;
      import org.datanucleus.NucleusContext;

    http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/jdbc/src/java/org/apache/hive/jdbc/LlapBaseInputFormat.java
    ----------------------------------------------------------------------
    diff --git a/jdbc/src/java/org/apache/hive/jdbc/LlapBaseInputFormat.java b/jdbc/src/java/org/apache/hive/jdbc/LlapBaseInputFormat.java
    deleted file mode 100644
    index a0ddeaa..0000000
    --- a/jdbc/src/java/org/apache/hive/jdbc/LlapBaseInputFormat.java
    +++ /dev/null
    @@ -1,135 +0,0 @@
    -/*
    - * Licensed to the Apache Software Foundation (ASF) under one or more
    - * contributor license agreements. See the NOTICE file distributed with
    - * this work for additional information regarding copyright ownership.
    - * The ASF licenses this file to You under the Apache License, Version 2.0
    - * (the "License"); you may not use this file except in compliance with
    - * the License. You may obtain a copy of the License at
    - *
    - * http://www.apache.org/licenses/LICENSE-2.0
    - *
    - * Unless required by applicable law or agreed to in writing, software
    - * distributed under the License is distributed on an "AS IS" BASIS,
    - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    - * See the License for the specific language governing permissions and
    - * limitations under the License.
    - */
    -package org.apache.hive.jdbc;
    -
    -import java.util.ArrayList;
    -import java.util.List;
    -
    -import java.sql.SQLException;
    -import java.sql.Connection;
    -import java.sql.ResultSet;
    -import java.sql.Statement;
    -import java.sql.DriverManager;
    -
    -import java.io.IOException;
    -import java.io.DataInput;
    -import java.io.DataOutput;
    -import java.io.DataInputStream;
    -import java.io.ByteArrayInputStream;
    -
    -import org.apache.hadoop.fs.FileSystem;
    -import org.apache.hadoop.io.Text;
    -import org.apache.hadoop.io.WritableComparable;
    -import org.apache.hadoop.io.NullWritable;
    -import org.apache.hadoop.io.Writable;
    -import org.apache.hadoop.mapred.JobConf;
    -import org.apache.hadoop.mapred.InputFormat;
    -import org.apache.hadoop.mapred.InputSplit;
    -import org.apache.hadoop.mapred.InputSplitWithLocationInfo;
    -import org.apache.hadoop.mapred.SplitLocationInfo;
    -import org.apache.hadoop.mapred.FileSplit;
    -import org.apache.hadoop.mapred.RecordReader;
    -import org.apache.hadoop.mapred.Reporter;
    -import org.apache.hadoop.security.UserGroupInformation;
    -import org.apache.hadoop.security.token.Token;
    -import org.apache.hadoop.security.token.TokenIdentifier;
    -import org.apache.hadoop.util.Progressable;
    -
    -import com.google.common.base.Preconditions;
    -
    -public class LlapBaseInputFormat<V extends WritableComparable> implements InputFormat<NullWritable, V> {
    -
    - private static String driverName = "org.apache.hive.jdbc.HiveDriver";
    - private String url; // "jdbc:hive2://localhost:10000/default"
    - private String user; // "hive",
    - private String pwd; // ""
    - private String query;
    -
    - public static final String URL_KEY = "llap.if.hs2.connection";
    - public static final String QUERY_KEY = "llap.if.query";
    - public static final String USER_KEY = "llap.if.user";
    - public static final String PWD_KEY = "llap.if.pwd";
    -
    - public final String SPLIT_QUERY = "select get_splits(\"%s\",%d)";
    -
    - private Connection con;
    - private Statement stmt;
    -
    - public LlapBaseInputFormat(String url, String user, String pwd, String query) {
    - this.url = url;
    - this.user = user;
    - this.pwd = pwd;
    - this.query = query;
    - }
    -
    - public LlapBaseInputFormat() {}
    -
    -
    - @Override
    - public RecordReader<NullWritable, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
    - LlapInputSplit llapSplit = (LlapInputSplit) split;
    - return llapSplit.getInputFormat().getRecordReader(llapSplit.getSplit(), job, reporter);
    - }
    -
    - @Override
    - public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
    - List<InputSplit> ins = new ArrayList<InputSplit>();
    -
    - if (url == null) url = job.get(URL_KEY);
    - if (query == null) query = job.get(QUERY_KEY);
    - if (user == null) user = job.get(USER_KEY);
    - if (pwd == null) pwd = job.get(PWD_KEY);
    -
    - if (url == null || query == null) {
    - throw new IllegalStateException();
    - }
    -
    - try {
    - Class.forName(driverName);
    - } catch (ClassNotFoundException e) {
    - throw new IOException(e);
    - }
    -
    - try {
    - con = DriverManager.getConnection(url,user,pwd);
    - stmt = con.createStatement();
    - String sql = String.format(SPLIT_QUERY, query, numSplits);
    - ResultSet res = stmt.executeQuery(sql);
    - while (res.next()) {
    - // deserialize split
    - DataInput in = new DataInputStream(res.getBinaryStream(3));
    - InputSplitWithLocationInfo is = (InputSplitWithLocationInfo)Class.forName(res.getString(2)).newInstance();
    - is.readFields(in);
    - ins.add(new LlapInputSplit(is, res.getString(1)));
    - }
    -
    - res.close();
    - stmt.close();
    - } catch (Exception e) {
    - throw new IOException(e);
    - }
    - return ins.toArray(new InputSplit[ins.size()]);
    - }
    -
    - public void close() {
    - try {
    - con.close();
    - } catch (Exception e) {
    - // ignore
    - }
    - }
    -}
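
    The deleted jdbc copy above shows how the input format is driven: either through its four-argument constructor or through the llap.if.* keys in the JobConf; the class itself moves to org.apache.hadoop.hive.llap in this commit. A hedged usage sketch against the API as shown (it assumes a reachable HiveServer2 with LLAP, and that the relocated class keeps the same configuration keys):

    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RecordReader;
    import org.apache.hive.jdbc.LlapBaseInputFormat;

    public class LlapBaseInputFormatUsageSketch {
      public static void main(String[] args) throws Exception {
        JobConf job = new JobConf();
        // The llap.if.* keys feed the get_splits() call issued through HS2.
        job.set(LlapBaseInputFormat.URL_KEY, "jdbc:hive2://localhost:10000/default");
        job.set(LlapBaseInputFormat.USER_KEY, "hive");
        job.set(LlapBaseInputFormat.PWD_KEY, "");
        job.set(LlapBaseInputFormat.QUERY_KEY, "select * from test");

        LlapBaseInputFormat<Text> inputFormat = new LlapBaseInputFormat<Text>();
        InputSplit[] splits = inputFormat.getSplits(job, 1);
        for (InputSplit split : splits) {
          RecordReader<NullWritable, Text> reader = inputFormat.getRecordReader(split, job, null);
          Text value = reader.createValue();
          while (reader.next(NullWritable.get(), value)) {
            System.out.println(value);
          }
          reader.close();
        }
        inputFormat.close();
      }
    }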

    http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/jdbc/src/java/org/apache/hive/jdbc/LlapDump.java
    ----------------------------------------------------------------------
    diff --git a/jdbc/src/java/org/apache/hive/jdbc/LlapDump.java b/jdbc/src/java/org/apache/hive/jdbc/LlapDump.java
    deleted file mode 100644
    index 4c3c3ab..0000000
    --- a/jdbc/src/java/org/apache/hive/jdbc/LlapDump.java
    +++ /dev/null
    @@ -1,164 +0,0 @@
    -/**
    - * Licensed to the Apache Software Foundation (ASF) under one
    - * or more contributor license agreements. See the NOTICE file
    - * distributed with this work for additional information
    - * regarding copyright ownership. The ASF licenses this file
    - * to you under the Apache License, Version 2.0 (the
    - * "License"); you may not use this file except in compliance
    - * with the License. You may obtain a copy of the License at
    - *
    - * http://www.apache.org/licenses/LICENSE-2.0
    - *
    - * Unless required by applicable law or agreed to in writing, software
    - * distributed under the License is distributed on an "AS IS" BASIS,
    - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    - * See the License for the specific language governing permissions and
    - * limitations under the License.
    - */
    -
    -package org.apache.hive.jdbc;
    -
    -import java.io.OutputStream;
    -import java.io.InputStream;
    -import java.io.File;
    -import java.io.IOException;
    -import java.io.FileInputStream;
    -import java.util.ArrayList;
    -import java.util.Arrays;
    -import java.util.LinkedList;
    -import java.util.List;
    -
    -import org.slf4j.Logger;
    -import org.slf4j.LoggerFactory;
    -
    -import org.apache.commons.cli.CommandLine;
    -import org.apache.commons.cli.GnuParser;
    -import org.apache.commons.cli.HelpFormatter;
    -import org.apache.commons.cli.OptionBuilder;
    -import org.apache.commons.cli.Options;
    -
    -import org.apache.hadoop.conf.Configuration;
    -import org.apache.hadoop.hive.ql.io.RCFile.Reader;
    -import org.apache.hadoop.io.Writable;
    -import org.apache.hadoop.io.Text;
    -import org.apache.hadoop.io.WritableComparable;
    -import org.apache.hadoop.io.NullWritable;
    -import org.apache.hadoop.util.ReflectionUtils;
    -import org.apache.hadoop.mapred.RecordReader;
    -import org.apache.hadoop.mapred.RecordWriter;
    -import org.apache.hadoop.mapred.Reporter;
    -import org.apache.hadoop.mapred.JobConf;
    -import org.apache.hadoop.mapred.InputSplit;
    -import org.apache.hadoop.hive.llap.io.api.LlapProxy;
    -import org.apache.hadoop.hive.llap.LlapBaseRecordReader;
    -import org.apache.hadoop.hive.llap.Schema;
    -
    -public class LlapDump {
    -
    - private static final Logger LOG = LoggerFactory.getLogger(LlapDump.class);
    -
    - private static String url = "jdbc:hive2://localhost:10000/default";
    - private static String user = "hive";
    - private static String pwd = "";
    - private static String query = "select * from test";
    - private static String numSplits = "1";
    -
    - public static void main(String[] args) throws Exception {
    - Options opts = createOptions();
    - CommandLine cli = new GnuParser().parse(opts, args);
    -
    - if (cli.hasOption('h')) {
    - HelpFormatter formatter = new HelpFormatter();
    - formatter.printHelp("orcfiledump", opts);
    - return;
    - }
    -
    - if (cli.hasOption('l')) {
    - url = cli.getOptionValue("l");
    - }
    -
    - if (cli.hasOption('u')) {
    - user = cli.getOptionValue("u");
    - }
    -
    - if (cli.hasOption('p')) {
    - pwd = cli.getOptionValue("p");
    - }
    -
    - if (cli.hasOption('n')) {
    - numSplits = cli.getOptionValue("n");
    - }
    -
    - if (cli.getArgs().length > 0) {
    - query = cli.getArgs()[0];
    - }
    -
    - System.out.println("url: "+url);
    - System.out.println("user: "+user);
    - System.out.println("query: "+query);
    -
    - LlapBaseInputFormat format = new LlapBaseInputFormat(url, user, pwd, query);
    - JobConf job = new JobConf();
    -
    - InputSplit[] splits = format.getSplits(job, Integer.parseInt(numSplits));
    -
    - if (splits.length == 0) {
    - System.out.println("No splits returned - empty scan");
    - System.out.println("Results: ");
    - } else {
    - boolean first = true;
    -
    - for (InputSplit s: splits) {
    - LOG.info("Processing input split s from " + Arrays.toString(s.getLocations()));
    - RecordReader<NullWritable, Text> reader = format.getRecordReader(s, job, null);
    -
    - if (reader instanceof LlapBaseRecordReader && first) {
    - Schema schema = ((LlapBaseRecordReader)reader).getSchema();
    - System.out.println(""+schema);
    - }
    -
    - if (first) {
    - System.out.println("Results: ");
    - System.out.println("");
    - first = false;
    - }
    -
    - Text value = reader.createValue();
    - while (reader.next(NullWritable.get(), value)) {
    - System.out.println(value);
    - }
    - }
    - System.exit(0);
    - }
    - }
    -
    - static Options createOptions() {
    - Options result = new Options();
    -
    - result.addOption(OptionBuilder
    - .withLongOpt("location")
    - .withDescription("HS2 url")
    - .hasArg()
    - .create('l'));
    -
    - result.addOption(OptionBuilder
    - .withLongOpt("user")
    - .withDescription("user name")
    - .hasArg()
    - .create('u'));
    -
    - result.addOption(OptionBuilder
    - .withLongOpt("pwd")
    - .withDescription("password")
    - .hasArg()
    - .create('p'));
    -
    - result.addOption(OptionBuilder
    - .withLongOpt("num")
    - .withDescription("number of splits")
    - .hasArg()
    - .create('n'));
    -
    - return result;
    - }
    -}
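
    LlapDump is the matching command-line smoke test: it builds an LlapBaseInputFormat from the -l/-u/-p options, fetches -n splits for the query given as the remaining argument, and prints every row. A hedged example of invoking it programmatically (a suitable classpath and a running HiveServer2 with LLAP are assumed; note that main() calls System.exit once rows have been printed):

    import org.apache.hive.jdbc.LlapDump;

    public class LlapDumpUsageSketch {
      public static void main(String[] args) throws Exception {
        // Equivalent to passing: -l <HS2 url> -u <user> -n <num splits> "<query>"
        LlapDump.main(new String[] {
            "-l", "jdbc:hive2://localhost:10000/default",
            "-u", "hive",
            "-n", "2",
            "select * from test"});
      }
    }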

    http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/jdbc/src/java/org/apache/hive/jdbc/LlapInputSplit.java
    ----------------------------------------------------------------------
    diff --git a/jdbc/src/java/org/apache/hive/jdbc/LlapInputSplit.java b/jdbc/src/java/org/apache/hive/jdbc/LlapInputSplit.java
    deleted file mode 100644
    index 0f4fd4e..0000000
    --- a/jdbc/src/java/org/apache/hive/jdbc/LlapInputSplit.java
    +++ /dev/null
    @@ -1,73 +0,0 @@
    -package org.apache.hive.jdbc;
    -
    -import java.io.DataInput;
    -import java.io.DataOutput;
    -import java.io.IOException;
    -
    -import org.apache.hadoop.io.NullWritable;
    -import org.apache.hadoop.io.WritableComparable;
    -import org.apache.hadoop.mapred.InputFormat;
    -import org.apache.hadoop.mapred.InputSplit;
    -import org.apache.hadoop.mapred.InputSplitWithLocationInfo;
    -import org.apache.hadoop.mapred.SplitLocationInfo;
    -
    -
    -public class LlapInputSplit<V extends WritableComparable> implements InputSplitWithLocationInfo {
    - InputSplitWithLocationInfo nativeSplit;
    - String inputFormatClassName;
    -
    - public LlapInputSplit() {
    - }
    -
    - public LlapInputSplit(InputSplitWithLocationInfo nativeSplit, String inputFormatClassName) {
    - this.nativeSplit = nativeSplit;
    - this.inputFormatClassName = inputFormatClassName;
    - }
    -
    - @Override
    - public long getLength() throws IOException {
    - return nativeSplit.getLength();
    - }
    -
    - @Override
    - public String[] getLocations() throws IOException {
    - return nativeSplit.getLocations();
    - }
    -
    - @Override
    - public void write(DataOutput out) throws IOException {
    - out.writeUTF(inputFormatClassName);
    - out.writeUTF(nativeSplit.getClass().getName());
    - nativeSplit.write(out);
    - }
    -
    - @Override
    - public void readFields(DataInput in) throws IOException {
    - inputFormatClassName = in.readUTF();
    - String splitClass = in.readUTF();
    - try {
    - nativeSplit = (InputSplitWithLocationInfo)Class.forName(splitClass).newInstance();
    - } catch (Exception e) {
    - throw new IOException(e);
    - }
    - nativeSplit.readFields(in);
    - }
    -
    - @Override
    - public SplitLocationInfo[] getLocationInfo() throws IOException {
    - return nativeSplit.getLocationInfo();
    - }
    -
    - public InputSplit getSplit() {
    - return nativeSplit;
    - }
    -
    - public InputFormat<NullWritable, V> getInputFormat() throws IOException {
    - try {
    - return (InputFormat<NullWritable, V>) Class.forName(inputFormatClassName)
    - .newInstance();
    - } catch(Exception e) {
    - throw new IOException(e);
    - }
    - }
    -}

    http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/jdbc/src/java/org/apache/hive/jdbc/LlapRowInputFormat.java
    ----------------------------------------------------------------------
    diff --git a/jdbc/src/java/org/apache/hive/jdbc/LlapRowInputFormat.java b/jdbc/src/java/org/apache/hive/jdbc/LlapRowInputFormat.java
    deleted file mode 100644
    index 1cca66a..0000000
    --- a/jdbc/src/java/org/apache/hive/jdbc/LlapRowInputFormat.java
    +++ /dev/null
    @@ -1,34 +0,0 @@
    -package org.apache.hive.jdbc;
    -
    -import java.io.IOException;
    -
    -import org.apache.hadoop.hive.llap.LlapBaseRecordReader;
    -import org.apache.hadoop.hive.llap.LlapRowRecordReader;
    -import org.apache.hadoop.hive.llap.Row;
    -import org.apache.hadoop.hive.llap.Schema;
    -
    -import org.apache.hadoop.io.NullWritable;
    -import org.apache.hadoop.io.Text;
    -import org.apache.hadoop.io.Writable;
    -import org.apache.hadoop.mapred.JobConf;
    -import org.apache.hadoop.mapred.InputFormat;
    -import org.apache.hadoop.mapred.InputSplit;
    -import org.apache.hadoop.mapred.RecordReader;
    -import org.apache.hadoop.mapred.Reporter;
    -
    -public class LlapRowInputFormat implements InputFormat<NullWritable, Row> {
    - LlapBaseInputFormat<Text> baseInputFormat = new LlapBaseInputFormat<Text>();
    -
    - @Override
    - public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
    - return baseInputFormat.getSplits(job, numSplits);
    - }
    -
    - @Override
    - public RecordReader<NullWritable, Row> getRecordReader(InputSplit split, JobConf job, Reporter reporter)
    - throws IOException {
    - LlapInputSplit<Text> llapSplit = (LlapInputSplit<Text>) split;
    - LlapBaseRecordReader<Text> reader = (LlapBaseRecordReader<Text>) baseInputFormat.getRecordReader(llapSplit, job, reporter);
    - return new LlapRowRecordReader(job, reader.getSchema(), reader);
    - }
    -}

    http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/llap-client/pom.xml
    ----------------------------------------------------------------------
    diff --git a/llap-client/pom.xml b/llap-client/pom.xml
    index 50c06a4..4a75bbb 100644
    --- a/llap-client/pom.xml
    +++ b/llap-client/pom.xml
    @@ -109,6 +109,38 @@
              </exclusion>
            </exclusions>
          </dependency>
    + <dependency>
    + <groupId>org.apache.tez</groupId>
    + <artifactId>tez-api</artifactId>
    + <version>${tez.version}</version>
    + <optional>true</optional>
    + <exclusions>
    + <exclusion>
    + <groupId>org.slf4j</groupId>
    + <artifactId>slf4j-log4j12</artifactId>
    + </exclusion>
    + <exclusion>
    + <groupId>commmons-logging</groupId>
    + <artifactId>commons-logging</artifactId>
    + </exclusion>
    + </exclusions>
    + </dependency>
    + <dependency>
    + <groupId>org.apache.tez</groupId>
    + <artifactId>tez-runtime-internals</artifactId>
    + <version>${tez.version}</version>
    + <optional>true</optional>
    + <exclusions>
    + <exclusion>
    + <groupId>org.slf4j</groupId>
    + <artifactId>slf4j-log4j12</artifactId>
    + </exclusion>
    + <exclusion>
    + <groupId>commmons-logging</groupId>
    + <artifactId>commons-logging</artifactId>
    + </exclusion>
    + </exclusions>
    + </dependency>
        </dependencies>
        <build>
          <sourceDirectory>${basedir}/src/java</sourceDirectory>

    http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
    ----------------------------------------------------------------------
    diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
    new file mode 100644
    index 0000000..7073280
    --- /dev/null
    +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
    @@ -0,0 +1,205 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.hadoop.hive.llap;
    +
    +import java.io.EOFException;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.DataInputStream;
    +import java.util.concurrent.LinkedBlockingQueue;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hive.llap.Schema;
    +import org.apache.hadoop.io.Writable;
    +import org.apache.hadoop.io.WritableComparable;
    +import org.apache.hadoop.io.NullWritable;
    +import org.apache.hadoop.util.ReflectionUtils;
    +import org.apache.hadoop.mapred.RecordReader;
    +import org.apache.hadoop.mapred.Reporter;
    +import org.apache.hadoop.mapred.JobConf;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class LlapBaseRecordReader<V extends WritableComparable> implements RecordReader<NullWritable, V> {
    + private static final Logger LOG = LoggerFactory.getLogger(LlapBaseRecordReader.class);
    +
    + DataInputStream din;
    + Schema schema;
    + Class<V> clazz;
    +
    +
    + protected Thread readerThread = null;
    + protected LinkedBlockingQueue<ReaderEvent> readerEvents = new LinkedBlockingQueue<ReaderEvent>();
    +
    + public LlapBaseRecordReader(InputStream in, Schema schema, Class<V> clazz) {
    + din = new DataInputStream(in);
    + this.schema = schema;
    + this.clazz = clazz;
    + this.readerThread = Thread.currentThread();
    + }
    +
    + public Schema getSchema() {
    + return schema;
    + }
    +
    + @Override
    + public void close() throws IOException {
    + din.close();
    + }
    +
    + @Override
    + public long getPos() { return 0; }
    +
    + @Override
    + public float getProgress() { return 0f; }
    +
    + @Override
    + public NullWritable createKey() {
    + return NullWritable.get();
    + }
    +
    + @Override
    + public V createValue() {
    + try {
    + return clazz.newInstance();
    + } catch (Exception e) {
    + return null;
    + }
    + }
    +
    + @Override
    + public boolean next(NullWritable key, V value) throws IOException {
    + try {
    + // Need a way to know what thread to interrupt, since this is a blocking thread.
    + setReaderThread(Thread.currentThread());
    +
    + value.readFields(din);
    + return true;
    + } catch (EOFException eof) {
    + // End of input. There should be a reader event available, or coming soon, so okay to be blocking call.
    + ReaderEvent event = getReaderEvent();
    + switch (event.getEventType()) {
    + case DONE:
    + break;
    + default:
    + throw new IOException("Expected reader event with done status, but got "
    + + event.getEventType() + " with message " + event.getMessage());
    + }
    + return false;
    + } catch (IOException io) {
    + if (Thread.interrupted()) {
    + // Either we were interrupted by one of:
    + // 1. handleEvent(), in which case there is a reader event waiting for us in the queue
    + // 2. Some other unrelated cause which interrupted us, in which case there may not be a reader event coming.
    + // Either way we should not try to block trying to read the reader events queue.
    + if (readerEvents.isEmpty()) {
    + // Case 2.
    + throw io;
    + } else {
    + // Case 1. Fail the reader, sending back the error we received from the reader event.
    + ReaderEvent event = getReaderEvent();
    + switch (event.getEventType()) {
    + case ERROR:
    + throw new IOException("Received reader event error: " + event.getMessage());
    + default:
    + throw new IOException("Got reader event type " + event.getEventType() + ", expected error event");
    + }
    + }
    + } else {
    + // If we weren't interrupted, just propagate the error
    + throw io;
    + }
    + }
    + }
    +
    + /**
    + * Define success/error events which are passed to the reader from a different thread.
    + * The reader will check for these events on end of input and interruption of the reader thread.
    + */
    + public static class ReaderEvent {
    + public enum EventType {
    + DONE,
    + ERROR
    + }
    +
    + protected final EventType eventType;
    + protected final String message;
    +
    + protected ReaderEvent(EventType type, String message) {
    + this.eventType = type;
    + this.message = message;
    + }
    +
    + public static ReaderEvent doneEvent() {
    + return new ReaderEvent(EventType.DONE, "");
    + }
    +
    + public static ReaderEvent errorEvent(String message) {
    + return new ReaderEvent(EventType.ERROR, message);
    + }
    +
    + public EventType getEventType() {
    + return eventType;
    + }
    +
    + public String getMessage() {
    + return message;
    + }
    + }
    +
    + public void handleEvent(ReaderEvent event) {
    + switch (event.getEventType()) {
    + case DONE:
    + // Reader will check for the event queue upon the end of the input stream - no need to interrupt.
    + readerEvents.add(event);
    + break;
    + case ERROR:
    + readerEvents.add(event);
    + if (readerThread == null) {
    + throw new RuntimeException("Reader thread is unexpectedly null, during ReaderEvent error " + event.getMessage());
    + }
    + // Reader is using a blocking socket .. interrupt it.
    + if (LOG.isDebugEnabled()) {
    + LOG.debug("Interrupting reader thread due to reader event with error " + event.getMessage());
    + }
    + getReaderThread().interrupt();
    + break;
    + default:
    + throw new RuntimeException("Unhandled ReaderEvent type " + event.getEventType() + " with message " + event.getMessage());
    + }
    + }
    +
    + protected ReaderEvent getReaderEvent() {
    + try {
    + ReaderEvent event = readerEvents.take();
    + return event;
    + } catch (InterruptedException ie) {
    + throw new RuntimeException("Interrupted while getting readerEvents, not expected", ie);
    + }
    + }
    +
    + protected synchronized void setReaderThread(Thread readerThread) {
    + this.readerThread = readerThread;
    + }
    +
    + protected synchronized Thread getReaderThread() {
    + return readerThread;
    + }
    +}

    http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/llap-client/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
    ----------------------------------------------------------------------
    diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
    new file mode 100644
    index 0000000..0930d60
    --- /dev/null
    +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
    @@ -0,0 +1,392 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.hadoop.hive.llap;
    +
    +import java.io.IOException;
    +import java.net.InetAddress;
    +import java.net.InetSocketAddress;
    +import java.net.Socket;
    +import java.nio.ByteBuffer;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.LinkedBlockingQueue;
    +
    +import com.google.common.collect.Lists;
    +import com.google.protobuf.ByteString;
    +
    +import org.apache.commons.collections4.ListUtils;
    +import org.apache.hadoop.hive.conf.HiveConf;
    +import org.apache.hadoop.hive.llap.LlapBaseRecordReader.ReaderEvent;
    +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
    +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
    +import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient;
    +import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient.LlapTaskUmbilicalExternalResponder;
    +import org.apache.hadoop.hive.llap.registry.ServiceInstance;
    +import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
    +import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
    +import org.apache.hadoop.hive.llap.tez.Converters;
    +import org.apache.hadoop.io.DataInputBuffer;
    +import org.apache.hadoop.io.DataOutputBuffer;
    +import org.apache.hadoop.io.NullWritable;
    +import org.apache.hadoop.io.Text;
    +import org.apache.hadoop.io.WritableComparable;
    +import org.apache.hadoop.mapred.InputFormat;
    +import org.apache.hadoop.mapred.InputSplit;
    +import org.apache.hadoop.mapred.JobConf;
    +import org.apache.hadoop.mapred.RecordReader;
    +import org.apache.hadoop.mapred.Reporter;
    +import org.apache.hadoop.security.Credentials;
    +import org.apache.hadoop.security.token.Token;
    +import org.apache.hadoop.yarn.api.ApplicationConstants;
    +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
    +import org.apache.hadoop.yarn.api.records.ApplicationId;
    +import org.apache.hadoop.yarn.api.records.ContainerId;
    +import org.apache.tez.common.security.JobTokenIdentifier;
    +import org.apache.tez.common.security.TokenCache;
    +import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
    +import org.apache.tez.runtime.api.impl.TaskSpec;
    +import org.apache.tez.runtime.api.impl.TezEvent;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.tez.dag.records.TezTaskAttemptID;
    +import org.apache.tez.runtime.api.impl.EventType;
    +import org.apache.tez.runtime.api.impl.TezEvent;
    +import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
    +import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
    +
    +
    +public class LlapInputFormat<V extends WritableComparable> implements InputFormat<NullWritable, V> {
    +
    + private static final Logger LOG = LoggerFactory.getLogger(LlapInputFormat.class);
    +
    + public LlapInputFormat() {
    + }
    +
    + /*
    + * This proxy record reader has the duty of establishing a connected socket with LLAP, firing
    + * off the work in the split to LLAP, and finally returning the connected socket back in an
    + * LlapRecordReader. The LlapRecordReader class reads the results from the socket.
    + */
    + @Override
    + public RecordReader<NullWritable, V> getRecordReader(InputSplit split, JobConf job,
    + Reporter reporter) throws IOException {
    +
    + LlapInputSplit llapSplit = (LlapInputSplit) split;
    +
    + // Set conf to use LLAP user rather than current user for LLAP Zk registry.
    + HiveConf.setVar(job, HiveConf.ConfVars.LLAP_ZK_REGISTRY_USER, llapSplit.getLlapUser());
    + SubmitWorkInfo submitWorkInfo = SubmitWorkInfo.fromBytes(llapSplit.getPlanBytes());
    +
    + ServiceInstance serviceInstance = getServiceInstance(job, llapSplit);
    + String host = serviceInstance.getHost();
    + int llapSubmitPort = serviceInstance.getRpcPort();
    +
    + LOG.info("Found service instance for host " + host + " with rpc port " + llapSubmitPort
    + + " and outputformat port " + serviceInstance.getOutputFormatPort());
    +
    + LlapRecordReaderTaskUmbilicalExternalResponder umbilicalResponder =
    + new LlapRecordReaderTaskUmbilicalExternalResponder();
    + LlapTaskUmbilicalExternalClient llapClient =
    + new LlapTaskUmbilicalExternalClient(job, submitWorkInfo.getTokenIdentifier(),
    + submitWorkInfo.getToken(), umbilicalResponder);
    + llapClient.init(job);
    + llapClient.start();
    +
    + SubmitWorkRequestProto submitWorkRequestProto =
    + constructSubmitWorkRequestProto(submitWorkInfo, llapSplit.getSplitNum(),
    + llapClient.getAddress(), submitWorkInfo.getToken());
    +
    + TezEvent tezEvent = new TezEvent();
    + DataInputBuffer dib = new DataInputBuffer();
    + dib.reset(llapSplit.getFragmentBytes(), 0, llapSplit.getFragmentBytes().length);
    + tezEvent.readFields(dib);
    + List<TezEvent> tezEventList = Lists.newArrayList();
    + tezEventList.add(tezEvent);
    +
    + llapClient.submitWork(submitWorkRequestProto, host, llapSubmitPort, tezEventList);
    +
    + String id = HiveConf.getVar(job, HiveConf.ConfVars.HIVEQUERYID) + "_" + llapSplit.getSplitNum();
    +
    + HiveConf conf = new HiveConf();
    + Socket socket = new Socket(host,
    + serviceInstance.getOutputFormatPort());
    +
    + LOG.debug("Socket connected");
    +
    + socket.getOutputStream().write(id.getBytes());
    + socket.getOutputStream().write(0);
    + socket.getOutputStream().flush();
    +
    + LOG.info("Registered id: " + id);
    +
    + LlapBaseRecordReader recordReader = new LlapBaseRecordReader(socket.getInputStream(), llapSplit.getSchema(), Text.class);
    + umbilicalResponder.setRecordReader(recordReader);
    + return recordReader;
    + }
    +
    + @Override
    + public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
    + throw new IOException("These are not the splits you are looking for.");
    + }
    +
    + private ServiceInstance getServiceInstance(JobConf job, LlapInputSplit llapSplit) throws IOException {
    + LlapRegistryService registryService = LlapRegistryService.getClient(job);
    + String host = llapSplit.getLocations()[0];
    +
    + ServiceInstance serviceInstance = getServiceInstanceForHost(registryService, host);
    + if (serviceInstance == null) {
    + throw new IOException("No service instances found for " + host + " in registry");
    + }
    +
    + return serviceInstance;
    + }
    +
    + private ServiceInstance getServiceInstanceForHost(LlapRegistryService registryService, String host) throws IOException {
    + InetAddress address = InetAddress.getByName(host);
    + ServiceInstanceSet instanceSet = registryService.getInstances();
    + ServiceInstance serviceInstance = null;
    +
    + // The name used in the service registry may not match the host name we're using.
    + // Try hostname/canonical hostname/host address
    +
    + String name = address.getHostName();
    + LOG.info("Searching service instance by hostname " + name);
    + serviceInstance = selectServiceInstance(instanceSet.getByHost(name));
    + if (serviceInstance != null) {
    + return serviceInstance;
    + }
    +
    + name = address.getCanonicalHostName();
    + LOG.info("Searching service instance by canonical hostname " + name);
    + serviceInstance = selectServiceInstance(instanceSet.getByHost(name));
    + if (serviceInstance != null) {
    + return serviceInstance;
    + }
    +
    + name = address.getHostAddress();
    + LOG.info("Searching service instance by address " + name);
    + serviceInstance = selectServiceInstance(instanceSet.getByHost(name));
    + if (serviceInstance != null) {
    + return serviceInstance;
    + }
    +
    + return serviceInstance;
    + }
    +
    + private ServiceInstance selectServiceInstance(Set<ServiceInstance> serviceInstances) {
    + if (serviceInstances == null || serviceInstances.isEmpty()) {
    + return null;
    + }
    +
    + // Get the first live service instance
    + for (ServiceInstance serviceInstance : serviceInstances) {
    + if (serviceInstance.isAlive()) {
    + return serviceInstance;
    + }
    + }
    +
    + LOG.info("No live service instances were found");
    + return null;
    + }
    +
    + private SubmitWorkRequestProto constructSubmitWorkRequestProto(SubmitWorkInfo submitWorkInfo,
    + int taskNum,
    + InetSocketAddress address,
    + Token<JobTokenIdentifier> token) throws
    + IOException {
    + TaskSpec taskSpec = submitWorkInfo.getTaskSpec();
    + ApplicationId appId = submitWorkInfo.getFakeAppId();
    +
    + SubmitWorkRequestProto.Builder builder = SubmitWorkRequestProto.newBuilder();
    + // This works, assuming the executor is running within YARN.
    + LOG.info("Setting user in submitWorkRequest to: " +
    + System.getenv(ApplicationConstants.Environment.USER.name()));
    + builder.setUser(System.getenv(ApplicationConstants.Environment.USER.name()));
    + builder.setApplicationIdString(appId.toString());
    + builder.setAppAttemptNumber(0);
    + builder.setTokenIdentifier(appId.toString());
    +
    + ContainerId containerId =
    + ContainerId.newInstance(ApplicationAttemptId.newInstance(appId, 0), taskNum);
    + builder.setContainerIdString(containerId.toString());
    +
    + builder.setAmHost(address.getHostName());
    + builder.setAmPort(address.getPort());
    + Credentials taskCredentials = new Credentials();
    + // Credentials can change across DAGs. Ideally construct only once per DAG.
    + // TODO Figure out where credentials will come from. Normally Hive sets up
    + // URLs on the tez dag, for which Tez acquires credentials.
    +
    + // taskCredentials.addAll(getContext().getCredentials());
    +
    + // Preconditions.checkState(currentQueryIdentifierProto.getDagIdentifier() ==
    + // taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId());
    + // ByteBuffer credentialsBinary = credentialMap.get(currentQueryIdentifierProto);
    + // if (credentialsBinary == null) {
    + // credentialsBinary = serializeCredentials(getContext().getCredentials());
    + // credentialMap.putIfAbsent(currentQueryIdentifierProto, credentialsBinary.duplicate());
    + // } else {
    + // credentialsBinary = credentialsBinary.duplicate();
    + // }
    + // builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary));
    + Credentials credentials = new Credentials();
    + TokenCache.setSessionToken(token, credentials);
    + ByteBuffer credentialsBinary = serializeCredentials(credentials);
    + builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary));
    +
    +
    + builder.setFragmentSpec(Converters.convertTaskSpecToProto(taskSpec));
    +
    + FragmentRuntimeInfo.Builder runtimeInfo = FragmentRuntimeInfo.newBuilder();
    + runtimeInfo.setCurrentAttemptStartTime(System.currentTimeMillis());
    + runtimeInfo.setWithinDagPriority(0);
    + runtimeInfo.setDagStartTime(submitWorkInfo.getCreationTime());
    + runtimeInfo.setFirstAttemptStartTime(submitWorkInfo.getCreationTime());
    + runtimeInfo.setNumSelfAndUpstreamTasks(taskSpec.getVertexParallelism());
    + runtimeInfo.setNumSelfAndUpstreamCompletedTasks(0);
    +
    +
    + builder.setUsingTezAm(false);
    + builder.setFragmentRuntimeInfo(runtimeInfo.build());
    + return builder.build();
    + }
    +
    + private ByteBuffer serializeCredentials(Credentials credentials) throws IOException {
    + Credentials containerCredentials = new Credentials();
    + containerCredentials.addAll(credentials);
    + DataOutputBuffer containerTokens_dob = new DataOutputBuffer();
    + containerCredentials.writeTokenStorageToStream(containerTokens_dob);
    + return ByteBuffer.wrap(containerTokens_dob.getData(), 0, containerTokens_dob.getLength());
    + }
    +
    + private static class LlapRecordReaderTaskUmbilicalExternalResponder implements LlapTaskUmbilicalExternalResponder {
    + protected LlapBaseRecordReader recordReader = null;
    + protected LinkedBlockingQueue<ReaderEvent> queuedEvents = new LinkedBlockingQueue<ReaderEvent>();
    +
    + public LlapRecordReaderTaskUmbilicalExternalResponder() {
    + }
    +
    + @Override
    + public void submissionFailed(String fragmentId, Throwable throwable) {
    + try {
    + sendOrQueueEvent(ReaderEvent.errorEvent(
    + "Received submission failed event for fragment ID " + fragmentId));
    + } catch (Exception err) {
    + LOG.error("Error during heartbeat responder:", err);
    + }
    + }
    +
    + @Override
    + public void heartbeat(TezHeartbeatRequest request) {
    + TezTaskAttemptID taskAttemptId = request.getCurrentTaskAttemptID();
    + List<TezEvent> inEvents = request.getEvents();
    + for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) {
    + EventType eventType = tezEvent.getEventType();
    + try {
    + switch (eventType) {
    + case TASK_ATTEMPT_COMPLETED_EVENT:
    + sendOrQueueEvent(ReaderEvent.doneEvent());
    + break;
    + case TASK_ATTEMPT_FAILED_EVENT:
    + TaskAttemptFailedEvent taskFailedEvent = (TaskAttemptFailedEvent) tezEvent.getEvent();
    + sendOrQueueEvent(ReaderEvent.errorEvent(taskFailedEvent.getDiagnostics()));
    + break;
    + case TASK_STATUS_UPDATE_EVENT:
    + // If we want to handle counters
    + break;
    + default:
    + LOG.warn("Unhandled event type " + eventType);
    + break;
    + }
    + } catch (Exception err) {
    + LOG.error("Error during heartbeat responder:", err);
    + }
    + }
    + }
    +
    + @Override
    + public void taskKilled(TezTaskAttemptID taskAttemptId) {
    + try {
    + sendOrQueueEvent(ReaderEvent.errorEvent(
    + "Received task killed event for task ID " + taskAttemptId));
    + } catch (Exception err) {
    + LOG.error("Error during heartbeat responder:", err);
    + }
    + }
    +
    + @Override
    + public void heartbeatTimeout(String taskAttemptId) {
    + try {
    + sendOrQueueEvent(ReaderEvent.errorEvent(
    + "Timed out waiting for heartbeat for task ID " + taskAttemptId));
    + } catch (Exception err) {
    + LOG.error("Error during heartbeat responder:", err);
    + }
    + }
    +
    + public synchronized LlapBaseRecordReader getRecordReader() {
    + return recordReader;
    + }
    +
    + public synchronized void setRecordReader(LlapBaseRecordReader recordReader) {
    + this.recordReader = recordReader;
    +
    + if (recordReader == null) {
    + return;
    + }
    +
    + // If any events were queued by the responder, give them to the record reader now.
    + while (!queuedEvents.isEmpty()) {
    + ReaderEvent readerEvent = queuedEvents.poll();
    + LOG.debug("Sending queued event to record reader: " + readerEvent.getEventType());
    + recordReader.handleEvent(readerEvent);
    + }
    + }
    +
    + /**
    + * Send the ReaderEvents to the record reader, if it is registered to this responder.
    + * If there is no registered record reader, add them to a list of pending reader events
    + * since we don't want to drop these events.
    + * @param readerEvent
    + */
    + protected synchronized void sendOrQueueEvent(ReaderEvent readerEvent) {
    + LlapBaseRecordReader recordReader = getRecordReader();
    + if (recordReader != null) {
    + recordReader.handleEvent(readerEvent);
    + } else {
    + if (LOG.isDebugEnabled()) {
    + LOG.debug("No registered record reader, queueing event " + readerEvent.getEventType()
    + + " with message " + readerEvent.getMessage());
    + }
    +
    + try {
    + queuedEvents.put(readerEvent);
    + } catch (Exception err) {
    + throw new RuntimeException("Unexpected exception while queueing reader event", err);
    + }
    + }
    + }
    +
    + /**
    + * Clear the list of queued reader events if we are not interested in sending any pending events to any registering record reader.
    + */
    + public void clearQueuedEvents() {
    + queuedEvents.clear();
    + }
    + }
    +}
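
getRecordReader() above does three things: it looks up a live LLAP service instance for the split's location, submits the fragment through the umbilical client, and then opens a plain socket to the daemon's output-format port, writing the fragment id followed by a single zero byte before reading rows back on the same connection. A hedged sketch of just the socket handshake, with illustrative names that are not Hive API:

import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class OutputSocketHandshakeSketch {
  // Connect to the daemon's output-format port, identify the fragment, then stream rows back.
  static InputStream openResultStream(String host, int outputFormatPort, String fragmentId)
      throws Exception {
    Socket socket = new Socket(host, outputFormatPort);
    OutputStream out = socket.getOutputStream();
    out.write(fragmentId.getBytes(StandardCharsets.UTF_8)); // id = queryId + "_" + splitNum above
    out.write(0);                                           // single zero byte terminates the id
    out.flush();
    return socket.getInputStream();                         // handed to LlapBaseRecordReader
  }
}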

    http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/llap-client/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java
    ----------------------------------------------------------------------
    diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java
    new file mode 100644
    index 0000000..02aedfd
    --- /dev/null
    +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java
    @@ -0,0 +1,131 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.hadoop.hive.llap;
    +
    +import java.io.DataInput;
    +import java.io.DataOutput;
    +import java.io.IOException;
    +
    +import org.apache.hadoop.hive.llap.Schema;
    +import org.apache.hadoop.mapred.InputSplitWithLocationInfo;
    +import org.apache.hadoop.mapred.SplitLocationInfo;
    +import org.apache.thrift.TDeserializer;
    +import org.apache.thrift.TSerializer;
    +
    +public class LlapInputSplit implements InputSplitWithLocationInfo {
    +
    + int splitNum;
    + byte[] planBytes;
    + byte[] fragmentBytes;
    + SplitLocationInfo[] locations;
    + Schema schema;
    + String llapUser;
    +
    + public LlapInputSplit() {
    + }
    +
    + public LlapInputSplit(int splitNum, byte[] planBytes, byte[] fragmentBytes, SplitLocationInfo[] locations, Schema schema, String llapUser) {
    + this.planBytes = planBytes;
    + this.fragmentBytes = fragmentBytes;
    + this.locations = locations;
    + this.schema = schema;
    + this.splitNum = splitNum;
    + this.llapUser = llapUser;
    + }
    +
    + public Schema getSchema() {
    + return schema;
    + }
    +
    + @Override
    + public long getLength() throws IOException {
    + return 0;
    + }
    +
    + @Override
    + public String[] getLocations() throws IOException {
    + String[] locs = new String[locations.length];
    + for (int i = 0; i < locations.length; ++i) {
    + locs[i] = locations[i].getLocation();
    + }
    + return locs;
    + }
    +
    + public int getSplitNum() {
    + return splitNum;
    + }
    +
    + public byte[] getPlanBytes() {
    + return planBytes;
    + }
    +
    + public byte[] getFragmentBytes() {
    + return fragmentBytes;
    + }
    +
    +
    +
    + @Override
    + public void write(DataOutput out) throws IOException {
    + out.writeInt(splitNum);
    + out.writeInt(planBytes.length);
    + out.write(planBytes);
    +
    + out.writeInt(fragmentBytes.length);
    + out.write(fragmentBytes);
    +
    + out.writeInt(locations.length);
    + for (int i = 0; i < locations.length; ++i) {
    + out.writeUTF(locations[i].getLocation());
    + }
    +
    + schema.write(out);
    + out.writeUTF(llapUser);
    + }
    +
    + @Override
    + public void readFields(DataInput in) throws IOException {
    + splitNum = in.readInt();
    + int length = in.readInt();
    + planBytes = new byte[length];
    + in.readFully(planBytes);
    +
    + length = in.readInt();
    + fragmentBytes = new byte[length];
    + in.readFully(fragmentBytes);
    +
    + length = in.readInt();
    + locations = new SplitLocationInfo[length];
    +
    + for (int i = 0; i < length; ++i) {
    + locations[i] = new SplitLocationInfo(in.readUTF(), false);
    + }
    +
    + schema = new Schema();
    + schema.readFields(in);
    + llapUser = in.readUTF();
    + }
    +
    + @Override
    + public SplitLocationInfo[] getLocationInfo() throws IOException {
    + return locations;
    + }
    +
    + public String getLlapUser() {
    + return llapUser;
    + }
    +}
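
The split carries the serialized plan and fragment as length-prefixed byte arrays plus the locations, schema, and LLAP user, all through the Writable write()/readFields() pair above. A short round-trip sketch using Hadoop's in-memory buffers (the split instance itself is assumed to be already populated):

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.hive.llap.LlapInputSplit;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;

public class SplitRoundTripSketch {
  static byte[] serialize(LlapInputSplit split) throws IOException {
    DataOutputBuffer out = new DataOutputBuffer();
    split.write(out); // splitNum, plan/fragment bytes, locations, schema, llapUser
    return Arrays.copyOf(out.getData(), out.getLength());
  }

  static LlapInputSplit deserialize(byte[] bytes) throws IOException {
    DataInputBuffer in = new DataInputBuffer();
    in.reset(bytes, 0, bytes.length);
    LlapInputSplit split = new LlapInputSplit(); // empty constructor, then readFields
    split.readFields(in);
    return split;
  }
}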

    http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java
    ----------------------------------------------------------------------
    diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java
    new file mode 100644
    index 0000000..4e000ff
    --- /dev/null
    +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java
    @@ -0,0 +1,155 @@
    +package org.apache.hadoop.hive.llap;
    +
    +import com.google.common.base.Preconditions;
    +
    +import java.io.IOException;
    +import java.util.List;
    +import java.util.Properties;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.io.NullWritable;
    +import org.apache.hadoop.io.Text;
    +import org.apache.hadoop.io.Writable;
    +import org.apache.hadoop.mapred.RecordReader;
    +import org.apache.hadoop.mapred.Reporter;
    +import org.apache.hadoop.mapred.JobConf;
    +
    +import org.apache.hadoop.hive.llap.Row;
    +import org.apache.hadoop.hive.llap.FieldDesc;
    +import org.apache.hadoop.hive.llap.Schema;
    +import org.apache.hadoop.hive.llap.TypeDesc;
    +import org.apache.hadoop.hive.serde.serdeConstants;
    +import org.apache.hadoop.hive.serde2.SerDe;
    +import org.apache.hadoop.hive.serde2.SerDeException;
    +import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
    +import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
    +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
    +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
    +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
    +import org.apache.hadoop.hive.serde2.objectinspector.StructField;
    +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +
    +public class LlapRowRecordReader implements RecordReader<NullWritable, Row> {
    +
    + private static final Logger LOG = LoggerFactory.getLogger(LlapRowRecordReader.class);
    +
    + Configuration conf;
    + RecordReader<NullWritable, Text> reader;
    + Schema schema;
    + SerDe serde;
    + final Text textData = new Text();
    +
    + public LlapRowRecordReader(Configuration conf, Schema schema, RecordReader<NullWritable, Text> reader) {
    + this.conf = conf;
    + this.schema = schema;
    + this.reader = reader;
    + }
    +
    + @Override
    + public void close() throws IOException {
    + reader.close();
    + }
    +
    + @Override
    + public NullWritable createKey() {
    + return NullWritable.get();
    + }
    +
    + @Override
    + public Row createValue() {
    + return new Row(schema);
    + }
    +
    + @Override
    + public long getPos() throws IOException {
    + return 0;
    + }
    +
    + @Override
    + public float getProgress() throws IOException {
    + return 0;
    + }
    +
    + @Override
    + public boolean next(NullWritable key, Row value) throws IOException {
    + Preconditions.checkArgument(value != null);
    +
    + if (serde == null) {
    + try {
    + serde = initSerDe(conf);
    + } catch (SerDeException err) {
    + throw new IOException(err);
    + }
    + }
    +
    + boolean hasNext = reader.next(key, textData);
    + if (hasNext) {
    + // Deserialize Text to column values, and populate the row record
    + Object rowObj;
    + try {
    + StructObjectInspector rowOI = (StructObjectInspector) serde.getObjectInspector();
    + rowObj = serde.deserialize(textData);
    + List<? extends StructField> colFields = rowOI.getAllStructFieldRefs();
    + for (int idx = 0; idx < colFields.size(); ++idx) {
    + StructField field = colFields.get(idx);
    + Object colValue = rowOI.getStructFieldData(rowObj, field);
    + Preconditions.checkState(field.getFieldObjectInspector().getCategory() == Category.PRIMITIVE,
    + "Cannot handle non-primitive column type " + field.getFieldObjectInspector().getTypeName());
    +
    + PrimitiveObjectInspector poi = (PrimitiveObjectInspector) field.getFieldObjectInspector();
    + // char/varchar special cased here since the row record handles them using Text
    + switch (poi.getPrimitiveCategory()) {
    + case CHAR:
    + value.setValue(idx, ((HiveCharWritable) poi.getPrimitiveWritableObject(colValue)).getPaddedValue());
    + break;
    + case VARCHAR:
    + value.setValue(idx, ((HiveVarcharWritable) poi.getPrimitiveWritableObject(colValue)).getTextValue());
    + break;
    + default:
    + value.setValue(idx, (Writable) poi.getPrimitiveWritableObject(colValue));
    + break;
    + }
    + }
    + } catch (SerDeException err) {
    + if (LOG.isDebugEnabled()) {
    + LOG.debug("Error deserializing row from text: " + textData);
    + }
    + throw new IOException("Error deserializing row data", err);
    + }
    + }
    +
    + return hasNext;
    + }
    +
    + public Schema getSchema() {
    + return schema;
    + }
    +
    + protected SerDe initSerDe(Configuration conf) throws SerDeException {
    + Properties props = new Properties();
    + StringBuffer columnsBuffer = new StringBuffer();
    + StringBuffer typesBuffer = new StringBuffer();
    + boolean isFirst = true;
    + for (FieldDesc colDesc : schema.getColumns()) {
    + if (!isFirst) {
    + columnsBuffer.append(',');
    + typesBuffer.append(',');
    + }
    + columnsBuffer.append(colDesc.getName());
    + typesBuffer.append(colDesc.getTypeDesc().toString());
    + isFirst = false;
    + }
    + String columns = columnsBuffer.toString();
    + String types = typesBuffer.toString();
    + props.put(serdeConstants.LIST_COLUMNS, columns);
    + props.put(serdeConstants.LIST_COLUMN_TYPES, types);
    + SerDe serde = new LazySimpleSerDe();
    + serde.initialize(conf, props);
    +
    + return serde;
    + }
    +}
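
LlapRowRecordReader wraps a text-producing reader and uses a LazySimpleSerDe, configured from the Schema's column names and types, to turn each Text line into a Row. A hedged usage sketch, assuming a Configuration, a Schema, and an underlying RecordReader<NullWritable, Text> are already available:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.llap.LlapRowRecordReader;
import org.apache.hadoop.hive.llap.Row;
import org.apache.hadoop.hive.llap.Schema;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.RecordReader;

public class RowReaderUsageSketch {
  static void dumpRows(Configuration conf, Schema schema,
      RecordReader<NullWritable, Text> textReader) throws IOException {
    LlapRowRecordReader rowReader = new LlapRowRecordReader(conf, schema, textReader);
    NullWritable key = rowReader.createKey();
    Row row = rowReader.createValue();
    while (rowReader.next(key, row)) { // each call deserializes one Text line into the Row
      System.out.println(row);
    }
    rowReader.close();
  }
}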

    http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/llap-client/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java
    ----------------------------------------------------------------------
    diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java b/llap-client/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java
    new file mode 100644
    index 0000000..83149ab
    --- /dev/null
    +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java
    @@ -0,0 +1,103 @@
    +package org.apache.hadoop.hive.llap;
    +
    +import java.io.DataInput;
    +import java.io.DataOutput;
    +import java.io.IOException;
    +
    +import org.apache.hadoop.io.DataInputBuffer;
    +import org.apache.hadoop.io.DataOutputBuffer;
    +import org.apache.hadoop.io.Text;
    +import org.apache.hadoop.io.Writable;
    +import org.apache.hadoop.security.token.Token;
    +import org.apache.hadoop.yarn.api.records.ApplicationId;
    +import org.apache.tez.common.security.JobTokenIdentifier;
    +import org.apache.tez.common.security.JobTokenSecretManager;
    +import org.apache.tez.runtime.api.impl.TaskSpec;
    +
    +public class SubmitWorkInfo implements Writable {
    +
    + private TaskSpec taskSpec;
    + private ApplicationId fakeAppId;
    + private long creationTime;
    +
    + // This is used to communicate over the LlapUmbilicalProtocol. Not related to tokens used to
    + // talk to the LLAP daemons themselves via the security work.
    + private Token<JobTokenIdentifier> token;
    +
    + public SubmitWorkInfo(TaskSpec taskSpec, ApplicationId fakeAppId, long creationTime) {
    + this.taskSpec = taskSpec;
    + this.fakeAppId = fakeAppId;
    + this.token = createJobToken();
    + this.creationTime = creationTime;
    + }
    +
    + // Empty constructor for writable etc.
    + public SubmitWorkInfo() {
    + }
    +
    + public TaskSpec getTaskSpec() {
    + return taskSpec;
    + }
    +
    + public ApplicationId getFakeAppId() {
    + return fakeAppId;
    + }
    +
    + public String getTokenIdentifier() {
    + return fakeAppId.toString();
    + }
    +
    + public Token<JobTokenIdentifier> getToken() {
    + return token;
    + }
    +
    + public long getCreationTime() {
    + return creationTime;
    + }
    +
    + @Override
    + public void write(DataOutput out) throws IOException {
    + taskSpec.write(out);
    + out.writeLong(fakeAppId.getClusterTimestamp());
    + out.writeInt(fakeAppId.getId());
    + token.write(out);
    + out.writeLong(creationTime);
    + }
    +
    + @Override
    + public void readFields(DataInput in) throws IOException {
    + taskSpec = new TaskSpec();
    + taskSpec.readFields(in);
    + long appIdTs = in.readLong();
    + int appIdId = in.readInt();
    + fakeAppId = ApplicationId.newInstance(appIdTs, appIdId);
    + token = new Token<>();
    + token.readFields(in);
    + creationTime = in.readLong();
    + }
    +
    + public static byte[] toBytes(SubmitWorkInfo submitWorkInfo) throws IOException {
    + DataOutputBuffer dob = new DataOutputBuffer();
    + submitWorkInfo.write(dob);
    + return dob.getData();
    + }
    +
    + public static SubmitWorkInfo fromBytes(byte[] submitWorkInfoBytes) throws IOException {
    + DataInputBuffer dib = new DataInputBuffer();
    + dib.reset(submitWorkInfoBytes, 0, submitWorkInfoBytes.length);
    + SubmitWorkInfo submitWorkInfo = new SubmitWorkInfo();
    + submitWorkInfo.readFields(dib);
    + return submitWorkInfo;
    + }
    +
    +
    + private Token<JobTokenIdentifier> createJobToken() {
    + String tokenIdentifier = fakeAppId.toString();
    + JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(
    + tokenIdentifier));
    + Token<JobTokenIdentifier> sessionToken = new Token<JobTokenIdentifier>(identifier,
    + new JobTokenSecretManager());
    + sessionToken.setService(identifier.getJobId());
    + return sessionToken;
    + }
    +}
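
SubmitWorkInfo travels inside LlapInputSplit.planBytes: the split-generation side packs it with toBytes(), and getRecordReader() recovers it with fromBytes(). A brief round-trip sketch (a populated SubmitWorkInfo is assumed):

import java.io.IOException;

import org.apache.hadoop.hive.llap.SubmitWorkInfo;

public class SubmitWorkRoundTripSketch {
  static SubmitWorkInfo roundTrip(SubmitWorkInfo info) throws IOException {
    byte[] planBytes = SubmitWorkInfo.toBytes(info); // stored as LlapInputSplit.planBytes
    return SubmitWorkInfo.fromBytes(planBytes);      // recovered in LlapInputFormat.getRecordReader
  }
}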

    http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
    ----------------------------------------------------------------------
    diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
    new file mode 100644
    index 0000000..7d06637
    --- /dev/null
    +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
    @@ -0,0 +1,415 @@
    +package org.apache.hadoop.hive.llap.ext;
    +
    +import java.io.IOException;
    +import java.net.InetSocketAddress;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.concurrent.atomic.AtomicLong;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.ConcurrentMap;
    +import java.util.concurrent.ScheduledThreadPoolExecutor;
    +import java.util.concurrent.TimeUnit;
    +
    +import com.google.common.base.Preconditions;
    +import org.apache.commons.collections4.ListUtils;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hive.conf.HiveConf;
    +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
    +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
    +import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
    +import org.apache.hadoop.hive.llap.tez.LlapProtocolClientProxy;
    +import org.apache.hadoop.hive.llap.tezplugins.helpers.LlapTaskUmbilicalServer;
    +import org.apache.hadoop.io.Text;
    +import org.apache.hadoop.ipc.ProtocolSignature;
    +import org.apache.hadoop.security.token.Token;
    +import org.apache.hadoop.service.AbstractService;
    +import org.apache.hadoop.yarn.api.records.ContainerId;
    +import org.apache.hadoop.yarn.util.ConverterUtils;
    +import org.apache.tez.common.security.JobTokenIdentifier;
    +import org.apache.tez.dag.api.TezException;
    +import org.apache.tez.dag.records.TezTaskAttemptID;
    +import org.apache.tez.runtime.api.Event;
    +import org.apache.tez.runtime.api.impl.EventType;
    +import org.apache.tez.runtime.api.impl.TezEvent;
    +import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
    +import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +
    +public class LlapTaskUmbilicalExternalClient extends AbstractService {
    +
    + private static final Logger LOG = LoggerFactory.getLogger(LlapTaskUmbilicalExternalClient.class);
    +
    + private final LlapProtocolClientProxy communicator;
    + private volatile LlapTaskUmbilicalServer llapTaskUmbilicalServer;
    + private final Configuration conf;
    + private final LlapTaskUmbilicalProtocol umbilical;
    +
    + protected final String tokenIdentifier;
    + protected final Token<JobTokenIdentifier> sessionToken;
    +
    + private final ConcurrentMap<String, PendingEventData> pendingEvents = new ConcurrentHashMap<>();
    + private final ConcurrentMap<String, TaskHeartbeatInfo> registeredTasks= new ConcurrentHashMap<String, TaskHeartbeatInfo>();
    + private LlapTaskUmbilicalExternalResponder responder = null;
    + private final ScheduledThreadPoolExecutor timer;
    + private final long connectionTimeout;
    +
    + private static class TaskHeartbeatInfo {
    + final String taskAttemptId;
    + final String hostname;
    + final int port;
    + final AtomicLong lastHeartbeat = new AtomicLong();
    +
    + public TaskHeartbeatInfo(String taskAttemptId, String hostname, int port) {
    + this.taskAttemptId = taskAttemptId;
    + this.hostname = hostname;
    + this.port = port;
    + this.lastHeartbeat.set(System.currentTimeMillis());
    + }
    + }
    +
    + private static class PendingEventData {
    + final TaskHeartbeatInfo heartbeatInfo;
    + final List<TezEvent> tezEvents;
    +
    + public PendingEventData(TaskHeartbeatInfo heartbeatInfo, List<TezEvent> tezEvents) {
    + this.heartbeatInfo = heartbeatInfo;
    + this.tezEvents = tezEvents;
    + }
    + }
    +
    + // TODO KKK Work out the details of the tokenIdentifier, and the session token.
    + // It may just be possible to create one here - since Shuffle is not involved, and this is only used
    + // for communication from LLAP-Daemons to the server. It will need to be sent in as part
    + // of the job submission request.
    + public LlapTaskUmbilicalExternalClient(Configuration conf, String tokenIdentifier,
    + Token<JobTokenIdentifier> sessionToken, LlapTaskUmbilicalExternalResponder responder) {
    + super(LlapTaskUmbilicalExternalClient.class.getName());
    + this.conf = conf;
    + this.umbilical = new LlapTaskUmbilicalExternalImpl();
    + this.tokenIdentifier = tokenIdentifier;
    + this.sessionToken = sessionToken;
    + this.responder = responder;
    + this.timer = new ScheduledThreadPoolExecutor(1);
    + this.connectionTimeout = HiveConf.getTimeVar(conf,
    + HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
    + // TODO. No support for the LLAP token yet. Add support for configurable threads, however 1 should always be enough.
    + this.communicator = new LlapProtocolClientProxy(1, conf, null);
    + this.communicator.init(conf);
    + }
    +
    + @Override
    + public void serviceStart() throws IOException {
    + int numHandlers = HiveConf.getIntVar(conf,
    + HiveConf.ConfVars.LLAP_TMP_EXT_CLIENT_NUM_SERVER_HANDLERS);
    + llapTaskUmbilicalServer = new LlapTaskUmbilicalServer(conf, umbilical, numHandlers, tokenIdentifier, sessionToken);
    + communicator.start();
    + }
    +
    + @Override
    + public void serviceStop() {
    + llapTaskUmbilicalServer.shutdownServer();
    + timer.shutdown();
    + if (this.communicator != null) {
    + this.communicator.stop();
    + }
    + }
    +
    + public InetSocketAddress getAddress() {
    + return llapTaskUmbilicalServer.getAddress();
    + }
    +
    +
    + /**
    + * Submit the work for actual execution. This should always have the usingTezAm flag disabled
    + * @param submitWorkRequestProto
    + */
    + public void submitWork(final SubmitWorkRequestProto submitWorkRequestProto, String llapHost, int llapPort, List<TezEvent> tezEvents) {
    + Preconditions.checkArgument(submitWorkRequestProto.getUsingTezAm() == false);
    +
    + // Register the pending events to be sent for this spec.
    + String fragmentId = submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString();
    + PendingEventData pendingEventData = new PendingEventData(
    + new TaskHeartbeatInfo(fragmentId, llapHost, llapPort),
    + tezEvents);
    + pendingEvents.putIfAbsent(fragmentId, pendingEventData);
    +
    + // Set up a timer task to check for heartbeat timeouts
    + timer.scheduleAtFixedRate(new HeartbeatCheckTask(),
    + connectionTimeout, connectionTimeout, TimeUnit.MILLISECONDS);
    +
    + // Send out the actual SubmitWorkRequest
    + communicator.sendSubmitWork(submitWorkRequestProto, llapHost, llapPort,
    + new LlapProtocolClientProxy.ExecuteRequestCallback<LlapDaemonProtocolProtos.SubmitWorkResponseProto>() {
    +
    + @Override
    + public void setResponse(LlapDaemonProtocolProtos.SubmitWorkResponseProto response) {
    + if (response.hasSubmissionState()) {
    + if (response.getSubmissionState().equals(LlapDaemonProtocolProtos.SubmissionStateProto.REJECTED)) {
    + String msg = "Fragment: " + submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString() + " rejected. Server Busy.";
    + LOG.info(msg);
    + if (responder != null) {
    + Throwable err = new RuntimeException(msg);
    + responder.submissionFailed(submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString(), err);
    + }
    + return;
    + }
    + }
    + }
    +
    + @Override
    + public void indicateError(Throwable t) {
    + String msg = "Failed to submit: " + submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString();
    + LOG.error(msg, t);
    + Throwable err = new RuntimeException(msg, t);
    + responder.submissionFailed(submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString(), err);
    + }
    + });
    +
    +
    +
    +
    +
    +// // TODO Also send out information saying that the fragment is finishable - if that is not already included in the main fragment.
    +// // This entire call is only required if we're doing more than scans. MRInput has no dependencies and is always finishable
    +// QueryIdentifierProto queryIdentifier = QueryIdentifierProto
    +// .newBuilder()
    +// .setAppIdentifier(submitWorkRequestProto.getApplicationIdString()).setDagIdentifier(submitWorkRequestProto.getFragmentSpec().getDagId())
    +// .build();
    +// LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto sourceStateUpdatedRequest =
    +// LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto.newBuilder().setQueryIdentifier(queryIdentifier).setState(
    +// LlapDaemonProtocolProtos.SourceStateProto.S_SUCCEEDED).
    +// setSrcName(TODO)
    +// communicator.sendSourceStateUpdate(LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto.newBuilder().setQueryIdentifier(submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString()).set);
    +
    +
    + }
    +
    + private void updateHeartbeatInfo(String taskAttemptId) {
    + int updateCount = 0;
    +
    + PendingEventData pendingEventData = pendingEvents.get(taskAttemptId);
    + if (pendingEventData != null) {
    + pendingEventData.heartbeatInfo.lastHeartbeat.set(System.currentTimeMillis());
    + updateCount++;
    + }
    +
    + TaskHeartbeatInfo heartbeatInfo = registeredTasks.get(taskAttemptId);
    + if (heartbeatInfo != null) {
    + heartbeatInfo.lastHeartbeat.set(System.currentTimeMillis());
    + updateCount++;
    + }
    +
    + if (updateCount == 0) {
    + LOG.warn("No tasks found for heartbeat from taskAttemptId " + taskAttemptId);
    + }
    + }
    +
    + private void updateHeartbeatInfo(String hostname, int port) {
    + int updateCount = 0;
    +
    + for (String key : pendingEvents.keySet()) {
    + PendingEventData pendingEventData = pendingEvents.get(key);
    + if (pendingEventData != null) {
    + if (pendingEventData.heartbeatInfo.hostname.equals(hostname)
    + && pendingEventData.heartbeatInfo.port == port) {
    + pendingEventData.heartbeatInfo.lastHeartbeat.set(System.currentTimeMillis());
    + updateCount++;
    + }
    + }
    + }
    +
    + for (String key : registeredTasks.keySet()) {
    + TaskHeartbeatInfo heartbeatInfo = registeredTasks.get(key);
    + if (heartbeatInfo != null) {
    + if (heartbeatInfo.hostname.equals(hostname)
    + && heartbeatInfo.port == port) {
    + heartbeatInfo.lastHeartbeat.set(System.currentTimeMillis());
    + updateCount++;
    + }
    + }
    + }
    +
    + if (updateCount == 0) {
    + LOG.info("No tasks found for heartbeat from hostname " + hostname + ", port " + port);
    + }
    + }
    +
    + private class HeartbeatCheckTask implements Runnable {
    + public void run() {
    + long currentTime = System.currentTimeMillis();
    + List<String> timedOutTasks = new ArrayList<String>();
    +
    + // Check both pending and registered tasks for timeouts
    + for (String key : pendingEvents.keySet()) {
    + PendingEventData pendingEventData = pendingEvents.get(key);
    + if (pendingEventData != null) {
    + if (currentTime - pendingEventData.heartbeatInfo.lastHeartbeat.get() >= connectionTimeout) {
    + timedOutTasks.add(key);
    + }
    + }
    + }
    + for (String timedOutTask : timedOutTasks) {
    + LOG.info("Pending taskAttemptId " + timedOutTask + " timed out");
    + responder.heartbeatTimeout(timedOutTask);
    + pendingEvents.remove(timedOutTask);
    + // TODO: Do we need to tell the LLAP daemon we are no longer interested in this task?
    + }
    +
    + timedOutTasks.clear();
    + for (String key : registeredTasks.keySet()) {
    + TaskHeartbeatInfo heartbeatInfo = registeredTasks.get(key);
    + if (heartbeatInfo != null) {
    + if (currentTime - heartbeatInfo.lastHeartbeat.get() >= connectionTimeout) {
    + timedOutTasks.add(key);
    + }
    + }
    + }
    + for (String timedOutTask : timedOutTasks) {
    + LOG.info("Running taskAttemptId " + timedOutTask + " timed out");
    + responder.heartbeatTimeout(timedOutTask);
    + registeredTasks.remove(timedOutTask);
    + // TODO: Do we need to tell the LLAP daemon we are no longer interested in this task?
    + }
    + }
    + }
    +
    + public interface LlapTaskUmbilicalExternalResponder {
    + void submissionFailed(String fragmentId, Throwable throwable);
    + void heartbeat(TezHeartbeatRequest request);
    + void taskKilled(TezTaskAttemptID taskAttemptId);
    + void heartbeatTimeout(String fragmentId);
    + }
    +
    +
    +
    + // TODO Ideally, the server should be shared across all client sessions running on the same node.
    + private class LlapTaskUmbilicalExternalImpl implements LlapTaskUmbilicalProtocol {
    +
    + @Override
    + public boolean canCommit(TezTaskAttemptID taskid) throws IOException {
    + // Expecting only a single instance of a task to be running.
    + return true;
    + }
    +
    + @Override
    + public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOException,
    + TezException {
    + // Keep-alive information. The client should be informed and will have to take care of re-submitting the work.
    + // Some parts of fault tolerance go here.
    +
    + // This also provides completion information, and a possible notification when task actually starts running (first heartbeat)
    +
    + if (LOG.isDebugEnabled()) {
    + LOG.debug("Received heartbeat from container, request=" + request);
    + }
    +
    + // Incoming events can be ignored until the point when shuffle needs to be handled, instead of just scans.
    + TezHeartbeatResponse response = new TezHeartbeatResponse();
    +
    + response.setLastRequestId(request.getRequestId());
    + // Assuming TaskAttemptId and FragmentIdentifierString are the same. Verify this.
    + TezTaskAttemptID taskAttemptId = request.getCurrentTaskAttemptID();
    + String taskAttemptIdString = taskAttemptId.toString();
    +
    + updateHeartbeatInfo(taskAttemptIdString);
    +
    + List<TezEvent> tezEvents = null;
    + PendingEventData pendingEventData = pendingEvents.remove(taskAttemptIdString);
    + if (pendingEventData == null) {
    + tezEvents = Collections.emptyList();
    +
    + // If this heartbeat was not from a pending event and it's not in our list of registered tasks,
    + if (!registeredTasks.containsKey(taskAttemptIdString)) {
    + LOG.info("Unexpected heartbeat from " + taskAttemptIdString);
    + response.setShouldDie(); // Do any of the other fields need to be set?
    + return response;
    + }
    + } else {
    + tezEvents = pendingEventData.tezEvents;
    + // Tasks removed from the pending list should then be added to the registered list.
    + registeredTasks.put(taskAttemptIdString, pendingEventData.heartbeatInfo);
    + }
    +
    + response.setLastRequestId(request.getRequestId());
    + // Irrelevant from eventIds. This can be tracked in the AM itself, instead of polluting the task.
    + // Also since we have all the MRInput events here - they'll all be sent in together.
    + response.setNextFromEventId(0); // Irrelevant. See comment above.
    + response.setNextPreRoutedEventId(0); //Irrelevant. See comment above.
    + response.setEvents(tezEvents);
    +
    + List<TezEvent> inEvents = request.getEvents();
    + if (LOG.isDebugEnabled()) {
    + LOG.debug("Heartbeat from " + taskAttemptIdString +
    + " events: " + (inEvents != null ? inEvents.size() : -1));
    + }
    + for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) {
    + EventType eventType = tezEvent.getEventType();
    + switch (eventType) {
    + case TASK_ATTEMPT_COMPLETED_EVENT:
    + LOG.debug("Task completed event for " + taskAttemptIdString);
    + registeredTasks.remove(taskAttemptIdString);
    + break;
    + case TASK_ATTEMPT_FAILED_EVENT:
    + LOG.debug("Task failed event for " + taskAttemptIdString);
    + registeredTasks.remove(taskAttemptIdString);
    + break;
    + case TASK_STATUS_UPDATE_EVENT:
    + // If we want to handle counters
    + LOG.debug("Task update event for " + taskAttemptIdString);
    + break;
    + default:
    + LOG.warn("Unhandled event type " + eventType);
    + break;
    + }
    + }
    +
    + // Pass the request on to the responder
    + try {
    + if (responder != null) {
    + responder.heartbeat(request);
    + }
    + } catch (Exception err) {
    + LOG.error("Error during responder execution", err);
    + }
    +
    + return response;
    + }
    +
    + @Override
    + public void nodeHeartbeat(Text hostname, int port) throws IOException {
    + updateHeartbeatInfo(hostname.toString(), port);
    + // No need to propagate to this to the responder
    + }
    +
    + @Override
    + public void taskKilled(TezTaskAttemptID taskAttemptId) throws IOException {
    + String taskAttemptIdString = taskAttemptId.toString();
    + LOG.error("Task killed - " + taskAttemptIdString);
    + registeredTasks.remove(taskAttemptIdString);
    +
    + try {
    + if (responder != null) {
    + responder.taskKilled(taskAttemptId);
    + }
    + } catch (Exception err) {
    + LOG.error("Error during responder execution", err);
    + }
    + }
    +
    + @Override
    + public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
    + return 0;
    + }
    +
    + @Override
    + public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,
    + int clientMethodsHash) throws IOException {
    + return ProtocolSignature.getProtocolSignature(this, protocol,
    + clientVersion, clientMethodsHash);
    + }
    + }
    +
    +}
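
The client above tracks liveness with two concurrent maps keyed by fragment/task attempt id, each entry holding an AtomicLong of the last heartbeat time, and a scheduled task that expires entries that have been silent for longer than the connection timeout. A standalone sketch of that bookkeeping with illustrative names:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class HeartbeatTrackerSketch {
  private final Map<String, AtomicLong> lastHeartbeat = new ConcurrentHashMap<>();
  private final long timeoutMs;

  HeartbeatTrackerSketch(long timeoutMs) { this.timeoutMs = timeoutMs; }

  void register(String taskAttemptId) {
    lastHeartbeat.put(taskAttemptId, new AtomicLong(System.currentTimeMillis()));
  }

  void onHeartbeat(String taskAttemptId) {
    AtomicLong ts = lastHeartbeat.get(taskAttemptId);
    if (ts != null) {
      ts.set(System.currentTimeMillis()); // same idea as updateHeartbeatInfo() above
    }
  }

  void startChecker(ScheduledExecutorService timer) {
    timer.scheduleAtFixedRate(() -> {
      long now = System.currentTimeMillis();
      lastHeartbeat.forEach((task, ts) -> {
        if (now - ts.get() >= timeoutMs) {
          lastHeartbeat.remove(task); // expire tasks that stopped heartbeating
          System.out.println("Task " + task + " timed out");
        }
      });
    }, timeoutMs, timeoutMs, TimeUnit.MILLISECONDS);
  }

  public static void main(String[] args) throws Exception {
    HeartbeatTrackerSketch tracker = new HeartbeatTrackerSketch(500);
    tracker.register("attempt_0");
    ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    tracker.startChecker(timer);
    Thread.sleep(1500); // no heartbeats arrive, so the task expires
    timer.shutdownNow();
  }
}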

    http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java
    ----------------------------------------------------------------------
    diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java b/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java
    new file mode 100644
    index 0000000..dbd591a
    --- /dev/null
    +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java
    @@ -0,0 +1,57 @@
    +package org.apache.hadoop.hive.llap.tezplugins.helpers;
    +
    +import java.io.IOException;
    +import java.net.InetSocketAddress;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
    +import org.apache.hadoop.ipc.RPC;
    +import org.apache.hadoop.ipc.Server;
    +import org.apache.hadoop.net.NetUtils;
    +import org.apache.hadoop.security.token.Token;
    +import org.apache.tez.common.security.JobTokenIdentifier;
    +import org.apache.tez.common.security.JobTokenSecretManager;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class LlapTaskUmbilicalServer {
    +
    + private static final Logger LOG = LoggerFactory.getLogger(LlapTaskUmbilicalServer.class);
    +
    + protected volatile Server server;
    + private final InetSocketAddress address;
    + private final AtomicBoolean started = new AtomicBoolean(true);
    +
    + public LlapTaskUmbilicalServer(Configuration conf, LlapTaskUmbilicalProtocol umbilical, int numHandlers, String tokenIdentifier, Token<JobTokenIdentifier> token) throws
    + IOException {
    + JobTokenSecretManager jobTokenSecretManager =
    + new JobTokenSecretManager();
    + jobTokenSecretManager.addTokenForJob(tokenIdentifier, token);
    +
    + server = new RPC.Builder(conf)
    + .setProtocol(LlapTaskUmbilicalProtocol.class)
    + .setBindAddress("0.0.0.0")
    + .setPort(0)
    + .setInstance(umbilical)
    + .setNumHandlers(numHandlers)
    + .setSecretManager(jobTokenSecretManager).build();
    +
    + server.start();
    + this.address = NetUtils.getConnectAddress(server);
    + LOG.info(
    + "Started TaskUmbilicalServer: " + umbilical.getClass().getName() + " at address: " + address +
    + " with numHandlers=" + numHandlers);
    + }
    +
    + public InetSocketAddress getAddress() {
    + return this.address;
    + }
    +
    + public void shutdownServer() {
    + if (started.get()) { // Primarily to avoid multiple shutdowns.
    + started.set(false);
    + server.stop();
    + }
    + }
    +}
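
The server wrapper above is used once per external client: build a job token for the fake application id, start the RPC server on an ephemeral port, advertise its address to the daemon in the SubmitWorkRequest, and shut it down when the fragment is finished. A hedged lifecycle sketch using only APIs shown in this patch (the LlapTaskUmbilicalProtocol implementation is assumed):

import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
import org.apache.hadoop.hive.llap.tezplugins.helpers.LlapTaskUmbilicalServer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.JobTokenSecretManager;

public class UmbilicalServerLifecycleSketch {
  static void run(Configuration conf, LlapTaskUmbilicalProtocol umbilical, String tokenIdentifier)
      throws IOException {
    JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(tokenIdentifier));
    Token<JobTokenIdentifier> token =
        new Token<JobTokenIdentifier>(identifier, new JobTokenSecretManager());
    LlapTaskUmbilicalServer server =
        new LlapTaskUmbilicalServer(conf, umbilical, 1, tokenIdentifier, token);
    InetSocketAddress address = server.getAddress(); // sent to the daemon as amHost/amPort
    System.out.println("Umbilical server listening at " + address);
    server.shutdownServer();
  }
}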
