Apache/2.4.7 (Ubuntu) Linux sman1baleendah 3.13.0-24-generic #46-Ubuntu SMP Thu Apr 10 19:11:08 UTC 2014 x86_64 uid=33(www-data) gid=33(www-data) groups=33(www-data) safemode : OFF MySQL: ON | Perl: ON | cURL: OFF | WGet: ON > / usr / lib / python3 / dist-packages / UpdateManager / Core / | server ip : 104.21.89.46 your ip : 172.69.6.191 H O M E |
Filename | /usr/lib/python3/dist-packages/UpdateManager/Core/MyCache.py |
Size | 14.57 kb |
Permission | rw-r--r-- |
Owner | root : root |
Create time | 27-Apr-2025 09:55 |
Last modified | 10-Sep-2013 16:58 |
Last accessed | 07-Jul-2025 06:57 |
Actions | edit | rename | delete | download (gzip) |
View | text | code | image |
# MyCache.py
# -*- Mode: Python; indent-tabs-mode: nil; tab-width: 4; coding: utf-8 -*-
#
# Copyright (c) 2004-2008 Canonical
#
# Author: Michael Vogt <[email protected]>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
# USA
from __future__ import absolute_import, print_function
import warnings
warnings.filterwarnings("ignore", "apt API not stable yet", FutureWarning)
import apt
import apt_pkg
import logging
import os
try:
from urllib.error import HTTPError
from urllib.request import urlopen
from urllib.parse import urlsplit
except ImportError:
from urllib2 import HTTPError, urlopen
from urlparse import urlsplit
try:
from http.client import BadStatusLine
except ImportError:
from httplib import BadStatusLine
import socket
import re
import DistUpgrade.DistUpgradeCache
from gettext import gettext as _
# Additional pin file; _initDepCache() reads it on top of the default pin
# file whenever it exists.
SYNAPTIC_PINFILE = "/var/lib/synaptic/preferences"
# Base URL below which the default changelog locations are built.
CHANGELOGS_POOL = "http://changelogs.ubuntu.com/changelogs/pool/"
# Default location template; _get_changelog_or_news() fills it with
# (section, prefix, srcpkg, srcpkg, srcver, filename).
CHANGELOGS_URI = CHANGELOGS_POOL + "%s/%s/%s/%s_%s/%s"
class HttpsChangelogsUnsupportedError(Exception):
    """Raised for https changelog locations that carry credentials.

    Such locations are refused because urllib2 lacks certificate
    validation, which would allow a man-in-the-middle attack to steal
    the credentials.
    """
# Cache built on the DistUpgrade cache; the methods below add changelog and
# NEWS.Debian fetching on top of it.
class MyCache(DistUpgrade.DistUpgradeCache.MyCache):
# Origin label of official packages: get_changelog() uses the default
# changelog location only when one of the candidate's origins equals it.
CHANGELOG_ORIGIN = "Ubuntu"
def __init__(self, progress, rootdir=None):
    """Open the apt cache and verify that it is in a usable state.

    progress -- progress object handed to apt.Cache
    rootdir -- optional alternative root directory handed to apt.Cache

    The checks are plain asserts, so an unusable cache surfaces as an
    AssertionError that the caller has to deal with.
    """
    apt.Cache.__init__(self, progress, rootdir)
    # packages in reqreinst state are left to the caller
    # (which runs a partial upgrade)
    assert len(self.req_reinstall_pkgs) == 0
    # libapt only checks the dpkg journal when it tries to lock the
    # packaging system, so the check is repeated here
    assert not self._dpkgJournalDirty()
    # init the regular cache
    self._initDepCache()
    self.all_changes = {}
    self.all_news = {}
    # broken packages: try to repair them via saveDistUpgrade()
    if self._depcache.broken_count > 0:
        self.saveDistUpgrade()
    assert self._depcache.broken_count == 0
    assert self._depcache.del_count == 0
def _dpkgJournalDirty(self):
    """Test if the dpkg journal is dirty.

    Lists the "updates" directory next to the dpkg status file (similar
    to debSystem::CheckUpdates) and returns True as soon as an entry
    whose name starts with a digit is found, False otherwise.
    """
    status_file = apt_pkg.config.find_file("Dir::State::status")
    updates_dir = os.path.dirname(status_file) + "/updates"
    return any(re.match("[0-9]+", entry)
               for entry in os.listdir(updates_dir))
