diff --git a/README.md b/README.md index 5b632d4..d5ac721 100644 --- a/README.md +++ b/README.md @@ -7,6 +7,9 @@ access, and even offers its own cluster management with Dask. In order to ease programmatic access to Athena and offer blocking access (so that your code waits for the result), I wrote `minerva` to make it seamless. +**IN ORDER TO GET UNLOAD TIMESTAMPS, YOU MUST FOLLOW THE INSTRUCTIONS IN THE +TIMESTAMP SECTION** + The results are returned as pyarrow datasets (with parquet files as the underlying structure). @@ -109,6 +112,8 @@ by running (in the Redshift web console): grant USAGE ON schema public to "IAM:" ``` +Run the query in the same way as you would on Athena. + ## S3 ``` @@ -125,9 +130,146 @@ See `minerva/s3.py` for a full list of supported methods. Follow along with `examples/simple_instance.py` +Like in the Athena example, we first need to gain access to our desired AWS +account via the profile in `~/.aws/credentials`: + +``` +import minerva + +m = minerva.Minerva("hay") +``` + +With that, we now create a `Pier`, which is the base from which we launch our +machines. To keep things simple, we specify the subnet, security group, IAM +profile, and PKI key pair here: all machines launched off this `pier` will share +those qualities: + +``` +pier = m.pier(subnet_id = "subnet-05eb26d8649a093e1", # project-subnet-public1-us-east-1a + sg_groups = ["sg-0f9e555954e863954", # ssh + "sg-0b34a3f7398076545"], # default + iam = "S3+SSM+CloudWatch+ECR", + key_pair = ("Ari-Brown-HAY", "~/.ssh/Ari-Brown-HAY.pem")) +``` + +In this example, we'll only create one machine, but this shows the method you'd +use to create multiple. + +We're going to specify the AMI, the instance type, and the username to log in +with. We can then specify the name of the instance (always a good idea), the +EBS disk size in GB, and any variables that we want to make available to shell +environment. 
+ +``` +def worker(pier, n=0): + mach = pier.machine(ami = "ami-0399a4f70ca684620", # dask on ubuntu 22.04 x86 + instance_type = "t3.medium", + username = "ubuntu", + name = f"test-{n}", + disk_size = 32, + variables = {"type": "worker", + "number": n}) + return mach +``` + +Here, we create a single machine. Nothing will happen on AWS yet! + +``` +mach = worker(pier) +``` + +Then we tell AWS to *asynchronously* create our instance: + +``` +mach.create() +``` + +Then we can **block** and wait until the instance has been started and establish +an SSH connection. This has the potential to fail if AWS takes too long to +either start the instance (> 3 minutes) or if the internal services on the +machine take too long to start up (> 35 seconds). + +``` +mach.login() +``` + +Now we can begin to use it! Using `mach.cmd()` will let you run commands on the +server via SSH. Unfortunately, they all take place within `/bin/bash -c`. + +The return value of the `cmd()` function is a combination of STDOUT and STDERR, +which can be accessed via `stdout` and `stderr`, respectively. + +``` +print("*******") +print(repr(mach.cmd("echo 'hello world'").stdout)) +print("*******") +print(mach.cmd("echo I am machine $number of type $type")) +print("*******") +``` + +When you're done, terminate the instance: + +``` +mach.terminate() +``` + + ## Cluster -## Dask +Creating a `dask` cluster builds on what we've seen in the other EC2 examples. + +On top of that, we'll have to load in extra libraries: + +``` +from dask.distributed import Client +import dask +``` + +We define functions for creating a scheduler and our workers: + +``` +def worker(pier, n): + ... + +def scheduler(pier): + ... +``` + +Our pier needs to include a security group that allows the cluster to talk to +itself (ports 8786 and 8787): + +``` +m = minerva.Minerva("hay") +pier = m.pier(... + sg_groups = [..., + ..., + "sg-04cd2626d91ac093c"], # dask (8786, 8787) + ... 
+``` + +Finally, our hard work is done and we can start the cluster! We specify the +pier, the scheduler creation function, the worker creation function, and the +number of workers. + +Note that since `Cluster` isn't fully production-ready yet, it's not available +via `pier.cluster()` yet. + +``` +cluster = pier.cluster(scheduler, worker, num_workers=5) +cluster.start() +``` + +This will start up a scheduler (whose dashboard will be visible at the end at +http://scheduler_ip:8787) and 5 workers who are available via the Dask library. + +Connect to the dask cluster: + +``` +client = Client(cluster.public_location) +``` + +And use according to standard dask instructions. + ## Helpers I wrote a `Timing` module to help with timing various functions: @@ -142,7 +284,29 @@ with Timing("my cool test"): # => 32.45 ``` -# Basic Usage +## Timestamps + +Athena can't unload timestamps that have a timezone. Thus, you have to create a +view in order to abstract away the detail of the UTC timestamp. This is required +for any unloaded format that isn't CSV. + +When we ingest data, we ingest it into e.g. `baseline_original` and then create +a view `baseline` that doesn't have the timestamp involved. + +``` +-- view to accommodate for the fact that Athena can't handle timezones +-- required in order to unload the data in any format that's not CSV + +create or replace view my_data AS +select + agent, from_unixtime(cast(to_unixtime(timestamp) AS bigint)) as timestamp, latitude, longitude +from my_data_original +``` + +## Writing SQL +I recommend keeping the SQL in separate files (be organized!) and then +processing them with the `Mako` templating library. Use `${}` within your SQL +files in order to pass variables. 
# Build diff --git a/examples/athena_basic_execute.py b/examples/athena_basic_execute.py index c9995f0..f50b333 100644 --- a/examples/athena_basic_execute.py +++ b/examples/athena_basic_execute.py @@ -1,11 +1,9 @@ import minerva -import pprint -pp = pprint.PrettyPrinter(indent=4) - -m = minerva.Minerva("hay") +m = minerva.Minerva("hay") athena = m.athena("s3://haystac-pmo-athena/") +# `execute()` does NOT provide any results and does NOT use `UNLOAD` query = athena.execute( """ create database if not exists test diff --git a/examples/athena_basic_query.py b/examples/athena_basic_query.py index 5718d68..c48e9d2 100644 --- a/examples/athena_basic_query.py +++ b/examples/athena_basic_query.py @@ -13,15 +13,17 @@ athena = m.athena("s3://haystac-pmo-athena/") # Everything *needs* to have a column in order for unloading to parquet to work, # so scalar values have to be assigned something, so here we use `as count` to # create a temporary column called `count` -query = athena.query( - """ - select round(longitude, 3) as lon, count(*) as count - from trajectories.baseline - where agent = 4 - group by round(longitude, 3) - order by count(*) desc - """ -) +#query = athena.query( +# """ +# select round(longitude, 3) as lon, count(*) as count +# from trajectories.basline +# where agent = 4 +# group by round(longitude, 3) +# order by count(*) desc +# """ +#) + +query = athena.query("select * from trajectories.basline where agent < 100 limit 100") data = query.results() pp.pprint(data.head(10)) diff --git a/examples/dask_cluster.py b/examples/dask_cluster.py index 160c102..388e722 100644 --- a/examples/dask_cluster.py +++ b/examples/dask_cluster.py @@ -1,5 +1,4 @@ -from minerva.cluster import Cluster -from minerva.pier import Pier +import minerva from minerva.timing import Timing from dask.distributed import Client import dask @@ -7,7 +6,7 @@ import dask ########### PREP ############################ def worker(pier, n): - mach = pier.machine(ami = "ami-01f85b935dc9f674c", # dask 
on ubuntu 22.04 x86 + mach = pier.machine(ami = "ami-0399a4f70ca684620", # dask on ubuntu 22.04 x86 instance_type = "t3.medium", username = "ubuntu", name = f"dask-worker-{n}", @@ -17,24 +16,25 @@ def worker(pier, n): return mach def scheduler(pier): - mach = pier.machine(ami = "ami-01f85b935dc9f674c", # dask on ubuntu 22.04 x86 + mach = pier.machine(ami = "ami-0399a4f70ca684620", # dask on ubuntu 22.04 x86 instance_type = "t3.medium", username = "ubuntu", + disk_size = 32, name = f"dask-scheduler", variables = {"type": "scheduler"}) return mach ########## CLUSTER ########################## -pier = Pier("hay", - subnet_id = "subnet-05eb26d8649a093e1", # project-subnet-public1-us-east-1a - sg_groups = ["sg-0f9e555954e863954", # ssh - "sg-0b34a3f7398076545", # default - "sg-04cd2626d91ac093c"], # dask (8786, 8787) - key_pair = ("Ari-Brown-HAY", "~/.ssh/Ari-Brown-HAY.pem"), - iam = "S3+SSM+CloudWatch+ECR") +m = minerva.Minerva("hay") +pier = m.pier(subnet_id = "subnet-05eb26d8649a093e1", # project-subnet-public1-us-east-1a + sg_groups = ["sg-0f9e555954e863954", # ssh + "sg-0b34a3f7398076545", # default + "sg-04cd2626d91ac093c"], # dask (8786, 8787) + key_pair = ("Ari-Brown-HAY", "~/.ssh/Ari-Brown-HAY.pem"), + iam = "S3+SSM+CloudWatch+ECR") -cluster = Cluster(pier, scheduler, worker, num_workers=5) +cluster = pier.cluster(scheduler, worker, num_workers=2) cluster.start() ########## USAGE ######################## @@ -43,31 +43,9 @@ try: client = Client(cluster.public_location) print(client) - # Practice with a big array - # https://matthewrocklin.com/blog/work/2017/01/12/dask-dataframes - - #import numpy as np - #import dask.array as da - import dask.dataframe as dd - import time - - # https://stackoverflow.com/questions/43796774/loading-local-file-from-client-onto-dask-distributed-cluster - - # Iteratively load files and scatter them to the cluster - # - # futures = [] - # for fn in filenames: - # df = pd.read_csv(fn) - # future = client.scatter(df) - # 
futures.append(future) - # - # ddf = dd.from_delayed(futures, meta=df) - - # query = athena.query("select * from trajectories") - # ddf = query.distribute_results(client) - - with Timing("reading parquet"): - df = dd.read_parquet("s3://haystac-archive-phase1.trial1/ta1.kitware/ta1/simulation/train/") + athena = m.athena("s3://haystac-pmo-athena/") + query = athena.query("select * from trajectories.basline where agent < 100") + df = query.distribute_results(client) with Timing("persisting"): dp = df.persist() diff --git a/examples/docker_instance.py b/examples/docker_instance.py new file mode 100644 index 0000000..d5bd274 --- /dev/null +++ b/examples/docker_instance.py @@ -0,0 +1,26 @@ +import minerva +from minerva.docker import Docker +import sys + +m = minerva.Minerva("hay") +pier = m.pier(subnet_id = "subnet-05eb26d8649a093e1", # project-subnet-public1-us-east-1a + sg_groups = ["sg-0f9e555954e863954", # ssh + "sg-0b34a3f7398076545"], # default + iam = "S3+SSM+CloudWatch+ECR") + +mach = pier.machine(ami = "ami-0b0cd81283738558a", # ubuntu 22.04 x86 + instance_type = "t3.medium", + username = "ubuntu", + name = f"minerva-aws-test") + +# Running the machine in the HAYSTAC PMO account +# Pulling a container from the HAYSTAC T&E account +d = Docker(machine = mach, + #container = "436820952613.dkr.ecr.us-east-1.amazonaws.com/test:latest", + container = "amazon/aws-cli:latest", + variables = {"hello": "world"}, + stdout = sys.stdout) + +d.create() +d.run("s3 ls") +d.terminate() diff --git a/examples/launch_instances.py b/examples/launch_instances.py deleted file mode 100644 index 0b95c2e..0000000 --- a/examples/launch_instances.py +++ /dev/null @@ -1,29 +0,0 @@ -from minerva.pier import Pier -from minerva.docker import Docker -import sys - -profile = "hay" -pier = Pier(profile, - subnet_id = "subnet-05eb26d8649a093e1", # project-subnet-public1-us-east-1a - sg_groups = ["sg-0f9e555954e863954", # ssh - "sg-0b34a3f7398076545"], # default - iam = "S3+SSM+CloudWatch+ECR") - 
-num = 0 -mach = pier.machine(ami = "ami-0b0cd81283738558a", # ubuntu 22.04 x86 - instance_type = "t3.medium", - username = "ubuntu", - name = f"minerva-{num}", - variables = {"num": num}) - -# Running the machine in the HAYSTAC PMO account -# Pulling a container from the HAYSTAC T&E account -d = Docker(machine = mach, - #container = "436820952613.dkr.ecr.us-east-1.amazonaws.com/test:latest", - container = "amazon/aws-cli:latest", - variables = {"num": num}, - stdout = sys.stdout) - -d.create() -d.run() -#d.terminate() diff --git a/examples/redshift_basic_query.py b/examples/redshift_basic_query.py index ec0b7da..18b7dc2 100644 --- a/examples/redshift_basic_query.py +++ b/examples/redshift_basic_query.py @@ -8,8 +8,11 @@ red = m.redshift("s3://haystac-te-athena/", db = "train", workgroup = "phase1-trial2") -query = red.query("select agent, st_astext(geom), datetime from public.baseline where agent = 4 limit 200") +query = red.query("""select agent, st_astext(geom), timestamp from + public.baseline_v1 where agent = 44 limit 200""") + data = query.results() + pp.pprint(data.head(10)) print(query.runtime) print(query.cost) diff --git a/examples/simple_instance.py b/examples/simple_instance.py index 3a448ec..b16d807 100644 --- a/examples/simple_instance.py +++ b/examples/simple_instance.py @@ -1,23 +1,31 @@ import minerva -def worker(pier, n=0): - mach = pier.machine(ami = "ami-05a242924e713f80a", # dask on ubuntu 22.04 x86 - instance_type = "t3.medium", - username = "ubuntu", - name = f"test-{n}", - variables = {"type": "worker", - "number": n}) - return mach - m = minerva.Minerva("hay") pier = m.pier(subnet_id = "subnet-05eb26d8649a093e1", # project-subnet-public1-us-east-1a sg_groups = ["sg-0f9e555954e863954", # ssh - "sg-0b34a3f7398076545"] # default + "sg-0b34a3f7398076545"], # default iam = "S3+SSM+CloudWatch+ECR", key_pair = ("Ari-Brown-HAY", "~/.ssh/Ari-Brown-HAY.pem")) + +def worker(pier, n=0): + mach = pier.machine(ami = "ami-0399a4f70ca684620", # dask on 
ubuntu 22.04 x86 + instance_type = "t3.medium", + username = "ubuntu", + name = f"test-{n}", + disk_size = 32, + variables = {"type": "worker", + "number": n}) + return mach + mach = worker(pier) + mach.create() mach.login() +print("*******") +print(repr(mach.cmd("echo 'hello world'").stdout)) +print("*******") +print(mach.cmd("echo 'I am machine $number of type $type'")) +print("*******") mach.terminate() diff --git a/minerva/redshift.py b/minerva/redshift.py index e5219f6..ddff77d 100644 --- a/minerva/redshift.py +++ b/minerva/redshift.py @@ -43,14 +43,15 @@ class Execute: Execute is meant to be used for DML statements such as CREATE DATABASE/TABLE """ def __init__(self, redshift, sql): - self.redshift = redshift - self.handler = redshift.handler - self.client = redshift.client - self.sql = sql - self.info_cache = None - self.ds = None - self.files = None - self.temps = [] + self.redshift = redshift + self.handler = redshift.handler + self.client = redshift.client + self.sql = sql + self.info_cache = None + self.status_cache = None + self.ds = None + self.files = None + self.temps = [] def query(self): return self.sql @@ -58,9 +59,9 @@ class Execute: def run(self): if self.redshift.cluster: - resp = self.client.execute_statement(Sql=self.query(), - Database=self.redshift.database, - ClusterIdentifier=self.redshift.cluster) + resp = self.client.execute_statement(Sql = self.query(), + Database = self.redshift.database, + ClusterIdentifier = self.redshift.cluster) else: params = {"WorkgroupName": self.redshift.workgroup} if self.redshift.secret: @@ -79,8 +80,13 @@ class Execute: def info(self): + if self.status_cache in ['FINISHED', 'ABORTED', 'FAILED']: + return self.info_cache + res = self.client.describe_statement(Id=self.query_id) - self.info_cache = res + self.info_cache = res + self.status_cache = res['Status'] + return self.info_cache @@ -94,7 +100,8 @@ class Execute: self.runtime = self.info_cache['UpdatedAt'] - self.info_cache['CreatedAt'] if 
self.redshift.rpus: - self.cost = 0.36 * self.redshift.rpus * self.runtime.seconds / 3600.0 # $0.36 / RPU-hour + # $0.36 / RPU-hour + self.cost = 0.36 * self.redshift.rpus * self.runtime.seconds / 3600.0 return stat # finalized state @@ -104,8 +111,8 @@ class Query(Execute): def query(self): self.out = os.path.join(self.redshift.output, - str(random.random()), - '') + "results", + str(random.random()) + ".") #query = f"unload ({repr(self.sql)}) to {repr(self.out)} " + \ # f"iam_role default " + \ # f"format as {self.DATA_STYLE} " + \ @@ -119,10 +126,14 @@ format as {self.DATA_STYLE} manifest; drop table temp_data; """ + print(query) return query def manifest_files(self): + if self.files: + return self.files + status = self.finish() if status == "FINISHED": @@ -137,14 +148,19 @@ drop table temp_data; js = json.load(f) # Filter empty strings - files = [e['url'].strip() for e in js['entries'] if e['url'].strip()] + self.files = [e['url'].strip() for e in js['entries'] if e['url'].strip()] - return files + return self.files else: return status # canceled or error def results(self): + # if it's not a list, then we've failed + if type(self.manifest_files()) != type([]): + raise Exception(f"""Query has status {self.status()} did not complete and + thus has no results""") + self.temps = [self.handler.s3.download(f) for f in self.manifest_files()] #local = parallel_map(self.handler.s3.download, self.manifest_files()) self.ds = pa.dataset.dataset(self.temps)