Merge "blueprint <multi-process-api-service>"
This commit is contained in:
@@ -213,6 +213,7 @@ Yun Mao <yunmao@gmail.com>
|
||||
Yun Shen <Yun.Shen@hp.com>
|
||||
Yuriy Taraday <yorik.sar@gmail.com>
|
||||
Zed Shaw <zedshaw@zedshaw.com>
|
||||
Zhiteng Huang <zhiteng.huang@intel.com>
|
||||
Zhixue Wu <Zhixue.Wu@citrix.com>
|
||||
Zhongyue Luo <lzyeval@gmail.com>
|
||||
Ziad Sawalha <github@highbridgellc.com>
|
||||
|
||||
+1
-1
@@ -24,7 +24,7 @@ Starts both the EC2 and OpenStack APIs in separate greenthreads.
|
||||
"""
|
||||
|
||||
import eventlet
|
||||
eventlet.monkey_patch()
|
||||
eventlet.monkey_patch(os=False)
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
+1
-1
@@ -20,7 +20,7 @@
|
||||
"""Starter script for Nova EC2 API."""
|
||||
|
||||
import eventlet
|
||||
eventlet.monkey_patch()
|
||||
eventlet.monkey_patch(os=False)
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
@@ -20,7 +20,7 @@
|
||||
"""Starter script for Nova Metadata API."""
|
||||
|
||||
import eventlet
|
||||
eventlet.monkey_patch()
|
||||
eventlet.monkey_patch(os=False)
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
@@ -20,7 +20,7 @@
|
||||
"""Starter script for Nova OS API."""
|
||||
|
||||
import eventlet
|
||||
eventlet.monkey_patch()
|
||||
eventlet.monkey_patch(os=False)
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
@@ -20,7 +20,7 @@
|
||||
"""Starter script for Nova OS API."""
|
||||
|
||||
import eventlet
|
||||
eventlet.monkey_patch()
|
||||
eventlet.monkey_patch(os=False)
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
@@ -489,12 +489,18 @@
|
||||
# ec2_listen_port=8773
|
||||
#### (IntOpt) port for ec2 api to listen
|
||||
|
||||
# ec2_workers=0
|
||||
#### (IntOpt) Number of EC2 API workers
|
||||
|
||||
# osapi_compute_listen=0.0.0.0
|
||||
#### (StrOpt) IP address for OpenStack API to listen
|
||||
|
||||
# osapi_compute_listen_port=8774
|
||||
#### (IntOpt) list port for osapi compute
|
||||
|
||||
# osapi_compute_workers=0
|
||||
#### (IntOpt) Number of workers for OpenStack API
|
||||
|
||||
# metadata_manager=nova.api.manager.MetadataManager
|
||||
#### (StrOpt) OpenStack metadata service manager
|
||||
|
||||
@@ -504,12 +510,18 @@
|
||||
# metadata_listen_port=8775
|
||||
#### (IntOpt) port for metadata api to listen
|
||||
|
||||
# metadata_workers=0
|
||||
#### (IntOpt) Number of workers for metadata API
|
||||
|
||||
# osapi_volume_listen=0.0.0.0
|
||||
#### (StrOpt) IP address for OpenStack Volume API to listen
|
||||
|
||||
# osapi_volume_listen_port=8776
|
||||
#### (IntOpt) port for os volume api to listen
|
||||
|
||||
# osapi_volume_workers=0
|
||||
#### (IntOpt) Number of workers for OpenStack Volume API
|
||||
|
||||
|
||||
######## defined in nova.test ########
|
||||
|
||||
|
||||
+16
-10
@@ -61,12 +61,18 @@ service_opts = [
|
||||
cfg.IntOpt('ec2_listen_port',
|
||||
default=8773,
|
||||
help='port for ec2 api to listen'),
|
||||
cfg.IntOpt('ec2_workers',
|
||||
default=0,
|
||||
help='Number of workers for EC2 API service'),
|
||||
cfg.StrOpt('osapi_compute_listen',
|
||||
default="0.0.0.0",
|
||||
help='IP address for OpenStack API to listen'),
|
||||
cfg.IntOpt('osapi_compute_listen_port',
|
||||
default=8774,
|
||||
help='list port for osapi compute'),
|
||||
cfg.IntOpt('osapi_compute_workers',
|
||||
default=0,
|
||||
help='Number of workers for OpenStack API service'),
|
||||
cfg.StrOpt('metadata_manager',
|
||||
default='nova.api.manager.MetadataManager',
|
||||
help='OpenStack metadata service manager'),
|
||||
@@ -76,12 +82,18 @@ service_opts = [
|
||||
cfg.IntOpt('metadata_listen_port',
|
||||
default=8775,
|
||||
help='port for metadata api to listen'),
|
||||
cfg.IntOpt('metadata_workers',
|
||||
default=0,
|
||||
help='Number of workers for metadata service'),
|
||||
cfg.StrOpt('osapi_volume_listen',
|
||||
default="0.0.0.0",
|
||||
help='IP address for OpenStack Volume API to listen'),
|
||||
cfg.IntOpt('osapi_volume_listen_port',
|
||||
default=8776,
|
||||
help='port for os volume api to listen')
|
||||
help='port for os volume api to listen'),
|
||||
cfg.IntOpt('osapi_volume_workers',
|
||||
default=0,
|
||||
help='Number of workers for OpenStack Volume API service')
|
||||
]
|
||||
|
||||
FLAGS = flags.FLAGS
|
||||
@@ -135,14 +147,6 @@ class Launcher(object):
|
||||
:returns: None
|
||||
|
||||
"""
|
||||
def sigterm(sig, frame):
|
||||
LOG.audit(_("SIGTERM received"))
|
||||
# NOTE(jk0): Raise a ^C which is caught by the caller and cleanly
|
||||
# shuts down the service. This does not yet handle eventlet
|
||||
# threads.
|
||||
raise KeyboardInterrupt
|
||||
|
||||
signal.signal(signal.SIGTERM, sigterm)
|
||||
|
||||
for service in self._services:
|
||||
try:
|
||||
@@ -362,10 +366,12 @@ class WSGIService(object):
|
||||
self.app = self.loader.load_app(name)
|
||||
self.host = getattr(FLAGS, '%s_listen' % name, "0.0.0.0")
|
||||
self.port = getattr(FLAGS, '%s_listen_port' % name, 0)
|
||||
self.workers = getattr(FLAGS, '%s_workers' % name, 0)
|
||||
self.server = wsgi.Server(name,
|
||||
self.app,
|
||||
host=self.host,
|
||||
port=self.port)
|
||||
port=self.port,
|
||||
workers=self.workers)
|
||||
|
||||
def _get_manager(self):
|
||||
"""Initialize a Manager object appropriate for this service.
|
||||
|
||||
@@ -366,5 +366,5 @@ def run():
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
eventlet.monkey_patch()
|
||||
eventlet.monkey_patch(os=False)
|
||||
run()
|
||||
|
||||
@@ -0,0 +1,186 @@
|
||||
# Copyright (c) 2012 Intel, LLC
|
||||
# Copyright (c) 2012 OpenStack, LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""
|
||||
Test multiprocess enabled EC2/OSAPI_Compute/OSAPI_Volume/Metadata API service.
|
||||
"""
|
||||
import boto
|
||||
from boto.ec2 import regioninfo
|
||||
import os
|
||||
import signal
|
||||
import sys
|
||||
import time
|
||||
|
||||
from nova import flags
|
||||
from nova.log import logging
|
||||
from nova import service
|
||||
from nova.tests.integrated import integrated_helpers
|
||||
|
||||
FLAGS = flags.FLAGS
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class MultiprocessEC2Test(integrated_helpers._IntegratedTestBase):
|
||||
def _start_api_service(self):
|
||||
self.osapi = service.WSGIService("ec2")
|
||||
self.osapi.start()
|
||||
self.auth_url = 'http://%s:%s/services/Cloud' % \
|
||||
(self.osapi.host, self.osapi.port)
|
||||
LOG.warn(self.auth_url)
|
||||
|
||||
def _get_flags(self):
|
||||
f = super(MultiprocessEC2Test, self)._get_flags()
|
||||
f['ec2_workers'] = 2
|
||||
return f
|
||||
|
||||
def test_ec2(self):
|
||||
region = regioninfo.RegionInfo(None, 'test', self.osapi.host)
|
||||
self.ec2 = boto.connect_ec2(
|
||||
aws_access_key_id='fake',
|
||||
aws_secret_access_key='fake',
|
||||
is_secure=False,
|
||||
region=region,
|
||||
host=self.osapi.host,
|
||||
port=self.osapi.port,
|
||||
path='/services/Cloud')
|
||||
result = self.ec2.get_all_regions()
|
||||
self.assertEqual(len(result), 1)
|
||||
|
||||
|
||||
class MultiprocessMetadataTest(integrated_helpers._IntegratedTestBase):
|
||||
def _start_api_service(self):
|
||||
self.osapi = service.WSGIService("metadata")
|
||||
self.osapi.start()
|
||||
self.auth_url = 'http://%s:%s/' % (self.osapi.host, self.osapi.port)
|
||||
LOG.warn(self.auth_url)
|
||||
|
||||
def _get_flags(self):
|
||||
f = super(MultiprocessMetadataTest, self)._get_flags()
|
||||
f['metadata_workers'] = 2
|
||||
return f
|
||||
|
||||
def request(self, relative_url):
|
||||
return self.api.api_get(relative_url)
|
||||
|
||||
def test_meta(self):
|
||||
userdata_url = self.auth_url + '/user-data'
|
||||
resp = self.api.request(userdata_url)
|
||||
self.assertEqual(resp.status, 200)
|
||||
|
||||
|
||||
class MultiprocessOSAPIComputeTest(integrated_helpers._IntegratedTestBase):
|
||||
def _start_api_service(self):
|
||||
self.osapi = service.WSGIService("osapi_compute")
|
||||
self.osapi.start()
|
||||
self.auth_url = 'http://%s:%s/v2' % (self.osapi.host, self.osapi.port)
|
||||
LOG.warn(self.auth_url)
|
||||
|
||||
def _get_flags(self):
|
||||
f = super(MultiprocessOSAPIComputeTest, self)._get_flags()
|
||||
f['osapi_compute_workers'] = 2
|
||||
return f
|
||||
|
||||
def test_osapi_compute(self):
|
||||
flavors = self.api.get_flavors()
|
||||
self.assertTrue(len(flavors) > 0, 'Num of flavors > 0.')
|
||||
|
||||
|
||||
class MultiprocessOSAPIVolumesTest(integrated_helpers._IntegratedTestBase):
|
||||
def _start_api_service(self):
|
||||
self.osapi = service.WSGIService("osapi_volume")
|
||||
self.osapi.start()
|
||||
self.auth_url = 'http://%s:%s/v1' % (self.osapi.host, self.osapi.port)
|
||||
LOG.warn(self.auth_url)
|
||||
|
||||
def _get_flags(self):
|
||||
f = super(MultiprocessOSAPIVolumesTest, self)._get_flags()
|
||||
f['osapi_volume_workers'] = 2
|
||||
f['use_local_volumes'] = False # Avoids calling local_path
|
||||
f['volume_driver'] = 'nova.volume.driver.LoggingVolumeDriver'
|
||||
return f
|
||||
|
||||
def test_create_volumes(self):
|
||||
"""Create Volume with API"""
|
||||
body = {'volume': {'size': 1,
|
||||
'snapshot_id': None,
|
||||
'display_name': None,
|
||||
'display_description': None,
|
||||
'volume_type': None}}
|
||||
created_volume = self.api.post_volume(body)
|
||||
self.assertTrue(created_volume['id'])
|
||||
|
||||
|
||||
class MultiprocessWSGITest(integrated_helpers._IntegratedTestBase):
|
||||
def setUp(self):
|
||||
self.workers = 4
|
||||
super(MultiprocessWSGITest, self).setUp()
|
||||
|
||||
def _start_api_service(self):
|
||||
self.osapi = service.WSGIService("osapi_compute")
|
||||
self.osapi.start()
|
||||
self.master_worker_pid = self.osapi.server.master_worker.pid
|
||||
LOG.warn('Master_work pid is: %d' % self.master_worker_pid)
|
||||
self.auth_url = 'http://%s:%s/v2' % (self.osapi.host, self.osapi.port)
|
||||
LOG.warn(self.auth_url)
|
||||
|
||||
def _get_flags(self):
|
||||
f = super(MultiprocessWSGITest, self)._get_flags()
|
||||
f['osapi_compute_workers'] = self.workers
|
||||
return f
|
||||
|
||||
def test_killed_worker_recover(self):
|
||||
# kill one worker and check if new worker can come up
|
||||
f = os.popen('ps --no-headers --ppid %d' % self.master_worker_pid)
|
||||
children_pid = f.readline().split()
|
||||
for pid in children_pid:
|
||||
LOG.warn('pid of first child is %s' % pid)
|
||||
if pid.isdigit():
|
||||
os.kill(int(pid), signal.SIGTERM)
|
||||
break
|
||||
else:
|
||||
continue
|
||||
# wait for new worker
|
||||
time.sleep(1.5)
|
||||
f = os.popen('ps --no-headers --ppid %d|wc -l' %
|
||||
self.master_worker_pid)
|
||||
workers = f.readline()
|
||||
LOG.warn('# of workers: %s' % workers)
|
||||
self.assertEqual(int(workers), self.workers,
|
||||
'Num of children = %d.' % self.workers)
|
||||
flavors = self.api.get_flavors()
|
||||
# check if api service works
|
||||
self.assertTrue(len(flavors) > 0, 'Num of flavors > 0.')
|
||||
|
||||
def test_terminate_api_with_signal(self):
|
||||
# check if api service is working
|
||||
flavors = self.api.get_flavors()
|
||||
self.assertTrue(len(flavors) > 0, 'Num of flavors > 0.')
|
||||
# send SIGTERM to master_worker will terminate service
|
||||
os.kill(self.master_worker_pid, signal.SIGTERM)
|
||||
time.sleep(1.5)
|
||||
# check if service is still available (shouldn't be)
|
||||
#"""
|
||||
try:
|
||||
self.api.get_flavors()
|
||||
self.fail('API service should have been terminated')
|
||||
except Exception as ex:
|
||||
exc_value = sys.exc_info()[1]
|
||||
self.assertTrue('Connection refused' in exc_value or
|
||||
'ECONNREFUSED' in exc_value)
|
||||
#"""
|
||||
#self.api.get_flavors()
|
||||
# check there is no OS processes left over
|
||||
f = os.popen('ps --no-headers --ppid %d' % self.master_worker_pid)
|
||||
self.assertEqual(f.readline(), '', 'No OS processes left.')
|
||||
@@ -63,6 +63,7 @@ LOG = logging.getLogger(__name__)
|
||||
ISO_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S"
|
||||
PERFECT_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f"
|
||||
FLAGS = flags.FLAGS
|
||||
RESEED = True
|
||||
|
||||
FLAGS.register_opt(
|
||||
cfg.BoolOpt('disable_process_locking', default=False,
|
||||
@@ -290,6 +291,13 @@ def debug(arg):
|
||||
|
||||
|
||||
def generate_uid(topic, size=8):
|
||||
global RESEED
|
||||
if RESEED:
|
||||
random.seed("%d%s%s" % (os.getpid(),
|
||||
socket.gethostname(),
|
||||
time.time()))
|
||||
RESEED = False
|
||||
|
||||
characters = '01234567890abcdefghijklmnopqrstuvwxyz'
|
||||
choices = [random.choice(characters) for _x in xrange(size)]
|
||||
return '%s-%s' % (topic, ''.join(choices))
|
||||
|
||||
+154
-12
@@ -19,12 +19,16 @@
|
||||
|
||||
"""Utility methods for working with WSGI servers."""
|
||||
|
||||
import os.path
|
||||
import errno
|
||||
import os
|
||||
import signal
|
||||
import sys
|
||||
|
||||
import eventlet
|
||||
from eventlet.green import socket
|
||||
import eventlet.wsgi
|
||||
import greenlet
|
||||
import multiprocessing
|
||||
from paste import deploy
|
||||
import routes.middleware
|
||||
import webob.dec
|
||||
@@ -45,14 +49,15 @@ class Server(object):
|
||||
|
||||
default_pool_size = 1000
|
||||
|
||||
def __init__(self, name, app, host=None, port=None, pool_size=None,
|
||||
protocol=eventlet.wsgi.HttpProtocol):
|
||||
def __init__(self, name, app, host=None, port=None, workers=None,
|
||||
pool_size=None, protocol=eventlet.wsgi.HttpProtocol):
|
||||
"""Initialize, but do not start, a WSGI server.
|
||||
|
||||
:param name: Pretty name for logging.
|
||||
:param app: The WSGI application to serve.
|
||||
:param host: IP address to serve the application.
|
||||
:param port: Port number to server the application.
|
||||
:param workers: Number of process to spawn concurrently
|
||||
:param pool_size: Maximum number of eventlets to spawn concurrently.
|
||||
:returns: None
|
||||
|
||||
@@ -61,12 +66,17 @@ class Server(object):
|
||||
self.app = app
|
||||
self.host = host or "0.0.0.0"
|
||||
self.port = port or 0
|
||||
self.workers = workers or 0
|
||||
self._server = None
|
||||
self._socket = None
|
||||
self._protocol = protocol
|
||||
self._pool = eventlet.GreenPool(pool_size or self.default_pool_size)
|
||||
self._logger = logging.getLogger("eventlet.wsgi.server")
|
||||
self._pool_size = pool_size or self.default_pool_size
|
||||
self._pool = None
|
||||
self._logger = logging.getLogger("nova.%s.wsgi.server" % self.name)
|
||||
self._wsgi_logger = logging.WritableLogger(self._logger)
|
||||
self.master_worker = None
|
||||
self.children = []
|
||||
self.running = True
|
||||
|
||||
def _start(self):
|
||||
"""Run the blocking eventlet WSGI server.
|
||||
@@ -90,11 +100,124 @@ class Server(object):
|
||||
"""
|
||||
if backlog < 1:
|
||||
raise exception.InvalidInput(
|
||||
reason='The backlog must be more than 1')
|
||||
self._socket = eventlet.listen((self.host, self.port), backlog=backlog)
|
||||
self._server = eventlet.spawn(self._start)
|
||||
(self.host, self.port) = self._socket.getsockname()
|
||||
LOG.info(_("Started %(name)s on %(host)s:%(port)s") % self.__dict__)
|
||||
reason='The backlog must be more than 1')
|
||||
|
||||
try:
|
||||
self._socket = eventlet.listen((self.host, self.port),
|
||||
backlog=backlog)
|
||||
(self.host, self.port) = self._socket.getsockname()
|
||||
except socket.error, err:
|
||||
if err[0] != errno.EINVAL:
|
||||
raise
|
||||
|
||||
if self.workers == 0:
|
||||
# single process mode, useful for profiling, test, debug etc.
|
||||
self._pool = eventlet.GreenPool(self._pool_size)
|
||||
self._server = self._pool.spawn(self._start)
|
||||
LOG.info(_("Started %(name)s on %(host)s:%(port)s") %
|
||||
self.__dict__)
|
||||
return None
|
||||
|
||||
# master_worker doesn't actually do work (i.e. handle API request)
|
||||
# but it's a managing process to handle signal/termination for
|
||||
# this type of API service, only needed if workers > 1
|
||||
self.master_worker = multiprocessing.Process(target=self.run_workers,
|
||||
args=())
|
||||
self.master_worker.start()
|
||||
self._logger.info(_("Started %(name)s on %(host)s:%(port)s in process")
|
||||
% self.__dict__)
|
||||
return None
|
||||
|
||||
def run_server_in_process(self):
|
||||
"""Run a WSGI server."""
|
||||
eventlet.wsgi.HttpProtocol.default_request_version = "HTTP/1.0"
|
||||
eventlet.hubs.use_hub('poll')
|
||||
eventlet.patcher.monkey_patch(all=False, socket=True)
|
||||
|
||||
self._pool = eventlet.GreenPool(size=self._pool_size)
|
||||
try:
|
||||
self._pool.spawn_n(self._start)
|
||||
except socket.error, err:
|
||||
if err[0] != errno.EINVAL:
|
||||
raise
|
||||
|
||||
self._pool.waitall()
|
||||
|
||||
def run_workers(self):
|
||||
"""Start workers and wait for them to join"""
|
||||
def kill_children(*args):
|
||||
"""Kills the entire process group."""
|
||||
#TODO(zhiteng) Gracefully kill all eventlet greenthread
|
||||
self._logger.error(_('SIGTERM or SIGINT received'))
|
||||
signal.signal(signal.SIGTERM, signal.SIG_IGN)
|
||||
signal.signal(signal.SIGINT, signal.SIG_IGN)
|
||||
self.running = False
|
||||
for pid in self.children:
|
||||
os.kill(pid, signal.SIGTERM)
|
||||
|
||||
def hup(*args):
|
||||
"""
|
||||
Shuts down the server, but allows running requests to complete
|
||||
"""
|
||||
self._logger.error(_('SIGHUP received'))
|
||||
signal.signal(signal.SIGHUP, signal.SIG_IGN)
|
||||
self.running = False
|
||||
|
||||
signal.signal(signal.SIGTERM, kill_children)
|
||||
signal.signal(signal.SIGINT, kill_children)
|
||||
signal.signal(signal.SIGHUP, hup)
|
||||
|
||||
while len(self.children) < self.workers:
|
||||
self.run_child()
|
||||
|
||||
self._logger.info(_("Started %(children_count)d worker for %(name)s")
|
||||
% {'children_count': len(self.children),
|
||||
'name': self.name})
|
||||
|
||||
self.wait_on_children()
|
||||
|
||||
def run_child(self):
|
||||
try:
|
||||
pid = os.fork()
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
if pid == 0:
|
||||
signal.signal(signal.SIGHUP, signal.SIG_DFL)
|
||||
signal.signal(signal.SIGTERM, signal.SIG_DFL)
|
||||
# to avoid race condition that child receive signal before
|
||||
# parent and is respawned
|
||||
signal.signal(signal.SIGINT, signal.SIG_IGN)
|
||||
try:
|
||||
self.run_server_in_process()
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
self._logger.info(_('Child %d exiting normally') % os.getpid())
|
||||
return None
|
||||
else:
|
||||
self._logger.info(_('[%(name)s] Started worker (pid: %(pid)s)') %
|
||||
{'name': self.name,
|
||||
'pid': pid})
|
||||
self.children.append(pid)
|
||||
|
||||
def wait_on_children(self):
|
||||
while self.running:
|
||||
try:
|
||||
pid, status = os.wait()
|
||||
if os.WIFEXITED(status) or os.WIFSIGNALED(status):
|
||||
self._logger.error(_('Dead worker %(pid)s') % locals())
|
||||
if pid in self.children:
|
||||
self.children.remove(pid)
|
||||
self.run_child()
|
||||
except OSError, err:
|
||||
if err.errno not in (errno.EINTR, errno.ECHILD):
|
||||
raise
|
||||
except KeyboardInterrupt:
|
||||
self._logger.info(_('Caught keyboard interrupt. Exiting.'))
|
||||
self.running = False
|
||||
break
|
||||
eventlet.greenio.shutdown_safe(self._socket)
|
||||
self._socket.close()
|
||||
self._logger.debug(_('Exited'))
|
||||
|
||||
def stop(self):
|
||||
"""Stop this server.
|
||||
@@ -106,7 +229,19 @@ class Server(object):
|
||||
|
||||
"""
|
||||
LOG.info(_("Stopping WSGI server."))
|
||||
self._server.kill()
|
||||
if self.workers > 0:
|
||||
# set running state to false and kill all workers
|
||||
self.running = False
|
||||
for pid in self.children:
|
||||
os.kill(pid, signal.SIGTERM)
|
||||
self.children.remove(pid)
|
||||
# now terminate master_worker
|
||||
if self.master_worker.is_alive():
|
||||
self.master_worker.terminate()
|
||||
else:
|
||||
# Resize Pool to stop accepting new connection
|
||||
self._pool.resize(0)
|
||||
self._server.kill()
|
||||
|
||||
def wait(self):
|
||||
"""Block, until the server has stopped.
|
||||
@@ -117,9 +252,16 @@ class Server(object):
|
||||
|
||||
"""
|
||||
try:
|
||||
self._server.wait()
|
||||
if self.workers and self.master_worker:
|
||||
# for services enabled multi-process,a separate master_worker
|
||||
# is already waiting
|
||||
pass
|
||||
else:
|
||||
self._pool.waitall()
|
||||
except greenlet.GreenletExit:
|
||||
LOG.info(_("WSGI server has stopped."))
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
|
||||
|
||||
class Request(webob.Request):
|
||||
|
||||
Reference in New Issue
Block a user