Module crowd-control.classes.simulation

Expand source code
import copy
import matplotlib.pyplot as plt 
import numpy as np 
import random 

from numpy import ndarray
from collections import defaultdict
from classes.lattice import Lattice
from classes.cell import Cell
from helpers import get_value_array


class Simulation:

    """
    Simulates heterogenous crowd in an infinite corridor.

    Attributes:
        N (int): Total number of people in the corridor.
        iters (int): Number of iterations (timesteps) to execute.
        corridor (Lattice): Object representing the initial state of populated corridor.
        populated_cells (ndarray[Cell]): Array of currently populated cell objects .
        p (float): probability of agents moving straight (soberness).
    """

    def __init__(self, iters, corridor, p=1):
        """
        Initializes the simulation object.

        Args:
            N (int): Total number of people in the corridor.
            iters (int): Number of iterations (timesteps) to exectue
            corridor (Lattice): Object representing the initial state of populated corridor.
            p (float): probability of agents moving straight (soberness).
        """
        self.N = len(corridor.get_populated_cells())
        self.iters = iters
        self.corridor: Lattice = copy.deepcopy(corridor)
        self.corridor.load_neighbours()
        self.populated_cells: ndarray[Cell] = self.corridor.get_populated_cells()
        self.p = p

    def find_target_cell(self, cell: Cell) -> Cell | None:
        """
        Find and return target cell while considering boundary conditions.
        Implicitly returns None if no target cell available.

        Args:
            cell (Cell): Populated cell that needs to target next cell.
        """
        assert cell.value != 0, 'Unpopulated cell should not target a cell'

        # check periodic boundary conditions
        if cell.is_leaving_left():
            new_y = self.corridor.len_y - 1
            return self.corridor.get_random_empty_edge_cell(new_y, x=cell.x, p=self.p)
        elif cell.is_leaving_right(self.corridor.len_y):
            return self.corridor.get_random_empty_edge_cell(y=0, x=cell.x, p=self.p)
        
        # cell is not a boundary cell
        return cell.get_best_neighbor(self.p)

    def resolve_conflicts(self, next_cells):
        """
        Resolve conflicts in the next_cells dictionary.
        If cell targeted by multiple cells, random cell wins and others stay put.
        Return dict with coordinate of target cell as key, cell object as value.

        Args:
            next_cells: dict with coordinate of target cell as keys, list of cells as values.
        """
        targets = list(next_cells.keys())
        cell_assigned = {}

        for target in targets:
            candidates = next_cells[target]

            if len(candidates) == 1:
                cell_assigned[target] = candidates[0]
            else:
                for _ in range(len(candidates) - 1):
                    loser = candidates.pop(random.randint(0, len(candidates) - 1))
                    cell_assigned[(loser.x, loser.y)] = loser
                winner = candidates[0]
                cell_assigned[target] = winner
        
        return cell_assigned
    
    def execute_timestep(self, next_cells):
        """
        Populate new cells and empty old ones.
        Adjust distance value of visited cells. 

        Args:
            next_cells (dict): Defines all moves to execute.
        """
        for new_cell_coords, old_cell in next_cells.items():
            value = old_cell.value
            new_cell = self.corridor.cells[new_cell_coords]
            
            if old_cell != new_cell: 
                old_cell.lower_distance_to_exit()
            
            old_cell.clear()    
            new_cell.populate(value)

    def iteration(self):
        """
        Execute one iteration of the Cellular Automata.
        """
        # key=tuple with coords of a targeted cell, value=list of old_cell(s)
        next_cells = defaultdict(list) 
 
        for cell in self.populated_cells:
            
            target_cell = self.find_target_cell(cell)
            
            # save targeted cells
            if target_cell: 
                next_cells[(target_cell.x, target_cell.y)].append(cell)
            else:
                next_cells[((cell.x, cell.y))] = [cell] 
        
        # solve conflicts when cell is targeted by multiple cells
        next_cells = self.resolve_conflicts(next_cells)

        # populate new cells and empty old ones
        self.execute_timestep(next_cells)
        
        # update populated cells
        self.populated_cells = self.corridor.get_populated_cells()

    def run(self, animate=True, save_video=True, print_progress=True):
        """
        Execute self.iters iterations and return evolution of order parameter.

        Args:
            animate (bool): Animates progress if true.
            save_video (bool): Saves CA snapshots if true
            print_progress (bool): Prints iteration number if true.
        """
        images = []
        if animate:
            plt.figure(figsize=(12, 5))
            plt.ion()
            
        phi_0 = calculate_phi_0(self.corridor.len_x, self.corridor.len_y, self.N)
        print(f'phi_0:{phi_0:.3f}')
        phi_values = np.zeros(self.iters)
        
        for i in range(self.iters):
            print(f'iteration {i+1}/{self.iters}     ', end='\r') if print_progress else None
            
            # update all cells
            self.iteration()
            
            # update order parameter
            phi = self.corridor.calculate_lane_formation()
            phi_reduced = (phi-phi_0)/(1-phi_0)
            phi_values[i] = phi_reduced

            if save_video == True:
                images.append(get_value_array(self.corridor.cells))

            self.animate(i, phi_values) if animate else None   

        plt.ioff() if animate else None

        assert len(self.corridor.get_populated_cells()) == self.N, 'Density has changed during simulation'
        return images, phi_values
    
    def animate(self, i, phi_values):
        """
        Animates one frame of the simulation. 

        Args:
            i (int): Current iteration number.
            phi_values (list[float]): array of phi value per iteration.
        """
        # plot lattice
        plt.subplot(1,2,1)
        self.plot_snapshot(colorbar=False)

        # plot phi evolution
        plt.subplot(1,2,2)
        plt.xlabel('iteration')
        plt.ylabel('$\\tilde{\phi}$', fontsize=14)
        plt.plot(list(range(i + 1)), phi_values[0:i+1], 'k-')
        plt.pause(0.005)
        plt.clf()

    def plot_snapshot(self, colorbar=True):
        """
        Plots snapshot of the current state of the lattice.
        """
        plt.imshow(get_value_array(self.corridor.cells), interpolation="nearest", origin="upper")
        plt.colorbar() if colorbar == True else None
        plt.show()
    
    def plot_results(self, phi_values, save=False):
        """
        Plots final configuration of the lattice and progress of phi in one figure.
        """
        plt.figure(figsize=(12, 5))
        plt.subplot(121)
        plt.imshow(get_value_array(self.corridor.cells), interpolation="nearest", origin="upper")
        plt.colorbar() 

        plt.subplot(122)
        plt.xlabel('iteration')
        plt.ylabel('$\\tilde{\phi}$', fontsize=14)
        plt.plot(list(range(len(phi_values))), phi_values, 'k-')

        if save:
            density = self.N / (self.corridor.len_x * self.corridor.len_y)
            plt.savefig(f'./results/final_snapshots/L_{self.corridor.len_x}_rho_{density}_p_{self.p}.png')

