import dask

# References:
# https://cloudprovider.dask.org/en/latest/aws.html#elastic-compute-cloud-ec2
# https://github.com/dask/dask-ec2/blob/master/notebooks/03-timeseries.ipynb
# https://github.com/dask/distributed/issues/2267

# Example: connect to a running scheduler and read Parquet from cloud storage.
# import dask
# import dask.distributed
# import dask.dataframe as dd
#
# c = dask.distributed.Client('<ip>:8786')
# d = dask.delayed(dd.read_parquet)('gcs://<bucket_name>/0.parquet',
#                                   storage_options={'token': 'cloud'})
# df = dask.compute(d)[0]

# Reading Parquet from S3 with a specific AWS profile:
# https://saturncloud.io/blog/how-to-read-parquet-files-from-s3-using-dask-with-a-specific-aws-profile/
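# A minimal sketch of the S3 + AWS profile variant described in the link above.
# The bucket, key pattern, and profile name are placeholders (assumptions), not
# values used elsewhere in this project; storage_options is forwarded to s3fs.
# import dask.dataframe as dd
# df = dd.read_parquet("s3://<bucket_name>/<prefix>/*.parquet",
#                      storage_options={"profile": "<aws_profile_name>"})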

# Manually build a cluster:
# https://saturncloud.io/blog/how-to-set-up-a-dask-cluster/

from dask.distributed import Client

# Be tolerant of slow instance startup: retry comms, allow 30s to connect,
# and do not let the nanny terminate workers that exceed their memory limit.
dask.config.set({"distributed.comm.retry.count": 10})
dask.config.set({"distributed.comm.timeouts.connect": 30})
dask.config.set({"distributed.worker.memory.terminate": False})


class Cluster:
    """A manually built Dask cluster: one scheduler instance plus `num_workers`
    worker instances, provisioned through the injected `scheduler` and `worker`
    factories and the `pier` cloud handle (which exposes an EC2 client as `pier.ec2`)."""

    def __init__(self, pier, scheduler, worker, num_workers=1):
        self.pier = pier
        self.methods = {"scheduler": scheduler,
                        "worker": worker}
        self.scheduler = scheduler(pier)
        self.workers = [worker(pier, n) for n in range(num_workers)]

    def start(self):
        self.create()
        self.login()
        self.start_dask()
        # self.connect()
        # return self.client

    # Begin the startup process in the background
    def create(self):
        self.scheduler.create()
        for w in self.workers:
            w.create()

    # Wait for the instances to finish starting up and log in to them
    def login(self):
        self.scheduler.login()
        for w in self.workers:
            w.login()

        self.public_location = f"{self.scheduler.public_ip}:8786"
        self.private_location = f"{self.scheduler.private_ip}:8786"

    # Start the dask processes necessary for cluster communication
    def start_dask(self):
        self.scheduler.cmd("dask scheduler", disown=True)
        for w in self.workers:
            w.cmd(f"dask worker {self.scheduler.private_ip}:8786", disown=True)

    # Connect a client to the scheduler from outside the cluster
    def connect(self):
        self.client = Client(self.public_location)
        return self.client

    def terminate(self):
        self.scheduler.terminate()
        for w in self.workers:
            w.terminate()

    # Create a security group that opens the ports Dask needs
    def make_security_group(self, vpc_id, name="Dask", desc="Worker and Scheduler communication"):
        response = self.pier.ec2.create_security_group(GroupName=name,
                                                       Description=desc,
                                                       VpcId=vpc_id)
        security_group_id = response['GroupId']
        print(f"security group {security_group_id} created in vpc {vpc_id}.")

        self.pier.ec2.authorize_security_group_ingress(
            GroupId=security_group_id,
            IpPermissions=[
                # Scheduler (8786) and dashboard (8787)
                {'IpProtocol': 'tcp',
                 'FromPort': 8786,
                 'ToPort': 8787,
                 'IpRanges': [{'CidrIp': '0.0.0.0/0'}],
                 'UserIdGroupPairs': [{'GroupId': security_group_id}]},
                # High ports used for worker-to-worker communication
                {'IpProtocol': 'tcp',
                 'FromPort': 49152,
                 'ToPort': 65535,
                 'IpRanges': [{'CidrIp': '0.0.0.0/0'}],
                 'UserIdGroupPairs': [{'GroupId': security_group_id}]},
                # ICMP (ping) between cluster members
                {'IpProtocol': 'icmp',
                 'FromPort': -1,
                 'ToPort': -1,
                 'IpRanges': [{'CidrIp': '0.0.0.0/0'}],
                 'UserIdGroupPairs': [{'GroupId': security_group_id}]},
            ])

        return security_group_id
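
# Rough usage sketch. `Pier`, `EC2Scheduler`, and `EC2Worker` below stand in for
# whatever provisioning classes are passed as `pier`, `scheduler`, and `worker`;
# they are assumptions, not classes defined in this module.
#
# pier = Pier(region="us-east-1")
# cluster = Cluster(pier, EC2Scheduler, EC2Worker, num_workers=4)
# cluster.start()
# client = cluster.connect()
# ...  # run Dask work against `client`
# cluster.terminate()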


# Monkeypatch to address a known bug in Dask
# https://github.com/dask/distributed/issues/4305
from dask.distributed import Future
import sys


def monkeypatch_del(self):
    try:
        self.release()
    except AttributeError:
        # Occasionally we see this error when shutting down the client
        # https://github.com/dask/distributed/issues/4305
        if not sys.is_finalizing():
            raise
    except RuntimeError:  # closed event loop
        pass


Future.__del__ = monkeypatch_del