diff --git a/examples/pursuit-evasion/README.md b/examples/pursuit-evasion/README.md new file mode 100644 index 00000000..5e72ea4a --- /dev/null +++ b/examples/pursuit-evasion/README.md @@ -0,0 +1,43 @@ +## Pursuit-Evasion Example ## + +This example evolves a feed-forward neural network controller for a pursuer agent in a 2D continuous pursuit-evasion game. The evader follows one of several fixed strategies, and the pursuer must learn to capture it through vector-based control. + +The environment and experimental setup are based on the paper: + +> **Adaptive Adversarial Agents via NEAT: Evolving Pursuit Behaviour Against Non-Stationary Player Strategies** (Aryan Jha, 2026) + +### Environment ### + +- Continuous 2D space of 800 × 800 units. +- **Observation (8-dim):** relative position (Δx, Δy), evader velocity (vx, vy), evader acceleration (ax, ay), pursuer velocity (vx, vy). +- **Action (2-dim):** velocity change [Δvx, Δvy] clipped to [-1, 1], scaled internally by 2.0. +- **Reward:** `+1` on capture, `-distance/800` per step otherwise. +- **Episode:** 500 steps max. +- **Capture condition:** pursuer within 20 units of evader. + +### Evader Strategies ### + +The evader can follow one of four strategies (set via `evader_strategy` in `evolve-feedforward.py`): + +| Strategy | Behaviour | +|---|---| +| `random` | Random bounded acceleration (default). | +| `flee` | Moves directly away from pursuer at max speed. | +| `dodge` | Predicts pursuer trajectory via velocity extrapolation and moves perpendicular. | +| `oscillate` | Follows sinusoidal motion patterns. | + +### Running ### + +```bash +# Train a pursuer against the random evader (default) +python evolve-feedforward.py + +# To change the evader strategy, edit the `evader_strategy` variable in the script. +``` + +### Files ### + +* `evolve-feedforward.py` — Main evolution script. +* `pursuit_evasion.py` — The pursuit-evasion environment. +* `config-feedforward` — NEAT configuration (based on the paper's setup: pop 80, 8 inputs, 2 outputs). +* `visualize.py` — Plotting and network visualization utilities. diff --git a/examples/pursuit-evasion/clean.bat b/examples/pursuit-evasion/clean.bat new file mode 100644 index 00000000..958cd4e6 --- /dev/null +++ b/examples/pursuit-evasion/clean.bat @@ -0,0 +1,3 @@ +del /f *.svg *.gv *.pkl +del /f neat-checkpoint-* +del /f fitness_history.csv speciation.csv species_fitness.csv diff --git a/examples/pursuit-evasion/clean.sh b/examples/pursuit-evasion/clean.sh new file mode 100644 index 00000000..d1897ffb --- /dev/null +++ b/examples/pursuit-evasion/clean.sh @@ -0,0 +1,4 @@ +#!/usr/bin/env bash +rm -f *.svg *.gv *.pkl +rm -f neat-checkpoint-* +rm -f fitness_history.csv speciation.csv species_fitness.csv diff --git a/examples/pursuit-evasion/config-feedforward b/examples/pursuit-evasion/config-feedforward new file mode 100644 index 00000000..21761c93 --- /dev/null +++ b/examples/pursuit-evasion/config-feedforward @@ -0,0 +1,68 @@ +[NEAT] +fitness_criterion = max +fitness_threshold = 0.90 +pop_size = 80 +reset_on_extinction = False +no_fitness_termination = True + +[DefaultGenome] +num_inputs = 8 +num_outputs = 2 +num_hidden = 0 +feed_forward = True +compatibility_disjoint_coefficient = 1.0 +compatibility_weight_coefficient = 0.5 +conn_add_prob = 0.3 +conn_delete_prob = 0.1 +node_add_prob = 0.2 +node_delete_prob = 0.1 +single_structural_mutation = False +structural_mutation_surer = default +initial_connection = full_direct +activation_default = tanh +activation_mutate_rate = 0.05 +activation_options = tanh relu sigmoid +aggregation_default = sum +aggregation_mutate_rate = 0.0 +aggregation_options = sum +bias_init_mean = 0.0 +bias_init_stdev = 1.0 +bias_init_type = gaussian +bias_max_value = 30.0 +bias_min_value = -30.0 +bias_mutate_rate = 0.7 +bias_mutate_power = 0.5 +bias_replace_rate = 0.1 +response_init_mean = 1.0 +response_init_stdev = 0.0 +response_init_type = gaussian +response_max_value = 30.0 +response_min_value = -30.0 +response_mutate_rate = 0.0 +response_mutate_power = 0.0 +response_replace_rate = 0.0 +weight_init_mean = 0.0 +weight_init_stdev = 1.0 +weight_init_type = gaussian +weight_max_value = 30 +weight_min_value = -30 +weight_mutate_rate = 0.8 +weight_mutate_power = 0.5 +weight_replace_rate = 0.1 +enabled_default = True +enabled_mutate_rate = 0.01 +enabled_rate_to_true_add = 0.0 +enabled_rate_to_false_add = 0.0 + +[DefaultSpeciesSet] +compatibility_threshold = 3.0 + +[DefaultStagnation] +species_fitness_func = max +max_stagnation = 15 +species_elitism = 2 + +[DefaultReproduction] +elitism = 2 +survival_threshold = 0.2 +min_species_size = 1 diff --git a/examples/pursuit-evasion/evolve-feedforward.py b/examples/pursuit-evasion/evolve-feedforward.py new file mode 100644 index 00000000..0f401d5c --- /dev/null +++ b/examples/pursuit-evasion/evolve-feedforward.py @@ -0,0 +1,88 @@ +""" +2D pursuit-evasion experiment with a feed-forward neural network. + +NEAT evolves a pursuer controller that learns to capture an evader +in a bounded 2D continuous space. The evader follows one of several +fixed strategies, configurable below. + +Based on the experimental setup from: + "Adaptive Adversarial Agents via NEAT: Evolving Pursuit Behaviour + Against Non-Stationary Player Strategies" (Aryan Jha, 2026) +""" + +import multiprocessing +import os +import pickle + +import neat +import pursuit_evasion +import visualize + +runs_per_genome = 5 +evader_strategy = 'random' + + +def eval_genome(genome, config): + net = neat.nn.FeedForwardNetwork.create(genome, config) + env = pursuit_evasion.PursuitEvasion(evader_strategy=evader_strategy) + + total_reward = 0.0 + for _ in range(runs_per_genome): + state = env.step([0.0, 0.0])[0] + ep_reward = 0.0 + while True: + action = net.activate(state) + state, reward, done = env.step(action) + ep_reward += reward + if done: + break + total_reward += ep_reward + + return total_reward / (runs_per_genome * env.max_steps) + + +def eval_genomes(genomes, config): + for genome_id, genome in genomes: + genome.fitness = eval_genome(genome, config) + + +def run(): + local_dir = os.path.dirname(__file__) + config_path = os.path.join(local_dir, 'config-feedforward') + config = neat.Config(neat.DefaultGenome, neat.DefaultReproduction, + neat.DefaultSpeciesSet, neat.DefaultStagnation, + config_path) + + pop = neat.Population(config) + stats = neat.StatisticsReporter() + pop.add_reporter(stats) + pop.add_reporter(neat.StdOutReporter(True)) + pop.add_reporter(neat.Checkpointer(50)) + + pe = neat.ParallelEvaluator(multiprocessing.cpu_count(), eval_genome) + winner = pop.run(pe.evaluate, 100) + + with open('winner-feedforward', 'wb') as f: + pickle.dump(winner, f) + + print(f'\nBest genome:\n{winner!s}') + + visualize.plot_stats(stats, ylog=False, view=True, filename="pursuit-fitness.svg") + visualize.plot_species(stats, view=True, filename="pursuit-speciation.svg") + + node_names = { + -1: 'rel_x', -2: 'rel_y', + -3: 'evader_vx', -4: 'evader_vy', + -5: 'evader_ax', -6: 'evader_ay', + -7: 'pursuer_vx', -8: 'pursuer_vy', + 0: 'Δvx', 1: 'Δvy', + } + visualize.draw_net(config, winner, True, node_names=node_names) + visualize.draw_net(config, winner, view=True, node_names=node_names, + filename="winner-feedforward.gv") + visualize.draw_net(config, winner, view=True, node_names=node_names, + filename="winner-feedforward-pruned.gv", prune_unused=True) + + +if __name__ == '__main__': + run() diff --git a/examples/pursuit-evasion/pursuit_evasion.py b/examples/pursuit-evasion/pursuit_evasion.py new file mode 100644 index 00000000..bdc823ea --- /dev/null +++ b/examples/pursuit-evasion/pursuit_evasion.py @@ -0,0 +1,138 @@ +""" +2D pursuit-evasion environment for NEAT experiments. + +A pursuer (controlled by a NEAT-evolved network) chases an evader +in a bounded continuous space. The evader follows one of several +fixed strategies, and the pursuer must learn to capture it. + +This environment is based on the one described in: + "Adaptive Adversarial Agents via NEAT: Evolving Pursuit Behaviour + Against Non-Stationary Player Strategies" (Aryan Jha, 2026) +""" + +import math +import random + +ARENA_SIZE = 400.0 +CAPTURE_RADIUS = 20.0 +MAX_STEPS = 500 +MAX_SPEED = 5.0 +ACTION_SCALE = 2.0 + + +class PursuitEvasion: + def __init__(self, evader_strategy='random'): + self.arena_size = ARENA_SIZE + self.capture_radius = CAPTURE_RADIUS + self.max_steps = MAX_STEPS + self.evader_strategy = evader_strategy + + self.step_count = 0 + self.captured = False + + margin = 50.0 + self.pursuer_pos = [random.uniform(margin, 2 * ARENA_SIZE - margin), + random.uniform(margin, 2 * ARENA_SIZE - margin)] + self.pursuer_vel = [0.0, 0.0] + + while True: + ex = random.uniform(margin, 2 * ARENA_SIZE - margin) + ey = random.uniform(margin, 2 * ARENA_SIZE - margin) + dx = ex - self.pursuer_pos[0] + dy = ey - self.pursuer_pos[1] + if math.hypot(dx, dy) > self.capture_radius * 3: + break + self.evader_pos = [ex, ey] + self.evader_vel = [0.0, 0.0] + self.evader_prev_vel = [0.0, 0.0] + self.evader_phase = random.uniform(0, 2 * math.pi) + + def _clamp(self, pos): + pos[0] = max(0.0, min(2 * self.arena_size, pos[0])) + pos[1] = max(0.0, min(2 * self.arena_size, pos[1])) + + def _evader_action(self): + if self.evader_strategy == 'random': + return [random.uniform(-1, 1) * 3.0, + random.uniform(-1, 1) * 3.0] + elif self.evader_strategy == 'flee': + dx = self.evader_pos[0] - self.pursuer_pos[0] + dy = self.evader_pos[1] - self.pursuer_pos[1] + dist = math.hypot(dx, dy) + if dist > 0: + return [dx / dist * 5.0, dy / dist * 5.0] + elif self.evader_strategy == 'dodge': + pred_px = self.pursuer_pos[0] + self.pursuer_vel[0] * 10 + pred_py = self.pursuer_pos[1] + self.pursuer_vel[1] * 10 + dx = self.evader_pos[0] - pred_px + dy = self.evader_pos[1] - pred_py + perp = [-dy, dx] + dist = math.hypot(*perp) + if dist > 0: + return [perp[0] / dist * 5.0, perp[1] / dist * 5.0] + elif self.evader_strategy == 'oscillate': + t = self.step_count / 30.0 + return [math.cos(t) * 4.0, math.sin(t) * 4.0] + return [0.0, 0.0] + + def step(self, action): + if self.captured: + return self._get_state(), 0.0, True + + ax = max(-1.0, min(1.0, action[0])) + ay = max(-1.0, min(1.0, action[1])) + + self.pursuer_vel[0] += ax * ACTION_SCALE + self.pursuer_vel[1] += ay * ACTION_SCALE + self.pursuer_vel[0] = max(-MAX_SPEED, min(MAX_SPEED, self.pursuer_vel[0])) + self.pursuer_vel[1] = max(-MAX_SPEED, min(MAX_SPEED, self.pursuer_vel[1])) + + self.evader_prev_vel = list(self.evader_vel) + evader_accel = self._evader_action() + self.evader_vel[0] += evader_accel[0] + self.evader_vel[1] += evader_accel[1] + self.evader_vel[0] = max(-MAX_SPEED, min(MAX_SPEED, self.evader_vel[0])) + self.evader_vel[1] = max(-MAX_SPEED, min(MAX_SPEED, self.evader_vel[1])) + + for i in range(2): + self.pursuer_pos[i] += self.pursuer_vel[i] + self.evader_pos[i] += self.evader_vel[i] + self._clamp(self.pursuer_pos) + self._clamp(self.evader_pos) + + self.step_count += 1 + + dx = self.pursuer_pos[0] - self.evader_pos[0] + dy = self.pursuer_pos[1] - self.evader_pos[1] + dist = math.hypot(dx, dy) + + self.captured = dist < self.capture_radius + done = self.captured or (self.step_count >= self.max_steps) + reward = 1.0 if self.captured else -dist / (2 * self.arena_size) + + return self._get_state(), reward, done + + def _get_state(self): + rel_x = (self.evader_pos[0] - self.pursuer_pos[0]) / (2 * self.arena_size) + rel_y = (self.evader_pos[1] - self.pursuer_pos[1]) / (2 * self.arena_size) + evader_accel_x = self.evader_vel[0] - self.evader_prev_vel[0] + evader_accel_y = self.evader_vel[1] - self.evader_prev_vel[1] + return [ + rel_x, + rel_y, + self.evader_vel[0] / MAX_SPEED, + self.evader_vel[1] / MAX_SPEED, + evader_accel_x / (ACTION_SCALE * 2), + evader_accel_y / (ACTION_SCALE * 2), + self.pursuer_vel[0] / MAX_SPEED, + self.pursuer_vel[1] / MAX_SPEED, + ] + + def get_pursuer_pos(self): + return tuple(self.pursuer_pos) + + def get_evader_pos(self): + return tuple(self.evader_pos) + + def get_capture_time(self): + return self.step_count if self.captured else None diff --git a/examples/pursuit-evasion/visualize.py b/examples/pursuit-evasion/visualize.py new file mode 100644 index 00000000..1a0b1e84 --- /dev/null +++ b/examples/pursuit-evasion/visualize.py @@ -0,0 +1,128 @@ +import warnings + +import graphviz +import matplotlib.pyplot as plt +import numpy as np + + +def plot_stats(statistics, ylog=False, view=False, filename='avg_fitness.svg'): + """ Plots the population's average and best fitness. """ + if plt is None: + warnings.warn("This display is not available due to a missing optional dependency (matplotlib)") + return + + generation = range(len(statistics.most_fit_genomes)) + best_fitness = [c.fitness for c in statistics.most_fit_genomes] + avg_fitness = np.array(statistics.get_fitness_mean()) + stdev_fitness = np.array(statistics.get_fitness_stdev()) + + plt.plot(generation, avg_fitness, 'b-', label="average") + plt.plot(generation, avg_fitness - stdev_fitness, 'g-.', label="-1 sd") + plt.plot(generation, avg_fitness + stdev_fitness, 'g-.', label="+1 sd") + plt.plot(generation, best_fitness, 'r-', label="best") + + plt.title("Population's average and best fitness") + plt.xlabel("Generations") + plt.ylabel("Fitness") + plt.grid() + plt.legend(loc="best") + if ylog: + plt.gca().set_yscale('symlog') + + plt.savefig(filename) + if view: + plt.show() + + plt.close() + + +def plot_species(statistics, view=False, filename='speciation.svg'): + """ Visualizes speciation throughout evolution. """ + if plt is None: + warnings.warn("This display is not available due to a missing optional dependency (matplotlib)") + return + + species_sizes = statistics.get_species_sizes() + num_generations = len(species_sizes) + curves = np.array(species_sizes).T + + fig, ax = plt.subplots() + ax.stackplot(range(num_generations), *curves) + + plt.title("Speciation") + plt.ylabel("Size per Species") + plt.xlabel("Generations") + + plt.savefig(filename) + + if view: + plt.show() + + plt.close() + + +def draw_net(config, genome, view=False, filename=None, node_names=None, show_disabled=True, prune_unused=False, + node_colors=None, fmt='svg'): + """ Receives a genome and draws a neural network with arbitrary topology. """ + if graphviz is None: + warnings.warn("This display is not available due to a missing optional dependency (graphviz)") + return + + if prune_unused: + genome = genome.get_pruned_copy(config.genome_config) + + if node_names is None: + node_names = {} + + assert type(node_names) is dict + + if node_colors is None: + node_colors = {} + + assert type(node_colors) is dict + + node_attrs = { + 'shape': 'circle', + 'fontsize': '9', + 'height': '0.2', + 'width': '0.2'} + + dot = graphviz.Digraph(format=fmt, node_attr=node_attrs) + + inputs = set() + for k in config.genome_config.input_keys: + inputs.add(k) + name = node_names.get(k, str(k)) + input_attrs = {'style': 'filled', 'shape': 'box', 'fillcolor': node_colors.get(k, 'lightgray')} + dot.node(name, _attributes=input_attrs) + + outputs = set() + for k in config.genome_config.output_keys: + outputs.add(k) + name = node_names.get(k, str(k)) + node_attrs = {'style': 'filled', 'fillcolor': node_colors.get(k, 'lightblue')} + + dot.node(name, _attributes=node_attrs) + + used_nodes = set(genome.nodes.keys()) + for n in used_nodes: + if n in inputs or n in outputs: + continue + + attrs = {'style': 'filled', + 'fillcolor': node_colors.get(n, 'white')} + dot.node(str(n), _attributes=attrs) + + for cg in genome.connections.values(): + if cg.enabled or show_disabled: + input, output = cg.key + a = node_names.get(input, str(input)) + b = node_names.get(output, str(output)) + style = 'solid' if cg.enabled else 'dotted' + color = 'green' if cg.weight > 0 else 'red' + width = str(0.1 + abs(cg.weight / 5.0)) + dot.edge(a, b, _attributes={'style': style, 'color': color, 'penwidth': width}) + + dot.render(filename, view=view) + + return dot