diff --git a/minerva/athena.py b/minerva/athena.py index b2ef466..b2b25a6 100644 --- a/minerva/athena.py +++ b/minerva/athena.py @@ -160,9 +160,13 @@ class Query(Execute): return self.ds futures = [] + print(f"{len(self.manifest_files())} files in manifest") for fn in self.manifest_files(): + print(f"reading {fn}...") df = pd.read_parquet(fn) + print("\tloaded") future = client.scatter(df) + print("\tscattered") futures.append(future) return dd.from_delayed(futures, meta=df) diff --git a/minerva/cluster.py b/minerva/cluster.py index 9aee493..c813fcb 100644 --- a/minerva/cluster.py +++ b/minerva/cluster.py @@ -50,6 +50,7 @@ class Cluster: # Wait for the instances to finish starting up and log in to them + # TODO add support for tunneling through another server first def login(self): self.scheduler.login() for w in self.workers: diff --git a/minerva/machine.py b/minerva/machine.py index 9df3f2d..69154c8 100644 --- a/minerva/machine.py +++ b/minerva/machine.py @@ -30,6 +30,7 @@ class Machine: self.public = public self.disk_size = disk_size + def create(self): if self.info: return @@ -50,13 +51,14 @@ class Machine: BlockDeviceMappings = [{'DeviceName': '/dev/sda1', 'Ebs': {'VolumeSize': self.disk_size, 'DeleteOnTermination': True}}], - IamInstanceProfile = iam + IamInstanceProfile = iam, + Monitoring = {'Enabled': True} ) self.info = res['Instances'][0] self.private_ip = self.info['NetworkInterfaces'][0]['PrivateIpAddress'] - # FIXME there should be a check here in case some instances fail to + # TODO there should be a check here in case some instances fail to # start up in a timely manner # Start a countdown in the background # to give time for the instance to start up @@ -66,14 +68,17 @@ class Machine: daemon = True) self.thread.start() + def status(self): resp = self.pier.ec2.describe_instance_status(InstanceIds=[self.info['InstanceId']], IncludeAllInstances=True) return resp['InstanceStatuses'][0]['InstanceState']['Name'] + def join(self): self.thread.join() + def wait(self, n): i = 0 # Time for the server to show as "running" @@ -91,6 +96,7 @@ class Machine: time.sleep(25) self.ready = True + # alternatively, could maybe implement this with SSM so that we can access # private subnets? TODO def login(self): @@ -117,11 +123,13 @@ class Machine: ) return True + def prep_variables(self): varz = [f"export {var}={repr(val)}" for var, val in self.variables.items()] base = ['source ~/.profile'] return "; ".join([*base, *varz]) + # Unfortunately, under the hood, it's running /bin/bash -c '...' # You stand informed def cmd(self, command, hide=True, disown=False): @@ -132,16 +140,19 @@ class Machine: return res + def write_env_file(self, variables, fname="~/env.list"): vals = "\n".join([f"{var}={val}" for var, val in variables.items()]) self.cmd(f"echo {shlex.quote(vals)} > {fname}") return fname + def aws_docker_login(self, ecr): return self.cmd(f"aws ecr get-login-password --region {self.pier.session.region_name} | " + f"docker login --username AWS --password-stdin {ecr}" ) + def docker_run(self, uri, cmd="", env={}): if env: fname = self.write_env_file(env) @@ -151,9 +162,11 @@ class Machine: return self.cmd(f"docker run {environ} {uri} {cmd}") + def docker_pull(self, uri): return self.cmd(f"docker pull {uri}") + def terminate(self): if self.terminated: return diff --git a/pyproject.toml b/pyproject.toml index 84068cd..340dc58 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "minerva" -version = "0.6.2" +version = "0.6.3" description = "Easier access to AWS Athena and Redshift" authors = [ "Ari Brown ",