import json
import math
import sys

import minerva as m
import servers as s

src_top_level = "s3://phase1.trial2/ta1.kitware/te.apl/transforms/plain/"
dst_top_level = "s3://phase1.trial2/ta1.kitware/te.apl/transforms/ari_sorted/"


def sort_hour(mach, hour):
    image = "436820952613.dkr.ecr.us-east-1.amazonaws.com/apl-pyarrow-experiment"

    # Prep the info for the docker container.  The sorted file is written
    # back under the plain tree, at the same year=/month=/day=/hour= path.
    srted_loc = src_top_level + '/'.join(hour.split('/')[-4:])
    srted_loc += "/data.zstd.parquet"
    variables = {"source": hour,            # hour to sort
                 "destination": srted_loc}  # precise location of new file

    # Create the machine to run it
    dock = m.Docker(machine=mach,
                    container=image,
                    variables={"PAYLOAD": json.dumps(variables)},
                    stdout=sys.stdout,
                    stderr=sys.stderr)
    dock.run()


def repartition(mach, agents):
    image = "436820952613.dkr.ecr.us-east-1.amazonaws.com/apl-pyarrow-experiment-agent"

    # Prep the info for the docker container
    variables = {"min_agent": min(agents),
                 "max_agent": max(agents),
                 "source": src_top_level,
                 "destination": dst_top_level,
                 "secondary_destination": None}

    # Create the machine to run it
    dock = m.Docker(machine=mach,
                    container=image,
                    variables={"PAYLOAD": json.dumps(variables)},
                    stdout=sys.stdout,
                    stderr=sys.stderr)
    dock.run()


#####################################
# Prep the work

# Find out how many hours there are in the dataset
pool_size = 5
objs = s.m.s3.ls(src_top_level + "year=")
hours = {"s3://" + '/'.join([o.bucket_name, *o.key.split("/")[:-1]])
         for o in objs}
print(f"{len(hours)} hours to sort")
hours = sorted(hours)
hours = [hours[0]]  # limit to a single hour (testing)

# Split the agents into chunks, one per machine in the pool
agents = list(range(200))
size = math.ceil(len(agents) / pool_size)
groups = [agents[i:i + size] for i in range(0, len(agents), size)]

pool = None
try:
    #######################################
    # Create the machines
    # This also waits for them to be made
    pool = m.Pool(s.worker, pool_size, web=True)

    ########################################
    # Now that we have the pool, put them to work.
    # Each worker will pull an item off of `data`, process it, and keep
    # doing that until the list is empty.

    # First part: sort the individual files
    #pool.run(sort_hour, data=hours)

    # Second part: repartition
    #pool.run(repartition, data=groups)

    # Drop into an interactive shell to drive the pool by hand
    import IPython
    IPython.embed()
finally:
    # Guard against m.Pool() failing, in which case `pool` was never bound
    if pool is not None:
        pool.terminate()
        print(f"Cost: ${pool.cost()}")
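
# ----------------------------------------------------------------------
# For reference, a minimal sketch of how a container might consume the
# PAYLOAD environment variable set above.  This is an assumption for
# illustration only: the actual entrypoints of the apl-pyarrow-experiment
# images are not shown here.  It demonstrates just the env-var contract
# (a JSON object with "source" and "destination" keys):
#
#   import json
#   import os
#
#   payload = json.loads(os.environ["PAYLOAD"])
#   source = payload["source"]            # e.g. s3://.../year=Y/month=M/day=D/hour=H
#   destination = payload["destination"]  # e.g. s3://.../data.zstd.parquet
#   # ... read `source`, transform, write result to `destination` ...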