Source code for cobs.model

import sys

from cobs import EventQueue, ReplayBuffer
from eppy.modeleditor import IDF
from eppy.bunch_subclass import BadEPFieldError
from multiprocessing import Event, Pipe, Process
import os
from datetime import datetime, timedelta
from cobs.state_modifier import StateModifier
from calendar import isleap


[docs]class Agent: """ Dummy agent as a placeholder. No meaning """ def __init__(self): pass
[docs]class Reward: """ Dummy reward as a placeholder. No meaning """ def __init__(self): pass
[docs]class Model: """ The environment class. """ model_import_flag = False
[docs] @classmethod def set_energyplus_folder(cls, path): """ Add the pyenergyplus into the path so the program can find the EnergyPlus. :parameter path: The installation path of the EnergyPlus 9.3.0. :type path: str :return: None """ sys.path.insert(0, path) IDF.setiddname(f"{path}Energy+.idd") cls.model_import_flag = True
def __init__(self, idf_file_name: str = None, prototype: str = None, climate_zone: str = None, weather_file: str = None, heating_type: str = None, foundation_type: str = None, agent: Agent = None, reward=None, eplus_naming_dict=None, eplus_var_types=None, buffer_capacity=None, buffer_seed=None, buffer_chkpt_dir=None, tmp_idf_path=None): """ Initialize the building by loading the IDF file to the model. :parameter idf_file_name: The relative path to the IDF file. Use it if you want to use your own model. :parameter prototype: Either "multi" and "single", indicates the Multi-family low-rise apartment building and Single-family detached house. :parameter climate_zone: The climate zone code of the building. Please refer to https://codes.iccsafe.org/content/iecc2018/chapter-3-ce-general-requirements. :parameter weather_file: The relative path to the weather file associate with the building. :parameter heating_type: Select one from "electric", "gas", "oil", and "pump" :parameter foundation_type: Select one from "crawspace", "heated", "slab", and "unheated" :parameter agent: The user-defined Agent class object if the agent is implemented in a class. :parameter reward: The user-defined reward class object that contains a reward(state, actions) method. :parameter eplus_naming_dict: A dictionary map the state variable name to some specified names. :parameter eplus_var_types: A dictionary contains the state name and the state source location. :parameter buffer_capacity: The maximum number of historical state, action, new_state pair store in the buffer. :parameter buffer_seed: The random seed when sample from the buffer. :parameter buffer_chkpt_dir: The location of the buffer checkpoint should save. """ if not Model.model_import_flag: raise ImportError("You have to set the energyplus folder first") self.api = None self.current_state = dict() self.idf = None self.occupancy = None self.run_parameters = None self.queue = EventQueue() self.agent = agent self.ignore_list = set() self.zone_names = None self.thermal_names = None self.counter = 0 self.replay = ReplayBuffer(buffer_capacity, buffer_seed, buffer_chkpt_dir) self.warmup_complete = False self.terminate = False self.wait_for_step = Event() self.wait_for_state = Event() self.parent, self.child_energy = Pipe(duplex=True) self.child = None self.use_lock = False self.reward = reward self.eplus_naming_dict = eplus_naming_dict self.eplus_var_types = eplus_var_types self.prev_reward = None self.total_timestep = -1 self.leap_weather = False self.state_modifier = StateModifier() # TODO: Validate input parameters if idf_file_name is None: idf_file_name = f"./buildings/{prototype}_{climate_zone}_{heating_type}_{foundation_type}.idf" if weather_file is None: weather_file = f"./weathers/{climate_zone}.epw" if tmp_idf_path is None: self.input_idf = "input.idf" else: self.input_idf = os.path.join(tmp_idf_path, "input.idf") self.run_parameters = ["-d", "result", self.input_idf] if weather_file: self.run_parameters = ["-w", weather_file] + self.run_parameters with open(weather_file, 'r') as w_file: for line in w_file: line = line.split(',') if len(line) > 3 and line[0].upper() == "HOLIDAYS/DAYLIGHT SAVINGS": self.leap_weather = True if line[1].upper() == "YES" else False break try: self.idf = IDF(idf_file_name) except: raise ValueError("IDF file is damaged or not match with your EnergyPlus version.")
[docs] @staticmethod def name_reformat(name): """ Convert the entry from the space separated entry to the underline separated entry to match the IDF. :parameter name: The space separated entry. :return: The underline separated entry. """ name = name.replace(' ', '_').split('_') return '_'.join([word for word in name])
[docs] def list_all_available_configurations(self): """ Generate a list of all type of components appeared in the current building. :return: list of components entry. """ return list(self.idf.idfobjects.keys())
[docs] def get_all_configurations(self): """ Read the IDF file, and return the content formatted with the IDD file. :return: the full IDF file with names and comments aside. """ return self.idf.idfobjects
[docs] def get_sub_configuration(self, idf_header_name: str): """ Show all available settings for the given type of component. :parameter idf_header_name: The type of the component. :return: List of settings entry. """ idf_header_name = idf_header_name.upper() if not self.idf.idfobjects.get(idf_header_name): raise KeyError(f"No {idf_header_name} section in current IDF file") return self.idf.idfobjects[idf_header_name][0].fieldnames
[docs] def get_available_names_under_group(self, idf_header_name: str): """ Given the type of components, find all available components in the building by their entry. :parameter idf_header_name: The type of the component. :return: List of names. """ idf_header_name = idf_header_name.upper() available_names = self.get_sub_configuration(idf_header_name) if "Name" in available_names: return [entry["Name"] for entry in self.idf.idfobjects[idf_header_name]] else: for field_name in available_names: if "name" in field_name.lower(): return [entry[field_name] for entry in self.idf.idfobjects[idf_header_name]] raise KeyError(f"No entry field available for {idf_header_name}")
[docs] def get_configuration(self, idf_header_name: str, component_name: str = None): """ Given the type of component, the entry of the target component, find the settings of that component. :parameter idf_header_name: The type of the component. :parameter component_name: The entry of the component. :return: Settings of this component. """ idf_header_name = idf_header_name.upper() if component_name is None: return self.idf.idfobjects[idf_header_name] else: names = self.get_available_names_under_group(idf_header_name) if component_name in names: return self.idf.idfobjects[idf_header_name][names.index(component_name)] else: raise KeyError(f"Failed to locate {component_name} in {idf_header_name}")
[docs] def get_value_range(self, idf_header_name: str, field_name: str, validate: bool = False): """ Get the range of acceptable values of the specific setting. :parameter idf_header_name: The type of the component. :parameter field_name: The setting entry. :parameter validate: Set to True to check the current value is valid or not. :return: Validation result or the range of all acceptable values retrieved from the IDD file. """ idf_header_name = idf_header_name.upper() field_name = field_name.replace(' ', '_') if field_name not in self.get_sub_configuration(idf_header_name): raise KeyError(f"Failed to locate {field_name} in {idf_header_name}") if validate: return self.idf.idfobjects[idf_header_name][0].checkrange(field_name) else: return self.idf.idfobjects[idf_header_name][0].getrange(field_name)
[docs] def add_configuration(self, idf_header_name: str, values: dict = None): """ Create and add a new component into the building model with the specific type and setting values. :parameter idf_header_name: The type of the component. :parameter values: A dictionary map the setting entry and the setting value. :return: The new component. """ idf_header_name = idf_header_name.upper() object = self.idf.newidfobject(idf_header_name.upper()) if values is None: return object for key, value in values.items(): key = Model.name_reformat(key) if isinstance(value, (int, float)): exec(f"object.{key} = {value}") else: exec(f"object.{key} = '{value}'") return object
[docs] def delete_configuration(self, idf_header_name: str, component_name: str = None): """ Delete an existing component from the building model. :parameter idf_header_name: The type of the component. :parameter component_name: The entry of the component. :return: None. """ idf_header_name = idf_header_name.upper() if not self.idf.idfobjects.get(idf_header_name): raise KeyError(f"No {idf_header_name} section in current IDF file") if component_name is None: while len(self.idf.idfobjects[idf_header_name]): self.idf.popidfobject(idf_header_name, 0) else: names = self.get_available_names_under_group(idf_header_name) if component_name in names: self.idf.popidfobject(idf_header_name, names.index(component_name)) else: raise KeyError(f"Failed to locate {component_name} in {idf_header_name}")
[docs] def edit_configuration(self, idf_header_name: str, identifier: dict, update_values: dict): """ Edit an existing component in the building model. :parameter idf_header_name: The type of the component. :parameter identifier: A dictionary map the setting entry and the setting value to locate the target component. :parameter update_values: A dictionary map the setting entry and the setting value that needs to update. :return: None. """ idf_header_name = idf_header_name.upper() if not self.idf.idfobjects.get(idf_header_name): raise KeyError(f"No {idf_header_name} section in current IDF file") fields = self.get_sub_configuration(idf_header_name) for entry in self.idf.idfobjects[idf_header_name]: valid = True for key, value in identifier.items(): key = Model.name_reformat(key) if key in fields: if entry[key] != value: valid = False if valid: for key, value in update_values.items(): key = Model.name_reformat(key) if isinstance(value, (int, float)): exec(f"entry.{key} = {value}") else: exec(f"entry.{key} = '{value}'")
def _get_thermal_names(self): """ Initialize all available thermal zones. :return: None """ people_zones = self.get_configuration("People") self.thermal_names = dict() for zone in people_zones: try: if zone[Model.name_reformat("Thermal Comfort Model 1 Type")]: self.thermal_names[zone["Name"]] = zone[Model.name_reformat("Zone or ZoneList Name")] except BadEPFieldError: pass
[docs] def save_idf_file(self, path: str): """ Save the modified building model for EnergyPlus simulation :parameter path: The relative path to the modified IDF file. :return: None. """ self.idf.saveas(path)
def _initialization(self): """ Initialize the EnergyPlus simulation by letting the EnergyPlus finish the warmup. :return: None """ if not self.api.exchange.api_data_fully_ready(): return self.warmup_complete = True def _generate_output_files(self): """ Assert errors to terminate the simulation after the warmup in order to generate the EDD file to list all available actions for the current building. :return: None """ assert False def _step_callback(self): """ Get the state value at each timestep, and modify the building model based on the actions from the ``EventQueue``. :return: None """ # print("Child: Not ready") if not self.api.exchange.api_data_fully_ready() or not self.warmup_complete: return current_state = dict() # print("Child: Simulating") current_state["timestep"] = self.counter # print(self.get_date()) current_state["time"] = self.get_date() current_state["temperature"] = dict() current_state["occupancy"] = dict() current_state["terminate"] = self.total_timestep == self.counter # if self.occupancy is not None: # current_state["occupancy"] = {zone: value[self.counter] for zone, value in self.occupancy.items()} for name in self.zone_names: handle = self.api.exchange.get_variable_handle("Zone People Occupant Count", name) if handle == -1: continue current_state["occupancy"][name] = self.api.exchange.get_variable_value(handle) for name in self.zone_names: handle = self.api.exchange.get_variable_handle("Zone Air Temperature", name) if handle == -1: continue # print("Child: Simulating 2") current_state["temperature"][name] = self.api.exchange.get_variable_value(handle) handle = self.api.exchange.get_meter_handle("Heating:EnergyTransfer") current_state["energy"] = self.api.exchange.get_meter_value(handle) if self.reward is not None: current_state["reward"] = self.prev_reward # print("Child: Simulating 1") if "Zone Thermal Comfort Fanger Model PMV" in self.get_available_names_under_group("Output:Variable"): current_state["PMV"] = dict() for zone in self.thermal_names: handle = self.api.exchange.get_variable_handle("Zone Thermal Comfort Fanger Model PMV", zone) if handle == -1: continue current_state["PMV"][self.thermal_names[zone]] = self.api.exchange.get_variable_value(handle) # Add for temp extra output if self.eplus_naming_dict is not None: for entry in self.idf.idfobjects['OUTPUT:VARIABLE']: # we only care about the output vars for Gnu-RL if (entry['Variable_Name'], entry['Key_Value']) in self.eplus_naming_dict.keys(): var_name = entry['Variable_Name'] # if the key value is not associated with a zone return None for variable handler # key_val = entry['Key_Value'] if entry['Key_Value'] != '*' else None if entry['Key_Value'] == '*': key_val = self.eplus_var_types[var_name] else: key_val = entry['Key_Value'] handle = self.api.exchange.get_variable_handle(var_name, key_val) if handle == -1: continue # name the state value based on Gnu-RL paper key = self.eplus_naming_dict.get((var_name, entry['Key_Value'])) current_state[key] = self.api.exchange.get_variable_value(handle) self.state_modifier.get_update_states(current_state, self) # current_state.update(update_dict) # print(current_state) if self.use_lock: # print("Child: Sending current states") self.child_energy.send(current_state) self.wait_for_state.set() # Take all actions self.wait_for_step.clear() # print("Child: Waiting for actions") if not self.child_energy.poll(): self.wait_for_step.wait() # print("Child: Receiving actions") events = self.child_energy.recv() else: # print(self.current_state) if self.counter != 0: self.replay.push(self.current_state, self.queue.get_event(self.current_state["timestep"]), current_state, current_state["terminate"]) self.current_state = current_state # self.historical_values.append(self.current_state) events = self.queue.trigger(self.counter) self.counter += 1 # Trigger modifiers # for modifier in self.modifier: # modifier.update(current_state) # print("Child: executing actions") # print("Child: Printing Reward") # Calculate Reward if self.reward is not None: self.prev_reward = self.reward.reward(current_state, events) # Trigger events for key in events["actuator"]: component_type, control_type, actuator_key = key.split("|*|") value = events["actuator"][key][1] handle = self.api.exchange.get_actuator_handle(component_type, control_type, actuator_key) if handle == -1: raise ValueError('Actuator handle could not be found: ', component_type, control_type, actuator_key) self.api.exchange.set_actuator_value(handle, value) for key in events["global"]: var_name = key value = events["global"][key][1] handle = self.api.exchange.get_global_handle(var_name) if handle == -1: raise ValueError('Actuator handle could not be found: ', component_type, control_type, actuator_key) self.api.exchange.set_global_value(handle, value) # if self.use_lock: # # wait for next call of step # self.wait_for_step.clear() # self.wait_for_step.wait() # else: if not self.use_lock and self.agent: self.agent.step(self.current_state, self.queue, self.counter - 1)
[docs] def step(self, action_list=None): """ Add all actions into the ``EventQueue``, and then generate the state value of the next timestep. :parameter action_list: list of dictionarys contains the arguments for ``EventQueue.schedule_event()``. :return: The state value of the current timestep. """ if action_list is not None: for action in action_list: self.queue.schedule_event(**action) # print("Parent: Sending actions") self.parent.send(self.queue.trigger(self.counter)) self.counter += 1 # Let process grab and execute actions # print("Parent: Releasing child's lock") self.wait_for_state.clear() self.wait_for_step.set() # print("Parent: Waiting for state values") if not self.parent.poll(): self.wait_for_state.wait() self.wait_for_state.clear() current_state = self.parent.recv() # if isinstance(current_state, dict): self.replay.push(self.current_state, self.queue.get_event(self.current_state["timestep"]), current_state, current_state["terminate"]) self.current_state = current_state # self.historical_values.append(self.current_state) if current_state["terminate"]: self.terminate = True self.parent.send(self.queue.trigger(self.counter)) self.wait_for_step.set() self.child.join() # self.replay.terminate() self.wait_for_state.clear() # print("Parent: received state values") return self.current_state
[docs] def is_terminate(self): """ Determine if the simulation is finished or not. :return: True if the simulation is done, and False otherwise. """ return self.terminate
[docs] def reset(self): """ Clear the actions and buffer, reset the environment and start the simulation. :return: The initial state of the simulation. """ self._init_simulation() self.counter = 0 self.total_timestep = self.get_total_timestep() - 1 self.queue = EventQueue() self.replay.reset() self.ignore_list = set() self.wait_for_state.clear() self.wait_for_step.clear() self.terminate = False self.use_lock = True self.parent, self.child_energy = Pipe(duplex=True) self.child = Process(target=self.simulate) self.child.start() # print("Waiting") if not self.parent.poll(): self.wait_for_state.wait() self.current_state = self.parent.recv() # self.historical_values.append(self.current_state) self.wait_for_state.clear() return self.current_state
[docs] def get_total_timestep(self): if "-w" not in self.run_parameters: return self.get_configuration("Timestep")[0].Number_of_Timesteps_per_Hour * 24 * 8 elif len(self.get_configuration("RunPeriod")) == 0: raise ValueError("Your IDF files does not specify the run period." "Please manually edit the IDF file or use Model().set_runperiod(...)") run_period = self.get_configuration("RunPeriod")[0] start = datetime(year=run_period.Begin_Year if run_period.Begin_Year else 2000, month=run_period.Begin_Month, day=run_period.Begin_Day_of_Month) end = datetime(year=run_period.End_Year if run_period.End_Year else start.year, month=run_period.End_Month, day=run_period.End_Day_of_Month) end += timedelta(days=1) if not self.leap_weather: offset = 0 for year in range(start.year, end.year + 1): if isleap(year) and datetime(year, 2, 29) > start and datetime(year, 2, 29) < end: offset += 1 end -= timedelta(days=offset) timestep = self.get_configuration("Timestep")[0].Number_of_Timesteps_per_Hour if 60 % timestep != 0: timestep = 60 // round(60 / timestep) return int((end - start).total_seconds() // 3600 * timestep)
[docs] def simulate(self, terminate_after_warmup=False): """ Run the whole simulation once. If user use this method instead of the reset function, the user need to provide the Agent. :parameter terminate_after_warmup: True if the simulation should terminate after the warmup. :return: None. """ from pyenergyplus.api import EnergyPlusAPI self.replay.set_ignore(self.state_modifier.get_ignore_by_checkpoint()) if not self.use_lock: self._init_simulation() # for entry in self.zone_names: # print(entry) # self.current_handle["temperature"][entry] = \ # self.api.exchange.get_variable_handle("Zone Air Temperature", entry) # self.current_handle["temperature"] = self.api.exchange.get_variable_handle("SITE OUTDOOR AIR DRYBULB TEMPERATURE", "ENVIRONMENT") # self.current_handle["energy"] = self.api.exchange.get_meter_handle("Electricity:Facility") self.api = EnergyPlusAPI() if not terminate_after_warmup: self.api.runtime.callback_after_new_environment_warmup_complete(self._initialization) self.api.runtime.callback_begin_system_timestep_before_predictor(self._step_callback) else: self.api.runtime.callback_begin_new_environment(self._generate_output_files) self.api.runtime.run_energyplus(self.run_parameters)
# if self.use_lock: # self.child_energy.send("Terminated") # self.wait_for_state.set() # else: # self.replay.terminate() def _init_simulation(self): """ Save the modified building model and initialize the zones for states. :return: None. """ try: self.get_configuration("Output:Variable", "Zone People Occupant Count") except KeyError: self.add_configuration("Output:Variable", {"Key Value": '*', "Variable Name": "Zone People Occupant Count", "Reporting Frequency": "Timestep"}) self.idf.saveas(self.input_idf) self.use_lock = False self.zone_names = self.get_available_names_under_group("Zone") self._get_thermal_names() self.warmup_complete = False
[docs] def get_current_state_values(self): """ Find the current entries in the state. :return: List of entry names that is currently available in the state. """ state_values = list(set(self.get_possible_state_entries()) - self.ignore_list) state_values.sort() return state_values
[docs] def select_state_values(self, entry=None, index=None): """ Select interested state entries. If selected entry is not available for the current building, it will be ignored. :parameter entry: Entry names that the state of the environment should have. :parameter index: Index of all available entries that the state of the environment should have. :return: None. """ current_state = self.get_current_state_values() if entry is None: entry = list() elif isinstance(entry, str): entry = list(entry) if index is not None: if isinstance(index, int): index = [index] for i in index: if i < len(current_state): entry.append(current_state[i]) self.ignore_list = set(self.get_possible_state_entries()) - set(entry)
[docs] def add_state_values(self, entry): """ Add entries to the state. If selected entry is not available for the current building, it will be ignored. :parameter entry: Entry names that the state of the environment should have. :return: None. """ if not self.ignore_list: return if isinstance(entry, str): entry = [entry] self.ignore_list -= set(entry)
[docs] def remove_state_values(self, entry): """ Remove entries from the state. If selected entry is not available in the state, it will be ignored. :parameter entry: Entry names that the state of the environment should not have. :return: None. """ if isinstance(entry, str): entry = [entry] self.ignore_list = self.ignore_list.union(set(entry))
[docs] def pop_state_values(self, index): """ Remove entries from the state by its index. If selected index is not available in the state, it will be ignored. :parameter index: Entry index that the state of the environment should not have. :return: All entry names that is removed. """ current_state = self.get_current_state_values() pop_values = list() if isinstance(index, int): index = [index] for i in index: if i < len(current_state): self.ignore_list.add(current_state[i]) pop_values.append(current_state[i]) return pop_values
[docs] def get_possible_state_entries(self): """ Get all available state entries. This list of entries only depends on the building architecture. :return: List of available state entry names. """ output = self.get_available_names_under_group("Output:Variable") output.sort() return output
[docs] def get_possible_actions(self): """ Get all available actions that the user-defined agent can take. This list of actions only depends on the building architecture. :return: List of available actions in dictionaries. """ if not os.path.isfile("./result/eplusout.edd"): if not self.get_configuration("Output:EnergyManagementSystem"): self.add_configuration("Output:EnergyManagementSystem", values={"Actuator Availability Dictionary Reporting": "Verbose", "Internal Variable Availability Dictionary Reporting": "Verbose", "EMS Runtime Language Debug Output Level": "ErrorsOnly"}) try: self.simulate(terminate_after_warmup=True) except AssertionError: pass actions = list() with open("./result/eplusout.edd", 'r') as edd: for line in edd: line = line.strip() if len(line) == 0 or line[0] == '!': continue line = line.split(',') actions.append({"Component Type": line[2], "Control Type": line[3], "Actuator Key": line[1]}) return actions
[docs] def get_date(self): """ Get the current time in the simulation environment. :return: None """ year = self.api.exchange.year() month = self.api.exchange.month() day = self.api.exchange.day_of_month() hour = self.api.exchange.hour() minute = self.api.exchange.minutes() current_time = datetime(year, month, day, hour) + timedelta(minutes=minute) return current_time
[docs] def get_windows(self): """ Get the zone-window matching dictionary based on the IDF file. :return: A dictionary where key is the zone name, and the value is a set of window available in the zone. """ zone_window = {name: set() for name in self.get_available_names_under_group("Zone")} all_window = dict() for window in self.get_configuration("FenestrationSurface:Detailed"): if window.Surface_Type != "WINDOW": continue all_window[window.Building_Surface_Name] = window.Name for wall in self.get_configuration("BuildingSurface:Detailed"): if wall.Surface_Type != "WALL": continue if wall.Name in all_window: zone_window[wall.Zone_Name].add(all_window[wall.Name]) return zone_window
[docs] def get_doors(self): """ Get the zone-door matching dictionary based on the IDF file. :return: A dictionary where key is the zone name, and the value is a set of door available in the zone. """ zone_door = {name: set() for name in self.get_available_names_under_group("Zone")} all_door = dict() for door in self.get_configuration("FenestrationSurface:Detailed"): if door.Surface_Type != "GLASSDOOR": continue all_door[door.Building_Surface_Name] = door.Name for wall in self.get_configuration("BuildingSurface:Detailed"): if wall.Surface_Type != "WALL": continue if wall.Name in all_door: zone_door[wall.Zone_Name].add(all_door[wall.Name]) return zone_door
[docs] def get_lights(self): """ Get the zone-light matching dictionary based on the IDF file. :return: A dictionary where key is the zone name, and the value is a set of light available in the zone. """ zone_light = {name: set() for name in self.get_available_names_under_group("Zone")} for light in self.get_configuration("Lights"): zone_light[light.Zone_or_ZoneList_Name].add(light.Name) return zone_light
[docs] def get_blinds(self): """ Get the zone-blind matching dictionary based on the IDF file. :return: A dictionary where key is the zone name, and the value is a set of blind available in the zone. """ window_with_blinds = set() for shade in self.get_configuration("WindowShadingControl"): window_with_blinds.add(shade.Fenestration_Surface_1_Name) zone_blinds = self.get_windows() for zone in zone_blinds: zone_blinds[zone] = zone_blinds[zone].intersection(window_with_blinds) return zone_blinds
[docs] def set_blinds(self, windows, blind_material_name=None, shading_control_type='AlwaysOff', setpoint=50, agent_control=False): """ Install blinds that can be controlled on some given windows. :param windows: An iterable object that includes all windows' name that plan to install the blind. :param blind_material_name: The name of an existing blind in the IDF file as the blind for all windows. :param shading_control_type: Specify default EPlus control strategy (only works if control=False) :param setpoint: Specify default blind angle. :param agent_control: False if using a default EPlus control strategy or no control (ie blinds always off). True if using an external agent to control the blinds. :return: None """ if agent_control: shading_control_type = 'OnIfScheduleAllows' blind_material = None if blind_material_name: try: blind_material = self.get_configuration("WindowMaterial:Blind", blind_material_name) except KeyError: pass zone_window = self.get_windows() for zone in zone_window: for window in zone_window[zone]: if window in windows: window_idf = self.get_configuration("FenestrationSurface:Detailed", window) if blind_material is None: blind = {"Name": f"{window}_blind", "Slat Orientation": "Horizontal", "Slat Width": 0.025, "Slat Separation": 0.01875, "Front Side Slat Beam Solar Reflectance": 0.8, "Back Side Slat Beam Solar Reflectance": 0.8, "Front Side Slat Diffuse Solar Reflectance": 0.8, "Back Side Slat Diffuse Solar Reflectance": 0.8, "Slat Beam Visible Transmittance": 0.0} blind_mat = self.add_configuration("WindowMaterial:Blind", values=blind) else: blind_mat = self.idf.copyidfobject(blind_material) blind_mat.Name = blind_mat.Name + f" {window}" shading = {"Name": f"{window}_blind_shading", "Zone Name": zone, "Shading Type": "InteriorBlind", "Shading Device Material Name": f"{blind_mat.Name}", "Shading Control Type": shading_control_type, "Setpoint": setpoint, "Type of Slat Angle Control for Blinds": "ScheduledSlatAngle", "Fenestration Surface 1 Name": window_idf.Name} if agent_control: shading["Slat Angle Schedule Name"] = f"{window}_shading_schedule" shading["Multiple Surface Control Type"] = "Group" shading["Shading Control Is Scheduled"] = "Yes" angle_schedule = {"Name": f"{window}_shading_schedule", "Schedule Type Limits Name": "Angle", "Hourly Value": 45} self.add_configuration("Schedule:Constant", values=angle_schedule) self.add_configuration("WindowShadingControl", values=shading)
[docs] def set_occupancy(self, occupancy, locations): """ Include the occupancy schedule generated by the OccupancyGenerator to the model as the occupancy data in EnergyPlus simulated environment is broken. :param occupancy: Numpy matrix contains the number of occupanct in each zone at each time slot. :param locations: List of zone names. :return: None """ occupancy = occupancy.astype(int) self.occupancy = {locations[i]: occupancy[i, :] for i in range(len(locations))} if "Outdoor" in self.occupancy.keys(): self.occupancy.pop("Outdoor") if "busy" in self.occupancy.keys(): self.occupancy.pop("busy")
[docs] def set_runperiod(self, days, start_year: int = 2000, start_month: int = 1, start_day: int = 1, specify_year: bool = False): """ Set the simulation run period. :param days: How many days in total the simulation should perform. :param start_year: Start from which year :param start_month: Start from which month of the year :param start_day: Start from which day of the month :param specify_year: Use default year or a specific year when simulation is within a year. :return: None """ if "-w" not in self.run_parameters: raise KeyError("You must include a weather file to set run period") start = datetime(start_year, start_month, start_day) end = start + timedelta(days=days - 1) if not self.leap_weather: test_year = start_year - 1 while datetime(test_year, 1, 1) < end: test_year += 1 if not isleap(test_year): continue if datetime(test_year, 2, 29) > start and datetime(test_year, 2, 29) < end: end += timedelta(days=1) values = {"Begin Month": start_month, "Begin Day of Month": start_day, "End Month": end.month, "End Day of Month": end.day} if end.year != start_year or specify_year: values.update({"Begin Year": start_year, "End Year": end.year}) run_setting = self.get_configuration("RunPeriod") if len(run_setting) == 0: values["Name"] = "RunPeriod 1" self.add_configuration("RunPeriod", values) else: name = self.get_configuration("RunPeriod")[0].Name self.edit_configuration("RunPeriod", {"Name": name}, values)
[docs] def set_timestep(self, timestep_per_hour): """ Set the timestep per hour for the simulation. :param timestep_per_hour: How many timesteps within a hour. :return: None """ self.get_configuration("Timestep")[0].Number_of_Timesteps_per_Hour = timestep_per_hour
[docs] def add_state_modifier(self, model): """ Add a state modifier model, including predictive model, state estimator, controller, etc. :param model: A class object that follows the template (contains step(true_state, environment) method). :return: None """ self.state_modifier.add_model(model)
[docs] def flatten_state(self, order, state=None): """ Flatten the state to a list of values by a given order. :param order: The order that the values should follow. :param state: The state to flatten. If not specified, then the current state is selected. :return: List of values follows the given order. """ if state is None: state = self.current_state return [self.current_state.get(name, None) for name in order]
[docs] def sample_buffer(self, batch_size): """ Sample a batch of experience from the replay buffer. :param batch_size: Number of entries in a batch. :return: (state, action, next state, is terminate) """ return self.replay.sample(batch_size)
[docs] def sample_flattened_buffer(self, order, batch_size): """ Sample a batch of experience from the replay buffer and flatten the states by a given order. :param order: The order that the values should follow. :param batch_size: Number of entries in a batch. :return: (state, action, next state, is terminate) where states are flatten. """ state, action, next_state, done = self.replay.sample(batch_size) for i, row in state: state[i] = self.flatten_state(order, row) for i, row in next_state: next_state[i] = self.flatten_state(order, row) return state, action, next_state, done