K2LL33D SHELL

 Apache/2.4.7 (Ubuntu)
 Linux sman1baleendah 3.13.0-24-generic #46-Ubuntu SMP Thu Apr 10 19:11:08 UTC 2014 x86_64
 uid=33(www-data) gid=33(www-data) groups=33(www-data)
 safemode : OFF
 MySQL: ON | Perl: ON | cURL: OFF | WGet: ON
  >  / usr / bin /
server ip : 172.67.156.115

your ip : 108.162.242.39

H O M E


Filename/usr/bin/axi-cache
Size32.72 kb
Permissionrwxr-xr-x
Ownerroot : root
Create time27-Apr-2025 09:53
Last modified23-Feb-2014 22:30
Last accessed05-Jul-2025 16:10
Actionsedit | rename | delete | download (gzip)
Viewtext | code | image
#! /usr/bin/python
# coding: utf-8

#
# axi-cache - Query apt-xapian-index database
#
# Copyright (C) 2010 Enrico Zini <[email protected]>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#

# TODO:
# - save NAME save the query to be recalled later with @NAME (or if notmuch
# has a syntax for saved queries, recycle it)

from optparse import OptionParser
from cStringIO import StringIO
import sys
import os, os.path
import axi

VERSION="0.45"

# Setup configuration
DEBTAGS_VOCABULARY = "/var/lib/debtags/vocabulary"

#XDG_CONFIG_HOME = os.environ.get("XDG_CONFIG_HOME", os.expanduser("~/.config"))
XDG_CACHE_HOME = os.environ.get("XDG_CACHE_HOME", os.path.expanduser("~/.cache"))
CACHEFILE = os.path.join(XDG_CACHE_HOME, "axi-cache.state")

# Activate support for the CJK tokenizer
os.environ["XAPIAN_CJK_NGRAM"] = "1"

try:
from ConfigParser import RawConfigParser
import re
import math
import xapian
import apt
try:
from debian import deb822
except ImportError:
from debian_bundle import deb822
helponly = False
except ImportError, e:
print >>sys.stderr, "%s: only help functions are implemented, for the sake of help2man" % str(e)
helponly = True

if not helponly:
class SavedStateError(Exception):
pass

class AptSilentProgress(apt.progress.text.OpProgress) :
"Quiet progress so we don't get cache loading messages from Apt"
def __init__(self):
pass
def done(self):
pass
def update(self, percent=None):
pass

def readVocabulary():
try:
fin = open(DEBTAGS_VOCABULARY)
except Exception, e:
# Only show this when being verbose
print >>sys.stderr, "Cannot read %s: %s. Please install `debtags' t" % (DEBTAGS_VOCABULARY, str(e))
return None, None
facets = dict()
tags = dict()
for entry in deb822.Deb822.iter_paragraphs(fin):
if "Facet" in entry:
facets[entry["Facet"]] = entry
elif "Tag" in entry:
tags[entry["Tag"]] = entry
return facets, tags


class DB(object):
class BasicFilter(xapian.ExpandDecider):
def __init__(self, stemmer=None, exclude=None, prefix=None):
super(DB.BasicFilter, self).__init__()
self.stem = stemmer if stemmer else lambda x:x
self.exclude = set([self.stem(x) for x in exclude]) if exclude else set()
self.prefix = prefix
def __call__(self, term):
if len(term) < 4: return False
if self.prefix is not None:
# Skip leading uppercase chars
t = term
while t and t[0].isupper():
t = t[1:]
if not t.startswith(self.prefix):
return False
if self.stem(term) in self.exclude: return False
if term.startswith("XT") or term.startswith("XS"): return True
return term[0].islower()

class TermFilter(BasicFilter):
def __call__(self, term):
if len(term) < 4: return False
if self.stem(term) in self.exclude: return False
return term[0].islower()

class TagFilter(xapian.ExpandDecider):
def __call__(self, term):
return term.startswith("XT")

def __init__(self):
# Access the Xapian index
self.db = xapian.Database(axi.XAPIANINDEX)

self.stem = xapian.Stem("english")

