finally got streaming output working again

This commit is contained in:
Ari Brown 2024-10-18 14:23:27 -04:00
parent 95f87b2e3c
commit 6eb31cab1e
9 changed files with 115 additions and 54 deletions

View file

@ -1 +1,2 @@
* add lambda support * add lambda support
* add outfile tracking to docker containers and instances and docker groups

View file

@ -8,7 +8,9 @@ from .docker import Docker
from .remote import Remote from .remote import Remote
from .machine import Machine from .machine import Machine
from .pier import Pier from .pier import Pier
from .pool import Pool from .pool import Pool, TempOuts
from .timing import Timing
from .minerva import Minerva from .minerva import Minerva
@ -26,6 +28,8 @@ __all__ = [
"load_sql", "load_sql",
"AWS_INSTANCES", "AWS_INSTANCES",
"Pool", "Pool",
"Remote" "Remote",
"TempOuts",
"Timing"
] ]

View file

@ -7,7 +7,6 @@ import pyarrow as pa
import pyarrow.dataset import pyarrow.dataset
import pprint import pprint
import datetime import datetime
import dask.dataframe as dd
from minerva import parallel_map, load_template from minerva import parallel_map, load_template
pp = pprint.PrettyPrinter(indent=4) pp = pprint.PrettyPrinter(indent=4)

View file

@ -44,16 +44,19 @@ class Docker:
if self.registry.endswith("amazonaws.com"): if self.registry.endswith("amazonaws.com"):
self.machine.aws_docker_login(self.registry) self.machine.aws_docker_login(self.registry)
res = self.machine.docker_run(self.uri, cmd=cmd, env=self.variables) res = self.machine.docker_run(self.uri,
cmd = cmd,
env = self.variables,
output = (self.stdout, self.stderr))
self.out["stdout"] = res[0].name #self.out["stdout"] = res[0].name
self.out["stderr"] = res[1].name #self.out["stderr"] = res[1].name
if self.stdout: #if self.stdout:
self.stdout.write(res[0].read()) # self.stdout.write(res[0].read())
if self.stderr: #if self.stderr:
self.stderr.write(res[1].read()) # self.stderr.write(res[1].read())
self.finished = True self.finished = True
print(f"finished on {self.machine.name}") print(f"finished on {self.machine.name}")

13
minerva/lambda.py Normal file
View file

@ -0,0 +1,13 @@
import json
class Lambda:
def __init__(self, handler, name):
self.handler = handler
self.name = name
self.client = handler.session.client("lambda")
def invoke(self, payload):
self.client.invoke(InvocationType = "RequestResponse",
FunctionName = self.name,
Payload = json.dumps(payload) or "{}")

View file

@ -28,6 +28,7 @@ class Machine(minerva.Remote):
self.key_pair = key_pair self.key_pair = key_pair
self.variables = variables self.variables = variables
self.name = name self.name = name
self.instance_id = None
self.ready = False self.ready = False
self.info = None self.info = None
self.ssh = None self.ssh = None
@ -63,12 +64,13 @@ class Machine(minerva.Remote):
self.info = res['Instances'][0] self.info = res['Instances'][0]
self.private_ip = self.info['NetworkInterfaces'][0]['PrivateIpAddress'] self.private_ip = self.info['NetworkInterfaces'][0]['PrivateIpAddress']
self.instance_id = self.info['InstanceId']
# TODO there should be a check here in case some instances fail to # TODO there should be a check here in case some instances fail to
# start up in a timely manner # start up in a timely manner
# Start a countdown in the background # Start a countdown in the background
# to give time for the instance to start up # to give time for the instance to start up
wait_time = 30 wait_time = 180
self.thread = threading.Thread(target = self.wait, self.thread = threading.Thread(target = self.wait,
args = (wait_time,), args = (wait_time,),
daemon = True) daemon = True)
@ -97,7 +99,7 @@ class Machine(minerva.Remote):
time.sleep(10) time.sleep(10)
i += 1 i += 1
if i > 18: if i > (n / 10):
reason = f"{self.info['InstanceId']} took too long to start ({i} attempts)" reason = f"{self.info['InstanceId']} took too long to start ({i} attempts)"
raise Exception(reason) raise Exception(reason)
@ -152,7 +154,7 @@ class Machine(minerva.Remote):
def run_time(self): def run_time(self):
now = datetime.datetime.now() now = datetime.datetime.now()
start_time = self.started or now # what if we haven't started? start_time = self.started or now # what if AWS hasn't made our start time available?
end_time = self.terminated or now # what if we're still running? end_time = self.terminated or now # what if we're still running?
return end_time - start_time return end_time - start_time

