"""Launch a small dask cluster via minerva, run a practice array job, tear it down.

The script builds a ``Pier``, starts a ``Cluster`` with one scheduler and one
worker, connects a dask ``Client`` to it, computes the mean of a random array,
and always terminates the cluster afterwards.
"""
# numpy / dask.array are imported up front so that a missing dependency fails
# before any machines have been launched.
import dask.array as da
import numpy as np
from dask.distributed import Client

from minerva.cluster import Cluster
from minerva.pier import Pier

########### PREP ############################
DASK_AMI = "ami-05a242924e713f80a"  # dask on ubuntu 22.04 x86
INSTANCE_TYPE = "t3.medium"
USERNAME = "ubuntu"


def worker(pier, n):
    """Request the machine for dask worker number ``n`` from ``pier``.

    Args:
        pier: object exposing ``machine(...)`` (a ``Pier`` in this script).
        n: worker index; used in the machine name and its ``variables``.

    Returns:
        Whatever ``pier.machine(...)`` returns.
    """
    return pier.machine(
        ami=DASK_AMI,
        instance_type=INSTANCE_TYPE,
        username=USERNAME,
        name=f"dask-worker-{n}",
        variables={"type": "worker", "number": n},
    )


def scheduler(pier):
    """Request the machine for the dask scheduler from ``pier``.

    Args:
        pier: object exposing ``machine(...)`` (a ``Pier`` in this script).

    Returns:
        Whatever ``pier.machine(...)`` returns.
    """
    return pier.machine(
        ami=DASK_AMI,
        instance_type=INSTANCE_TYPE,
        username=USERNAME,
        name="dask-scheduler",
        variables={"type": "scheduler"},
    )


########## CLUSTER ##########################
pier = Pier(
    "hay",
    subnet_id="subnet-05eb26d8649a093e1",  # project-subnet-public1-us-east-1a
    sg_groups=[
        "sg-0f9e555954e863954",  # ssh
        "sg-0b34a3f7398076545",  # default
        "sg-04cd2626d91ac093c",  # dask (8786, 8787, ping)
    ],
    iam="S3+SSM+CloudWatch+ECR",
    key_pair=("Ari-Brown-HAY", "~/.ssh/Ari-Brown-HAY.pem"),
)

# The scheduler/worker factories are handed to Cluster as callables;
# presumably Cluster invokes them to create the machines -- verify in minerva.
cluster = Cluster(pier, scheduler, worker, num_workers=1)
# NOTE(review): start() stays outside the try because it is not visible here
# whether terminate() is safe on a cluster that failed to start -- confirm.
cluster.start()

try:
    # Everything after start() lives inside the try so that a failed client
    # connection still terminates the cluster. The context manager closes the
    # client before the cluster goes away.
    with Client(cluster.private_location) as client:
        print(client)

        ########## USAGE ########################
        # Practice with a big array
        large_array = np.random.rand(10000, 10000)
        print("oh boy")
        dask_array = da.from_array(large_array, chunks=(100, 100))
        dask_array = dask_array.persist()  # non-blocking

        mean = dask_array.mean().compute()
        print(mean)
finally:
    ########## FIN #######################
    cluster.terminate()