#!/usr/bin/env python3
"""
sort processes by oom_score
"""

from os import listdir
from argparse import ArgumentParser, ArgumentTypeError
from time import sleep


def _non_negative_int(text):
    """ argparse ``type=`` callable: parse an int and reject negatives """
    try:
        value = int(text)
    except ValueError:
        raise ArgumentTypeError('invalid int value: {!r}'.format(text))
    if value < 0:
        raise ArgumentTypeError('must be >= 0, got {}'.format(value))
    return value


def parse_arguments(argv=None):
    """
    parse CLI args

    argv: optional list of argument strings; when None (the default)
          argparse reads sys.argv[1:].

    Returns the argparse namespace with the attributes ``num``, ``len``
    and ``refresh``. Negative values are rejected with the usual argparse
    usage error (SystemExit): they would otherwise slice the output from
    the wrong end or make sleep() raise.
    """
    parser = ArgumentParser()
    parser.add_argument(
        '--num', '-n',
        help="""max number of lines; default: 99999""",
        default=99999,
        type=_non_negative_int
    )
    parser.add_argument(
        '--len', '-l',
        help="""max cmdline length; default: 99999""",
        default=99999,
        type=_non_negative_int
    )
    parser.add_argument(
        '--refresh', '-r',
        help='refresh interval (0 to disable); default: 0. '
             'Use it with --num/-n to also limit the output length',
        default=0,
        type=_non_negative_int
    )
    return parser.parse_args(argv)


def human_readable(num):
    """
    KiB to MiB

    Returns the rounded MiB value as a string right-aligned to 6 chars.
    """
    return str(round(num / 1024.0)).rjust(6, ' ')


def clear_screen():
    """
    print ANSI sequence to clear the screen
    """
    print('\033c')


class TableIndexes:  # pylint: disable=too-few-public-methods
    """
    table headers from /proc/*/status for further
    searching positions of VmRSS and VmSwap in each process output

    The positions are taken from this process' own status file and are
    only a hint for other processes; readers must check the field name
    found at that position.
    """

    def __init__(self):
        with open('/proc/self/status') as status_file:
            status_names = [line.split(':')[0] for line in status_file]
        # list.index raises ValueError when a field is not present
        self.vm_rss = status_names.index('VmRSS')
        self.vm_swap = status_names.index('VmSwap')
        self.uid = status_names.index('Uid')


try:
    INDEX = TableIndexes()
except (OSError, ValueError):
    # /proc/self/status is missing or lacks one of the fields: keep the
    # module importable, fields are then located by name only.
    INDEX = None


def _status_value(status_list, key, position=None):
    """
    return the text following 'key:' in status lines, or None if absent

    position is tried first; the line found there is used only when it
    really starts with 'key:', otherwise all lines are scanned.
    """
    prefix = key + ':'
    if position is not None and 0 <= position < len(status_list):
        candidate = status_list[position]
        if candidate.startswith(prefix):
            return candidate[len(prefix):]
    for line in status_list:
        if line.startswith(prefix):
            return line[len(prefix):]
    return None


