#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0
#
# Run a perf script command multiple times in parallel, using perf script
# options --cpu and --time so that each job processes a different chunk
# of the data.
#
# Copyright (c) 2024, Intel Corporation.
import subprocess
import argparse
import pathlib
import shlex
import time
import copy
import sys
import os
import re
# Name of this program.
# NOTE(review): presumably used in usage/diagnostic messages - confirm against the rest of the file.
glb_prog_name = "parallel-perf.py"
# Minimum interval threshold.
# NOTE(review): units and use are not visible in this part of the file - confirm.
glb_min_interval = 10.0
# Minimum number of samples threshold.
# NOTE(review): use is not visible in this part of the file - confirm.
glb_min_samples = 64
class Verbosity():
    """Hold the output verbosity flags.

    Attributes:
        normal    -- False only when quiet was requested and verbose is off
        verbose   -- the verbose argument, or True when debug is set
        debug     -- the debug argument, unchanged
        self_test -- always True
    """

    def __init__(self, quiet=False, verbose=False, debug=False):
        self.debug = debug
        # Debug output implies verbose output
        self.verbose = True if debug else verbose
        # Verbose overrides quiet, so normal output is suppressed only
        # when quiet is requested and verbose is not in effect
        self.normal = not (quiet and not self.verbose)
        self.self_test = True
# Manage work (Start/Wait/Kill), as represented by a subprocess.Popen command
class Work():
    """Run one command as a subprocess, optionally piped into a consumer command.

    Files written under output_dir:
        cmd.txt -- the command line that was run
        out.txt -- standard output (of the consumer if there is one)
        err.txt -- standard error (removed on completion if empty)
    """

    def __init__(self, cmd, pipe_to, output_dir="."):
        """Record what to run; nothing is started until Start() is called.

        cmd        -- command as a list of arguments for subprocess.Popen
        pipe_to    -- command line string to pipe cmd's stdout into, or a
                      false value for no pipe
        output_dir -- directory in which cmd.txt, out.txt and err.txt are created
        """
        self.popen = None
        self.consumer = None
        self.cmd = cmd
        self.pipe_to = pipe_to
        self.output_dir = output_dir
        self.cmdout_name = f"{output_dir}/cmd.txt"
        self.stdout_name = f"{output_dir}/out.txt"
        self.stderr_name = f"{output_dir}/err.txt"

    def Command(self):
        """Return the command as a single shell-quoted string."""
        # Quote each argument so that the recorded command line can be
        # pasted back into a shell even if arguments contain spaces etc
        return " ".join(shlex.quote(x) for x in self.cmd)

    def Stdout(self):
        """Open (truncating) the stdout file and return the file object."""
        return open(self.stdout_name, "w")

    def Stderr(self):
        """Open (truncating) the stderr file and return the file object."""
        return open(self.stderr_name, "w")

    def CreateOutputDir(self):
        """Create the output directory, and any missing parents."""
        pathlib.Path(self.output_dir).mkdir(parents=True, exist_ok=True)

    def Start(self):
        """Start the command (and consumer, if any). Does nothing if already started."""
        if self.popen:
            return
        self.CreateOutputDir()
        with open(self.cmdout_name, "w") as f:
            f.write(self.Command())
            f.write("\n")
        # The child processes get their own copies of the file descriptors,
        # so close this process's copies once the children are started
        with self.Stdout() as stdout, self.Stderr() as stderr:
            if self.pipe_to:
                self.popen = subprocess.Popen(self.cmd, stdout=subprocess.PIPE, stderr=stderr)
                args = shlex.split(self.pipe_to)
                self.consumer = subprocess.Popen(args, stdin=self.popen.stdout, stdout=stdout, stderr=stderr)
                # Close our copy of the read end of the pipe, so the
                # producer gets SIGPIPE instead of blocking if the consumer exits
                self.popen.stdout.close()
            else:
                self.popen = subprocess.Popen(self.cmd, stdout=stdout, stderr=stderr)

    def RemoveEmptyErrFile(self):
        """Delete the stderr file if it exists and is empty."""
        if os.path.exists(self.stderr_name):
            if os.path.getsize(self.stderr_name) == 0:
                os.unlink(self.stderr_name)

    def Errors(self):
        """Return a list of error messages: one entry if the stderr file is non-empty."""
        if os.path.exists(self.stderr_name):
            if os.path.getsize(self.stderr_name) != 0:
                return [ f"Non-empty error file {self.stderr_name}" ]
        return []

    def TidyUp(self):
        """Clean up after the work has finished."""
        self.RemoveEmptyErrFile()

    def RawPollWait(self, p, wait):
        """Wait for (wait is true) or poll (wait is false) process p and return the result."""
        if wait:
            return p.wait()
        return p.poll()

    def Poll(self, wait=False):
        """Return the exit status, or None if not started or still running.

        With a consumer, the consumer's status is the result, except:
          - if the consumer has finished but the producer has not, the
            producer is killed and None is returned
          - if the consumer returned 0 but the producer failed, the
            producer's status is returned
        """
        if not self.popen:
            return None
        result = self.RawPollWait(self.popen, wait)
        if self.consumer:
            res = result
            result = self.RawPollWait(self.consumer, wait)
            if result is not None and res is None:
                # Consumer is gone, so the producer has nothing to write to
                self.popen.kill()
                result = None
            elif result == 0 and res is not None and res != 0:
                # Do not let a successful consumer hide a producer failure
                result = res
        if result is not None:
            self.TidyUp()
        return result

    def Wait(self):
        """Block until the work has finished and return the exit status."""
        return self.Poll(wait=True)

    def Kill(self):
        """Kill the command (and consumer, if any). Does nothing if not started."""
        if not self.popen:
            return
        self.popen.kill()
        if self.consumer:
            self.consumer.kill()