# Build query parser
self.qp = xapian.QueryParser()
self.qp.set_default_op(xapian.Query.OP_AND)
self.qp.set_database(self.db)
self.qp.set_stemmer(self.stem)
self.qp.set_stemming_strategy(xapian.QueryParser.STEM_SOME)
self.qp.add_prefix("pkg", "XP")
self.qp.add_boolean_prefix("tag", "XT")
self.qp.add_boolean_prefix("sec", "XS")

#notmuch->value_range_processor = new Xapian::NumberValueRangeProcessor (NOTMUCH_VALUE_TIMESTAMP);
#notmuch->query_parser->add_valuerangeprocessor (notmuch->value_range_processor);

# Read state from previous runs
self.cache = RawConfigParser()
if os.path.exists(CACHEFILE):
try:
self.cache.read(CACHEFILE)
except Exception, e:
print >>sys.stderr, e
print >>sys.stderr, "ignoring %s which seems to be corrupted" % CACHEFILE

self.dirty = False
self.facets = None
self.tags = None

def save(self):
"Save the state so we find it next time"
if self.dirty:
if not os.path.exists(XDG_CACHE_HOME):
os.makedirs(XDG_CACHE_HOME, mode=0700)
self.cache.write(open(CACHEFILE, "w"))
self.dirty = False

def vocabulary(self):
if self.facets is None:
self.facets, self.tags = readVocabulary()
return self.facets, self.tags

def unprefix(self, term):
"Convert DB prefixes to user prefixes"
if term.startswith("XT"):
return "tag:" + term[2:]
elif term.startswith("XS"):
return "sec:" + term[2:]
elif term.startswith("XP"):
return "pkg:" + term[2:]
return term

def is_tag(self, t):
return self.db.term_exists("XT" + t)

def set_query_args(self, args, secondary=False):
def term_or_tag(t):
if "::" in t and self.is_tag(t):
return "tag:" + t
else:
return t
args = map(term_or_tag, args)
self.set_query_string(" ".join(args), secondary=secondary)

def set_query_string(self, q, secondary=False):
"Set the query in the cache"
if not secondary:
self.set_cache_last("query", q)
self.unset_cache_last("secondary query")
else:
self.set_cache_last("secondary query", q)
self.unset_cache_last("skip")

def set_sort(self, key=None, cutoff=60):
"Set sorting method (default is by relevance)"
if key is None:
self.unset_cache_last("sort")
self.unset_cache_last("cutoff")
else:
self.set_cache_last("sort", key)
self.set_cache_last("cutoff", str(cutoff))
self.unset_cache_last("skip")

def build_query(self):
"Build query from cached query info"
q = self.get_cache_last("query")
if not self.cache.has_option("last", "query"):
raise SavedStateError("no saved query")
self.query = self.qp.parse_query(q,
xapian.QueryParser.FLAG_BOOLEAN |
xapian.QueryParser.FLAG_LOVEHATE |
xapian.QueryParser.FLAG_BOOLEAN_ANY_CASE |
xapian.QueryParser.FLAG_WILDCARD |
xapian.QueryParser.FLAG_PURE_NOT |
xapian.QueryParser.FLAG_SPELLING_CORRECTION |
xapian.QueryParser.FLAG_AUTO_SYNONYMS)

secondary = self.get_cache_last("secondary query", None)
if secondary:
secondary = self.qp.parse_query(secondary,
xapian.QueryParser.FLAG_BOOLEAN |
xapian.QueryParser.FLAG_LOVEHATE |
xapian.QueryParser.FLAG_BOOLEAN_ANY_CASE |
xapian.QueryParser.FLAG_WILDCARD |
xapian.QueryParser.FLAG_PURE_NOT |
xapian.QueryParser.FLAG_SPELLING_CORRECTION |
xapian.QueryParser.FLAG_AUTO_SYNONYMS)
self.query = xapian.Query(xapian.Query.OP_AND, self.query, secondary)

# print "Query:", self.query

# Build the enquire with the query
self.enquire = xapian.Enquire(self.db)
self.enquire.set_query(self.query)

sort = self.get_cache_last("sort")
if sort is not None:
values, descs = axi.readValueDB()