def _initDepCache(self):
    """Load the pin files and (re)initialize the depcache."""
    depcache = self._depcache
    # default pin file first, then the synaptic one when it exists
    depcache.read_pinfile()
    if os.path.exists(SYNAPTIC_PINFILE):
        depcache.read_pinfile(SYNAPTIC_PINFILE)
    depcache.init()
def clear(self):
    """Reset the cache state by re-initializing the depcache."""
    self._initDepCache()
@property
def required_download(self):
    """Size of the packages that are required to download.

    Read from the ``fetch_needed`` value of an apt_pkg.Acquire that was
    filled by a package manager created for the current depcache.
    """
    manager = apt_pkg.PackageManager(self._depcache)
    acquire = apt_pkg.Acquire()
    manager.get_archives(acquire, self._list, self._records)
    return acquire.fetch_needed
@property
def install_count(self):
    """The depcache's ``inst_count`` value."""
    depcache = self._depcache
    return depcache.inst_count
def keep_count(self):
    """Return the depcache's ``keep_count`` value.

    Unlike install_count this is a regular method and must be called.
    """
    depcache = self._depcache
    return depcache.keep_count
def _check_dependencies(self, target, deps):
"""Return True if any of the dependencies in deps match target."""
# TODO: handle virtual packages
for dep_or in deps:
if not dep_or:
continue
match = True
for base_dep in dep_or:
if (base_dep.name != target.package.shortname or
not apt_pkg.check_dep(
target.version, base_dep.relation, base_dep.version)):
match = False
if match:
return True
return False
def find_removal_justification(self, pkg):
    """Look for a candidate that both Conflicts with and Replaces pkg.

    Returns True (after logging the finding) when some candidate in the
    cache declares Conflicts and Replaces that both match the installed
    version of pkg. Returns False otherwise, including when pkg is not
    installed.
    """
    installed = pkg.installed
    if not installed:
        return False
    for other in self:
        cand = other.candidate
        if cand is None:
            continue
        if not self._check_dependencies(
                installed, cand.get_dependencies("Conflicts")):
            continue
        if not self._check_dependencies(
                installed, cand.get_dependencies("Replaces")):
            continue
        logging.info(
            "%s Conflicts/Replaces %s; allowing removal" % (
                cand.package.shortname, pkg.shortname))
        return True
    return False
def saveDistUpgrade(self):
    """Mimic an upgrade that will never remove anything.

    Returns the number of removals for which no Conflicts/Replaces
    justification was found; when that number is positive the marks
    are cleared before the final upgrade() call.
    """
    self._depcache.upgrade(True)
    unjustified = self._depcache.del_count
    if unjustified > 0:
        marked = [pkg for pkg in self if pkg.marked_delete]
        assert unjustified == len(marked)
        # removals explained by a Conflicts/Replaces pair do not count
        unjustified -= sum(
            1 for pkg in marked if self.find_removal_justification(pkg))
    if unjustified > 0:
        self.clear()
        assert (self._depcache.broken_count == 0 and
                self._depcache.del_count == 0)
    else:
        assert self._depcache.broken_count == 0
    self._depcache.upgrade()
    return unjustified