def calculate_phi_0(len_x, len_y, N):
    """
    Calculates phi_0, the normalization factor for the reduced order parameter
    (measures the degree of lane formation) by generating 100 random corridors.

    Args:
        len_x (int): Number of rows of the corridor.
        len_y (int): Number of columns of the corridor.
        N (int): Number of agents in the corridor.
    """
    phi_randoms = np.zeros(100)
    for j in range(0,100):
        corridor = Lattice(len_x, len_y)
        corridor.populate_corridor(N)
        phi_randoms[j] = corridor.calculate_lane_formation()
    phi_0 = np.mean(phi_randoms)
    return phi_0

Functions

def calculate_phi_0(len_x, len_y, N)

Calculates phi_0, the normalization factor for the reduced order parameter (measures the degree of lane formation) by generating 100 random corridors.

Args

len_x : int
Number of rows of the corridor.
len_y : int
Number of columns of the corridor.
N : int
Number of agents in the corridor.
Expand source code
def calculate_phi_0(len_x, len_y, N):
    """
    Calculates phi_0, the normalization factor for the reduced order parameter
    (measures the degree of lane formation) by generating 100 random corridors.

    Args:
        len_x (int): Number of rows of the corridor.
        len_y (int): Number of columns of the corridor.
        N (int): Number of agents in the corridor.
    """
    phi_randoms = np.zeros(100)
    for j in range(0,100):
        corridor = Lattice(len_x, len_y)
        corridor.populate_corridor(N)
        phi_randoms[j] = corridor.calculate_lane_formation()
    phi_0 = np.mean(phi_randoms)
    return phi_0

Classes

class Simulation (iters, corridor, p=1)

Simulates heterogenous crowd in an infinite corridor.

Attributes

N : int
Total number of people in the corridor.
iters : int
Number of iterations (timesteps) to execute.
corridor : Lattice
Object representing the initial state of populated corridor.
populated_cells : ndarray[Cell]
Array of currently populated cell objects .
p : float
probability of agents moving straight (soberness).

Initializes the simulation object.

Args

N : int
Total number of people in the corridor.
iters : int
Number of iterations (timesteps) to exectue
corridor : Lattice
Object representing the initial state of populated corridor.
p : float
probability of agents moving straight (soberness).
Expand source code
class Simulation:

    """
    Simulates heterogenous crowd in an infinite corridor.

    Attributes:
        N (int): Total number of people in the corridor.
        iters (int): Number of iterations (timesteps) to execute.
        corridor (Lattice): Object representing the initial state of populated corridor.
        populated_cells (ndarray[Cell]): Array of currently populated cell objects .
        p (float): probability of agents moving straight (soberness).
    """

    def __init__(self, iters, corridor, p=1):
        """
        Initializes the simulation object.

        Args:
            N (int): Total number of people in the corridor.
            iters (int): Number of iterations (timesteps) to exectue
            corridor (Lattice): Object representing the initial state of populated corridor.
            p (float): probability of agents moving straight (soberness).
        """
        self.N = len(corridor.get_populated_cells())
        self.iters = iters
        self.corridor: Lattice = copy.deepcopy(corridor)
        self.corridor.load_neighbours()
        self.populated_cells: ndarray[Cell] = self.corridor.get_populated_cells()
        self.p = p

    def find_target_cell(self, cell: Cell) -> Cell | None:
        """
        Find and return target cell while considering boundary conditions.
        Implicitly returns None if no target cell available.

        Args:
            cell (Cell): Populated cell that needs to target next cell.
        """
        assert cell.value != 0, 'Unpopulated cell should not target a cell'

        # check periodic boundary conditions
        if cell.is_leaving_left():
            new_y = self.corridor.len_y - 1
            return self.corridor.get_random_empty_edge_cell(new_y, x=cell.x, p=self.p)
        elif cell.is_leaving_right(self.corridor.len_y):
            return self.corridor.get_random_empty_edge_cell(y=0, x=cell.x, p=self.p)
        
        # cell is not a boundary cell
        return cell.get_best_neighbor(self.p)

    def resolve_conflicts(self, next_cells):
        """
        Resolve conflicts in the next_cells dictionary.
        If cell targeted by multiple cells, random cell wins and others stay put.
        Return dict with coordinate of target cell as key, cell object as value.

        Args:
            next_cells: dict with coordinate of target cell as keys, list of cells as values.
        """
        targets = list(next_cells.keys())
        cell_assigned = {}

        for target in targets:
            candidates = next_cells[target]

            if len(candidates) == 1:
                cell_assigned[target] = candidates[0]
            else:
                for _ in range(len(candidates) - 1):
                    loser = candidates.pop(random.randint(0, len(candidates) - 1))
                    cell_assigned[(loser.x, loser.y)] = loser
                winner = candidates[0]
                cell_assigned[target] = winner
        
        return cell_assigned
    
    def execute_timestep(self, next_cells):
        """
        Populate new cells and empty old ones.
        Adjust distance value of visited cells. 

        Args:
            next_cells (dict): Defines all moves to execute.
        """
        for new_cell_coords, old_cell in next_cells.items():
            value = old_cell.value
            new_cell = self.corridor.cells[new_cell_coords]
            
            if old_cell != new_cell: 
                old_cell.lower_distance_to_exit()
            
            old_cell.clear()    
            new_cell.populate(value)

    def iteration(self):
        """
        Execute one iteration of the Cellular Automata.
        """
        # key=tuple with coords of a targeted cell, value=list of old_cell(s)
        next_cells = defaultdict(list) 
 
        for cell in self.populated_cells:
            
            target_cell = self.find_target_cell(cell)
            
            # save targeted cells
            if target_cell: 
                next_cells[(target_cell.x, target_cell.y)].append(cell)
            else:
                next_cells[((cell.x, cell.y))] = [cell] 
        
        # solve conflicts when cell is targeted by multiple cells
        next_cells = self.resolve_conflicts(next_cells)

        # populate new cells and empty old ones
        self.execute_timestep(next_cells)
        
        # update populated cells
        self.populated_cells = self.corridor.get_populated_cells()

    def run(self, animate=True, save_video=True, print_progress=True):
        """
        Execute self.iters iterations and return evolution of order parameter.

        Args:
            animate (bool): Animates progress if true.
            save_video (bool): Saves CA snapshots if true
            print_progress (bool): Prints iteration number if true.
        """
        images = []
        if animate:
            plt.figure(figsize=(12, 5))
            plt.ion()
            
        phi_0 = calculate_phi_0(self.corridor.len_x, self.corridor.len_y, self.N)
        print(f'phi_0:{phi_0:.3f}')
        phi_values = np.zeros(self.iters)
        
        for i in range(self.iters):
            print(f'iteration {i+1}/{self.iters}     ', end='\r') if print_progress else None
            
            # update all cells
            self.iteration()
            
            # update order parameter
            phi = self.corridor.calculate_lane_formation()
            phi_reduced = (phi-phi_0)/(1-phi_0)
            phi_values[i] = phi_reduced

            if save_video == True:
                images.append(get_value_array(self.corridor.cells))

            self.animate(i, phi_values) if animate else None   

        plt.ioff() if animate else None

        assert len(self.corridor.get_populated_cells()) == self.N, 'Density has changed during simulation'
        return images, phi_values
    
    def animate(self, i, phi_values):
        """
        Animates one frame of the simulation. 

        Args:
            i (int): Current iteration number.
            phi_values (list[float]): array of phi value per iteration.
        """
        # plot lattice
        plt.subplot(1,2,1)
        self.plot_snapshot(colorbar=False)

        # plot phi evolution
        plt.subplot(1,2,2)
        plt.xlabel('iteration')
        plt.ylabel('$\\tilde{\phi}$', fontsize=14)
        plt.plot(list(range(i + 1)), phi_values[0:i+1], 'k-')
        plt.pause(0.005)
        plt.clf()

    def plot_snapshot(self, colorbar=True):
        """
        Plots snapshot of the current state of the lattice.
        """
        plt.imshow(get_value_array(self.corridor.cells), interpolation="nearest", origin="upper")
        plt.colorbar() if colorbar == True else None
        plt.show()
    
    def plot_results(self, phi_values, save=False):
        """
        Plots final configuration of the lattice and progress of phi in one figure.
        """
        plt.figure(figsize=(12, 5))
        plt.subplot(121)
        plt.imshow(get_value_array(self.corridor.cells), interpolation="nearest", origin="upper")
        plt.colorbar() 

        plt.subplot(122)
        plt.xlabel('iteration')
        plt.ylabel('$\\tilde{\phi}$', fontsize=14)
        plt.plot(list(range(len(phi_values))), phi_values, 'k-')

        if save:
            density = self.N / (self.corridor.len_x * self.corridor.len_y)
            plt.savefig(f'./results/final_snapshots/L_{self.corridor.len_x}_rho_{density}_p_{self.p}.png')