# If we don't sort by relevance, we need to specify a cutoff in order to
# remove poor results from the output
#
# Note: ept-cache implements an adaptive cutoff as follows:
# 1. Retrieve only one result, with default sorting. Read its relevance as
# the maximum relevance.
# 2. Set the cutoff as some percentage of the maximum relevance
# 3. Set sort by the wanted value
# 4. Perform the query
# TODO: didn't this use to work?
#self.enquire.set_cutoff(int(self.get_cache_last("cutoff", 60)))

reverse = sort[0] == '-' or sort[-1] == '-'
sort = sort.strip('-')

# Sort by the requested value
self.enquire.set_sort_by_value(values[sort], reverse)

def get_matches(self, first=None, count=20):
"""
Return a Xapian mset with the next page of results.
"""
if first is None:
first = int(self.get_cache_last("skip", 0))
self.set_cache_last("lastfirst", first)
self.set_cache_last("skip", first + count)
matches = self.enquire.get_mset(first, count)
return matches

def get_all_matches(self, first=None):
"""
Generate Xapian matches for all query matches
"""
if first is None:
first = int(self.get_cache_last("skip", 0))
self.unset_cache_last("lastfirst")
self.unset_cache_last("skip")
while True:
matches = self.enquire.get_mset(first, 100)
count = matches.size()
if count == 0: break
for m in matches:
yield m
first += 100

def get_spelling_correction(self):
return self.qp.get_corrected_query_string()

def get_suggestions(self, count=10, filter=None):
"""
Compute suggestions for more terms

Return a Xapian ESet
"""
# Use the first 30 results as the key ones to use to compute relevant
# terms
rset = xapian.RSet()
for m in self.enquire.get_mset(0, 30):
rset.add_document(m.docid)

# Get results, optionally filtered
if filter is None:
filter = self.BasicFilter()

return self.enquire.get_eset(count, rset, filter)

# ConfigParser access wrappers with lots of extra ifs, needed because the
# ConfigParser API has been designed to throw exceptions in the most stupid
# places one could possibly conceive

def get_cache_last(self, key, default=None):
if self.cache.has_option("last", key):
return self.cache.get("last", key)
return default

def set_cache_last(self, key, val):
if not self.cache.has_section("last"):
self.cache.add_section("last")
self.cache.set("last", key, val)
self.dirty = True

def unset_cache_last(self, key):
if not self.cache.has_section("last"):
return
self.cache.remove_option("last", key)

def get_rdeps(self, name, pfx):
"Return all the rdeps of type @pfx for package @name"
enquire = xapian.Enquire(self.db)
enquire.set_query(xapian.Query(pfx+name))
first = 0
while True:
found = 0
for m in enquire.get_mset(first, first + 20):
found += 1
yield m.document.get_data()
if found < 20:
break
first += 20

class Cmdline(object):
BOOLWORDS = set(["and", "or", "not"])

def __init__(self):
self.name = "axi-cache"

class Parser(OptionParser):
def __init__(self, cmdline, *args, **kwargs):
OptionParser.__init__(self, *args, **kwargs)
self.cmdline = cmdline

def error(self, msg):
sys.stderr.write("%s: error: %s\n\n" % (self.get_prog_name(), msg))
self.print_help(sys.stderr)
sys.exit(2)

def print_help(self, fd=sys.stdout):
commands = StringIO()
self.cmdline.format_help(commands)
buf = StringIO()
# Still in 2010 Cmdline is not an object. Oh dear.
OptionParser.print_help(self, buf)
buf = buf.getvalue().replace("ENDDESC", "\n\n" + commands.getvalue()[:-1])
fd.write(buf)

self.parser = Parser(self, usage="usage: %prog [options] command [args]",
version="%prog "+ VERSION,
description="Query the Apt Xapian index.ENDDESC")
self.parser.add_option("-s", "--sort", help="sort by the given value, as listed in %s. Add a '-' to reverse sort order" % axi.XAPIANDBVALUES)
self.parser.add_option("--tags", action="store_true", help="show matching tags, rather than packages")
self.parser.add_option("--tabcomplete", action="store", metavar="TYPE", help="suggest words for tab completion of the current command line (type is 'plain' or 'partial')")
self.parser.add_option("--last", action="store_true", help="use 'show --last' to limit tab completion to only the packages from the last search results")
self.parser.add_option("--all", action="store_true", help="disable pagination and always show all results. Note that search results are normally sorted by relevance, so you may find meaningless results at the end of the output")

