"""
Holds the comet methods for the NSO-JSON RPC
"""
from nso_jsonrpc_requester.common import NsoJsonRpcCommon
class NsoJsonRpcComet(NsoJsonRpcCommon):
    """
    This class is used for the NSO JsonRPC API for remote logging (comet)

    Creating an instance immediately starts the comet process by calling start_comet()

    :type protocol: String
    :param protocol: ('http', 'https') Default: http
    :type ip: String
    :param ip: IPv4 Address or hostname Default: 127.0.0.1
    :type port: String
    :param port: A protocol port Default: 8080
    :type username: String
    :param username: The username to use Default: admin
    :type password: String
    :param password: The password to use Default: admin
    :type ssl_verify: Boolean
    :param ssl_verify: Choice to verify SSL Cert Default: True

    :rtype: None
    :returns: NA init
    :raises TypeError: If protocol is not ('http', 'https')
    """

    def __init__(self, protocol: str = 'http', ip: str = '127.0.0.1', port: str = '8080',
                 username: str = 'admin', password: str = 'admin', ssl_verify: bool = True) -> None:
        super().__init__(protocol, ip, port, username, password, ssl_verify)
        # Tracks whether start_comet() has completed and stop_comet() has not yet been called.
        self.comet_started = False
        self.start_comet()

    def __str__(self):  # pragma: no cover
        return '<NsoJsonRpcComet>'

    def start_comet(self) -> None:
        """
        Method to start the comet process

        Logs in, opens a new transaction and sends the first comet post.  If any of
        those steps raises, the started flag is rolled back so the instance does not
        claim to be running and start_comet() can be retried.

        :rtype: None
        :return: None
        :raises Exception: if comet is already running
        """
        self.__check_comet_state(False)
        self.comet_started = True
        try:
            self.login()
            self.new_trans()
            self.__comet()
        except Exception:
            # Start-up failed part way through: do not leave the flag claiming "running".
            self.comet_started = False
            raise

    def stop_comet(self) -> None:
        """
        Method to stop the comet process

        Unsubscribes every known handle, sends a final comet post, logs out and
        then marks comet as stopped.

        :rtype: None
        :return: None
        :raises Exception: if comet is not running
        """
        self.__check_comet_state(True)
        self.__unsubscribe()
        self.__comet()
        self.logout()
        self.comet_started = False

    def comet_poll(self):
        """
        Method to return comet result only

        :rtype: String
        :return: The 'result' member of the comet response, or None when the poll failed
            (in which case comet is stopped via stop_comet())
        :raises Exception: if comet is not running
        """
        self.__check_comet_state(True)
        try:
            return self.__comet()['result']
        except Exception:  # pragma: no cover
            # Best effort: a failed poll shuts the comet session down and yields no result.
            self.stop_comet()
            return None

    def subscribe_changes(self, path: str) -> dict:
        """
        Method to send a subscribe_changes post

        :type path: String
        :param path: The NSO KEYPATH to the data to watch for changes

        :rtype: Dict
        :return: A dictionary of data, also appends the handle given by NSO to self.comet_handles
        :raises TypeError: if path is not a string
        :raises Exception: if comet is not running
        """
        self.__check_comet_state(True)
        if not isinstance(path, str):
            raise TypeError(f'param path must be of type string but received {type(path)}')
        return self.__subscribe('subscribe_changes', path=path)

    def subscribe_poll_leaf(self, path: str, interval: int) -> dict:
        """
        Method to send a subscribe_poll_leaf post

        :type path: String
        :param path: The NSO KEYPATH to the data to watch for changes
        :type interval: Integer
        :param interval: The interval of time for polling

        :rtype: Dict
        :return: A dictionary of data, also appends the handle given by NSO to self.comet_handles
        :raises TypeError: if path is not a string
        :raises TypeError: if interval is not a integer (booleans are rejected as well)
        :raises Exception: if comet is not running
        """
        self.__check_comet_state(True)
        if not isinstance(path, str):
            raise TypeError(f'param path must be of type string but received {type(path)}')
        # bool is a subclass of int, but would be serialised as JSON true/false, not a number.
        if isinstance(interval, bool) or not isinstance(interval, int):
            raise TypeError(f'param interval must be of type integer but received {type(interval)}')
        return self.__subscribe('subscribe_poll_leaf', path=path, interval=interval)

    def subscribe_cdboper(self, path: str) -> dict:
        """
        Method to send a subscribe_cdboper post

        :type path: String
        :param path: The NSO KEYPATH to the data to watch for changes

        :rtype: Dict
        :return: A dictionary of data, also appends the handle given by NSO to self.comet_handles
        :raises TypeError: if path is not a string
        :raises Exception: if comet is not running
        """
        self.__check_comet_state(True)
        if not isinstance(path, str):
            raise TypeError(f'param path must be of type string but received {type(path)}')
        return self.__subscribe('subscribe_cdboper', path=path)

    def subscribe_upgrade(self) -> dict:
        """
        Method to send a subscribe_upgrade post

        :rtype: Dict
        :return: A dictionary of data, also appends the handle given by NSO to self.comet_handles
        :raises Exception: if comet is not running
        """
        self.__check_comet_state(True)
        return self.__subscribe('subscribe_upgrade')

    def subscribe_jsonrpc_batch(self) -> dict:
        """
        Method to send a subscribe_jsonrpc_batch post

        :rtype: Dict
        :return: A dictionary of data, also appends the handle given by NSO to self.comet_handles
        :raises Exception: if comet is not running
        """
        self.__check_comet_state(True)
        return self.__subscribe('subscribe_jsonrpc_batch')

    def get_subscriptions(self) -> dict:
        """
        Method to send a get_subscriptions post
        This gets the session's subscriptions

        :rtype: Dict
        :return: A dictionary of subscriptions
        :raises Exception: if comet is not running
        """
        self.__check_comet_state(True)
        return self.__post('get_subscriptions').json()

    def __post(self, method, params=None):
        """
        Method to build a JSON-RPC 2.0 request, post it with the session cookies and check the status

        :type method: String
        :param method: The JSON-RPC method name
        :type params: Dict
        :param params: The 'params' member of the request, left out of the request when None

        :rtype: Response
        :return: The response object returned by post_with_cookies
        """
        payload = {'jsonrpc': '2.0',
                   'id': self.request_id,
                   'method': method}
        if params is not None:
            payload['params'] = params
        response = self.post_with_cookies(payload)
        if not response.ok:  # pragma: no cover
            response.raise_for_status()
        return response

    def __subscribe(self, method, **params):
        """
        Method shared by the subscribe_* methods: creates the subscription, records its handle
        in self.comet_handles and starts the subscription

        :type method: String
        :param method: The subscribe_* JSON-RPC method name
        :param params: Extra members for 'params', sent next to 'comet_id'

        :rtype: Dict
        :return: The decoded body of the subscribe response
        """
        response = self.__post(method, {'comet_id': self.comet_id, **params})
        body = response.json()
        handle = body['result']['handle']
        self.comet_handles.append(handle)
        self.__start_subscription(handle)
        return body

    def __comet(self):
        """
        Method to send a comet post
        comet is a log receiver, and can receive logs from the following methods
        start_cmd, subscribe_cdboper, subscribe_changes, subscribe_messages,
        subscribe_poll_leaf or subscribe_upgrade

        :rtype: Dict
        :return: A dictionary of data
        """
        return self.__post('comet', {'comet_id': self.comet_id}).json()

    def __start_subscription(self, handle):
        """
        Method to send a start_subscription post

        :type handle: String
        :param handle: Handle to start the subscription on

        :rtype: Response
        :return: The response object of the start_subscription post
        :raises Exception: if comet is not running
        """
        self.__check_comet_state(True)
        return self.__post('start_subscription', {'handle': handle})

    def __unsubscribe(self) -> None:
        """
        Method to send a unsubscribe post for every handle in self.comet_handles

        Each handle is removed from self.comet_handles once its unsubscribe post has gone
        through, so a later start/stop cycle does not try to unsubscribe it again.

        :rtype: None
        :return: None
        :raises Exception: if comet is not running
        """
        self.__check_comet_state(True)
        while self.comet_handles:
            self.__post('unsubscribe', {'handle': self.comet_handles[0]})
            del self.comet_handles[0]

    def __check_comet_state(self, wanted_state: bool) -> None:
        """
        Method to verify comet state

        :type wanted_state: Boolean
        :param wanted_state: The state expected

        :rtype: None
        :return: None
        :raises Exception: if wanted_state is False, but it is True
        :raises Exception: if wanted_state is True, but it is False
        """
        if self.comet_started == wanted_state:
            return
        if self.comet_started:
            raise Exception('Comet is already running!!')  # pylint: disable=broad-exception-raised
        raise Exception('Comet is not running!!')  # pylint: disable=broad-exception-raised
if __name__ == '__main__':  # pragma: no cover
    # Running this module directly prints the generated help text for the class.
    help(NsoJsonRpcComet)