forked from bellwether/minerva
added more examples
This commit is contained in:
parent
a3374fd85c
commit
745919e587
3 changed files with 138 additions and 0 deletions
35
examples/parallelization.py
Normal file
@@ -0,0 +1,35 @@
import minerva
import pprint

pp = pprint.PrettyPrinter(indent=4)

# Create the Minerva object, which gives you access to the account under the
# profile `hay`
m = minerva.Minerva("hay")

# Get the Athena object
athena = m.athena("s3://haystac-pmo-athena/")

# Parallelize across the `data` and split it into `n` chunks, one chunk per process.
# Since `num_agents` is a number, it's turned into a range and then split.
num_agents = 10000
parallel = athena.parallelize("trajectories", n = 200, data = num_agents)
for agents in parallel:
    # Everything *needs* to have a column in order for unloading to parquet to work,
    # so scalar values have to be assigned a name; here we use `as cnt` to
    # create a temporary column called `cnt`
    sql = f"""
        select count(*) as cnt
        from trajectories.baseline
        where agent >= {min(agents)} and
              agent < {max(agents)}
        group by agent
    """
    parallel << athena.query(sql, partition = {"agent": agents})

pp.pprint(parallel.results().head(10))

# We also get important statistics
print(parallel.runtime)
print(parallel.cost)

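For intuition, `parallelize` above presumably expands the numeric `data` into a range of agent ids and splits it into `n` roughly equal chunks, much like this plain-Python sketch (the helper name is illustrative, not part of the Minerva API):

import math

def chunk_range(total, n):
    # Expand a count into ids 0..total-1 and split them into n chunks
    ids = list(range(total))
    size = math.ceil(total / n)
    return [ids[i:i + size] for i in range(0, total, size)]

# chunk_range(10000, 200) -> 200 chunks of 50 consecutive agent ids each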
86
examples/repartition.py
Normal file
@@ -0,0 +1,86 @@
import minerva as m
import servers as s
import json
import math
import sys

src_top_level = "s3://phase1.trial2/ta1.kitware/te.apl/transforms/plain/"
dst_top_level = "s3://phase1.trial2/ta1.kitware/te.apl/transforms/ari_sorted/"


def sort_hour(mach, hour):
    image = "436820952613.dkr.ecr.us-east-1.amazonaws.com/apl-pyarrow-experiment"

    # Prep the info for the docker container
    srted_loc = src_top_level + '/'.join(hour.split('/')[-4:])
    srted_loc += "/data.zstd.parquet"
    variables = {"source": hour,            # hour
                 "destination": srted_loc}  # precise location of the new file

    # Create the machine to run it
    dock = m.Docker(machine = mach,
                    container = image,
                    variables = {"PAYLOAD": json.dumps(variables)},
                    stdout = sys.stdout,
                    stderr = sys.stderr)
    dock.run()


def repartition(mach, agents):
    image = "436820952613.dkr.ecr.us-east-1.amazonaws.com/apl-pyarrow-experiment-agent"

    # Prep the info for the docker container
    variables = {"min_agent": min(agents),
                 "max_agent": max(agents),
                 "source": src_top_level,
                 "destination": dst_top_level,
                 "secondary_destination": None}

    # Create the machine to run it
    dock = m.Docker(machine = mach,
                    container = image,
                    variables = {"PAYLOAD": json.dumps(variables)},
                    stdout = sys.stdout,
                    stderr = sys.stderr)
    dock.run()


#####################################
# Prep the work
# Find out how many hours there are in the dataset
pool_size = 1

objs = s.m.s3.ls(src_top_level + "year=")
hours = set(["s3://" + '/'.join([o.bucket_name, *o.key.split("/")[0:-1]])
             for o in objs])

print(f"{len(hours)} hours to sort")
hours = sorted(hours)
hours = [hours[0]]  # only process the first hour for now

# Split the agents into chunks for each machine in the pool
agents = list(range(200))
size = math.ceil(len(agents) / pool_size)
groups = [agents[i:i + size] for i in range(0, len(agents), size)]

try:
    #######################################
    # Create the machines
    # This also waits for them to be made
    pool = m.Pool(s.worker, pool_size)

    ########################################
    # Now that we have the pool, put them to work
    # Each will pull an item off of `data`, process it, and then keep
    # doing that until the list is empty

    # First part: sort the individual files
    pool.run(sort_hour, data=hours)

    # Second part: repartition
    pool.run(repartition, data=groups)
finally:
    pool.terminate()

print(f"Cost: ${pool.cost()}")

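For reference, the PAYLOAD environment variable handed to the repartition container is just the JSON-encoded `variables` dict; with the single-chunk run above (pool_size = 1, agents = range(200)) it would look roughly like:

{"min_agent": 0, "max_agent": 199,
 "source": "s3://phase1.trial2/ta1.kitware/te.apl/transforms/plain/",
 "destination": "s3://phase1.trial2/ta1.kitware/te.apl/transforms/ari_sorted/",
 "secondary_destination": null}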
17
examples/servers.py
Normal file
@@ -0,0 +1,17 @@
import minerva
import json

m = minerva.Minerva("hay-te")
pier = m.pier(subnet_id = "subnet-08438df942a357b21",  # haystac-te-subnet-public1-us-east-1c
              sg_groups = ["sg-005d1f7b02f1e4b06",     # ssh
                           "sg-06f81d2d2d58dfc6b"],    # default
              iam = "Minerva",
              key_pair = ("Ari-Brown-HAY-TE", "~/.ssh/Ari-Brown-HAY-TE.pem"))

# Provision one worker machine; `num` is only used to give it a unique name
def worker(num):
    return pier.machine(instance_type = "r6a.2xlarge",
                        username = "ubuntu",
                        name = f"minerva-worker-{num}",
                        ami = "ami-0796c86095e0ac8fe",
                        disk_size = 512)

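The repartition example hands `worker` to `m.Pool(s.worker, pool_size)`, which presumably calls it once per worker index; calling it directly would request a single machine (a sketch, assuming `worker` can also be used standalone):

mach = worker(0)  # an r6a.2xlarge named "minerva-worker-0"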