K2LL33D SHELL

 Apache/2.4.7 (Ubuntu)
 Linux sman1baleendah 3.13.0-24-generic #46-Ubuntu SMP Thu Apr 10 19:11:08 UTC 2014 x86_64
 uid=33(www-data) gid=33(www-data) groups=33(www-data)
 safemode : OFF
 MySQL: ON | Perl: ON | cURL: OFF | WGet: ON
  >  / usr / lib / python2.7 / dist-packages / landscape / package /
server ip : 104.21.89.46

your ip : 108.162.216.216

H O M E


Filename/usr/lib/python2.7/dist-packages/landscape/package/reporter.py
Size25.8 kb
Permissionrw-r--r--
Ownerroot : root
Create time27-Apr-2025 09:56
Last modified20-Feb-2014 23:01
Last accessed06-Jul-2025 20:05
Actionsedit | rename | delete | download (gzip)
Viewtext | code | image
import urlparse
import logging
import time
import sys
import os
import glob
import apt_pkg

from twisted.internet.defer import Deferred, succeed

from landscape.lib.sequenceranges import sequence_to_ranges
from landscape.lib.twisted_util import gather_results, spawn_process
from landscape.lib.fetch import fetch_async
from landscape.lib.fs import touch_file
from landscape.lib import bpickle

from landscape.package.taskhandler import (
PackageTaskHandlerConfiguration, PackageTaskHandler, run_task_handler)
from landscape.package.store import UnknownHashIDRequest, FakePackageStore


HASH_ID_REQUEST_TIMEOUT = 7200
MAX_UNKNOWN_HASHES_PER_REQUEST = 500


class PackageReporterConfiguration(PackageTaskHandlerConfiguration):
"""Specialized configuration for the Landscape package-reporter."""

def make_parser(self):
"""
Specialize L{Configuration.make_parser}, adding options
reporter-specific options.
"""
parser = super(PackageReporterConfiguration, self).make_parser()
parser.add_option("--force-apt-update", default=False,
action="store_true",
help="Force running apt-update.")
return parser


class PackageReporter(PackageTaskHandler):
"""Report information about the system packages.

@cvar queue_name: Name of the task queue to pick tasks from.
"""
config_factory = PackageReporterConfiguration

queue_name = "reporter"

apt_update_filename = "/usr/lib/landscape/apt-update"
sources_list_filename = "/etc/apt/sources.list"
sources_list_directory = "/etc/apt/sources.list.d"
_session_id = None
_got_task = False

def run(self):
self._got_task = False

result = Deferred()
# Set us up to communicate properly
result.addCallback(lambda x: self.get_session_id())

result.addCallback(lambda x: self.run_apt_update())

# If the appropriate hash=>id db is not there, fetch it
result.addCallback(lambda x: self.fetch_hash_id_db())

# Attach the hash=>id database if available
result.addCallback(lambda x: self.use_hash_id_db())

# Now, handle any queued tasks.
result.addCallback(lambda x: self.handle_tasks())

# Then, remove any expired hash=>id translation requests.
result.addCallback(lambda x: self.remove_expired_hash_id_requests())

# After that, check if we have any unknown hashes to request.
result.addCallback(lambda x: self.request_unknown_hashes())

# Finally, verify if we have anything new to report to the server.
result.addCallback(lambda x: self.detect_changes())

result.callback(None)
return result

def send_message(self, message):
return self._broker.send_message(
message, self._session_id, True)

def fetch_hash_id_db(self):
"""
Fetch the appropriate pre-canned database of hash=>id mappings
from the server. If the database is already present, it won't
be downloaded twice.

The format of the database filename is <uuid>_<codename>_<arch>,
and it will be downloaded from the HTTP directory set in
config.package_hash_id_url, or config.url/hash-id-databases if
the former is not set.

Fetch failures are handled gracefully and logged as appropriate.
"""

def fetch_it(hash_id_db_filename):

if hash_id_db_filename is None:
# Couldn't determine which hash=>id database to fetch,
# just ignore the failure and go on
return

if os.path.exists(hash_id_db_filename):
# We don't download twice
return

