diff --git a/examples/repartition.py b/examples/repartition.py
index 84fb52b..220f138 100644
--- a/examples/repartition.py
+++ b/examples/repartition.py
@@ -3,25 +3,32 @@ import servers as s
 import json
 import math
 import sys
+import os
 
 src_top_level = "s3://phase1.trial2/ta1.kitware/te.apl/transforms/plain/"
-dst_top_level = "s3://phase1.trial2/ta1.kitware/te.apl/transforms/ari_sorted/"
+sorted_top = "s3://phase1.trial2/ta1.kitware/te.apl/transforms/ari_sorted/"
+dst_top_level = "s3://phase1.trial2/ta1.kitware/te.apl/transforms/ari_sorted_2/"
 
 
 def sort_hour(mach, hour):
     image = "436820952613.dkr.ecr.us-east-1.amazonaws.com/apl-pyarrow-experiment"
 
     # Prep the info for the docker container
-    srted_loc = src_top_level + '/'.join(hour.split('/')[-4:])
+    srted_loc = sorted_top + '/'.join(hour.split('/')[-4:])
     srted_loc += "/data.zstd.parquet"
     variables = {"source": hour,              # hour
                  "destination": srted_loc }  # precise location of new file
 
+    try:
+        os.mkdir(f"/tmp/{mach.name}")
+    except:
+        pass
+
     # Create the machine to run it
     dock = m.Docker(machine = mach,
                     container = image,
                     variables = {"PAYLOAD": json.dumps(variables)},
-                    stdout = sys.stdout,
-                    stderr = sys.stderr)
+                    stdout = open(f"/tmp/{mach.name}/sort_stdout.out", "wb"),
+                    stderr = open(f"/tmp/{mach.name}/sort_stderr.out", "wb"))
 
     dock.run()
@@ -39,8 +46,8 @@ def repartition(mach, agents):
     dock = m.Docker(machine = mach,
                     container = image,
                     variables = {"PAYLOAD": json.dumps(variables)},
-                    stdout = sys.stdout,
-                    stderr = sys.stderr)
+                    stdout = open(f"/tmp/{mach.name}/repartition_stdout.out", "wb"),
+                    stderr = open(f"/tmp/{mach.name}/repartition_stderr.out", "wb"))
 
     dock.run()
 
@@ -55,7 +62,8 @@ hours = set(["s3://" + '/'.join([o.bucket_name, *o.key.split("/")[0:-1]])
 
 print(f"{len(hours)} hours to sort")
 hours = sorted(hours)
-hours = [hours[0]]
+hours = hours[0:pool_size + 1]
+print(f"\tprocessing {len(hours)} of them")
 
 # Split the agents into chunks for each machine in the pool
 agents = list(range(200))
@@ -66,7 +74,7 @@ try:
     #######################################
     # Create the machines
     # This also waits for them to be made
-    pool = m.Pool(s.worker, pool_size, web=True)
+    pool = m.Pool(s.worker, pool_size)
 
     ########################################
     # Now that we have the pool, put them to work
@@ -74,10 +82,10 @@ try:
     # doing that until the list is empty
 
     # First part: sort the individual files
-    #pool.run(sort_hour, data=hours)
+    pool.run(sort_hour, data=hours)
 
     # Second part: repartition
-    #pool.run(repartition, data=groups)
+    pool.run(repartition, data=groups)
 
     import IPython
     IPython.embed()
diff --git a/minerva/docker.py b/minerva/docker.py
index b2b7ab3..477db78 100644
--- a/minerva/docker.py
+++ b/minerva/docker.py
@@ -46,18 +46,19 @@ class Docker:
 
         res = self.machine.docker_run(self.uri, cmd=cmd, env=self.variables)
 
-        self.out["stdout"] = res[0].read
-        self.out["stderr"] = res[1].read
+        self.out["stdout"] = res[0].name
+        self.out["stderr"] = res[1].name
 
         if self.stdout:
-            self.stdout.write(res[0].read)
-            self.stdout.write("\n")
+            self.stdout.write(res[0].read())
+            self.stdout.close()
 
         if self.stderr:
-            self.stderr.write(res[1].read)
-            self.stderr.write("\n")
+            self.stderr.write(res[1].read())
+            self.stderr.close()
 
         self.finished = True
+        print(f"finished on {self.machine.name}")
 
     def terminate(self):
         self.machine.terminate()
@@ -110,3 +111,4 @@ class Group:
 
         return other
 
+
diff --git a/minerva/machine.py b/minerva/machine.py
index c9cb7ae..d0e5d53 100644
--- a/minerva/machine.py
+++ b/minerva/machine.py
@@ -132,6 +132,7 @@ class Machine(minerva.Remote):
 
                                      "key_filename": self.pier.key_path }
                                     )