def _strip_epoch(self, verstr):
" strip of the epoch "
l = verstr.split(":")
if len(l) > 1:
verstr = "".join(l[1:])
return verstr
def _get_changelog_or_news(self, name, fname, strict_versioning=False,
changelogs_uri=None):
" helper that fetches the file in question "
# don't touch the gui in this function, it needs to be thread-safe
pkg = self[name]
# get the src package name
srcpkg = pkg.candidate.source_name
# assume "main" section
src_section = "main"
# use the section of the candidate as a starting point
section = pkg._pcache._depcache.get_candidate_ver(pkg._pkg).section
# get the source version, start with the binaries version
srcver_epoch = pkg.candidate.version
srcver = self._strip_epoch(srcver_epoch)
#print("bin: %s" % binver)
l = section.split("/")
if len(l) > 1:
src_section = l[0]
# lib is handled special
prefix = srcpkg[0]
if srcpkg.startswith("lib"):
prefix = "lib" + srcpkg[3]
# the changelogs_uri argument overrides the default changelogs_uri,
# this is useful for e.g. PPAs where we construct the changelogs
# path differently
if changelogs_uri:
uri = changelogs_uri
else:
uri = CHANGELOGS_URI % (src_section, prefix, srcpkg, srcpkg,
srcver, fname)
# https uris are not supported when they contain a username/password
# because the urllib2 https implementation will not check certificates
# and so its possible to do a man-in-the-middle attack to steal the
# credentials
res = urlsplit(uri)
if res.scheme == "https" and res.username != "":
raise HttpsChangelogsUnsupportedError(
"https locations with username/password are not"
"supported to fetch changelogs")
# print("Trying: %s " % uri)
changelog = urlopen(uri)
#print(changelog.read())
# do only get the lines that are new
alllines = ""
regexp = "^%s \((.*)\)(.*)$" % (re.escape(srcpkg))
while True:
line = changelog.readline().decode("UTF-8", "replace")
if line == "":
break
match = re.match(regexp, line)
if match:
# strip epoch from installed version
# and from changelog too
installed = getattr(pkg.installed, "version", None)
if installed and ":" in installed:
installed = installed.split(":", 1)[1]
changelogver = match.group(1)
if changelogver and ":" in changelogver:
changelogver = changelogver.split(":", 1)[1]
# we test for "==" here for changelogs
# to ensure that the version
# is actually really in the changelog - if not
# just display it all, this catches cases like:
# gcc-defaults with "binver=4.3.1" and srcver=1.76
#
# for NEWS.Debian we do require the changelogver > installed
if strict_versioning:
if (installed and
apt_pkg.version_compare(changelogver,
installed) < 0):
break
else:
if (installed and
apt_pkg.version_compare(changelogver,
installed) == 0):
break
alllines = alllines + line
return alllines
def _guess_third_party_changelogs_uri_by_source(self, name):
pkg = self[name]
deb_uri = pkg.candidate.uri
if deb_uri is None:
return None
srcrec = pkg.candidate.record.get("Source")
if not srcrec:
return None
# srcpkg can be "apt" or "gcc-default (1.0)"
srcpkg = srcrec.split("(")[0].strip()
if "(" in srcrec:
srcver = srcrec.split("(")[1].rstrip(")")
else:
srcver = pkg.candidate.version
base_uri = deb_uri.rpartition("/")[0]
return base_uri + "/%s_%s.changelog" % (srcpkg, srcver)
def _guess_third_party_changelogs_uri_by_binary(self, name):
""" guess changelogs uri based on ArchiveURI by replacing .deb
with .changelog
"""
# there is always a pkg and a pkg.candidate, no need to add
# check here
pkg = self[name]
deb_uri = pkg.candidate.uri
if deb_uri:
return "%s.changelog" % deb_uri.rsplit(".", 1)[0]
return None
def get_news_and_changelog(self, name, lock):
    """Fetch NEWS.Debian and the changelog for name, then release lock.

    name -- package name handed to get_news() and get_changelog()
    lock -- object whose release() is called once fetching is over

    The lock is released even when fetching raises, so whoever waits on
    it is not left blocked; the exception itself still propagates. A
    failing release() is logged and otherwise ignored.
    """
    try:
        self.get_news(name)
        self.get_changelog(name)
    finally:
        try:
            lock.release()
        except Exception:
            # best effort as before: a failing release must not be fatal
            logging.debug("could not release the changelog lock",
                          exc_info=True)
def get_news(self, name):
    """Fetch the NEWS.Debian file for name and store it in all_news.

    Any failure while fetching is ignored; nothing is stored when the
    fetch fails or yields no text.
    """
    try:
        text = self._get_changelog_or_news(name, "NEWS.Debian", True)
    except Exception:
        return
    if not text:
        return
    self.all_news[name] = text