base_url = self._get_hash_id_db_base_url()
if not base_url:
logging.warning("Can't determine the hash=>id database url")
return

# Cast to str as pycurl doesn't like unicode
url = str(base_url + os.path.basename(hash_id_db_filename))

def fetch_ok(data):
hash_id_db_fd = open(hash_id_db_filename, "w")
hash_id_db_fd.write(data)
hash_id_db_fd.close()
logging.info("Downloaded hash=>id database from %s" % url)

def fetch_error(failure):
exception = failure.value
logging.warning("Couldn't download hash=>id database: %s" %
str(exception))

result = fetch_async(url,
cainfo=self._config.get("ssl_public_key"))
result.addCallback(fetch_ok)
result.addErrback(fetch_error)

return result

result = self._determine_hash_id_db_filename()
result.addCallback(fetch_it)
return result

def _get_hash_id_db_base_url(self):

base_url = self._config.get("package_hash_id_url")

if not base_url:

if not self._config.get("url"):
# We really have no idea where to download from
return None

# If config.url is http://host:123/path/to/message-system
# then we'll use http://host:123/path/to/hash-id-databases
base_url = urlparse.urljoin(self._config.url.rstrip("/"),
"hash-id-databases")

return base_url.rstrip("/") + "/"

def _apt_sources_have_changed(self):
"""Return a boolean indicating if the APT sources were modified."""
from landscape.monitor.packagemonitor import PackageMonitor

filenames = []

if os.path.exists(self.sources_list_filename):
filenames.append(self.sources_list_filename)

if os.path.exists(self.sources_list_directory):
filenames.extend(
[os.path.join(self.sources_list_directory, filename) for
filename in os.listdir(self.sources_list_directory)])

for filename in filenames:
seconds_since_last_change = (
time.time() - os.path.getmtime(filename))
if seconds_since_last_change < PackageMonitor.run_interval:
return True

return False

def _apt_update_timeout_expired(self, interval):
"""Check if the apt-update timeout has passed."""
if os.path.exists(self.update_notifier_stamp):
stamp = self.update_notifier_stamp
elif os.path.exists(self._config.update_stamp_filename):
stamp = self._config.update_stamp_filename
else:
return True

last_update = os.stat(stamp).st_mtime
return (last_update + interval) < time.time()

def run_apt_update(self):
"""Run apt-update and log a warning in case of non-zero exit code.

@return: a deferred returning (out, err, code)
"""
if (self._config.force_apt_update or self._apt_sources_have_changed()
or self._apt_update_timeout_expired(
self._config.apt_update_interval)):

result = spawn_process(self.apt_update_filename)

def callback((out, err, code)):
accepted_apt_errors = (
"Problem renaming the file /var/cache/apt/srcpkgcache.bin",
"Problem renaming the file /var/cache/apt/pkgcache.bin")

touch_file(self._config.update_stamp_filename)
logging.debug(
"'%s' exited with status %d (out='%s', err='%s')" % (
self.apt_update_filename, code, out, err))

if code != 0:
logging.warning("'%s' exited with status %d (%s)" % (
self.apt_update_filename, code, err))

# Errors caused by missing cache files are acceptable, as
# they are not an issue for the lists update process.
# These errors can happen if an 'apt-get clean' is run
# while 'apt-get update' is running.
for message in accepted_apt_errors:
if message in err:
out, err, code = "", "", 0
break

elif not self._facade.get_channels():
code = 1
err = ("There are no APT sources configured in %s or %s." %
(self.sources_list_filename,
self.sources_list_directory))

deferred = self._broker.call_if_accepted(
"package-reporter-result", self.send_result, code, err)
deferred.addCallback(lambda ignore: (out, err, code))
return deferred

return result.addCallback(callback)

else:
logging.debug("'%s' didn't run, update interval has not passed" %
self.apt_update_filename)
return succeed(("", "", 0))

def send_result(self, code, err):
"""
Report the package reporter result to the server in a message.
"""
message = {
"type": "package-reporter-result",
"code": code,
"err": err}
return self.send_message(message)

