Source code for alpyne.sim

import atexit
import json
import logging
import os
import subprocess
import time
from datetime import datetime
from tempfile import TemporaryDirectory
from typing import Any
import socket
from pathlib import Path
from warnings import warn

import psutil as psutil
import requests
from requests.exceptions import ConnectionError as RequestsConnectionError, HTTPError
from psutil import NoSuchProcess

from alpyne.utils import resolve_model_jar, \
    get_wildcard_paths, shorten_by_relativeness, get_resources_path, AlpyneJSONEncoder, AlpyneJSONDecoder
from alpyne.constants import EngineState, JavaLogLevel
from alpyne.outputs import TimeUnits, UnitValue
from alpyne.typing import EngineSettingKeys, Number, OutputType
from alpyne.data import SimSchema, EngineStatus, EngineSettings, SimStatus, FieldData, \
    SimConfiguration, SimAction, SimObservation
from alpyne.errors import ModelError, ExitException


def find_free_port() -> int:
    with socket.socket() as s:
        s.bind(('', 0))
        return s.getsockname()[1]


[docs] class AnyLogicSim: """The class for connecting to, communicating with, and controlling an AnyLogic sim model exported via the RL Experiment.""" schema: SimSchema = None """ A class variable (static) defining the model schema, assigned on startup """ def __init__(self, model_path: str, port: int = 0, py_log_level: int | str | bool = logging.WARNING, java_log_level: int | str | bool = logging.WARNING, log_id: str = None, auto_lock: bool = True, auto_finish: bool = False, engine_overrides: dict[EngineSettingKeys, Number | datetime | TimeUnits | UnitValue] = None, config_defaults: dict[str, Any] = None, lock_defaults: dict = None, java_exe: str = None, startup_delay: float = 0.1, **kwargs): """ Initialize a connection to the simulation model, with arguments for defining the model setup and operating behavior. By default, the runs will use the units, start/stop time/date, and RNG seed based on what you set in the RL experiment. This can be overriden by passing a dictionary to ``engine_overrides``. :param model_path: a relative or absolute path to the exported model zip or extracted model.jar file :param port: what local port to run the Alpyne app on (0 = find a free one) :param py_log_level: verboseness for this library; expressed as levels from the ``logging`` library or a boolean - True defaults to INFO, False to WARNING; defaults to WARNING :param java_log_level: verboseness of java-side logging (writes to alpyne.log); expressed as levels from the ``logging`` library or a boolean - True defaults to INFO, False to WARNING; defaults to WARNING :param log_id: An identifier to put between the log file name and the '.log' extension, useful if you do not want log files to be overridden by subsequent or parallel runs; you can use $p for the port number, $n for a unique number (starts from 1); defaults to None (empty string) :param auto_lock: whether to automatically wait for a 'ready' state after each call to reset or take_action and return the subsequent RL status instead of None; defaults to False :param auto_finish: whether to automatically force the model into a FINISH state once the stop condition (defined in the RL experiment) is met; defaults to False :param engine_overrides: definition for overrides to the engine settings; allowed keys: seed, units, start_time, start_date, stop_time, stop_date; defaults to None :param config_defaults: desired default values for the Configuration values; defaults to None :param lock_defaults: default values to use when calling ``lock``; flag arg defaults to ``EngineState.ready()``, timeout to 30 :param java_exe: Path to Java executable; if None, uses whatever is associated with the 'java' command :param startup_delay: Seconds to wait between launching the application and sending the first request to verify it's alive. Too small and it may incorrectly detect as not started up; too large and it adds to startup overhead. Default 0.1 :param kwargs: Internal arguments :raises ModelError: if the app fails to start .. warning:: For ``engine_overrides``: (1) do not override the model time units unless ensuring the model does not have ambiguous, unit-generic logic (e.g., calls to ``time()``), as this can affect the logic or outputs; (2) when setting a stop time *and* date, only the last set value will be used. """ if port == 0: port = find_free_port() if py_log_level is True: py_log_level = logging.INFO elif py_log_level is False: py_log_level = logging.WARNING logging.basicConfig( #level=logging.getLevelName(py_log_level), format=f"%(asctime)s [%(name)s @ %(lineno)s][%(levelname)8s] %(message)s", handlers=[logging.StreamHandler()], ) logging.getLogger("alpyne").setLevel(logging.getLevelName(py_log_level)) self.log = logging.getLogger(__name__) self.auto_wait = auto_lock self._last_status = None # should be updated whenever `lock` is called self._internal_args = kwargs self._proc_pids: list = [] # will store all top-level and children PIDs for killing them later java_exe_path = self._validate_java(java_exe) try: self._temp_dir = None # only populated if passed as zip self._proc = self._start_app(java_exe_path, model_path, port, java_log_level, log_id, auto_finish, startup_delay) except: raise ModelError(f"Failed to properly start the app. Check the logs.") # support for "hidden" arguments for host; meant for debugging or testing purposes (may include a dedicated arg if deemed useful) self._base_url = f"{kwargs.get('host') or 'http://127.0.0.1'}:{port}" self._session = requests.Session() # may need more than fraction of time in `_start_app` for the server to be set up set_schema, time_start = False, time.time() while not set_schema and (time.time() - time_start <= 2): try: AnyLogicSim.schema = SimSchema( self._session.get(f"{self._base_url}/version").json() ) set_schema = True except: time.sleep(0.5) if not set_schema: raise ModelError(f"The underlying application failed to start. Check the logs.") # setup the `engine_settings` instance variable to store the desired values to use if engine_overrides is None: # default to an empty dict, so the next fallback will be used engine_overrides = dict() # omitted values will cause the object to pull from the schema self.engine_settings = EngineSettings(**engine_overrides) # overwrite the 'value' in the schema for any config fields desired to be overridden for key, new_value in (config_defaults or dict()).items(): AnyLogicSim.schema.configuration[key].value = new_value # setup lookup for lock kwargs, specifying any missing kwargs here self._lock_defaults = lock_defaults or dict() self._lock_defaults.setdefault("flag", EngineState.ready()) self._lock_defaults.setdefault("timeout", 30) def _validate_java(self, java_exe: str) -> str: """ :param java_exe: Java command or path to executable :return: The full path to the executable file """ # set some default value if java_exe is None: java_exe = "java" # assign initial path value and resolve as needed java_exe_path = Path(java_exe) if java_exe_path.exists(): # file or directory target_file = "java" + ".exe" if os.name == "nt" else "" if os.path.isdir(java_exe): # look for java[.exe] in the bin folder try: java_exe_path = next(java_exe_path.rglob(target_file)) except StopIteration: raise RuntimeError(f"Could not find {target_file} in {java_exe}") elif java_exe_path.name != target_file: raise ValueError(f"Passed executable ({java_exe_path}) does not have the expected filename ({target_file})") else: # assume java_exe is referring to a command locate_cmd = "where" if os.name == "nt" else "which" proc = subprocess.run(f"{locate_cmd} {java_exe}", capture_output=True, shell=True) if proc.returncode != 0: logs = [proc.stdout.decode().strip(), proc.stderr.decode().strip()] logs = [log for log in logs if log] # remove empty strings raise RuntimeError(f"Failed to find path to java executable via {locate_cmd} {java_exe}. Return code: {proc.returncode}; message: {' | '.join(logs)}") java_exe_path = proc.stdout.decode().strip() return str(java_exe_path) def _start_app(self, java_exe_path: str, model_path: str, port: int, java_log_level: int | str | bool | JavaLogLevel, log_id: str, auto_finish: bool, startup_delay: float = 0.1) -> subprocess.Popen: """ Execute the backend app with the desired preferences. :param java_exe_path: :param model_path: :param port: :param java_log_level: :param log_id: :param auto_finish: :param startup_delay: """ # get the directory for the model, optionally extracting it to a temp dir if necessary model_jar, self._temp_dir = resolve_model_jar(model_path) model_dir = str(model_jar.parent.absolute()) # temporarily change to the exported model folder's directory for starting purposes # (needed to make sure database is properly connected to) initdir = os.getcwd() os.chdir(model_dir) # get the directory for the alpyne server library alpyne_path = self._internal_args.get('alpyne_path', str(get_resources_path())) self.log.debug(f"Using Java executable {java_exe_path}") self.log.debug(f"Loading server from {alpyne_path}") self.log.debug(f"Launching using model in {model_dir}") # build the class path argument, reformatting to use wildcards so as to avoid the cp arg len limit jar_sources = get_wildcard_paths(alpyne_path) + get_wildcard_paths(str(model_dir)) jar_sources = shorten_by_relativeness(jar_sources) if os.name == "nt": class_path = ";".join(jar_sources) else: class_path = ":".join(jar_sources) # convert to a java-compatible level, if not already if isinstance(java_log_level, bool): # bool -> python logging type if java_log_level is True: java_log_level = logging.INFO elif java_log_level is False: java_log_level = logging.WARNING if isinstance(java_log_level, (str, int)): # python logging type (or compatible arg) -> java logging type java_log_level = JavaLogLevel.from_py_level(java_log_level) # preliminary list of arguments cmdline_args = [java_exe_path, "-cp", class_path, "com.anylogic.alpyne.AlpyneServer", "-p", f"{port}", "-l", java_log_level.name, "-d", initdir ] # handle adding conditional arguments if log_id is not None and len(log_id) > 0: # only include log id flag if non-default cmdline_args.extend(["-i", log_id]) if auto_finish: # flag without arguments cmdline_args.append("-f") # add in final argument cmdline_args.append(".") self.log.debug(f"Executing:\n{' '.join(cmdline_args)}\n") try: proc = subprocess.Popen(cmdline_args, stdin=subprocess.PIPE, # Needed for quitting the app ) except FileNotFoundError: raise RuntimeError("Java not found. Please check your system path.") # return back to original directory os.chdir(initdir) returncode = proc.poll() if returncode is not None and returncode != 0: err_message = proc.stderr.readlines() raise EnvironmentError(f"Process returned code: {returncode}; message: {err_message}") # Give the previous command a moment to realize time.sleep(startup_delay) # Store the IDs from both the active process and any subprocesses spawned from it, # for later confirming the finality of it. # (this may happen when the `java` command calls itself with corrected/absolute paths) self._proc_pids = [proc.pid] + [c.pid for c in psutil.Process(proc.pid).children(True)] self.log.info(f"Started app | PID(s) = {self._proc_pids}") atexit.register(self._quit_app) return proc def _quit_app(self): """ Trigger app's self-destruct, killing any active runs, in addition to cleaning up any temporary files """ # Trigger self-destruct, killing the active run try: self._session.delete(f"{self._base_url}/") except RequestsConnectionError: self.log.debug( "Failed to request self-destruct from server due to connection error. Attempting other methods.") try: # send arbitrary text directly to process to trigger shutdown stdout, stderr = self._proc.communicate('PYTHON SMITES THEE!'.encode(), 3) if stdout: self.log.debug(f"Uncaught output from app's stdout: {stdout.decode()}") if stderr: self.log.debug(f"Uncaught output from app's stderr: {stderr.decode()}") except subprocess.TimeoutExpired: self.log.debug(f"Timed out waiting to send shutdown signal; may already be dead") # ensure the server self-quit, otherwise attempt to force it try: rcode = self._proc.wait(1) self.log.info(f"Quit with return code {rcode}") except subprocess.TimeoutExpired: self.log.debug(f"Force killing app; server did not quit as expected") self._proc.kill() for pid in self._proc_pids: try: p = psutil.Process(pid) p.kill() except NoSuchProcess: pass # pause to let actions take place time.sleep(0.1) # report if any PIDs still exist for pid in self._proc_pids: try: _ = psutil.Process(pid) # wait extra little bit just in case.... time.sleep(0.4) _ = psutil.Process(pid) msg = (f"ALERT! All attempts to kill process with ID {pid} failed! " f"May requires system restart or manual quit to close.") self.log.error(msg) except NoSuchProcess: pass # final cleanup self._session.close() if self._temp_dir: self._temp_dir.cleanup() self.log.info(f"Deleted temporary directory: {self._temp_dir.name}") # bug in java app: even on clean close, the lck file does not get removed; handle that here for now for file in Path(os.getcwd()).glob("*.log.lck"): file.unlink(missing_ok=True) # in case it does delete it but face a race condition self.log.info("Completed cleanup successfully") def _request(self, method: str, endpoint: str, params: dict = None, data: dict = None) -> dict | None: """ Submit an HTTP request to the underlying server and return the results :param method: The HTTP verb :param endpoint: The endpoint to call upon; gets appended to the base url; starting slash is not necessary :param params: Query parameters to add to the url :param data: Body to add to the request :return: The processed JSON output, if any (else None) :raises HTTPError: If there was any problem receiving or submitting the request """ try: if data is not None: data = json.dumps(data, cls=AlpyneJSONEncoder) self.log.debug(f"Request : {method} @ {endpoint}: {params} | {data}") response = self._session.request(method, f"{self._base_url}/{endpoint.strip('/')}", params=params, data=data) self.log.debug(f"Response: {response.status_code} | {response.content}") if 400 <= response.status_code < 600: source = "Client" if response.status_code < 500 else "Server" reason = response.reason if isinstance(reason, bytes): reason = reason.decode("utf-8") error_msg = f"{response.status_code} {source} Error: {reason} for url {response.url} -- check alpyne.log for more info" raise HTTPError(error_msg) elif response.content: return response.json(cls=AlpyneJSONDecoder) except KeyboardInterrupt: # reraise an exception so that any calling function will end itself and trigger the on-exit logic self.log.info(f"Interrupted {method} request to {endpoint}; passing.") raise ExitException("Exit requested.")
[docs] def reset(self, _config: SimConfiguration | dict = None, **config_kwargs) -> SimStatus | None: """ Reset the experiment to the beginning with the specified configuration. Any omitted values will use the default (either defined by you in the constructor or else the Java defaults). You should pass an object or keyword arguments (only one necessary). After applying, the model will auto-run (in the background) to the first call to ``takeAction`` or its natural finish condition, whichever comes first. :param _config: A dictionary or dictionary subclass with configuration arguments :param config_kwargs: Mapping between names defined in the RL Experiment's Configuration to the desired values :return: Nothing (when ``auto_wait`` == False) or the model's status post-waiting (when ``auto_wait`` == True) """ if _config is None: _config = SimConfiguration() elif isinstance(_config, dict): _config = SimConfiguration(**_config) _config.update(**config_kwargs) _ = self._request("PUT", "rl", data=dict(configuration=_config, engine_settings=self.engine_settings)) if self.auto_wait: return self.lock()
[docs] def take_action(self, _action: SimAction | dict = None, **action_kwargs) -> SimStatus | None: """ Submit an action to the model. You should pass an object or keyword arguments (only one necessary, with the former taking higher precedence). :param _action: The dataclass instance or a dictionary with action arguments :param action_kwargs: Direct mapping between names defined in the RL Experiment's Action to the desired values :return: Nothing (when ``auto_wait`` == False) or the model's status (when ``auto_wait`` == True) """ if _action is None: _action = SimAction() elif isinstance(_action, dict): _action = SimAction(**_action) _action.update(**action_kwargs) _ = self._request("PATCH", "rl", data=dict(action=_action)) if self.auto_wait: return self.lock()
[docs] def observation(self) -> SimObservation: """ Queries the current Observation, regardless of the model's current state (i.e., it may not be requesting an action yet!). This function is a shorthand for ``status().observation``. :return: The current model observation """ return self.status().observation
def _to_status(self, data: dict) -> SimStatus: """ A centralized location for the logic to handle the response from a status request - whether from calling `status` or `lock`. :param data: The output from GET /status :return: The SimStatus object """ # all attributes in data should match those in the SimStatus object status = SimStatus(**data) # when the message it passed, assume some important, but non-halting issue if status.message: # TODO throw runtime error instead? warn(status.message) return status
[docs] def status(self) -> SimStatus: """ Queries the current status of the model, regardless of its current state (i.e., it may not be requesting an action yet!). :return: The current model status """ data = self._request("GET", "status") return self._to_status(data)
[docs] def _engine(self) -> EngineStatus: # TODO remove me? rename? """ :return: An immutable object providing engine-level information """ data = self._request("GET", "engine") # received data is a dict matching the EngineStatus attributes return EngineStatus(**data)
[docs] def lock(self, flag: EngineState = None, timeout: int = None) -> SimStatus: """ Hang the active thread until the engine is in a given state. :param flag: An encoded indicator for which state(s) to wait for the engine to be in. Defaults to ``State.ready()`` (i.e., in PAUSED, FINISHED, or ERROR) unless this object was constructed with different defaults (by you). :param timeout: Maximum wait time, in seconds. Defaults to 30 unless this object was constructed with different defaults (by you). :return: An object providing status information :raise TimeoutError: If timeout elapses before the desired state is reached """ # replace Nones with default values, as defined in constructor if flag is None: flag = self._lock_defaults["flag"] if timeout is None: timeout = self._lock_defaults["timeout"] names = [state.name for state in EngineState if flag & state] data = self._request("GET", "lock", params=dict(state=names, timeout=int(timeout * 1000))) status = self._to_status(data) self._last_status = status return status
[docs] def outputs(self, *names: str) -> list[OutputType] | dict[str, OutputType] | None: """ Retrieves the values of any analysis-related objects in the top-level agent (if any exist), as detailed further in the AnyLogic Help article `Data analysis items <https://anylogic.help/anylogic/analysis/index.html>`_ and its subpages. Each of the analysis-related objects have a type of the same name implemented in this library. Output objects for plain scalar types are converted to their natural python equivalent (e.g., String -> str, double -> float); types units attached make use of the custom ``UnitValue`` type. :param names: The names (as defined in the sim) of objects to query the current value for; passing nothing will get everything :return: The desired values; provided as a list when explicit names are used (in the specified order), otherwise in a dictionary keyed by the names """ # as the next step changes 'names', first record what the return should be ret_as_list = bool(names) # put all names if none were provided if not names: names = list(self.schema.outputs.keys()) # instant return if model has no outputs if not names: return list() if ret_as_list else dict() data = self._request("GET", "outputs", params=dict(names=names)) # received data contains a list of ModelData objects so that their types are included, # which allows us to pull their converted, non-raw (non-dict / typed) values out. # initially store as a dict outputs = dict() for model_data in data['model_datas']: md = FieldData(**model_data) outputs[md.name] = md.py_value if ret_as_list: # explicitly use provided order in case values returned otherwise return [outputs[name] for name in names] return outputs