diff --git a/examples/parallelization.py b/examples/parallelization.py new file mode 100644 index 0000000..4fee2db --- /dev/null +++ b/examples/parallelization.py @@ -0,0 +1,35 @@ +import minerva +import pprint + +pp = pprint.PrettyPrinter(indent=4) + +# Create the Minerva object which gives you access to the account under the +# profile `hay` +m = minerva.Minerva("hay") + +# Get the Athena object +athena = m.athena("s3://haystac-pmo-athena/") + +# Parallelize across the `data` and split it into `n` chunks, one chunk per process. +# Since `num_agents` is a number, it's turned into a range and then split. +num_agents = 10000 +parallel = athena.parallelize("trajectories", n = 200, data = num_agents) +for agents in parallel: + # Everything *needs* to have a column in order for unloading to parquet to work, + # so scalar values have to be assigned something, so here we use `as count` to + # create a temporary column called `count` + sql = f""" + select count(*) as cnt + from trajectories.basline + where agent >= {min(agents)} and + agent < {max(agents)} + group by agent + """ + parallel << athena.query(query, partition = {"agent": agents}) + +pp.pprint(parallel.results().head(10)) + +# We also get important statistics +print(parallel.runtime) +print(parallel.cost) + diff --git a/examples/repartition.py b/examples/repartition.py new file mode 100644 index 0000000..093a0b1 --- /dev/null +++ b/examples/repartition.py @@ -0,0 +1,86 @@ +import minerva as m +import servers as s +import json +import math +import sys + +src_top_level = "s3://phase1.trial2/ta1.kitware/te.apl/transforms/plain/" +dst_top_level = "s3://phase1.trial2/ta1.kitware/te.apl/transforms/ari_sorted/" + +def sort_hour(mach, hour): + image = "436820952613.dkr.ecr.us-east-1.amazonaws.com/apl-pyarrow-experiment" + + # Prep the info for the docker container + srted_loc = src_top_level + '/'.join(hour.split('/')[-4:]) + srted_loc += "/data.zstd.parquet" + variables = {"source": hour, # hour + "destination": srted_loc } # precise location of new file + + # Create the machine to run it + dock = m.Docker(machine = mach, + container = image, + variables = {"PAYLOAD": json.dumps(variables)}, + stdout = sys.stdout, + stderr = sys.stderr) + dock.run() + + +def repartition(mach, agents): + image = "436820952613.dkr.ecr.us-east-1.amazonaws.com/apl-pyarrow-experiment-agent" + + # Prep the info for the docker container + variables = {"min_agent": min(agents), + "max_agent": max(agents), + "source": src_top_level, + "destination": dst_top_level, + "secondary_destination": None} + + # Create the machine to run it + dock = m.Docker(machine = mach, + container = image, + variables = {"PAYLOAD": json.dumps(variables)}, + stdout = sys.stdout, + stderr = sys.stderr) + dock.run() + + +##################################### +# Prep the work +# Find out how many hours there are in the dataset +pool_size = 1 + +objs = s.m.s3.ls(src_top_level + "year=") +hours = set(["s3://" + '/'.join([o.bucket_name, *o.key.split("/")[0:-1]]) + for o in objs]) + +print(f"{len(hours)} hours to sort") +hours = sorted(hours) +hours = [hours[0]] + +# Split the agents into chunks for each machine in the pool +agents = list(range(200)) +size = math.ceil(len(agents) / pool_size) +groups = [agents[i:i + size] for i in range(0, len(agents), size)] + +try: + ####################################### + # Create the machines + # This also waits for them to be made + pool = m.Pool(s.worker, pool_size) + + ######################################## + # Now that we have the pool, put them to work + # Each will pull an item off of `data`, process it, and then keep + # doing that until the list is empty + + # First part: sort the individual files + pool.run(sort_hour, data=hours) + + # Second part: repartition + pool.run(repartition, data=groups) + +finally: + pool.terminate() + + print(f"Cost: ${pool.cost()}") + diff --git a/examples/servers.py b/examples/servers.py new file mode 100644 index 0000000..d87d1dc --- /dev/null +++ b/examples/servers.py @@ -0,0 +1,17 @@ +import minerva +import json + +m = minerva.Minerva("hay-te") +pier = m.pier(subnet_id = "subnet-08438df942a357b21", # haystac-te-subnet-public1-us-east-1c + sg_groups = ["sg-005d1f7b02f1e4b06", # ssh + "sg-06f81d2d2d58dfc6b"], # default + iam = "Minerva", + key_pair = ("Ari-Brown-HAY-TE", "~/.ssh/Ari-Brown-HAY-TE.pem")) + +def worker(num): + return pier.machine(instance_type = "r6a.2xlarge", + username = "ubuntu", + name = f"minerva-worker-{num}", + ami = "ami-0796c86095e0ac8fe", + disk_size = 512) +