diff --git a/minerva/__init__.py b/minerva/__init__.py index 5d7e0b8..74e1b53 100644 --- a/minerva/__init__.py +++ b/minerva/__init__.py @@ -5,10 +5,10 @@ from .redshift import Redshift from .s3 import S3 from .docker import Docker +from .remote import Remote from .machine import Machine from .pier import Pier from .pool import Pool -from .remote import Remote from .minerva import Minerva diff --git a/minerva/machine.py b/minerva/machine.py index 26621c8..c9cb7ae 100644 --- a/minerva/machine.py +++ b/minerva/machine.py @@ -9,7 +9,7 @@ import os import minerva import select -class Machine: +class Machine(minerva.Remote): def __init__(self, pier, ami = "ami-0a538467cc9da9bb2", # ubuntu 22 @@ -88,6 +88,7 @@ class Machine: self.thread.join() + # Wait until the machine is ready (max 180 seconds) def wait(self, n): i = 0 # Time for the server to show as "running" @@ -136,158 +137,6 @@ class Machine: return True - def prep_variables(self): - varz = [f"export {var}={repr(val)}" for var, val in self.variables.items()] - base = ['source ~/.profile'] - return "; ".join([*base, *varz]) - - - # Unfortunately, under the hood, it's running /bin/bash -c '...' - # You stand informed - # - # This creates a pseudo-TTY on the other end - # - # `watch` means it'll print the output live, else it'll wait till it's finished - # `disown` means it'll run in the background - # - # TODO switch this to use `self.ssh.client.exec_command()` - def cmd(self, command, hide=True, disown=False, watch=False): - res = self.ssh.run(f"{self.prep_variables()}; {command}", - warn=True, - hide=hide, - disown=disown) - - return res - - # https://github.com/paramiko/paramiko/issues/593#issuecomment-145377328 - # https://stackoverflow.com/questions/23504126/do-you-have-to-check-exit-status-ready-if-you-are-going-to-check-recv-ready/32758464#32758464 - # - def cmd(self, command, hide=True, disown=False, watch=False): - print(self.ssh) - print(self.ssh.client) - - self.ssh.run("echo hello world", warn=True, hide=hide, disown=disown) - - stdin, stdout, stderr = self.ssh.client.exec_command(command) - - # this is the same for all three inputs - channel = stdin.channel - - out_r, out_w = os.pipe() - err_r, err_w = os.pipe() - - # https://stackoverflow.com/a/78765054 - - # we do not need stdin. - stdin.close() - # indicate that we're not going to write to that channel anymore - channel.shutdown_write() - - # read stdout/stderr to prevent read block hangs - stdout_chunks = [] - stderr_chunks = [] - stdout_chunks.append(channel.recv(len(channel.in_buffer))) - - timeout = 60 - - def fill_buffers(): - # perform chunked read to prevent stalls - while (not channel.closed - or channel.recv_ready() - or channel.recv_stderr_ready()): - # stop if channel was closed prematurely and buffers are empty - got_chunk = False - readq, _, _ = select.select([channel], [], [], timeout) - - # returns three empty lists on timeout - if not readq: - break - for c in readq: - if c.recv_ready(): - #stdout_chunks.append(channel.recv(len(c.in_buffer))) - data = channel.recv(len(c.in_buffer)) - if data: - print("*******") - print(repr(data)) - os.write(out_w, data) - got_chunk = True - if c.recv_stderr_ready(): - #stderr_chunks.append(channel.recv_stderr(len(c.in_stderr_buffer))) - data = channel.recv_stderr(len(c.in_stderr_buffer)) - if data: - print("*******") - print(repr(data)) - os.write(err_w, data) - got_chunk = True - # for c - - """ - 1) make sure that there are at least 2 cycles with no data in the input - buffers in order to not exit too early; i.e., cat on a >200k file - 2) if no data arrived in the last loop, check if we received exit code - 3) check if input buffers are empty - 4) exit the loop - """ - if (not got_chunk - and channel.exit_status_ready() - and not channel.recv_stderr_ready() - and not channel.recv_ready()): - # indicate that we're not going to read from this channel anymore - channel.shutdown_read() - # close the channel - channel.close() - # remote side is finished and our buffers are empty - break - # if - # while - - # close the pseudofiles - stdout.close() - stderr.close() - - thread = threading.Thread(target = fill_buffers) - thread.start() - - if not disown: - thread.join() - - #return (stdout_chunks, stderr_chunks) - os.close(out_w) - os.close(err_w) - return (os.fdopen(out_r), os.fdopen(err_r), thread) - - - def write_env_file(self, variables, fname="~/env.list"): - vals = "\n".join([f"{var}={val}" for var, val in variables.items()]) - self.cmd(f"echo {shlex.quote(vals)} > {fname}") - return fname - - - def load_docker(self, docker): - docker.machine = self - return docker - - - def aws_docker_login(self, ecr): - return self.cmd(f"aws ecr get-login-password --region {self.pier.session.region_name} | " + - f"docker login --username AWS --password-stdin {ecr}" -) - - - def docker_run(self, uri, cmd="", env={}): - if env: - fname = self.write_env_file(env) - environ = f"--env-file {fname}" - else: - environ = "" - - return self.cmd(f"docker run {environ} {uri} {cmd}") - - - def docker_pull(self, uri): - return self.cmd(f"docker pull {uri}") - - def terminate(self): if self.terminated: return diff --git a/minerva/remote.py b/minerva/remote.py index a420372..f3fb7aa 100644 --- a/minerva/remote.py +++ b/minerva/remote.py @@ -25,6 +25,7 @@ class Remote: connect_kwargs = { "key_filename": self.key_path }) + self.ssh.open() return True @@ -39,23 +40,15 @@ class Remote: # # This creates a pseudo-TTY on the other end # - # `watch` means it'll print the output live, else it'll wait till it's finished + # `watch` means it'll print the output live, else it'll return the + # output (stdout, stderr) streams and the thread # `disown` means it'll run in the background # - # TODO switch this to use `self.ssh.client.exec_command()` - #def cmd(self, command, hide=True, disown=False, watch=False): - # res = self.ssh.run(f"{self.prep_variables()}; {command}", - # warn=True, - # hide=hide, - # disown=disown) - - # return res - # https://github.com/paramiko/paramiko/issues/593#issuecomment-145377328 - # https://stackoverflow.com/questions/23504126/do-you-have-to-check-exit-status-ready-if-you-are-going-to-check-recv-ready/32758464#32758464 # def cmd(self, command, hide=True, disown=False, watch=False): - self.ssh.run("echo hello world", warn=True, hide=hide, disown=disown) + # TODO this is necessary to load paramiko details + #self.ssh.run("echo hello world", warn=True, hide=hide, disown=disown) stdin, stdout, stderr = self.ssh.client.exec_command(command) @@ -103,13 +96,21 @@ class Remote: break for c in readq: if c.recv_ready(): - out.write(channel.recv(len(c.in_buffer))) + data = channel.recv(len(c.in_buffer)) + out.write(data) out.flush() got_chunk = True + + if watch: + sys.stdout.write(data) if c.recv_stderr_ready(): - err.write(channel.recv_stderr(len(c.in_stderr_buffer))) + data = channel.recv_stderr(len(c.in_stderr_buffer)) + err.write(data) err.flush() got_chunk = True + + if watch: + sys.stderr.write(data) # for c """