Add policy comparison pipeline
This merge request adds the ability to automatically calculate file differences between GPFS logs to help assess how many files are being created, deleted, or modified within a given device. This provides a rough measure of activity within a project on a per-policy basis. A notebook showing how to access the churn metrics and visualize them with plotly was added, along with an example batch script showing how to submit the churn calculation as a job.
## Implementation Details

### Input Parquet Structure
The functions calculating churn read from a hive structure partitioned on `tld` and policy acquisition date (`acq`). Reading from the flat structure output by `convert-to-parquet` had a few issues:
- There was no index, and the file entries were unsorted. This meant reading through the entire dataset while filtering for each `tld` for comparison, which is a non-starter for CPU backends.
- CUDA OOM errors appeared for any `tld` with a large number of records in the dataset, despite the total memory usage for each `tld` being at or below the VRAM limit on an A100.
Conversion to a hive structure allows indexing and sorting by file path within a given `tld` and acquisition date. Each hive "leaf" (a terminal subdirectory containing the parquet files) can also be read as its own independent dataset, which vastly improves data I/O. Additionally, metadata about each sub-dataset can be calculated and added for extra information.
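For illustration, each leaf can be read and indexed on its own. The layout, paths, and column name below are hypothetical rather than taken from this repository:

```python
import pandas as pd

# Assumed hive layout: <root>/tld=<tld>/acq=<YYYY-MM-DD>/*.parquet
# Reading a single leaf pulls in only that tld/date, never the full flat output.
leaf = "gpfs-hive/tld=project_a/acq=2024-05-01"
df = pd.read_parquet(leaf)

# Sort and index by file path within the leaf so per-file lookups and the
# later log-to-log comparison stay cheap.
df = df.sort_values("path").set_index("path")
```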
### Churn Methodology
Churn was defined as the total number of files created, deleted, or modified according to the `modify` metadata timestamp. An implementation for calculating churn was added for the pandas and cudf aggregators. Dask aggregators were excluded because they do not support the specific implementation method.
Whether a given file changed was determined by reading the `modify` time, the policy acquisition date, and the path index from the policy outputs for the date in question and the most recent prior log (ideally from the day before, but that varies). These data were combined into a single dataframe, and all duplicates of path and modification time were dropped. The remaining files were categorized as follows:
- Files with 1 entry and the more recent policy acquisition date were "created"
- Files with 1 entry and the older policy acquisition date were "deleted"
- Files with 2 entries and differing modification times were "modified"
All dropped files were considered unchanged. The ability to directly confirm that by comparing the other metadata fields (except access time) should be added later.
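As a minimal sketch of this categorization, assuming each policy output has been loaded into a pandas dataframe indexed by file path with `modify` and `acq` columns (the real column names and aggregator implementation may differ):

```python
import pandas as pd

def churn_sketch(prev: pd.DataFrame, curr: pd.DataFrame) -> pd.Series:
    """Hypothetical illustration of the comparison, not the aggregator code."""
    # Stack both policy outputs; the index holds the file path.
    combined = pd.concat([prev, curr]).rename_axis("path").reset_index()

    # Drop every (path, modify) pair present in both logs: those files are
    # treated as unchanged.
    changed = combined.drop_duplicates(subset=["path", "modify"], keep=False)

    # Count surviving entries per path and note the newest acquisition seen.
    counts = changed.groupby("path")["acq"].agg(["size", "max"])
    latest_acq = curr["acq"].iloc[0]

    # Two entries with differing modify times -> modified; a single entry is
    # created or deleted depending on which acquisition it came from.
    status = pd.Series("modified", index=counts.index)
    status[(counts["size"] == 1) & (counts["max"] == latest_acq)] = "created"
    status[(counts["size"] == 1) & (counts["max"] != latest_acq)] = "deleted"
    return status
```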
### Dask Considerations
Dask dataframes do not implement the `keep=False` option for the `drop_duplicates` method, which is necessary for removing all files we do not consider to have changed. Instead, an implementation using per-partition merges and comparing each modification time should be added in the future.
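Purely as a rough illustration of that merge-based direction, and not code from this MR (the `path` and `modify` column names are assumptions):

```python
import dask
import dask.dataframe as dd

def dask_churn_counts(prev: dd.DataFrame, curr: dd.DataFrame):
    """Hypothetical sketch: compare two logs without drop_duplicates(keep=False)."""
    # Outer-join the two logs on path; the indicator column records which
    # log(s) each path appeared in.
    merged = prev.merge(curr, on="path", how="outer",
                        suffixes=("_prev", "_curr"), indicator=True)

    created = (merged["_merge"] == "right_only").sum()
    deleted = (merged["_merge"] == "left_only").sum()
    modified = ((merged["_merge"] == "both")
                & (merged["modify_prev"] != merged["modify_curr"])).sum()

    # Materialize the three lazy counts in a single pass.
    return dask.compute(created, deleted, modified)
```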
## Major Changes
- Added "public"
calculate_churn
function to theprocess
module - Added "private"
calculate_churn
methods to the pandas and cudf aggregator classes to implement the actual comparison - Added
db
submodule for database convenience functions. Current functions will create a SQLite database with the correct schema for the churn table and will write a dataframe to that table without needing to explicitly create an engine. The write function can probably be removed later though. - Added
write_dataset_metadata
function topolicy
module. This writes a_metadata.json
file in the parent directory for a given parquet dataset including the following information:- Allocated RAM or VRAM needed for each column in the dataset
- Number of rows across all parquet files in the dataset
- Number of columns in the dataset
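A minimal sketch of how that information could be gathered with pyarrow; the function body, layout, and field names here are illustrative rather than the actual `write_dataset_metadata` implementation:

```python
import json
from pathlib import Path

import pyarrow.dataset as ds

def write_dataset_metadata_sketch(dataset_path: str) -> dict:
    """Hypothetical stand-in for write_dataset_metadata."""
    dataset = ds.dataset(dataset_path, format="parquet")
    # Fine for a single hive leaf; a large dataset would want to use parquet
    # fragment metadata instead of materializing the whole table.
    table = dataset.to_table()
    info = {
        "column_bytes": {name: table.column(name).nbytes for name in table.column_names},
        "num_rows": table.num_rows,
        "num_columns": table.num_columns,
    }
    out = Path(dataset_path).parent / "_metadata.json"
    out.write_text(json.dumps(info, indent=2))
    return info
```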
## Minor Changes
- Changed `convert_flat_to_hive` to accept a reservation option
- Added explicit RMM pool creation for CUDF Aggregator methods to improve memory performance and reduce VRAM allocation errors (see the sketch after this list)
- Added `GPUtil` package for detecting the existence of a GPU. Will slowly transition from `pynvml` to this for performance
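As an illustration of the kind of pool setup involved (the pool size and path below are arbitrary examples, not what the aggregators actually request):

```python
import rmm
import cudf

# Pre-allocate an 8 GiB RMM pool so repeated cudf allocations draw from the
# pool instead of hitting the CUDA allocator (and fragmenting VRAM) each time.
rmm.reinitialize(pool_allocator=True, initial_pool_size=8 * 2**30)

# Hypothetical leaf path; any cudf work after reinitialize uses the pool.
df = cudf.read_parquet("gpfs-hive/tld=project_a/acq=2024-05-01")
```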
## Bugfixes
- Fixed declared type for `df` input in the `PandasAggregator.aggregate` method to `pd.DataFrame`.
## Resolved Issues
- Resolves #52
- Resolves #53
- Resolves #50