forked from bellwether/minerva
added machine pool for easy clustering and trying to get glue support
parent c2bc9e1028
commit fdc0fd2ded
3 changed files with 71 additions and 20 deletions
@@ -7,6 +7,7 @@ from .s3 import S3
 from .docker import Docker
 from .machine import Machine
 from .pier import Pier
+from .pool import Pool
 
 from .minerva import Minerva
 
@@ -22,6 +23,7 @@ __all__ = [
     "cluster_pool",
     "load_template",
     "load_sql",
-    "AWS_INSTANCES"
+    "AWS_INSTANCES",
+    "Pool"
 ]
 
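With this change the new class rides along with the existing top-level names. A quick import check, assuming the package is importable as `minerva`:

from minerva import Pool, AWS_INSTANCES   # both now resolve at the package root
print(Pool, AWS_INSTANCES)                # sanity check only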
@@ -67,24 +67,6 @@ class Athena:
         return e
 
 
-    # FIXME bad sql, can't drop multiple tables in athena
-    def delete_tables(self, db_name, tables):
-        e = Execute(self, f"drop table {', '.join(tables)}")
-        e.run()
-        e.finish()
-
-        try:
-            self.glue.batch_delete_table(DatabaseName = db_name,
-                                         TablesToDelete = tables)
-        finally:
-            pass
-
-        for table in tables:
-            s3_uri = os.path.join(self.output, table, "")
-            #print(f"deleting {s3_uri}")
-            self.handler.s3.rm(s3_uri)
-
-
     def cancel(self, query_id):
         return self.client.stop_query_execution(QueryExecutionId = query_id)
 
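For reference, the removed method's FIXME was accurate: Athena DDL accepts exactly one table per DROP TABLE statement, so the comma-joined SQL above could never parse. Should the functionality come back, a sketch of a working version, assuming the same `Execute` helper, boto3 `glue` client, and S3 helper used elsewhere in this class (not part of this commit):

    def delete_tables(self, db_name, tables):
        # Athena drops one table per statement, so loop instead of joining.
        for table in tables:
            e = Execute(self, f"drop table if exists {table}")
            e.run()
            e.finish()

        # Alternatively, skip the query engine and delete the catalog
        # entries in one boto3 Glue call (metadata only; S3 data stays):
        # self.glue.batch_delete_table(DatabaseName=db_name,
        #                              TablesToDelete=list(tables))

        # Remove the backing data, as the old version did.
        for table in tables:
            self.handler.s3.rm(os.path.join(self.output, table, ""))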
@@ -106,7 +88,11 @@ class Athena:
         e.run()
         e.finish()
 
-        # 2. In chunks
+        # 2. Run the Glue ETL job
+        return self.glue.start_job_run(
+            JobName = 'convert table to delta',
+            Arguments = {'--from': table,
+                         '--to': to})
 
 
     def describe_columns(self, table):
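Note that `start_job_run` only kicks the job off: its response carries a `JobRunId`, and the conversion completes asynchronously. For callers that need to block on the result, a small polling sketch using the standard boto3 Glue API (the job name mirrors the hard-coded one above):

import time

def wait_for_delta_job(glue, run_id, job_name='convert table to delta'):
    # Poll the Glue job run until it reaches a terminal state.
    while True:
        state = glue.get_job_run(JobName=job_name,
                                 RunId=run_id)['JobRun']['JobRunState']
        if state in ('SUCCEEDED', 'FAILED', 'STOPPED', 'TIMEOUT', 'ERROR'):
            return state
        time.sleep(10)

# e.g. state = wait_for_delta_job(athena.glue, response['JobRunId'])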
63  minerva/pool.py  Normal file
@@ -0,0 +1,63 @@
+from threading import Thread, Lock
+
+
+class Pool:
+    def __init__(self, worker, num=1):
+        # TODO can move the creation into a thread, but that might be too
+        # many concurrent requests for AWS
+        self.machines = [worker(i).create() for i in range(num)]
+        self.mutex = None
+
+        for machine in self.machines:
+            machine.join()
+            machine.login()
+
+
+    def run(self, func, data=None):
+        if not data or not func:
+            return
+
+        # All access to the shared queue goes through this lock
+        self.mutex = Lock()
+
+        # We'll be modifying this, don't mess with the original
+        data = data.copy()
+
+        # All threads are sharing the same `data` and access is controlled by a mutex
+        threads = [Thread(target=self.process_queue, args=(machine, func, data))
+                   for machine in self.machines]
+
+        # Start the threads
+        for thread in threads:
+            thread.start()
+
+        # Wait for the workers to finish
+        # TODO maybe return STDOUT from everything?
+        for thread in threads:
+            thread.join()
+
+
+    def process_queue(self, machine, func, data):
+        self.mutex.acquire()
+        while data:
+            item = data.pop()
+            if isinstance(item, list):
+                print(f"i'm doing work with [{min(item)}..{max(item)}] on {machine}")
+            else:
+                print(f"i'm doing work with {item} on {machine}")
+            self.mutex.release()
+
+            # do the work (outside the lock, so workers run concurrently)
+            func(machine, item)
+            #time.sleep(0.5)
+
+            # re-acquire prior to returning to the while-loop check
+            self.mutex.acquire()
+
+        self.mutex.release()  # we're done!
+
+
+    def terminate(self):
+        for mach in self.machines:
+            mach.terminate()
+
+
+    def cost(self):
+        return sum(mach.cost() for mach in self.machines)
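A usage sketch for the new class. The worker factory, task function, and chunked data below are illustrative, not part of the commit: `Pool` only requires that `worker(i).create()` return an object exposing `join()` and `login()` (plus `terminate()`/`cost()` if those methods are used).

from minerva import Pool

# Hypothetical stand-in for a real machine wrapper.
class LocalWorker:
    def __init__(self, i):
        self.i = i
    def create(self):
        return self
    def join(self):
        pass
    def login(self):
        pass
    def terminate(self):
        pass
    def cost(self):
        return 0.0
    def __repr__(self):
        return f"worker-{self.i}"

def work(machine, item):
    # stand-in for real work, e.g. shipping a command to the machine
    print(f"{machine} handled {item}")

# Chunking the input into lists matches the [min..max] logging branch in
# process_queue: each pop hands a worker a whole range at once.
chunks = [list(range(i, i + 25)) for i in range(0, 100, 25)]

pool = Pool(LocalWorker, num=2)
pool.run(work, chunks)
print(f"estimated spend: {pool.cost()}")
pool.terminate()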