better abstractions

This commit is contained in:
Ari Brown 2023-10-26 13:10:49 -04:00
parent c9c0ad4422
commit 65247844e0
4 changed files with 22 additions and 4 deletions

View file

@ -153,12 +153,15 @@ class Query(Execute):
def distribute_results(self, client): def distribute_results(self, client):
import dask.dataframe as dd
import pandas as pd
if self.ds: if self.ds:
return self.ds return self.ds
futures = [] futures = []
for fn in self.manifest_files(): for fn in self.manifest_files():
df = pd.read_csv(fn) df = pd.read_parquet(fn)
future = client.scatter(df) future = client.scatter(df)
futures.append(future) futures.append(future)

View file

@ -6,9 +6,15 @@ class Minerva:
self.session = boto3.session.Session(profile_name=profile) self.session = boto3.session.Session(profile_name=profile)
self.s3 = m.S3(self) self.s3 = m.S3(self)
def athena(self, *args, **kwargs): def athena(self, *args, **kwargs):
return m.Athena(self, *args, **kwargs) return m.Athena(self, *args, **kwargs)
def redshift(self, *args, **kwargs): def redshift(self, *args, **kwargs):
return m.Redshift(self, *args, **kwargs) return m.Redshift(self, *args, **kwargs)
def pier(self, *args, **kwargs):
return m.Pier(self, *args, **kwargs)

View file

@ -4,17 +4,19 @@ import os
import stat import stat
from minerva.machine import Machine from minerva.machine import Machine
from minerva.cluster import Cluster
# Used for interacting with AWS # Used for interacting with AWS
class Pier: class Pier:
def __init__(self, def __init__(self,
profile = "default", handler = None,
subnet_id = None, subnet_id = None,
sg_groups = [], sg_groups = [],
key_pair = None, # (keypair name, keypair privkey pair) key_pair = None, # (keypair name, keypair privkey pair)
iam = None, iam = None,
): ):
self.session = boto3.session.Session(profile_name=profile) self.handler = handler
self.session = handler.session
self.ec2 = self.session.client("ec2") self.ec2 = self.session.client("ec2")
self.subnet_id = subnet_id self.subnet_id = subnet_id
self.groups = sg_groups self.groups = sg_groups
@ -29,6 +31,7 @@ class Pier:
else: else:
self.make_key_pair() self.make_key_pair()
def make_key_pair(self): def make_key_pair(self):
self.key_pair_name = f"Minerva-{random.random()}" self.key_pair_name = f"Minerva-{random.random()}"
print(f"making keypair ({self.key_pair_name})") print(f"making keypair ({self.key_pair_name})")
@ -39,9 +42,11 @@ class Pier:
f.write(self.key['KeyMaterial']) f.write(self.key['KeyMaterial'])
os.chmod(self.key_path, stat.S_IRUSR | stat.S_IWUSR) os.chmod(self.key_path, stat.S_IRUSR | stat.S_IWUSR)
def machine(self, **kwargs): def machine(self, **kwargs):
return Machine(self, **kwargs) return Machine(self, **kwargs)
def t3_med(self, num): def t3_med(self, num):
return self.machine(ami = "ami-0a538467cc9da9bb2", return self.machine(ami = "ami-0a538467cc9da9bb2",
instance_type = "t3.medium", instance_type = "t3.medium",
@ -49,3 +54,7 @@ class Pier:
name = f"blah-{num}", name = f"blah-{num}",
variables = {"num": num}) variables = {"num": num})
def cluster(self, *args, **kwargs):
return Cluster(self, *args, **kwargs)

View file

@ -1,6 +1,6 @@
[tool.poetry] [tool.poetry]
name = "minerva" name = "minerva"
version = "0.6.1" version = "0.6.2"
description = "Easier access to AWS Athena and Redshift" description = "Easier access to AWS Athena and Redshift"
authors = [ authors = [
"Ari Brown <ari@airintech.com>", "Ari Brown <ari@airintech.com>",