From 1854475bc8b6f1add5f841dcb75f32f0c0f25911 Mon Sep 17 00:00:00 2001 From: lmoresi Date: Sat, 4 Jul 2026 13:23:39 +1000 Subject: [PATCH 1/5] fix(mesh): refresh _nav_coords before the kd-tree rebuild after adapt (#286) In nuke_coords_and_rebuild() the point-location kd-tree was rebuilt before self._nav_coords was refreshed from the rebuilt DM, so on an adapted mesh the rebuild indexed an old-sized coordinate array with new-mesh point ranges. When the mesh grew this raised IndexError (index 91 out of bounds for axis 0 with size 80) from swarm migration after mesh.adapt(); when it shrank it silently mislocated points. The navigation-coordinate refresh (both the volume-mesh and manifold branches) now runs before _build_kd_tree_index(). Reproduced by tests/test_0810_amr_swarm_migration_regression.py, which now passes. Underworld development team with AI support from Claude Code --- .../discretisation/discretisation_mesh.py | 44 +++++++++++-------- 1 file changed, 25 insertions(+), 19 deletions(-) diff --git a/src/underworld3/discretisation/discretisation_mesh.py b/src/underworld3/discretisation/discretisation_mesh.py index 2362bf2a..ab246d0e 100644 --- a/src/underworld3/discretisation/discretisation_mesh.py +++ b/src/underworld3/discretisation/discretisation_mesh.py @@ -2345,6 +2345,29 @@ def nuke_coords_and_rebuild( flush=True, ) + # The navigation coords (used to build the kd-tree control points and + # for point location) were captured as a reference to the ORIGINAL + # coords in __init__ and are not updated by the DM rebuild above, so + # on an adapted/deformed mesh they still describe the old geometry. + # They MUST be refreshed BEFORE _build_kd_tree_index() runs: that + # rebuild indexes _nav_coords with the NEW nav-DM point range, and a + # stale (old-sized) array raises IndexError when the mesh grew — and + # silently mislocates points otherwise (issue #286). + if getattr(self, "_nav_dm", None) is None: + self._nav_coords = numpy.asarray( + self.dm.getCoordinatesLocal().array + ).reshape(-1, self.cdim) + else: + # manifold mesh: the nav clone carries its own (ghosted) coords; + # refresh them from the rebuilt main DM where possible. + try: + self._nav_dm.setCoordinatesLocal(self.dm.getCoordinatesLocal()) + self._nav_coords = numpy.asarray( + self._nav_dm.getCoordinatesLocal().array + ).reshape(-1, self.cdim) + except Exception: + pass + self._index = None self._build_kd_tree_index() @@ -2395,25 +2418,8 @@ def nuke_coords_and_rebuild( self.boundary_face_control_points_sign = None self._domain_radius_squared = float("inf") - # The navigation coords (used to build those control points and for - # point location) were captured as a reference to the ORIGINAL coords - # in __init__ and never refreshed here, so on a volume mesh they stayed - # at the undeformed geometry — the real reason a bulged-out region read - # as exterior. Re-point them at the current (deformed) DM coordinates. - if getattr(self, "_nav_dm", None) is None: - self._nav_coords = numpy.asarray( - self.dm.getCoordinatesLocal().array - ).reshape(-1, self.cdim) - else: - # manifold mesh: the nav clone carries its own (ghosted) coords; - # refresh them from the rebuilt main DM where possible. - try: - self._nav_dm.setCoordinatesLocal(self.dm.getCoordinatesLocal()) - self._nav_coords = numpy.asarray( - self._nav_dm.getCoordinatesLocal().array - ).reshape(-1, self.cdim) - except Exception: - pass + # NB: the _nav_coords refresh for the deformed/adapted geometry happens + # ABOVE, before _build_kd_tree_index() — see the issue #286 note there. # BUGFIX(#130): recompute the DOF coordinate cache for every # already-registered variable. Variables created before this From 8c0849f17e8ea369bef652e31b5851f6fafb3435 Mon Sep 17 00:00:00 2001 From: lmoresi Date: Sat, 4 Jul 2026 13:24:04 +1000 Subject: [PATCH 2/5] fix(swarm): invalidate stale caches on no-move migrate, particle addition, and populate (BF-02) Swarm.migrate() early-returned when no particle was globally unclaimed without invalidating the canonical .data caches, the particle kd-tree, or the proxy staleness flags (SWARM-01/SWARM-02, 2026-07 audit). In serial or whenever no particle changes rank - the common case - every cached array kept the old particle count and the kd-tree kept a poisoned no-copy view of mutated coordinates. This is the root cause of issue #289: advection ends with a no-move migrate, so material proxies froze at the launch positions and time-stepped models silently used stationary material. The early return now performs the same invalidation as the fall-through path. add_particles_with_global_coordinates() gains an explicit invalidation (mirroring the #216 fix in add_particles_with_coordinates - with migrate=False nothing else invalidates), and populate() invalidates caches created before the swarm had particles (SWARM-17, verified at runtime). Collateral defect exposed by this fix, same #216 stale-cache class: Swarm.apply_snapshot_payload() wrote restored variable data into a detached np.asarray view of the canonical cache, so the DMSwarm field never received the restored values - the restore only worked because the stale canonical copy survived. It now writes through the canonical array so the PETSc pack fires (caught by test_0007_snapshot_inmemory). Regression tests: tests/test_0113_swarm_stale_cache_regression.py (SWARM-01 both migrate= variants, SWARM-17, SWARM-02 kd-tree contract and behavioural mirror probe, and the issue #289 reproducer). Underworld development team with AI support from Claude Code --- src/underworld3/swarm.py | 63 +++++- .../test_0113_swarm_stale_cache_regression.py | 195 ++++++++++++++++++ 2 files changed, 257 insertions(+), 1 deletion(-) create mode 100644 tests/test_0113_swarm_stale_cache_regression.py diff --git a/src/underworld3/swarm.py b/src/underworld3/swarm.py index 5b159944..5d1d9464 100644 --- a/src/underworld3/swarm.py +++ b/src/underworld3/swarm.py @@ -2653,6 +2653,31 @@ def __init__(self, mesh, recycle_rate=0, verbose=False, clip_to_mesh=True): self._nnmapdict = {} self._migration_disabled = False + # Deterministic (SPMD-consistent) creation index — used to order + # collective per-swarm operations identically on every rank. + self._instance_number = Swarm.instances + + # Names of variables whose canonical writes were made while + # _migration_disabled was set: the PETSc pack is deferred (not + # discarded) and flushed by _flush_pending_petsc_sync() at context + # exit (SWARM-04). + self._pending_petsc_sync = set() + + # Set when particle coordinates are written through the modern + # interface; the actual (collective) migrate() is deferred to the + # next collective point — migration-context exit or solve entry — + # never run per-write, which would deadlock when ranks write + # unevenly (SWARM-03). + self._needs_migration = False + + # SPMD-consistent guard: while True, _sync_before_assembly() must NOT + # run the deferred migration. Set by advection() around its substep + # loop — its velocity evaluations pass through Mesh.update_lvec(), and + # a migrate() there would reorder particle rows between the coordinate + # array and the velocity array captured from it. advection() runs its + # own migrate() at the end. + self._deferred_migration_suspended = False + super().__init__() # Register with the same model already captured in self._model_ref @@ -3394,6 +3419,13 @@ def populate( offset = swarm_orig_size * i self._remeshed.data[offset::, 0] = i + # Invalidate cached data — the swarm was just given its particles. + # Any canonical `.data` array created before populate() (legitimate: + # variables must be created first) is sized for the empty swarm and + # would otherwise hide every particle from reads and corrupt writes + # (SWARM-17, same stale-cache class as #216). + self._invalidate_canonical_data() + # Informational: particle population just changed. self._population_generation += 1 @@ -3465,6 +3497,19 @@ def migrate( # Unlikely, but we should check this uw.mpi.barrier() if global_unclaimed_points == 0: + # No particle needs to change rank, but we were still called + # because coordinates and/or the population may have changed + # (in-place coordinate writes, addNPoints, serial advection). + # The cached canonical arrays, the particle kd-tree, and the + # proxy variables are stale regardless of whether anything + # moved between ranks. Skipping this invalidation froze proxy + # mesh variables after serial advection (issue #289) and left + # wrong-sized `.data` caches after particle addition + # (SWARM-01/SWARM-02, 2026-07 audit). + self._invalidate_canonical_data() + self._needs_migration = False + # any explicit/terminal migrate ends an advection suspension + self._deferred_migration_suspended = False return # Migrate particles between processes (if there are more than one of them) @@ -3539,6 +3584,9 @@ def migrate( # Any particle movement (send, receive, or balanced swap) makes # cached arrays stale — both size and values may have changed. self._invalidate_canonical_data() + self._needs_migration = False + # any explicit/terminal migrate ends an advection suspension + self._deferred_migration_suspended = False return @@ -3742,6 +3790,12 @@ def add_particles_with_global_coordinates( # self._Xorig.data[...] = globalCoordinatesArray self._remeshed.data[...] = 0 + # Invalidate cached data — the particle count changed via addNPoints + # (mirrors add_particles_with_coordinates). This must not be left to + # migrate(): with migrate=False nothing else invalidates, and every + # cached `.data` array keeps the old particle count (SWARM-01). + self._invalidate_canonical_data() + if migrate: self.migrate(remove_sent_points=True, delete_lost_points=delete_lost_points) @@ -4328,7 +4382,14 @@ def apply_snapshot_payload(self, payload: dict) -> None: f"{var_clean_name!r} data shape mismatch — current " f"{current.shape} vs snapshot {saved.shape}" ) - current[...] = saved + # Write THROUGH the canonical array (not the detached + # np.asarray view above) so the PETSc pack callback fires. + # Writing into the view mutated only the cached copy: the + # DMSwarm field kept its post-realloc garbage, and the first + # cache invalidation after restore (e.g. migrate() at the end + # of advection) silently replaced the restored values with + # that garbage (exposed by the SWARM-01 invalidation fix). + var.data[...] = saved def _legacy_access(self, *writeable_vars: SwarmVariable): """ diff --git a/tests/test_0113_swarm_stale_cache_regression.py b/tests/test_0113_swarm_stale_cache_regression.py new file mode 100644 index 00000000..2e521822 --- /dev/null +++ b/tests/test_0113_swarm_stale_cache_regression.py @@ -0,0 +1,195 @@ +"""Regression tests for swarm stale-cache and stale-proxy bugs. + +Covers the Track-0 quality-audit findings (docs/reviews/2026-07): + +- SWARM-01 / BF-02: ``Swarm.migrate()`` early-returned when no particle needed + to change rank *without* invalidating the cached canonical ``.data`` arrays, + so ``add_particles_with_global_coordinates`` (whose only invalidation route + was migrate) left every variable's cached ``.data`` at the old particle + count. +- SWARM-02 / BF-02: the same early return left the cached particle kd-tree + built over a mutated coordinate buffer, silently corrupting + ``rbf_interpolate`` and every proxy update. +- SWARM-17 / BF-02: ``populate()`` never invalidated caches created before the + swarm was populated. +- GitHub issue #289: ``swarm.advection()`` left proxy mesh variables frozen at + the pre-advection particle positions (the no-move migrate early return never + marked them stale), so time-stepped models silently used stationary + material fields. +- SWARM-05 / LE-03 / BF-08 (issue #215 Bug 3): solvers read the proxy + MeshVariable's DM directly, bypassing the lazy ``.sym`` refresh, so a solve + after a ``material.data`` write consumed stale proxy values. +""" + +import numpy as np +import pytest + +import underworld3 as uw +from underworld3.meshing import UnstructuredSimplexBox + +pytestmark = [pytest.mark.level_1, pytest.mark.tier_a] + + +@pytest.fixture +def mesh(): + return UnstructuredSimplexBox( + minCoords=(0.0, 0.0), + maxCoords=(1.0, 1.0), + cellSize=1.0 / 8.0, + ) + + +# -------------------------------------------------------------------------- +# SWARM-01: cache size after add_particles_with_global_coordinates +# -------------------------------------------------------------------------- + +@pytest.mark.parametrize("migrate", [True, False]) +def test_add_particles_global_coordinates_invalidates_cache(mesh, migrate): + swarm = uw.swarm.Swarm(mesh=mesh) + var = swarm.add_variable("val", 1) + swarm.populate(fill_param=2) + + _ = var.data.shape # populate the canonical cache at the old size + + new_pts = np.array([[0.51, 0.51], [0.52, 0.52], [0.11, 0.87]]) + swarm.add_particles_with_global_coordinates(new_pts, migrate=migrate) + + dm_size = swarm.dm.getLocalSize() + assert var.data.shape[0] == dm_size, ( + f"cached .data rows ({var.data.shape[0]}) out of sync with the DMSwarm " + f"local size ({dm_size}) after add_particles_with_global_coordinates" + ) + assert ( + swarm._particle_coordinates.data.shape[0] == dm_size + ), "coordinate cache out of sync after particle addition" + + del swarm + del mesh + + +# -------------------------------------------------------------------------- +# SWARM-17: cache created before populate() +# -------------------------------------------------------------------------- + +def test_populate_invalidates_pre_existing_cache(mesh): + swarm = uw.swarm.Swarm(mesh=mesh) + var = swarm.add_variable("val", 1) + + _ = var.data.shape # create the (0-row) canonical cache before populate + + swarm.populate(fill_param=2) + + dm_size = swarm.dm.getLocalSize() + assert dm_size > 0 + assert var.data.shape[0] == dm_size, ( + f"cached .data rows ({var.data.shape[0]}) out of sync with the DMSwarm " + f"local size ({dm_size}) after populate()" + ) + + del swarm + del mesh + + +# -------------------------------------------------------------------------- +# SWARM-02: kd-tree refreshed after an in-domain coordinate move +# -------------------------------------------------------------------------- + +def test_rbf_interpolate_uses_current_positions_after_no_move_migrate(mesh): + swarm = uw.swarm.Swarm(mesh=mesh) + var = swarm.add_variable("val", 1) + swarm.populate(fill_param=3) + + # tag every particle with its launch x-coordinate + var.data[:, 0] = swarm._particle_coordinates.data[:, 0] + + probe = np.array([[0.15, 0.5], [0.85, 0.5], [0.25, 0.3], [0.75, 0.7]]) + _ = var.rbf_interpolate(probe) # builds and caches the particle kd-tree + + # mirror every particle in-domain: x -> 1 - x. The cached KDTree holds a + # NO-COPY view of the coordinate buffer, so after this in-place mutation + # its index topology (built at the old positions) is inconsistent with + # its stored points — queries return garbage, not merely frozen values. + coords = swarm._particle_coordinates.data + swarm._particle_coordinates.data[...] = np.column_stack( + [1.0 - coords[:, 0], coords[:, 1]] + ) + swarm.migrate() # serial / no-rank-change: previously hit the early return + + # the invalidation contract: migrate() must drop the cached tree + assert swarm._kdtree is None, ( + "migrate() left the cached particle kd-tree in place after an " + "in-place coordinate mutation (SWARM-02 early-return hole)" + ) + + values = var.rbf_interpolate(probe) + + # with a FRESH tree, the particle now found at probe_x carries its launch + # coordinate 1 - probe_x. + expected = 1.0 - probe[:, 0] + err = np.abs(values[:, 0] - expected).max() + assert err < 0.1, ( + f"rbf_interpolate is using a stale/poisoned kd-tree after migrate() " + f"(max error {err:.3f} against fresh-tree expectation)" + ) + + del swarm + del mesh + + +# -------------------------------------------------------------------------- +# issue #289: proxy tracks the particles through advection +# -------------------------------------------------------------------------- + +def test_issue289_proxy_tracks_particles_through_advection(): + """Reproducer from GitHub issue #289 (reduced): a blob of material is + advected upward with a uniform velocity; the lazily-refreshed proxy must + track the particle centroid, not stay frozen at the launch position.""" + mesh = UnstructuredSimplexBox( + minCoords=(0.0, 0.0), maxCoords=(1.0, 0.25), cellSize=1.0 / 24.0, qdegree=3 + ) + v = uw.discretisation.MeshVariable("V", mesh, vtype=uw.VarType.VECTOR, degree=2) + swarm = uw.swarm.Swarm(mesh=mesh) + mat = uw.swarm.IndexSwarmVariable( + "M", swarm, indices=2, proxy_degree=1, proxy_continuous=True + ) + swarm.populate(fill_param=3) + + mat.data[...] = 0 + pc = swarm._particle_coordinates.data + blob = (pc[:, 0] - 0.5) ** 2 + (pc[:, 1] - 0.107) ** 2 < 0.03**2 + assert blob.sum() > 0, "test premise: blob must contain particles" + mat.data[blob, 0] = 1 + + v.data[:, 1] = 0.01 # uniform upward velocity + + def particle_cy(): + m = mat.data[:, 0] > 0.5 + return swarm._particle_coordinates.data[m, 1].mean() + + def proxy_cy(): + pv = np.asarray(uw.function.evaluate(mat.sym[1], v.coords)).reshape(-1) + w = np.clip(pv, 0.0, None) + return float((w * np.asarray(v.coords)[:, 1]).sum() / w.sum()) + + cy_particles_before = particle_cy() + cy_proxy_before = proxy_cy() + assert abs(cy_proxy_before - cy_particles_before) < 0.01 + + swarm.advection(v.sym, 2.0, order=2, corrector=True) # blob moves up 0.02 + + cy_particles_after = particle_cy() + moved = cy_particles_after - cy_particles_before + assert moved > 0.015, "test premise: the particles must actually move" + + # lazy .sym access must now see the refreshed proxy + cy_proxy_after = proxy_cy() + tracked = cy_proxy_after - cy_proxy_before + assert tracked > 0.5 * moved, ( + f"proxy frozen after advection: particle centroid moved {moved:.4f} " + f"but proxy centroid moved only {tracked:.4f} (issue #289)" + ) + + del swarm + del mesh + + From da8f28d322b67981a019578a83bcb67db78e64fd Mon Sep 17 00:00:00 2001 From: lmoresi Date: Sat, 4 Jul 2026 13:24:35 +1000 Subject: [PATCH 3/5] fix(swarm): flush writes made while migration is suppressed instead of discarding them (BF-05) Variable and coordinate writes made inside migration_disabled() / migration_control() were silently discarded (SWARM-04, 2026-07 audit): both PETSc-sync callbacks early-returned while _migration_disabled was set, nothing ever re-packed the canonical arrays, migrate() then read the stale PETSc coordinates, and its trailing invalidation destroyed the only copy of the writes. The docstrings explicitly recommended writing inside the context, and the parallel test covering the pattern asserted vacuously. Suppressing migration now defers only the PETSc pack, never the data: writes made while the flag is set are recorded per variable and flushed into the DMSwarm by Swarm._flush_pending_petsc_sync() at context exit (and defensively at migrate() entry). Both context-manager modes flush; only the migrate() call itself is deferred (default) or skipped (disable=True). Docstrings updated to state the actual contract. This commit also introduces Swarm._sync_before_assembly() (adjacent to the flush helper): the collective solve-entry synchronisation used by the deferred-migration and stale-proxy fixes. It is wired into the mesh assembly path in the follow-up commit. tests/parallel/test_0755_swarm_global_stats.py's perturbation block now asserts the write actually reaches the DMSwarm (previously vacuous), and tests/parallel/test_0756_swarm_migration_semantics.py (added in the follow-up commit) covers write survival at np2. Underworld development team with AI support from Claude Code --- src/underworld3/swarm.py | 135 +++++++++++++++++- .../parallel/test_0755_swarm_global_stats.py | 15 ++ 2 files changed, 145 insertions(+), 5 deletions(-) diff --git a/src/underworld3/swarm.py b/src/underworld3/swarm.py index 5d1d9464..a252b508 100644 --- a/src/underworld3/swarm.py +++ b/src/underworld3/swarm.py @@ -425,8 +425,13 @@ def variable_update_callback(array, change_context): if not data_changed: return - # Skip updates during coordinate changes to prevent corruption - if hasattr(var.swarm, "_migration_disabled") and var.swarm._migration_disabled: + # While migration is suppressed, DEFER the PETSc pack rather than + # discarding the write: the DMSwarm layout may be mid-change, so + # packing now could corrupt it, but the user's values must survive. + # They are flushed by Swarm._flush_pending_petsc_sync() when the + # migration-control context exits (SWARM-04). + if getattr(var.swarm, "_migration_disabled", False): + var.swarm._pending_petsc_sync.add(var.clean_name) return # Persist changes to PETSc (like swarm callback updates coordinates) @@ -476,8 +481,16 @@ def canonical_data_callback(array, change_context): if not data_changed: return - # Skip updates during migration to prevent corruption - if hasattr(self.swarm, "_migration_disabled") and self.swarm._migration_disabled: + # While migration is suppressed, DEFER the PETSc pack rather than + # discarding the write: the DMSwarm layout may be mid-change, so + # packing now could corrupt it, but the user's values must survive + # in the canonical array. They are flushed to PETSc by + # Swarm._flush_pending_petsc_sync() when the migration-control + # context exits (SWARM-04: previously these writes were silently + # lost — nothing re-packed, and migrate()'s trailing invalidation + # destroyed the only copy). + if getattr(self.swarm, "_migration_disabled", False): + self.swarm._pending_petsc_sync.add(self.clean_name) return # Check for None array to prevent copy errors @@ -2750,6 +2763,100 @@ def _invalidate_canonical_data(self): # Invalidate cached spatial index self._kdtree = None + def _flush_pending_petsc_sync(self): + """Pack canonical arrays written while migration was suppressed. + + Writes made inside ``migration_control()`` / ``migration_disabled()`` + land in each variable's canonical array but their PETSc pack is + deferred (see the sync callbacks). This flushes them into the DMSwarm + fields. Called on migration-context exit and defensively at + :meth:`migrate` entry; a no-op when nothing is pending (SWARM-04). + """ + pending, self._pending_petsc_sync = self._pending_petsc_sync, set() + for name in pending: + var = self._vars.get(name, None) + if var is None: + continue + canonical = getattr(var, "_canonical_data", None) + if canonical is None: + # cache was invalidated after the write; PETSc already holds + # the authoritative data — nothing left to flush. + continue + arr = np.asarray(canonical).reshape(-1, var.num_components) + if arr.shape[0] != max(self.dm.getLocalSize(), 0): + raise RuntimeError( + f"Cannot flush deferred writes for swarm variable " + f"'{name}': cached array has {arr.shape[0]} rows but the " + f"DMSwarm holds {self.dm.getLocalSize()} particles. The " + "particle layout changed while migration was disabled." + ) + var.pack_raw_data_to_petsc(arr, sync=True) + if self._coord_var is var: + self._needs_migration = True + if hasattr(var, "_on_data_changed"): + var._on_data_changed() + + def _sync_before_assembly(self): + """Collective: bring PETSc-facing swarm state up to date for a solve. + + Called from ``Mesh.update_lvec()`` — the common entry point where + assembly pulls variable data — this performs, in order: + + 1. any DEFERRED particle migration (coordinates written through the + modern interface mark ``_needs_migration`` instead of migrating + per-write, which would deadlock under uneven writes — SWARM-03); + 2. an eager refresh of stale proxy mesh variables, which are + otherwise refreshed only via the lazy ``.sym`` accessor — solvers + read the proxy DM directly and previously consumed stale data + (issue #215 Bug 3 / issue #289). + + Rank-local flags are combined with a global MAX reduction so every + rank takes the same sequence of collective actions even when writes + were rank-uneven. Repeated calls are no-ops (flag-guarded). + """ + if self._migration_disabled: + # A migration-suppressed context is active (SPMD-consistent by + # construction); leave everything for its exit to handle. + return + + # A swarm with no particles anywhere has nothing to migrate or + # project — leave proxies stale (they refresh after population) + # rather than issue spurious starved-rank warnings pre-populate. + global_count = max(self.local_size, 0) + if uw.mpi.size > 1: + global_count = uw.mpi.comm.allreduce(global_count, op=uw.MPI.MAX) + if global_count == 0: + return + + if not self._deferred_migration_suspended: + needs_migration = bool(self._needs_migration) + if uw.mpi.size > 1: + needs_migration = ( + uw.mpi.comm.allreduce(int(needs_migration), op=uw.MPI.MAX) > 0 + ) + if needs_migration: + self.migrate() + + # Deterministic variable order: the refresh performs collective + # mesh-variable writes, so all ranks must visit variables in the + # same sequence. + for name in sorted(self._vars.keys()): + var = self._vars.get(name) + if var is None: + continue + has_proxy = ( + getattr(var, "_meshVar", None) is not None + or isinstance(var, IndexSwarmVariable) + ) + if not has_proxy: + continue + stale = bool(getattr(var, "_proxy_stale", False)) + if uw.mpi.size > 1: + stale = uw.mpi.comm.allreduce(int(stale), op=uw.MPI.MAX) > 0 + if stale: + var._proxy_stale = True # align ranks before the collective refresh + var._update_proxy_if_stale() + def _get_kdtree(self): """ Return a cached KDTree for the swarm particle coordinates. @@ -3257,7 +3364,9 @@ def migration_disabled(self): Use migration_control(disable=True) for new code. Context manager that temporarily disables particle migration for the swarm. - Migration is NOT called when exiting the context. + Migration is NOT called when exiting the context. Writes made inside + the context are packed to PETSc at exit (only the migration is + suppressed — data is never discarded). Usage: with swarm.migration_disabled(): @@ -3293,6 +3402,10 @@ def migration_control(self, disable=False): with swarm.migration_control(disable=True): # Operations where migration should never happen # No migration on exit + + In both modes, variable/coordinate writes made inside the context are + flushed to the underlying DMSwarm at exit — only the migration itself + is deferred (default) or skipped (``disable=True``). """ class _MigrationControlContext: @@ -3312,6 +3425,13 @@ def __enter__(self): def __exit__(self, exc_type, exc_val, exc_tb): self.swarm._migration_disabled = self.original_value + # Flush writes deferred while the flag was set — suppressing + # migration must not discard data (SWARM-04). Skipped only + # when an enclosing context still holds the flag (it flushes + # on its own exit). + if not self.swarm._migration_disabled: + self.swarm._flush_pending_petsc_sync() + # Perform deferred migration if not disabled and not still blocked if not self.disable and not self.swarm._migration_disabled: # Check if particle positions might have changed @@ -3458,6 +3578,11 @@ def migrate( if self._migration_disabled: return + # Deferred writes must reach the DMSwarm before we read coordinates + # from it below (no-op unless a migration-suppressed context left + # pending packs behind, SWARM-04). + self._flush_pending_petsc_sync() + # Informational: migration may move or drop particles. Bump # unconditionally; restore is not gated on this counter so a # conservative no-op bump is harmless. diff --git a/tests/parallel/test_0755_swarm_global_stats.py b/tests/parallel/test_0755_swarm_global_stats.py index 63e4f037..65d6159f 100644 --- a/tests/parallel/test_0755_swarm_global_stats.py +++ b/tests/parallel/test_0755_swarm_global_stats.py @@ -318,6 +318,11 @@ def test_swarm_migration_preserves_global_values(): min_before = value.global_min() size_before = value.global_size() + # Raw PETSc coordinates before the perturbation (regression guard below) + petsc_x_before = ( + swarm._particle_coordinates.unpack_raw_data_from_petsc(squeeze=False, sync=False)[:, 0] + ).copy() + # Perturb particle positions slightly (may trigger migration) with swarm.migration_disabled(): # Small perturbation that shouldn't move particles between domains @@ -326,6 +331,16 @@ def test_swarm_migration_preserves_global_values(): coords[:, 0] = coords[:, 0] + 0.001 swarm._particle_coordinates.data[:] = coords + # REGRESSION (SWARM-04, 2026-07 audit): the perturbation must actually + # reach the DMSwarm. Previously writes inside migration_disabled() were + # silently discarded and the assertions below passed vacuously. + petsc_x_after = ( + swarm._particle_coordinates.unpack_raw_data_from_petsc(squeeze=False, sync=False)[:, 0] + ) + assert np.allclose(petsc_x_after, petsc_x_before + 0.001), ( + "coordinate write inside migration_disabled() did not reach the DMSwarm" + ) + # Explicit migration swarm.migrate() From 4ce91afea5821272cd38fb984a523fffe6dd237b Mon Sep 17 00:00:00 2001 From: lmoresi Date: Sat, 4 Jul 2026 13:24:57 +1000 Subject: [PATCH 4/5] fix(swarm): guard proxy updates on starved ranks instead of crashing or zeroing (BF-06) Ranks holding <= 1 particles either hard-crashed or silently corrupted their proxy mesh variables (SWARM-07, 2026-07 audit): rbf_interpolate() returned silent zeros that _rbf_to_meshVar wrote straight into the proxy nodal values, and IndexSwarmVariable's proxy update reached unguarded KDTree construction on an empty coordinate array, raising IndexError inside a collective update - an MPI abort or hang. Starved ranks now leave their proxy nodal values untouched and warn (warnings are suppressed for a swarm that has never been populated, since proxied variables legitimately touch this path at creation time). The guards are written to be collective-safe: MeshVariable reads/writes perform collective ghost synchronisation, so every rank executes the same read-then-write sequence and only the values differ per rank. The IndexSwarmVariable update_type=0 projection is restructured to compute into a local buffer and issue exactly ONE MeshVariable write per level set per rank (serially bit-identical to the old formulation): the previous in-context version issued a data-dependent number of writes whose deferred collective syncs mismatched across ranks - the np4 starved-rank test deadlocked on it. IndexSwarmVariable also gains a real _update_proxy_if_stale() override (its proxies live in _meshLevelSetVars, so the base implementation was a no-op for it) and routes .sym through it; this is also what the solve-entry refresh uses. np4 regression test (all particles seeded on one rank, sentinel survives on starved ranks, no crash) lands with the parallel test file in the follow-up commit. Underworld development team with AI support from Claude Code --- src/underworld3/swarm.py | 133 +++++++++++++++++++++++++++++++++------ 1 file changed, 115 insertions(+), 18 deletions(-) diff --git a/src/underworld3/swarm.py b/src/underworld3/swarm.py index a252b508..7f5d4d59 100644 --- a/src/underworld3/swarm.py +++ b/src/underworld3/swarm.py @@ -1156,7 +1156,32 @@ def _rbf_to_meshVar(self, meshVar, nnn=None, verbose=False): new_coords = meshVar.coords - Values = self.rbf_interpolate(new_coords, verbose=verbose, nnn=nnn) + # Starved-rank guard (SWARM-07): with <= 1 local particles there is + # nothing meaningful to interpolate — rbf_interpolate would return + # silent zeros. Keep this rank's current proxy nodal values instead, + # and say so. NB: MeshVariable reads/writes perform collective ghost + # synchronisation, so EVERY rank must execute the same read-then-write + # sequence; only the values differ on starved ranks. + current_values = np.array(meshVar.data[...], copy=True) + + if self.swarm.local_size <= 1: + # Warn only once the swarm has ever held particles: proxied + # variables are created (and their .sym touched) before + # populate(), and that expected pre-population state should not + # generate noise. + if self.swarm._population_generation > 0: + import warnings + + warnings.warn( + f"Swarm proxy update: rank {uw.mpi.rank} holds " + f"{max(self.swarm.local_size, 0)} particles; proxy variable " + f"'{getattr(meshVar, 'clean_name', meshVar.name)}' left " + "unchanged on this rank.", + stacklevel=2, + ) + Values = current_values + else: + Values = self.rbf_interpolate(new_coords, verbose=verbose, nnn=nnn) meshVar.data[...] = Values[...] @@ -1482,8 +1507,21 @@ def rbf_interpolate(self, new_coords, verbose=False, nnn=None): raw_data = self.unpack_raw_data_from_petsc(squeeze=False, sync=False) data_size = raw_data.shape - # What to do if there are no particles + # What to do if there are no particles: never SILENTLY return zeros + # (SWARM-07) — a starved rank writing these into a proxy corrupts it. + # (Silent only for a swarm that has never been populated: proxied + # variables legitimately touch this path at creation time.) if data_size[0] <= 1: + if self.swarm._population_generation > 0: + import warnings + + warnings.warn( + f"rbf_interpolate: rank {uw.mpi.rank} holds only " + f"{data_size[0]} particles of swarm variable " + f"'{self.clean_name}' — returning zeros for this rank's " + "query points.", + stacklevel=2, + ) return np.zeros((new_coords.shape[0], data_size[1])) if nnn is None: @@ -2219,6 +2257,25 @@ def _on_data_changed(self): """ self._proxy_stale = True + def _update_proxy_if_stale(self): + """ + Refresh the level-set proxy variables if they are marked stale. + + Overrides the base implementation (which requires ``self._meshVar``; + an IndexSwarmVariable keeps its proxies in ``_meshLevelSetVars`` + instead). Used by the lazy ``.sym`` accessor and by the solve-entry + refresh (``Swarm._sync_before_assembly``). + """ + if not self._proxy_stale or self._updating_proxy: + return + + try: + self._updating_proxy = True + self._update_proxy_variables() + self._proxy_stale = False + finally: + self._updating_proxy = False + # This is the sympy vector interface - it's meaningless if these are not spatial arrays @property def sym(self): @@ -2229,9 +2286,7 @@ def sym(self): and only if the proxy variables are marked as stale due to data changes. This avoids expensive RBF interpolation during data assignment operations. """ - if self._proxy_stale: - self._update_proxy_variables() - self._proxy_stale = False + self._update_proxy_if_stale() return self._MaskArray @property @@ -2393,23 +2448,55 @@ def _update_proxy_variables(self): update_type 1: calculate the material property value on mesh_levelset nodes from the nearest N particles directly. """ - if self.update_type == 0: - # Use non-dimensional coordinates for internal level set KDTree - kd = self._meshLevelSetVars[0]._get_kdtree() + # Starved-rank guard (SWARM-07): with <= 1 local particles the + # nearest-neighbour machinery cannot run — KDTree construction on an + # empty coordinate array raises IndexError, aborting/hanging the + # collective proxy update — and there is nothing to project anyway. + # Leave this rank's level-set nodal values unchanged and warn. Every + # rank still enters the (collective) access contexts below so ranks + # holding particles can proceed. + starved = self.swarm.local_size <= 1 + if starved and self.swarm._population_generation > 0: + # (silent for a never-populated swarm — creation-time .sym + # touches are expected; see the equivalent guard in + # _rbf_to_meshVar) + import warnings - n_distance, n_indices = kd.query( - self.swarm._particle_coordinates.data, k=self.nnn, sqr_dists=False + warnings.warn( + f"IndexSwarmVariable proxy update: rank {uw.mpi.rank} holds " + f"{max(self.swarm.local_size, 0)} particles; level-set " + f"variables for '{self.clean_name}' left unchanged on this " + "rank.", + stacklevel=2, ) - kd_swarm = self.swarm._get_kdtree() - # n, d, b = kd_swarm.find_closest_point(self._meshLevelSetVars[0].coords) - d, n = kd_swarm.query(self._meshLevelSetVars[0].coords, k=1, sqr_dists=False) + + if self.update_type == 0: + if not starved: + # Use non-dimensional coordinates for internal level set KDTree + kd = self._meshLevelSetVars[0]._get_kdtree() + + n_distance, n_indices = kd.query( + self.swarm._particle_coordinates.data, k=self.nnn, sqr_dists=False + ) + kd_swarm = self.swarm._get_kdtree() + # n, d, b = kd_swarm.find_closest_point(self._meshLevelSetVars[0].coords) + d, n = kd_swarm.query(self._meshLevelSetVars[0].coords, k=1, sqr_dists=False) for ii in range(self.indices): meshVar = self._meshLevelSetVars[ii] - with self.swarm.mesh.access(meshVar), self.swarm.access(): - node_values = np.zeros((meshVar.data.shape[0],)) - w = np.zeros((meshVar.data.shape[0],)) + # MeshVariable reads/writes perform collective ghost + # synchronisation, so every rank must execute exactly the + # same read-then-write sequence per level set: compute into + # a LOCAL buffer first, then issue a single symmetric write. + # (The previous in-context formulation issued a + # data-dependent number of writes per rank — the deferred + # sync at access-exit then ran mismatched collectives.) + final_values = np.array(meshVar.data[:, 0], copy=True) + + if not starved: + node_values = np.zeros(final_values.shape[0]) + w = np.zeros(final_values.shape[0]) for i in range(self.swarm.local_size): tem = np.isclose(n_distance[i, :], n_distance[i, 0]) @@ -2425,7 +2512,7 @@ def _update_proxy_variables(self): w[ind] += 1.0 / (1.0e-16 + dist[j]) node_values[np.where(w > 0.0)[0]] /= w[np.where(w > 0.0)[0]] - meshVar.data[:, 0] = node_values[...] + final_values = node_values # if there is no material found, # impose a near-neighbour hunt for a valid material and set that one @@ -2433,8 +2520,18 @@ def _update_proxy_variables(self): if len(ind_w0) > 0: ind_ = np.where(self.data[n[ind_w0]] == ii)[0] if len(ind_) > 0: - meshVar.data[ind_w0[ind_]] = 1.0 + final_values[ind_w0[ind_]] = 1.0 + + # single symmetric write (starved ranks write back their + # current values, i.e. the proxy is left unchanged there) + meshVar.data[:, 0] = final_values elif self.update_type == 1: + # NOTE: this branch performs data-dependent MeshVariable writes + # outside any access context, which is not parallel-safe + # independently of the starved-rank issue (pre-existing). + # The guard here only prevents the empty-rank KDTree crash. + if starved: + return kd = uw.kdtree.KDTree(self.swarm._particle_coordinates.data) n_distance, n_indices = kd.query( self._meshLevelSetVars[0].coords, k=self.nnn, sqr_dists=False From d2c270067308a7de3e94349a7b498e07efc525e3 Mon Sep 17 00:00:00 2001 From: lmoresi Date: Sat, 4 Jul 2026 13:25:21 +1000 Subject: [PATCH 5/5] fix(swarm): deferred migration for coordinate writes and eager proxy refresh at solve entry (BF-07, BF-08) BF-07 (SWARM-03, 2026-07 audit): the Swarm docstring promised automatic migration, but coordinate writes through the modern interface (swarm.coords setter / swarm._particle_coordinates.data) only packed to PETSc - only the deprecated points path migrated, leaving particles on the wrong rank in parallel. Coordinate writes now mark swarm._needs_migration; the collective migrate() runs DEFERRED at the next collective point - migration-control context exit or solve entry - never per-write, which would deadlock when ranks write unevenly (maintainer-approved fix shape). advection() suspends the deferred migration around its substep loop (a migrate firing from its velocity evaluations would reorder particle rows between captured arrays) and runs its own migrate() at the end, which clears the suspension. BF-08 (LE-03 = SWARM-05, issue #215 Bug 3): proxy refresh fired only via the lazy .sym accessor, but solvers pull the proxy DM directly through mesh.update_lvec(), consuming stale data after a material.data write. Mesh.update_lvec() now runs Swarm._sync_before_assembly() for each registered swarm before its staleness check: a single eager, collective refresh at solve entry (the refresh itself is what sets _stale_lvec). Rank-local flags are combined with MAX reductions so uneven writes are safe; repeated calls are flag-guarded no-ops, preserving the test_0006_memory_leak constraint. The refresh calls inside petsc_interpolate pass swarm_sync=False - only a subset of ranks reaches that function (zero-interior-point ranks skip it), and the hook's reductions must not run on a subset; those sites already rely on the all-ranks update_lvec() in evaluate() for freshness. The resolved TODO(BUG) at the old swarm.py:1075 is removed. Together with the BF-02 invalidation fix this closes issue #289: the issue reproducer (regression test in test_0113) shows the material proxy tracking the advected particles, and a Projection re-solve consumes fresh particle data without touching .sym. Adds tests/parallel/test_0756_swarm_migration_semantics.py: np2 migration-suppressed write survival, np2 deferred migration at context exit and at solve entry, np4 starved-rank proxy behaviour. Underworld development team with AI support from Claude Code --- .../discretisation/discretisation_mesh.py | 31 ++- src/underworld3/function/_function.pyx | 10 +- src/underworld3/swarm.py | 37 ++- .../test_0756_swarm_migration_semantics.py | 229 ++++++++++++++++++ .../test_0113_swarm_stale_cache_regression.py | 34 +++ 5 files changed, 328 insertions(+), 13 deletions(-) create mode 100644 tests/parallel/test_0756_swarm_migration_semantics.py diff --git a/src/underworld3/discretisation/discretisation_mesh.py b/src/underworld3/discretisation/discretisation_mesh.py index ab246d0e..74c31c88 100644 --- a/src/underworld3/discretisation/discretisation_mesh.py +++ b/src/underworld3/discretisation/discretisation_mesh.py @@ -2930,11 +2930,38 @@ def project_to_slip_surface(self, coords, slip_spec=True, return project(numpy.asarray(coords, dtype=float)) @timing.routine_timer_decorator - def update_lvec(self): + def update_lvec(self, swarm_sync=True): """ This method creates and/or updates the mesh variable local vector. If the local vector is already up to date, this method will do nothing. - """ + + ``swarm_sync=False`` skips the swarm-dependency hook below. It MUST + be passed at call sites that only a SUBSET of ranks reach (e.g. the + refresh calls inside ``petsc_interpolate`` — ranks with zero interior + points skip that function entirely): the hook performs collective + reductions, so running it on a subset of ranks deadlocks. Such sites + rely on an earlier all-ranks ``update_lvec()`` for freshness, exactly + as they already do for the collective ``globalToLocal`` below. + """ + + # Swarm dependencies first (issues #215 Bug 3 / #289): run any + # deferred particle migration and refresh stale swarm-variable + # proxies BEFORE the staleness check below — the refresh writes + # proxy MeshVariable data, which is what sets _stale_lvec. Solvers + # read the proxy DM directly, so the lazy `.sym` refresh alone + # cannot guarantee freshness at assembly. Collective; flag-guarded + # no-op when nothing changed. Ordered deterministically so all + # ranks act on swarms in the same sequence. + if swarm_sync and not getattr(self, "_swarm_sync_in_progress", False): + self._swarm_sync_in_progress = True + try: + for swarm in sorted( + self._registered_swarms, + key=lambda s: s._instance_number, + ): + swarm._sync_before_assembly() + finally: + self._swarm_sync_in_progress = False if self._stale_lvec: if not self._lvec: diff --git a/src/underworld3/function/_function.pyx b/src/underworld3/function/_function.pyx index ec2aaeeb..06267d30 100644 --- a/src/underworld3/function/_function.pyx +++ b/src/underworld3/function/_function.pyx @@ -1279,7 +1279,11 @@ def petsc_interpolate( expr, if cached_info is not None: # CACHE HIT - Fast path. Evaluate using cached structure - mesh.update_lvec() # Ensure fresh values + # swarm_sync=False: petsc_interpolate is reached by only the + # ranks that hold interior points — the swarm-dependency hook + # does collective reductions and must not run on a subset. + # Freshness comes from the all-ranks update_lvec() in evaluate(). + mesh.update_lvec(swarm_sync=False) # Ensure fresh values cached_info.evaluate(mesh, outarray) else: @@ -1320,7 +1324,9 @@ def petsc_interpolate( expr, mesh._dminterpolation_cache.store_structure(coords, dofcount, cached_info) # Evaluate - mesh.update_lvec() + # swarm_sync=False: see the cache-hit branch above — only a + # subset of ranks reaches petsc_interpolate. + mesh.update_lvec(swarm_sync=False) cached_info.evaluate(mesh, outarray) # === END CACHING === diff --git a/src/underworld3/swarm.py b/src/underworld3/swarm.py index 7f5d4d59..080f5d26 100644 --- a/src/underworld3/swarm.py +++ b/src/underworld3/swarm.py @@ -509,6 +509,15 @@ def canonical_data_callback(array, change_context): # STEP 1: Sync to PETSc using established method with correct shape self.pack_raw_data_to_petsc(canonical_array, sync=True) + # Coordinate writes may strand particles on the wrong rank. Mark + # the swarm for DEFERRED migration — migrate() itself is + # collective and must not run from a per-write callback (ranks + # write unevenly → deadlock). The migration happens at the next + # collective point: migration-control context exit or solve entry + # (SWARM-03; the class docstring's automatic-migration promise). + if getattr(self.swarm, "_coord_var", None) is self: + self.swarm._needs_migration = True + # STEP 2: Handle variable-specific updates (like IndexSwarmVariable proxy marking) if hasattr(self, "_on_data_changed"): self._on_data_changed() @@ -1101,12 +1110,11 @@ def _update(self): return - # TODO(BUG): Stale proxy DM after swarm data write - # _update() marks proxy as stale, but _update_proxy_if_stale() (lazy - # re-interpolation) only fires when material.sym is accessed. Code that - # reads the proxy MeshVariable DM directly (e.g. a Projection solver - # evaluating its uw_function at quadrature points) gets stale data. - # See GitHub issue #215 (Bug 3). + # NB: laziness is safe because freshness is enforced at CONSUMPTION: + # the `.sym` accessors call this, and `Swarm._sync_before_assembly()` + # (invoked from Mesh.update_lvec at solve entry) eagerly refreshes any + # stale proxy before a solver reads the proxy DM directly + # (issue #215 Bug 3 / issue #289). def _update_proxy_if_stale(self): """ Actually update the proxy mesh variable if it's marked as stale. @@ -2633,10 +2641,13 @@ class Swarm(Stateful, uw_object): >>> temperature = swarm.add_variable("temperature", 1) >>> velocity = swarm.add_variable("velocity", mesh.dim) - Manual particle migration after coordinate updates: + Particle migration after coordinate updates: - Note: particle migration is still called automatically when we - `access` and update the particle_coordinates variables + Note: writing particle coordinates (via ``swarm._particle_coordinates.data`` + or the ``coords`` setter) marks the swarm for migration; the collective + ``migrate()`` itself is DEFERRED to the next collective point — a + ``migration_control()`` context exit, an explicit ``swarm.migrate()``, + or solve entry — never run per-write (uneven writes would deadlock). Note: `swarm.populate` uses a the mesh point locations for discontinuous interpolants to determine the particle locations. @@ -4929,6 +4940,13 @@ def advection( # del updated_current_coords # del v_at_Vpts + # Suspend the deferred (solve-entry) migration for the duration of + # the substep loop: the velocity evaluations below pass through + # Mesh.update_lvec(), and a migrate() firing there would reorder + # particle rows between the coordinate array and the velocity array + # captured from it. advection() performs its own migrate() at the end. + self._deferred_migration_suspended = True + # Wrap this whole thing in sub-stepping loop for step in range(0, substeps): @@ -5004,6 +5022,7 @@ def advection( self._particle_coordinates.data[...] = new_coords[...] ## End of substepping loop + self._deferred_migration_suspended = False ## Cycling of the swarm is a cheap and cheerful version of population control for particles. It turns the ## swarm into a streak-swarm where particles are Lagrangian for a number of steps and then reset to their diff --git a/tests/parallel/test_0756_swarm_migration_semantics.py b/tests/parallel/test_0756_swarm_migration_semantics.py new file mode 100644 index 00000000..f7a30fab --- /dev/null +++ b/tests/parallel/test_0756_swarm_migration_semantics.py @@ -0,0 +1,229 @@ +"""Parallel regression tests for swarm migration semantics (Track-0 audit). + +Run under MPI, e.g.:: + + mpirun -np 2 python -m pytest tests/parallel/test_0756_swarm_migration_semantics.py + mpirun -np 4 python -m pytest tests/parallel/test_0756_swarm_migration_semantics.py + +Covers: + +- SWARM-04 / BF-05: writes made inside ``migration_disabled()`` / + ``migration_control()`` were silently discarded (the PETSc sync callbacks + early-returned and nothing ever re-packed). Writes must now be flushed to + the DMSwarm at context exit; only the *migration* is suppressed/deferred. +- SWARM-03 / BF-07: particle coordinate writes through the modern interface + (``swarm._particle_coordinates.data``) never triggered migration, leaving + particles on the wrong rank. Migration is now deferred to the next + collective point (context exit or solve entry) — never per-write, which + would deadlock when ranks write unevenly. +- SWARM-07 / BF-06: ranks holding <= 1 particles either crashed + (``KDTree`` on an empty array inside ``IndexSwarmVariable``) or silently + zeroed their proxy values. Starved ranks must leave the proxy untouched + and warn. +""" + +import numpy as np +import pytest + +import underworld3 as uw +from underworld3.meshing import UnstructuredSimplexBox + + +def _box(cell=0.25): + return UnstructuredSimplexBox( + minCoords=(0.0, 0.0), maxCoords=(1.0, 1.0), cellSize=cell + ) + + +# -------------------------------------------------------------------------- +# SWARM-04 / BF-05 +# -------------------------------------------------------------------------- + +@pytest.mark.mpi(min_size=2) +def test_variable_writes_inside_migration_disabled_reach_petsc(): + mesh = _box() + swarm = uw.swarm.Swarm(mesh) + var = swarm.add_variable("val", 1) + swarm.populate(fill_param=3) + + with swarm.migration_disabled(): + var.data[:, 0] = 42.0 + + raw = var.unpack_raw_data_from_petsc(squeeze=False, sync=False) + assert raw.shape[0] == swarm.local_size + assert np.all(raw == 42.0), ( + "variable write inside migration_disabled() was discarded: PETSc " + f"field max is {raw.max() if raw.size else 'empty'}, expected 42.0" + ) + + del swarm + del mesh + + +@pytest.mark.mpi(min_size=2) +def test_coordinate_writes_inside_migration_control_survive_and_migrate(): + mesh = _box() + swarm = uw.swarm.Swarm(mesh) + var = swarm.add_variable("val", 1) + swarm.populate(fill_param=3) + + var.data[:, 0] = 1.0 + global_before = uw.mpi.comm.allreduce(int(swarm.local_size), uw.MPI.SUM) + + # contract every particle towards the domain centre: particles owned by + # the "outer" portions of each rank's partition change owner. + with swarm.migration_control(): + coords = swarm._particle_coordinates.data + swarm._particle_coordinates.data[...] = 0.5 + (coords - 0.5) * 0.4 + + # deferred migration ran at context exit: every particle must now be + # inside its owning rank's local domain and none may have been lost. + # NB: points_in_domain is COLLECTIVE (get_max_radius gathers) — every + # rank must call it, including ranks whose local swarm is empty. + local_coords = swarm._particle_coordinates.data + in_domain = mesh.points_in_domain(np.asarray(local_coords)) + assert np.all(in_domain), ( + f"rank {uw.mpi.rank}: {np.count_nonzero(~in_domain)} particles " + "left on the wrong rank after migration_control() exit" + ) + + global_after = uw.mpi.comm.allreduce(int(swarm.local_size), uw.MPI.SUM) + assert global_after == global_before, ( + f"particle count changed across deferred migration: " + f"{global_before} -> {global_after}" + ) + + # variable payload survives the migration + vmax = var.global_max() + vmin = var.global_min() + assert abs(float(vmax) - 1.0) < 1e-12 + assert abs(float(vmin) - 1.0) < 1e-12 + + del swarm + del mesh + + +# -------------------------------------------------------------------------- +# SWARM-03 / BF-07 +# -------------------------------------------------------------------------- + +@pytest.mark.mpi(min_size=2) +def test_bare_coordinate_write_migrates_at_solve_entry(): + """A bare coordinate write (no context manager) marks the swarm as + needing migration; the next collective point (solve entry) performs it.""" + mesh = _box() + swarm = uw.swarm.Swarm(mesh) + swarm.populate(fill_param=3) + + global_before = uw.mpi.comm.allreduce(int(swarm.local_size), uw.MPI.SUM) + + # push every particle into the left 40% of the box: on >= 2 ranks a + # whole rank's worth of particles changes owner. + coords = swarm._particle_coordinates.data + swarm._particle_coordinates.data[...] = np.column_stack( + [coords[:, 0] * 0.4 + 0.01, coords[:, 1]] + ) + + assert swarm._needs_migration, ( + "coordinate write through _particle_coordinates.data did not mark " + "the swarm for deferred migration" + ) + + # a solve is a collective point: its entry must run the deferred migration + proj_var = uw.discretisation.MeshVariable("pv0756", mesh, 1, degree=1) + proj = uw.systems.Projection(mesh, proj_var) + x, y = mesh.X + proj.uw_function = x + proj.petsc_options.delValue("ksp_monitor") + proj.solve() + + assert not swarm._needs_migration + + # NB: points_in_domain is COLLECTIVE (get_max_radius gathers) — every + # rank must call it; ranks emptied by the migration pass an empty array. + local_coords = swarm._particle_coordinates.data + in_domain = mesh.points_in_domain(np.asarray(local_coords)) + assert np.all(in_domain), ( + f"rank {uw.mpi.rank}: particles still on the wrong rank after " + "solve-entry deferred migration" + ) + + global_after = uw.mpi.comm.allreduce(int(swarm.local_size), uw.MPI.SUM) + assert global_after == global_before + + del swarm + del mesh + + +# -------------------------------------------------------------------------- +# SWARM-07 / BF-06 +# -------------------------------------------------------------------------- + +@pytest.mark.mpi(min_size=4) +def test_starved_ranks_leave_proxy_untouched_and_do_not_crash(): + """All particles are seeded into one corner of the box (one rank's + subdomain). Ranks with <= 1 local particles must neither crash in the + proxy update (KDTree on an empty array) nor overwrite their proxy nodal + values with silent zeros.""" + import warnings + + mesh = _box() + swarm = uw.swarm.Swarm(mesh) + scal = swarm.add_variable("s", 1, proxy_degree=1) + mat = uw.swarm.IndexSwarmVariable("M", swarm, indices=2, proxy_degree=1) + + # cluster near the origin corner — same array passed on every rank; + # add_particles_with_coordinates keeps only locally-owned points. + xs = np.linspace(0.02, 0.18, 6) + xx, yy = np.meshgrid(xs, xs) + cluster = np.column_stack([xx.ravel(), yy.ravel()]) + swarm.add_particles_with_coordinates(cluster) + + starved = swarm.local_size <= 1 + n_starved = uw.mpi.comm.allreduce(int(starved), uw.MPI.SUM) + assert n_starved >= 1, "test premise: at least one rank must be starved" + assert n_starved < uw.mpi.size, "test premise: one rank holds the cluster" + + scal.data[:, 0] = 7.0 + mat.data[...] = 0 + + # sentinel the proxy nodal values everywhere (collective writes) + SENTINEL = 123.0 + scal._meshVar.data[...] = SENTINEL + for lv in mat._meshLevelSetVars: + lv.data[...] = SENTINEL + + # force a refresh on every rank (collective; this is what the solve-entry + # hook does). Starved ranks warn and keep their values. + scal._proxy_stale = True + mat._proxy_stale = True + with warnings.catch_warnings(record=True) as caught: + warnings.simplefilter("always") + scal._update_proxy_if_stale() + mat._update_proxy_if_stale() # crashed with IndexError before BF-06 + + if starved: + assert any("particle" in str(w.message).lower() for w in caught), ( + f"rank {uw.mpi.rank}: starved rank did not warn about skipping " + "the proxy update" + ) + # owned nodes keep the sentinel (ghost nodes may take values from a + # populated neighbour, so require a majority rather than all). + frac = float(np.mean(np.isclose(np.asarray(scal._meshVar.data), SENTINEL))) + assert frac > 0.5, ( + f"rank {uw.mpi.rank}: starved-rank proxy was overwritten " + f"(only {frac:.0%} of nodes kept the sentinel — silent zeros?)" + ) + frac_m = float( + np.mean(np.isclose(np.asarray(mat._meshLevelSetVars[0].data), SENTINEL)) + ) + assert frac_m > 0.5, ( + f"rank {uw.mpi.rank}: starved-rank IndexSwarmVariable proxy was " + f"overwritten (only {frac_m:.0%} kept the sentinel)" + ) + else: + # the populated rank really did refresh + assert not np.allclose(np.asarray(scal._meshVar.data), SENTINEL) + + del swarm + del mesh diff --git a/tests/test_0113_swarm_stale_cache_regression.py b/tests/test_0113_swarm_stale_cache_regression.py index 2e521822..3f0c9d35 100644 --- a/tests/test_0113_swarm_stale_cache_regression.py +++ b/tests/test_0113_swarm_stale_cache_regression.py @@ -193,3 +193,37 @@ def proxy_cy(): del mesh +# -------------------------------------------------------------------------- +# SWARM-05 / BF-08 (issue #215 Bug 3): solve consumes a fresh proxy +# -------------------------------------------------------------------------- + +def test_projection_solve_consumes_fresh_proxy(mesh): + """Write material.data, run a Projection solve WITHOUT touching .sym, and + assert the solve consumed the new values (the proxy is refreshed eagerly + at solve entry rather than only via the lazy .sym accessor).""" + swarm = uw.swarm.Swarm(mesh=mesh) + var = swarm.add_variable("mat", 1, proxy_degree=1) + swarm.populate(fill_param=3) + + var.data[:, 0] = 1.0 + + proj_var = uw.discretisation.MeshVariable("pv0113", mesh, 1, degree=1) + proj = uw.systems.Projection(mesh, proj_var) + proj.uw_function = var.sym[0] # captured ONCE; refreshes the proxy now + proj.petsc_options.delValue("ksp_monitor") + + proj.solve() + assert abs(float(np.mean(proj_var.data)) - 1.0) < 0.05 + + # update the particle data; do NOT touch .sym before solving + var.data[:, 0] = 2.0 + proj.solve() + + mean_after = float(np.mean(proj_var.data)) + assert abs(mean_after - 2.0) < 0.05, ( + f"Projection solve consumed a stale proxy: mean {mean_after:.3f}, " + "expected ~2.0 (issue #215 Bug 3 / SWARM-05)" + ) + + del swarm + del mesh