From f5865d4bb9549596147d1bf66149f1635bc4817a Mon Sep 17 00:00:00 2001 From: steven-winfield-quantohm Date: Mon, 15 Jun 2026 14:09:24 +0100 Subject: [PATCH 1/3] More efficient (and less segfaulty) create_match_filter --- pyiceberg/table/upsert_util.py | 40 ++++++++++++++++++++++++---------- 1 file changed, 28 insertions(+), 12 deletions(-) diff --git a/pyiceberg/table/upsert_util.py b/pyiceberg/table/upsert_util.py index 6f32826eb0..f68d538c3b 100644 --- a/pyiceberg/table/upsert_util.py +++ b/pyiceberg/table/upsert_util.py @@ -33,19 +33,35 @@ def create_match_filter(df: pyarrow_table, join_cols: list[str]) -> BooleanExpression: unique_keys = df.select(join_cols).group_by(join_cols).aggregate([]) + if unique_keys.num_rows == 0: + return AlwaysFalse() + if len(join_cols) == 1: - return In(join_cols[0], unique_keys[0].to_pylist()) - else: - filters = [ - functools.reduce(operator.and_, [EqualTo(col, row[col]) for col in join_cols]) for row in unique_keys.to_pylist() - ] - - if len(filters) == 0: - return AlwaysFalse() - elif len(filters) == 1: - return filters[0] - else: - return Or(*filters) + return In(join_cols[0], unique_keys.column(join_cols[0]).to_pylist()) + + # Fold the column that leaves the fewest distinct "prefix" combinations into + # an In(); this minimises the disjunct count regardless of column order. + in_col = min( + join_cols, + key=lambda cand: unique_keys.select([c for c in join_cols if c != cand]) + .group_by([c for c in join_cols if c != cand]) + .aggregate([]) + .num_rows, + ) + prefix_cols = [c for c in join_cols if c != in_col] + + grouped = unique_keys.group_by(prefix_cols).aggregate([(in_col, "list")]) + in_values_col = f"{in_col}_list" + + disjuncts: list[BooleanExpression] = [] + for row in grouped.to_pylist(): + eqs = [EqualTo(c, row[c]) for c in prefix_cols] + prefix_pred = functools.reduce(operator.and_, eqs) if len(eqs) > 1 else eqs[0] + disjuncts.append(And(prefix_pred, In(in_col, row[in_values_col]))) + + if len(disjuncts) == 1: + return disjuncts[0] + return Or(*disjuncts) def has_duplicate_rows(df: pyarrow_table, join_cols: list[str]) -> bool: From 0362a91748fb36056c3d2ce0a2661acc678dead0 Mon Sep 17 00:00:00 2001 From: steven-winfield-quantohm Date: Mon, 15 Jun 2026 14:29:19 +0100 Subject: [PATCH 2/3] Missing import --- pyiceberg/table/upsert_util.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pyiceberg/table/upsert_util.py b/pyiceberg/table/upsert_util.py index f68d538c3b..c8b4225d37 100644 --- a/pyiceberg/table/upsert_util.py +++ b/pyiceberg/table/upsert_util.py @@ -23,6 +23,7 @@ from pyiceberg.expressions import ( AlwaysFalse, + And, BooleanExpression, EqualTo, In, From 269dcd2380de10fd18eda263039fe84fa490d283 Mon Sep 17 00:00:00 2001 From: steven-winfield-quantohm Date: Mon, 15 Jun 2026 14:38:52 +0100 Subject: [PATCH 3/3] Allow And(op1, op2) or And(op2, op1) in test_create_match_filter_single_condition --- tests/table/test_upsert.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/table/test_upsert.py b/tests/table/test_upsert.py index 08f90c6600..b5a0c63b02 100644 --- a/tests/table/test_upsert.py +++ b/tests/table/test_upsert.py @@ -437,10 +437,10 @@ def test_create_match_filter_single_condition() -> None: schema = pa.schema([pa.field("order_id", pa.int32()), pa.field("order_line_id", pa.int32()), pa.field("extra", pa.string())]) table = pa.Table.from_pylist(data, schema=schema) expr = create_match_filter(table, ["order_id", "order_line_id"]) - assert expr == And( - EqualTo(term=Reference(name="order_id"), literal=LongLiteral(101)), - EqualTo(term=Reference(name="order_line_id"), literal=LongLiteral(1)), - ) + # Be insensitive to left/right operands + op1 = EqualTo(term=Reference(name="order_id"), literal=LongLiteral(101)) + op2 = EqualTo(term=Reference(name="order_line_id"), literal=LongLiteral(1)) + assert expr == And(op1, op2) or expr == And(op2, op1) def test_upsert_with_duplicate_rows_in_table(catalog: Catalog) -> None: