tidying up examples and readme

Ari Brown 2024-01-30 18:13:56 -05:00
parent 819bf7abf3
commit e3c11fb1aa
5 changed files with 138 additions and 66 deletions

README.md
@@ -10,12 +10,121 @@ that your code waits for the result), I wrote `minerva` to make it seamless.
The results are returned as pyarrow datasets (with parquet files as the
underlying structure).

Please follow along at `examples/athena_basic_query.py`.
Import the required and helpful libraries:
```
import minerva
import pprint
pp = pprint.PrettyPrinter(indent=4)
```
The first substantive line creates a handle to the AWS account, according to
your AWS profile in `~/.aws/credentials`:
```
m = minerva.Minerva("hay")
```
Then, we create a handle to `Athena`. The argument is the S3 output location
where results will be saved (`s3://<output>/results/<random number for the
query>/`):
```
athena = m.athena("s3://haystac-pmo-athena/")
```
We submit the query in a non-blocking manner:
```
query = athena.query(
    """
    select round(longitude, 3) as lon, count(*) as count
    from trajectories.baseline
    where agent = 4
    group by round(longitude, 3)
    order by count(*) desc
    """
)
```
Minerva automatically wraps the statement passed to `query()` in an `UNLOAD`
statement so that the data is unloaded to S3 in `parquet` format. As a
prerequisite, **all columns must have names**, which is why `round(longitude, 3)`
is aliased to `lon` and `count(*)` to `count`.
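
Conceptually, the statement gets rewritten into something of this shape before
being submitted (a sketch of Athena's `UNLOAD` syntax, not the literal text
minerva generates):
```
UNLOAD (select round(longitude, 3) as lon, count(*) as count ...)
TO 's3://haystac-pmo-athena/results/<random number>/'
WITH (format = 'PARQUET')
```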
(If you *don't* want to retrieve any results, as with a `CREATE TABLE`
statement, use `execute()` to start a non-blocking query and `finish()` to
block until completion.)
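
For example (a minimal sketch; the table name, column, and bucket here are
hypothetical):
```
ddl = athena.execute("create external table my_table (agent int) location 's3://my-bucket/my-table/'")
ddl.finish()  # block until Athena reports the statement has completed
```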
When we're ready to **block** until the results are available **and retrieve
the results**, we do:
```
data = query.results()
```
This is **blocking**, so the code will wait here (checking with AWS every 5
seconds) until the results are ready. Then, the results are downloaded to
`/tmp/` and **lazily** interpreted as parquet files in the form of a
`pyarrow.dataset.dataset`.
We can sample the results easily without overloading memory:
```
pp.pprint(data.head(10))
```
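
If the results do fit comfortably in memory, the dataset can be materialized
with the usual pyarrow calls (assuming standard `pyarrow.dataset` behavior):
```
table = data.to_table()  # read all the parquet files into an Arrow table
df = table.to_pandas()   # optional: hand off to pandas
```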
And we also get useful statistics on the query:
```
print(query.runtime)
print(query.cost)
```
**DO NOT END YOUR STATEMENTS WITH A SEMICOLON**
**ONLY ONE STATEMENT PER QUERY ALLOWED**
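
That means two statements require two separate calls, rather than one
semicolon-separated string (a sketch using the same table as above):
```
# not allowed: athena.query("select ... ; select ...")
lons = athena.query("select round(longitude, 3) as lon from trajectories.baseline")
counts = athena.query("select count(*) as count from trajectories.baseline")
```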
## Redshift
Please follow along at `examples/redshift_basic_query.py`.
The only difference from Athena is the creation of the Redshift handle:
```
red = m.redshift("s3://haystac-te-athena/",
                 db = "train",
                 workgroup = "phase1-trial2")
```
In this case, we're connecting to the `train` DB, and access to the workgroup
and DB is handled through our IAM role. Permission must first be granted by
running (in the Redshift web console):
```
grant USAGE ON schema public to "IAM:<my_iam_user>"
```
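
From here, everything works just as it does with Athena (a sketch;
`public.baseline` is the table used in `examples/redshift_basic_query.py`):
```
query = red.query("select count(*) as count from public.baseline")
data = query.results()
print(query.runtime)
print(query.cost)
```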
## S3
```
import minerva
m = minerva.Minerva("hay")
objs = m.s3.ls("s3://haystac-pmo-athena/")
print(list(objs))
```
See `minerva/s3.py` for a full list of supported methods.
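
Since the listing comes back as an iterable (hence the `list(objs)` above),
you can also stream it without materializing the whole listing (sketch):
```
for obj in m.s3.ls("s3://haystac-pmo-athena/"):
    print(obj)
```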
## EC2
Follow along with `examples/simple_instance.py`
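
In brief, it boils down to this (condensed from that example; the subnet,
security groups, and key pair are account-specific placeholders):
```
m = minerva.Minerva("hay")
pier = m.pier(subnet_id = "subnet-...",
              sg_groups = ["sg-..."],
              iam = "S3+SSM+CloudWatch+ECR",
              key_pair = ("my-key", "~/.ssh/my-key.pem"))
mach = pier.machine(ami = "ami-05a242924e713f80a",  # ubuntu 22.04 x86
                    instance_type = "t3.medium",
                    username = "ubuntu",
                    name = "test-0")
mach.create()     # launch the instance
mach.login()      # ssh in
mach.terminate()  # tear it down when done
```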
## Cluster
## Dask
@@ -34,39 +143,6 @@ with Timing("my cool test"):
```
# Basic Usage
```
import minerva as m
athena = m.Athena("hay", "s3://haystac-pmo-athena/")
query = athena.query('select * from "trajectories"."kitware" limit 10')
data = query.results()
print(data.head(10))
```
First, a connection to Athena is made. The first argument is the AWS profile in
`~/.aws/credentials`. The second argument is the S3 location where the results
will be stored.
In the second substantive line, a SQL query is issued. This is **non-blocking**:
the query is off and running, and you are free to do whatever you want in the
meantime.
In the third line, the results are requested. This is **blocking**, so the code
will wait here (checking with AWS every 5 seconds) until the results are ready.
Then, the results are downloaded to `/tmp/` and lazily interpreted as parquet
files in the form of a `pyarrow.dataset.dataset`.
**DO NOT END YOUR STATEMENTS WITH A SEMICOLON**
**ONLY ONE STATEMENT PER QUERY ALLOWED**
# Returning Scalar Values
In SQL, scalar values get assigned an anonymous column -- Athena doesn't like
that. Thus, you have to assign the column a name.
```
data = athena.query('select count(*) as my_col from "trajectories"."kitware"').results()
print(data.head(1))
```
# Build
examples/athena_basic_query.py
@@ -3,37 +3,31 @@ import pprint
pp = pprint.PrettyPrinter(indent=4)

# Create the Minerva object which gives you access to the account under the
# profile `hay`
m = minerva.Minerva("hay")

# Get the Athena object
athena = m.athena("s3://haystac-pmo-athena/")

# Everything *needs* to have a column in order for unloading to parquet to work,
# so scalar values have to be assigned something, so here we use `as count` to
# create a temporary column called `count`
query = athena.query(
    """
    select round(longitude, 3) as lon, count(*) as count
    from trajectories.baseline
    where agent = 4
    group by round(longitude, 3)
    order by count(*) desc
    """
)

data = query.results()
pp.pprint(data.head(10))

# We also get important statistics
print(query.runtime)
print(query.cost)
examples/redshift_basic_query.py
@@ -3,12 +3,14 @@ import pprint
pp = pprint.PrettyPrinter(indent=4)

m = minerva.Minerva("hay-te")

red = m.redshift("s3://haystac-te-athena/",
                 db = "train",
                 workgroup = "phase1-trial2")

query = red.query("select agent, st_astext(geom), datetime from public.baseline where agent = 4 limit 200")

data = query.results()
pp.pprint(data.head(10))

print(query.runtime)
print(query.cost)
examples/simple_instance.py
@@ -1,24 +1,23 @@
import minerva

def worker(pier, n=0):
    mach = pier.machine(ami = "ami-05a242924e713f80a", # dask on ubuntu 22.04 x86
                        instance_type = "t3.medium",
                        username = "ubuntu",
                        name = f"test-{n}",
                        variables = {"type": "worker",
                                     "number": n})
    return mach

m = minerva.Minerva("hay")
pier = m.pier(subnet_id = "subnet-05eb26d8649a093e1", # project-subnet-public1-us-east-1a
              sg_groups = ["sg-0f9e555954e863954",  # ssh
                           "sg-0b34a3f7398076545"], # default
              iam = "S3+SSM+CloudWatch+ECR",
              key_pair = ("Ari-Brown-HAY", "~/.ssh/Ari-Brown-HAY.pem"))

mach = worker(pier)
mach.create()
mach.login()
mach.terminate()
@@ -132,6 +132,7 @@ class Execute:
class Query(Execute):
    # Automatically includes unloading the results to Parquet format
    # TODO use the query ID instead of a random number
    def query(self):
        out = os.path.join(self.athena.output,
                           "results",