diff --git a/minerva/__init__.py b/minerva/__init__.py index 5fbad8a..5d7e0b8 100644 --- a/minerva/__init__.py +++ b/minerva/__init__.py @@ -8,6 +8,7 @@ from .docker import Docker from .machine import Machine from .pier import Pier from .pool import Pool +from .remote import Remote from .minerva import Minerva @@ -24,6 +25,7 @@ __all__ = [ "load_template", "load_sql", "AWS_INSTANCES", - "Pool" + "Pool", + "Remote" ] diff --git a/minerva/machine.py b/minerva/machine.py index 5d7b68f..26621c8 100644 --- a/minerva/machine.py +++ b/minerva/machine.py @@ -204,10 +204,20 @@ class Machine: break for c in readq: if c.recv_ready(): - stdout_chunks.append(channel.recv(len(c.in_buffer))) + #stdout_chunks.append(channel.recv(len(c.in_buffer))) + data = channel.recv(len(c.in_buffer)) + if data: + print("*******") + print(repr(data)) + os.write(out_w, data) got_chunk = True if c.recv_stderr_ready(): - stderr_chunks.append(channel.recv_stderr(len(c.in_stderr_buffer))) + #stderr_chunks.append(channel.recv_stderr(len(c.in_stderr_buffer))) + data = channel.recv_stderr(len(c.in_stderr_buffer)) + if data: + print("*******") + print(repr(data)) + os.write(err_w, data) got_chunk = True # for c @@ -241,8 +251,10 @@ class Machine: if not disown: thread.join() - return (stdout_chunks, stderr_chunks) - #return (os.fdopen(out_r), os.fdopen(err_r), thread) + #return (stdout_chunks, stderr_chunks) + os.close(out_w) + os.close(err_w) + return (os.fdopen(out_r), os.fdopen(err_r), thread) def write_env_file(self, variables, fname="~/env.list"): diff --git a/minerva/pier.py b/minerva/pier.py index 1eb43dd..ebb64e0 100644 --- a/minerva/pier.py +++ b/minerva/pier.py @@ -40,6 +40,7 @@ class Pier: print(f"making keypair ({self.key_pair_name})") self.key = self.ec2.create_key_pair(KeyName=self.key_pair_name) + # TODO add some randomization here self.key_path = "/tmp/key" with open(self.key_path, "w") as f: f.write(self.key['KeyMaterial']) @@ -61,30 +62,3 @@ class Pier: def cluster(self, *args, 
**kwargs): return Cluster(self, *args, **kwargs) - - def run_time(self, instance_id): - ct = self.session.client("cloudtrail") - events = ct.lookup_events(LookupAttributes = [{"AttributeKey": "ResourceName", - "AttributeValue": instance_id}]) - - if len(events['Events']) == 0: - return None - - starts = [] - stops = [] - for event in events['Events']: - if event['EventName'] in ['RunInstances', 'StartInstances']: - starts.append(event['EventTime']) - elif event['EventName'] in ['StopInstances', 'TerminateInstances']: - stops.append(event['EventTime']) - - # What if the instance is still running? - if len(stops) == len(starts) - 1: - stops.append(datetime.datetime.now()) - - print(f"{len(starts)} starts, {len(stops)} stops") - - times = [stop - start for (start, stop) in starts.zip(stops)] - return sum(times) - - diff --git a/minerva/remote.py b/minerva/remote.py new file mode 100644 index 0000000..204cfc2 --- /dev/null +++ b/minerva/remote.py @@ -0,0 +1,185 @@ +from fabric import Connection +import os +import shlex +import threading +import select + +# Bare machine, not necessarily associated with AWS +class Remote: + def __init__(self, + ip, + username, + key_path): + self.ip = ip + self.username = username + self.key_path = key_path # full path + self.ssh = None + self.variables = {} + + def login(self): + if self.ssh: + return True + + self.ssh = Connection(self.ip, + self.username, + connect_kwargs = { + "key_filename": self.key_path + }) + return True + + + def prep_variables(self): + varz = [f"export {var}={repr(val)}" for var, val in self.variables.items()] + base = ['source ~/.profile'] + return "; ".join([*base, *varz]) + + + # Unfortunately, under the hood, it's running /bin/bash -c '...' 
+ # You stand informed + # + # This creates a pseudo-TTY on the other end + # + # `watch` means it'll print the output live, else it'll wait till it's finished + # `disown` means it'll run in the background + # + # TODO switch this to use `self.ssh.client.exec_command()` + def cmd(self, command, hide=True, disown=False, watch=False): + res = self.ssh.run(f"{self.prep_variables()}; {command}", + warn=True, + hide=hide, + disown=disown) + + return res + + # https://github.com/paramiko/paramiko/issues/593#issuecomment-145377328 + # https://stackoverflow.com/questions/23504126/do-you-have-to-check-exit-status-ready-if-you-are-going-to-check-recv-ready/32758464#32758464 + # + def cmd(self, command, hide=True, disown=False, watch=False): + print(self.ssh) + print(self.ssh.client) + + self.ssh.run("echo hello world", warn=True, hide=hide, disown=disown) + + stdin, stdout, stderr = self.ssh.client.exec_command(command) + + # this is the same for all three inputs + channel = stdin.channel + + out_r, out_w = os.pipe() + err_r, err_w = os.pipe() + + os.write(out_w, b"hello world") + + # https://stackoverflow.com/a/78765054 + + # we do not need stdin. 
+ stdin.close() + # indicate that we're not going to write to that channel anymore + channel.shutdown_write() + + # read stdout/stderr to prevent read block hangs + stdout_chunks = [] + stderr_chunks = [] + stdout_chunks.append(channel.recv(len(channel.in_buffer))) + + timeout = 60 + + def fill_buffers(out, err): + # perform chunked read to prevent stalls + while (not channel.closed + or channel.recv_ready() + or channel.recv_stderr_ready()): + # stop if channel was closed prematurely and buffers are empty + got_chunk = False + readq, _, _ = select.select([channel], [], [], timeout) + + # returns three empty lists on timeout + if not readq: + break + for c in readq: + if c.recv_ready(): + #stdout_chunks.append(channel.recv(len(c.in_buffer))) + data = channel.recv(len(c.in_buffer)) + if data: + print("*******") + print(repr(data)) + os.write(out, data) + got_chunk = True + if c.recv_stderr_ready(): + #stderr_chunks.append(channel.recv_stderr(len(c.in_stderr_buffer))) + data = channel.recv_stderr(len(c.in_stderr_buffer)) + if data: + print("*******") + print(repr(data)) + os.write(err, data) + got_chunk = True + # for c + + """ + 1) make sure that there are at least 2 cycles with no data in the input + buffers in order to not exit too early; i.e., cat on a >200k file + 2) if no data arrived in the last loop, check if we received exit code + 3) check if input buffers are empty + 4) exit the loop + """ + if (not got_chunk + and channel.exit_status_ready() + and not channel.recv_stderr_ready() + and not channel.recv_ready()): + # indicate that we're not going to read from this channel anymore + channel.shutdown_read() + # close the channel + channel.close() + # remote side is finished and our buffers are empty + break + # if + # while + + # close the pseudofiles + stdout.close() + stderr.close() + + #thread = threading.Thread(target = fill_buffers, + # args = (out_w, err_w)) + #thread.start() + fill_buffers(out_w, err_w) + + #if not disown: + # thread.join() + + #return 
(stdout_chunks, stderr_chunks) + os.close(out_w) + os.close(err_w) + return (os.fdopen(out_r), os.fdopen(err_r)) + + + def write_env_file(self, variables, fname="~/env.list"): + vals = "\n".join([f"{var}={val}" for var, val in variables.items()]) + self.cmd(f"echo {shlex.quote(vals)} > {fname}") + return fname + + + def load_docker(self, docker): + docker.machine = self + return docker + + + def aws_docker_login(self, ecr): + return self.cmd(f"aws ecr get-login-password --region {self.pier.session.region_name} | " + + f"docker login --username AWS --password-stdin {ecr}" +) + + + def docker_run(self, uri, cmd="", env={}): + if env: + fname = self.write_env_file(env) + environ = f"--env-file {fname}" + else: + environ = "" + + return self.cmd(f"docker run {environ} {uri} {cmd}") + + + def docker_pull(self, uri): + return self.cmd(f"docker pull {uri}")