(opts, args) = self.parser.parse_args()

self.opts = opts
self.args = args

if opts.tags and opts.all:
self.parser.error("--tags conflicts with --all")

def tabcomplete_query_args(self):
if self.opts.tabcomplete == "partial" and self.args:
queryargs = self.args[:-1]
partial = self.args[-1]
else:
queryargs = self.args
partial = None
# Remove trailing boolean terms
while queryargs and queryargs[-1].lower() in self.BOOLWORDS:
queryargs = queryargs[:-1]
return queryargs, partial

def do_info(self, args):
"info: print information about the apt-xapian-index environment"
import time
import datetime
import axi.indexer
# General info
print "Main data directory:", axi.XAPIANDBPATH
try:
cur_timestamp = os.path.getmtime(axi.XAPIANDBSTAMP)
cur_time = time.strftime("%c", time.localtime(cur_timestamp))
cur_time = "last updated: " + cur_time
except e:
cur_timestamp = 0
cur_time = "not available: " + str(e)
print "Update timestamp: %s (%s)" % (axi.XAPIANDBSTAMP, cur_time)
try:
index_loc = open(axi.XAPIANINDEX).read().split(" ", 1)[1].strip()
index_loc = "pointing to " + index_loc
except e:
index_loc = "not available: " + str(e)
print "Index location: %s (%s)" % (axi.XAPIANINDEX, index_loc)
def fileinfo(fname):
if os.path.exists(fname):
return fname
else:
return fname + " (available from next reindex)"
print "Documentation of index contents:", fileinfo(axi.XAPIANDBDOC)
print "Documentation of available prefixes:", fileinfo(axi.XAPIANDBPREFIXES)
print "Documentation of available values:", fileinfo(axi.XAPIANDBVALUES)
print "Plugin directory:", axi.PLUGINDIR

# Aggregated plugin information
# { name: { path=path, desc=desc, status=status } }
plugins = dict()

# Aggregated value information
# { valuename: { val=num, desc=shortdesc, plugins=[plugin names] } }
values, descs = axi.readValueDB()
values = dict(((a, dict(val=b, desc=descs[a], plugins=[])) for a, b in values.iteritems()))

# Aggregated data source information
# { pathname: { desc=shortdesc, plugins=[plugin names] } }
sources = dict()

# Per-plugin info
all_plugins_names = set((x for x in os.listdir(axi.PLUGINDIR) if x.endswith(".py")))
for plugin in axi.indexer.Plugins():
all_plugins_names.remove(plugin.filename)
doc = plugin.obj.doc() or dict()

# Last update status / whether an update is due
# TODO: check if the data from the plugin is present in the index
ts = plugin.info["timestamp"]
if cur_timestamp == 0:
status = "enabled, not indexed"
elif ts == 0:
status = "enabled, up to date"
elif ts <= cur_timestamp:
delta = datetime.timedelta(seconds=cur_timestamp-ts)
status = "enabled, up to date (%s older than index)" % str(delta)
else:
delta = datetime.timedelta(seconds=ts-cur_timestamp)
status = "enabled, needs indexing (%s newer than index)" % str(delta)

desc = doc.get("shortDesc", "(description unavailable)")
plugins[plugin.name] = dict(
path=os.path.join(axi.PLUGINDIR, plugin.filename),
desc=desc,
status=status,
)

# Aggregate info about values
for v in plugin.info.get("values", []):
name = v.get("name", None)
if name is None: continue
if name not in values:
values[name] = dict(val=None, desc=v.get("desc", "(unknown"), plugins=[])
values[name].setdefault("plugins", []).append(plugin.name)

