"""Provision a small Dask cluster on EC2 via minerva's ``Pier`` and connect to it.

The script creates ``num`` worker machines and one scheduler machine, logs in
to each, launches ``dask-scheduler`` / ``dask-worker`` on them, opens a Dask
``Client`` against the scheduler, and finally terminates every machine it
created -- including when an intermediate step fails.
"""
import configparser
import contextlib
import os
import pprint
import sys

from dask.distributed import Client, SSHCluster
from minerva.pier import Pier

pp = pprint.PrettyPrinter(indent=4)

# Uncomment for verbose logging while debugging:
# import logging
# logging.basicConfig(level='DEBUG')


def start_worker(n):
    """Create Dask worker machine number ``n`` and return it.

    Uses the module-level ``pier``. The machine is tagged with
    ``{"type": "worker", "number": n}`` and ``create()`` is called on it
    before it is returned; the caller is responsible for ``login()`` and
    ``terminate()``.
    """
    mach = pier.machine(
        ami="ami-03f64555d230faf32",  # ubuntu 22.04 x86
        instance_type="t3.medium",
        username="ubuntu",
        name=f"dask-worker-{n}",
        variables={"type": "worker", "number": n},
    )
    mach.create()
    return mach


def start_scheduler():
    """Create the Dask scheduler machine and return it.

    Uses the module-level ``pier``. The machine is tagged with
    ``{"type": "scheduler"}`` and ``create()`` is called on it before it is
    returned; the caller is responsible for ``login()`` and ``terminate()``.
    """
    mach = pier.machine(
        ami="ami-03f64555d230faf32",  # dask on ubuntu 22.04 x86
        instance_type="t3.medium",
        username="ubuntu",
        name="dask-scheduler",
        variables={"type": "scheduler"},
    )
    mach.create()
    return mach


def _terminate_all(machines):
    """Terminate every ``(label, machine)`` pair, continuing past failures.

    A failure to terminate one machine must not prevent the attempt on the
    others, otherwise instances are left running. Failures are reported on
    stderr so they can be cleaned up by hand.
    """
    for label, mach in machines:
        try:
            mach.terminate()
        except Exception as err:  # top-level cleanup boundary: report, keep going
            print(f"failed to terminate {label}: {err!r}", file=sys.stderr)


###############################################
profile = "hay"
num = 1

pier = Pier(
    profile,
    subnet_id="subnet-05eb26d8649a093e1",  # project-subnet-public1-us-east-1a
    sg_groups=[
        "sg-0f9e555954e863954",  # ssh
        "sg-0b34a3f7398076545",  # default
        "sg-04cd2626d91ac093c",  # dask (8786, 8787)
    ],
    iam="S3+SSM+CloudWatch+ECR",
    key_pair=("Ari-Brown-HAY", "~/.ssh/Ari-Brown-HAY.pem"),
)

# Track machines as they are created so the ``finally`` clause can terminate
# exactly what exists, even if provisioning fails part-way through.
workers = []
scheduler = None

try:
    # start the machines in the background
    for n in range(num):
        workers.append(start_worker(n))
    print("started workers")

    scheduler = start_scheduler()
    print("started scheduler")

    # wait for them to boot up
    scheduler.login()
    for w in workers:
        w.login()
    print("logged in to everything")

    # might need this: https://github.com/fabric/fabric/issues/395
    # Start the scheduler
    print(scheduler.cmd("echo hello world"))
    scheduler.cmd("dask-scheduler", disown=True)
    print("scheduler running")

    # have the workers connect
    for w in workers:
        w.cmd(f"dask-worker {scheduler.private_ip}:8786", disown=True)
    print("workers running")

    # Alternative kept for reference: let dask drive the machines over SSH.
    # cluster = SSHCluster([scheduler.public_ip, *[w.public_ip for w in workers]],
    #                      connect_options={"client_keys": ["~/.ssh/Ari-Brown-HAY.pem"],
    #                                       "known_hosts": None,
    #                                       "username": "ubuntu"})
    # print(cluster)
    # client = Client(cluster)

    # Close the client before the machines it talks to are torn down.
    with Client(f"{scheduler.public_ip}:8786") as client:
        print(client)
finally:
    print("terminating instances")
    created = []
    if scheduler is not None:
        created.append(("scheduler", scheduler))
    created.extend((f"worker {i}", w) for i, w in enumerate(workers))
    _terminate_all(created)