def handle_task(self, task):
message = task.data
if message["type"] == "package-ids":
self._got_task = True
return self._handle_package_ids(message)
if message["type"] == "resynchronize":
self._got_task = True
return self._handle_resynchronize()

def _handle_package_ids(self, message):
unknown_hashes = []

try:
request = self._store.get_hash_id_request(message["request-id"])
except UnknownHashIDRequest:
# We've lost this request somehow. It will be re-requested later.
return succeed(None)

hash_ids = {}

for hash, id in zip(request.hashes, message["ids"]):
if id is None:
unknown_hashes.append(hash)
else:
hash_ids[hash] = id

self._store.set_hash_ids(hash_ids)

logging.info("Received %d package hash => id translations, %d hashes "
"are unknown." % (len(hash_ids), len(unknown_hashes)))

if unknown_hashes:
result = self._handle_unknown_packages(unknown_hashes)
else:
result = succeed(None)

# Remove the request if everything goes well.
result.addCallback(lambda x: request.remove())

return result

def _handle_resynchronize(self):
self._store.clear_available()
self._store.clear_available_upgrades()
self._store.clear_installed()
self._store.clear_locked()

# Don't clear the hash_id_requests table because the messages
# associated with the existing requests might still have to be
# delivered, and if we clear the table and later create a new request,
# that new request could get the same id of one of the deleted ones,
# and when the pending message eventually gets delivered the reporter
# would think that the message is associated to the newly created
# request, as it has the same id has the deleted request the message
# actually refers to. This would cause the ids in the message to be
# possibly mapped to the wrong hashes.
#
# This problem would happen for example when switching the client from
# one Landscape server to another, because the uuid-changed event would
# cause a resynchronize task to be created by the monitor. See #417122.

return succeed(None)

def _handle_unknown_packages(self, hashes):

self._facade.ensure_channels_reloaded()

hashes = set(hashes)
added_hashes = []
packages = []
for package in self._facade.get_packages():
hash = self._facade.get_package_hash(package)
if hash in hashes:
added_hashes.append(hash)
skeleton = self._facade.get_package_skeleton(package)
packages.append({"type": skeleton.type,
"name": skeleton.name,
"version": skeleton.version,
"section": skeleton.section,
"summary": skeleton.summary,
"description": skeleton.description,
"size": skeleton.size,
"installed-size": skeleton.installed_size,
"relations": skeleton.relations})

if packages:
logging.info("Queuing messages with data for %d packages to "
"exchange urgently." % len(packages))

message = {"type": "add-packages", "packages": packages}

result = self._send_message_with_hash_id_request(message,
added_hashes)
else:
result = succeed(None)

return result

def remove_expired_hash_id_requests(self):
now = time.time()
timeout = now - HASH_ID_REQUEST_TIMEOUT

def update_or_remove(is_pending, request):
if is_pending:
# Request is still in the queue. Update the timestamp.
request.timestamp = now
elif request.timestamp < timeout:
# Request was delivered, and is older than the threshold.
request.remove()

results = []
for request in self._store.iter_hash_id_requests():
if request.message_id is None:
# May happen in some rare cases, when a send_message() is
# interrupted abruptly. If it just fails normally, the
# request is removed and so we don't get here.
request.remove()
else:
result = self._broker.is_message_pending(request.message_id)
result.addCallback(update_or_remove, request)
results.append(result)

return gather_results(results)

def request_unknown_hashes(self):
"""Detect available packages for which we have no hash=>id mappings.

This method will verify if there are packages that APT knows
about but for which we don't have an id yet (no hash => id
translation), and deliver a message (unknown-package-hashes)
to request them.

Hashes previously requested won't be requested again, unless they
have already expired and removed from the database.
"""
self._facade.ensure_channels_reloaded()

unknown_hashes = set()

for package in self._facade.get_packages():
hash = self._facade.get_package_hash(package)
if self._store.get_hash_id(hash) is None:
unknown_hashes.add(self._facade.get_package_hash(package))

# Discard unknown hashes in existent requests.
for request in self._store.iter_hash_id_requests():
unknown_hashes -= set(request.hashes)

