diff options
author | pikusa <pikusa@man.poznan.pl> | 2013-04-03 13:18:17 (GMT) |
---|---|---|
committer | pikusa <pikusa@man.poznan.pl> | 2013-04-03 13:18:17 (GMT) |
commit | 2f2a3a129c91de540e66c3bfbe30b0df1942cd4b (patch) | |
tree | 2d313cdf0068af368d4de6067d676be16f6a6464 /Monitoring/MonitoringService/Task/Task.py | |
parent | ff8aa232b071a9b54dff833714a870fd0aec0b30 (diff) | |
download | novi-public-2f2a3a129c91de540e66c3bfbe30b0df1942cd4b.zip novi-public-2f2a3a129c91de540e66c3bfbe30b0df1942cd4b.tar.gz novi-public-2f2a3a129c91de540e66c3bfbe30b0df1942cd4b.tar.bz2 |
project commit and dir tree change
Diffstat (limited to 'Monitoring/MonitoringService/Task/Task.py')
-rw-r--r-- | Monitoring/MonitoringService/Task/Task.py | 447 |
1 files changed, 447 insertions, 0 deletions
diff --git a/Monitoring/MonitoringService/Task/Task.py b/Monitoring/MonitoringService/Task/Task.py new file mode 100644 index 0000000..ce1b603 --- /dev/null +++ b/Monitoring/MonitoringService/Task/Task.py @@ -0,0 +1,447 @@ +''' +Created on 08.08.2011 + +@author: steger, jozsef +@organization: ELTE +@contact: steger@complex.elte.hu +''' + +from time import time +from threading import Thread, Event, RLock +from DataProcessing.Data import Data, DataHeader +from DataProcessing.Parameter import ParameterList +from Credential.credentialtypes import Credential +import Driver +from DataProcessing.DataSample import DataSample +import logging + +class TaskError(Exception): + pass + +''' +@var STRAT_PERIODICAL: sampling strategy, when samples are taken periodically controlled by wait +@var STRAT_ONDEMAND: sampling strategy, retrievehook is run once, then automatically Task is disabled +@var STRAT_FUNCTIONAL: sampling strategy, user provides a call back function to generate the time series to wait between consecutive sampling +''' +STRAT_PERIODICAL, STRAT_ONDEMAND, STRAT_FUNCTIONAL = range(3) + + +class SubtaskManager(object): + ''' + @author: Jozsef Steger + @summary: SubtaskManager class provides the basic functionalities: + 1. to generate a new Task + 2. to access a Task referenced by the task identifier + 3. to destroy a Task explicit or referenced by the task identifier + ''' + logger = logging.getLogger("NOVI.MS.STM") + + class Task(object): + ''' + @author: Jozsef Steger + @summary: + This class represents a simplified control plane of a wide variety of monitoring tools. + In order to generate monitoring data tools are manipulated via several steps. + Descendants of the Task class implement these steps. + In the simplified work flow the following steps are abstracted: + - prehook: it is run once to initiate the communication channel to the tool and/or to initialize a monitoring task + - starthook: in case the tool operates in such a manner, this step is run to trigger the start up of a background measurement + - retrievehook: implements data retrieval from the tool, it has to transform measurement data to the internal data representation + - stophook: it takes care for stopping any processes running in the background at the tool + - posthook: provides a means to clean up after a monitoring task is over + + Subclasses must be generated dynamically using the generateSubclass() method + + @cvar timeout: the time to wait for the background thread to finish + + @cvar STATE_SLEEPING: a state, retirevehook does not carry out data collection, monitoring is kept asleep + @cvar STATE_RUNNING: a state, retrievehook carries out data collection + + @note: Only descendants of this class are intended to be instantiated via the static method generateSubclass + + @ivar slock: lock concurrent enable/disable calls + @ivar cleanup: the function to call back after task is deleted + @ivar um: the unit manager + @ivar strategy: indicates how sampling is done, ie. how long to wait between two measurements + @ivar _wait: for periodical sampling (STRAT_PERIODICAL) this constant is yielded by the generator + @ivar gen_functional: for user defined sampling (STRAT_FUNCTIONAL) this is the generator to use + @ivar credential: credential + @ivar parameters: parameters + @ivar sliceID: slice identifier the Task belongs to, -1 means unspecified. + @ivar data: internal representation of monitoring data + @ivar inRetrievehook: Event indicating current data retrieval + @ivar dataAdded: Event indicating new data are produced by former runs + @ivar stopworker: Event indicating that the Task was disabled + @ivar t: Thread to take care of new measurements + @ivar _runcount: integer showing how many data retrieval attempt have been made + @ivar _durationrecord: a list of time stamp and duration pairs showing when data retrievals + happened and how long they took + ''' + timeout = 5 + STATE_SLEEPING, STATE_RUNNING = range(2) + + def __init__(self, taskmanager, credential, parameters = None, samplingfunction = None, **kw): + ''' + @summary: Constructor + @param taskmanager: a reference to the SubtaskManager, which needs to called back + if explicit deletion of the Task happens to keep the list of tasks coherent + @type taskmanager: SubtaskManager + @param credential: the necessary authentication information to access and control the tools + @type credential: Credential + @param parameters: the parameters to control the tools, ie. measurement details + @type parameters: ParameterList + @param samplingfunction: a generator function yielding a number, if the generator provides a + finite series of numbers, after all items are consumed the Task gets disabled + @type samplingfunction: callable + @keyword kw: parameters, which are passed to the prehook method of the Task + @raise TaskError: wrong type of credential + + + @note: deleting the strategy property disables the Task and sets the strategy to STRAT_ONDEMAND + @note: setting the strategy property takes effect only after the Task is (re)enabled + ''' + self.slock = RLock() + self.logger = taskmanager.logger + self.cleanup = taskmanager.removefromtasklist + self.um = taskmanager.um + self._strategy = STRAT_PERIODICAL + self._wait = 1 + self._sliceID = -1 + self.gen_functional = samplingfunction + if isinstance(credential, Credential): + self.credential = credential + else: + raise TaskError("wrong type of credential") + if isinstance(parameters, ParameterList): + self.parameters = parameters + else: + self.parameters = ParameterList() + D = Data(self.um, self.dataheader) + self.datasource = DataSample(table = D) + self.data = self.datasource #FIXME: left here for compatibility + #FIXME: mimic DataSource to look like Data for some methods + self.data.getTemplate = D.getTemplate + def saverecord(record): + D.saveRecord(record) + self.datasource.process() + self.data.saveRecord = saverecord + #FIXME: mimicry over + self.inRetrievehook = Event() + self.dataAdded = Event() + self.stopworker = Event() + self.stopworker.set() + try: + self.prehook(**kw) + except: + raise #TaskError() + self.t = None + self._runcount = 0 + self._durationrecord = [] + self.driver = None + self.dataheader = None + self.logger.debug("+ INIT %s" % self) + + def __del__(self): + print "DEL" + self.destroy() + + def destroy(self): + if self.state == self.STATE_RUNNING: + # let us join the thread if necessary + self.disable() + with self.slock: + self.posthook() + if self.cleanup: + # remove reference in the SubTaskManager + self.cleanup(self) + self.cleanup = None + self.logger.debug("- DESTROY %s, reference removed" % self) + else: + self.logger.debug("- DESTROY %s, reference already removed" % self) + + def wait(self, wait): + ''' + @summary: wait until Task is disabled or the provided waiting time is over + @param wait: requests time to wait for the next sample + @type wait: float + ''' + try: + self.stopworker.wait( max(.1, float(wait)) ) + except: + self.stopworker.wait( 1 ) + + @property + def sliceID(self): + return self._sliceID + @sliceID.setter + def sliceID(self, value): + if (self._sliceID != -1) and (self._sliceID != value): + raise TaskError("you can set sliceID property only once") + self._sliceID = value + @sliceID.deleter + def sliceID(self): + raise TaskError("shan't ever delete this property") + + @property + def runcount(self): + return self._runcount + @runcount.setter + def runcount(self, value): + raise TaskError("shan't ever set this property") + @runcount.deleter + def runcount(self): + raise TaskError("shan't ever delete this property") + + @property + def duration(self): + if len(self._durationrecord): + return self._durationrecord[-1][1] + else: + return -1 + + @property + def state(self): + if self.stopworker.isSet(): + return self.STATE_SLEEPING + else: + return self.STATE_RUNNING + + def state_hrn(self): + """ + @summary: return the state of the task in human readable format + @return: the state of the task + @rtype: string + """ + if self.stopworker.isSet(): + return "SLEEPING" + else: + return "RUNNING" + + @property + def strategy(self): + with self.slock: + return self._strategy + @strategy.setter + def strategy(self, value): + if value in [STRAT_ONDEMAND, STRAT_PERIODICAL, STRAT_FUNCTIONAL]: + with self.slock: + self._strategy = value + @strategy.deleter + def strategy(self): + self.disable() + with self.slock: + self._strategy = STRAT_ONDEMAND + + def gen_ondemand(self): + yield 0 + + def gen_periodic(self): + while True: + yield self._wait + + def enable(self): + """ + @summary: enable task + """ + with self.slock: + if self.t is not None: + raise TaskError("You can enable a perfectly disabled Task") + self.stopworker.clear() + # run starthook + self.starthook() + # initialize working thread + self.t = Thread(target = self._worker, name = str(self)) + self.t.setDaemon(True) + self.t.start() + + def disable(self): + """ + @summary: disable task + """ + with self.slock: + if self.t is None: + self.logger.debug("W %s already disabled" % self) + return + self.stopworker.set() + try: + # wait for working thread to finish + n = 0 + while True: + n += 1 + self.t.join(self.timeout) + if self.t.isAlive(): + self.logger.error("E %s timeout occurred %d times while disabling task" % self) + else: + break + except RuntimeError: + # generator does not provide any more waiting time interval + # thread tries to join itself + pass + # run stophook + try: + self.stophook() + finally: + self.t = None + + def _worker(self): + ''' + @summary: This method is running in a background thread. + It takes care of calling retrievehook. + If new data are produced by the tool it is indicated via dataAdded. + ''' + strategy = self.strategy + if strategy == STRAT_ONDEMAND: + generator = self.gen_ondemand + elif strategy == STRAT_PERIODICAL: + generator = self.gen_periodic + elif strategy == STRAT_FUNCTIONAL: + generator = self.gen_functional + for wait in generator(): + if not self.stopworker.isSet(): + # the task is still running + self._runcount += 1 + self.inRetrievehook.set() + invocation = time() + try: + R = self.retrievehook() + except Exception, e: + self.logger.error("E %s exception in retrievehook() %s" % (self, e)) + break + finally: + self.inRetrievehook.clear() + stopped = time() + self._durationrecord.append( (invocation, stopped - invocation) ) + if R: + self.dataAdded.set() + else: + # the task is disabled + break + self.wait(wait) + if not self.stopworker.isSet(): + # the Task is not disabled + # but there are no more items in the series of waiting time + # so we disable it + self.disable() + + def __init__(self, unitmanager): + self._tasks = {} + self.tasklock = RLock() + self._lastid = -1 + self.um = unitmanager + self.logger.debug("II: INIT %s" % self) + + def __del__(self): + self.logger.debug("II: DEL %s" % self) + + def __str__(self): + return "<SubtaskManager %s (%d tasks)>" % (id(self), len(self)) + + def __len__(self): + return len(self._tasks) + + @property + def uniqid(self): + self._lastid += 1 + return self._lastid + + @staticmethod + def _shiftMethod(implementation): + ''' + @summary: helps indentation of the piece of implemented code + ''' + return "\n\t\t".join( filter(None, implementation.splitlines()) ) + + def generate(self, name, driver, dataheader, hookimplementations, credential, parameters, samplingfunction = None, **kw): + ''' + @summary: This method is responsible for dynamically generate a new Task subclass + @param name: the name of the dynamically generated class + @type name: string + @param driver: driver implementing a communication channel to the tool + @type driver: Driver + @param dataheader: contains information of the data structure of the result + @type dataheader: DataHeader + @param hookimplementations: the work flow steps (hooks) + @type hookimplementations: dict + @param credential: the credential used by the driver + @type credential: dict + @param parameters: list of control parameters to fine tune measurements + @type parameters: ParameterList + @param samplingfunction: a generator yielding the time interval elements of a series to wait between consecutive samples + @type samplingfunction: callable + @keyword kw: extra keyword arguments passed to the prehook of the new Task + @return: identifier and the subclass instance of (Task) + @rtype: int, Task + @raise TaskError: wrong Driver type / wrong DataHeader type / erroneous implementation + ''' + prehook = self._shiftMethod( hookimplementations.get("prehook", "pass") ) + starthook = self._shiftMethod( hookimplementations.get("starthook", "pass") ) + stophook = self._shiftMethod( hookimplementations.get("stophook", "pass") ) + posthook = self._shiftMethod( hookimplementations.get("posthook", "pass") ) + retrievehook = self._shiftMethod( hookimplementations.get("retrievehook", "raise TaskException(\"retrievehook() must be implemented\")") ) + if not issubclass(driver, Driver.Driver.Driver): + raise TaskError("wrong Driver type %s" % driver) + if not isinstance(dataheader, DataHeader): + raise TaskError("wrong DataHeader type %s" % dataheader) + classtemplate = """ +import re +from DataProcessing.Data import DataHeader +class %s(SubtaskManager.Task): +\tdef prehook(self, **kw): +\t\t%s +\tdef starthook(self): +\t\t%s +\tdef retrievehook(self): +\t\t%s +\tdef stophook(self): +\t\t%s +\tdef posthook(self): +\t\t%s""" % (name, prehook, starthook, retrievehook, stophook, posthook) + try: + exec(classtemplate, globals()) + except: + self.logger.error(classtemplate) + raise TaskError("erroneous implementation (%s)" % name) + taskclass = globals()[name] + taskclass.driver = driver + taskclass.dataheader = dataheader + taskid = self.uniqid + task = taskclass(self, credential, parameters, samplingfunction, **kw) + with self.tasklock: + self._tasks[taskid] = task + self.logger.info("++: %s new Task %s is identified by %d" % (self, task, taskid)) + return taskid, task + + def __getitem__(self, taskidentifier): + taskidentifier = int(taskidentifier) + with self.tasklock: + return self._tasks[taskidentifier] + + def getidentifier(self, task): + with self.tasklock: + for tid, t in self._tasks.iteritems(): + if task == t: + return tid + raise TaskError("Task %s is unknown to me" % task) + + def removefromtasklist(self, task): + with self.tasklock: + try: + taskidentifier = self.getidentifier(task) + self.pop(taskidentifier) + except TaskError: + pass + + def pop(self, taskidentifier): + try: + task = self._tasks.pop(taskidentifier) + task.cleanup = None + self.logger.info("--: %s Task %s identified by %d removed" % (self, task, taskidentifier)) + task.destroy() + task = None + except KeyError: + self.logger.warning("WW: %s Task to remove identified by %s is unknown to me" % (self, taskidentifier)) + + def tasks_of_slice(self, sliceID = -1): + for t in self._tasks.values(): + if t.sliceID == sliceID: + yield t |