# ---------------------------------------------------------------------------
# minerva/__init__.py  (contents after this change)
#
# The package re-exports the public classes.  The lines are kept as comments
# here because relative imports are only valid inside the package __init__.
#
#   from .athena import Athena
#   from .redshift import Redshift
#   from .s3 import S3
#
#   from .docker import Docker
#   from .machine import Machine
#   from .pier import Pier
#
#   from .minerva import Minerva
#
#   from .parallel_map import parallel_map
#
#   __all__ = [
#       "Athena",
#       "Redshift",
#       "S3",
#       "Docker",
#       "Machine",
#       "Pier",
#       "Minerva"
#   ]
#
# NOTE(review): parallel_map is still imported but is no longer listed in
# __all__ -- confirm that dropping it from the public export list is intended.
# ---------------------------------------------------------------------------

import shlex
import threading
import time


# ---------------------------------------------------------------------------
# minerva/docker.py
# ---------------------------------------------------------------------------

def _run_stage(stage):
    """Create, run and always terminate one pipeline stage (Docker or Group)."""
    try:
        stage.create()
        stage.run()
    finally:
        # Terminate even when create()/run() raised, so that instances which
        # were already started are not left running.
        stage.terminate()


def _chain(first, second):
    """Run ``first`` to completion, then ``second``; return ``second``.

    A plain list on the right-hand side is wrapped in a Group so that its
    members run concurrently.
    """
    if isinstance(second, list):
        second = Group(second)

    # This part is required to be sequential
    _run_stage(first)
    _run_stage(second)

    return second


class Docker:
    """A single container run on a dedicated machine.

    Args:
        machine: object providing ``name``, ``create()``, ``login()``,
            ``aws_docker_login(registry)``, ``docker_run(uri, cmd=, env=)``
            and ``terminate()`` (see ``Machine``).
        container: image URI; the part before the first ``/`` is treated as
            the registry host.
        variables: mapping passed to the container as its environment.
        stdout: optional writable object that receives the container's stdout.
        stderr: optional writable object that receives the container's stderr.
    """

    def __init__(self, machine, container, variables=None, stdout=None, stderr=None):
        self.machine = machine
        self.uri = container
        # A fresh dict per instance; a shared ``{}`` default would leak state
        # between unrelated Docker objects.
        self.variables = {} if variables is None else variables
        self.finished = False
        self.stdout = stdout
        self.stderr = stderr
        self.out = {"stdout": None,
                    "stderr": None}
        self.registry = container.split("/")[0]

    def __rshift__(self, other):
        """``a >> b``: run ``a``, then ``b`` (a Docker, Group or list); return ``b``."""
        return _chain(self, other)

    def create(self):
        """Start the backing machine unless this job has already finished."""
        if self.finished:
            return

        print(f"creating {self.machine.name}")
        self.machine.create()

    def run(self, cmd=""):
        """Log in to the machine, run the container and capture its output.

        The captured text is stored in ``self.out`` and, when ``stdout`` /
        ``stderr`` sinks were supplied, also written to them followed by a
        newline.  Does nothing if the job has already finished.
        """
        if self.finished:
            return

        print(f"running {self.machine.name}")
        self.machine.login()

        # Images hosted on ECR need a registry login first.
        if self.registry.endswith("amazonaws.com"):
            self.machine.aws_docker_login(self.registry)

        res = self.machine.docker_run(self.uri, cmd=cmd, env=self.variables)

        self.out["stdout"] = res.stdout
        self.out["stderr"] = res.stderr

        if self.stdout:
            self.stdout.write(res.stdout)
            self.stdout.write("\n")

        if self.stderr:
            self.stderr.write(res.stderr)
            self.stderr.write("\n")

        self.finished = True

    def terminate(self):
        """Terminate the backing machine."""
        self.machine.terminate()


