diff --git a/TODO.md b/TODO.md index 0b9a5ec..d37098b 100644 --- a/TODO.md +++ b/TODO.md @@ -1 +1,2 @@ * add lambda support +* add outfile tracking to docker containers and instances and docker groups diff --git a/minerva/__init__.py b/minerva/__init__.py index 74e1b53..b0bd4ff 100644 --- a/minerva/__init__.py +++ b/minerva/__init__.py @@ -8,7 +8,9 @@ from .docker import Docker from .remote import Remote from .machine import Machine from .pier import Pier -from .pool import Pool +from .pool import Pool, TempOuts + +from .timing import Timing from .minerva import Minerva @@ -26,6 +28,8 @@ __all__ = [ "load_sql", "AWS_INSTANCES", "Pool", - "Remote" + "Remote", + "TempOuts", + "Timing" ] diff --git a/minerva/athena.py b/minerva/athena.py index e600de9..117ee1f 100644 --- a/minerva/athena.py +++ b/minerva/athena.py @@ -7,7 +7,6 @@ import pyarrow as pa import pyarrow.dataset import pprint import datetime -import dask.dataframe as dd from minerva import parallel_map, load_template pp = pprint.PrettyPrinter(indent=4) diff --git a/minerva/docker.py b/minerva/docker.py index e11c48d..4132485 100644 --- a/minerva/docker.py +++ b/minerva/docker.py @@ -44,16 +44,19 @@ class Docker: if self.registry.endswith("amazonaws.com"): self.machine.aws_docker_login(self.registry) - res = self.machine.docker_run(self.uri, cmd=cmd, env=self.variables) + res = self.machine.docker_run(self.uri, + cmd = cmd, + env = self.variables, + output = (self.stdout, self.stderr)) - self.out["stdout"] = res[0].name - self.out["stderr"] = res[1].name + #self.out["stdout"] = res[0].name + #self.out["stderr"] = res[1].name - if self.stdout: - self.stdout.write(res[0].read()) + #if self.stdout: + # self.stdout.write(res[0].read()) - if self.stderr: - self.stderr.write(res[1].read()) + #if self.stderr: + # self.stderr.write(res[1].read()) self.finished = True print(f"finished on {self.machine.name}") diff --git a/minerva/lambda.py b/minerva/lambda.py new file mode 100644 index 0000000..a9889e8 --- /dev/null +++ b/minerva/lambda.py @@ -0,0 +1,13 @@ +import json + +class Lambda: + def __init__(self, handler, name): + self.handler = handler + self.name = name + self.client = handler.session.client("lambda") + + def invoke(self, payload): + self.client.invoke(InvocationType = "RequestResponse", + FunctionName = self.name, + Payload = json.dumps(payload) or "{}") + diff --git a/minerva/machine.py b/minerva/machine.py index d0e5d53..4b6502c 100644 --- a/minerva/machine.py +++ b/minerva/machine.py @@ -21,20 +21,21 @@ class Machine(minerva.Remote): public = True, disk_size = 8): - self.pier = pier - self.ami = ami + self.pier = pier + self.ami = ami self.instance_type = instance_type - self.username = username - self.key_pair = key_pair - self.variables = variables - self.name = name - self.ready = False - self.info = None - self.ssh = None - self.started = False - self.terminated = False - self.public = public - self.disk_size = disk_size + self.username = username + self.key_pair = key_pair + self.variables = variables + self.name = name + self.instance_id = None + self.ready = False + self.info = None + self.ssh = None + self.started = False + self.terminated = False + self.public = public + self.disk_size = disk_size def create(self): @@ -63,12 +64,13 @@ class Machine(minerva.Remote): self.info = res['Instances'][0] self.private_ip = self.info['NetworkInterfaces'][0]['PrivateIpAddress'] + self.instance_id = self.info['InstanceId'] # TODO there should be a check here in case some instances fail to # start up in a timely manner # Start a countdown in the background # to give time for the instance to start up - wait_time = 30 + wait_time = 180 self.thread = threading.Thread(target = self.wait, args = (wait_time,), daemon = True) @@ -97,7 +99,7 @@ class Machine(minerva.Remote): time.sleep(10) i += 1 - if i > 18: + if i > (n / 10): reason = f"{self.info['InstanceId']} took too long to start ({i} attempts)" raise Exception(reason) @@ -152,7 +154,7 @@ class Machine(minerva.Remote): def run_time(self): now = datetime.datetime.now() - start_time = self.started or now # what if we haven't started? + start_time = self.started or now # what if AWS hasn't made our start time available? end_time = self.terminated or now # what if we're still running? return end_time - start_time diff --git a/minerva/pool.py b/minerva/pool.py index 5b58e24..3724592 100644 --- a/minerva/pool.py +++ b/minerva/pool.py @@ -1,4 +1,6 @@ from threading import Thread, Lock +import time +import os class Pool: def __init__(self, worker, num=1, web=False): @@ -6,6 +8,7 @@ class Pool: # many concurrent requests for AWS self.machines = [worker(i).create() for i in range(num)] self.mutex = None + self.jobs = [] if web: import minerva.web @@ -16,6 +19,7 @@ class Pool: machine.join() machine.login() + # One thread per machine def run(self, func, data=[]): if not data or not func: return @@ -49,8 +53,11 @@ class Pool: self.mutex.release() # do the work - func(machine, item) - #time.sleep(0.5) + start = time.time() + result = func(machine, item) + total = time.time() - start + + self.jobs.append({"time": total, "input": item, "return": result}) # prior to return to the while-loop check self.mutex.acquire() @@ -66,3 +73,31 @@ class Pool: def cost(self): return sum([mach.cost() for mach in self.machines]) + +class TempOuts: + def __init__(self, directory, prefix): + self.directory = directory + self.prefix = prefix + self.stdout = None + self.stderr = None + + + def __enter__(self): + try: + os.mkdir(self.directory) + except: + pass + + path = os.path.join(self.directory, self.prefix) + + self.stdout = open(f"{path}_stdout.out", "ab") + self.stderr = open(f"{path}_stderr.out", "ab") + + return (self.stdout, self.stderr) + + + def __exit__(self, exception_type, exception_value, exception_traceback): + self.stdout.close() + self.stderr.close() + + diff --git a/minerva/remote.py b/minerva/remote.py index 13628b5..87ddfcc 100644 --- a/minerva/remote.py +++ b/minerva/remote.py @@ -20,10 +20,10 @@ class Remote: ip, username, key_path): - self.ip = ip + self.ip = ip self.username = username - self.key_path = key_path # full path - self.ssh = None + self.key_path = os.path.expanduser(key_path) # full path + self.ssh = None def login(self): if self.ssh: @@ -55,7 +55,7 @@ class Remote: # # https://github.com/paramiko/paramiko/issues/593#issuecomment-145377328 # - def cmd(self, command, hide=True, disown=False, watch=False): + def cmd(self, command, hide=True, disown=False, watch=False, output=(None, None)): # TODO this is necessary to load paramiko details #self.ssh.run("echo hello world", warn=True, hide=hide, disown=disown) @@ -73,8 +73,11 @@ class Remote: # are done # # Thanks to SirDonNick in #python for the help here - out = tempfile.NamedTemporaryFile(delete=False) - err = tempfile.NamedTemporaryFile(delete=False) + out = output[0] or tempfile.NamedTemporaryFile(delete=False) + err = output[1] or tempfile.NamedTemporaryFile(delete=False) + + print(command) + print(f"\t{out.name} -- {err.name}") # Taken from # https://stackoverflow.com/a/78765054 @@ -85,14 +88,14 @@ class Remote: # indicate that we're not going to write to that channel anymore channel.shutdown_write() - # read stdout/stderr to prevent read block hangs - flush_data(channel.recv(len(channel.in_buffer)), - out, - (watch and sys.stdout.buffer)) + ## read stdout/stderr to prevent read block hangs + #flush_data(channel.recv(len(channel.in_buffer)), + # out, + # (watch and sys.stdout.buffer)) - flush_data(channel.recv_stderr(len(channel.in_stderr_buffer)), - err, - (watch and sys.stderr.buffer)) + #flush_data(channel.recv_stderr(len(channel.in_stderr_buffer)), + # err, + # (watch and sys.stderr.buffer)) timeout = 60 @@ -103,6 +106,7 @@ class Remote: or channel.recv_stderr_ready()): # stop if channel was closed prematurely and buffers are empty got_chunk = False + readq, _, _ = select.select([channel], [], [], timeout) # returns three empty lists on timeout @@ -158,9 +162,9 @@ class Remote: return (open(out.name, "rb"), open(err.name, "rb"), thread) - def write_env_file(self, variables, fname="~/env.list"): + def write_env_file(self, variables, fname="~/env.list", output=(None, None)): vals = "\n".join([f"{var}={val}" for var, val in variables.items()]) - self.cmd(f"echo {shlex.quote(vals)} > {fname}") + self.cmd(f"echo {shlex.quote(vals)} > {fname}", output=output) return fname @@ -169,22 +173,22 @@ class Remote: return docker - def aws_docker_login(self, ecr): + def aws_docker_login(self, ecr, output=(None, None)): return self.cmd(f"aws ecr get-login-password --region {self.pier.session.region_name} | " + - f"docker login --username AWS --password-stdin {ecr}" -) + f"docker login --username AWS --password-stdin {ecr}", + output=output) - def docker_run(self, uri, cmd="", env={}): + def docker_run(self, uri, cmd="", env={}, output=(None, None)): if env: fname = self.write_env_file(env) environ = f"--env-file {fname}" else: environ = "" - return self.cmd(f"docker run {environ} {uri} {cmd}") + return self.cmd(f"docker run -t {environ} {uri} {cmd}", output=output) - def docker_pull(self, uri): - return self.cmd(f"docker pull {uri}") + def docker_pull(self, uri, output=(None, None)): + return self.cmd(f"docker pull {uri}", output=output) diff --git a/pyproject.toml b/pyproject.toml index 08226e4..98696b6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,12 +18,12 @@ minerva-console = "minerva.console:main" [tool.poetry.dependencies] python = ">3.9" boto3 = "^1.34.0" -pyarrow = "^14.0.1" +pyarrow = "^16.0" joblib = "^1.1.0" fabric = "^3.0.0" s3fs = ">2023.6.0" mako = ">1.2.0" -dask = ">2023.11.0" -distributed = ">2023.11.0" +#dask = ">2023.11.0" +#distributed = ">2023.11.0" pandas = ">2.0.0" -numpy = ">1.26.0" +numpy = ">2.0"