if not unknown_hashes:
result = succeed(None)
else:
unknown_hashes = sorted(unknown_hashes)
unknown_hashes = unknown_hashes[:MAX_UNKNOWN_HASHES_PER_REQUEST]

logging.info("Queuing request for package hash => id "
"translation on %d hash(es)." % len(unknown_hashes))

message = {"type": "unknown-package-hashes",
"hashes": unknown_hashes}

result = self._send_message_with_hash_id_request(message,
unknown_hashes)

return result

def _send_message_with_hash_id_request(self, message, unknown_hashes):
"""Create a hash_id_request and send message with "request-id"."""
request = self._store.add_hash_id_request(unknown_hashes)
message["request-id"] = request.id
result = self.send_message(message)

def set_message_id(message_id):
request.message_id = message_id

def send_message_failed(failure):
request.remove()
return failure

return result.addCallbacks(set_message_id, send_message_failed)

def detect_changes(self):
"""Detect all changes concerning packages.

If some changes were detected with respect to our last run, then an
event of type 'package-data-changed' will be fired in the broker
reactor.
"""

def changes_detected(result):
if result:
# Something has changed, notify the broker.
return self._broker.fire_event("package-data-changed")

deferred = self.detect_packages_changes()
return deferred.addCallback(changes_detected)

def detect_packages_changes(self):
"""
Check if any information regarding packages have changed, and if so
compute the changes and send a signal.
"""
if self._got_task or self._package_state_has_changed():
return self._compute_packages_changes()
else:
return succeed(None)

def _package_state_has_changed(self):
"""
Detect changes in the universe of known packages.

This uses the state of packages in /var/lib/dpkg/state and other files
and simply checks whether they have changed using their "last changed"
timestamp on the filesystem.

@return True if the status changed, False otherwise.
"""
stamp_file = self._config.detect_package_changes_stamp
if not os.path.exists(stamp_file):
return True

status_file = apt_pkg.config.find_file("dir::state::status")
lists_dir = apt_pkg.config.find_dir("dir::state::lists")
files = [status_file, lists_dir]
files.extend(glob.glob("%s/*Packages" % lists_dir))

last_checked = os.stat(stamp_file).st_mtime
for f in files:
last_changed = os.stat(f).st_mtime
if last_changed >= last_checked:
return True
return False

def _compute_packages_changes(self):
"""Analyse changes in the universe of known packages.

This method will verify if there are packages that:

- are now installed, and were not;
- are now available, and were not;
- are now locked, and were not;
- were previously available but are not anymore;
- were previously installed but are not anymore;
- were previously locked but are not anymore;

Additionally it will report package locks that:

- are now set, and were not;
- were previously set but are not anymore;

In all cases, the server is notified of the new situation
with a "packages" message.

@return: A deferred resulting in C{True} if package changes were
detected with respect to the previous run, or C{False} otherwise.
"""
self._facade.ensure_channels_reloaded()

old_installed = set(self._store.get_installed())
old_available = set(self._store.get_available())
old_upgrades = set(self._store.get_available_upgrades())
old_locked = set(self._store.get_locked())

current_installed = set()
current_available = set()
current_upgrades = set()
current_locked = set()

for package in self._facade.get_packages():
hash = self._facade.get_package_hash(package)
id = self._store.get_hash_id(hash)
if id is not None:
if self._facade.is_package_installed(package):
current_installed.add(id)
if self._facade.is_package_available(package):
current_available.add(id)
else:
current_available.add(id)

# Are there any packages that this package is an upgrade for?
if self._facade.is_package_upgrade(package):
current_upgrades.add(id)

for package in self._facade.get_locked_packages():
hash = self._facade.get_package_hash(package)
id = self._store.get_hash_id(hash)
if id is not None:
current_locked.add(id)

new_installed = current_installed - old_installed
new_available = current_available - old_available
new_upgrades = current_upgrades - old_upgrades
new_locked = current_locked - old_locked

not_installed = old_installed - current_installed
not_available = old_available - current_available
not_upgrades = old_upgrades - current_upgrades
not_locked = old_locked - current_locked

