Minerva
Minerva is the Roman equivalent of Athena, and Athena is AWS's database that stores results in S3. However, Minerva goes beyond that, and now eases all AWS access, and even offers its own cluster management with Dask.
Athena
I wrote Minerva to make programmatic access to Athena seamless and to offer
blocking access (so that your code waits for the result).
IN ORDER TO GET UNLOAD TIMESTAMPS, YOU MUST FOLLOW THE INSTRUCTIONS IN THE TIMESTAMP SECTION
The results are returned as pyarrow datasets (with parquet files as the underlying structure).
Please follow along at examples/athena_basic_query.py.
Import the required and helpful libraries:
import minerva
import pprint
pp = pprint.PrettyPrinter(indent=4)
The first substantive line creates a handle to the AWS account according to
your AWS profile in ~/.aws/credentials:
m = minerva.Minerva("hay")
Then, we create a handle to Athena. The argument passed is the S3 output
location where results will be saved (s3://<output>/results/<random number for the query>/):
athena = m.athena("s3://haystac-pmo-athena/")
We place the query in a non-blocking manner:
query = athena.query(
"""
select round(longitude, 3) as lon, count(*) as count
from trajectories.baseline
where agent = 4
group by round(longitude, 3)
order by count(*) desc
"""
)
Minerva will automatically wrap query() in an UNLOAD statement so that the
data is unloaded to S3 in the parquet format. As a prerequisite for this,
all columns must have names, which is why round(longitude, 3) is
designated lon and count(*) is count.
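To make the wrapping concrete, here is a minimal sketch of how a SELECT might be placed inside Athena's UNLOAD statement. The helper name wrap_unload and the output location are hypothetical and only for illustration; Minerva's own implementation may differ.

```python
import textwrap


def wrap_unload(sql: str, location: str) -> str:
    """Wrap a SELECT in an Athena UNLOAD statement that writes parquet.

    Hypothetical helper for illustration; not Minerva's actual code.
    """
    body = textwrap.dedent(sql).strip()
    return f"UNLOAD ({body})\nTO '{location}'\nWITH (format = 'PARQUET')"


statement = wrap_unload(
    """
    select round(longitude, 3) as lon, count(*) as count
    from trajectories.baseline
    where agent = 4
    group by round(longitude, 3)
    """,
    "s3://example-bucket/results/1234/",  # hypothetical output location
)
print(statement)
```

Because the query ends up inside parentheses, a trailing semicolon or a second statement would make the wrapped statement invalid, which is presumably the reason for the restrictions on semicolons and statement count.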
(If you don't want to retrieve any results, such as a CREATE TABLE
statement, then use execute() to commence a non-blocking query and use
finish() to block to completion.)
When we're ready to block until the results are available and retrieve them, we do:
data = query.results()
This is blocking, so the code will wait here (checking with AWS every 5
seconds) until the results are ready. Then, the results are downloaded to
/tmp/ and lazily interpreted as parquet files in the form of a
pyarrow.dataset.Dataset.
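The blocking behaviour amounts to a polling loop. The sketch below shows the general pattern with a stand-in check function instead of a real AWS call; wait_for, its parameters, and the "SUCCEEDED" value are assumptions for illustration, not part of Minerva's API.

```python
import time


def wait_for(check, interval=5.0, timeout=None):
    """Call check() every `interval` seconds until it returns a non-None value.

    Raises TimeoutError if `timeout` seconds pass without a result.
    """
    start = time.monotonic()
    while True:
        result = check()
        if result is not None:
            return result
        if timeout is not None and time.monotonic() - start >= timeout:
            raise TimeoutError("query did not finish in time")
        time.sleep(interval)


# Stand-in for asking AWS about the query state: it finishes on the third poll.
states = iter([None, None, "SUCCEEDED"])
print(wait_for(lambda: next(states), interval=0.01))
```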
We can sample the results easily without overloading memory:
pp.pprint(data.head(10))
And we also get useful statistics on the query:
print(query.runtime)
print(query.cost)
DO NOT END YOUR STATEMENTS WITH A SEMICOLON
ONLY ONE STATEMENT PER QUERY ALLOWED
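If you build queries programmatically, a small guard can catch both mistakes before the query is sent. This is a deliberately crude sketch; check_statement is a hypothetical helper, not something Minerva provides, and it also rejects semicolons that appear inside string literals.

```python
def check_statement(sql: str) -> str:
    """Return the stripped SQL, or raise ValueError if it contains a semicolon.

    Crude on purpose: any semicolon is rejected, even inside a string literal.
    """
    cleaned = sql.strip()
    if ";" in cleaned:
        raise ValueError("remove semicolons: one statement per query, no terminator")
    return cleaned


print(check_statement("  select 1 as x  "))
```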
Redshift
Please follow along at examples/redshift_basic_query.py.
The only difference from Athena is the creation of the Redshift handle:
red = m.redshift("s3://haystac-te-athena/",
db = "train",
workgroup = "phase1-trial2")
In this case, we're connecting to the train DB and the access to the workgroup
and DB is handled through our IAM role. Permission has to be initially granted
by running (in the Redshift web console):
grant USAGE ON schema public to "IAM:<my_iam_user>"
Run the query in the same way as you would on Athena.
S3
import minerva
m = minerva.Minerva("hay")
objs = m.s3.ls("s3://haystac-pmo-athena/")
print(list(objs))
See minerva/s3.py for a full list of supported methods.
EC2
Please follow along at examples/simple_instance.py.
Like in the Athena example, we first need to gain access to our desired AWS
account via the profile in ~/.aws/credentials:
import minerva
m = minerva.Minerva("hay")
With that, we now create a Pier, which is the base from which we launch our
machines. To keep things simple, we specify the subnet, security group, IAM
profile, and PKI key pair here: all machines launched off this pier will share
those qualities:
pier = m.pier(subnet_id = "subnet-05eb26d8649a093e1", # project-subnet-public1-us-east-1a
sg_groups = ["sg-0f9e555954e863954", # ssh
"sg-0b34a3f7398076545"], # default
iam = "S3+SSM+CloudWatch+ECR",
key_pair = ("Ari-Brown-HAY", "~/.ssh/Ari-Brown-HAY.pem"))
In this example, we'll only create one machine, but this shows the method you'd use to create multiple.
We're going to specify the AMI, the instance type, and the username to log in with. We can then specify the name of the instance (always a good idea), the EBS disk size in GB, and any variables that we want to make available to the shell environment.
def worker(pier, n=0):
mach = pier.machine(ami = "ami-0399a4f70ca684620", # dask on ubuntu 22.04 x86
instance_type = "t3.medium",
username = "ubuntu",
name = f"test-{n}",
disk_size = 32,
variables = {"type": "worker",
"number": n})
return mach
Here, we create a single machine. Nothing will happen on AWS yet!
mach = worker(pier)
Then we tell AWS to asynchronously create our instance:
mach.create()
Then we can block until the instance has started and an SSH connection has been established. This can fail if AWS takes too long to start the instance (> 3 minutes) or if the internal services on the machine take too long to start up (> 35 seconds).
mach.login()
Now we can begin to use it! Using mach.cmd() will let you run commands on the
server via SSH. Unfortunately, they all take place within /bin/bash -c.
The return value of the cmd() function is a combination of STDOUT and STDERR,
which can be accessed via stdout and stderr, respectively.
print("*******")
print(repr(mach.cmd("echo 'hello world'").stdout))
print("*******")
print(mach.cmd("echo I am machine $number of type $type"))
print("*******")
When you're done, terminate the instance:
mach.terminate()
Cluster
IN PROGRESS
Creating a dask cluster builds on what we've seen in the other EC2 examples.
On top of that, we'll have to load in extra libraries:
from dask.distributed import Client
import dask
We define functions for creating a scheduler and our workers:
def worker(pier, n):
...
def scheduler(pier):
...
Our pier needs to include a security group that allows the cluster to talk to itself (ports 8786 and 8787):
m = minerva.Minerva("hay")
pier = m.pier(...
sg_groups = [...,
...,
"sg-04cd2626d91ac093c"], # dask (8786, 8787)
...
Finally, our hard work is done and we can start the cluster! We specify the pier, the scheduler creation function, the worker creation function, and the number of workers.
Note that since Cluster isn't fully production-ready yet, it's not available
via pier.cluster() yet.
cluster = pier.cluster(scheduler, worker, num_workers=5)
cluster.start()
This will start up a scheduler (whose dashboard will be visible at http://scheduler_ip:8787 once startup completes) and 5 workers that are available via the Dask library.
Connect to the dask cluster:
client = Client(cluster.public_location)
And use according to standard dask instructions.
Helpers
I wrote a Timing module to help with timing various functions:
with Timing("my cool test"):
long_function()
# Prints the following
#
# my cool test:
# => 32.45
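For readers curious how such a helper can work, here is a minimal sketch of a timing context manager. It is not Minerva's actual Timing implementation; the elapsed attribute in particular is an assumption added so the measured value can be inspected afterwards.

```python
import time


class Timing:
    """Print how long the body of a `with` block took, in seconds."""

    def __init__(self, label):
        self.label = label
        self.elapsed = None  # assumed attribute, set when the block exits

    def __enter__(self):
        self.start = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc, tb):
        self.elapsed = time.perf_counter() - self.start
        print(f"{self.label}:\n=> {self.elapsed:.2f}")
        return False  # never swallow exceptions raised inside the block


with Timing("my cool test") as t:
    time.sleep(0.05)
```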
Timestamps
Athena can't unload timestamps that have a timezone. Thus, you have to create a view in order to abstract away the detail of the UTC timestamp. This is required for any unloaded format that isn't CSV.
When we ingest data, we ingest it into e.g. baseline_original and then create
a view baseline that exposes the timestamp without its timezone.
-- view to accommodate the fact that Athena can't handle timezones
-- required in order to unload the data in any format that's not CSV
create or replace view my_data AS
select
agent, from_unixtime(cast(to_unixtime(timestamp) AS bigint)) as timestamp, latitude, longitude
from my_data_original
Writing SQL
I recommend keeping the SQL in separate files (be organized!) and then
processing them with the Mako templating library. Use ${} within your SQL
files in order to pass variables.
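The sketch below shows the substitution pattern only. To stay dependency-free it uses the standard library's string.Template as a stand-in for Mako: both accept ${name} placeholders, but Mako additionally evaluates Python expressions inside ${}. The inline string stands in for the contents of a separate .sql file.

```python
from string import Template

# Stand-in for the contents of a separate .sql file.
sql_text = """
select round(longitude, 3) as lon, count(*) as count
from trajectories.baseline
where agent = ${agent}
group by round(longitude, 3)
"""

# Fill in the placeholder; substitute() raises KeyError if a variable is missing.
query_text = Template(sql_text).substitute(agent=4).strip()
print(query_text)
```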
Build
To build the project, run the following commands (requires Poetry to be installed):
poetry install
poetry build
TODO
- parallelize the downloading of files