"""Launch a temporary Dask cluster on EC2 via minerva, run a few timed
queries over a Parquet dataset in S3, and always tear the cluster down."""

import time  # NOTE(review): currently unused; kept from the original script

import dask  # NOTE(review): currently unused; kept from the original script
import dask.dataframe as dd
from dask.distributed import Client, wait

from minerva.cluster import Cluster
from minerva.pier import Pier
from minerva.timing import Timing

########### PREP ############################

# Dask on Ubuntu 22.04 (x86) image, shared by the scheduler and all workers.
DASK_AMI = "ami-01f85b935dc9f674c"
INSTANCE_TYPE = "t3.medium"
SSH_USERNAME = "ubuntu"


def worker(pier, n):
    """Create the machine spec for Dask worker number ``n`` on ``pier``.

    Workers get a 512 (unit per minerva's ``disk_size``) disk; the
    scheduler does not request one.
    """
    return pier.machine(ami=DASK_AMI,
                        instance_type=INSTANCE_TYPE,
                        username=SSH_USERNAME,
                        name=f"dask-worker-{n}",
                        variables={"type": "worker", "number": n},
                        disk_size=512)


def scheduler(pier):
    """Create the machine spec for the single Dask scheduler on ``pier``."""
    return pier.machine(ami=DASK_AMI,
                        instance_type=INSTANCE_TYPE,
                        username=SSH_USERNAME,
                        name="dask-scheduler",
                        variables={"type": "scheduler"})


########## CLUSTER ##########################

pier = Pier("hay",
            subnet_id="subnet-05eb26d8649a093e1",  # project-subnet-public1-us-east-1a
            sg_groups=["sg-0f9e555954e863954",     # ssh
                       "sg-0b34a3f7398076545",     # default
                       "sg-04cd2626d91ac093c"],    # dask (8786, 8787)
            key_pair=("Ari-Brown-HAY", "~/.ssh/Ari-Brown-HAY.pem"),
            iam="S3+SSM+CloudWatch+ECR")

cluster = Cluster(pier, scheduler, worker, num_workers=5)
# NOTE(review): start() is outside the try, so a failure part-way through
# start() will not reach cluster.terminate(). Confirm whether terminate() is
# safe on a partially started cluster before moving it inside.
cluster.start()

########## USAGE ########################

client = None
try:
    client = Client(cluster.public_location)
    print(client)

    # Background reading:
    #   https://matthewrocklin.com/blog/work/2017/01/12/dask-dataframes
    #   https://stackoverflow.com/questions/43796774/loading-local-file-from-client-onto-dask-distributed-cluster
    # Alternatives that were tried instead of read_parquet:
    #   - read local CSVs with pandas, client.scatter() each frame, then
    #     dd.from_delayed(futures, meta=df)
    #   - athena.query("select * from trajectories").distribute_results(client)

    with Timing("reading parquet"):
        df = dd.read_parquet("s3://haystac-archive-phase1.trial1/ta1.kitware/ta1/simulation/train/")

    with Timing("persisting"):
        dp = df.persist()
        # With a distributed Client, persist() returns right away while the
        # work continues in the background; block here so this timing covers
        # the actual load instead of leaking into the next query.
        wait(dp)

    with Timing("count()"):
        # count() is lazy like every other Dask reduction; compute() makes it
        # run, so values are printed rather than a task-graph repr.
        print(dp.count().compute())

    with Timing("mean latitude"):
        print(dp.latitude.mean().compute())

    with Timing("mean longitude"):
        print(dp.longitude.mean().compute())
finally:
    ########## FIN #######################
    # Nested so a failure while closing the client can never skip
    # terminating the (billed) EC2 machines.
    try:
        if client is not None:
            client.close()
    finally:
        cluster.terminate()