diff options
author | pikusa <pikusa@man.poznan.pl> | 2013-04-03 13:18:17 (GMT) |
---|---|---|
committer | pikusa <pikusa@man.poznan.pl> | 2013-04-03 13:18:17 (GMT) |
commit | 2f2a3a129c91de540e66c3bfbe30b0df1942cd4b (patch) | |
tree | 2d313cdf0068af368d4de6067d676be16f6a6464 /Monitoring/src/main/python/Task | |
parent | ff8aa232b071a9b54dff833714a870fd0aec0b30 (diff) | |
download | novi-public-2f2a3a129c91de540e66c3bfbe30b0df1942cd4b.zip novi-public-2f2a3a129c91de540e66c3bfbe30b0df1942cd4b.tar.gz novi-public-2f2a3a129c91de540e66c3bfbe30b0df1942cd4b.tar.bz2 |
project commit and dir tree change
Diffstat (limited to 'Monitoring/src/main/python/Task')
-rw-r--r-- | Monitoring/src/main/python/Task/EventHandler.py | 17 | ||||
-rw-r--r-- | Monitoring/src/main/python/Task/EventHandler.py.old | 17 | ||||
-rw-r--r-- | Monitoring/src/main/python/Task/Task$py.class | bin | 0 -> 34368 bytes | |||
-rw-r--r-- | Monitoring/src/main/python/Task/Task.py | 452 | ||||
-rw-r--r-- | Monitoring/src/main/python/Task/Task.py.old | 427 | ||||
-rw-r--r-- | Monitoring/src/main/python/Task/__init__$py.class | bin | 0 -> 2052 bytes | |||
-rw-r--r-- | Monitoring/src/main/python/Task/__init__.py | 0 | ||||
-rw-r--r-- | Monitoring/src/main/python/Task/__init__.py.old | 0 | ||||
-rw-r--r-- | Monitoring/src/main/python/Task/test.py | 203 | ||||
-rw-r--r-- | Monitoring/src/main/python/Task/test.py.old | 203 |
10 files changed, 1319 insertions, 0 deletions
diff --git a/Monitoring/src/main/python/Task/EventHandler.py b/Monitoring/src/main/python/Task/EventHandler.py new file mode 100644 index 0000000..40bd8e9 --- /dev/null +++ b/Monitoring/src/main/python/Task/EventHandler.py @@ -0,0 +1,17 @@ +''' +Created on 08.08.2011 + +@author: csc +''' + +class EventHandler(): + ''' + classdocs + ''' + + + def __init__(self, parent): + ''' + Constructor + ''' + self.parent = parent
\ No newline at end of file diff --git a/Monitoring/src/main/python/Task/EventHandler.py.old b/Monitoring/src/main/python/Task/EventHandler.py.old new file mode 100644 index 0000000..40bd8e9 --- /dev/null +++ b/Monitoring/src/main/python/Task/EventHandler.py.old @@ -0,0 +1,17 @@ +''' +Created on 08.08.2011 + +@author: csc +''' + +class EventHandler(): + ''' + classdocs + ''' + + + def __init__(self, parent): + ''' + Constructor + ''' + self.parent = parent
\ No newline at end of file diff --git a/Monitoring/src/main/python/Task/Task$py.class b/Monitoring/src/main/python/Task/Task$py.class Binary files differnew file mode 100644 index 0000000..455462b --- /dev/null +++ b/Monitoring/src/main/python/Task/Task$py.class diff --git a/Monitoring/src/main/python/Task/Task.py b/Monitoring/src/main/python/Task/Task.py new file mode 100644 index 0000000..26f3a1c --- /dev/null +++ b/Monitoring/src/main/python/Task/Task.py @@ -0,0 +1,452 @@ +from __future__ import with_statement +''' +Created on 08.08.2011 + +@author: steger, jozsef +@organization: ELTE +@contact: steger@complex.elte.hu +''' + +from time import time +from threading import Thread, Event, Lock, RLock +from DataProcessing.Data import Data, DataHeader +from DataProcessing.Parameter import ParameterList +from Credential.credentialtypes import Credential +import Driver +from org.slf4j import Logger +from org.slf4j import LoggerFactory +import threading +#TODO: logger + +class TaskError(Exception): + pass + +''' +@cvar STRAT_PERIODICAL: sampling strategy, when samples are taken periodically controlled by wait +@cvar STRAT_ONDEMAND: sampling strategy, retrievehook is run once, then automatically Task is disabled +@cvar STRAT_FUNCTIONAL: sampling strategy, user provides a call back function to generate the time series to wait between consecutive sampling +''' +STRAT_PERIODICAL, STRAT_ONDEMAND, STRAT_FUNCTIONAL = range(3) + + +class SubtaskManager(object): + ''' + @author: Jozsef Steger + @summary: SubtaskManager class provides the basic functionalities: + 1. to generate a new Task + 2. to access a Task referenced by the task identifier + 3. to destroy a Task explicit or referenced by the task identifier + ''' + logger = LoggerFactory.getLogger("MS.Task.STM") + + class Task(object): + ''' + @author: Jozsef Steger + @summary: + This class represents a simplified control plane of a wide variety of monitoring tools. + In order to generate monitoring data tools are manipulated via several steps. + Descendants of the Task class implement these steps. + In the simplified work flow the following steps are abstracted: + - prehook: it is run once to initiate the communication channel to the tool and/or to initialize a monitoring task + - starthook: in case the tool operates in such a manner, this step is run to trigger the start up of a background measurement + - retrievehook: implements data retrieval from the tool, it has to transform measurement data to the internal data representation + - stophook: it takes care for stopping any processes running in the background at the tool + - posthook: provides a means to clean up after a monitoring task is over + + Subclasses must be generated dynamically using the generateSubclass() method + + @cvar timeout: the time to wait for the background thread to finish + + @cvar STATE_SLEEPING: a state, retirevehook does not carry out data collection, monitoring is kept asleep + @cvar STATE_RUNNING: a state, retrievehook carries out data collection + + + @note: Only descendants of this class are intended to be instantiated via the static method generateSubclass + ''' + timeout = 5 + STATE_SLEEPING, STATE_RUNNING = range(2) + + def __init__(self, taskmanager, credential, parameters = None, samplingfunction = None, **kw): + ''' + @summary: Constructor + @param taskmanager: a reference to the SubtaskManager, which needs to called back + if explicit deletion of the Task happens to keep the list of tasks coherent + @type taskmanager: SubtaskManager + @param credential: the necessary authentication information to access and control the tools + @type credential: Credential + @param parameters: the parameters to control the tools, ie. measurement details + @type parameters: ParameterList + @param samplingfunction: a generator function yielding a number, if the generator provides a + finite series of numbers, after all items are consumed the Task gets disabled + @type samplingfunction: callable + @keyword kw: parameters, which are passed to the prehook method of the Task + @raise TaskError: wrong type of credential + + @ivar slock: lock concurrent enable/disable calls + @ivar cleanup: the function to call back after task is deleted + @ivar um: the unit manager + @ivar strategy: indicates how sampling is done, ie. how long to wait between two measurements + @ivar _wait: for periodical sampling (STRAT_PERIODICAL) this constant is yielded by the generator + @ivar gen_functional: for user defined sampling (STRAT_FUNCTIONAL) this is the generator to use + @ivar credential: credential + @ivar parameters: parameters + @ivar sliceID: slice identifier the Task belongs to, -1 means unspecified. + @ivar data: internal representation of monitoring data + @ivar inRetrievehook: Event indicating current data retrieval + @ivar dataAdded: Event indicating new data are produced by former runs + @ivar stopworker: Event indicating that the Task was disabled + @ivar t: Thread to take care of new measurements + @ivar _runcount: integer showing how many data retrieval attempt have been made + @ivar _durationrecord: a list of time stamp and duration pairs showing when data retrievals + happened and how long they took + + @note: deleting the strategy property disables the Task and sets the strategy to STRAT_ONDEMAND + @note: setting the strategy property takes effect only after the Task is (re)enabled + ''' + self.slock = Lock() + self.logger = taskmanager.logger + self.cleanup = taskmanager.removefromtasklist + self.um = taskmanager.um + self._strategy = STRAT_PERIODICAL + self._wait = 1 + self._sliceID = -1 + self.gen_functional = samplingfunction + if isinstance(credential, Credential): + self.credential = credential + else: + raise TaskError("wrong type of credential") + if isinstance(parameters, ParameterList): + self.parameters = parameters + else: + self.parameters = ParameterList() + self.data = Data(self.um, self.dataheader) + self.inRetrievehook = Event() + self.dataAdded = Event() + self.stopworker = Event() + self.stopworker.set() + try: + self.prehook(**kw) + except: + raise #TaskError() + self.t = None + self._runcount = 0 + self._durationrecord = [] + +# def __del__(self): +# if self.cleanup is not None: +# print "WW: explicit deletion of %s" % self +# self.destroy() + + def destroy(self): + print "Destroy %s" % self + if self.cleanup is None: + print "WW: %s is already destroyed" % self + return + if self.state == self.STATE_RUNNING: + print "Call disable %s" % self + self.disable() + print "Disabled %s" % self + with self.slock: + print "Call posthook %s" % self + self.posthook() + if self.cleanup: + self.cleanup(self) + self.cleanup = None + + def wait(self, wait): + ''' + @summary: wait until Task is disabled or the provided waiting time is over + @param wait: requests time to wait for the next sample + @type wait: float + ''' + try: + self.stopworker.wait( max(.1, float(wait)) ) + except: + self.stopworker.wait( 1 ) + + def _get_sliceID(self): + return self._sliceID + def _set_sliceID(self, value): + if (self._sliceID != -1) and (self._sliceID != value): + raise TaskError("you can set sliceID property only once") + self._sliceID = value + def _del_sliceID(self): + raise TaskError("shan't ever delete this property") + + def _get_runcount(self): + return self._runcount + def _set_runcount(self, value): + raise TaskError("shan't ever set this property") + def _del_runcount(self): + raise TaskError("shan't ever delete this property") + + def _get_duration(self): + if len(self._durationrecord): + return self._durationrecord[-1][1] + else: + return -1 + + def _get_state(self): + if self.stopworker.isSet(): + return self.STATE_SLEEPING + else: + return self.STATE_RUNNING + + def state_hrn(self): + """ + @summary: return the state of the task in human readable format + @return: the state of the task + @rtype: string + """ + if self.stopworker.isSet(): + return "SLEEPING" + else: + return "RUNNING" + + def _get_strategy(self): + with self.slock: + return self._strategy + def _set_strategy(self, value): + if value in [STRAT_ONDEMAND, STRAT_PERIODICAL, STRAT_FUNCTIONAL]: + with self.slock: + self._strategy = value + def _del_strategy(self): + self.disable() + with self.slock: + self._strategy = STRAT_ONDEMAND + + def gen_ondemand(self): + yield 0 + + def gen_periodic(self): + while True: + yield self._wait + + def enable(self): + """ + @summary: enable task + """ + print "Enable" + with self.slock: + if self.t is not None: + raise TaskError("You can enable a perfectly disabled Task") + self.stopworker.clear() + # run starthook + self.starthook() + # initialize working thread + self.t = Thread(target = self._worker, name = str(self)) + self.t.setDaemon(False) + self.t.start() + + def disable(self): + """ + @summary: disable task + """ + print "Disable" + with self.slock: + if self.t is None: + print "WW: %s already disabled" % self + return + self.stopworker.set() + try: + # wait for working thread to finish + n = 0 + if self.t==threading.currentThread(): raise RuntimeError("alma") + while True: + n += 1 + self.t.join(self.timeout) + if self.t.isAlive(): + if n==20: self.logger.info("EE: timeout occurred %d times while disabling: %s" % (n, self)) + # print "EE: timeout occurred %d times while disabling: %s" % (n, self) + else: + break + #self.logger.info("Thread has destroyed") + except RuntimeError: + # generator does not provide any more waiting time interval + # thread tries to join itself + # self.logger.info("thread tries t join itself") + pass + # run stophook + try: + self.stophook() + finally: + self.t = None + + def _worker(self): + ''' + @summary: This method is running in a background thread. + It takes care of calling retrievehook. + If new data are produced by the tool it is indicated via dataAdded. + ''' + strategy = self.strategy + if strategy == STRAT_ONDEMAND: + generator = self.gen_ondemand + elif strategy == STRAT_PERIODICAL: + generator = self.gen_periodic + elif strategy == STRAT_FUNCTIONAL: + generator = self.gen_functional + for wait in generator(): + if not self.stopworker.isSet(): + # the task is still running + self._runcount += 1 + self.inRetrievehook.set() + invocation = time() + try: + print "Call retrievehook" + R = self.retrievehook() + except Exception, e: + self.logger.info( "EE: %s %s" % (self, e)) + break + finally: + self.inRetrievehook.clear() + stopped = time() + self._durationrecord.append( (invocation, stopped - invocation) ) + if R: + self.dataAdded.set() + else: + # the task is disabled + break + try: + self.wait(wait) + except: + self.logger.info("Unexpected exception in %s during waiting"% self) + if not self.stopworker.isSet(): + # the Task is not disabled + # but there are no more items in the series of waiting time + # so we disable it + #self.logger.info("Calling disable task") + self.disable() + self.logger.info("Task has just ended %s." % self) + + + duration = property(_get_duration,None,None) + + sliceID = property(_get_sliceID,_set_sliceID,_del_sliceID) + + runcount = property(_get_runcount,_set_runcount,_del_runcount) + + state = property(_get_state,None,None) + + strategy = property(_get_strategy,_set_strategy,_del_strategy) + def __init__(self, unitmanager): + self._tasks = {} + self.tasklock = RLock() + self._lastid = -1 + self.um = unitmanager + + def __str__(self): + return "<SubtaskManager %s (%d tasks)>" % (id(self), len(self)) + + def __len__(self): + return len(self._tasks) + + def _get_uniqid(self): + self._lastid += 1 + return self._lastid + + @staticmethod + def _shiftMethod(implementation): + ''' + @summary: helps indentation of the piece of implemented code + ''' + return "\n\t\t".join( filter(None, implementation.splitlines()) ) + + + + uniqid = property(_get_uniqid,None,None) + + + def generate(self, name, driver, dataheader, hookimplementations, credential, parameters, samplingfunction = None, **kw): + ''' + @summary: This method is responsible for dynamically generate a new Task subclass + @param name: the name of the dynamically generated class + @type name: string + @param driver: driver implementing a communication channel to the tool + @type driver: Driver + @param dataheader: contains information of the data structure of the result + @type dataheader: DataHeader + @param hookimplementations: the work flow steps (hooks) + @type hookimplementations: dict + @param credential: the credential used by the driver + @type credential: dict + @param parameters: list of control parameters to fine tune measurements + @type parameters: ParameterList + @param samplingfunction: a generator yielding the time interval elements of a series to wait between consecutive samples + @type samplingfunction: callable + @keyword kw: extra keyword arguments passed to the prehook of the new Task + @return: identifier and the subclass instance of (Task) + @rtype: int, Task + @raise TaskError: wrong Driver type / wrong DataHeader type / erroneous implementation + ''' + prehook = self._shiftMethod( hookimplementations.get("prehook", "pass") ) + starthook = self._shiftMethod( hookimplementations.get("starthook", "pass") ) + stophook = self._shiftMethod( hookimplementations.get("stophook", "pass") ) + posthook = self._shiftMethod( hookimplementations.get("posthook", "pass") ) + retrievehook = self._shiftMethod( hookimplementations.get("retrievehook", "raise TaskException(\"retrievehook() must be implemented\")") ) + if not issubclass(driver, Driver.Driver.Driver): + raise TaskError("wrong Driver type %s" % driver) + if not isinstance(dataheader, DataHeader): + raise TaskError("wrong DataHeader type %s" % dataheader) + classtemplate = """ +import re +from DataProcessing.Data import DataHeader +class %s(SubtaskManager.Task): +\tdriver = None +\tdataheader = None +\tdef prehook(self, **kw): +\t\t%s +\tdef starthook(self): +\t\t%s +\tdef retrievehook(self): +\t\t%s +\tdef stophook(self): +\t\t%s +\tdef posthook(self): +\t\t%s""" % (name, prehook, starthook, retrievehook, stophook, posthook) + try: + exec(classtemplate, globals()) + except: + print classtemplate + raise TaskError("erroneous implementation (%s)" % name) + taskclass = globals()[name] + taskclass.driver = driver + taskclass.dataheader = dataheader + taskid = self.uniqid + task = taskclass(self, credential, parameters, samplingfunction, **kw) + with self.tasklock: + self._tasks[taskid] = task +# print "+ new Task %s is identified by %d" % (task, taskid) +# print self + return taskid, task + + def __getitem__(self, taskidentifier): + with self.tasklock: + return self._tasks[taskidentifier] + + def getidentifier(self, task): + with self.tasklock: + for tid, t in self._tasks.iteritems(): + if task == t: + return tid + raise TaskError("Task %s is unknown to me" % task) + + def removefromtasklist(self, task): + with self.tasklock: + try: + taskidentifier = self.getidentifier(task) + self.pop(taskidentifier) + except TaskError: + pass + + def pop(self, taskidentifier): + try: + task = self[ taskidentifier ] + #task.destroy() + except KeyError: + print "WW: Task identified by %s is unknown to me" % taskidentifier + + def tasks_of_slice(self, sliceID = -1): + for t in self._tasks.values(): + if t.sliceID == sliceID: + yield t + diff --git a/Monitoring/src/main/python/Task/Task.py.old b/Monitoring/src/main/python/Task/Task.py.old new file mode 100644 index 0000000..a123763 --- /dev/null +++ b/Monitoring/src/main/python/Task/Task.py.old @@ -0,0 +1,427 @@ +''' +Created on 08.08.2011 + +@author: steger, jozsef +@organization: ELTE +@contact: steger@complex.elte.hu +''' + +from time import time +from threading import Thread, Event, Lock, RLock +from DataProcessing.Data import Data, DataHeader +from DataProcessing.Parameter import ParameterList +from Credential.credentialtypes import Credential +import Driver +#TODO: logger + +class TaskError(Exception): + pass + +''' +@cvar STRAT_PERIODICAL: sampling strategy, when samples are taken periodically controlled by wait +@cvar STRAT_ONDEMAND: sampling strategy, retrievehook is run once, then automatically Task is disabled +@cvar STRAT_FUNCTIONAL: sampling strategy, user provides a call back function to generate the time series to wait between consecutive sampling +''' +STRAT_PERIODICAL, STRAT_ONDEMAND, STRAT_FUNCTIONAL = range(3) + + +class SubtaskManager(object): + ''' + @author: Jozsef Steger + @summary: SubtaskManager class provides the basic functionalities: + 1. to generate a new Task + 2. to access a Task referenced by the task identifier + 3. to destroy a Task explicit or referenced by the task identifier + ''' + + class Task(object): + ''' + @author: Jozsef Steger + @summary: + This class represents a simplified control plane of a wide variety of monitoring tools. + In order to generate monitoring data tools are manipulated via several steps. + Descendants of the Task class implement these steps. + In the simplified work flow the following steps are abstracted: + - prehook: it is run once to initiate the communication channel to the tool and/or to initialize a monitoring task + - starthook: in case the tool operates in such a manner, this step is run to trigger the start up of a background measurement + - retrievehook: implements data retrieval from the tool, it has to transform measurement data to the internal data representation + - stophook: it takes care for stopping any processes running in the background at the tool + - posthook: provides a means to clean up after a monitoring task is over + + Subclasses must be generated dynamically using the generateSubclass() method + + @cvar timeout: the time to wait for the background thread to finish + + @cvar STATE_SLEEPING: a state, retirevehook does not carry out data collection, monitoring is kept asleep + @cvar STATE_RUNNING: a state, retrievehook carries out data collection + + + @note: Only descendants of this class are intended to be instantiated via the static method generateSubclass + ''' + timeout = 5 + STATE_SLEEPING, STATE_RUNNING = range(2) + + def __init__(self, taskmanager, credential, parameters = None, samplingfunction = None, **kw): + ''' + @summary: Constructor + @param taskmanager: a reference to the SubtaskManager, which needs to called back + if explicit deletion of the Task happens to keep the list of tasks coherent + @type taskmanager: SubtaskManager + @param credential: the necessary authentication information to access and control the tools + @type credential: Credential + @param parameters: the parameters to control the tools, ie. measurement details + @type parameters: ParameterList + @param samplingfunction: a generator function yielding a number, if the generator provides a + finite series of numbers, after all items are consumed the Task gets disabled + @type samplingfunction: callable + @keyword kw: parameters, which are passed to the prehook method of the Task + @raise TaskError: wrong type of credential + + @ivar slock: lock concurrent enable/disable calls + @ivar cleanup: the function to call back after task is deleted + @ivar um: the unit manager + @ivar strategy: indicates how sampling is done, ie. how long to wait between two measurements + @ivar _wait: for periodical sampling (STRAT_PERIODICAL) this constant is yielded by the generator + @ivar gen_functional: for user defined sampling (STRAT_FUNCTIONAL) this is the generator to use + @ivar credential: credential + @ivar parameters: parameters + @ivar sliceID: slice identifier the Task belongs to, -1 means unspecified. + @ivar data: internal representation of monitoring data + @ivar inRetrievehook: Event indicating current data retrieval + @ivar dataAdded: Event indicating new data are produced by former runs + @ivar stopworker: Event indicating that the Task was disabled + @ivar t: Thread to take care of new measurements + @ivar _runcount: integer showing how many data retrieval attempt have been made + @ivar _durationrecord: a list of time stamp and duration pairs showing when data retrievals + happened and how long they took + + @note: deleting the strategy property disables the Task and sets the strategy to STRAT_ONDEMAND + @note: setting the strategy property takes effect only after the Task is (re)enabled + ''' + self.slock = Lock() + self.cleanup = taskmanager.removefromtasklist + self.um = taskmanager.um + self._strategy = STRAT_PERIODICAL + self._wait = 1 + self._sliceID = -1 + self.gen_functional = samplingfunction + if isinstance(credential, Credential): + self.credential = credential + else: + raise TaskError("wrong type of credential") + if isinstance(parameters, ParameterList): + self.parameters = parameters + else: + self.parameters = ParameterList() + self.data = Data(self.um, self.dataheader) + self.inRetrievehook = Event() + self.dataAdded = Event() + self.stopworker = Event() + self.stopworker.set() + try: + self.prehook(**kw) + except: + raise #TaskError() + self.t = None + self._runcount = 0 + self._durationrecord = [] + + def __del__(self): + if self.cleanup is not None: + print "WW: explicit deletion of %s" % self + self.destroy() + + def destroy(self): + if self.cleanup is None: + print "WW: %s is already destroyed" % self + return + if self.state == self.STATE_RUNNING: + self.disable() + with self.slock: + self.posthook() + if self.cleanup: + self.cleanup(self) + self.cleanup = None + + def wait(self, wait): + ''' + @summary: wait until Task is disabled or the provided waiting time is over + @param wait: requests time to wait for the next sample + @type wait: float + ''' + try: + self.stopworker.wait( max(.1, float(wait)) ) + except: + self.stopworker.wait( 1 ) + + @property + def sliceID(self): + return self._sliceID + @sliceID.setter + def sliceID(self, value): + if (self._sliceID != -1) and (self._sliceID != value): + raise TaskError("you can set sliceID property only once") + self._sliceID = value + @sliceID.deleter + def sliceID(self): + raise TaskError("shan't ever delete this property") + + @property + def runcount(self): + return self._runcount + @runcount.setter + def runcount(self, value): + raise TaskError("shan't ever set this property") + @runcount.deleter + def runcount(self): + raise TaskError("shan't ever delete this property") + + @property + def duration(self): + if len(self._durationrecord): + return self._durationrecord[-1][1] + else: + return -1 + + @property + def state(self): + if self.stopworker.isSet(): + return self.STATE_SLEEPING + else: + return self.STATE_RUNNING + + def state_hrn(self): + """ + @summary: return the state of the task in human readable format + @return: the state of the task + @rtype: string + """ + if self.stopworker.isSet(): + return "SLEEPING" + else: + return "RUNNING" + + @property + def strategy(self): + with self.slock: + return self._strategy + @strategy.setter + def strategy(self, value): + if value in [STRAT_ONDEMAND, STRAT_PERIODICAL, STRAT_FUNCTIONAL]: + with self.slock: + self._strategy = value + @strategy.deleter + def strategy(self): + self.disable() + with self.slock: + self._strategy = STRAT_ONDEMAND + + def gen_ondemand(self): + yield 0 + + def gen_periodic(self): + while True: + yield self._wait + + def enable(self): + """ + @summary: enable task + """ + with self.slock: + if self.t is not None: + raise TaskError("You can enable a perfectly disabled Task") + self.stopworker.clear() + # run starthook + self.starthook() + # initialize working thread + self.t = Thread(target = self._worker, name = str(self)) + self.t.setDaemon(True) + self.t.start() + + def disable(self): + """ + @summary: disable task + """ + with self.slock: + if self.t is None: + print "WW: %s already disabled" % self + return + self.stopworker.set() + try: + # wait for working thread to finish + n = 0 + while True: + n += 1 + self.t.join(self.timeout) + if self.t.isAlive(): + print "EE: timeout occurred %d times while disabling: %s" % (n, self) + else: + break + except RuntimeError: + # generator does not provide any more waiting time interval + # thread tries to join itself + pass + # run stophook + try: + self.stophook() + finally: + self.t = None + + def _worker(self): + ''' + @summary: This method is running in a background thread. + It takes care of calling retrievehook. + If new data are produced by the tool it is indicated via dataAdded. + ''' + strategy = self.strategy + if strategy == STRAT_ONDEMAND: + generator = self.gen_ondemand + elif strategy == STRAT_PERIODICAL: + generator = self.gen_periodic + elif strategy == STRAT_FUNCTIONAL: + generator = self.gen_functional + for wait in generator(): + if not self.stopworker.isSet(): + # the task is still running + self._runcount += 1 + self.inRetrievehook.set() + invocation = time() + try: + R = self.retrievehook() + except Exception, e: + print "EE: %s %s" % (self, e) + break + finally: + self.inRetrievehook.clear() + stopped = time() + self._durationrecord.append( (invocation, stopped - invocation) ) + if R: + self.dataAdded.set() + else: + # the task is disabled + break + self.wait(wait) + if not self.stopworker.isSet(): + # the Task is not disabled + # but there are no more items in the series of waiting time + # so we disable it + self.disable() + + def __init__(self, unitmanager): + self._tasks = {} + self.tasklock = RLock() + self._lastid = -1 + self.um = unitmanager + + def __str__(self): + return "<SubtaskManager %s (%d tasks)>" % (id(self), len(self)) + + def __len__(self): + return len(self._tasks) + + @property + def uniqid(self): + self._lastid += 1 + return self._lastid + + @staticmethod + def _shiftMethod(implementation): + ''' + @summary: helps indentation of the piece of implemented code + ''' + return "\n\t\t".join( filter(None, implementation.splitlines()) ) + + def generate(self, name, driver, dataheader, hookimplementations, credential, parameters, samplingfunction = None, **kw): + ''' + @summary: This method is responsible for dynamically generate a new Task subclass + @param name: the name of the dynamically generated class + @type name: string + @param driver: driver implementing a communication channel to the tool + @type driver: Driver + @param dataheader: contains information of the data structure of the result + @type dataheader: DataHeader + @param hookimplementations: the work flow steps (hooks) + @type hookimplementations: dict + @param credential: the credential used by the driver + @type credential: dict + @param parameters: list of control parameters to fine tune measurements + @type parameters: ParameterList + @param samplingfunction: a generator yielding the time interval elements of a series to wait between consecutive samples + @type samplingfunction: callable + @keyword kw: extra keyword arguments passed to the prehook of the new Task + @return: identifier and the subclass instance of (Task) + @rtype: int, Task + @raise TaskError: wrong Driver type / wrong DataHeader type / erroneous implementation + ''' + prehook = self._shiftMethod( hookimplementations.get("prehook", "pass") ) + starthook = self._shiftMethod( hookimplementations.get("starthook", "pass") ) + stophook = self._shiftMethod( hookimplementations.get("stophook", "pass") ) + posthook = self._shiftMethod( hookimplementations.get("posthook", "pass") ) + retrievehook = self._shiftMethod( hookimplementations.get("retrievehook", "raise TaskException(\"retrievehook() must be implemented\")") ) + if not issubclass(driver, Driver.Driver.Driver): + raise TaskError("wrong Driver type %s" % driver) + if not isinstance(dataheader, DataHeader): + raise TaskError("wrong DataHeader type %s" % dataheader) + classtemplate = """ +import re +from DataProcessing.Data import DataHeader +class %s(SubtaskManager.Task): +\tdriver = None +\tdataheader = None +\tdef prehook(self, **kw): +\t\t%s +\tdef starthook(self): +\t\t%s +\tdef retrievehook(self): +\t\t%s +\tdef stophook(self): +\t\t%s +\tdef posthook(self): +\t\t%s""" % (name, prehook, starthook, retrievehook, stophook, posthook) + try: + exec(classtemplate, globals()) + except: + print classtemplate + raise TaskError("erroneous implementation (%s)" % name) + taskclass = globals()[name] + taskclass.driver = driver + taskclass.dataheader = dataheader + taskid = self.uniqid + task = taskclass(self, credential, parameters, samplingfunction, **kw) + with self.tasklock: + self._tasks[taskid] = task +# print "+ new Task %s is identified by %d" % (task, taskid) +# print self + return taskid, task + + def __getitem__(self, taskidentifier): + with self.tasklock: + return self._tasks[taskidentifier] + + def getidentifier(self, task): + with self.tasklock: + for tid, t in self._tasks.iteritems(): + if task == t: + return tid + raise TaskError("Task %s is unknown to me" % task) + + def removefromtasklist(self, task): + with self.tasklock: + try: + taskidentifier = self.getidentifier(task) + self.pop(taskidentifier) + except TaskError: + pass + + def pop(self, taskidentifier): + try: + task = self[ taskidentifier ] + task.destroy() + except KeyError: + print "WW: Task identified by %s is unknown to me" % taskidentifier + + def tasks_of_slice(self, sliceID = -1): + for t in self._tasks.values(): + if t.sliceID == sliceID: + yield t + diff --git a/Monitoring/src/main/python/Task/__init__$py.class b/Monitoring/src/main/python/Task/__init__$py.class Binary files differnew file mode 100644 index 0000000..f126bd8 --- /dev/null +++ b/Monitoring/src/main/python/Task/__init__$py.class diff --git a/Monitoring/src/main/python/Task/__init__.py b/Monitoring/src/main/python/Task/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/Monitoring/src/main/python/Task/__init__.py diff --git a/Monitoring/src/main/python/Task/__init__.py.old b/Monitoring/src/main/python/Task/__init__.py.old new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/Monitoring/src/main/python/Task/__init__.py.old diff --git a/Monitoring/src/main/python/Task/test.py b/Monitoring/src/main/python/Task/test.py new file mode 100644 index 0000000..5a42e9a --- /dev/null +++ b/Monitoring/src/main/python/Task/test.py @@ -0,0 +1,203 @@ +''' +Created on Sep 1, 2011 + +@author: steger +''' +import unittest +from Task import SubtaskManager, STRAT_FUNCTIONAL +from Example.Tools import sshping, sshtraceroute, sonomashortping, \ + sshmeminfo, sonomashortchirp, sshdf, sshhades +from Example.credentials import noviCredential, sonomaCredential,\ + novisaCredential, novihadesCredential +from Example.Metrics import RoundTripDelay, HopMeasurement, FreeMemory,\ + OnewayDelay, DiskUsage +from Example.Units import UM, unitless +from Resource.node import node +from Resource.path import path +from Example.Resources import PLdict, PLpaths +from time import sleep, time +from random import shuffle + +TM = SubtaskManager(UM) + +class Test(unittest.TestCase): + cred_novi = noviCredential + cred_siteadmin = novisaCredential + cred_sonoma = sonomaCredential + cred_hades = novihadesCredential + + def setUp(self): + self.um = UM + self.tm = TM + + def tearDown(self): + pass + + def map_resource_to_parameter(self, tool, metric): + pl = tool.parameters.copy() + if issubclass(metric.resourcetype, node): + resource = PLdict.values()[0] + v, u = resource.get_ipaddress("eth0") + pl.update("SourceAddress", v, u) + elif issubclass(metric.resourcetype, path): + resource = PLpaths[0] + v, u = resource.source.get_ipaddress("eth0") + pl.update("SourceAddress", v, u) + v, u = resource.destination.get_ipaddress("eth0") + pl.update("DestinationAddress", v, u) + else: + raise Exception("Unknown resource type") + return pl + + def sshPingFg(self, tool): + pl = self.map_resource_to_parameter(tool, RoundTripDelay) + _, task =self.tm.generate(name = tool.name, driver = tool.driver, dataheader = tool.dataheaderdeclaration, + hookimplementations = tool.hooks, parameters = pl, credential = self.cred_novi) + task.enable() + task.dataAdded.wait( 15 ) + self.assertGreater(len(task.data), 0, "measurement yielded empty result") + task.destroy() + + def test_sshPingFgNOVI(self): + tool = sshping + tool.parameters.update('Interface', "novi", unitless) + self.sshPingFg(tool) + + def test_sshPingFgSubstrate(self): + tool = sshping + tool.parameters.update('Interface', "eth0", unitless) + self.sshPingFg(tool) + + def test_sshTracerouteFg(self): + tool = sshtraceroute + pl = self.map_resource_to_parameter( tool, HopMeasurement ) + _, task =self.tm.generate(name = tool.name, driver = tool.driver, dataheader = tool.dataheaderdeclaration, + hookimplementations = tool.hooks, parameters = pl, credential = self.cred_novi) + task.enable() + task.dataAdded.wait( 15 ) + self.assertGreater(len(task.data), 0, "measurement yielded empty result") + task.destroy() + + def test_sshMeminfo(self): + tool = sshmeminfo + pl = self.map_resource_to_parameter( tool, FreeMemory ) + _, task =self.tm.generate(name = tool.name, driver = tool.driver, dataheader = tool.dataheaderdeclaration, + hookimplementations = tool.hooks, parameters = pl, credential = self.cred_novi) + task.enable() + task.dataAdded.wait( 15 ) + self.assertGreater(len(task.data), 0, "measurement yielded empty result") + task.destroy() + + def test_sshDiskinfo(self): + tool = sshdf + pl = self.map_resource_to_parameter( tool, DiskUsage ) + pl.update_by_list( DiskUsage.p_obligatory ) + _, task =self.tm.generate(name = tool.name, driver = tool.driver, dataheader = tool.dataheaderdeclaration, + hookimplementations = tool.hooks, parameters = pl, credential = self.cred_siteadmin) + task.enable() + task.dataAdded.wait( 15 ) + self.assertGreater(len(task.data), 0, "measurement yielded empty result") + task.destroy() + + def test_sshMeminfoUserDefinedSampling(self): + def sample5(): + for s in [.1, 1, 2, .5, -10]: + yield s + tool = sshmeminfo + pl = self.map_resource_to_parameter( tool, FreeMemory ) + _, task =self.tm.generate(name = tool.name, driver = tool.driver, dataheader = tool.dataheaderdeclaration, + hookimplementations = tool.hooks, parameters = pl, credential = self.cred_novi, samplingfunction = sample5) + task.strategy = STRAT_FUNCTIONAL + task.enable() + while task.state == task.STATE_RUNNING: + sleep(1) + task.dataAdded.wait( 1 ) + self.assertGreater(len(task.data), 0, "measurement yielded empty result") + task.destroy() + + def test_SONoMAShortPing(self): + tool = sonomashortping + pl = self.map_resource_to_parameter( tool, RoundTripDelay ) + _, task =self.tm.generate(name = tool.name, driver = tool.driver, dataheader = tool.dataheaderdeclaration, + hookimplementations = tool.hooks, parameters = pl, credential = self.cred_sonoma, **tool.kwargs) + task.enable() + task.dataAdded.wait( 15 ) + self.assertGreater(len(task.data), 0, "measurement yielded empty result") + task.destroy() + + def test_SONoMAShortChirp(self): + tool = sonomashortchirp + pl = self.map_resource_to_parameter( tool, OnewayDelay ) + _, task =self.tm.generate(name = tool.name, driver = tool.driver, dataheader = tool.dataheaderdeclaration, + hookimplementations = tool.hooks, parameters = pl, credential = self.cred_sonoma, **tool.kwargs) + task.enable() + task.dataAdded.wait( 15 ) + self.assertGreater(len(task.data), 0, "measurement yielded empty result") + task.destroy() + + def test_HADES(self): + tool = sshhades + pl = self.map_resource_to_parameter( tool, RoundTripDelay ) + pl.update('SourceAddress', '192.168.31.1', self.um.ipv4dotted) + pl.update('DestinationAddress', '192.168.31.5', self.um.ipv4dotted) + _, task =self.tm.generate(name = tool.name, driver = tool.driver, dataheader = tool.dataheaderdeclaration, + hookimplementations = tool.hooks, parameters = pl, credential = self.cred_hades, **tool.kwargs) + task.enable() + task.dataAdded.wait( 15 ) + self.assertGreater(len(task.data), 0, "measurement yielded empty result") + task.destroy() + + def test_toggle(self): + tool = sshmeminfo + pl = self.map_resource_to_parameter( tool, FreeMemory ) + _, task =self.tm.generate(name = tool.name, driver = tool.driver, dataheader = tool.dataheaderdeclaration, + hookimplementations = tool.hooks, parameters = pl, credential = self.cred_novi) + n = 10 + to = 1.5 + t = time() + while n: + task.enable() + task.dataAdded.wait( to ) + task.disable() + task.dataAdded.clear() + n -= 1 + dt = time()-t + self.assertGreater(len(task.data), 0, "measurement yielded empty result") + self.assertGreater(dt, n*to, "measurement lasted longer than expected %f > %f" % (dt, n*to)) + task.destroy() + + def test_tm(self): + tool = sshmeminfo + pl = self.map_resource_to_parameter( tool, FreeMemory ) + N = len(self.tm) + # deletion by task + n= 10 + tasks = [] + while n: + _, task =self.tm.generate(name = tool.name, driver = tool.driver, dataheader = tool.dataheaderdeclaration, + hookimplementations = tool.hooks, parameters = pl, credential = self.cred_novi) + n -= 1 + tasks.append(task) + shuffle(tasks) + while len(tasks): + t = tasks.pop() + tid = self.tm.getidentifier(t) + self.tm.destroy(tid) + # destroy by taskid + n= 10 + taskids = [] + while n: + taskid, _ =self.tm.generate(name = tool.name, driver = tool.driver, dataheader = tool.dataheaderdeclaration, + hookimplementations = tool.hooks, parameters = pl, credential = self.cred_novi) + n -= 1 + taskids.append(taskid) + shuffle(taskids) + while len(taskids): + tid = taskids.pop() + self.tm.destroy(tid) + self.assertEqual(N, len(self.tm), "some tasks were not removed from the SubtaskManager") + + +if __name__ == "__main__": + #import sys;sys.argv = ['', 'Test.test_toggle'] + unittest.main() diff --git a/Monitoring/src/main/python/Task/test.py.old b/Monitoring/src/main/python/Task/test.py.old new file mode 100644 index 0000000..5a42e9a --- /dev/null +++ b/Monitoring/src/main/python/Task/test.py.old @@ -0,0 +1,203 @@ +''' +Created on Sep 1, 2011 + +@author: steger +''' +import unittest +from Task import SubtaskManager, STRAT_FUNCTIONAL +from Example.Tools import sshping, sshtraceroute, sonomashortping, \ + sshmeminfo, sonomashortchirp, sshdf, sshhades +from Example.credentials import noviCredential, sonomaCredential,\ + novisaCredential, novihadesCredential +from Example.Metrics import RoundTripDelay, HopMeasurement, FreeMemory,\ + OnewayDelay, DiskUsage +from Example.Units import UM, unitless +from Resource.node import node +from Resource.path import path +from Example.Resources import PLdict, PLpaths +from time import sleep, time +from random import shuffle + +TM = SubtaskManager(UM) + +class Test(unittest.TestCase): + cred_novi = noviCredential + cred_siteadmin = novisaCredential + cred_sonoma = sonomaCredential + cred_hades = novihadesCredential + + def setUp(self): + self.um = UM + self.tm = TM + + def tearDown(self): + pass + + def map_resource_to_parameter(self, tool, metric): + pl = tool.parameters.copy() + if issubclass(metric.resourcetype, node): + resource = PLdict.values()[0] + v, u = resource.get_ipaddress("eth0") + pl.update("SourceAddress", v, u) + elif issubclass(metric.resourcetype, path): + resource = PLpaths[0] + v, u = resource.source.get_ipaddress("eth0") + pl.update("SourceAddress", v, u) + v, u = resource.destination.get_ipaddress("eth0") + pl.update("DestinationAddress", v, u) + else: + raise Exception("Unknown resource type") + return pl + + def sshPingFg(self, tool): + pl = self.map_resource_to_parameter(tool, RoundTripDelay) + _, task =self.tm.generate(name = tool.name, driver = tool.driver, dataheader = tool.dataheaderdeclaration, + hookimplementations = tool.hooks, parameters = pl, credential = self.cred_novi) + task.enable() + task.dataAdded.wait( 15 ) + self.assertGreater(len(task.data), 0, "measurement yielded empty result") + task.destroy() + + def test_sshPingFgNOVI(self): + tool = sshping + tool.parameters.update('Interface', "novi", unitless) + self.sshPingFg(tool) + + def test_sshPingFgSubstrate(self): + tool = sshping + tool.parameters.update('Interface', "eth0", unitless) + self.sshPingFg(tool) + + def test_sshTracerouteFg(self): + tool = sshtraceroute + pl = self.map_resource_to_parameter( tool, HopMeasurement ) + _, task =self.tm.generate(name = tool.name, driver = tool.driver, dataheader = tool.dataheaderdeclaration, + hookimplementations = tool.hooks, parameters = pl, credential = self.cred_novi) + task.enable() + task.dataAdded.wait( 15 ) + self.assertGreater(len(task.data), 0, "measurement yielded empty result") + task.destroy() + + def test_sshMeminfo(self): + tool = sshmeminfo + pl = self.map_resource_to_parameter( tool, FreeMemory ) + _, task =self.tm.generate(name = tool.name, driver = tool.driver, dataheader = tool.dataheaderdeclaration, + hookimplementations = tool.hooks, parameters = pl, credential = self.cred_novi) + task.enable() + task.dataAdded.wait( 15 ) + self.assertGreater(len(task.data), 0, "measurement yielded empty result") + task.destroy() + + def test_sshDiskinfo(self): + tool = sshdf + pl = self.map_resource_to_parameter( tool, DiskUsage ) + pl.update_by_list( DiskUsage.p_obligatory ) + _, task =self.tm.generate(name = tool.name, driver = tool.driver, dataheader = tool.dataheaderdeclaration, + hookimplementations = tool.hooks, parameters = pl, credential = self.cred_siteadmin) + task.enable() + task.dataAdded.wait( 15 ) + self.assertGreater(len(task.data), 0, "measurement yielded empty result") + task.destroy() + + def test_sshMeminfoUserDefinedSampling(self): + def sample5(): + for s in [.1, 1, 2, .5, -10]: + yield s + tool = sshmeminfo + pl = self.map_resource_to_parameter( tool, FreeMemory ) + _, task =self.tm.generate(name = tool.name, driver = tool.driver, dataheader = tool.dataheaderdeclaration, + hookimplementations = tool.hooks, parameters = pl, credential = self.cred_novi, samplingfunction = sample5) + task.strategy = STRAT_FUNCTIONAL + task.enable() + while task.state == task.STATE_RUNNING: + sleep(1) + task.dataAdded.wait( 1 ) + self.assertGreater(len(task.data), 0, "measurement yielded empty result") + task.destroy() + + def test_SONoMAShortPing(self): + tool = sonomashortping + pl = self.map_resource_to_parameter( tool, RoundTripDelay ) + _, task =self.tm.generate(name = tool.name, driver = tool.driver, dataheader = tool.dataheaderdeclaration, + hookimplementations = tool.hooks, parameters = pl, credential = self.cred_sonoma, **tool.kwargs) + task.enable() + task.dataAdded.wait( 15 ) + self.assertGreater(len(task.data), 0, "measurement yielded empty result") + task.destroy() + + def test_SONoMAShortChirp(self): + tool = sonomashortchirp + pl = self.map_resource_to_parameter( tool, OnewayDelay ) + _, task =self.tm.generate(name = tool.name, driver = tool.driver, dataheader = tool.dataheaderdeclaration, + hookimplementations = tool.hooks, parameters = pl, credential = self.cred_sonoma, **tool.kwargs) + task.enable() + task.dataAdded.wait( 15 ) + self.assertGreater(len(task.data), 0, "measurement yielded empty result") + task.destroy() + + def test_HADES(self): + tool = sshhades + pl = self.map_resource_to_parameter( tool, RoundTripDelay ) + pl.update('SourceAddress', '192.168.31.1', self.um.ipv4dotted) + pl.update('DestinationAddress', '192.168.31.5', self.um.ipv4dotted) + _, task =self.tm.generate(name = tool.name, driver = tool.driver, dataheader = tool.dataheaderdeclaration, + hookimplementations = tool.hooks, parameters = pl, credential = self.cred_hades, **tool.kwargs) + task.enable() + task.dataAdded.wait( 15 ) + self.assertGreater(len(task.data), 0, "measurement yielded empty result") + task.destroy() + + def test_toggle(self): + tool = sshmeminfo + pl = self.map_resource_to_parameter( tool, FreeMemory ) + _, task =self.tm.generate(name = tool.name, driver = tool.driver, dataheader = tool.dataheaderdeclaration, + hookimplementations = tool.hooks, parameters = pl, credential = self.cred_novi) + n = 10 + to = 1.5 + t = time() + while n: + task.enable() + task.dataAdded.wait( to ) + task.disable() + task.dataAdded.clear() + n -= 1 + dt = time()-t + self.assertGreater(len(task.data), 0, "measurement yielded empty result") + self.assertGreater(dt, n*to, "measurement lasted longer than expected %f > %f" % (dt, n*to)) + task.destroy() + + def test_tm(self): + tool = sshmeminfo + pl = self.map_resource_to_parameter( tool, FreeMemory ) + N = len(self.tm) + # deletion by task + n= 10 + tasks = [] + while n: + _, task =self.tm.generate(name = tool.name, driver = tool.driver, dataheader = tool.dataheaderdeclaration, + hookimplementations = tool.hooks, parameters = pl, credential = self.cred_novi) + n -= 1 + tasks.append(task) + shuffle(tasks) + while len(tasks): + t = tasks.pop() + tid = self.tm.getidentifier(t) + self.tm.destroy(tid) + # destroy by taskid + n= 10 + taskids = [] + while n: + taskid, _ =self.tm.generate(name = tool.name, driver = tool.driver, dataheader = tool.dataheaderdeclaration, + hookimplementations = tool.hooks, parameters = pl, credential = self.cred_novi) + n -= 1 + taskids.append(taskid) + shuffle(taskids) + while len(taskids): + tid = taskids.pop() + self.tm.destroy(tid) + self.assertEqual(N, len(self.tm), "some tasks were not removed from the SubtaskManager") + + +if __name__ == "__main__": + #import sys;sys.argv = ['', 'Test.test_toggle'] + unittest.main() |