K2LL33D SHELL

 Apache/2.4.7 (Ubuntu)
 Linux sman1baleendah 3.13.0-24-generic #46-Ubuntu SMP Thu Apr 10 19:11:08 UTC 2014 x86_64
 uid=33(www-data) gid=33(www-data) groups=33(www-data)
 safemode : OFF
 MySQL: ON | Perl: ON | cURL: OFF | WGet: ON
  >  / usr / share / apport / testsuite /
server ip : 172.67.156.115

your ip : 172.70.80.99

H O M E


Filename/usr/share/apport/testsuite/test_hooks.py
Size12.6 kb
Permissionrw-r--r--
Ownerroot : root
Create time27-Apr-2025 09:55
Last modified04-Apr-2014 22:30
Last accessed06-Jul-2025 19:05
Actionsedit | rename | delete | download (gzip)
Viewtext | code | image
'''Test the various package hooks.'''

# Copyright (C) 2007 - 2009 Canonical Ltd.
# Author: Martin Pitt <[email protected]>
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version. See http://www.gnu.org/copyleft/gpl.html for
# the full text of the license.

import unittest, subprocess, tempfile, os, shutil, os.path, sys, optparse

from datetime import datetime

import apport, apport.fileutils

# Command line option parser. Within this file as shown, no options are
# registered on it and parse_args() is never called.
optparser = optparse.OptionParser(usage='%prog [options]')

# Directory containing the hook scripts under test; can be overridden through
# the APPORT_DATA_DIR environment variable.
datadir = os.getenv('APPORT_DATA_DIR', '/usr/share/apport')


