diff --git a/ice-rest-catalog/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java b/ice-rest-catalog/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java index 0a5ac042..e9ca5cc5 100644 --- a/ice-rest-catalog/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java +++ b/ice-rest-catalog/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Predicate; +import org.apache.iceberg.BaseMetastoreOperations.CommitStatus; import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.CommitFailedException; @@ -294,20 +295,39 @@ public long newSnapshotId() { * were attempting to set. This is used as a last resort when we are dealing with exceptions that * may indicate the commit has failed but are not proof that this is the case. Past locations must * also be searched on the chance that a second committer was able to successfully commit on top - * of our commit. + * of our commit. When the {@code newMetadataLocation} is not found, the method returns {@link + * CommitStatus#UNKNOWN}. 
* * @param newMetadataLocation the path of the new commit file * @param config metadata to use for configuration - * @return Commit Status of Success, Failure or Unknown + * @return Commit Status of Success or Unknown */ protected CommitStatus checkCommitStatus(String newMetadataLocation, TableMetadata config) { - return CommitStatus.valueOf( - checkCommitStatus( - tableName(), - newMetadataLocation, - config.properties(), - () -> checkCurrentMetadataLocation(newMetadataLocation)) - .name()); + return checkCommitStatus( + tableName(), + newMetadataLocation, + config.properties(), + () -> checkCurrentMetadataLocation(newMetadataLocation)); + } + + /** + * Attempt to load the table and see if any current or past metadata location matches the one we + * were attempting to set. This is used as a last resort when we are dealing with exceptions that + * may indicate the commit has failed but are not proof that this is the case. Past locations must + * also be searched on the chance that a second committer was able to successfully commit on top + * of our commit. When the {@code newMetadataLocation} is not found, the method returns {@link + * CommitStatus#FAILURE}. + * + * @param newMetadataLocation the path of the new commit file + * @param config metadata to use for configuration + * @return Commit Status of Success, Failure or Unknown + */ + protected CommitStatus checkCommitStatusStrict(String newMetadataLocation, TableMetadata config) { + return checkCommitStatusStrict( + tableName(), + newMetadataLocation, + config.properties(), + () -> checkCurrentMetadataLocation(newMetadataLocation)); } /** diff --git a/ice/README.md b/ice/README.md index 69687120..3e6f8416 100644 --- a/ice/README.md +++ b/ice/README.md @@ -98,6 +98,9 @@ Supported codecs: `zstd`, `snappy`, `gzip`, `lz4`. 
# add a column to an existing table ice alter-table flowers.iris '[{"op":"add_column","name":"extra","type":"string"}]' +# add a NOT NULL column (`required: true`; uses Iceberg allowIncompatibleChanges for that alter) +ice alter-table flowers.iris '[{"op":"add_column","name":"extra","type":"string","required":true}]' + # verify the schema change ice describe -s flowers.iris ``` diff --git a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/AlterTable.java b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/AlterTable.java index 782682f2..d39a8e9f 100644 --- a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/AlterTable.java +++ b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/AlterTable.java @@ -13,7 +13,11 @@ import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; import java.io.IOException; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.List; +import java.util.UUID; import javax.annotation.Nullable; import org.apache.iceberg.Table; import org.apache.iceberg.Transaction; @@ -22,6 +26,7 @@ import org.apache.iceberg.UpdateSchema; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.expressions.Literal; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.slf4j.Logger; @@ -53,6 +58,7 @@ public static class AddColumn extends Update { @Nullable private final String after; @Nullable private final String before; private final boolean first; + private final boolean required; public AddColumn( @JsonProperty(value = "name", required = true) String name, @@ -60,16 +66,83 @@ public AddColumn( @JsonProperty("doc") @Nullable String doc, @JsonProperty("after") @Nullable String after, @JsonProperty("before") @Nullable String before, - @JsonProperty("first") @Nullable Boolean first) { + @JsonProperty("first") @Nullable Boolean first, + 
@JsonProperty("required") @Nullable Boolean required) { this.name = name; this.type = Types.fromPrimitiveString(type); this.doc = doc; this.after = after; this.before = before; this.first = first != null && first; + this.required = required != null && required; } } + // Coerce a JSON literal (number / boolean / string) into an Iceberg Literal of the requested + // type. For numeric, boolean and string types we build the Literal directly; for complex + // primitives (date/time/timestamp/decimal/uuid/binary) we accept a string form and let + // Iceberg parse it via Literal.to(type). + @Nullable + private static Literal coerceDefault(Type type, @Nullable Object jsonValue) { + if (jsonValue == null) { + return null; + } + try { + switch (type.typeId()) { + case BOOLEAN: + if (jsonValue instanceof Boolean b) { + return Literal.of(b); + } + return Literal.of(Boolean.parseBoolean(jsonValue.toString())); + case INTEGER: + return Literal.of(((Number) numberOrParse(jsonValue)).intValue()); + case LONG: + return Literal.of(((Number) numberOrParse(jsonValue)).longValue()); + case FLOAT: + return Literal.of(((Number) numberOrParse(jsonValue)).floatValue()); + case DOUBLE: + return Literal.of(((Number) numberOrParse(jsonValue)).doubleValue()); + case STRING: + return Literal.of(jsonValue.toString()); + case BINARY: + case FIXED: + return Literal.of(ByteBuffer.wrap(jsonValue.toString().getBytes(StandardCharsets.UTF_8))); + case DECIMAL: + return Literal.of(new BigDecimal(jsonValue.toString())).to(type); + case UUID: + return Literal.of(UUID.fromString(jsonValue.toString())); + case DATE: + case TIME: + case TIMESTAMP: + case TIMESTAMP_NANO: + // Let Iceberg parse the canonical string form (e.g. "2025-01-01", + // "2025-01-01T00:00:00", "2025-01-01T00:00:00+00:00"). 
+ Literal parsed = Literal.of(jsonValue.toString()).to(type); + if (parsed == null) { + throw new IllegalArgumentException( + String.format("Cannot parse default value '%s' as %s", jsonValue, type)); + } + return parsed; + default: + throw new IllegalArgumentException( + String.format("Defaults not supported for type %s", type)); + } + } catch (ClassCastException | IllegalArgumentException e) { + throw new IllegalArgumentException( + String.format("Cannot coerce default value '%s' to type %s", jsonValue, type), e); + } + } + + // Accept either a JSON number (already a Number) or a string parseable as a number. + private static Number numberOrParse(Object jsonValue) { + if (jsonValue instanceof Number n) { + return n; + } + String s = jsonValue.toString(); + // Use BigDecimal to accept any numeric form; downstream casts narrow to int/long/float/double. + return new BigDecimal(s); + } + public static class AlterColumn extends Update { private final String name; private final Type.PrimitiveType type; @@ -148,7 +221,18 @@ public static void run(Catalog catalog, TableIdentifier tableId, List up case AddColumn up -> { // TODO: support nested columns UpdateSchema us = schemaUpdates.getValue(); - us.addColumn(up.name, up.type, up.doc); + if (up.required) { + // Iceberg rejects required adds without an initial default unless incompatible + // changes are explicitly allowed (even for empty tables with a snapshot). 
+ us.allowIncompatibleChanges(); + if (up.doc != null) { + us.addRequiredColumn(up.name, up.type, up.doc); + } else { + us.addRequiredColumn(up.name, up.type); + } + } else { + us.addColumn(up.name, up.type, up.doc); + } if (up.after != null) { us.moveAfter(up.name, up.after); } else if (up.before != null) { diff --git a/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java b/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java index 7f237b72..620c226d 100644 --- a/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java +++ b/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java @@ -18,15 +18,17 @@ */ package org.apache.iceberg.data.parquet; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Optional; -import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.parquet.ParquetSchemaUtil; import org.apache.iceberg.parquet.ParquetValueReader; import org.apache.iceberg.parquet.ParquetValueReaders; +import org.apache.iceberg.parquet.ParquetVariantVisitor; import org.apache.iceberg.parquet.TypeWithSchemaVisitor; +import org.apache.iceberg.parquet.VariantReaderBuilder; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; @@ -50,11 +52,7 @@ import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; -/** - * @deprecated since 1.8.0, will be made package-private in 1.9.0 - */ -@Deprecated -public abstract class BaseParquetReaders { +abstract class BaseParquetReaders { protected BaseParquetReaders() {} protected ParquetValueReader createReader(Schema expectedSchema, MessageType fileSchema) { @@ -78,56 +76,16 @@ protected ParquetValueReader createReader( } protected abstract ParquetValueReader createStructReader( - List 
types, List> fieldReaders, Types.StructType structType); + List> fieldReaders, Types.StructType structType); - protected ParquetValueReader fixedReader(ColumnDescriptor desc) { - return new GenericParquetReaders.FixedReader(desc); - } + protected abstract ParquetValueReader fixedReader(ColumnDescriptor desc); - protected ParquetValueReader dateReader(ColumnDescriptor desc) { - return new GenericParquetReaders.DateReader(desc); - } + protected abstract ParquetValueReader dateReader(ColumnDescriptor desc); - protected ParquetValueReader timeReader(ColumnDescriptor desc) { - LogicalTypeAnnotation time = desc.getPrimitiveType().getLogicalTypeAnnotation(); - Preconditions.checkArgument( - time instanceof TimeLogicalTypeAnnotation, "Invalid time logical type: " + time); - - LogicalTypeAnnotation.TimeUnit unit = ((TimeLogicalTypeAnnotation) time).getUnit(); - switch (unit) { - case MICROS: - return new GenericParquetReaders.TimeReader(desc); - case MILLIS: - return new GenericParquetReaders.TimeMillisReader(desc); - default: - throw new UnsupportedOperationException("Unsupported unit for time: " + unit); - } - } + protected abstract ParquetValueReader timeReader(ColumnDescriptor desc); - protected ParquetValueReader timestampReader(ColumnDescriptor desc, boolean isAdjustedToUTC) { - if (desc.getPrimitiveType().getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT96) { - return new GenericParquetReaders.TimestampInt96Reader(desc); - } - - LogicalTypeAnnotation timestamp = desc.getPrimitiveType().getLogicalTypeAnnotation(); - Preconditions.checkArgument( - timestamp instanceof TimestampLogicalTypeAnnotation, - "Invalid timestamp logical type: " + timestamp); - - LogicalTypeAnnotation.TimeUnit unit = ((TimestampLogicalTypeAnnotation) timestamp).getUnit(); - switch (unit) { - case MICROS: - return isAdjustedToUTC - ? new GenericParquetReaders.TimestamptzReader(desc) - : new GenericParquetReaders.TimestampReader(desc); - case MILLIS: - return isAdjustedToUTC - ? 
new GenericParquetReaders.TimestamptzMillisReader(desc) - : new GenericParquetReaders.TimestampMillisReader(desc); - default: - throw new UnsupportedOperationException("Unsupported unit for timestamp: " + unit); - } - } + protected abstract ParquetValueReader timestampReader( + ColumnDescriptor desc, boolean isAdjustedToUTC); protected Object convertConstant(org.apache.iceberg.types.Type type, Object value) { return value; @@ -151,7 +109,6 @@ public ParquetValueReader struct( // the expected struct is ignored because nested fields are never found when the List> newFields = Lists.newArrayListWithExpectedSize(fieldReaders.size()); - List types = Lists.newArrayListWithExpectedSize(fieldReaders.size()); List fields = struct.getFields(); for (int i = 0; i < fields.size(); i += 1) { ParquetValueReader fieldReader = fieldReaders.get(i); @@ -159,11 +116,10 @@ public ParquetValueReader struct( Type fieldType = fields.get(i); int fieldD = type().getMaxDefinitionLevel(path(fieldType.getName())) - 1; newFields.add(ParquetValueReaders.option(fieldType, fieldD, fieldReader)); - types.add(fieldType); } } - return createStructReader(types, newFields, expected); + return createStructReader(newFields, expected); } } @@ -207,8 +163,7 @@ public Optional> visit(TimeLogicalTypeAnnotation timeLogic @Override public Optional> visit( TimestampLogicalTypeAnnotation timestampLogicalType) { - return Optional.of( - timestampReader(desc, ((Types.TimestampType) expected).shouldAdjustToUTC())); + return Optional.of(timestampReader(desc, timestampLogicalType.isAdjustedToUTC())); } @Override @@ -218,6 +173,7 @@ public Optional> visit(IntLogicalTypeAnnotation intLogical Preconditions.checkArgument( intLogicalType.isSigned(), "Cannot read UINT64 as a long value"); */ + return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc)); } @@ -268,10 +224,12 @@ public ParquetValueReader message( @Override public ParquetValueReader struct( Types.StructType expected, GroupType struct, List> fieldReaders) 
{ + if (null == expected) { + return createStructReader(ImmutableList.of(), null); + } + // match the expected struct's order Map> readersById = Maps.newHashMap(); - Map typesById = Maps.newHashMap(); - Map maxDefinitionLevelsById = Maps.newHashMap(); List fields = struct.getFields(); for (int i = 0; i < fields.size(); i += 1) { ParquetValueReader fieldReader = fieldReaders.get(i); @@ -280,55 +238,37 @@ public ParquetValueReader struct( int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())) - 1; int id = fieldType.getId().intValue(); readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReader)); - typesById.put(id, fieldType); - if (idToConstant.containsKey(id)) { - maxDefinitionLevelsById.put(id, fieldD); - } } } - List expectedFields = - expected != null ? expected.fields() : ImmutableList.of(); + int constantDefinitionLevel = type.getMaxDefinitionLevel(currentPath()); + List expectedFields = expected.fields(); List> reorderedFields = Lists.newArrayListWithExpectedSize(expectedFields.size()); - List types = Lists.newArrayListWithExpectedSize(expectedFields.size()); - // Defaulting to parent max definition level - int defaultMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath()); + for (Types.NestedField field : expectedFields) { int id = field.fieldId(); - ParquetValueReader reader = readersById.get(id); - if (idToConstant.containsKey(id)) { - // containsKey is used because the constant may be null - int fieldMaxDefinitionLevel = - maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel); - reorderedFields.add( - ParquetValueReaders.constant(idToConstant.get(id), fieldMaxDefinitionLevel)); - types.add(null); - } else if (id == MetadataColumns.ROW_POSITION.fieldId()) { - reorderedFields.add(ParquetValueReaders.position()); - types.add(null); - } else if (id == MetadataColumns.IS_DELETED.fieldId()) { - reorderedFields.add(ParquetValueReaders.constant(false)); - types.add(null); - } else if (reader != null) { - 
reorderedFields.add(reader); - types.add(typesById.get(id)); - } else if (field.initialDefault() != null) { - reorderedFields.add( - ParquetValueReaders.constant( - convertConstant(field.type(), field.initialDefault()), - maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel))); - types.add(typesById.get(id)); - } else if (field.isOptional()) { - reorderedFields.add(ParquetValueReaders.nulls()); - types.add(null); - } else { - throw new IllegalArgumentException( - String.format("Missing required field: %s", field.name())); - } + ParquetValueReader reader = + ParquetValueReaders.replaceWithMetadataReader( + id, readersById.get(id), idToConstant, constantDefinitionLevel); + reorderedFields.add(defaultReader(field, reader, constantDefinitionLevel)); + } + + return createStructReader(reorderedFields, expected); + } + + private ParquetValueReader defaultReader( + Types.NestedField field, ParquetValueReader reader, int constantDL) { + if (reader != null) { + return reader; + } else if (field.initialDefault() != null) { + return ParquetValueReaders.constant( + convertConstant(field.type(), field.initialDefault()), constantDL); + } else if (field.isOptional()) { + return ParquetValueReaders.nulls(); } - return createStructReader(types, reorderedFields, expected); + throw new IllegalArgumentException(String.format("Missing required field: %s", field.name())); } @Override @@ -432,6 +372,17 @@ public ParquetValueReader primitive( } } + @Override + public ParquetValueReader variant( + Types.VariantType iVariant, GroupType variant, ParquetValueReader reader) { + return reader; + } + + @Override + public ParquetVariantVisitor> variantVisitor() { + return new VariantReaderBuilder(type, Arrays.asList(currentPath())); + } + MessageType type() { return type; } diff --git a/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java b/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java index 8199f698..6304a41e 100644 --- 
a/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java +++ b/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java @@ -18,13 +18,17 @@ */ package org.apache.iceberg.data.parquet; +import java.util.Arrays; import java.util.List; import java.util.Optional; -import org.apache.iceberg.parquet.ParquetTypeVisitor; import org.apache.iceberg.parquet.ParquetValueWriter; import org.apache.iceberg.parquet.ParquetValueWriters; +import org.apache.iceberg.parquet.ParquetVariantVisitor; +import org.apache.iceberg.parquet.TypeWithSchemaVisitor; +import org.apache.iceberg.parquet.VariantWriterBuilder; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.LogicalTypeAnnotation; @@ -32,41 +36,31 @@ import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; -/** - * @deprecated since 1.8.0, will be made package-private in 1.9.0 - */ -@Deprecated -public abstract class BaseParquetWriter { +abstract class BaseParquetWriter { - @SuppressWarnings("unchecked") protected ParquetValueWriter createWriter(MessageType type) { - return (ParquetValueWriter) ParquetTypeVisitor.visit(type, new WriteBuilder(type)); + return createWriter(null, type); + } + + @SuppressWarnings("unchecked") + protected ParquetValueWriter createWriter(Types.StructType struct, MessageType type) { + return (ParquetValueWriter) + TypeWithSchemaVisitor.visit(struct, type, new WriteBuilder(type)); } protected abstract ParquetValueWriters.StructWriter createStructWriter( - List> writers); + Types.StructType struct, List> writers); - protected ParquetValueWriter fixedWriter(ColumnDescriptor desc) { - return new GenericParquetWriter.FixedWriter(desc); - } + protected abstract ParquetValueWriter 
fixedWriter(ColumnDescriptor desc); - protected ParquetValueWriter dateWriter(ColumnDescriptor desc) { - return new GenericParquetWriter.DateWriter(desc); - } + protected abstract ParquetValueWriter dateWriter(ColumnDescriptor desc); - protected ParquetValueWriter timeWriter(ColumnDescriptor desc) { - return new GenericParquetWriter.TimeWriter(desc); - } + protected abstract ParquetValueWriter timeWriter(ColumnDescriptor desc); - protected ParquetValueWriter timestampWriter(ColumnDescriptor desc, boolean isAdjustedToUTC) { - if (isAdjustedToUTC) { - return new GenericParquetWriter.TimestamptzWriter(desc); - } else { - return new GenericParquetWriter.TimestampWriter(desc); - } - } + protected abstract ParquetValueWriter timestampWriter( + ColumnDescriptor desc, boolean isAdjustedToUTC); - private class WriteBuilder extends ParquetTypeVisitor> { + private class WriteBuilder extends TypeWithSchemaVisitor> { private final MessageType type; private WriteBuilder(MessageType type) { @@ -75,14 +69,14 @@ private WriteBuilder(MessageType type) { @Override public ParquetValueWriter message( - MessageType message, List> fieldWriters) { + Types.StructType struct, MessageType message, List> fieldWriters) { - return struct(message.asGroupType(), fieldWriters); + return struct(struct, message.asGroupType(), fieldWriters); } @Override public ParquetValueWriter struct( - GroupType struct, List> fieldWriters) { + Types.StructType iceberg, GroupType struct, List> fieldWriters) { List fields = struct.getFields(); List> writers = Lists.newArrayListWithExpectedSize(fieldWriters.size()); for (int i = 0; i < fields.size(); i += 1) { @@ -91,11 +85,12 @@ public ParquetValueWriter struct( writers.add(ParquetValueWriters.option(fieldType, fieldD, fieldWriters.get(i))); } - return createStructWriter(writers); + return createStructWriter(iceberg, writers); } @Override - public ParquetValueWriter list(GroupType array, ParquetValueWriter elementWriter) { + public ParquetValueWriter list( + 
Types.ListType iceberg, GroupType array, ParquetValueWriter elementWriter) { GroupType repeated = array.getFields().get(0).asGroupType(); String[] repeatedPath = currentPath(); @@ -111,7 +106,10 @@ public ParquetValueWriter list(GroupType array, ParquetValueWriter element @Override public ParquetValueWriter map( - GroupType map, ParquetValueWriter keyWriter, ParquetValueWriter valueWriter) { + Types.MapType iceberg, + GroupType map, + ParquetValueWriter keyWriter, + ParquetValueWriter valueWriter) { GroupType repeatedKeyValue = map.getFields().get(0).asGroupType(); String[] repeatedPath = currentPath(); @@ -131,7 +129,8 @@ public ParquetValueWriter map( } @Override - public ParquetValueWriter primitive(PrimitiveType primitive) { + public ParquetValueWriter primitive( + org.apache.iceberg.types.Type.PrimitiveType iceberg, PrimitiveType primitive) { ColumnDescriptor desc = type.getColumnDescription(currentPath()); LogicalTypeAnnotation logicalType = primitive.getLogicalTypeAnnotation(); if (logicalType != null) { @@ -161,6 +160,17 @@ public ParquetValueWriter primitive(PrimitiveType primitive) { throw new UnsupportedOperationException("Unsupported type: " + primitive); } } + + @Override + public ParquetValueWriter variant( + Types.VariantType iVariant, GroupType variant, ParquetValueWriter result) { + return result; + } + + @Override + public ParquetVariantVisitor> variantVisitor() { + return new VariantWriterBuilder(type, Arrays.asList(currentPath())); + } } private class LogicalTypeWriterVisitor @@ -224,8 +234,8 @@ public Optional> visit( public Optional> visit( LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampType) { Preconditions.checkArgument( - LogicalTypeAnnotation.TimeUnit.MICROS.equals(timestampType.getUnit()), - "Cannot write timestamp in %s, only MICROS is supported", + !LogicalTypeAnnotation.TimeUnit.MILLIS.equals(timestampType.getUnit()), + "Cannot write timestamp in %s, only MICROS and NANOS are supported", timestampType.getUnit()); return 
Optional.of(timestampWriter(desc, timestampType.isAdjustedToUTC())); } diff --git a/ice/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java b/ice/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java index a00c1176..a003e1af 100644 --- a/ice/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java +++ b/ice/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java @@ -27,6 +27,7 @@ import java.util.function.Function; import org.apache.iceberg.expressions.Literal; import org.apache.iceberg.types.Type; +import org.apache.iceberg.util.UUIDUtil; import org.apache.parquet.io.api.Binary; import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation; @@ -77,6 +78,55 @@ static Literal fromParquetPrimitive(Type type, PrimitiveType parquetType, } } + // Iceberg 1.9 introduced this API in upstream ParquetConversions and uses it from + // ParquetMetrics. Provide a compatible shim here so callers compiled against 1.9 + // continue to resolve against this shadowed class. The TIME/TIMESTAMP branch keeps + // the local millis -> micros normalization used by fromParquetPrimitive above. 
+ @SuppressWarnings("unchecked") + static T convertValue(Type type, PrimitiveType parquetType, Object value) { + switch (type.typeId()) { + case BOOLEAN: + case INTEGER: + case DATE: + case LONG: + case TIMESTAMP_NANO: + case FLOAT: + case DOUBLE: + return (T) value; + case TIME: + case TIMESTAMP: + if (parquetType.getLogicalTypeAnnotation() + instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation tsAnno + && tsAnno.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS) { + return (T) Long.valueOf(((Number) value).longValue() * 1000L); + } + return (T) value; + case STRING: + return (T) ((Binary) value).toStringUsingUTF8(); + case UUID: + return (T) UUIDUtil.convert(((Binary) value).toByteBuffer()); + case FIXED: + case BINARY: + return (T) ((Binary) value).toByteBuffer(); + case DECIMAL: + int scale = + ((DecimalLogicalTypeAnnotation) parquetType.getLogicalTypeAnnotation()).getScale(); + switch (parquetType.getPrimitiveTypeName()) { + case INT32: + case INT64: + return (T) BigDecimal.valueOf(((Number) value).longValue(), scale); + case FIXED_LEN_BYTE_ARRAY: + case BINARY: + return (T) new BigDecimal(new BigInteger(((Binary) value).getBytes()), scale); + default: + throw new IllegalArgumentException( + "Unsupported primitive type for decimal: " + parquetType.getPrimitiveTypeName()); + } + default: + throw new IllegalArgumentException("Unsupported primitive type: " + type); + } + } + static Function converterFromParquet( PrimitiveType parquetType, Type icebergType) { Function fromParquet = converterFromParquet(parquetType); diff --git a/ice/src/main/java/org/apache/iceberg/rest/HTTPClient.java b/ice/src/main/java/org/apache/iceberg/rest/HTTPClient.java index a939cdf0..c771c83c 100644 --- a/ice/src/main/java/org/apache/iceberg/rest/HTTPClient.java +++ b/ice/src/main/java/org/apache/iceberg/rest/HTTPClient.java @@ -41,7 +41,6 @@ import org.apache.hc.core5.http.Header; import org.apache.hc.core5.http.HttpHeaders; import org.apache.hc.core5.http.HttpHost; -import 
org.apache.hc.core5.http.HttpRequestInterceptor; import org.apache.hc.core5.http.HttpStatus; import org.apache.hc.core5.http.ParseException; import org.apache.hc.core5.http.impl.EnglishReasonPhraseCatalog; @@ -49,8 +48,6 @@ import org.apache.hc.core5.http.io.entity.StringEntity; import org.apache.hc.core5.io.CloseMode; import org.apache.iceberg.IcebergBuild; -import org.apache.iceberg.common.DynConstructors; -import org.apache.iceberg.common.DynMethods; import org.apache.iceberg.exceptions.RESTException; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -66,9 +63,6 @@ public class HTTPClient extends BaseHTTPClient { private static final Logger LOG = LoggerFactory.getLogger(HTTPClient.class); - private static final String SIGV4_ENABLED = "rest.sigv4-enabled"; - private static final String SIGV4_REQUEST_INTERCEPTOR_IMPL = - "org.apache.iceberg.aws.RESTSigV4Signer"; @VisibleForTesting static final String CLIENT_VERSION_HEADER = "X-Client-Version"; @VisibleForTesting @@ -90,6 +84,7 @@ public class HTTPClient extends BaseHTTPClient { private final Map baseHeaders; private final ObjectMapper mapper; private final AuthSession authSession; + private final boolean isRootClient; private HTTPClient( URI baseUri, @@ -97,7 +92,6 @@ private HTTPClient( CredentialsProvider proxyCredsProvider, Map baseHeaders, ObjectMapper objectMapper, - HttpRequestInterceptor requestInterceptor, Map properties, HttpClientConnectionManager connectionManager, AuthSession session) { @@ -110,10 +104,6 @@ private HTTPClient( clientBuilder.setConnectionManager(connectionManager); - if (requestInterceptor != null) { - clientBuilder.addRequestInterceptorLast(requestInterceptor); - } - int maxRetries = PropertyUtil.propertyAsInt(properties, REST_MAX_RETRIES, 5); clientBuilder.setRetryStrategy(new ExponentialHttpRequestRetryStrategy(maxRetries)); @@ -126,6 +116,7 @@ private HTTPClient( } 
this.httpClient = clientBuilder.build(); + this.isRootClient = true; } /** @@ -139,6 +130,7 @@ private HTTPClient(HTTPClient parent, AuthSession authSession) { this.mapper = parent.mapper; this.baseHeaders = parent.baseHeaders; this.authSession = authSession; + this.isRootClient = false; } @Override @@ -160,7 +152,6 @@ private static String extractResponseBodyAsString(CloseableHttpResponse response } } - // Per the spec, the only currently defined / used "success" responses are 200 and 202. private static boolean isSuccessful(CloseableHttpResponse response) { int code = response.getCode(); return code == HttpStatus.SC_OK @@ -331,50 +322,13 @@ protected T execute( @Override public void close() throws IOException { - try { - if (authSession != null) { - authSession.close(); - } - } finally { + // Do not close the AuthSession as it's managed by the owner of this HTTPClient. + // Only close the underlying Apache HTTP client if this is a root HTTPClient. + if (isRootClient) { httpClient.close(CloseMode.GRACEFUL); } } - @VisibleForTesting - static HttpRequestInterceptor loadInterceptorDynamically( - String impl, Map properties) { - HttpRequestInterceptor instance; - - DynConstructors.Ctor ctor; - try { - ctor = - DynConstructors.builder(HttpRequestInterceptor.class) - .loader(HTTPClient.class.getClassLoader()) - .impl(impl) - .buildChecked(); - } catch (NoSuchMethodException e) { - throw new IllegalArgumentException( - String.format( - "Cannot initialize RequestInterceptor, missing no-arg constructor: %s", impl), - e); - } - - try { - instance = ctor.newInstance(); - } catch (ClassCastException e) { - throw new IllegalArgumentException( - String.format("Cannot initialize, %s does not implement RequestInterceptor", impl), e); - } - - DynMethods.builder("initialize") - .hiddenImpl(impl, Map.class) - .orNoop() - .build(instance) - .invoke(properties); - - return instance; - } - static HttpClientConnectionManager configureConnectionManager( Map properties, TlsSocketStrategy 
tlsSocketStrategy) { PoolingHttpClientConnectionManagerBuilder connectionManagerBuilder = @@ -501,12 +455,6 @@ public HTTPClient build() { withHeader(CLIENT_VERSION_HEADER, IcebergBuild.fullVersion()); withHeader(CLIENT_GIT_COMMIT_SHORT_HEADER, IcebergBuild.gitCommitShortId()); - HttpRequestInterceptor interceptor = null; - - if (PropertyUtil.propertyAsBoolean(properties, SIGV4_ENABLED, false)) { - interceptor = loadInterceptorDynamically(SIGV4_REQUEST_INTERCEPTOR_IMPL, properties); - } - if (this.proxyCredentialsProvider != null) { Preconditions.checkNotNull( proxy, "Invalid http client proxy for proxy credentials provider: null"); @@ -518,7 +466,6 @@ public HTTPClient build() { proxyCredentialsProvider, baseHeaders, mapper, - interceptor, properties, configureConnectionManager(properties, tlsSocketStrategy), authSession); diff --git a/ice/src/test/java/com/altinity/ice/cli/internal/cmd/AlterTableTest.java b/ice/src/test/java/com/altinity/ice/cli/internal/cmd/AlterTableTest.java index a087acee..bd290fb5 100644 --- a/ice/src/test/java/com/altinity/ice/cli/internal/cmd/AlterTableTest.java +++ b/ice/src/test/java/com/altinity/ice/cli/internal/cmd/AlterTableTest.java @@ -122,7 +122,7 @@ public void testAddColumnAfter() throws Exception { catalog.buildTable(tableId, schema).create(); List updates = - Arrays.asList(new AlterTable.AddColumn("age", "long", null, "name", null, null)); + Arrays.asList(new AlterTable.AddColumn("age", "long", null, "name", null, null, null)); AlterTable.run(catalog, tableId, updates); @@ -136,7 +136,8 @@ public void testAddColumnBefore() throws Exception { catalog.buildTable(tableId, schema).create(); List updates = - Arrays.asList(new AlterTable.AddColumn("age", "long", null, null, "timestamp_col", null)); + Arrays.asList( + new AlterTable.AddColumn("age", "long", null, null, "timestamp_col", null, null)); AlterTable.run(catalog, tableId, updates); @@ -150,7 +151,7 @@ public void testAddColumnFirst() throws Exception { 
catalog.buildTable(tableId, schema).create(); List updates = - Arrays.asList(new AlterTable.AddColumn("age", "long", null, null, null, true)); + Arrays.asList(new AlterTable.AddColumn("age", "long", null, null, null, true, null)); AlterTable.run(catalog, tableId, updates); @@ -165,7 +166,7 @@ public void testAddColumnAfterWinsWhenBothAfterAndBeforeSet() throws Exception { catalog.buildTable(tableId, schema).create(); List updates = - Arrays.asList(new AlterTable.AddColumn("bad", "string", null, "name", "id", null)); + Arrays.asList(new AlterTable.AddColumn("bad", "string", null, "name", "id", null, null)); AlterTable.run(catalog, tableId, updates); @@ -173,4 +174,30 @@ public void testAddColumnAfterWinsWhenBothAfterAndBeforeSet() throws Exception { assertThat(table.schema().columns().stream().map(Types.NestedField::name).toList()) .containsExactly("id", "name", "bad", "timestamp_col", "date_col"); } + + @Test + public void testAddRequiredColumnOnEmptyTable() throws Exception { + catalog.buildTable(tableId, schema).create(); + + List updates = + Arrays.asList(new AlterTable.AddColumn("age", "long", null, null, null, null, true)); + + AlterTable.run(catalog, tableId, updates); + + Table table = catalog.loadTable(tableId); + assertThat(table.schema().findField("age").isRequired()).isTrue(); + } + + @Test + public void testAddOptionalColumnByDefault() throws Exception { + catalog.buildTable(tableId, schema).create(); + + List updates = + Arrays.asList(new AlterTable.AddColumn("age", "long", null, null, null, null, null)); + + AlterTable.run(catalog, tableId, updates); + + Table table = catalog.loadTable(tableId); + assertThat(table.schema().findField("age").isOptional()).isTrue(); + } } diff --git a/pom.xml b/pom.xml index e84e6369..c6ed84cc 100644 --- a/pom.xml +++ b/pom.xml @@ -14,12 +14,12 @@ 21 21 - 1.8.1 + 1.9.2 3.4.1 1.15.1 2.31.13 4.7.6 - 2.18.2 + 2.18.3 2.0.17 1.5.18 1.3.6