using select to get partial updates of stdout and stderr from commands

This commit is contained in:
Ari Brown 2024-07-22 22:09:17 -04:00
parent 745919e587
commit 2e215c45cf
2 changed files with 100 additions and 3 deletions

View file

@ -1,7 +1,7 @@
import threading import threading
class Docker: class Docker:
def __init__(self, machine, container, variables={}, stdout=None, stderr=None): def __init__(self, container, machine=None, variables={}, stdout=None, stderr=None):
self.machine = machine self.machine = machine
self.uri = container self.uri = container
self.variables = variables self.variables = variables

View file

@ -5,7 +5,9 @@ import datetime
from fabric import Connection from fabric import Connection
import shlex import shlex
import threading import threading
import os
import minerva import minerva
import select
class Machine: class Machine:
def __init__(self, def __init__(self,
@ -143,8 +145,13 @@ class Machine:
# Unfortunately, under the hood, it's running /bin/bash -c '...' # Unfortunately, under the hood, it's running /bin/bash -c '...'
# You stand informed # You stand informed
# #
# This is blocking # This creates a pseudo-TTY on the other end
def cmd(self, command, hide=True, disown=False): #
# `watch` means it'll print the output live, else it'll wait till it's finished
# `disown` means it'll run in the background
#
# TODO switch this to use `self.ssh.client.exec_command()`
def cmd(self, command, hide=True, disown=False, watch=False):
res = self.ssh.run(f"{self.prep_variables()}; {command}", res = self.ssh.run(f"{self.prep_variables()}; {command}",
warn=True, warn=True,
hide=hide, hide=hide,
@ -152,6 +159,91 @@ class Machine:
return res return res
# https://github.com/paramiko/paramiko/issues/593#issuecomment-145377328
# https://stackoverflow.com/questions/23504126/do-you-have-to-check-exit-status-ready-if-you-are-going-to-check-recv-ready/32758464#32758464
#
def cmd(self, command, hide=True, disown=False, watch=False):
print(self.ssh)
print(self.ssh.client)
self.ssh.run("echo hello world", warn=True, hide=hide, disown=disown)
stdin, stdout, stderr = self.ssh.client.exec_command(command)
# this is the same for all three inputs
channel = stdin.channel
out_r, out_w = os.pipe()
err_r, err_w = os.pipe()
# https://stackoverflow.com/a/78765054
# we do not need stdin.
stdin.close()
# indicate that we're not going to write to that channel anymore
channel.shutdown_write()
# read stdout/stderr to prevent read block hangs
stdout_chunks = []
stderr_chunks = []
stdout_chunks.append(channel.recv(len(channel.in_buffer)))
timeout = 60
def fill_buffers():
# perform chunked read to prevent stalls
while (not channel.closed
or channel.recv_ready()
or channel.recv_stderr_ready()):
# stop if channel was closed prematurely and buffers are empty
got_chunk = False
readq, _, _ = select.select([channel], [], [], timeout)
# returns three empty lists on timeout
if not readq:
break
for c in readq:
if c.recv_ready():
stdout_chunks.append(channel.recv(len(c.in_buffer)))
got_chunk = True
if c.recv_stderr_ready():
stderr_chunks.append(channel.recv_stderr(len(c.in_stderr_buffer)))
got_chunk = True
# for c
"""
1) make sure that there are at least 2 cycles with no data in the input
buffers in order to not exit too early; i.e., cat on a >200k file
2) if no data arrived in the last loop, check if we received exit code
3) check if input buffers are empty
4) exit the loop
"""
if (not got_chunk
and channel.exit_status_ready()
and not channel.recv_stderr_ready()
and not channel.recv_ready()):
# indicate that we're not going to read from this channel anymore
channel.shutdown_read()
# close the channel
channel.close()
# remote side is finished and our buffers are empty
break
# if
# while
# close the pseudofiles
stdout.close()
stderr.close()
thread = threading.Thread(target = fill_buffers)
thread.start()
if not disown:
thread.join()
return (stdout_chunks, stderr_chunks)
#return (os.fdopen(out_r), os.fdopen(err_r), thread)
def write_env_file(self, variables, fname="~/env.list"): def write_env_file(self, variables, fname="~/env.list"):
vals = "\n".join([f"{var}={val}" for var, val in variables.items()]) vals = "\n".join([f"{var}={val}" for var, val in variables.items()])
@ -159,6 +251,11 @@ class Machine:
return fname return fname
def load_docker(self, docker):
docker.machine = self
return docker
def aws_docker_login(self, ecr): def aws_docker_login(self, ecr):
return self.cmd(f"aws ecr get-login-password --region {self.pier.session.region_name} | " + return self.cmd(f"aws ecr get-login-password --region {self.pier.session.region_name} | " +
f"docker login --username AWS --password-stdin {ecr}" f"docker login --username AWS --password-stdin {ecr}"