import os
import math
import shutil
import time
import collections
from pathlib import Path
import logging
import uuid
import numpy as np
from fmpy.fmi1 import FMU1Slave, FMU1Model
from fmpy.fmi2 import FMU2Slave, FMU2Model
from fmpy import read_model_description, extract
from energym.envs.env import Env
from energym.envs.utils.weather import EPW, MOS
from energym.envs.utils.kpi import KPI
from energym.spaces.dict import Dict
from energym.spaces.discrete import Discrete
from energym.spaces.box import Box
logger = logging.getLogger(__name__)
[docs]class EnvFMU(Env):
"""The FMU base class for Energym.
It encapsulates an environment whose simulation is performed in an FMU.
The methods step(), reset() and close() from Env are implemented here.
Attributes
----------
fmu_file : str
Full path to the FMU file
model_description : fmpy ModelDescription object
Encapsulated description of the model extracted from the FMU using
FMPy inspection methods
fmi_version : str
Version number of FMI, inspected inside the FMU. Should be '1.0'
or '2.0'
step_size : int or double
Simulation stepsize in seconds. Int for EnergyPlus, double for Modelica
weather : str
Indicates the used weather profile
is_fmu_initialized : bool
Flags the FMU initialization process
vrs : dict
Contains the variable names and their references
fmi_type : str
The simulation type as speified by the FMU, either 'cosim' or 'modex'
start_time : int
Start of simulation time in seconds
stop_time : int
End of simulation time in seconds
kpis : KPI object
To track the KPI relevant metrics
input_space : dict
Contains controllable input variables
output_space : dict
Contains output variables
observation_history : list
Collects all observations of one simulation
unzipdir : str
Directory for extracting the FMU
fmu : FMU1Slave or FMU2Slave or FMU1Model or FMU2Model
Simulation object
time : int or double
Current simulation time (int for EnergyPlus, double for Modelica)
Methods
-------
initialize()
Initializes simulation object.
__build_input_space(input_specs)
Collects the inputs from the simulation object.
__build_output_space(output_specs)
Collects the outputs from the simulation object.
__initialize_fmu()
Initializes the FMU after instantiation.
get_inputs_names()
Retrieves list of inputs from model description.
get_outputs_names()
Retrieves list of outputs from model description.
get_date()
Gets the current simulation time.
step(inputs=None)
Advances the simulation one timestep.
print_kpis()
Prints the KPIs.
get_kpi(start_ind=0, end_ind=-1)
Retrieves the KPIs.
get_cumulative_kpi(phrase, kpi_type, out_type)
Retrieves the cumulative KPIs over multiple variables.
sample_random_action()
Samples random actions from the action space.
get_forecast(forecast_length = 24, **kwargs)
Generates a weather forecast of a given length.
look_for_weather_file(name = None)
Finds a weather file in the FMU.
post_process(list_rel_out, res, arrays=False)
Post-process output of FMPY.
reset()
Resets the simulation.
close()
Terminates the FMU and removes leftover folders.
"""
def __init__(
self,
model_path,
start_time,
stop_time,
step_size,
weather=None,
input_specs=None,
output_specs=None,
kpi_options=None,
default_path=True,
weather_file_path=None,
):
"""
Parameters
----------
model_path: str
Path to the fmu model file, relative inside the simulation folder
start_time: int
Begin of the simulation time in seconds in relation to the
beginning of the year
stop_time: int
End of the simulation time in seconds in relation to the
beginning of the year
step_size: double
Step size in second. May be chosen freely for some models (modelica),
or needs to be identical to model step size in other cases (EnergyPlus)
weather : EPW or MOS, optional
Specifies the used weather file, by default None
input_specs : dict, optional
Contains the model inputs, by default None
output_specs : dict, optional
Contains the model outputs, by default None
kpi_options : dict, optional
Dict to specify the tracked KPIs, by default None
default_path : bool
Whether to use the deault path or an absolute path in model_path
Raises
------
ValueError
If the FMU supprts neither co-simulation nor model exchange
"""
super().__init__()
if default_path:
self.fmu_file = self.energym_path / "simulation" / model_path
else:
self.fmu_file = model_path
self.model_description = read_model_description(self.fmu_file)
self.step_size = step_size
self.weather = weather
self.weather_file_path = weather_file_path
self.is_fmu_initialized = False
# Extract variables references
self.vrs = {}
for variable in self.model_description.modelVariables:
self.vrs[variable.name] = variable.valueReference
# detect fmi_version
self.fmi_version = self.model_description.fmiVersion
# detect FMI type
if self.model_description.coSimulation is not None:
self.fmi_type = "cosim"
elif self.model_description.modelExchange is not None:
self.fmi_type = "modex"
else:
raise ValueError("the type of FMU could not be identified")
# extract the FMU
self.start_time = start_time
self.stop_time = stop_time
self.input_specs = input_specs
# Fix inputs and outputs keys
if output_specs is not None:
self.output_keys = sorted(
[
p.name
for p in self.model_description.modelVariables
if p.name in list(output_specs.keys())
]
)
self.__build_output_space(output_specs)
else:
self.output_keys = sorted(
[
p.name
for p in self.model_description.modelVariables
if p.causality == "output"
]
)
if input_specs is not None:
self.input_keys = sorted(
[
p.name
for p in self.model_description.modelVariables
if p.name in list(input_specs.keys())
]
)
self.__build_input_space(input_specs)
else:
self.input_keys = sorted(
[
p.name
for p in self.model_description.modelVariables
if p.causality == "input"
]
)
self.kpis = KPI(kpi_options)
# # initialize FMU and spaces
self.initialize()
def __build_input_space(self, input_specs):
"""Collects the inputs from the simulation object.
The inputs have to be contained in input_specs but
not every key of the two needs to be an input to the specific model.
Parameters
----------
input_specs : dict
Contains possible control inputs from the model.
"""
input_array = self.get_inputs_names()
input_space_list = []
for act_name in input_array:
if act_name in input_specs:
act_specs = input_specs[act_name]
if act_specs["type"] == "scalar":
input_space_list += [
(
act_name,
Box(
low=act_specs["lower_bound"],
high=act_specs["upper_bound"],
shape=[1],
dtype=np.float32,
),
)
]
elif act_specs["type"] == "discrete":
input_space_list += [(act_name, Discrete(act_specs["size"]))]
else:
raise TypeError("Wrong type in INPUT_SPECS.")
else:
raise ValueError("Undefined Input {}".format(act_name))
self.input_space = Dict(spaces=input_space_list)
def __build_output_space(self, output_specs):
"""Collects the outputs from the simulation object.
The outputs have to be contained in output_specs, but not every
key needs to be an output to the specific model.
Parameters
----------
output_specs : dict
Contains possible outputs from the model.
"""
output_array = self.get_outputs_names()
output_space_list = []
for obs_name in output_array:
obs_specs = output_specs[obs_name]
if obs_specs["type"] == "scalar":
output_space_list += [
(
obs_name,
Box(
low=obs_specs["lower_bound"],
high=obs_specs["upper_bound"],
shape=[1],
dtype=np.float32,
),
)
]
elif obs_specs["type"] == "discrete":
output_space_list += [(obs_name, Discrete(obs_specs["size"]))]
self.output_space = Dict(spaces=output_space_list)
self.observation_history = []
def __initialize_fmu(self):
"""Initializes the FMU after instantiation."""
if self.fmi_version == "1.0":
self.fmu.initialize(tStart=self.start_time, stopTime=self.stop_time)
elif self.fmi_version == "2.0":
self.fmu.enterInitializationMode()
self.fmu.exitInitializationMode()
self.is_fmu_initialized = True
[docs] def initialize(self):
"""Initializes simulation object.
Instantiates FMPy FMUSalve1 or FMUSlave2 object based on FMI
version detected.
"""
init_time = str(time.time())[0:10]
random_id = str(uuid.uuid4().fields[-1])[:7]
fmu_path = os.path.join(self.runs_path, init_time + "_" + random_id)
os.mkdir(fmu_path)
self.unzipdir = extract(self.fmu_file, unzipdir=fmu_path)
weather_folder = Path(self.unzipdir) / "resources"
possible_weather_files = list(weather_folder.rglob("*.mos")) + list(
weather_folder.rglob("*.epw")
)
weather_default_file_path = weather_folder / possible_weather_files[0]
try:
os.remove(weather_default_file_path)
shutil.copy(self.weather_file_path, weather_default_file_path)
except BaseException as e:
logging.error(e)
logging.error("Problem with the weather file handling")
# initialize
instance_name = "instance" + init_time
# model identifier
if self.fmi_type == "modex":
model_id = self.model_description.modelExchange.modelIdentifier
else:
model_id = self.model_description.coSimulation.modelIdentifier
kwargs = dict(
guid=self.model_description.guid,
unzipDirectory=self.unzipdir,
modelIdentifier=model_id,
instanceName=instance_name,
)
if self.fmi_version == "1.0":
if self.fmi_type == "cosim":
self.fmu = FMU1Slave(**kwargs)
else:
self.fmu = FMU1Model(**kwargs)
elif self.fmi_version == "2.0":
if self.fmi_type == "cosim":
self.fmu = FMU2Slave(**kwargs)
else:
self.fmu = FMU2Model(**kwargs)
self.fmu.instantiate(loggingOn=True)
if self.fmi_version == "2.0":
self.fmu.setupExperiment(startTime=self.start_time, stopTime=self.stop_time)
# Initialize time and the last_output values
self.time = self.start_time
[docs] def get_outputs_names(self):
"""Retrieves list of outputs from model description.
Returns
-------
output_keys : list of str
Variable names that specify outputs.
"""
return self.output_keys # res
[docs] def get_date(self):
"""Gets the current simulation time.
Returns
-------
int
Minutes of the current simulation time
int
Hours of the current simulation time
int
Day of the current simulation time
int
Month of the current simulation time
"""
time_tuple = (2013, 1, 1, 0, 0, 0, 1, 1, 0)
base_time = time.mktime(time_tuple)
date = time.localtime(base_time + self.time)
return date[4], date[3], date[2], date[1]
[docs] def step(self, inputs=None):
"""Advances the simulation one timestep.
Applies input for current step, simulate the system in FMU and retrieves outputs.
Parameters
----------
inputs: dict
Inputs for the system. Keys are input names, values are iterables of input values.
If not defined, assumes no inputs required.
Returns
----------
outputs: dict
Outputs for the system.
"""
if inputs is None:
inputs = {}
# Initializes FMU is not already
if not self.is_fmu_initialized:
self.__initialize_fmu()
# Inputs is a dictionary of arrays
res = []
# simulation loop
if bool(inputs):
inp_keys = sorted(list(inputs.keys()))
n_steps = len(inputs[inp_keys[0]])
non_inp_keys = set(self.input_keys) - set(list(inputs))
else:
n_steps = 1
inp_keys = []
non_inp_keys = self.input_keys
for p in range(n_steps):
key_list = []
input_list = []
for key in inp_keys:
key_list.append(self.vrs[key])
input_list.append(inputs[key][p])
for key in non_inp_keys:
key_list.append(self.vrs[key])
input_list.append(self.input_specs[key]["default"])
self.fmu.setReal(
key_list,
input_list,
)
# perform one step
self.fmu.doStep(
currentCommunicationPoint=self.time,
communicationStepSize=self.step_size,
)
# get the values
out_values = self.fmu.getReal([self.vrs[key] for key in self.output_keys])
# advance the time
self.time += self.step_size
# append the results
res.append((self.time, out_values))
output = self.post_process(self.output_keys, res, arrays=False)
self.kpis.add_observation(output)
return output
[docs] def print_kpis(self):
"""Prints the KPIs."""
kpi_summary = self.get_kpi()
for key in kpi_summary:
print(
"####################################################################"
)
kpi_name = kpi_summary[key]["name"]
kpi_type = kpi_summary[key]["type"]
kpi_val = kpi_summary[key]["kpi"]
print(
"Variable name: {}, kpi type: {}, kpi value: {}".format(
kpi_name, kpi_type, kpi_val
)
)
[docs] def get_kpi(self, start_ind=0, end_ind=-1):
"""Retrieves the KPIs.
For implementation details see the KPI class.
Parameters
----------
start_ind : int, optional
Index from where the KPI computation starts, by default 0
end_ind: int, optional
Index where the KPI computation ends, by default -1
Returns
-------
kpi_summary : dict
Dict containing all the tracked variables and their KPIs.
"""
return self.kpis.get_kpi(start_ind, end_ind)
[docs] def get_cumulative_kpi(self, names, kpi_type, out_type):
"""Retrieves the cumulative KPIs over multiple variables.
For implementation details see the KPI class.
Parameters
----------
names : list or str
List of variable names or common string to filter the variables.
kpi_type : str
One of the 4 KPI types to filter the variables.
out_type : str
Cumulative KPI type ("avg" or "sum").
Returns
-------
float or int
The computed KPI.
"""
return self.kpis.get_cumulative_kpi(names, kpi_type, out_type)
[docs] def sample_random_action(self):
"""Samples random actions from the action space.
Returns
-------
dict
Inputs with random values, within a specified range
"""
action = self.input_space.sample()
return dict(list(action.items()))
[docs] def get_forecast(self, forecast_length=24):
"""Generates a weather forecast of a given length.
Parameters
----------
forecast_length : int, optional
Number of timesteps that will be forecasted, by default 24
Returns
-------
forecast : dict
Forecasted values for default keys or ones specified in kwargs
"""
time_resolution = self.step_size / 60
hourly_steps = int(60 / time_resolution)
tot_length = math.ceil(forecast_length / hourly_steps) + 2
start_index = 0
forecast = {}
if isinstance(self.weather, EPW):
minute, hour, day, month = self.get_date()
start_index = int(minute / time_resolution)
forecast = self.weather.get_forecast(hour, day, month, tot_length)
elif isinstance(self.weather, MOS):
res = self.time % 3600
start_index = int(res / self.step_size)
forecast = self.weather.get_forecast(self.time - res, forecast_length)
forecast = self._interpolate_forecast(forecast, hourly_steps)
for key in forecast:
forecast[key] = forecast[key][start_index : forecast_length + start_index]
return forecast
def _interpolate_forecast(self, forecast, hourly_steps):
for key in forecast:
mod_list = forecast[key]
new_list = []
for i in range(len(mod_list) - 1):
for j in range(hourly_steps):
weight = j / hourly_steps
new_list.append(
(1 - weight) * mod_list[i] + weight * mod_list[i + 1]
)
new_list.append(mod_list[len(mod_list) - 1])
forecast[key] = new_list
return forecast
[docs] def look_for_weather_file(
self,
name=None,
generate_forecasts=True,
generate_forecast_method="perfect",
generate_forecast_keys=None,
):
"""Finds a weather file in the FMU.
Parameters
----------
name : str
Name of weather file
Raises
------
Exception
If no weather file/more than one weather file is found or the file has a wrong type
"""
weather_folder = Path(self.unzipdir) / "resources"
if name is None:
possible_weather_files = list(weather_folder.rglob("*.mos")) + list(
weather_folder.rglob("*.epw")
)
else:
possible_weather_files = list(weather_folder.rglob(name))
if len(possible_weather_files) == 0:
raise Exception("No weather file found in FMU")
elif len(possible_weather_files) > 1:
raise Exception(
"Found more than one weather file: {}. specify a name to select one.".format(
possible_weather_files
)
)
else:
wf = weather_folder / possible_weather_files[0]
if wf.suffix == ".mos":
self.weather = MOS()
self.weather.read(
wf,
generate_forecasts,
generate_forecast_method,
generate_forecast_keys,
)
elif wf.suffix == ".epw":
self.weather = EPW()
self.weather.read(
wf,
generate_forecasts,
generate_forecast_method,
generate_forecast_keys,
)
else:
raise Exception(
"File {} cannot be interpreted as a weather file".format(wf)
)
[docs] def post_process(self, list_rel_out, res, arrays=False):
"""Post-process output of FMPY.
Parameters
----------
list_rel_out : list of str
Output labels
res : list
Output of doStep FMPy method
arrays : bool, optional
If True, array output in processed structure, default is False
Returns
-------
dic_res: collections.OrderedDict
Dictionary with values of output for each key
"""
N = len(res)
dic_res = collections.OrderedDict()
position = {e: i for i, e in enumerate(list_rel_out)}
# Store time
if arrays:
dic_res["time"] = []
for key in list_rel_out:
dic_res[key] = []
for p in range(N):
for key in list_rel_out:
if arrays:
dic_res[key] += [res[p][1][position[key]]]
else:
dic_res[key] = res[p][1][position[key]]
if arrays:
dic_res["time"] += [res[p][0]]
else:
dic_res["time"] = res[p][0]
if arrays:
for key in dic_res:
dic_res[key] = np.asarray(dic_res[key]).flatten()
return dic_res
[docs] def reset(self):
"""Resets the simulation."""
self.close()
self.kpis.reset()
self.initialize()
[docs] def close(self, save=True):
"""Terminates the FMU and removes leftover folders."""
instance_name = self.fmu.instanceName
self.fmu.terminate()
self.fmu.freeInstance()
self.is_fmu_initialized = False
try:
shutil.rmtree(self.unzipdir)
except PermissionError as e:
logger.error(f"Folder could not be removed. {e}")
cwd = os.getcwd()
wd_sub_list = os.listdir(cwd)
if save:
for directory in wd_sub_list:
if instance_name in directory:
try:
shutil.move(
os.path.join(cwd, directory),
os.path.join(self.runs_path, directory),
)
except PermissionError as e:
logger.error(f"Folder could not be moved. {e}")
else:
for directory in wd_sub_list:
if instance_name in directory:
try:
shutil.rmtree(
os.path.join(cwd, directory),
)
except PermissionError as e:
logger.error(f"Folder could not be removed. {e}")