forked from bellwether/minerva
added timing functionality for easy use
This commit is contained in:
parent
153ab074dd
commit
495d786103
3 changed files with 61 additions and 0 deletions
|
|
@ -21,6 +21,11 @@ import dask
|
||||||
# https://saturncloud.io/blog/how-to-set-up-a-dask-cluster/
|
# https://saturncloud.io/blog/how-to-set-up-a-dask-cluster/
|
||||||
|
|
||||||
from dask.distributed import Client
|
from dask.distributed import Client
|
||||||
|
import dask
|
||||||
|
|
||||||
|
dask.config.set({"distributed.comm.retry.count": 10})
|
||||||
|
dask.config.set({"distributed.comm.timeouts.connect": 30})
|
||||||
|
dask.config.set({"distributed.worker.memory.terminate": False})
|
||||||
|
|
||||||
class Cluster:
|
class Cluster:
|
||||||
def __init__(self, pier, scheduler, worker, num_workers=1):
|
def __init__(self, pier, scheduler, worker, num_workers=1):
|
||||||
|
|
@ -74,3 +79,37 @@ class Cluster:
|
||||||
for w in self.workers:
|
for w in self.workers:
|
||||||
w.terminate()
|
w.terminate()
|
||||||
|
|
||||||
|
|
||||||
|
def make_security_group(self, vpc_id, name="Dask", desc="Worker and Scheduler communication"):
|
||||||
|
response = self.pier.ec2.create_security_group(GroupName = name,
|
||||||
|
Description = desc,
|
||||||
|
VpcId = vpc_id)
|
||||||
|
security_group_id = response['GroupId']
|
||||||
|
print('security group created %s in vpc %s.' % (security_group_id, vpc_id))
|
||||||
|
|
||||||
|
data = self.pier.ec2.authorize_security_group_ingress(
|
||||||
|
GroupId = security_group_id,
|
||||||
|
IpPermissions = [
|
||||||
|
{'IpProtocol': 'tcp',
|
||||||
|
'FromPort': 8786,
|
||||||
|
'ToPort': 8787,
|
||||||
|
'IpRanges': [{'CidrIp': '0.0.0.0/0'}],
|
||||||
|
'UserIdGroupPairs': [{'GroupId': security_group_id}]
|
||||||
|
},
|
||||||
|
{'IpProtocol': 'tcp',
|
||||||
|
'FromPort': 49152,
|
||||||
|
'ToPort': 65535,
|
||||||
|
'IpRanges': [{'CidrIp': '0.0.0.0/0'}],
|
||||||
|
'UserIdGroupPairs': [{'GroupId': security_group_id}]
|
||||||
|
},
|
||||||
|
{'IpProtocol': 'icmp',
|
||||||
|
'FromPort': -1,
|
||||||
|
'ToPort': -1,
|
||||||
|
'IpRanges': [{'CidrIp': '0.0.0.0/0'}],
|
||||||
|
'UserIdGroupPairs': [{'GroupId': security_group_id}]
|
||||||
|
}
|
||||||
|
])
|
||||||
|
|
||||||
|
return security_group_id
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -160,3 +160,4 @@ class Machine:
|
||||||
)
|
)
|
||||||
print(f"terminated {self.name} ({self.info['InstanceId']})")
|
print(f"terminated {self.name} ({self.info['InstanceId']})")
|
||||||
self.terminated = True
|
self.terminated = True
|
||||||
|
|
||||||
|
|
|
||||||
21
minerva/timing.py
Normal file
21
minerva/timing.py
Normal file
|
|
@ -0,0 +1,21 @@
|
||||||
|
import time
|
||||||
|
|
||||||
|
|
||||||
|
# with Timing("my operation"):
|
||||||
|
# long_operation()
|
||||||
|
#
|
||||||
|
# => prints "my operation: 45.234"
|
||||||
|
class Timing:
|
||||||
|
def __init__(self, desc):
|
||||||
|
self.desc = desc
|
||||||
|
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
self.start = time.time()
|
||||||
|
print(self.desc)
|
||||||
|
return self
|
||||||
|
|
||||||
|
|
||||||
|
def __exit__(self, exception_type, exception_value, exception_traceback):
|
||||||
|
print(f"\t=> {time.time() - self.start}s")
|
||||||
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue