ceilometer-agent-compute 代码分析
说明:本文基于ceilometer mitaka版本代码分析
ceilometer是openstack项目中负责计量的组件,他会通过不同的方式从系统中收集必要的计量数据。其收集数据主要有两种方式:
- Poller:通过在不同的节点上部署agent,按照预定义的规则轮询获取系统中的计量信息,这里Poller的方式主要依赖两个agent来获取不同的数据,其中为:
- agent-compute:部署在每个计算节点,通过image的driver来轮询获取资源使用的统计数据
- agent-central:部署在管理节点,通过轮询调用各个组件(nova/neutron/cinder …)的api来获取各个组件的资源使用数据
- Notification:对应的是ceilometer-collector,运行在一个或者多个管理节点,它会监控各个组件的消息队列,队列中的通知信息会被它处理和转换为计量信息,再发回到消息系统中。
接下来我们首先来分析agent-compute的服务启动和代码运行机制
代码整体结构
代码入口
在setup.cfg中有如下的内容:
ceilometer-polling = ceilometer.cmd.polling:main
这说明ceilometer-polling的代码入口在 ceilometer/cmd/polling.py 的 main()函数
ceilometer/cmd/polling.py
def main():
service.prepare_service()
os_service.launch(CONF, manager.AgentManager(CONF.polling_namespaces,
CONF.pollster_list)).wait()
说明:main函数中主要做了如下几件事
- service.prepare_service() :预处理,主要是初始化一些参数,设置日志级别
- manger.AgentManager() 生成agentManager 实例
- 对于agent-compute来说,CONF.polling_namespaces = “compute”, CONF.pollster_list=[]
- 调用os_service.launch()方法加载并启动实例
- 之后调用wait()方法等待进程结束
agent-compute的核心就在于 AgentManager()的实例化
代码详解:
service.prepare_service()
def prepare_service(argv=None, config_files=None):
oslo_i18n.enable_lazy()
log.register_options(cfg.CONF)
log_levels = (cfg.CONF.default_log_levels +
['stevedore=INFO', 'keystoneclient=INFO',
'neutronclient=INFO'])
log.set_defaults(default_log_levels=log_levels)
defaults.set_cors_middleware_defaults()
if argv is None:
argv = sys.argv
cfg.CONF(argv[1:], project='ceilometer', validate_default_values=True,
version=version.version_info.version_string(),
default_config_files=config_files)
keystone_client.setup_keystoneauth(cfg.CONF)
log.setup(cfg.CONF, 'ceilometer')
# NOTE(liusheng): guru cannot run with service under apache daemon, so when
# ceilometer-api running with mod_wsgi, the argv is [], we don't start
# guru.
if argv:
gmr.TextGuruMeditation.setup_autorun(version)
messaging.setup()
主要功能:
- 初始化预定义的参数,主要设定了host,http_timeout, workers参数
- 设置日志级别
- 加载指定的配置文件
AgentManager()实例化过程
代码入口:
ceilometer/agent/manager.py
class AgentManager(service_base.BaseService):
def __init__(self, namespaces=None, pollster_list=None):
namespaces = namespaces or ['compute', 'central']
pollster_list = pollster_list or []
group_prefix = cfg.CONF.polling.partitioning_group_prefix
# features of using coordination and pollster-list are exclusive, and
# cannot be used at one moment to avoid both samples duplication and
# samples being lost
if pollster_list and cfg.CONF.coordination.backend_url:
raise PollsterListForbidden()
super(AgentManager, self).__init__()
说明:
- namespaces = [‘compute’]
- pollster_list = []
- group_prefix = ?
- 调用父类的init方法进行初始化,父类的 init 方法中是创建了线程池,默认大小为1000,线程池赋值到了 self.tg
class Service(ServiceBase):
"""Service object for binaries running on hosts."""
def __init__(self, threads=1000):
self.tg = threadgroup.ThreadGroup(threads)
加载extension:根据ceilometer.poll.compute 获取所有的插件列表
ceilometer/agent/manager.py
# we'll have default ['compute', 'central'] here if no namespaces will
# be passed
extensions = (self._extensions('poll', namespace).extensions
for namespace in namespaces)
def _extensions(self, category, agent_ns=None):
namespace = ('ceilometer.%s.%s' % (category, agent_ns) if agent_ns
else 'ceilometer.%s' % category)
return self._get_ext_mgr(namespace)
说明:
- 调用self._extensions(),传递两个参数:poll,compute
- self._extensions(),根据传递进来的参数,组装出namespace = ceilometer.poll.compute
- 调用 self._get_ext_mar(),传递进去 ceilometer.poll.compute这个namespace
@staticmethod
def _get_ext_mgr(namespace):
def _catch_extension_load_error(mgr, ep, exc):
# Extension raising ExtensionLoadError can be ignored,
# and ignore anything we can't import as a safety measure.
if isinstance(exc, plugin_base.ExtensionLoadError):
LOG.exception(_("Skip loading extension for %s") % ep.name)
return
if isinstance(exc, ImportError):
LOG.error(_("Failed to import extension for %(name)s: "
"%(error)s"),
{'name': ep.name, 'error': exc})
return
raise exc
return extension.ExtensionManager(
namespace=namespace,
invoke_on_load=True,
on_load_failure_callback=_catch_extension_load_error,
)
setup.cfg
ceilometer.poll.compute =
disk.read.requests = ceilometer.compute.pollsters.disk:ReadRequestsPollster
disk.write.requests = ceilometer.compute.pollsters.disk:WriteRequestsPollster
disk.read.bytes = ceilometer.compute.pollsters.disk:ReadBytesPollster
说明:
- _get_ext_mgr(),根据传递进来的namespace,调用 extension.ExtensionManager() 并返回最终的实例
- extension是stevedore这个公共库实现的
- namespace: ceilometer.poll.compute 对应的定义在 setup.cfg 中,对应的是compute支持的各项计量指标的采集入口,如上。最终各项指标都是通过以上插件来实现获取的。接下来先继续分析插件的加载过程
site-packages/stevedore/extension.py
def __init__(self, namespace,
invoke_on_load=False,
invoke_args=(),
invoke_kwds={},
propagate_map_exceptions=False,
on_load_failure_callback=None,
verify_requirements=False):
self._init_attributes(
namespace,
propagate_map_exceptions=propagate_map_exceptions,
on_load_failure_callback=on_load_failure_callback)
extensions = self._load_plugins(invoke_on_load,
invoke_args,
invoke_kwds,
verify_requirements)
self._init_plugins(extensions)
说明:extension.ExtensionManager()的 init 方法中做了三件事:
- 调用 __init_attributes, 初始化了三个参数,分别赋值给如下内容,这里重点是namespace = ceilometer.poll.compute
self.namespace = namespace
self.propagate_map_exceptions = propagate_map_exceptions
self._on_load_failure_callback = on_load_failure_callback
- 调用self._load_plugins加载对应namespace下的各种插件
- 调用 self._init_plugins 将上一步load完成的extensions赋值给 self.extensions
def _load_plugins(self, invoke_on_load, invoke_args, invoke_kwds,
verify_requirements):
extensions = []
for ep in self._find_entry_points(self.namespace):
LOG.debug('found extension %r', ep)
try:
ext = self._load_one_plugin(ep,
invoke_on_load,
invoke_args,
invoke_kwds,
verify_requirements,
)
if ext:
extensions.append(ext)
except (KeyboardInterrupt, AssertionError):
raise
except Exception as err:
if self._on_load_failure_callback is not None:
self._on_load_failure_callback(self, ep, err)
else:
# Log the reason we couldn't import the module,
# usually without a traceback. The most common
# reason is an ImportError due to a missing
# dependency, and the error message should be
# enough to debug that. If debug logging is
# enabled for our logger, provide the full
# traceback.
LOG.error('Could not load %r: %s', ep.name, err,
exc_info=LOG.isEnabledFor(logging.DEBUG))
return extensions
def _load_one_plugin(self, ep, invoke_on_load, invoke_args, invoke_kwds,
verify_requirements):
# NOTE(dhellmann): Using require=False is deprecated in
# setuptools 11.3.
if hasattr(ep, 'resolve') and hasattr(ep, 'require'):
if verify_requirements:
ep.require()
plugin = ep.resolve()
else:
plugin = ep.load(require=verify_requirements)
if invoke_on_load:
obj = plugin(*invoke_args, **invoke_kwds)
else:
obj = None
return Extension(ep.name, ep, plugin, obj)
说明:以上两个方法是整个插件的加载的关键过程
- for ep in self._find_entry_points(self.namespace): 这一步实际调用的是pkg_resources,在ceilometer的egg包中的 entry_points.txt 文件中找到 ceilometer.poll.compute 的详细定义。不同的操作系统entry_points.txt 文件的具体路径会有差异,但是都是在类似的 Python2.7/site-packages/xxx.dist-info 目录中
- 遍历通过ceilometer.poll.compute找到的插件列表,对每个插件调用self._load_one_plugin()方法
- 比如对于第一个获取到的插件:
disk.read.requests = ceilometer.compute.pollsters.disk:ReadRequestsPollster
- 进入 _load_one_plugin(),实际会调用 plugin = ep.load()方法加载对应插件的类,之后在实际遍历循环的过程就会调用这个插件的方法,具体在后边分析。最终返回一个 Extesion()对象,ep.name=disk.read.requests, plugin=对应的插件类实例
- 在_load_one_plugin()返回一个extension实例之后,就会执行 extensions.append(ext),将对应的实例append
说明:插件返回流程:
- _load_plugins:返回 extensions = [] , extensions中是一个一个插件实例
- ExtensionManager获取到 _load_plugins的返回值,extensions = self._load_plugins()
- ExtensionManager调用 self._init_plugin()将 extensions赋值给 self.extensions = extensions
- _get_ext_mgr返回的是 ExtensionManager()的结果给 _extensions 方法
- 最终到 AgentManager的 __init__函数中,AgentManager的成员变量就得到了初始化:
extensions = (self._extensions('poll', namespace).extensions
for namespace in namespaces)
至此,AgentManager的 extensions 变量就得到了初始化,对应的是 ceilometer.poll.compute 下的各个插件的实例
加载extensions_fb:
ceilometer/agent/manager.py
# get the extensions from pollster builder
extensions_fb = (self._extensions_from_builder('poll', namespace)
for namespace in namespaces)
说明:
- 调用 self._extensions_from_builder 传递两个参数:poll,compute
def _extensions_from_builder(self, category, agent_ns=None):
ns = ('ceilometer.builder.%s.%s' % (category, agent_ns) if agent_ns
else 'ceilometer.builder.%s' % category)
mgr = self._get_ext_mgr(ns)
def _build(ext):
return ext.plugin.get_pollsters_extensions()
# NOTE: this seems a stevedore bug. if no extensions are found,
# map will raise runtimeError which is not documented.
if mgr.names():
return list(itertools.chain(*mgr.map(_build)))
else:
return []
说明:
- ns赋值,这里对应的是 ceilometer.builder.poll.compute
- 调用 self._get_ext_mgr(ns), 这一步和上边加载extensions一样,根据namespace加载对应的extensions,如果有的话,最终返回的就是 ExtensionsManager实例
- 对于compute来说,这里对应的namespace在setup.cfg中没有定义,那么 extensions_fb = [] ,是一个空列表,我们就先不深入了
extensions赋值
self.extensions = list(itertools.chain(*list(extensions))) + list(
itertools.chain(*list(extensions_fb)))
说明:
- 以上加载完毕的extensions和extensions_fb都会赋值给 self.extensions
加载discovery_manager
ceilometer/agent/manager.py
self.discovery_manager = self._extensions('discover')
def _extensions(self, category, agent_ns=None):
namespace = ('ceilometer.%s.%s' % (category, agent_ns) if agent_ns
else 'ceilometer.%s' % category)
return self._get_ext_mgr(namespace)
ceilometer.discover =
local_instances = ceilometer.compute.discovery:InstanceDiscovery
endpoint = ceilometer.agent.discovery.endpoint:EndpointDiscovery
tenant = ceilometer.agent.discovery.tenant:TenantDiscovery
说明:
- 调用self._extensions(“discover”),传递的参数只有一个:discover
- _extensions()组装的namespace=ceilometer.discover
- 之后调用 self._get_ext_mgr()加载ceilometer.discover这个namespace对应的所有插件,具体的过程不再赘述
- 最终self.discovery_manager 得到的就是 ceilometer.discover这个namespace下所有插件的实例
notification初始化-transport获取
ceilometer/agent/manager.py
self.notifier = oslo_messaging.Notifier(
messaging.get_transport(),
driver=cfg.CONF.publisher_notifier.telemetry_driver,
publisher_id="ceilometer.polling")
说明:
- self.notifier 是oslo_messaging.Notifier对应的一个实例,这里初始化的时候传递了三个参数:
- 第一个参数:messaging.get_transport()
- 第二个参数:driver = messagingv2
- 第三个参数:publisher_id = “ceilometer.polling”
- 接下来主要来继续分析一下 get_transport()
ceilometer/messaging.py
def get_transport(url=None, optional=False, cache=True):
"""Initialise the oslo_messaging layer."""
global TRANSPORTS, DEFAULT_URL
cache_key = url or DEFAULT_URL
transport = TRANSPORTS.get(cache_key)
if not transport or not cache:
try:
transport = oslo_messaging.get_transport(cfg.CONF, url)
except (oslo_messaging.InvalidTransportURL,
oslo_messaging.DriverLoadFailure):
if not optional or url:
# NOTE(sileht): oslo_messaging is configured but unloadable
# so reraise the exception
raise
return None
else:
if cache:
TRANSPORTS[cache_key] = transport
return transport
说明:
- transport = None,cache = None, url=None
- transport = oslo_messaging.get_transport(cfg.CONF,url) 调用加载transport
site-packages/oslo_messaging/transport.py
def get_transport(conf, url=None, allowed_remote_exmods=None, aliases=None):
allowed_remote_exmods = allowed_remote_exmods or []
conf.register_opts(_transport_opts)
if not isinstance(url, TransportURL):
url = TransportURL.parse(conf, url, aliases)
kwargs = dict(default_exchange=conf.control_exchange,
allowed_remote_exmods=allowed_remote_exmods)
try:
mgr = driver.DriverManager('oslo.messaging.drivers',
url.transport.split('+')[0],
invoke_on_load=True,
invoke_args=[conf, url],
invoke_kwds=kwargs)
except RuntimeError as ex:
raise DriverLoadFailure(url.transport, ex)
return Transport(mgr.driver)
[oslo.messaging.drivers]
amqp = oslo_messaging._drivers.impl_amqp1:ProtonDriver
fake = oslo_messaging._drivers.impl_fake:FakeDriver
kafka = oslo_messaging._drivers.impl_kafka:KafkaDriver
kombu = oslo_messaging._drivers.impl_rabbit:RabbitDriver
pika = oslo_messaging._drivers.impl_pika:PikaDriver
rabbit = oslo_messaging._drivers.impl_rabbit:RabbitDriver
zmq = oslo_messaging._drivers.impl_zmq:ZmqDriver
说明:
- url = rabbit
- mgr = driver.DriverManager() 主要是两个参数,第一个参数是一个namespace: oslo.messaging.drivers,第二个参数是 rabbit
- mgr最终是DriverManager的一个实例,其中oslo.messaging.drivers
site-packages/stevedore/driver.py
def __init__(self, namespace, name,
invoke_on_load=False, invoke_args=(), invoke_kwds={},
on_load_failure_callback=None,
verify_requirements=False):
on_load_failure_callback = on_load_failure_callback \
or self._default_on_load_failure
super(DriverManager, self).__init__(
namespace=namespace,
names=[name],
invoke_on_load=invoke_on_load,
invoke_args=invoke_args,
invoke_kwds=invoke_kwds,
on_load_failure_callback=on_load_failure_callback,
verify_requirements=verify_requirements,
)
说明:
- 生成 DriverManager实例,首先调用父类的 __init__方法,其中第一个参数为 oslo.messaging.drivers,第二个参数为 rabbit
- 代表加载的是 rabbit的driver
site-packages/stevedore/named.py
def __init__(self, namespace, names,
invoke_on_load=False, invoke_args=(), invoke_kwds={},
name_order=False, propagate_map_exceptions=False,
on_load_failure_callback=None,
on_missing_entrypoints_callback=None,
verify_requirements=False,
warn_on_missing_entrypoint=True):
self._init_attributes(
namespace, names, name_order=name_order,
propagate_map_exceptions=propagate_map_exceptions,
on_load_failure_callback=on_load_failure_callback)
extensions = self._load_plugins(invoke_on_load,
invoke_args,
invoke_kwds,
verify_requirements)
self._missing_names = set(names) - set([e.name for e in extensions])
if self._missing_names:
if on_missing_entrypoints_callback:
on_missing_entrypoints_callback(self._missing_names)
elif warn_on_missing_entrypoint:
LOG.warning('Could not load %s' %
', '.join(self._missing_names))
self._init_plugins(extensions)
def _init_attributes(self, namespace, names, name_order=False,
propagate_map_exceptions=False,
on_load_failure_callback=None):
super(NamedExtensionManager, self)._init_attributes(
namespace, propagate_map_exceptions=propagate_map_exceptions,
on_load_failure_callback=on_load_failure_callback)
self._names = names
self._missing_names = set()
self._name_order = name_order
def _init_attributes(self, namespace, propagate_map_exceptions=False,
on_load_failure_callback=None):
self.namespace = namespace
self.propagate_map_exceptions = propagate_map_exceptions
self._on_load_failure_callback = on_load_failure_callback
说明:
- 首先执行 self.__init_attrivutes, 初始化几个重要参数,
- self.names = rabbit
- self.namespace = oslo.messaging.drivers
- 调用 self._load_plugins() 加载 oslo.messaging.drivers 对应的 rabbit的driver:rabbit = oslo_messaging._drivers.impl_rabbit:RabbitDriver,加载到extensions这个字段中
- 调用self._init_plugin将extensions初始化赋值给 self.extensions
说明:返回流程:
-
NamedExtensionManager 向上返回一个 NamedExtensionManager()实例,其中 self.extensions 对应的是rabbit 的Driver
-
get_transport()函数中:mgr 对应的就是上边加载到的rabbit的Driver实例,向上返回一个 TransPort对象
mgr = driver.DriverManager('oslo.messaging.drivers',
url.transport.split('+')[0],
invoke_on_load=True,
invoke_args=[conf, url],
invoke_kwds=kwargs)
return Transport(mgr.driver)
- 在Transport对象的实例化过程,self._driver = driver 对应的就是上边加载到的rabbit 的driver
def __init__(self, driver):
self.conf = driver.conf
self._driver = driver
- 在ceilometer/messaging.py的 get_transport中 transport = oslo_messaging.get_transport(cfg.CONF, url), transport对应的就是上文加载到的rabbit的driver
- 至此,Notifier初始化过程中的第一个参数就生成了
接下来继续分析Notifier的初始化过程
Notifer初始化
ceilometer/agent/manager.py
self.notifier = oslo_messaging.Notifier(
messaging.get_transport(),
driver=cfg.CONF.publisher_notifier.telemetry_driver,
publisher_id="ceilometer.polling")
说明:
- 参数一:对应的是rabbit的driver
- 参数二:对应的messagingv2
site_packages/oslo_messaging/notify/notifier.py
def __init__(self, transport, publisher_id=None,
driver=None, topic=None,
serializer=None, retry=None,
topics=None):
conf = transport.conf
conf.register_opts(_notifier_opts,
group='oslo_messaging_notifications')
self.transport = transport
self.publisher_id = publisher_id
self.retry = retry
self._driver_names = ([driver] if driver is not None else
conf.oslo_messaging_notifications.driver)
if topics is not None:
self._topics = topics
elif topic is not None:
self._topics = [topic]
else:
self._topics = conf.oslo_messaging_notifications.topics
self._serializer = serializer or msg_serializer.NoOpSerializer()
self._driver_mgr = named.NamedExtensionManager(
'oslo.messaging.notify.drivers',
names=self._driver_names,
invoke_on_load=True,
invoke_args=[conf],
invoke_kwds={
'topics': self._topics,
'transport': self.transport,
}
)
_marker = object()
说明:
- 创建了一个Notify对象,其中transport是rabbit,publisher_id=ceilometer.polling, self._driver_names=messagingv2
- self._driver_mgr 是根据 rabbit 从 oslo.messaging.notify.drivers中加载的rabbit的driver,对应的是:
[oslo.messaging.notify.drivers]
log = oslo_messaging.notify._impl_log:LogDriver
messaging = oslo_messaging.notify.messaging:MessagingDriver
messagingv2 = oslo_messaging.notify.messaging:MessagingV2Driver
noop = oslo_messaging.notify._impl_noop:NoOpDriver
routing = oslo_messaging.notify._impl_routing:RoutingDriver
test = oslo_messaging.notify._impl_test:TestDriver
AgentManager初始化总结
至此,AgentManager初始化的过程就结束了,整个过程中我们可以看到主要做了如下几件事:
- self.extensions赋值: 加载ceilometer.poll.compute 这个namespace下对应的插件,实现指标的具体采集
- self.discovery_manager赋值,加载ceilometer.discover对应的所有插件
- self.notifier 赋值:
- 加载了oslo_messaging下的rabbit driver赋值给 self.transport (对应的是self.notifier.transport)
- 加载oslo_messaging下的 messagingv2 driver,赋值给 self._driver_mgr(对应的是self.notifer._driver_mgr)
- 赋值,self.publisher_id = ceilometer.polling (对应的是self.notifier.publisher_id)
服务启动过程
ceilometer/cmd/polling.py
os_service.launch(CONF, manager.AgentManager(CONF.polling_namespaces,
CONF.pollster_list)).wait()
说明:
- 服务启动,调用的是os_service对应的是公共库 oslo_service
- launch 用来启动AgentManager实例
接下来就继续来分析launch的过程
服务启动的入口:
site-packages/oslo_service/service.py
def launch(conf, service, workers=1, restart_method='reload'):
if workers is not None and workers <= 0:
raise ValueError(_("Number of workers should be positive!"))
if workers is None or workers == 1:
launcher = ServiceLauncher(conf, restart_method=restart_method)
else:
launcher = ProcessLauncher(conf, restart_method=restart_method)
launcher.launch_service(service, workers=workers)
return launcher
说明:
- 根据workers的取值,选择不同的启动方式
- agent-compute的workers=1,对应的是通过launcher = ServiceLauncher()实例化
- launcher.launch_service 启动服务
launcher实例化过程
site-packages/oslo_service/service.py
class ServiceLauncher(Launcher):
"""Runs one or more service in a parent process."""
def __init__(self, conf, restart_method='reload'):
"""Constructor.
:param conf: an instance of ConfigOpts
:param restart_method: passed to super
"""
super(ServiceLauncher, self).__init__(
conf, restart_method=restart_method)
self.signal_handler = SignalHandler()
说明:
- 调用父类的 init 方法
- 初始化 self.signal_handler ,给信号捕获初始化handle
class Launcher(object):
def __init__(self, conf, restart_method='reload'):
self.conf = conf
conf.register_opts(_options.service_opts)
self.services = Services()
self.backdoor_port = (
eventlet_backdoor.initialize_if_enabled(self.conf))
self.restart_method = restart_method
if restart_method not in _LAUNCHER_RESTART_METHODS:
raise ValueError(_("Invalid restart_method: %s") % restart_method)
说明:
- 配置初始化
- 实例化 Services(),并赋值给 self.services
class Services(object):
def __init__(self):
self.services = []
self.tg = threadgroup.ThreadGroup()
self.done = event.Event()
说明:
- 实例化self.tg 是一个进程池的对象
- 其中self.tg 的具体实例化过程中,创建了一个默认大小为10的进程池并赋值给 self.pool(对应的是self.tg.pool)
至此launcher实例化成功,接下来看launcher启动过程,重点关注:
- self.services 是一个大小为 10 的进程池
launcher启动过程
site-packages/oslo_service/service.py
launcher.launch_service(service, workers=workers)
说明:
- 调用 launch_service 方法进行服务启动,对应传递进来的service就是AgentManager实例
def launch_service(self, service, workers=1):
if workers is not None and workers != 1:
raise ValueError(_("Launcher asked to start multiple workers"))
_check_service_base(service)
service.backdoor_port = self.backdoor_port
self.services.add(service)
说明:这里的关键点是 self.services.add(service)
- 通过add方法,将这个AgentManager实例传递给 services (这个是之前创建的进程池)
class Services(object):
def add(self, service):
self.services.append(service)
self.tg.add_thread(self.run_service, service, self.done)
说明:
- self.services.append(service), 只是做一个记录,知道启动了什么进程
- self.tg.add_thread() 将service传递给 add_thread 方法
- add_thread方法会调用 self.run_service 这个函数
@staticmethod
def run_service(service, done):
try:
service.start()
except Exception:
LOG.exception(_LE('Error starting thread.'))
raise SystemExit(1)
else:
done.wait()
说明:
- 调用service.start()方法来启动服务,对应的就是调用AgentManager对应的start方法
- 执行done.wait(),等待进程退出
至此,进程启动的过程就结束了
服务启动过程总结:
这个过程的关键点:
- launcher实例化:通过调用oslo_service的方法来进行launcher实例化,实例化之后主要关注几个关键变量:
- launcher.services 是一个大小为 10 的进程池
- launcher.done是一个 event.Event()的实例,负责进程退出的相关事宜
- launcher服务启动:通过调用 launcher.launcher_service(),根据配置的worker大小来启动AgentManager实例
AgentManager实例的启动调用的是 AgentManager的 start()方法,接下来我们来继续分析 AgentManager的实例启动过程。
AgentManager 启动过程
ceilometer/agent/manager.py
def start(self):
self.polling_manager = pipeline.setup_polling()
self.partition_coordinator.start()
self.join_partitioning_groups()
self.pollster_timers = self.configure_polling_tasks()
self.init_pipeline_refresh()
说明:
- 根据pipeline.yaml 初始化polling_manager
- ? partition_coordinator.start()
- ? join_partitioning_groups()
- 设置并启动polling_task
pipeline初始化
ceilometer/pipeline.py
def setup_polling():
"""Setup polling manager according to yaml config file."""
cfg_file = cfg.CONF.pipeline_cfg_file
return _setup_polling_manager(cfg_file)
说明:
- cfg_file = pipeline.yaml
- 调用_setup_polling_manager(cfg_file), 生成pipeline
def _setup_polling_manager(cfg_file):
if not os.path.exists(cfg_file):
cfg_file = cfg.CONF.find_file(cfg_file)
LOG.debug("Polling config file: %s", cfg_file)
with open(cfg_file) as fap:
data = fap.read()
pipeline_cfg = yaml.safe_load(data)
LOG.info(_LI("Pipeline config: %s"), pipeline_cfg)
return PollingManager(pipeline_cfg)
说明:
- 打开pipeline.yaml 并通过yaml加载到pipeline_cfg
- 通过PollingManager生成pipeline对象
class PollingManager(object):
def __init__(self, cfg):
self.sources = []
if not ('sources' in cfg and 'sinks' in cfg):
raise PipelineException("Both sources & sinks are required",
cfg)
LOG.info(_LI('detected decoupled pipeline config format'))
unique_names = set()
for s in cfg.get('sources', []):
name = s.get('name')
if name in unique_names:
raise PipelineException("Duplicated source names: %s" %
name, self)
else:
unique_names.add(name)
self.sources.append(SampleSource(s))
unique_names.clear()
说明:
- 初始化所有的sources,每一个sources就是一个SampleSource对象
class SampleSource(Source):
def __init__(self, cfg):
super(SampleSource, self).__init__(cfg)
# Support 'counters' for backward compatibility
self.meters = cfg.get('meters', cfg.get('counters'))
try:
self.interval = int(cfg.get('interval', 600))
except ValueError:
raise PipelineException("Invalid interval value", cfg)
if self.interval <= 0:
raise PipelineException("Interval value should > 0", cfg)
self.resources = cfg.get('resources') or []
if not isinstance(self.resources, list):
raise PipelineException("Resources should be a list", cfg)
self.discovery = cfg.get('discovery') or []
if not isinstance(self.discovery, list):
raise PipelineException("Discovery should be a list", cfg)
self.check_source_filtering(self.meters, 'meters')
说明:
- super的 init 函数中,做了几个变量初始化:
- self.cfg = cfg
- self.name = cfg[‘name’]
- self.sinks = cfg.get(‘sinks’)
如下,一个pipeline.yaml的实例:
- sources中定义了要采集的各项指标(meters), 采集周期(interval),采集的后置处理sinks
- sinks:定义了一类采集项采集到数据知乎的发布方式以及对应到系统中的meter_name, unit, type, scale
---
sources:
- name: cpu_source
interval: 600
meters:
- "cpu"
sinks:
- cpu_sink
- cpu_delta_sink
sinks:
- name: cpu_sink
transformers:
- name: "rate_of_change"
parameters:
target:
name: "cpu_util"
unit: "%"
type: "gauge"
scale: "100.0 / (10**9 * (resource_metadata.cpu_number or 1))"
publishers:
- notifier://
- 初始化meters对象
- 初始化interval,默认给赋值600
- self.resources = [], self.discovery = []
总结:
每一个SampleSources对象主要包含如下几部分内容:
- name: cpu_source
- interval: 600
- meters: [‘cpu’]
- sinks: cpu_sink, cpu_delta_sink
最终 self.polling_manager 就是PollingManager()的一个实例,这个实例中,有一个关键变量,self.sources,而self.sources 是一个[] 集合,每个元素都是一个 SampleSources()对象
? partition_coordinator.start()
TODO:暂时还没搞清楚
? join_partitioning_groups()
TODO: 暂时还没搞清楚
设置并启动polling_task
ceilometer/agent/manager.py
self.pollster_timers = self.configure_polling_tasks()
说明:
- 设置polling_tasks
polling_task 的获取,并赋值给data变量
详细过程
def configure_polling_tasks(self):
# allow time for coordination if necessary
delay_start = self.partition_coordinator.is_active()
# set shuffle time before polling task if necessary
delay_polling_time = random.randint(
0, cfg.CONF.shuffle_time_before_polling_task)
pollster_timers = []
data = self.setup_polling_tasks()
说明:
- 前几步暂时忽略,从 data = self.setup_polling_tasks()开始
- data = self.setup_polling_tasks()
def setup_polling_tasks(self):
polling_tasks = {}
for source in self.polling_manager.sources:
polling_task = None
for pollster in self.extensions:
if source.support_meter(pollster.name):
polling_task = polling_tasks.get(source.get_interval())
if not polling_task:
polling_task = self.create_polling_task()
polling_tasks[source.get_interval()] = polling_task
polling_task.add(pollster, source)
return polling_tasks
说明:
- 遍历polling_manager.sources
- 对每一个sources而言,需要把所有的extensions遍历一遍(setup.cfg 中定义的所有采集项),判断如果对应的extension的name在当前的source中,那么就创建一个polling_task
- 如果这个polling_task 不在 polling_tasks中,则polling_tasks[interval] = polling_task
- polling_task中add进入当前这个source和pollster,这样就给在pipeline.yaml中定义的一个meter找到了他所对应的pollster(extenstion)
- 最终返回polling_tasks
- 接下来继续看polling_task的创建过程
def create_polling_task(self):
"""Create an initially empty polling task."""
return PollingTask(self)
说明:
- 每一个polling_task 都是 PollingTask的一个实例
class PollingTask(object):
def __init__(self, agent_manager):
self.manager = agent_manager
# elements of the Cartesian product of sources X pollsters
# with a common interval
self.pollster_matches = collections.defaultdict(set)
# we relate the static resources and per-source discovery to
# each combination of pollster and matching source
resource_factory = lambda: Resources(agent_manager)
self.resources = collections.defaultdict(resource_factory)
self._batch = cfg.CONF.batch_polled_samples
self._telemetry_secret = cfg.CONF.publisher.telemetry_secret
说明:
- PollingTask的类定义
- 这里有个疑问,self.manager = agent_manager, 这个agent_manager 从哪里来?
总结
- data = self.setup_polling_tasks() 最终获取到的是PollingTask 对象
- data的组织形式是一个hash,poling_tasks, 对应的是:
{
interval1:[polling_task1, polling_task2],
interval2:[polling_task3, polling_task4]
}
polling_tasks 加入定时器
for interval, polling_task in data.items():
delay_time = (interval + delay_polling_time if delay_start
else delay_polling_time)
pollster_timers.append(self.tg.add_timer(interval,
self.interval_task,
initial_delay=delay_time,
task=polling_task))
self.tg.add_timer(cfg.CONF.coordination.heartbeat,
self.partition_coordinator.heartbeat)
return pollster_timers
说明:
- 遍历polling_tasks,按照不同的interval,将对应的polling_task加入不同的定时器队列
self.tg.add_timer(interval,
self.interval_task,
initial_delay=delay_time,
task=polling_task)
说明:
- 调用self.interval_task, 并传入interval变量和polling_task
def interval_task(self, task):
self._keystone = None
self._keystone_last_exception = None
task.poll_and_notify()
def poll_and_notify(self):
"""Polling sample and notify."""
cache = {}
discovery_cache = {}
poll_history = {}
for source_name in self.pollster_matches:
for pollster in self.pollster_matches[source_name]:
key = Resources.key(source_name, pollster)
candidate_res = list(
self.resources[key].get(discovery_cache))
if not candidate_res and pollster.obj.default_discovery:
candidate_res = self.manager.discover(
[pollster.obj.default_discovery], discovery_cache)
# Remove duplicated resources and black resources. Using
# set() requires well defined __hash__ for each resource.
# Since __eq__ is defined, 'not in' is safe here.
polling_resources = []
black_res = self.resources[key].blacklist
history = poll_history.get(pollster.name, [])
for x in candidate_res:
if x not in history:
history.append(x)
if x not in black_res:
polling_resources.append(x)
poll_history[pollster.name] = history
# If no resources, skip for this pollster
if not polling_resources:
p_context = 'new ' if history else ''
LOG.info(_LI("Skip pollster %(name)s, no %(p_context)s"
"resources found this cycle"),
{'name': pollster.name, 'p_context': p_context})
continue
LOG.info(_LI("Polling pollster %(poll)s in the context of "
"%(src)s"),
dict(poll=pollster.name, src=source_name))
try:
samples = pollster.obj.get_samples(
manager=self.manager,
cache=cache,
resources=polling_resources
)
sample_batch = []
for sample in samples:
sample_dict = (
publisher_utils.meter_message_from_counter(
sample, self._telemetry_secret
))
if self._batch:
sample_batch.append(sample_dict)
else:
self._send_notification([sample_dict])
if sample_batch:
self._send_notification(sample_batch)
except plugin_base.PollsterPermanentError as err:
LOG.error(_(
'Prevent pollster %(name)s for '
'polling source %(source)s anymore!')
% ({'name': pollster.name, 'source': source_name}))
self.resources[key].blacklist.extend(err.fail_res_list)
except Exception as err:
LOG.warning(_(
'Continue after error from %(name)s: %(error)s')
% ({'name': pollster.name, 'error': err}),
exc_info=True)
说明:
- interval_task 周期性的任务
- 调用对应的pollster对应的 get_sample()方法获取对应的samples数据
- 对每一个sample数据调用ceilometer/publisher/util.py中的 meter_message_from_counter()方法,组装一个msg
- 如果配置文件中定义了批量发送sample数据,则将这个sample数据append到sample_batch,否则直接调用self._send_notification()进行数据发送
- 在 self._send_notification()中,调用self.manager.notifier.sample()将数据发送出去, 这里的manager是AgentManager的一个实例,notifier是上文通过transport、driver创建的Notification对象,sample函数是用来发送数据的
以上的定时任务,最终通过 self.tg.add_timer, 加入了定时任务的队列中,定期执行
self.init_pipeline_refresh()
定期刷新pipeline
TODO: 待后续详细分析