def _fetch_changelog_for_third_party_package(self, name):
# Try non official changelog location
changelogs_uri_binary = \
self._guess_third_party_changelogs_uri_by_binary(name)
changelogs_uri_source = \
self._guess_third_party_changelogs_uri_by_source(name)
error_message = ""
for changelogs_uri in [changelogs_uri_binary, changelogs_uri_source]:
if changelogs_uri:
try:
changelog = self._get_changelog_or_news(
name, "changelog", False, changelogs_uri)
self.all_changes[name] += changelog
except (HTTPError, HttpsChangelogsUnsupportedError):
# no changelogs_uri or 404
error_message = _(
"This update does not come from a "
"source that supports changelogs.")
except (IOError, BadStatusLine, socket.error):
# network errors and others
logging.exception("error on changelog fetching")
error_message = _(
"Failed to download the list of changes. \n"
"Please check your Internet connection.")
self.all_changes[name] += error_message
def get_changelog(self, name):
    """Fetch the changelog for name and store it in all_changes.

    all_changes[name] always starts with a header naming the installed
    and the available version. Packages without a CHANGELOG_ORIGIN
    origin are delegated to the third-party lookup; for the others the
    default changelog location is used and, when that yields nothing or
    fails, an explanatory message is stored instead of the changelog.
    """
    pkg = self[name]
    candidate = pkg.candidate
    origins = candidate.origins
    self.all_changes[name] = _("Changes for %s versions:\n"
                               "Installed version: %s\n"
                               "Available version: %s\n\n") % \
        (name, getattr(pkg.installed, "version", None),
         candidate.version)
    if self.CHANGELOG_ORIGIN not in [o.origin for o in origins]:
        self._fetch_changelog_for_third_party_package(name)
        return
    # the launchpad URL in the messages needs the epoch colon encoded
    srcpkg = candidate.source_name
    srcver_epoch = candidate.version.replace(':', '%3A')
    try:
        changelog = self._get_changelog_or_news(name, "changelog")
        if len(changelog) == 0:
            changelog = _("The changelog does not contain any relevant "
                          "changes.\n\n"
                          "Please use http://launchpad.net/ubuntu/+source/"
                          "%s/%s/+changelog\n"
                          "until the changes become available or try "
                          "again later.") % (srcpkg, srcver_epoch)
    except HTTPError:
        changelog = _("The list of changes is not available yet.\n\n"
                      "Please use http://launchpad.net/ubuntu/+source/"
                      "%s/%s/+changelog\n"
                      "until the changes become available or try again "
                      "later.") % (srcpkg, srcver_epoch)
    except (IOError, BadStatusLine, socket.error):
        # log like the third-party lookup does instead of printing
        logging.exception("error on changelog fetching")
        changelog = _("Failed to download the list "
                      "of changes. \nPlease "
                      "check your Internet "
                      "connection.")
    self.all_changes[name] += changelog
# -*- Mode: Python; indent-tabs-mode: nil; tab-width: 4; coding: utf-8 -*-
#
# Copyright (c) 2004-2008 Canonical
#
# Author: Michael Vogt <[email protected]>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
# USA
from __future__ import absolute_import, print_function
import warnings
warnings.filterwarnings("ignore", "apt API not stable yet", FutureWarning)
import apt
import apt_pkg
import logging
import os
try:
from urllib.error import HTTPError
from urllib.request import urlopen
from urllib.parse import urlsplit
except ImportError:
from urllib2 import HTTPError, urlopen
from urlparse import urlsplit
try:
from http.client import BadStatusLine
except ImportError:
from httplib import BadStatusLine
import socket
import re
import DistUpgrade.DistUpgradeCache
from gettext import gettext as _
# Additional pin file; _initDepCache() reads it on top of the default pin
# file whenever it exists.
SYNAPTIC_PINFILE = "/var/lib/synaptic/preferences"
# Base URL below which the default changelog locations are built.
CHANGELOGS_POOL = "http://changelogs.ubuntu.com/changelogs/pool/"
# Default location template; _get_changelog_or_news() fills it with
# (section, prefix, srcpkg, srcpkg, srcver, filename).
CHANGELOGS_URI = CHANGELOGS_POOL + "%s/%s/%s/%s_%s/%s"
class HttpsChangelogsUnsupportedError(Exception):
    """Raised for https changelog locations that carry credentials.

    Such locations are refused because urllib2 lacks certificate
    validation, which would allow a man-in-the-middle attack to steal
    the credentials.
    """