Methods

def animate(self, i, phi_values)

Animates one frame of the simulation.

Args

i : int
Current iteration number.
phi_values : list[float]
array of phi value per iteration.
Expand source code
def animate(self, i, phi_values):
    """
    Animates one frame of the simulation. 

    Args:
        i (int): Current iteration number.
        phi_values (list[float]): array of phi value per iteration.
    """
    # plot lattice
    plt.subplot(1,2,1)
    self.plot_snapshot(colorbar=False)

    # plot phi evolution
    plt.subplot(1,2,2)
    plt.xlabel('iteration')
    plt.ylabel('$\\tilde{\phi}$', fontsize=14)
    plt.plot(list(range(i + 1)), phi_values[0:i+1], 'k-')
    plt.pause(0.005)
    plt.clf()
def execute_timestep(self, next_cells)

Populate new cells and empty old ones. Adjust distance value of visited cells.

Args

next_cells : dict
Defines all moves to execute.
Expand source code
def execute_timestep(self, next_cells):
    """
    Populate new cells and empty old ones.
    Adjust distance value of visited cells. 

    Args:
        next_cells (dict): Defines all moves to execute.
    """
    for new_cell_coords, old_cell in next_cells.items():
        value = old_cell.value
        new_cell = self.corridor.cells[new_cell_coords]
        
        if old_cell != new_cell: 
            old_cell.lower_distance_to_exit()
        
        old_cell.clear()    
        new_cell.populate(value)
def find_target_cell(self, cell: classes.cell.Cell) ‑> classes.cell.Cell | None

Find and return target cell while considering boundary conditions. Implicitly returns None if no target cell available.

Args

cell : Cell
Populated cell that needs to target next cell.
Expand source code
def find_target_cell(self, cell: Cell) -> Cell | None:
    """
    Find and return target cell while considering boundary conditions.
    Implicitly returns None if no target cell available.

    Args:
        cell (Cell): Populated cell that needs to target next cell.
    """
    assert cell.value != 0, 'Unpopulated cell should not target a cell'

    # check periodic boundary conditions
    if cell.is_leaving_left():
        new_y = self.corridor.len_y - 1
        return self.corridor.get_random_empty_edge_cell(new_y, x=cell.x, p=self.p)
    elif cell.is_leaving_right(self.corridor.len_y):
        return self.corridor.get_random_empty_edge_cell(y=0, x=cell.x, p=self.p)
    
    # cell is not a boundary cell
    return cell.get_best_neighbor(self.p)
def iteration(self)

Execute one iteration of the Cellular Automata.

