minerva/dask_test.py
2023-11-16 13:26:26 -05:00

65 lines
2 KiB
Python

import sys
import minerva
from minerva.timing import Timing
import dask
from dask.distributed import Client
# Usage: python dask_test.py <dask-scheduler-address>
#
# Experiment: run an Athena query through minerva, load its CSV result into a
# dask DataFrame on a distributed cluster, index it by 'agent', persist it,
# then render task graphs for a few simple aggregations.

if len(sys.argv) < 2:
    # Fail with a usage message rather than an IndexError traceback.
    sys.exit(f"usage: {sys.argv[0]} <dask-scheduler-address>")
scheduler_address = sys.argv[1]

m = minerva.Minerva("hay")

print(f"connecting to {scheduler_address}")
client = Client(scheduler_address)

try:
    # Practice with a big array
    # https://matthewrocklin.com/blog/work/2017/01/12/dask-dataframes
    #import numpy as np
    #import dask.array as da
    import dask.dataframe as dd
    from dask.distributed import wait
    import time  # not used below; kept available inside the IPython.embed() shell

    # https://stackoverflow.com/questions/43796774/loading-local-file-from-client-onto-dask-distributed-cluster
    athena = m.athena("s3://haystac-pmo-athena/")

    # Run the query with CSV output, then lazily load the file Athena reports
    # as its OutputLocation into a dask DataFrame.
    # NOTE(review): query.finish() presumably blocks until the query is done,
    # and OutputLocation is presumably an S3 URI readable by dd.read_csv — confirm.
    with Timing("athena query"):
        query = athena.execute("select * from trajectories.kitware limit 1000000", format='csv')
        query.finish()
        df = dd.read_csv(query.info_cache['ResultConfiguration']['OutputLocation'])
        # Alternatives tried:
        #query = athena.query("select * from trajectories.kitware limit 100000000")
        #df = dd.read_parquet(query.manifest_files())
        #df = query.distribute_results(client, size=10000)
    print("distributed")

    with Timing("partitioning"):
        # Alternative: derive the partition count from the 'agent' categories.
        #df = df.categorize('agent')
        #df['agent'] = df['agent'].cat.as_ordered()
        #partitions = df.describe(include=['category']).compute().iloc[0][0]
        #df = df.set_index('agent', npartitions=partitions)

        # 10001 boundaries -> 10000 partitions, each covering one unit of 'agent'.
        # NOTE(review): assumes 'agent' is numeric and lies within 0..10000 — confirm.
        divisions = list(range(0, 10001))
        df = df.set_index('agent', divisions=divisions)

    with Timing("persisting"):
        dp = df.persist()
        # With a distributed Client, persist() returns futures immediately;
        # wait for them so this timing covers the actual load into worker memory.
        wait(dp)

    # Open an interactive shell over the current namespace (df, dp, client, ...);
    # the script resumes once the shell is exited.
    import IPython
    IPython.embed()

    # NOTE: visualize() only renders each task graph to an SVG file; it does not
    # compute the result, so these timings measure graph rendering only.
    with Timing("count()"):
        dp.count().visualize(filename='count.svg')
    with Timing("mean latitude"):
        dp.groupby(dp.index).latitude.mean().visualize(filename='latitude.svg')
    with Timing("mean longitude"):
        dp.groupby(dp.index).longitude.mean().visualize(filename='longitude.svg')
finally:
    ########## FIN #######################
    client.close()
    print("end")
    try:
        # Pause until Enter is pressed. When stdin is closed or non-interactive,
        # skip the pause instead of raising EOFError on top of any earlier error.
        input()
    except EOFError:
        pass