# Cache built on the DistUpgrade cache; the methods below add changelog and
# NEWS.Debian fetching on top of it.
class MyCache(DistUpgrade.DistUpgradeCache.MyCache):
# Origin label of official packages: get_changelog() uses the default
# changelog location only when one of the candidate's origins equals it.
CHANGELOG_ORIGIN = "Ubuntu"
def __init__(self, progress, rootdir=None):
    """Open the apt cache and verify that it is in a usable state.

    progress -- progress object handed to apt.Cache
    rootdir -- optional alternative root directory handed to apt.Cache

    The checks are plain asserts, so an unusable cache surfaces as an
    AssertionError that the caller has to deal with.
    """
    apt.Cache.__init__(self, progress, rootdir)
    # packages in reqreinst state are left to the caller
    # (which runs a partial upgrade)
    assert len(self.req_reinstall_pkgs) == 0
    # libapt only checks the dpkg journal when it tries to lock the
    # packaging system, so the check is repeated here
    assert not self._dpkgJournalDirty()
    # init the regular cache
    self._initDepCache()
    self.all_changes = {}
    self.all_news = {}
    # broken packages: try to repair them via saveDistUpgrade()
    if self._depcache.broken_count > 0:
        self.saveDistUpgrade()
    assert self._depcache.broken_count == 0
    assert self._depcache.del_count == 0
def _dpkgJournalDirty(self):
    """Test if the dpkg journal is dirty.

    Lists the "updates" directory next to the dpkg status file (similar
    to debSystem::CheckUpdates) and returns True as soon as an entry
    whose name starts with a digit is found, False otherwise.
    """
    status_file = apt_pkg.config.find_file("Dir::State::status")
    updates_dir = os.path.dirname(status_file) + "/updates"
    return any(re.match("[0-9]+", entry)
               for entry in os.listdir(updates_dir))
def _initDepCache(self):
    """Load the pin files and (re)initialize the depcache."""
    depcache = self._depcache
    # default pin file first, then the synaptic one when it exists
    depcache.read_pinfile()
    if os.path.exists(SYNAPTIC_PINFILE):
        depcache.read_pinfile(SYNAPTIC_PINFILE)
    depcache.init()
def clear(self):
    """Reset the cache state by re-initializing the depcache."""
    self._initDepCache()
@property
def required_download(self):
    """Size of the packages that are required to download.

    Read from the ``fetch_needed`` value of an apt_pkg.Acquire that was
    filled by a package manager created for the current depcache.
    """
    manager = apt_pkg.PackageManager(self._depcache)
    acquire = apt_pkg.Acquire()
    manager.get_archives(acquire, self._list, self._records)
    return acquire.fetch_needed
@property
def install_count(self):
    """The depcache's ``inst_count`` value."""
    depcache = self._depcache
    return depcache.inst_count
def keep_count(self):
    """Return the depcache's ``keep_count`` value.

    Unlike install_count this is a regular method and must be called.
    """
    depcache = self._depcache
    return depcache.keep_count
def _check_dependencies(self, target, deps):
"""Return True if any of the dependencies in deps match target."""
# TODO: handle virtual packages
for dep_or in deps:
if not dep_or:
continue
match = True
for base_dep in dep_or:
if (base_dep.name != target.package.shortname or
not apt_pkg.check_dep(
target.version, base_dep.relation, base_dep.version)):
match = False
if match:
return True
return False
def find_removal_justification(self, pkg):
    """Look for a candidate that both Conflicts with and Replaces pkg.

    Returns True (after logging the finding) when some candidate in the
    cache declares Conflicts and Replaces that both match the installed
    version of pkg. Returns False otherwise, including when pkg is not
    installed.
    """
    installed = pkg.installed
    if not installed:
        return False
    for other in self:
        cand = other.candidate
        if cand is None:
            continue
        if not self._check_dependencies(
                installed, cand.get_dependencies("Conflicts")):
            continue
        if not self._check_dependencies(
                installed, cand.get_dependencies("Replaces")):
            continue
        logging.info(
            "%s Conflicts/Replaces %s; allowing removal" % (
                cand.package.shortname, pkg.shortname))
        return True
    return False
