#!/usr/bin/env python3 from ctypes import CDLL from time import sleep import os """ Execute the command find /sys/fs/cgroup -name memory.pressure to find available memory.pressue files (except /proc/pressure/memory). (actual for cgroup2) """ psi_path = '/proc/pressure/memory' def mlockall(): MCL_CURRENT = 1 MCL_FUTURE = 2 MCL_ONFAULT = 4 libc = CDLL('libc.so.6', use_errno=True) result = libc.mlockall( MCL_CURRENT | MCL_FUTURE | MCL_ONFAULT ) if result != 0: result = libc.mlockall( MCL_CURRENT | MCL_FUTURE ) if result != 0: print('WARNING: cannot lock all memory') else: pass else: pass mlockall() def psi_path_to_metrics(psi_path): with open(psi_path) as f: psi_list = f.readlines() # print(psi_list) some_list, full_list = psi_list[0].split(' '), psi_list[1].split(' ') #print(some_list, full_list) some_avg10 = some_list[1].split('=')[1] some_avg60 = some_list[2].split('=')[1] some_avg300 = some_list[3].split('=')[1] full_avg10 = full_list[1].split('=')[1] full_avg60 = full_list[1].split('=')[1] full_avg300 = full_list[1].split('=')[1] return (some_avg10, some_avg60, some_avg300, full_avg10, full_avg60, full_avg300) (some_avg10, some_avg60, some_avg300, full_avg10, full_avg60, full_avg300 ) = psi_path_to_metrics(psi_path) def cgroup2_root(): """ """ with open('/proc/mounts') as f: for line in f: if line.startswith('cgroup2 '): return line[7:].rpartition(' cgroup2 ')[0] i = cgroup2_root() print(i) def get_psi_mem_files(cgroup2_path): """ """ path_list = [] for root, dirs, files in os.walk(cgroup2_path): for file in files: path = os.path.join(root, file) if path.endswith('/memory.pressure'): path_list.append(path) return path_list if i is not None: print(get_psi_mem_files(i))