class ProcessInfo:
    """
    OOM related data of one process, collected from /proc/{pid}/
    """

    pid = None            # PID as the string used in the /proc path
    cmdline = None        # command line, NUL separators replaced by ' '
    oom_score = None      # int read from /proc/{pid}/oom_score
    oom_score_adj = None  # int read from /proc/{pid}/oom_score_adj
    name = None           # 'Name' field of /proc/{pid}/status
    uid = None            # first value of the 'Uid' field, as a string
    vm_rss = None         # number from the 'VmRSS' field, as int
    vm_swap = None        # number from the 'VmSwap' field, as int

    @classmethod
    def from_pid(cls, pid):
        """
        create ProcessInfo instance reading process info from /proc/{pid}/

        pid: PID as a string.

        Returns None when the command line is empty or when the process
        disappears while its files are being read.
        """
        info = cls()
        info.pid = pid
        base = '/proc/' + pid + '/'
        try:
            # errors='replace': argv bytes need not be valid UTF-8 and
            # must not abort the whole listing
            with open(base + 'cmdline', errors='replace') as file:
                cmdline = file.readline()
            if not cmdline:
                return None
            info.cmdline = cmdline.replace('\x00', ' ')
            with open(base + 'oom_score') as file:
                info.oom_score = int(file.readline())
            with open(base + 'oom_score_adj') as file:
                info.oom_score_adj = int(file.readline())
        except (FileNotFoundError, ProcessLookupError):
            return None
        return info

    def read_status(self):
        """
        return True if process have info in /proc/{pid}/status

        Fills name, uid, vm_rss and vm_swap. Returns False when the file
        cannot be read any more or does not provide all of these fields.
        """
        try:
            # @TODO: stop reading file after VmSwap value retrieved
            with open('/proc/' + self.pid + '/status',
                      errors='replace') as file:
                status_list = file.readlines()
        except (FileNotFoundError, ProcessLookupError):
            return False
        return self._parse_status(status_list)

    def _parse_status(self, status_list):
        """
        fill name, uid, vm_rss, vm_swap from the lines of a status file

        Returns False and leaves the instance untouched when a field is
        missing or malformed. This happens when the layout differs from
        the one cached in INDEX, e.g. a status file without Vm* lines
        (presumably a process that exited after from_pid() - see proc(5)).
        """
        name = _status_value(status_list, 'Name', 0)
        uid = _status_value(
            status_list, 'Uid', getattr(INDEX, 'uid', None))
        rss = _status_value(
            status_list, 'VmRSS', getattr(INDEX, 'vm_rss', None))
        swap = _status_value(
            status_list, 'VmSwap', getattr(INDEX, 'vm_swap', None))
        if None in (name, uid, rss, swap):
            return False
        try:
            # value looks like '\t    1234 kB\n'; first token is the number
            vm_rss = int(rss.split()[0])
            vm_swap = int(swap.split()[0])
            # 'Uid' carries several ids, the first one is displayed
            first_uid = uid.split()[0]
        except (IndexError, ValueError):
            return False
        self.vm_rss = vm_rss
        self.vm_swap = vm_swap
        self.uid = first_uid
        self.name = name.lstrip('\t').rstrip('\n')
        return True

    def format_output(self, display_cmdline):
        """
        format output for printing

        display_cmdline: max number of cmdline characters to include.
        """
        return '{} {} {} {} {} {} M {} M {}'.format(
            str(self.oom_score).rjust(9),
            str(self.oom_score_adj).rjust(13),
            self.uid.rjust(5),
            str(self.pid).rjust(5),
            self.name.ljust(15),
            human_readable(self.vm_rss),
            human_readable(self.vm_swap),
            self.cmdline[:display_cmdline]
        )


class Application:
    """
    oom-sort application
    """

    oom_list = None

    def __init__(self):
        args = parse_arguments()
        self.num_lines = args.num
        self.display_cmdline = args.len
        self.refresh_interval = args.refresh

    @staticmethod
    def _header_lines(display_cmdline):
        """
        return (header, rule) lines of the table

        Titles are padded to the column widths used by
        ProcessInfo.format_output so that header, rule and rows line up.
        The cmdline column is left out when display_cmdline is 0.
        """
        titles = [
            'oom_score'.rjust(9),
            'oom_score_adj'.rjust(13),
            'UID'.rjust(5),
            'PID'.rjust(5),
            'Name'.ljust(15),
            'VmRSS'.rjust(8),    # 6 digits + ' M'
            'VmSwap'.rjust(8),   # 6 digits + ' M'
        ]
        if display_cmdline != 0:
            titles.append('cmdline')
        header = ' '.join(titles)
        rule = ' '.join('-' * len(title) for title in titles)
        return header, rule

    def print_stats(self):
        """
        print processes stats sorted by OOM score
        """
        oom_list = []
        for pid in listdir('/proc'):
            # skip non-numeric entries and PID 1
            if not pid.isdigit() or pid == '1':
                continue
            proc_info = ProcessInfo.from_pid(pid)
            if proc_info:
                oom_list.append(proc_info)
        oom_list.sort(key=lambda p: p.oom_score, reverse=True)

        for line in self._header_lines(self.display_cmdline):
            print(line)

        # iterate through list sorted by oom_score and print name, pid, etc
        # (the limit is applied before read_status(), so entries whose
        # status is no longer readable shorten the output)
        for proc_info in oom_list[:self.num_lines]:
            if proc_info.read_status():
                print(
                    proc_info.format_output(
                        display_cmdline=self.display_cmdline
                    )
                )

    def oom_top(self):
        """
        show `top`-like refreshing stats
        """
        while True:
            try:
                clear_screen()
                print(
                    "Refreshing each {} seconds, "
                    "press Ctrl+C to interrupt:".format(
                        self.refresh_interval
                    )
                )
                self.print_stats()
                sleep(self.refresh_interval)
            except KeyboardInterrupt:
                break

    def main(self):
        """
        application entrypoint
        """
        if not self.refresh_interval:
            self.print_stats()
        else:
            self.oom_top()


if __name__ == "__main__":
    Application().main()