folded wharfer in and updated __init__.py

Ari Brown 2023-09-19 16:52:20 -04:00
parent bd3c578c06
commit efe0f40c04
4 changed files with 327 additions and 7 deletions


minerva/__init__.py
@@ -1,13 +1,22 @@
-from .parallel import parallel_map
 from .athena import Athena
 from .redshift import Redshift
 from .s3 import S3
-from .minerva import Minerva
+from .docker import Docker
+from .machine import Machine
+from .pier import Pier
+from .minerva import Minerva
+from .parallel_map import parallel_map

 __all__ = [
     "Athena",
     "Redshift",
-    "parallel_map",
-    "Minerva",
-    "S3"
+    "S3",
+    "Docker",
+    "Machine",
+    "Pier",
+    "Minerva"
 ]
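
Usage note: with the updated __init__.py, Docker, Machine, and Pier are exported at the package level alongside the existing classes. A minimal import sketch (assumes the package is installed as minerva; nothing below comes from the commit itself):

from minerva import Pier, Machine, Docker

# Pier wraps the AWS session, Machine wraps a single EC2 instance, and Docker
# wraps a container image run on a Machine; see the new modules below.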

minerva/docker.py (new file, 110 lines)

@@ -0,0 +1,110 @@
import threading


class Docker:
    def __init__(self, machine, container, variables={}, stdout=None, stderr=None):
        self.machine = machine
        self.uri = container
        self.variables = variables
        self.finished = False
        self.stdout = stdout
        self.stderr = stderr
        self.out = {"stdout": None,
                    "stderr": None}
        self.registry = container.split("/")[0]

    def __rshift__(self, other):
        if type(other) == type([]):
            other = Group(other)

        # This part is required to be sequential
        self.create()
        self.run()
        self.terminate()

        other.create()
        other.run()
        other.terminate()

        return other

    def create(self):
        if self.finished:
            return
        print(f"creating {self.machine.name}")
        self.machine.create()

    def run(self, cmd=""):
        if self.finished:
            return
        print(f"running {self.machine.name}")

        self.machine.login()
        if self.registry.endswith("amazonaws.com"):
            self.machine.aws_docker_login(self.registry)

        res = self.machine.docker_run(self.uri, cmd=cmd, env=self.variables)

        self.out["stdout"] = res.stdout
        self.out["stderr"] = res.stderr

        if self.stdout:
            self.stdout.write(res.stdout)
            self.stdout.write("\n")
        if self.stderr:
            self.stderr.write(res.stderr)
            self.stderr.write("\n")

        self.finished = True

    def terminate(self):
        self.machine.terminate()


class Group:
    def __init__(self, ary):
        self.dockers = ary
        self.finished = False

    def create(self):
        if self.finished:
            return
        for d in self.dockers:
            d.create()

    # Green threads because these methods are IO intensive (not CPU intensive)
    # This DOES however mess with the STDOUT swapping that occurs in Machine#run
    def run(self):
        if self.finished:
            return

        threads = [threading.Thread(target=d.run) for d in self.dockers]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()

        self.finished = True

    def terminate(self):
        for d in self.dockers:
            d.terminate()

    def __rshift__(self, other):
        if type(other) == type([]):
            other = Group(other)

        self.create()
        self.run()
        self.terminate()

        other.create()
        other.run()
        other.terminate()

        return other
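
Usage note: a minimal sketch of how Docker and Group compose via __rshift__ (the >> operator); the Pier arguments and ECR image URIs below are placeholders, not values from this commit:

import sys
from minerva import Pier, Docker

pier = Pier(profile="default",
            subnet_id="subnet-0123456789abcdef0",   # placeholder
            sg_groups=["sg-0123456789abcdef0"])     # placeholder

extract = Docker(pier.machine(username="ubuntu", name="extract"),
                 "123456789012.dkr.ecr.us-east-1.amazonaws.com/extract:latest",
                 stdout=sys.stdout)
load_a = Docker(pier.machine(username="ubuntu", name="load-a"),
                "123456789012.dkr.ecr.us-east-1.amazonaws.com/load:latest")
load_b = Docker(pier.machine(username="ubuntu", name="load-b"),
                "123456789012.dkr.ecr.us-east-1.amazonaws.com/load:latest")

# extract runs to completion on its own instance first; the right-hand list is
# wrapped in a Group, so the two loads then run in parallel, one thread each.
extract >> [load_a, load_b]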

minerva/machine.py (new file, 150 lines)

@@ -0,0 +1,150 @@
import time
#from pexpect import pxssh
from fabric import Connection
import shlex
import threading


class Machine:
    def __init__(self,
                 pier,
                 ami = "ami-0a538467cc9da9bb2", # ubuntu 22
                 instance_type = "t2.micro",
                 variables = {},
                 username = None,
                 key_pair = None,
                 name = "Minerva Instance"):
        self.pier = pier
        self.ami = ami
        self.instance_type = instance_type
        self.username = username
        self.key_pair = key_pair
        self.variables = variables
        self.name = name

        self.ready = False
        self.info = None
        self.ssh = None
        self.terminated = False

    def create(self):
        if self.info:
            return

        iam = {'Name': self.pier.iam} if self.pier.iam else {}

        res = self.pier.ec2.run_instances(
            ImageId = self.ami,
            InstanceType = self.instance_type,
            KeyName = self.key_pair or self.pier.key_pair_name,
            MinCount = 1,
            MaxCount = 1,
            TagSpecifications = [{'ResourceType': 'instance',
                                  'Tags': [{'Key': 'Name', 'Value': self.name}]}],
            NetworkInterfaces = [{'AssociatePublicIpAddress': True,
                                  'SubnetId': self.pier.subnet_id,
                                  'Groups': self.pier.groups,
                                  'DeviceIndex': 0}],
            IamInstanceProfile = iam
        )
        self.info = res['Instances'][0]

        # FIXME there should be a check here in case some instances fail to
        # start up in a timely manner

        # Start a countdown in the background
        # to give time for the instance to start up
        wait_time = 30
        self.thread = threading.Thread(target = self.wait,
                                       args = (wait_time,),
                                       daemon = True)
        self.thread.start()

    def status(self):
        resp = self.pier.ec2.describe_instance_status(InstanceIds=[self.info['InstanceId']],
                                                      IncludeAllInstances=True)
        return resp['InstanceStatuses'][0]['InstanceState']['Name']

    def join(self):
        self.thread.join()

    def wait(self, n):
        i = 0

        # Time for the server to show as "running"
        # and time for the server to finish getting daemons running
        while self.status() != "running":
            time.sleep(10)
            i += 1
            if i > 12:
                raise TimeoutError(f"{self.info['InstanceId']} took too long to start ({i} attempts)")

        self.ready = True

    # alternatively, could maybe implement this with SSM so that we can access
    # private subnets? TODO
    def login(self):
        if self.ssh:
            return True

        # Machine must be running first, so we need to wait for the countdown to finish
        self.join()

        # Final wait, now that the server is up and running -- need some time for daemons to start
        time.sleep(25)

        resp = self.pier.ec2.describe_instances(InstanceIds=[self.info['InstanceId']])
        self.description = resp['Reservations'][0]['Instances'][0]
        self.ip_address = self.description['PublicIpAddress']

        print(f"\t{self.name} ({self.info['InstanceId']}) => {self.ip_address}")

        self.ssh = Connection(self.ip_address,
                              self.username,
                              connect_kwargs = {
                                  "key_filename": self.pier.key_path
                              })
        return True

    def prep_variables(self):
        return "; ".join([f"export {var}={val}" for var, val in self.variables.items()])

    # Unfortunately, under the hood, it's running /bin/bash -c '...'
    # You stand informed
    def cmd(self, command, hide=True):
        res = self.ssh.run(f"{self.prep_variables()}; {command}",
                           warn=True,
                           hide=hide)
        return res

    def write_env_file(self, variables, fname="~/env.list"):
        vals = "\n".join([f"{var}={val}" for var, val in variables.items()])
        self.cmd(f"echo {shlex.quote(vals)} > {fname}")
        return fname

    def aws_docker_login(self, ecr):
        return self.cmd(f"aws ecr get-login-password --region {self.pier.session.region_name} | " +
                        f"docker login --username AWS --password-stdin {ecr}")

    def docker_run(self, uri, cmd="", env={}):
        if env:
            fname = self.write_env_file(env)
            environ = f"--env-file {fname}"
        else:
            environ = ""
        return self.cmd(f"docker run {environ} {uri} {cmd}")

    def docker_pull(self, uri):
        return self.cmd(f"docker pull {uri}")

    def terminate(self):
        if self.terminated:
            return

        self.pier.ec2.terminate_instances(
            InstanceIds=[self.info['InstanceId']],
            DryRun=False
        )
        print(f"terminated {self.name} ({self.info['InstanceId']})")
        self.terminated = True
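
Usage note: a minimal sketch of driving a Machine directly over SSH; the Pier configuration and the command are placeholders, not values from this commit:

from minerva import Pier

pier = Pier(profile="default",
            subnet_id="subnet-0123456789abcdef0",   # placeholder
            sg_groups=["sg-0123456789abcdef0"])     # placeholder

m = pier.machine(username="ubuntu", name="scratch", variables={"STAGE": "dev"})
m.create()                       # launch the instance and start the readiness countdown
m.login()                        # blocks until the instance is reachable over SSH
print(m.cmd("uname -a").stdout)  # runs over SSH with STAGE exported first
m.terminate()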

minerva/pier.py (new file, 51 lines)

@@ -0,0 +1,51 @@
import boto3
import random
import os
import stat

from minerva.machine import Machine


# Used for interacting with AWS
class Pier:
    def __init__(self,
                 profile = "default",
                 subnet_id = None,
                 sg_groups = [],
                 key_pair = None, # (key pair name, path to private key file)
                 iam = None,
                 ):
        self.session = boto3.session.Session(profile_name=profile)
        self.ec2 = self.session.client("ec2")
        self.subnet_id = subnet_id
        self.groups = sg_groups
        self.iam = iam

        self.account = self.session.client("sts").get_caller_identity()['Account']
        self.ecr = f"{self.account}.dkr.ecr.{self.session.region_name}.amazonaws.com"

        if key_pair:
            print(f"Using keypair {key_pair}")
            self.key_pair_name = key_pair[0]
            self.key_path = os.path.expanduser(key_pair[1])
        else:
            self.make_key_pair()

    def make_key_pair(self):
        print("making keypair")
        self.key_pair_name = f"Minerva-{random.random()}"
        self.key = self.ec2.create_key_pair(KeyName=self.key_pair_name)

        self.key_path = "/tmp/key"
        with open(self.key_path, "w") as f:
            f.write(self.key['KeyMaterial'])
        os.chmod(self.key_path, stat.S_IRUSR | stat.S_IWUSR)

    def machine(self, **kwargs):
        return Machine(self, **kwargs)

    def t3_med(self, num):
        return self.machine(ami = "ami-0a538467cc9da9bb2",
                            instance_type = "t3.medium",
                            username = "ubuntu",
                            name = f"blah-{num}",
                            variables = {"num": num})