Expand source code
def iteration(self):
    """
    Execute one iteration of the Cellular Automata.
    """
    # key=tuple with coords of a targeted cell, value=list of old_cell(s)
    next_cells = defaultdict(list) 

    for cell in self.populated_cells:
        
        target_cell = self.find_target_cell(cell)
        
        # save targeted cells
        if target_cell: 
            next_cells[(target_cell.x, target_cell.y)].append(cell)
        else:
            next_cells[((cell.x, cell.y))] = [cell] 
    
    # solve conflicts when cell is targeted by multiple cells
    next_cells = self.resolve_conflicts(next_cells)

    # populate new cells and empty old ones
    self.execute_timestep(next_cells)
    
    # update populated cells
    self.populated_cells = self.corridor.get_populated_cells()
def plot_results(self, phi_values, save=False)

Plots final configuration of the lattice and progress of phi in one figure.

Expand source code
def plot_results(self, phi_values, save=False):
    """
    Plots final configuration of the lattice and progress of phi in one figure.
    """
    plt.figure(figsize=(12, 5))
    plt.subplot(121)
    plt.imshow(get_value_array(self.corridor.cells), interpolation="nearest", origin="upper")
    plt.colorbar() 

    plt.subplot(122)
    plt.xlabel('iteration')
    plt.ylabel('$\\tilde{\phi}$', fontsize=14)
    plt.plot(list(range(len(phi_values))), phi_values, 'k-')

    if save:
        density = self.N / (self.corridor.len_x * self.corridor.len_y)
        plt.savefig(f'./results/final_snapshots/L_{self.corridor.len_x}_rho_{density}_p_{self.p}.png')
def plot_snapshot(self, colorbar=True)

Plots snapshot of the current state of the lattice.

Expand source code
def plot_snapshot(self, colorbar=True):
    """
    Plots snapshot of the current state of the lattice.
    """
    plt.imshow(get_value_array(self.corridor.cells), interpolation="nearest", origin="upper")
    plt.colorbar() if colorbar == True else None
    plt.show()
def resolve_conflicts(self, next_cells)

Resolve conflicts in the next_cells dictionary. If cell targeted by multiple cells, random cell wins and others stay put. Return dict with coordinate of target cell as key, cell object as value.

Args

next_cells
dict with coordinate of target cell as keys, list of cells as values.
Expand source code
def resolve_conflicts(self, next_cells):
    """
    Resolve conflicts in the next_cells dictionary.
    If cell targeted by multiple cells, random cell wins and others stay put.
    Return dict with coordinate of target cell as key, cell object as value.

    Args:
        next_cells: dict with coordinate of target cell as keys, list of cells as values.
    """
    targets = list(next_cells.keys())
    cell_assigned = {}

    for target in targets:
        candidates = next_cells[target]

        if len(candidates) == 1:
            cell_assigned[target] = candidates[0]
        else:
            for _ in range(len(candidates) - 1):
                loser = candidates.pop(random.randint(0, len(candidates) - 1))
                cell_assigned[(loser.x, loser.y)] = loser
            winner = candidates[0]
            cell_assigned[target] = winner
    
    return cell_assigned
def run(self, animate=True, save_video=True, print_progress=True)

Execute self.iters iterations and return evolution of order parameter.

Args

animate : bool
Animates progress if true.
save_video : bool
Saves CA snapshots if true
print_progress : bool
Prints iteration number if true.
Expand source code
def run(self, animate=True, save_video=True, print_progress=True):
    """
    Execute self.iters iterations and return evolution of order parameter.

    Args:
        animate (bool): Animates progress if true.
        save_video (bool): Saves CA snapshots if true
        print_progress (bool): Prints iteration number if true.
    """
    images = []
    if animate:
        plt.figure(figsize=(12, 5))
        plt.ion()
        
    phi_0 = calculate_phi_0(self.corridor.len_x, self.corridor.len_y, self.N)
    print(f'phi_0:{phi_0:.3f}')
    phi_values = np.zeros(self.iters)
    
    for i in range(self.iters):
        print(f'iteration {i+1}/{self.iters}     ', end='\r') if print_progress else None
        
        # update all cells
        self.iteration()
        
        # update order parameter
        phi = self.corridor.calculate_lane_formation()
        phi_reduced = (phi-phi_0)/(1-phi_0)
        phi_values[i] = phi_reduced

        if save_video == True:
            images.append(get_value_array(self.corridor.cells))

        self.animate(i, phi_values) if animate else None   

    plt.ioff() if animate else None

    assert len(self.corridor.get_populated_cells()) == self.N, 'Density has changed during simulation'
    return images, phi_values