forked from bellwether/minerva
added test files
This commit is contained in:
parent
9f8b67b98f
commit
e3055333b3
9 changed files with 313 additions and 17 deletions
|
|
@ -3,25 +3,32 @@ import servers as s
|
|||
import json
|
||||
import math
|
||||
import sys
|
||||
import os
|
||||
|
||||
src_top_level = "s3://phase1.trial2/ta1.kitware/te.apl/transforms/plain/"
|
||||
dst_top_level = "s3://phase1.trial2/ta1.kitware/te.apl/transforms/ari_sorted/"
|
||||
sorted_top = "s3://phase1.trial2/ta1.kitware/te.apl/transforms/ari_sorted/"
|
||||
dst_top_level = "s3://phase1.trial2/ta1.kitware/te.apl/transforms/ari_sorted_2/"
|
||||
|
||||
def sort_hour(mach, hour):
|
||||
image = "436820952613.dkr.ecr.us-east-1.amazonaws.com/apl-pyarrow-experiment"
|
||||
|
||||
# Prep the info for the docker container
|
||||
srted_loc = src_top_level + '/'.join(hour.split('/')[-4:])
|
||||
srted_loc = sorted_top + '/'.join(hour.split('/')[-4:])
|
||||
srted_loc += "/data.zstd.parquet"
|
||||
variables = {"source": hour, # hour
|
||||
"destination": srted_loc } # precise location of new file
|
||||
|
||||
try:
|
||||
os.mkdir(f"/tmp/{mach.name}")
|
||||
except:
|
||||
pass
|
||||
|
||||
# Create the machine to run it
|
||||
dock = m.Docker(machine = mach,
|
||||
container = image,
|
||||
variables = {"PAYLOAD": json.dumps(variables)},
|
||||
stdout = sys.stdout,
|
||||
stderr = sys.stderr)
|
||||
stdout = open(f"/tmp/{mach.name}/sort_stdout.out", "wb"),
|
||||
stderr = open(f"/tmp/{mach.name}/sort_stderr.out", "wb"))
|
||||
dock.run()
|
||||
|
||||
|
||||
|
|
@ -39,8 +46,8 @@ def repartition(mach, agents):
|
|||
dock = m.Docker(machine = mach,
|
||||
container = image,
|
||||
variables = {"PAYLOAD": json.dumps(variables)},
|
||||
stdout = sys.stdout,
|
||||
stderr = sys.stderr)
|
||||
stdout = open(f"/tmp/{mach.name}/repartition_stdout.out", "wb"),
|
||||
stderr = open(f"/tmp/{mach.name}/repartition_stderr.out", "wb"))
|
||||
dock.run()
|
||||
|
||||
|
||||
|
|
@ -55,7 +62,8 @@ hours = set(["s3://" + '/'.join([o.bucket_name, *o.key.split("/")[0:-1]])
|
|||
|
||||
print(f"{len(hours)} hours to sort")
|
||||
hours = sorted(hours)
|
||||
hours = [hours[0]]
|
||||
hours = hours[0:pool_size + 1]
|
||||
print(f"\tprocessing {len(hours)} of them")
|
||||
|
||||
# Split the agents into chunks for each machine in the pool
|
||||
agents = list(range(200))
|
||||
|
|
@ -66,7 +74,7 @@ try:
|
|||
#######################################
|
||||
# Create the machines
|
||||
# This also waits for them to be made
|
||||
pool = m.Pool(s.worker, pool_size, web=True)
|
||||
pool = m.Pool(s.worker, pool_size)
|
||||
|
||||
########################################
|
||||
# Now that we have the pool, put them to work
|
||||
|
|
@ -74,10 +82,10 @@ try:
|
|||
# doing that until the list is empty
|
||||
|
||||
# First part: sort the individual files
|
||||
#pool.run(sort_hour, data=hours)
|
||||
pool.run(sort_hour, data=hours)
|
||||
|
||||
# Second part: repartition
|
||||
#pool.run(repartition, data=groups)
|
||||
pool.run(repartition, data=groups)
|
||||
|
||||
import IPython
|
||||
IPython.embed()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue