trying to debug why loading large amounts of data fails

This commit is contained in:
Ari Brown 2023-11-09 10:46:01 -05:00
parent 65247844e0
commit 65768f6f24
4 changed files with 21 additions and 3 deletions

View file

@ -160,9 +160,13 @@ class Query(Execute):
return self.ds return self.ds
futures = [] futures = []
print(f"{len(self.manifest_files())} files in manifest")
for fn in self.manifest_files(): for fn in self.manifest_files():
print(f"reading {fn}...")
df = pd.read_parquet(fn) df = pd.read_parquet(fn)
print("\tloaded")
future = client.scatter(df) future = client.scatter(df)
print("\tscattered")
futures.append(future) futures.append(future)
return dd.from_delayed(futures, meta=df) return dd.from_delayed(futures, meta=df)

View file

@ -50,6 +50,7 @@ class Cluster:
# Wait for the instances to finish starting up and log in to them # Wait for the instances to finish starting up and log in to them
# TODO add support for tunneling through another server first
def login(self): def login(self):
self.scheduler.login() self.scheduler.login()
for w in self.workers: for w in self.workers:

View file

@ -30,6 +30,7 @@ class Machine:
self.public = public self.public = public
self.disk_size = disk_size self.disk_size = disk_size
def create(self): def create(self):
if self.info: if self.info:
return return
@ -50,13 +51,14 @@ class Machine:
BlockDeviceMappings = [{'DeviceName': '/dev/sda1', BlockDeviceMappings = [{'DeviceName': '/dev/sda1',
'Ebs': {'VolumeSize': self.disk_size, 'Ebs': {'VolumeSize': self.disk_size,
'DeleteOnTermination': True}}], 'DeleteOnTermination': True}}],
IamInstanceProfile = iam IamInstanceProfile = iam,
Monitoring = {'Enabled': True}
) )
self.info = res['Instances'][0] self.info = res['Instances'][0]
self.private_ip = self.info['NetworkInterfaces'][0]['PrivateIpAddress'] self.private_ip = self.info['NetworkInterfaces'][0]['PrivateIpAddress']
# FIXME there should be a check here in case some instances fail to # TODO there should be a check here in case some instances fail to
# start up in a timely manner # start up in a timely manner
# Start a countdown in the background # Start a countdown in the background
# to give time for the instance to start up # to give time for the instance to start up
@ -66,14 +68,17 @@ class Machine:
daemon = True) daemon = True)
self.thread.start() self.thread.start()
def status(self): def status(self):
resp = self.pier.ec2.describe_instance_status(InstanceIds=[self.info['InstanceId']], resp = self.pier.ec2.describe_instance_status(InstanceIds=[self.info['InstanceId']],
IncludeAllInstances=True) IncludeAllInstances=True)
return resp['InstanceStatuses'][0]['InstanceState']['Name'] return resp['InstanceStatuses'][0]['InstanceState']['Name']
def join(self): def join(self):
self.thread.join() self.thread.join()
def wait(self, n): def wait(self, n):
i = 0 i = 0
# Time for the server to show as "running" # Time for the server to show as "running"
@ -91,6 +96,7 @@ class Machine:
time.sleep(25) time.sleep(25)
self.ready = True self.ready = True
# alternatively, could maybe implement this with SSM so that we can access # alternatively, could maybe implement this with SSM so that we can access
# private subnets? TODO # private subnets? TODO
def login(self): def login(self):
@ -117,11 +123,13 @@ class Machine:
) )
return True return True
def prep_variables(self): def prep_variables(self):
varz = [f"export {var}={repr(val)}" for var, val in self.variables.items()] varz = [f"export {var}={repr(val)}" for var, val in self.variables.items()]
base = ['source ~/.profile'] base = ['source ~/.profile']
return "; ".join([*base, *varz]) return "; ".join([*base, *varz])
# Unfortunately, under the hood, it's running /bin/bash -c '...' # Unfortunately, under the hood, it's running /bin/bash -c '...'
# You stand informed # You stand informed
def cmd(self, command, hide=True, disown=False): def cmd(self, command, hide=True, disown=False):
@ -132,16 +140,19 @@ class Machine:
return res return res
def write_env_file(self, variables, fname="~/env.list"): def write_env_file(self, variables, fname="~/env.list"):
vals = "\n".join([f"{var}={val}" for var, val in variables.items()]) vals = "\n".join([f"{var}={val}" for var, val in variables.items()])
self.cmd(f"echo {shlex.quote(vals)} > {fname}") self.cmd(f"echo {shlex.quote(vals)} > {fname}")
return fname return fname
def aws_docker_login(self, ecr): def aws_docker_login(self, ecr):
return self.cmd(f"aws ecr get-login-password --region {self.pier.session.region_name} | " + return self.cmd(f"aws ecr get-login-password --region {self.pier.session.region_name} | " +
f"docker login --username AWS --password-stdin {ecr}" f"docker login --username AWS --password-stdin {ecr}"
) )
def docker_run(self, uri, cmd="", env={}): def docker_run(self, uri, cmd="", env={}):
if env: if env:
fname = self.write_env_file(env) fname = self.write_env_file(env)
@ -151,9 +162,11 @@ class Machine:
return self.cmd(f"docker run {environ} {uri} {cmd}") return self.cmd(f"docker run {environ} {uri} {cmd}")
def docker_pull(self, uri): def docker_pull(self, uri):
return self.cmd(f"docker pull {uri}") return self.cmd(f"docker pull {uri}")
def terminate(self): def terminate(self):
if self.terminated: if self.terminated:
return return

View file

@ -1,6 +1,6 @@
[tool.poetry] [tool.poetry]
name = "minerva" name = "minerva"
version = "0.6.2" version = "0.6.3"
description = "Easier access to AWS Athena and Redshift" description = "Easier access to AWS Athena and Redshift"
authors = [ authors = [
"Ari Brown <ari@airintech.com>", "Ari Brown <ari@airintech.com>",