minerva/examples/repartition.py

import json
import math
import os

import minerva as m
import servers as s

src_top_level = "s3://phase1.trial2/ta1.kitware/te.apl/transforms/plain/"
sorted_top = "s3://phase1.trial2/ta1.kitware/te.apl/transforms/ari_sorted/"
dst_top_level = "s3://phase1.trial2/ta1.kitware/te.apl/transforms/ari_sorted_2/"
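
# The source data appears to use hive-style partitioning: the listing below
# globs on a "year=" prefix and sort_hour carries over the last four path
# components, so the layout is presumably year=YYYY/month=MM/day=DD/hour=HH/.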


def sort_hour(mach, hour):
    image = "436820952613.dkr.ecr.us-east-1.amazonaws.com/apl-pyarrow-experiment"
    # Prep the info for the docker container
    srted_loc = sorted_top + '/'.join(hour.split('/')[-4:])
    srted_loc += "/data.zstd.parquet"
    variables = {"source": hour,            # hour to sort
                 "destination": srted_loc}  # precise location of the new file
    # Per-machine scratch directory for the container's logs
    os.makedirs(f"/tmp/{mach.name}", exist_ok=True)
    # Create the container and run it to completion
    dock = m.Docker(machine=mach,
                    container=image,
                    variables={"PAYLOAD": json.dumps(variables)},
                    stdout=open(f"/tmp/{mach.name}/sort_stdout.out", "wb"),
                    stderr=open(f"/tmp/{mach.name}/sort_stderr.out", "wb"))
    dock.run()
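
# For reference, the container reads its work order from the PAYLOAD
# environment variable. A minimal sketch of the consuming side, assuming a
# pyarrow-based entrypoint (the real one lives in the image above, and the
# sort key here is hypothetical):
#
#     payload = json.loads(os.environ["PAYLOAD"])
#     table = pyarrow.parquet.read_table(payload["source"])
#     sorted_table = table.sort_by("agent")  # hypothetical sort key
#     pyarrow.parquet.write_table(sorted_table, payload["destination"])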


def repartition(mach, agents):
    image = "436820952613.dkr.ecr.us-east-1.amazonaws.com/apl-pyarrow-experiment-agent"
    # Prep the info for the docker container; each container is handed the
    # bounds of its contiguous block of agent ids
    variables = {"min_agent": min(agents),
                 "max_agent": max(agents),
                 "source": sorted_top,
                 "destination": dst_top_level,
                 "secondary_destination": None}
    # The log directory may not exist yet if this machine never ran sort_hour
    os.makedirs(f"/tmp/{mach.name}", exist_ok=True)
    # Create the container and run it to completion
    dock = m.Docker(machine=mach,
                    container=image,
                    variables={"PAYLOAD": json.dumps(variables)},
                    stdout=open(f"/tmp/{mach.name}/repartition_stdout.out", "wb"),
                    stderr=open(f"/tmp/{mach.name}/repartition_stderr.out", "wb"))
    dock.run()


#####################################
# Prep the work
# Find out how many hours there are in the dataset
pool_size = 1
objs = s.m.s3.ls(src_top_level + "year=")
# Collapse the object listing to the set of unique hour directories
hours = {"s3://" + '/'.join([o.bucket_name, *o.key.split("/")[:-1]])
         for o in objs}
print(f"{len(hours)} hours to sort")
# For this example run, only take the first few hours
hours = sorted(hours)[0:pool_size + 1]
print(f"\tprocessing {len(hours)} of them")

# Split the agents into chunks, one per machine in the pool
agents = list(range(200))
size = math.ceil(len(agents) / pool_size)
groups = [agents[i:i + size] for i in range(0, len(agents), size)]
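
# Worked example: with 200 agents and pool_size = 4, size = ceil(200 / 4) = 50,
# giving groups [0..49], [50..99], [100..149], [150..199]; with pool_size = 1
# (as above) there is a single group covering all 200 agents.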

pool = None
try:
    #######################################
    # Create the machines
    # This also waits for them to be made
    pool = m.Pool(s.worker, pool_size)
    ########################################
    # Now that we have the pool, put the machines to work
    # Each will pull an item off of `data`, process it, and then keep
    # doing that until the list is empty (see the sketch below)
    # First part: sort the individual files
    pool.run(sort_hour, data=hours)
    # Second part: repartition
    pool.run(repartition, data=groups)
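
    # Conceptually, each worker in pool.run behaves like the loop below -- a
    # sketch of the pull-until-empty pattern described above, not minerva's
    # actual implementation:
    #
    #     while True:
    #         try:
    #             item = data.pop()
    #         except IndexError:
    #             break
    #         func(mach, item)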
finally:
    # Always tear the machines down, even if something above failed; guard
    # against the case where m.Pool itself raised and `pool` was never made
    if pool is not None:
        pool.terminate()
        print(f"Cost: ${pool.cost()}")