def KillWork(worklist, verbosity):
    """Kill every item of work in worklist, then wait for each one to finish.

    worklist  -- items providing Kill() and Wait() methods
    verbosity -- not used
    """
    # Kill everything first so no item keeps running while another is waited for
    for work in worklist:
        work.Kill()
    # Then wait for each killed item in turn
    for work in worklist:
        work.Wait()
def NumberOfCPUs():
    """Return the value of the SC_NPROCESSORS_ONLN system configuration variable."""
    online_cpus = os.sysconf("SC_NPROCESSORS_ONLN")
    return online_cpus
def NanoSecsToSecsStr(x):
    """Format a number of nanoseconds as seconds with 9 decimal places.

    x -- integer number of nanoseconds, or None

    Returns "" for None, otherwise a string such as "1.234567890".
    The conversion is done on the decimal digits, not with floating
    point, so no precision is lost.
    """
    if x is None:
        return ""
    digits = str(x)
    # Keep any minus sign out of the way of the zero padding
    sign = ""
    if digits.startswith("-"):
        sign = "-"
        digits = digits[1:]
    # Ensure at least 1 digit before the decimal point and 9 after it
    digits = digits.rjust(10, "0")
    return sign + digits[:-9] + "." + digits[-9:]
def InsertOptionAfter(cmd, option, after):
    """Insert option into the list cmd, immediately after the first item equal to after.

    If after is not in cmd, option is appended to the end instead.
    cmd is modified in place; nothing is returned.
    """
    try:
        pos = cmd.index(after)
    except ValueError:
        # list.index() raises ValueError when the item is not found
        cmd.append(option)
    else:
        cmd.insert(pos + 1, option)
def CreateWorkList(cmd, pipe_to, output_dir, cpus, time_ranges_by_cpu):
max_len = len(str(cpus[-1]))
cpu_dir_fmt = f"cpu-%.{max_len}u"
worklist = []
pos = 0
for cpu in cpus