diff --git a/examples/dask_cluster.py b/examples/dask_cluster.py index 65f7ebd..ab8f55f 100644 --- a/examples/dask_cluster.py +++ b/examples/dask_cluster.py @@ -1,20 +1,23 @@ from minerva.cluster import Cluster from minerva.pier import Pier +from minerva.timing import Timing from dask.distributed import Client +import dask ########### PREP ############################ def worker(pier, n): - mach = pier.machine(ami = "ami-05a242924e713f80a", # dask on ubuntu 22.04 x86 + mach = pier.machine(ami = "ami-01f85b935dc9f674c", # dask on ubuntu 22.04 x86 instance_type = "t3.medium", username = "ubuntu", name = f"dask-worker-{n}", variables = {"type": "worker", - "number": n}) + "number": n}, + disk_size = 512) return mach def scheduler(pier): - mach = pier.machine(ami = "ami-05a242924e713f80a", # dask on ubuntu 22.04 x86 + mach = pier.machine(ami = "ami-01f85b935dc9f674c", # dask on ubuntu 22.04 x86 instance_type = "t3.medium", username = "ubuntu", name = f"dask-scheduler", @@ -27,31 +30,56 @@ pier = Pier("hay", subnet_id = "subnet-05eb26d8649a093e1", # project-subnet-public1-us-east-1a sg_groups = ["sg-0f9e555954e863954", # ssh "sg-0b34a3f7398076545", # default - "sg-04cd2626d91ac093c"], # dask (8786, 8787, ping) - iam = "S3+SSM+CloudWatch+ECR", - key_pair = ("Ari-Brown-HAY", "~/.ssh/Ari-Brown-HAY.pem")) + "sg-04cd2626d91ac093c"], # dask (8786, 8787) + key_pair = ("Ari-Brown-HAY", "~/.ssh/Ari-Brown-HAY.pem"), + iam = "S3+SSM+CloudWatch+ECR") -cluster = Cluster(pier, scheduler, worker, num_workers=1) +cluster = Cluster(pier, scheduler, worker, num_workers=5) cluster.start() -client = Client(cluster.private_location) -print(client) - ########## USAGE ######################## try: + client = Client(cluster.public_location) + print(client) + # Practice with a big array - import numpy as np - import dask.array as da + # https://matthewrocklin.com/blog/work/2017/01/12/dask-dataframes - large_array = np.random.rand(10000, 10000) - print("oh boy") - dask_array = da.from_array(large_array, chunks=(100, 100)) - dask_array = dask_array.persist() # non-blocking + #import numpy as np + #import dask.array as da + import dask.dataframe as dd + import time - mean = dask_array.mean().compute() - print(mean) + # https://stackoverflow.com/questions/43796774/loading-local-file-from-client-onto-dask-distributed-cluster + # Iteratively load files and scatter them to the cluster + # + # futures = [] + # for fn in filenames: + # df = pd.read_csv(fn) + # future = client.scatter(df) + # futures.append(future) + # + # ddf = dd.from_delayed(futures, meta=df) + + # query = athena.query("select * from trajectories") + # ddf = query.distribute_results(client) + + with Timing("reading parquet"): + df = dd.read_parquet("s3://haystac-archive-phase1.trial1/ta1.kitware/ta1/simulation/train/") + + #with Timing("persisting"): + # dp = df.persist() + + #with Timing("count()"): + # print(dp.count()) + + #with Timing("mean latitude"): + # print(dp.latitude.mean().compute()) + + #with Timing("mean longitude"): + # print(dp.longitude.mean().compute()) finally: ########## FIN #######################