| Server IP : 104.21.37.246 / Your IP : 172.71.28.145 [ Web Server : Apache System : Linux cpanel01wh.bkk1.cloud.z.com 2.6.32-954.3.5.lve1.4.59.el6.x86_64 #1 SMP Thu Dec 6 05:11:00 EST 2018 x86_64 User : cp648411 ( 1354) PHP Version : 7.2.34 Disable Function : NONE Domains : 0 Domains MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : OFF | Pkexec : OFF Directory : /proc/2388321/root/usr/lib/python2.6/site-packages/supervisor/ |
Upload File : |
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""supervisord -- run a set of applications as daemons.
Usage: %s [options]
Options:
-c/--configuration URL -- configuration file or URL
-n/--nodaemon -- run in the foreground (same as 'nodaemon true' in config file)
-h/--help -- print this usage message and exit
-u/--user USER -- run supervisord as this user (or numeric uid)
-m/--umask UMASK -- use this umask for daemon subprocess (default is 022)
-d/--directory DIRECTORY -- directory to chdir to when daemonized
-l/--logfile FILENAME -- use FILENAME as logfile path
-y/--logfile_maxbytes BYTES -- use BYTES to limit the max size of logfile
-z/--logfile_backups NUM -- number of backups to keep when max bytes reached
-e/--loglevel LEVEL -- use LEVEL as log level (debug,info,warn,error,critical)
-j/--pidfile FILENAME -- write a pid file for the daemon process to FILENAME
-i/--identifier STR -- identifier used for this instance of supervisord
-q/--childlogdir DIRECTORY -- the log directory for child process logs
-k/--nocleanup -- prevent the process from performing cleanup (removal of
orphaned child log files, etc.) at startup.
-w/--http_port SOCKET -- the host/port that the HTTP server should listen on
-g/--http_username STR -- the username for HTTP auth
-r/--http_password STR -- the password for HTTP auth
-a/--minfds NUM -- the minimum number of file descriptors for start success
--minprocs NUM -- the minimum number of processes available for start success
"""
import os
import sys
import time
import errno
import socket
import select
import signal
import asyncore
import traceback
import StringIO
import shlex
import logging
from options import ServerOptions
from options import decode_wait_status
from options import signame
class ProcessStates:
STOPPED = 0
STARTING = 10
RUNNING = 20
BACKOFF = 30
STOPPING = 40
EXITED = 100
FATAL = 200
UNKNOWN = 1000
def getProcessStateDescription(code):
for statename in ProcessStates.__dict__:
if getattr(ProcessStates, statename) == code:
return statename
class SupervisorStates:
ACTIVE = 0
SHUTDOWN = 1
def getSupervisorStateDescription(code):
for statename in SupervisorStates.__dict__:
if getattr(SupervisorStates, statename) == code:
return statename
class Subprocess:
"""A class to manage a subprocess."""
# Initial state; overridden by instance variables
pid = 0 # Subprocess pid; 0 when not running
laststart = 0 # Last time the subprocess was started; 0 if never
laststop = 0 # Last time the subprocess was stopped; 0 if never
delay = 0 # If nonzero, delay starting or killing until this time
administrative_stop = 0 # true if the process has been stopped by an admin
system_stop = 0 # true if the process has been stopped by the system
killing = 0 # flag determining whether we are trying to kill this proc
backoff = 0 # backoff counter (to startretries)
pipes = None # mapping of pipe descriptor purpose to file descriptor
childlog = None # the current logger
logbuffer = '' # buffer of characters read from child pipes
exitstatus = None # status attached to dead process by finsh()
spawnerr = None # error message attached by spawn() if any
def __init__(self, options, config):
"""Constructor.
Arguments are a ServerOptions instance and a ProcessConfig instance.
"""
self.options = options
self.config = config
self.pipes = {}
if config.logfile:
backups = config.logfile_backups
maxbytes = config.logfile_maxbytes
# using "not not maxbytes" below is an optimization. If
# maxbytes is zero, it means we're not using rotation. The
# rotating logger is more expensive than the normal one.
self.childlog = options.getLogger(config.logfile, logging.INFO,
'%(message)s',
rotating=not not maxbytes,
maxbytes=maxbytes,
backups=backups)
def removelogs(self):
if self.childlog:
for handler in self.childlog.handlers:
handler.remove()
handler.reopen()
def reopenlogs(self):
if self.childlog:
for handler in self.childlog.handlers:
handler.reopen()
def log_output(self):
if self.logbuffer:
data, self.logbuffer = self.logbuffer, ''
if self.childlog:
self.childlog.info(data)
msg = '%s output:\n%s' % (self.config.name, data)
self.options.logger.log(self.options.TRACE, msg)
def drain_stdout(self, *ignored):
output = self.options.readfd(self.pipes['stdout'])
if output == '':
self.options.close_fd(self.pipes['stdout'])
del self.pipes['stdout']
if self.config.log_stdout:
self.logbuffer += output
def drain_stderr(self, *ignored):
output = self.options.readfd(self.pipes['stderr'])
if output == '':
self.options.close_fd(self.pipes['stderr'])
del self.pipes['stderr']
if self.config.log_stderr:
self.logbuffer += output
def drain(self):
if self.pipes.has_key('stdout'):
self.drain_stdout()
if self.pipes.has_key('stderr'):
self.drain_stderr()
def get_pipe_drains(self):
if not self.pipes:
return []
drains = []
if self.pipes.has_key('stdout'):
drains.append([ self.pipes['stdout'], self.drain_stdout])
if self.pipes.has_key('stderr'):
drains.append([ self.pipes['stderr'], self.drain_stderr])
return drains
def get_execv_args(self):
"""Internal: turn a program name into a file name, using $PATH,
make sure it exists """
commandargs = shlex.split(self.config.command)
program = commandargs[0]
if "/" in program:
filename = program
try:
st = self.options.stat(filename)
return filename, commandargs, st
except OSError:
return filename, commandargs, None
else:
path = self.options.get_path()
filename = None
st = None
for dir in path:
filename = os.path.join(dir, program)
try:
st = self.options.stat(filename)
return filename, commandargs, st
except OSError:
continue
return None, commandargs, None
def record_spawnerr(self, msg):
now = time.time()
self.spawnerr = msg
self.options.logger.critical("spawnerr: %s" % msg)
self.backoff = self.backoff + 1
self.delay = now + self.backoff
def spawn(self):
"""Start the subprocess. It must not be running already.
Return the process id. If the fork() call fails, return 0.
"""
pname = self.config.name
if self.pid:
msg = 'process %r already running' % pname
self.options.logger.critical(msg)
return
self.killing = 0
self.spawnerr = None
self.exitstatus = None
self.system_stop = 0
self.administrative_stop = 0
self.laststart = time.time()
filename, argv, st = self.get_execv_args()
fail_msg = self.options.check_execv_args(filename, argv, st)
if fail_msg is not None:
self.record_spawnerr(fail_msg)
return
try:
self.pipes = self.options.make_pipes()
except OSError, why:
code = why[0]
if code == errno.EMFILE:
# too many file descriptors open
msg = 'too many open files to spawn %r' % pname
else:
msg = 'unknown error: %s' % errno.errorcode.get(code, code)
self.record_spawnerr(msg)
return
try:
pid = self.options.fork()
except OSError, why:
code = why[0]
if code == errno.EAGAIN:
# process table full
msg = 'Too many processes in process table to spawn %r' % pname
else:
msg = 'unknown error: %s' % errno.errorcode.get(code, code)
self.record_spawnerr(msg)
self.options.close_parent_pipes(self.pipes)
self.options.close_child_pipes(self.pipes)
return
if pid != 0:
# Parent
self.pid = pid
self.options.close_child_pipes(self.pipes)
self.options.logger.info('spawned: %r with pid %s' % (pname, pid))
self.spawnerr = None
# we use self.delay here as a mechanism to indicate that we're in
# the STARTING state.
self.delay = time.time() + self.config.startsecs
self.options.pidhistory[pid] = self
return pid
else:
# Child
try:
# prevent child from receiving signals sent to the
# parent by calling os.setpgrp to create a new process
# group for the child; this prevents, for instance,
# the case of child processes being sent a SIGINT when
# running supervisor in foreground mode and Ctrl-C in
# the terminal window running supervisord is pressed.
# Presumably it also prevents HUP, etc received by
# supervisord from being sent to children.
self.options.setpgrp()
self.options.dup2(self.pipes['child_stdin'], 0)
self.options.dup2(self.pipes['child_stdout'], 1)
self.options.dup2(self.pipes['child_stderr'], 2)
for i in range(3, self.options.minfds):
self.options.close_fd(i)
# sending to fd 1 will put this output in the log(s)
msg = self.set_uid()
if msg:
self.options.write(
1, "%s: error trying to setuid to %s!\n" %
(pname, self.config.uid)
)
self.options.write(1, "%s: %s\n" % (pname, msg))
try:
self.options.execv(filename, argv)
except OSError, why:
code = why[0]
self.options.write(1, "couldn't exec %s: %s\n" % (
argv[0], errno.errorcode.get(code, code)))
except:
(file, fun, line), t,v,tbinfo = asyncore.compact_traceback()
error = '%s, %s: file: %s line: %s' % (t, v, file, line)
self.options.write(1, "couldn't exec %s: %s\n" % (filename,
error))
finally:
self.options._exit(127)
def stop(self):
""" Administrative stop """
self.administrative_stop = 1
return self.kill(self.config.stopsignal)
def kill(self, sig):
"""Send a signal to the subprocess. This may or may not kill it.
Return None if the signal was sent, or an error message string
if an error occurred or if the subprocess is not running.
"""
now = time.time()
if not self.pid:
msg = ("attempted to kill %s with sig %s but it wasn't running" %
(self.config.name, signame(sig)))
self.options.logger.debug(msg)
return msg
try:
self.options.logger.debug('killing %s (pid %s) with signal %s'
% (self.config.name,
self.pid,
signame(sig)))
# RUNNING -> STOPPING
self.killing = 1
self.delay = now + self.config.stopwaitsecs
self.options.kill(self.pid, sig)
except:
io = StringIO.StringIO()
traceback.print_exc(file=io)
tb = io.getvalue()
msg = 'unknown problem killing %s (%s):%s' % (self.config.name,
self.pid, tb)
self.options.logger.critical(msg)
self.pid = 0
self.killing = 0
self.delay = 0
return msg
return None
def finish(self, pid, sts):
""" The process was reaped and we need to report and manage its state
"""
self.drain()
self.log_output()
es, msg = decode_wait_status(sts)
now = time.time()
self.laststop = now
processname = self.config.name
tooquickly = now - self.laststart < self.config.startsecs
badexit = not es in self.config.exitcodes
expected = not (tooquickly or badexit)
if self.killing:
# likely the result of a stop request
# implies STOPPING -> STOPPED
self.killing = 0
self.delay = 0
self.exitstatus = es
msg = "stopped: %s (%s)" % (processname, msg)
elif expected:
# this finish was not the result of a stop request, but
# was otherwise expected
# implies RUNNING -> EXITED
self.delay = 0
self.backoff = 0
self.exitstatus = es
msg = "exited: %s (%s)" % (processname, msg + "; expected")
else:
# the program did not stay up long enough or exited with
# an unexpected exit code
self.exitstatus = None
self.backoff = self.backoff + 1
self.delay = now + self.backoff
if tooquickly:
self.spawnerr = (
'Exited too quickly (process log may have details)')
elif badexit:
self.spawnerr = 'Bad exit code %s' % es
msg = "exited: %s (%s)" % (processname, msg + "; not expected")
self.options.logger.info(msg)
self.pid = 0
self.options.close_parent_pipes(self.pipes)
self.pipes = {}
def set_uid(self):
if self.config.uid is None:
return
msg = self.options.dropPrivileges(self.config.uid)
return msg
def __cmp__(self, other):
# sort by priority
return cmp(self.config.priority, other.config.priority)
def __repr__(self):
return '<Subprocess at %s with name %s in state %s>' % (
id(self),
self.config.name,
getProcessStateDescription(self.get_state()))
def get_state(self):
if not self.laststart:
return ProcessStates.STOPPED
elif self.killing:
return ProcessStates.STOPPING
elif self.system_stop:
return ProcessStates.FATAL
elif self.exitstatus is not None:
if self.administrative_stop:
return ProcessStates.STOPPED
else:
return ProcessStates.EXITED
elif self.delay:
if self.pid:
return ProcessStates.STARTING
else:
return ProcessStates.BACKOFF
elif self.pid:
return ProcessStates.RUNNING
return ProcessStates.UNKNOWN
class Supervisor:
mood = 1 # 1: up, 0: restarting, -1: suicidal
stopping = False # set after we detect that we are handling a stop request
lastdelayreport = 0
def __init__(self, options):
self.options = options
def main(self, args=None, test=False, first=False):
self.options.realize(args)
self.options.cleanup_fds()
info_messages = []
critical_messages = []
setuid_msg = self.options.set_uid()
if setuid_msg:
critical_messages.append(setuid_msg)
if first:
rlimit_messages = self.options.set_rlimits()
info_messages.extend(rlimit_messages)
# this sets the options.logger object
# delay logger instantiation until after setuid
self.options.make_logger(critical_messages, info_messages)
if not self.options.nocleanup:
# clean up old automatic logs
self.options.clear_autochildlogdir()
# delay "automatic" child log creation until after setuid because
# we want to use mkstemp, which needs to create the file eagerly
self.options.create_autochildlogs()
self.run(test)
def run(self, test=False):
self.processes = {}
for program in self.options.programs:
name = program.name
self.processes[name] = self.options.make_process(program)
try:
self.options.process_environment()
self.options.openhttpserver(self)
self.options.setsignals()
if not self.options.nodaemon:
self.options.daemonize()
# writing pid file needs to come *after* daemonizing or pid
# will be wrong
self.options.write_pidfile()
self.runforever(test)
finally:
self.options.cleanup()
def runforever(self, test=False):
timeout = 1
socket_map = self.options.get_socket_map()
while 1:
if self.mood > 0:
self.start_necessary()
r, w, x = [], [], []
if self.mood < 1:
if not self.stopping:
self.stop_all()
self.stopping = True
# if there are no delayed processes (we're done killing
# everything), it's OK to stop or reload
delayprocs = self.get_delay_processes()
if delayprocs:
now = time.time()
if now > (self.lastdelayreport + 3): # every 3 secs
names = [ p.config.name for p in delayprocs]
namestr = ', '.join(names)
self.options.logger.info('waiting for %s to die' %
namestr)
self.lastdelayreport = now
else:
break
process_map = {}
# process output fds
for proc in self.processes.values():
proc.log_output()
drains = proc.get_pipe_drains()
for fd, drain in drains:
r.append(fd)
process_map[fd] = drain
# medusa i/o fds
for fd, dispatcher in socket_map.items():
if dispatcher.readable():
r.append(fd)
if dispatcher.writable():
w.append(fd)
try:
r, w, x = select.select(r, w, x, timeout)
except select.error, err:
r = w = x = []
if err[0] == errno.EINTR:
self.options.logger.log(self.options.TRACE,
'EINTR encountered in select')
else:
raise
for fd in r:
if process_map.has_key(fd):
drain = process_map[fd]
# drain the file descriptor
drain(fd)
if socket_map.has_key(fd):
try:
socket_map[fd].handle_read_event()
except asyncore.ExitNow:
raise
except:
socket_map[fd].handle_error()
for fd in w:
if socket_map.has_key(fd):
try:
socket_map[fd].handle_write_event()
except asyncore.ExitNow:
raise
except:
socket_map[fd].handle_error()
self.transition()
self.kill_undead()
self.reap()
self.handle_signal()
if test:
break
def start_necessary(self):
processes = self.processes.values()
processes.sort() # asc by priority
now = time.time()
for p in processes:
state = p.get_state()
if state == ProcessStates.STOPPED and not p.laststart:
if p.config.autostart:
# STOPPED -> STARTING
p.spawn()
elif state == ProcessStates.EXITED:
if p.config.autorestart:
# EXITED -> STARTING
p.spawn()
elif state == ProcessStates.BACKOFF:
if now > p.delay:
# BACKOFF -> STARTING
p.spawn()
def stop_all(self):
processes = self.processes.values()
processes.sort()
processes.reverse() # stop in desc priority order
for proc in processes:
state = proc.get_state()
if state == ProcessStates.RUNNING:
# RUNNING -> STOPPING
proc.stop()
elif state == ProcessStates.STARTING:
# STARTING -> STOPPING (unceremoniously subvert the RUNNING
# state)
proc.stop()
elif state == ProcessStates.BACKOFF:
# BACKOFF -> FATAL
proc.delay = 0
proc.backoff = 0
proc.system_stop = 1
def transition(self):
now = time.time()
processes = self.processes.values()
for proc in processes:
state = proc.get_state()
# we need to transition processes between BACKOFF ->
# FATAL and STARTING -> RUNNING within here
config = proc.config
logger = self.options.logger
if state == ProcessStates.BACKOFF:
if proc.backoff > config.startretries:
# BACKOFF -> FATAL if the proc has exceeded its number
# of retries
proc.delay = 0
proc.backoff = 0
proc.system_stop = 1
msg = ('entered FATAL state, too many start retries too '
'quickly')
logger.info('gave up: %s %s' % (config.name, msg))
elif state == ProcessStates.STARTING:
if now - proc.laststart > config.startsecs:
# STARTING -> RUNNING if the proc has started
# successfully and it has stayed up for at least
# self.config.startsecs,
proc.delay = 0
proc.backoff = 0
msg = ('entered RUNNING state, process has stayed up for '
'> than %s seconds (startsecs)' % config.startsecs)
logger.info('success: %s %s' % (config.name, msg))
def get_delay_processes(self):
""" Processes which are starting or stopping """
return [ x for x in self.processes.values() if x.delay ]
def get_undead(self):
""" Processes which we've attempted to stop but which haven't responded
to a kill request within a given amount of time (stopwaitsecs) """
now = time.time()
processes = self.processes.values()
undead = []
for proc in processes:
if proc.get_state() == ProcessStates.STOPPING:
time_left = proc.delay - now
if time_left <= 0:
undead.append(proc)
return undead
def kill_undead(self):
for undead in self.get_undead():
# kill processes which are taking too long to stop with a final
# sigkill. if this doesn't kill it, the process will be stuck
# in the STOPPING state forever.
self.options.logger.critical(
'killing %r (%s) with SIGKILL' % (undead.config.name,
undead.pid))
undead.kill(signal.SIGKILL)
def reap(self, once=False):
pid, sts = self.options.waitpid()
if pid:
process = self.options.pidhistory.get(pid, None)
if process is None:
self.options.logger.critical('reaped unknown pid %s)' % pid)
else:
name = process.config.name
process.finish(pid, sts)
if not once:
self.reap() # keep reaping until no more kids to reap
def handle_signal(self):
if self.options.signal:
sig, self.options.signal = self.options.signal, None
if sig in (signal.SIGTERM, signal.SIGINT, signal.SIGQUIT):
self.options.logger.critical(
'received %s indicating exit request' % signame(sig))
self.mood = -1
elif sig == signal.SIGHUP:
self.options.logger.critical(
'received %s indicating restart request' % signame(sig))
self.mood = 0
elif sig == signal.SIGCHLD:
self.options.logger.info(
'received %s indicating a child quit' % signame(sig))
elif sig == signal.SIGUSR2:
self.options.logger.info(
'received %s indicating log reopen request' % signame(sig))
self.options.reopenlogs()
for process in self.processes.values():
process.reopenlogs()
else:
self.options.logger.debug(
'received %s indicating nothing' % signame(sig))
def get_state(self):
if self.mood <= 0:
return SupervisorStates.SHUTDOWN
return SupervisorStates.ACTIVE
# Main program
def main(test=False):
assert os.name == "posix", "This code makes Unix-specific assumptions"
first = True
while 1:
# if we hup, restart by making a new Supervisor()
# the test argument just makes it possible to unit test this code
options = ServerOptions()
d = Supervisor(options)
d.main(None, test, first)
first = False
if test:
return d
if d.mood < 0:
sys.exit(0)
for proc in d.processes.values():
proc.removelogs()
if d.options.httpserver:
d.options.httpserver.close()
if __name__ == "__main__":
main()