significant improvement to the readme and verification that all the examples work

This commit is contained in:
Ari Brown 2024-01-31 16:18:32 -05:00
parent e3c11fb1aa
commit 5dccce53e9
9 changed files with 275 additions and 109 deletions

168
README.md
View file

@ -7,6 +7,9 @@ access, and even offers its own cluster management with Dask.
In order to ease programmatic access to Athena and offer blocking access (so
that your code waits for the result), I wrote `minerva` to make it seamless.
**IN ORDER TO UNLOAD TIMESTAMPS, YOU MUST FOLLOW THE INSTRUCTIONS IN THE
TIMESTAMPS SECTION**
The results are returned as pyarrow datasets (with parquet files as the
underlying structure).
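As a condensed, hedged preview of the workflow described below (the bucket path comes from the examples in this repo; the database and table names are placeholders), the returned dataset can be used directly or converted to pandas:
```
import minerva

m = minerva.Minerva("hay")
athena = m.athena("s3://haystac-pmo-athena/")

query = athena.query("select * from my_database.my_table limit 100")
ds = query.results()            # pyarrow dataset backed by parquet files
df = ds.to_table().to_pandas()  # convert to pandas if that's more convenient
print(df.head(10))
```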
@ -109,6 +112,8 @@ by running (in the Redshift web console):
grant USAGE ON schema public to "IAM:<my_iam_user>"
```
Run the query in the same way as you would on Athena.
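A minimal sketch, mirroring the Redshift example in `examples/` (the bucket, workgroup, and table names are that example's values and will differ for your account):
```
import minerva

m = minerva.Minerva("hay")
red = m.redshift("s3://haystac-te-athena/",
                 db = "train",
                 workgroup = "phase1-trial2")

query = red.query("select agent, st_astext(geom), timestamp from public.baseline_v1 where agent = 44 limit 200")
data = query.results()
print(data.head(10))
print(query.runtime)  # wall-clock runtime of the statement
print(query.cost)     # estimated cost, derived from RPU-hours
```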
## S3
```
@ -125,9 +130,146 @@ See `minerva/s3.py` for a full list of supported methods.
Follow along with `examples/simple_instance.py`
Like in the Athena example, we first need to gain access to our desired AWS
account via the profile in `~/.aws/credentials`:
```
import minerva
m = minerva.Minerva("hay")
```
With that, we now create a `Pier`, which is the base from which we launch our
machines. To keep things simple, we specify the subnet, security group, IAM
profile, and PKI key pair here: all machines launched off this `pier` will share
those qualities:
```
pier = m.pier(subnet_id = "subnet-05eb26d8649a093e1", # project-subnet-public1-us-east-1a
sg_groups = ["sg-0f9e555954e863954", # ssh
"sg-0b34a3f7398076545"], # default
iam = "S3+SSM+CloudWatch+ECR",
key_pair = ("Ari-Brown-HAY", "~/.ssh/Ari-Brown-HAY.pem"))
```
In this example, we'll only create one machine, but the same pattern scales to
several; see the sketch at the end of this walkthrough.
We're going to specify the AMI, the instance type, and the username to log in
with. We can then specify the name of the instance (always a good idea), the
EBS disk size in GB, and any variables that we want to make available to the
shell environment.
```
def worker(pier, n=0):
mach = pier.machine(ami = "ami-0399a4f70ca684620", # dask on ubuntu 22.04 x86
instance_type = "t3.medium",
username = "ubuntu",
name = f"test-{n}",
disk_size = 32,
variables = {"type": "worker",
"number": n})
return mach
```
Here, we create a single machine. Nothing will happen on AWS yet!
```
mach = worker(pier)
```
Then we tell AWS to *asynchronously* create our instance:
```
mach.create()
```
Then we can **block** and wait until the instance has started, and establish
an SSH connection. This has the potential to fail if AWS takes too long to
start the instance (> 3 minutes) or if the internal services on the machine
take too long to start up (> 35 seconds).
```
mach.login()
```
Now we can begin to use it! Using `mach.cmd()` will let you run commands on the
server via SSH. Unfortunately, they all take place within `/bin/bash -c`.
The return value of the `cmd()` function captures both STDOUT and STDERR,
which can be accessed via its `stdout` and `stderr` attributes, respectively.
```
print("*******")
print(repr(mach.cmd("echo 'hello world'").stdout))
print("*******")
print(mach.cmd("echo I am machine $number of type $type"))
print("*******")
```
When you're done, terminate the instance:
```
mach.terminate()
```
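Putting the pieces together, here's a hedged sketch of launching several machines with the same `worker()` factory (no new APIs; error handling omitted):
```
machines = [worker(pier, n) for n in range(3)]  # nothing happens on AWS yet

for mach in machines:
    mach.create()   # asynchronous creation

for mach in machines:
    mach.login()    # block until each instance is reachable over SSH
    print(mach.cmd("echo I am machine $number of type $type"))

for mach in machines:
    mach.terminate()
```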
## Cluster
## Dask
Creating a `dask` cluster builds on what we've seen in the other EC2 examples.
On top of that, we'll have to load in extra libraries:
```
from dask.distributed import Client
import dask
```
We define functions for creating a scheduler and our workers:
```
def worker(pier, n):
...
def scheduler(pier):
...
```
Our pier needs to include a security group that allows the cluster to talk to
itself (ports 8786 and 8787):
```
m = minerva.Minerva("hay")
pier = m.pier(...
sg_groups = [...,
...,
"sg-04cd2626d91ac093c"], # dask (8786, 8787)
...
```
Finally, our hard work is done and we can start the cluster! From the pier, we
specify the scheduler creation function, the worker creation function, and the
number of workers.
Note that `Cluster` isn't fully production-ready yet.
```
cluster = pier.cluster(scheduler, worker, num_workers=5)
cluster.start()
```
This will start up a scheduler (whose dashboard will be available at
http://scheduler_ip:8787 once everything is up) and 5 workers that are
reachable via the Dask library.
Connect to the dask cluster:
```
client = Client(cluster.public_location)
```
And use it according to standard dask instructions.
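For example, a minimal sketch of standard dask usage once the client is connected (the computation itself is arbitrary):
```
import dask.array as da

# the Client created above becomes the default scheduler,
# so compute() runs on the cluster
x = da.random.random((10_000, 10_000), chunks=(1_000, 1_000))
print(x.mean().compute())
```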
## Helpers
I wrote a `Timing` module to help with timing various functions:
@ -142,7 +284,29 @@ with Timing("my cool test"):
# => 32.45
```
# Basic Usage
## Timestamps
Athena can't unload timestamps that have a timezone. Thus, you have to create a
view in order to abstract away the detail of the UTC timestamp. This is required
for any unloaded format that isn't CSV.
When we ingest data, we ingest it into e.g. `baseline_original` and then create
a view `baseline` whose timestamp no longer carries a timezone.
```
-- view to accommodate the fact that Athena can't handle timezones
-- required in order to unload the data in any format that's not CSV
create or replace view my_data AS
select
agent, from_unixtime(cast(to_unixtime(timestamp) AS bigint)) as timestamp, latitude, longitude
from my_data_original
```
## Writing SQL
I recommend keeping the SQL in separate files (be organized!) and then
processing them with the `Mako` templating library. Use `${}` within your SQL
files in order to pass variables.
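A minimal sketch of that workflow, assuming an `athena` handle as in the Athena section (the file path and the `agent` variable are made up for illustration):
```
from mako.template import Template

# sql/agent_counts.sql might contain:
#   select count(*) as count from my_database.my_table where agent = ${agent}
sql = Template(filename="sql/agent_counts.sql").render(agent=4)
query = athena.query(sql)
print(query.results().head(10))
```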
# Build # Build

View file

@ -1,11 +1,9 @@
import minerva
import pprint
pp = pprint.PrettyPrinter(indent=4)
m = minerva.Minerva("hay") m = minerva.Minerva("hay")
athena = m.athena("s3://haystac-pmo-athena/") athena = m.athena("s3://haystac-pmo-athena/")
# `execute()` does NOT provide any results and does NOT use `UNLOAD`
query = athena.execute(
"""
create database if not exists test

View file

@ -13,15 +13,17 @@ athena = m.athena("s3://haystac-pmo-athena/")
# Everything *needs* to have a column in order for unloading to parquet to work,
# so scalar values have to be assigned something, so here we use `as count` to
# create a temporary column called `count`
query = athena.query(
"""
select round(longitude, 3) as lon, count(*) as count
from trajectories.baseline
where agent = 4
group by round(longitude, 3)
order by count(*) desc
"""
)
#query = athena.query(
# """
# select round(longitude, 3) as lon, count(*) as count
# from trajectories.basline
# where agent = 4
# group by round(longitude, 3)
# order by count(*) desc
# """
#)
query = athena.query("select * from trajectories.basline where agent < 100 limit 100")
data = query.results()
pp.pprint(data.head(10))

View file

@ -1,5 +1,4 @@
from minerva.cluster import Cluster
import minerva
from minerva.pier import Pier
from minerva.timing import Timing
from dask.distributed import Client
import dask
@ -7,7 +6,7 @@ import dask
########### PREP ############################
def worker(pier, n):
mach = pier.machine(ami = "ami-01f85b935dc9f674c", # dask on ubuntu 22.04 x86 mach = pier.machine(ami = "ami-0399a4f70ca684620", # dask on ubuntu 22.04 x86
instance_type = "t3.medium", instance_type = "t3.medium",
username = "ubuntu", username = "ubuntu",
name = f"dask-worker-{n}", name = f"dask-worker-{n}",
@ -17,24 +16,25 @@ def worker(pier, n):
return mach
def scheduler(pier):
mach = pier.machine(ami = "ami-01f85b935dc9f674c", # dask on ubuntu 22.04 x86 mach = pier.machine(ami = "ami-0399a4f70ca684620", # dask on ubuntu 22.04 x86
instance_type = "t3.medium", instance_type = "t3.medium",
username = "ubuntu", username = "ubuntu",
disk_size = 32,
name = f"dask-scheduler", name = f"dask-scheduler",
variables = {"type": "scheduler"}) variables = {"type": "scheduler"})
return mach return mach
########## CLUSTER ########################## ########## CLUSTER ##########################
pier = Pier("hay", m = minerva.Minerva("hay")
subnet_id = "subnet-05eb26d8649a093e1", # project-subnet-public1-us-east-1a pier = m.pier(subnet_id = "subnet-05eb26d8649a093e1", # project-subnet-public1-us-east-1a
sg_groups = ["sg-0f9e555954e863954", # ssh sg_groups = ["sg-0f9e555954e863954", # ssh
"sg-0b34a3f7398076545", # default "sg-0b34a3f7398076545", # default
"sg-04cd2626d91ac093c"], # dask (8786, 8787) "sg-04cd2626d91ac093c"], # dask (8786, 8787)
key_pair = ("Ari-Brown-HAY", "~/.ssh/Ari-Brown-HAY.pem"), key_pair = ("Ari-Brown-HAY", "~/.ssh/Ari-Brown-HAY.pem"),
iam = "S3+SSM+CloudWatch+ECR") iam = "S3+SSM+CloudWatch+ECR")
cluster = Cluster(pier, scheduler, worker, num_workers=5)
cluster = pier.cluster(scheduler, worker, num_workers=2)
cluster.start()
########## USAGE ########################
@ -43,31 +43,9 @@ try:
client = Client(cluster.public_location)
print(client)
# Practice with a big array
# https://matthewrocklin.com/blog/work/2017/01/12/dask-dataframes
athena = m.athena("s3://haystac-pmo-athena/")
query = athena.query("select * from trajectories.basline where agent < 100")
df = query.distribute_results(client)
#import numpy as np
#import dask.array as da
import dask.dataframe as dd
import time
# https://stackoverflow.com/questions/43796774/loading-local-file-from-client-onto-dask-distributed-cluster
# Iteratively load files and scatter them to the cluster
#
# futures = []
# for fn in filenames:
# df = pd.read_csv(fn)
# future = client.scatter(df)
# futures.append(future)
#
# ddf = dd.from_delayed(futures, meta=df)
# query = athena.query("select * from trajectories")
# ddf = query.distribute_results(client)
with Timing("reading parquet"):
df = dd.read_parquet("s3://haystac-archive-phase1.trial1/ta1.kitware/ta1/simulation/train/")
with Timing("persisting"): with Timing("persisting"):
dp = df.persist() dp = df.persist()

View file

@ -0,0 +1,26 @@
import minerva
from minerva.docker import Docker
import sys
m = minerva.Minerva("hay")
pier = m.pier(subnet_id = "subnet-05eb26d8649a093e1", # project-subnet-public1-us-east-1a
sg_groups = ["sg-0f9e555954e863954", # ssh
"sg-0b34a3f7398076545"], # default
iam = "S3+SSM+CloudWatch+ECR")
mach = pier.machine(ami = "ami-0b0cd81283738558a", # ubuntu 22.04 x86
instance_type = "t3.medium",
username = "ubuntu",
name = f"minerva-aws-test")
# Running the machine in the HAYSTAC PMO account
# Pulling a container from the HAYSTAC T&E account
d = Docker(machine = mach,
#container = "436820952613.dkr.ecr.us-east-1.amazonaws.com/test:latest",
container = "amazon/aws-cli:latest",
variables = {"hello": "world"},
stdout = sys.stdout)
d.create()
d.run("s3 ls")
d.terminate()

View file

@ -1,29 +0,0 @@
from minerva.pier import Pier
from minerva.docker import Docker
import sys
profile = "hay"
pier = Pier(profile,
subnet_id = "subnet-05eb26d8649a093e1", # project-subnet-public1-us-east-1a
sg_groups = ["sg-0f9e555954e863954", # ssh
"sg-0b34a3f7398076545"], # default
iam = "S3+SSM+CloudWatch+ECR")
num = 0
mach = pier.machine(ami = "ami-0b0cd81283738558a", # ubuntu 22.04 x86
instance_type = "t3.medium",
username = "ubuntu",
name = f"minerva-{num}",
variables = {"num": num})
# Running the machine in the HAYSTAC PMO account
# Pulling a container from the HAYSTAC T&E account
d = Docker(machine = mach,
#container = "436820952613.dkr.ecr.us-east-1.amazonaws.com/test:latest",
container = "amazon/aws-cli:latest",
variables = {"num": num},
stdout = sys.stdout)
d.create()
d.run()
#d.terminate()

View file

@ -8,8 +8,11 @@ red = m.redshift("s3://haystac-te-athena/",
db = "train", db = "train",
workgroup = "phase1-trial2") workgroup = "phase1-trial2")
query = red.query("select agent, st_astext(geom), datetime from public.baseline where agent = 4 limit 200") query = red.query("""select agent, st_astext(geom), timestamp from
public.baseline_v1 where agent = 44 limit 200""")
data = query.results()
pp.pprint(data.head(10))
print(query.runtime)
print(query.cost)

View file

@ -1,23 +1,31 @@
import minerva
def worker(pier, n=0):
mach = pier.machine(ami = "ami-05a242924e713f80a", # dask on ubuntu 22.04 x86
instance_type = "t3.medium",
username = "ubuntu",
name = f"test-{n}",
variables = {"type": "worker",
"number": n})
return mach
m = minerva.Minerva("hay") m = minerva.Minerva("hay")
pier = m.pier(subnet_id = "subnet-05eb26d8649a093e1", # project-subnet-public1-us-east-1a pier = m.pier(subnet_id = "subnet-05eb26d8649a093e1", # project-subnet-public1-us-east-1a
sg_groups = ["sg-0f9e555954e863954", # ssh sg_groups = ["sg-0f9e555954e863954", # ssh
"sg-0b34a3f7398076545"] # default "sg-0b34a3f7398076545"], # default
iam = "S3+SSM+CloudWatch+ECR", iam = "S3+SSM+CloudWatch+ECR",
key_pair = ("Ari-Brown-HAY", "~/.ssh/Ari-Brown-HAY.pem")) key_pair = ("Ari-Brown-HAY", "~/.ssh/Ari-Brown-HAY.pem"))
def worker(pier, n=0):
mach = pier.machine(ami = "ami-0399a4f70ca684620", # dask on ubuntu 22.04 x86
instance_type = "t3.medium",
username = "ubuntu",
name = f"test-{n}",
disk_size = 32,
variables = {"type": "worker",
"number": n})
return mach
mach = worker(pier)
mach.create()
mach.login()
print("*******")
print(repr(mach.cmd("echo 'hello world'").stdout))
print("*******")
print(mach.cmd("echo 'I am machine $number of type $type'"))
print("*******")
mach.terminate()

View file

@ -48,6 +48,7 @@ class Execute:
self.client = redshift.client
self.sql = sql
self.info_cache = None
self.status_cache = None
self.ds = None
self.files = None
self.temps = []
@ -79,8 +80,13 @@ class Execute:
def info(self):
if self.status_cache in ['FINISHED', 'ABORTED', 'FAILED']:
return self.info_cache
res = self.client.describe_statement(Id=self.query_id)
self.info_cache = res
self.status_cache = res['Status']
return self.info_cache
@ -94,7 +100,8 @@ class Execute:
self.runtime = self.info_cache['UpdatedAt'] - self.info_cache['CreatedAt']
if self.redshift.rpus:
self.cost = 0.36 * self.redshift.rpus * self.runtime.seconds / 3600.0 # $0.36 / RPU-hour
# $0.36 / RPU-hour
self.cost = 0.36 * self.redshift.rpus * self.runtime.seconds / 3600.0
return stat # finalized state
@ -104,8 +111,8 @@ class Query(Execute):
def query(self):
self.out = os.path.join(self.redshift.output,
str(random.random()),
'')
"results",
str(random.random()) + ".")
#query = f"unload ({repr(self.sql)}) to {repr(self.out)} " + \ #query = f"unload ({repr(self.sql)}) to {repr(self.out)} " + \
# f"iam_role default " + \ # f"iam_role default " + \
# f"format as {self.DATA_STYLE} " + \ # f"format as {self.DATA_STYLE} " + \
@ -119,10 +126,14 @@ format as {self.DATA_STYLE}
manifest;
drop table temp_data;
"""
print(query)
return query
def manifest_files(self):
if self.files:
return self.files
status = self.finish()
if status == "FINISHED":
@ -137,14 +148,19 @@ drop table temp_data;
js = json.load(f)
# Filter empty strings
files = [e['url'].strip() for e in js['entries'] if e['url'].strip()]
return files
self.files = [e['url'].strip() for e in js['entries'] if e['url'].strip()]
return self.files
else:
return status # canceled or error
def results(self):
# if it's not a list, then we've failed
if type(self.manifest_files()) != type([]):
raise Exception(f"""Query has status {self.status()} did not complete and
thus has no results""")
self.temps = [self.handler.s3.download(f) for f in self.manifest_files()]
#local = parallel_map(self.handler.s3.download, self.manifest_files())
self.ds = pa.dataset.dataset(self.temps)