added live output to command running

This commit is contained in:
Ari Brown 2024-07-25 08:07:56 -04:00
parent 4df9b04b2b
commit 205ed4bb18
3 changed files with 18 additions and 168 deletions

View file

@ -5,10 +5,10 @@ from .redshift import Redshift
from .s3 import S3 from .s3 import S3
from .docker import Docker from .docker import Docker
from .remote import Remote
from .machine import Machine from .machine import Machine
from .pier import Pier from .pier import Pier
from .pool import Pool from .pool import Pool
from .remote import Remote
from .minerva import Minerva from .minerva import Minerva

View file

@ -9,7 +9,7 @@ import os
import minerva import minerva
import select import select
class Machine: class Machine(minerva.Remote):
def __init__(self, def __init__(self,
pier, pier,
ami = "ami-0a538467cc9da9bb2", # ubuntu 22 ami = "ami-0a538467cc9da9bb2", # ubuntu 22
@ -88,6 +88,7 @@ class Machine:
self.thread.join() self.thread.join()
# Wait until the machine is ready (max 180 seconds)
def wait(self, n): def wait(self, n):
i = 0 i = 0
# Time for the server to show as "running" # Time for the server to show as "running"
@ -136,158 +137,6 @@ class Machine:
return True return True
def prep_variables(self):
varz = [f"export {var}={repr(val)}" for var, val in self.variables.items()]
base = ['source ~/.profile']
return "; ".join([*base, *varz])
# Unfortunately, under the hood, it's running /bin/bash -c '...'
# You stand informed
#
# This creates a pseudo-TTY on the other end
#
# `watch` means it'll print the output live, else it'll wait till it's finished
# `disown` means it'll run in the background
#
# TODO switch this to use `self.ssh.client.exec_command()`
def cmd(self, command, hide=True, disown=False, watch=False):
res = self.ssh.run(f"{self.prep_variables()}; {command}",
warn=True,
hide=hide,
disown=disown)
return res
# https://github.com/paramiko/paramiko/issues/593#issuecomment-145377328
# https://stackoverflow.com/questions/23504126/do-you-have-to-check-exit-status-ready-if-you-are-going-to-check-recv-ready/32758464#32758464
#
def cmd(self, command, hide=True, disown=False, watch=False):
print(self.ssh)
print(self.ssh.client)
self.ssh.run("echo hello world", warn=True, hide=hide, disown=disown)
stdin, stdout, stderr = self.ssh.client.exec_command(command)
# this is the same for all three inputs
channel = stdin.channel
out_r, out_w = os.pipe()
err_r, err_w = os.pipe()
# https://stackoverflow.com/a/78765054
# we do not need stdin.
stdin.close()
# indicate that we're not going to write to that channel anymore
channel.shutdown_write()
# read stdout/stderr to prevent read block hangs
stdout_chunks = []
stderr_chunks = []
stdout_chunks.append(channel.recv(len(channel.in_buffer)))
timeout = 60
def fill_buffers():
# perform chunked read to prevent stalls
while (not channel.closed
or channel.recv_ready()
or channel.recv_stderr_ready()):
# stop if channel was closed prematurely and buffers are empty
got_chunk = False
readq, _, _ = select.select([channel], [], [], timeout)
# returns three empty lists on timeout
if not readq:
break
for c in readq:
if c.recv_ready():
#stdout_chunks.append(channel.recv(len(c.in_buffer)))
data = channel.recv(len(c.in_buffer))
if data:
print("*******")
print(repr(data))
os.write(out_w, data)
got_chunk = True
if c.recv_stderr_ready():
#stderr_chunks.append(channel.recv_stderr(len(c.in_stderr_buffer)))
data = channel.recv_stderr(len(c.in_stderr_buffer))
if data:
print("*******")
print(repr(data))
os.write(err_w, data)
got_chunk = True
# for c
"""
1) make sure that there are at least 2 cycles with no data in the input
buffers in order to not exit too early; i.e., cat on a >200k file
2) if no data arrived in the last loop, check if we received exit code
3) check if input buffers are empty
4) exit the loop
"""
if (not got_chunk
and channel.exit_status_ready()
and not channel.recv_stderr_ready()
and not channel.recv_ready()):
# indicate that we're not going to read from this channel anymore
channel.shutdown_read()
# close the channel
channel.close()
# remote side is finished and our buffers are empty
break
# if
# while
# close the pseudofiles
stdout.close()
stderr.close()
thread = threading.Thread(target = fill_buffers)
thread.start()
if not disown:
thread.join()
#return (stdout_chunks, stderr_chunks)
os.close(out_w)
os.close(err_w)
return (os.fdopen(out_r), os.fdopen(err_r), thread)
def write_env_file(self, variables, fname="~/env.list"):
vals = "\n".join([f"{var}={val}" for var, val in variables.items()])
self.cmd(f"echo {shlex.quote(vals)} > {fname}")
return fname
def load_docker(self, docker):
docker.machine = self
return docker
def aws_docker_login(self, ecr):
return self.cmd(f"aws ecr get-login-password --region {self.pier.session.region_name} | " +
f"docker login --username AWS --password-stdin {ecr}"
)
def docker_run(self, uri, cmd="", env={}):
if env:
fname = self.write_env_file(env)
environ = f"--env-file {fname}"
else:
environ = ""
return self.cmd(f"docker run {environ} {uri} {cmd}")
def docker_pull(self, uri):
return self.cmd(f"docker pull {uri}")
def terminate(self): def terminate(self):
if self.terminated: if self.terminated:
return return

View file

@ -25,6 +25,7 @@ class Remote:
connect_kwargs = { connect_kwargs = {
"key_filename": self.key_path "key_filename": self.key_path
}) })
self.ssh.open()
return True return True
@ -39,23 +40,15 @@ class Remote:
# #
# This creates a pseudo-TTY on the other end # This creates a pseudo-TTY on the other end
# #
# `watch` means it'll print the output live, else it'll wait till it's finished # `watch` means it'll print the output live, else it'll return the
# output (stdout, stderr) streams and the thread
# `disown` means it'll run in the background # `disown` means it'll run in the background
# #
# TODO switch this to use `self.ssh.client.exec_command()`
#def cmd(self, command, hide=True, disown=False, watch=False):
# res = self.ssh.run(f"{self.prep_variables()}; {command}",
# warn=True,
# hide=hide,
# disown=disown)
# return res
# https://github.com/paramiko/paramiko/issues/593#issuecomment-145377328 # https://github.com/paramiko/paramiko/issues/593#issuecomment-145377328
# https://stackoverflow.com/questions/23504126/do-you-have-to-check-exit-status-ready-if-you-are-going-to-check-recv-ready/32758464#32758464
# #
def cmd(self, command, hide=True, disown=False, watch=False): def cmd(self, command, hide=True, disown=False, watch=False):
self.ssh.run("echo hello world", warn=True, hide=hide, disown=disown) # TODO this is necessary to load paramiko details
#self.ssh.run("echo hello world", warn=True, hide=hide, disown=disown)
stdin, stdout, stderr = self.ssh.client.exec_command(command) stdin, stdout, stderr = self.ssh.client.exec_command(command)
@ -103,13 +96,21 @@ class Remote:
break break
for c in readq: for c in readq:
if c.recv_ready(): if c.recv_ready():
out.write(channel.recv(len(c.in_buffer))) data = channel.recv(len(c.in_buffer))
out.write(data)
out.flush() out.flush()
got_chunk = True got_chunk = True
if watch:
sys.stdout.write(data)
if c.recv_stderr_ready(): if c.recv_stderr_ready():
err.write(channel.recv_stderr(len(c.in_stderr_buffer))) data = channel.recv_stderr(len(c.in_stderr_buffer))
err.write(data)
err.flush() err.flush()
got_chunk = True got_chunk = True
if watch:
sys.stderr.write(data)
# for c # for c
""" """