minerva/README.md
2024-01-31 16:21:26 -05:00

324 lines
8.3 KiB
Markdown

# Minerva
Minerva is the Roman equivalent of Athena, and Athena is AWS's database that
stores results in S3. However, Minerva goes beyond that, and now eases all AWS
access, and even offers its own cluster management with Dask.
## Athena
In order to ease programmatic access to Athena and offer blocking access (so
that your code waits for the result), I wrote `minerva` to make it seamless.
**IN ORDER TO GET UNLOAD TIMESTAMPS, YOU MUST FOLLOW THE INSTRUCTIONS IN THE
TIMESTAMP SECTION**
The results are returned as pyarrow datasets (with parquet files as the
underlying structure).
Please follow along at `examples/athena_basic_query.py`.
Import the required and helpful libraries:
```
import minerva
import pprint
pp = pprint.PrettyPrinter(indent=4)
```
The first substantive line is create a handle to the AWS account according to
your AWS profile in `~/.aws/credentials`:
```
m = minerva.Minerva("hay")
```
Then, we create a handle to `Athena`. The argument passed is the S3 output
location where results will be saved (`s3://<output>/results/<random number for
the query>/`):
```
athena = m.athena("s3://haystac-pmo-athena/")
```
We place the query in a non-blocking manner:
```
query = athena.query(
"""
select round(longitude, 3) as lon, count(*) as count
from trajectories.baseline
where agent = 4
group by round(longitude, 3)
order by count(*) desc
"""
)
```
Minerva will automatically wrap `query()` in an `UNLOAD` statement so that the
data is unloaded to S3 in the `parquet` format. As a prerequisite for this,
**all columns must have names**, which is why `round(longitude, 3)` is
designated `lon` and `count(*)` is `count`.
(If you *don't* want to retrieve any results, such as a `CREATE TABLE`
statement, then use `execute()` to commence a non-blocking query and use
`finish()` to block to completion.)
When we're reading to **block** until the results are ready **and retrieve the
results**, we do:
```
data = query.results()
```
This is **blocking**, so the code will wait here (checking with AWS every 5
seconds) until the results are ready. Then, the results are downloaded to
`/tmp/` and **lazily** interpreted as parquet files in the form of a
`pyarrow.dataset.dataset`.
We can sample the results easily without overloading memory:
```
pp.pprint(data.head(10))
```
And we also get useful statistics on the query:
```
print(query.runtime)
print(query.cost)
```
**DO NOT END YOUR STATEMENTS WITH A SEMICOLON**
**ONLY ONE STATEMENT PER QUERY ALLOWED**
## Redshift
Please follow along at `examples/redshift_basic_query.py`.
The only difference from Athena is the creation of the Redshift handle:
```
red = m.redshift("s3://haystac-te-athena/",
db = "train",
workgroup = "phase1-trial2")
```
In this case, we're connecting to the `train` DB and the access to the workgroup
and DB is handled through our IAM role. Permission has to be initially granted
by running (in the Redshift web console):
```
grant USAGE ON schema public to "IAM:<my_iam_user>"
```
Run the query in the same way as you would on Athena.
## S3
```
import minerva
m = minerva.Minerva("hay")
objs = m.s3.ls("s3://haystac-pmo-athena/")
print(list(objs))
```
See `minerva/s3.py` for a full list of supported methods.
## EC2
Follow along with `examples/simple_instance.py`
Like in the Athena example, we first need to gain access to our desired AWS
account via the profile in `~/.aws/credentials`:
```
import minerva
m = minerva.Minerva("hay")
```
With that, we now create a `Pier`, which is the base from which we launch our
machines. To keep things simple, we specify the subnet, security group, IAM
profile, and PKI key pair here: all machines launched off this `pier` will share
those qualities:
```
pier = m.pier(subnet_id = "subnet-05eb26d8649a093e1", # project-subnet-public1-us-east-1a
sg_groups = ["sg-0f9e555954e863954", # ssh
"sg-0b34a3f7398076545"], # default
iam = "S3+SSM+CloudWatch+ECR",
key_pair = ("Ari-Brown-HAY", "~/.ssh/Ari-Brown-HAY.pem"))
```
In this example, we'll only create one machine, but this shows the method you'd
use to create multiple.
We're going to specify the AMI, the instance type, and the username to log in
with. We can then specify the name of the instance (always a good idea), the
EBS disk size in GB, and any variables that we want to make available to shell
environment.
```
def worker(pier, n=0):
mach = pier.machine(ami = "ami-0399a4f70ca684620", # dask on ubuntu 22.04 x86
instance_type = "t3.medium",
username = "ubuntu",
name = f"test-{n}",
disk_size = 32,
variables = {"type": "worker",
"number": n})
return mach
```
Here, we create a single machine. Nothing will happen on AWS yet!
```
mach = worker(pier)
```
Then we tell AWS to *asynchronously* create our instance:
```
mach.create()
```
Then we can **block** and wait until the instance has been started and establish
an SSH connection. This has the potential to fail if AWS takes too long to
either start the instance (> 3 minutes) or if the internal services on the
machine take too long to start up (> 35 seconds).
```
mach.login()
```
Now we can begin to use it! Using `mach.cmd()` will let you run commands on the
server via SSH. Unfortunately, they all take place within `/bin/bash -c`.
The return value of the `cmd()` function is a combination of STDOUT and STDERR,
which can be accessed via `stdout` and `stderr`, respectively.
```
print("*******")
print(repr(mach.cmd("echo 'hello world'").stdout))
print("*******")
print(mach.cmd("echo I am machine $number of type $type"))
print("*******")
```
When you're done, terminate the instance:
```
mach.terminate()
```
## Cluster
**IN PROGRESS**
Creating a `dask` cluster builds on what we've seen in the other EC2 examples.
On top of that, we'll have to load in extra libraries:
```
from dask.distributed import Client
import dask
```
We define functions for creating a scheduler and our workers:
```
def worker(pier, n):
...
def scheduler(pier):
...
```
Our pier needs to include a security group that allows the cluster to talk to
itself (ports 8786 and 8787):
```
m = minerva.Minerva("hay")
pier = m.pier(...
sg_groups = [...,
...,
"sg-04cd2626d91ac093c"], # dask (8786, 8787)
...
```
Finally, our hard work is done and we can start the cluster! We specify the
pier, the scheduler creation function, the worker creation function, and the
number of workers.
Note that since `Cluster` isn't fully production-ready yet, it's not available
via `pier.cluster()` yet.
```
cluster = pier.cluster(scheduler, worker, num_workers=5)
cluster.start()
```
This will start up a scheduler (whose dashboard will be visible at the end at
http://scheduler_ip:8787) and 5 workers who are available via the Dask library.
Connect to the dask cluster:
```
client = Client(cluster.public_location)
```
And use according to standard dask instructions.
## Helpers
I wrote a `Timing` module to help with timing various functions:
```
with Timing("my cool test"):
long_function()
# Prints the following
#
# my cool test:
# => 32.45
```
## Timestamps
Athena can't unload timestamps that have a timezone. Thus, you have to create a
view in order to asbtract away the detail of the UTC timestamp. This is required
for any unloaded format that isn't CSV.
When we ingest data, we ingest it into e.g. `baseline_original` and then create
a view `baseline` that doesn't have the timestamp involved.
```
-- view to accomodate for the fact that Athena can't handle timezones
-- required in order to unload the data in any format that's not CSV
create or replace view my_data AS
select
agent, from_unixtime(cast(to_unixtime(timestamp) AS bigint)) as timestamp, latitude, longitude
from my_data_original
```
## Writing SQL
I recommend keeping the SQL in separate files (be organized!) and then
processing them with the `Mako` templating library. Use `${}` within your SQL
files in order to pass variables.
# Build
To build the project, run the following commands. (Requires poetry installed):
```bash
poetry install
poetry build
```
# TODO
* parallelize the downloading of files