View file

@ -1,4 +1,6 @@
from threading import Thread, Lock from threading import Thread, Lock
import time
import os
class Pool: class Pool:
def __init__(self, worker, num=1, web=False): def __init__(self, worker, num=1, web=False):
@ -6,6 +8,7 @@ class Pool:
# many concurrent requests for AWS # many concurrent requests for AWS
self.machines = [worker(i).create() for i in range(num)] self.machines = [worker(i).create() for i in range(num)]
self.mutex = None self.mutex = None
self.jobs = []
if web: if web:
import minerva.web import minerva.web
@ -16,6 +19,7 @@ class Pool:
machine.join() machine.join()
machine.login() machine.login()
# One thread per machine
def run(self, func, data=[]): def run(self, func, data=[]):
if not data or not func: if not data or not func:
return return
@ -49,8 +53,11 @@ class Pool:
self.mutex.release() self.mutex.release()
# do the work # do the work
func(machine, item) start = time.time()
#time.sleep(0.5) result = func(machine, item)
total = time.time() - start
self.jobs.append({"time": total, "input": item, "return": result})
# prior to return to the while-loop check # prior to return to the while-loop check
self.mutex.acquire() self.mutex.acquire()
@ -66,3 +73,31 @@ class Pool:
def cost(self): def cost(self):
return sum([mach.cost() for mach in self.machines]) return sum([mach.cost() for mach in self.machines])
class TempOuts:
def __init__(self, directory, prefix):
self.directory = directory
self.prefix = prefix
self.stdout = None
self.stderr = None
def __enter__(self):
try:
os.mkdir(self.directory)
except:
pass
path = os.path.join(self.directory, self.prefix)
self.stdout = open(f"{path}_stdout.out", "ab")
self.stderr = open(f"{path}_stderr.out", "ab")
return (self.stdout, self.stderr)
def __exit__(self, exception_type, exception_value, exception_traceback):
self.stdout.close()
self.stderr.close()

View file

@ -22,7 +22,7 @@ class Remote:
key_path): key_path):
self.ip = ip self.ip = ip
self.username = username self.username = username
self.key_path = key_path # full path self.key_path = os.path.expanduser(key_path) # full path
self.ssh = None self.ssh = None
def login(self): def login(self):
@ -55,7 +55,7 @@ class Remote:
# #
# https://github.com/paramiko/paramiko/issues/593#issuecomment-145377328 # https://github.com/paramiko/paramiko/issues/593#issuecomment-145377328
# #
def cmd(self, command, hide=True, disown=False, watch=False): def cmd(self, command, hide=True, disown=False, watch=False, output=(None, None)):
# TODO this is necessary to load paramiko details # TODO this is necessary to load paramiko details
#self.ssh.run("echo hello world", warn=True, hide=hide, disown=disown) #self.ssh.run("echo hello world", warn=True, hide=hide, disown=disown)
@ -73,8 +73,11 @@ class Remote:
# are done # are done
# #
# Thanks to SirDonNick in #python for the help here # Thanks to SirDonNick in #python for the help here
out = tempfile.NamedTemporaryFile(delete=False) out = output[0] or tempfile.NamedTemporaryFile(delete=False)
err = tempfile.NamedTemporaryFile(delete=False) err = output[1] or tempfile.NamedTemporaryFile(delete=False)
print(command)
print(f"\t{out.name} -- {err.name}")
# Taken from # Taken from
# https://stackoverflow.com/a/78765054 # https://stackoverflow.com/a/78765054
@ -85,14 +88,14 @@ class Remote:
# indicate that we're not going to write to that channel anymore # indicate that we're not going to write to that channel anymore
channel.shutdown_write() channel.shutdown_write()
# read stdout/stderr to prevent read block hangs ## read stdout/stderr to prevent read block hangs
flush_data(channel.recv(len(channel.in_buffer)), #flush_data(channel.recv(len(channel.in_buffer)),
out, # out,
(watch and sys.stdout.buffer)) # (watch and sys.stdout.buffer))
flush_data(channel.recv_stderr(len(channel.in_stderr_buffer)), #flush_data(channel.recv_stderr(len(channel.in_stderr_buffer)),
err, # err,
(watch and sys.stderr.buffer)) # (watch and sys.stderr.buffer))
timeout = 60 timeout = 60
@ -103,6 +106,7 @@ class Remote:
or channel.recv_stderr_ready()): or channel.recv_stderr_ready()):
# stop if channel was closed prematurely and buffers are empty # stop if channel was closed prematurely and buffers are empty
got_chunk = False got_chunk = False
readq, _, _ = select.select([channel], [], [], timeout) readq, _, _ = select.select([channel], [], [], timeout)
# returns three empty lists on timeout # returns three empty lists on timeout
@ -158,9 +162,9 @@ class Remote:
return (open(out.name, "rb"), open(err.name, "rb"), thread) return (open(out.name, "rb"), open(err.name, "rb"), thread)
def write_env_file(self, variables, fname="~/env.list"): def write_env_file(self, variables, fname="~/env.list", output=(None, None)):
vals = "\n".join([f"{var}={val}" for var, val in variables.items()]) vals = "\n".join([f"{var}={val}" for var, val in variables.items()])
self.cmd(f"echo {shlex.quote(vals)} > {fname}") self.cmd(f"echo {shlex.quote(vals)} > {fname}", output=output)
return fname return fname
@ -169,22 +173,22 @@ class Remote:
return docker return docker
def aws_docker_login(self, ecr): def aws_docker_login(self, ecr, output=(None, None)):
return self.cmd(f"aws ecr get-login-password --region {self.pier.session.region_name} | " + return self.cmd(f"aws ecr get-login-password --region {self.pier.session.region_name} | " +
f"docker login --username AWS --password-stdin {ecr}" f"docker login --username AWS --password-stdin {ecr}",
) output=output)
def docker_run(self, uri, cmd="", env={}): def docker_run(self, uri, cmd="", env={}, output=(None, None)):
if env: if env:
fname = self.write_env_file(env) fname = self.write_env_file(env)
environ = f"--env-file {fname}" environ = f"--env-file {fname}"
else: else:
environ = "" environ = ""
return self.cmd(f"docker run {environ} {uri} {cmd}") return self.cmd(f"docker run -t {environ} {uri} {cmd}", output=output)
def docker_pull(self, uri): def docker_pull(self, uri, output=(None, None)):
return self.cmd(f"docker pull {uri}") return self.cmd(f"docker pull {uri}", output=output)

View file

@ -18,12 +18,12 @@ minerva-console = "minerva.console:main"
[tool.poetry.dependencies] [tool.poetry.dependencies]
python = ">3.9" python = ">3.9"
boto3 = "^1.34.0" boto3 = "^1.34.0"
pyarrow = "^14.0.1" pyarrow = "^16.0"
joblib = "^1.1.0" joblib = "^1.1.0"
fabric = "^3.0.0" fabric = "^3.0.0"
s3fs = ">2023.6.0" s3fs = ">2023.6.0"
mako = ">1.2.0" mako = ">1.2.0"
dask = ">2023.11.0" #dask = ">2023.11.0"
distributed = ">2023.11.0" #distributed = ">2023.11.0"
pandas = ">2.0.0" pandas = ">2.0.0"
numpy = ">1.26.0" numpy = ">2.0"