class Group:
    """A set of Docker jobs that are created in order and run concurrently."""

    def __init__(self, ary):
        self.dockers = ary
        self.finished = False

    def create(self):
        """Create every member's machine unless the group already finished."""
        if self.finished:
            return

        for d in self.dockers:
            d.create()

    # One OS thread per job: the work is IO bound (waiting on SSH), not CPU
    # bound.
    # This DOES however mess with the STDOUT swapping that occurs in Machine#run
    def run(self):
        """Run all members in parallel threads and wait for all of them.

        An exception raised inside a worker thread is reported by
        ``threading``'s default hook but is not re-raised here.
        """
        if self.finished:
            return

        threads = [threading.Thread(target=d.run) for d in self.dockers]

        for thread in threads:
            thread.start()

        for thread in threads:
            thread.join()

        self.finished = True

    def terminate(self):
        """Terminate every member, even if terminating one of them fails.

        Raises:
            Exception: the first error raised by a member's ``terminate()``,
                re-raised after all members have been attempted.
        """
        first_error = None
        for d in self.dockers:
            try:
                d.terminate()
            except Exception as err:  # keep going so no instance is leaked
                if first_error is None:
                    first_error = err

        if first_error is not None:
            raise first_error

    def __rshift__(self, other):
        """``g >> b``: run this group, then ``b`` (a Docker, Group or list); return ``b``."""
        return _chain(self, other)


# ---------------------------------------------------------------------------
# minerva/machine.py
# ---------------------------------------------------------------------------
#from pexpect import pxssh