# Data files used by the plugin
for s in plugin.info.get("sources", []):
path = s.get("path", "(unknown)")
if path not in sources:
sources[path] = dict(desc=s.get("desc", "(unknown)"), plugins=[])
sources[path].setdefault("plugins", []).append(plugin.name)

# Disabled plugins
for plugin in sorted(all_plugins_names):
desc = doc.get("shortDesc", "(unavailable)")
name = os.path.splitext(plugin)[0]
plugins[name] = dict(
path=os.path.join(axi.PLUGINDIR, plugin),
desc="(plugin disabled, description unavailable)",
status="disabled",
)

# Plugin information
# { name: { path=path, desc=desc, status=status } }
#print "Plugins:"
#maxname = max((len(x) for x in plugins.iterkeys()))
#for name, info in sorted(plugins.iteritems(), key=lambda x:x[0]):
# print " %s:" % info["path"]
# print " ", info["desc"]
print "Plugin status:"
maxname = max((len(x) for x in plugins.iterkeys()))
for name, info in sorted(plugins.iteritems(), key=lambda x:x[0]):
print " ", name.ljust(maxname), info["status"]

# Value information
print "Values:"
maxname = max((len(x) for x in values.iterkeys()))
print " ", "Value".ljust(maxname), "Code", "Provided by"
for name, val in sorted(values.iteritems(), key=lambda x:x[0]):
plugins = val.get("plugins", [])
if plugins:
provider = ", ".join(plugins)
else:
provider = "update-apt-xapian-index"
print " ", name.ljust(maxname), "%4d" % int(val["val"]), provider

# Source files information
print "Data sources:"
maxpath = 0
maxdesc = 0
for k, v in sources.iteritems():
if len(k) > maxpath: maxpath = len(k)
if len(v["desc"]) > maxdesc: maxdesc = len(v["desc"])
print " ", "Source".ljust(maxpath), "Description".ljust(maxdesc), "Used by"
for path, info in sources.iteritems():
provider = ", ".join(info.get("plugins", []))
print " ", path.ljust(maxpath), info["desc"].ljust(maxdesc), provider

return 0

def do_search(self, args):
"search [terms]: start a new search"
self.db = DB()
self.db.set_query_args(args)
if self.opts.sort:
self.db.set_sort(self.opts.sort)
else:
self.db.set_sort(None)
self.db.build_query()

if self.opts.all:
self.print_all_matches(self.db.get_all_matches())
else:
self.print_matches(self.db.get_matches())

if not self.opts.tabcomplete:
self.db.save()
return 0

def complete_search(self):
self.db = DB()
if self.opts.tabcomplete == "partial" and len(self.args) == 1:
# Simple prefix search
terms = set()
terms.update((str(term) for term in self.db.db.synonym_keys(self.args[0])))
terms.update((term.term for term in self.db.db.allterms(self.args[0])))
for term in sorted(terms):
print term
for term in self.db.db.allterms("XT" + self.args[0]):
print term.term[2:]
return 0
elif self.opts.tabcomplete == "plain" and not self.args:
# Show a preset list of tags
for facet in ["interface", "role", "use", "works-with"]:
for term in self.db.db.allterms("XT" + facet + "::"):
print term.term[2:]
return 0
else:
# Context-sensitive hints
qargs, partial = self.tabcomplete_query_args()
self.db.set_query_args(qargs)
self.db.build_query()
self.print_completions(self.db.get_matches())
return 0

def do_again(self, args):
"again [query]: repeat the last search, possibly adding query terms"
self.db = DB()
self.db.set_query_args(args, secondary=True)
if self.opts.sort:
self.db.set_sort(self.opts.sort)
else:
self.db.set_sort(None)

try:
self.db.build_query()
except SavedStateError, e:
print >>sys.stderr, "%s: maybe you need to run '%s search' first?" % (self.name, str(e))
return 1

self.print_matches(self.db.get_matches(first = 0))

if not self.opts.tabcomplete:
self.db.save()
return 0

def complete_again(self):
self.db = DB()
qargs, partial = self.tabcomplete_query_args()
self.db.set_query_args(qargs, secondary=True)
try:
self.db.build_query()
except SavedStateError, e:
return 0
self.print_completions(self.db.get_matches(first = 0))
return 0

