Source code for soba.agents.continuousOccupant

import random
import soba.agents.resources.aStar as aStar
import soba.agents.resources.fov as fov
import soba.visualization.ramen.performanceGenerator as ramen
from soba.agents.occupant import Occupant
from soba.agents.avatar import Avatar 
import time

[docs]class ContinuousOccupant(Occupant): """ This class enables to create occupants that are modelled with a continuous space models. based on considering a scaled grid (x, y). Cell size of 0.5m ^ 2 by default. The occupants are agents with their activity defined by markov states. Attributes: Those Inherited from the Occupant class. fov: List of positions (x, y) that the occupant can see. Methods: getPosState: Auxiliary method to distribute the occupants between the points of interests with same id for more than one occupant. getWay: Invocation of the AStar resource to calculate the optimal path. getPlaceToGo: Obtaining the position associated with the current state. posInMyFOV: Check if a position is in my field of vision. evalAvoid: Check the future movement to be made by another agent to assess a possible collision. checkFreeSharedPOI: Get a free position of a shared point of interest if possible. checkCanMove: Get a new path in case of possible collision. evalCollision: Evaluate a possible collision with an agent and solve it if necessary by calculating another path. makeMovement: Carry out a movement: displacement between cells or reduction of the movement cost parameter. reportMovement: Auxiliary method to notify a movement giving its orientation and speed. checkLeaveArrive: Evaluates the entrance and exit of the building by an occupying agent. getFOV: Calculation of the occupant's field of vision, registered in the attribute fov. step: Method invoked by the Model scheduler in each step. """ global ramenAux ramenAux = False def activeRamen(): global ramenAux ramenAux = True def __init__(self, unique_id, model, json, speed = 0.71428): super().__init__(unique_id, model, json, speed) """ Create a new ContinuousOccupant object. Args: unique_id: Unique identifier corresponding to the Occupant. models: Associated Model object, by default ContinuousModel. json: Json of definition of parameters of behavior speed: Movement speed in m/s Return: ContinuousOccupant object """ self.fov = [] self.fovCal = True #Control params. self.costMovement = round(0.25/(self.speed*self.model.clock.timeByStep)) self.out = True self.initmove = True self.entering = False self.rect = True self.alreadyMovement = False self.movement = {'speed': self.speed, 'orientation':'out'} #State machine for k, v in json['states'].items(): pos = self.getPosState(v) self.positionByState[k] = pos pos = self.positionByState[list(json['states'].items())[0][0]] self.model.grid.place_agent(self, pos)
[docs] def getPosState(self, name): ''' Auxiliary method to distribute the occupants between the points of interests with same id for more than one occupant. Args: name: Poi id/name. Return: Position associated with this occupant. ''' options = [] for poi in self.model.pois: if poi.id == name: if not poi.share and not poi.used: poi.used = True return poi.pos elif poi.share: options.append(poi.pos) else: options.append(poi.pos) return random.choice(options)
def startActivity(self): super.startActivity()
[docs] def getWay(self, pos = None, pos_to_go = None, other = []): ''' Invocation of the AStar resource to calculate the optimal path. Args: pos: Initial position, by default the current position of the occupant. pos_to_go: Final position, by default the value of the 'pos_to_go' attribute of the occupant. other: List of auxiliary positions given to be considered impenetrable by the occupants, that is, they will not be used by the AStar. Return: List of positions (x, y). ''' posSend = pos pos_to_goSend = pos_to_go if pos == None: posSend = self.pos if pos_to_go == None: pos_to_goSend = self.pos_to_go return aStar.getPathContinuous(self.model, posSend, pos_to_goSend, other)
[docs] def getPlaceToGo(self): ''' Obtaining the position associated with the current state. It is invoked when you enter a new state. Return: Position as coordinate (x, y). ''' pos_to_go = self.positionByState[self.state] return pos_to_go
[docs] def posInMyFOV(self, pos): ''' Check if the position is in my field of vision Args: pos: Position to be checked Return: Boolean ''' if pos in self.fov: return True return False
[docs] def evalAvoid(self, otherAgent): ''' Check the future movement to be made by another agent to assess a possible collision. Args: otherAgent: The other agent to be avoid. Return: Boolean ''' if otherAgent.alreadyMovement: return False if otherAgent.pos == otherAgent.pos_to_go: return False if otherAgent.movements[otherAgent.N] == self.pos: return False if otherAgent.movements[len(otherAgent.movements)-1] == otherAgent.pos: return False return True
[docs] def checkFreeSharedPOI(self): """ Get a free position of a shared point of interest if possible. Return: POI object """ poi = self.model.getPOIsPos(self.pos_to_go) if not poi: return False pois = self.model.getPOIsId(poi[0].id) for p in pois: if self.model.checkFreePOI(p): return p.pos return False
[docs] def checkCanMove(self): """ Get a new path in case of possible collision. Return: List of positions """ x1, y1 = self.pos possiblePosition1 = [(x1, y1 + 1), (x1 + 1, y1), (x1 - 1, y1), (x1, y1 - 1)] possiblePosition2 = [(x1 + 1, y1 + 1), (x1 + 1, y1 - 1), (x1 - 1, y1 - 1), (x1 - 1, y1 + 1)] possiblePosition = possiblePosition2 + possiblePosition1 posOccupied = [] for pos in possiblePosition: possibleOccupant = self.model.grid.get_cell_list_contents(pos) for j in possibleOccupant: if isinstance(j, Occupant) and not self.evalAvoid(j): posOccupied.append(pos) if self.pos_to_go == self.movements[self.N]: pos_shared = self.checkFreeSharedPOI() if pos_shared: self.pos_to_go = pos_shared else: self.pos_to_go = self.pos return [self.pos] way = self.getWay(other = posOccupied) print('way', way) if way[0] == self.pos: print('if') self.N = 0 self.pos_to_go = self.pos return way
[docs] def evalCollision(self): """ Evaluate a possible collision with an agent, invoking the evalAvoid method, and solve it if necessary by calculating another path. Return: True if the collision exists and is avoided, False otherwise. """ if self.movements[self.N] in self.model.exits: return True possibleOccupant = self.model.grid.get_cell_list_contents(self.movements[self.N]) for i in possibleOccupant: if isinstance(i, Occupant) and not isinstance(i, Avatar): if self.evalAvoid(i): return True self.movements = self.checkCanMove() self.N = 0 return True return True
[docs] def makeMovement(self): """Carry out a movement: displacement between cells or reduction of the movement cost parameter.""" if self.costMovement > 1: self.costMovement = self.costMovement - 1 self.reportMovement() else: if self.initmove: if self.pos != self.pos_to_go: x1, y1 = self.pos x2, y2 = self.movements[self.N] rect = True if x1!=x2 and y1!=y2: rect = False if rect == True: self.costMovement = round(0.5/(self.speed*self.model.clock.timeByStep)) self.rect = True else: self.costMovement = round(0.707106781/(self.speed*self.model.clock.timeByStep)) self.rect = False self.initmove = False self.step() if self.fovCal: self.getFOV() return if self.evalCollision(): self.reportMovement() self.model.grid.move_agent(self, self.movements[self.N]) self.N = self.N+1 if self.fovCal: self.getFOV() if self.pos != self.pos_to_go: x1, y1 = self.pos x2, y2 = self.movements[self.N] #if self.N > len(self.movements) - 1: #self.N = 0 rect = True if x1!=x2 and y1!=y2: rect = False if rect == True: self.costMovement = round(0.5/(self.speed*self.model.clock.timeByStep)) self.rect = True else: self.costMovement = round(0.707106781/(self.speed*self.model.clock.timeByStep)) self.rect = False else: self.N = 0 self.costMovement = round(0.5/(self.speed*self.model.clock.timeByStep)) self.rect = True
[docs] def reportMovement(self): """ Auxiliary method to notify a movement giving its orientation and speed. """ global ramenAux x1, y1 = self.pos x2, y2 = self.movements[self.N] pos = '' if x2 > x1 and y2 > y1: pos = 'NE' elif x2 > x1 and y2 == y1: pos = 'E' elif x1 > x2 and y2 == y1: pos = 'W' elif x1 > x2 and y1 > y2: pos = 'SW' elif y2 > y1 and x2 == x1: pos = 'N' elif x1 == x2 and y1 > y2: pos = 'S' elif x1 > x2 and y2 > y1: pos = 'NW' elif x2 > x1 and y1 > y2: pos = 'SE' else: if ramenAux: ramen.reportStop(self) else: self.movement = {'speed': self.speed, 'orientation': 'stop'} return if ramenAux: ramen.reportMovement(self, pos, self.rect) else: orientation = pos self.movement = {'speed': self.speed, 'orientation': orientation}
[docs] def checkLeaveArrive(self): """ Evaluates the entrance and exit of the building by an occupying agent. """ global ramenAux if (self.pos_to_go not in self.model.exits) and not self.inbuilding: self.entering = True if ramenAux: ramen.reportCreation(self, 'E') self.inbuilding = True self.movement = {'speed': self.speed, 'orientation': 'E'} return if self.entering and (self.pos in self.model.exits): return else: self.entering = False if (self.pos in self.model.exits) and self.inbuilding: self.inbuilding = False if ramenAux: ramen.reportExit(self) self.movement = {'speed': self.speed, 'orientation': 'out'} return
[docs] def getFOV(self): '''Calculation of the occupant's field of vision, registered in the attribute fov''' asciMap = self.model.asciMap fovMap, flag = fov.makeFOV(asciMap, self.pos) self.fov = [] for index1, line in enumerate(fovMap): for index2, element in enumerate(line): if element == flag: self.fov.append((index2, index1))
[docs] def step(self): """ Method invoked by the Model scheduler in each step. Evaluate if appropriate and, if so, perform: A change of state, a movement or advance in the cost of a movement, or an advance in the performance of an activity. """ if self.changeSchedule() or self.markov == True: self.markov_machine.runStep(self.markovActivity[self.getPeriod()]) self.checkLeaveArrive() self.markov = False elif self.pos != self.pos_to_go and self.movements[0] != self.pos: print('pos: ', self.pos) print('pos_to_go: ', self.pos_to_go) print('movements: ', self.movements) self.makeMovement() self.checkLeaveArrive() self.alreadyMovement = True elif self.time_activity > 0: self.time_activity = self.time_activity - 1 self.checkLeaveArrive() global ramenAux if self.inbuilding and ramenAux: ramen.reportStop(self) else: self.markov = True self.step()