From e2b7ce1fa30c13f37c1b13892aa36453e7a5913b Mon Sep 17 00:00:00 2001 From: kanthi subramanian Date: Thu, 23 Apr 2026 14:24:10 -0500 Subject: [PATCH 1/4] Upgraded iceberg-java to 1.9.2, upgrade overridden classes, added support for alter table add column with default, non/null columns. --- .../iceberg/BaseMetastoreTableOperations.java | 38 +++-- .../main/java/com/altinity/ice/cli/Main.java | 2 +- .../ice/cli/internal/cmd/AlterTable.java | 93 ++++++++++- .../data/parquet/BaseParquetReaders.java | 148 ++++++------------ .../data/parquet/BaseParquetWriter.java | 80 +++++----- .../iceberg/parquet/ParquetConversions.java | 50 ++++++ .../org/apache/iceberg/rest/HTTPClient.java | 65 +------- .../ice/cli/internal/cmd/AlterTableTest.java | 105 +++++++++++++ pom.xml | 4 +- 9 files changed, 378 insertions(+), 207 deletions(-) diff --git a/ice-rest-catalog/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java b/ice-rest-catalog/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java index 0a5ac042..e9ca5cc5 100644 --- a/ice-rest-catalog/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java +++ b/ice-rest-catalog/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Predicate; +import org.apache.iceberg.BaseMetastoreOperations.CommitStatus; import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.CommitFailedException; @@ -294,20 +295,39 @@ public long newSnapshotId() { * were attempting to set. This is used as a last resort when we are dealing with exceptions that * may indicate the commit has failed but are not proof that this is the case. Past locations must * also be searched on the chance that a second committer was able to successfully commit on top - * of our commit. + * of our commit. 
When the {@code newMetadataLocation} is not found, the method returns {@link + * CommitStatus#UNKNOWN}. * * @param newMetadataLocation the path of the new commit file * @param config metadata to use for configuration - * @return Commit Status of Success, Failure or Unknown + * @return Commit Status of Success, Unknown */ protected CommitStatus checkCommitStatus(String newMetadataLocation, TableMetadata config) { - return CommitStatus.valueOf( - checkCommitStatus( - tableName(), - newMetadataLocation, - config.properties(), - () -> checkCurrentMetadataLocation(newMetadataLocation)) - .name()); + return checkCommitStatus( + tableName(), + newMetadataLocation, + config.properties(), + () -> checkCurrentMetadataLocation(newMetadataLocation)); + } + + /** + * Attempt to load the table and see if any current or past metadata location matches the one we + * were attempting to set. This is used as a last resort when we are dealing with exceptions that + * may indicate the commit has failed but are not proof that this is the case. Past locations must + * also be searched on the chance that a second committer was able to successfully commit on top + * of our commit. When the {@code newMetadataLocation} is not found, the method returns {@link + * CommitStatus#FAILURE}. 
+ * + * @param newMetadataLocation the path of the new commit file + * @param config metadata to use for configuration + * @return Commit Status of Success, Failure or Unknown + */ + protected CommitStatus checkCommitStatusStrict(String newMetadataLocation, TableMetadata config) { + return checkCommitStatusStrict( + tableName(), + newMetadataLocation, + config.properties(), + () -> checkCurrentMetadataLocation(newMetadataLocation)); } /** diff --git a/ice/src/main/java/com/altinity/ice/cli/Main.java b/ice/src/main/java/com/altinity/ice/cli/Main.java index 644811b1..69d5e0e1 100644 --- a/ice/src/main/java/com/altinity/ice/cli/Main.java +++ b/ice/src/main/java/com/altinity/ice/cli/Main.java @@ -335,7 +335,7 @@ void alterTable( e.g. [{"op":"drop_column","name":"foo"}] Supported operations: - - add_column (params: "name", "type" (https://iceberg.apache.org/spec/#primitive-types), "doc" (optional)) + - add_column (params: "name", "type" (https://iceberg.apache.org/spec/#primitive-types), "doc" (optional), "nullable" (optional, default true), "default" (optional)) - alter_column (params: "name", "type" (https://iceberg.apache.org/spec/#primitive-types)) - rename_column (params: "name", "new_name") - drop_column (params: "name") diff --git a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/AlterTable.java b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/AlterTable.java index 65b07ae7..942d0961 100644 --- a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/AlterTable.java +++ b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/AlterTable.java @@ -13,7 +13,11 @@ import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; import java.io.IOException; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.List; +import java.util.UUID; import javax.annotation.Nullable; import org.apache.iceberg.Table; import org.apache.iceberg.Transaction; @@ -22,6 +26,7 
@@ import org.apache.iceberg.UpdateSchema; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.expressions.Literal; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.slf4j.Logger; @@ -50,17 +55,88 @@ public static class AddColumn extends Update { private final String name; private final Type type; @Nullable private final String doc; + private final boolean nullable; + @Nullable private final Literal defaultValue; public AddColumn( @JsonProperty(value = "name", required = true) String name, @JsonProperty(value = "type", required = true) String type, - @JsonProperty("doc") @Nullable String doc) { + @JsonProperty("doc") @Nullable String doc, + @JsonProperty("nullable") @Nullable Boolean nullable, + @JsonProperty("default") @Nullable Object defaultValue) { this.name = name; this.type = Types.fromPrimitiveString(type); this.doc = doc; + this.nullable = nullable == null || nullable; + this.defaultValue = coerceDefault(this.type, defaultValue); } } + // Coerce a JSON literal (number / boolean / string) into an Iceberg Literal of the requested + // type. For numeric, boolean and string types we build the Literal directly; for complex + // primitives (date/time/timestamp/decimal/uuid/binary) we accept a string form and let + // Iceberg parse it via Literal.to(type). 
+ @Nullable + private static Literal coerceDefault(Type type, @Nullable Object jsonValue) { + if (jsonValue == null) { + return null; + } + try { + switch (type.typeId()) { + case BOOLEAN: + if (jsonValue instanceof Boolean b) { + return Literal.of(b); + } + return Literal.of(Boolean.parseBoolean(jsonValue.toString())); + case INTEGER: + return Literal.of(((Number) numberOrParse(jsonValue)).intValue()); + case LONG: + return Literal.of(((Number) numberOrParse(jsonValue)).longValue()); + case FLOAT: + return Literal.of(((Number) numberOrParse(jsonValue)).floatValue()); + case DOUBLE: + return Literal.of(((Number) numberOrParse(jsonValue)).doubleValue()); + case STRING: + return Literal.of(jsonValue.toString()); + case BINARY: + case FIXED: + return Literal.of(ByteBuffer.wrap(jsonValue.toString().getBytes(StandardCharsets.UTF_8))); + case DECIMAL: + return Literal.of(new BigDecimal(jsonValue.toString())).to(type); + case UUID: + return Literal.of(UUID.fromString(jsonValue.toString())); + case DATE: + case TIME: + case TIMESTAMP: + case TIMESTAMP_NANO: + // Let Iceberg parse the canonical string form (e.g. "2025-01-01", + // "2025-01-01T00:00:00", "2025-01-01T00:00:00+00:00"). + Literal parsed = Literal.of(jsonValue.toString()).to(type); + if (parsed == null) { + throw new IllegalArgumentException( + String.format("Cannot parse default value '%s' as %s", jsonValue, type)); + } + return parsed; + default: + throw new IllegalArgumentException( + String.format("Defaults not supported for type %s", type)); + } + } catch (ClassCastException | IllegalArgumentException e) { + throw new IllegalArgumentException( + String.format("Cannot coerce default value '%s' to type %s", jsonValue, type), e); + } + } + + // Accept either a JSON number (already a Number) or a string parseable as a number. 
+ private static Number numberOrParse(Object jsonValue) { + if (jsonValue instanceof Number n) { + return n; + } + String s = jsonValue.toString(); + // Use BigDecimal to accept any numeric form; downstream casts narrow to int/long/float/double. + return new BigDecimal(s); + } + public static class AlterColumn extends Update { private final String name; private final Type.PrimitiveType type; @@ -138,7 +214,20 @@ public static void run(Catalog catalog, TableIdentifier tableId, List up switch (update) { case AddColumn up -> { // TODO: support nested columns - schemaUpdates.getValue().addColumn(up.name, up.type, up.doc); + UpdateSchema us = schemaUpdates.getValue(); + if (up.nullable) { + if (up.defaultValue == null) { + us.addColumn(up.name, up.type, up.doc); + } else { + us.addColumn(up.name, up.type, up.doc, up.defaultValue); + } + } else { + if (up.defaultValue == null) { + us.addRequiredColumn(up.name, up.type, up.doc); + } else { + us.addRequiredColumn(up.name, up.type, up.doc, up.defaultValue); + } + } } case AlterColumn up -> { // TODO: support nested columns diff --git a/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java b/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java index 7f237b72..8708fe28 100644 --- a/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java +++ b/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java @@ -18,15 +18,17 @@ */ package org.apache.iceberg.data.parquet; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Optional; -import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.parquet.ParquetSchemaUtil; import org.apache.iceberg.parquet.ParquetValueReader; import org.apache.iceberg.parquet.ParquetValueReaders; +import org.apache.iceberg.parquet.ParquetVariantVisitor; import org.apache.iceberg.parquet.TypeWithSchemaVisitor; +import 
org.apache.iceberg.parquet.VariantReaderBuilder; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; @@ -50,11 +52,7 @@ import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; -/** - * @deprecated since 1.8.0, will be made package-private in 1.9.0 - */ -@Deprecated -public abstract class BaseParquetReaders { +abstract class BaseParquetReaders { protected BaseParquetReaders() {} protected ParquetValueReader createReader(Schema expectedSchema, MessageType fileSchema) { @@ -78,56 +76,16 @@ protected ParquetValueReader createReader( } protected abstract ParquetValueReader createStructReader( - List types, List> fieldReaders, Types.StructType structType); - - protected ParquetValueReader fixedReader(ColumnDescriptor desc) { - return new GenericParquetReaders.FixedReader(desc); - } + List> fieldReaders, Types.StructType structType); - protected ParquetValueReader dateReader(ColumnDescriptor desc) { - return new GenericParquetReaders.DateReader(desc); - } + protected abstract ParquetValueReader fixedReader(ColumnDescriptor desc); - protected ParquetValueReader timeReader(ColumnDescriptor desc) { - LogicalTypeAnnotation time = desc.getPrimitiveType().getLogicalTypeAnnotation(); - Preconditions.checkArgument( - time instanceof TimeLogicalTypeAnnotation, "Invalid time logical type: " + time); - - LogicalTypeAnnotation.TimeUnit unit = ((TimeLogicalTypeAnnotation) time).getUnit(); - switch (unit) { - case MICROS: - return new GenericParquetReaders.TimeReader(desc); - case MILLIS: - return new GenericParquetReaders.TimeMillisReader(desc); - default: - throw new UnsupportedOperationException("Unsupported unit for time: " + unit); - } - } + protected abstract ParquetValueReader dateReader(ColumnDescriptor desc); - protected ParquetValueReader timestampReader(ColumnDescriptor 
desc, boolean isAdjustedToUTC) { - if (desc.getPrimitiveType().getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT96) { - return new GenericParquetReaders.TimestampInt96Reader(desc); - } + protected abstract ParquetValueReader timeReader(ColumnDescriptor desc); - LogicalTypeAnnotation timestamp = desc.getPrimitiveType().getLogicalTypeAnnotation(); - Preconditions.checkArgument( - timestamp instanceof TimestampLogicalTypeAnnotation, - "Invalid timestamp logical type: " + timestamp); - - LogicalTypeAnnotation.TimeUnit unit = ((TimestampLogicalTypeAnnotation) timestamp).getUnit(); - switch (unit) { - case MICROS: - return isAdjustedToUTC - ? new GenericParquetReaders.TimestamptzReader(desc) - : new GenericParquetReaders.TimestampReader(desc); - case MILLIS: - return isAdjustedToUTC - ? new GenericParquetReaders.TimestamptzMillisReader(desc) - : new GenericParquetReaders.TimestampMillisReader(desc); - default: - throw new UnsupportedOperationException("Unsupported unit for timestamp: " + unit); - } - } + protected abstract ParquetValueReader timestampReader( + ColumnDescriptor desc, boolean isAdjustedToUTC); protected Object convertConstant(org.apache.iceberg.types.Type type, Object value) { return value; @@ -151,7 +109,6 @@ public ParquetValueReader struct( // the expected struct is ignored because nested fields are never found when the List> newFields = Lists.newArrayListWithExpectedSize(fieldReaders.size()); - List types = Lists.newArrayListWithExpectedSize(fieldReaders.size()); List fields = struct.getFields(); for (int i = 0; i < fields.size(); i += 1) { ParquetValueReader fieldReader = fieldReaders.get(i); @@ -159,11 +116,10 @@ public ParquetValueReader struct( Type fieldType = fields.get(i); int fieldD = type().getMaxDefinitionLevel(path(fieldType.getName())) - 1; newFields.add(ParquetValueReaders.option(fieldType, fieldD, fieldReader)); - types.add(fieldType); } } - return createStructReader(types, newFields, expected); + return 
createStructReader(newFields, expected); } } @@ -207,8 +163,7 @@ public Optional> visit(TimeLogicalTypeAnnotation timeLogic @Override public Optional> visit( TimestampLogicalTypeAnnotation timestampLogicalType) { - return Optional.of( - timestampReader(desc, ((Types.TimestampType) expected).shouldAdjustToUTC())); + return Optional.of(timestampReader(desc, timestampLogicalType.isAdjustedToUTC())); } @Override @@ -268,10 +223,12 @@ public ParquetValueReader message( @Override public ParquetValueReader struct( Types.StructType expected, GroupType struct, List> fieldReaders) { + if (null == expected) { + return createStructReader(ImmutableList.of(), null); + } + // match the expected struct's order Map> readersById = Maps.newHashMap(); - Map typesById = Maps.newHashMap(); - Map maxDefinitionLevelsById = Maps.newHashMap(); List fields = struct.getFields(); for (int i = 0; i < fields.size(); i += 1) { ParquetValueReader fieldReader = fieldReaders.get(i); @@ -280,55 +237,37 @@ public ParquetValueReader struct( int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())) - 1; int id = fieldType.getId().intValue(); readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReader)); - typesById.put(id, fieldType); - if (idToConstant.containsKey(id)) { - maxDefinitionLevelsById.put(id, fieldD); - } } } - List expectedFields = - expected != null ? 
expected.fields() : ImmutableList.of(); + int constantDefinitionLevel = type.getMaxDefinitionLevel(currentPath()); + List expectedFields = expected.fields(); List> reorderedFields = Lists.newArrayListWithExpectedSize(expectedFields.size()); - List types = Lists.newArrayListWithExpectedSize(expectedFields.size()); - // Defaulting to parent max definition level - int defaultMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath()); + for (Types.NestedField field : expectedFields) { int id = field.fieldId(); - ParquetValueReader reader = readersById.get(id); - if (idToConstant.containsKey(id)) { - // containsKey is used because the constant may be null - int fieldMaxDefinitionLevel = - maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel); - reorderedFields.add( - ParquetValueReaders.constant(idToConstant.get(id), fieldMaxDefinitionLevel)); - types.add(null); - } else if (id == MetadataColumns.ROW_POSITION.fieldId()) { - reorderedFields.add(ParquetValueReaders.position()); - types.add(null); - } else if (id == MetadataColumns.IS_DELETED.fieldId()) { - reorderedFields.add(ParquetValueReaders.constant(false)); - types.add(null); - } else if (reader != null) { - reorderedFields.add(reader); - types.add(typesById.get(id)); - } else if (field.initialDefault() != null) { - reorderedFields.add( - ParquetValueReaders.constant( - convertConstant(field.type(), field.initialDefault()), - maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel))); - types.add(typesById.get(id)); - } else if (field.isOptional()) { - reorderedFields.add(ParquetValueReaders.nulls()); - types.add(null); - } else { - throw new IllegalArgumentException( - String.format("Missing required field: %s", field.name())); - } + ParquetValueReader reader = + ParquetValueReaders.replaceWithMetadataReader( + id, readersById.get(id), idToConstant, constantDefinitionLevel); + reorderedFields.add(defaultReader(field, reader, constantDefinitionLevel)); + } + + return 
createStructReader(reorderedFields, expected); + } + + private ParquetValueReader defaultReader( + Types.NestedField field, ParquetValueReader reader, int constantDL) { + if (reader != null) { + return reader; + } else if (field.initialDefault() != null) { + return ParquetValueReaders.constant( + convertConstant(field.type(), field.initialDefault()), constantDL); + } else if (field.isOptional()) { + return ParquetValueReaders.nulls(); } - return createStructReader(types, reorderedFields, expected); + throw new IllegalArgumentException(String.format("Missing required field: %s", field.name())); } @Override @@ -432,6 +371,17 @@ public ParquetValueReader primitive( } } + @Override + public ParquetValueReader variant( + Types.VariantType iVariant, GroupType variant, ParquetValueReader reader) { + return reader; + } + + @Override + public ParquetVariantVisitor> variantVisitor() { + return new VariantReaderBuilder(type, Arrays.asList(currentPath())); + } + MessageType type() { return type; } diff --git a/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java b/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java index 8199f698..6304a41e 100644 --- a/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java +++ b/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java @@ -18,13 +18,17 @@ */ package org.apache.iceberg.data.parquet; +import java.util.Arrays; import java.util.List; import java.util.Optional; -import org.apache.iceberg.parquet.ParquetTypeVisitor; import org.apache.iceberg.parquet.ParquetValueWriter; import org.apache.iceberg.parquet.ParquetValueWriters; +import org.apache.iceberg.parquet.ParquetVariantVisitor; +import org.apache.iceberg.parquet.TypeWithSchemaVisitor; +import org.apache.iceberg.parquet.VariantWriterBuilder; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import 
org.apache.iceberg.types.Types; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.LogicalTypeAnnotation; @@ -32,41 +36,31 @@ import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; -/** - * @deprecated since 1.8.0, will be made package-private in 1.9.0 - */ -@Deprecated -public abstract class BaseParquetWriter { +abstract class BaseParquetWriter { - @SuppressWarnings("unchecked") protected ParquetValueWriter createWriter(MessageType type) { - return (ParquetValueWriter) ParquetTypeVisitor.visit(type, new WriteBuilder(type)); + return createWriter(null, type); + } + + @SuppressWarnings("unchecked") + protected ParquetValueWriter createWriter(Types.StructType struct, MessageType type) { + return (ParquetValueWriter) + TypeWithSchemaVisitor.visit(struct, type, new WriteBuilder(type)); } protected abstract ParquetValueWriters.StructWriter createStructWriter( - List> writers); + Types.StructType struct, List> writers); - protected ParquetValueWriter fixedWriter(ColumnDescriptor desc) { - return new GenericParquetWriter.FixedWriter(desc); - } + protected abstract ParquetValueWriter fixedWriter(ColumnDescriptor desc); - protected ParquetValueWriter dateWriter(ColumnDescriptor desc) { - return new GenericParquetWriter.DateWriter(desc); - } + protected abstract ParquetValueWriter dateWriter(ColumnDescriptor desc); - protected ParquetValueWriter timeWriter(ColumnDescriptor desc) { - return new GenericParquetWriter.TimeWriter(desc); - } + protected abstract ParquetValueWriter timeWriter(ColumnDescriptor desc); - protected ParquetValueWriter timestampWriter(ColumnDescriptor desc, boolean isAdjustedToUTC) { - if (isAdjustedToUTC) { - return new GenericParquetWriter.TimestamptzWriter(desc); - } else { - return new GenericParquetWriter.TimestampWriter(desc); - } - } + protected abstract ParquetValueWriter timestampWriter( + ColumnDescriptor desc, boolean isAdjustedToUTC); 
- private class WriteBuilder extends ParquetTypeVisitor> { + private class WriteBuilder extends TypeWithSchemaVisitor> { private final MessageType type; private WriteBuilder(MessageType type) { @@ -75,14 +69,14 @@ private WriteBuilder(MessageType type) { @Override public ParquetValueWriter message( - MessageType message, List> fieldWriters) { + Types.StructType struct, MessageType message, List> fieldWriters) { - return struct(message.asGroupType(), fieldWriters); + return struct(struct, message.asGroupType(), fieldWriters); } @Override public ParquetValueWriter struct( - GroupType struct, List> fieldWriters) { + Types.StructType iceberg, GroupType struct, List> fieldWriters) { List fields = struct.getFields(); List> writers = Lists.newArrayListWithExpectedSize(fieldWriters.size()); for (int i = 0; i < fields.size(); i += 1) { @@ -91,11 +85,12 @@ public ParquetValueWriter struct( writers.add(ParquetValueWriters.option(fieldType, fieldD, fieldWriters.get(i))); } - return createStructWriter(writers); + return createStructWriter(iceberg, writers); } @Override - public ParquetValueWriter list(GroupType array, ParquetValueWriter elementWriter) { + public ParquetValueWriter list( + Types.ListType iceberg, GroupType array, ParquetValueWriter elementWriter) { GroupType repeated = array.getFields().get(0).asGroupType(); String[] repeatedPath = currentPath(); @@ -111,7 +106,10 @@ public ParquetValueWriter list(GroupType array, ParquetValueWriter element @Override public ParquetValueWriter map( - GroupType map, ParquetValueWriter keyWriter, ParquetValueWriter valueWriter) { + Types.MapType iceberg, + GroupType map, + ParquetValueWriter keyWriter, + ParquetValueWriter valueWriter) { GroupType repeatedKeyValue = map.getFields().get(0).asGroupType(); String[] repeatedPath = currentPath(); @@ -131,7 +129,8 @@ public ParquetValueWriter map( } @Override - public ParquetValueWriter primitive(PrimitiveType primitive) { + public ParquetValueWriter primitive( + 
org.apache.iceberg.types.Type.PrimitiveType iceberg, PrimitiveType primitive) { ColumnDescriptor desc = type.getColumnDescription(currentPath()); LogicalTypeAnnotation logicalType = primitive.getLogicalTypeAnnotation(); if (logicalType != null) { @@ -161,6 +160,17 @@ public ParquetValueWriter primitive(PrimitiveType primitive) { throw new UnsupportedOperationException("Unsupported type: " + primitive); } } + + @Override + public ParquetValueWriter variant( + Types.VariantType iVariant, GroupType variant, ParquetValueWriter result) { + return result; + } + + @Override + public ParquetVariantVisitor> variantVisitor() { + return new VariantWriterBuilder(type, Arrays.asList(currentPath())); + } } private class LogicalTypeWriterVisitor @@ -224,8 +234,8 @@ public Optional> visit( public Optional> visit( LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampType) { Preconditions.checkArgument( - LogicalTypeAnnotation.TimeUnit.MICROS.equals(timestampType.getUnit()), - "Cannot write timestamp in %s, only MICROS is supported", + !LogicalTypeAnnotation.TimeUnit.MILLIS.equals(timestampType.getUnit()), + "Cannot write timestamp in %s, only MICROS and NANOS are supported", timestampType.getUnit()); return Optional.of(timestampWriter(desc, timestampType.isAdjustedToUTC())); } diff --git a/ice/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java b/ice/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java index a00c1176..a003e1af 100644 --- a/ice/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java +++ b/ice/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java @@ -27,6 +27,7 @@ import java.util.function.Function; import org.apache.iceberg.expressions.Literal; import org.apache.iceberg.types.Type; +import org.apache.iceberg.util.UUIDUtil; import org.apache.parquet.io.api.Binary; import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation; @@ -77,6 
+78,55 @@ static Literal fromParquetPrimitive(Type type, PrimitiveType parquetType, } } + // Iceberg 1.9 introduced this API in upstream ParquetConversions and uses it from + // ParquetMetrics. Provide a compatible shim here so callers compiled against 1.9 + // continue to resolve against this shadowed class. The TIME/TIMESTAMP branch keeps + // the local millis -> micros normalization used by fromParquetPrimitive above. + @SuppressWarnings("unchecked") + static T convertValue(Type type, PrimitiveType parquetType, Object value) { + switch (type.typeId()) { + case BOOLEAN: + case INTEGER: + case DATE: + case LONG: + case TIMESTAMP_NANO: + case FLOAT: + case DOUBLE: + return (T) value; + case TIME: + case TIMESTAMP: + if (parquetType.getLogicalTypeAnnotation() + instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation tsAnno + && tsAnno.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS) { + return (T) Long.valueOf(((Number) value).longValue() * 1000L); + } + return (T) value; + case STRING: + return (T) ((Binary) value).toStringUsingUTF8(); + case UUID: + return (T) UUIDUtil.convert(((Binary) value).toByteBuffer()); + case FIXED: + case BINARY: + return (T) ((Binary) value).toByteBuffer(); + case DECIMAL: + int scale = + ((DecimalLogicalTypeAnnotation) parquetType.getLogicalTypeAnnotation()).getScale(); + switch (parquetType.getPrimitiveTypeName()) { + case INT32: + case INT64: + return (T) BigDecimal.valueOf(((Number) value).longValue(), scale); + case FIXED_LEN_BYTE_ARRAY: + case BINARY: + return (T) new BigDecimal(new BigInteger(((Binary) value).getBytes()), scale); + default: + throw new IllegalArgumentException( + "Unsupported primitive type for decimal: " + parquetType.getPrimitiveTypeName()); + } + default: + throw new IllegalArgumentException("Unsupported primitive type: " + type); + } + } + static Function converterFromParquet( PrimitiveType parquetType, Type icebergType) { Function fromParquet = converterFromParquet(parquetType); diff --git 
a/ice/src/main/java/org/apache/iceberg/rest/HTTPClient.java b/ice/src/main/java/org/apache/iceberg/rest/HTTPClient.java index a939cdf0..c771c83c 100644 --- a/ice/src/main/java/org/apache/iceberg/rest/HTTPClient.java +++ b/ice/src/main/java/org/apache/iceberg/rest/HTTPClient.java @@ -41,7 +41,6 @@ import org.apache.hc.core5.http.Header; import org.apache.hc.core5.http.HttpHeaders; import org.apache.hc.core5.http.HttpHost; -import org.apache.hc.core5.http.HttpRequestInterceptor; import org.apache.hc.core5.http.HttpStatus; import org.apache.hc.core5.http.ParseException; import org.apache.hc.core5.http.impl.EnglishReasonPhraseCatalog; @@ -49,8 +48,6 @@ import org.apache.hc.core5.http.io.entity.StringEntity; import org.apache.hc.core5.io.CloseMode; import org.apache.iceberg.IcebergBuild; -import org.apache.iceberg.common.DynConstructors; -import org.apache.iceberg.common.DynMethods; import org.apache.iceberg.exceptions.RESTException; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -66,9 +63,6 @@ public class HTTPClient extends BaseHTTPClient { private static final Logger LOG = LoggerFactory.getLogger(HTTPClient.class); - private static final String SIGV4_ENABLED = "rest.sigv4-enabled"; - private static final String SIGV4_REQUEST_INTERCEPTOR_IMPL = - "org.apache.iceberg.aws.RESTSigV4Signer"; @VisibleForTesting static final String CLIENT_VERSION_HEADER = "X-Client-Version"; @VisibleForTesting @@ -90,6 +84,7 @@ public class HTTPClient extends BaseHTTPClient { private final Map baseHeaders; private final ObjectMapper mapper; private final AuthSession authSession; + private final boolean isRootClient; private HTTPClient( URI baseUri, @@ -97,7 +92,6 @@ private HTTPClient( CredentialsProvider proxyCredsProvider, Map baseHeaders, ObjectMapper objectMapper, - HttpRequestInterceptor requestInterceptor, Map properties, HttpClientConnectionManager connectionManager, 
AuthSession session) { @@ -110,10 +104,6 @@ private HTTPClient( clientBuilder.setConnectionManager(connectionManager); - if (requestInterceptor != null) { - clientBuilder.addRequestInterceptorLast(requestInterceptor); - } - int maxRetries = PropertyUtil.propertyAsInt(properties, REST_MAX_RETRIES, 5); clientBuilder.setRetryStrategy(new ExponentialHttpRequestRetryStrategy(maxRetries)); @@ -126,6 +116,7 @@ private HTTPClient( } this.httpClient = clientBuilder.build(); + this.isRootClient = true; } /** @@ -139,6 +130,7 @@ private HTTPClient(HTTPClient parent, AuthSession authSession) { this.mapper = parent.mapper; this.baseHeaders = parent.baseHeaders; this.authSession = authSession; + this.isRootClient = false; } @Override @@ -160,7 +152,6 @@ private static String extractResponseBodyAsString(CloseableHttpResponse response } } - // Per the spec, the only currently defined / used "success" responses are 200 and 202. private static boolean isSuccessful(CloseableHttpResponse response) { int code = response.getCode(); return code == HttpStatus.SC_OK @@ -331,50 +322,13 @@ protected T execute( @Override public void close() throws IOException { - try { - if (authSession != null) { - authSession.close(); - } - } finally { + // Do not close the AuthSession as it's managed by the owner of this HTTPClient. + // Only close the underlying Apache HTTP client if this is a root HTTPClient. 
+ if (isRootClient) { httpClient.close(CloseMode.GRACEFUL); } } - @VisibleForTesting - static HttpRequestInterceptor loadInterceptorDynamically( - String impl, Map properties) { - HttpRequestInterceptor instance; - - DynConstructors.Ctor ctor; - try { - ctor = - DynConstructors.builder(HttpRequestInterceptor.class) - .loader(HTTPClient.class.getClassLoader()) - .impl(impl) - .buildChecked(); - } catch (NoSuchMethodException e) { - throw new IllegalArgumentException( - String.format( - "Cannot initialize RequestInterceptor, missing no-arg constructor: %s", impl), - e); - } - - try { - instance = ctor.newInstance(); - } catch (ClassCastException e) { - throw new IllegalArgumentException( - String.format("Cannot initialize, %s does not implement RequestInterceptor", impl), e); - } - - DynMethods.builder("initialize") - .hiddenImpl(impl, Map.class) - .orNoop() - .build(instance) - .invoke(properties); - - return instance; - } - static HttpClientConnectionManager configureConnectionManager( Map properties, TlsSocketStrategy tlsSocketStrategy) { PoolingHttpClientConnectionManagerBuilder connectionManagerBuilder = @@ -501,12 +455,6 @@ public HTTPClient build() { withHeader(CLIENT_VERSION_HEADER, IcebergBuild.fullVersion()); withHeader(CLIENT_GIT_COMMIT_SHORT_HEADER, IcebergBuild.gitCommitShortId()); - HttpRequestInterceptor interceptor = null; - - if (PropertyUtil.propertyAsBoolean(properties, SIGV4_ENABLED, false)) { - interceptor = loadInterceptorDynamically(SIGV4_REQUEST_INTERCEPTOR_IMPL, properties); - } - if (this.proxyCredentialsProvider != null) { Preconditions.checkNotNull( proxy, "Invalid http client proxy for proxy credentials provider: null"); @@ -518,7 +466,6 @@ public HTTPClient build() { proxyCredentialsProvider, baseHeaders, mapper, - interceptor, properties, configureConnectionManager(properties, tlsSocketStrategy), authSession); diff --git a/ice/src/test/java/com/altinity/ice/cli/internal/cmd/AlterTableTest.java 
b/ice/src/test/java/com/altinity/ice/cli/internal/cmd/AlterTableTest.java index 588e0089..02bd0cc3 100644 --- a/ice/src/test/java/com/altinity/ice/cli/internal/cmd/AlterTableTest.java +++ b/ice/src/test/java/com/altinity/ice/cli/internal/cmd/AlterTableTest.java @@ -116,4 +116,109 @@ public void testDropAllPartitionFields() throws Exception { table = catalog.loadTable(tableId); assertThat(table.spec().fields()).isEmpty(); } + + // Default values require Iceberg table spec v3. + private Table createV3Table() { + return catalog.buildTable(tableId, schema).withProperty("format-version", "3").create(); + } + + @Test + public void testAddColumnNullable() throws Exception { + catalog.buildTable(tableId, schema).create(); + + List updates = + Arrays.asList(new AlterTable.AddColumn("email", "string", null, null, null)); + + AlterTable.run(catalog, tableId, updates); + + Table table = catalog.loadTable(tableId); + Types.NestedField added = table.schema().findField("email"); + assertThat(added).isNotNull(); + assertThat(added.isOptional()).isTrue(); + assertThat(added.type()).isEqualTo(Types.StringType.get()); + assertThat(added.initialDefault()).isNull(); + assertThat(added.writeDefault()).isNull(); + } + + @Test + public void testAddRequiredColumnWithoutDefaultIsRejected() throws Exception { + catalog.buildTable(tableId, schema).create(); + + // Iceberg 1.9+ rejects adding a required column without an explicit default + // because there is no safe value to backfill into existing rows. 
+ assertThatThrownBy( + () -> + AlterTable.run( + catalog, + tableId, + Arrays.asList(new AlterTable.AddColumn("status", "string", null, false, null)))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("required column"); + } + + @Test + public void testAddColumnWithDefault() throws Exception { + createV3Table(); + + List updates = + Arrays.asList(new AlterTable.AddColumn("age", "int", null, true, 0)); + + AlterTable.run(catalog, tableId, updates); + + Table table = catalog.loadTable(tableId); + Types.NestedField added = table.schema().findField("age"); + assertThat(added).isNotNull(); + assertThat(added.isOptional()).isTrue(); + assertThat(added.type()).isEqualTo(Types.IntegerType.get()); + assertThat(added.initialDefault()).isEqualTo(0); + assertThat(added.writeDefault()).isEqualTo(0); + } + + @Test + public void testAddRequiredColumnWithDefault() throws Exception { + createV3Table(); + + List updates = + Arrays.asList(new AlterTable.AddColumn("score", "long", null, false, 42)); + + AlterTable.run(catalog, tableId, updates); + + Table table = catalog.loadTable(tableId); + Types.NestedField added = table.schema().findField("score"); + assertThat(added).isNotNull(); + assertThat(added.isRequired()).isTrue(); + assertThat(added.type()).isEqualTo(Types.LongType.get()); + assertThat(added.initialDefault()).isEqualTo(42L); + assertThat(added.writeDefault()).isEqualTo(42L); + } + + @Test + public void testAddColumnWithStringDefault() throws Exception { + createV3Table(); + + List updates = + Arrays.asList(new AlterTable.AddColumn("country", "string", null, true, "US")); + + AlterTable.run(catalog, tableId, updates); + + Table table = catalog.loadTable(tableId); + Types.NestedField added = table.schema().findField("country"); + assertThat(added).isNotNull(); + assertThat(added.type()).isEqualTo(Types.StringType.get()); + assertThat(added.initialDefault().toString()).isEqualTo("US"); + } + + @Test + public void testAddColumnInvalidDefaultThrows() 
throws Exception { + createV3Table(); + + assertThatThrownBy( + () -> + AlterTable.run( + catalog, + tableId, + Arrays.asList( + new AlterTable.AddColumn("age", "int", null, true, "not-an-int")))) + .isInstanceOf(IllegalArgumentException.class); + } } diff --git a/pom.xml b/pom.xml index e84e6369..c6ed84cc 100644 --- a/pom.xml +++ b/pom.xml @@ -14,12 +14,12 @@ 21 21 - 1.8.1 + 1.9.2 3.4.1 1.15.1 2.31.13 4.7.6 - 2.18.2 + 2.18.3 2.0.17 1.5.18 1.3.6 From 8856728b1f50ef90dd537fea70236f639867c130 Mon Sep 17 00:00:00 2001 From: kanthi subramanian Date: Fri, 24 Apr 2026 09:59:39 -0500 Subject: [PATCH 2/4] rollback changes to overridden classes. --- .../iceberg/BaseMetastoreTableOperations.java | 20 -- .../data/parquet/BaseParquetReaders.java | 231 ++++++++---------- .../org/apache/iceberg/rest/HTTPClient.java | 65 ++++- 3 files changed, 165 insertions(+), 151 deletions(-) diff --git a/ice-rest-catalog/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java b/ice-rest-catalog/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java index e9ca5cc5..9363e252 100644 --- a/ice-rest-catalog/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java +++ b/ice-rest-catalog/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java @@ -310,26 +310,6 @@ protected CommitStatus checkCommitStatus(String newMetadataLocation, TableMetada () -> checkCurrentMetadataLocation(newMetadataLocation)); } - /** - * Attempt to load the table and see if any current or past metadata location matches the one we - * were attempting to set. This is used as a last resort when we are dealing with exceptions that - * may indicate the commit has failed but are not proof that this is the case. Past locations must - * also be searched on the chance that a second committer was able to successfully commit on top - * of our commit. When the {@code newMetadataLocation} is not found, the method returns {@link - * CommitStatus#FAILURE}. 
- * - * @param newMetadataLocation the path of the new commit file - * @param config metadata to use for configuration - * @return Commit Status of Success, Failure or Unknown - */ - protected CommitStatus checkCommitStatusStrict(String newMetadataLocation, TableMetadata config) { - return checkCommitStatusStrict( - tableName(), - newMetadataLocation, - config.properties(), - () -> checkCurrentMetadataLocation(newMetadataLocation)); - } - /** * Validate if the new metadata location is the current metadata location or present within * previous metadata files. diff --git a/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java b/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java index 8708fe28..d894b73b 100644 --- a/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java +++ b/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java @@ -18,17 +18,15 @@ */ package org.apache.iceberg.data.parquet; -import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Optional; +import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.parquet.ParquetSchemaUtil; import org.apache.iceberg.parquet.ParquetValueReader; import org.apache.iceberg.parquet.ParquetValueReaders; -import org.apache.iceberg.parquet.ParquetVariantVisitor; import org.apache.iceberg.parquet.TypeWithSchemaVisitor; -import org.apache.iceberg.parquet.VariantReaderBuilder; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; @@ -39,20 +37,20 @@ import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.LogicalTypeAnnotation; -import org.apache.parquet.schema.LogicalTypeAnnotation.DateLogicalTypeAnnotation; import 
org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation; import org.apache.parquet.schema.LogicalTypeAnnotation.EnumLogicalTypeAnnotation; import org.apache.parquet.schema.LogicalTypeAnnotation.IntLogicalTypeAnnotation; import org.apache.parquet.schema.LogicalTypeAnnotation.JsonLogicalTypeAnnotation; import org.apache.parquet.schema.LogicalTypeAnnotation.LogicalTypeAnnotationVisitor; import org.apache.parquet.schema.LogicalTypeAnnotation.StringLogicalTypeAnnotation; -import org.apache.parquet.schema.LogicalTypeAnnotation.TimeLogicalTypeAnnotation; -import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; -abstract class BaseParquetReaders { +/** + * @deprecated since 1.8.0, will be made package-private in 1.9.0 + */ +@Deprecated +public abstract class BaseParquetReaders { protected BaseParquetReaders() {} protected ParquetValueReader createReader(Schema expectedSchema, MessageType fileSchema) { @@ -76,16 +74,7 @@ protected ParquetValueReader createReader( } protected abstract ParquetValueReader createStructReader( - List> fieldReaders, Types.StructType structType); - - protected abstract ParquetValueReader fixedReader(ColumnDescriptor desc); - - protected abstract ParquetValueReader dateReader(ColumnDescriptor desc); - - protected abstract ParquetValueReader timeReader(ColumnDescriptor desc); - - protected abstract ParquetValueReader timestampReader( - ColumnDescriptor desc, boolean isAdjustedToUTC); + List types, List> fieldReaders, Types.StructType structType); protected Object convertConstant(org.apache.iceberg.types.Type type, Object value) { return value; @@ -109,6 +98,7 @@ public ParquetValueReader struct( // the expected struct is ignored because nested fields are never found when the List> newFields = Lists.newArrayListWithExpectedSize(fieldReaders.size()); + List types = 
Lists.newArrayListWithExpectedSize(fieldReaders.size()); List fields = struct.getFields(); for (int i = 0; i < fields.size(); i += 1) { ParquetValueReader fieldReader = fieldReaders.get(i); @@ -116,10 +106,11 @@ public ParquetValueReader struct( Type fieldType = fields.get(i); int fieldD = type().getMaxDefinitionLevel(path(fieldType.getName())) - 1; newFields.add(ParquetValueReaders.option(fieldType, fieldD, fieldReader)); + types.add(fieldType); } } - return createStructReader(newFields, expected); + return createStructReader(types, newFields, expected); } } @@ -150,22 +141,6 @@ public Optional> visit(DecimalLogicalTypeAnnotation decima return Optional.of(ParquetValueReaders.bigDecimals(desc)); } - @Override - public Optional> visit(DateLogicalTypeAnnotation dateLogicalType) { - return Optional.of(dateReader(desc)); - } - - @Override - public Optional> visit(TimeLogicalTypeAnnotation timeLogicalType) { - return Optional.of(timeReader(desc)); - } - - @Override - public Optional> visit( - TimestampLogicalTypeAnnotation timestampLogicalType) { - return Optional.of(timestampReader(desc, timestampLogicalType.isAdjustedToUTC())); - } - @Override public Optional> visit(IntLogicalTypeAnnotation intLogicalType) { if (intLogicalType.getBitWidth() == 64) { @@ -223,12 +198,10 @@ public ParquetValueReader message( @Override public ParquetValueReader struct( Types.StructType expected, GroupType struct, List> fieldReaders) { - if (null == expected) { - return createStructReader(ImmutableList.of(), null); - } - // match the expected struct's order Map> readersById = Maps.newHashMap(); + Map typesById = Maps.newHashMap(); + Map maxDefinitionLevelsById = Maps.newHashMap(); List fields = struct.getFields(); for (int i = 0; i < fields.size(); i += 1) { ParquetValueReader fieldReader = fieldReaders.get(i); @@ -237,37 +210,55 @@ public ParquetValueReader struct( int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())) - 1; int id = fieldType.getId().intValue(); 
readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReader)); + typesById.put(id, fieldType); + if (idToConstant.containsKey(id)) { + maxDefinitionLevelsById.put(id, fieldD); + } } } - int constantDefinitionLevel = type.getMaxDefinitionLevel(currentPath()); - List expectedFields = expected.fields(); + List expectedFields = + expected != null ? expected.fields() : ImmutableList.of(); List> reorderedFields = Lists.newArrayListWithExpectedSize(expectedFields.size()); - + List types = Lists.newArrayListWithExpectedSize(expectedFields.size()); + // Defaulting to parent max definition level + int defaultMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath()); for (Types.NestedField field : expectedFields) { int id = field.fieldId(); - ParquetValueReader reader = - ParquetValueReaders.replaceWithMetadataReader( - id, readersById.get(id), idToConstant, constantDefinitionLevel); - reorderedFields.add(defaultReader(field, reader, constantDefinitionLevel)); - } - - return createStructReader(reorderedFields, expected); - } - - private ParquetValueReader defaultReader( - Types.NestedField field, ParquetValueReader reader, int constantDL) { - if (reader != null) { - return reader; - } else if (field.initialDefault() != null) { - return ParquetValueReaders.constant( - convertConstant(field.type(), field.initialDefault()), constantDL); - } else if (field.isOptional()) { - return ParquetValueReaders.nulls(); + ParquetValueReader reader = readersById.get(id); + if (idToConstant.containsKey(id)) { + // containsKey is used because the constant may be null + int fieldMaxDefinitionLevel = + maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel); + reorderedFields.add( + ParquetValueReaders.constant(idToConstant.get(id), fieldMaxDefinitionLevel)); + types.add(null); + } else if (id == MetadataColumns.ROW_POSITION.fieldId()) { + reorderedFields.add(ParquetValueReaders.position()); + types.add(null); + } else if (id == 
MetadataColumns.IS_DELETED.fieldId()) { + reorderedFields.add(ParquetValueReaders.constant(false)); + types.add(null); + } else if (reader != null) { + reorderedFields.add(reader); + types.add(typesById.get(id)); + } else if (field.initialDefault() != null) { + reorderedFields.add( + ParquetValueReaders.constant( + convertConstant(field.type(), field.initialDefault()), + maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel))); + types.add(typesById.get(id)); + } else if (field.isOptional()) { + reorderedFields.add(ParquetValueReaders.nulls()); + types.add(null); + } else { + throw new IllegalArgumentException( + String.format("Missing required field: %s", field.name())); + } } - throw new IllegalArgumentException(String.format("Missing required field: %s", field.name())); + return createStructReader(types, reorderedFields, expected); } @Override @@ -317,70 +308,60 @@ public ParquetValueReader map( ParquetValueReaders.option(valueType, valueD, valueReader)); } - @Override - @SuppressWarnings("checkstyle:CyclomaticComplexity") - public ParquetValueReader primitive( - org.apache.iceberg.types.Type.PrimitiveType expected, PrimitiveType primitive) { - if (expected == null) { - return null; - } - - ColumnDescriptor desc = type.getColumnDescription(currentPath()); - - if (primitive.getLogicalTypeAnnotation() != null) { - return primitive - .getLogicalTypeAnnotation() - .accept(new LogicalTypeReadBuilder(desc, expected)) - .orElseThrow( - () -> - new UnsupportedOperationException( - "Unsupported logical type: " + primitive.getLogicalTypeAnnotation())); - } - - switch (primitive.getPrimitiveTypeName()) { - case FIXED_LEN_BYTE_ARRAY: - return fixedReader(desc); - case BINARY: - if (expected.typeId() == TypeID.STRING) { - return ParquetValueReaders.strings(desc); - } else { - return ParquetValueReaders.byteBuffers(desc); - } - case INT32: - if (expected.typeId() == TypeID.LONG) { - return ParquetValueReaders.intsAsLongs(desc); - } else { - return 
ParquetValueReaders.unboxed(desc); - } - case FLOAT: - if (expected.typeId() == TypeID.DOUBLE) { - return ParquetValueReaders.floatsAsDoubles(desc); - } else { - return ParquetValueReaders.unboxed(desc); - } - case BOOLEAN: - case INT64: - case DOUBLE: - return ParquetValueReaders.unboxed(desc); - case INT96: - // Impala & Spark used to write timestamps as INT96 without a logical type. For backwards - // compatibility we try to read INT96 as timestamps. - return timestampReader(desc, true); - default: - throw new UnsupportedOperationException("Unsupported type: " + primitive); - } - } - - @Override - public ParquetValueReader variant( - Types.VariantType iVariant, GroupType variant, ParquetValueReader reader) { - return reader; - } - - @Override - public ParquetVariantVisitor> variantVisitor() { - return new VariantReaderBuilder(type, Arrays.asList(currentPath())); - } + // @Override + // @SuppressWarnings("checkstyle:CyclomaticComplexity") + // public ParquetValueReader primitive( + // org.apache.iceberg.types.Type.PrimitiveType expected, PrimitiveType primitive) { + // if (expected == null) { + // return null; + // } + // + // ColumnDescriptor desc = type.getColumnDescription(currentPath()); + // + // if (primitive.getLogicalTypeAnnotation() != null) { + // return primitive + // .getLogicalTypeAnnotation() + // .accept(new LogicalTypeReadBuilder(desc, expected)) + // .orElseThrow( + // () -> + // new UnsupportedOperationException( + // "Unsupported logical type: " + primitive.getLogicalTypeAnnotation())); + // } + // + // switch (primitive.getPrimitiveTypeName()) { + // case FIXED_LEN_BYTE_ARRAY: + // return fixedReader(desc); + // case BINARY: + // if (expected.typeId() == TypeID.STRING) { + // return ParquetValueReaders.strings(desc); + // } else { + // return ParquetValueReaders.byteBuffers(desc); + // } + // case INT32: + // if (expected.typeId() == TypeID.LONG) { + // return ParquetValueReaders.intsAsLongs(desc); + // } else { + // return 
ParquetValueReaders.unboxed(desc); + // } + // case FLOAT: + // if (expected.typeId() == TypeID.DOUBLE) { + // return ParquetValueReaders.floatsAsDoubles(desc); + // } else { + // return ParquetValueReaders.unboxed(desc); + // } + // case BOOLEAN: + // case INT64: + // case DOUBLE: + // return ParquetValueReaders.unboxed(desc); + // case INT96: + // // Impala & Spark used to write timestamps as INT96 without a logical type. For + // backwards + // // compatibility we try to read INT96 as timestamps. + // return timestampReader(desc, true); + // default: + // throw new UnsupportedOperationException("Unsupported type: " + primitive); + // } + // } MessageType type() { return type; diff --git a/ice/src/main/java/org/apache/iceberg/rest/HTTPClient.java b/ice/src/main/java/org/apache/iceberg/rest/HTTPClient.java index c771c83c..a939cdf0 100644 --- a/ice/src/main/java/org/apache/iceberg/rest/HTTPClient.java +++ b/ice/src/main/java/org/apache/iceberg/rest/HTTPClient.java @@ -41,6 +41,7 @@ import org.apache.hc.core5.http.Header; import org.apache.hc.core5.http.HttpHeaders; import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.HttpRequestInterceptor; import org.apache.hc.core5.http.HttpStatus; import org.apache.hc.core5.http.ParseException; import org.apache.hc.core5.http.impl.EnglishReasonPhraseCatalog; @@ -48,6 +49,8 @@ import org.apache.hc.core5.http.io.entity.StringEntity; import org.apache.hc.core5.io.CloseMode; import org.apache.iceberg.IcebergBuild; +import org.apache.iceberg.common.DynConstructors; +import org.apache.iceberg.common.DynMethods; import org.apache.iceberg.exceptions.RESTException; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -63,6 +66,9 @@ public class HTTPClient extends BaseHTTPClient { private static final Logger LOG = LoggerFactory.getLogger(HTTPClient.class); + private static final String SIGV4_ENABLED = 
"rest.sigv4-enabled"; + private static final String SIGV4_REQUEST_INTERCEPTOR_IMPL = + "org.apache.iceberg.aws.RESTSigV4Signer"; @VisibleForTesting static final String CLIENT_VERSION_HEADER = "X-Client-Version"; @VisibleForTesting @@ -84,7 +90,6 @@ public class HTTPClient extends BaseHTTPClient { private final Map baseHeaders; private final ObjectMapper mapper; private final AuthSession authSession; - private final boolean isRootClient; private HTTPClient( URI baseUri, @@ -92,6 +97,7 @@ private HTTPClient( CredentialsProvider proxyCredsProvider, Map baseHeaders, ObjectMapper objectMapper, + HttpRequestInterceptor requestInterceptor, Map properties, HttpClientConnectionManager connectionManager, AuthSession session) { @@ -104,6 +110,10 @@ private HTTPClient( clientBuilder.setConnectionManager(connectionManager); + if (requestInterceptor != null) { + clientBuilder.addRequestInterceptorLast(requestInterceptor); + } + int maxRetries = PropertyUtil.propertyAsInt(properties, REST_MAX_RETRIES, 5); clientBuilder.setRetryStrategy(new ExponentialHttpRequestRetryStrategy(maxRetries)); @@ -116,7 +126,6 @@ private HTTPClient( } this.httpClient = clientBuilder.build(); - this.isRootClient = true; } /** @@ -130,7 +139,6 @@ private HTTPClient(HTTPClient parent, AuthSession authSession) { this.mapper = parent.mapper; this.baseHeaders = parent.baseHeaders; this.authSession = authSession; - this.isRootClient = false; } @Override @@ -152,6 +160,7 @@ private static String extractResponseBodyAsString(CloseableHttpResponse response } } + // Per the spec, the only currently defined / used "success" responses are 200 and 202. private static boolean isSuccessful(CloseableHttpResponse response) { int code = response.getCode(); return code == HttpStatus.SC_OK @@ -322,13 +331,50 @@ protected T execute( @Override public void close() throws IOException { - // Do not close the AuthSession as it's managed by the owner of this HTTPClient. 
- // Only close the underlying Apache HTTP client if this is a root HTTPClient. - if (isRootClient) { + try { + if (authSession != null) { + authSession.close(); + } + } finally { httpClient.close(CloseMode.GRACEFUL); } } + @VisibleForTesting + static HttpRequestInterceptor loadInterceptorDynamically( + String impl, Map properties) { + HttpRequestInterceptor instance; + + DynConstructors.Ctor ctor; + try { + ctor = + DynConstructors.builder(HttpRequestInterceptor.class) + .loader(HTTPClient.class.getClassLoader()) + .impl(impl) + .buildChecked(); + } catch (NoSuchMethodException e) { + throw new IllegalArgumentException( + String.format( + "Cannot initialize RequestInterceptor, missing no-arg constructor: %s", impl), + e); + } + + try { + instance = ctor.newInstance(); + } catch (ClassCastException e) { + throw new IllegalArgumentException( + String.format("Cannot initialize, %s does not implement RequestInterceptor", impl), e); + } + + DynMethods.builder("initialize") + .hiddenImpl(impl, Map.class) + .orNoop() + .build(instance) + .invoke(properties); + + return instance; + } + static HttpClientConnectionManager configureConnectionManager( Map properties, TlsSocketStrategy tlsSocketStrategy) { PoolingHttpClientConnectionManagerBuilder connectionManagerBuilder = @@ -455,6 +501,12 @@ public HTTPClient build() { withHeader(CLIENT_VERSION_HEADER, IcebergBuild.fullVersion()); withHeader(CLIENT_GIT_COMMIT_SHORT_HEADER, IcebergBuild.gitCommitShortId()); + HttpRequestInterceptor interceptor = null; + + if (PropertyUtil.propertyAsBoolean(properties, SIGV4_ENABLED, false)) { + interceptor = loadInterceptorDynamically(SIGV4_REQUEST_INTERCEPTOR_IMPL, properties); + } + if (this.proxyCredentialsProvider != null) { Preconditions.checkNotNull( proxy, "Invalid http client proxy for proxy credentials provider: null"); @@ -466,6 +518,7 @@ public HTTPClient build() { proxyCredentialsProvider, baseHeaders, mapper, + interceptor, properties, configureConnectionManager(properties, 
tlsSocketStrategy), authSession); From 2eca2e76e7d25c68bd2b5528426b162d0f8bb753 Mon Sep 17 00:00:00 2001 From: kanthi subramanian Date: Tue, 28 Apr 2026 16:41:54 -0500 Subject: [PATCH 3/4] rolled back changes to date/timestamp that were part of the original implementation. --- .../data/parquet/BaseParquetReaders.java | 136 +++++++++++------- 1 file changed, 82 insertions(+), 54 deletions(-) diff --git a/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java b/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java index d894b73b..408c6a37 100644 --- a/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java +++ b/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java @@ -37,13 +37,17 @@ import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.DateLogicalTypeAnnotation; import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation; import org.apache.parquet.schema.LogicalTypeAnnotation.EnumLogicalTypeAnnotation; import org.apache.parquet.schema.LogicalTypeAnnotation.IntLogicalTypeAnnotation; import org.apache.parquet.schema.LogicalTypeAnnotation.JsonLogicalTypeAnnotation; import org.apache.parquet.schema.LogicalTypeAnnotation.LogicalTypeAnnotationVisitor; import org.apache.parquet.schema.LogicalTypeAnnotation.StringLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.TimeLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; /** @@ -76,6 +80,15 @@ protected ParquetValueReader createReader( } protected abstract ParquetValueReader createStructReader( List types, List> fieldReaders, Types.StructType structType); + 
protected abstract ParquetValueReader fixedReader(ColumnDescriptor desc); + + protected abstract ParquetValueReader dateReader(ColumnDescriptor desc); + + protected abstract ParquetValueReader timeReader(ColumnDescriptor desc); + + protected abstract ParquetValueReader timestampReader( + ColumnDescriptor desc, boolean isAdjustedToUTC); + protected Object convertConstant(org.apache.iceberg.types.Type type, Object value) { return value; } @@ -141,6 +154,22 @@ public Optional> visit(DecimalLogicalTypeAnnotation decima return Optional.of(ParquetValueReaders.bigDecimals(desc)); } + @Override + public Optional> visit(DateLogicalTypeAnnotation dateLogicalType) { + return Optional.of(dateReader(desc)); + } + + @Override + public Optional> visit(TimeLogicalTypeAnnotation timeLogicalType) { + return Optional.of(timeReader(desc)); + } + + @Override + public Optional> visit( + TimestampLogicalTypeAnnotation timestampLogicalType) { + return Optional.of(timestampReader(desc, timestampLogicalType.isAdjustedToUTC())); + } + @Override public Optional> visit(IntLogicalTypeAnnotation intLogicalType) { if (intLogicalType.getBitWidth() == 64) { @@ -308,60 +337,59 @@ public ParquetValueReader map( ParquetValueReaders.option(valueType, valueD, valueReader)); } - // @Override - // @SuppressWarnings("checkstyle:CyclomaticComplexity") - // public ParquetValueReader primitive( - // org.apache.iceberg.types.Type.PrimitiveType expected, PrimitiveType primitive) { - // if (expected == null) { - // return null; - // } - // - // ColumnDescriptor desc = type.getColumnDescription(currentPath()); - // - // if (primitive.getLogicalTypeAnnotation() != null) { - // return primitive - // .getLogicalTypeAnnotation() - // .accept(new LogicalTypeReadBuilder(desc, expected)) - // .orElseThrow( - // () -> - // new UnsupportedOperationException( - // "Unsupported logical type: " + primitive.getLogicalTypeAnnotation())); - // } - // - // switch (primitive.getPrimitiveTypeName()) { - // case 
FIXED_LEN_BYTE_ARRAY: - // return fixedReader(desc); - // case BINARY: - // if (expected.typeId() == TypeID.STRING) { - // return ParquetValueReaders.strings(desc); - // } else { - // return ParquetValueReaders.byteBuffers(desc); - // } - // case INT32: - // if (expected.typeId() == TypeID.LONG) { - // return ParquetValueReaders.intsAsLongs(desc); - // } else { - // return ParquetValueReaders.unboxed(desc); - // } - // case FLOAT: - // if (expected.typeId() == TypeID.DOUBLE) { - // return ParquetValueReaders.floatsAsDoubles(desc); - // } else { - // return ParquetValueReaders.unboxed(desc); - // } - // case BOOLEAN: - // case INT64: - // case DOUBLE: - // return ParquetValueReaders.unboxed(desc); - // case INT96: - // // Impala & Spark used to write timestamps as INT96 without a logical type. For - // backwards - // // compatibility we try to read INT96 as timestamps. - // return timestampReader(desc, true); - // default: - // throw new UnsupportedOperationException("Unsupported type: " + primitive); - // } - // } + @Override + @SuppressWarnings("checkstyle:CyclomaticComplexity") + public ParquetValueReader primitive( + org.apache.iceberg.types.Type.PrimitiveType expected, PrimitiveType primitive) { + if (expected == null) { + return null; + } + + ColumnDescriptor desc = type.getColumnDescription(currentPath()); + + if (primitive.getLogicalTypeAnnotation() != null) { + return primitive + .getLogicalTypeAnnotation() + .accept(new LogicalTypeReadBuilder(desc, expected)) + .orElseThrow( + () -> + new UnsupportedOperationException( + "Unsupported logical type: " + primitive.getLogicalTypeAnnotation())); + } + + switch (primitive.getPrimitiveTypeName()) { + case FIXED_LEN_BYTE_ARRAY: + return fixedReader(desc); + case BINARY: + if (expected.typeId() == TypeID.STRING) { + return ParquetValueReaders.strings(desc); + } else { + return ParquetValueReaders.byteBuffers(desc); + } + case INT32: + if (expected.typeId() == TypeID.LONG) { + return 
ParquetValueReaders.intsAsLongs(desc); + } else { + return ParquetValueReaders.unboxed(desc); + } + case FLOAT: + if (expected.typeId() == TypeID.DOUBLE) { + return ParquetValueReaders.floatsAsDoubles(desc); + } else { + return ParquetValueReaders.unboxed(desc); + } + case BOOLEAN: + case INT64: + case DOUBLE: + return ParquetValueReaders.unboxed(desc); + case INT96: + // Impala & Spark used to write timestamps as INT96 without a logical type. For + // backwards compatibility we try to read INT96 as timestamps. + return timestampReader(desc, true); + default: + throw new UnsupportedOperationException("Unsupported type: " + primitive); + } + } MessageType type() { return type; From acc5dfa0c8bb2da3615363a058f84352926b4984 Mon Sep 17 00:00:00 2001 From: kanthi subramanian Date: Tue, 28 Apr 2026 21:11:32 -0500 Subject: [PATCH 4/4] feat: Enhance alter-table command to support adding NOT NULL columns with required flag --- .../iceberg/BaseMetastoreTableOperations.java | 20 ++++ ice/README.md | 3 + .../ice/cli/internal/cmd/AlterTable.java | 18 +++- .../data/parquet/BaseParquetReaders.java | 100 ++++++++---------- .../org/apache/iceberg/rest/HTTPClient.java | 65 ++---------- .../ice/cli/internal/cmd/AlterTableTest.java | 35 +++++- 6 files changed, 122 insertions(+), 119 deletions(-) diff --git a/ice-rest-catalog/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java b/ice-rest-catalog/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java index 9363e252..e9ca5cc5 100644 --- a/ice-rest-catalog/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java +++ b/ice-rest-catalog/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java @@ -310,6 +310,26 @@ protected CommitStatus checkCommitStatus(String newMetadataLocation, TableMetada () -> checkCurrentMetadataLocation(newMetadataLocation)); } + /** + * Attempt to load the table and see if any current or past metadata location matches the one we + * were attempting to set. 
This is used as a last resort when we are dealing with exceptions that + * may indicate the commit has failed but are not proof that this is the case. Past locations must + * also be searched on the chance that a second committer was able to successfully commit on top + * of our commit. When the {@code newMetadataLocation} is not found, the method returns {@link + * CommitStatus#FAILURE}. + * + * @param newMetadataLocation the path of the new commit file + * @param config metadata to use for configuration + * @return Commit Status of Success, Failure or Unknown + */ + protected CommitStatus checkCommitStatusStrict(String newMetadataLocation, TableMetadata config) { + return checkCommitStatusStrict( + tableName(), + newMetadataLocation, + config.properties(), + () -> checkCurrentMetadataLocation(newMetadataLocation)); + } + /** * Validate if the new metadata location is the current metadata location or present within * previous metadata files. diff --git a/ice/README.md b/ice/README.md index 69687120..3e6f8416 100644 --- a/ice/README.md +++ b/ice/README.md @@ -98,6 +98,9 @@ Supported codecs: `zstd`, `snappy`, `gzip`, `lz4`. 
# add a column to an existing table ice alter-table flowers.iris '[{"op":"add_column","name":"extra","type":"string"}]' +# add a NOT NULL column (`required: true`; uses Iceberg allowIncompatibleChanges for that alter) +ice alter-table flowers.iris '[{"op":"add_column","name":"extra","type":"string","required":true}]' + # verify the schema change ice describe -s flowers.iris ``` diff --git a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/AlterTable.java b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/AlterTable.java index 3ce0484c..d39a8e9f 100644 --- a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/AlterTable.java +++ b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/AlterTable.java @@ -58,6 +58,7 @@ public static class AddColumn extends Update { @Nullable private final String after; @Nullable private final String before; private final boolean first; + private final boolean required; public AddColumn( @JsonProperty(value = "name", required = true) String name, @@ -65,13 +66,15 @@ public AddColumn( @JsonProperty("doc") @Nullable String doc, @JsonProperty("after") @Nullable String after, @JsonProperty("before") @Nullable String before, - @JsonProperty("first") @Nullable Boolean first) { + @JsonProperty("first") @Nullable Boolean first, + @JsonProperty("required") @Nullable Boolean required) { this.name = name; this.type = Types.fromPrimitiveString(type); this.doc = doc; this.after = after; this.before = before; this.first = first != null && first; + this.required = required != null && required; } } @@ -218,7 +221,18 @@ public static void run(Catalog catalog, TableIdentifier tableId, List up case AddColumn up -> { // TODO: support nested columns UpdateSchema us = schemaUpdates.getValue(); - us.addColumn(up.name, up.type, up.doc); + if (up.required) { + // Iceberg rejects required adds without an initial default unless incompatible + // changes are explicitly allowed (even for empty tables with a snapshot). 
+ us.allowIncompatibleChanges(); + if (up.doc != null) { + us.addRequiredColumn(up.name, up.type, up.doc); + } else { + us.addRequiredColumn(up.name, up.type); + } + } else { + us.addColumn(up.name, up.type, up.doc); + } if (up.after != null) { us.moveAfter(up.name, up.after); } else if (up.before != null) { diff --git a/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java b/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java index 408c6a37..620c226d 100644 --- a/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java +++ b/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java @@ -18,15 +18,17 @@ */ package org.apache.iceberg.data.parquet; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Optional; -import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.parquet.ParquetSchemaUtil; import org.apache.iceberg.parquet.ParquetValueReader; import org.apache.iceberg.parquet.ParquetValueReaders; +import org.apache.iceberg.parquet.ParquetVariantVisitor; import org.apache.iceberg.parquet.TypeWithSchemaVisitor; +import org.apache.iceberg.parquet.VariantReaderBuilder; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; @@ -50,11 +52,7 @@ import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; -/** - * @deprecated since 1.8.0, will be made package-private in 1.9.0 - */ -@Deprecated -public abstract class BaseParquetReaders { +abstract class BaseParquetReaders { protected BaseParquetReaders() {} protected ParquetValueReader createReader(Schema expectedSchema, MessageType fileSchema) { @@ -78,7 +76,7 @@ protected ParquetValueReader createReader( } protected abstract ParquetValueReader createStructReader( - List 
types, List> fieldReaders, Types.StructType structType); + List> fieldReaders, Types.StructType structType); protected abstract ParquetValueReader fixedReader(ColumnDescriptor desc); @@ -111,7 +109,6 @@ public ParquetValueReader struct( // the expected struct is ignored because nested fields are never found when the List> newFields = Lists.newArrayListWithExpectedSize(fieldReaders.size()); - List types = Lists.newArrayListWithExpectedSize(fieldReaders.size()); List fields = struct.getFields(); for (int i = 0; i < fields.size(); i += 1) { ParquetValueReader fieldReader = fieldReaders.get(i); @@ -119,11 +116,10 @@ public ParquetValueReader struct( Type fieldType = fields.get(i); int fieldD = type().getMaxDefinitionLevel(path(fieldType.getName())) - 1; newFields.add(ParquetValueReaders.option(fieldType, fieldD, fieldReader)); - types.add(fieldType); } } - return createStructReader(types, newFields, expected); + return createStructReader(newFields, expected); } } @@ -177,6 +173,7 @@ public Optional> visit(IntLogicalTypeAnnotation intLogical Preconditions.checkArgument( intLogicalType.isSigned(), "Cannot read UINT64 as a long value"); */ + return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc)); } @@ -227,10 +224,12 @@ public ParquetValueReader message( @Override public ParquetValueReader struct( Types.StructType expected, GroupType struct, List> fieldReaders) { + if (null == expected) { + return createStructReader(ImmutableList.of(), null); + } + // match the expected struct's order Map> readersById = Maps.newHashMap(); - Map typesById = Maps.newHashMap(); - Map maxDefinitionLevelsById = Maps.newHashMap(); List fields = struct.getFields(); for (int i = 0; i < fields.size(); i += 1) { ParquetValueReader fieldReader = fieldReaders.get(i); @@ -239,55 +238,37 @@ public ParquetValueReader struct( int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())) - 1; int id = fieldType.getId().intValue(); readersById.put(id, ParquetValueReaders.option(fieldType, 
fieldD, fieldReader)); - typesById.put(id, fieldType); - if (idToConstant.containsKey(id)) { - maxDefinitionLevelsById.put(id, fieldD); - } } } - List expectedFields = - expected != null ? expected.fields() : ImmutableList.of(); + int constantDefinitionLevel = type.getMaxDefinitionLevel(currentPath()); + List expectedFields = expected.fields(); List> reorderedFields = Lists.newArrayListWithExpectedSize(expectedFields.size()); - List types = Lists.newArrayListWithExpectedSize(expectedFields.size()); - // Defaulting to parent max definition level - int defaultMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath()); + for (Types.NestedField field : expectedFields) { int id = field.fieldId(); - ParquetValueReader reader = readersById.get(id); - if (idToConstant.containsKey(id)) { - // containsKey is used because the constant may be null - int fieldMaxDefinitionLevel = - maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel); - reorderedFields.add( - ParquetValueReaders.constant(idToConstant.get(id), fieldMaxDefinitionLevel)); - types.add(null); - } else if (id == MetadataColumns.ROW_POSITION.fieldId()) { - reorderedFields.add(ParquetValueReaders.position()); - types.add(null); - } else if (id == MetadataColumns.IS_DELETED.fieldId()) { - reorderedFields.add(ParquetValueReaders.constant(false)); - types.add(null); - } else if (reader != null) { - reorderedFields.add(reader); - types.add(typesById.get(id)); - } else if (field.initialDefault() != null) { - reorderedFields.add( - ParquetValueReaders.constant( - convertConstant(field.type(), field.initialDefault()), - maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel))); - types.add(typesById.get(id)); - } else if (field.isOptional()) { - reorderedFields.add(ParquetValueReaders.nulls()); - types.add(null); - } else { - throw new IllegalArgumentException( - String.format("Missing required field: %s", field.name())); - } + ParquetValueReader reader = + 
ParquetValueReaders.replaceWithMetadataReader( + id, readersById.get(id), idToConstant, constantDefinitionLevel); + reorderedFields.add(defaultReader(field, reader, constantDefinitionLevel)); } - return createStructReader(types, reorderedFields, expected); + return createStructReader(reorderedFields, expected); + } + + private ParquetValueReader defaultReader( + Types.NestedField field, ParquetValueReader reader, int constantDL) { + if (reader != null) { + return reader; + } else if (field.initialDefault() != null) { + return ParquetValueReaders.constant( + convertConstant(field.type(), field.initialDefault()), constantDL); + } else if (field.isOptional()) { + return ParquetValueReaders.nulls(); + } + + throw new IllegalArgumentException(String.format("Missing required field: %s", field.name())); } @Override @@ -383,14 +364,25 @@ public ParquetValueReader primitive( case DOUBLE: return ParquetValueReaders.unboxed(desc); case INT96: - // Impala & Spark used to write timestamps as INT96 without a logical type. For - // backwards compatibility we try to read INT96 as timestamps. + // Impala & Spark used to write timestamps as INT96 without a logical type. For backwards + // compatibility we try to read INT96 as timestamps. 
return timestampReader(desc, true); default: throw new UnsupportedOperationException("Unsupported type: " + primitive); } } + @Override + public ParquetValueReader variant( + Types.VariantType iVariant, GroupType variant, ParquetValueReader reader) { + return reader; + } + + @Override + public ParquetVariantVisitor> variantVisitor() { + return new VariantReaderBuilder(type, Arrays.asList(currentPath())); + } + MessageType type() { return type; } diff --git a/ice/src/main/java/org/apache/iceberg/rest/HTTPClient.java b/ice/src/main/java/org/apache/iceberg/rest/HTTPClient.java index a939cdf0..c771c83c 100644 --- a/ice/src/main/java/org/apache/iceberg/rest/HTTPClient.java +++ b/ice/src/main/java/org/apache/iceberg/rest/HTTPClient.java @@ -41,7 +41,6 @@ import org.apache.hc.core5.http.Header; import org.apache.hc.core5.http.HttpHeaders; import org.apache.hc.core5.http.HttpHost; -import org.apache.hc.core5.http.HttpRequestInterceptor; import org.apache.hc.core5.http.HttpStatus; import org.apache.hc.core5.http.ParseException; import org.apache.hc.core5.http.impl.EnglishReasonPhraseCatalog; @@ -49,8 +48,6 @@ import org.apache.hc.core5.http.io.entity.StringEntity; import org.apache.hc.core5.io.CloseMode; import org.apache.iceberg.IcebergBuild; -import org.apache.iceberg.common.DynConstructors; -import org.apache.iceberg.common.DynMethods; import org.apache.iceberg.exceptions.RESTException; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -66,9 +63,6 @@ public class HTTPClient extends BaseHTTPClient { private static final Logger LOG = LoggerFactory.getLogger(HTTPClient.class); - private static final String SIGV4_ENABLED = "rest.sigv4-enabled"; - private static final String SIGV4_REQUEST_INTERCEPTOR_IMPL = - "org.apache.iceberg.aws.RESTSigV4Signer"; @VisibleForTesting static final String CLIENT_VERSION_HEADER = "X-Client-Version"; @VisibleForTesting @@ -90,6 +84,7 
@@ public class HTTPClient extends BaseHTTPClient { private final Map baseHeaders; private final ObjectMapper mapper; private final AuthSession authSession; + private final boolean isRootClient; private HTTPClient( URI baseUri, @@ -97,7 +92,6 @@ private HTTPClient( CredentialsProvider proxyCredsProvider, Map baseHeaders, ObjectMapper objectMapper, - HttpRequestInterceptor requestInterceptor, Map properties, HttpClientConnectionManager connectionManager, AuthSession session) { @@ -110,10 +104,6 @@ private HTTPClient( clientBuilder.setConnectionManager(connectionManager); - if (requestInterceptor != null) { - clientBuilder.addRequestInterceptorLast(requestInterceptor); - } - int maxRetries = PropertyUtil.propertyAsInt(properties, REST_MAX_RETRIES, 5); clientBuilder.setRetryStrategy(new ExponentialHttpRequestRetryStrategy(maxRetries)); @@ -126,6 +116,7 @@ private HTTPClient( } this.httpClient = clientBuilder.build(); + this.isRootClient = true; } /** @@ -139,6 +130,7 @@ private HTTPClient(HTTPClient parent, AuthSession authSession) { this.mapper = parent.mapper; this.baseHeaders = parent.baseHeaders; this.authSession = authSession; + this.isRootClient = false; } @Override @@ -160,7 +152,6 @@ private static String extractResponseBodyAsString(CloseableHttpResponse response } } - // Per the spec, the only currently defined / used "success" responses are 200 and 202. private static boolean isSuccessful(CloseableHttpResponse response) { int code = response.getCode(); return code == HttpStatus.SC_OK @@ -331,50 +322,13 @@ protected T execute( @Override public void close() throws IOException { - try { - if (authSession != null) { - authSession.close(); - } - } finally { + // Do not close the AuthSession as it's managed by the owner of this HTTPClient. + // Only close the underlying Apache HTTP client if this is a root HTTPClient. 
+ if (isRootClient) { httpClient.close(CloseMode.GRACEFUL); } } - @VisibleForTesting - static HttpRequestInterceptor loadInterceptorDynamically( - String impl, Map properties) { - HttpRequestInterceptor instance; - - DynConstructors.Ctor ctor; - try { - ctor = - DynConstructors.builder(HttpRequestInterceptor.class) - .loader(HTTPClient.class.getClassLoader()) - .impl(impl) - .buildChecked(); - } catch (NoSuchMethodException e) { - throw new IllegalArgumentException( - String.format( - "Cannot initialize RequestInterceptor, missing no-arg constructor: %s", impl), - e); - } - - try { - instance = ctor.newInstance(); - } catch (ClassCastException e) { - throw new IllegalArgumentException( - String.format("Cannot initialize, %s does not implement RequestInterceptor", impl), e); - } - - DynMethods.builder("initialize") - .hiddenImpl(impl, Map.class) - .orNoop() - .build(instance) - .invoke(properties); - - return instance; - } - static HttpClientConnectionManager configureConnectionManager( Map properties, TlsSocketStrategy tlsSocketStrategy) { PoolingHttpClientConnectionManagerBuilder connectionManagerBuilder = @@ -501,12 +455,6 @@ public HTTPClient build() { withHeader(CLIENT_VERSION_HEADER, IcebergBuild.fullVersion()); withHeader(CLIENT_GIT_COMMIT_SHORT_HEADER, IcebergBuild.gitCommitShortId()); - HttpRequestInterceptor interceptor = null; - - if (PropertyUtil.propertyAsBoolean(properties, SIGV4_ENABLED, false)) { - interceptor = loadInterceptorDynamically(SIGV4_REQUEST_INTERCEPTOR_IMPL, properties); - } - if (this.proxyCredentialsProvider != null) { Preconditions.checkNotNull( proxy, "Invalid http client proxy for proxy credentials provider: null"); @@ -518,7 +466,6 @@ public HTTPClient build() { proxyCredentialsProvider, baseHeaders, mapper, - interceptor, properties, configureConnectionManager(properties, tlsSocketStrategy), authSession); diff --git a/ice/src/test/java/com/altinity/ice/cli/internal/cmd/AlterTableTest.java 
b/ice/src/test/java/com/altinity/ice/cli/internal/cmd/AlterTableTest.java index a087acee..bd290fb5 100644 --- a/ice/src/test/java/com/altinity/ice/cli/internal/cmd/AlterTableTest.java +++ b/ice/src/test/java/com/altinity/ice/cli/internal/cmd/AlterTableTest.java @@ -122,7 +122,7 @@ public void testAddColumnAfter() throws Exception { catalog.buildTable(tableId, schema).create(); List updates = - Arrays.asList(new AlterTable.AddColumn("age", "long", null, "name", null, null)); + Arrays.asList(new AlterTable.AddColumn("age", "long", null, "name", null, null, null)); AlterTable.run(catalog, tableId, updates); @@ -136,7 +136,8 @@ public void testAddColumnBefore() throws Exception { catalog.buildTable(tableId, schema).create(); List updates = - Arrays.asList(new AlterTable.AddColumn("age", "long", null, null, "timestamp_col", null)); + Arrays.asList( + new AlterTable.AddColumn("age", "long", null, null, "timestamp_col", null, null)); AlterTable.run(catalog, tableId, updates); @@ -150,7 +151,7 @@ public void testAddColumnFirst() throws Exception { catalog.buildTable(tableId, schema).create(); List updates = - Arrays.asList(new AlterTable.AddColumn("age", "long", null, null, null, true)); + Arrays.asList(new AlterTable.AddColumn("age", "long", null, null, null, true, null)); AlterTable.run(catalog, tableId, updates); @@ -165,7 +166,7 @@ public void testAddColumnAfterWinsWhenBothAfterAndBeforeSet() throws Exception { catalog.buildTable(tableId, schema).create(); List updates = - Arrays.asList(new AlterTable.AddColumn("bad", "string", null, "name", "id", null)); + Arrays.asList(new AlterTable.AddColumn("bad", "string", null, "name", "id", null, null)); AlterTable.run(catalog, tableId, updates); @@ -173,4 +174,30 @@ public void testAddColumnAfterWinsWhenBothAfterAndBeforeSet() throws Exception { assertThat(table.schema().columns().stream().map(Types.NestedField::name).toList()) .containsExactly("id", "name", "bad", "timestamp_col", "date_col"); } + + @Test + public void 
testAddRequiredColumnOnEmptyTable() throws Exception { + catalog.buildTable(tableId, schema).create(); + + List updates = + Arrays.asList(new AlterTable.AddColumn("age", "long", null, null, null, null, true)); + + AlterTable.run(catalog, tableId, updates); + + Table table = catalog.loadTable(tableId); + assertThat(table.schema().findField("age").isRequired()).isTrue(); + } + + @Test + public void testAddOptionalColumnByDefault() throws Exception { + catalog.buildTable(tableId, schema).create(); + + List updates = + Arrays.asList(new AlterTable.AddColumn("age", "long", null, null, null, null, null)); + + AlterTable.run(catalog, tableId, updates); + + Table table = catalog.loadTable(tableId); + assertThat(table.schema().findField("age").isOptional()).isTrue(); + } }