From 495d78610353b7667ccaf61ae76e1db77ce7744f Mon Sep 17 00:00:00 2001 From: Ari Brown Date: Tue, 10 Oct 2023 19:24:16 -0400 Subject: [PATCH] added timing functionality for easy use --- minerva/cluster.py | 39 +++++++++++++++++++++++++++++++++++++++ minerva/machine.py | 1 + minerva/timing.py | 21 +++++++++++++++++++++ 3 files changed, 61 insertions(+) create mode 100644 minerva/timing.py diff --git a/minerva/cluster.py b/minerva/cluster.py index 15a8dad..5ff614b 100644 --- a/minerva/cluster.py +++ b/minerva/cluster.py @@ -21,6 +21,11 @@ import dask # https://saturncloud.io/blog/how-to-set-up-a-dask-cluster/ from dask.distributed import Client +import dask + +dask.config.set({"distributed.comm.retry.count": 10}) +dask.config.set({"distributed.comm.timeouts.connect": 30}) +dask.config.set({"distributed.worker.memory.terminate": False}) class Cluster: def __init__(self, pier, scheduler, worker, num_workers=1): @@ -74,3 +79,37 @@ class Cluster: for w in self.workers: w.terminate() + + def make_security_group(self, vpc_id, name="Dask", desc="Worker and Scheduler communication"): + response = self.pier.ec2.create_security_group(GroupName = name, + Description = desc, + VpcId = vpc_id) + security_group_id = response['GroupId'] + print('security group created %s in vpc %s.' % (security_group_id, vpc_id)) + + data = self.pier.ec2.authorize_security_group_ingress( + GroupId = security_group_id, + IpPermissions = [ + {'IpProtocol': 'tcp', + 'FromPort': 8786, + 'ToPort': 8787, + 'IpRanges': [{'CidrIp': '0.0.0.0/0'}], + 'UserIdGroupPairs': [{'GroupId': security_group_id}] + }, + {'IpProtocol': 'tcp', + 'FromPort': 49152, + 'ToPort': 65535, + 'IpRanges': [{'CidrIp': '0.0.0.0/0'}], + 'UserIdGroupPairs': [{'GroupId': security_group_id}] + }, + {'IpProtocol': 'icmp', + 'FromPort': -1, + 'ToPort': -1, + 'IpRanges': [{'CidrIp': '0.0.0.0/0'}], + 'UserIdGroupPairs': [{'GroupId': security_group_id}] + } + ]) + + return security_group_id + + diff --git a/minerva/machine.py b/minerva/machine.py index 6c98156..c0be007 100644 --- a/minerva/machine.py +++ b/minerva/machine.py @@ -160,3 +160,4 @@ class Machine: ) print(f"terminated {self.name} ({self.info['InstanceId']})") self.terminated = True + diff --git a/minerva/timing.py b/minerva/timing.py new file mode 100644 index 0000000..e1315a4 --- /dev/null +++ b/minerva/timing.py @@ -0,0 +1,21 @@ +import time + + +# with Timing("my operation"): +# long_operation() +# +# => prints "my operation: 45.234" +class Timing: + def __init__(self, desc): + self.desc = desc + + + def __enter__(self): + self.start = time.time() + print(self.desc) + return self + + + def __exit__(self, exception_type, exception_value, exception_traceback): + print(f"\t=> {time.time() - self.start}s") +