added dask clustering support

This commit is contained in:
Ari Brown 2023-10-02 17:26:02 -04:00
parent f4dd130266
commit 5eb2e39c69
7 changed files with 625 additions and 62 deletions

View file

@ -20,3 +20,57 @@ import dask
# Manually build cluster
# https://saturncloud.io/blog/how-to-set-up-a-dask-cluster/
from dask.distributed import Client
class Cluster:
def __init__(self, pier, scheduler, worker, num_workers=1):
self.pier = pier
self.methods = {"scheduler": scheduler,
"worker": worker}
self.scheduler = scheduler(pier)
self.workers = [worker(pier, n) for n in range(num_workers)]
def start(self):
self.create()
self.login()
self.start_dask()
#self.connect()
#return self.client
# Begin the startup process in the background
def create(self):
self.scheduler.create()
for w in self.workers:
w.create()
# Wait for the instances to finish starting up and log in to them
def login(self):
self.scheduler.login()
for w in self.workers:
w.login()
self.public_location = f"{self.scheduler.public_ip}:8786"
self.private_location = f"{self.scheduler.private_ip}:8786"
# Start the dask processes necessary for cluster communication
def start_dask(self):
self.scheduler.cmd("dask scheduler", disown=True)
for w in self.workers:
w.cmd(f"dask worker {self.scheduler.private_ip}:8786", disown=True)
def connect(self):
self.client = Client(self.location)
return self.client
def terminate(self):
self.scheduler.terminate()
for w in self.workers:
w.terminate()

View file

@ -47,6 +47,7 @@ class Machine:
)
self.info = res['Instances'][0]
self.private_ip = self.info['NetworkInterfaces'][0]['PrivateIpAddress']
# FIXME there should be a check here in case some instances fail to
# start up in a timely manner
@ -74,8 +75,9 @@ class Machine:
time.sleep(10)
i += 1
if i > 12:
raise f"{self.info['InstanceId']} took too long to start ({i} attempts)"
if i > 18:
reason = f"{self.info['InstanceId']} took too long to start ({i} attempts)"
raise Exception(reason)
self.ready = True
@ -93,10 +95,10 @@ class Machine:
resp = self.pier.ec2.describe_instances(InstanceIds=[self.info['InstanceId']])
self.description = resp['Reservations'][0]['Instances'][0]
self.ip_address = self.description['PublicIpAddress']
print(f"\t{self.name} ({self.info['InstanceId']}) => {self.ip_address}")
self.public_ip = self.description['PublicIpAddress']
print(f"\t{self.name} ({self.info['InstanceId']}) => {self.public_ip} ({self.private_ip})")
self.ssh = Connection(self.ip_address,
self.ssh = Connection(self.public_ip,
self.username,
connect_kwargs = {
"key_filename": self.pier.key_path
@ -105,14 +107,17 @@ class Machine:
return True
def prep_variables(self):
return "; ".join([f"export {var}={val}" for var, val in self.variables.items()])
varz = [f"export {var}={repr(val)}" for var, val in self.variables.items()]
base = ['source ~/.profile']
return "; ".join([*base, *varz])
# Unfortunately, under the hood, it's running /bin/bash -c '...'
# You stand informed
def cmd(self, command, hide=True):
def cmd(self, command, hide=True, disown=False):
res = self.ssh.run(f"{self.prep_variables()}; {command}",
warn=True,
hide=hide)
hide=hide,
disown=disown)
return res