added bare machine support (Remote), and made progress on stdout/stderr redirection

Ari Brown 2024-07-23 16:34:34 -04:00
parent 2e215c45cf
commit d67e398a69
4 changed files with 203 additions and 32 deletions
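For orientation: the stdout/stderr redirection referenced in the commit message (in both the Machine diff and the new Remote.cmd) boils down to draining a paramiko channel with select() into an os.pipe() pair, so callers can read output as ordinary file objects. A minimal standalone sketch of that pattern (names here are illustrative, not the committed API):

    import os
    import select

    def drain_channel(channel, out_w, err_w, timeout=60):
        # pump channel output into the write ends of two pipes
        while not channel.closed or channel.recv_ready() or channel.recv_stderr_ready():
            readable, _, _ = select.select([channel], [], [], timeout)
            if not readable:        # select timed out
                break
            got = False
            if channel.recv_ready():
                os.write(out_w, channel.recv(len(channel.in_buffer)))
                got = True
            if channel.recv_stderr_ready():
                os.write(err_w, channel.recv_stderr(len(channel.in_stderr_buffer)))
                got = True
            if not got and channel.exit_status_ready():
                break               # remote side is done and buffers are empty
        os.close(out_w)
        os.close(err_w)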

View file

@@ -8,6 +8,7 @@ from .docker import Docker
from .machine import Machine
from .pier import Pier
from .pool import Pool
from .remote import Remote
from .minerva import Minerva
@@ -24,6 +25,7 @@ __all__ = [
    "load_template",
    "load_sql",
    "AWS_INSTANCES",
    "Pool"
    "Pool",
    "Remote"
]

View file

@@ -204,10 +204,20 @@ class Machine:
                    break
                for c in readq:
                    if c.recv_ready():
                        stdout_chunks.append(channel.recv(len(c.in_buffer)))
                        #stdout_chunks.append(channel.recv(len(c.in_buffer)))
                        data = channel.recv(len(c.in_buffer))
                        if data:
                            print("*******")
                            print(repr(data))
                            os.write(out_w, data)
                        got_chunk = True
                    if c.recv_stderr_ready():
                        stderr_chunks.append(channel.recv_stderr(len(c.in_stderr_buffer)))
                        #stderr_chunks.append(channel.recv_stderr(len(c.in_stderr_buffer)))
                        data = channel.recv_stderr(len(c.in_stderr_buffer))
                        if data:
                            print("*******")
                            print(repr(data))
                            os.write(err_w, data)
                        got_chunk = True
                # for c
@@ -241,8 +251,10 @@ class Machine:
        if not disown:
            thread.join()
        return (stdout_chunks, stderr_chunks)
        #return (os.fdopen(out_r), os.fdopen(err_r), thread)
        #return (stdout_chunks, stderr_chunks)
        os.close(out_w)
        os.close(err_w)
        return (os.fdopen(out_r), os.fdopen(err_r), thread)
    def write_env_file(self, variables, fname="~/env.list"):
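Since the Machine command runner now hands back pipe-backed file objects plus the reader thread instead of the accumulated chunk lists, a caller on the non-disown path could consume it roughly like this (a sketch assuming the reworked method is Machine.cmd with a signature mirroring Remote.cmd, and a logged-in Machine instance m):

    out_f, err_f, thread = m.cmd("ls -la /tmp")
    # on the non-disown path the reader thread has already been joined
    # and the pipe write ends closed, so these reads hit EOF cleanly
    print(out_f.read())
    print(err_f.read())
    out_f.close()
    err_f.close()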

View file

@@ -40,6 +40,7 @@ class Pier:
        print(f"making keypair ({self.key_pair_name})")
        self.key = self.ec2.create_key_pair(KeyName=self.key_pair_name)
        # TODO add some randomization here
        self.key_path = "/tmp/key"
        with open(self.key_path, "w") as f:
            f.write(self.key['KeyMaterial'])
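The TODO above (randomizing the key path instead of hard-coding /tmp/key) could be handled with tempfile; a sketch, not part of this commit, where key_material stands in for self.key['KeyMaterial']:

    import os
    import tempfile

    # write the key material to a unique, permission-restricted file
    fd, key_path = tempfile.mkstemp(prefix="pier-key-", suffix=".pem")
    with os.fdopen(fd, "w") as f:
        f.write(key_material)
    os.chmod(key_path, 0o600)  # mkstemp already uses 0600 on POSIX; this is belt and braces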
@@ -61,30 +62,3 @@ class Pier:
    def cluster(self, *args, **kwargs):
        return Cluster(self, *args, **kwargs)
    def run_time(self, instance_id):
        ct = self.session.client("cloudtrail")
        events = ct.lookup_events(LookupAttributes = [{"AttributeKey": "ResourceName",
                                                       "AttributeValue": instance_id}])
        if len(events['Events']) == 0:
            return None
        starts = []
        stops = []
        for event in events['Events']:
            if event['EventName'] in ['RunInstances', 'StartInstances']:
                starts.append(event['EventTime'])
            elif event['EventName'] in ['StopInstances', 'TerminateInstances']:
                stops.append(event['EventTime'])
        # What if the instance is still running?
        if len(stops) == len(starts) - 1:
            stops.append(datetime.datetime.now())
        print(f"{len(starts)} starts, {len(stops)} stops")
        times = [stop - start for (start, stop) in starts.zip(stops)]
        return sum(times)

183 minerva/remote.py Normal file
View file

@@ -0,0 +1,183 @@
from fabric import Connection
import os
import shlex
import threading
import select
# Bare machine, not necessarily associated with AWS
class Remote:
    def __init__(self,
                 ip,
                 username,
                 key_path):
        self.ip = ip
        self.username = username
        self.key_path = key_path  # full path
        self.variables = {}       # env vars exported by prep_variables() before each command
        self.ssh = None
    def login(self):
        if self.ssh:
            return True
        self.ssh = Connection(self.ip,
                              self.username,
                              connect_kwargs = {
                                  "key_filename": self.key_path
                              })
        return True
    def prep_variables(self):
        varz = [f"export {var}={repr(val)}" for var, val in self.variables.items()]
        base = ['source ~/.profile']
        return "; ".join([*base, *varz])
    # Unfortunately, under the hood, it's running /bin/bash -c '...'
    # You stand informed
    #
    # This creates a pseudo-TTY on the other end
    #
    # `watch` means it'll print the output live, else it'll wait till it's finished
    # `disown` means it'll run in the background
    #
    # TODO switch this to use `self.ssh.client.exec_command()`
    def cmd(self, command, hide=True, disown=False, watch=False):
        res = self.ssh.run(f"{self.prep_variables()}; {command}",
                           warn=True,
                           hide=hide,
                           disown=disown)
        return res
    # https://github.com/paramiko/paramiko/issues/593#issuecomment-145377328
    # https://stackoverflow.com/questions/23504126/do-you-have-to-check-exit-status-ready-if-you-are-going-to-check-recv-ready/32758464#32758464
    #
    # NOTE: this second definition shadows the fabric-based cmd() above
    def cmd(self, command, hide=True, disown=False, watch=False):
        print(self.ssh)          # debug
        print(self.ssh.client)   # debug
        self.ssh.run("echo hello world", warn=True, hide=hide, disown=disown)  # debug: sanity-check the connection
        stdin, stdout, stderr = self.ssh.client.exec_command(command)
        # this is the same for all three inputs
        channel = stdin.channel
        out_r, out_w = os.pipe()
        err_r, err_w = os.pipe()
        os.write(out_w, b"hello world")  # debug: seed the stdout pipe
        # https://stackoverflow.com/a/78765054
        # we do not need stdin.
        stdin.close()
        # indicate that we're not going to write to that channel anymore
        channel.shutdown_write()
        # read stdout/stderr to prevent read block hangs
        stdout_chunks = []
        stderr_chunks = []
        stdout_chunks.append(channel.recv(len(channel.in_buffer)))
        timeout = 60
        def fill_buffers(out, err):
            # perform chunked read to prevent stalls
            while (not channel.closed
                   or channel.recv_ready()
                   or channel.recv_stderr_ready()):
                # stop if channel was closed prematurely and buffers are empty
                got_chunk = False
                readq, _, _ = select.select([channel], [], [], timeout)
                # returns three empty lists on timeout
                if not readq:
                    break
                for c in readq:
                    if c.recv_ready():
                        #stdout_chunks.append(channel.recv(len(c.in_buffer)))
                        data = channel.recv(len(c.in_buffer))
                        if data:
                            print("*******")     # debug
                            print(repr(data))    # debug
                            os.write(out, data)
                        got_chunk = True
                    if c.recv_stderr_ready():
                        #stderr_chunks.append(channel.recv_stderr(len(c.in_stderr_buffer)))
                        data = channel.recv_stderr(len(c.in_stderr_buffer))
                        if data:
                            print("*******")     # debug
                            print(repr(data))    # debug
                            os.write(err, data)
                        got_chunk = True
                # for c
                """
                1) make sure that there are at least 2 cycles with no data in the input
                   buffers in order to not exit too early; i.e., cat on a >200k file
                2) if no data arrived in the last loop, check if we received the exit code
                3) check if input buffers are empty
                4) exit the loop
                """
                if (not got_chunk
                        and channel.exit_status_ready()
                        and not channel.recv_stderr_ready()
                        and not channel.recv_ready()):
                    # indicate that we're not going to read from this channel anymore
                    channel.shutdown_read()
                    # close the channel
                    channel.close()
                    # remote side is finished and our buffers are empty
                    break
                # if
            # while
            # close the pseudofiles
            stdout.close()
            stderr.close()
        #thread = threading.Thread(target = fill_buffers,
        #                          args = (out_w, err_w))
        #thread.start()
        fill_buffers(out_w, err_w)   # run synchronously for now (threaded version commented out above)
        #if not disown:
        #    thread.join()
        #return (stdout_chunks, stderr_chunks)
        os.close(out_w)
        os.close(err_w)
        # no reader thread in this synchronous version, so just return the pipe-backed files
        return (os.fdopen(out_r), os.fdopen(err_r))
    def write_env_file(self, variables, fname="~/env.list"):
        vals = "\n".join([f"{var}={val}" for var, val in variables.items()])
        self.cmd(f"echo {shlex.quote(vals)} > {fname}")
        return fname
    def load_docker(self, docker):
        docker.machine = self
        return docker
    def aws_docker_login(self, ecr):
        # NOTE: assumes a .pier attribute has been attached to this Remote; it is not set in __init__
        return self.cmd(f"aws ecr get-login-password --region {self.pier.session.region_name} | " +
                        f"docker login --username AWS --password-stdin {ecr}"
                        )
    def docker_run(self, uri, cmd="", env={}):
        if env:
            fname = self.write_env_file(env)
            environ = f"--env-file {fname}"
        else:
            environ = ""
        return self.cmd(f"docker run {environ} {uri} {cmd}")
    def docker_pull(self, uri):
        return self.cmd(f"docker pull {uri}")
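A rough end-to-end usage sketch for the new Remote class (the host, user, key path, and image URI below are placeholders; cmd() here is the WIP exec_command version, which returns the two pipe-backed file objects):

    r = Remote("203.0.113.10", "ubuntu", "/home/me/.ssh/id_rsa")
    r.login()
    out_f, err_f = r.cmd("uname -a")
    print(out_f.read())
    print(err_f.read())
    r.docker_pull("registry.example.com/minerva/worker:latest")
    r.docker_run("registry.example.com/minerva/worker:latest",
                 cmd="python -m worker",
                 env={"STAGE": "dev"})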