K2LL33D SHELL

 Apache/2.4.7 (Ubuntu)
 Linux sman1baleendah 3.13.0-24-generic #46-Ubuntu SMP Thu Apr 10 19:11:08 UTC 2014 x86_64
 uid=33(www-data) gid=33(www-data) groups=33(www-data)
 safemode : OFF
 MySQL: ON | Perl: ON | cURL: OFF | WGet: ON
  >  / usr / lib / python2.7 / dist-packages / landscape / user /
server ip : 172.67.156.115

your ip : 108.162.216.217

H O M E


Filename/usr/lib/python2.7/dist-packages/landscape/user/management.py
Size9.14 kb
Permissionrw-r--r--
Ownerroot : root
Create time27-Apr-2025 09:56
Last modified20-Feb-2014 23:01
Last accessed06-Jul-2025 20:06
Actionsedit | rename | delete | download (gzip)
Viewtext | code | image
# XXX: There is the potential for some sort of "unixadmin" package
# which wraps up the commands which we use in this module in a Python
# API, with thorough usage of exceptions and such, instead of pipes to
# subprocesses. liboobs (i.e. System Tools) is a possibility, and has
# documentation now in the 2.17 series, but is not wrapped to Python.

import os
import logging
import subprocess

from landscape.lib import md5crypt
from landscape.user.provider import UserManagementError, UserProvider


class UserManagement(object):
"""Manage system users and groups."""

def __init__(self, provider=None):
self._provider = provider or UserProvider()

def add_user(self, username, name, password, require_password_reset,
primary_group_name, location, work_phone, home_phone):
"""Add C{username} to the computer.

@raises UserManagementError: Raised when C{adduser} fails.
@raises UserManagementError: Raised when C{passwd} fails.
"""
logging.info("Adding user %s.", username)
gecos = "%s,%s,%s,%s" % (name, location or "", work_phone or "",
home_phone or "")
command = ["adduser", username, "--disabled-password", "--gecos",
gecos]
if primary_group_name:
command.extend(["--gid", str(self._provider.get_gid(
primary_group_name))])
result, output = self.call_popen(command)
if result != 0:
raise UserManagementError("Error adding user %s.\n%s" %
(username, output))

self._set_password(username, password)
if require_password_reset:
result, new_output = self.call_popen(["passwd", username, "-e"])
if result != 0:
raise UserManagementError("Error resetting password for user "
"%s.\n%s" % (username, new_output))
else:
output += new_output
return output

def _set_password(self, username, password):
# XXX temporary workaround? We're getting unicode here.
username = username.encode("ascii")
password = password.encode("ascii")
salt = os.urandom(6).encode("base64")[:-1]
crypted = md5crypt.md5crypt(password, salt)
result, output = self.call_popen(["usermod", "-p", crypted, username])
if result != 0:
raise UserManagementError("Error setting password for user "
"%s.\n%s" % (username, output))
return output

def _set_primary_group(self, username, groupname):
primary_gid = self._provider.get_gid(groupname)
command = ["usermod", "-g", str(primary_gid), username]
result, output = self.call_popen(command)
if result != 0:
raise UserManagementError("Error setting primary group to %d for"
"%s.\n%s" % (primary_gid, username,
output))
return output

def set_user_details(self, username, password=None, name=None,
location=None, work_number=None, home_number=None,
primary_group_name=None):
"""Update details for the account matching C{uid}."""
uid = self._provider.get_uid(username)
logging.info("Updating data for user %s (UID %d).", username, uid)
if password:
self._set_password(username, password)

if primary_group_name:
self._set_primary_group(username, primary_group_name)

command = ["chfn"]
for option, value in [("-r", location), ("-f", name),
("-w", work_number), ("-h", home_number)]:
if value is not None:
command += [option, value]

if len(command) > 1:
result, output = self.call_popen(command + [username])
if result != 0:
raise UserManagementError("Error setting details for user "
"%s.\n%s" % (username, output))
return output

def lock_user(self, username):
"""
Lock the account matching C{username} to prevent them from logging in.
"""
uid = self._provider.get_uid(username)
logging.info("Locking out user %s (UID %d).", username, uid)
result, output = self.call_popen(["usermod", "-L", username])
if result != 0:
raise UserManagementError("Error locking user %s.\n%s"
% (username, output))

def unlock_user(self, username):
"""Unlock the account matching C{username}."""
uid = self._provider.get_uid(username)
logging.info("Unlocking user %s (UID %d).", username, uid)

result, output = self.call_popen(["usermod", "-U", username])
if result != 0:
raise UserManagementError("Error unlocking user %s.\n%s"
% (username, output))
return output

def remove_user(self, username, delete_home=False):
"""Remove the account matching C{username} from the computer."""
uid = self._provider.get_uid(username)
command = ["deluser", username]
if delete_home:
logging.info("Removing user %s (UID %d) and deleting their home "
"directory.", username, uid)
command.append("--remove-home")
else:
logging.info("Removing user %s (UID %d) without deleting their "
"home directory.", username, uid)

result, output = self.call_popen(command)
if result != 0:
raise UserManagementError("Error removing user %s (UID %d).\n%s"
% (username, uid, output))
return output

def add_group(self, groupname):
"""Add C{group} with the C{addgroup} system command."""
logging.info("Adding group %s.", groupname)
result, output = self.call_popen(["addgroup", groupname])
if result != 0:
raise UserManagementError("Error adding group %s.\n%s" %
(groupname, output))
return output

def set_group_details(self, groupname, new_name):
"""Update details for the group matching C{gid}."""
gid = self._provider.get_gid(groupname)
logging.info("Renaming group %s (GID %d) to %s.",
groupname, gid, new_name)
command = ["groupmod", "-n", new_name, groupname]
result, output = self.call_popen(command)
if result != 0:
raise UserManagementError("Error renaming group %s (GID %d) to "
"%s.\n%s" % (groupname, gid, new_name,
output))
return output

def add_group_member(self, username, groupname):
"""
Add the user matching C{username} to the group matching C{groupname}
with the C{gpasswd} system command.
"""
uid = self._provider.get_uid(username)
gid = self._provider.get_gid(groupname)
logging.info("Adding user %s (UID %d) to group %s (GID %d).",
username, uid, groupname, gid)
result, output = self.call_popen(["gpasswd", "-a", username,
groupname])
if result != 0:
raise UserManagementError("Error adding user %s (UID %d) to "
"group %s (GID %d).\n%s" %
(username, uid, groupname, gid, output))
return output

def remove_group_member(self, username, groupname):
"""
Remove the user matching C{username} from the group matching
C{groupname} with the C{gpasswd} system command.
"""
uid = self._provider.get_uid(username)
gid = self._provider.get_gid(groupname)
logging.info("Removing user %s (UID %d) from group %s (GID %d).",
username, uid, groupname, gid)
result, output = self.call_popen(["gpasswd", "-d", username,
groupname])
if result != 0:
raise UserManagementError("Error removing user %s (UID %d) "
"from group %s (GID (%d).\n%s"
% (username, uid, groupname,
gid, output))
return output

def remove_group(self, groupname):
"""Remove the account matching C{groupname} from the computer."""
gid = self._provider.get_gid(groupname)
logging.info("Removing group %s (GID %d).", groupname, gid)
result, output = self.call_popen(["groupdel", groupname])
if result != 0:
raise UserManagementError("Error removing group %s (GID %d).\n%s"
% (groupname, gid, output))
return output

def call_popen(self, args):
popen = self._provider.popen(args, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
output = popen.stdout.read()
result = popen.wait()
return result, output