class Machine:
    """An EC2 instance reachable over SSH, launched through a ``Pier``.

    Args:
        pier: provides ``ec2`` (client), ``iam``, ``key_pair_name``,
            ``key_path``, ``subnet_id``, ``groups`` and ``session``.
        ami: image id to launch.
        instance_type: EC2 instance type.
        variables: mapping exported in the remote shell before every command.
        username: SSH login name.
        key_pair: EC2 key pair name; falls back to ``pier.key_pair_name``.
        name: value of the instance's ``Name`` tag.
    """

    # Seconds between status polls while waiting for the instance to start.
    POLL_INTERVAL = 10
    # Number of polls tolerated before giving up.
    MAX_POLLS = 12
    # Extra seconds granted after "running" so daemons (sshd, ...) can start.
    SETTLE_SECONDS = 25

    def __init__(self,
                 pier,
                 ami = "ami-0a538467cc9da9bb2", # ubuntu 22
                 instance_type = "t2.micro",
                 variables = None,
                 username = None,
                 key_pair = None,
                 name = "Minerva Instance"):

        self.pier = pier
        self.ami = ami
        self.instance_type = instance_type
        self.username = username
        self.key_pair = key_pair
        # A fresh dict per instance instead of a shared mutable default.
        self.variables = {} if variables is None else variables
        self.name = name
        self.ready = False
        self.info = None
        self.ssh = None
        self.terminated = False

    def create(self):
        """Launch the instance (once) and start polling for it in the background."""
        if self.info:
            return

        request = {
            "ImageId": self.ami,
            "InstanceType": self.instance_type,
            "KeyName": self.key_pair or self.pier.key_pair_name,
            "MinCount": 1,
            "MaxCount": 1,
            "TagSpecifications": [{'ResourceType': 'instance',
                                   'Tags': [{'Key': 'Name', 'Value': self.name}]}],
            "NetworkInterfaces": [{'AssociatePublicIpAddress': True,
                                   'SubnetId': self.pier.subnet_id,
                                   'Groups': self.pier.groups,
                                   'DeviceIndex': 0}],
        }
        # Only send an instance profile when one is configured.
        if self.pier.iam:
            request["IamInstanceProfile"] = {'Name': self.pier.iam}

        res = self.pier.ec2.run_instances(**request)

        self.info = res['Instances'][0]

        # FIXME there should be a check here in case some instances fail to
        # start up in a timely manner
        # Start a countdown in the background
        # to give time for the instance to start up
        wait_time = 30
        self.thread = threading.Thread(target = self.wait,
                                       args = (wait_time,),
                                       daemon = True)
        self.thread.start()

    def status(self):
        """Return the instance state name reported by EC2 (e.g. ``"running"``)."""
        resp = self.pier.ec2.describe_instance_status(InstanceIds=[self.info['InstanceId']],
                                                      IncludeAllInstances=True)
        return resp['InstanceStatuses'][0]['InstanceState']['Name']

    def join(self):
        """Block until the background start-up poll started by ``create`` ends."""
        self.thread.join()

    def wait(self, n):
        """Poll until the instance is ``running``, then set ``self.ready``.

        Args:
            n: accepted for backward compatibility; the polling loop does not
                use it.

        Raises:
            TimeoutError: after more than ``MAX_POLLS`` polls spaced
                ``POLL_INTERVAL`` seconds apart.
        """
        polls = 0
        # Time for the server to show as "running"
        # and time for the server to finish getting daemons running
        while self.status() != "running":
            time.sleep(self.POLL_INTERVAL)
            polls += 1

            if polls > self.MAX_POLLS:
                # Must be an exception instance: raising a bare string is a
                # TypeError in Python 3.
                raise TimeoutError(
                    f"{self.info['InstanceId']} took too long to start ({polls} attempts)"
                )

        self.ready = True

    # alternatively, could maybe implement this with SSM so that we can access
    # private subnets? TODO
    def login(self):
        """Open the SSH connection (once) and return True.

        Raises:
            RuntimeError: if the background poll ended without the instance
                reaching the ``running`` state.
        """
        if self.ssh:
            return True

        # Imported lazily so the module can be imported, and Docker/Group
        # driven with stand-in machines, without the SSH dependency.
        from fabric import Connection

        # Machine must be running first, so we need to wait for the countdown to finish
        self.join()

        if not self.ready:
            # wait() raised inside its thread; fail here with a clear message
            # instead of trying to SSH into an instance that never came up.
            raise RuntimeError(f"{self.name} never reached the running state")

        # Final wait, now that the server is up and running -- need some time for daemons to start
        time.sleep(self.SETTLE_SECONDS)

        resp = self.pier.ec2.describe_instances(InstanceIds=[self.info['InstanceId']])
        self.description = resp['Reservations'][0]['Instances'][0]
        self.ip_address = self.description['PublicIpAddress']
        print(f"\t{self.name} ({self.info['InstanceId']}) => {self.ip_address}")

        self.ssh = Connection(self.ip_address,
                              self.username,
                              connect_kwargs = {
                                  "key_filename": self.pier.key_path
                              }
                   )
        return True

    def prep_variables(self):
        """Return ``export`` statements for ``self.variables`` joined by ``"; "``.

        Returns an empty string when there are no variables.  Values are
        interpolated unquoted, so the remote shell expands them.
        """
        return "; ".join([f"export {var}={val}" for var, val in self.variables.items()])

    # Unfortunately, under the hood, it's running /bin/bash -c '...'
    # You stand informed
    def cmd(self, command, hide=True):
        """Run ``command`` over SSH with the machine's variables exported.

        Returns the result object of ``self.ssh.run``; ``warn=True`` is
        passed, so a failing command is reported through that result.
        """
        exports = self.prep_variables()
        # Without variables a leading "; " would be a shell syntax error.
        script = f"{exports}; {command}" if exports else command

        res = self.ssh.run(script,
                           warn=True,
                           hide=hide)

        return res

    def write_env_file(self, variables, fname="~/env.list"):
        """Write ``variables`` as ``KEY=value`` lines to ``fname`` remotely; return ``fname``."""
        vals = "\n".join([f"{var}={val}" for var, val in variables.items()])
        self.cmd(f"echo {shlex.quote(vals)} > {fname}")
        return fname

    def aws_docker_login(self, ecr):
        """Log the remote docker daemon in to the ECR registry ``ecr``."""
        return self.cmd(f"aws ecr get-login-password --region {self.pier.session.region_name} | " +
                        f"docker login --username AWS --password-stdin {ecr}"
                       )

    def docker_run(self, uri, cmd="", env=None):
        """Run image ``uri`` remotely, optionally with an env file built from ``env``."""
        if env:
            fname = self.write_env_file(env)
            environ = f"--env-file {fname}"
        else:
            environ = ""

        return self.cmd(f"docker run {environ} {uri} {cmd}")

    def docker_pull(self, uri):
        """Pull image ``uri`` on the remote machine."""
        return self.cmd(f"docker pull {uri}")

    def terminate(self):
        """Terminate the instance once; a no-op if it was never created."""
        if self.terminated:
            return

        # Nothing was launched (create() not called or it failed early).
        if self.info is None:
            return

        self.pier.ec2.terminate_instances(
                InstanceIds=[self.info['InstanceId']],
                DryRun=False
        )
        print(f"terminated {self.name} ({self.info['InstanceId']})")
        self.terminated = True