def saveDistUpgrade(self):
    """Mimic an upgrade that will never remove anything.

    Returns the number of removals for which no Conflicts/Replaces
    justification was found; when that number is positive the marks
    are cleared before the final upgrade() call.
    """
    self._depcache.upgrade(True)
    unjustified = self._depcache.del_count
    if unjustified > 0:
        marked = [pkg for pkg in self if pkg.marked_delete]
        assert unjustified == len(marked)
        # removals explained by a Conflicts/Replaces pair do not count
        unjustified -= sum(
            1 for pkg in marked if self.find_removal_justification(pkg))
    if unjustified > 0:
        self.clear()
        assert (self._depcache.broken_count == 0 and
                self._depcache.del_count == 0)
    else:
        assert self._depcache.broken_count == 0
    self._depcache.upgrade()
    return unjustified
def _strip_epoch(self, verstr):
" strip of the epoch "
l = verstr.split(":")
if len(l) > 1:
verstr = "".join(l[1:])
return verstr
def _get_changelog_or_news(self, name, fname, strict_versioning=False,
changelogs_uri=None):
" helper that fetches the file in question "
# don't touch the gui in this function, it needs to be thread-safe
pkg = self[name]
# get the src package name
srcpkg = pkg.candidate.source_name
# assume "main" section
src_section = "main"
# use the section of the candidate as a starting point
section = pkg._pcache._depcache.get_candidate_ver(pkg._pkg).section
# get the source version, start with the binaries version
srcver_epoch = pkg.candidate.version
srcver = self._strip_epoch(srcver_epoch)
#print("bin: %s" % binver)
l = section.split("/")
if len(l) > 1:
src_section = l[0]
# lib is handled special
prefix = srcpkg[0]
if srcpkg.startswith("lib"):
prefix = "lib" + srcpkg[3]
# the changelogs_uri argument overrides the default changelogs_uri,
# this is useful for e.g. PPAs where we construct the changelogs
# path differently
if changelogs_uri:
uri = changelogs_uri
else:
uri = CHANGELOGS_URI % (src_section, prefix, srcpkg, srcpkg,
srcver, fname)
# https uris are not supported when they contain a username/password
# because the urllib2 https implementation will not check certificates
# and so its possible to do a man-in-the-middle attack to steal the
# credentials
res = urlsplit(uri)
if res.scheme == "https" and res.username != "":
raise HttpsChangelogsUnsupportedError(
"https locations with username/password are not"
"supported to fetch changelogs")
# print("Trying: %s " % uri)
changelog = urlopen(uri)
#print(changelog.read())
# do only get the lines that are new
alllines = ""
regexp = "^%s \((.*)\)(.*)$" % (re.escape(srcpkg))
while True:
line = changelog.readline().decode("UTF-8", "replace")
if line == "":
break
match = re.match(regexp, line)
if match:
# strip epoch from installed version
# and from changelog too
installed = getattr(pkg.installed, "version", None)
if installed and ":" in installed:
installed = installed.split(":", 1)[1]
changelogver = match.group(1)
if changelogver and ":" in changelogver:
changelogver = changelogver.split(":", 1)[1]
# we test for "==" here for changelogs
# to ensure that the version
# is actually really in the changelog - if not
# just display it all, this catches cases like:
# gcc-defaults with "binver=4.3.1" and srcver=1.76
#
# for NEWS.Debian we do require the changelogver > installed
if strict_versioning:
if (installed and
apt_pkg.version_compare(changelogver,
installed) < 0):
break
else:
if (installed and
apt_pkg.version_compare(changelogver,
installed) == 0):
break
alllines = alllines + line
return alllines
def _guess_third_party_changelogs_uri_by_source(self, name):
pkg = self[name]
deb_uri = pkg.candidate.uri
if deb_uri is None:
return None
srcrec = pkg.candidate.record.get("Source")
if not srcrec:
return None
# srcpkg can be "apt" or "gcc-default (1.0)"
srcpkg = srcrec.split("(")[0].strip()
if "(" in srcrec:
srcver = srcrec.split("(")[1].rstrip(")")
else:
srcver = pkg.candidate.version
base_uri = deb_uri.rpartition("/")[0]
return base_uri + "/%s_%s.changelog" % (srcpkg, srcver)
def _guess_third_party_changelogs_uri_by_binary(self, name):
""" guess changelogs uri based on ArchiveURI by replacing .deb
with .changelog
"""
# there is always a pkg and a pkg.candidate, no need to add
# check here
pkg = self[name]
deb_uri = pkg.candidate.uri
if deb_uri:
return "%s.changelog" % deb_uri.rsplit(".", 1)[0]
return None
def get_news_and_changelog(self, name, lock):
    """Fetch NEWS.Debian and the changelog for name, then release lock.

    name -- package name handed to get_news() and get_changelog()
    lock -- object whose release() is called once fetching is over

    The lock is released even when fetching raises, so whoever waits on
    it is not left blocked; the exception itself still propagates. A
    failing release() is logged and otherwise ignored.
    """
    try:
        self.get_news(name)
        self.get_changelog(name)
    finally:
        try:
            lock.release()
        except Exception:
            # best effort as before: a failing release must not be fatal
            logging.debug("could not release the changelog lock",
                          exc_info=True)
