forked from bellwether/minerva
added support for determining instance costs
This commit is contained in:
parent 20ae647447
commit c2bc9e1028
11 changed files with 1028 additions and 24 deletions
81 minerva/ray.py Normal file
@@ -0,0 +1,81 @@
# Manually build cluster
# https://docs.ray.io/en/latest/cluster/vms/user-guides/launching-clusters/on-premises.html

class RayCluster:
    def __init__(self, pier, scheduler, worker, num_workers=1):
        self.pier = pier
        self.methods = {"scheduler": scheduler,
                        "worker": worker}
        self.scheduler = scheduler(pier)
        self.workers = [worker(pier, n) for n in range(num_workers)]


    def start(self):
        self.create()
        self.login()
        self.start_ray()


    # Begin the startup process in the background
    def create(self):
        self.scheduler.create()
        for w in self.workers:
            w.create()


    # Wait for the instances to finish starting up and log in to them
    # TODO add support for tunneling through another server first
    def login(self):
        self.scheduler.login()
        for w in self.workers:
            w.login()

        self.public_location = f"{self.scheduler.public_ip}:6379"
        self.private_location = f"{self.scheduler.private_ip}:6379"


    # Start the ray processes necessary for cluster communication
    def start_ray(self):
        self.scheduler.cmd("ray start --head --port=6379 &> /tmp/scheduler.log", disown=True)
        for w in self.workers:
            w.cmd(f"ray start --address={self.private_location} &> /tmp/worker.log", disown=True)


    def terminate(self):
        self.scheduler.terminate()
        for w in self.workers:
            w.terminate()


    def make_security_group(self, vpc_id, name="Ray", desc="Worker and Scheduler communication"):
        response = self.pier.ec2.create_security_group(GroupName = name,
                                                       Description = desc,
                                                       VpcId = vpc_id)
        security_group_id = response['GroupId']
        print('security group created %s in vpc %s.' % (security_group_id, vpc_id))

        data = self.pier.ec2.authorize_security_group_ingress(
            GroupId = security_group_id,
            IpPermissions = [
                {'IpProtocol': 'tcp',
                 'FromPort': 6379,
                 'ToPort': 6379,
                 'IpRanges': [{'CidrIp': '0.0.0.0/0'}],
                 'UserIdGroupPairs': [{'GroupId': security_group_id}]
                 },
                {'IpProtocol': 'tcp',
                 'FromPort': 49152,
                 'ToPort': 65535,
                 'IpRanges': [{'CidrIp': '0.0.0.0/0'}],
                 'UserIdGroupPairs': [{'GroupId': security_group_id}]
                 },
                {'IpProtocol': 'icmp',
                 'FromPort': -1,
                 'ToPort': -1,
                 'IpRanges': [{'CidrIp': '0.0.0.0/0'}],
                 'UserIdGroupPairs': [{'GroupId': security_group_id}]
                 }
            ])

        return security_group_id
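For reference, a minimal usage sketch of the class added above (not part of this commit). It assumes `pier` is the EC2/account wrapper this constructor expects and that `Scheduler` and `Worker` are the instance classes passed in as the `scheduler` and `worker` factories; the import paths, class names, and the VPC id below are illustrative only.

# Hypothetical usage sketch. Pier, Scheduler, and Worker are assumed names for
# the instance-management classes RayCluster expects; they are not defined in
# minerva/ray.py, and these import paths are guesses.
from minerva.pier import Pier                     # assumed module
from minerva.instances import Scheduler, Worker   # assumed module

pier = Pier()
cluster = RayCluster(pier, Scheduler, Worker, num_workers=2)

# Optionally create a security group that opens the Ray ports used above
# (placeholder VPC id).
sg_id = cluster.make_security_group(vpc_id="vpc-0123456789abcdef0")

cluster.start()                   # create instances, log in, then start ray head + workers
print(cluster.public_location)    # "<head-public-ip>:6379"

# ... run Ray workloads against the head node here ...

cluster.terminate()               # tear the instances back down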