diff --git a/api/src/org/labkey/api/data/MaterializedQueryHelper.java b/api/src/org/labkey/api/data/MaterializedQueryHelper.java index de63430869b..226abff3031 100644 --- a/api/src/org/labkey/api/data/MaterializedQueryHelper.java +++ b/api/src/org/labkey/api/data/MaterializedQueryHelper.java @@ -17,6 +17,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.Strings; +import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.junit.After; @@ -29,6 +30,8 @@ import org.labkey.api.test.TestWhen; import org.labkey.api.util.GUID; import org.labkey.api.util.HeartBeat; +import org.labkey.api.util.JobRunner; +import org.labkey.api.util.logging.LogHelper; import org.labkey.api.util.MemTracker; import org.labkey.api.util.UnexpectedException; @@ -40,8 +43,8 @@ import java.util.Map; import java.util.Objects; import java.util.Random; -import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -56,6 +59,9 @@ */ public class MaterializedQueryHelper implements CacheListener, AutoCloseable { + private static final Logger LOG = LogHelper.getLogger(MaterializedQueryHelper.class, "Materialized query helper"); + private static final JobRunner _materializationRunner = new JobRunner("MaterializedQueryHelper", 1); + public static class Materialized { MaterializedQueryHelper _mqh; @@ -106,6 +112,19 @@ public void reset() i.stillValid(now); } + /** + * Returns true if calling getFromSql() performs synchronous DB work (full rebuild via SELECT INTO, + * or incremental update SQL). This check is non-destructive: it does not consume the invalidation state. + */ + public boolean needsSynchronousWork() + { + for (Invalidator i : _invalidators) + { + if (!i.peekValid(_created)) + return true; + } + return false; + } /** return false if we did not acquire the loadingLock */ boolean load(SQLFragment selectQuery, boolean isSelectInto) @@ -143,7 +162,10 @@ boolean load(SQLFragment selectQuery, boolean isSelectInto) } else { - selectInto = new SQLFragment("SELECT * INTO ").appendIdentifier("\"" + temp.getName() + "\"").append(".").appendIdentifier("\"" + _tableName + "\"").append("\nFROM (\n"); + // UNLOGGED skips WAL when populating and indexing the table; only supported in PostgreSQL. + selectInto = new SQLFragment("SELECT * INTO ") + .append(_mqh._unlogged && _mqh._scope.getSqlDialect().isPostgreSQL() ? "UNLOGGED " : "") + .appendIdentifier(temp.getName()).append(".").appendIdentifier(_tableName).append("\nFROM (\n"); selectInto.append(selectQuery); selectInto.append("\n) _sql_"); } @@ -205,6 +227,16 @@ public CacheCheck checkValid(long createdTime) return CacheCheck.COALESCE; } public abstract boolean stillValid(long createdTime); + + /** + * Non-destructive validity check. Returns false if the entry should be considered stale without updating + * any stored state. Use this in read-only probes (e.g. {@link Materialized#needsSynchronousWork()}) to avoid + * consuming invalidation state before the actual update logic has run. + */ + public boolean peekValid(long createdTime) + { + return true; + } } @@ -248,6 +280,12 @@ public boolean stillValid(long createdTime) { return _maxTime != -1 && createdTime + _maxTime > HeartBeat.currentTimeMillis(); } + + @Override + public boolean peekValid(long createdTime) + { + return stillValid(createdTime); // pure time arithmetic; non-destructive + } } @@ -271,19 +309,55 @@ public boolean stillValid(long createdTime) _result.set(newResult); return false; } - } + /** + * Non-destructive validity check: returns false if the supplier value has changed since the last + * {@link #stillValid} call, without updating the stored snapshot. + * Use this in read-only staleness checks (e.g. {@code needsSynchronousWork()}) to avoid consuming + * the invalidation before the incremental-update logic has had a chance to run. + */ + public boolean peekValid() + { + return Strings.CS.equals(_result.get(), _supplier.get()); + } + + @Override + public boolean peekValid(long createdTime) + { + return peekValid(); + } + + /** + * Current supplier value, with no change to the stored snapshot. Capture this before doing work, + * then pass it to {@link #markValidAs} once the work has committed. + */ + public String current() + { + return _supplier.get(); + } + + /** + * Accept a previously {@link #current() captured} supplier value as the new snapshot, marking the + * entry valid up to that point. Call this only after the work the invalidation represents has actually + * committed, so that a concurrent {@link #peekValid()} reports stale (not valid) for the entire duration + * of the work. + */ + public void markValidAs(String token) + { + _result.set(token); + } + } private String makeKey(DbScope.Transaction t) { return (null == t ? "-" : t.getId()); } - protected final String _prefix; protected final DbScope _scope; private final SQLFragment _selectQuery; private final boolean _isSelectIntoSql; + private final boolean _unlogged; protected final SQLFragment _uptodateQuery; protected final Supplier _supplier; private final List _indexes = new ArrayList<>(); @@ -304,11 +378,12 @@ protected boolean removeEldestEntry(Map.Entry eldest) private final AtomicInteger _countGetFromSql = new AtomicInteger(); private final AtomicInteger _countSelectInto = new AtomicInteger(); private final AtomicLong _lastUsed = new AtomicLong(HeartBeat.currentTimeMillis()); + private final AtomicBoolean _backgroundTaskRunning = new AtomicBoolean(false); private boolean _closed = false; protected MaterializedQueryHelper(String prefix, DbScope scope, SQLFragment select, @Nullable SQLFragment uptodate, Supplier supplier, @Nullable Collection indexes, long maxTimeToCache, - boolean isSelectIntoSql) + boolean isSelectIntoSql, boolean unlogged) { _prefix = Objects.toString(prefix,"mat"); _scope = scope; @@ -319,6 +394,7 @@ protected MaterializedQueryHelper(String prefix, DbScope scope, SQLFragment sele if (null != indexes) _indexes.addAll(indexes); _isSelectIntoSql = isSelectIntoSql; + _unlogged = unlogged; assert MemTracker.get().put(this); } @@ -338,6 +414,53 @@ public void clearCaches() _map.clear(); } + /** + * Submits a background task to build or incrementally update the materialized view. + * Only one background task runs at a time per MQH instance (guarded by CAS on _backgroundTaskRunning). + */ + public void materializeAsync() + { + if (_backgroundTaskRunning.compareAndSet(false, true)) + { + _materializationRunner.execute(() -> { + try + { + getFromSql("_bg_"); + } + catch (Exception e) + { + LOG.warn("Background materialization failed.", e); + } + finally + { + _backgroundTaskRunning.set(false); + } + }); + } + } + + /** + * Non-blocking variant of {@link #getFromSql(String)}: returns a SQL fragment referencing the cached materialized + * temp table only if the view is currently LOADED with no pending synchronous work, without triggering any rebuild + * or incremental-update SQL. Returns {@code null} if the view is not available (not yet built, still loading, or + * stale), so the caller can fall back immediately without blocking. + */ + public @Nullable SQLFragment tryGetFromSqlIfLoaded(String tableAlias) + { + Materialized m = _map.get(makeKey(null)); + if (m == null || m._loadingState.get() != Materialized.LoadingState.LOADED) + return null; + if (m.needsSynchronousWork()) + return null; + _lastUsed.set(HeartBeat.currentTimeMillis()); + _countGetFromSql.incrementAndGet(); + SQLFragment sqlf = new SQLFragment(m._fromSql); + if (!StringUtils.isBlank(tableAlias)) + sqlf.append(" ").append(tableAlias); + sqlf.addTempToken(m); + return sqlf; + } + /** * NOTE: invalidating within a transaction, may NOT force re-materialize for subsequent call within the transaction * because it could re-use the global cached result. @@ -361,9 +484,6 @@ public synchronized void uncache(final Container c) } } - - private final Set _pending = null; - // this is a method so you can subclass MaterializedQueryHelper protected String getUpToDateKey() { @@ -408,14 +528,12 @@ public int upsert(SQLFragment sqlf) } } - /* used by FLow directly for some reason */ public SQLFragment getFromSql(@NotNull SQLFragment selectQuery, String tableAlias) { return getFromSql(selectQuery, false, tableAlias); } - public SQLFragment getFromSql(@NotNull SQLFragment selectQuery, boolean isSelectInto, String tableAlias) { Materialized materialized = getMaterializedAndLoad(selectQuery, isSelectInto); @@ -430,12 +548,10 @@ public SQLFragment getFromSql(@NotNull SQLFragment selectQuery, boolean isSelect return sqlf; } - protected void incrementalUpdateBeforeSelect(Materialized m) { } - /** * A Materialized represents a particular instance of materialized view (stored in a temp table). * We want to avoid two threads materializing the same view. This is why we synchronize first creating the @@ -516,7 +632,6 @@ private Materialized getMaterialized(boolean forWrite) } } - private Materialized getMaterializedAndLoad(SQLFragment selectQuery, boolean isSelectIntoSql) { Materialized materialized = getMaterialized(false); @@ -565,17 +680,16 @@ protected void initMaterialized(Materialized materialized) @Deprecated // use Builder public static MaterializedQueryHelper create(String prefix, DbScope scope, SQLFragment select, @Nullable SQLFragment uptodate, Collection indexes, long maxTimeToCache) { - return new MaterializedQueryHelper(prefix, scope, select, uptodate, null, indexes, maxTimeToCache, false); + return new MaterializedQueryHelper(prefix, scope, select, uptodate, null, indexes, maxTimeToCache, false, false); } @Deprecated // use Builder public static MaterializedQueryHelper create(String prefix, DbScope scope, SQLFragment select, Supplier uptodate, Collection indexes, long maxTimeToCache) { - return new MaterializedQueryHelper(prefix, scope, select, null, uptodate, indexes, maxTimeToCache, false); + return new MaterializedQueryHelper(prefix, scope, select, null, uptodate, indexes, maxTimeToCache, false, false); } - public static class Builder implements org.labkey.api.data.Builder { protected final String _prefix; @@ -583,6 +697,7 @@ public static class Builder implements org.labkey.api.data.Builder _supplier = null; @@ -602,6 +717,16 @@ public Builder setIsSelectInto(boolean b) return this; } + /** + * Request that the materialized table be created UNLOGGED, skipping WAL when populating and indexing it. + * Honored only on PostgreSQL and only for the helper-generated SELECT INTO (not setIsSelectInto). + */ + public Builder unlogged(boolean b) + { + _unlogged = b; + return this; + } + public Builder upToDateSql(SQLFragment uptodate) { _uptodate = uptodate; @@ -629,11 +754,10 @@ public Builder addIndex(String index) @Override public MaterializedQueryHelper build() { - return new MaterializedQueryHelper(_prefix, _scope, _select, _uptodate, _supplier, _indexes, _max, _isSelectInto); + return new MaterializedQueryHelper(_prefix, _scope, _select, _uptodate, _supplier, _indexes, _max, _isSelectInto, _unlogged); } } - @TestWhen(BVT) public static class TestCase extends Assert { @@ -737,5 +861,63 @@ private void _join(List list) { list.forEach((t) -> {try{t.join();}catch(InterruptedException x){/* */}}); } + + @Test + public void testPeekValidNonDestructive() + { + // Arrange: supplier starts at "A", reset stores "A" + AtomicReference supplierValue = new AtomicReference<>("A"); + SupplierInvalidator inv = new SupplierInvalidator(supplierValue::get); + inv.stillValid(0); // seed the stored snapshot to "A" + + // Verify valid when unchanged + assertTrue("peekValid should return true when supplier unchanged", inv.peekValid()); + assertTrue("peekValid(long) should return true when supplier unchanged", inv.peekValid(0)); + + // Change supplier without calling stillValid + supplierValue.set("B"); + + // peekValid should detect the change + assertFalse("peekValid should return false after supplier changes", inv.peekValid()); + assertFalse("peekValid(long) should return false after supplier changes", inv.peekValid(0)); + + // Critically: calling peekValid must NOT have consumed the invalidation. + // stillValid should still return false (snapshot is still "A", supplier is "B"). + assertFalse("stillValid must still return false after peekValid — invalidation was not consumed", inv.stillValid(0)); + } + + @Test + public void testNeedsSynchronousWorkNonDestructive() + { + DbSchema temp = DbSchema.getTemp(); + DbScope s = temp.getScope(); + AtomicReference supplierValue = new AtomicReference<>("v1"); + SQLFragment select = new SQLFragment("SELECT * FROM temp.MQH_TESTCASE"); + + try (MaterializedQueryHelper mqh = new Builder("test", s, select) + .addInvalidCheck(supplierValue::get) + .build()) + { + // Materialize once so m is LOADED and snapshot is "v1" + mqh.getFromSql("_"); + + Materialized m = mqh._map.get(mqh.makeKey(null)); + assertNotNull(m); + assertFalse("should not need synchronous work right after materialization", m.needsSynchronousWork()); + + // Invalidate by changing the supplier + supplierValue.set("v2"); + assertTrue("should need synchronous work after supplier changes", m.needsSynchronousWork()); + + // Call needsSynchronousWork a second time — must still report stale (state was not consumed) + assertTrue("needsSynchronousWork must remain true; state must not have been consumed", m.needsSynchronousWork()); + + // Consuming the state via getFromSql triggers a rebuild; afterwards no synchronous work needed + mqh.getFromSql("_"); + Materialized m2 = mqh._map.get(mqh.makeKey(null)); + assertNotNull(m2); + assertFalse("no synchronous work needed after rebuild", m2.needsSynchronousWork()); + } + } } } diff --git a/experiment/src/org/labkey/experiment/api/ExpMaterialTableImpl.java b/experiment/src/org/labkey/experiment/api/ExpMaterialTableImpl.java index ed99b811e37..39c13581874 100644 --- a/experiment/src/org/labkey/experiment/api/ExpMaterialTableImpl.java +++ b/experiment/src/org/labkey/experiment/api/ExpMaterialTableImpl.java @@ -907,7 +907,7 @@ protected void populateColumns() SQLFragment sql; UserSchema plateUserSchema; - // Issue 53194 : this would be the case for linked to study samples. The contextual role is set up from the study dataset + // Issue 53194: this would be the case for linked to study samples. The contextual role is set up from the study dataset // for the source sample, we want to allow the plate schema to inherit any contextual roles to allow querying // against tables in that schema. if (_userSchema instanceof UserSchema.HasContextualRoles samplesSchema && !samplesSchema.getContextualRoles().isEmpty()) @@ -988,7 +988,7 @@ protected void populateColumns() private ContainerFilter getSampleStatusLookupContainerFilter() { - // The default lookup container filter is Current, but we want to have the default be CurrentPlusProjectAndShared + // The default lookup container filter is Current. However, we want to have the default be CurrentPlusProjectAndShared // for the sample status lookup since in the app project context we want to share status definitions across // a given project instead of creating duplicate statuses in each subfolder project. ContainerFilter.Type type = QueryService.get().getContainerFilterTypeForLookups(getContainer()); @@ -1058,7 +1058,7 @@ private void addSampleTypeColumns(ExpSampleType st, List visibleColumn for (ColumnInfo dbColumn : dbTable.getColumns()) { - // Don't include PHI columns in full text search index + // Don't include PHI columns in the full-text search index // CONSIDER: Can we move this to a base class? Maybe in .addColumn() if (schema.getUser().isSearchUser() && !dbColumn.getPHI().isLevelAllowed(PHI.NotPHI)) continue; @@ -1111,7 +1111,7 @@ protected boolean isDisabledInput(RenderContext ctx) if (propColumn.isMvEnabled()) { // The column in the physical table has a "_MVIndicator" suffix, but we want to expose - // it with a "MVIndicator" suffix (no underscore) + // it with an "MVIndicator" suffix (no underscore) var mvColumn = new AliasedColumn(this, dp.getName() + MvColumn.MV_INDICATOR_SUFFIX, StorageProvisioner.get().getMvIndicatorColumn(dbTable, dp.getPropertyDescriptor(), "No MV column found for '" + dp.getName() + "' in sample type '" + getName() + "'")); mvColumn.setLabel(dp.getLabel() != null ? dp.getLabel() : dp.getName() + " MV Indicator"); @@ -1176,27 +1176,28 @@ public SQLFragment getFromSQL(String alias) public SQLFragment getFromSQLExpanded(String alias, Set selectedColumns) { SQLFragment sql = new SQLFragment("("); - boolean usedMaterialized; - // SELECT FROM - /* NOTE We want to avoid caching in paths where the table is actively being updated (e.g. loadRows) - * Unfortunately, we don't _really_ know when this is, but if we in a transaction that's a good guess. - * Also, we may use RemapCache for material lookup outside a transaction - */ + // NOTE: We want to avoid caching in paths where the table is actively being updated (e.g., loadRows). + // Unfortunately, we don't _really_ know when this is, but if we are in a transaction, that's a good guess. + // Also, we may use RemapCache for material lookup outside a transaction. boolean onlyMaterialColums = false; if (null != selectedColumns && !selectedColumns.isEmpty()) onlyMaterialColums = selectedColumns.stream().allMatch(fk -> fk.getName().equalsIgnoreCase("Folder") || null != _rootTable.getColumn(fk)); + + boolean usedMaterialized = false; if (!onlyMaterialColums && null != _ss && null != _ss.getTinfo() && !getExpSchema().getDbSchema().getScope().isTransactionActive()) { - sql.append(getMaterializedSQL()); - usedMaterialized = true; + SQLFragment materializedSql = getMaterializedSQL(); + if (materializedSql != null) + { + sql.append(materializedSql); + usedMaterialized = true; + } } - else - { + + if (!usedMaterialized) sql.append(getJoinSQL(selectedColumns)); - usedMaterialized = false; - } // WHERE SQLFragment filterFrag = getFilter().getSQLFragment(_rootTable, null); @@ -1221,7 +1222,7 @@ public void setSupportTableRules(boolean b) } @Override - public boolean supportTableRules() // intentional override + public boolean supportTableRules() { return _supportTableRules; } @@ -1235,7 +1236,6 @@ public boolean supportTableRules() // intentional override return TableRulesManager.get().getTableRules(definitionContainer, getUserSchema().getUser(), getUserSchema().getContainer()); } - static class InvalidationCounters { public final AtomicLong update, insert, delete, rollup; @@ -1349,13 +1349,10 @@ private static InvalidationCounters getInvalidateCounters(String lsid) return _invalidationCounters.computeIfAbsent(lsid, (_) -> new InvalidationCounters()); } - /* SELECT and JOIN, does not include WHERE, same as getJoinSQL() */ - private SQLFragment getMaterializedSQL() + /** Look up (or build and cache) the {@link _MaterializedQueryHelper} for this table's sample type. Caller must ensure {@code _ss != null}. */ + private _MaterializedQueryHelper getOrCreateMQH() { - if (null == _ss) - return getJoinSQL(null); - - var mqh = _materializedQueries.get(_ss.getLSID(), null, (unusedKey, unusedArg) -> + return _materializedQueries.get(_ss.getLSID(), null, (unusedKey, unusedArg) -> { /* NOTE: MaterializedQueryHelper does have a pattern to help with detecting schema changes. * Previously it has been used on non-provisioned tables. It might be helpful to have a pattern, @@ -1367,6 +1364,7 @@ private SQLFragment getMaterializedSQL() SQLFragment viewSql = getJoinSQL(null, updateColumns).append(" WHERE CpasType = ").appendValue(_ss.getLSID()); MaterializedQueryHelper.Builder builder = new _MaterializedQueryHelper.Builder(_ss.getLSID(), "", getExpSchema().getDbSchema().getScope(), viewSql) .updateColumns(updateColumns) + .unlogged(true) .addIndex("CREATE UNIQUE INDEX uq_${NAME}_rowid ON temp.${NAME} (rowid)") .addIndex("CREATE UNIQUE INDEX uq_${NAME}_lsid ON temp.${NAME} (lsid)") .addIndex("CREATE INDEX idx_${NAME}_container ON temp.${NAME} (container)") @@ -1377,7 +1375,24 @@ private SQLFragment getMaterializedSQL() return (_MaterializedQueryHelper) builder.build(); }); - return new SQLFragment("SELECT * FROM ").append(mqh.getFromSql("_cached_view_")); + } + + /* SELECT and JOIN, does not include WHERE, same as getJoinSQL() */ + private @Nullable SQLFragment getMaterializedSQL() + { + if (null == _ss) + return getJoinSQL(null); + + var mqh = getOrCreateMQH(); + + SQLFragment tempRef = mqh.tryGetFromSqlIfLoaded("_cached_view_"); + if (tempRef == null) + { + mqh.materializeAsync(); + return null; + } + + return new SQLFragment("SELECT * FROM ").append(tempRef); } /** @@ -1434,7 +1449,7 @@ public Builder updateColumns(List updateColumns) @Override public _MaterializedQueryHelper build() { - return new _MaterializedQueryHelper(_lsid, _updateColumns, _prefix, _scope, _select, _uptodate, _supplier, _indexes, _max, _isSelectInto); + return new _MaterializedQueryHelper(_lsid, _updateColumns, _prefix, _scope, _select, _uptodate, _supplier, _indexes, _max, _isSelectInto, _unlogged); } } @@ -1448,10 +1463,11 @@ public _MaterializedQueryHelper build() Supplier supplier, @Nullable Collection indexes, long maxTimeToCache, - boolean isSelectIntoSql + boolean isSelectIntoSql, + boolean unlogged ) { - super(prefix, scope, select, uptodate, supplier, indexes, maxTimeToCache, isSelectIntoSql); + super(prefix, scope, select, uptodate, supplier, indexes, maxTimeToCache, isSelectIntoSql, unlogged); this._lsid = lsid; this._updateColumns = updateColumns; } @@ -1478,14 +1494,10 @@ protected void incrementalUpdateBeforeSelect(Materialized m) if (Materialized.LoadingState.ERROR == materialized._loadingState.get()) throw materialized._loadException; - if (!materialized.incrementalDeleteCheck.stillValid(0)) - executeIncrementalDelete(); - if (!materialized.incrementalUpdateCheck.stillValid(0)) - executeIncrementalUpdate(); - if (!materialized.incrementalRollupCheck.stillValid(0)) - executeIncrementalRollup(); - if (!materialized.incrementalInsertCheck.stillValid(0)) - executeIncrementalInsert(); + runIncremental(materialized.incrementalDeleteCheck, this::executeIncrementalDelete); + runIncremental(materialized.incrementalUpdateCheck, this::executeIncrementalUpdate); + runIncremental(materialized.incrementalRollupCheck, this::executeIncrementalRollup); + runIncremental(materialized.incrementalInsertCheck, this::executeIncrementalInsert); } catch (RuntimeException|InterruptedException ex) { @@ -1505,6 +1517,23 @@ protected void incrementalUpdateBeforeSelect(Materialized m) } } + /** + * Apply one incremental step, advancing its invalidation snapshot only after the work has committed. + * The counter is captured (via {@code current()}) before the work runs and accepted (via {@code markValidAs()}) + * afterward, so a concurrent reader's non-destructive {@code peekValid()} reports stale for the entire + * duration of the update. That keeps reads on the live (unmaterialized) query until the temp table actually + * reflects the change, instead of briefly serving a stale materialized view. Must be called while holding the + * materialized's loading lock so only one updater runs at a time. + */ + private static void runIncremental(MaterializedQueryHelper.SupplierInvalidator check, Runnable work) + { + if (check.peekValid()) + return; + String token = check.current(); + work.run(); + check.markValidAs(token); + } + void upsertWithRetry(SQLFragment sql) { // not actually read-only, but we don't want to start an explicit transaction @@ -1631,10 +1660,10 @@ private void appendSetFromSrc(SQLFragment sql) static class _Materialized extends MaterializedQueryHelper.Materialized { - final MaterializedQueryHelper.Invalidator incrementalInsertCheck; - final MaterializedQueryHelper.Invalidator incrementalRollupCheck; - final MaterializedQueryHelper.Invalidator incrementalDeleteCheck; - final MaterializedQueryHelper.Invalidator incrementalUpdateCheck; + final MaterializedQueryHelper.SupplierInvalidator incrementalInsertCheck; + final MaterializedQueryHelper.SupplierInvalidator incrementalRollupCheck; + final MaterializedQueryHelper.SupplierInvalidator incrementalDeleteCheck; + final MaterializedQueryHelper.SupplierInvalidator incrementalUpdateCheck; _Materialized(_MaterializedQueryHelper mqh, String tableName, String cacheKey, long created, String sql) { @@ -1657,6 +1686,21 @@ public void reset() incrementalUpdateCheck.stillValid(now); } + /** + * Returns true if any incremental counter has changed since the last reset, indicating that + * {@code incrementalUpdateBeforeSelect()} would perform synchronous DB work. + * Uses non-destructive {@code peekValid()} so the background task can still apply the updates. + */ + @Override + public boolean needsSynchronousWork() + { + if (super.needsSynchronousWork()) return true; + return !incrementalInsertCheck.peekValid() + || !incrementalDeleteCheck.peekValid() + || !incrementalUpdateCheck.peekValid() + || !incrementalRollupCheck.peekValid(); + } + Lock getLock() { return _loadingLock; @@ -2287,12 +2331,14 @@ private void mergeRows(ExpSampleType st, List> rows) throws } /** - * Trigger materialization (and any pending incremental update) as a side effect of {@link #getMaterializedSQL()}, - * then assert the cached temp table equals a fresh derivation of the join query in both directions. + * Trigger materialization (and any pending incremental update) synchronously, then assert the cached + * temp table equals a fresh derivation of the join query in both directions. */ private void assertCacheMatchesFreshDerivation(ExpMaterialTableImpl table, String lsid) { - SQLFragment cached = table.getMaterializedSQL(); // builds/refreshes the temp table via the normal read path + _MaterializedQueryHelper mqh = table.getOrCreateMQH(); + assertNotNull("MQH should not be null for a sample type table", mqh); + SQLFragment cached = new SQLFragment("SELECT * FROM ").append(mqh.getFromSql("_cached_view_")); SQLFragment fresh = table.getJoinSQL(null).append(" WHERE CpasType = ").appendValue(lsid); DbScope scope = ExperimentServiceImpl.getExpSchema().getScope(); assertEquals("Cached rows not found in fresh derivation", 0, countDiff(scope, cached, fresh));