Skip to content

Add policy comparison pipeline

This merge request adds the ability to automatically calculate file differences between GPFS logs to help assess how many files are being created, deleted, or modified within a given device. This provides us a rough measure of activity within a project on a per-policy basis. A notebook showing how to access the churn metrics and visualize them using plotly along with an example batch script showing how to submit the churn calculation as a job were added.

Implementation Details

Input Parquet Structure

The functions calculating churn read from a hive structure partitioned on tld and policy acquisition date (acq). Reading from the flat structure output from convert-to-parquet had a few issues:

  1. There was no index, and the file entries are unsorted. This meant needing to read through the entire dataset filtering for each tld for comparison. This is a non-starter for CPU backends
  2. CUDA OOM errors appeared for any tld with a large number of records in the dataset. This was despite the total memory usage for each tld being at or below the VRAM limit on an A100.

Conversion to a hive structure allows indexing and sorting by file path within a given tld and acquisition date. Each hive "leaf" (terminal subdirectory containing the parquet files) can be read as its own independent dataset as well which vastly improves data I/O. Additionally, metadata about each sub-dataset can be calculated and added for extra information.

Churn Methodology

Churn was defined as the total number of files created, deleted, or modified according to the modify metadata timestamp. An implementation for calculating churn was added for pandas and cudf aggregators. Dask aggregators were excluded due to not supporting the specific implementation method.

Any given file was determined to have changed by reading the modify time, the policy acquisition date, and the path index for the policy outputs for the date in question and the prior most recent log (hopefully from the day before, but that's variable). These data were combined into a single dataframe, and all duplicates of path and modification time were dropped. The remaining files were sorted as such:

  1. Files with 1 entry and the more recent policy acquisition date were "created"
  2. Files with 1 entry and the older policy acquisition date were "deleted"
  3. Files with 2 entries and differing modification times were "modified"

All dropped files were considered to have not changed. We should add the ability to directly confirm that by comparing the other metadata fields (except access time) later though.

Dask Considerations

Dask dataframes do not implement the keep=False option for the drop_duplicates method which is necessary for removing all files we do not consider to have changed. Instead, an implementation using per-partition merges and comparing each modification time should be added in the future.

Major Changes

  • Added "public" calculate_churn function to the process module
  • Added "private" calculate_churn methods to the pandas and cudf aggregator classes to implement the actual comparison
  • Added db submodule for database convenience functions. Current functions will create a SQLite database with the correct schema for the churn table and will write a dataframe to that table without needing to explicitly create an engine. The write function can probably be removed later though.
  • Added write_dataset_metadata function to policy module. This writes a _metadata.json file in the parent directory for a given parquet dataset including the following information:
    • Allocated RAM or VRAM needed for each column in the dataset
    • Number of rows across all parquet files in the dataset
    • Number of columns in the dataset

Minor Changes

  • Changed convert_flat_to_hive to accept a reservation option
  • Added explicit RMM pool creation for CUDF Aggregator methods to improve memory performance and reduce VRAM allocation errors
  • Added GPUtil package for detecting existence of a GPU. Will slowly transition from pynvml to this for performance

Bugfixes

  • Fixed declared type for df input in PandasAggregator.aggregate method to pd.DataFrame.

Resolved Issues

Edited by Matthew K Defenderfer

Merge request reports

Loading