import json
from collections import deque
from datetime import datetime, timedelta
from random import choice, seed

import numpy as np
class OccupancyGenerator:
    """
    Generate synthetic occupancy schedules for a building model using a queueing-style simulation.

    TODO: Add occupancy actions
    """

    def __init__(self,
                 model,
                 num_occupant=10,
                 random_seed=None):
        """
        Set up the editable simulation attributes and assign an office to every long-term occupant.

        Default setting includes:

        - Work shift: 9:00 ~ 17:00, with a 30-minute flexibility coefficient for arriving/leaving.
        - Group meeting: 16:00, once per day, lasting 15 minutes on average.
        - Lunch time: 12:00 ~ 13:00.
        - Call for absence probability: 1%.
        - Chat with colleague: average 30 minutes each.
        - Customer service time: average 30 minutes each.
        - Average number of guests per day: 3.

        The meeting room, lunch room and entry zone are picked at random; offices are picked from the
        remaining zones, so ``random.choice`` raises ``IndexError`` when no zone is left for offices.

        :parameter model: The ``COBS.Model`` class object as the target building model.
        :parameter num_occupant: The number of long-term occupants belonging to the model.
        :parameter random_seed: The seed for numpy and random module. None means no seed specified.
        """
        if random_seed is not None:
            seed(random_seed)
            np.random.seed(random_seed)

        self.start_work = 9 * 60 * 60  # Work starts at 9:00. unit: second
        self.end_work = 17 * 60 * 60  # Work ends at 17:00. unit: second
        self.daily_report = 16 * 60 * 60  # Daily progress report at 16:00, in meeting room
        self.daily_report_mean = 15 * 60  # Daily progress report average length 15 min
        self.daily_report_std = 1 * 60  # Daily progress report std.dev 1 min
        # Scale of the exponential draw added to (start_work - coef) for arrival and to end_work for leaving
        self.come_leave_flex_coef = 30 * 60
        self.call_for_absence = 0.01  # Probability of not coming to the office
        self.lunch_start_time = 12 * 60 * 60  # Lunch serve start time 12:00. unit: second
        self.lunch_end_time = 13 * 60 * 60  # Lunch serve end time 13:00. unit: second
        self.eat_time_a = 10  # First shape parameter of the Beta distribution used for eating time
        self.eat_time_b = 50  # Second shape parameter of the Beta distribution used for eating time
        self.cut_off_time = 14 * 60 * 60  # After this time, the person won't come to work
        self.day_cut_off = 24 * 60 * 60  # Number of one-second slots tracked per person
        self.start_synthetic_data = datetime(2020, 3, 25)  # start date
        self.end_synthetic_data = datetime(2020, 3, 27)  # end date
        self.report_interval = timedelta(seconds=60)  # Time interval between two consecutive package
        self.guest_lambda = 3  # Poisson arrival for unknown customers. unit: person per day
        self.visit_colleague = 3  # Poisson mean of how many times a worker goes to a colleague's office
        self.average_stay_in_colleague_office = 30 * 60
        self.std_stay_in_colleague_office = 4 * 60
        self.average_stay_customer = 30 * 60
        self.std_stay_customer = 5 * 60
        # Built lazily by initialize_light_config(); must exist so save_light_config() can test it
        self.light_config = None
        # TODO: Add zone trespass time

        self.model = model
        # Copy the zone list: it is extended below and must not alter a list owned by the model
        self.possible_locations = list(self.model.get_available_names_under_group("Zone"))
        self.work_zones = self.possible_locations[:]
        self.zone_link = model.get_link_zones()
        self.meeting_room = choice(self.possible_locations)
        self.lunch_room = choice(self.possible_locations)
        self.entry_zone = choice(list(self.zone_link["Outdoor"]))
        # Index 0 is "Outdoor" (the initial value of every position array); "busy" is the last index
        self.possible_locations.insert(0, "Outdoor")
        self.possible_locations.append("busy")
        self.work_zones.remove(self.lunch_room)
        if self.meeting_room != self.lunch_room:
            self.work_zones.remove(self.meeting_room)
        self.worker_assign = [Person(self, office=choice(self.work_zones)) for _ in range(num_occupant)]

    def get_path(self, start, end):
        """
        Use BFS to find the shortest path between two zones.

        :parameter start: The entry of the start zone.
        :parameter end: The entry of the target zone.
        :return: A list of zone names that the occupant needs to cross, including both ends.
            ``[start]`` is returned when ``start == end`` or when ``end`` cannot be reached.
        """
        if start == end:
            # Without this guard the search would leave the zone and come back through a neighbour
            return [start]
        queue = deque([(start, [start])])
        visited = {start}
        while queue:
            vertex, path = queue.popleft()
            for node in self.zone_link[vertex]:
                if node == end:
                    return path + [end]
                if node not in visited:
                    visited.add(node)
                    queue.append((node, path + [node]))
        return [start]

    def generate_all_people_daily_movement(self):
        """
        Simulate one day for every long-term occupant and for the guests that visit them.

        :return: list of ``Person`` objects (workers who came to work, each followed by their guests).
            Empty when no worker comes to work.
        """
        available_worker = list()
        for i, worker in enumerate(self.worker_assign):
            if worker.decide_come():
                available_worker.append(i)
        if not available_worker:
            # np.random.choice refuses to sample from an empty population
            return list()

        guests = np.random.poisson(self.guest_lambda)
        guest_assign = np.random.choice(available_worker, size=guests)
        all_people = list()
        for i in available_worker:
            worker = self.worker_assign[i]
            all_people.append(worker)
            # One entry per appointment; the value is the number of guests (1 to 3) in that appointment
            guest_list = np.random.randint(1, 4, size=np.sum(guest_assign == i))
            appointments = worker.generate_daily_route(guest_list)
            for group_size, appointment in zip(guest_list, appointments):
                for _ in range(group_size):
                    new_guest = Person(self)
                    new_guest.customer_come(*appointment)
                    all_people.append(new_guest)
        return all_people

    @staticmethod
    def _constant_schedule(name, type_limits, value):
        """Build the field dict of an all-year, all-day constant ``Schedule:Compact``."""
        return {"Name": name,
                "Schedule Type Limits Name": type_limits,
                "Field 1": "Through:12/31",
                "Field 2": "For: Alldays",
                "Field 3": "Until 24:00",
                "Field 4": value}

    def generate_daily_schedule(self, add_to_model=True, overwrite_dict=None):
        """
        Generate a numpy matrix containing the locations of all occupants in the day and add it to the model.

        :parameter add_to_model: Default is True. If False, then only generate the schedule in numpy and IDF format
            but not save to the model automatically.
        :parameter overwrite_dict: Default is None. If set to a dict with {zone_name: old_people_object_name},
            it will overwrite existing People instead of creating a new one.
        :return: Four objects, (IDF format schedules, per-second location matrix with one row per person,
            per-minute occupancy count with one row per entry of the location list,
            list of all accessible locations in the building).
        """
        minutes_per_day = 24 * 60
        all_zones = self.model.get_available_names_under_group("Zone")
        valid_zones = [zone for zone in all_zones if zone in self.possible_locations]
        all_people = self.generate_all_people_daily_movement()

        busy_index = self.possible_locations.index('busy')
        locations = list()
        for person in all_people:
            track = person.position.copy()
            if person.office is not None:
                # "busy" means the worker is occupied inside their own office
                track[track == busy_index] = self.possible_locations.index(person.office)
            locations.append(track)
        if locations:
            location_matrix = np.vstack(locations)
        else:
            # Nobody is in the building today: keep the matrix shape consistent instead of failing in vstack
            location_matrix = np.zeros((0, self.day_cut_off))

        all_commands = list()
        if add_to_model:
            shared_schedules = (
                self._constant_schedule("Test_Activity_Schedule", "Any Number", "200"),
                self._constant_schedule("Test_Work_Schedule", "Fraction", "0.1"),
                self._constant_schedule("Test_Cloth_Schedule", "Fraction", "0.9"),
                self._constant_schedule("Test_Air_Velocity", "Fraction", "0.25"),
            )
            for schedule_values in shared_schedules:
                self.model.add_configuration("Schedule:Compact", values=schedule_values)
            self.model.add_configuration("Output:Variable", values={"Variable Name": "Zone People Occupant Count",
                                                                    "Reporting_Frequency": "timestep"})
            self.model.add_configuration("Output:Variable",
                                         values={"Variable Name": "Zone Thermal Comfort Fanger Model PMV",
                                                 "Reporting_Frequency": "timestep"})

        zone_occupancy = np.zeros((len(self.possible_locations), minutes_per_day))
        for zone in valid_zones:
            i = self.possible_locations.index(zone)
            occupancy = np.sum(location_matrix == i, axis=0)
            schedule_name = f"Generated_Schedule_Zone_{zone}"
            result_command = {"Name": schedule_name,
                              "Schedule Type Limits Name": "Any Number",
                              "Field 1": "Through: 12/31",
                              "Field 2": "For: Weekdays"}
            counter = 3
            for t in range(1, minutes_per_day + 1):
                # The head count of minute t is sampled at its last second
                current = occupancy[t * 60 - 1]
                zone_occupancy[i, t - 1] = current
                # Only emit an "Until" entry when the value changes (or at the end of the day)
                if t != minutes_per_day and occupancy[(t + 1) * 60 - 1] == current:
                    continue
                hour, minute = divmod(t, 60)
                result_command[f"Field {counter}"] = f"Until {hour:02d}:{minute:02d}"
                result_command[f"Field {counter + 1}"] = f"{current}"
                counter += 2
            all_commands.append(result_command)

            if add_to_model:
                self.model.add_configuration("Schedule:Compact", values=result_command)
                if overwrite_dict is not None and zone in overwrite_dict:
                    self.model.edit_configuration("People",
                                                  {"Name": overwrite_dict[zone]},
                                                  {"Number of People Schedule Name": schedule_name})
                else:
                    people_values = {"Name": f"Test_Zone_{zone}",
                                     "Zone or ZoneList Name": zone,
                                     "Number of People Schedule Name": schedule_name,
                                     "Number of People": location_matrix.shape[0],
                                     "Activity Level Schedule Name": "Test_Activity_Schedule",
                                     "Work Efficiency Schedule Name": "Test_Work_Schedule",
                                     "Clothing Insulation Schedule Name": "Test_Cloth_Schedule",
                                     "Air Velocity Schedule Name": "Test_Air_Velocity",
                                     "Thermal Comfort Model 1 Type": "Fanger"}
                    self.model.add_configuration("People", values=people_values)

        return all_commands, location_matrix, zone_occupancy, self.possible_locations

    def save_light_config(self, output_name=None):
        """
        Write the light configuration to a JSON file, building it first if it does not exist yet.

        :parameter output_name: Target file path. None means ``light_config.json``.
        :return: None
        """
        if self.light_config is None:
            self.initialize_light_config()
        if output_name is None:
            output_name = "light_config.json"
        with open(output_name, 'w') as output_file:
            json.dump(self.light_config, output_file)

    def initialize_light_config(self):
        """
        Build the default light configuration from the lights reported by the model.

        For every light of every zone, an entry is created with probability 1 and, for each known
        location, a weight of 1 for both the occupied and the unoccupied condition.

        :return: None
        """
        zone_lights = self.model.get_lights()
        self.light_config = dict()
        for zone in zone_lights:
            self.light_config[zone] = list()
            for light in zone_lights[zone]:
                condition = {zone_name: {"occupancy > 0": 1,
                                         "occupancy == 0": 1}
                             for zone_name in self.possible_locations}
                self.light_config[zone].append({"name": light,
                                                "probability": 1,
                                                "condition": condition})

    def generate_light(self, input_name=None):
        """
        Placeholder, not implemented yet.

        :parameter input_name: Currently unused.
        :return: None
        """
        pass