+        self.ssh.open()
         self.started = datetime.datetime.now()
 
         return True
diff --git a/minerva/remote.py b/minerva/remote.py
index b773fe9..13628b5 100644
--- a/minerva/remote.py
+++ b/minerva/remote.py
@@ -5,7 +5,7 @@ import threading
 import select
 import tempfile
 import io
-
+import shlex
 
 def flush_data(data, pipe, display=None):
     pipe.write(data)
diff --git a/test/bare.py b/test/bare.py
new file mode 100644
index 0000000..b06f9e1
--- /dev/null
+++ b/test/bare.py
@@ -0,0 +1,13 @@
+import minerva
+import sys
+
+mach = minerva.Remote(ip = sys.argv[1],
+                      username = "ubuntu",
+                      key_path = "/Users/ari/.ssh/Ari-Brown-HAY.pem")
+
+mach.login()
+out, err, thread = mach.cmd("for i in $(seq 1 10); do echo $i; >&2 echo $i; sleep 2; done")
+
+import IPython
+IPython.embed()
+
diff --git a/test/dask_test.py b/test/dask_test.py
new file mode 100644
index 0000000..70052c6
--- /dev/null
+++ b/test/dask_test.py
@@ -0,0 +1,124 @@
+import sys
+import minerva
+from minerva.timing import Timing
+import dask
+import dask.dataframe as dd
+import time
+
+#dask.config.set({'distributed.worker.multiprocessing-method': 'fork'})
+
+import dask.distributed
+from dask.distributed import Client
+
+m = minerva.Minerva("hay")
+
+print(f"connecting to {sys.argv[1]}")
+client = Client(sys.argv[1])
+
+manifest_files = ['s3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_b311740c-02a5-42a5-9996-b17bdbf604ca',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_1abe3b5f-ab17-401e-91e9-a5c4f3fa17b9',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_c08442a6-74b8-4cf0-afc5-ab1a03f59eea',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_399a2786-68b5-4c40-961e-476451885f7f',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_8a699085-7d6b-426a-8a59-1ae4a6a183b9',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_0ca91f4c-85cf-4c93-b039-4f307dde0a86',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_bf38065d-fde8-45d3-87eb-3a56479f8b57',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_6a0ebea5-9a23-4fb5-8547-d4bb82f6a821',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_3464c3f6-fa5c-4d6b-bdf4-be387ff9dfda',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_9088425e-0c87-49ef-a883-92c217a715bb',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_0517e42b-089b-4819-b166-fa21489485a6',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_0197ff95-8c3c-4121-8077-a499927bb097',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_990579d4-0bae-4547-a630-e5a05341d264',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_2c838980-65d6-4a3d-898d-bfcb63e361bd',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_930df083-3c4c-45d6-963e-51e47d3a704c',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_6739cc33-43e0-468d-a6d2-fa17161bbd5e',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_685d81de-ffff-4629-bfb2-4bca3941474b',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_41e3cb39-a917-460a-bd86-413df774806b',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_ce533449-b14f-439e-92dc-69abdeda706f',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_c68ff721-8b97-4abf-8573-ac3fd617af97',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_04f13894-63d8-453c-ace7-87368511e94b',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_0d906712-e469-419a-8bf7-508b0b79848d',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_c514089d-c2eb-496a-a1a2-8501ecaf7d84',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_fe04a9d7-56ad-4e99-bbfa-8adb398af971',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_1ea10655-a063-468e-ad9e-0788eb15bd8d',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_7146bbe6-1278-43d3-a9e3-1a9e6a776105',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_add3ea7e-961f-4760-88b3-c839d4a344b3',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_2a00d610-18bf-4611-8f58-f47bdd484504',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_54ea410d-fa42-4ab5-bc2b-8179ad1cfb46',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_07aded26-d650-4852-9d3f-732409db3fd7']
+
+#manifest_files = ["s3://haystac-pmo-athena/results/f236b9a3-8aa7-4e29-b4c7-1fee5981a0ad.csv"]
+
+def full_s3(obj):
+    return "s3://" + obj.bucket_name + "/" + obj.key
+
+try:
+    athena = m.athena("s3://haystac-pmo-athena/")
+    records = 1_000_000_000
+
+    # Iteratively load files and scatter them to the cluster
+    with Timing("read parquet from athena"):
+        # Delta Table
+        #files = m.s3.ls("s3://haystac-archive-phase1.trial1/ta1.kitware/ta1/simulation/train/")
+        #parqs = [full_s3(o) for o in files if o.key.endswith('.parquet')]
+
+        # 20 partitions at a time, concat 20 at a time
+        #df = dd.read_parquet(parqs[0:20])#.astype({"agent": "category",
+        #                                 #         "timestamp": "datetime64[ns, UTC]"})
+        #for i in range(20, len(parqs), 20):
+        #    new = dd.read_parquet(parqs[i:i + 20])#.astype({"agent": "category",
+        #                                          #         "timestamp": "datetime64[ns, UTC]"})
+        #    df = df.concat(new)
+
+        # 20 partitions at a time, concat all at once
+        #df = dd.concat([dd.read_parquet(parqs[i:i + 20], engine='fastparquet') for i in range(0, len(parqs), 20)])
+
+        # All partitions at once
+        #df = dd.read_parquet(parqs, engine='fastparquet')
+
+        # Single CSV file
+        #query = athena.execute(f"select * from trajectories.kitware limit {records}", format="csv")
+        #query.finish()
+        #df = dd.read_csv(query.info_cache['ResultConfiguration']['OutputLocation'])
+        #df = dd.read_csv(manifest_files[0])
+
+        # Unload to Parquet files
+        #query = athena.query(f"select * from trajectories.kitware limit {records}")
+        #df = dd.read_parquet(query.manifest_files())
+        df = dd.read_parquet(manifest_files, engine='fastparquet')
+
+    with Timing("partitioning"):
+        divisions = list(range(0, 10001))
+        df = df.set_index('agent', divisions=divisions)
+
+    with Timing("persisting"):
+        dp = df.persist()
+        print(dp)
+
+    #import IPython
+    #IPython.embed()
+
+    with Timing("memory usage"):
+        print(dp.memory_usage().compute())
+
+    with Timing("count()"):
+        print(dp.count().compute())
+
+    with Timing("memory usage"):
+        print(dp.memory_usage().compute())
+
+    with Timing("mean latitude"):
+        print(dp.groupby(dp.index).latitude.mean().compute())
+
+    with Timing("count()"):
+        print(dp.count().compute())
+
+    #with Timing("mean longitude"):
+    #    print(dp.groupby(dp.index).longitude.mean().compute())
+
+finally:
+    ########## FIN #######################
+    print("closing client")
+    client.close()
+    #print("end")
+    #input()
+
diff --git a/test/error.py b/test/error.py
new file mode 100644
index 0000000..a2bdae5
--- /dev/null
+++ b/test/error.py
@@ -0,0 +1,69 @@
+import dask
+import dask.dataframe as dd
+
+import dask.distributed
+from dask.distributed import Client
+
+client = Client()
+
+manifest_files = ['s3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_b311740c-02a5-42a5-9996-b17bdbf604ca',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_1abe3b5f-ab17-401e-91e9-a5c4f3fa17b9',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_c08442a6-74b8-4cf0-afc5-ab1a03f59eea',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_399a2786-68b5-4c40-961e-476451885f7f',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_8a699085-7d6b-426a-8a59-1ae4a6a183b9',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_0ca91f4c-85cf-4c93-b039-4f307dde0a86',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_bf38065d-fde8-45d3-87eb-3a56479f8b57',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_6a0ebea5-9a23-4fb5-8547-d4bb82f6a821',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_3464c3f6-fa5c-4d6b-bdf4-be387ff9dfda',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_9088425e-0c87-49ef-a883-92c217a715bb',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_0517e42b-089b-4819-b166-fa21489485a6',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_0197ff95-8c3c-4121-8077-a499927bb097',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_990579d4-0bae-4547-a630-e5a05341d264',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_2c838980-65d6-4a3d-898d-bfcb63e361bd',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_930df083-3c4c-45d6-963e-51e47d3a704c',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_6739cc33-43e0-468d-a6d2-fa17161bbd5e',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_685d81de-ffff-4629-bfb2-4bca3941474b',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_41e3cb39-a917-460a-bd86-413df774806b',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_ce533449-b14f-439e-92dc-69abdeda706f',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_c68ff721-8b97-4abf-8573-ac3fd617af97',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_04f13894-63d8-453c-ace7-87368511e94b',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_0d906712-e469-419a-8bf7-508b0b79848d',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_c514089d-c2eb-496a-a1a2-8501ecaf7d84',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_fe04a9d7-56ad-4e99-bbfa-8adb398af971',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_1ea10655-a063-468e-ad9e-0788eb15bd8d',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_7146bbe6-1278-43d3-a9e3-1a9e6a776105',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_add3ea7e-961f-4760-88b3-c839d4a344b3',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_2a00d610-18bf-4611-8f58-f47bdd484504',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_54ea410d-fa42-4ab5-bc2b-8179ad1cfb46',
+                  's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_07aded26-d650-4852-9d3f-732409db3fd7']
+
+try:
+    print("reading parquet")
+    df = dd.read_parquet(manifest_files, engine='fastparquet')
+
+    print("partitioning")
+    divisions = list(range(0, 10001))
+    df = df.set_index('agent', divisions=divisions)
+
+    print("persisting")
+    dp = df.persist()
+
+    print("memory usage")
+    print(dp.get_partition(400).memory_usage())
+
+    print("count")
+    print(dp.count().compute())
+
+    print("memory usage")
+    print(dp.get_partition(400).memory_usage().compute())
+
+    print("latitude")
+    print(dp.groupby(dp.index).latitude.mean().compute())
+
+    print("count")
+    print(dp.count().compute())
+
+finally:
+    print("closing client")
+    client.close()
+
diff --git a/test/mre.py b/test/mre.py
new file mode 100644
index 0000000..83a5f78
--- /dev/null
+++ b/test/mre.py
@@ -0,0 +1,51 @@
+import sys
+import minerva
+from minerva.timing import Timing
+import dask
+import dask.dataframe as dd
+import time
+
+#dask.config.set({'distributed.worker.multiprocessing-method': 'fork'})
+
+import dask.distributed
+from dask.distributed import Client
+
+m = minerva.Minerva("hay")
+
+print(f"connecting to {sys.argv[1]}")
+client = Client(sys.argv[1])
+
+manifest_files = ['s3://ari-public-test-data/test1']
+
+try:
+
+    with Timing("read parquets"):
+        df = dd.read_parquet(manifest_files, engine='fastparquet')
+
+    with Timing("partitioning"):
+        divisions = list(range(0, 10001))
+        df = df.set_index('agent', divisions=divisions)
+
+    with Timing("persisting"):
+        dp = df.persist()
+
+    with Timing("total memory usage"):
+        print(dp.memory_usage().compute())
+
+    with Timing("partition usage"):
+        print(dp.get_partition(300).memory_usage().compute())
+
+    with Timing("count()"):
+        print(dp.count().compute())
+
+    with Timing("memory usage"):
+        print(dp.memory_usage().compute())
+
+    with Timing("count()"):
+        print(dp.count().compute())
+
+finally:
+    ########## FIN #######################
+    print("closing client")
+    client.close()
+
diff --git a/test/streaming.py b/test/streaming.py
new file mode 100644
index 0000000..8671e3f
--- /dev/null
+++ b/test/streaming.py
@@ -0,0 +1,28 @@
+import minerva
+import sys
+import select
+
+m = minerva.Minerva("hay")
+pier = m.pier(subnet_id = "subnet-05eb26d8649a093e1", # project-subnet-public1-us-east-1a
+              sg_groups = ["sg-0f9e555954e863954", # ssh
+                           "sg-0b34a3f7398076545"], # default
+              iam = "S3+SSM+CloudWatch+ECR",
+              key_pair = ("Ari-Brown-HAY", "/Users/ari/.ssh/Ari-Brown-HAY.pem"))
+
+mach = pier.machine(ami = "ami-0b0cd81283738558a", # ubuntu 22.04 x86
+                    instance_type = "t3.medium",
+                    username = "ubuntu",
+                    name = f"minerva-aws-test")
+
+try:
+    mach.create()
+    mach.login()
+    #stdout, stderr = mach.cmd("for i in $(seq 1 10); do echo $i; >&2 echo $i; sleep 2; done", disown=True)
+
+    import IPython
+    IPython.embed()
+
+finally:
+    mach.terminate()
+
+