Source code for soba.agents.roomsOccupant

import random
import soba.agents.resources.aStar as aStar
from soba.agents.occupant import Occupant

class RoomsOccupant(Occupant):
    """
    Occupant modelled on a simplified, discrete space made of rooms.

    The activity of the occupant is driven by markov states; every state is
    associated with a room (or with a set of candidate rooms).

    Attributes:
        Those inherited from the Occupant class, plus the movement control
        attributes created in __init__ (onMyWay1, onMyWay2,
        costMovementToNewRoom, costMovementInNewRoom, room1, room2).

    Methods:
        getPosState: Auxiliary method to distribute the occupants between the
            rooms shared by more than one occupant object.
        getWay: Invocation of the AStar resource to calculate the optimal path.
        occupantMovePos: Calculation of the control attributes that regulate
            the cost (steps) of the movement between rooms according to their
            size.
        getPlaceToGo: Obtaining the position associated with the current state.
        step: Method invoked by the Model scheduler in each step.
    """

    def __init__(self, unique_id, model, json, speed=0.7):
        """
        Create a new RoomsOccupant object and place it in its initial room.

        Args:
            unique_id: Unique identifier corresponding to the Occupant.
            model: Associated Model object, by default RoomsModel.
            json: Json of definition of parameters of behavior. Its 'states'
                entry maps each state name to a room name or to a dictionary
                of room names.
            speed: Movement speed in m/s.

        Raises:
            IndexError: If no room of the model matches the initial state.
        """
        super().__init__(unique_id, model, json, speed)
        self.model.schedule.add(self)

        # State machine: resolve the room (or candidate rooms) of each state.
        for stateName, roomSpec in json['states'].items():
            resolved = self.getPosState(stateName, roomSpec)
            self.positionByState[stateName] = resolved

        initialRoom = self._chooseOne(self._roomsForCurrentState())
        self.model.grid.place_agent(self, initialRoom.pos)
        self.model.pushAgentRoom(self, initialRoom.pos)

        # Control attributes of a room-to-room movement.
        self.onMyWay1 = False  # leaving the current room
        self.onMyWay2 = False  # entering the new room
        # Remaining steps to leave the current room. step() and
        # occupantMovePos() use this very attribute, so it must exist (with
        # this exact spelling) before the first movement starts.
        self.costMovementToNewRoom = 0
        # Remaining steps to settle in the new room.
        self.costMovementInNewRoom = 0
        self.room1 = False
        self.room2 = False

    @property
    def costMovemenToNewRoom(self):
        """Deprecated, misspelled alias of costMovementToNewRoom."""
        return self.costMovementToNewRoom

    @costMovemenToNewRoom.setter
    def costMovemenToNewRoom(self, value):
        self.costMovementToNewRoom = value

    @staticmethod
    def _baseName(room):
        """Return the room name without the suffix that follows the first dot."""
        return room.name.split(".")[0]

    def _roomsForCurrentState(self):
        """Return, in model order, the rooms whose base name matches the current state."""
        target = self.positionByState[self.state]
        byDict = isinstance(target, dict)
        matches = []
        for room in self.model.rooms:
            base = self._baseName(room)
            # A dictionary of room names accepts any of its keys; otherwise
            # the state is bound to one single room name.
            if (base in target) if byDict else (base == target):
                matches.append(room)
        return matches

    def _chooseOne(self, candidates):
        """Pick a candidate: at random when there are several, the only one otherwise."""
        if not candidates:
            raise IndexError(
                "no room matches state {!r} (expected room: {!r})".format(
                    self.state, self.positionByState[self.state]))
        # random.choice is only invoked with more than one candidate so that
        # the sequence of random numbers consumed is the same as always.
        if len(candidates) > 1:
            return random.choice(candidates)
        return candidates[0]

    def _halfRoomCost(self, length):
        """Return the steps needed to walk half of 'length' at the occupant speed."""
        # m * seg/m * step/seg
        return length / 2 * (1 / self.speed) * (1 / self.model.clock.timeByStep)

    def getPosState(self, name, posAux):
        '''
        Auxiliary method to distribute the occupants between the rooms shared
        by more than one occupant object.

        Args:
            name: State name.
            posAux: Name of the room associated with this state, string, or
                dictionary of room names with number of occupants.
                {'RoomName1': numberofOccupantsAssigned1,
                 'RoomName2': numberofOccupantsAssigned2... }

        Return:
            The first room name of the dictionary with a positive counter
            (its counter in positionByStateAux is decreased by one), or
            posAux itself when it is not a dictionary or no counter is
            positive.
        '''
        if isinstance(posAux, dict):
            for roomName, remaining in posAux.items():
                if remaining > 0:
                    self.positionByStateAux[name][roomName] = remaining - 1
                    return roomName
        return posAux

    # Movement
    def getWay(self, pos=None, pos_to_go=None):
        '''
        Invocation of the AStar resource to calculate the optimal path.

        Args:
            pos: Initial position, by default the current position of the
                occupant.
            pos_to_go: Final position, by default the value of the
                'pos_to_go' attribute of the occupant.

        Return:
            The result of aStar.getPathRooms: list of positions (x, y).
        '''
        origin = self.pos if pos is None else pos
        destination = self.pos_to_go if pos_to_go is None else pos_to_go
        return aStar.getPathRooms(self.model, origin, destination)

    def occupantMovePos(self, new_position):
        '''
        Calculation of the control attributes that regulate the cost (steps)
        of the movement between rooms according to their size.

        Args:
            new_position: Position (x, y) of the room to which it moves.
        '''
        ux, uy = self.pos
        nx, ny = new_position
        for room in self.model.rooms:
            rx, ry = room.pos
            if room.pos == self.pos:
                # Cost, as steps, of leaving the current room: half of its
                # size along the axis of the movement.
                if rx == nx:
                    self.costMovementToNewRoom = self._halfRoomCost(room.dy)
                if ry == ny:
                    self.costMovementToNewRoom = self._halfRoomCost(room.dx)
            if room.pos == new_position:
                # Cost, as steps, of entering the new room.
                if rx == ux:
                    self.costMovementInNewRoom = self._halfRoomCost(room.dy)
                if ry == uy:
                    self.costMovementInNewRoom = self._halfRoomCost(room.dx)

    def getPlaceToGo(self):
        '''
        Obtaining the position associated with the current state. It is
        invoked when you enter a new state.

        Return:
            Position as coordinate (x, y).

        Raises:
            IndexError: If no room of the model matches the current state.
        '''
        return self._chooseOne(self._roomsForCurrentState()).pos

    def startActivity(self):
        """Delegate to the parent implementation (assumes Occupant defines startActivity)."""
        # super must be called: 'super.startActivity' looks the attribute up
        # on the built-in type itself and always raises AttributeError.
        super().startActivity()

    def step(self):
        """
        Method invoked by the Model scheduler in each step.

        Evaluate if appropriate and, if so, perform, in this order of
        priority: a change of state, an advance in the cost of leaving the
        current room (moving to the next one when it is paid), an advance in
        the cost of entering the new room, the start of a new movement, or an
        advance in the performance of an activity.
        """
        if self.markov == True or self.changeSchedule():  # noqa: E712
            self.markov_machine.runStep(self.markovActivity[self.getPeriod()])
        elif self.onMyWay1:
            if self.costMovementToNewRoom > 0:
                self.costMovementToNewRoom = self.costMovementToNewRoom - 1
            else:
                nextPos = self.movements[self.N]
                origin = self.model.getRoom(self.pos)
                target = self.model.getRoom(nextPos)
                self.room1 = origin
                self.room2 = target
                # A door is only crossed between rooms with different base names.
                if self._baseName(origin) != self._baseName(target):
                    self.model.openDoor(self, origin, target)
                self.model.popAgentRoom(self, self.pos)
                self.model.grid.move_agent(self, nextPos)
                self.model.pushAgentRoom(self, self.pos)
                self.N = self.N + 1
                self.onMyWay1 = False
                self.onMyWay2 = True
        elif self.onMyWay2:
            if self.costMovementInNewRoom > 0:
                self.costMovementInNewRoom = self.costMovementInNewRoom - 1
            else:
                if self._baseName(self.room1) != self._baseName(self.room2):
                    self.model.closeDoor(self, self.room1, self.room2)
                self.onMyWay2 = False
                # The movement is finished: decide what to do next in this
                # same step.
                self.step()
        elif self.pos != self.pos_to_go:
            self.occupantMovePos(self.movements[self.N])
            self.onMyWay1 = True
            # Start paying the cost of the movement in this same step.
            self.step()
        else:
            self.N = 0
            if self.time_activity > 0:
                self.time_activity = self.time_activity - 1
            else:
                self.markov = True