def get_news(self, name):
    """Fetch the NEWS.Debian file for name and store it in all_news.

    Any failure while fetching is ignored; nothing is stored when the
    fetch fails or yields no text.
    """
    try:
        text = self._get_changelog_or_news(name, "NEWS.Debian", True)
    except Exception:
        return
    if not text:
        return
    self.all_news[name] = text
def _fetch_changelog_for_third_party_package(self, name):
# Try non official changelog location
changelogs_uri_binary = \
self._guess_third_party_changelogs_uri_by_binary(name)
changelogs_uri_source = \
self._guess_third_party_changelogs_uri_by_source(name)
error_message = ""
for changelogs_uri in [changelogs_uri_binary, changelogs_uri_source]:
if changelogs_uri:
try:
changelog = self._get_changelog_or_news(
name, "changelog", False, changelogs_uri)
self.all_changes[name] += changelog
except (HTTPError, HttpsChangelogsUnsupportedError):
# no changelogs_uri or 404
error_message = _(
"This update does not come from a "
"source that supports changelogs.")
except (IOError, BadStatusLine, socket.error):
# network errors and others
logging.exception("error on changelog fetching")
error_message = _(
"Failed to download the list of changes. \n"
"Please check your Internet connection.")
self.all_changes[name] += error_message
def get_changelog(self, name):
    """Fetch the changelog for name and store it in all_changes.

    all_changes[name] always starts with a header naming the installed
    and the available version. Packages without a CHANGELOG_ORIGIN
    origin are delegated to the third-party lookup; for the others the
    default changelog location is used and, when that yields nothing or
    fails, an explanatory message is stored instead of the changelog.
    """
    pkg = self[name]
    candidate = pkg.candidate
    origins = candidate.origins
    self.all_changes[name] = _("Changes for %s versions:\n"
                               "Installed version: %s\n"
                               "Available version: %s\n\n") % \
        (name, getattr(pkg.installed, "version", None),
         candidate.version)
    if self.CHANGELOG_ORIGIN not in [o.origin for o in origins]:
        self._fetch_changelog_for_third_party_package(name)
        return
    # the launchpad URL in the messages needs the epoch colon encoded
    srcpkg = candidate.source_name
    srcver_epoch = candidate.version.replace(':', '%3A')
    try:
        changelog = self._get_changelog_or_news(name, "changelog")
        if len(changelog) == 0:
            changelog = _("The changelog does not contain any relevant "
                          "changes.\n\n"
                          "Please use http://launchpad.net/ubuntu/+source/"
                          "%s/%s/+changelog\n"
                          "until the changes become available or try "
                          "again later.") % (srcpkg, srcver_epoch)
    except HTTPError:
        changelog = _("The list of changes is not available yet.\n\n"
                      "Please use http://launchpad.net/ubuntu/+source/"
                      "%s/%s/+changelog\n"
                      "until the changes become available or try again "
                      "later.") % (srcpkg, srcver_epoch)
    except (IOError, BadStatusLine, socket.error):
        # log like the third-party lookup does instead of printing
        logging.exception("error on changelog fetching")
        changelog = _("Failed to download the list "
                      "of changes. \nPlease "
                      "check your Internet "
                      "connection.")
    self.all_changes[name] += changelog