@@ -30,7 +30,6 @@
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
@@ -111,9 +110,7 @@ private static Expression translate(ExpressionTree tree, List<PredicateLeaf> lea
*/
private static Expression translateLeaf(PredicateLeaf leaf) {
TransformSpec transformSpec = TransformSpec.fromStringWithColumnName(leaf.getColumnName());
String columnName = transformSpec.getColumnName();
UnboundTerm<Object> column =
ObjectUtils.defaultIfNull(toTerm(columnName, transformSpec), Expressions.ref(columnName));
UnboundTerm<Object> column = SchemaUtils.toTerm(transformSpec);

switch (leaf.getOperator()) {
case EQUALS:
@@ -144,30 +141,6 @@ private static Expression translateLeaf(PredicateLeaf leaf) {
}
}

public static UnboundTerm<Object> toTerm(String columnName, TransformSpec transformSpec) {
if (transformSpec == null) {
return null;
}
switch (transformSpec.getTransformType()) {
case YEAR:
return Expressions.year(columnName);
case MONTH:
return Expressions.month(columnName);
case DAY:
return Expressions.day(columnName);
case HOUR:
return Expressions.hour(columnName);
case TRUNCATE:
return Expressions.truncate(columnName, transformSpec.getTransformParam());
case BUCKET:
return Expressions.bucket(columnName, transformSpec.getTransformParam());
case IDENTITY:
return null;
default:
throw new UnsupportedOperationException("Unknown transformSpec: " + transformSpec);
}
}

// PredicateLeafImpl has a work-around for Kryo serialization with java.util.Date objects where it converts values to
// Timestamp using Date#getTime. This conversion discards microseconds, so this is necessary to avoid it.
private static final DynFields.UnboundField<?> LITERAL_FIELD = DynFields.builder()
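SchemaUtils.toTerm itself is not shown in this diff. Judging from the removed toTerm helper above and the ObjectUtils.defaultIfNull(..., Expressions.ref(columnName)) fallback it replaces at both call sites, the new method presumably folds the identity fallback into the helper. A minimal sketch under that assumption (the class name and the TransformSpec import path are placeholders, not part of the change):

import org.apache.hadoop.hive.ql.parse.TransformSpec; // import path assumed
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.UnboundTerm;

public final class SchemaUtilsSketch {

  private SchemaUtilsSketch() {
  }

  // Mirrors the removed HiveIcebergFilterFactory#toTerm, except that the identity case
  // returns a plain column reference instead of null, so callers no longer need
  // ObjectUtils.defaultIfNull(..., Expressions.ref(columnName)).
  public static UnboundTerm<Object> toTerm(TransformSpec transformSpec) {
    String columnName = transformSpec.getColumnName();
    switch (transformSpec.getTransformType()) {
      case YEAR:
        return Expressions.year(columnName);
      case MONTH:
        return Expressions.month(columnName);
      case DAY:
        return Expressions.day(columnName);
      case HOUR:
        return Expressions.hour(columnName);
      case TRUNCATE:
        return Expressions.truncate(columnName, transformSpec.getTransformParam());
      case BUCKET:
        return Expressions.bucket(columnName, transformSpec.getTransformParam());
      case IDENTITY:
        return Expressions.ref(columnName);
      default:
        throw new UnsupportedOperationException("Unknown transformSpec: " + transformSpec);
    }
  }
}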
@@ -20,8 +20,6 @@
package org.apache.iceberg.mr.hive;

import java.io.IOException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
@@ -31,11 +29,9 @@
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -137,7 +133,6 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;
@@ -574,54 +569,47 @@ public void rollbackAlterTable(org.apache.hadoop.hive.metastore.api.Table hmsTab

@Override
public void preTruncateTable(org.apache.hadoop.hive.metastore.api.Table table, EnvironmentContext context,
List<String> partNames)
throws MetaException {
List<String> partNames) throws MetaException {

this.tableProperties = IcebergTableProperties.getTableProperties(table, conf);
this.icebergTable = Catalogs.loadTable(conf, tableProperties);
Map<String, PartitionField> partitionFieldMap = icebergTable.spec().fields().stream()
.collect(Collectors.toMap(PartitionField::name, Function.identity()));
Expression finalExp = CollectionUtils.isEmpty(partNames) ? Expressions.alwaysTrue() : Expressions.alwaysFalse();
if (partNames != null) {
for (String partName : partNames) {
Map<String, String> specMap = Warehouse.makeSpecFromName(partName);
Expression subExp = Expressions.alwaysTrue();
for (Map.Entry<String, String> entry : specMap.entrySet()) {
// Since Iceberg encodes the values in UTF-8, we need to decode it.
String partColValue = URLDecoder.decode(entry.getValue(), StandardCharsets.UTF_8);

if (partitionFieldMap.containsKey(entry.getKey())) {
PartitionField partitionField = partitionFieldMap.get(entry.getKey());
Type resultType = partitionField.transform().getResultType(icebergTable.schema()
.findField(partitionField.sourceId()).type());
TransformSpec.TransformType transformType = TransformSpec.fromString(partitionField.transform().toString());
Object value = Conversions.fromPartitionString(resultType, partColValue);
Iterable iterable = () -> Collections.singletonList(value).iterator();
if (TransformSpec.TransformType.IDENTITY.equals(transformType)) {
Expression boundPredicate = Expressions.in(partitionField.name(), iterable);
subExp = Expressions.and(subExp, boundPredicate);
} else {
throw new MetaException(
String.format("Partition transforms are not supported via truncate operation: %s", entry.getKey()));
}
} else {
throw new MetaException(String.format("No partition column/transform name by the name: %s",
entry.getKey()));
}
}
finalExp = Expressions.or(finalExp, subExp);
}
}

Expression predicate = buildDeletePredicate(partNames);

DeleteFiles delete = icebergTable.newDelete();
String branchName = context.getProperties().get(Catalogs.SNAPSHOT_REF);
if (branchName != null) {
delete.toBranch(HiveUtils.getTableSnapshotRef(branchName));
}
delete.deleteFromRowFilter(finalExp);

delete.deleteFromRowFilter(predicate);
delete.commit();
context.putToProperties("truncateSkipDataDeletion", "true");
}

private Expression buildDeletePredicate(List<String> partNames) throws MetaException {
if (CollectionUtils.isEmpty(partNames)) {
return Expressions.alwaysTrue();
}

Expression predicate = Expressions.alwaysFalse();

for (String partName : partNames) {
try {
Map<String, String> partSpec = Warehouse.makeSpecFromName(partName);
Expression partitionExpr = IcebergTableUtil.generateExpressionFromPartitionSpec(
icebergTable, partSpec, true);

predicate = Expressions.or(predicate, partitionExpr);
} catch (Exception e) {
throw new MetaException(
"Failed to generate expression for partition: " + partName + ". " + e.getMessage());
}
}

return predicate;
}
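
For illustration only (not part of the change): Hive partition names arrive in the col=value/col2=value2 form that Warehouse.makeSpecFromName parses, and the resulting per-partition expressions are ORed together. A self-contained sketch of that flow, with placeholder partition names and an assumed identity-partitioned shape for the expression that IcebergTableUtil.generateExpressionFromPartitionSpec would produce:

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;

public class DeletePredicateSketch {
  public static void main(String[] args) throws MetaException {
    List<String> partNames = Arrays.asList("dept=hr/country=US", "dept=it/country=DE");

    Expression predicate = Expressions.alwaysFalse();
    for (String partName : partNames) {
      // "dept=hr/country=US" -> {dept=hr, country=US}
      Map<String, String> partSpec = Warehouse.makeSpecFromName(partName);

      // Stand-in for IcebergTableUtil.generateExpressionFromPartitionSpec(table, partSpec, true):
      // for identity partitions this is assumed to be an AND of one equality per partition column.
      Expression partitionExpr = Expressions.alwaysTrue();
      for (Map.Entry<String, String> entry : partSpec.entrySet()) {
        partitionExpr = Expressions.and(partitionExpr, Expressions.equal(entry.getKey(), entry.getValue()));
      }

      predicate = Expressions.or(predicate, partitionExpr);
    }

    // The combined filter selects exactly the rows of the named partitions and can be
    // handed to DeleteFiles#deleteFromRowFilter, as the caller of buildDeletePredicate does.
    System.out.println(predicate);
  }
}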

@Override public boolean createHMSTableInHook() {
return createHMSTableInHook;
}
@@ -1029,10 +1017,7 @@ private static UnboundPredicate<Object> getPartitionPredicate(PartitionData part
String columName = schema.findField(field.sourceId()).name();
TransformSpec transformSpec = TransformSpec.fromString(field.transform().toString(), columName);

UnboundTerm<Object> partitionColumn =
ObjectUtils.defaultIfNull(HiveIcebergFilterFactory.toTerm(columName, transformSpec),
Expressions.ref(field.name()));

UnboundTerm<Object> partitionColumn = SchemaUtils.toTerm(transformSpec);
return Expressions.equal(partitionColumn, partitionData.get(index, Object.class));
}

@@ -866,8 +866,7 @@ public List<TransformSpec> getPartitionTransformSpec(org.apache.hadoop.hive.ql.m
return table.spec().fields().stream()
.filter(f -> !f.transform().isVoid())
.map(f -> {
TransformSpec spec = IcebergTableUtil.getTransformSpec(
table, f.transform().toString(), f.sourceId());
TransformSpec spec = IcebergTableUtil.getTransformSpec(table, f.transform().toString(), f.sourceId());
spec.setFieldName(f.name());
return spec;
})
@@ -882,8 +881,7 @@ public Map<Integer, List<TransformSpec>> getPartitionTransformSpecs(
e.getValue().fields().stream()
.filter(f -> !f.transform().isVoid())
.map(f -> {
TransformSpec spec = IcebergTableUtil.getTransformSpec(
table, f.transform().toString(), f.sourceId());
TransformSpec spec = IcebergTableUtil.getTransformSpec(table, f.transform().toString(), f.sourceId());
spec.setFieldName(f.name());
return Pair.of(e.getKey(), spec);
}))
@@ -893,9 +891,8 @@ public Map<Integer, List<TransformSpec>> getPartitionTransformSpecs(

private List<TransformSpec> getSortTransformSpec(Table table) {
return table.sortOrder().fields().stream().map(s ->
IcebergTableUtil.getTransformSpec(table, s.transform().toString(), s.sourceId())
)
.collect(Collectors.toList());
IcebergTableUtil.getTransformSpec(table, s.transform().toString(), s.sourceId()))
.toList();
}

@Override
@@ -2024,8 +2021,7 @@ public void validatePartSpec(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
* @param hmsTable A Hive table instance.
* @param partitionSpec Map containing partition specification given by user.
* @return true if we can perform metadata delete, otherwise false.
* @throws SemanticException Exception raised when a partition transform is being used
* or when partition column is not present in the table.
* @throws SemanticException Exception raised when partition column is not present in the table.
*/
@Override
public boolean canUseTruncate(org.apache.hadoop.hive.ql.metadata.Table hmsTable, Map<String, String> partitionSpec)
@@ -2037,13 +2033,16 @@ public boolean canUseTruncate(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
return false;
}

Expression finalExp = IcebergTableUtil.generateExpressionFromPartitionSpec(table, partitionSpec, true);
FindFiles.Builder builder = new FindFiles.Builder(table).withRecordsMatching(finalExp);
Expression partitionExpr = IcebergTableUtil.generateExpressionFromPartitionSpec(
table, partitionSpec, true);

FindFiles.Builder builder = new FindFiles.Builder(table).withRecordsMatching(partitionExpr);
Set<DataFile> dataFiles = Sets.newHashSet(builder.collect());

boolean result = true;
for (DataFile dataFile : dataFiles) {
PartitionData partitionData = (PartitionData) dataFile.partition();
Expression residual = ResidualEvaluator.of(table.spec(), finalExp, false)
Expression residual = ResidualEvaluator.of(table.spec(), partitionExpr, false)
.residualFor(partitionData);
if (!residual.isEquivalentTo(Expressions.alwaysTrue())) {
result = false;
@@ -2056,8 +2055,7 @@ public boolean canUseTruncate(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
@Override
public List<Partition> getPartitions(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
Map<String, String> partitionSpec, boolean latestSpecOnly) throws SemanticException {
Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
List<String> partNames = IcebergTableUtil.getPartitionNames(table, partitionSpec, latestSpecOnly);
List<String> partNames = IcebergTableUtil.getPartitionNames(conf, hmsTable, partitionSpec, latestSpecOnly);
return IcebergTableUtil.convertNameToMetastorePartition(hmsTable, partNames);
}

@@ -2078,12 +2076,39 @@ public boolean isPartitioned(org.apache.hadoop.hive.ql.metadata.Table hmsTable)
@Override
public Partition getPartition(org.apache.hadoop.hive.ql.metadata.Table table,
Map<String, String> partitionSpec, RewritePolicy policy) throws SemanticException {

validatePartSpec(table, partitionSpec, policy);

boolean isDescTable = SessionStateUtil.getQueryState(conf)
.map(QueryState::getHiveOperation)
.filter(op -> op == HiveOperation.DESCTABLE)
.isPresent();

if (!isDescTable) {
return createDummyPartitionHandle(table, partitionSpec);
}

Partition partition = IcebergTableUtil.getPartition(conf, table, partitionSpec);

// Populate basic statistics
if (partition != null) {
Map<String, String> stats = getBasicStatistics(Partish.buildFor(table, partition));
if (stats != null && !stats.isEmpty()) {
partition.getTPartition().setParameters(stats);
}
}

return partition;
}

private static DummyPartition createDummyPartitionHandle(
org.apache.hadoop.hive.ql.metadata.Table table, Map<String, String> partitionSpec)
throws SemanticException {
try {
String partName = Warehouse.makePartName(partitionSpec, false);
return new DummyPartition(table, partName, partitionSpec);
String partitionName = Warehouse.makePartName(partitionSpec, false);
return new DummyPartition(table, partitionName, partitionSpec);
} catch (MetaException e) {
throw new SemanticException("Unable to construct name for dummy partition due to: ", e);
throw new SemanticException("Unable to construct partition name", e);
}
}

@@ -2096,8 +2121,7 @@ public Partition getPartition(org.apache.hadoop.hive.ql.metadata.Table table,
*/
public List<String> getPartitionNames(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
Map<String, String> partitionSpec) throws SemanticException {
Table icebergTable = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
return IcebergTableUtil.getPartitionNames(icebergTable, partitionSpec, false);
return IcebergTableUtil.getPartitionNames(conf, hmsTable, partitionSpec, false);
}

/**
@@ -172,8 +172,8 @@ public static void appendFiles(URI fromURI, String format, Table icebergTbl, boo
if (isOverwrite) {
DeleteFiles delete = transaction.newDelete();
if (partitionSpec != null) {
Expression partitionExpr =
IcebergTableUtil.generateExpressionFromPartitionSpec(icebergTbl, partitionSpec, true);
Expression partitionExpr = IcebergTableUtil.generateExpressionFromPartitionSpec(
icebergTbl, partitionSpec, true);
delete.deleteFromRowFilter(partitionExpr);
} else {
delete.deleteFromRowFilter(Expressions.alwaysTrue());
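
With these changes, truncate, the metadata-delete check and the overwrite path of appendFiles all derive their row filters from IcebergTableUtil.generateExpressionFromPartitionSpec. The canUseTruncate check above then relies on residual evaluation: a data file can be dropped without scanning only if the filter's residual against its partition data is alwaysTrue. A self-contained sketch (illustration only; schema, spec and values are assumptions, not taken from the patch):

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.types.Types;

public class ResidualCheckSketch {
  public static void main(String[] args) {
    // Hypothetical table, identity-partitioned on "dept".
    Schema schema = new Schema(
        Types.NestedField.required(1, "dept", Types.StringType.get()),
        Types.NestedField.required(2, "id", Types.LongType.get()));
    PartitionSpec spec = PartitionSpec.builderFor(schema).identity("dept").build();

    // Row filter of the shape produced for the partition spec {dept=hr}.
    Expression filter = Expressions.equal("dept", "hr");

    // Partition data of a data file that lives in partition dept=hr.
    GenericRecord partitionData = GenericRecord.create(spec.partitionType());
    partitionData.setField("dept", "hr");

    Expression residual = ResidualEvaluator.of(spec, filter, false).residualFor(partitionData);

    // alwaysTrue means every row in the file matches the filter, so the file can be removed
    // by a metadata-only delete; anything else makes canUseTruncate return false.
    System.out.println(residual.isEquivalentTo(Expressions.alwaysTrue()));
  }
}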