def do_last(self, args):
"last [count]: show the last results again"
self.db = DB()
try:
self.db.build_query()
except SavedStateError, e:
print >>sys.stderr, "%s: maybe you need to run '%s search' first?" % (self.name, str(e))
return 1

count = int(args[0]) if args else 20
first = int(self.db.get_cache_last("lastfirst", 0))
self.print_matches(self.db.get_matches(first=first, count=count))
return 0

def do_more(self, args):
"more [count]: show more terms from the last search"
self.db = DB()
try:
self.db.build_query()
except SavedStateError, e:
print >>sys.stderr, "%s: maybe you need to run '%s search' first?" % (self.name, str(e))
return 1

count = int(args[0]) if args else 20
self.print_matches(self.db.get_matches(count=count))

if not self.opts.tabcomplete:
self.db.save()
return 0

def do_show(self, args):
"show pkgname[s]: run apt-cache show pkgname[s]"
os.execvp("/usr/bin/apt-cache", ["apt-cache", "show"] + args)

def complete_show(self):
self.db = DB()
if self.opts.tabcomplete == "partial" and self.args:
blacklist = set(self.args[:-1])
partial = self.args[-1]
else:
blacklist = set(self.args)
partial = None

if self.opts.last:
# Expand from output of last query
try:
self.db.build_query()
first = int(self.db.get_cache_last("lastfirst", 0))
for m in self.db.get_matches(first = first):
pkg = m.document.get_data()
if partial is not None and not pkg.startswith(partial): continue
if pkg not in blacklist:
print pkg
except SavedStateError, e:
return 0
else:
# Prefix expand
for term in self.db.db.allterms("XP" + (partial or "")):
pkg = term.term[2:]
if pkg not in blacklist:
print pkg
return 0

def do_showpkg(self, args):
"showpkg pkgname[s]: run apt-cache showpkg pkgname[s]"
os.execvp("/usr/bin/apt-cache", ["apt-cache", "showpkg"] + args)
complete_showpkg = complete_show

def do_showsrc(self, args):
"showsrc pkgname[s]: run apt-cache showsrc pkgname[s]"
os.execvp("/usr/bin/apt-cache", ["apt-cache", "showsrc"] + args)
complete_showsrc = complete_show

def do_depends(self, args):
"depends pkgname[s]: run apt-cache depends pkgname[s]"
os.execvp("/usr/bin/apt-cache", ["apt-cache", "depends"] + args)
complete_depends = complete_show

def do_rdepends(self, args):
"rdepends pkgname[s]: run apt-cache rdepends pkgname[s]"
db = DB()
for name in args:
print name
print "Reverse Depends:"
for pfx in ("XRD", "XRR", "XRS", "XRE", "XRP", "XRB", "XRC"):
for pkg in db.get_rdeps(name, pfx):
print " ", pkg
complete_rdepends = complete_show

def do_rdetails(self, args):
"rdetails pkgname[s]: show details of reverse relationships for the given packages"
db = DB()
for name in args:
for pfx, tag in (
("XRD", "dep"),
("XRR", "rec"),
("XRS", "sug"),
("XRE", "enh"),
("XRP", "pre"),
("XRB", "bre"),
("XRC", "con")):
deps = list(db.get_rdeps(name, pfx))
if not deps: continue
print name, tag, " ".join(deps)
complete_rdetails = complete_show

def do_policy(self, args):
"policy pkgname[s]: run apt-cache policy pkgname[s]"
os.execvp("/usr/bin/apt-cache", ["apt-cache", "policy"] + args)
complete_policy = complete_show

def do_madison(self, args):
"madison pkgname[s]: run apt-cache madison pkgname[s]"
os.execvp("/usr/bin/apt-cache", ["apt-cache", "madison"] + args)
complete_madison = complete_show