class Person:
    """
    Track the second-by-second location of a single occupant for one simulated day.

    ``position`` holds, for every second, an index into the generator's ``possible_locations``.
    """

    def __init__(self, generator, office=None):
        """
        Each long-term occupant has an office and tends to stay there more than in other places.

        :parameter generator: The OccupancyGenerator which provides the settings.
        :parameter office: The designated office for long-term occupants. None for guests.
        """
        self.office = office
        self.position = np.zeros(generator.day_cut_off)
        self.source = generator

    def _apply_route(self, zone_move_timer, pass_zones):
        """
        Paint a there-and-back walk into ``position``.

        Consecutive timestamps in ``zone_move_timer`` delimit the stay in each zone; the zones are taken
        from ``pass_zones`` first forwards and then backwards (0, 1, ..., n-1, ..., 1, 0).
        """
        last = len(pass_zones) - 1
        locations = self.source.possible_locations
        for i, (begin, finish) in enumerate(zip(zone_move_timer, zone_move_timer[1:])):
            self.position[begin:finish] = locations.index(pass_zones[last - abs(i - last)])

    def customer_come(self, start_time, end_time, dest):
        """
        Simulate the visit of a customer: walk from the entry zone to ``dest`` and back out.

        :parameter start_time: The scheduled appointment start time (not the real start time).
        :parameter end_time: The scheduled appointment end time (not the real end time).
        :parameter dest: The appointment location (zone entry).
        :return: None
        """
        pass_zones = self.source.get_path(self.source.entry_zone, dest)
        # Come earlier than expected
        zone_move_timer = [start_time - int(np.random.exponential(5 * 60))]
        # Walk backwards from the appointment start to find when each transit zone is entered
        # TODO: Trespass time
        temp_timer = start_time
        for _ in pass_zones[1:]:
            temp_timer = temp_timer - 3 + get_white_bias(1)
            zone_move_timer.insert(1, temp_timer)
        temp_timer = end_time
        for _ in pass_zones:
            zone_move_timer.append(temp_timer)
            temp_timer = temp_timer + 3 + get_white_bias(1)
        self._apply_route(zone_move_timer, pass_zones)

    def decide_come(self):
        """
        Decide whether the occupant comes to work today and, if so, when they arrive and leave.

        The position array is reset first. Arrival is 30 minutes (``come_leave_flex_coef``) before
        ``start_work`` plus an exponential delay; leaving is ``end_work`` plus an exponential delay,
        capped at the last second of the day. Arriving after ``cut_off_time`` counts as absent.

        :return: True if come to work, False otherwise
        """
        source = self.source
        self.position = np.zeros(source.day_cut_off)

        # Decide absence
        if np.random.random() < source.call_for_absence:
            return False

        # Decide when come to office
        arrival_time = (source.start_work - source.come_leave_flex_coef) + \
            int(np.random.exponential(source.come_leave_flex_coef))
        if arrival_time > source.cut_off_time:
            return False

        # Decide when go back home
        leave_time = source.end_work + int(np.random.exponential(source.come_leave_flex_coef))
        leave_time = min(leave_time, source.day_cut_off - 1)

        pass_zones = source.get_path(source.entry_zone, self.office)
        zone_move_timer = list()
        # TODO: Trespass time
        temp_timer = arrival_time
        for _ in pass_zones:
            zone_move_timer.append(temp_timer)
            temp_timer = temp_timer + 3 + get_white_bias(1)
        # Inserting at a fixed index builds the leaving timestamps in ascending order
        temp_timer = leave_time
        for _ in pass_zones:
            zone_move_timer.insert(len(pass_zones), temp_timer)
            temp_timer = temp_timer - 3 + get_white_bias(1)
        self._apply_route(zone_move_timer, pass_zones)
        return True

    def generate_lunch(self):
        """
        Generate the time that current occupant goes to the lunch room, eats and walks back.

        :return: None
        """
        # Usually go for lunch immediately, with an average delay of 5 minutes, capped at 20 minutes.
        # (The cap was previously written with max(), which forced a delay of at least 20 minutes.)
        lunch_delay = min(int(np.random.exponential(5 * 60)), 20 * 60)
        pass_zones = self.source.get_path(self.office, self.source.lunch_room)
        pass_zones.pop(0)  # the occupant is already in the office
        if not pass_zones:
            # Lunch room is the office itself or cannot be reached: nothing to route
            return
        # The delay is spent in the office, so the walk starts when the occupant actually leaves
        depart_time = self.source.lunch_start_time + lunch_delay
        zone_move_timer = [depart_time]
        # TODO: Trespass time
        temp_timer = depart_time
        for _ in pass_zones[:-1]:
            temp_timer = temp_timer + 3 + get_white_bias(1)
            zone_move_timer.append(temp_timer)
        # Eating time in seconds, drawn from a shifted and scaled Beta distribution
        eat_time = int((np.random.beta(self.source.eat_time_a, self.source.eat_time_b) + 0.1) * 6000)
        zone_move_timer.append(temp_timer + eat_time)
        temp_timer = zone_move_timer[-1]
        for _ in pass_zones[:-1]:
            temp_timer = temp_timer + 3 + get_white_bias(1)
            zone_move_timer.append(temp_timer)
        self._apply_route(zone_move_timer, pass_zones)

    def generate_daily_meeting(self):
        """
        Generate the time that current occupant goes to the daily meeting and walks back.

        :return: None
        """
        # Arrive early by an exponential amount (mean 3 min), at most 5 min before the meeting.
        # NOTE(review): this cap was previously max(), i.e. at least 5 min early - confirm intent.
        early_by = min(int(np.random.exponential(3 * 60)), 5 * 60)
        meeting_attend = self.source.daily_report - early_by
        pass_zones = self.source.get_path(self.office, self.source.meeting_room)
        pass_zones.pop(0)  # the occupant is already in the office
        if not pass_zones:
            # Meeting room is the office itself or cannot be reached: nothing to route
            return
        zone_move_timer = [meeting_attend]
        # TODO: Trespass time
        temp_timer = meeting_attend
        for _ in pass_zones[:-1]:
            temp_timer = temp_timer - 3 + get_white_bias(1)
            zone_move_timer.insert(0, temp_timer)
        meeting_length = int(np.random.normal(self.source.daily_report_mean, self.source.daily_report_std))
        zone_move_timer.append(self.source.daily_report + meeting_length)
        temp_timer = zone_move_timer[-1]
        for _ in pass_zones[:-1]:
            temp_timer = temp_timer + 3 + get_white_bias(1)
            zone_move_timer.append(temp_timer)
        self._apply_route(zone_move_timer, pass_zones)

    def check_in_office(self, start, end):
        """
        Determine if the occupant is in his/her office or not during a given period of time.

        :parameter start: The start time.
        :parameter end: The end time (exclusive).
        :return: Return True if the occupant is in his/her office for every second between the given
            times, and False otherwise.
        """
        office_index = self.source.possible_locations.index(self.office)
        seconds_in_office = np.count_nonzero(self.position[start:end] == office_index)
        return bool(seconds_in_office == (end - start))

    def get_in_office_range(self):
        """
        Find all times that the occupant is in his/her office.

        :return: Array of shape (k, 2); each row is the [start, end) second range of one stay in the office.
        """
        office_index = self.source.possible_locations.index(self.office)
        # Pad with zeros so that stays touching the start or end of the day still produce an edge
        in_office = np.concatenate(([0],
                                    np.equal(self.position, office_index).view(np.int8),
                                    [0]))
        absdiff = np.abs(np.diff(in_office))
        # Runs start and end where absdiff is 1.
        ranges = np.where(absdiff == 1)[0].reshape(-1, 2)
        return ranges

    def handle_customer(self, num_customer):
        """
        Set up an appointment for the occupant with a group of customers.

        The appointment is placed inside one of the occupant's stays in the office. If no stay is longer
        than the drawn visit length, the longest stay is used entirely (``np.max`` raises ``ValueError``
        when the occupant is never in the office).

        :parameter num_customer: Number of customers in this appointment.
        :return: tuple of (time entering the room, time leaving the room, appointment location).
        """
        # Set-up meeting time
        in_office_range = self.get_in_office_range()
        visit_length = int(np.random.normal(self.source.average_stay_customer, self.source.std_stay_customer))
        in_office_duration = in_office_range[:, 1] - in_office_range[:, 0]
        in_office_idx = np.nonzero(in_office_duration > visit_length)[0]
        if len(in_office_idx) == 0:
            visit_length = np.max(in_office_duration)
            in_office_idx = np.nonzero(in_office_duration == visit_length)[0]
        idx = np.random.choice(in_office_idx)
        start_time = np.random.randint(in_office_range[idx, 0], in_office_range[idx, 1] - visit_length + 1)
        end_time = start_time + visit_length
        in_room = start_time + 10 + get_white_bias(1)
        out_room = end_time - 10 + get_white_bias(1)

        # Decide meeting location
        if num_customer > 1:
            # Go meet in meeting room
            room_name = self.source.meeting_room
            self.position[in_room:out_room] = self.source.possible_locations.index(room_name)
        else:
            # A single customer is received in the office, which is marked as "busy"
            room_name = self.office
            self.position[in_room:out_room] = self.source.possible_locations.index("busy")
        return in_room, out_room, room_name

    def generate_go_other_office(self):
        """
        Generate the event of visiting colleagues' offices for random talk.

        Only possible if a colleague (someone other than this occupant) is in their office for the
        whole visit; the first such colleague in ``worker_assign`` is chosen and marked as "busy".

        :return: None.
        """
        locations = self.source.possible_locations
        for _ in range(np.random.poisson(self.source.visit_colleague)):
            # Find available time for current person to meet some colleague
            in_office_range = self.get_in_office_range()
            visit_length = int(np.random.normal(self.source.average_stay_in_colleague_office,
                                                self.source.std_stay_in_colleague_office))
            in_office_duration = in_office_range[:, 1] - in_office_range[:, 0]
            in_office_idx = np.nonzero(in_office_duration > visit_length)[0]
            if len(in_office_idx) == 0:
                continue
            idx = np.random.choice(in_office_idx)
            start_time = np.random.randint(in_office_range[idx, 0], in_office_range[idx, 1] - visit_length + 1)
            end_time = start_time + visit_length

            # Find available colleague
            for coworker in self.source.worker_assign:
                if coworker is self:
                    # The occupant is in their own office during this slot by construction;
                    # without this check they could end up "visiting" themselves
                    continue
                if coworker.check_in_office(start_time, end_time):
                    # Go meet the colleague
                    in_colleague = start_time + 10 + get_white_bias(1)
                    out_colleague = end_time - 10 + get_white_bias(1)
                    self.position[in_colleague:out_colleague] = locations.index(coworker.office)
                    coworker.position[in_colleague:out_colleague] = locations.index("busy")
                    break

    def generate_daily_route(self, customer_list):
        """
        Generate the whole day locations for the occupant.

        :parameter customer_list: Sequence with one entry per appointment today; each entry is the number
            of customers attending that appointment.
        :return: List of appointments, one ``handle_customer`` tuple per entry of ``customer_list``.
        """
        self.generate_lunch()
        self.generate_daily_meeting()
        time_list = [self.handle_customer(num_customer) for num_customer in customer_list]
        self.generate_go_other_office()
        return time_list

    def get_position(self, sec):
        """
        Get the location of the occupant at the given time.

        :parameter sec: The time that need to check.
        :return: The zone entry of the location at the given time ("busy" is reported as the office).
        """
        locations = self.source.possible_locations
        if self.position[sec] == locations.index("busy"):
            return self.office
        return locations[int(self.position[sec])]

    def get_trigger(self):
        """
        Placeholder, not implemented yet.

        :return: None
        """
        pass
def get_white_bias(second):
    """
    Generate a uniformly distributed integer bias.

    :parameter second: Value range; the bias is drawn from ``[-second, second]`` (both inclusive).
    :return: Bias.
    """
    # randint(k) draws from [0, k); shifting by ``second`` centres the range on zero
    return np.random.randint(second * 2 + 1) - second
def main():
    """
    Script entry point: print the size of the position array of every simulated person.

    :return: None
    """
    # NOTE(review): ``generate_daily_data`` is neither defined nor imported in this file as shown,
    # so this call raises NameError. It presumably should build an OccupancyGenerator from a
    # COBS model and call generate_all_people_daily_movement() - confirm and wire up a model.
    all_people = generate_daily_data()
    for person in all_people:
        print(person.position.size)
    # TODO: step from start_synthetic_data to end_synthetic_data in report_interval increments,
    # regenerate the people at every midnight and dump each day's results to a JSON file
    # under output/ (this was sketched in commented-out code that has been removed here).
# Run the demo only when the module is executed as a script, not when it is imported
if __name__ == '__main__':
    main()