class T(unittest.TestCase):
    '''Exercise the hook scripts found in ``datadir``.

    Each test runs one hook as a subprocess against a throwaway report
    directory and then inspects the single report that was written.
    '''

    def setUp(self):
        '''Redirect apport to a fresh report directory and create a work dir.'''
        self.orig_report_dir = apport.fileutils.report_dir
        # Remember the caller's value so that tearDown() can restore it.
        self.orig_report_dir_env = os.environ.get('APPORT_REPORT_DIR')

        apport.fileutils.report_dir = tempfile.mkdtemp()
        # Exported for the hook subprocesses (presumably they read this
        # variable to locate the report directory -- confirm in the hooks).
        os.environ['APPORT_REPORT_DIR'] = apport.fileutils.report_dir

        self.workdir = tempfile.mkdtemp()

    def tearDown(self):
        '''Remove the temporary directories and undo the changes of setUp().'''
        try:
            shutil.rmtree(apport.fileutils.report_dir)
        finally:
            # Restore global state even if removing the report dir failed.
            apport.fileutils.report_dir = self.orig_report_dir
            if self.orig_report_dir_env is None:
                os.environ.pop('APPORT_REPORT_DIR', None)
            else:
                os.environ['APPORT_REPORT_DIR'] = self.orig_report_dir_env

            shutil.rmtree(self.workdir)

    def _run_hook(self, hook, args=(), stdin_data=None):
        '''Run a hook from datadir and assert that it exits with status 0.

        hook is the script name below datadir and args are the additional
        command line arguments. If stdin_data (bytes) is given, it is piped to
        the hook's stdin; otherwise the hook inherits stdin from this process.
        '''
        cmd = ['%s/%s' % (datadir, hook)] + list(args)
        if stdin_data is None:
            returncode = subprocess.call(cmd)
        else:
            proc = subprocess.Popen(cmd, stdin=subprocess.PIPE)
            proc.communicate(stdin_data)
            returncode = proc.returncode
        self.assertEqual(returncode, 0, '%s finished successfully' % hook)

    def _load_single_report(self, hook):
        '''Assert that exactly one new report exists and return it loaded.

        hook is only used for the assertion message.
        '''
        reps = apport.fileutils.get_new_reports()
        self.assertEqual(len(reps), 1, '%s created a report' % hook)

        r = apport.Report()
        with open(reps[0], 'rb') as f:
            r.load(f)
        return r

    def _check_kernel_package(self, r):
        '''Check that r['Package'] names a kernel package for this system.'''
        self.assertIn('linux', r['Package'])

        # Leading part of the running kernel release, up to the first '-'.
        self.assertIn(os.uname()[2].split('-')[0], r['Package'])

        r.add_package_info(r['Package'])
        self.assertIn(' ', r['Package'])  # appended version number

    def _check_gcc_report(self, r, gcc_version, gcc_path, source):
        '''Check the fields of a gcc_ice_hook report for textual source.'''
        self.assertEqual(r['ProblemType'], 'Crash')
        self.assertEqual(r['ExecutablePath'], gcc_path)
        self.assertEqual(r['PreprocessedSource'], source)

        r.add_package_info()

        self.assertEqual(r['Package'].split()[0], 'gcc-' + gcc_version)
        self.assertTrue(r['SourcePackage'].startswith('gcc'))

    def test_package_hook_nologs(self):
        '''package_hook without any log files.'''

        self._run_hook('package_hook', ['-p', 'bash'], b'something is wrong')
        r = self._load_single_report('package_hook')

        self.assertEqual(r['ProblemType'], 'Package')
        self.assertEqual(r['Package'], 'bash')
        self.assertEqual(r['SourcePackage'], 'bash')
        self.assertEqual(r['ErrorMessage'], 'something is wrong')

    def test_package_hook_uninstalled(self):
        '''package_hook on an uninstalled package (might fail to install).'''

        pkg = apport.packaging.get_uninstalled_package()
        self._run_hook('package_hook', ['-p', pkg], b'something is wrong')
        r = self._load_single_report('package_hook')

        self.assertEqual(r['ProblemType'], 'Package')
        self.assertEqual(r['Package'], pkg)
        self.assertEqual(r['SourcePackage'], apport.packaging.get_source(pkg))
        self.assertEqual(r['ErrorMessage'], 'something is wrong')

    def test_package_hook_logs(self):
        '''package_hook with a log dir and a log file.'''

        with open(os.path.join(self.workdir, 'log_1.log'), 'w') as f:
            f.write('Log 1\nbla')
        with open(os.path.join(self.workdir, 'log2'), 'w') as f:
            f.write('Yet\nanother\nlog')
        # A file in a subdirectory; the test fails if it shows up in the report.
        os.mkdir(os.path.join(self.workdir, 'logsub'))
        with open(os.path.join(self.workdir, 'logsub', 'notme.log'), 'w') as f:
            f.write('not me!')

        # Pass this script itself as a single log file, plus the log directory.
        self._run_hook('package_hook',
                       ['-p', 'bash', '-l', os.path.realpath(sys.argv[0]),
                        '-l', self.workdir],
                       b'something is wrong')
        r = self._load_single_report('package_hook')

        filekey = None
        log1key = None
        log2key = None
        for k in r.keys():
            if k.endswith('Testhookspy'):
                filekey = k
            elif k.endswith('Log1log'):
                log1key = k
            elif k.endswith('Log2'):
                log2key = k
            elif 'sub' in k:
                self.fail('logsub should not go into log files')

        self.assertTrue(filekey)
        self.assertTrue(log1key)
        self.assertTrue(log2key)
        # The marker below is found because this very line is part of the
        # attached script.
        self.assertIn('0234lkjas', r[filekey])
        self.assertEqual(len(r[filekey]), os.path.getsize(sys.argv[0]))
        self.assertEqual(r[log1key], 'Log 1\nbla')
        self.assertEqual(r[log2key], 'Yet\nanother\nlog')

    def test_package_hook_tags(self):
        '''package_hook with extra tags argument.'''

        self._run_hook('package_hook',
                       ['-p', 'bash', '-t', 'dist-upgrade, verybad'],
                       b'something is wrong')
        r = self._load_single_report('package_hook')

        self.assertEqual(r['Tags'], 'dist-upgrade verybad')

    def test_kernel_crashdump_kexec(self):
        '''kernel_crashdump using kexec-tools.'''

        report_dir = apport.fileutils.report_dir
        with open(os.path.join(report_dir, 'vmcore'), 'wb') as f:
            f.write(b'\x01' * 100)
        with open(os.path.join(report_dir, 'vmcore.log'), 'w') as f:
            f.write('vmcore successfully dumped')

        self._run_hook('kernel_crashdump')
        r = self._load_single_report('kernel_crashdump')

        self.assertEqual(set(r.keys()), {'Date', 'Package', 'ProblemType',
                                         'VmCore', 'VmCoreLog', 'Uname',
                                         'Architecture', 'DistroRelease'})
        self.assertEqual(r['ProblemType'], 'KernelCrash')
        self.assertEqual(r['VmCoreLog'], 'vmcore successfully dumped')
        self.assertEqual(r['VmCore'], b'\x01' * 100)
        self._check_kernel_package(r)

    def test_kernel_crashdump_kdump(self):
        '''kernel_crashdump using kdump-tools.'''

        # Timestamped directory holding a dmesg.<timestamp> file.
        timedir = datetime.now().strftime('%Y%m%d%H%M')
        vmcore_dir = os.path.join(apport.fileutils.report_dir, timedir)
        os.mkdir(vmcore_dir)

        dmesgfile = os.path.join(vmcore_dir, 'dmesg.' + timedir)
        with open(dmesgfile, 'wt') as f:
            f.write('1' * 100)

        self._run_hook('kernel_crashdump')
        r = self._load_single_report('kernel_crashdump')

        self.assertEqual(set(r.keys()), {'Date', 'Package', 'ProblemType',
                                         'VmCoreDmesg', 'Uname',
                                         'Architecture', 'DistroRelease'})
        self.assertEqual(r['ProblemType'], 'KernelCrash')
        self.assertEqual(r['VmCoreDmesg'], '1' * 100)
        self._check_kernel_package(r)

    @classmethod
    def _gcc_version_path(cls):
        '''Determine a valid version and executable path of gcc and return it
        as a tuple.

        Raises AssertionError if "gcc --version" or the versioned
        /usr/bin/gcc-X.Y binary does not run successfully.
        '''
        gcc = subprocess.Popen(['gcc', '--version'], stdout=subprocess.PIPE)
        out = gcc.communicate()[0].decode()
        # Raised explicitly so that the check also survives "python -O".
        if gcc.returncode != 0:
            raise AssertionError('"gcc --version" must work for this test suite')

        # Third whitespace-separated field of the first output line, reduced
        # to its first two dot-separated components.
        gcc_ver = '.'.join(out.splitlines()[0].split()[2].split('.')[:2])
        gcc_path = '/usr/bin/gcc-' + gcc_ver

        gcc = subprocess.Popen([gcc_path, '--version'], stdout=subprocess.PIPE)
        gcc.communicate()
        if gcc.returncode != 0:
            raise AssertionError(gcc_path + ' must exist and work for this test suite')

        return (gcc_ver, gcc_path)

    def test_gcc_ide_hook_file(self):
        '''gcc_ice_hook with a temporary file.'''

        (gcc_version, gcc_path) = self._gcc_version_path()

        source = b'int f(int x);'
        # The context manager closes (and thereby removes) the temporary file
        # even when the hook assertion fails.
        with tempfile.NamedTemporaryFile() as test_source:
            test_source.write(source)
            test_source.flush()
            self._run_hook('gcc_ice_hook', [gcc_path, test_source.name])

        r = self._load_single_report('gcc_ice_hook')
        self._check_gcc_report(r, gcc_version, gcc_path, source.decode())

    def test_gcc_ide_hook_file_binary(self):
        '''gcc_ice_hook with a temporary file with binary data.'''

        gcc_path = self._gcc_version_path()[1]

        # The trailing \xFF bytes are not valid UTF-8.
        source = b'int f(int x); \xFF\xFF'
        with tempfile.NamedTemporaryFile() as test_source:
            test_source.write(source)
            test_source.flush()
            self._run_hook('gcc_ice_hook', [gcc_path, test_source.name])

        r = self._load_single_report('gcc_ice_hook')
        self.assertEqual(r['PreprocessedSource'], source)

    def test_gcc_ide_hook_pipe(self):
        '''gcc_ice_hook with piping.'''

        (gcc_version, gcc_path) = self._gcc_version_path()

        test_source = 'int f(int x);'

        # '-' is passed in place of a file name and the source is sent on stdin.
        self._run_hook('gcc_ice_hook', [gcc_path, '-'], test_source.encode())

        r = self._load_single_report('gcc_ice_hook')
        self._check_gcc_report(r, gcc_version, gcc_path, test_source)

    def test_kernel_oops_hook(self):
        '''kernel_oops with an oops text on stdin.'''

        test_source = '''------------[ cut here ]------------
kernel BUG at /tmp/oops.c:5!
invalid opcode: 0000 [#1] SMP
Modules linked in: oops cpufreq_stats ext2 i915 drm nf_conntrack_ipv4 ipt_REJECT iptable_filter ip_tables nf_conntrack_ipv6 xt_state nf_conntrack xt_tcpudp ip6t_ipv6header ip6t_REJECT ip6table_filter ip6_tables x_tables ipv6 loop dm_multipath rtc_cmos iTCO_wdt iTCO_vendor_support pcspkr i2c_i801 i2c_core battery video ac output power_supply button sg joydev usb_storage dm_snapshot dm_zero dm_mirror dm_mod ahci pata_acpi ata_generic ata_piix libata sd_mod scsi_mod ext3 jbd mbcache uhci_hcd ohci_hcd ehci_hcd
'''
        self._run_hook('kernel_oops', stdin_data=test_source.encode())
        r = self._load_single_report('kernel_oops')

        self.assertEqual(r['ProblemType'], 'KernelOops')
        self.assertEqual(r['OopsText'], test_source)

        r.add_package_info(r['Package'])

        self.assertTrue(r['Package'].startswith('linux-image-'))
        self.assertIn('Uname', r)

# Run the suite only when executed as a script, so that importing this module
# does not start the tests and exit the interpreter. test_package_hook_logs
# reads sys.argv[0] and appears to expect it to be this file, which also
# points to script execution.
if __name__ == '__main__':
    unittest.main()