compb-dla-data-analysis/lib/lib.py
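
"""Analysis helpers for DLA (diffusion-limited aggregation) cluster data.

Each reader loads one run's particle coordinates from CSV and derives, per
particle: N (particles placed so far), r (distance from the origin), cr
(the running maximum radius) and fd = ln N / ln cr, an estimate of the
cluster's fractal dimension.
"""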

from glob import glob
from pathlib import Path

import numpy as np
import pandas as pd


def read_xy(path: str):
    df = pd.read_csv(path, skipinitialspace=True)
    df['N'] = df.index + 1
    df['r'] = (df.x ** 2 + df.y ** 2) ** 0.5
    df['cr'] = df.r.cummax()
    # Fractal dimension estimate: N grows like cr ** fd
    df['fd'] = np.log(df.N) / np.log(df.cr)
    df['run'] = Path(path).stem
    # Drop rows where fd is inf or NaN (log(cr) is zero early in a run)
    return df.replace([np.inf, -np.inf], np.nan).dropna()


def read_xy_alt(path: str):
    df = pd.read_csv(path)
    df['N'] = df.index + 1
    # Find the outermost corner of this object
    df['r'] = (
        (df.x.abs() + np.sqrt(0.5)) ** 2 +
        (df.y.abs() + np.sqrt(0.5)) ** 2
    ) ** 0.5
    df['cr'] = df.r.cummax()
    df['fd'] = np.log(df.N) / np.log(df.cr)
    df['run'] = Path(path).stem
    return df


def read_xyz(path: str):
    df = pd.read_csv(path)
    df['N'] = df.index + 1
    df['r'] = (df.x ** 2 + df.y ** 2 + df.z ** 2) ** 0.5
    df['cr'] = df.r.cummax()
    df['fd'] = np.log(df.N) / np.log(df.cr)
    df['run'] = Path(path).stem
    return df


def read_xyz_alt(path: str):
    df = pd.read_csv(path)
    df['N'] = df.index + 1
    # Find the outermost corner of this object
    df['r'] = (
        (df.x.abs() + np.sqrt(0.5)) ** 2 +
        (df.y.abs() + np.sqrt(0.5)) ** 2 +
        (df.z.abs() + np.sqrt(0.5)) ** 2
    ) ** 0.5
    df['cr'] = df.r.cummax()
    df['fd'] = np.log(df.N) / np.log(df.cr)
    df['run'] = Path(path).stem
    return df
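

# read_load concatenates every run CSV found directly inside load_dir; each
# frame keeps the 'run' column its reader derived from the file name.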
def read_load(load_dir: str, reader=read_xy_alt):
    paths = glob(f'{load_dir}/*.csv')
    return pd.concat([reader(path) for path in paths])


def augment_read_with_sp(inner_reader):
    def wrapped(path: str):
        # The parent directory name encodes the probability for this run
        probability = float(Path(path).parent.name)
        df = inner_reader(path)
        df['probability'] = probability
        return df
    return wrapped


def read_sp_xy(specific_probability_dir: str):
    probability = float(Path(specific_probability_dir).name)
    df = read_load(specific_probability_dir)
    df['probability'] = probability
    return df
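

# read_sp expects a layout like sp_dir/<probability>/<run>.csv, where each
# subdirectory is named after a probability value (presumably the sticking
# probability used for the runs it contains).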
def read_sp(sp_dir: str, inner_reader=read_xy_alt):
    if not Path(sp_dir).exists():
        raise FileNotFoundError(f"Root directory does not exist: {sp_dir}")
    reader = augment_read_with_sp(inner_reader)
    return pd.concat([
        read_load(specific_probability_dir, reader)
        for specific_probability_dir in glob(f'{sp_dir}/*')
    ])


def convergent_tail_index(series, tol):
    # Index of the first element after which all consecutive differences
    # stay within tol, i.e. where the series has settled down
    diffs = np.abs(np.ediff1d(series))
    for i in range(len(diffs)):
        if np.max(diffs[i:]) <= tol:
            return i
    # No convergence found
    return None


def mean_of_tail(series, tol=0.05):
    tail_index = convergent_tail_index(series, tol)
    if tail_index is None:
        raise ValueError("No convergence found.")
    return np.mean(series[tail_index:])


def std_of_tail(series, tol=0.05):
    tail_index = convergent_tail_index(series, tol)
    if tail_index is None:
        raise ValueError("No convergence found.")
    return np.std(series[tail_index:])


def fd_stats(dfs):
    fds = [mean_of_tail(df.fd, 0.1) for df in dfs]
    fds_clean = [f for f in fds if np.isfinite(f)]
    # Mean fd across runs and its standard error
    return np.mean(fds_clean), np.std(fds_clean) / np.sqrt(len(fds_clean))


def linear(x, a, b):
    # Straight-line model, e.g. for curve fitting
    return a * x + b


def mean_across(df):
    runs = df.run.unique().size
    data = (
        df.groupby('N')
        .agg({'fd': ['mean', 'std', ('stderr', lambda fd: np.std(fd) / np.sqrt(runs))]})
        .reset_index()
        .replace([np.inf, -np.inf], np.nan)
    )
    return data


def aggregate_sp_fd(df):
    by_run = df.groupby(['probability', 'N'])
    by_probability = by_run.agg(
        # Mean fd over the last 100 values in each (probability, N) group
        overall_fd=('fd', lambda fd: np.mean(fd.iloc[-100:])),
        overall_fd_std=('fd', 'std')
    ).reset_index().groupby('probability')
    data = by_probability.agg(
        fd=('overall_fd', 'mean'),
        # TODO Check stats
        # Pool the per-N stds via their root mean square
        fd_std=('overall_fd_std', lambda std: np.sqrt(np.mean(np.square(std))))
    )
    return data
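

# Example usage (a sketch; 'data/sp' is an assumed path, not part of the
# library):
#
#     runs = read_sp('data/sp')
#     summary = aggregate_sp_fd(runs)
#     print(summary)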