diff --git a/src/underworld3/discretisation/discretisation_mesh.py b/src/underworld3/discretisation/discretisation_mesh.py index 2362bf2a..74c31c88 100644 --- a/src/underworld3/discretisation/discretisation_mesh.py +++ b/src/underworld3/discretisation/discretisation_mesh.py @@ -2345,6 +2345,29 @@ def nuke_coords_and_rebuild( flush=True, ) + # The navigation coords (used to build the kd-tree control points and + # for point location) were captured as a reference to the ORIGINAL + # coords in __init__ and are not updated by the DM rebuild above, so + # on an adapted/deformed mesh they still describe the old geometry. + # They MUST be refreshed BEFORE _build_kd_tree_index() runs: that + # rebuild indexes _nav_coords with the NEW nav-DM point range, and a + # stale (old-sized) array raises IndexError when the mesh grew — and + # silently mislocates points otherwise (issue #286). + if getattr(self, "_nav_dm", None) is None: + self._nav_coords = numpy.asarray( + self.dm.getCoordinatesLocal().array + ).reshape(-1, self.cdim) + else: + # manifold mesh: the nav clone carries its own (ghosted) coords; + # refresh them from the rebuilt main DM where possible. + try: + self._nav_dm.setCoordinatesLocal(self.dm.getCoordinatesLocal()) + self._nav_coords = numpy.asarray( + self._nav_dm.getCoordinatesLocal().array + ).reshape(-1, self.cdim) + except Exception: + pass + self._index = None self._build_kd_tree_index() @@ -2395,25 +2418,8 @@ def nuke_coords_and_rebuild( self.boundary_face_control_points_sign = None self._domain_radius_squared = float("inf") - # The navigation coords (used to build those control points and for - # point location) were captured as a reference to the ORIGINAL coords - # in __init__ and never refreshed here, so on a volume mesh they stayed - # at the undeformed geometry — the real reason a bulged-out region read - # as exterior. Re-point them at the current (deformed) DM coordinates. - if getattr(self, "_nav_dm", None) is None: - self._nav_coords = numpy.asarray( - self.dm.getCoordinatesLocal().array - ).reshape(-1, self.cdim) - else: - # manifold mesh: the nav clone carries its own (ghosted) coords; - # refresh them from the rebuilt main DM where possible. - try: - self._nav_dm.setCoordinatesLocal(self.dm.getCoordinatesLocal()) - self._nav_coords = numpy.asarray( - self._nav_dm.getCoordinatesLocal().array - ).reshape(-1, self.cdim) - except Exception: - pass + # NB: the _nav_coords refresh for the deformed/adapted geometry happens + # ABOVE, before _build_kd_tree_index() — see the issue #286 note there. # BUGFIX(#130): recompute the DOF coordinate cache for every # already-registered variable. Variables created before this @@ -2924,11 +2930,38 @@ def project_to_slip_surface(self, coords, slip_spec=True, return project(numpy.asarray(coords, dtype=float)) @timing.routine_timer_decorator - def update_lvec(self): + def update_lvec(self, swarm_sync=True): """ This method creates and/or updates the mesh variable local vector. If the local vector is already up to date, this method will do nothing. - """ + + ``swarm_sync=False`` skips the swarm-dependency hook below. It MUST + be passed at call sites that only a SUBSET of ranks reach (e.g. the + refresh calls inside ``petsc_interpolate`` — ranks with zero interior + points skip that function entirely): the hook performs collective + reductions, so running it on a subset of ranks deadlocks. Such sites + rely on an earlier all-ranks ``update_lvec()`` for freshness, exactly + as they already do for the collective ``globalToLocal`` below. + """ + + # Swarm dependencies first (issues #215 Bug 3 / #289): run any + # deferred particle migration and refresh stale swarm-variable + # proxies BEFORE the staleness check below — the refresh writes + # proxy MeshVariable data, which is what sets _stale_lvec. Solvers + # read the proxy DM directly, so the lazy `.sym` refresh alone + # cannot guarantee freshness at assembly. Collective; flag-guarded + # no-op when nothing changed. Ordered deterministically so all + # ranks act on swarms in the same sequence. + if swarm_sync and not getattr(self, "_swarm_sync_in_progress", False): + self._swarm_sync_in_progress = True + try: + for swarm in sorted( + self._registered_swarms, + key=lambda s: s._instance_number, + ): + swarm._sync_before_assembly() + finally: + self._swarm_sync_in_progress = False if self._stale_lvec: if not self._lvec: diff --git a/src/underworld3/function/_function.pyx b/src/underworld3/function/_function.pyx index ec2aaeeb..06267d30 100644 --- a/src/underworld3/function/_function.pyx +++ b/src/underworld3/function/_function.pyx @@ -1279,7 +1279,11 @@ def petsc_interpolate( expr, if cached_info is not None: # CACHE HIT - Fast path. Evaluate using cached structure - mesh.update_lvec() # Ensure fresh values + # swarm_sync=False: petsc_interpolate is reached by only the + # ranks that hold interior points — the swarm-dependency hook + # does collective reductions and must not run on a subset. + # Freshness comes from the all-ranks update_lvec() in evaluate(). + mesh.update_lvec(swarm_sync=False) # Ensure fresh values cached_info.evaluate(mesh, outarray) else: @@ -1320,7 +1324,9 @@ def petsc_interpolate( expr, mesh._dminterpolation_cache.store_structure(coords, dofcount, cached_info) # Evaluate - mesh.update_lvec() + # swarm_sync=False: see the cache-hit branch above — only a + # subset of ranks reaches petsc_interpolate. + mesh.update_lvec(swarm_sync=False) cached_info.evaluate(mesh, outarray) # === END CACHING === diff --git a/src/underworld3/swarm.py b/src/underworld3/swarm.py index 5b159944..080f5d26 100644 --- a/src/underworld3/swarm.py +++ b/src/underworld3/swarm.py @@ -425,8 +425,13 @@ def variable_update_callback(array, change_context): if not data_changed: return - # Skip updates during coordinate changes to prevent corruption - if hasattr(var.swarm, "_migration_disabled") and var.swarm._migration_disabled: + # While migration is suppressed, DEFER the PETSc pack rather than + # discarding the write: the DMSwarm layout may be mid-change, so + # packing now could corrupt it, but the user's values must survive. + # They are flushed by Swarm._flush_pending_petsc_sync() when the + # migration-control context exits (SWARM-04). + if getattr(var.swarm, "_migration_disabled", False): + var.swarm._pending_petsc_sync.add(var.clean_name) return # Persist changes to PETSc (like swarm callback updates coordinates) @@ -476,8 +481,16 @@ def canonical_data_callback(array, change_context): if not data_changed: return - # Skip updates during migration to prevent corruption - if hasattr(self.swarm, "_migration_disabled") and self.swarm._migration_disabled: + # While migration is suppressed, DEFER the PETSc pack rather than + # discarding the write: the DMSwarm layout may be mid-change, so + # packing now could corrupt it, but the user's values must survive + # in the canonical array. They are flushed to PETSc by + # Swarm._flush_pending_petsc_sync() when the migration-control + # context exits (SWARM-04: previously these writes were silently + # lost — nothing re-packed, and migrate()'s trailing invalidation + # destroyed the only copy). + if getattr(self.swarm, "_migration_disabled", False): + self.swarm._pending_petsc_sync.add(self.clean_name) return # Check for None array to prevent copy errors @@ -496,6 +509,15 @@ def canonical_data_callback(array, change_context): # STEP 1: Sync to PETSc using established method with correct shape self.pack_raw_data_to_petsc(canonical_array, sync=True) + # Coordinate writes may strand particles on the wrong rank. Mark + # the swarm for DEFERRED migration — migrate() itself is + # collective and must not run from a per-write callback (ranks + # write unevenly → deadlock). The migration happens at the next + # collective point: migration-control context exit or solve entry + # (SWARM-03; the class docstring's automatic-migration promise). + if getattr(self.swarm, "_coord_var", None) is self: + self.swarm._needs_migration = True + # STEP 2: Handle variable-specific updates (like IndexSwarmVariable proxy marking) if hasattr(self, "_on_data_changed"): self._on_data_changed() @@ -1088,12 +1110,11 @@ def _update(self): return - # TODO(BUG): Stale proxy DM after swarm data write - # _update() marks proxy as stale, but _update_proxy_if_stale() (lazy - # re-interpolation) only fires when material.sym is accessed. Code that - # reads the proxy MeshVariable DM directly (e.g. a Projection solver - # evaluating its uw_function at quadrature points) gets stale data. - # See GitHub issue #215 (Bug 3). + # NB: laziness is safe because freshness is enforced at CONSUMPTION: + # the `.sym` accessors call this, and `Swarm._sync_before_assembly()` + # (invoked from Mesh.update_lvec at solve entry) eagerly refreshes any + # stale proxy before a solver reads the proxy DM directly + # (issue #215 Bug 3 / issue #289). def _update_proxy_if_stale(self): """ Actually update the proxy mesh variable if it's marked as stale. @@ -1143,7 +1164,32 @@ def _rbf_to_meshVar(self, meshVar, nnn=None, verbose=False): new_coords = meshVar.coords - Values = self.rbf_interpolate(new_coords, verbose=verbose, nnn=nnn) + # Starved-rank guard (SWARM-07): with <= 1 local particles there is + # nothing meaningful to interpolate — rbf_interpolate would return + # silent zeros. Keep this rank's current proxy nodal values instead, + # and say so. NB: MeshVariable reads/writes perform collective ghost + # synchronisation, so EVERY rank must execute the same read-then-write + # sequence; only the values differ on starved ranks. + current_values = np.array(meshVar.data[...], copy=True) + + if self.swarm.local_size <= 1: + # Warn only once the swarm has ever held particles: proxied + # variables are created (and their .sym touched) before + # populate(), and that expected pre-population state should not + # generate noise. + if self.swarm._population_generation > 0: + import warnings + + warnings.warn( + f"Swarm proxy update: rank {uw.mpi.rank} holds " + f"{max(self.swarm.local_size, 0)} particles; proxy variable " + f"'{getattr(meshVar, 'clean_name', meshVar.name)}' left " + "unchanged on this rank.", + stacklevel=2, + ) + Values = current_values + else: + Values = self.rbf_interpolate(new_coords, verbose=verbose, nnn=nnn) meshVar.data[...] = Values[...] @@ -1469,8 +1515,21 @@ def rbf_interpolate(self, new_coords, verbose=False, nnn=None): raw_data = self.unpack_raw_data_from_petsc(squeeze=False, sync=False) data_size = raw_data.shape - # What to do if there are no particles + # What to do if there are no particles: never SILENTLY return zeros + # (SWARM-07) — a starved rank writing these into a proxy corrupts it. + # (Silent only for a swarm that has never been populated: proxied + # variables legitimately touch this path at creation time.) if data_size[0] <= 1: + if self.swarm._population_generation > 0: + import warnings + + warnings.warn( + f"rbf_interpolate: rank {uw.mpi.rank} holds only " + f"{data_size[0]} particles of swarm variable " + f"'{self.clean_name}' — returning zeros for this rank's " + "query points.", + stacklevel=2, + ) return np.zeros((new_coords.shape[0], data_size[1])) if nnn is None: @@ -2206,6 +2265,25 @@ def _on_data_changed(self): """ self._proxy_stale = True + def _update_proxy_if_stale(self): + """ + Refresh the level-set proxy variables if they are marked stale. + + Overrides the base implementation (which requires ``self._meshVar``; + an IndexSwarmVariable keeps its proxies in ``_meshLevelSetVars`` + instead). Used by the lazy ``.sym`` accessor and by the solve-entry + refresh (``Swarm._sync_before_assembly``). + """ + if not self._proxy_stale or self._updating_proxy: + return + + try: + self._updating_proxy = True + self._update_proxy_variables() + self._proxy_stale = False + finally: + self._updating_proxy = False + # This is the sympy vector interface - it's meaningless if these are not spatial arrays @property def sym(self): @@ -2216,9 +2294,7 @@ def sym(self): and only if the proxy variables are marked as stale due to data changes. This avoids expensive RBF interpolation during data assignment operations. """ - if self._proxy_stale: - self._update_proxy_variables() - self._proxy_stale = False + self._update_proxy_if_stale() return self._MaskArray @property @@ -2380,23 +2456,55 @@ def _update_proxy_variables(self): update_type 1: calculate the material property value on mesh_levelset nodes from the nearest N particles directly. """ - if self.update_type == 0: - # Use non-dimensional coordinates for internal level set KDTree - kd = self._meshLevelSetVars[0]._get_kdtree() + # Starved-rank guard (SWARM-07): with <= 1 local particles the + # nearest-neighbour machinery cannot run — KDTree construction on an + # empty coordinate array raises IndexError, aborting/hanging the + # collective proxy update — and there is nothing to project anyway. + # Leave this rank's level-set nodal values unchanged and warn. Every + # rank still enters the (collective) access contexts below so ranks + # holding particles can proceed. + starved = self.swarm.local_size <= 1 + if starved and self.swarm._population_generation > 0: + # (silent for a never-populated swarm — creation-time .sym + # touches are expected; see the equivalent guard in + # _rbf_to_meshVar) + import warnings - n_distance, n_indices = kd.query( - self.swarm._particle_coordinates.data, k=self.nnn, sqr_dists=False + warnings.warn( + f"IndexSwarmVariable proxy update: rank {uw.mpi.rank} holds " + f"{max(self.swarm.local_size, 0)} particles; level-set " + f"variables for '{self.clean_name}' left unchanged on this " + "rank.", + stacklevel=2, ) - kd_swarm = self.swarm._get_kdtree() - # n, d, b = kd_swarm.find_closest_point(self._meshLevelSetVars[0].coords) - d, n = kd_swarm.query(self._meshLevelSetVars[0].coords, k=1, sqr_dists=False) + + if self.update_type == 0: + if not starved: + # Use non-dimensional coordinates for internal level set KDTree + kd = self._meshLevelSetVars[0]._get_kdtree() + + n_distance, n_indices = kd.query( + self.swarm._particle_coordinates.data, k=self.nnn, sqr_dists=False + ) + kd_swarm = self.swarm._get_kdtree() + # n, d, b = kd_swarm.find_closest_point(self._meshLevelSetVars[0].coords) + d, n = kd_swarm.query(self._meshLevelSetVars[0].coords, k=1, sqr_dists=False) for ii in range(self.indices): meshVar = self._meshLevelSetVars[ii] - with self.swarm.mesh.access(meshVar), self.swarm.access(): - node_values = np.zeros((meshVar.data.shape[0],)) - w = np.zeros((meshVar.data.shape[0],)) + # MeshVariable reads/writes perform collective ghost + # synchronisation, so every rank must execute exactly the + # same read-then-write sequence per level set: compute into + # a LOCAL buffer first, then issue a single symmetric write. + # (The previous in-context formulation issued a + # data-dependent number of writes per rank — the deferred + # sync at access-exit then ran mismatched collectives.) + final_values = np.array(meshVar.data[:, 0], copy=True) + + if not starved: + node_values = np.zeros(final_values.shape[0]) + w = np.zeros(final_values.shape[0]) for i in range(self.swarm.local_size): tem = np.isclose(n_distance[i, :], n_distance[i, 0]) @@ -2412,7 +2520,7 @@ def _update_proxy_variables(self): w[ind] += 1.0 / (1.0e-16 + dist[j]) node_values[np.where(w > 0.0)[0]] /= w[np.where(w > 0.0)[0]] - meshVar.data[:, 0] = node_values[...] + final_values = node_values # if there is no material found, # impose a near-neighbour hunt for a valid material and set that one @@ -2420,8 +2528,18 @@ def _update_proxy_variables(self): if len(ind_w0) > 0: ind_ = np.where(self.data[n[ind_w0]] == ii)[0] if len(ind_) > 0: - meshVar.data[ind_w0[ind_]] = 1.0 + final_values[ind_w0[ind_]] = 1.0 + + # single symmetric write (starved ranks write back their + # current values, i.e. the proxy is left unchanged there) + meshVar.data[:, 0] = final_values elif self.update_type == 1: + # NOTE: this branch performs data-dependent MeshVariable writes + # outside any access context, which is not parallel-safe + # independently of the starved-rank issue (pre-existing). + # The guard here only prevents the empty-rank KDTree crash. + if starved: + return kd = uw.kdtree.KDTree(self.swarm._particle_coordinates.data) n_distance, n_indices = kd.query( self._meshLevelSetVars[0].coords, k=self.nnn, sqr_dists=False @@ -2523,10 +2641,13 @@ class Swarm(Stateful, uw_object): >>> temperature = swarm.add_variable("temperature", 1) >>> velocity = swarm.add_variable("velocity", mesh.dim) - Manual particle migration after coordinate updates: + Particle migration after coordinate updates: - Note: particle migration is still called automatically when we - `access` and update the particle_coordinates variables + Note: writing particle coordinates (via ``swarm._particle_coordinates.data`` + or the ``coords`` setter) marks the swarm for migration; the collective + ``migrate()`` itself is DEFERRED to the next collective point — a + ``migration_control()`` context exit, an explicit ``swarm.migrate()``, + or solve entry — never run per-write (uneven writes would deadlock). Note: `swarm.populate` uses a the mesh point locations for discontinuous interpolants to determine the particle locations. @@ -2653,6 +2774,31 @@ def __init__(self, mesh, recycle_rate=0, verbose=False, clip_to_mesh=True): self._nnmapdict = {} self._migration_disabled = False + # Deterministic (SPMD-consistent) creation index — used to order + # collective per-swarm operations identically on every rank. + self._instance_number = Swarm.instances + + # Names of variables whose canonical writes were made while + # _migration_disabled was set: the PETSc pack is deferred (not + # discarded) and flushed by _flush_pending_petsc_sync() at context + # exit (SWARM-04). + self._pending_petsc_sync = set() + + # Set when particle coordinates are written through the modern + # interface; the actual (collective) migrate() is deferred to the + # next collective point — migration-context exit or solve entry — + # never run per-write, which would deadlock when ranks write + # unevenly (SWARM-03). + self._needs_migration = False + + # SPMD-consistent guard: while True, _sync_before_assembly() must NOT + # run the deferred migration. Set by advection() around its substep + # loop — its velocity evaluations pass through Mesh.update_lvec(), and + # a migrate() there would reorder particle rows between the coordinate + # array and the velocity array captured from it. advection() runs its + # own migrate() at the end. + self._deferred_migration_suspended = False + super().__init__() # Register with the same model already captured in self._model_ref @@ -2725,6 +2871,100 @@ def _invalidate_canonical_data(self): # Invalidate cached spatial index self._kdtree = None + def _flush_pending_petsc_sync(self): + """Pack canonical arrays written while migration was suppressed. + + Writes made inside ``migration_control()`` / ``migration_disabled()`` + land in each variable's canonical array but their PETSc pack is + deferred (see the sync callbacks). This flushes them into the DMSwarm + fields. Called on migration-context exit and defensively at + :meth:`migrate` entry; a no-op when nothing is pending (SWARM-04). + """ + pending, self._pending_petsc_sync = self._pending_petsc_sync, set() + for name in pending: + var = self._vars.get(name, None) + if var is None: + continue + canonical = getattr(var, "_canonical_data", None) + if canonical is None: + # cache was invalidated after the write; PETSc already holds + # the authoritative data — nothing left to flush. + continue + arr = np.asarray(canonical).reshape(-1, var.num_components) + if arr.shape[0] != max(self.dm.getLocalSize(), 0): + raise RuntimeError( + f"Cannot flush deferred writes for swarm variable " + f"'{name}': cached array has {arr.shape[0]} rows but the " + f"DMSwarm holds {self.dm.getLocalSize()} particles. The " + "particle layout changed while migration was disabled." + ) + var.pack_raw_data_to_petsc(arr, sync=True) + if self._coord_var is var: + self._needs_migration = True + if hasattr(var, "_on_data_changed"): + var._on_data_changed() + + def _sync_before_assembly(self): + """Collective: bring PETSc-facing swarm state up to date for a solve. + + Called from ``Mesh.update_lvec()`` — the common entry point where + assembly pulls variable data — this performs, in order: + + 1. any DEFERRED particle migration (coordinates written through the + modern interface mark ``_needs_migration`` instead of migrating + per-write, which would deadlock under uneven writes — SWARM-03); + 2. an eager refresh of stale proxy mesh variables, which are + otherwise refreshed only via the lazy ``.sym`` accessor — solvers + read the proxy DM directly and previously consumed stale data + (issue #215 Bug 3 / issue #289). + + Rank-local flags are combined with a global MAX reduction so every + rank takes the same sequence of collective actions even when writes + were rank-uneven. Repeated calls are no-ops (flag-guarded). + """ + if self._migration_disabled: + # A migration-suppressed context is active (SPMD-consistent by + # construction); leave everything for its exit to handle. + return + + # A swarm with no particles anywhere has nothing to migrate or + # project — leave proxies stale (they refresh after population) + # rather than issue spurious starved-rank warnings pre-populate. + global_count = max(self.local_size, 0) + if uw.mpi.size > 1: + global_count = uw.mpi.comm.allreduce(global_count, op=uw.MPI.MAX) + if global_count == 0: + return + + if not self._deferred_migration_suspended: + needs_migration = bool(self._needs_migration) + if uw.mpi.size > 1: + needs_migration = ( + uw.mpi.comm.allreduce(int(needs_migration), op=uw.MPI.MAX) > 0 + ) + if needs_migration: + self.migrate() + + # Deterministic variable order: the refresh performs collective + # mesh-variable writes, so all ranks must visit variables in the + # same sequence. + for name in sorted(self._vars.keys()): + var = self._vars.get(name) + if var is None: + continue + has_proxy = ( + getattr(var, "_meshVar", None) is not None + or isinstance(var, IndexSwarmVariable) + ) + if not has_proxy: + continue + stale = bool(getattr(var, "_proxy_stale", False)) + if uw.mpi.size > 1: + stale = uw.mpi.comm.allreduce(int(stale), op=uw.MPI.MAX) > 0 + if stale: + var._proxy_stale = True # align ranks before the collective refresh + var._update_proxy_if_stale() + def _get_kdtree(self): """ Return a cached KDTree for the swarm particle coordinates. @@ -3232,7 +3472,9 @@ def migration_disabled(self): Use migration_control(disable=True) for new code. Context manager that temporarily disables particle migration for the swarm. - Migration is NOT called when exiting the context. + Migration is NOT called when exiting the context. Writes made inside + the context are packed to PETSc at exit (only the migration is + suppressed — data is never discarded). Usage: with swarm.migration_disabled(): @@ -3268,6 +3510,10 @@ def migration_control(self, disable=False): with swarm.migration_control(disable=True): # Operations where migration should never happen # No migration on exit + + In both modes, variable/coordinate writes made inside the context are + flushed to the underlying DMSwarm at exit — only the migration itself + is deferred (default) or skipped (``disable=True``). """ class _MigrationControlContext: @@ -3287,6 +3533,13 @@ def __enter__(self): def __exit__(self, exc_type, exc_val, exc_tb): self.swarm._migration_disabled = self.original_value + # Flush writes deferred while the flag was set — suppressing + # migration must not discard data (SWARM-04). Skipped only + # when an enclosing context still holds the flag (it flushes + # on its own exit). + if not self.swarm._migration_disabled: + self.swarm._flush_pending_petsc_sync() + # Perform deferred migration if not disabled and not still blocked if not self.disable and not self.swarm._migration_disabled: # Check if particle positions might have changed @@ -3394,6 +3647,13 @@ def populate( offset = swarm_orig_size * i self._remeshed.data[offset::, 0] = i + # Invalidate cached data — the swarm was just given its particles. + # Any canonical `.data` array created before populate() (legitimate: + # variables must be created first) is sized for the empty swarm and + # would otherwise hide every particle from reads and corrupt writes + # (SWARM-17, same stale-cache class as #216). + self._invalidate_canonical_data() + # Informational: particle population just changed. self._population_generation += 1 @@ -3426,6 +3686,11 @@ def migrate( if self._migration_disabled: return + # Deferred writes must reach the DMSwarm before we read coordinates + # from it below (no-op unless a migration-suppressed context left + # pending packs behind, SWARM-04). + self._flush_pending_petsc_sync() + # Informational: migration may move or drop particles. Bump # unconditionally; restore is not gated on this counter so a # conservative no-op bump is harmless. @@ -3465,6 +3730,19 @@ def migrate( # Unlikely, but we should check this uw.mpi.barrier() if global_unclaimed_points == 0: + # No particle needs to change rank, but we were still called + # because coordinates and/or the population may have changed + # (in-place coordinate writes, addNPoints, serial advection). + # The cached canonical arrays, the particle kd-tree, and the + # proxy variables are stale regardless of whether anything + # moved between ranks. Skipping this invalidation froze proxy + # mesh variables after serial advection (issue #289) and left + # wrong-sized `.data` caches after particle addition + # (SWARM-01/SWARM-02, 2026-07 audit). + self._invalidate_canonical_data() + self._needs_migration = False + # any explicit/terminal migrate ends an advection suspension + self._deferred_migration_suspended = False return # Migrate particles between processes (if there are more than one of them) @@ -3539,6 +3817,9 @@ def migrate( # Any particle movement (send, receive, or balanced swap) makes # cached arrays stale — both size and values may have changed. self._invalidate_canonical_data() + self._needs_migration = False + # any explicit/terminal migrate ends an advection suspension + self._deferred_migration_suspended = False return @@ -3742,6 +4023,12 @@ def add_particles_with_global_coordinates( # self._Xorig.data[...] = globalCoordinatesArray self._remeshed.data[...] = 0 + # Invalidate cached data — the particle count changed via addNPoints + # (mirrors add_particles_with_coordinates). This must not be left to + # migrate(): with migrate=False nothing else invalidates, and every + # cached `.data` array keeps the old particle count (SWARM-01). + self._invalidate_canonical_data() + if migrate: self.migrate(remove_sent_points=True, delete_lost_points=delete_lost_points) @@ -4328,7 +4615,14 @@ def apply_snapshot_payload(self, payload: dict) -> None: f"{var_clean_name!r} data shape mismatch — current " f"{current.shape} vs snapshot {saved.shape}" ) - current[...] = saved + # Write THROUGH the canonical array (not the detached + # np.asarray view above) so the PETSc pack callback fires. + # Writing into the view mutated only the cached copy: the + # DMSwarm field kept its post-realloc garbage, and the first + # cache invalidation after restore (e.g. migrate() at the end + # of advection) silently replaced the restored values with + # that garbage (exposed by the SWARM-01 invalidation fix). + var.data[...] = saved def _legacy_access(self, *writeable_vars: SwarmVariable): """ @@ -4646,6 +4940,13 @@ def advection( # del updated_current_coords # del v_at_Vpts + # Suspend the deferred (solve-entry) migration for the duration of + # the substep loop: the velocity evaluations below pass through + # Mesh.update_lvec(), and a migrate() firing there would reorder + # particle rows between the coordinate array and the velocity array + # captured from it. advection() performs its own migrate() at the end. + self._deferred_migration_suspended = True + # Wrap this whole thing in sub-stepping loop for step in range(0, substeps): @@ -4721,6 +5022,7 @@ def advection( self._particle_coordinates.data[...] = new_coords[...] ## End of substepping loop + self._deferred_migration_suspended = False ## Cycling of the swarm is a cheap and cheerful version of population control for particles. It turns the ## swarm into a streak-swarm where particles are Lagrangian for a number of steps and then reset to their diff --git a/tests/parallel/test_0755_swarm_global_stats.py b/tests/parallel/test_0755_swarm_global_stats.py index 63e4f037..65d6159f 100644 --- a/tests/parallel/test_0755_swarm_global_stats.py +++ b/tests/parallel/test_0755_swarm_global_stats.py @@ -318,6 +318,11 @@ def test_swarm_migration_preserves_global_values(): min_before = value.global_min() size_before = value.global_size() + # Raw PETSc coordinates before the perturbation (regression guard below) + petsc_x_before = ( + swarm._particle_coordinates.unpack_raw_data_from_petsc(squeeze=False, sync=False)[:, 0] + ).copy() + # Perturb particle positions slightly (may trigger migration) with swarm.migration_disabled(): # Small perturbation that shouldn't move particles between domains @@ -326,6 +331,16 @@ def test_swarm_migration_preserves_global_values(): coords[:, 0] = coords[:, 0] + 0.001 swarm._particle_coordinates.data[:] = coords + # REGRESSION (SWARM-04, 2026-07 audit): the perturbation must actually + # reach the DMSwarm. Previously writes inside migration_disabled() were + # silently discarded and the assertions below passed vacuously. + petsc_x_after = ( + swarm._particle_coordinates.unpack_raw_data_from_petsc(squeeze=False, sync=False)[:, 0] + ) + assert np.allclose(petsc_x_after, petsc_x_before + 0.001), ( + "coordinate write inside migration_disabled() did not reach the DMSwarm" + ) + # Explicit migration swarm.migrate() diff --git a/tests/parallel/test_0756_swarm_migration_semantics.py b/tests/parallel/test_0756_swarm_migration_semantics.py new file mode 100644 index 00000000..f7a30fab --- /dev/null +++ b/tests/parallel/test_0756_swarm_migration_semantics.py @@ -0,0 +1,229 @@ +"""Parallel regression tests for swarm migration semantics (Track-0 audit). + +Run under MPI, e.g.:: + + mpirun -np 2 python -m pytest tests/parallel/test_0756_swarm_migration_semantics.py + mpirun -np 4 python -m pytest tests/parallel/test_0756_swarm_migration_semantics.py + +Covers: + +- SWARM-04 / BF-05: writes made inside ``migration_disabled()`` / + ``migration_control()`` were silently discarded (the PETSc sync callbacks + early-returned and nothing ever re-packed). Writes must now be flushed to + the DMSwarm at context exit; only the *migration* is suppressed/deferred. +- SWARM-03 / BF-07: particle coordinate writes through the modern interface + (``swarm._particle_coordinates.data``) never triggered migration, leaving + particles on the wrong rank. Migration is now deferred to the next + collective point (context exit or solve entry) — never per-write, which + would deadlock when ranks write unevenly. +- SWARM-07 / BF-06: ranks holding <= 1 particles either crashed + (``KDTree`` on an empty array inside ``IndexSwarmVariable``) or silently + zeroed their proxy values. Starved ranks must leave the proxy untouched + and warn. +""" + +import numpy as np +import pytest + +import underworld3 as uw +from underworld3.meshing import UnstructuredSimplexBox + + +def _box(cell=0.25): + return UnstructuredSimplexBox( + minCoords=(0.0, 0.0), maxCoords=(1.0, 1.0), cellSize=cell + ) + + +# -------------------------------------------------------------------------- +# SWARM-04 / BF-05 +# -------------------------------------------------------------------------- + +@pytest.mark.mpi(min_size=2) +def test_variable_writes_inside_migration_disabled_reach_petsc(): + mesh = _box() + swarm = uw.swarm.Swarm(mesh) + var = swarm.add_variable("val", 1) + swarm.populate(fill_param=3) + + with swarm.migration_disabled(): + var.data[:, 0] = 42.0 + + raw = var.unpack_raw_data_from_petsc(squeeze=False, sync=False) + assert raw.shape[0] == swarm.local_size + assert np.all(raw == 42.0), ( + "variable write inside migration_disabled() was discarded: PETSc " + f"field max is {raw.max() if raw.size else 'empty'}, expected 42.0" + ) + + del swarm + del mesh + + +@pytest.mark.mpi(min_size=2) +def test_coordinate_writes_inside_migration_control_survive_and_migrate(): + mesh = _box() + swarm = uw.swarm.Swarm(mesh) + var = swarm.add_variable("val", 1) + swarm.populate(fill_param=3) + + var.data[:, 0] = 1.0 + global_before = uw.mpi.comm.allreduce(int(swarm.local_size), uw.MPI.SUM) + + # contract every particle towards the domain centre: particles owned by + # the "outer" portions of each rank's partition change owner. + with swarm.migration_control(): + coords = swarm._particle_coordinates.data + swarm._particle_coordinates.data[...] = 0.5 + (coords - 0.5) * 0.4 + + # deferred migration ran at context exit: every particle must now be + # inside its owning rank's local domain and none may have been lost. + # NB: points_in_domain is COLLECTIVE (get_max_radius gathers) — every + # rank must call it, including ranks whose local swarm is empty. + local_coords = swarm._particle_coordinates.data + in_domain = mesh.points_in_domain(np.asarray(local_coords)) + assert np.all(in_domain), ( + f"rank {uw.mpi.rank}: {np.count_nonzero(~in_domain)} particles " + "left on the wrong rank after migration_control() exit" + ) + + global_after = uw.mpi.comm.allreduce(int(swarm.local_size), uw.MPI.SUM) + assert global_after == global_before, ( + f"particle count changed across deferred migration: " + f"{global_before} -> {global_after}" + ) + + # variable payload survives the migration + vmax = var.global_max() + vmin = var.global_min() + assert abs(float(vmax) - 1.0) < 1e-12 + assert abs(float(vmin) - 1.0) < 1e-12 + + del swarm + del mesh + + +# -------------------------------------------------------------------------- +# SWARM-03 / BF-07 +# -------------------------------------------------------------------------- + +@pytest.mark.mpi(min_size=2) +def test_bare_coordinate_write_migrates_at_solve_entry(): + """A bare coordinate write (no context manager) marks the swarm as + needing migration; the next collective point (solve entry) performs it.""" + mesh = _box() + swarm = uw.swarm.Swarm(mesh) + swarm.populate(fill_param=3) + + global_before = uw.mpi.comm.allreduce(int(swarm.local_size), uw.MPI.SUM) + + # push every particle into the left 40% of the box: on >= 2 ranks a + # whole rank's worth of particles changes owner. + coords = swarm._particle_coordinates.data + swarm._particle_coordinates.data[...] = np.column_stack( + [coords[:, 0] * 0.4 + 0.01, coords[:, 1]] + ) + + assert swarm._needs_migration, ( + "coordinate write through _particle_coordinates.data did not mark " + "the swarm for deferred migration" + ) + + # a solve is a collective point: its entry must run the deferred migration + proj_var = uw.discretisation.MeshVariable("pv0756", mesh, 1, degree=1) + proj = uw.systems.Projection(mesh, proj_var) + x, y = mesh.X + proj.uw_function = x + proj.petsc_options.delValue("ksp_monitor") + proj.solve() + + assert not swarm._needs_migration + + # NB: points_in_domain is COLLECTIVE (get_max_radius gathers) — every + # rank must call it; ranks emptied by the migration pass an empty array. + local_coords = swarm._particle_coordinates.data + in_domain = mesh.points_in_domain(np.asarray(local_coords)) + assert np.all(in_domain), ( + f"rank {uw.mpi.rank}: particles still on the wrong rank after " + "solve-entry deferred migration" + ) + + global_after = uw.mpi.comm.allreduce(int(swarm.local_size), uw.MPI.SUM) + assert global_after == global_before + + del swarm + del mesh + + +# -------------------------------------------------------------------------- +# SWARM-07 / BF-06 +# -------------------------------------------------------------------------- + +@pytest.mark.mpi(min_size=4) +def test_starved_ranks_leave_proxy_untouched_and_do_not_crash(): + """All particles are seeded into one corner of the box (one rank's + subdomain). Ranks with <= 1 local particles must neither crash in the + proxy update (KDTree on an empty array) nor overwrite their proxy nodal + values with silent zeros.""" + import warnings + + mesh = _box() + swarm = uw.swarm.Swarm(mesh) + scal = swarm.add_variable("s", 1, proxy_degree=1) + mat = uw.swarm.IndexSwarmVariable("M", swarm, indices=2, proxy_degree=1) + + # cluster near the origin corner — same array passed on every rank; + # add_particles_with_coordinates keeps only locally-owned points. + xs = np.linspace(0.02, 0.18, 6) + xx, yy = np.meshgrid(xs, xs) + cluster = np.column_stack([xx.ravel(), yy.ravel()]) + swarm.add_particles_with_coordinates(cluster) + + starved = swarm.local_size <= 1 + n_starved = uw.mpi.comm.allreduce(int(starved), uw.MPI.SUM) + assert n_starved >= 1, "test premise: at least one rank must be starved" + assert n_starved < uw.mpi.size, "test premise: one rank holds the cluster" + + scal.data[:, 0] = 7.0 + mat.data[...] = 0 + + # sentinel the proxy nodal values everywhere (collective writes) + SENTINEL = 123.0 + scal._meshVar.data[...] = SENTINEL + for lv in mat._meshLevelSetVars: + lv.data[...] = SENTINEL + + # force a refresh on every rank (collective; this is what the solve-entry + # hook does). Starved ranks warn and keep their values. + scal._proxy_stale = True + mat._proxy_stale = True + with warnings.catch_warnings(record=True) as caught: + warnings.simplefilter("always") + scal._update_proxy_if_stale() + mat._update_proxy_if_stale() # crashed with IndexError before BF-06 + + if starved: + assert any("particle" in str(w.message).lower() for w in caught), ( + f"rank {uw.mpi.rank}: starved rank did not warn about skipping " + "the proxy update" + ) + # owned nodes keep the sentinel (ghost nodes may take values from a + # populated neighbour, so require a majority rather than all). + frac = float(np.mean(np.isclose(np.asarray(scal._meshVar.data), SENTINEL))) + assert frac > 0.5, ( + f"rank {uw.mpi.rank}: starved-rank proxy was overwritten " + f"(only {frac:.0%} of nodes kept the sentinel — silent zeros?)" + ) + frac_m = float( + np.mean(np.isclose(np.asarray(mat._meshLevelSetVars[0].data), SENTINEL)) + ) + assert frac_m > 0.5, ( + f"rank {uw.mpi.rank}: starved-rank IndexSwarmVariable proxy was " + f"overwritten (only {frac_m:.0%} kept the sentinel)" + ) + else: + # the populated rank really did refresh + assert not np.allclose(np.asarray(scal._meshVar.data), SENTINEL) + + del swarm + del mesh diff --git a/tests/test_0113_swarm_stale_cache_regression.py b/tests/test_0113_swarm_stale_cache_regression.py new file mode 100644 index 00000000..3f0c9d35 --- /dev/null +++ b/tests/test_0113_swarm_stale_cache_regression.py @@ -0,0 +1,229 @@ +"""Regression tests for swarm stale-cache and stale-proxy bugs. + +Covers the Track-0 quality-audit findings (docs/reviews/2026-07): + +- SWARM-01 / BF-02: ``Swarm.migrate()`` early-returned when no particle needed + to change rank *without* invalidating the cached canonical ``.data`` arrays, + so ``add_particles_with_global_coordinates`` (whose only invalidation route + was migrate) left every variable's cached ``.data`` at the old particle + count. +- SWARM-02 / BF-02: the same early return left the cached particle kd-tree + built over a mutated coordinate buffer, silently corrupting + ``rbf_interpolate`` and every proxy update. +- SWARM-17 / BF-02: ``populate()`` never invalidated caches created before the + swarm was populated. +- GitHub issue #289: ``swarm.advection()`` left proxy mesh variables frozen at + the pre-advection particle positions (the no-move migrate early return never + marked them stale), so time-stepped models silently used stationary + material fields. +- SWARM-05 / LE-03 / BF-08 (issue #215 Bug 3): solvers read the proxy + MeshVariable's DM directly, bypassing the lazy ``.sym`` refresh, so a solve + after a ``material.data`` write consumed stale proxy values. +""" + +import numpy as np +import pytest + +import underworld3 as uw +from underworld3.meshing import UnstructuredSimplexBox + +pytestmark = [pytest.mark.level_1, pytest.mark.tier_a] + + +@pytest.fixture +def mesh(): + return UnstructuredSimplexBox( + minCoords=(0.0, 0.0), + maxCoords=(1.0, 1.0), + cellSize=1.0 / 8.0, + ) + + +# -------------------------------------------------------------------------- +# SWARM-01: cache size after add_particles_with_global_coordinates +# -------------------------------------------------------------------------- + +@pytest.mark.parametrize("migrate", [True, False]) +def test_add_particles_global_coordinates_invalidates_cache(mesh, migrate): + swarm = uw.swarm.Swarm(mesh=mesh) + var = swarm.add_variable("val", 1) + swarm.populate(fill_param=2) + + _ = var.data.shape # populate the canonical cache at the old size + + new_pts = np.array([[0.51, 0.51], [0.52, 0.52], [0.11, 0.87]]) + swarm.add_particles_with_global_coordinates(new_pts, migrate=migrate) + + dm_size = swarm.dm.getLocalSize() + assert var.data.shape[0] == dm_size, ( + f"cached .data rows ({var.data.shape[0]}) out of sync with the DMSwarm " + f"local size ({dm_size}) after add_particles_with_global_coordinates" + ) + assert ( + swarm._particle_coordinates.data.shape[0] == dm_size + ), "coordinate cache out of sync after particle addition" + + del swarm + del mesh + + +# -------------------------------------------------------------------------- +# SWARM-17: cache created before populate() +# -------------------------------------------------------------------------- + +def test_populate_invalidates_pre_existing_cache(mesh): + swarm = uw.swarm.Swarm(mesh=mesh) + var = swarm.add_variable("val", 1) + + _ = var.data.shape # create the (0-row) canonical cache before populate + + swarm.populate(fill_param=2) + + dm_size = swarm.dm.getLocalSize() + assert dm_size > 0 + assert var.data.shape[0] == dm_size, ( + f"cached .data rows ({var.data.shape[0]}) out of sync with the DMSwarm " + f"local size ({dm_size}) after populate()" + ) + + del swarm + del mesh + + +# -------------------------------------------------------------------------- +# SWARM-02: kd-tree refreshed after an in-domain coordinate move +# -------------------------------------------------------------------------- + +def test_rbf_interpolate_uses_current_positions_after_no_move_migrate(mesh): + swarm = uw.swarm.Swarm(mesh=mesh) + var = swarm.add_variable("val", 1) + swarm.populate(fill_param=3) + + # tag every particle with its launch x-coordinate + var.data[:, 0] = swarm._particle_coordinates.data[:, 0] + + probe = np.array([[0.15, 0.5], [0.85, 0.5], [0.25, 0.3], [0.75, 0.7]]) + _ = var.rbf_interpolate(probe) # builds and caches the particle kd-tree + + # mirror every particle in-domain: x -> 1 - x. The cached KDTree holds a + # NO-COPY view of the coordinate buffer, so after this in-place mutation + # its index topology (built at the old positions) is inconsistent with + # its stored points — queries return garbage, not merely frozen values. + coords = swarm._particle_coordinates.data + swarm._particle_coordinates.data[...] = np.column_stack( + [1.0 - coords[:, 0], coords[:, 1]] + ) + swarm.migrate() # serial / no-rank-change: previously hit the early return + + # the invalidation contract: migrate() must drop the cached tree + assert swarm._kdtree is None, ( + "migrate() left the cached particle kd-tree in place after an " + "in-place coordinate mutation (SWARM-02 early-return hole)" + ) + + values = var.rbf_interpolate(probe) + + # with a FRESH tree, the particle now found at probe_x carries its launch + # coordinate 1 - probe_x. + expected = 1.0 - probe[:, 0] + err = np.abs(values[:, 0] - expected).max() + assert err < 0.1, ( + f"rbf_interpolate is using a stale/poisoned kd-tree after migrate() " + f"(max error {err:.3f} against fresh-tree expectation)" + ) + + del swarm + del mesh + + +# -------------------------------------------------------------------------- +# issue #289: proxy tracks the particles through advection +# -------------------------------------------------------------------------- + +def test_issue289_proxy_tracks_particles_through_advection(): + """Reproducer from GitHub issue #289 (reduced): a blob of material is + advected upward with a uniform velocity; the lazily-refreshed proxy must + track the particle centroid, not stay frozen at the launch position.""" + mesh = UnstructuredSimplexBox( + minCoords=(0.0, 0.0), maxCoords=(1.0, 0.25), cellSize=1.0 / 24.0, qdegree=3 + ) + v = uw.discretisation.MeshVariable("V", mesh, vtype=uw.VarType.VECTOR, degree=2) + swarm = uw.swarm.Swarm(mesh=mesh) + mat = uw.swarm.IndexSwarmVariable( + "M", swarm, indices=2, proxy_degree=1, proxy_continuous=True + ) + swarm.populate(fill_param=3) + + mat.data[...] = 0 + pc = swarm._particle_coordinates.data + blob = (pc[:, 0] - 0.5) ** 2 + (pc[:, 1] - 0.107) ** 2 < 0.03**2 + assert blob.sum() > 0, "test premise: blob must contain particles" + mat.data[blob, 0] = 1 + + v.data[:, 1] = 0.01 # uniform upward velocity + + def particle_cy(): + m = mat.data[:, 0] > 0.5 + return swarm._particle_coordinates.data[m, 1].mean() + + def proxy_cy(): + pv = np.asarray(uw.function.evaluate(mat.sym[1], v.coords)).reshape(-1) + w = np.clip(pv, 0.0, None) + return float((w * np.asarray(v.coords)[:, 1]).sum() / w.sum()) + + cy_particles_before = particle_cy() + cy_proxy_before = proxy_cy() + assert abs(cy_proxy_before - cy_particles_before) < 0.01 + + swarm.advection(v.sym, 2.0, order=2, corrector=True) # blob moves up 0.02 + + cy_particles_after = particle_cy() + moved = cy_particles_after - cy_particles_before + assert moved > 0.015, "test premise: the particles must actually move" + + # lazy .sym access must now see the refreshed proxy + cy_proxy_after = proxy_cy() + tracked = cy_proxy_after - cy_proxy_before + assert tracked > 0.5 * moved, ( + f"proxy frozen after advection: particle centroid moved {moved:.4f} " + f"but proxy centroid moved only {tracked:.4f} (issue #289)" + ) + + del swarm + del mesh + + +# -------------------------------------------------------------------------- +# SWARM-05 / BF-08 (issue #215 Bug 3): solve consumes a fresh proxy +# -------------------------------------------------------------------------- + +def test_projection_solve_consumes_fresh_proxy(mesh): + """Write material.data, run a Projection solve WITHOUT touching .sym, and + assert the solve consumed the new values (the proxy is refreshed eagerly + at solve entry rather than only via the lazy .sym accessor).""" + swarm = uw.swarm.Swarm(mesh=mesh) + var = swarm.add_variable("mat", 1, proxy_degree=1) + swarm.populate(fill_param=3) + + var.data[:, 0] = 1.0 + + proj_var = uw.discretisation.MeshVariable("pv0113", mesh, 1, degree=1) + proj = uw.systems.Projection(mesh, proj_var) + proj.uw_function = var.sym[0] # captured ONCE; refreshes the proxy now + proj.petsc_options.delValue("ksp_monitor") + + proj.solve() + assert abs(float(np.mean(proj_var.data)) - 1.0) < 0.05 + + # update the particle data; do NOT touch .sym before solving + var.data[:, 0] = 2.0 + proj.solve() + + mean_after = float(np.mean(proj_var.data)) + assert abs(mean_after - 2.0) < 0.05, ( + f"Projection solve consumed a stale proxy: mean {mean_after:.3f}, " + "expected ~2.0 (issue #215 Bug 3 / SWARM-05)" + ) + + del swarm + del mesh