def format_help(self, out):
print >>out, "Commands:"
itemlist = []
maxusage = 0
for k, v in sorted(self.__class__.__dict__.iteritems()):
if not k.startswith("do_"): continue
line = self.name + " " + v.__doc__
usage, desc = line.split(": ", 1)
if len(usage) > maxusage:
maxusage = len(usage)
itemlist.append([usage, desc])
print >>out, " search commands:"
for usage, desc in [x for x in itemlist if "run apt-cache" not in x[1]]:
print >>out, " %-*.*s %s" % (maxusage, maxusage, usage, desc)
print >>out, " apt-cache front-ends:"
for usage, desc in [x for x in itemlist if "run apt-cache" in x[1]]:
print >>out, " %-*.*s %s" % (maxusage, maxusage, usage, desc)

def do_help(self, args):
"help: show a summary of commands"
self.parser.print_help()
return 0

def clean_suggestions(self, eset):
res = []
for r in eset:
res.append(self.db.unprefix(r.term))
#res.sort()
return res

def print_completions(self, matches):
if self.opts.tabcomplete == "partial":
prefix = self.args[-1] if self.args else None
exclude = self.args[:-1]
else:
prefix = None
exclude = self.args
exclude = [x for x in exclude if x.lower() not in self.BOOLWORDS]
for s in self.clean_suggestions(self.db.get_suggestions(count=10, filter=DB.BasicFilter(stemmer=self.db.stem, exclude=exclude, prefix=prefix))):
if s.startswith("tag:"):
print s[4:]
else:
print s

def print_matches(self, matches):
if self.opts.tags:
facets, tags = self.db.vocabulary()
def describe_tag(tag):
f, t = tag.split("::")
try:
fd = facets[f]["Description"].split("\n", 1)[0].strip()
td = tags[tag]["Description"].split("\n", 1)[0].strip()
return "%s: %s" % (fd, td)
except KeyError:
return None
maxscore = None
for res in self.db.get_suggestions(count=20, filter=DB.TagFilter()):
# Normalise the score in the interval [0, 1]
weight = math.log(res.weight)
if maxscore == None: maxscore = weight
score = float(weight) / maxscore
tag = self.db.unprefix(res.term)[4:]
desc = describe_tag(tag)
if desc is None:
print "%i%% %s" % (int(score * 100), tag)
else:
print "%i%% %s -- %s" % (int(score * 100), tag, desc)
else:
est = matches.get_matches_estimated()
first = matches.get_firstitem()
count = matches.size()
print "%i results found." % est
if count != 0:
print "Results %i-%i:" % (first + 1, first + count)
elif first != 0:
print "No more results to show"
self.print_all_matches((m for m in matches))
if first == 0:
sc = self.db.get_spelling_correction()
if sc:
print "Did you mean:", sc, "?"
sugg = self.clean_suggestions(self.db.get_suggestions(count=7, filter=DB.TermFilter(stemmer=self.db.stem, exclude=self.args)))
print "More terms:", " ".join(sugg)
stags = self.clean_suggestions(self.db.get_suggestions(count=7, filter=DB.TagFilter()))
print "More tags:", " ".join([x[4:] for x in stags])
if first + count < est:
print "`%s more' will give more results" % self.name
if first > 0:
print "`%s again' will restart the search" % self.name

def print_all_matches(self, matches_iter):
"""
Given an iterator of Xapian matches, print them all out nicely
formatted
"""
aptcache = apt.Cache(progress=AptSilentProgress())
for m in matches_iter:
name = m.document.get_data()
try:
pkg = aptcache[name]
except KeyError:
pkg = None
if pkg is not None and pkg.candidate:
print "%i%% %s - %s" % (m.percent, name, pkg.candidate.summary)
else:
print "%i%% %s - (unknown by apt)" % (m.percent, name)

def perform(self):
self.cmd = "help" if not self.args else self.args.pop(0)
if self.opts.tabcomplete is not None:
f = getattr(self, "complete_" + self.cmd, None)
if f is None:
return 0
return f()
else:
f = getattr(self, "do_" + self.cmd, None)
if f is None:
print >>sys.stderr, "Invalid command: `%s'.\n" % self.cmd
self.do_help(self.args)
return 1
return f(self.args)

if __name__ == "__main__":
ui = Cmdline()
if helponly: sys.exit(0)
sys.exit(ui.perform())