# References:
# https://cloudprovider.dask.org/en/latest/aws.html#elastic-compute-cloud-ec2
# https://github.com/dask/dask-ec2/blob/master/notebooks/03-timeseries.ipynb
# https://github.com/dask/distributed/issues/2267

# Example: connect a client to a running scheduler and read a parquet file
# import dask
# import dask.distributed
# import dask.dataframe as dd
#
# c = dask.distributed.Client('<ip>:8786')
# d = dask.delayed(dd.read_parquet)('gcs://<bucket_name>/0.parquet',
#                                   storage_options={'token': 'cloud'})
# df = dask.compute(d)[0]

# Reading parquet from S3 with a specific AWS profile:
# https://saturncloud.io/blog/how-to-read-parquet-files-from-s3-using-dask-with-a-specific-aws-profile/
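#
# A minimal sketch of that S3 variant (an assumption rather than code from this
# repo: it relies on s3fs being installed, and the bucket, key, and profile
# names are placeholders):
#
# import dask.dataframe as dd
# df = dd.read_parquet('s3://<bucket_name>/<key>.parquet',
#                      storage_options={'profile': '<aws_profile_name>'})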

# Manually build a cluster
# https://saturncloud.io/blog/how-to-set-up-a-dask-cluster/

# Optional Dask configuration tweaks (comm retry count, connect timeout, and
# disabling worker termination on high memory use):
# import dask
#
# dask.config.set({"distributed.comm.retry.count": 10})
# dask.config.set({"distributed.comm.timeouts.connect": 30})
# dask.config.set({"distributed.worker.memory.terminate": False})

class Cluster:
    # A hand-rolled Dask cluster: one scheduler instance plus num_workers
    # worker instances. `scheduler` and `worker` are factories that provision
    # machines through `pier`.
    def __init__(self, pier, scheduler, worker, num_workers=1):
        self.pier = pier
        self.methods = {"scheduler": scheduler,
                        "worker": worker}
        self.scheduler = scheduler(pier)
        self.workers = [worker(pier, n) for n in range(num_workers)]

    def start(self):
        self.create()
        self.login()
        self.start_dask()

    # Begin the startup process in the background
    def create(self):
        self.scheduler.create()
        for w in self.workers:
            w.create()

    # Wait for the instances to finish starting up and log in to them
    # TODO add support for tunneling through another server first
    def login(self):
        self.scheduler.login()
        for w in self.workers:
            w.login()

        self.public_location = f"{self.scheduler.public_ip}:8786"
        self.private_location = f"{self.scheduler.private_ip}:8786"

    # Start the dask processes necessary for cluster communication
    def start_dask(self):
        self.scheduler.cmd("dask scheduler &> /tmp/scheduler.log", disown=True)
        for w in self.workers:
            # Offset each worker's dashboard port by its number so several
            # workers can run without port collisions
            dash = 50000 + w.variables['number']
            w.cmd(f"dask worker {self.private_location} --dashboard-address {dash} &> /tmp/worker.log", disown=True)

    def terminate(self):
        self.scheduler.terminate()
        for w in self.workers:
            w.terminate()

    def make_security_group(self, vpc_id, name="Dask", desc="Worker and Scheduler communication"):
        response = self.pier.ec2.create_security_group(GroupName=name,
                                                       Description=desc,
                                                       VpcId=vpc_id)
        security_group_id = response['GroupId']
        print('security group created %s in vpc %s.' % (security_group_id, vpc_id))

        self.pier.ec2.authorize_security_group_ingress(
            GroupId=security_group_id,
            IpPermissions=[
                # Scheduler (8786) and dashboard (8787)
                {'IpProtocol': 'tcp',
                 'FromPort': 8786,
                 'ToPort': 8787,
                 'IpRanges': [{'CidrIp': '0.0.0.0/0'}],
                 'UserIdGroupPairs': [{'GroupId': security_group_id}]
                 },
                # Ephemeral ports used for worker-to-worker and
                # worker-to-scheduler traffic
                {'IpProtocol': 'tcp',
                 'FromPort': 49152,
                 'ToPort': 65535,
                 'IpRanges': [{'CidrIp': '0.0.0.0/0'}],
                 'UserIdGroupPairs': [{'GroupId': security_group_id}]
                 },
                # Allow ping between cluster members
                {'IpProtocol': 'icmp',
                 'FromPort': -1,
                 'ToPort': -1,
                 'IpRanges': [{'CidrIp': '0.0.0.0/0'}],
                 'UserIdGroupPairs': [{'GroupId': security_group_id}]
                 }
            ])

        return security_group_id
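
# Example usage (a sketch, not code from this repo: Pier, Scheduler, and Worker
# stand in for this project's EC2 wrapper and instance factories, and the VPC
# lookup assumes pier.ec2 is a boto3 EC2 client, as the calls above suggest):
#
# import dask.distributed
#
# pier = Pier(...)
# vpcs = pier.ec2.describe_vpcs(Filters=[{'Name': 'isDefault', 'Values': ['true']}])
# vpc_id = vpcs['Vpcs'][0]['VpcId']
#
# cluster = Cluster(pier, Scheduler, Worker, num_workers=4)
# sg_id = cluster.make_security_group(vpc_id)  # how the group id feeds the factories is project-specific
# cluster.start()
#
# client = dask.distributed.Client(cluster.public_location)
# try:
#     ...  # submit work against the cluster
# finally:
#     client.close()
#     cluster.terminate()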