minerva/minerva/cluster.py

106 lines
3.5 KiB
Python

import dask
# https://cloudprovider.dask.org/en/latest/aws.html#elastic-compute-cloud-ec2
# https://github.com/dask/dask-ec2/blob/master/notebooks/03-timeseries.ipynb
# https://github.com/dask/distributed/issues/2267
# import dask
# import dask.distributed
# import dask.dataframe as dd
#
# c = dask.distributed.Client('<ip>:8786')
# d = dask.delayed(dd.read_parquet)('gcs://<bucket_name>/0.parquet',
# storage_options={'token':'cloud'})
# df = dask.compute(d)[0]
# https://saturncloud.io/blog/how-to-read-parquet-files-from-s3-using-dask-with-a-specific-aws-profile/
# Manually build cluster
# https://saturncloud.io/blog/how-to-set-up-a-dask-cluster/
from dask.distributed import Client
import dask
# Process-wide Dask settings applied once at import time. All three keys are
# passed in a single mapping; the resulting configuration is the same as
# setting them one by one.
dask.config.set(
    {
        # Retry failed comm operations up to 10 times.
        "distributed.comm.retry.count": 10,
        # Connection timeout; a bare number is presumably read as seconds --
        # confirm against the Dask configuration reference.
        "distributed.comm.timeouts.connect": 30,
        # Turn off the memory-based worker termination setting.
        "distributed.worker.memory.terminate": False,
    }
)
class Cluster:
    """One scheduler machine plus a list of worker machines running Dask.

    The machines themselves are produced by the ``scheduler`` and ``worker``
    factories handed to the constructor; this class only sequences the calls
    made on them (``create``, ``login``, ``cmd``, ``terminate``).
    """

    def __init__(self, pier, scheduler, worker, num_workers=1):
        """Build the scheduler and worker objects without starting anything.

        Args:
            pier: Project object passed through to both factories; its
                ``ec2`` attribute is used by ``make_security_group``.
            scheduler: Callable invoked as ``scheduler(pier)``.
            worker: Callable invoked as ``worker(pier, n)`` for each
                ``n`` in ``range(num_workers)``.
            num_workers: How many workers to build.
        """
        self.pier = pier
        self.methods = {"scheduler": scheduler, "worker": worker}
        self.scheduler = scheduler(pier)
        self.workers = [worker(pier, n) for n in range(num_workers)]
        # Populated by login(). Defined up front so that reading them before
        # login yields None instead of raising AttributeError.
        self.public_location = None
        self.private_location = None

    def start(self):
        """Run the full bring-up sequence: create, login, then start Dask."""
        self.create()
        self.login()
        self.start_dask()

    def create(self):
        """Call ``create()`` on the scheduler and then on every worker.

        Per the original author's note, this begins the startup process in
        the background.
        """
        self.scheduler.create()
        for w in self.workers:
            w.create()

    def login(self):
        """Call ``login()`` on every machine and record the scheduler address.

        Per the original author's note, this waits for the instances to finish
        starting up. Afterwards ``public_location`` and ``private_location``
        hold ``"<ip>:8786"`` built from the scheduler's public and private IPs.
        """
        self.scheduler.login()
        for w in self.workers:
            w.login()
        self.public_location = f"{self.scheduler.public_ip}:8786"
        self.private_location = f"{self.scheduler.private_ip}:8786"

    def start_dask(self):
        """Launch the Dask scheduler and worker processes on the machines.

        Workers are pointed at the scheduler's private IP on port 8786. Every
        command is issued with ``disown=True``.
        """
        self.scheduler.cmd("dask scheduler", disown=True)
        scheduler_address = f"{self.scheduler.private_ip}:8786"
        for w in self.workers:
            w.cmd(f"dask worker {scheduler_address}", disown=True)

    def terminate(self):
        """Terminate the scheduler and every worker, even if some calls fail.

        Each machine is attempted regardless of earlier failures so that one
        error cannot leave the remaining instances running. Failures are
        printed as they occur.

        Raises:
            Exception: The first error raised by any ``terminate()`` call,
                re-raised after all machines have been attempted.
        """
        failures = []
        for machine in (self.scheduler, *self.workers):
            try:
                machine.terminate()
            except Exception as err:  # keep going; re-raised below
                print(f"failed to terminate {machine!r}: {err!r}")
                failures.append(err)
        if failures:
            raise failures[0]

    def make_security_group(self, vpc_id, name="Dask",
                            desc="Worker and Scheduler communication",
                            cidr="0.0.0.0/0"):
        """Create a security group with the ingress rules the cluster uses.

        Three rules are authorised, each allowing traffic from ``cidr`` and
        from members of the new group itself: TCP 8786-8787, TCP 49152-65535
        and all ICMP.

        Args:
            vpc_id: VPC in which the group is created.
            name: Group name.
            desc: Group description.
            cidr: IPv4 CIDR range allowed in. Defaults to ``"0.0.0.0/0"``,
                matching the previous hard-coded value.

        Returns:
            The ``GroupId`` of the newly created security group.
        """
        ec2 = self.pier.ec2
        response = ec2.create_security_group(GroupName=name,
                                             Description=desc,
                                             VpcId=vpc_id)
        security_group_id = response['GroupId']
        print(f'security group created {security_group_id} in vpc {vpc_id}.')

        # NOTE(review): the default CIDR exposes these ports to any address.
        # A reachable Dask scheduler accepts work from whoever connects, so
        # pass a narrower ``cidr`` whenever the client network is known.
        port_rules = [
            ('tcp', 8786, 8787),     # scheduler port used above; 8787 presumably the dashboard
            ('tcp', 49152, 65535),   # high port range; presumably worker/nanny ports -- confirm
            ('icmp', -1, -1),        # -1/-1 selects every ICMP type and code
        ]
        ec2.authorize_security_group_ingress(
            GroupId=security_group_id,
            IpPermissions=[
                {'IpProtocol': protocol,
                 'FromPort': from_port,
                 'ToPort': to_port,
                 'IpRanges': [{'CidrIp': cidr}],
                 'UserIdGroupPairs': [{'GroupId': security_group_id}]}
                for protocol, from_port, to_port in port_rules
            ])
        return security_group_id