diff --git a/README.md b/README.md index 82cd42b..5b632d4 100644 --- a/README.md +++ b/README.md @@ -10,12 +10,121 @@ that your code waits for the result), I wrote `minerva` to make it seamless. The results are returned as pyarrow datasets (with parquet files as the underlying structure). +Please follow along at `examples/athena_basic_query.py`. + +Import the required and helpful libraries: + +``` +import minerva +import pprint + +pp = pprint.PrettyPrinter(indent=4) +``` + +The first substantive line is create a handle to the AWS account according to +your AWS profile in `~/.aws/credentials`: + +``` +m = minerva.Minerva("hay") +``` + +Then, we create a handle to `Athena`. The argument passed is the S3 output +location where results will be saved (`s3:///results//`): + +``` +athena = m.athena("s3://haystac-pmo-athena/") +``` + +We place the query in a non-blocking manner: + +``` +query = athena.query( + """ + select round(longitude, 3) as lon, count(*) as count + from trajectories.baseline + where agent = 4 + group by round(longitude, 3) + order by count(*) desc + """ +) +``` + +Minerva will automatically wrap `query()` in an `UNLOAD` statement so that the +data is unloaded to S3 in the `parquet` format. As a prerequisite for this, +**all columns must have names**, which is why `round(longitude, 3)` is +designated `lon` and `count(*)` is `count`. + +(If you *don't* want to retrieve any results, such as a `CREATE TABLE` +statement, then use `execute()` to commence a non-blocking query and use +`finish()` to block to completion.) + +When we're reading to **block** until the results are ready **and retrieve the +results**, we do: + +``` +data = query.results() +``` + + +This is **blocking**, so the code will wait here (checking with AWS every 5 +seconds) until the results are ready. Then, the results are downloaded to +`/tmp/` and **lazily** interpreted as parquet files in the form of a +`pyarrow.dataset.dataset`. + +We can sample the results easily without overloading memory: + +``` +pp.pprint(data.head(10)) +``` + +And we also get useful statistics on the query: + +``` +print(query.runtime) +print(query.cost) +``` + +**DO NOT END YOUR STATEMENTS WITH A SEMICOLON** + +**ONLY ONE STATEMENT PER QUERY ALLOWED** + ## Redshift +Please follow along at `examples/redshift_basic_query.py`. + +The only difference from Athena is the creation of the Redshift handle: + +``` +red = m.redshift("s3://haystac-te-athena/", + db = "train", + workgroup = "phase1-trial2") +``` + +In this case, we're connecting to the `train` DB and the access to the workgroup +and DB is handled through our IAM role. Permission has to be initially granted +by running (in the Redshift web console): + +``` +grant USAGE ON schema public to "IAM:" +``` + ## S3 +``` +import minerva + +m = minerva.Minerva("hay") +objs = m.s3.ls("s3://haystac-pmo-athena/") +print(list(objs)) +``` + +See `minerva/s3.py` for a full list of supported methods. + ## EC2 +Follow along with `examples/simple_instance.py` + ## Cluster ## Dask @@ -34,39 +143,6 @@ with Timing("my cool test"): ``` # Basic Usage -``` -import minerva as m - -athena = m.Athena("hay", "s3://haystac-pmo-athena/") -query = athena.query('select * from "trajectories"."kitware" limit 10') -data = query.results() -print(data.head(10)) -``` - -First, a connection to Athena is made. The first argument is the AWS profile in -`~/.aws/credentials`. The second argument is the S3 location where the results -will be stored. - -In the second substantive line, an SQL query is made. This is **non-blocking**. -The query is off and running and you are free to do whatever you want now. - -In the third line, the results are requested. This is **blocking**, so the code -will wait here (checking with AWS every 5 seconds) until the results are ready. -Then, the results are downloaded to `/tmp/` and lazily interpreted as parquet -files in the form of a `pyarrow.dataset.dataset`. - -**DO NOT END YOUR STATEMENTS WITH A SEMICOLON** - -**ONLY ONE STATEMENT PER QUERY ALLOWED** - -# Returning Scalar Values -In SQL, scalar values get assigned an anonymous column -- Athena doesn't like -that. Thus, you have to assign the column a name. - -``` -data = athena.query('select count(*) as my_col from "trajectories"."kitware"').results() -print(data.head(1)) -``` # Build diff --git a/examples/athena_basic_query.py b/examples/athena_basic_query.py index 8c64435..5718d68 100644 --- a/examples/athena_basic_query.py +++ b/examples/athena_basic_query.py @@ -3,37 +3,31 @@ import pprint pp = pprint.PrettyPrinter(indent=4) +# Create the Minerva object which gives you access to the account under the +# profile `hay` m = minerva.Minerva("hay") + +# Get the Athena object athena = m.athena("s3://haystac-pmo-athena/") -#query = athena.query( -#"""SELECT * -#FROM trajectories.kitware -#WHERE ST_Disjoint( -# ST_GeometryFromText('POLYGON((103.6 1.2151693, 103.6 1.5151693, 104.14797 1.5151693, 104.14797 1.2151693, 103.6 1.2151693))'), -# ST_Point(longitude, latitude) -#) -#""") - -# Everything *needs* to have a column in order for parquet to work, so scalar -# values have to be assigned something, so here we use `as count` to create -# a temporary column called `count` -#print(athena.query("select count(*) as count from trajectories.kitware").results().head(1)) - +# Everything *needs* to have a column in order for unloading to parquet to work, +# so scalar values have to be assigned something, so here we use `as count` to +# create a temporary column called `count` query = athena.query( """ select round(longitude, 3) as lon, count(*) as count - from trajectories.kitware + from trajectories.baseline where agent = 4 group by round(longitude, 3) order by count(*) desc """ ) data = query.results() + pp.pprint(data.head(10)) + +# We also get important statistics print(query.runtime) - -#import IPython -#IPython.embed() +print(query.cost) diff --git a/examples/redshift_basic_query.py b/examples/redshift_basic_query.py index ae6fcba..ec0b7da 100644 --- a/examples/redshift_basic_query.py +++ b/examples/redshift_basic_query.py @@ -3,12 +3,14 @@ import pprint pp = pprint.PrettyPrinter(indent=4) -m = minerva.Minerva("hay") -red = m.redshift("s3://haystac-pmo-athena/", - db="dev", - cluster="redshift-cluster-1") -query = red.query("select count(*) from myspectrum_schema.kitware where agent = 4") +m = minerva.Minerva("hay-te") +red = m.redshift("s3://haystac-te-athena/", + db = "train", + workgroup = "phase1-trial2") + +query = red.query("select agent, st_astext(geom), datetime from public.baseline where agent = 4 limit 200") data = query.results() pp.pprint(data.head(10)) print(query.runtime) +print(query.cost) diff --git a/examples/simple_instance.py b/examples/simple_instance.py index 826704f..3a448ec 100644 --- a/examples/simple_instance.py +++ b/examples/simple_instance.py @@ -1,24 +1,23 @@ -from minerva.cluster import Cluster -from minerva.pier import Pier +import minerva def worker(pier, n=0): mach = pier.machine(ami = "ami-05a242924e713f80a", # dask on ubuntu 22.04 x86 instance_type = "t3.medium", username = "ubuntu", - name = f"dask-client-{n}", + name = f"test-{n}", variables = {"type": "worker", "number": n}) return mach -pier = Pier("hay", - subnet_id = "subnet-05eb26d8649a093e1", # project-subnet-public1-us-east-1a - sg_groups = ["sg-0f9e555954e863954", # ssh - "sg-0b34a3f7398076545", # default - "sg-04cd2626d91ac093c"], # dask (8786, 8787) - iam = "S3+SSM+CloudWatch+ECR", - key_pair = ("Ari-Brown-HAY", "~/.ssh/Ari-Brown-HAY.pem")) +m = minerva.Minerva("hay") +pier = m.pier(subnet_id = "subnet-05eb26d8649a093e1", # project-subnet-public1-us-east-1a + sg_groups = ["sg-0f9e555954e863954", # ssh + "sg-0b34a3f7398076545"] # default + iam = "S3+SSM+CloudWatch+ECR", + key_pair = ("Ari-Brown-HAY", "~/.ssh/Ari-Brown-HAY.pem")) mach = worker(pier) mach.create() mach.login() +mach.terminate() diff --git a/minerva/athena.py b/minerva/athena.py index 66bc7e1..4423bff 100644 --- a/minerva/athena.py +++ b/minerva/athena.py @@ -132,6 +132,7 @@ class Execute: class Query(Execute): # Automatically includes unloading the results to Parquet format + # TODO use the query ID instead of a random number def query(self): out = os.path.join(self.athena.output, "results",