"""
This module circumvents the ray package while maintaining ray-like syntax in the code.
Only the exact ISOFIT use cases of Ray are implemented here. If new uses of Ray are
implemented, those uses/functions will have to be defined here as well.
To enable, set the environment variable `ISOFIT_DEBUG` to "1". For example:
$ export ISOFIT_DEBUG=1
$ python isofit ...
Additionally, you may pass it as a temporary environment variable via:
$ ISOFIT_DEBUG=1 python isofit ...
"""
import logging
from types import FunctionType
[docs]
Logger = logging.getLogger(__file__)
[docs]
class Remote:
def __init__(self, obj, *args, **kwargs):
[docs]
def __getattr__(self, key):
"""
Returns a Remote object on the key being requested. This enables
ray.remote(Class).func.remote()
"""
return Remote(getattr(self.obj, key))
[docs]
def remote(self, *args, **kwargs):
return Remote(self.obj, *args, **kwargs)
[docs]
def get(self):
return self.obj(*self.args, **self.kwargs)
[docs]
def __repr__(self):
return f"<Remote({self.obj})>"
[docs]
def __getattr__(key):
"""
Reports any call to Ray that is not emulated
"""
Logger.error(f"Unsupported operation: {key!r}")
return lambda *a, **kw: None
[docs]
def remote(*args, **kwargs):
def wrap(obj):
return Remote(obj)
if len(args) == 1 and isinstance(args[0], (FunctionType, object)):
return wrap(*args)
return wrap
[docs]
def init(*args, **kwargs):
Logger.debug("Ray has been disabled for this run")
[docs]
def get(jobs):
if hasattr(jobs, "__iter__"):
return [job.get() for job in jobs]
else:
return jobs.get()
[docs]
def wait(jobs, num_returns=1, **kwargs):
if hasattr(jobs, "__iter__"):
if num_returns + 1 < len(jobs):
return jobs[:num_returns], jobs[num_returns:]
return jobs[:num_returns], []
else:
return jobs
[docs]
def put(obj):
return obj
[docs]
def shutdown(*args, **kwargs):
pass
[docs]
class util:
[docs]
class ActorPool:
def __init__(self, actors):
"""
Emulates https://docs.ray.io/en/latest/_modules/ray/util/actor_pool.html
Parameters
----------
actors: list
List of Remote objects to call
"""
# Only need one actor function
[docs]
self.actors = Remote(actors[0].get())
[docs]
def map_unordered(self, func, iterable):
return [func(self.actors, item).get() for item in iterable]