Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java?rev=1452992&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java Tue Mar 5 20:44:50 2013
@@ -0,0 +1,395 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
+import org.apache.hadoop.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+final class OrcStruct implements Writable {
+
+ private final Object[] fields;
+
+ OrcStruct(int children) {
+ fields = new Object[children];
+ }
+
+ Object getFieldValue(int fieldIndex) {
+ return fields[fieldIndex];
+ }
+
+ void setFieldValue(int fieldIndex, Object value) {
+ fields[fieldIndex] = value;
+ }
+
+ @Override
+ public void write(DataOutput dataOutput) throws IOException {
+ throw new UnsupportedOperationException("write unsupported");
+ }
+
+ @Override
+ public void readFields(DataInput dataInput) throws IOException {
+ throw new UnsupportedOperationException("readFields unsupported");
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null || other.getClass() != OrcStruct.class) {
+ return false;
+ } else {
+ OrcStruct oth = (OrcStruct) other;
+ if (fields.length != oth.fields.length) {
+ return false;
+ }
+ for(int i=0; i < fields.length; ++i) {
+ if (fields[i] == null) {
+ if (oth.fields[i] != null) {
+ return false;
+ }
+ } else {
+ if (!fields[i].equals(oth.fields[i])) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ int result = fields.length;
+ for(Object field: fields) {
+ if (field != null) {
+ result ^= field.hashCode();
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder buffer = new StringBuilder();
+ buffer.append("{");
+ for(int i=0; i < fields.length; ++i) {
+ if (i != 0) {
+ buffer.append(", ");
+ }
+ buffer.append(fields[i]);
+ }
+ buffer.append("}");
+ return buffer.toString();
+ }
+
+ static class Field implements StructField {
+ private final String name;
+ private final ObjectInspector inspector;
+ private final int offset;
+
+ Field(String name, ObjectInspector inspector, int offset) {
+ this.name = name;
+ this.inspector = inspector;
+ this.offset = offset;
+ }
+
+ @Override
+ public String getFieldName() {
+ return name;
+ }
+
+ @Override
+ public ObjectInspector getFieldObjectInspector() {
+ return inspector;
+ }
+
+ @Override
+ public String getFieldComment() {
+ return null;
+ }
+ }
+
+ static class OrcStructInspector extends StructObjectInspector {
+ private final List<StructField> fields;
+
+ OrcStructInspector(StructTypeInfo info) {
+ ArrayList<String> fieldNames = info.getAllStructFieldNames();
+ ArrayList<TypeInfo> fieldTypes = info.getAllStructFieldTypeInfos();
+ fields = new ArrayList<StructField>(fieldNames.size());
+ for(int i=0; i < fieldNames.size(); ++i) {
+ fields.add(new Field(fieldNames.get(i),
+ createObjectInspector(fieldTypes.get(i)), i));
+ }
+ }
+
+ OrcStructInspector(int columnId, List<OrcProto.Type> types) {
+ OrcProto.Type type = types.get(columnId);
+ int fieldCount = type.getSubtypesCount();
+ fields = new ArrayList<StructField>(fieldCount);
+ for(int i=0; i < fieldCount; ++i) {
+ int fieldType = type.getSubtypes(i);
+ fields.add(new Field(type.getFieldNames(i),
+ createObjectInspector(fieldType, types), i));
+ }
+ }
+
+ @Override
+ public List<StructField> getAllStructFieldRefs() {
+ return fields;
+ }
+
+ @Override
+ public StructField getStructFieldRef(String s) {
+ for(StructField field: fields) {
+ if (field.getFieldName().equals(s)) {
+ return field;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public Object getStructFieldData(Object object, StructField field) {
+ return ((OrcStruct) object).fields[((Field) field).offset];
+ }
+
+ @Override
+ public List<Object> getStructFieldsDataAsList(Object object) {
+ OrcStruct struct = (OrcStruct) object;
+ List<Object> result = new ArrayList<Object>(struct.fields.length);
+ for (Object child: struct.fields) {
+ result.add(child);
+ }
+ return result;
+ }
+
+ @Override
+ public String getTypeName() {
+ StringBuilder buffer = new StringBuilder();
+ buffer.append("struct<");
+ for(int i=0; i < fields.size(); ++i) {
+ StructField field = fields.get(i);
+ if (i != 0) {
+ buffer.append(",");
+ }
+ buffer.append(field.getFieldName());
+ buffer.append(":");
+ buffer.append(field.getFieldObjectInspector().getTypeName());
+ }
+ buffer.append(">");
+ return buffer.toString();
+ }
+
+ @Override
+ public Category getCategory() {
+ return Category.STRUCT;
+ }
+ }
+
+ static class OrcMapObjectInspector implements MapObjectInspector {
+ private final ObjectInspector key;
+ private final ObjectInspector value;
+
+ OrcMapObjectInspector(MapTypeInfo info) {
+ key = createObjectInspector(info.getMapKeyTypeInfo());
+ value = createObjectInspector(info.getMapValueTypeInfo());
+ }
+
+ OrcMapObjectInspector(int columnId, List<OrcProto.Type> types) {
+ OrcProto.Type type = types.get(columnId);
+ key = createObjectInspector(type.getSubtypes(0), types);
+ value = createObjectInspector(type.getSubtypes(1), types);
+ }
+
+ @Override
+ public ObjectInspector getMapKeyObjectInspector() {
+ return key;
+ }
+
+ @Override
+ public ObjectInspector getMapValueObjectInspector() {
+ return value;
+ }
+
+ @Override
+ public Object getMapValueElement(Object map, Object key) {
+ return ((Map) map).get(key);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public Map<Object, Object> getMap(Object map) {
+ return (Map) map;
+ }
+
+ @Override
+ public int getMapSize(Object map) {
+ return ((Map) map).size();
+ }
+
+ @Override
+ public String getTypeName() {
+ return "map<" + key.getTypeName() + "," + value.getTypeName() + ">";
+ }
+
+ @Override
+ public Category getCategory() {
+ return Category.MAP;
+ }
+ }
+
+ static class OrcListObjectInspector implements ListObjectInspector {
+ private final ObjectInspector child;
+
+ OrcListObjectInspector(ListTypeInfo info) {
+ child = createObjectInspector(info.getListElementTypeInfo());
+ }
+
+ OrcListObjectInspector(int columnId, List<OrcProto.Type> types) {
+ OrcProto.Type type = types.get(columnId);
+ child = createObjectInspector(type.getSubtypes(0), types);
+ }
+
+ @Override
+ public ObjectInspector getListElementObjectInspector() {
+ return child;
+ }
+
+ @Override
+ public Object getListElement(Object list, int i) {
+ return ((List) list).get(i);
+ }
+
+ @Override
+ public int getListLength(Object list) {
+ return ((List) list).size();
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public List<?> getList(Object list) {
+ return (List) list;
+ }
+
+ @Override
+ public String getTypeName() {
+ return "array<" + child.getTypeName() + ">";
+ }
+
+ @Override
+ public Category getCategory() {
+ return Category.LIST;
+ }
+ }
+
+ static ObjectInspector createObjectInspector(TypeInfo info) {
+ switch (info.getCategory()) {
+ case PRIMITIVE:
+ switch (((PrimitiveTypeInfo) info).getPrimitiveCategory()) {
+ case FLOAT:
+ return PrimitiveObjectInspectorFactory.writableFloatObjectInspector;
+ case DOUBLE:
+ return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
+ case BOOLEAN:
+ return PrimitiveObjectInspectorFactory.writableBooleanObjectInspector;
+ case BYTE:
+ return PrimitiveObjectInspectorFactory.writableByteObjectInspector;
+ case SHORT:
+ return PrimitiveObjectInspectorFactory.writableShortObjectInspector;
+ case INT:
+ return PrimitiveObjectInspectorFactory.writableIntObjectInspector;
+ case LONG:
+ return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
+ case BINARY:
+ return PrimitiveObjectInspectorFactory.writableBinaryObjectInspector;
+ case STRING:
+ return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
+ case TIMESTAMP:
+ return PrimitiveObjectInspectorFactory.javaTimestampObjectInspector;
+ default:
+ throw new IllegalArgumentException("Unknown primitive type " +
+ ((PrimitiveTypeInfo) info).getPrimitiveCategory());
+ }
+ case STRUCT:
+ return new OrcStructInspector((StructTypeInfo) info);
+ case UNION:
+ return new OrcUnion.OrcUnionObjectInspector((UnionTypeInfo) info);
+ case MAP:
+ return new OrcMapObjectInspector((MapTypeInfo) info);
+ case LIST:
+ return new OrcListObjectInspector((ListTypeInfo) info);
+ default:
+ throw new IllegalArgumentException("Unknown type " +
+ info.getCategory());
+ }
+ }
+
+ static ObjectInspector createObjectInspector(int columnId,
+ List<OrcProto.Type> types){
+ OrcProto.Type type = types.get(columnId);
+ switch (type.getKind()) {
+ case FLOAT:
+ return PrimitiveObjectInspectorFactory.writableFloatObjectInspector;
+ case DOUBLE:
+ return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
+ case BOOLEAN:
+ return PrimitiveObjectInspectorFactory.writableBooleanObjectInspector;
+ case BYTE:
+ return PrimitiveObjectInspectorFactory.writableByteObjectInspector;
+ case SHORT:
+ return PrimitiveObjectInspectorFactory.writableShortObjectInspector;
+ case INT:
+ return PrimitiveObjectInspectorFactory.writableIntObjectInspector;
+ case LONG:
+ return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
+ case BINARY:
+ return PrimitiveObjectInspectorFactory.writableBinaryObjectInspector;
+ case STRING:
+ return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
+ case TIMESTAMP:
+ return PrimitiveObjectInspectorFactory.javaTimestampObjectInspector;
+ case STRUCT:
+ return new OrcStructInspector(columnId, types);
+ case UNION:
+ return new OrcUnion.OrcUnionObjectInspector(columnId, types);
+ case MAP:
+ return new OrcMapObjectInspector(columnId, types);
+ case LIST:
+ return new OrcListObjectInspector(columnId, types);
+ default:
+ throw new UnsupportedOperationException("Unknown type " +
+ type.getKind());
+ }
+ }
+}
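
OrcStruct and OrcStructInspector are package-private, so direct use has to
sit in org.apache.hadoop.hive.ql.io.orc. A minimal sketch (not part of this
commit; OrcStructExample is a hypothetical class name) that builds an
inspector from a Hive type string and reads a row back through it:

    package org.apache.hadoop.hive.ql.io.orc;

    import org.apache.hadoop.hive.serde2.objectinspector.StructField;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
    import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;

    public class OrcStructExample {
      public static void main(String[] args) {
        // parse a Hive type string and build the matching object inspector
        StructTypeInfo info = (StructTypeInfo) TypeInfoUtils
            .getTypeInfoFromTypeString("struct<name:string,age:int>");
        StructObjectInspector inspector =
            (StructObjectInspector) OrcStruct.createObjectInspector(info);

        // populate a row; field order matches the declared field order
        OrcStruct row = new OrcStruct(2);
        row.setFieldValue(0, new Text("alice"));
        row.setFieldValue(1, new IntWritable(31));

        // read the fields back through the inspector
        for (StructField field : inspector.getAllStructFieldRefs()) {
          System.out.println(field.getFieldName() + " = "
              + inspector.getStructFieldData(row, field));
        }
      }
    }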

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUnion.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUnion.java?rev=1452992&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUnion.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUnion.java Tue Mar 5 20:44:50 2013
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.UnionObject;
+import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * An in-memory representation of a union type.
+ */
+final class OrcUnion implements UnionObject {
+ private byte tag;
+ private Object object;
+
+ void set(byte tag, Object object) {
+ this.tag = tag;
+ this.object = object;
+ }
+
+ @Override
+ public byte getTag() {
+ return tag;
+ }
+
+ @Override
+ public Object getObject() {
+ return object;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null || other.getClass() != OrcUnion.class) {
+ return false;
+ }
+ OrcUnion oth = (OrcUnion) other;
+ if (tag != oth.tag) {
+ return false;
+ } else if (object == null) {
+ return oth.object == null;
+ } else {
+ return object.equals(oth.object);
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ int result = tag;
+ if (object != null) {
+ result ^= object.hashCode();
+ }
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "union(" + Integer.toString(tag & 0xff) + ", " + object.toString() +
+ ")";
+ }
+
+ static class OrcUnionObjectInspector implements UnionObjectInspector {
+ private final List<ObjectInspector> children;
+
+ OrcUnionObjectInspector(int columnId,
+ List<OrcProto.Type> types) {
+ OrcProto.Type type = types.get(columnId);
+ children = new ArrayList<ObjectInspector>(type.getSubtypesCount());
+ for(int i=0; i < type.getSubtypesCount(); ++i) {
+ children.add(OrcStruct.createObjectInspector(type.getSubtypes(i),
+ types));
+ }
+ }
+
+ OrcUnionObjectInspector(UnionTypeInfo info) {
+ List<TypeInfo> unionChildren = info.getAllUnionObjectTypeInfos();
+ this.children = new ArrayList<ObjectInspector>(unionChildren.size());
+ for(TypeInfo child: info.getAllUnionObjectTypeInfos()) {
+ this.children.add(OrcStruct.createObjectInspector(child));
+ }
+ }
+
+ @Override
+ public List<ObjectInspector> getObjectInspectors() {
+ return children;
+ }
+
+ @Override
+ public byte getTag(Object obj) {
+ return ((OrcUnion) obj).tag;
+ }
+
+ @Override
+ public Object getField(Object obj) {
+ return ((OrcUnion) obj).object;
+ }
+
+ @Override
+ public String getTypeName() {
+ StringBuilder builder = new StringBuilder("union{");
+ boolean first = true;
+ for(ObjectInspector child: children) {
+ if (first) {
+ first = false;
+ } else {
+ builder.append(", ");
+ }
+ builder.append(child.getTypeName());
+ }
+ builder.append("}");
+ return builder.toString();
+ }
+
+ @Override
+ public Category getCategory() {
+ return Category.UNION;
+ }
+ }
+}
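
A union value is simply a (tag, value) pair where the tag selects which of
the child types is populated. A tiny sketch (not from this commit), again
assuming code in the same package since OrcUnion is package-private:

    // tag 1 selects the second child type, e.g. the int branch of a
    // uniontype<string,int>
    OrcUnion value = new OrcUnion();
    value.set((byte) 1, new org.apache.hadoop.io.IntWritable(42));
    System.out.println(value.getTag());    // 1
    System.out.println(value.getObject()); // 42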

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OutStream.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OutStream.java?rev=1452992&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OutStream.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OutStream.java Tue Mar 5 20:44:50 2013
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+class OutStream extends PositionedOutputStream {
+
+ interface OutputReceiver {
+ void output(ByteBuffer buffer) throws IOException;
+ }
+
+ static final int HEADER_SIZE = 3;
+ private final String name;
+ private final OutputReceiver receiver;
+ private ByteBuffer compressed = null;
+ private ByteBuffer overflow = null;
+ private ByteBuffer current;
+ private final int bufferSize;
+ private final CompressionCodec codec;
+ private long compressedBytes = 0;
+ private long uncompressedBytes = 0;
+
+ OutStream(String name,
+ int bufferSize,
+ CompressionCodec codec,
+ OutputReceiver receiver) throws IOException {
+ this.name = name;
+ this.bufferSize = bufferSize;
+ this.codec = codec;
+ this.receiver = receiver;
+ getNewInputBuffer();
+ }
+
+ public void clear() throws IOException {
+ current.position(codec == null ? 0 : HEADER_SIZE);
+ if (compressed != null) {
+ compressed.clear();
+ }
+ if (overflow != null) {
+ overflow.clear();
+ }
+ }
+
+ /**
+ * Write the length of the compressed bytes. Life is much easier if the
+ * header is constant length, so just use 3 bytes. Considering most of the
+ * codecs want between 32k (snappy) and 256k (lzo, zlib), 3 bytes should
+ * be plenty. We also use the low bit for whether it is the original or
+ * compressed bytes.
+ * @param buffer the buffer to write the header to
+ * @param position the position in the buffer to write at
+ * @param val the size in the file
+ * @param original is it uncompressed
+ */
+ private static void writeHeader(ByteBuffer buffer,
+ int position,
+ int val,
+ boolean original) {
+ buffer.put(position, (byte) ((val << 1) + (original ? 1 : 0)));
+ buffer.put(position + 1, (byte) (val >> 7));
+ buffer.put(position + 2, (byte) (val >> 15));
+ }
+
+ private void getNewInputBuffer() throws IOException {
+ if (codec == null) {
+ current = ByteBuffer.allocate(bufferSize);
+ } else {
+ current = ByteBuffer.allocate(bufferSize + HEADER_SIZE);
+ writeHeader(current, 0, bufferSize, true);
+ current.position(HEADER_SIZE);
+ }
+ }
+
+ private ByteBuffer getNewOutputBuffer() throws IOException {
+ return ByteBuffer.allocate(bufferSize +
+ (codec == null ? 0 : HEADER_SIZE));
+ }
+
+ private void flip() throws IOException {
+ current.limit(current.position());
+ current.position(codec == null ? 0 : HEADER_SIZE);
+ }
+
+ @Override
+ public void write(int i) throws IOException {
+ if (current.remaining() < 1) {
+ spill();
+ }
+ uncompressedBytes += 1;
+ current.put((byte) i);
+ }
+
+ @Override
+ public void write(byte[] bytes, int offset, int length) throws IOException {
+ int remaining = Math.min(current.remaining(), length);
+ current.put(bytes, offset, remaining);
+ uncompressedBytes += remaining;
+ length -= remaining;
+ while (length != 0) {
+ spill();
+ offset += remaining;
+ remaining = Math.min(current.remaining(), length);
+ current.put(bytes, offset, remaining);
+ uncompressedBytes += remaining;
+ length -= remaining;
+ }
+ }
+
+ private void spill() throws java.io.IOException {
+ // if there isn't anything in the current buffer, don't spill
+ if (current.position() == (codec == null ? 0 : HEADER_SIZE)) {
+ return;
+ }
+ flip();
+ if (codec == null) {
+ receiver.output(current);
+ getNewInputBuffer();
+ } else {
+ if (compressed == null) {
+ compressed = getNewOutputBuffer();
+ } else if (overflow == null) {
+ overflow = getNewOutputBuffer();
+ }
+ int sizePosn = compressed.position();
+ compressed.position(compressed.position() + HEADER_SIZE);
+ if (codec.compress(current, compressed, overflow)) {
+ uncompressedBytes = 0;
+ // move position back to after the header
+ current.position(HEADER_SIZE);
+ current.limit(current.capacity());
+ // find the total bytes in the chunk
+ int totalBytes = compressed.position() - sizePosn - HEADER_SIZE;
+ if (overflow != null) {
+ totalBytes += overflow.position();
+ }
+ compressedBytes += totalBytes + HEADER_SIZE;
+ writeHeader(compressed, sizePosn, totalBytes, false);
+ // if we have less than the next header left, spill it.
+ if (compressed.remaining() < HEADER_SIZE) {
+ compressed.flip();
+ receiver.output(compressed);
+ compressed = overflow;
+ overflow = null;
+ }
+ } else {
+ compressedBytes += uncompressedBytes + HEADER_SIZE;
+ uncompressedBytes = 0;
+ // we are using the original, but need to spill the current
+ // compressed buffer first. So back up to where we started,
+ // flip it and add it to done.
+ if (sizePosn != 0) {
+ compressed.position(sizePosn);
+ compressed.flip();
+ receiver.output(compressed);
+ compressed = null;
+ // if we have an overflow, clear it and make it the new compress
+ // buffer
+ if (overflow != null) {
+ overflow.clear();
+ compressed = overflow;
+ overflow = null;
+ }
+ } else {
+ compressed.clear();
+ if (overflow != null) {
+ overflow.clear();
+ }
+ }
+
+ // now add the current buffer into the done list and get a new one.
+ current.position(0);
+ // update the header with the current length
+ writeHeader(current, 0, current.limit() - HEADER_SIZE, true);
+ receiver.output(current);
+ getNewInputBuffer();
+ }
+ }
+ }
+
+ void getPosition(PositionRecorder recorder) throws IOException {
+ if (codec == null) {
+ recorder.addPosition(uncompressedBytes);
+ } else {
+ recorder.addPosition(compressedBytes);
+ recorder.addPosition(uncompressedBytes);
+ }
+ }
+
+ @Override
+ public void flush() throws IOException {
+ spill();
+ if (compressed != null && compressed.position() != 0) {
+ compressed.flip();
+ receiver.output(compressed);
+ compressed = null;
+ }
+ uncompressedBytes = 0;
+ compressedBytes = 0;
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+
+ @Override
+ public long getSize() {
+ return uncompressedBytes + compressedBytes;
+ }
+}
+
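
As the writeHeader comment above explains, every chunk OutStream emits is
preceded by a constant 3-byte header: the low bit of the first byte marks an
uncompressed ("original") chunk and the remaining 23 bits carry the chunk
length, least-significant bits first. A small sketch (not part of this
commit) of the matching decode:

    final class ChunkHeaderExample {
      // recover the 23-bit chunk length from the three header bytes
      static int chunkLength(byte b0, byte b1, byte b2) {
        return ((b0 & 0xff) >>> 1) | ((b1 & 0xff) << 7) | ((b2 & 0xff) << 15);
      }

      // the low bit of the first byte is set when the chunk was stored uncompressed
      static boolean isOriginal(byte b0) {
        return (b0 & 1) == 1;
      }

      public static void main(String[] args) {
        // header for a 100,000 byte compressed chunk, as writeHeader would emit it
        byte b0 = 0x40, b1 = 0x0D, b2 = 0x03;
        System.out.println(chunkLength(b0, b1, b2)); // 100000
        System.out.println(isOriginal(b0));          // false
      }
    }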

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/PositionProvider.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/PositionProvider.java?rev=1452992&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/PositionProvider.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/PositionProvider.java Tue Mar 5 20:44:50 2013
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+/**
+ * An interface used for seeking to a row index.
+ */
+interface PositionProvider {
+ long getNext();
+}

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/PositionRecorder.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/PositionRecorder.java?rev=1452992&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/PositionRecorder.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/PositionRecorder.java Tue Mar 5 20:44:50 2013
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+/**
+ * An interface for recording positions in a stream.
+ */
+interface PositionRecorder {
+ void addPosition(long offset);
+}

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/PositionedOutputStream.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/PositionedOutputStream.java?rev=1452992&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/PositionedOutputStream.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/PositionedOutputStream.java Tue Mar 5 20:44:50 2013
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+abstract class PositionedOutputStream extends OutputStream {
+ abstract void getPosition(PositionRecorder recorder) throws IOException;
+ abstract long getSize();
+}
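
PositionRecorder and PositionProvider are the two halves of the row-index
mechanism: a stream reports its current position(s) into a recorder while
writing, and a provider replays the same numbers when a reader later seeks
back to that point. A minimal sketch (not in this commit) backing both with
a single list; like the interfaces, it would have to live in this package:

    final class ListPositions implements PositionRecorder, PositionProvider {
      private final java.util.List<Long> positions =
          new java.util.ArrayList<Long>();
      private int next = 0;

      @Override
      public void addPosition(long offset) {   // called while writing
        positions.add(offset);
      }

      @Override
      public long getNext() {                  // consumed while seeking
        return positions.get(next++);
      }
    }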

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java?rev=1452992&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java Tue Mar 5 20:44:50 2013
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * The interface for reading ORC files.
+ *
+ * One Reader can support multiple concurrent RecordReaders.
+ */
+public interface Reader {
+
+ /**
+ * Get the number of rows in the file.
+ * @return the number of rows
+ */
+ long getNumberOfRows();
+
+ /**
+ * Get the user metadata keys.
+ * @return the set of metadata keys
+ */
+ Iterable<String> getMetadataKeys();
+
+ /**
+ * Get a user metadata value.
+ * @param key a key given by the user
+ * @return the bytes associated with the given key
+ */
+ ByteBuffer getMetadataValue(String key);
+
+ /**
+ * Get the compression kind.
+ * @return the kind of compression in the file
+ */
+ CompressionKind getCompression();
+
+ /**
+ * Get the buffer size for the compression.
+ * @return number of bytes to buffer for the compression codec.
+ */
+ int getCompressionSize();
+
+ /**
+ * Get the number of rows per entry in the row index.
+ * @return the number of rows per entry in the row index, or 0 if there
+ * is no row index.
+ */
+ int getRowIndexStride();
+
+ /**
+ * Get the list of stripes.
+ * @return the information about the stripes in order
+ */
+ Iterable<StripeInformation> getStripes();
+
+ /**
+ * Get the object inspector for looking at the objects.
+ * @return an object inspector for each row returned
+ */
+ ObjectInspector getObjectInspector();
+
+ /**
+ * Get the length of the file.
+ * @return the number of bytes in the file
+ */
+ long getContentLength();
+
+ /**
+ * Get the statistics about the columns in the file.
+ * @return the information about the column
+ */
+ ColumnStatistics[] getStatistics();
+
+ /**
+ * Get the list of types contained in the file. The root type is the first
+ * type in the list.
+ * @return the list of flattened types
+ */
+ List<OrcProto.Type> getTypes();
+
+ /**
+ * Create a RecordReader that will scan the entire file.
+ * @param include true for each column that should be included
+ * @return A new RecordReader
+ * @throws IOException
+ */
+ RecordReader rows(boolean[] include) throws IOException;
+
+ /**
+ * Create a RecordReader that will start reading at the first stripe after
+ * offset up to the stripe that starts at offset + length. This is intended
+ * to work with MapReduce's FileInputFormat where divisions are picked
+ * blindly, but they must cover all of the rows.
+ * @param offset a byte offset in the file
+ * @param length a number of bytes in the file
+ * @param include true for each column that should be included
+ * @return a new RecordReader that will read the specified rows.
+ * @throws IOException
+ */
+ RecordReader rows(long offset, long length,
+ boolean[] include) throws IOException;
+
+}
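
The boolean[] passed to rows() has one entry per flattened type returned by
getTypes(), with entry 0 being the root struct; a column is only read when
its entry is true. A hedged fragment (not from this commit, assuming reader
is a Reader already opened on some ORC file) that prunes everything except
the root and the column with type id 2:

    boolean[] include = new boolean[reader.getTypes().size()];
    include[0] = true;   // the root struct
    include[2] = true;   // one selected column
    RecordReader rows = reader.rows(include);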

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java?rev=1452992&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java Tue Mar 5 20:44:50 2013
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import com.google.protobuf.CodedInputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+final class ReaderImpl implements Reader {
+
+ private static final int DIRECTORY_SIZE_GUESS = 16 * 1024;
+
+ private final FileSystem fileSystem;
+ private final Path path;
+ private final CompressionKind compressionKind;
+ private final CompressionCodec codec;
+ private final int bufferSize;
+ private final OrcProto.Footer footer;
+ private final ObjectInspector inspector;
+
+ private static class StripeInformationImpl
+ implements StripeInformation {
+ private final OrcProto.StripeInformation stripe;
+
+ StripeInformationImpl(OrcProto.StripeInformation stripe) {
+ this.stripe = stripe;
+ }
+
+ @Override
+ public long getOffset() {
+ return stripe.getOffset();
+ }
+
+ @Override
+ public long getDataLength() {
+ return stripe.getDataLength();
+ }
+
+ @Override
+ public long getFooterLength() {
+ return stripe.getFooterLength();
+ }
+
+ @Override
+ public long getIndexLength() {
+ return stripe.getIndexLength();
+ }
+
+ @Override
+ public long getNumberOfRows() {
+ return stripe.getNumberOfRows();
+ }
+
+ @Override
+ public String toString() {
+ return "offset: " + getOffset() + " data: " + getDataLength() +
+ " rows: " + getNumberOfRows() + " tail: " + getFooterLength() +
+ " index: " + getIndexLength();
+ }
+ }
+
+ @Override
+ public long getNumberOfRows() {
+ return footer.getNumberOfRows();
+ }
+
+ @Override
+ public Iterable<String> getMetadataKeys() {
+ List<String> result = new ArrayList<String>();
+ for(OrcProto.UserMetadataItem item: footer.getMetadataList()) {
+ result.add(item.getName());
+ }
+ return result;
+ }
+
+ @Override
+ public ByteBuffer getMetadataValue(String key) {
+ for(OrcProto.UserMetadataItem item: footer.getMetadataList()) {
+ if (item.hasName() && item.getName().equals(key)) {
+ return item.getValue().asReadOnlyByteBuffer();
+ }
+ }
+ throw new IllegalArgumentException("Can't find user metadata " + key);
+ }
+
+ @Override
+ public CompressionKind getCompression() {
+ return compressionKind;
+ }
+
+ @Override
+ public int getCompressionSize() {
+ return bufferSize;
+ }
+
+ @Override
+ public Iterable<StripeInformation> getStripes() {
+ return new Iterable<org.apache.hadoop.hive.ql.io.orc.StripeInformation>(){
+
+ @Override
+ public Iterator<org.apache.hadoop.hive.ql.io.orc.StripeInformation> iterator() {
+ return new Iterator<org.apache.hadoop.hive.ql.io.orc.StripeInformation>(){
+ private final Iterator<OrcProto.StripeInformation> inner =
+ footer.getStripesList().iterator();
+
+ @Override
+ public boolean hasNext() {
+ return inner.hasNext();
+ }
+
+ @Override
+ public org.apache.hadoop.hive.ql.io.orc.StripeInformation next() {
+ return new StripeInformationImpl(inner.next());
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("remove unsupported");
+ }
+ };
+ }
+ };
+ }
+
+ @Override
+ public ObjectInspector getObjectInspector() {
+ return inspector;
+ }
+
+ @Override
+ public long getContentLength() {
+ return footer.getContentLength();
+ }
+
+ @Override
+ public List<OrcProto.Type> getTypes() {
+ return footer.getTypesList();
+ }
+
+ @Override
+ public int getRowIndexStride() {
+ return footer.getRowIndexStride();
+ }
+
+ @Override
+ public ColumnStatistics[] getStatistics() {
+ ColumnStatistics[] result = new ColumnStatistics[footer.getTypesCount()];
+ for(int i=0; i < result.length; ++i) {
+ result[i] = ColumnStatisticsImpl.deserialize(footer.getStatistics(i));
+ }
+ return result;
+ }
+
+ ReaderImpl(FileSystem fs, Path path) throws IOException {
+ this.fileSystem = fs;
+ this.path = path;
+ FSDataInputStream file = fs.open(path);
+ long size = fs.getFileStatus(path).getLen();
+ int readSize = (int) Math.min(size, DIRECTORY_SIZE_GUESS);
+ file.seek(size - readSize);
+ ByteBuffer buffer = ByteBuffer.allocate(readSize);
+ file.readFully(buffer.array(), buffer.arrayOffset() + buffer.position(),
+ buffer.remaining());
+ int psLen = buffer.get(readSize - 1);
+ int psOffset = readSize - 1 - psLen;
+ CodedInputStream in = CodedInputStream.newInstance(buffer.array(),
+ buffer.arrayOffset() + psOffset, psLen);
+ OrcProto.PostScript ps = OrcProto.PostScript.parseFrom(in);
+ int footerSize = (int) ps.getFooterLength();
+ bufferSize = (int) ps.getCompressionBlockSize();
+ switch (ps.getCompression()) {
+ case NONE:
+ compressionKind = CompressionKind.NONE;
+ break;
+ case ZLIB:
+ compressionKind = CompressionKind.ZLIB;
+ break;
+ case SNAPPY:
+ compressionKind = CompressionKind.SNAPPY;
+ break;
+ case LZO:
+ compressionKind = CompressionKind.LZO;
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown compression");
+ }
+ codec = WriterImpl.createCodec(compressionKind);
+ int extra = Math.max(0, psLen + 1 + footerSize - readSize);
+ if (extra > 0) {
+ file.seek(size - readSize - extra);
+ ByteBuffer extraBuf = ByteBuffer.allocate(extra + readSize);
+ file.readFully(extraBuf.array(),
+ extraBuf.arrayOffset() + extraBuf.position(), extra);
+ extraBuf.position(extra);
+ extraBuf.put(buffer);
+ buffer = extraBuf;
+ buffer.position(0);
+ buffer.limit(footerSize);
+ } else {
+ buffer.position(psOffset - footerSize);
+ buffer.limit(psOffset);
+ }
+ InputStream instream = InStream.create("footer", buffer, codec, bufferSize);
+ footer = OrcProto.Footer.parseFrom(instream);
+ inspector = OrcStruct.createObjectInspector(0, footer.getTypesList());
+ file.close();
+ }
+
+ @Override
+ public RecordReader rows(boolean[] include) throws IOException {
+ return rows(0, Long.MAX_VALUE, include);
+ }
+
+ @Override
+ public RecordReader rows(long offset, long length, boolean[] include
+ ) throws IOException {
+ return new RecordReaderImpl(this.getStripes(), fileSystem, path, offset,
+ length, footer.getTypesList(), codec, bufferSize,
+ include, footer.getRowIndexStride());
+ }
+
+}
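
ReaderImpl reads the postscript and footer from the tail of the file in its
constructor, so the metadata accessors above are answered from the parsed
footer without further I/O. A minimal sketch (not part of this commit;
OrcFileDump is a hypothetical name) that opens a file from this package and
prints its layout:

    package org.apache.hadoop.hive.ql.io.orc;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class OrcFileDump {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        Path path = new Path(args[0]);
        FileSystem fs = path.getFileSystem(conf);
        Reader reader = new ReaderImpl(fs, path);
        System.out.println("rows: " + reader.getNumberOfRows()
            + ", compression: " + reader.getCompression());
        for (StripeInformation stripe : reader.getStripes()) {
          System.out.println(stripe);
        }
      }
    }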

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java?rev=1452992&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java Tue Mar 5 20:44:50 2013
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+
+/**
+ * A row-by-row iterator for ORC files.
+ */
+public interface RecordReader {
+ /**
+ * Does the reader have more rows available.
+ * @return true if there are more rows
+ * @throws java.io.IOException
+ */
+ boolean hasNext() throws IOException;
+
+ /**
+ * Read the next row.
+ * @param previous a row object that can be reused by the reader
+ * @return the row that was read
+ * @throws java.io.IOException
+ */
+ Object next(Object previous) throws IOException;
+
+ /**
+ * Get the row number of the row that will be returned by the following
+ * call to next().
+ * @return the row number from 0 to the number of rows in the file
+ * @throws java.io.IOException
+ */
+ long getRowNumber() throws IOException;
+
+ /**
+ * Get the progress of the reader through the rows.
+ * @return a fraction between 0.0 and 1.0 of rows read
+ * @throws java.io.IOException
+ */
+ float getProgress() throws IOException;
+
+ /**
+ * Release the resources associated with the given reader.
+ * @throws java.io.IOException
+ */
+ void close() throws IOException;
+
+ /**
+ * Seek to a particular row number.
+ */
+ void seekToRow(long rowCount) throws IOException;
+}
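
A typical scan reuses a single row object, as the next() javadoc allows. A
hedged fragment (not from this commit), assuming reader is a Reader opened
as in the ReaderImpl sketch above:

    boolean[] include = new boolean[reader.getTypes().size()];
    java.util.Arrays.fill(include, true);    // read every column
    RecordReader rows = reader.rows(include);
    Object row = null;
    while (rows.hasNext()) {
      row = rows.next(row);                  // the reader may reuse the passed object
      System.out.println(row);               // for struct rows this prints {f0, f1, ...}
    }
    rows.close();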

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1452992&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Tue Mar 5 20:44:50 2013
@@ -0,0 +1,1238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+class RecordReaderImpl implements RecordReader {
+ private final FSDataInputStream file;
+ private final long firstRow;
+ private final List<StripeInformation> stripes =
+ new ArrayList<StripeInformation>();
+ private OrcProto.StripeFooter stripeFooter;
+ private final long totalRowCount;
+ private final CompressionCodec codec;
+ private final int bufferSize;
+ private final boolean[] included;
+ private final long rowIndexStride;
+ private long rowInStripe = 0;
+ private int currentStripe = 0;
+ private long rowBaseInStripe = 0;
+ private long rowCountInStripe = 0;
+ private final Map<StreamName, InStream> streams =
+ new HashMap<StreamName, InStream>();
+ private final TreeReader reader;
+ private final OrcProto.RowIndex[] indexes;
+
+ RecordReaderImpl(Iterable<StripeInformation> stripes,
+ FileSystem fileSystem,
+ Path path,
+ long offset, long length,
+ List<OrcProto.Type> types,
+ CompressionCodec codec,
+ int bufferSize,
+ boolean[] included,
+ long strideRate
+ ) throws IOException {
+ this.file = fileSystem.open(path);
+ this.codec = codec;
+ this.bufferSize = bufferSize;
+ this.included = included;
+ long rows = 0;
+ long skippedRows = 0;
+ for(StripeInformation stripe: stripes) {
+ long stripeStart = stripe.getOffset();
+ if (offset > stripeStart) {
+ skippedRows += stripe.getNumberOfRows();
+ } else if (stripeStart < offset + length) {
+ this.stripes.add(stripe);
+ rows += stripe.getNumberOfRows();
+ }
+ }
+ firstRow = skippedRows;
+ totalRowCount = rows;
+ reader = createTreeReader(0, types, included);
+ indexes = new OrcProto.RowIndex[types.size()];
+ rowIndexStride = strideRate;
+ if (this.stripes.size() > 0) {
+ readStripe();
+ }
+ }
+
+ private static final class PositionProviderImpl implements PositionProvider {
+ private final OrcProto.RowIndexEntry entry;
+ private int index = 0;
+
+ PositionProviderImpl(OrcProto.RowIndexEntry entry) {
+ this.entry = entry;
+ }
+
+ @Override
+ public long getNext() {
+ return entry.getPositions(index++);
+ }
+ }
+
+ private abstract static class TreeReader {
+ protected final int columnId;
+ private BitFieldReader present = null;
+ protected boolean valuePresent = false;
+
+ TreeReader(int columnId) {
+ this.columnId = columnId;
+ }
+
+ void startStripe(Map<StreamName, InStream> streams,
+ List<OrcProto.ColumnEncoding> encoding
+ ) throws IOException {
+ InStream in = streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.PRESENT));
+ if (in == null) {
+ present = null;
+ valuePresent = true;
+ } else {
+ present = new BitFieldReader(in, 1);
+ }
+ }
+
+ /**
+ * Seek to the given position.
+ * @param index the indexes loaded from the file
+ * @throws IOException
+ */
+ void seek(PositionProvider[] index) throws IOException {
+ if (present != null) {
+ present.seek(index[columnId]);
+ }
+ }
+
+ protected long countNonNulls(long rows) throws IOException {
+ if (present != null) {
+ long result = 0;
+ for(long c=0; c < rows; ++c) {
+ if (present.next() == 1) {
+ result += 1;
+ }
+ }
+ return result;
+ } else {
+ return rows;
+ }
+ }
+
+ abstract void skipRows(long rows) throws IOException;
+
+ Object next(Object previous) throws IOException {
+ if (present != null) {
+ valuePresent = present.next() == 1;
+ }
+ return previous;
+ }
+ }
+
+ private static class BooleanTreeReader extends TreeReader{
+ private BitFieldReader reader = null;
+
+ BooleanTreeReader(int columnId) {
+ super(columnId);
+ }
+
+ @Override
+ void startStripe(Map<StreamName, InStream> streams,
+ List<OrcProto.ColumnEncoding> encodings
+ ) throws IOException {
+ super.startStripe(streams, encodings);
+ reader = new BitFieldReader(streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.DATA)), 1);
+ }
+
+ @Override
+ void seek(PositionProvider[] index) throws IOException {
+ super.seek(index);
+ reader.seek(index[columnId]);
+ }
+
+ @Override
+ void skipRows(long items) throws IOException {
+ reader.skip(countNonNulls(items));
+ }
+
+ @Override
+ Object next(Object previous) throws IOException {
+ super.next(previous);
+ BooleanWritable result = null;
+ if (valuePresent) {
+ if (previous == null) {
+ result = new BooleanWritable();
+ } else {
+ result = (BooleanWritable) previous;
+ }
+ result.set(reader.next() == 1);
+ }
+ return result;
+ }
+ }
+
+ private static class ByteTreeReader extends TreeReader{
+ private RunLengthByteReader reader = null;
+
+ ByteTreeReader(int columnId) {
+ super(columnId);
+ }
+
+ @Override
+ void startStripe(Map<StreamName, InStream> streams,
+ List<OrcProto.ColumnEncoding> encodings
+ ) throws IOException {
+ super.startStripe(streams, encodings);
+ reader = new RunLengthByteReader(streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.DATA)));
+ }
+
+ @Override
+ void seek(PositionProvider[] index) throws IOException {
+ super.seek(index);
+ reader.seek(index[columnId]);
+ }
+
+ @Override
+ Object next(Object previous) throws IOException {
+ super.next(previous);
+ ByteWritable result = null;
+ if (valuePresent) {
+ if (previous == null) {
+ result = new ByteWritable();
+ } else {
+ result = (ByteWritable) previous;
+ }
+ result.set(reader.next());
+ }
+ return result;
+ }
+
+ @Override
+ void skipRows(long items) throws IOException {
+ reader.skip(countNonNulls(items));
+ }
+ }
+
+ private static class ShortTreeReader extends TreeReader{
+ private RunLengthIntegerReader reader = null;
+
+ ShortTreeReader(int columnId) {
+ super(columnId);
+ }
+
+ @Override
+ void startStripe(Map<StreamName, InStream> streams,
+ List<OrcProto.ColumnEncoding> encodings
+ ) throws IOException {
+ super.startStripe(streams, encodings);
+ StreamName name = new StreamName(columnId,
+ OrcProto.Stream.Kind.DATA);
+ reader = new RunLengthIntegerReader(streams.get(name), true);
+ }
+
+ @Override
+ void seek(PositionProvider[] index) throws IOException {
+ super.seek(index);
+ reader.seek(index[columnId]);
+ }
+
+ @Override
+ Object next(Object previous) throws IOException {
+ super.next(previous);
+ ShortWritable result = null;
+ if (valuePresent) {
+ if (previous == null) {
+ result = new ShortWritable();
+ } else {
+ result = (ShortWritable) previous;
+ }
+ result.set((short) reader.next());
+ }
+ return result;
+ }
+
+ @Override
+ void skipRows(long items) throws IOException {
+ reader.skip(countNonNulls(items));
+ }
+ }
+
+ private static class IntTreeReader extends TreeReader{
+ private RunLengthIntegerReader reader = null;
+
+ IntTreeReader(int columnId) {
+ super(columnId);
+ }
+
+ @Override
+ void startStripe(Map<StreamName, InStream> streams,
+ List<OrcProto.ColumnEncoding> encodings
+ ) throws IOException {
+ super.startStripe(streams, encodings);
+ StreamName name = new StreamName(columnId,
+ OrcProto.Stream.Kind.DATA);
+ reader = new RunLengthIntegerReader(streams.get(name), true);
+ }
+
+ @Override
+ void seek(PositionProvider[] index) throws IOException {
+ super.seek(index);
+ reader.seek(index[columnId]);
+ }
+
+ @Override
+ Object next(Object previous) throws IOException {
+ super.next(previous);
+ IntWritable result = null;
+ if (valuePresent) {
+ if (previous == null) {
+ result = new IntWritable();
+ } else {
+ result = (IntWritable) previous;
+ }
+ result.set((int) reader.next());
+ }
+ return result;
+ }
+
+ @Override
+ void skipRows(long items) throws IOException {
+ reader.skip(countNonNulls(items));
+ }
+ }
+
+ private static class LongTreeReader extends TreeReader{
+ private RunLengthIntegerReader reader = null;
+
+ LongTreeReader(int columnId) {
+ super(columnId);
+ }
+
+ @Override
+ void startStripe(Map<StreamName, InStream> streams,
+ List<OrcProto.ColumnEncoding> encodings
+ ) throws IOException {
+ super.startStripe(streams, encodings);
+ StreamName name = new StreamName(columnId,
+ OrcProto.Stream.Kind.DATA);
+ reader = new RunLengthIntegerReader(streams.get(name), true);
+ }
+
+ @Override
+ void seek(PositionProvider[] index) throws IOException {
+ super.seek(index);
+ reader.seek(index[columnId]);
+ }
+
+ @Override
+ Object next(Object previous) throws IOException {
+ super.next(previous);
+ LongWritable result = null;
+ if (valuePresent) {
+ if (previous == null) {
+ result = new LongWritable();
+ } else {
+ result = (LongWritable) previous;
+ }
+ result.set(reader.next());
+ }
+ return result;
+ }
+
+ @Override
+ void skipRows(long items) throws IOException {
+ reader.skip(countNonNulls(items));
+ }
+ }
+
+ private static class FloatTreeReader extends TreeReader{
+ private InStream stream;
+
+ FloatTreeReader(int columnId) {
+ super(columnId);
+ }
+
+ @Override
+ void startStripe(Map<StreamName, InStream> streams,
+ List<OrcProto.ColumnEncoding> encodings
+ ) throws IOException {
+ super.startStripe(streams, encodings);
+ StreamName name = new StreamName(columnId,
+ OrcProto.Stream.Kind.DATA);
+ stream = streams.get(name);
+ }
+
+ @Override
+ void seek(PositionProvider[] index) throws IOException {
+ super.seek(index);
+ stream.seek(index[columnId]);
+ }
+
+ @Override
+ Object next(Object previous) throws IOException {
+ super.next(previous);
+ FloatWritable result = null;
+ if (valuePresent) {
+ if (previous == null) {
+ result = new FloatWritable();
+ } else {
+ result = (FloatWritable) previous;
+ }
+ result.set(SerializationUtils.readFloat(stream));
+ }
+ return result;
+ }
+
+ @Override
+ void skipRows(long items) throws IOException {
+ items = countNonNulls(items);
+ for(int i=0; i < items; ++i) {
+ SerializationUtils.readFloat(stream);
+ }
+ }
+ }
+
+ private static class DoubleTreeReader extends TreeReader{
+ private InStream stream;
+
+ DoubleTreeReader(int columnId) {
+ super(columnId);
+ }
+
+ @Override
+ void startStripe(Map<StreamName, InStream> streams,
+ List<OrcProto.ColumnEncoding> encodings
+ ) throws IOException {
+ super.startStripe(streams, encodings);
+ StreamName name =
+ new StreamName(columnId,
+ OrcProto.Stream.Kind.DATA);
+ stream = streams.get(name);
+ }
+
+ @Override
+ void seek(PositionProvider[] index) throws IOException {
+ super.seek(index);
+ stream.seek(index[columnId]);
+ }
+
+ @Override
+ Object next(Object previous) throws IOException {
+ super.next(previous);
+ DoubleWritable result = null;
+ if (valuePresent) {
+ if (previous == null) {
+ result = new DoubleWritable();
+ } else {
+ result = (DoubleWritable) previous;
+ }
+ result.set(SerializationUtils.readDouble(stream));
+ }
+ return result;
+ }
+
+ @Override
+ void skipRows(long items) throws IOException {
+ items = countNonNulls(items);
+ stream.skip(items * 8);
+ }
+ }
+
+ private static class BinaryTreeReader extends TreeReader{
+ private InStream stream;
+ private RunLengthIntegerReader lengths;
+
+ BinaryTreeReader(int columnId) {
+ super(columnId);
+ }
+
+ @Override
+ void startStripe(Map<StreamName, InStream> streams,
+ List<OrcProto.ColumnEncoding> encodings
+ ) throws IOException {
+ super.startStripe(streams, encodings);
+ StreamName name = new StreamName(columnId,
+ OrcProto.Stream.Kind.DATA);
+ stream = streams.get(name);
+ lengths = new RunLengthIntegerReader(streams.get(new
+ StreamName(columnId, OrcProto.Stream.Kind.LENGTH)),
+ false);
+ }
+
+ @Override
+ void seek(PositionProvider[] index) throws IOException {
+ super.seek(index);
+ stream.seek(index[columnId]);
+ lengths.seek(index[columnId]);
+ }
+
+ @Override
+ Object next(Object previous) throws IOException {
+ super.next(previous);
+ BytesWritable result = null;
+ if (valuePresent) {
+ if (previous == null) {
+ result = new BytesWritable();
+ } else {
+ result = (BytesWritable) previous;
+ }
+ int len = (int) lengths.next();
+ result.setSize(len);
+ int offset = 0;
+ while (len > 0) {
+ int written = stream.read(result.getBytes(), offset, len);
+ if (written < 0) {
+ throw new EOFException("Can't finish byte read from " + stream);
+ }
+ len -= written;
+ offset += written;
+ }
+ }
+ return result;
+ }
+
+ @Override
+ void skipRows(long items) throws IOException {
+ items = countNonNulls(items);
+ long lengthToSkip = 0;
+ for(int i=0; i < items; ++i) {
+ lengthToSkip += lengths.next();
+ }
+ stream.skip(lengthToSkip);
+ }
+ }
+
+ private static class TimestampTreeReader extends TreeReader{
+ private RunLengthIntegerReader data;
+ private RunLengthIntegerReader nanos;
+
+ TimestampTreeReader(int columnId) {
+ super(columnId);
+ }
+
+ @Override
+ void startStripe(Map<StreamName, InStream> streams,
+ List<OrcProto.ColumnEncoding> encodings
+ ) throws IOException {
+ super.startStripe(streams, encodings);
+ data = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.DATA)), true);
+ nanos = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.NANO_DATA)), false);
+ }
+
+ @Override
+ void seek(PositionProvider[] index) throws IOException {
+ super.seek(index);
+ data.seek(index[columnId]);
+ nanos.seek(index[columnId]);
+ }
+
+ @Override
+ Object next(Object previous) throws IOException {
+ super.next(previous);
+ Timestamp result = null;
+ if (valuePresent) {
+ if (previous == null) {
+ result = new Timestamp(0);
+ } else {
+ result = (Timestamp) previous;
+ }
+ long millis = (data.next() + WriterImpl.BASE_TIMESTAMP) *
+ WriterImpl.MILLIS_PER_SECOND;
+ int newNanos = parseNanos(nanos.next());
+ // add back the sub-second milliseconds that were truncated when the writer
+ // divided the time down to seconds; for negative times the truncation went
+ // toward zero, so the milliseconds are subtracted instead.
+ if (millis >= 0) {
+ millis += newNanos / 1000000;
+ } else {
+ millis -= newNanos / 1000000;
+ }
+ result.setTime(millis);
+ result.setNanos(newNanos);
+ }
+ return result;
+ }
+
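+ // The nanosecond field is stored with its trailing decimal zeros stripped
+ // (presumably by the writer): when the low 3 bits are non-zero they hold
+ // one less than the number of zeros removed, and the remaining bits hold
+ // the significant digits. For example, a serialized value of 13 decodes as
+ // zeros = 13 & 7 = 5 and result = 13 >>> 3 = 1, which is multiplied by 10
+ // six times to recover 1,000,000 nanoseconds.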
+ private static int parseNanos(long serialized) {
+ int zeros = 7 & (int) serialized;
+ int result = (int) (serialized >>> 3);
+ if (zeros != 0) {
+ for(int i =0; i <= zeros; ++i) {
+ result *= 10;
+ }
+ }
+ return result;
+ }
+
+ @Override
+ void skipRows(long items) throws IOException {
+ items = countNonNulls(items);
+ data.skip(items);
+ nanos.skip(items);
+ }
+ }
+
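+ // String columns are dictionary encoded: DICTIONARY_DATA holds the
+ // concatenated bytes of the distinct values, LENGTH holds the length of
+ // each dictionary entry (converted to offsets below), and DATA holds one
+ // dictionary index per row.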
+ private static class StringTreeReader extends TreeReader {
+ private DynamicByteArray dictionaryBuffer = null;
+ private int dictionarySize;
+ private int[] dictionaryOffsets;
+ private RunLengthIntegerReader reader;
+
+ StringTreeReader(int columnId) {
+ super(columnId);
+ }
+
+ @Override
+ void startStripe(Map<StreamName, InStream> streams,
+ List<OrcProto.ColumnEncoding> encodings
+ ) throws IOException {
+ super.startStripe(streams, encodings);
+
+ // read the dictionary blob
+ dictionarySize = encodings.get(columnId).getDictionarySize();
+ StreamName name = new StreamName(columnId,
+ OrcProto.Stream.Kind.DICTIONARY_DATA);
+ InStream in = streams.get(name);
+ if (in.available() > 0) {
+ dictionaryBuffer = new DynamicByteArray(64, in.available());
+ dictionaryBuffer.readAll(in);
+ } else {
+ dictionaryBuffer = null;
+ }
+ in.close();
+
+ // read the lengths
+ name = new StreamName(columnId, OrcProto.Stream.Kind.LENGTH);
+ in = streams.get(name);
+ RunLengthIntegerReader lenReader = new RunLengthIntegerReader(in, false);
+ int offset = 0;
+ if (dictionaryOffsets == null ||
+ dictionaryOffsets.length < dictionarySize + 1) {
+ dictionaryOffsets = new int[dictionarySize + 1];
+ }
+ for(int i=0; i < dictionarySize; ++i) {
+ dictionaryOffsets[i] = offset;
+ offset += (int) lenReader.next();
+ }
+ dictionaryOffsets[dictionarySize] = offset;
+ in.close();
+
+ // set up the row reader
+ name = new StreamName(columnId, OrcProto.Stream.Kind.DATA);
+ reader = new RunLengthIntegerReader(streams.get(name), false);
+ }
+
+ @Override
+ void seek(PositionProvider[] index) throws IOException {
+ super.seek(index);
+ reader.seek(index[columnId]);
+ }
+
+ @Override
+ Object next(Object previous) throws IOException {
+ super.next(previous);
+ Text result = null;
+ if (valuePresent) {
+ int entry = (int) reader.next();
+ if (previous == null) {
+ result = new Text();
+ } else {
+ result = (Text) previous;
+ }
+ int offset = dictionaryOffsets[entry];
+ int length;
+ // if it isn't the last entry, subtract the offsets otherwise use
+ // the buffer length.
+ if (entry < dictionaryOffsets.length - 1) {
+ length = dictionaryOffsets[entry + 1] - offset;
+ } else {
+ length = dictionaryBuffer.size() - offset;
+ }
+ dictionaryBuffer.setText(result, offset, length);
+ }
+ return result;
+ }
+
+ @Override
+ void skipRows(long items) throws IOException {
+ reader.skip(countNonNulls(items));
+ }
+ }
+
+ private static class StructTreeReader extends TreeReader {
+ private final TreeReader[] fields;
+ private final String[] fieldNames;
+
+ StructTreeReader(int columnId,
+ List<OrcProto.Type> types,
+ boolean[] included) throws IOException {
+ super(columnId);
+ OrcProto.Type type = types.get(columnId);
+ int fieldCount = type.getFieldNamesCount();
+ this.fields = new TreeReader[fieldCount];
+ this.fieldNames = new String[fieldCount];
+ for(int i=0; i < fieldCount; ++i) {
+ int subtype = type.getSubtypes(i);
+ if (included == null || included[subtype]) {
+ this.fields[i] = createTreeReader(subtype, types, included);
+ }
+ this.fieldNames[i] = type.getFieldNames(i);
+ }
+ }
+
+ @Override
+ void seek(PositionProvider[] index) throws IOException {
+ super.seek(index);
+ for(TreeReader kid: fields) {
+ if (kid != null) {
+ kid.seek(index);
+ }
+ }
+ }
+
+ @Override
+ Object next(Object previous) throws IOException {
+ super.next(previous);
+ OrcStruct result = null;
+ if (valuePresent) {
+ if (previous == null) {
+ result = new OrcStruct(fields.length);
+ } else {
+ result = (OrcStruct) previous;
+ }
+ for(int i=0; i < fields.length; ++i) {
+ if (fields[i] != null) {
+ result.setFieldValue(i, fields[i].next(result.getFieldValue(i)));
+ }
+ }
+ }
+ return result;
+ }
+
+ @Override
+ void startStripe(Map<StreamName, InStream> streams,
+ List<OrcProto.ColumnEncoding> encodings
+ ) throws IOException {
+ super.startStripe(streams, encodings);
+ for(TreeReader field: fields) {
+ if (field != null) {
+ field.startStripe(streams, encodings);
+ }
+ }
+ }
+
+ @Override
+ void skipRows(long items) throws IOException {
+ items = countNonNulls(items);
+ for(TreeReader field: fields) {
+ if (field != null) {
+ field.skipRows(items);
+ }
+ }
+ }
+ }
+
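+ // Union values carry one tag byte per row in the DATA stream; the tag
+ // selects which child reader supplies the value.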
+ private static class UnionTreeReader extends TreeReader {
+ private final TreeReader[] fields;
+ private RunLengthByteReader tags;
+
+ UnionTreeReader(int columnId,
+ List<OrcProto.Type> types,
+ boolean[] included) throws IOException {
+ super(columnId);
+ OrcProto.Type type = types.get(columnId);
+ int fieldCount = type.getSubtypesCount();
+ this.fields = new TreeReader[fieldCount];
+ for(int i=0; i < fieldCount; ++i) {
+ int subtype = type.getSubtypes(i);
+ if (included == null || included[subtype]) {
+ this.fields[i] = createTreeReader(subtype, types, included);
+ }
+ }
+ }
+
+ @Override
+ void seek(PositionProvider[] index) throws IOException {
+ super.seek(index);
+ tags.seek(index[columnId]);
+ for(TreeReader kid: fields) {
+ if (kid != null) {
+ kid.seek(index);
+ }
+ }
+ }
+
+ @Override
+ Object next(Object previous) throws IOException {
+ super.next(previous);
+ OrcUnion result = null;
+ if (valuePresent) {
+ if (previous == null) {
+ result = new OrcUnion();
+ } else {
+ result = (OrcUnion) previous;
+ }
+ byte tag = tags.next();
+ Object previousVal = result.getObject();
+ result.set(tag, fields[tag].next(tag == result.getTag() ?
+ previousVal : null));
+ }
+ return result;
+ }
+
+ @Override
+ void startStripe(Map<StreamName, InStream> streams,
+ List<OrcProto.ColumnEncoding> encodings
+ ) throws IOException {
+ super.startStripe(streams, encodings);
+ tags = new RunLengthByteReader(streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.DATA)));
+ for(TreeReader field: fields) {
+ if (field != null) {
+ field.startStripe(streams, encodings);
+ }
+ }
+ }
+
+ @Override
+ void skipRows(long items) throws IOException {
+ items = countNonNulls(items);
+ long[] counts = new long[fields.length];
+ for(int i=0; i < items; ++i) {
+ counts[tags.next()] += 1;
+ }
+ for(int i=0; i < counts.length; ++i) {
+ if (fields[i] != null) {
+ fields[i].skipRows(counts[i]);
+ }
+ }
+ }
+ }
+
+ private static class ListTreeReader extends TreeReader {
+ private final TreeReader elementReader;
+ private RunLengthIntegerReader lengths;
+
+ ListTreeReader(int columnId,
+ List<OrcProto.Type> types,
+ boolean[] included) throws IOException {
+ super(columnId);
+ OrcProto.Type type = types.get(columnId);
+ elementReader = createTreeReader(type.getSubtypes(0), types, included);
+ }
+
+ @Override
+ void seek(PositionProvider[] index) throws IOException {
+ super.seek(index);
+ lengths.seek(index[columnId]);
+ elementReader.seek(index);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ Object next(Object previous) throws IOException {
+ super.next(previous);
+ List<Object> result = null;
+ if (valuePresent) {
+ if (previous == null) {
+ result = new ArrayList<Object>();
+ } else {
+ result = (ArrayList<Object>) previous;
+ }
+ int prevLength = result.size();
+ int length = (int) lengths.next();
+ // extend the list to the new length
+ for(int i=prevLength; i < length; ++i) {
+ result.add(null);
+ }
+ // read the new elements into the array
+ for(int i=0; i< length; i++) {
+ result.set(i, elementReader.next(i < prevLength ?
+ result.get(i) : null));
+ }
+ // remove any extra elements
+ for(int i=prevLength - 1; i >= length; --i) {
+ result.remove(i);
+ }
+ }
+ return result;
+ }
+
+ @Override
+ void startStripe(Map<StreamName, InStream> streams,
+ List<OrcProto.ColumnEncoding> encodings
+ ) throws IOException {
+ super.startStripe(streams, encodings);
+ lengths = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.LENGTH)), false);
+ if (elementReader != null) {
+ elementReader.startStripe(streams, encodings);
+ }
+ }
+
+ @Override
+ void skipRows(long items) throws IOException {
+ items = countNonNulls(items);
+ long childSkip = 0;
+ for(long i=0; i < items; ++i) {
+ childSkip += lengths.next();
+ }
+ elementReader.skipRows(childSkip);
+ }
+ }
+
+ private static class MapTreeReader extends TreeReader {
+ private final TreeReader keyReader;
+ private final TreeReader valueReader;
+ private RunLengthIntegerReader lengths;
+
+ MapTreeReader(int columnId,
+ List<OrcProto.Type> types,
+ boolean[] included) throws IOException {
+ super(columnId);
+ OrcProto.Type type = types.get(columnId);
+ int keyColumn = type.getSubtypes(0);
+ int valueColumn = type.getSubtypes(1);
+ if (included == null || included[keyColumn]) {
+ keyReader = createTreeReader(keyColumn, types, included);
+ } else {
+ keyReader = null;
+ }
+ if (included == null || included[valueColumn]) {
+ valueReader = createTreeReader(valueColumn, types, included);
+ } else {
+ valueReader = null;
+ }
+ }
+
+ @Override
+ void seek(PositionProvider[] index) throws IOException {
+ super.seek(index);
+ lengths.seek(index[columnId]);
+ keyReader.seek(index);
+ valueReader.seek(index);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ Object next(Object previous) throws IOException {
+ super.next(previous);
+ Map<Object, Object> result = null;
+ if (valuePresent) {
+ if (previous == null) {
+ result = new HashMap<Object, Object>();
+ } else {
+ result = (HashMap<Object, Object>) previous;
+ }
+ // for now just clear and create new objects
+ result.clear();
+ int length = (int) lengths.next();
+ // read the new elements into the array
+ for(int i=0; i< length; i++) {
+ result.put(keyReader.next(null), valueReader.next(null));
+ }
+ }
+ return result;
+ }
+
+ @Override
+ void startStripe(Map<StreamName, InStream> streams,
+ List<OrcProto.ColumnEncoding> encodings
+ ) throws IOException {
+ super.startStripe(streams, encodings);
+ lengths = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.LENGTH)), false);
+ if (keyReader != null) {
+ keyReader.startStripe(streams, encodings);
+ }
+ if (valueReader != null) {
+ valueReader.startStripe(streams, encodings);
+ }
+ }
+
+ @Override
+ void skipRows(long items) throws IOException {
+ items = countNonNulls(items);
+ long childSkip = 0;
+ for(long i=0; i < items; ++i) {
+ childSkip += lengths.next();
+ }
+ keyReader.skipRows(childSkip);
+ valueReader.skipRows(childSkip);
+ }
+ }
+
+ private static TreeReader createTreeReader(int columnId,
+ List<OrcProto.Type> types,
+ boolean[] included
+ ) throws IOException {
+ OrcProto.Type type = types.get(columnId);
+ switch (type.getKind()) {
+ case BOOLEAN:
+ return new BooleanTreeReader(columnId);
+ case BYTE:
+ return new ByteTreeReader(columnId);
+ case DOUBLE:
+ return new DoubleTreeReader(columnId);
+ case FLOAT:
+ return new FloatTreeReader(columnId);
+ case SHORT:
+ return new ShortTreeReader(columnId);
+ case INT:
+ return new IntTreeReader(columnId);
+ case LONG:
+ return new LongTreeReader(columnId);
+ case STRING:
+ return new StringTreeReader(columnId);
+ case BINARY:
+ return new BinaryTreeReader(columnId);
+ case TIMESTAMP:
+ return new TimestampTreeReader(columnId);
+ case STRUCT:
+ return new StructTreeReader(columnId, types, included);
+ case LIST:
+ return new ListTreeReader(columnId, types, included);
+ case MAP:
+ return new MapTreeReader(columnId, types, included);
+ case UNION:
+ return new UnionTreeReader(columnId, types, included);
+ default:
+ throw new IllegalArgumentException("Unsupported type " +
+ type.getKind());
+ }
+ }
+
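+ // Each stripe stores its index streams, then its data streams, then the
+ // stripe footer, so the footer begins at offset + indexLength + dataLength.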
+ OrcProto.StripeFooter readStripeFooter(StripeInformation stripe
+ ) throws IOException {
+ long offset = stripe.getOffset() + stripe.getIndexLength() +
+ stripe.getDataLength();
+ int tailLength = (int) stripe.getFooterLength();
+
+ // read the footer
+ ByteBuffer tailBuf = ByteBuffer.allocate(tailLength);
+ file.seek(offset);
+ file.readFully(tailBuf.array(), tailBuf.arrayOffset(), tailLength);
+ return OrcProto.StripeFooter.parseFrom(InStream.create("footer", tailBuf,
+ codec, bufferSize));
+ }
+
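+ // Load the streams of the current stripe. Without column projection the
+ // entire data area is read in one chunk; with projection, contiguous runs
+ // of included streams are read together and the excluded ones are skipped.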
+ private void readStripe() throws IOException {
+ StripeInformation stripe = stripes.get(currentStripe);
+ stripeFooter = readStripeFooter(stripe);
+ long offset = stripe.getOffset();
+ streams.clear();
+
+ // if we aren't projecting columns, just read the whole stripe
+ if (included == null) {
+ byte[] buffer =
+ new byte[(int) (stripe.getDataLength())];
+ file.seek(offset + stripe.getIndexLength());
+ file.readFully(buffer, 0, buffer.length);
+ int sectionOffset = 0;
+ for(OrcProto.Stream section: stripeFooter.getStreamsList()) {
+ if (StreamName.getArea(section.getKind()) == StreamName.Area.DATA) {
+ int sectionLength = (int) section.getLength();
+ ByteBuffer sectionBuffer = ByteBuffer.wrap(buffer, sectionOffset,
+ sectionLength);
+ StreamName name = new StreamName(section.getColumn(),
+ section.getKind());
+ streams.put(name,
+ InStream.create(name.toString(), sectionBuffer, codec,
+ bufferSize));
+ sectionOffset += sectionLength;
+ }
+ }
+ } else {
+ List<OrcProto.Stream> streamList = stripeFooter.getStreamsList();
+ // the index of the current section
+ int currentSection = 0;
+ while (currentSection < streamList.size() &&
+ StreamName.getArea(streamList.get(currentSection).getKind()) !=
+ StreamName.Area.DATA) {
+ currentSection += 1;
+ }
+ // byte position of the current section relative to the stripe start
+ long sectionOffset = stripe.getIndexLength();
+ while (currentSection < streamList.size()) {
+ int bytes = 0;
+
+ // find the first section that shouldn't be read
+ int excluded=currentSection;
+ while (excluded < streamList.size() &&
+ included[streamList.get(excluded).getColumn()]) {
+ bytes += streamList.get(excluded).getLength();
+ excluded += 1;
+ }
+
+ // actually read the bytes as a big chunk
+ if (bytes != 0) {
+ byte[] buffer = new byte[bytes];
+ file.seek(offset + sectionOffset);
+ file.readFully(buffer, 0, bytes);
+ sectionOffset += bytes;
+
+ // create the streams for the sections we just read
+ bytes = 0;
+ while (currentSection < excluded) {
+ OrcProto.Stream section = streamList.get(currentSection);
+ StreamName name =
+ new StreamName(section.getColumn(), section.getKind());
+ this.streams.put(name,
+ InStream.create(name.toString(),
+ ByteBuffer.wrap(buffer, bytes,
+ (int) section.getLength()), codec, bufferSize));
+ currentSection += 1;
+ bytes += section.getLength();
+ }
+ }
+
+ // skip forward until we get back to a section that we need
+ while (currentSection < streamList.size() &&
+ !included[streamList.get(currentSection).getColumn()]) {
+ sectionOffset += streamList.get(currentSection).getLength();
+ currentSection += 1;
+ }
+ }
+ }
+ reader.startStripe(streams, stripeFooter.getColumnsList());
+ rowInStripe = 0;
+ rowCountInStripe = stripe.getNumberOfRows();
+ rowBaseInStripe = 0;
+ for(int i=0; i < currentStripe; ++i) {
+ rowBaseInStripe += stripes.get(i).getNumberOfRows();
+ }
+ for(int i=0; i < indexes.length; ++i) {
+ indexes[i] = null;
+ }
+ }
+
+ @Override
+ public boolean hasNext() throws IOException {
+ return rowInStripe < rowCountInStripe || currentStripe < stripes.size() - 1;
+ }
+
+ @Override
+ public Object next(Object previous) throws IOException {
+ if (rowInStripe >= rowCountInStripe) {
+ currentStripe += 1;
+ readStripe();
+ }
+ rowInStripe += 1;
+ return reader.next(previous);
+ }
+
+ @Override
+ public void close() throws IOException {
+ file.close();
+ }
+
+ @Override
+ public long getRowNumber() {
+ return rowInStripe + rowBaseInStripe + firstRow;
+ }
+
+ /**
+ * Return the fraction of rows that have been read from the selected
+ * section of the file.
+ * @return fraction between 0.0 and 1.0 of rows consumed
+ */
+ @Override
+ public float getProgress() {
+ return ((float) rowBaseInStripe + rowInStripe) / totalRowCount;
+ }
+
+ private int findStripe(long rowNumber) {
+ if (rowNumber < 0) {
+ throw new IllegalArgumentException("Seek to a negative row number " +
+ rowNumber);
+ } else if (rowNumber < firstRow) {
+ throw new IllegalArgumentException("Seek before reader range " +
+ rowNumber);
+ }
+ rowNumber -= firstRow;
+ for(int i=0; i < stripes.size(); i++) {
+ StripeInformation stripe = stripes.get(i);
+ if (stripe.getNumberOfRows() > rowNumber) {
+ return i;
+ }
+ rowNumber -= stripe.getNumberOfRows();
+ }
+ throw new IllegalArgumentException("Seek after the end of reader range");
+ }
+
+ private void readRowIndex() throws IOException {
+ long offset = stripes.get(currentStripe).getOffset();
+ for(OrcProto.Stream stream: stripeFooter.getStreamsList()) {
+ if (stream.getKind() == OrcProto.Stream.Kind.ROW_INDEX) {
+ int col = stream.getColumn();
+ if ((included == null || included[col]) && indexes[col] == null) {
+ byte[] buffer = new byte[(int) stream.getLength()];
+ file.seek(offset);
+ file.readFully(buffer);
+ indexes[col] = OrcProto.RowIndex.parseFrom(InStream.create("index",
+ ByteBuffer.wrap(buffer), codec, bufferSize));
+ }
+ }
+ offset += stream.getLength();
+ }
+ }
+
+ private void seekToRowEntry(int rowEntry) throws IOException {
+ PositionProvider[] index = new PositionProvider[indexes.length];
+ for(int i=0; i < indexes.length; ++i) {
+ if (indexes[i] != null) {
+ index[i] = new PositionProviderImpl(indexes[i].getEntry(rowEntry));
+ }
+ }
+ reader.seek(index);
+ }
+
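+ // Seeking finds the stripe containing the row (reloading it if necessary),
+ // positions every stream at the nearest preceding row-index entry (one
+ // entry per rowIndexStride rows), and then skips forward the remaining rows.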
+ @Override
+ public void seekToRow(long rowNumber) throws IOException {
+ int rightStripe = findStripe(rowNumber);
+ if (rightStripe != currentStripe) {
+ currentStripe = rightStripe;
+ readStripe();
+ }
+ readRowIndex();
+ rowInStripe = rowNumber - rowBaseInStripe;
+ if (rowIndexStride != 0) {
+ long entry = rowInStripe / rowIndexStride;
+ seekToRowEntry((int) entry);
+ reader.skipRows(rowInStripe - entry * rowIndexStride);
+ } else {
+ reader.skipRows(rowInStripe);
+ }
+ }
+}