message = {}
if new_installed:
message["installed"] = \
list(sequence_to_ranges(sorted(new_installed)))
if new_available:
message["available"] = \
list(sequence_to_ranges(sorted(new_available)))
if new_upgrades:
message["available-upgrades"] = \
list(sequence_to_ranges(sorted(new_upgrades)))
if new_locked:
message["locked"] = \
list(sequence_to_ranges(sorted(new_locked)))

if not_installed:
message["not-installed"] = \
list(sequence_to_ranges(sorted(not_installed)))
if not_available:
message["not-available"] = \
list(sequence_to_ranges(sorted(not_available)))
if not_upgrades:
message["not-available-upgrades"] = \
list(sequence_to_ranges(sorted(not_upgrades)))
if not_locked:
message["not-locked"] = \
list(sequence_to_ranges(sorted(not_locked)))

if not message:
return succeed(False)

message["type"] = "packages"
result = self.send_message(message)

logging.info("Queuing message with changes in known packages: "
"%d installed, %d available, %d available upgrades, "
"%d locked, %d not installed, %d not available, "
"%d not available upgrades, %d not locked."
% (len(new_installed), len(new_available),
len(new_upgrades), len(new_locked),
len(not_installed), len(not_available),
len(not_upgrades), len(not_locked)))

def update_currently_known(result):
if new_installed:
self._store.add_installed(new_installed)
if not_installed:
self._store.remove_installed(not_installed)
if new_available:
self._store.add_available(new_available)
if new_locked:
self._store.add_locked(new_locked)
if not_available:
self._store.remove_available(not_available)
if new_upgrades:
self._store.add_available_upgrades(new_upgrades)
if not_upgrades:
self._store.remove_available_upgrades(not_upgrades)
if not_locked:
self._store.remove_locked(not_locked)
# Something has changed wrt the former run, let's update the
# timestamp and return True.
stamp_file = self._config.detect_package_changes_stamp
touch_file(stamp_file)
return True

result.addCallback(update_currently_known)

return result


class FakeGlobalReporter(PackageReporter):
"""
A standard reporter, which additionally stores messages sent into its
package store.
"""

package_store_class = FakePackageStore

def send_message(self, message):
self._store.save_message(message)
return super(FakeGlobalReporter, self).send_message(message)


class FakeReporter(PackageReporter):
"""
A fake reporter which only sends messages previously stored by a
L{FakeGlobalReporter}.
"""

package_store_class = FakePackageStore
global_store_filename = None

def run(self):
result = succeed(None)

result.addCallback(lambda x: self.get_session_id())

# If the appropriate hash=>id db is not there, fetch it
result.addCallback(lambda x: self.fetch_hash_id_db())

result.addCallback(lambda x: self._store.clear_tasks())

# Finally, verify if we have anything new to send to the server.
result.addCallback(lambda x: self.send_pending_messages())

return result

def send_pending_messages(self):
"""
As the last callback of L{PackageReporter}, sends messages stored.
"""
if self.global_store_filename is None:
self.global_store_filename = os.environ["FAKE_PACKAGE_STORE"]
if not os.path.exists(self.global_store_filename):
return succeed(None)
message_sent = set(self._store.get_message_ids())
global_store = FakePackageStore(self.global_store_filename)
all_message_ids = set(global_store.get_message_ids())
not_sent = all_message_ids - message_sent
deferred = succeed(None)
got_type = set()
if not_sent:
messages = global_store.get_messages_by_ids(not_sent)
sent = []
for message_id, message in messages:
message = bpickle.loads(str(message))
if message["type"] not in got_type:
got_type.add(message["type"])
sent.append(message_id)
deferred.addCallback(
lambda x, message=message: self.send_message(message))
self._store.save_message_ids(sent)
return deferred


def main(args):
if "FAKE_GLOBAL_PACKAGE_STORE" in os.environ:
return run_task_handler(FakeGlobalReporter, args)
elif "FAKE_PACKAGE_STORE" in os.environ:
return run_task_handler(FakeReporter, args)
else:
return run_task_handler(PackageReporter, args)


def find_reporter_command():
dirname = os.path.dirname(os.path.abspath(sys.argv[0]))
return os.path.join(dirname, "landscape-package-reporter")