# ---------------------------------------------------------------------------
# minerva/pier.py
# ---------------------------------------------------------------------------
import os
import random
import stat
import tempfile


# Used for interacting with AWS
class Pier:
    """Holds the AWS session and the networking/key settings used to launch machines.

    Args:
        profile: name of the boto3 profile to build the session from.
        subnet_id: subnet that launched instances are attached to.
        sg_groups: list of security group ids; defaults to an empty list.
        key_pair: ``(key pair name, path to the private key)``.  When omitted
            a new EC2 key pair is created and its private key is saved to a
            temporary file.
        iam: optional instance profile name.
    """

    def __init__(self,
                 profile = "default",
                 subnet_id = None,
                 sg_groups = None,
                 key_pair = None, # (keypair name, keypair privkey pair)
                 iam = None,
                 ):
        # Imported lazily so the module can be imported without the AWS SDK.
        import boto3

        self.session = boto3.session.Session(profile_name=profile)
        self.ec2 = self.session.client("ec2")
        self.subnet_id = subnet_id
        # A fresh list per instance instead of a shared mutable default.
        self.groups = [] if sg_groups is None else sg_groups
        self.iam = iam
        self.account = self.session.client("sts").get_caller_identity()['Account']
        self.ecr = f"{self.account}.dkr.ecr.{self.session.region_name}.amazonaws.com"

        if key_pair:
            print(f"Using keypair {key_pair}")
            self.key_pair_name = key_pair[0]
            self.key_path = os.path.expanduser(key_pair[1])
        else:
            self.make_key_pair()

    def make_key_pair(self):
        """Create a new EC2 key pair and store its private key in a private temp file.

        Sets ``key_pair_name``, ``key`` (the ``create_key_pair`` response) and
        ``key_path``.
        """
        print("making keypair")
        self.key_pair_name = f"Minerva-{random.random()}"
        self.key = self.ec2.create_key_pair(KeyName=self.key_pair_name)

        # A unique file per key pair: a fixed path would let a second Pier
        # overwrite the first one's private key.  mkstemp also creates the
        # file owner-only from the start, so the key is never world-readable.
        fd, self.key_path = tempfile.mkstemp(prefix="minerva-key-", suffix=".pem")
        with os.fdopen(fd, "w") as f:
            f.write(self.key['KeyMaterial'])
        os.chmod(self.key_path, stat.S_IRUSR | stat.S_IWUSR)

    def machine(self, **kwargs):
        """Return a ``Machine`` bound to this pier; ``kwargs`` go to its constructor."""
        # Imported lazily so this module does not need the rest of the package
        # at import time.
        from minerva.machine import Machine

        return Machine(self, **kwargs)

    def t3_med(self, num):
        """Return a ``t3.medium`` Ubuntu machine named ``blah-<num>`` with ``num`` as a variable."""
        return self.machine(ami = "ami-0a538467cc9da9bb2",
                            instance_type = "t3.medium",
                            username = "ubuntu",
                            name = f"blah-{num}",
                            variables = {"num": num})