# Minerva

Minerva is the Roman equivalent of Athena, and Athena is AWS's database that stores results in S3. However, Minerva goes beyond that: it now eases all AWS access and even offers its own cluster management with Dask.

## Athena

I wrote `minerva` to make programmatic access to Athena seamless and to offer blocking access (so that your code waits for the result).

**IN ORDER TO GET UNLOAD TIMESTAMPS, YOU MUST FOLLOW THE INSTRUCTIONS IN THE TIMESTAMP SECTION**

The results are returned as pyarrow datasets (with parquet files as the underlying structure).

Please follow along at `examples/athena_basic_query.py`.

Import the required and helpful libraries:

```
import minerva
import pprint

pp = pprint.PrettyPrinter(indent=4)
```

The first substantive line creates a handle to the AWS account according to your AWS profile in `~/.aws/credentials`:

```
m = minerva.Minerva("hay")
```

Then, we create a handle to `Athena`. The argument passed is the S3 output location where results will be saved (`s3:///results//`):

```
athena = m.athena("s3://haystac-pmo-athena/")
```

We submit the query in a non-blocking manner:

```
query = athena.query(
    """
    select round(longitude, 3) as lon, count(*) as count
    from trajectories.baseline
    where agent = 4
    group by round(longitude, 3)
    order by count(*) desc
    """
)
```

Minerva will automatically wrap `query()` in an `UNLOAD` statement so that the data is unloaded to S3 in the `parquet` format. As a prerequisite for this, **all columns must have names**, which is why `round(longitude, 3)` is designated `lon` and `count(*)` is `count`.

(If you *don't* want to retrieve any results, such as with a `CREATE TABLE` statement, then use `execute()` to start a non-blocking query and `finish()` to block until completion.)
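To picture the `UNLOAD` wrapping described above, here is a minimal sketch of what such a wrapper could produce. The helper name `wrap_in_unload` and the exact options it emits are assumptions for illustration, not minerva's actual implementation; only the general `UNLOAD (...) TO '...' WITH (format = 'PARQUET')` shape comes from Athena's SQL syntax.

```python
def wrap_in_unload(sql: str, output_location: str) -> str:
    """Wrap a SELECT statement in an Athena UNLOAD that writes parquet to S3.

    Hypothetical helper: minerva's real wrapper may differ.
    """
    body = sql.strip()  # drop the leading/trailing whitespace of a triple-quoted query
    return f"UNLOAD ({body}) TO '{output_location}' WITH (format = 'PARQUET')"


statement = wrap_in_unload(
    """
    select round(longitude, 3) as lon, count(*) as count
    from trajectories.baseline
    """,
    "s3://haystac-pmo-athena/",
)
print(statement)
```

Because the query ends up inside `UNLOAD (...)` and its columns become parquet columns, every selected expression needs an alias such as `lon` or `count`.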

When we're ready to **block** until the results are ready **and retrieve the results**, we do:

```
data = query.results()
```

This is **blocking**, so the code will wait here (checking with AWS every 5 seconds) until the results are ready. Then, the results are downloaded to `/tmp/` and **lazily** interpreted as parquet files in the form of a `pyarrow.dataset.Dataset`.

We can sample the results easily without overloading memory:

```
pp.pprint(data.head(10))
```

And we also get useful statistics on the query:

```
print(query.runtime)
print(query.cost)
```

**DO NOT END YOUR STATEMENTS WITH A SEMICOLON**

**ONLY ONE STATEMENT PER QUERY ALLOWED**

## Redshift

Please follow along at `examples/redshift_basic_query.py`.

The only difference from Athena is the creation of the Redshift handle:

```
red = m.redshift("s3://haystac-te-athena/",
                 db = "train",
                 workgroup = "phase1-trial2")
```

In this case, we're connecting to the `train` DB, and access to the workgroup and DB is handled through our IAM role. Permission has to be granted initially by running (in the Redshift web console):

```
grant USAGE ON schema public to "IAM:"
```

Run the query in the same way as you would on Athena.

## S3

```
import minerva

m = minerva.Minerva("hay")
objs = m.s3.ls("s3://haystac-pmo-athena/")
print(list(objs))
```

See `minerva/s3.py` for a full list of supported methods.

## EC2

Follow along with `examples/simple_instance.py`.

Like in the Athena example, we first need to gain access to our desired AWS account via the profile in `~/.aws/credentials`:

```
import minerva

m = minerva.Minerva("hay")
```

With that, we now create a `Pier`, which is the base from which we launch our machines.
To keep things simple, we specify the subnet, security group, IAM profile, and PKI key pair here: all machines launched off this `pier` will share those qualities:

```
pier = m.pier(subnet_id = "subnet-05eb26d8649a093e1", # project-subnet-public1-us-east-1a
              sg_groups = ["sg-0f9e555954e863954", # ssh
                           "sg-0b34a3f7398076545"], # default
              iam = "S3+SSM+CloudWatch+ECR",
              key_pair = ("Ari-Brown-HAY", "~/.ssh/Ari-Brown-HAY.pem"))
```

In this example, we'll only create one machine, but this shows the method you'd use to create multiple.

We're going to specify the AMI, the instance type, and the username to log in with. We can then specify the name of the instance (always a good idea), the EBS disk size in GB, and any variables that we want to make available to the shell environment.

```
def worker(pier, n=0):
    mach = pier.machine(ami = "ami-0399a4f70ca684620", # dask on ubuntu 22.04 x86
                        instance_type = "t3.medium",
                        username = "ubuntu",
                        name = f"test-{n}",
                        disk_size = 32,
                        variables = {"type": "worker",
                                     "number": n})
    return mach
```

Here, we create a single machine. Nothing will happen on AWS yet!

```
mach = worker(pier)
```

Then we tell AWS to *asynchronously* create our instance:

```
mach.create()
```

Then we can **block** and wait until the instance has been started and establish an SSH connection. This has the potential to fail if either AWS takes too long to start the instance (> 3 minutes) or the internal services on the machine take too long to start up (> 35 seconds).

```
mach.login()
```

Now we can begin to use it! Using `mach.cmd()` will let you run commands on the server via SSH. Unfortunately, they all take place within `/bin/bash -c`.

The return value of the `cmd()` function is a combination of STDOUT and STDERR, which can be accessed via `stdout` and `stderr`, respectively.
```
print("*******")
print(repr(mach.cmd("echo 'hello world'").stdout))
print("*******")
print(mach.cmd("echo I am machine $number of type $type"))
print("*******")
```

When you're done, terminate the instance:

```
mach.terminate()
```

## Cluster

Creating a `dask` cluster builds on what we've seen in the other EC2 examples.

On top of that, we'll have to load in extra libraries:

```
from dask.distributed import Client
import dask
```

We define functions for creating a scheduler and our workers:

```
def worker(pier, n):
    ...

def scheduler(pier):
    ...
```

Our pier needs to include a security group that allows the cluster to talk to itself (ports 8786 and 8787):

```
m = minerva.Minerva("hay")
pier = m.pier(...
              sg_groups = [...,
                           ...,
                           "sg-04cd2626d91ac093c"], # dask (8786, 8787)
              ...
```

Finally, our hard work is done and we can start the cluster! We specify the pier, the scheduler creation function, the worker creation function, and the number of workers.

Note that since `Cluster` isn't fully production-ready yet, it's not available via `pier.cluster()` yet.

```
cluster = pier.cluster(scheduler, worker, num_workers=5)
cluster.start()
```

This will start up a scheduler (whose dashboard will be visible, once startup finishes, at http://scheduler_ip:8787) and 5 workers that are available via the Dask library.

Connect to the dask cluster:

```
client = Client(cluster.public_location)
```

And use according to standard dask instructions.

## Helpers

I wrote a `Timing` module to help with timing various functions:

```
with Timing("my cool test"):
    long_function()

# Prints the following
#
# my cool test:
# => 32.45
```

## Timestamps

Athena can't unload timestamps that have a timezone. Thus, you have to create a view in order to abstract away the detail of the UTC timestamp. This is required for any unloaded format that isn't CSV.

When we ingest data, we ingest it into e.g. `baseline_original` and then create a view `baseline` that doesn't have the timezone involved.
```
-- view to accommodate the fact that Athena can't handle timezones
-- required in order to unload the data in any format that's not CSV
create or replace view my_data AS
select
    agent, from_unixtime(cast(to_unixtime(timestamp) AS bigint)) as timestamp, latitude, longitude
from my_data_original
```

## Writing SQL

I recommend keeping the SQL in separate files (be organized!) and then processing them with the `Mako` templating library. Use `${}` within your SQL files in order to pass variables.

# Build

To build the project, run the following commands (requires poetry to be installed):

```bash
poetry install
poetry build
```

# TODO

* parallelize the downloading of files
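
As a possible starting point for the item above, here is a minimal sketch using a thread pool from the standard library. It is not part of minerva: `download_all` and `fake_fetch` are hypothetical names, and `fetch` stands in for whatever function downloads a single result file and returns its local path.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, List


def download_all(keys: Iterable[str],
                 fetch: Callable[[str], str],
                 max_workers: int = 8) -> List[str]:
    """Call fetch(key) for every key on a thread pool, preserving input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map yields results in the order of the inputs, not completion order
        return list(pool.map(fetch, keys))


def fake_fetch(key: str) -> str:
    """Hypothetical stand-in for a real download: maps a key to a path in /tmp/."""
    return "/tmp/" + key.rsplit("/", 1)[-1]


paths = download_all(["results/part-0.parquet", "results/part-1.parquet"], fake_fetch)
print(paths)
```

Threads are a reasonable fit here because downloading is I/O-bound; the pool size of 8 is an arbitrary default.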