# forked from bellwether/minerva
import math

import minerva
# --- Export configuration ----------------------------------------------------
# Source table, the column we batch on, and where the export lands.
partition_by = "chunk"
table = "haystacdb.kitware_med"
out = "haystacdb.exported_table"
dest = "s3://haystac-te-athena/test_export/"

# Athena client for the hay-te workspace; query results are staged in S3.
m = minerva.Minerva("hay-te")
athena = m.athena("s3://haystac-te-athena/")

# (name, dtype) pairs for every column of the source table.
cols = athena.describe_columns(table)
# Have to use different datatypes here: the source schema reports integral
# columns as "long"/"integer", but the CREATE TABLE DDL below needs the Hive
# name "int". One pass covers both (the original did two passes and left a
# TODO about "integer" -> "int"; that mapping is included here).
cols = [(name, "int") if dtype in ("long", "integer") else (name, dtype)
        for name, dtype in cols]

# Split the partition column's (name, dtype) pair away from the rest —
# the DDL lists them in separate clauses.
part_col = next((col for col in cols if col[0] == partition_by), None)
if part_col is None:
    # Fail with a clear message instead of the bare IndexError that
    # `[...][0]` on an empty filter would raise.
    raise ValueError(f"partition column {partition_by!r} not found in {table}")
rest_cols = [col for col in cols if col[0] != partition_by]
# Render the column clauses: "name type, name type, ..." for the regular
# columns, and "name type" for the single partition column.
column_defs = ", ".join(f"{name} {dtype}" for name, dtype in rest_cols)
partition_def = " ".join(part_col)

# Delta-flavored external table, zstd-compressed parquet, partitioned so the
# batched inserts below each land in their own partition range.
string = f"""
create external table {out}({column_defs})
partitioned by ({partition_def})
location '{dest}'
tblproperties ('spark.sql.sources.provider' = 'delta',
'parquet.compression' = 'zstd')
"""
print(string)
try:
    # Create the partitioned external table, waiting for the DDL to complete
    # before inserting into it.
    e = athena.execute(string)
    e.finish()

    # Number of distinct partition values determines how many insert batches
    # are needed.
    count = athena.query(
        f"select count(distinct {partition_by}) as count from {table}"
    ).scalar()

    concurrent = 100  # 100 is max concurrent writers allowed

    # BUG FIX: the original bound `col_names = map(lambda x: x[0], cols)`.
    # map() returns a one-shot iterator, so the first join() exhausted it and
    # every batch after the first produced "select  from ..." — broken SQL.
    # Join the column list once, up front.
    col_list = ", ".join(col[0] for col in cols)

    for i in range(0, count, concurrent):
        print(i)
        sql = (
            f"insert into {out} select {col_list} from {table} "
            f"where {partition_by} >= {i} and {partition_by} < {i + concurrent}"
        )
        print(sql)
        q = athena.query(sql)
        q.finish()

finally:
    # Clean up the export table even on failure.
    # NOTE(review): delete_table receives the bare db name plus the fully
    # qualified `out` name ("haystacdb.exported_table") — confirm the API
    # expects (database, table) and not (database, bare_table_name).
    athena.delete_table("haystacdb", out)