From e854a93e60402666b049971c71c06b9a58c29e53 Mon Sep 17 00:00:00 2001 From: Ari Brown Date: Fri, 1 Dec 2023 09:05:17 -0500 Subject: [PATCH] moving cluster scripts to new dir --- cluster/ec2_cluster.py | 102 +++++++++++++++++++++++ run_cluster.py => cluster/run_cluster.py | 7 +- dask_test.py | 87 ++++++++++--------- test.py | 56 ------------- test2.py | 86 ------------------- 5 files changed, 154 insertions(+), 184 deletions(-) create mode 100644 cluster/ec2_cluster.py rename run_cluster.py => cluster/run_cluster.py (88%) delete mode 100644 test.py delete mode 100644 test2.py diff --git a/cluster/ec2_cluster.py b/cluster/ec2_cluster.py new file mode 100644 index 0000000..fe45574 --- /dev/null +++ b/cluster/ec2_cluster.py @@ -0,0 +1,102 @@ +import sys +import dask +from dask_cloudprovider.aws import EC2Cluster +from minerva.timing import Timing +import dask.dataframe as dd +import dask.distributed +from dask.distributed import Client +import configparser +import os +import contextlib + +def get_aws_credentials(): + parser = configparser.RawConfigParser() + parser.read(os.path.expanduser('~/.aws/credentials')) + credentials = parser.items('default') + all_credentials = {key.upper(): value for key, value in credentials} + with contextlib.suppress(KeyError): + all_credentials["AWS_REGION"] = all_credentials.pop("REGION") + return all_credentials + +env = get_aws_credentials() +env['EXTRA_PIP_PACKAGES'] = 's3fs' + +cluster = EC2Cluster(env_vars = env, + n_workers = 2, + instance_type = 'm5.large', + subnet_id = "subnet-05eb26d8649a093e1", # project-subnet-public1-us-east-1a + security_groups = ["sg-0f9e555954e863954", # ssh + "sg-0b34a3f7398076545", # default + "sg-04cd2626d91ac093c"], # dask (8786, 8787) + ami = "ami-0399a4f70ca684620", + key_name = "Ari-Brown-HAY", + security = False, + iam_instance_profile = {'Name': 'Minerva'}) + +client = Client(cluster) + +manifest_files = ['s3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_90fc7ac6-1be8-42a3-8485-f5fda039a23b', + 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_3fd296d8-4d16-4491-a950-d5e4b0fff172', + 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_19afa54e-ed8f-4da5-a575-a2f7acd02399', + 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_0240d219-11d8-4965-9425-ed2cbe9f0984', + 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_657a9eee-2db8-4b55-8c2b-c98c67eb2992', + 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_4fb5f559-ac3e-4d0b-bff6-e90aefcecff7', + 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_a7b3bdfa-aa06-4c7b-b010-f0c59d925051', + 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_444110c5-885d-4369-9cbd-bb842383baa7', + 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_921dc55e-8d94-4f91-b71b-ec1b45ff999f', + 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_dc3fafb8-8d8a-432c-a9f4-386332b7720c', + 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_9b8c79b9-d8a6-487e-a10b-6d65dae9daff', + 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_26fe7c8b-15f4-419f-a7c7-c87461f1b69c', + 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_45c5c82b-befc-4b0c-97fa-673de0feb9dd', + 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_aca904de-b154-4f56-b255-b71de0be3060', + 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_9a131e04-2353-44f1-9c13-7c1b381ca553', + 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_66e9e85d-406b-4164-87f3-3ffbe4ff9162', + 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_e32f16e7-591e-4da4-a2b1-4b00ac4cb617', + 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_88eb15ea-9278-4e54-978b-9c211a4b834f', + 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_71b697fe-3da2-4c5b-a046-e5773b494e7b', + 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_0454e926-10fb-4af4-82ea-082e6bdb7c5c', + 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_6ee2d3b4-a837-419f-b181-53a127a791e3', + 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_abfdb890-64d6-4e05-adf6-c020633fb1ab', + 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_ee27888e-c3fa-4731-a75b-b2f20efcafc3', + 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_c0d5978a-a66a-4faf-9d5b-5f5ebd7e7311', + 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_bdf24af9-bdca-467c-abca-04e215eb190c', + 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_0dddbf5f-ce5d-4685-8361-cda906bef37c', + 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_a6800c32-4790-4e77-bfdf-7900ed44097a', + 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_0d0913a9-9f82-4418-9450-4cbf364ca9fb', + 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_ab1a2407-b7a4-4d69-9539-a05391945149', + 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_bcfde315-905f-475c-a3c3-3a3556e53fe4'] + +try: + with Timing("read parquet from athena"): + df = dd.read_parquet(manifest_files) + print("distributed") + + with Timing("partitioning"): + divisions = list(range(0, 10001)) + df = df.set_index('agent', divisions=divisions) + + with Timing("persisting"): + dp = df.persist() + + with Timing("memory usage"): + print(dp.get_partition(400).memory_usage()) + + with Timing("count()"): + print(dp.count().compute()) + + with Timing("memory usage"): + print(dp.get_partition(400).memory_usage()) + + with Timing("mean latitude"): + print(dp.groupby(dp.index).latitude.mean().compute()) + + with Timing("count()"): + print(dp.count().compute()) + +finally: + ########## FIN ####################### + print("closing client") + client.close() + +cluster.close() + diff --git a/run_cluster.py b/cluster/run_cluster.py similarity index 88% rename from run_cluster.py rename to cluster/run_cluster.py index 848fe08..1a00115 100644 --- a/run_cluster.py +++ b/cluster/run_cluster.py @@ -4,12 +4,11 @@ from minerva.pier import Pier ########### PREP ############################ -# modin base (built on dask base) -DASK_BASE = "ami-0a8b05658a4f00e52" # dask on ubuntu 22.04 x86 +DASK_BASE = "ami-0399a4f70ca684620" # dask on ubuntu 22.04 x86 def worker(pier, n): mach = pier.machine(ami = DASK_BASE, - instance_type = "r5.xlarge", + instance_type = "m5.large", username = "ubuntu", name = f"dask-worker-{n}", variables = {"type": "worker", @@ -19,7 +18,7 @@ def worker(pier, n): def scheduler(pier): mach = pier.machine(ami = DASK_BASE, - instance_type = "r5.xlarge", + instance_type = "m5.large", # "r5.xlarge", username = "ubuntu", name = f"dask-scheduler", variables = {"type": "scheduler"}, diff --git a/dask_test.py b/dask_test.py index d1a2a9c..c46eb11 100644 --- a/dask_test.py +++ b/dask_test.py @@ -2,47 +2,51 @@ import sys import minerva from minerva.timing import Timing import dask -import dask.distributed -from dask.distributed import Client import dask.dataframe as dd import time +#dask.config.set({'distributed.worker.multiprocessing-method': 'fork'}) + +import dask.distributed +from dask.distributed import Client + m = minerva.Minerva("hay") print(f"connecting to {sys.argv[1]}") client = Client(sys.argv[1]) -#client = Client() -manifest_files = ['s3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_0934fdc9-1bc7-4dae-a371-c3f58f3b31fc', - 's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_7011b49a-bf6c-42d0-9852-85aa10a3ad37', - 's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_8ca98d70-74c2-4d17-b059-83cd25e398c0', - 's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_61265893-e992-4026-8b69-e60a4641dd10', - 's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_d4318a64-7fc0-4b59-8f85-8b1790e72a70', - 's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_4e2e19a8-b360-49fe-9b82-f66a1e23ad3e', - 's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_c5a77760-f078-432c-aaf5-6d99fb0cee0c', - 's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_db5f90f3-9fc0-4b86-98da-d840bb9b5423', - 's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_2472c560-1113-448b-ae18-e23032d3f3d8', - 's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_551944ef-47f4-475d-a575-e209ca1ee7b4', - 's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_67569166-18f9-40bd-9845-20f069f8dc8a', - 's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_8a81dca0-767d-43bf-b0aa-a087f98d41c5', - 's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_c9dd6a02-0f92-4202-ba70-77fda02d4acf', - 's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_9f094026-03f4-4ec2-a348-0498eeb6b04d', - 's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_2c20c942-1e2c-4f37-86ec-4adce83edbea', - 's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_237d6c26-8372-4584-ae15-9f693d2295a6', - 's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_cf41cbf5-eb46-4bb9-b904-f1518affbefa', - 's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_420f8434-5a71-4af8-91b5-d5364c941ded', - 's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_f155616d-39bc-483e-b19a-56da9fbae685', - 's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_ef8a7770-43b7-44bb-9dde-f92ed8faae9b', - 's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_8aa7de20-91ee-4ada-8f48-c7a5b313572f', - 's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_7c265580-ada5-4b94-b818-5cebdb4bb6c6', - 's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_13562caf-4578-4e36-83fe-8b7a5eabc7e8', - 's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_b0f80cd3-4aa5-40ac-a0c8-632c763f8969', - 's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_2f1e190b-621c-4638-91f1-e961ba674714', - 's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_5dcd43e4-67bd-41ba-b3cd-7e8f8da9b1f9', - 's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_293c6916-4f22-4ca0-a240-078ebe48368b', - 's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_80e8c47e-f254-47c4-80af-d744885e8174', - 's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_6b1ba112-0545-4c3f-9321-dbf597d98bc1', - 's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_0b3dd1a8-72ef-4852-bf1d-f7091629d3b6'] +#manifest_files = ['s3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_b311740c-02a5-42a5-9996-b17bdbf604ca', +# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_1abe3b5f-ab17-401e-91e9-a5c4f3fa17b9', +# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_c08442a6-74b8-4cf0-afc5-ab1a03f59eea', +# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_399a2786-68b5-4c40-961e-476451885f7f', +# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_8a699085-7d6b-426a-8a59-1ae4a6a183b9', +# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_0ca91f4c-85cf-4c93-b039-4f307dde0a86', +# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_bf38065d-fde8-45d3-87eb-3a56479f8b57', +# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_6a0ebea5-9a23-4fb5-8547-d4bb82f6a821', +# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_3464c3f6-fa5c-4d6b-bdf4-be387ff9dfda', +# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_9088425e-0c87-49ef-a883-92c217a715bb', +# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_0517e42b-089b-4819-b166-fa21489485a6', +# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_0197ff95-8c3c-4121-8077-a499927bb097', +# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_990579d4-0bae-4547-a630-e5a05341d264', +# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_2c838980-65d6-4a3d-898d-bfcb63e361bd', +# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_930df083-3c4c-45d6-963e-51e47d3a704c', +# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_6739cc33-43e0-468d-a6d2-fa17161bbd5e', +# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_685d81de-ffff-4629-bfb2-4bca3941474b', +# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_41e3cb39-a917-460a-bd86-413df774806b', +# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_ce533449-b14f-439e-92dc-69abdeda706f', +# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_c68ff721-8b97-4abf-8573-ac3fd617af97', +# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_04f13894-63d8-453c-ace7-87368511e94b', +# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_0d906712-e469-419a-8bf7-508b0b79848d', +# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_c514089d-c2eb-496a-a1a2-8501ecaf7d84', +# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_fe04a9d7-56ad-4e99-bbfa-8adb398af971', +# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_1ea10655-a063-468e-ad9e-0788eb15bd8d', +# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_7146bbe6-1278-43d3-a9e3-1a9e6a776105', +# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_add3ea7e-961f-4760-88b3-c839d4a344b3', +# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_2a00d610-18bf-4611-8f58-f47bdd484504', +# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_54ea410d-fa42-4ab5-bc2b-8179ad1cfb46', +# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_07aded26-d650-4852-9d3f-732409db3fd7'] + +manifest_files = ["s3://haystac-pmo-athena/results/f236b9a3-8aa7-4e29-b4c7-1fee5981a0ad.csv"] def full_s3(obj): return "s3://" + obj.bucket_name + "/" + obj.key @@ -75,12 +79,12 @@ try: #query = athena.execute(f"select * from trajectories.kitware limit {records}", format="csv") #query.finish() #df = dd.read_csv(query.info_cache['ResultConfiguration']['OutputLocation']) + df = dd.read_csv(manifest_files[0]) # Unload to Parquet files #query = athena.query(f"select * from trajectories.kitware limit {records}") #df = dd.read_parquet(query.manifest_files()) - df = dd.read_parquet(manifest_files) - print("distributed") + #df = dd.read_parquet(manifest_files, engine='fastparquet') with Timing("partitioning"): divisions = list(range(0, 10001)) @@ -88,18 +92,25 @@ try: with Timing("persisting"): dp = df.persist() + print(dp) #import IPython #IPython.embed() + with Timing("memory usage"): + print(dp.get_partition(400).memory_usage()) + with Timing("count()"): - print(dp.count().compute()) + print(dp.count()) + + with Timing("memory usage"): + print(dp.get_partition(400).memory_usage()) with Timing("mean latitude"): - print(dp.groupby(dp.index).latitude.mean().compute()) + print(dp.groupby(dp.index).latitude.mean()) with Timing("count()"): - print(dp.count().compute()) + print(dp.count()) #with Timing("mean longitude"): # print(dp.groupby(dp.index).longitude.mean().compute()) diff --git a/test.py b/test.py deleted file mode 100644 index c9c7081..0000000 --- a/test.py +++ /dev/null @@ -1,56 +0,0 @@ -import dask.distributed as distributed -import dask_cloudprovider.aws as aws -import configparser -import os -import contextlib - -# altered /Users/ari/opt/miniconda3/envs/mamba_oa_env/lib/python3.10/site-packages/aiobotocore/endpoint.py:96 - -# needs [default] AWS credential -# `security = False` (can't use TLS because otherwise the UserData param is too long) - -def aws_profile(profile): - parser = configparser.RawConfigParser() - parser.read(os.path.expanduser("~/.aws/credentials")) - config = parser.items(profile) - config = {key.upper(): value for key, value in [*config]} - config['AWS_REGION'] = config.pop('REGION') - return config - -# Create a cluster -cluster = aws.EC2Cluster( - env_vars = aws_profile("hay"), - key_name = "Ari-Brown-HAY", - vpc = "vpc-0823964489ecc1e85", - subnet_id = "subnet-05eb26d8649a093e1", # project-subnet-public1-us-east-1a - n_workers = 2, - region = "us-east-1", - bootstrap = True, - - security_groups = ["sg-0f9e555954e863954", # ssh - "sg-0b34a3f7398076545", # default - "sg-04cd2626d91ac093c"], # dask (8786, 8787) - #worker_module = "dask_cuda.cli.dask_cuda_worker", # for running GPU clusters - - #iam_instance_profile = "S3+SSM+CloudWatch+ECR", # this is actually a dict? what contents??? - worker_instance_type = "t3.small", - ami = "ami-0b0cd81283738558a", # ubuntu 22.04 x86 - - security = False) -print(cluster) -exit() - -# Connect to the cluster -client = distributed.Client(cluster) -print(client) - -# Practice with a big array -import numpy as np -import dask.array as da - -large_array = np.random.rand(1000000, 1000000) -dask_array = da.from_array(large_array, chunks=(1000, 1000)) -dask_array = dask_array.persist() # non-blocking - -mean = dask_array.mean().compute() -print(mean) diff --git a/test2.py b/test2.py deleted file mode 100644 index 06b536c..0000000 --- a/test2.py +++ /dev/null @@ -1,86 +0,0 @@ -import configparser -import os -import contextlib -import sys -import pprint -from minerva.pier import Pier -from dask.distributed import Client, SSHCluster - -pp = pprint.PrettyPrinter(indent=4) - -#import logging -#logging.basicConfig(level='DEBUG') - -def start_worker(n): - mach = pier.machine(ami = "ami-03f64555d230faf32", # ubuntu 22.04 x86 - instance_type = "t3.medium", - username = "ubuntu", - name = f"dask-worker-{n}", - variables = {"type": "worker", - "number": n}) - mach.create() - return mach - -def start_scheduler(): - mach = pier.machine(ami = "ami-03f64555d230faf32", # dask on ubuntu 22.04 x86 - instance_type = "t3.medium", - username = "ubuntu", - name = f"dask-scheduler", - variables = {"type": "scheduler"}) - mach.create() - return mach - -############################################### - -profile = "hay" -num = 1 - -pier = Pier(profile, - subnet_id = "subnet-05eb26d8649a093e1", # project-subnet-public1-us-east-1a - sg_groups = ["sg-0f9e555954e863954", # ssh - "sg-0b34a3f7398076545", # default - "sg-04cd2626d91ac093c"], # dask (8786, 8787) - iam = "S3+SSM+CloudWatch+ECR", - key_pair = ("Ari-Brown-HAY", "~/.ssh/Ari-Brown-HAY.pem")) - - -# start the machines in the background -workers = [start_worker(n) for n in range(num)] -print("started workers") -scheduler = start_scheduler() -print("started scheduler") - -# wait for them to boot up -scheduler.login() -for w in workers: - w.login() - -print("logged in to everything") - -# might need this: https://github.com/fabric/fabric/issues/395 - -# Start the scheduler -print(scheduler.cmd("echo hello world")) -scheduler.cmd("dask-scheduler", disown=True) -print("scheduler running") - -# have the workers connect -for w in workers: - w.cmd(f"dask-worker {scheduler.private_ip}:8786", disown=True) -print("workers running") - -try: - #cluster = SSHCluster([scheduler.public_ip, *[w.public_ip for w in workers]], - # connect_options={"client_keys": ["~/.ssh/Ari-Brown-HAY.pem"], - # "known_hosts": None, - # "username": "ubuntu"}) - #print(cluster) - #client = Client(cluster) - client = Client(f"{scheduler.public_ip}:8786") - print(client) -finally: - print("terminating instances") - scheduler.terminate() - for w in workers: - w.terminate() -