"""Sort each hour of the plain parquet dataset, then repartition the sorted output by agent."""

import minerva as m
import servers as s
import json
import math
import sys
import os

src_top_level = "s3://phase1.trial2/ta1.kitware/te.apl/transforms/plain/"
sorted_top = "s3://phase1.trial2/ta1.kitware/te.apl/transforms/ari_sorted/"
dst_top_level = "s3://phase1.trial2/ta1.kitware/te.apl/transforms/ari_sorted_2/"


def sort_hour(mach, hour):
    image = "436820952613.dkr.ecr.us-east-1.amazonaws.com/apl-pyarrow-experiment"

    # Prep the info for the docker container
    srted_loc = sorted_top + '/'.join(hour.split('/')[-4:])
    srted_loc += "/data.zstd.parquet"
    variables = {"source": hour,           # hour
                 "destination": srted_loc} # precise location of new file

    # Make a scratch directory for this machine's logs (ignore if it already exists)
    os.makedirs(f"/tmp/{mach.name}", exist_ok=True)

    # Create the machine to run it
    dock = m.Docker(machine = mach,
                    container = image,
                    variables = {"PAYLOAD": json.dumps(variables)},
                    stdout = open(f"/tmp/{mach.name}/sort_stdout.out", "wb"),
                    stderr = open(f"/tmp/{mach.name}/sort_stderr.out", "wb"))
    dock.run()


def repartition(mach, agents):
    image = "436820952613.dkr.ecr.us-east-1.amazonaws.com/apl-pyarrow-experiment-agent"

    # Prep the info for the docker container
    variables = {"min_agent": min(agents),
                 "max_agent": max(agents),
                 "source": sorted_top,
                 "destination": dst_top_level,
                 "secondary_destination": None}

    # Create the machine to run it
    dock = m.Docker(machine = mach,
                    container = image,
                    variables = {"PAYLOAD": json.dumps(variables)},
                    stdout = open(f"/tmp/{mach.name}/repartition_stdout.out", "wb"),
                    stderr = open(f"/tmp/{mach.name}/repartition_stderr.out", "wb"))
    dock.run()


#####################################
# Prep the work

# Find out how many hours there are in the dataset
pool_size = 1
objs = s.m.s3.ls(src_top_level + "year=")
hours = set(["s3://" + '/'.join([o.bucket_name, *o.key.split("/")[0:-1]]) for o in objs])
print(f"{len(hours)} hours to sort")
hours = sorted(hours)
hours = hours[0:pool_size + 1]  # only process a small slice for this run
print(f"\tprocessing {len(hours)} of them")

# Split the agents into chunks for each machine in the pool
agents = list(range(200))
size = math.ceil(len(agents) / pool_size)
groups = [agents[i:i + size] for i in range(0, len(agents), size)]

pool = None
try:
    #######################################
    # Create the machines
    # This also waits for them to be made
    pool = m.Pool(s.worker, pool_size)

    ########################################
    # Now that we have the pool, put them to work
    # Each will pull an item off of `data`, process it, and then keep
    # doing that until the list is empty

    # First part: sort the individual files
    pool.run(sort_hour, data=hours)

    # Second part: repartition
    pool.run(repartition, data=groups)
finally:
    # Only clean up if the pool was actually created
    if pool is not None:
        pool.terminate()
        print(f"Cost: ${pool.cost()}")