From 2f2a3a129c91de540e66c3bfbe30b0df1942cd4b Mon Sep 17 00:00:00 2001 From: pikusa Date: Wed, 03 Apr 2013 13:18:17 +0000 Subject: project commit and dir tree change --- (limited to 'Monitoring/MonitoringService') diff --git a/Monitoring/MonitoringService/Credential/SshKeygen.py b/Monitoring/MonitoringService/Credential/SshKeygen.py new file mode 100644 index 0000000..6c8042d --- /dev/null +++ b/Monitoring/MonitoringService/Credential/SshKeygen.py @@ -0,0 +1,100 @@ +''' +Created on Jul 20, 2011 + +@author: steger +''' +from M2Crypto import RSA +from base64 import b64encode +from os import chmod, path +import stat + +# paramiko provides this functionality, so maybe we don't need this class. see paramiko.PKey + +class CannotSet(Exception): + pass + +class SshKeygen(object): + ''' + Generates a pair of RSA keys. + Enables saving the keys to the file system. + ''' + def __init__(self, bits = 1024, e = 65337): + ''' + Initiates the pair of RSA keys + @param bits: the length of the keys in bits + @type bits: integer + @param e: the exponent + @type e: integer + ''' + self.rsa = RSA.gen_key(bits, e, lambda: None) + + def _check_filename(self, filename): + if path.exists(filename): + raise Exception("File exists: %s" % filename) + + @property + def private(self): + ''' + @summary: return the private key in PEM format + @return: the private key in PEM format + @rtype: string + ''' + return self.rsa.as_pem(cipher = None) + + @private.setter + def private(self, value): + raise CannotSet + + @private.deleter + def private(self): + raise CannotSet + + @staticmethod + def _convert(rsa): + return b64encode('\x00\x00\x00\x07ssh-rsa%s%s' % (rsa.pub()[0], rsa.pub()[1])) + + @property + def public(self): + ''' + @summary: return the public key in base64 format conforming to the content of authorized_keys + @return: the public key in base64 format + @rtype: string + ''' + return self._convert(self.rsa) + + @public.setter + def public(self, value): + raise CannotSet + + @public.deleter + def public(self): + raise CannotSet + + def save_private_key(self, filename): + ''' + @summary: save the private key in the file system in a named file. + @param filename: the filename to store the private key. + @type filename: string + ''' + self._check_filename(filename) + self.rsa.save_key(filename, cipher = None) + chmod(filename, stat.S_IRUSR) + + def save_public_key(self, filename): + ''' + @summary: save the public key in the file system in a named file. + @param filename: the filename to store the public key. + @type filename: string + ''' + self._check_filename(filename) + with open(filename, "w") as f: + f.write("ssh-rsa %s" % self.public) + + @staticmethod + def convert_key_from_file(filename): + ''' + @summary: convert a private key stored in a file in PEM format and return the public key in base64 format conforming to the content of authorized_keys + @return: the public key in base64 format + @rtype: string + ''' + return SshKeygen._convert( RSA.load_key(file = filename) ) diff --git a/Monitoring/MonitoringService/Credential/__init__.py b/Monitoring/MonitoringService/Credential/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/Monitoring/MonitoringService/Credential/__init__.py diff --git a/Monitoring/MonitoringService/Credential/credentialtypes.py b/Monitoring/MonitoringService/Credential/credentialtypes.py new file mode 100644 index 0000000..45da0de --- /dev/null +++ b/Monitoring/MonitoringService/Credential/credentialtypes.py @@ -0,0 +1,62 @@ +''' +Created on Oct 27, 2011 + +@author: steger, jozsef +@organization: ELTE +@contact: steger@complex.elte.hu +''' + +class Credential(object): + ''' + @summary: an empty credential to serve as an ancient class + @author: steger, jozsef + ''' + pass + +class UsernamePassword(Credential): + ''' + @summary: container for a pair of user name and password + @author: steger, jozsef + @ivar username: name of the user + @type username: str + @ivar password: password secret + @type password: str + ''' + + def __init__(self, username, password): + ''' + @summary: Constructor + @param username: the username + @type username: string + @param password: the password + @type password: string + ''' + self.username = username + self.password = password + +class UsernameRSAKey(Credential): + ''' + @summary: container for a triple of user name, private key and an optional password for the key + @author: steger, jozsef + @ivar username: name of the user + @type username: str + @ivar rsakey: a file name pointing to the user's private key secret + @type rsakey: str + @ivar password: password secret + @type password: str + ''' + + def __init__(self, username, rsakey, password = ""): + ''' + @summary: Constructor + @param username: the username + @type username: string + @param rsakey: the private key file + @type rsakey: string + @param password: the optional password to unlock the private key, default: "" + @type password: string + ''' + self.username = username + self.rsakey = rsakey + self.password = password + \ No newline at end of file diff --git a/Monitoring/MonitoringService/Credential/test.py b/Monitoring/MonitoringService/Credential/test.py new file mode 100644 index 0000000..04a59f3 --- /dev/null +++ b/Monitoring/MonitoringService/Credential/test.py @@ -0,0 +1,46 @@ +''' +Created on Aug 10, 2011 + +@author: steger +''' +import unittest +from os import close, unlink +from tempfile import mkstemp +from subprocess import Popen, PIPE +from SshKeygen import SshKeygen + +class Test(unittest.TestCase): + sshkeygen = '/usr/bin/ssh-keygen' + + def test_sshkeygen(self): + # generate a pair of RSA keys + self.key = SshKeygen() + + # save keys in a file + fid, fn = mkstemp(suffix = "_rsa") + close(fid) + unlink(fn) + self.fn_private = fn + self.fn_public = "%s.pub" % fn + self.key.save_private_key(self.fn_private) + self.key.save_public_key(self.fn_public) + + # Test the base64 format of the public key. + # convert and compare private key using ssh-keygen + proc = Popen(args = [self.sshkeygen, '-y', '-f', self.fn_private], stdout = PIPE) + converted = str(proc.communicate(input = None)[0]) + expected = "ssh-rsa %s\n" % self.key.public + self.assertEqual(expected, converted, "Base64 encoded public RSA key differs from the one generated by %s" % self.sshkeygen) + + # Test SshKeygen objects convert_key_from_file method. + expected = self.key.public + converted = SshKeygen.convert_key_from_file(self.fn_private) + self.assertEqual(expected, converted, "Base64 encoded public RSA key generated from file %s differs from expected" % self.fn_private) + + # remove generated files + unlink(self.fn_private) + unlink(self.fn_public) + +if __name__ == "__main__": + #import sys;sys.argv = ['', 'Test.test_sshkeygen'] + unittest.main() \ No newline at end of file diff --git a/Monitoring/MonitoringService/DataProcessing/Aggregator.py b/Monitoring/MonitoringService/DataProcessing/Aggregator.py new file mode 100644 index 0000000..49855fb --- /dev/null +++ b/Monitoring/MonitoringService/DataProcessing/Aggregator.py @@ -0,0 +1,288 @@ +''' +Created on Aug 10, 2011 + +@author: steger, gombos, matuszka +''' + +from DataProcessing.MeasurementLevel import Ordinal, Ratio, Interval #Nominal +from math import sqrt +from DataProcessing.DataReader import DataReader +from DataProcessing.DataHeader import DataHeader, DataHeaderCell +from DataProcessing.DataSource import DataSource +from DataProcessing.Data import Data +from DataProcessing.DataError import AggregatorError + +#FIXME: docs + +class Aggregator(DataSource): + ''' + classdocs + @cvar cn_count: the name of the column indicating the set size before aggregation + ''' + cn_count = 'Count' + + def __init__(self, datasource, cellrequest): + ''' + Constructor + @param datasource: table of records to manipulate with + @type datasource: DataSource + @param cellrequest: a column wise projection of the table is carried out, this column is kept + @type cellrequest: CellRequest + ''' + if not isinstance(datasource, DataSource): + raise AggregatorError("Wrong type of datasource %s" % datasource) + DataSource.__init__(self, dependency = datasource) + self._inputreader.extract(cellrequest = [cellrequest]) + for c in self._inputreader.headercells(): + break + if not c.dimension.level(self.dimension_compatible): + raise AggregatorError("The measurement level of input (%s) is not compatible with %s" % (c.dimension, self.name)) + header = DataHeader("%sAggregate(%s)" % (self.name, datasource.name)) + dimension = c.dimension + header.addColumn(DataHeaderCell(name = self.cn_count, dimension = dimension.manager["Countable"])) + self.cn_aggr = '%s(%s)' % (self.name, c.name) + header.addColumn(DataHeaderCell(name = self.cn_aggr, dimension = dimension, unit = c.unit)) + self._data = Data(datasource.um, header) + self._record = self._data.getTemplate(size = 1) +# self.um = datasource.um + self.source = datasource + + self._aggregate = None + +#FIXME: ez a ketto cucc tenyleg kell? + def __len__(self): + return len(self._data) + @property + def writelock(self): + return self._data.writelock + + + @property + def aggregate(self): + self.process() + return self._aggregate + + @property + def readerClass(self): + return DataReader + + @property + def dimension_compatible(self): + raise AggregatorError("dimension_compatible property is not implemented in %s" % self) + + +class Sum(Aggregator): + def __init__(self, datasource, cellrequest): + Aggregator.__init__(self, datasource, cellrequest) + self._aggregate = 0 + + @property + def dimension_compatible(self): + return Interval + + @property + def name(self): + return "Sum" + + def _process(self): + if self._inputreader.sourceCleared.isSet(): + self._inputreader.sourceCleared.clear() + self._aggregate = 0 + self._inputreader.rewind() + changed = True + else: + changed = False + for (x,) in self._inputreader: + self._aggregate += float(x) + changed = True + if changed: + self._data.clear() + self._record.update(name = self.cn_aggr, values = (self._aggregate,)) + self._record.update(name = self.cn_count, values = (len(self.source),)) + self._data.saveRecord(self._record) + return self.CLEARED | self.EXPANDED + return self.PASS + +class Min(Aggregator): + def __init__(self, datasource, cellrequest): + Aggregator.__init__(self, datasource, cellrequest) + + @property + def dimension_compatible(self): + return Ordinal + + @property + def name(self): + return "Min" + + def _process(self): + if self._inputreader.sourceCleared.isSet(): + self._inputreader.sourceCleared.clear() + self._aggregate = None + self._inputreader.rewind() + if self._aggregate is None: + samples = [] + else: + samples = [self._aggregate] + for (x,) in self._inputreader: + samples.append( float(x) ) + newvalue = min(samples) + if self._aggregate != newvalue: + self._aggregate = newvalue + self._data.clear() + self._record.update(name = self.cn_aggr, values = (self._aggregate,)) + self._record.update(name = self.cn_count, values = (len(self.source),)) + self._data.saveRecord(self._record) + return self.CLEARED | self.EXPANDED + return self.PASS + +class Max(Aggregator): + def __init__(self, datasource, cellrequest): + Aggregator.__init__(self, datasource, cellrequest) + + @property + def dimension_compatible(self): + return Ordinal + + @property + def name(self): + return "Max" + + def _process(self): + if self._inputreader.sourceCleared.isSet(): + print "MAX src cleared" + self._inputreader.sourceCleared.clear() + self._aggregate = None + self._inputreader.rewind() + if self._aggregate is None: + samples = [] + else: + samples = [self._aggregate] + for (x,) in self._inputreader: + samples.append( float(x) ) + print "SAMPLES", samples + newvalue = max(samples) + print "MAX", newvalue, "of", len(self.source) + if self._aggregate != newvalue: + print "SETTING" + self._aggregate = newvalue + self._data.clear() + self._record.update(name = self.cn_aggr, values = (self._aggregate,)) + self._record.update(name = self.cn_count, values = (len(self.source),)) + self._data.saveRecord(self._record) + print "XXX", self._record.record + return self.CLEARED | self.EXPANDED + return self.PASS + +class Mean(Aggregator): + def __init__(self, datasource, cellrequest): + Aggregator.__init__(self, datasource, cellrequest) + self._sum = 0 + + @property + def dimension_compatible(self): + return Ratio + + @property + def name(self): + return "Mean" + + def _process(self): + changed = False + for (x,) in self._inputreader: + self._sum += float(x) + changed = True + if changed: + self._aggregate = self._sum / float(len(self.source)) + self._data.clear() + self._record.update(name = self.cn_aggr, values = (self._aggregate,)) + self._record.update(name = self.cn_count, values = (len(self.source),)) + self._data.saveRecord(self._record) + return self.CLEARED | self.EXPANDED + return self.PASS + +class Deviation(Aggregator): + def __init__(self, data, cellrequest): + Aggregator.__init__(self, data, cellrequest) + self._emp = True + + @property + def empirical(self): + return self._emp + @empirical.setter + def empirical(self, emp): + self._emp = bool(emp) + + @property + def dimension_compatible(self): + return Ratio + + @property + def name(self): + return "StdDev" + + def _process(self): + changed = False + aggr = 0 + data = [] + self._inputreader.rewind() + for (x,) in self._inputreader: + x = float(x) + aggr += x + data.append(x) + changed = True + if changed: + n = float(len(data)) + avg = aggr / n + s2 = map(lambda x: (x-avg)*(x-avg), data) + if self.empirical: + self._aggregate = sqrt(sum(s2) / (n+1)) + else: + self._aggregate = sqrt(sum(s2) / n) + self._data.clear() + self._record.update(name = self.cn_aggr, values = (self._aggregate,)) + self._record.update(name = self.cn_count, values = (len(self.source),)) + self._data.saveRecord(self._record) + return self.CLEARED | self.EXPANDED + return self.PASS + +class Percentile(Aggregator): + def __init__(self, data, cellrequest): + self._percentile = .75 + Aggregator.__init__(self, data, cellrequest) + + @property + def percentile(self): + return self._percentile + @percentile.setter + def percentile(self, percentile): + self._percentile = max(0, min(1, float(percentile))) + + @property + def dimension_compatible(self): + return Ordinal + + @property + def name(self): + return "Percentile_%d%%" % int(round(100 * self.percentile)) + + def _process(self): + data = [] + self._inputreader.rewind() + for (x,) in self._inputreader: + data.append(x) + data.sort() + n = len(data) + p = int((n - 1) * self.percentile) + if n % 2: + val = data[p] + else: + val = .5 * (data[p] + data[p+1]) + if self._aggregate != val: + self._aggregate = val + self._data.clear() + self._record.update(name = self.cn_aggr, values = (self._aggregate,)) + self._record.update(name = self.cn_count, values = (len(self.source),)) + self._data.saveRecord(self._record) + return self.CLEARED | self.EXPANDED + return self.PASS diff --git a/Monitoring/MonitoringService/DataProcessing/AggregatorManager.py b/Monitoring/MonitoringService/DataProcessing/AggregatorManager.py new file mode 100644 index 0000000..16d78fc --- /dev/null +++ b/Monitoring/MonitoringService/DataProcessing/AggregatorManager.py @@ -0,0 +1,48 @@ +''' +Created on Dec 10, 2012 + +@author: steger +''' +from DataProcessing.Aggregator import AggregatorError, Aggregator +from DataProcessing.Sampler import Sampler +from DataProcessing.Parameter import ParameterList + +class AggregatorManager(object): + def __init__(self): + self._id = 0; + self._aggregators = {} + + def newAggregator(self, dataSource, cellrequest, commandflow): + for c, ca in commandflow: + if issubclass(c, Aggregator): + dataSource = c(dataSource, cellrequest) + if isinstance(ca, ParameterList): + for p in ca: + dataSource.__setattr__(p.name, p.value[0]) + else: + for k, v in ca.iteritems(): + dataSource.__setattr__(k, v) + elif issubclass(c, Sampler): + dataSource = c(dataSource) + if isinstance(ca, ParameterList): + for p in ca: + dataSource.__setattr__(p.name, p.value[0]) + else: + for k, v in ca.iteritems(): + dataSource.__setattr__(k, v) + self._id += 1 + self._aggregators[ self._id ] = dataSource + return self._id, dataSource + + def __getitem__(self, aggregatorid): + try: + return self._aggregators[ aggregatorid ] + except: + raise AggregatorError("Aggregator with id %s not found" % aggregatorid) + + def pop(self, aggregatorid): + try: + self._aggregators.pop( aggregatorid ) + except KeyError: + print "WW: Aggregator with id %s not found" % aggregatorid + \ No newline at end of file diff --git a/Monitoring/MonitoringService/DataProcessing/Bool.py b/Monitoring/MonitoringService/DataProcessing/Bool.py new file mode 100644 index 0000000..54cbb4e --- /dev/null +++ b/Monitoring/MonitoringService/DataProcessing/Bool.py @@ -0,0 +1,62 @@ +''' +Created on Mar 22, 2013 + +@author: steger +''' +from DataProcessing.DataError import DataError + +#FIXME: this is a DataSource? +class Comparator(object): + ''' + classdocs + ''' + def __init__(self, datasource): + self._datasource = datasource + + @property + def value(self): + raise DataError("Implement value property") + +class IsPositive(Comparator): + ''' + ''' + @property + def name(self): + return "IsPositive(%s)" % self._datasource.name + + @property + def value(self): + return self._datasource.value > 0 + +class IsNegative(Comparator): + ''' + ''' + @property + def name(self): + return "IsNegative(%s)" % self._datasource.name + + @property + def value(self): + return self._datasource.value < 0 + +class IsNotPositive(Comparator): + ''' + ''' + @property + def name(self): + return "IsNotPositive(%s)" % self._datasource.name + + @property + def value(self): + return self._datasource.value <= 0 + +class IsNotNegative(Comparator): + ''' + ''' + @property + def name(self): + return "IsNotNegative(%s)" % self._datasource.name + + @property + def value(self): + return self._datasource.value >= 0 \ No newline at end of file diff --git a/Monitoring/MonitoringService/DataProcessing/Data.py b/Monitoring/MonitoringService/DataProcessing/Data.py new file mode 100644 index 0000000..60ace66 --- /dev/null +++ b/Monitoring/MonitoringService/DataProcessing/Data.py @@ -0,0 +1,281 @@ +''' +Created on Sep 1, 2011 + +@author: steger, jozsef +@organization: ELTE +@contact: steger@complex.elte.hu +@author: laki, sandor +''' + +from threading import RLock, Event +from DataProcessing.DataHeader import DataHeaderCell, DataHeader, DataError +from DataProcessing.Unit import UnitManager + + +class Data(object): + ''' + @author: steger, jozsef + @summary: + This class implements the representation of data provided by a tool or other sort of data source. + This class contains the data in a tabular format. + The meta information of the columns are dictated by a DataHeader instance. + All items in the same column are data of the same kind, + whereas all data in the same record (row) are correlated. + + Contents of cells of a given column are either single items or Data objects + as prescribed by the header of the table. + + New records can be added using the Record class, for which a template generator is provided by the class. + + DataReaders access the content of this class, they need to register Events to get notified + of new data insertion or of content erasure. + ''' + + class Record(object): + ''' + @author: steger, jozsef + @summary: + This class represents a given set of records that can be appended to a Data table. + It provides useful methods manipulate data within the record. + ''' + def __init__(self, unitmanager, dataheader, size = 1): + ''' + @summary: Constructor + @param unitmanager: necessary to handle conversion + @type unitmanager: UnitManager + @param dataheader: the record conforms to the data header provided here + @type dataheader: DataHeader + @param size: the number of items to handle at once, default is 1 + @type size: integer + ''' + self.um = unitmanager + self.record = {} + self.units = {} + self.subheaders = {} + self.subrecords = {} + self.size = size + self.names = dataheader._cellnames + for name, cell in dataheader._cells.iteritems(): + if isinstance(cell, DataHeaderCell): + self.record[name] = [ None ] * self.size + self.units[name] = cell.unit + elif isinstance(cell, DataHeader): + self.subheaders[name] = cell + else: + raise DataError("Data header declaration is wrong") + + def __str__(self): + return ": " % (id(self), self.size, ','.join(self.record.keys()), ','.join(self.subheaders.keys())) + + def clear(self, size = None): + ''' + @summary: Clean the record containers and optionally resize the container + @note: if DataRecord container is resized, sub record pointers are invalidated + @param size: the requested new size of the container, default is None, which means keep the original size + @type size: integer + ''' + if size is None: + for name in self.record.keys(): + self.record[name] = [ None ] * self.size + if self.subrecords.has_key(name): + for r in self.subrecords[name]: + r.clear() + else: + self.size = size + for name in self.record.keys(): + self.record[name] = [ None ] * self.size + self.subrecords.clear() + + def getRecordTemplates(self, name, sizes = None): + ''' + @summary: Sub record templates are pointing to table valued cells. This method allocates container to those data structures. + @param name: the column name, that point to table valued columns + @type name: string + @param sizes: a list of integers that indicate the sizes of each sub tables. Default is None, which means the allocation of single row containers + @type sizes: list/tuple of integers or None + @return: a list of Record containers with size items + @rtype: a list of Record + @raise DataError: column name not found / wrong record sizes + ''' + if sizes == None: + sizes = [1] * self.size + if len(sizes) != self.size: + raise DataError("wrong record sizes requested") + if not self.subheaders.has_key(name): + raise DataError("Cannot find column name: %s" % name) + hdr = self.subheaders[name] + self.subrecords[name] = [] + while len(sizes): + self.subrecords[name].append( Data.Record(unitmanager = self.um, dataheader = hdr, size = sizes.pop(0)) ) + return self.subrecords[name] + + def update(self, name, values, unit = None): + ''' + @summary: Update a the column with the new value and make sure the unit is converted to the current unit of the model + @param name: the name of the column + @type name: string + @param values: a list of data values to update the cells + @type values: list + @param unit: the unit of the values in the list, default is None, which means it is the same as the current unit stated in the unit model + @type unit: string or None + @raise DataError: missing column name / table valued cells / size mismatch + ''' + if not self.record.has_key(name): + raise DataError("Record has no column named %s" % name) + if not self.units.has_key(name): + raise DataError("Cannot update column named %s (table valued cells)" % name) + if len(values) != self.size: + raise DataError("The size of values don't match expected %d and got %d" % (len(values), self.size)) + if unit is None: + self.record[name] = values[:] + elif isinstance(unit, UnitManager.Unit): + myunit = self.units[name] + if unit == myunit: + self.record[name] = values[:] + else: + self.record[name] = [ self.um.convert(value = quantity, from_unit = unit, to_unit = myunit) for quantity in values ] + else: + raise DataError("wrong type of unit") + + def updateMany(self, names, values, units = None): + ''' + @summary: Update more columns with a single call + @param names: a list of the non-table valued columns to update + @type names: list/tuple of string + @param values: a matrix of data values + @type values: list of list of value + @param units: a list of units corresponding to each columns, default is None, meaning everything is expected to be in the current unit + @type units: list/tuple of sting or None + @raise DataError: size mismatch / unknown column name + ''' + names = list(names) + if len(values) != self.size: + raise DataError("The size of values don't match %d" % self.size) + for name in names: + if not self.record.has_key(name): + raise DataError("Record has no column named %s" % name) + transpose = dict( map(lambda n: (n, []), names) ) + s = len(names) + idxs = range(s) + while len(values): + value = values.pop(0) + if len(value) == s: + for idx in idxs: + transpose[names[idx]].append(value.pop(0)) + else: + raise DataError("Record size does not match") + if units is None: + units = [ None ] * s + else: + units = list(units) + while len(names): + name = names.pop(0) + unit = units.pop(0) + self.update(name = name, values = transpose[name], unit = unit) + + def extract(self): + ''' + @summary: Extract values stored in this record represented in a list in the order of names + @return: a list of values + @rtype: list + ''' + retval = [] + idx = 0 + while idx < self.size: + rec = [] + for name in self.names: + if self.record.has_key(name): + rec.append( self.record[name][idx] ) + elif self.subrecords.has_key(name): + rec.append( self.subrecords[name][idx].extract() ) + idx += 1 + retval.append(tuple(rec)) + return retval + + def __init__(self, unitmanager, header): + ''' + @summary: Constructor + @param unitmanager: necessary to handle conversion + @type unitmanager: UnitManager + @param header: the header declaration of the data table + @type header: DataHeader + @raise DataError: raised upon wrong table header is given + ''' + if not isinstance(header, DataHeader): + raise DataError("attempt to allocate table with a wrong header") + self.um = unitmanager + self.header = header + self._rawrecords = [] + self._chunks = [] + self.readlock = RLock() + self.writelock = RLock() + self.evExpanded = Event() + self.evCleared = Event() + + def __str__(self): + ''' + @summary: returns the name of the table and the python object id + @return: abbreviated representation of the table + @rtype: string + ''' + return "" % (self.header.name, id(self)) + + def __len__(self): + return len(self._rawrecords) + + def __getitem__(self, k): + return self._rawrecords.__getitem__(k) + + @property + def name(self): + ''' + @summary: the name of the data is defined by the header + @return: the name of the header + @rtype: string + ''' + return self.header.name + + @property + def tail(self): + ''' + @summary: Tail property indicates how many new records have been saved to the table in the last call + @return: number of new records + @rtype: integer + ''' + try: + return self._chunks[-1] + except IndexError: + return 0 + + def getTemplate(self, size = 1): + ''' + @summary: Generate a helper class to extend the table with new values + @param size: the size of the new records wished to handle together, default is 1 + @type size: integer + @return: an empty row with the structure dictated by the header of the table + @rtype: Record + ''' + return self.Record(unitmanager = self.um, dataheader = self.header.getHeader(self.header.name), size = size) + + def saveRecord(self, record): + ''' + @summary: append values stored in the record to the table + @param record: a record with new data values + @type record: DataRecord + ''' + #TODO: check if record is not corrupted + newrecords = record.extract() + with self.writelock: + self._rawrecords.extend( newrecords ) + self._chunks.append( len(newrecords) ) + self.evExpanded.set() + + def clear(self): + ''' + @summary: delete all data records stored + ''' + with self.writelock: + self._rawrecords = [] + self._chunks = [] + self.evCleared.set() + diff --git a/Monitoring/MonitoringService/DataProcessing/DataError.py b/Monitoring/MonitoringService/DataProcessing/DataError.py new file mode 100644 index 0000000..4933f85 --- /dev/null +++ b/Monitoring/MonitoringService/DataProcessing/DataError.py @@ -0,0 +1,29 @@ +''' +Created on Dec 20, 2012 + +@author: steger, jozsef +@organization: ELTE +@contact: steger@complex.elte.hu +''' + +class DataError(Exception): + pass + +class PrefixError(DataError): + pass + +class UnitError(DataError): + pass + +class DimensionError(DataError): + pass + +class ParameterError(DataError): + pass + +class SamplerError(DataError): + pass + +class AggregatorError(DataError): + pass + diff --git a/Monitoring/MonitoringService/DataProcessing/DataFormatter.py b/Monitoring/MonitoringService/DataProcessing/DataFormatter.py new file mode 100644 index 0000000..1871ab4 --- /dev/null +++ b/Monitoring/MonitoringService/DataProcessing/DataFormatter.py @@ -0,0 +1,155 @@ +''' +Created on 08.08.2011 + +@author: steger +''' +from DataProcessing.DataReader import DataReader +from DataProcessing.Data import Data +from DataProcessing.DataError import DataError + +class Formatter(object): + def __init__(self, datasource): + ''' + Constructor + ''' + self.source = datasource + self.reader = DataReader(datasource) + + def _cell(self): + raise DataError("Implement _cell() method") + + def header(self): + raise DataError("Implement header() method") + + def serialize(self): + raise DataError("Implement serialize() method") + + @property + def name(self): + return self.source.name + + @property + def sourceExpanded(self): + return self.reader.sourceExpanded.isSet() + + @property + def sourceCleared(self): + return self.reader.sourceCleared.isSet() + +class JsonFormatter(Formatter): + ''' + @summary: + Serialize Data in JSON format + ''' + + def _cell(self, c): + ''' + @summary: serialize a column in JSON format + ''' + try: + feature = "\n \"FEATURE\": \"%s\"," % c.feature + except: + feature = "" + score = c.unit.reference.count('_') + if score == 0: + ret = """{%s + "NAME" : "%s", + "DIMENTSION" : "%s", + "UNIT" : "%s" + }""" % (feature, c.name, c.dimension.name, c.unit.reference) + elif score == 1: + prefix, base = c.unit.reference.split('_') + ret = """{%s + "NAME" : "%s", + "DIMENTSION" : "%s", + "PREFIX" : "%s", + "UNIT" : "%s" + }""" % (feature, c.name, c.dimension.name, prefix, base) + else: + ret = "ERROR: %s" % c + return ret + + def header(self): + ''' + @summary: serialize full header + ''' + return """{ + "NAME" : "DataHeader %s", + "HDRINFO" : [ + %s + ] + }""" % (id(self.source._data.header), ",\n ".join([ self._cell(c) for c in self.reader.headercells() ])) + + def serialize(self): + ''' + @summary: serialize the header and the new lines of the table into JSON format + @return: formatted string representation of the table + @rtype: string + ''' + self.source.process() + if self.sourceCleared: + self.reader.rewind() + if not self.sourceExpanded: + return "" + r = [] + for rec in self.reader: + st = [] + for d in rec: + if isinstance(d, Data): + #FIXME: + st.append( d._dump() ) + else: + st.append( str(d) ) + r.append("[ %s ]" % ", ".join(st)) + return """{ + "TYPE" : "%s", + "ID" : %d, + "HDR" : %s, + "DATA" : [ + %s + ] +}""" % (self.source._data.header.name, id(self), self.header(), ",\n ".join(r)) + +class DumbFormatter(Formatter): + ''' + @summary: + Serialize Data in a trivial format + ''' + + def _cell(self, c): + ''' + @summary: serialize column + ''' + try: + return "%s (%s/%s) [%s]" % (c.name, c.feature, c.dimension.name, c.unit) + except: + return "%s (/%s) [%s]" % (c.name, c.dimension.name, c.unit) + + def header(self): + ''' + @summary: serialize full header + ''' + return ": {%s: [%s]}" % (id(self.source._data.header), self.name, ", ".join([ self._cell(c) for c in self.reader.headercells() ])) + + def serialize(self): + ''' + @summary: serialize the header and the new lines of the table + @return: formatted string representation of the table, if no new data are ready the empty string is returned + @rtype: string + ''' + self.source.process() + if self.sourceCleared: + self.reader.rewind() + if not self.sourceExpanded: + return "" + r = [] + for rec in self.reader: + st = [] + for d in rec: + if isinstance(d, Data): + #FIXME: + st.append( d._dump() ) + else: + st.append( str(d) ) + r.append("(%s)" % ", ".join(st)) + return "{%s:\nHDR:%s\n DATA:[\n%s\n]}" % (str(self), self.header(), ", \n".join(r)) diff --git a/Monitoring/MonitoringService/DataProcessing/DataHeader.py b/Monitoring/MonitoringService/DataProcessing/DataHeader.py new file mode 100644 index 0000000..722094e --- /dev/null +++ b/Monitoring/MonitoringService/DataProcessing/DataHeader.py @@ -0,0 +1,200 @@ +''' +Created on Sep 1, 2011 + +@author: steger, jozsef +@organization: ELTE +@contact: steger@complex.elte.hu +@author: laki, sandor +''' + +from DataProcessing.Dimension import DimensionManager +from DataProcessing.DataError import DataError +from DataProcessing.DataHeaderCell import DataHeaderCell, CellRequestByName,\ + CellRequestByFeature + +class DataHeader(object): + ''' + @author: steger, jozsef + @summary: + This class represents the full header of a table. + One can construct the header as a single step, + if they provide a header description or they can use + methods to add new columns. + + In order to be able to represent a wide variety of data and relationship + between them, a column can refer to another table. + In that latter case a specific column refer to another DataHeader. + ''' + + def __init__(self, name): + ''' + @summary: Constructor + @param name: the name of the table + @type name: string + @raise DataError: corrupt header description + ''' + self._name = name + self._cellnames = [] + self._cells = {} + + def __iter__(self): + for cn in self._cellnames: + yield self._cells[cn] + + def __len__(self): + ''' + @summary: Return the number of columns + @return: the number of columns currently set + @rtype: integer + ''' + return len(self._cellnames) + + def __eq__(self, header): + ''' + @summary: Comparison operator of table headers. + Two tables are declared equal, if all the columns' names and their unit models are the same. + Two headers are still regarded equal if the order of their columns are different + or the current unit of the corresponding columns are not the same. + @raise DataError: if not DataHeader instances are compared + @return: True if both the header name and all columns match, their order may vary + @rtype: boolean + ''' + if not isinstance(header, DataHeader): + raise DataError("wrong type to compare") + if self.name != header.name: + return False + if len(self._cellnames) != len(header._cellnames): + return False + if self._cells.keys() != header._cells.keys(): + return False + for n in self._cellnames: + if self._cells[n] != header._cells[n]: + return False + return True + + def __ne__(self, header): + ''' + @summary: comparison operator of table headers. + @return: True if tables headers differ + @rtype: boolean + ''' + return not self.__eq__(header) + + @property + def name(self): + return self._name + + def has_name(self, name): + ''' + @summary: Check for the existence of a given column name + @param name: the name of the column looking for + @type name: string + @return: true if such a name exists + @rtype: boolean + ''' + return name in self._cellnames + + def addColumn(self, cell): + ''' + @summary: Append a new column at the end of the current table header structure + @param cell: pointer to the header of the new column + @type cell: DataHeader or DataHeaderCell + @raise DataError: cell is of a wrong type + ''' + ishdr = isinstance(cell, DataHeader) + if ishdr or isinstance(cell, DataHeaderCell): + name = cell.name + if self.has_name(name): + raise DataError("attempt to add a column with an already existing name (%s)" % cell.name) + self._cells[name] = cell + self._cellnames.append(name) + else: + raise DataError("attempt to add a wrong type of header cell") + + def removeColumn(self, name): + ''' + @summary: remove a named column if it exists in the header. Otherwise do silently nothing + @param name: the name of the column to remove + @type name: string + ''' + if self.has_name(name): + self._cells.pop(name) + self._cellnames.pop(self._cellnames.index(name)) + + def getHeader(self, name): + ''' + @summary: Return a pointer to the named sub header in the naming hierarchy + @param name: the name of the sub header searched + @type name: string + @raise DataError: name not found + ''' + if name.count('.') == 0: + if name == self.name: + return self + if self.has_name(name) and isinstance(self._cells[name], DataHeader): + return self._cells[name] + elif name.count('.') == 1: + n_pre, n_post = name.split('.', 1) + if n_pre == self.name and self.has_name(n_post) and isinstance(self._cells[n_post], DataHeader): + return self._cells[n_post] + else: + n_pre, n, n_post = name.split('.', 2) + if n_pre == self.name and self.has_name(n) and isinstance(self._cells[n], DataHeader): + return self._cells[n].getHeader(n_post) + raise DataError("Lost in the naming hierarchy: %s < %s" % (self.name, name)) + +#FIXME: complex table lookup is not implemented + def getCell(self, cellrequest): + ''' + @summary: Return the index and the cell referenced by a name + @param cellrequest: + @type name: CellRequest + @return: index and the cell + @rtype: (int, Cell) + @raise DataError: name not found + ''' + if isinstance(cellrequest, CellRequestByName): + name = cellrequest.name + try: + yield (self._cellnames.index(name), self._cells[name]) + except: + DataError("Cell with name %s not found" % name) + elif isinstance(cellrequest, CellRequestByFeature): + for c in self: + try: + if cellrequest == c: + yield (self._cellnames.index(c.name), c) + except DataError: + continue + else: + raise DataError("wrong request type") + + + +class DataHeaderGeneratedByDescription(DataHeader): + def __init__(self, name, headerdescription): + ''' + @summary: Constructor + @param name: the name of the table + @type name: string + @param headerdescription: the description of a full table header + @param headerdescription: list or None + @raise DataError: corrupt header description + ''' + DataHeader.__init__(self, name) + for item in headerdescription: + if len(item) == 2: + name, description = item + unit = None + else: + name, description, unit = item + if self.has_name(name): + raise DataError("Duplicate column name declaration (%s)" % name) + if description is None or isinstance(description, DimensionManager.Dimension): + cell = DataHeaderCell(name = name, dimension = description, unit = unit) + self.addColumn(cell) + elif isinstance(description, list): + hdr = DataHeaderGeneratedByDescription(name = name, headerdescription = description) + self.addColumn(hdr) + else: + raise DataError("corrupt header description (%s)" % name) diff --git a/Monitoring/MonitoringService/DataProcessing/DataHeaderCell.py b/Monitoring/MonitoringService/DataProcessing/DataHeaderCell.py new file mode 100644 index 0000000..f3bdb16 --- /dev/null +++ b/Monitoring/MonitoringService/DataProcessing/DataHeaderCell.py @@ -0,0 +1,180 @@ +''' +Created on Dec 20, 2012 + +@author: steger, jozsef +@organization: ELTE +@contact: steger@complex.elte.hu +''' + +from DataProcessing.Dimension import DimensionManager +from DataProcessing.DataError import DataError +from DataProcessing.Unit import UnitManager + +class Cell(object): + ''' + @summary: This class is a skeleton to represent the meta information of a single table column. + It combines the following information: + - the name of the cell, + - the feature associated to the underlying data, + - the dimension of the underlying data, + - the unit of the underlying data, + @ivar name: the name of the cell + @type name: str + @ivar dimension: the dimension of the cell + @type dimension: L{Dimension} + @ivar unit: the unit of a cell, if not set, the default unit of the dimension is applied + @type unit: L{Unit} + @ivar feature: the metric of the column + @type feature: str + ''' + def __init__(self): + self._name = None + self._dimension = None + self._unit = None + self._feature = None + + @property + def name(self): + if self._name is None: + raise DataError("name property is not set") + return self._name + @name.setter + def name(self, name): + if not isinstance(name, basestring): + raise DataError("name is not a string") + if name.count('.'): + raise DataError("name must not contain any periods (%s)" % name) + if self._name is not None and self._name != name: + raise DataError("name property cannot be modified") + self._name = name + + @property + def dimension(self): + if not self._dimension: + raise DataError("dimension property is not set") + return self._dimension + @dimension.setter + def dimension(self, dimension): + if not isinstance(dimension, DimensionManager.Dimension): + raise DataError("dimension is invalid") + if self._unit is not None: + if not dimension.containsUnit(self._unit): + raise DataError("unit %s is not in the basin of dimension %s" % (self.unit, dimension)) + self._dimension = dimension + + @property + def unit(self): + if self._unit is None: + return self.dimension.unit + else: + return self._unit + @unit.setter + def unit(self, unit): + if not isinstance(unit, UnitManager.Unit): + raise DataError("unit is invalid") + if self._dimension is not None: + if not self.dimension.containsUnit(unit): + raise DataError("unit %s is not in the basin of dimension %s" % (unit, self.dimension)) + self._unit = unit + + @property + def feature(self): + if self._feature is None: + raise DataError("feature property is not set") + return self._feature + @feature.setter + def feature(self, feature): + if self._feature is not None and self._feature != feature: + raise DataError("feature property cannot be modified") + self._feature = feature + + def __eq__(self, cell): + ''' + @summary: comparison operator of two columns' meta + @return: True if column names, features, units and dimensions match + @rtype: bool + ''' + if not isinstance(cell, Cell): + raise DataError("type error expecting Cell for comparison") + return self._name == cell._name and self._feature == cell._feature and self._unit == cell._unit and self._dimension == cell._dimension + + def __ne__(self, cell): + ''' + @summary: comparison operator of two columns' meta + @return: True if column names or their units differ + @rtype: bool + ''' + return not self.__eq__(cell) + +class DataHeaderCell(Cell): + ''' + @summary: represents meta information of a single column + ''' + def __init__(self, name, dimension, feature = None, unit = None): + ''' + @summary: constructor + @param name: the nema of the cell + @type name: str + @param dimension: the dimension of the cell + @type dimension: L{Dimension} + @param feature: pointer if it is a monitoring feature + @param unit: indicates the unit of a column if it is different from the default + @type unit: L{Unit} + ''' + Cell.__init__(self) + self.name = name + self.dimension = dimension + if unit is not None and unit != dimension.unit: + self.unit = unit + if feature is not None: + self.feature = feature + +class CellRequest(Cell): + ''' + @summary: skeleton, which is used to search the among meta information. It is basically a cell with missing certain details + ''' + pass + +class CellRequestByName(CellRequest): + ''' + @summary: This class represents the user request for a data column matching the name of the column. + One can specify the requested unit. + ''' + def __init__(self, name, unit = None): + ''' + @summary: Constructor + @param name: the name of the requested column + @type name: str + @param unit: the requested unit, default is None, which means no conversion request + @type unit: L{Unit} or None + ''' + Cell.__init__(self) + self.name = name + if unit is not None: + self.unit = unit + + def __eq__(self, cell): + return self.name == cell.name + +class CellRequestByFeature(CellRequest): + ''' + @author: steger, jozsef + @summary: + This class represents the user request for a data column(s) matching the feature of the column. + One can also specify the requested unit. + ''' + def __init__(self, feature, unit = None): + ''' + @summary: Constructor + @param feature: the feature of the requested column + @type feature: str + @param unit: the requested unit, default is None, which means no conversion request + @type unit: L{Unit} or None + ''' + Cell.__init__(self) + self.feature = feature + if unit is not None: + self.unit = unit + + def __eq__(self, cell): + return self.feature == cell.feature diff --git a/Monitoring/MonitoringService/DataProcessing/DataIndex.py b/Monitoring/MonitoringService/DataProcessing/DataIndex.py new file mode 100644 index 0000000..84d8390 --- /dev/null +++ b/Monitoring/MonitoringService/DataProcessing/DataIndex.py @@ -0,0 +1,39 @@ +''' +Created on Dec 31, 2012 + +@author: steger +''' +from DataProcessing.DataReader import DataReader + +class DataIndex(DataReader): + ''' + classdocs + ''' + + def __init__(self, datasource, key): + ''' + Constructor + ''' + DataReader.__init__(self, datasource) + self.indexmap = {} + self.extract(cellrequest = key) + + def buildindex(self): + i = len(self.indexmap) + for k in self: + self.indexmap[tuple(k)] = i + i += 1 + + def __getitem__(self, k): + if self.sourceCleared.isSet(): + self.sourceCleared.clear() + self.indexmap.clear() + self.buildindex() + try: + iter(k) + except TypeError: + k = (k,) + if not self.indexmap.has_key(k) and self.sourceExpanded.isSet(): + self.sourceExpanded.clear() + self.buildindex() + return self.source._rawrecords[ self.indexmap[k] ] \ No newline at end of file diff --git a/Monitoring/MonitoringService/DataProcessing/DataReader.py b/Monitoring/MonitoringService/DataProcessing/DataReader.py new file mode 100644 index 0000000..b9734e7 --- /dev/null +++ b/Monitoring/MonitoringService/DataProcessing/DataReader.py @@ -0,0 +1,169 @@ +''' +Created on Nov 19, 2012 + +@author: steger +''' +from threading import Event, Lock +from DataProcessing.DataHeader import DataError +from DataProcessing.DataSource import DataSource + +#FIXME: docs +class DataReader(object): + ''' + This class is an extension to the DataSource class. + It provides an iterator over the rows / records of the DataSource. + When the iterator is invoked several times only new records are yielded. + In order to access rows, which have already been iterated, use the rewind() method to move the pointer to the first record. + + By default iteration yields all columns. + In case user is interested in a specific slice of the table (or wants to retrieve row records on a different column order), + they can do so by invoking the extract method, which expects an ordered list of the interesting column names. + Besides the column names user may indicate the requested unit, in which case iteration will yield properly transformed data. + + DataReader objects register clear and expand events in the underlying DataSource class instance in order to catch signal upon + new data insertion or deletion. + ''' + + def __init__(self, datasource): + ''' + Constructor + @param datasource: the + @type datasource: DataSource + ''' + if not isinstance(datasource, DataSource): + raise DataError("Expect DataSource, got %s" % datasource) + self.source = datasource + self.sourceCleared = Event() + self.sourceExpanded = Event() + self.readlock = Lock() + datasource.registerReader(self) + self._seq = 0 + self._extractmap = None + self._conversionmap = None + self.extract() + + def __del__(self): + self.source.deregisterReader(self) + + @property + def processedrecords(self): + ''' + @summary: This property indicates how many records are processed by this reader + @return: the index of the record iterator + @rtype: integer + @note: the current value may be unreliable if an iteration is currently carried out + ''' + return self._seq + @processedrecords.setter + def processedrecords(self, index): + ''' + @summary: set the iterator to a given position. A negative index means rewinding by that many rows + @param index: position description + @type index: integer + ''' + index = int(index) + if index < 0: + self._seq = max(0, self._seq + index) + else: + self._seq = min(index, len(self.source)) + @processedrecords.deleter + def processedrecords(self): + ''' + @summary: rewind to the first record row + ''' + self._seq = 0 + + def rewind(self): + ''' + @summary: sets the next row record to the first item. + ''' + del self.processedrecords +# self.sourceCleared.clear() + +#FIXME: DataSampleReader!!! + def __iter__(self): + with self.readlock: + self.sourceCleared.clear() + while self._seq < len(self.source): + if self.sourceCleared.isSet(): + raise DataError("Data cleared while reading records %s %s" % (self, self.source)) + self._seq += 1 + yield self._extract(self._seq - 1) + self.sourceExpanded.clear() + raise StopIteration + + def sourcecleared(self): + with self.source.writelock: + self.sourceCleared.set() + + def sourceexpanded(self): + with self.source.writelock: + self.sourceExpanded.set() + +#FIXME: Sample specifik + def headercells(self): + ''' + @summary: iterator over those columns of the Data which are relevant (i.e. which are extracted) + @return: generator + @rtype: DataHeaderCell + ''' + meta = self.source._data.header + for i in self._extractmap: + cellname = meta._cellnames[i] + yield meta._cells[cellname] + + def extract(self, cellrequest = None): + ''' + @summary: Presets the iterator to the first row record and selects only those columns to show and convert who are referenced in the cell request. + This method works in a best effort manner, those column names that are not in this data table are silently omitted. + Also in case the unit requested is not allowed by a unit model that column of data is silently ignored. + @param cellrequest: the list of the column names and the corresponding unit to show during iteration, default is None which means show all columns without unit conversion + @type cellrequest: list of CellRequest + ''' + self._seq = 0 + meta = self.source._data.header + if cellrequest is None: + s = len(meta._cellnames[:]) + self._extractmap = range(s) + self._conversionmap = [(None, None)] * s + else: + self._extractmap = [] + self._conversionmap = [] + for cellreq in cellrequest: + for (colidx, cell) in meta.getCell( cellreq ): + try: + unit = cell.unit + dimension = cell.dimension + if cellreq.unit == unit: + unitmap = (None, None) + elif dimension.containsUnit(cellreq.unit): + unitmap = (unit, cellreq.unit) + else: + raise Exception("unit %s is not in the basin of dimension %s" % (unit, cell.dimension)) + except DataError: + unitmap = (None, None) + self._extractmap.append( colidx ) + self._conversionmap.append( unitmap ) + + def _extract(self, idx): + ''' + @summary: an internal helper method that takes care of extracting and ordering the columns in the order predefined by calling the extract method. + @param idx: the row index + @type idx: integer + @return: a list of the cell data slice from the row pointed by the row index + @rtype: list + ''' + ret = [] + i = 0 + s = len(self._extractmap) + D = self.source._data + while i < s: + c = self._extractmap[i] + celldata = D[idx][c] + sourceunit, targetunit = self._conversionmap[i] + if sourceunit is None: + ret.append( celldata ) + else: + ret.append( D.um.convert(celldata, sourceunit, targetunit) ) + i += 1 + return ret diff --git a/Monitoring/MonitoringService/DataProcessing/DataSample.py b/Monitoring/MonitoringService/DataProcessing/DataSample.py new file mode 100644 index 0000000..ee7245d --- /dev/null +++ b/Monitoring/MonitoringService/DataProcessing/DataSample.py @@ -0,0 +1,54 @@ +''' +Created on Mar 4, 2013 + +@author: steger +''' +from DataProcessing.DataSource import DataSource +from DataProcessing.DataReader import DataReader + +class DataSample(DataSource): + pass + + def __init__(self, table): + ''' + Constructor + ''' + DataSource.__init__(self) + self._data = table + + def __len__(self): + return len(self._data) + + def __getitem__(self, k): + return None + + @property + def name(self): + return "Original(%s)" % self._data.name + + @property + def readerClass(self): + return DataReader + + def _process(self): + status = 0 + with self._data.readlock: + if self._data.evCleared.isSet(): + self._sourcecleared() + self._data.evCleared.clear() + status |= self.CLEARED + if self._data.evExpanded.isSet(): + self._sourceexpanded() + self._data.evExpanded.clear() + status |= self.EXPANDED + return status + + @property + def writelock(self): + return self._data.writelock + + + @property + def um(self): + return self._data.um + diff --git a/Monitoring/MonitoringService/DataProcessing/DataSource.py b/Monitoring/MonitoringService/DataProcessing/DataSource.py new file mode 100644 index 0000000..a384df0 --- /dev/null +++ b/Monitoring/MonitoringService/DataProcessing/DataSource.py @@ -0,0 +1,116 @@ +''' +Created on Dec 10, 2012 + +@author: steger +''' +from threading import RLock +from DataProcessing.DataError import DataError, SamplerError +#FIXME: import dependency problem, circular reference +#from DataProcessing.DataReader import DataReader + +class DataSource(object): + ''' +#FIXME: docs + @summary: a template to represent any data generated by a tool or derived via various operations on data + @ivar source: pointer to the origin of the data + @note: DataSource instances reference their ancestor via the source property, in a recursive manner, the last item needs to implement a read and a write lock + @ivar data: pointer to the actual data container + @note: results of operations on data yield their result in the container referenced by the data property, the original data generator's data and source properties are meant to be the same + @ivar name: a name for the data source class + @ivar um: reference to the unit model of the ancient data source + ''' + PASS = 0 + CLEARED = 1 + EXPANDED = 2 + + def __init__(self, dependency = None): + self._readers = set() + if dependency is None: + self._inputreader = None + else: + self._inputreader = self.readerClass(dependency) + self._processlock = RLock() + self._data = None + +#FIXME: import error + def registerReader(self, reader): + ''' + @summary: registers a reader to catch clear and update events + @param reader: data consumer + @type reader: DataReader + @raise DataError: wrong argument + ''' + try: +# if isinstance(reader, DataReader): + self._readers.add(reader) + if len(self): + reader.sourceexpanded() + else: + reader.sourcecleared() +# else: + except SamplerError: + pass + except: + raise + raise DataError("Expecting a DataReader, got %s" % reader) + + def deregisterReader(self, reader): + ''' + @summary: removes a registered reader + @param reader: data consumer + @type reader: DataReader + ''' + try: + self._readers.remove(reader) + except KeyError: + pass + + def __len__(self): + raise DataError("%s must implement __len__ method" % self) + + def __getitem__(self, k): + raise DataError("%s must implement __getitem__ method" % self) + + @property + def name(self): + raise DataError("%s must implement name property" % self) + + @property + def readerClass(self): + raise DataError("%s must implement readerClass property" % self) + + @property + def data(self): + self.process() + return self._data + + def process(self): + ''' + @summary: recursively process data records of the source chain + @return: status of the data processing + @rtype: int + ''' + with self._processlock: + if self._inputreader: +# print "PROC SRC", self, self._inputreader.source + self._inputreader.source.process() +# print "PROC", self + status = self._process() + if status & self.CLEARED: +# print "SRC cleared", self + self._sourcecleared() + if status & self.EXPANDED: +# print "SRC expanded", self + self._sourceexpanded() + return status + + def _process(self): + raise DataError("%s must implement _process method returning process status" % self) + + def _sourcecleared(self): + for r in self._readers: + r.sourcecleared() + + def _sourceexpanded(self): + for r in self._readers: + r.sourceexpanded() diff --git a/Monitoring/MonitoringService/DataProcessing/Dimension.py b/Monitoring/MonitoringService/DataProcessing/Dimension.py new file mode 100644 index 0000000..0365ee2 --- /dev/null +++ b/Monitoring/MonitoringService/DataProcessing/Dimension.py @@ -0,0 +1,309 @@ +''' +Created on Feb 27, 2012 + +@author: steger +''' +from Unit import UnitManager +from DataProcessing.MeasurementLevel import MeasurementLevel, Interval, Ratio +from DataProcessing.DataError import DimensionError + +class DimensionManager(object): + ''' + @summary: the dimension container + @ivar dimenstions: the container of the known dimensions + @type dimensions: dict(str: L{Dimension}) + @ivar unitmanager: reference to the unit manager + @type unitmanager: L{UnitManager} + ''' + class Dimension(object): + ''' + @summary: a skeleton class for all the dimensions handled by L{DimensionManager} + @ivar manager: back reference to the dimension manager + @type manager: L{DimensionManager} + @ivar unitmanager: a reference to the unit manager + @type unitmanager: L{UnitManager} + @ivar reference: the unique identifier of the dimension + @ivar name: the name of the dimension + @type name: str + @ivar unit: the default unit of the dimension + @type unit: L{Unit} + @ivar basin: the set of units which are valid for this dimension + @type basin: set(L{Unit}) + ''' + def __init__(self, dimensionmanager, reference, name, unit, level): + ''' + @summary: constructor + @param dimensionmanager: reference to the dimension manager + @type dimensionmanager: L{DimensionManager} + @param reference: the reference to the dimension + @type reference: str + @param unit: the default unit of the dimension + @type unit: L{Unit} + @param level: the measurement level of the dimension + @type level: L{MeasurementLevel} + @note: the level is not a class instance + @raise L{DimensionError}: Wrong type of unit / Wrong type of level + ''' + if not isinstance(unit, UnitManager.Unit): + raise DimensionError("Wrong type of unit %s" % unit) + try: + if not issubclass(level, MeasurementLevel): + raise DimensionError("Wrong type of level %s" % level) + except TypeError: + raise DimensionError("Wrong type of level %s" % level) + self._data = (dimensionmanager, reference, name, unit) + self._level = level + @property + def manager(self): + return self._data[0] + @property + def unitmanager(self): + return self._data[0].unitmanager + @property + def reference(self): + return self._data[1] + @property + def name(self): + return self._data[2] + @property + def unit(self): + return self._data[3] + @property + def basin(self): + return self.unitmanager.getBasinByUnit(self.unit) + def level(self, level): + ''' + @summary: check measurement level against the given level + @param level: measurement level + @type level: L{MeasurementLevel} + @return: True if the measurement level given as a parameter + is the same or looser than the level of the dimension + @rtype: bool + @raise L{DimensionError}: Wrong type of level + ''' + if not issubclass(level, MeasurementLevel): + raise DimensionError("Wrong type of level %s" % level) + return issubclass(self._level, level) + def __str__(self): + return "%s [%s]" % (self.name, self.unit) + def __eq__(self, d): + if not isinstance(d, DimensionManager.Dimension): + raise DimensionError("wrong type") + return self._level == d._level and self.containsUnit(d.unit) + def containsUnit(self, unit): + ''' + @summary: checks if a given unit is in the basin of this dimension + @param unit: the unit to check + @type unit: L{Unit} + @return: true if the unit is applicable for this dimension + @rtype: bool + ''' + return unit in self.unitmanager.getBasinByUnit(self.unit) + + class BaseDimension(Dimension): + ''' + @summary: a dimension axiom + ''' + pass + + class DerivedDimension(Dimension): + ''' + @summary: a skeleton for dimensions, which are deriving from other already known dimensions + ''' + def ancestors(self): + ''' + @summary: iterate over all ancestors this dimension is derived from + @return: generator over ancestors + @rtype: L{Dimension} + ''' + for d in self._ancestor: + yield d + + class DifferenceDimension(DerivedDimension): + ''' + @summary: a dimension defined by subtracting two individuals of a known dimension + ''' + def __init__(self, dimensionmanager, reference, name, unit, derivedfrom): + ''' + @summary: constructor + @param dimensionmanager: reference to the dimension manager + @type dimensionmanager: L{DimensionManager} + @param reference: the reference to the dimension + @param unit: the default unit of the dimension + @type unit: L{Unit} + @param derivedfrom: the ancestor dimension this dimension is derived from + @type derivedfrom: L{Dimension} + @raise L{DimensionError}: Wrong type of derivedfrom + ''' + if not isinstance(derivedfrom, DimensionManager.Dimension): + raise DimensionError("Wrong type of derivedfrom") + if not derivedfrom.level(Interval): + raise DimensionError("Cannot subtract %s" % derivedfrom) + DimensionManager.Dimension.__init__(self, dimensionmanager, reference, name, unit, Ratio) + self._ancestor = derivedfrom + + class PowerDimension(DerivedDimension): + ''' + @summary: a dimension defined by raising an existing dimension to a given power + @ivar exponent: the power + @type exponent: int + ''' + def __init__(self, dimensionmanager, reference, name, unit, derivedfrom, exponent): + ''' + @summary: constructor + @param dimensionmanager: reference to the dimension manager + @type dimensionmanager: L{DimensionManager} + @param reference: the reference to the dimension + @param unit: the default unit of the dimension + @type unit: L{Unit} + @param derivedfrom: the ancestor dimension this dimension is derived from + @type derivedfrom: L{Dimension} + @param exponent: dimension is a derivative of the derivedfrom dimension, by raising to power exponent + @type exponent: int + @raise DimensionError: Wrong type of derivedfrom / Cannot power + ''' + if not isinstance(derivedfrom, DimensionManager.Dimension): + raise DimensionError("Wrong type of derivedfrom") + if not derivedfrom.level(Ratio): + raise DimensionError("Cannot power %s" % derivedfrom) + DimensionManager.Dimension.__init__(self, dimensionmanager, reference, name, unit, Ratio) + self._ancestor = (derivedfrom,) + self._exponent = exponent + @property + def exponent(self): + return self._exponent + + class ProductDimension(DerivedDimension): + ''' + @summary: dimension defined by multiplying at least two known different dimensions + ''' + def __init__(self, dimensionmanager, reference, name, unit, derivedfrom): + ''' + @summary: constructor + @param dimensionmanager: reference to the dimension manager + @type dimensionmanager: L{DimensionManager} + @param reference: the reference to the dimension + @param unit: the default unit of the dimension + @type unit: L{Unit} + @param derivedfrom: the set of dimensions that compose this dimension + @type derivedfrom: tuple(L{Dimension}) + @raise L{DimensionError}: Wrong type of derivedfrom / ProductDimension is derived from more than 2 Dimensions / Cannot be a factor + ''' + if not isinstance(derivedfrom, tuple): + raise DimensionError("Wrong type of derivedfrom") + if len(derivedfrom) < 2: + raise DimensionError("ProductDimension is derived from more than 2 Dimensions, got %d instead" % len(derivedfrom)) + for d in derivedfrom: + if not d.level(Ratio): + raise DimensionError("%s cannot be a factor" % d) + DimensionManager.Dimension.__init__(self, dimensionmanager, reference, name, unit, Ratio) + self._ancestor = derivedfrom + + class RatioDimension(DerivedDimension): + ''' + @summary: dimension defined by dividing two known dimensions + ''' + def __init__(self, dimensionmanager, reference, name, unit, derivedfrom): + ''' + @summary: constructor + @param dimensionmanager: reference to the dimension manager + @type dimensionmanager: L{DimensionManager} + @param reference: the reference to the dimension + @param unit: the default unit of the dimension + @type unit: L{Unit} + @param derivedfrom: the set of dimensions that compose this dimension + @type derivedfrom: tuple(L{Dimension}) + @raise L{DimensionError}: Wrong type of derivedfrom / Cannot be a factor + ''' + if not isinstance(derivedfrom, DimensionManager.Dimension): + raise DimensionError("Wrong type of derivedfrom") + if not derivedfrom.level(Ratio): + raise DimensionError("%s cannot be a factor" % derivedfrom) + DimensionManager.Dimension.__init__(self, dimensionmanager, reference, name, unit, Ratio) + self._ancestor = (derivedfrom,) + + def __init__(self, unitmanager): + ''' + @summary: constructor + @param unitmanager: the unit manager needs to be referenced, to check the basins of a unit + @type unitmanager: L{UnitManager} + ''' + self.dimensions = {} + self.unitmanager = unitmanager + + def __len__(self): + ''' + @summary: the number of dimension known by the L{DimensionManager} + @return: the number of dimension known by the L{DimensionManager} + @rtype: int + ''' + return len(self.dimensions) + + def __iter__(self): + ''' + @summary: an iterator over known dimensions + @return: the next known dimension + @rtype: L{Dimension} + ''' + for d in self.dimensions.values(): + yield d + + def newBaseDimension(self, reference, name, unit, level): + ''' + @summary: generate a new dimension + @param reference: the reference to the dimension + @type reference: str + @param unit: the default unit of the dimension + @type unit: L{Unit} + @param level: the measurement level of the dimension + @type level: L{MeasurementLevel} + @note: the level is not a class instance + @return: the new dimension + @rtype: L{Dimension} + @raise L{DimensionError}: Dimension with reference already exists / Wrong type of unit / Wrong type of level / Wrong type of dimension / + Expecting derivedfrom set / Wrong number of derived from Dimensions + ''' + if self.dimensions.has_key(reference): + raise DimensionError("Dimension with reference %s already exists" % reference) + dimension = self.BaseDimension(self, reference, name, unit, level) + self.dimensions[reference] = dimension + return dimension + + def newDerivedDimension(self, reference, name, unit, derivedfrom, dimtype, **kw): + ''' + @summary: generate a new dimension + @param reference: the reference to the dimension + @param unit: the default unit of the dimension + @type unit: L{Unit} + @param derivedfrom: the set of dimensions that compose this dimension + @type derivedfrom: tuple(L{Dimension}) or L{Dimension} + @param dimtype: possible dimension types are L{DifferenceDimension}, L{PowerDimension}, L{ProductDimension}, L{RatioDimension} + @note: dimtype parameter is not an instance, but a class scheme + @type dimtype: L{Dimension} + @return: the new dimension + @rtype: L{Dimension} + @keyword kw: L{PowerDimension} expects an integer valued parameter: exponent + @raise L{DimensionError}: Dimension with reference already exists / Wrong type of dimension + ''' + if self.dimensions.has_key(reference): + raise DimensionError("Dimension with reference %s already exists" % reference) + if issubclass(dimtype, self.DifferenceDimension)or issubclass(dimtype, self.ProductDimension) or issubclass(dimtype, self.RatioDimension): + dimension = dimtype(self, reference, name, unit, derivedfrom) + elif issubclass(dimtype, self.PowerDimension): + dimension = dimtype(self, reference, name, unit, derivedfrom, kw.get('exponent')) + else: + raise DimensionError("Wrong type of dimension %s" % dimtype) + self.dimensions[reference] = dimension + return dimension + + def __getitem__(self, reference): + ''' + @summary: look up the prefix in the DimensionManager based on its reference + @param reference: the reference to the dimension + @return: the dimension if found + @rtype: L{Dimension} + @raise L{DimensionError}: Dimension with reference not found + ''' + if not self.dimensions.has_key(reference): + raise DimensionError("Dimension with reference %s not found" % reference) + return self.dimensions[reference] diff --git a/Monitoring/MonitoringService/DataProcessing/LinearCombination.py b/Monitoring/MonitoringService/DataProcessing/LinearCombination.py new file mode 100644 index 0000000..5174b58 --- /dev/null +++ b/Monitoring/MonitoringService/DataProcessing/LinearCombination.py @@ -0,0 +1,42 @@ +''' +Created on Mar 21, 2013 + +@author: steger +''' +from DataProcessing.DataSource import DataSource +from DataProcessing.Aggregator import Aggregator +from DataProcessing.DataError import DataError + +class LinearCombination(DataSource): + ''' + classdocs + ''' + + def __init__(self): + ''' + Constructor + ''' + DataSource.__init__(self, dependency = None) + self._terms = [] + self._value = None + + def addTerm(self, factor, aggregate): + if not isinstance(aggregate, Aggregator): + raise DataError("Wrong type of term") + self._terms.append((factor, aggregate)) + + @property + def name(self): + return "BLA" + + @property + def value(self): + self.process() + return self._value + + def process(self): + result = 0 + for factor, aggregate in self._terms: + term = aggregate.aggregate + result += factor * term + self._value = result \ No newline at end of file diff --git a/Monitoring/MonitoringService/DataProcessing/MeasurementLevel.py b/Monitoring/MonitoringService/DataProcessing/MeasurementLevel.py new file mode 100644 index 0000000..4e3d702 --- /dev/null +++ b/Monitoring/MonitoringService/DataProcessing/MeasurementLevel.py @@ -0,0 +1,46 @@ +''' +Created on Mar 22, 2012 + +@author: steger, jozsef + +@summary: Class representation of the measurement levels (aka measurement scale) defined by Stanley Smith Stevens. +Stevens proposed his theory in a 1946 Science article titled "On the theory of scales of measurement". +@note: These classes are not meant to be instantiated ever. +''' + +class MeasurementLevel: + ''' + @summary: It serves as the common scheme for the measurement levels. Only its subclasses have a meaning. + ''' + pass + +class Nominal(MeasurementLevel): + ''' + @summary: Values of this kind of measurement are mere elements of a set. + ''' + pass + +class Ordinal(Nominal): + ''' + @summary: A ranking is defined between the values of this kind of measurement. + ''' + pass + +class Interval(Ordinal): + ''' + @summary: A difference is defined which can be evaluated for any two values of this kind of measurement. + ''' + pass + +class Ratio(Interval): + ''' + @summary: There is a reference value defined for this kind of measurement, that is "zero" has a meaning. + ''' + pass + +lut_level = { + 'NominalLevel': Nominal, + 'OrdinalLevel': Ordinal, + 'IntervalLevel': Interval, + 'RatioLevel': Ratio, +} diff --git a/Monitoring/MonitoringService/DataProcessing/Parameter.py b/Monitoring/MonitoringService/DataProcessing/Parameter.py new file mode 100644 index 0000000..2f48fa1 --- /dev/null +++ b/Monitoring/MonitoringService/DataProcessing/Parameter.py @@ -0,0 +1,288 @@ +''' +Created on Oct 20, 2011 + +@author: steger, jozsef +@organization: ELTE +@contact: steger@complex.elte.hu +''' + +from DataProcessing.Dimension import DimensionManager +from DataProcessing.DataError import ParameterError + +class Parameter(object): + ''' + @author: steger, jozsef + @summary: + This class represents the control parameters of a monitoring task. + ''' + + def __init__(self, name, valuetype, unitmanager, dimension, default = None): + ''' + @summary: Constructor + @param name: the name of the parameter + @type name: str + @param valuetype: the type of the parameter (used when reading value information) + @type valuetype: type +@fixme: docs + @param default: the preset unit aware value of the parameter + @type default: a tuple of value and unit + ''' + self.um = unitmanager + if not isinstance(dimension, DimensionManager.Dimension): + raise ParameterError("wrong type of dimension") + self._data = (name, valuetype, dimension) + self._value = None + if default is not None: + self.value = default + + def __str__(self): + if self._value is None: + return "%s (%s)" % (self.name, self.dimension) + else: + return "%s (%s) = %s [%s] as %s" % (self.name, self.dimension.name, self._value[0], self._value[1], self.valuetype) + + @property + def name(self): + return self._data[0] + + @property + def valuetype(self): + return self._data[1] + + @property + def dimension(self): + return self._data[2] + + @property + def value(self): + return self._value + @value.setter + def value(self, value): + _, unit = value + if not self.dimension.containsUnit(unit): + raise ParameterError("Unit %s is not in the basin of the dimension %s" % (unit, self.dimension)) + self._value = tuple(value) + @value.deleter + def value(self): + self._value = None + + def copy(self): + return Parameter(name = self.name, valuetype = self.valuetype, unitmanager = self.um, dimension = self.dimension, default = self.value) + + def convert(self, unit): + ''' + @summary: returns the value of the given parameter in the required unit + @param unit: the requested unit, which must adhere to the unit model of this parameter + @type unit: Unit + @return: the parameter value represented in the requested units + @rtype: + @raise ParameterError: Unit not in dimension basin / Unit is not initialized + ''' + if not self.dimension.containsUnit(unit): + raise ParameterError("Unit %s is not in the basin of the dimension %s" % (unit, self.dimension)) + if self._value is None: + raise ParameterError("%s is not initialized" % self) + val, un = self._value + if unit == un: + return self.valuetype(val) + else: + return self.valuetype( self.um.convert(value = val, from_unit = un, to_unit = unit) ) + + def convertToReferencedUnit(self, unitreference): + ''' + @summary: returns the parameter value in units, where the unit is referenced + @param unitreference: the reference to the requested unit, which must adhere to the unit model of this parameter + @type unit: str + ''' + return self.convert( self.um[unitreference] ) + + + + + + + + + + + + + + + + + + + + + + + + + + + +class ParameterList(object): + ''' + @author: steger, jozsef + @summary: + This class represents a list of control parameters of a monitoring task. + ''' + + def __init__(self, parameterlist = []): + ''' + @summary: Constructor + @param parameterlist: a list of parameters to handle together + @type parameterlist: list(Parameter) or ParameterList + ''' + self.parameter = {} + self.extend(parameterlist) + + def __str__(self): + ''' + ''' + return " [%s\n\t]" % "\n\t\t".join([ "%s," % (p) for p in self.parameter.values() ]) + + def __len__(self): + ''' + @summary: return the size of the parameter list + @return: the size of the parameter list + @rtype: integer + ''' + return len(self.parameter) + + def __iter__(self): + ''' + @summary: provide an iterator over all the parameter elements + @return: the next parameter + @rtype: Parameter + ''' + for p in self.parameter.values(): + yield p + + def __getitem__(self, key): + ''' + @summary: provide the value of a parameter without unit conversion + @return: current value + @rtype: (str, unit) + ''' + return self.parameter[key].value + + def append(self, p): + ''' + @summary: append a new Parameter to the parameter list. If a wrong type of parameter is given, silently discard it. + In case a parameter with the same name exists overwrite its value only. + @param p: a new parameter to add or an existing parameter to update former values + @type p: Parameter + ''' + if not isinstance(p, Parameter): + print "WW: %s is not a parameter" % str(p) + return + if self.has_key(p.name): + print "WW: parameter with name %s is updated" % p.name + self.parameter[p.name].value = p.value + else: + self.parameter[p.name] = p + + def has_key(self, name): + ''' + @summary: Check if a parameter with a given name is already in the list + @param name: the name of the parameter looking for + @type name: str + ''' + return self.parameter.has_key(name) + + def get(self, name, unit): + ''' + @summary: Read the parameter pointed by a given name in the required unit + @param name: the name of the parameter + @type name: str + @param unit: the target unit the caller wants the named parameter to be expressed in + @type unit: Unit + @raise ParameterError: no such parameter name + ''' + if not self.has_key(name): + raise ParameterError("No Parameter with name: %s" % name) + return self.parameter[name].convert(unit) + + def getInReferencedUnits(self, name, unitreference): + ''' + @summary: Read the parameter pointed by a given name in the required unit + @param name: the name of the parameter + @type name: str + @param unitreference: the target unit the caller wants the named parameter to be expressed in + @type unitreference: str + @raise ParameterError: no such parameter name + ''' + if not self.has_key(name): + raise ParameterError("No Parameter with name: %s" % name) + return self.parameter[name].convertToReferencedUnit(unitreference) + + def update(self, name, value, unit): + ''' + @summary: reset the value of the parameter with the given name + @param name: the name of the parameter to update + @type name: str + @param value: the new value + @type value: depends on the Parameter.type + @param unit: the new unit + @type unit: Unit + ''' + self.parameter[name].value = value, unit + + def updateInReferencedUnits(self, name, value, unitreference): + ''' + @summary: reset the value of the parameter with the given name + @param name: the name of the parameter to update + @type name: str + @param value: the new value + @type value: depends on the Parameter.type + @param unitreference: the new unit + @type unitreference: str + ''' + p = self.parameter[name] + p.value = value, p.um[unitreference] + + def update_by_list(self, p_updating): + ''' + @summary: update parameter list with matching elements of another parameter list + @param p_updating: parameter list, whose matching elements update the element of this list + @type p_updating: ParameterList + @raise ParameterError: wrong argument type + ''' + if not isinstance(p_updating, ParameterList): + raise ParameterError("wrong argument type") + for name in p_updating.parameter_names(): + if self.has_key(name): + v = p_updating.parameter[name].value + if v is not None: + self.parameter[name].value = v + + def clear(self): + ''' + @summary: Empty the parameter list + ''' + self.parameter.clear() + + def copy(self): + return ParameterList( map(lambda p: p.copy(), self) ) + + def extend(self, parameterlist): + ''' + @summary: extends this parameter list with the items of another parameter list + @param paramlist: the list of parameter items to extend with + @type paramlist: ParameterList + ''' + for p in parameterlist: + self.append(p) + + def parameter_names(self): + ''' + @summary: List the names of the currently hold parameters + @return: list of Parameter.name + @rtype: list + ''' + return self.parameter.keys() + + def formkeyvaldict(self): + return dict( [ (name, p.value[0]) for (name, p) in self.parameter.iteritems() ] ) diff --git a/Monitoring/MonitoringService/DataProcessing/Prefix.py b/Monitoring/MonitoringService/DataProcessing/Prefix.py new file mode 100644 index 0000000..5e4e1d3 --- /dev/null +++ b/Monitoring/MonitoringService/DataProcessing/Prefix.py @@ -0,0 +1,129 @@ +''' +Created on Feb 27, 2012 + +@author: steger +''' +from DataProcessing.DataError import PrefixError + +class PrefixManager(object): + ''' + @summary: acts a unit prefix container + @ivar prefixes: the container of the known prefixes + @type prefixes: dict(str: L{Prefix}) + @ivar duplicatesymbols: the set of prefixes, which map to the same symbol + @type duplicatesymbols: set(str) + ''' + class Prefix(object): + ''' + @summary: represents a unit prefix and the scaling information + @ivar reference: a uniq prefix identifier + @ivar symbol: a short representation of the prefix + @type symbol: str + @ivar base: the base of the scaling factor + @type base: int + @ivar exponent: the exponent of the scaling factor + @type exponent: int + @ivar scale: the scaling factor, which is base ** exponent + @type scale: float + ''' + def __init__(self, reference, symbol, base, exponent): + ''' + @summary: constructor + @param reference: the reference to the unit prefix + @type reference: str + @param symbol: a short form of the unit prefix + @type symbol: str + @param base: the base of the unit prefix, typically 2 or 10 + @type base: int + @param exponent: the exponent of the unit prefix + @type exponent: int + ''' + scale = base ** exponent + self._data = (reference, symbol, base, exponent, scale) + def __str__(self): + return self.symbol + @property + def reference(self): + return self._data[0] + @property + def symbol(self): + return self._data[1] + @property + def base(self): + return self._data[2] + @property + def exponent(self): + return self._data[3] + @property + def scale(self): + return self._data[4] + + def __init__(self): + ''' + @summary: constructor + ''' + self.prefixes = {} + self.duplicatesymbols = set() + + def __contains__(self, item): + ''' + @summary: check the existence of a unit prefix + @param item: a prefix or a prefix symbol + @type item: L{Prefix} or str + @return: True if the prefix is known by the L{PrefixManager} + @rtype: bool + @raise L{PrefixError}: Wrong item type + ''' + if isinstance(item, self.Prefix): + return item in self.prefixes.values() + elif isinstance(item, str): + for prefix in self.prefixes.values(): + if prefix.symbol == item: + return True + return False + else: + raise PrefixError("Wrong item type %s" % item) + + def __len__(self): + ''' + @summary: the number of prefixes known by the L{PrefixManager} + @return: the number of prefixes known by the L{PrefixManager} + @rtype: int + ''' + return len(self.prefixes) + + def newPrefix(self, reference, symbol, base, exponent): + ''' + @summary: generate a new unit prefix + @param reference: the reference to the unit prefix + @type reference: str + @param symbol: a short form of the unit prefix + @type symbol: str + @param base: the base of the unit prefix, typically 2 or 10 + @type base: int + @param exponent: the exponent of the unit prefix + @type exponent: int + @return: the new unit prefix + @rtype: L{Prefix} + @raise L{PrefixError}: Prefix with reference exists + ''' + if self.prefixes.has_key(reference): + raise PrefixError("Prefix with reference %s already exists" % reference) + if PrefixManager.__contains__(self, symbol): + self.duplicatesymbols.add(symbol) + prefix = self.Prefix(reference, symbol, base, exponent) + self.prefixes[reference] = prefix + return prefix + + def __getitem__(self, reference): + ''' + @summary: look up the prefix in the L{PrefixManager} based on its reference + @param reference: the reference to the unit prefix + @type reference: str + @return: the unit prefix found + @rtype: L{Prefix} + @raise L{PrefixError}: Prefix with reference not found + ''' + if self.prefixes.has_key(reference): + return self.prefixes[reference] + raise PrefixError("Prefix with reference %s not found" % reference) diff --git a/Monitoring/MonitoringService/DataProcessing/Sampler.py b/Monitoring/MonitoringService/DataProcessing/Sampler.py new file mode 100644 index 0000000..df8c2ec --- /dev/null +++ b/Monitoring/MonitoringService/DataProcessing/Sampler.py @@ -0,0 +1,204 @@ +''' +Created on Nov 20, 2012 + +@author: steger +''' +from DataProcessing.DataReader import DataReader +from DataProcessing.DataHeader import DataHeader +from DataProcessing.DataSource import DataSource +from DataProcessing.Data import Data +from DataProcessing.DataError import SamplerError + +class Sampler(DataSource): + ''' + classdocs + ''' + + def __init__(self, datasource): + ''' + Constructor + @param datasource: table of records to manipulate with + @type datasource: DataSource + ''' + if not isinstance(datasource, DataSource): + raise SamplerError("Wrong type of datasource %s" % datasource) + DataSource.__init__(self, dependency = datasource) + self.source = datasource + header = DataHeader("%sSample(%s)" % (self.name, self.source.name)) + for c in self._inputreader.headercells(): + header.addColumn(c) + self.um = self.source.um + self._data = Data(self.um, header) + + @property + def readerClass(self): + return DataReader + + @property + def header(self): + return self._data.header + + def __len__(self): + self.process() + return len(self._data) + + def __getitem__(self, k): + return self._data._rawrecords.__getitem__(k) + + @property + def writelock(self): + return self._data.writelock + + +class Head(Sampler): + def __init__(self, datasource, head = 10): + ''' + Constructor + @param datasource: table of records to manipulate with + @type datasource: DataSource + @param head: the top n records of the table + @type head: int + ''' + Sampler.__init__(self, datasource) + self._head = head + + @property + def name(self): + return "Head" + + @property + def head(self): + return self._head + @head.setter + def head(self, head): + self._head = int(head) + self._data.clear() + self._inputreader.rewind() + + def _process(self): + status = self.PASS + if self._inputreader.sourceCleared.isSet(): + self._inputreader.sourceCleared.clear() + self._inputreader.rewind() + self._data.clear() + status |= self.CLEARED + if len(self._data) == self.head: + return status + for x in self._inputreader: + self._data._rawrecords.append(x) + if len(self._data) == self.head: + status |= self.EXPANDED + return status + raise SamplerError("Not enough sample %d/%d" % (len(self._data), self.head)) + +class Tail(Sampler): + def __init__(self, datasource, tail = 10): + ''' + Constructor + @param datasource: table of records to manipulate with + @type datasource: DataSource + @param tail: the last n records of the table + @type tail: int + ''' + Sampler.__init__(self, datasource) + self._tail = tail + + @property + def name(self): + return "Tail" + + @property + def tail(self): + return self._tail + @tail.setter + def tail(self, tail): + self._tail = int(tail) + self._data.clear() + self._inputreader.rewind() + + def _process(self): + status = self.PASS + clear = False + if self._inputreader.sourceCleared.isSet(): + self._inputreader.sourceCleared.clear() + self._inputreader.rewind() + self._data.clear() + for x in self._inputreader: + if len(self._data) == self.tail: + self._data._rawrecords.pop(0) + clear = True + self._data._rawrecords.append(x) + if clear: + status |= self.CLEARED + if len(self._data) == self.tail: + status |= self.EXPANDED + return status + else: + raise SamplerError("Not enough sample %d/%d" % (len(self._data), self.tail)) + +class Sorter(Sampler): + def __init__(self, datasource, keycell = None, ascending = True): + ''' + Constructor + @param datasource: table of records to manipulate with + @type datasource: DataSource + @param keycell: the key column to use for sorting + @type keycell: CellRequest or None + @param ascending: indicate the sortin order + @type ascending: bool + ''' + Sampler.__init__(self, datasource) + self._asc = ascending + self._key = 0 + if keycell: + self.keycell = keycell + + @property + def name(self): + return "Sort" + + @property + def ascending(self): + return self._asc + @ascending.setter + def ascending(self, ascending): + if bool(ascending) != self._asc: + self._asc = bool(ascending) + self._data.clear() + self._inputreader.rewind() + + @property + def key(self): + return self._key + @key.setter + def key(self, key): + self._key = int(key) + + @property + def keycell(self): + raise SamplerError("don't read this property") + @keycell.setter + def keycell(self, cellrequest): + for idx, _ in self.source._data.header.getCell(cellrequest): + self._key = idx + break + + def _process(self): + status = self.PASS + if self._inputreader.sourceCleared.isSet(): + self._inputreader.sourceCleared.clear() + status |= self.CLEARED + if self._inputreader.sourceExpanded.isSet(): + self._inputreader.sourceExpanded.clear() + status |= self.CLEARED + if status == self.PASS: + return self.PASS + self._inputreader.rewind() + self._data.clear() + self._data._rawrecords = sorted(self._inputreader.source._data._rawrecords, key=lambda r: r[self.key], reverse = not self.ascending) + if len(self._data): + status |= self.EXPANDED + return status + else: + raise SamplerError("Not enough sample...") + diff --git a/Monitoring/MonitoringService/DataProcessing/Unit.py b/Monitoring/MonitoringService/DataProcessing/Unit.py new file mode 100644 index 0000000..c21339c --- /dev/null +++ b/Monitoring/MonitoringService/DataProcessing/Unit.py @@ -0,0 +1,345 @@ +''' +Created on Oct 19, 2011 + +@author: steger, jozsef +@organization: ELTE +@contact: steger@complex.elte.hu +''' +from DataError import UnitError + +class UnitManager(object): + ''' + @summary: the unit container + + @note: The relationship between various unit, describing the derivation paths are not stored in this model, + because this information can be inferred from the dimension derivations, represented in the L{DimensionManager}. + @note: Units that are formed by prepending a unit prefix (L{Prefix}) are dealt as a L{DerivedUnit}. + + @ivar units: container of known units + @type units: dict(str: L{Unit}) + @ivar conversionpaths: is a map of operations to carry out from a unit to get a different unit + @type conversionpaths: dict((L{Unit}, L{Unit}): (callable, args)) + @ivar basins: indicates the derivatives of a basic unit + @type basins: dict(L{BasicUnit}: set(L{Unit})) + @ivar duplicatesymbols: collection of unit symbols, which more than one unit may bare + @type duplicatesymbols: set(str) + ''' + + class Unit(object): + ''' + @summary: common skeleton of all units + @ivar manager: reference to the unit manager + @type manager: L{UnitManager} + @ivar reference: unique reference of the unit + @ivar symbol: short form of the unit + @type symbol: str + ''' + def __init__(self, manager, reference, symbol, ancestor): + ''' + @summary: bind and store common information of the unit + @param manager: the unit manager + @type manager: L{UnitManager} + @param reference: a unique identifier + @param symbol: short human readable representation of the unit + @type symbol: str + @param ancestor: the ancestor of this unit is deriving from + @type ancestor: L{Unit} + ''' + self._data = (manager, reference, symbol) + self._ancestor = ancestor + @property + def manager(self): + return self._data[0] + @property + def reference(self): + return self._data[1] + @property + def symbol(self): + return self._data[2] + def __str__(self): + return self.symbol + def __eq__(self, u): + return self._data == u._data + + class BasicUnit(Unit): + ''' + @summary: a unit axiom + ''' + def __init__(self, manager, reference, symbol): + ''' + @summary: constructor + A BasicUnit is an instance of either set of BaseUnit, ProductUnit and PowerUnit as of the information model. + @param manager: a reference to the unit manager + @type manager: L{UnitManager} + @param reference: the reference to the unit + @param symbol: an abbreviation for the unit + @type symbol: str + ''' + UnitManager.Unit.__init__(self, manager, reference, symbol, None) + + class DerivedUnit(Unit): + ''' + @summary: a unit deriving from various known units + ''' + def __init__(self, manager, reference, symbol, ancestor): + ''' + @summary: constructor + A DerivedUnit is an instance of either set of LinearTransformedUnit and RegexpScaledUnit as of the information model. + Also units that have any unit prefix fall in this set. + @param manager: a reference to the unit manager + @type manager: L{UnitManager} + @param reference: the reference to the unit + @param symbol: an abbreviation for the unit + @type symbol: str + @param ancestor: the neighbor unit, whose derivative this instance is. + @type ancestor: L{Unit} + ''' + UnitManager.Unit.__init__(self, manager, reference, symbol, ancestor) + + + def __init__(self): + ''' + @summary: constructor + ''' + self.units = {} + self.conversionpaths = {} + self.basins = {} + self.duplicatesymbols = set() + + def __contains__(self, item): + ''' + @summary: check the existence of a unit + @param item: a unit or its symbol + @type item: L{Unit} or str + @return: True if the unit is known by the L{UnitManager} + @rtype: bool + @raise L{UnitError}: Wrong item type + ''' + units = set(self.units.values()) + if isinstance(item, self.Unit): + return item in units + elif isinstance(item, str): + for unit in units: + if unit.symbol == item: + return True + return False + else: + raise UnitError("Wrong item type %s" % item) + + def __len__(self): + ''' + @summary: the number of units known by the L{UnitManager} + @return: the number of units known by the L{UnitManager} + @rtype: int + ''' + return len(self.units) + + @staticmethod + def intORfloat(x): + ''' + @summary: a conversion helper to read out a value as a number + @param x: a number + @type x: str + @return: the number converted to integer or floating point decimal + @rtype: int or float + ''' + if isinstance(x, str): + try: + return int(x) + except ValueError: + return float(x) + else: + return float(x) + + def __getitem__(self, reference): + ''' + @summary: look up the unit in the L{UnitManager} using its reference + @param reference: the reference to the unit + @return: the unit found + @rtype: L{Unit} + @raise L{UnitError}: Unit with reference not found + ''' + if self.units.has_key(reference): + return self.units[reference] + raise UnitError("Unit with reference %s not found" % reference) + + def newBasicUnit(self, reference, symbol): + ''' + @summary: generate a new basic unit + @param reference: the reference to the unit + @param symbol: a short form of the unit + @type symbol: str + @return: the new unit + @rtype: L{BasicUnit} + @raise L{UnitError}: Unit with reference exists + ''' + if self.units.has_key(reference): + raise UnitError("Unit with reference %s exists" % reference) + if UnitManager.__contains__(self, symbol): + self.duplicatesymbols.add(symbol) + unit = self.BasicUnit(self, reference, symbol) + self.units[reference] = unit + self.basins[unit] = set([unit]) + self.__dict__[reference] = unit + return unit + + def addLinearTransformedUnit(self, reference, symbol, derivedfrom, scale, offset = 0): + ''' + @summary: generate a derived unit + @param reference: the reference to the unit + @param symbol: a short form of the unit + @type symbol: str + @param derivedfrom: the neighbor unit + @type derivedfrom: L{Unit} + @param scale: scaling factor for the linear transformation + @type scale: float + @param offset: the shift in the linear transformation, defaults to 0 + @type offset: float + @return: the new unit + @rtype: L{DerivedUnit} + @raise L{UnitError}: Wrong type of derivedfrom / Unit not found / Unit with reference exists / Cannot extend basin with unit, because Unit not found + ''' + if not isinstance(derivedfrom, self.Unit): + raise UnitError("Wrong type of derivedfrom %s" % derivedfrom) + if not UnitManager.__contains__(self, str(derivedfrom)): + raise UnitError("Unit %s not found" % derivedfrom) + if self.units.has_key(reference): + raise UnitError("Unit with reference %s exists" % reference) + unit = self.DerivedUnit(self, reference, symbol, derivedfrom) + basic = derivedfrom + while basic._ancestor: + basic = basic._ancestor + if not self.basins.has_key(basic): + raise UnitError("Cannot extend basin with unit %s, because Unit %s not found" % (unit, basic)) + if UnitManager.__contains__(self, symbol): + self.duplicatesymbols.add(symbol) + self.units[reference] = unit + self.conversionpaths[(unit, derivedfrom)] = (self.op_lt_forward, (scale, offset)) + self.conversionpaths[(derivedfrom, unit)] = (self.op_lt_inverse, (scale, offset)) + self.basins[basic].add(unit) + self.__dict__[reference] = unit + return unit + + def addRegexpTransformedUnit(self, reference, symbol, derivedfrom, expr_forward, expr_inverse): + ''' + @summary: generate a derived unit + @param reference: the reference to the unit + @param symbol: a short form of the unit + @type symbol: str + @param derivedfrom: the neighbor unit + @type derivedfrom: L{Unit} + @param expr_forward: the expression driving the forward transformation + @type expr_forward: str + @param expr_inverse: the expression driving the inverse transformation + @type expr_inverse: str + @return: the new unit + @rtype: L{DerivedUnit} + @raise L{UnitError}: Wrong type of derivedfrom / Unit not found / Unit with reference exists / Cannot extend basin with unit, because Unit not found + ''' + if not isinstance(derivedfrom, self.Unit): + raise UnitError("Wrong type of derivedfrom %s" % derivedfrom) + if not UnitManager.__contains__(self, str(derivedfrom)): + raise UnitError("Unit %s not found" % derivedfrom) + if self.units.has_key(reference): + raise UnitError("Unit with reference %s exists" % reference) + unit = self.DerivedUnit(self, reference, symbol, derivedfrom) + basic = derivedfrom + while basic._ancestor: + basic = basic._ancestor + if not self.basins.has_key(basic): + raise UnitError("Cannot extend basin with unit %s, because Unit %s not found" % (unit, basic)) + if UnitManager.__contains__(self, symbol): + self.duplicatesymbols.add(symbol) + self.units[reference] = unit + self.conversionpaths[(unit, derivedfrom)] = (self.op_rt_forward, expr_forward) + self.conversionpaths[(derivedfrom, unit)] = (self.op_rt_inverse, expr_inverse) + self.basins[basic].add(unit) + self.__dict__[reference] = unit + return unit + + def getBasinByUnit(self, unit): + ''' + @summary: return the set of units, which are compatible with a given unit + @param unit: the unit to look up + @type unit: L{Unit} + @return: the set of compatible units + @rtype: set(L{Unit}) + @raise L{UnitError}: not found + ''' + for basin in self.basins.values(): + if unit in basin: + return basin + raise UnitError("Basin for unit %s not found" % unit) + + def getBasinByReference(self, reference): + ''' + @summary: look up the compatible units of a given unit with the calling reference + @param reference: + @return: the set of compatible units + @rtype: set(L{Unit}) + @raise L{UnitError}: not found + ''' + try: + unit = self[reference] + return self.getBasinByUnit(unit) + except UnitError: + raise UnitError("Basin for unit reference %s not found" % reference) + + def op_lt_forward(self, value, so): + (scale, offset) = so + def op(value): + return scale * self.intORfloat( value ) + offset + if isinstance(value, list): + return map(lambda x: op(x), value) + return op(value) + + def op_lt_inverse(self, value, so): + (scale, offset) = so + def op(value): + return (self.intORfloat( value ) - offset) / float(scale) + if isinstance(value, list): + return map(lambda x: op(x), value) + return op(value) + + def op_rt_forward(self, value, expression): + def op(value): + raise UnitError("not implemented") + if isinstance(value, list): + return map(lambda x: op(x), value) + return op(value) + + op_rt_inverse = op_rt_forward + + def convert(self, value, from_unit, to_unit): + ''' + @summary: convert a value of one unit to the other + @param value: input value in from_unit + @param from_unit: the original unit of the input value + @type from_unit: L{Unit} + @param to_unit: the requested new unit + @type to_unit: L{Unit} + @raise L{UnitError}: unknown unit / incompatible units + ''' + if not UnitManager.__contains__(self, str(from_unit)): + raise UnitError("Unknown from_unit") + if not UnitManager.__contains__(self, str(to_unit)): + raise UnitError("Unknown to_unit") + if from_unit == to_unit: + return value + + while from_unit._ancestor: + op, oparg = self.conversionpaths[(from_unit, from_unit._ancestor)] + value = op(value, oparg) + from_unit = from_unit._ancestor + heap = [] + while to_unit._ancestor: + op, oparg = self.conversionpaths[(to_unit._ancestor, to_unit)] + heap.append((op, oparg)) + to_unit = to_unit._ancestor + if from_unit != to_unit: + raise UnitError("Different base units %s %s" % (from_unit, to_unit)) + while len(heap): + op, oparg = heap.pop(0) + value = op(value, oparg) + return value + diff --git a/Monitoring/MonitoringService/DataProcessing/__init__.py b/Monitoring/MonitoringService/DataProcessing/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/Monitoring/MonitoringService/DataProcessing/__init__.py diff --git a/Monitoring/MonitoringService/DataProcessing/test.py b/Monitoring/MonitoringService/DataProcessing/test.py new file mode 100644 index 0000000..e664512 --- /dev/null +++ b/Monitoring/MonitoringService/DataProcessing/test.py @@ -0,0 +1,366 @@ +''' +Created on Sep 2, 2011 + +@author: steger +''' +import unittest +from DataProcessing.Parameter import Parameter +from random import randint +from Example.Prefixes import prefixes, PM +from Example.Dimensions import DM, timeinterval, countable, ipaddress, cardinal +from Example.Units import UM, milli_second, pico_second, dozen, micro_second,\ + piece, nano_second, second +from DataProcessing.Data import Data +from DataProcessing.Aggregator import Sum, Max, Min, Mean, Deviation, Percentile +from DataProcessing.DataReader import DataReader +from DataProcessing.DataFormatter import JsonFormatter, DumbFormatter +from DataProcessing.Sampler import Head, Tail, Sorter +from DataProcessing.AggregatorManager import AggregatorManager +from DataProcessing.DataHeader import DataHeaderGeneratedByDescription,\ + DataHeader +from DataProcessing.DataHeaderCell import DataHeaderCell, CellRequestByName,\ + CellRequestByFeature +from DataProcessing.DataError import DataError, SamplerError +from DataProcessing.DataSample import DataSample + + +class Test(unittest.TestCase): + eps = 1e-15 + + def different(self, expect, got): + return abs(expect - got) / float(expect) < self.eps + + def setUp(self): + pass + + def test_PM(self): + for ref, symbol, base, exponent in prefixes: + scale = base ** exponent + p = PM[ref] + self.assertEqual(str(p), symbol, "symbol cannot be read back %s %s" % (p, symbol)) + self.assertEqual(p.scale, scale, "prefix %s scale error got: %f expect: %f" % (p, p.scale, scale)) + + self.assertTrue('p' in PM, "cannot find symbol") + self.assertFalse('pico' in PM, "found a symbol, which I shouldn't") + + + def test_UM(self): + s = randint(1, 10000) + expect = s * 1e-3 + got = UM.convert(s, milli_second, second) + self.assertTrue(self.different(expect, got), "Different (%d ms) expect %f s got %f s" % (s, expect, got)) + expect = s * 1e9 + got = UM.convert(s, milli_second, pico_second) + self.assertTrue(self.different(expect, got), "Different (%d ms) expect %f ps got %f ps" % (s, expect, got)) + + kilobit = UM["kilo_bit"] + megaByte = UM["mega_Byte"] + b = randint(1, 1000) + expect = b * 1e-3 / 8. + got = UM.convert(b, kilobit, megaByte) + self.assertTrue(self.different(expect, got), "Different (%d kbit) expect %f MB got %f MB" % (b, expect, got)) + + def test_D(self): + dim = DM['TimeInterval'] + for u in [second, milli_second]: + self.assertTrue(dim.containsUnit(u), "piece %s not in dim" % u) + bu = UM.getBasinByUnit(UM['second']) + br = UM.getBasinByReference('micro_second') + self.assertTrue(bu == br, "basins differ") + + def test_parameter(self): + n = randint(0, 1000) + parameter = Parameter(name = 'testparameter', valuetype = float, unitmanager = UM, dimension = countable, default = (n, dozen)) + v1 = 12 * parameter.value[0] + v2 = parameter.convert(piece) + self.assertTrue(abs(v1 - v2) < self.eps, "%d dozen and %d are not equal (type 1)" % (n, v2)) + n = randint(0, 1000) + parameter.value = (n, piece) + v = parameter.convert(dozen) + self.assertTrue(abs(12 * v - n) < self.eps, "%f dozen and %d are not equal (type 2)" % (v, n)) + + def test_addcolumn(self): + ''' + ''' + c1 = DataHeaderCell(name = "oszlop", dimension = timeinterval, unit = milli_second) + c2 = DataHeaderCell(name = "oszlop2", dimension = timeinterval, unit = second, feature = "kutyafule") + h = DataHeader(name = "proba") + h.addColumn(c1) + h.addColumn(c2) + self.assertRaises(DataError, h.addColumn, c1) + cr1 = CellRequestByName(name = "oszlop2") + cr2 = CellRequestByFeature(feature = "kutyafule") + qr1 = [ x for x in h.getCell(cellrequest = cr1) ] + qr2 = [ x for x in h.getCell(cellrequest = cr2) ] + self.assertEqual(qr1, qr2, "getCell oopses 1") + qr = [ x for x in h.getCell(cellrequest = CellRequestByFeature(feature = "macskanyelv")) ] + self.assertEqual(len(qr), 0, "getCell oopses 2") + + + def test_createheadertemplate(self): + header = DataHeader(name = "traceroute") + cell = DataHeaderCell(name = "idx", dimension = cardinal) + header.addColumn(cell) + iphdr = DataHeader(name = "info") + cell = DataHeaderCell(name = "address", dimension = ipaddress) + iphdr.addColumn(cell) + rtthdr = DataHeader(name = "rttinfo") + cell = DataHeaderCell(name = "roundtripdelay", dimension = timeinterval, unit = milli_second) + rtthdr.addColumn(cell) + iphdr.addColumn(rtthdr) + header.addColumn(iphdr) + header2 = DataHeaderGeneratedByDescription("traceroute", [('idx', cardinal), ("info", [('address', ipaddress), ("rttinfo", [('roundtripdelay', timeinterval, milli_second)])])]) + self.assertTrue(header == header2, "headers differ:\n%s\n%s" % (header, header2)) + + def test_complex_table(self): + ''' + ''' + header = DataHeaderGeneratedByDescription("traceroute", [('idx', cardinal), ("info", [('address', ipaddress), ("rttinfo", [('roundtripdelay', timeinterval, milli_second)])])]) + + D = Data(UM, header) + hoprecord = D.getTemplate(size = 2) + iprec1, iprec2 = hoprecord.getRecordTemplates(name = "info") + (rttrec1,) = iprec1.getRecordTemplates(name = "rttinfo", sizes = [3,]) + (rttrec2,) = iprec2.getRecordTemplates(name = "rttinfo", sizes = [3,]) + + rttrec1.update(name = 'roundtripdelay', values = [2.925, 3.279, 3.758], unit = milli_second) + iprec1.update(name = 'address', values = ['192.168.1.1']) + + rttrec2.update(name = 'roundtripdelay', values = [.008634, .008857, .009054], unit = second) + iprec2.update(name = 'address', values = ['157.181.172.126']) + + hoprecord.update(name = 'idx', values = [1,2]) + + D.saveRecord(hoprecord) + + def test_iteratorNextractor(self): + N = 1000 + header = DataHeaderGeneratedByDescription("temptable", [('idx', cardinal), ('RoundTripDelay', timeinterval, milli_second)]) + milli = map(lambda x: randint(1, 100000), range(N)) + micro = map(lambda x: 1000*x, milli) + nano = map(lambda x: 1000000*x, milli) + D = Data(UM, header) + hoprecord = D.getTemplate(size = N) + hoprecord.update(name = 'RoundTripDelay', values = milli, unit = milli_second) + hoprecord.update(name = 'idx', values = range(N)) + D.saveRecord(hoprecord) + + DS = DataSample(table = D) + + DR = DataReader(datasource = DS) + DR.extract(cellrequest = [CellRequestByName(name = 'RoundTripDelay'), CellRequestByName(name = 'RoundTripDelay', unit = micro_second), CellRequestByName(name = 'RoundTripDelay', unit = nano_second)]) + for x in DR: + mill, mic, nan = milli.pop(0), micro.pop(0), nano.pop(0) + delta = [(x[0]-mill)/mill, (x[1]-mic)/mic, (x[2]-nan)/nan] + mask = map(lambda d: abs(d)< self.eps, delta) + self.assertFalse((False in mask), "Conversion introduced a huge error GOT: %s EXPECTED: %s %s %s DELTA: %s MASK: %s" % (x, mill,mic,nan, delta, mask)) + + + @staticmethod + def randheader(): + return DataHeaderGeneratedByDescription("temptable", [('idx', cardinal), ('rnd', countable)]) + + def randtable(self, N = 10): + n = map(lambda x: randint(1, 100000), range(N)) + D = Data(UM, self.randheader()) + hoprecord = D.getTemplate(size = N) + hoprecord.update(name = 'rnd', values = n) + hoprecord.update(name = 'idx', values = range(N)) + D.saveRecord(hoprecord) + S = DataSample(table = D) + return n, D, S + + def test_reader(self): + N = 10 + header = self.randheader() + n1 = map(lambda x: randint(1, 100000), range(N)) + n2 = map(lambda x: randint(1, 100000), range(N)) + D = Data(UM, header) + hoprecord = D.getTemplate(size = N) + hoprecord.update(name = 'rnd', values = n1) + hoprecord.update(name = 'idx', values = range(N)) + DS = DataSample(table = D) + DR = DataReader(datasource = DS) + self.assertFalse(DR.sourceExpanded.isSet(), "dataready, howcome?") + D.saveRecord(hoprecord) + DS.process() + self.assertTrue(DR.sourceExpanded.isSet(), "data not ready, howcome?") + for _ in DR: + pass + self.assertFalse(DR.sourceExpanded.isSet(), "data still ready, howcome?") + hoprecord.update(name = 'rnd', values = n2) + D.saveRecord(hoprecord) + DS.process() + self.assertTrue(DR.sourceExpanded.isSet(), "data not ready, howcome?") + DR.rewind() + got = len([x for x in DR]) + self.assertEqual(2*N, got, "Expected %d items and got %d" % (2*N, got)) + + def test_formatter(self): + _, _, DS = self.randtable() + DF = DumbFormatter(datasource = DS) + res = DF.serialize() + #print res + self.assertGreater(len(res), 2, "empty? %s" % res) + JF = JsonFormatter(datasource = DS) + JF.reader.extract(cellrequest = [CellRequestByName(name = 'rnd')]) + res = JF.serialize() + #print res + self.assertGreater(len(res), 2, "empty? %s" % res) + + def test_aggregator(self): + N = 10 + n, _, DS = self.randtable(N) +# self.assertRaises(AggregatorError, Aggregator(D, CellRequestByName(name = 'rnd'))) + s = Sum(DS, CellRequestByName(name = 'rnd')) + mn = Min(DS, CellRequestByName(name = 'rnd')) + mx = Max(DS, CellRequestByName(name = 'rnd')) + avg = Mean(DS, CellRequestByName(name = 'rnd')) + S = sum(n) + self.assertEqual(s.data._rawrecords[0], (N, S), "sum %f != %f" % (s._aggregate, S)) + self.assertEqual(mn.data._rawrecords[0], (N, min(n)), "min %f != %f" % (mn._aggregate, min(n))) + self.assertEqual(mx.data._rawrecords[0], (N, max(n)), "max %f != %f" % (mx._aggregate, max(n))) + self.assertEqual(avg.data._rawrecords[0], (N, S/float(N)), "avg %f != %f" % (avg._aggregate, S/N)) + + def test_sampler(self): + header = self.randheader() + D = Data(UM, header) + DS = DataSample(table = D) + T = Tail(datasource = DS, tail = 10) + self.assertRaises(SamplerError, T.process) + + n, D, DS = self.randtable() + + H = Head(datasource = DS, head = 5) + DR = DataReader(datasource = H) + expect = n[:5] + got = [ x for _, x in DR ] + self.assertEqual(got, expect, "head %s != %s" % (got, expect)) + + T = Tail(datasource = DS) + T.tail = 5 + DR = DataReader(datasource = T) + expect = n[-5:] + got = [ x for _, x in DR ] + self.assertEqual(got, expect, "tail %s != %s" % (got, expect)) + + expect = n[:] + expect.sort() + S = Sorter(datasource = DS, keycell = CellRequestByName(name = 'rnd')) + DR = DataReader(datasource = S) + got = [ x for _, x in DR ] + self.assertEqual(got, expect, "sort %s != %s" % (got, expect)) + + expect.reverse() + S = Sorter(datasource = DS, keycell = CellRequestByName(name = 'rnd'), ascending = False) + DR = DataReader(datasource = S) + got = [ x for _, x in DR ] + self.assertEqual(got, expect, "sort %s != %s" % (got, expect)) + + + def test_DispersionOK(self): + header = self.randheader() + items = [55,56,57,63,67,68] + D = Data(UM, header) + hoprecord = D.getTemplate(size = len(items)) + hoprecord.update(name = 'rnd', values = items) + hoprecord.update(name = 'idx', values = range(len(items))) + D.saveRecord(hoprecord) + DS = DataSample(table = D) + a = Deviation(DS, CellRequestByName(name = 'rnd')) + a.empirical = False + a.data + self.assertTrue((5.26 == round(a._aggregate,2) ), "Dispersion FAILED 5.26 = "+str(a._aggregate)) + + def test_PercentOK(self): + header = self.randheader() + items = [4.0,5.0,5.0,4.0] + D = Data(UM, header) + hoprecord = D.getTemplate(size = len(items)) + hoprecord.update(name = 'rnd', values = items) + hoprecord.update(name = 'idx', values = range(len(items))) + D.saveRecord(hoprecord) + DS = DataSample(table = D) + a = Percentile(DS, CellRequestByName(name = 'rnd')) + a.percentile = .5 + a.data + self.assertTrue((4.5 == a._aggregate ), "Percent is FAILED 4.5 = "+str(a._aggregate)) + + def test_Pipe(self): + header = self.randheader() + items = [55,56,57,63,67,68] + D = Data(UM, header) + hoprecord = D.getTemplate(size = len(items)) + hoprecord.update(name = 'rnd', values = items) + hoprecord.update(name = 'idx', values = range(len(items))) + D.saveRecord(hoprecord) + DS = DataSample(table = D) + a = Mean(datasource = Tail(datasource = Head(datasource = DS, head = 4), tail = 2), cellrequest = CellRequestByName(name = 'rnd')) + a.data + res = a._aggregate + self.assertTrue((60 == res ), "Pipe FAILED 60 = "+str(res)) + + def test_aggregatorManager(self): + N = 12 + n, D, DS = self.randtable(N) + + AM = AggregatorManager() + _, A = AM.newAggregator(DS, CellRequestByName(name = 'rnd'), [(Tail, {'tail': 10}), (Head, {'head': 5}), (Sum, {})]) + + A.process() + expected = sum(n[-10:][:5]) + got = A._aggregate + self.assertEqual(expected, got, "sum (exp) %f != (got) %f" % (expected, got)) + + hoprecord = D.getTemplate(size = N) + n = map(lambda x: randint(1, 100000), range(N)) + hoprecord.update(name = 'rnd', values = n) + D.saveRecord(hoprecord) + + A.process() + + got = A._aggregate + expected = sum(n[-10:][:5]) + self.assertEqual(expected, got, "2 sum (exp) %f != (got) %f" % (expected, got)) + + + def test_aggrformatter(self): + n, _, DS = self.randtable(15) + AM = AggregatorManager() + _, A = AM.newAggregator(DS, CellRequestByName(name = 'rnd'), [(Tail, {'tail': 10}), (Head, {'head': 5}), (Sum, {})]) +# _, A = AM.newAggregator(DS, CellRequestByName(name = 'rnd'), [(Tail, {'tail': 10})]) + DF = JsonFormatter(datasource = A) + res = DF.serialize() +# print "_"*10 +# print res +# print "^"*10 + self.assertGreater(len(res), 2, "empty? %s" % res) + expected = sum(n[-10:][:5]) + got = A._aggregate + self.assertEqual(expected, got, "2 sum(head(tail())) (exp) %f != (got) %f" % (expected, got)) + + + def test_ComplexaggregateOK(self): + ''' + ''' + header = DataHeaderGeneratedByDescription("traceroute", [('idx', cardinal), ("info", [("rttinfo", countable)])]) + + D = Data(UM, header) + hoprecord = D.getTemplate(size = 5) + inf1, inf2, inf3, inf4, inf5 = hoprecord.getRecordTemplates(name = "info") + + inf1.update(name = 'rttinfo', values = [10]) + inf2.update(name = 'rttinfo', values = [15]) + inf3.update(name = 'rttinfo', values = [16]) + inf4.update(name = 'rttinfo', values = [18]) + inf5.update(name = 'rttinfo', values = [20]) + + hoprecord.update(name = 'idx', values = [1,2,3,4,5]) + + D.saveRecord(hoprecord) + #a = Aggregator(D, ['info','rttinfo']) + + +if __name__ == "__main__": + #import sys;sys.argv = ['', 'Test.test_UM'] + unittest.main() diff --git a/Monitoring/MonitoringService/Database/ConnectionPool.py b/Monitoring/MonitoringService/Database/ConnectionPool.py new file mode 100644 index 0000000..a9f2aa0 --- /dev/null +++ b/Monitoring/MonitoringService/Database/ConnectionPool.py @@ -0,0 +1,17 @@ +''' +Created on Aug 10, 2011 + +@author: steger +''' + +class ConnectionPool(object): + ''' + classdocs + ''' + + + def __init__(self, params): + ''' + Constructor + ''' + \ No newline at end of file diff --git a/Monitoring/MonitoringService/Database/DatabaseAccess.py b/Monitoring/MonitoringService/Database/DatabaseAccess.py new file mode 100644 index 0000000..d24a7bd --- /dev/null +++ b/Monitoring/MonitoringService/Database/DatabaseAccess.py @@ -0,0 +1,20 @@ +''' +Created on 08.08.2011 + +@author: csc +''' + +from ConnectionPool import ConnectionPool + +class DatabaseAccess(): + ''' + classdocs + ''' + + + def __init__(self, parent): + ''' + Constructor + ''' + self.parent = parent + self.pool = ConnectionPool(params = "foo") \ No newline at end of file diff --git a/Monitoring/MonitoringService/Database/StorageFIFO.py b/Monitoring/MonitoringService/Database/StorageFIFO.py new file mode 100644 index 0000000..ea406d2 --- /dev/null +++ b/Monitoring/MonitoringService/Database/StorageFIFO.py @@ -0,0 +1,17 @@ +''' +Created on Aug 10, 2011 + +@author: steger +''' + +class StorageFIFO(object): + ''' + classdocs + ''' + + + def __init__(self, params): + ''' + Constructor + ''' + \ No newline at end of file diff --git a/Monitoring/MonitoringService/Database/__init__.py b/Monitoring/MonitoringService/Database/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/Monitoring/MonitoringService/Database/__init__.py diff --git a/Monitoring/MonitoringService/Database/test.py b/Monitoring/MonitoringService/Database/test.py new file mode 100644 index 0000000..01c954f --- /dev/null +++ b/Monitoring/MonitoringService/Database/test.py @@ -0,0 +1,26 @@ +''' +Created on Aug 10, 2011 + +@author: steger +''' +import unittest + + +class Test(unittest.TestCase): + + + def setUp(self): + pass + + + def tearDown(self): + pass + + + def testName(self): + pass + + +if __name__ == "__main__": + #import sys;sys.argv = ['', 'Test.testName'] + unittest.main() \ No newline at end of file diff --git a/Monitoring/MonitoringService/Driver/Driver.py b/Monitoring/MonitoringService/Driver/Driver.py new file mode 100644 index 0000000..bd2ed18 --- /dev/null +++ b/Monitoring/MonitoringService/Driver/Driver.py @@ -0,0 +1,20 @@ +''' +Created on Oct 28, 2011 + +@author: steger, jozsef +@organization: ELTE +@contact: steger@complex.elte.hu +''' +import logging + +class Driver(object): + ''' + @summary: an empty driver to serve as an ancient class + @author: steger, jozsef + @cvar logger: an interface to the logger named "NOVI.DRIVER" + @type logger: logging.Logger + ''' + logger = logging.getLogger("NOVI.DRIVER") + +class DriverError(Exception): + pass \ No newline at end of file diff --git a/Monitoring/MonitoringService/Driver/LocalExec.py b/Monitoring/MonitoringService/Driver/LocalExec.py new file mode 100644 index 0000000..2cf4f8f --- /dev/null +++ b/Monitoring/MonitoringService/Driver/LocalExec.py @@ -0,0 +1,56 @@ +''' +Created on Feb 4, 2013 + +@author: steger, jozsef +@organization: ELTE +@contact: steger@complex.elte.hu +''' + +#TODO: nested command execution is not working properly: e.g.: echo `hostname` + +from Driver import Driver +from subprocess import Popen, PIPE + +class LocalExec(Driver): + ''' + @summary: implements a driver to execute local commands + @ivar command: the default command + @type command: str + @ivar p: the process api + @type p: subprocess.Popen or None + ''' + + def __init__(self, command = "echo -n helloworld"): + ''' + @summary: save a default command + @param command: the default command + @type command: str + ''' + self.command = command + self.p = None + + def __del__(self): + ''' + @summary: an implicit deletion of the driver triggers a kill signal on a running process + ''' + if self.p: + self.p.kill() + self.p = None + + def execute(self, command = None): + ''' + @summary: executes a local command + @param command: the command to run, if None, the default command is issued + @type command: str or None + @return: the standard output of the command run + @rtype: str + ''' + if command is None: + command = self.command + self.p = Popen(args = command.split(' '), stdout = PIPE, stderr = PIPE) + stout, sterr = self.p.communicate() + self.p = None + self.logger.debug("executed '%s'" % (command)) + if len(sterr): + self.logger.warning("execution '%s' failed: %s" % (command, sterr)) + return stout diff --git a/Monitoring/MonitoringService/Driver/REST.py b/Monitoring/MonitoringService/Driver/REST.py new file mode 100644 index 0000000..bf4fc2f --- /dev/null +++ b/Monitoring/MonitoringService/Driver/REST.py @@ -0,0 +1,63 @@ +''' +Created on Feb 4, 2013 + +@author: steger, jozsef +@organization: ELTE +@contact: steger@complex.elte.hu +''' + +from Driver import Driver +from Credential.credentialtypes import UsernamePassword +from httplib2 import Http + +class RESTDriver(Driver): + ''' + @summary: implements REST driver to fetch using http GET + @cvar timeout: timeout of connection + @type timeout: float + @cvar cache: a cache directory + @type cache: str + @ivar url: a default document locator to be reused + @type url: str + @ivar proxy: an interface to the http server + @type proxy: httplib2.Http + ''' + timeout = 10 + cache = "/tmp/.cache" + + def __init__(self, url, credential = None, validate_ssl = False): + ''' + @summary: initializes a proxy to the http service and saves a default document locator + @param url: the default document locator + @type url: str + @param credential: an authentication secret + @type credential: L{Credential} or None + @param validate_ssl: whether to apply strick certificate validation, default is False + @type validate_ssl: bool + ''' + self.url = url + self.proxy = Http(cache = self.cache, timeout = self.timeout) + self.proxy.disable_ssl_certificate_validation = not validate_ssl + if isinstance(credential, UsernamePassword): + # use password authentication + self.proxy.add_credentials(credential.username, credential.password) + + def fetch(self, url = None): + ''' + @summary: retrieve the document + @param url: the document locator, if not present the default is used + @type url: str or None + @return: the remote document + @rtype: str or None + @note: if the remote content cached is not changed, None is returned + ''' + if url is None: + url = self.url + status, response = self.proxy.request(uri = url, method = "GET") + if status.status == 200: + return response + if status.status == 304: + self.logger.warning("remote content @ %s was not changed" % url) + return None + self.logger.error("%s -- retrieving @%s failed: %s" % (status, url, response)) + return None diff --git a/Monitoring/MonitoringService/Driver/SOAPClient.py b/Monitoring/MonitoringService/Driver/SOAPClient.py new file mode 100644 index 0000000..7de5b70 --- /dev/null +++ b/Monitoring/MonitoringService/Driver/SOAPClient.py @@ -0,0 +1,22 @@ +''' +Created on Sep 2, 2011 + +@author: laki, sandor +@organization: ELTE +@contact: laki@complex.elte.hu +@author: steger, jozsef +''' + +#TODO: catch exception using the .service attribute and log it in the Driver log +from suds import transport, client, wsse +from Driver import Driver + +class SOAPClient(Driver, client.Client): + ''' + @summary: implements SOAP driver to access remote procedures + ''' + pass + +SOAPSecurity=wsse.Security +SOAPUsernameToken=wsse.UsernameToken +SOAPHttpAuthenticated=transport.http.HttpAuthenticated diff --git a/Monitoring/MonitoringService/Driver/SshExec.py b/Monitoring/MonitoringService/Driver/SshExec.py new file mode 100644 index 0000000..454253c --- /dev/null +++ b/Monitoring/MonitoringService/Driver/SshExec.py @@ -0,0 +1,170 @@ +''' +Created on Jul 18, 2011 + +@author: steger, jozsef +@organization: ELTE +@contact: steger@complex.elte.hu +''' + +from Driver import Driver, DriverError +from paramiko import SSHClient, RSAKey, SSHException, AutoAddPolicy +from Credential.credentialtypes import UsernamePassword, UsernameRSAKey +from time import sleep +import socket +import logging + +class SshDriver(Driver): + ''' + @summary: implements a driver to build an SSH connection to a server using paramiko package. + @cvar timeout: a timeout to set up the connection + @type timeout: float + @cvar trials: how many times to retry if a time out event occurs or if the remote server is busy to respond + @type trials: int + @cvar wait: how long to wait between two trials + @type wait: .2 + @ivar client: the api of the ssh client + @type client: paramiko.SSHClient or None + @ivar host: the host name of the remote ssh server + @type host: str or None + @ivar isconnected: indicates whether the connection is set up + @type isconnected: bool + ''' + timeout = 5 + trials = 3 + wait = .2 + + def __init__(self): + ''' + @summary: bind the paramiko loggers to the "NOVI.DRIVER" + ''' + self.client = None + self.host = None + #bind paramiko.transport logger to the same handlers used by NOVI.DRIVER + l = logging.getLogger("paramiko.transport") + for hlr in self.logger.handlers: + l.addHandler(hlr) + + def __del__(self): + ''' + @summary: close connection upon an implicit deletion of the driver + ''' + self.close() + + def connect(self, host, credential, port = 22, known_host = None): + ''' + @summary: set up the connection + @param host: the host name of the remote server + @type host: str + @param credential: the secret to use for connection set up + @type credential: L{Credential} + @param port: the port remote ssh server is listening + @type port: int + @param known_host: a file name containing host signatures to check, if None AutoAddPolicy applies + @type known_host: str + ''' + self.client = SSHClient() + if known_host is None: + self.client.set_missing_host_key_policy( AutoAddPolicy() ) + else: + self.client.load_host_keys(filename = known_host) + if isinstance(credential, UsernamePassword): + # use password authentication + self.client.connect(hostname = host, port = port, + username = credential.username, password =credential.password, + timeout = self.timeout, look_for_keys = False, compress = True) + elif isinstance(credential, UsernameRSAKey): + # use the RSA key + if credential.password: + pw = credential.password + else: + pw = None + key = RSAKey(password = pw, filename = credential.rsakey) + n = self.trials + while n: + try: + self.client.connect(hostname = host, port = port, + username = credential.username, pkey = key, + timeout = self.timeout, look_for_keys = False, compress = True) + break + except SSHException, e: + if e.message.startswith("Error reading SSH protocol banner"): + n -= 1 + self.logger.warn("retry %d times to connect @%s in %f seconds" % (n, host, self.wait)) + sleep(self.wait) + else: + self.logger.error("cannot connect @%s" % (host)) + raise DriverError("Cannot connect @%s " % host) + except socket.timeout: + n -= 1 + self.logger.warn("time out, retry %d times to connect @%s in %f seconds" % (n, host, self.wait)) + sleep(self.wait) + if not self.isConnected: + self.close() + self.logger.error("cannot connect @%s" % (host)) + raise DriverError("Cannot connect @%s " % host) + self.host = host + self.logger.info("ssh connected @ %s:%d" % (self.host, port)) + + def close(self): + ''' + @summary: closes the ssh connection + ''' + try: + self.client.close() + self.logger.info("ssh disconnected @ %s" % (self.host)) + except: + pass + finally: + self.client = None + self.host = None + + @property + def isConnected(self): + try: + return self.client.get_transport().is_active() + except: + return False + +class SshExec(SshDriver): + ''' + @summary: an extension of the L{SshDriver} to execute commands on the remote machine + @ivar command: the string representation of the command to run + @type command: str + ''' + + def __init__(self, host, credential, port = 22, command = "echo helloworld @ `hostname`", known_host = None): + ''' + @summary: initializes an ssh connection and stores a default command + @param host: the host name of the remote server + @type host: str + @param credential: the secret to use for connection set up + @type credential: L{Credential} + @param port: the port remote ssh server is listening + @type port: int + @param command: the default remote command + @type command: str + @param known_host: a file name containing host signatures to check, if None AutoAddPolicy applies + @type known_host: str + ''' + SshDriver.__init__(self) + self.connect(host, credential, port, known_host) + self.command = command + + def execute(self, command = None): + ''' + @summary: executes a remote command + @param command: the command to run, if None, the default command is issued + @type command: str or None + @return: the standard output of the command run + @rtype: paramico.ChannelFile + ''' + if not self.isConnected: + raise DriverError("Not connected") + if command is None: + command = self.command + _, stout, sterr = self.client.exec_command(command = command) + e = sterr.read() + self.logger.debug("executed @%s '%s'" % (self.host, command)) + if len(e): + self.logger.warning("execution @%s '%s' failed: %s" % (self.host, command, e)) + return stout diff --git a/Monitoring/MonitoringService/Driver/SshTunnel.py b/Monitoring/MonitoringService/Driver/SshTunnel.py new file mode 100644 index 0000000..f2b7d39 --- /dev/null +++ b/Monitoring/MonitoringService/Driver/SshTunnel.py @@ -0,0 +1,178 @@ +''' +Created on Jan 14, 2013 + +@author: steger +''' + +import select +import SocketServer +from Driver import Driver +from SshExec import SshDriver +from threading import Thread +from SshExec import SshExec + +class SshTunnel(SshDriver): + ''' + @summary: this class extends L{SshDriver} and establishes a connection + to the requested SSH server and sets up local port + forwarding (the openssh -L option) from a local port through a tunneled + connection to a destination reachable from the SSH server machine. + @ivar t: the thread container + @type t: threading.Thread or None + ''' + + class ForwardServer (SocketServer.ThreadingTCPServer): + daemon_threads = True + allow_reuse_address = True + + class Handler (SocketServer.BaseRequestHandler): + def handle(self): + try: + chan = self.ssh_transport.open_channel('direct-tcpip', + (self.chain_host, self.chain_port), + self.request.getpeername()) + except Exception, e: + Driver.logger.debug('Incoming request to %s:%d failed: %s' % (self.chain_host, + self.chain_port, + repr(e))) + return + if chan is None: + Driver.logger.debug('Incoming request to %s:%d was rejected by the SSH server.' % + (self.chain_host, self.chain_port)) + return + + Driver.logger.debug('Tunnel open %r -> %r -> %r' % (self.request.getpeername(), + chan.getpeername(), (self.chain_host, self.chain_port))) + while True: + r, _, _ = select.select([self.request, chan], [], []) + if self.request in r: + data = self.request.recv(1024) + if len(data) == 0: + break + chan.send(data) + if chan in r: + data = chan.recv(1024) + if len(data) == 0: + break + self.request.send(data) + chan.close() + self.request.close() + Driver.logger.debug('Tunnel closed from %r' % (self.request.getpeername(),)) + + def __init__(self): + ''' + @summary: allocates thread pointer container + ''' + SshDriver.__init__(self) + self.t = None + + def connect(self, host, credential, localport, port, remoteserver, remoteport, known_host = None): + ''' + @summary: set up the tunnel connection + @param host: the host name of the remote server acting a port forwarder + @type host: str + @param credential: the secret to use for connection set up + @type credential: L{Credential} + @param localport: the local port entry mapped to the remoteport + @type localport: int + @param port: the port of the forwarder ssh server + @type port: int + @param remoteserver: the sink of the tunnel + @type remoteserver: str + @param remoteport: the port of the tunnel sink + @type remoteport: int + @param known_host: a file name containing host signatures to check, if None AutoAddPolicy applies + @type known_host: str + ''' + SshDriver.connect(self, host, credential, port, known_host) + self.logger.info('Now forwarding port %d to %s:%d ...' % (localport, remoteserver, remoteport)) + self.t = Thread(target = self._tran, kwargs = {'localport': localport, 'remoteserver': remoteserver, 'remoteport': remoteport}) + self.t.daemon = True + self.t.start() + + def _tran(self, localport, remoteserver, remoteport): + ''' + @summary: thread worker to transport data over the tunnel + @param localport: the local port entry mapped to the remoteport + @type localport: int + @param remoteserver: the sink of the tunnel + @type remoteserver: str + @param remoteport: the port of the tunnel sink + @type remoteport: int + ''' + try: + # this is a little convoluted, but lets me configure things for the Handler + # object. (SocketServer doesn't give Handlers any way to access the outer + # server normally.) + class SubHander (self.Handler): + chain_host = remoteserver + chain_port = remoteport + ssh_transport = self.client.get_transport() + self.service = self.ForwardServer(('', localport), SubHander) + self.service.serve_forever() + except KeyboardInterrupt: + self.logger.debug('C-c: Port forwarding stopped.') + self.close() + + def close(self): + ''' + @summary: stops the thread and tears down the tunnel + ''' + if self.t is None: + return + self.t.join(timeout = self.timeout) + self.t = None + self.service.shutdown() + self.logger.info('Port forwarding stopped @ %s.' % self.host) + SshDriver.close(self) + + +class SshExecTunnel(SshTunnel): + ''' + @summary: an extension of the L{SshTunnel} driver to execute commands + on the remote machine accessed via the tunnel + @ivar command: the string representation of the command to run + @type command: str + @ivar localdriver: the representation of an ssh client connecting over an existing ssh tunnel + @type localdriver: L{SshExec} + ''' + + def __init__(self, host, credential, localport, port, remoteserver, remoteport, remotecredential = None, command = "echo helloworld @ `hostname`", known_host = None): + ''' + @summary: initializes an ssh connection and stores a default command + @param host: the host name of the remote server acting a port forwarder + @type host: str + @param credential: the secret to use for tunnel set up + @type credential: L{Credential} + @param localport: the local port entry mapped to the remoteport + @type localport: int + @param port: the port of the forwarder ssh server + @type port: int + @param remoteserver: the sink of the tunnel + @type remoteserver: str + @param remoteport: the port of the tunnel sink + @type remoteport: int + @param remotecredential: the secret to use for connection set up, if None then we fall back to the credential + @type remotecredential: L{Credential} or None + @param command: the default remote command + @type command: str + @param known_host: a file name containing host signatures to check, if None AutoAddPolicy applies + @type known_host: str + ''' + SshTunnel.__init__(self) + self.connect(host, credential, localport, port, remoteserver, remoteport, known_host) + self.command = command + if remotecredential is None: + remotecredential = credential + self.localdriver = SshExec(host = 'localhost', credential = remotecredential, port = localport, command = command, known_host = None) + self.logger.info("connected over tunnel") + + def execute(self, command = None): + ''' + @summary: executes a remote command + @param command: the command to run, if None, the default command is issued + @type command: str or None + @return: the standard output of the command run + @rtype: paramico.ChannelFile + ''' + return self.localdriver.execute(command) diff --git a/Monitoring/MonitoringService/Driver/__init__.py b/Monitoring/MonitoringService/Driver/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/Monitoring/MonitoringService/Driver/__init__.py diff --git a/Monitoring/MonitoringService/Driver/test.py b/Monitoring/MonitoringService/Driver/test.py new file mode 100644 index 0000000..4395750 --- /dev/null +++ b/Monitoring/MonitoringService/Driver/test.py @@ -0,0 +1,153 @@ +''' +Created on Aug 10, 2011 + +@author: steger +''' +import unittest +from random import randint +from Example.Resources import PLnodes +from threading import Thread +from Example.credentials import noviCredential, sonomaCredential +from SshExec import SshExec +from SOAPClient import SOAPClient +from LocalExec import LocalExec +from logging import FileHandler +import logging +from time import sleep +from SshTunnel import SshExecTunnel + +class Test(unittest.TestCase): + testnodes = map(lambda x: x.get_ipaddress("eth0")[0], PLnodes) + cred_novi = noviCredential + url_sonoma = "http://complex.elte.hu/~steger/sonoma/user.wsdl" + cred_sonoma = sonomaCredential + + + def setUp(self): + pass + + def tearDown(self): + pass + + def gettestnode(self): + ''' + @summary: Return a test node IP address chosen random + @return: ip address + @rtype: string + ''' + return self.testnodes[randint(1, len(self.testnodes))-1] + + def test_helloworld(self): + ''' + @summary: Execute local command + ''' + proc = LocalExec() + result = proc.execute() + self.assertTrue(result.startswith("helloworld"), "Local command output differs from expected") + + def test_helloworldWithMaster(self): + ''' + @summary: Execute remote command in the name of the testuser authenticated with the master key + ''' + proc = SshExec(host = self.gettestnode(), credential = self.cred_novi) + result = proc.execute().read() + self.assertTrue(result.startswith("helloworld @ "), "Remote command output differs from expected") + + def echoWithMaster(self, address): + ''' + @summary: Execute remote echo command in the name of the testuser authenticated with the master key + @param address: ip address of the remote machine + @type address: string + ''' + try: + n = randint(0, 10000) + command = "echo %d" % n + proc = SshExec(host = address, credential = self.cred_novi, command = command) + result = proc.execute().read() + self.assertTrue(result.strip() == str(n), "Remote command @%s output differs from expected: (%s != %d)" % (address, result, n)) + except Exception, e: + self.assertFalse(True, "Got an error %s" % e) + + def test_echoWithMaster(self): + ''' + @summary: Execute remote echo command in the name of the testuser authenticated with the master key + ''' + self.echoWithMaster(self.gettestnode()) + + def test_distributedEcho(self): + ''' + @summary: Execute parallel remote echo commands in a distributed fashion + ''' + threads = [] + for n in self.testnodes: + t = Thread(target = self.echoWithMaster, args = (n,)) + t.daemon = True + t.start() + threads.append(t) + while len(threads): + t = threads.pop() + t.join(5) + + def test_parallelEcho(self): + ''' + @summary: Execute parallel remote echo commands in a test node + ''' + N = 20 + n = self.gettestnode() + threads = [] + while N: + N -= 1 + t = Thread(target = self.echoWithMaster, args = (n,)) + t.daemon = True + t.start() + threads.append(t) + while len(threads): + t = threads.pop() + t.join(5) + + def test_stress(self): + ''' + @summary: Consecutively execute parallel remote echo commands in a distributed fashion + ''' + threads = [] + for n in self.testnodes: + N = randint(5, 20) + while N: + N -= 1 + t = Thread(target = self.echoWithMaster, args = (n,)) + t.daemon = True + t.start() + threads.append(t) + while len(threads): + t = threads.pop() + t.join(5) + + def test_soap(self): + ''' + @summary: Run SONoMA getNodeList + ''' + client = SOAPClient(self.url_sonoma) + resources = client.service.getNodeList(filter = "AVAILABLE") + self.assertGreater(len(resources), 0, "sonoma reports no nodes") + + def test_tunnel(self): + T = SshExecTunnel(host = 'novilab.elte.hu', credential = noviCredential, localport = 4000, port = 22, + remoteserver = 'smilax1.man.poznan.pl', remoteport = 22) + response = T.execute().read().strip() + expected = "helloworld @ smilax1.man.poznan.pl" + self.assertEqual(response, expected, "Remote command output differs from expected %s != %s" % (expected, response)) + +if __name__ == "__main__": + #import sys;sys.argv = ['', 'Test.test_helloworldWithMaster'] + fn = "/tmp/Driver_test.log" + hdlr = FileHandler(filename = fn, mode = 'w') + l = logging.getLogger("NOVI.DRIVER") + l.setLevel(level = logging.DEBUG) +# l.setLevel(level = logging.INFO) + l.addHandler(hdlr = hdlr) + l.info("START TEST") + try: + unittest.main() + finally: + l.info("FINISHED TEST") + hdlr.close() diff --git a/Monitoring/MonitoringService/Example/Dimensions.py b/Monitoring/MonitoringService/Example/Dimensions.py new file mode 100644 index 0000000..526ee11 --- /dev/null +++ b/Monitoring/MonitoringService/Example/Dimensions.py @@ -0,0 +1,42 @@ +''' +Created on Oct 12, 2011 + +@author: steger +@summary: Here we declare some unit models to enable parameter conversions +''' +from DataProcessing.Dimension import DimensionManager +from Example.Units import UM +from DataProcessing.MeasurementLevel import Nominal, Interval, Ratio + +DM = DimensionManager(unitmanager = UM) + +basedimensions = [ + ("Cardinal", "unitless", Nominal), + ("NameOfSomething", "unitless", Nominal), + ("Countable", "piece", Ratio), + ("InformationSize", "bit", Ratio), + ("IPAddress", "ipv4dotted", Nominal), + ("PointInTime", "unixtimestamp", Interval), + ] + +deriveddimensions = [ + ("TimeInterval", "second", "PointInTime", DM.DifferenceDimension), + ("Probability", "fraction", "Countable", DM.RatioDimension), + ] + +for reference, unitreference, measurementlevel in basedimensions: + DM.newBaseDimension(reference, reference, UM[unitreference], measurementlevel) + +for reference, unitreference, ancestorreference, dimtype in deriveddimensions: + DM.newDerivedDimension(reference, reference, UM[unitreference], DM[ancestorreference], dimtype) + + +#Some dimensions explicitely references +nameofsomething = DM["NameOfSomething"] +pointintime = DM["PointInTime"] +timeinterval = DM["TimeInterval"] +cardinal = DM["Cardinal"] +countable = DM["Countable"] +ipaddress = DM["IPAddress"] +informationsize = DM["InformationSize"] +probability = DM["Probability"] diff --git a/Monitoring/MonitoringService/Example/Metrics.py b/Monitoring/MonitoringService/Example/Metrics.py new file mode 100644 index 0000000..e0d2b82 --- /dev/null +++ b/Monitoring/MonitoringService/Example/Metrics.py @@ -0,0 +1,68 @@ +''' +Created on Oct 12, 2011 + +@author: steger +@summary: Here we declare monitorable metrics and combine them with tools that are measuring them +''' +from Example.Tools import sshping, sonomashortping, sshtraceroute, sshmeminfo,\ + sonomashortchirp +from DataProcessing.Parameter import ParameterList, Parameter +from Example.Units import UM, unitless, milli_second, Byte, piece +from Example.Dimensions import nameofsomething, informationsize,\ + timeinterval, cardinal, countable +from Resource.node import node +from Resource.path import path + +class FreeMemory(object): + name = 'Free Memory' + resourcetype = node + p_obligatory = ParameterList() + p_optional = ParameterList() + +class DiskUsage(object): + name = 'Disk Usage' + resourcetype = node + p_obligatory = ParameterList([ + Parameter(name = "Directory", valuetype = str, unitmanager = UM, dimension = nameofsomething, default = ('/dev/mapper/planetlab-vservers', unitless)) + ]) + p_optional = ParameterList() + +class RoundTripDelay(object): + name = 'Round Trip Delay' + resourcetype = path + p_obligatory = ParameterList() + p_optional = ParameterList([ + Parameter(name = "Count", valuetype = int, unitmanager = UM, dimension = countable, default = (5, piece)), + Parameter(name = "PacketSize", valuetype = int, unitmanager = UM, dimension = informationsize, default = (64, Byte)), + Parameter(name = "Delay", valuetype = float, unitmanager = UM, dimension = timeinterval, default = (200, milli_second)), + Parameter(name = "TimeToLive", valuetype = int, unitmanager = UM, dimension = countable, default = (32, piece)), + Parameter(name = "Interface", valuetype = str, unitmanager = UM, dimension = nameofsomething, default = ("eth0", unitless)), + ]) + +class OnewayDelay(object): + name = 'One Way Delay' + resourcetype = path + p_obligatory = ParameterList() + p_optional = ParameterList([ + Parameter(name = "Count", valuetype = int, unitmanager = UM, dimension = countable, default = (5, piece)), + Parameter(name = "Delay", valuetype = int, unitmanager = UM, dimension = timeinterval, default = (200, milli_second)), + Parameter(name = "TimeToLive", valuetype = int, unitmanager = UM, dimension = countable, default = (32, piece)), +# Parameter(name = "Interface", valuetype = str, unitmanager = UM, dimension = nameofsomething, default = (novi_iface, unitless)), + Parameter(name = "PacketSize", valuetype = int, unitmanager = UM, dimension = informationsize, default = (64, Byte)), + Parameter(name = "SourcePort", valuetype = int, unitmanager = UM, dimension = cardinal, default = (7777, unitless)), + Parameter(name = "DestinationPort", valuetype = int, unitmanager = UM, dimension = cardinal, default = (7777, unitless)), + ]) + +class HopMeasurement(object): + name = 'Hop Measurement' + resourcetype = path + p_obligatory = ParameterList() + p_optional = ParameterList() + + +MonitorMetrics = { + FreeMemory: [sshmeminfo], + RoundTripDelay: [sshping, sonomashortping], + OnewayDelay: [sonomashortchirp], + HopMeasurement: [sshtraceroute] +} diff --git a/Monitoring/MonitoringService/Example/Platforms.py b/Monitoring/MonitoringService/Example/Platforms.py new file mode 100644 index 0000000..5936124 --- /dev/null +++ b/Monitoring/MonitoringService/Example/Platforms.py @@ -0,0 +1,10 @@ +''' +Created on Nov 20, 2012 + +@author: steger +''' +from Example.model import owl_pl_conf, owl_fed_conf +federation = { + 'PlanetLab': (1234, owl_pl_conf), + 'FEDERICA': (1235, owl_fed_conf) +} diff --git a/Monitoring/MonitoringService/Example/Prefixes.py b/Monitoring/MonitoringService/Example/Prefixes.py new file mode 100644 index 0000000..54fa7b9 --- /dev/null +++ b/Monitoring/MonitoringService/Example/Prefixes.py @@ -0,0 +1,23 @@ +''' +Created on Oct 12, 2011 + +@author: steger +@summary: Here we declare some unit models to enable parameter conversions +''' +from DataProcessing.Prefix import PrefixManager + +prefixes = [ + ('pico', 'p', 10, -12), + ('nano', 'n', 10, -9), + ('micro', 'mu', 10, -6), + ('milli', 'm', 10, -3), + ('deco', 'd', 10, 0), + ('hecto', 'h', 10, 2), + ('kilo', 'k', 10, 3), + ('mega', 'M', 10, 6), + ('giga', 'G', 10, 9), +] + +PM = PrefixManager() +for reference, symbol, base, exponent in prefixes: + PM.newPrefix(reference, symbol, base, exponent) diff --git a/Monitoring/MonitoringService/Example/Resources.py b/Monitoring/MonitoringService/Example/Resources.py new file mode 100644 index 0000000..6c3418d --- /dev/null +++ b/Monitoring/MonitoringService/Example/Resources.py @@ -0,0 +1,61 @@ +''' +Created on Oct 12, 2011 + +@author: steger +@summary: Here we define the nodes that can take part in monitoring procedures + +@note: how to extract information +for h in novilab.elte.hu planetlab1-novi.lab.netmode.ece.ntua.gr planetlab2-novi.lab.netmode.ece.ntua.gr smilax1.man.poznan.pl smilax2.man.poznan.pl smilax3.man.poznan.pl smilax4.man.poznan.pl smilax5.man.poznan.pl; do echo -n "\"$h\", "; ssh site_admin@$h -i ~/Private/ssh/novi_rsa /sbin/ifconfig | awk '/^[^[:space:]]/ { iface = $1} /inet addr/ { printf ("(\"%s\", \"%s\"), ", iface, $2) }' | sed s,addr.,,g | sed s/', $'// ; done +''' + +from Resource.node import node +from Resource.path import path +from Resource.interface import interface +from Example.Units import UM + +# PL node resources +direction = interface.INGRESS | interface.EGRESS +PLnodes = [] +def extendpl(hostname, ifaces): + n = node(name = hostname, resourceid = hostname) + for iface, ip in ifaces: + I = interface(name = iface, resourceid = "%s:%s" % (hostname, iface)) + ipwu = ip, UM.ipv4dotted + if iface == "eth0": + I.setvalues(ifacename = iface, address = ipwu, ispublic = True, direction = direction, hostname = hostname) + else: + I.setvalues(ifacename = iface, address = ipwu, ispublic = False, direction = direction) + n.addinterface(I) + PLnodes.append(n) + +extendpl("novilab.elte.hu", [("eth0", "157.181.175.243"), ("federica", "192.168.29.45"), ("novi", "192.168.28.97"), ("novi_monitoring", "192.168.31.21")]) +extendpl("planetlab1-novi.lab.netmode.ece.ntua.gr", [("eth0", "147.102.22.66"), ("federica", "192.168.29.57"), ("novi", "192.168.28.161"), ("novi_monitoring", "192.168.31.33"), ("tun515-1", "192.168.20.1")]) +extendpl("planetlab2-novi.lab.netmode.ece.ntua.gr", [("eth0", "147.102.22.67"), ("federica", "192.168.29.61"), ("novi", "192.168.28.165"), ("tap514-1", "192.168.20.3")]) +extendpl("smilax1.man.poznan.pl", [("eth0", "150.254.160.19"), ("federica", "192.168.29.21"), ("novi", "192.168.28.29"), ("novi_fia_1", "192.168.32.5"), ("novi_monitoring", "192.168.31.13"), ("tap513-1", "192.168.20.4")]) +#extendpl("smilax2.man.poznan.pl", [("eth0", "150.254.160.20"), ("federica", "192.168.29.25"), ("novi", "192.168.28.33"), ("novi_fia_2", "192.168.32.5")]) +#extendpl("smilax3.man.poznan.pl", [("eth0", "150.254.160.21"), ("federica", "192.168.29.29"), ("novi", "192.168.28.37"), ("novi_fia_2", "192.168.32.17")]) +#extendpl("smilax4.man.poznan.pl", [("eth0", "150.254.160.22"), ("federica", "192.168.29.33"), ("novi", "192.168.28.41")]) +#extendpl("smilax5.man.poznan.pl", [("eth0", "150.254.160.23"), ("federica", "192.168.29.37"), ("novi", "192.168.28.45")]) + +PLdict = dict(map(lambda x: (x.name, x), PLnodes)) + +# PL are fully connected over the Internet +PLpaths = [] +for s in PLdict.values(): + for d in PLdict.values(): + if s == d: continue + name = "%s->%s" % (s.name, d.name) + PLpaths.append( path(name = name, source = s, destination = d) ) + + +# FED node resources +FEDnodes = [] +for nick, addr in [ ("fed.psnc", '192.168.31.1'), ("fed.dfn", '192.168.31.5'), ("fed.garr", '192.168.31.9') ]: + n = node(name = nick, resourceid = nick) + I = interface(name = "eth0", resourceid = "%s:eth0" % nick) + ipwu = (addr, UM.ipv4dotted) + I.setvalues(ifacename = "eth0", address = ipwu, ispublic = False, direction = direction) + n.addinterface(I) + FEDnodes.append(n) + +FEDdict = dict(map(lambda x: (x.name, x), FEDnodes)) diff --git a/Monitoring/MonitoringService/Example/Tools.py b/Monitoring/MonitoringService/Example/Tools.py new file mode 100644 index 0000000..d842d04 --- /dev/null +++ b/Monitoring/MonitoringService/Example/Tools.py @@ -0,0 +1,311 @@ +''' +Created on Oct 12, 2011 + +@author: steger +@summary: Here we define some monitoring tools and dress them up with parameters and work flow description +''' +from DataProcessing.Parameter import Parameter, ParameterList +from Example.Resources import PLdict +from Credential.credentialtypes import UsernamePassword, UsernameRSAKey +from Driver.SOAPClient import SOAPClient +from Driver.SshExec import SshExec +from Example.Units import UM, Byte, micro_second, piece, milli_second,\ + nano_second, unitless, nano_unixtimestamp, unixtimestamp, fraction,\ + kilo_Byte, second +from Example.Dimensions import cardinal, countable, ipaddress, timeinterval,\ + informationsize, pointintime, nameofsomething, probability +from DataProcessing.DataHeader import DataHeaderGeneratedByDescription + +DOM_SUBSTRATE = 1 +DOM_SLICE = 2 + +sonoma_url = "http://complex.elte.hu/~steger/sonoma/user.wsdl" + +nodes = map(lambda x:(x.get_hostname("eth0"), unitless), PLdict.values()) + +class sonomashortping: + driver = SOAPClient + name = "SONoMAPing" + domain = DOM_SUBSTRATE + dataheaderdeclaration = DataHeaderGeneratedByDescription('ping', [('Run', countable), + ('Sequence Number', countable), + ('Source Address', ipaddress), + ('Destination Address', ipaddress), + ('Packet Size', informationsize, Byte), + ('Time To Live', countable), + ('Round Trip Delay', timeinterval, micro_second)]) + + authtype = (UsernamePassword, ) + kwargs = { "url": sonoma_url, "MAserviceport": 11123 } + hooks = { + "prehook" : """ +from base64 import b64decode +self.decode = b64decode +self.pattern = re.compile('^(\d+)\s+(\d+\.\d+\.\d+\.\d+)\s+(\d+\.\d+\.\d+\.\d+)\s+(\d+)\s+(\d+)\s+(\d+)$') +self.username=self.credential.username +self.password=self.credential.password +self.client = self.driver(kw.get('url')) +self.sessionId = self.client.service.requestSession(self.username, self.password, 'CSV', False) +self.port = kw.get('MAserviceport') +self.template = self.data.getTemplate(size = 1) + """, + "retrievehook" : """ +source = "%s:%d" % (self.parameters.get('SourceAddress', self.um.ipv4dotted), self.port) +res = self.client.service.shortPing(self.sessionId, + source, self.parameters.get('DestinationAddress', self.um.ipv4dotted), self.parameters.get('Count', self.um.piece), + self.parameters.get('Delay', self.um.micro_second), self.parameters.get('PacketSize', self.um.Byte)) +rec = self.decode(res).splitlines() +for r in rec: + if self.pattern.match(r): + self.template.clear() + ex = self.pattern.split(r)[:-1] + ex[0] = self.runcount + self.template.updateMany( ('Run', 'Sequence Number', 'Source Address', 'Destination Address', 'Packet Size', 'Time To Live', 'Round Trip Delay'), [ex,] ) + self.data.saveRecord(self.template) +return True + """, + "posthook": "self.client.service.closeSession(self.username, self.password, self.sessionId)"} + parameters = ParameterList([ Parameter(name = "SourceAddress", valuetype = str, unitmanager = UM, dimension = ipaddress), + Parameter(name = "DestinationAddress", valuetype = str, unitmanager = UM, dimension = ipaddress), + Parameter(name = "Count", valuetype = int, unitmanager = UM, dimension = countable, default = (5, piece)), + Parameter(name = "Delay", valuetype = int, unitmanager = UM, dimension = timeinterval, default = (100, milli_second)), + Parameter(name = "PacketSize", valuetype = int, unitmanager = UM, dimension = informationsize, default = (64, Byte)) ]) + +class sonomashortchirp: + driver = SOAPClient + name = "SONoMAChirp" + domain = DOM_SUBSTRATE + dataheaderdeclaration = DataHeaderGeneratedByDescription('onewaydelay', [('Run', countable), + ('SequenceNumber', countable), + ('SourceAddress', ipaddress), + ('DestinationAddress', ipaddress), + ('TimestampSend', pointintime, nano_unixtimestamp), + ('OnewayDelay', timeinterval, nano_second) ]) + authtype = (UsernamePassword, ) + kwargs = { "url": sonoma_url, "MAserviceport": 11123 } + hooks = { + "prehook" : """ +from base64 import b64decode +self.decode = b64decode +self.pattern = re.compile('^(\d+)\s+(\d+\.\d+\.\d+\.\d+)\s+(\d+\.\d+\.\d+\.\d+)\s+(\d+)\s+(\d+)$') +self.username=self.credential.username +self.password=self.credential.password +self.client = self.driver(kw.get('url')) +self.sessionId = self.client.service.requestSession(self.username, self.password, 'CSV', False) +self.port = kw.get('MAserviceport') +self.template = self.data.getTemplate(size = 1) +self.delaylist = self.client.factory.create("delayList") +self.delaylist.gap = [100000,100000] + """, + "retrievehook" : """ +source = "%s:%d" % (self.parameters.get('SourceAddress', self.um.ipv4dotted), self.port) +destination = "%s:%d" % (self.parameters.get('DestinationAddress', self.um.ipv4dotted), self.port) +res = self.client.service.shortChirp(self.sessionId, + source, self.parameters.get('SourcePort', self.um.unitless), + destination, self.parameters.get('DestinationPort', self.um.unitless), + self.parameters.get('Count', self.um.piece), self.parameters.get('Delay', self.um.milli_second), + self.parameters.get('PacketSize', self.um.Byte), self.delaylist) +rec = self.decode(res).splitlines() +data = [] +for r in rec: + if self.pattern.match(r): + self.template.clear() + ex = self.pattern.split(r)[:-1] + ex[0] = self.runcount + ex[-1] = int(ex[-1])-int(ex[-2]) + data.append( ex ) +self.template.clear(size = len(data)) +self.template.updateMany( ('Run', 'SequenceNumber', 'SourceAddress', 'DestinationAddress', 'TimestampSend', 'OnewayDelay'), data ) +self.data.saveRecord(self.template) +return True + """, + "posthook": "self.client.service.closeSession(self.username, self.password, self.sessionId)"} + parameters = ParameterList([ Parameter(name = "SourceAddress", valuetype = str, unitmanager = UM, dimension = ipaddress), + Parameter(name = "DestinationAddress", valuetype = str, unitmanager = UM, dimension = ipaddress), + Parameter(name = "Count", valuetype = int, unitmanager = UM, dimension = countable, default = (5, piece)), + Parameter(name = "Delay", valuetype = int, unitmanager = UM, dimension = timeinterval, default = (100, milli_second)), + Parameter(name = "PacketSize", valuetype = int, unitmanager = UM, dimension = informationsize, default = (64, Byte)), + Parameter(name = "SourcePort", valuetype = int, unitmanager = UM, dimension = cardinal, default = (7777, unitless)), + Parameter(name = "DestinationPort", valuetype = int, unitmanager = UM, dimension = cardinal, default = (7777, unitless)), ]) + + +class sshping: + driver = SshExec + name = "sshPing" + domain = DOM_SLICE + dataheaderdeclaration = DataHeaderGeneratedByDescription('ping', [('Run', cardinal), + ('TimeReceived', pointintime), + ('PacketSize', informationsize), + ('DestinationAddress', ipaddress), + ('SequenceNumber', countable), ('TimeToLive', countable), + ('RoundTripDelay', timeinterval, milli_second)]) + authtype = (UsernameRSAKey, UsernamePassword) + kwargs = {} + hooks = { + "prehook" : """ +self.pattern = re.compile('^\[(\d+\.?\d*)\]\s*(\d+)\s*bytes\s*from\s*(\d+\.\d+\.\d+\.\d+):\s*icmp_req=(\d+)\s*ttl=(\d+)\s*time=(\d+\.?\d*)\s*(\w*)') +self.template = self.data.getTemplate(size = self.parameters.get('Count', self.um.piece)) +command = "ping -D -n -c %d -i %f -t %d -I %s %s" % ( + self.parameters.get('Count', self.um.piece), self.parameters.get('Delay', self.um.second), + self.parameters.get('TimeToLive', self.um.piece), self.parameters.get('Interface', self.um.unitless), + self.parameters.get('DestinationAddress', self.um.ipv4dotted)) +self.client = self.driver(host = self.parameters.get('SourceAddress', self.um.ipv4dotted), credential = self.credential, command = command) + """, + "retrievehook" : """ +data = [] +for r in self.client.execute().readlines(): + if self.pattern.match(r): + ex = self.pattern.split(r)[:-2] + ex[0] = self.runcount + data.append( ex ) +self.template.clear(size = len(data)) +self.template.updateMany( ('Run', 'TimeReceived', 'PacketSize', 'DestinationAddress', 'SequenceNumber', 'TimeToLive', 'RoundTripDelay'), data ) +self.data.saveRecord(self.template) +return True + """} + parameters = ParameterList([ Parameter(name = "SourceAddress", valuetype = str, unitmanager = UM, dimension = ipaddress), + Parameter(name = "DestinationAddress", valuetype = str, unitmanager = UM, dimension = ipaddress), + Parameter(name = "Count", valuetype = int, unitmanager = UM, dimension = countable, default = (5, piece)), + Parameter(name = "Delay", valuetype = float, unitmanager = UM, dimension = timeinterval, default = (200, milli_second)), + Parameter(name = "TimeToLive", valuetype = int, unitmanager = UM, dimension = countable, default = (32, piece)), + Parameter(name = "Interface", valuetype = str, unitmanager = UM, dimension = nameofsomething, default = ("eth0", unitless)) ] ) + +class sshmeminfo: + driver = SshExec + name = "sshMeminfo" + domain = DOM_SLICE | DOM_SUBSTRATE + dataheaderdeclaration = DataHeaderGeneratedByDescription('meminfo', [('Run', cardinal), + ('AvailableMemory', informationsize), + ('FreeMemory', informationsize)]) + authtype = (UsernameRSAKey, UsernamePassword) + kwargs = {} + hooks = { + "prehook" : """ +self.pattern = re.compile('^(.*):\s*(\d+)\s+(.B)$') +self.template = self.data.getTemplate(size = 1) +command = "cat /proc/meminfo" +self.client = self.driver(host = self.parameters.get('SourceAddress', self.um.ipv4dotted), credential = self.credential, command = command) + """, + "retrievehook" : """ +self.template.clear() +self.template.update('Run', (self.runcount,)) +for r in self.client.execute().readlines(): + if self.pattern.match(r): + n, v, u = self.pattern.split(r)[1:-1] + if n == 'MemTotal' and u == 'kB': + self.template.update('AvailableMemory', (v,)) + elif n == 'MemFree' and u == 'kB': + self.template.update('FreeMemory', (v,)) +self.data.saveRecord(self.template) +return True + """} + parameters = ParameterList([ Parameter(name = "SourceAddress", valuetype = str, unitmanager = UM, dimension = ipaddress), ]) + +class sshdf: + driver = SshExec + name = "sshDiskinfo" + domain = DOM_SLICE | DOM_SUBSTRATE + dataheaderdeclaration = DataHeaderGeneratedByDescription('diskinfo', [('Run', cardinal), + ('Available', informationsize, kilo_Byte), + ('Used', informationsize, kilo_Byte)]) + authtype = (UsernameRSAKey, UsernamePassword) + kwargs = {} + hooks = { + "prehook" : """ +self.pattern = re.compile('^.*\s+\d+\s+(\d+)\s+(\d+)\s+\d+%\s+.*$') +self.template = self.data.getTemplate(size = 1) +command = "df %s" % self.parameters.get('Directory', self.um.unitless) +self.client = self.driver(host = self.parameters.get('SourceAddress', self.um.ipv4dotted), credential = self.credential, command = command) + """, + "retrievehook" : """ +self.template.clear() +self.template.update('Run', (self.runcount,)) +for r in self.client.execute().readlines(): + if self.pattern.match(r): + u, a = self.pattern.split(r)[1:-1] + self.template.update('Available', (a,)) + self.template.update('Used', (u,)) +self.data.saveRecord(self.template) +return True + """} + parameters = ParameterList([ + Parameter(name = "SourceAddress", valuetype = str, unitmanager = UM, dimension = ipaddress), + Parameter(name = "Directory", valuetype = str, unitmanager = UM, dimension = nameofsomething), + ]) + + +class sshtraceroute: + driver = SshExec + name = "sshTraceroute" + domain = DOM_SLICE + dataheaderdeclaration = DataHeaderGeneratedByDescription('traceroute', [('Run', cardinal), + ('Hop', countable), + ('Raw', nameofsomething)]) + authtype = (UsernameRSAKey, UsernamePassword) + kwargs = {} + hooks = { + "prehook" : """ +self.pattern = re.compile('^\s*(\d+)\s+(.*)$') +self.template = self.data.getTemplate(size = 1) +command = "traceroute -n %s" % (self.parameters.get('DestinationAddress', self.um.ipv4dotted)) +self.client = self.driver(host = self.parameters.get('SourceAddress', self.um.ipv4dotted), credential = self.credential, command = command) + """, + "retrievehook" : """ +data = [] +for r in self.client.execute().readlines(): + if self.pattern.match(r): + ex = self.pattern.split(r)[:-1] + ex[0] = self.runcount + data.append( ex ) +self.template.clear(size = len(data)) +self.template.updateMany( ('Run', 'Hop', 'Raw'), data ) +self.data.saveRecord(self.template) +return True + """} + parameters = ParameterList([ Parameter(name = "SourceAddress", valuetype = str, unitmanager = UM, dimension = ipaddress), + Parameter(name = "DestinationAddress", valuetype = str, unitmanager = UM, dimension = ipaddress), + Parameter(name = "Count", valuetype = int, unitmanager = UM, dimension = countable, default = (5, piece)), ]) + +class sshhades: + driver = SshExec + name = "HADESaggregates" + domain = DOM_SUBSTRATE + dataheaderdeclaration = DataHeaderGeneratedByDescription('hadestable', [('Run', cardinal), + ('Time', pointintime, unixtimestamp), + ('MinDelay', timeinterval, second), + ('MedianDelay', timeinterval, second), + ('MaxDelay', timeinterval, second), + ('Loss', probability, fraction), + ]) + authtype = (UsernameRSAKey, UsernamePassword) + kwargs = { 'repository': '194.132.52.212', 'samplecount': 9 } + hooks = { + "prehook" : """ +self.repository = kw.get('repository') +self.pattern = re.compile('^(\d+)\s+(-?\d+\.?\d*)\s+(-?\d+\.?\d*)\s+(-?\d+\.?\d*)\s+(\d+)\s+.*$') +self.template = self.data.getTemplate(size = 1) +lookup = { '192.168.31.1': 'PSNC_FED', '192.168.31.5': 'DFN_FED', '192.168.31.9': 'GARR_FED' } +root = "/home/novi-monitoring" +source = lookup[ self.parameters.get('SourceAddress', self.um.ipv4dotted) ] +destination = lookup[ self.parameters.get('DestinationAddress', self.um.ipv4dotted) ] +lookupcommand = "echo %s/data/hades/novi/www/*/*/*/%s.%s.0.qos_ai.dat" % (root, source, destination) +self.client = self.driver(host = self.repository, credential = self.credential) +files = self.client.execute(lookupcommand).read().split() +self.command = "%s/hades/bin/hades-show-data.pl --config=novi %s" % (root, files[-1]) +self.nsamples = int(kw.get('samplecount')) + """, + "retrievehook" : """ +data = [] +for r in self.client.execute(self.command).readlines(): + print r + if self.pattern.match(r): + ts, dtmin, dtmed, dtmax, loss = self.pattern.split(r)[1:-1] + data.append( [ self.runcount, ts, dtmin, dtmed, dtmax, float(loss)/self.nsamples ] ) +self.template.clear(size = len(data)) +self.template.updateMany( ('Run', 'Time', 'MinDelay', 'MedianDelay', 'MaxDelay', 'Loss'), data ) +self.data.saveRecord(self.template) +return True + """} + parameters = ParameterList([ Parameter(name = "SourceAddress", valuetype = str, unitmanager = UM, dimension = ipaddress), + Parameter(name = "DestinationAddress", valuetype = str, unitmanager = UM, dimension = ipaddress), + ]) diff --git a/Monitoring/MonitoringService/Example/Units.py b/Monitoring/MonitoringService/Example/Units.py new file mode 100644 index 0000000..83ac386 --- /dev/null +++ b/Monitoring/MonitoringService/Example/Units.py @@ -0,0 +1,80 @@ +''' +Created on Oct 12, 2011 + +@author: steger +@summary: Here we declare some unit models to enable parameter conversions +''' +from DataProcessing.Unit import UnitManager +from Example.Prefixes import PM +from DataProcessing.DataError import PrefixError + +UM = UnitManager() + +def getPrefixBySymbol(symbol): + ''' + @summary: look up the prefix in the PrefixManager based on its symbol + @param symbol: the symbol of the unit prefix + @type symbol: str + @return: the unit prefix found + @rtype: Prefix + @raise PrefixError: Prefix with symbol not found + ''' + for prefix in PM.prefixes.values(): + if prefix.symbol == symbol: + return prefix + raise PrefixError("Prefix with symbol %s not found" % symbol) + + +basicunits = [ + ("piece", "(1)", None), + ("unitless", "", None), + ("fraction", "", None), + ("second", "s", ['m', 'mu', 'n', 'p']), + ("unixtimestamp", "tss", ['n']), + ("ipv4dotted", "", None), + ("bit", "bit", ['k', 'M' ]), + ] + +lintransformedunits = [ + ("dozen", "(12)", "piece", 12, None), + ("Byte", "B", "bit", 8, ['k', 'M' ]), + ] + +def storeprefixes(u, prefixes): + if prefixes: + for ps in prefixes: + p = getPrefixBySymbol(ps) + nr = "%s_%s" % (p.reference, u.reference) + ns = "%s%s" % (p.symbol, u.symbol) + UM.addLinearTransformedUnit(nr, ns, u, p.scale) + +for reference, symbol, prefixes in basicunits: + u = UM.newBasicUnit(reference, symbol) + storeprefixes(u, prefixes) + +for reference, symbol, ancientref, scale, prefixes in lintransformedunits: + u = UM.addLinearTransformedUnit(reference, symbol, UM[ancientref], scale) + storeprefixes(u, prefixes) + + +# Some units explicitely referenced +pico_second = UM["pico_second"] +nano_second = UM["nano_second"] +micro_second = UM["micro_second"] +milli_second = UM["milli_second"] +second = UM["second"] + +Byte = UM["Byte"] +kilo_Byte = UM["kilo_Byte"] + +piece = UM["piece"] +dozen = UM["dozen"] + +unitless = UM["unitless"] + +unixtimestamp = UM["unixtimestamp"] +nano_unixtimestamp = UM["nano_unixtimestamp"] + +fraction = UM["fraction"] + +ipv4dotted = UM["ipv4dotted"] \ No newline at end of file diff --git a/Monitoring/MonitoringService/Example/__init__.py b/Monitoring/MonitoringService/Example/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/Monitoring/MonitoringService/Example/__init__.py diff --git a/Monitoring/MonitoringService/Example/credentials.py b/Monitoring/MonitoringService/Example/credentials.py new file mode 100644 index 0000000..1abcdaf --- /dev/null +++ b/Monitoring/MonitoringService/Example/credentials.py @@ -0,0 +1,34 @@ +''' +Created on Oct 27, 2011 + +@author: steger +''' +from Credential.credentialtypes import UsernameRSAKey, UsernamePassword +from os import path + +noviCredentialIARGS = { 'username': 'novi_novi', 'rsakey': path.expanduser("~/Private/ssh/novi_rsa") } +noviCredential = UsernameRSAKey(**noviCredentialIARGS) + +novisaCredentialIARGS = { 'username': 'root', 'rsakey': path.expanduser("~/Private/ssh/novi_rsa") } +novisaCredential = UsernameRSAKey(**novisaCredentialIARGS) + +novihadesCredentialIARGS = { 'username': 'novi-monitoring', 'rsakey': path.expanduser("~/Private/ssh/novimonitoring_rsa") } +novihadesCredential = UsernameRSAKey(**novihadesCredentialIARGS) + +sonomaCredentialIARGS = {'username': "guest", 'password': "guest"} +sonomaCredential = UsernamePassword(**sonomaCredentialIARGS) + +g3CredentialIARGS = {'username': "monitor1", 'password': "m/n.t,r1"} +g3Credential = UsernamePassword(**g3CredentialIARGS) + +fedsubstrateCredentialIARGS = {'username': "novi-reader", 'password': "JS5no6vi7JS"} +fedsubstrateCredential = UsernamePassword(**fedsubstrateCredentialIARGS) + + +#mykeyring = [ noviCredentialIARGS, sonomaCredentialIARGS, novihadesCredentialIARGS ] + +ple_credentials = [ novisaCredentialIARGS, sonomaCredentialIARGS ] +fed_credentials = [ novisaCredentialIARGS, novihadesCredentialIARGS, g3CredentialIARGS, fedsubstrateCredentialIARGS ] + +if __name__ == '__main__': + pass \ No newline at end of file diff --git a/Monitoring/MonitoringService/Example/model.py b/Monitoring/MonitoringService/Example/model.py new file mode 100644 index 0000000..c7ec71a --- /dev/null +++ b/Monitoring/MonitoringService/Example/model.py @@ -0,0 +1,14 @@ +model_dir = "/home/steger/repo/git/novi/Software/information-model/monitoring-model" + +owl_unit = "%s/unit.owl" % model_dir +owl_param = "%s/monitoring_parameters.owl" % model_dir +owl_features = "%s/monitoring_features.owl" % model_dir +owl_task = "%s/monitoring_task.owl" % model_dir +owl_query = "%s/monitoring_query.owl" % model_dir +owl_stat = "%s/monitoring_stat.owl" % model_dir +owl_core = "%s/novi-im.owl" % model_dir + +owl_pl_conf = "%s/config_planetlab.owl" % model_dir +owl_fed_conf = "%s/config_federica.owl" % model_dir + +owl_qry_example = "%s/monitoringQuery_example.owl" % model_dir diff --git a/Monitoring/MonitoringService/Resource/__init__.py b/Monitoring/MonitoringService/Resource/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/Monitoring/MonitoringService/Resource/__init__.py diff --git a/Monitoring/MonitoringService/Resource/interface.py b/Monitoring/MonitoringService/Resource/interface.py new file mode 100644 index 0000000..db7b2d3 --- /dev/null +++ b/Monitoring/MonitoringService/Resource/interface.py @@ -0,0 +1,78 @@ +''' +Created on Jul 11, 2012 + +@author: steger +''' +from Resource.resource import resource + +class interface(resource): + ''' + classdocs + ''' + UNDEFINED = 0 + INGRESS = 1 + EGRESS = 2 + + def __init__(self, name = None, resourceid = None): + resource.__init__(self, name, resourceid) + self._public = False + self._direction = self.UNDEFINED + self._iface = None + self._address = None + self._hostname = None + + def setvalues(self, ifacename, address, ispublic = False, direction = 0, hostname = None): + self.interface = ifacename + self.address = address + self.ispublic = ispublic + self.direction = direction + self.hostname = hostname + + @property + def ispublic(self): + if not self._iface: + raise Exception("No interface name defined yet for %s" % self.resourceid) + if not self._address: + raise Exception("No address defined yet for %s" % self.resourceid) + return self._public + @ispublic.setter + def ispublic(self, ispublic): + if isinstance(ispublic, bool): + self._public = ispublic + else: + self._public = False + + @property + def hostname(self): + return self._hostname + @hostname.setter + def hostname(self, hostname): + self._hostname = self.ipret(hostname) + + @property + def address(self): + return self._address + @address.setter + def address(self, address): + if isinstance(address, tuple): + self._address = address + else: + self._address = self.ipret(address) + + @property + def interface(self): + return self._iface + @interface.setter + def interface(self, iface): + self._iface = self.ipret(iface) + + @property + def direction(self): + return self._direction + @direction.setter + def direction(self, direction): + self._direction = direction & (self.INGRESS | self.EGRESS) + + @property + def isvalid(self): + return self.address and self.interface diff --git a/Monitoring/MonitoringService/Resource/link.py b/Monitoring/MonitoringService/Resource/link.py new file mode 100644 index 0000000..6e32dd9 --- /dev/null +++ b/Monitoring/MonitoringService/Resource/link.py @@ -0,0 +1,35 @@ +''' +Created on May 31, 2012 + +@author: steger +''' +from Resource.resource import resource +from Resource.node import node + +class link(resource): + def __init__(self, name = None, resourceid = None, source = None, destination = None): + resource.__init__(self, name, resourceid) + self._source = source + self._destination = destination + + @property + def source(self): + return self._source + @source.setter + def source(self, source): + if isinstance(source, node): + self._source = source + @source.deleter + def source(self): + self._source = None + + @property + def destination(self): + return self._destination + @destination.setter + def destination(self, destination): + if isinstance(destination, node): + self._destination = destination + @destination.deleter + def destination(self): + self._destination = None \ No newline at end of file diff --git a/Monitoring/MonitoringService/Resource/node.py b/Monitoring/MonitoringService/Resource/node.py new file mode 100644 index 0000000..5a65121 --- /dev/null +++ b/Monitoring/MonitoringService/Resource/node.py @@ -0,0 +1,48 @@ +''' +Created on May 31, 2012 + +@author: steger +''' +from Resource.resource import resource +from Resource.interface import interface + +class node(resource): + def __init__(self, name = None, resourceid = None): + resource.__init__(self, name, resourceid) + self._public = False + self._interfaces = {} + + @property + def ispublic(self): + if not len(self._interfaces): + raise Exception("No interfaces defined yet for %s" % self.resourceid) + return self._public + + def addinterface(self, iface): + if not isinstance(iface, interface): + raise Exception("Wrong resource type %s is not an interface" % iface) + self._interfaces[iface.interface] = iface + self._public |= iface.ispublic + + def interfaces(self): + for iface in self._interfaces.itervalues(): + if not iface.isvalid: + print "WW: invalid interface:", iface.resourceid + continue + yield iface.interface, iface.address, iface.ispublic, iface.hostname, iface.direction + + def get_ipaddress(self, interfacename): + for ifname, address, _, _, _ in self.interfaces(): + if ifname == interfacename: + return address + raise Exception("%s has no interface %s" % (self.resourceid, interfacename)) + + def get_hostname(self, interfacename): + for ifname, address, _, hostname, _ in self.interfaces(): + if ifname != interfacename: + continue + if hostname: + return hostname + else: + return address + raise Exception("%s has no interface %s" % (self.resourceid, interfacename)) diff --git a/Monitoring/MonitoringService/Resource/path.py b/Monitoring/MonitoringService/Resource/path.py new file mode 100644 index 0000000..6296bc4 --- /dev/null +++ b/Monitoring/MonitoringService/Resource/path.py @@ -0,0 +1,9 @@ +''' +Created on Jun 12, 2012 + +@author: steger +''' +from Resource.link import link + +class path(link): + pass \ No newline at end of file diff --git a/Monitoring/MonitoringService/Resource/resource.py b/Monitoring/MonitoringService/Resource/resource.py new file mode 100644 index 0000000..fb093bd --- /dev/null +++ b/Monitoring/MonitoringService/Resource/resource.py @@ -0,0 +1,44 @@ +''' +Created on May 31, 2012 + +@author: steger +''' + +class resource(object): + def __init__(self, name = None, resourceid = None): + self._name = name + self._resourceid = resourceid + + @staticmethod + def ipret(x): + if not x: + return None + x = str(x) + if len(x): + return x + else: + return None + + @property + def name(self): + if self._name is None: + raise Exception("resource name is not set") + return self._name + @name.setter + def name(self, name): + self._name = self.ipret(name) + @name.deleter + def name(self): + self._name = None + + @property + def resourceid(self): + if self._resourceid is None: + raise Exception("resource id is not set") + return self._resourceid + @resourceid.setter + def resourceid(self, resourceid): + self._resourceid = resourceid + @resourceid.deleter + def resourceid(self): + self._resourceid = None \ No newline at end of file diff --git a/Monitoring/MonitoringService/Resource/slice.py b/Monitoring/MonitoringService/Resource/slice.py new file mode 100644 index 0000000..002318f --- /dev/null +++ b/Monitoring/MonitoringService/Resource/slice.py @@ -0,0 +1,40 @@ +''' +Created on Oct 30, 2012 + +@author: steger +''' + +class slice_pointer(object): + ''' + classdocs + ''' + + def __init__(self, sliceid = None, slicename = ""): + ''' + Constructor + ''' + self._sliceid = sliceid + self._name = slicename + + @property + def sliceid(self): + if self._sliceid is None: + raise Exception("slice id is not set") + return self._sliceid + @sliceid.setter + def sliceid(self, sliceid): + self._sliceid = sliceid + @sliceid.deleter + def sliceid(self): + self._sliceid = None + + @property + def name(self): + return self._name + @name.setter + def name(self, name): + self._name = name + @name.deleter + def name(self): + self._name = "" + \ No newline at end of file diff --git a/Monitoring/MonitoringService/Semantics/FeatureModel.py b/Monitoring/MonitoringService/Semantics/FeatureModel.py new file mode 100644 index 0000000..e266ccb --- /dev/null +++ b/Monitoring/MonitoringService/Semantics/FeatureModel.py @@ -0,0 +1,202 @@ +''' +Created on Aug 10, 2011 + +@author: steger +''' +from DataProcessing.Parameter import ParameterList, Parameter + +class FeatureModel(object): + ''' + classdocs + ''' + typelookup = { + 'Integer': int, + 'Float': float, + 'String': str + } + + def __init__(self, dimensionmanager, unitmanager, ontology): + ''' + @summary: constructor + @param dimensionmanager: the container to form a cell's dimension + @type dimensionmanager: DimensionManager + @param unitmanager: the container to form a cell's unit + @type unitmanager: UnitManager + @param ontology: the basic knowledge + @type ontology: Ontology + ''' + self.ontology = ontology + self.dm = dimensionmanager + self.um = unitmanager + + def inferDomains(self): + ''' + @summary: extract the monitoring domains from the information model + @return: generator of the list of domains + @rtype: URIRef + ''' + for uri_domain, _, _ in self.ontology.triples((None, self.ontology.ns('rdf')['type'], self.ontology.ns('task')['MonitoringDomain'])): + yield uri_domain + + def inferFeatures(self): + ''' + @summary: extract the monitored features from the information model + @return: a generator of the list of (feature reference, name, resource type) tuples + @rtype: (URIRef, str, URIRef) + ''' + q = """ +SELECT ?feature ?name ?resource +WHERE { + ?feature a owl:NamedIndividual ; + a ?parent ; + feature:featureName ?name . + ?parent rdfs:subClassOf feature:MonitoredFeature . + ?resource feature:hasFeature ?feature +} + """ + for uri_feature, name, uri_resource in self.ontology.query(q): + yield uri_feature, str(name), uri_resource + + def inferObligatoryParametersOf(self, feature): + ''' + @summary: extract the parameter list for a given feature + @param feature: reference to the monitored feature + @type feature: URIRef + @return: an initialized list of the parameters for this feature + @rtype: ParameterList + ''' + q = """ +SELECT ?name ?ptype ?dim ?defval ?unit ?prefix +WHERE { + feature:%s feature:obligatoryParameter ?par . + ?par param:paramName ?name ; + param:hasType ?ptype ; + param:hasDimension ?dim . + OPTIONAL { + ?par param:paramValue ?defval . + OPTIONAL { + ?par param:hasUnit ?unit . + OPTIONAL { + ?par param:hasPrefix ?prefix . + } + } + } +} + """ % (self.ontology._tail(feature)) + paramlist = ParameterList() + for name, uri_ptype, uri_dim, default, uri_unit, uri_prefix in self.ontology.query(q): + p = self.translateParameter(str(name), uri_dim, uri_unit, uri_prefix, uri_ptype, default) + paramlist.append(p) + return paramlist + + def inferOptionalParametersOf(self, feature): + ''' + @summary: extract the parameter list for a given feature + @param feature: reference to the monitored feature + @type feature: URIRef + @return: an initialized list of the parameters for this feature + @rtype: ParameterList + ''' + q = """ +SELECT ?name ?ptype ?dim ?defval ?unit ?prefix +WHERE { + feature:%s feature:optionalParameter ?par . + ?par param:paramName ?name ; + param:hasType ?ptype ; + param:hasDimension ?dim . + OPTIONAL { + ?par param:paramValue ?defval . + OPTIONAL { + ?par param:hasUnit ?unit . + OPTIONAL { + ?par param:hasPrefix ?prefix . + } + } + } +} + """ % (self.ontology._tail(feature)) + paramlist = ParameterList() + for name, uri_ptype, uri_dim, default, uri_unit, uri_prefix in self.ontology.query(q): + p = self.translateParameter(str(name), uri_dim, uri_unit, uri_prefix, uri_ptype, default) + paramlist.append(p) + return paramlist + + def inferFeatureMonitoringParameters(self): + ''' + @summary: extract parameters declared for feature monitoring + @return: an iterator over parameters + @rtype: (parameter name, dimension, value, unit) + ''' + q = """ +SELECT ?name ?dim ?defval ?unit ?prefix +WHERE { + ?par a feature:FeatureMonitoringParameter ; + param:paramName ?name ; + param:hasDimension ?dim . + OPTIONAL { + ?par param:paramValue ?defval . + OPTIONAL { + ?par param:hasUnit ?unit . + OPTIONAL { + ?par param:hasPrefix ?prefix . + } + } + } +} + """ + for name, uri_dim, default, uri_unit, uri_prefix in self.ontology.query(q): +#FIXME: duplicate (similar thing in translateParameter!!! + d = self.dm[ self.ontology._tail(uri_dim) ] + if default is None: + yield str(name), d, "", d.unit + else: + if uri_unit is None: + if uri_prefix is None: + u = d.unit + else: + ref = "%s_%s" % (self.ontology._tail(uri_prefix), d.unit.reference) + u = self.um[ref] + else: + if uri_prefix is None: + u = self.um[ self.ontology._tail(uri_unit) ] + else: + ref = "%s_%s" % (self.ontology._tail(uri_prefix), self.ontology._tail(uri_unit)) + u = self.um[ref] + yield str(name), d, str(default), u + + def translateParameter(self, name, uri_dim, uri_unit, uri_prefix, uri_ptype, default = None): + ''' + @summary: helper method to instantiate a Parameter + @param name: the reference name of the parameter + @type name: str + @param uri_dim: the dimension of the parameter + @type uri_dim: URIRef + @param uri_unit: the unit of the parameter, if None we fall back to the unit of the dimension + @type uri_unit: URIRef + @param uri_prefix: accounts only if uri_unit is not None + @type uri_prefix: URIRef + @param uri_ptype: the type of the parameter to use for serialization + @type uri_ptype: URIRef + @param default: the parameter value to initialize with, if None, parameter won't hol a value + @type default: Literal + @return: a parameter + @rtype: Parameter + ''' + vt = self.typelookup[ self.ontology._tail(uri_ptype) ] + d = self.dm[ self.ontology._tail(uri_dim) ] + if default is None: + return Parameter(name = name, valuetype = vt, unitmanager = self.um, dimension = d) + else: + if uri_unit is None: + if uri_prefix is None: + u = d.unit + else: + ref = "%s_%s" % (self.ontology._tail(uri_prefix), d.unit.reference) + u = self.um[ref] + else: + if uri_prefix is None: + u = self.um[ self.ontology._tail(uri_unit) ] + else: + ref = "%s_%s" % (self.ontology._tail(uri_prefix), self.ontology._tail(uri_unit)) + u = self.um[ref] + return Parameter(name = name, valuetype = vt, unitmanager = self.um, dimension = d, default = (vt(default), u)) diff --git a/Monitoring/MonitoringService/Semantics/InformationModel.py b/Monitoring/MonitoringService/Semantics/InformationModel.py new file mode 100644 index 0000000..f43a425 --- /dev/null +++ b/Monitoring/MonitoringService/Semantics/InformationModel.py @@ -0,0 +1,109 @@ +''' +Created on Aug 10, 2011 + +@author: steger +''' +from rdflib import Graph, Namespace, URIRef, plugin +from rdflib.query import Processor, Result + +class IMError(Exception): + pass + +class Ontology(object): + + def __init__(self): + + plugin.register( + 'sparql', Processor, + 'rdfextras.sparql.processor', 'Processor') + plugin.register( + 'sparql', Result, + 'rdfextras.sparql.query', 'SPARQLQueryResult') + self._graph = Graph() + self._graph.bind('owl', Namespace("http://www.w3.org/2002/07/owl#")) + + def load(self, prefix, owl_url, namespace_url = None): + ''' + @summary: load owl file and bind name space + @param prefix: an abbreviation of the name space to be used in sparql queries + @type prefix: str + @param owl_url: the location of the owl document to load + @type owl_url: str + @param namespace_url: the name space if None a # is added to the owl_url + @type namespace_url: str or None + ''' + if namespace_url is None: + ns = Namespace("%s#" % namespace_url) + else: + ns = Namespace(namespace_url) + try: + self._graph += Graph().parse(source = owl_url) + #except URLError: + except: + raise IMError("URLError: Cannot read model %s" % owl_url) + try: + self._graph.bind(prefix, ns) + except: + pass + + @staticmethod + def _float(f): + if '.' in f or 'e' in f or 'E' in f: + return float(f) + else: + return int(f) + + @staticmethod + def ipret(x): + if not x: + return None + x = str(x) + if len(x): + return x + else: + return None + + @staticmethod + def _tail(uriref): + if not isinstance(uriref, URIRef): + raise IMError("Wrong uriref %s" % uriref) + return str(uriref).split("#")[-1] + + def query(self, query): + return self._graph.query(query, initNs = dict(self._graph.namespaces())) + + def triples(self, spo_tuple): + return self._graph.triples(spo_tuple) + + def ns(self, prefix): + for p, ns in self._graph.namespaces(): + if p == prefix: + return Namespace(ns) + raise IMError("Unknown prefix: %s" % prefix) + + @property + def ns_dict(self): + return dict(self._graph.namespaces()) + + def emptygraph(self): + g = Graph() + # bind name spaces + for prefix, ns in self.ns_dict.iteritems(): + g.bind(prefix, ns) + return g + + @property + def g(self): + g = Graph() + g += self._graph + for prefix, ns in self.ns_dict.iteritems(): + g.bind(prefix, ns) + return g + +# @property +# def graph(self): +# return self._graph + + def dump(self): + for t in self._graph.triples((None, None, None)): + print t diff --git a/Monitoring/MonitoringService/Semantics/Query.py b/Monitoring/MonitoringService/Semantics/Query.py new file mode 100644 index 0000000..0ffb758 --- /dev/null +++ b/Monitoring/MonitoringService/Semantics/Query.py @@ -0,0 +1,150 @@ +''' +Created on Feb 21, 2012 + +@author: steger +''' +from DataProcessing.Parameter import ParameterList +from Resource.resource import resource as coreresource +from DataProcessing.DataFormatter import JsonFormatter, DumbFormatter, Formatter + +class Query(object): + ''' + @summary: the common skeleton of all queries + ''' + pass + +class InformationSource(object): + ''' + @summary: represents a (feature, resource) pair, standing for what and where to measure + @ivar resource: pointer to the resource + @ivar feature: ponter to the feature + ''' + def __init__(self): + self._feature = None + self._resource = None + + @property + def resource(self): + return self._resource + @resource.setter + def resource(self, (resourceid, resource)): + if not isinstance(resource, coreresource): + raise Exception("%s is not a resource type" % resource) + self._resource = (resourceid, resource) + + @property + def feature(self): + return self._feature + @feature.setter + def feature(self, feature): + self._feature = feature + +class Identifier(object): + ''' + @summary: represents a task, an aggregate or condition identifier + @ivar sourceid: pointer to the task, aggregate or condition + @type sourceid: str + ''' + def __init__(self): + self._sourceid = None + + @property + def sourceid(self): + return self._sourceid + @sourceid.setter + def sourceid(self, sourceid): + self._sourceid = sourceid + +class SingleQuery(Query, InformationSource): + ''' + @summary: an extension of L{InformationSource} to store: + - the requested output format and + - some optional parameters L{ParameterList} + @ivar samplechain: data manipulation chain + @ivar formatter: a pointer to the requested formatter + @type formatter: L{Formatter} + @ivar paramlist: L{ParameterList} + ''' + def __init__(self): + InformationSource.__init__(self) + self._samplechain = None + self._formatter = None + self._parameters = ParameterList() + + @property + def samplechain(self): + return self._samplechain + @samplechain.setter + def samplechain(self, samplechain): + self._samplechain = samplechain + + @property + def formatter(self): + return self._formatter + @formatter.setter + def formatter(self, uri_formatter): + if str(uri_formatter).endswith("Formatter_JSON"): + self._formatter = JsonFormatter + elif str(uri_formatter).endswith("Formatter_CSV"): + self._formatter = DumbFormatter + elif issubclass(uri_formatter, Formatter): + self._formatter = uri_formatter + else: + raise Exception("%s is not a formatter type" % uri_formatter) + + @property + def paramlist(self): + return self._parameters + + def addParameter(self, parameter): + ''' + @summary: append a new parameter to the list + @param parameter: the parameter to append to the list + @type parameter: L{Parameter} + ''' + self._parameters.append(parameter) + +class SingleSampleQuery(SingleQuery, Identifier): + ''' + @summary: represents a (feature, resource, source identifier) triplet to fully identify a data generator + and binds a chain of operations to it. + The requested output format is also stored here + ''' + def __init__(self): + SingleQuery.__init__(self) + Identifier.__init__(self) + +class SingleConditionQuery(SingleQuery, Identifier): + def __init__(self): + SingleQuery.__init__(self) + Identifier.__init__(self) + +class QueryBundle(object): + ''' + @summary: represents a collection of SingleQueries + ''' + def __init__(self): + self.atoms = {} + + def __len__(self): + return len(self.atoms) + + def has_key(self, key): + return self.atoms.has_key(key) + + def __iter__(self): + for q in self.atoms.itervalues(): + yield q + + def getResource(self, resourceid): + for q in self: + if q.resource[0] == resourceid: + return q.resource[1] + return None + + def add(self, reference, q): + if self.atoms.has_key(reference): + raise Exception("Duplicate MonitoringQuery entry") + if not isinstance(q, Query): + raise Exception("Wrong type") + self.atoms[reference] = q diff --git a/Monitoring/MonitoringService/Semantics/QueryInterpreter.py b/Monitoring/MonitoringService/Semantics/QueryInterpreter.py new file mode 100644 index 0000000..28a2775 --- /dev/null +++ b/Monitoring/MonitoringService/Semantics/QueryInterpreter.py @@ -0,0 +1,308 @@ +''' +Created on Feb 21, 2012 + +@author: steger +''' + +from Semantics.Query import QueryBundle, SingleQuery, SingleSampleQuery,\ + SingleConditionQuery +from Resource.node import node +from Resource.interface import interface +from DataProcessing.Aggregator import Max, Min, Percentile, Mean, Deviation +from DataProcessing.Sampler import Tail, Head +from DataProcessing.Parameter import ParameterList +from DataProcessing.Bool import IsPositive, IsNegative + +class QueryInterpreter(object): + ''' + classdocs + ''' + samplesource = 'UnmodifiedExtractOfFeatureSamples' + lut_skeleton = { + 'Maximum': Max, + 'Minimum': Min, + 'Percentile': Percentile, + 'Average': Mean, + 'Variance': Deviation, + 'Tail': Tail, + 'Head': Head + } + lut_condition = { + 'IsPositive': IsPositive, + 'IsNegative': IsNegative, + #FIXME: IsNotNegative, IsNotPositive + 'AndExpression': '', + 'OrExpression': '', + } + + def __init__(self, model): + ''' + @summary: constructor + @param model: the task model to resolve the tools + @type model: TaskModel + ''' + self.model = model + + def getUnitOfDimension(self, ref_dim): + return self.model.dm[ref_dim].unit + + def getUnit(self, uri_prefix, uri_unit): + if uri_prefix is None: + uref = self.model._tail(uri_unit) + else: + uref = "%s_%s" % (self.model._tail(uri_prefix), self.model._tail(uri_unit)) + return self.um[uref] + + @property + def myns(self): + return self.model.ontology.ns_dict + + def inferInterfacesOf(self, qgraph, uri_node): + q = """ +SELECT ?ifin ?address ?unit ?prefix +WHERE { + <%s> core:hasInboundInterface ?ifin ; + core:hasOutboundInterface ?ifout . + ?ifin a core:Interface ; + core:hasIPv4Address ?addressobj . + ?ifout a core:Interface ; + core:hasIPv4Address ?addressobj . + ?addressobj a owl:NamedIndividual ; + a unit:IPAddress ; + unit:hasValue ?address . + OPTIONAL { + ?addressobj unit:hasUnit ?unit . + } + OPTIONAL { + ?addressobj unit:hasPrefix ?prefix . + } +} + """ % uri_node + for uri_ifin, address, uri_unit, uri_prefix in qgraph.query(q, initNs = self.myns): + name = self.model.ontology._tail(uri_ifin) + iface = interface(name, resourceid = uri_ifin) + if uri_unit is not None: + iface.address = str(address), self.getUnit(uri_prefix, uri_unit) + else: + iface.address = str(address), self.getUnitOfDimension('IPAddress') + iface.direction = iface.EGRESS | iface.INGRESS + #FIXME: this info should come from the model + iface.interface = "eth0" + iface.ispublic = True + yield iface + #TODO: similarly look up uni directional interfaces of the node and yield them as well + + def inferBundleQueries(self, qgraph): + ''' + @summary: + ''' + q = """ +SELECT ?query ?resource ?feature ?sample ?formatter +WHERE { + ?query a owl:NamedIndividual ; + a query:BundleQuery ; + feature:hasFeature ?feature ; + stat:hasSample ?sample ; + query:hasResource ?resource ; + query:hasFormatter ?formatter . +} + """ + Q = QueryBundle() + for uri_query, uri_resource, uri_feature, uri_sample, uri_formatter in qgraph.query(q, initNs = self.myns): + r = Q.getResource(uri_resource) + if r is None: + r = self.translateResource(qgraph, uri_resource) + sq = SingleQuery() + sq.feature = uri_feature + sq.resource = (uri_resource, r) + sq.formatter = uri_formatter + sq.samplechain = self.inferSampleChain(qgraph, uri_sample) + for p in self.inferParameters(qgraph, uri_query): + sq.addParameter(parameter = p) + Q.add(uri_query, sq) + return Q + + def inferSampleManipulationQueries(self, qgraph): + ''' + @summary: + ''' + q = """ +SELECT ?query ?resource ?feature ?sourceid ?sample ?formatter +WHERE { + ?query a owl:NamedIndividual ; + a query:SampleManipulationQuery ; + query:hasResource ?resource ; + feature:hasFeature ?feature ; + query:hasProcessid ?sourceid ; + stat:hasSample ?sample ; + query:hasFormatter ?formatter . +} + """ + Q = QueryBundle() + for uri_query, uri_resource, uri_feature, uri_sourceid, uri_sample, uri_formatter in qgraph.query(q, initNs = self.myns): + r = Q.getResource(uri_resource) + if r is None: + r = self.translateResource(qgraph, uri_resource) + aq = SingleSampleQuery() + aq.feature = uri_feature + aq.resource = (uri_resource, r) + aq.formatter = uri_formatter + aq.samplechain = self.inferSampleChain(qgraph, uri_sample) + aq.sourceid = str(uri_sourceid) + Q.add(uri_query, aq) + return Q + + def inferConditionQueries(self, qgraph): + ''' + @summary: + ''' + q = """ +SELECT ?query ?resource ?sourceid ?feature ?cond +WHERE { + ?query a owl:NamedIndividual ; + a query:ConditionQuery ; + query:hasResource ?resource ; + query:hasProcessid ?sourceid ; + feature:hasFeature ?feature ; + stat:hasCondition ?cond ; +} + """ + Q = QueryBundle() + for uri_query, uri_resource, uri_sourceid, uri_feature, uri_cond in qgraph.query(q, initNs = self.myns): + r = Q.getResource(uri_resource) + if r is None: + r = self.translateResource(qgraph, uri_resource) + C, op = self.inferCondition(qgraph, uri_cond) + print uri_query, uri_resource, uri_sourceid, uri_cond + + cq = SingleConditionQuery() + cq.feature = uri_feature + cq.resource = (uri_resource, r) + cq.operation = op + cq.conditiontype = C + cq.sourceid = str(uri_sourceid) + Q.add(uri_query, cq) + return Q + + def inferCondition(self, qgraph, uri_cond): + q = """ +SELECT ?what ?sample +WHERE { + <%s> a owl:NamedIndividual ; + a ?what ; + stat:hasSample ?sample ; +} + """ % uri_cond + for what, uri_sample in qgraph.query(q, initNs = self.myns): + tail = self.model.ontology._tail(what) + if tail in [ 'NamedIndividual' ]: + continue + C = self.lut_condition[tail] + if C in [IsPositive, IsNegative]: #FIXME: IsNotNegative, IsNotPositive + return C, self.inferLineraCombinedSample(qgraph, uri_sample) + else: + print "BALHE" + raise Exception("QI NOT IMPLMENTED") + + def inferLineraCombinedSample(self, qgraph, uri_sample): + q = """ +SELECT ?sample ?factor +WHERE { + <%s> a owl:NamedIndividual ; + a stat:LinearCombinedSample ; + stat:hasTerm ?term . + ?term stat:hasSample ?sample . + OPTIONAL { + ?term stat:hasScale ?factor . + } +} + """ % uri_sample + terms = [] + for uri_sample, factor in qgraph.query(q, initNs = self.myns): + try: + factor = float(factor) + except: + factor = 1 + op = self.inferSampleChain(qgraph, uri_sample) + terms.append( ( factor, op) ) + return terms + + def inferSampleChain(self, qgraph, uri_sample): + tail = self.model.ontology._tail(uri_sample) + if tail == self.samplesource: + return [] + q = """ +SELECT ?nextsample ?sampleop +WHERE { + <%s> a owl:NamedIndividual ; + stat:hasSample ?nextsample ; + a ?sampleop +} + """ % uri_sample + for uri_sample_next, uri_sampleop in qgraph.query(q, initNs = self.myns): + tail = self.model.ontology._tail(uri_sampleop) + if tail in [ 'NamedIndividual' ]: + continue + op = self.inferSampleChain(qgraph, uri_sample_next) + break + skeleton = self.lut_skeleton[tail] + parlist = ParameterList([ p for p in self.inferParameters(qgraph, uri_sample) ]) + op.append( (skeleton, parlist) ) + return op + + def inferParameters(self, qgraph, uri_query): + q = """ +SELECT ?name ?type ?dim ?defval ?unit ?prefix +WHERE { + <%s> param:hasParameter ?par . + ?par a owl:NamedIndividual ; + param:paramName ?name ; + param:hasType ?type ; + a ?dim . + OPTIONAL { + ?par unit:hasValue ?defval . + OPTIONAL { + ?par unit:hasUnit ?unit . + } + OPTIONAL { + ?par unit:hasPrefix ?prefix . + } + } +} + """ % uri_query + for uri_name, uri_type, uri_dim, uri_default, uri_unit, uri_prefix in qgraph.query(q, initNs = self.myns): + tail = self.model.ontology._tail(uri_dim) +#FIXME: query should include the filter, but rdflib has a bug and only the spelt out form would work +# FILTER ( ?dim != owl:NamedIndividual ) +# FILTER ( ?dim != query:QueryParameter ) +# FILTER ( ?dim != stat:SampleOperatorParameter ) +# +# like: +# FILTER ( ?dim != ) + if tail in [ 'QueryParameter', 'SOP_tail', 'SOP_head', 'SOP_order', 'NamedIndividual' ]: + continue + yield self.model.translateParameter(str(uri_name), uri_dim, uri_unit, uri_prefix, uri_type, uri_default) + + def translateResource(self, qgraph, uri_resource): + resource_name = self.model.ontology._tail(uri_resource) + q = """ +SELECT ?resourcetype +WHERE { + <%s> a owl:NamedIndividual ; + a core:Resource ; + a ?resourcetype ; +} + """ % uri_resource + for uri_rtype, in qgraph.query(q, initNs = self.myns): + tail = self.model.ontology._tail(uri_rtype) + if tail in [ 'Resource', 'NamedIndividual' ]: + continue + if tail == "Node": + r = node(name = resource_name, resourceid = uri_resource) + for iface in self.inferInterfacesOf(qgraph, uri_resource): + r.addinterface(iface) + return r + else: + print "WW: unhandled rtype", uri_rtype + continue + \ No newline at end of file diff --git a/Monitoring/MonitoringService/Semantics/TaskModel.py b/Monitoring/MonitoringService/Semantics/TaskModel.py new file mode 100644 index 0000000..7e0c8bf --- /dev/null +++ b/Monitoring/MonitoringService/Semantics/TaskModel.py @@ -0,0 +1,304 @@ +''' +Created on Aug 10, 2011 + +@author: steger +''' +from Credential.credentialtypes import UsernamePassword, UsernameRSAKey +from DataProcessing.Data import DataHeader, DataHeaderCell +from DataProcessing.Parameter import ParameterList, Parameter +from Driver.SOAPClient import SOAPClient +from Driver.SshExec import SshExec +from Driver.LocalExec import LocalExec +from Driver.REST import RESTDriver + +class TaskModelError(Exception): + pass + +class TaskModel(object): + ''' + classdocs + ''' + hooklookup = { + 'hasPreHook' : 'prehook', + 'hasStartHook' : 'starthook', + 'hasRetrieveHook' : 'retrievehook', + 'hasStopHook' : 'stophook', + 'hasPostHook' : 'posthook', + } + typelookup = { + 'Integer': int, + 'Float': float, + 'String': str + } + + def __init__(self, dimensionmanager, unitmanager, ontology): + ''' + @summary: constructor + @param dimensionmanager: the container to form a cell's dimension + @type dimensionmanager: DimensionManager + @param unitmanager: the container to form a cell's unit + @type unitmanager: UnitManager + @param ontology: the basic knowledge + @type ontology: Ontology + ''' + self.ontology = ontology + self.dm = dimensionmanager + self.um = unitmanager + + def inferDomains(self): + ''' + @summary: extract the monitoring domains from the information model + @return: generator of the list of domains + @rtype: URIRef + ''' + for uri_domain, _, _ in self.ontology.triples((None, self.ontology.ns('rdf')['type'], self.ontology.ns('task')['MonitoringDomain'])): + yield uri_domain + + def inferTasks(self, domain, feature): + ''' + @summary: provides a generator to crawl over the tasks that can measure a given feature in the given domain of interest + @param domain: domain of interest + @type domain: URIRef + @param feature: the feature to measure + @type feature: URIRef + @return: a generator of the list of (task reference, task name) pairs + @rtype: (URIRef, str) + ''' + q = """ +SELECT ?task ?name +WHERE { + ?task a owl:NamedIndividual ; + a task:MonitoringTask ; + task:name ?name ; + task:hasMonitoringDomain task:%s ; + task:hasOutputTableFormat ?data . +?data task:hasColumn ?col . +?col task:hasMonitoredFeature feature:%s +} + """ % (self.ontology._tail(domain), self.ontology._tail(feature)) + for uri_task, tname in self.ontology.query(q): + yield uri_task, str(tname) + + def inferCredentialOf(self, task): + ''' + @summary: extracts the set of acceptable credential templates the given task accepts + @param task: reference to the monitoring task + @type task: URIRef + @return: a set of an uninitialized Credential classes + @rtype: set(Credential) + @raise IMError: Unknown authentication type + ''' + creds = set() + for (_, _, auth) in self.ontology.triples((task, self.ontology.ns('task')['hasAuthenticationType'], None)): + if auth == self.ontology.ns('task')["UsernamePassword"]: + creds.add(UsernamePassword) + elif auth == self.ontology.ns('task')["UsernameRSAKey"]: + creds.add(UsernameRSAKey) + else: + raise TaskModelError("Unknown authentication type %s" % auth) + return creds + + def inferDriverOf(self, task): + ''' + @summary: extarcts the driver of the task + @param task: reference to the monitoring task + @type task: URIRef + @return: the appropriate driver class uninstantiated + @rtype: Driver + @raise IMError: Unknown driver type / hasDriver missing + ''' + try: + _, _, driver = self.ontology.triples((task, self.ontology.ns('task')['hasDriver'], None)).next() + if driver == self.ontology.ns('task')["SOAPClient"]: + return SOAPClient + elif driver == self.ontology.ns('task')["SSH"]: + return SshExec + elif driver == self.ontology.ns('task')["LocalExec"]: + return LocalExec + elif driver == self.ontology.ns('task')["REST"]: + return RESTDriver + else: + raise TaskModelError("Unknown driver type %s" % driver) + except StopIteration: + raise TaskModelError("hasDriver is missing for task %s" % task) + + def inferHookparametersOf(self, task): + ''' + @summary: extract the necessary control parameters for task initialization + @param task: reference to the monitoring task + @type task: URIRef + @return: a lookup table of arguments, which are passed to the Task object's prehook method as keyword arguments + @rtype: dict + ''' + q = """ +SELECT ?name ?value ?type +WHERE { + config:%s task:hasHookParameter ?p . + ?p param:paramName ?name ; + a owl:NamedIndividual ; + rdf:type task:HookParameter ; + unit:hasValue ?value ; + param:hasType ?type . +} + """ % (self.ontology._tail(task)) + d = {} + for pname, pvalue, ptype in self.ontology.query(q): + pname = str(pname) + if ptype == self.ontology.ns('param')["Integer"]: + d[pname] = int(pvalue) + elif ptype == self.ontology.ns('param')["Float"]: + d[pname] = float(pvalue) + else: + d[pname] = str(pvalue) + return d + + def inferHookdefinitionsOf(self, task): + ''' + @summary: extract the hook implementation details for task initialization + @param task: reference to the monitoring task + @type task: URIRef + @return: a lookup table of hook definitions + @rtype: dict + ''' + q = """ +SELECT ?rel ?value +WHERE { + config:%s ?rel ?h . + ?h task:hookCode ?value . +} + """ % (self.ontology._tail(task)) + d = {} + for hrel, hvalue in self.ontology.query(q): + hook = self.ontology._tail(uriref = hrel) + d[self.hooklookup[hook]] = str(hvalue).replace('\\n', '\n').replace('\\t', '\t').replace('\\\\', '\\').strip() + return d + + def inferDataheaderOf(self, task): + ''' + @summary: extract the data header declaration the for task + @param task: reference to the monitoring task + @type task: URIRef + @return: an initialized DataHeader instance + @rtype: DataHeader + ''' + q = """ +SELECT ?tablename ?colname ?dim ?feature ?unit ?prefix +WHERE { + config:%s task:hasOutputTableFormat ?hdr . + ?hdr task:name ?tablename . + ?hdr task:hasColumn ?col . + ?col task:name ?colname ; + a owl:NamedIndividual ; + a ?dim ; + task:sequenceNumber ?seqno . + { + ?dim rdfs:subClassOf unit:BaseDimension . + } UNION { + ?dim rdfs:subClassOf ?p . + ?p rdfs:subClassOf unit:DerivedDimension . + } + OPTIONAL { + ?col task:hasMonitoredFeature ?feature . + } + OPTIONAL { + ?col unit:hasUnit ?unit . + OPTIONAL { + ?col unit:hasPrefix ?prefix . + } + } +} +ORDER BY ?seqno + """ % (self.ontology._tail(task)) + datahdr = None + for tablename, colname, uri_dim, uri_feature, uri_unit, uri_prefix in self.ontology.query(q): + if datahdr is None: + datahdr = DataHeader(str(tablename)) + if uri_unit is None: + u = None + elif uri_prefix is None: + u = self.um[ self.ontology._tail(uri_unit) ] + else: + ref = "%s_%s" % (self.ontology._tail(uri_prefix), self.ontology._tail(uri_unit)) + u = self.um[ref] + d = self.dm[ self.ontology._tail(uri_dim) ] + if uri_feature is None: + cell = DataHeaderCell(name = str(colname), dimension = d, unit = u) + else: + cell = DataHeaderCell(name = str(colname), dimension = d, feature = uri_feature, unit = u) + datahdr.addColumn(cell) + return datahdr + + def inferParametersOf(self, task): + ''' + @summary: extract the parameter list for the given task + @param task: reference to the monitoring task + @type task: URIRef + @return: an initialized list of the parameters of the task + @rtype: ParameterList + ''' + q = """ +SELECT ?name ?ptype ?dim ?defval ?unit ?prefix +WHERE { + config:%s task:hasExecutionParameter ?par . + ?par param:paramName ?name ; + param:hasType ?ptype ; + a ?dim . + { + ?dim rdfs:subClassOf unit:BaseDimension . + } UNION { + ?dim rdfs:subClassOf ?p . + ?p rdfs:subClassOf unit:DerivedDimension . + } + OPTIONAL { + ?par unit:hasValue ?defval . + OPTIONAL { + ?par unit:hasUnit ?unit . + OPTIONAL { + ?par unit:hasPrefix ?prefix . + } + } + } +} + """ % (self.ontology._tail(task)) + paramlist = ParameterList() + for name, uri_ptype, uri_dim, default, uri_unit, uri_prefix in self.ontology.query(q): + p = self.translateParameter(str(name), uri_dim, uri_unit, uri_prefix, uri_ptype, default) + paramlist.append(p) + return paramlist + + def translateParameter(self, name, uri_dim, uri_unit, uri_prefix, uri_ptype, default = None): + ''' + @summary: helper method to instantiate a Parameter + @param name: the reference name of the parameter + @type name: str + @param uri_dim: the dimension of the parameter + @type uri_dim: URIRef + @param uri_unit: the unit of the parameter, if None we fall back to the unit of the dimension + @type uri_unit: URIRef + @param uri_prefix: accounts only if uri_unit is not None + @type uri_prefix: URIRef + @param uri_ptype: the type of the parameter to use for serialization + @type uri_ptype: URIRef + @param default: the parameter value to initialize with, if None, parameter won't hol a value + @type default: Literal + @return: a parameter + @rtype: Parameter + ''' + vt = self.typelookup[ self.ontology._tail(uri_ptype) ] + d = self.dm[ self.ontology._tail(uri_dim) ] + if default is None: + return Parameter(name = name, valuetype = vt, unitmanager = self.um, dimension = d) + else: + if uri_unit is None: + if uri_prefix is None: + u = d.unit + else: + ref = "%s_%s" % (self.ontology._tail(uri_prefix), d.unit.reference) + u = self.um[ref] + else: + if uri_prefix is None: + u = self.um[ self.ontology._tail(uri_unit) ] + else: + ref = "%s_%s" % (self.ontology._tail(uri_prefix), self.ontology._tail(uri_unit)) + u = self.um[ref] + return Parameter(name = name, valuetype = vt, unitmanager = self.um, dimension = d, default = (vt(default), u)) diff --git a/Monitoring/MonitoringService/Semantics/UnitModel.py b/Monitoring/MonitoringService/Semantics/UnitModel.py new file mode 100644 index 0000000..e09a22a --- /dev/null +++ b/Monitoring/MonitoringService/Semantics/UnitModel.py @@ -0,0 +1,398 @@ +''' +Created on Feb 12, 2012 + +@author: steger +''' +from DataProcessing.Prefix import PrefixManager +from DataProcessing.Unit import UnitManager +from DataProcessing.Dimension import DimensionManager +from DataProcessing.MeasurementLevel import lut_level + +class UnitModel(object): + ''' + @summary: an interface to infer prefix, unit and dimension related information from the model + ''' + + def __init__(self, ontology): + ''' + @summary: constructor + @param ontology: the basic knowledge + @type ontology: Ontology + ''' + self.ontology = ontology + self.pm = PrefixManager() + self.um = UnitManager() + self.dm = DimensionManager(self.um) + + # infer and store prefixes + for (p_reference, p_symbol, base, exponent) in self.inferPrefixes(): + self.pm.newPrefix( self.ontology._tail(p_reference), p_symbol, base, exponent ) + + # infer basic units + for u_reference, u_symbol in self.inferBaseUnits(): + self.storeBasicUnit(u_reference, u_symbol) + for u_reference, u_symbol, _, _ in self.inferPowerUnits(): + self.storeBasicUnit(u_reference, u_symbol) + for u_reference, u_symbol, _ in self.inferProductUnits(): + self.storeBasicUnit(u_reference, u_symbol) + for u_reference, u_symbol, derivedfrom, scale, offset in self.inferLinearTransformedUnits(): + self.storeLinearTransformedUnit(u_reference, u_symbol, derivedfrom, scale, offset) + for u_reference, u_symbol, derivedfrom, expr_fwd, expr_inv in self.inferRegexpTransformedUnits(): + uref = self.ontology._tail(u_reference) + ancestor = self.um[ self.ontology._tail(derivedfrom) ] + self.um.addRegexpTransformedUnit(uref, u_symbol, ancestor, expr_fwd, expr_inv) + + # infer dimensions + #FIXME: if there is a reference loop an error is raised... + for d_reference, u_reference, level in self.inferBaseDimensions(): + dref = self.ontology._tail(d_reference) + uref = self.ontology._tail(u_reference) + lref = self.ontology._tail(level) + level = lut_level[lref] + unit = self.um[uref] + self.dm.newBaseDimension(dref, dref, unit, level) + for d_reference, u_reference, d_derivedfrom in self.inferDifferenceDimensions(): + dref = self.ontology._tail(d_reference) + uref = self.ontology._tail(u_reference) + daref = self.ontology._tail(d_derivedfrom) + unit = self.um[uref] + derivedfrom = self.dm[daref] + self.dm.newDerivedDimension(dref, dref, unit, derivedfrom, self.dm.DifferenceDimension) + for d_reference, u_reference, d_derivedfrom, exponent in self.inferPowerDimensions(): + dref = self.ontology._tail(d_reference) + uref = self.ontology._tail(u_reference) + daref = self.ontology._tail(d_derivedfrom) + unit = self.um[uref] + derivedfrom = self.dm[daref] + self.dm.newDerivedDimension(dref, dref, unit, derivedfrom, self.dm.PowerDimension, exponent = exponent) + for d_reference, u_reference, d_derivedfrom in self.inferProductDimensions(): + dref = self.ontology._tail(d_reference) + uref = self.ontology._tail(u_reference) + unit = self.um[uref] + derivedfrom = tuple( self.dm[self.ontology._tail(x)] for x in d_derivedfrom ) + self.dm.newDerivedDimension(dref, dref, unit, derivedfrom, self.dm.ProductDimension) + for d_reference, u_reference, d_derivedfrom in self.inferRatioDimensions(): + dref = self.ontology._tail(d_reference) + uref = self.ontology._tail(u_reference) + daref = self.ontology._tail(d_derivedfrom) + unit = self.um[uref] + derivedfrom = self.dm[daref] + self.dm.newDerivedDimension(dref, dref, unit, derivedfrom, self.dm.RatioDimension) + + def storeBasicUnit(self, u_reference, u_symbol): + uref = self.ontology._tail(u_reference) + bu = self.um.newBasicUnit(uref, u_symbol) + for p_reference in self.inferPossiblePrefixesOf(u_reference): + p = self.pm[ self.ontology._tail(p_reference) ] + puref = "%s_%s" % (p.reference, uref) + symbol = "%s%s" % (p.symbol, bu.symbol) + self.um.addLinearTransformedUnit(puref, symbol, bu, p.scale) + + def storeLinearTransformedUnit(self, u_reference, u_symbol, derivedfrom, scale, offset): + uref = self.ontology._tail(u_reference) + ancestor = self.um[ self.ontology._tail(derivedfrom) ] + bu = self.um.addLinearTransformedUnit(uref, u_symbol, ancestor, scale, offset) + for p_reference in self.inferPossiblePrefixesOf(u_reference): + p = self.pm[ self.ontology._tail(p_reference) ] + puref = "%s_%s" % (p.reference, uref) + symbol = "%s%s" % (p.symbol, bu.symbol) + self.um.addLinearTransformedUnit(puref, symbol, bu, p.scale) + + def inferPrefixes(self): + ''' + @summary: iterate over all prefixes defined in the model. + @return: a generator of the prefix details: (reference, symbol, base, exponent) + @rtype: (URIRef, str, int, int) + @todo: in case the unit:base is not present in a Prefix individual, + we should fall back to the restriction on the base defined for the given sibling of the Prefix. + This sibling is referenced ?basegroup in the query. + ''' + q = """ +SELECT ?prefix ?symbol ?base ?exponent +WHERE { + ?prefix a owl:NamedIndividual ; + a ?basegroup ; + unit:exponent ?exponent ; + unit:base ?base . + ?basegroup rdfs:subClassOf unit:Prefix . + OPTIONAL { + ?prefix unit:symbol ?symbol . + } +} + """ + for uri_prefix, symbol, base, exponent in self.ontology.query(q): + if symbol is None: + yield uri_prefix, self.ontology._tail(uri_prefix), int(base), int(exponent) + else: + yield uri_prefix, str(symbol), int(base), int(exponent) + + def inferPrefixSymbolOf(self, prefixuri): + ''' + @summary: generates an short written form of a unit prefix if unit:symbol is present in the model, + otherwise an abbreviation is derived from the tail of the uri (the reference name to the individual). + @param prefixuri: the uri reference to the unit prefix + @type prefixuri: URIRef + @return: the short form + @rtype: str + ''' + try: + _, _, symbol = self.ontology.triples((prefixuri, self.ontology.ns('unit')['symbol'], None)).next() + return str(symbol) + except StopIteration: + return self.ontology._tail(prefixuri) + + + def inferBaseUnits(self): + ''' + @summary: iterate over all BaseUnits defined in the model. + @return: a generator of the unit details: (reference, symbol) + @rtype: (URIRef, str) + ''' + q = """ +SELECT ?unit ?symbol +WHERE { + ?unit a owl:NamedIndividual ; + a unit:BaseUnit . + OPTIONAL { + ?unit unit:symbol ?symbol . + } +} + """ + for uri_unit, symbol in self.ontology.query(q): + if symbol is None: + yield uri_unit, self.ontology._tail(uri_unit) + else: + yield uri_unit, str(symbol) + + def inferPowerUnits(self): + ''' + @summary: iterate over all PowerUnits defined in the model. + @return: a generator of the unit details: (reference, symbol, powerof, exponent) + @rtype: (URIRef, str, URIRef, int) + ''' + q = """ +SELECT ?unit ?symbol ?powerof ?exponent +WHERE { + ?unit a owl:NamedIndividual ; + a unit:PowerUnit ; + unit:exponent ?exponent ; + unit:derivedFrom ?powerof . + OPTIONAL { + ?unit unit:symbol ?symbol . + } +} + """ + for uri_unit, symbol, uri_powerof, exponent in self.ontology.query(q): + if symbol is None: + yield uri_unit, self.ontology._tail(uri_unit), uri_powerof, int(exponent) + else: + yield uri_unit, str(symbol), uri_powerof, int(exponent) + + def inferProductUnits(self): + ''' + @summary: iterate over all ProductUnits defined in the model. + @return: a generator of the unit details: (reference, symbol, productof) + @rtype: (URIRef, str, set(URIRef)) + ''' + q = """ +SELECT ?unit ?symbol ?productof +WHERE { + ?unit a owl:NamedIndividual ; + a unit:ProductUnit ; + unit:derivedFrom ?productof + OPTIONAL { + ?unit unit:symbol ?symbol . + } +} + """ + container = {} + for uri_unit, symbol, uri_productof in self.ontology.query(q): + if symbol is None: + key = uri_unit, self.ontology_tail(uri_unit) + else: + key = uri_unit, str(symbol) + if not container.has_key(key): + container[key] = set() + container[key].add(uri_productof) + for (uri_unit, symbol), productof in container.iteritems(): + yield uri_unit, symbol, productof + + def inferLinearTransformedUnits(self): + ''' + @summary: iterate over all LinearTransformedUnits defined in the model. + @return: a generator of the unit details: (reference, symbol, derivedfrom, scale, offset) + @rtype: (URIRef, str, URIRef, float, float) + ''' + q = """ +SELECT ?unit ?symbol ?scale ?offset ?derivedfrom +WHERE { + ?unit a owl:NamedIndividual ; + a unit:LinearTransformedUnit ; + unit:derivedFrom ?derivedfrom ; + unit:scale ?scale . + OPTIONAL { + ?unit unit:offset ?offset . + } + OPTIONAL { + ?unit unit:symbol ?symbol . + } +} + """ + for uri_unit, symbol, scale, offset, uri_derivedfrom in self.ontology.query(q): + if offset is None: + offset = 0 + else: + offset = self.ontology._float(offset) + if symbol is None: + yield uri_unit, self.ontology._tail(uri_unit), uri_derivedfrom, self.ontology._float(scale), offset + else: + yield uri_unit, str(symbol), uri_derivedfrom, self.ontology._float(scale), offset + + def inferRegexpTransformedUnits(self): + ''' + @summary: iterate over all RegexpTransformedUnits defined in the model. + @return: a generator of the unit details: (reference, symbol, derivedfrom, expr_fwd, expr_inv) + @rtype: (URIRef, str, URIRef, str, str) + ''' + q = """ +SELECT ?unit ?symbol ?derivedfrom ?fwd ?inv +WHERE { + ?unit a owl:NamedIndividual ; + a unit:RegexpTransformedUnit ; + unit:derivedFrom ?derivedfrom ; + unit:forwardExpression ?fwd ; + unit:inverseExpression ?inv . + OPTIONAL { + ?unit unit:symbol ?symbol . + } +} + """ + for uri_unit, symbol, uri_derivedfrom, expr_fwd, expr_inv in self.ontology.query(q): + if symbol is None: + yield uri_unit, self.ontology._tail(uri_unit), uri_derivedfrom, str(expr_fwd), str(expr_inv) + else: + yield uri_unit, str(symbol), uri_derivedfrom, str(expr_fwd), str(expr_inv) + + def inferPossiblePrefixesOf(self, uri_unit): + ''' + @summary: extract possible prefixes for the given unit + @param unit: reference to the unit + @type unit: URIRef + @return: a generator over the references of the possible unit prefixes + @rtype: URIRef + ''' + for _, _, uri_prefix in self.ontology.triples((uri_unit, self.ontology.ns('unit')['possiblePrefix'], None)): + yield uri_prefix + + def inferBaseDimensions(self): + ''' + @summary: extract BaseDimensions and their corresponding units from the model + @return: a generator of the BaseDimension details: (reference, unit, level) + @rtype: (URIRef, URIRef, str) + ''' + q = """ +SELECT ?dimension ?unit ?level +WHERE { + ?dimension rdfs:subClassOf unit:BaseDimension ; + rdfs:subClassOf ?constraint ; + rdfs:subClassOf ?level . + ?constraint owl:onProperty unit:defaultUnit ; + owl:hasValue ?unit . + FILTER regex(?level, "Level") . +} + """ + for uri_dimension, uri_unit, level in self.ontology.query(q): + yield uri_dimension, uri_unit, level + + def inferDifferenceDimensions(self): + ''' + @summary: extract DifferenceDimensions and their corresponding units from the model + @return: a generator of the DifferenceDimension details: (reference, unit, derivedfrom) + @rtype: (URIRef, URIRef, URIRef) + ''' + q = """ +SELECT ?dimension ?unit ?derivedFrom +WHERE { + ?dimension rdfs:subClassOf unit:DifferenceDimension ; + rdfs:subClassOf ?constraint1 ; + rdfs:subClassOf ?constraint2 . + ?constraint1 owl:onProperty unit:defaultUnit ; + owl:hasValue ?unit . + ?constraint2 owl:onProperty unit:derivedFrom ; + owl:onClass ?derivedFrom . +} + """ + for uri_dimension, uri_unit, uri_derivedfrom in self.ontology.query(q): + yield uri_dimension, uri_unit, uri_derivedfrom + + def inferPowerDimensions(self): + ''' + @summary: extract PowerDimensions and their corresponding units from the model + @return: a generator of the PowerDimension details: (reference, unit, derivedfrom, exponent) + @rtype: (URIRef, URIRef, URIRef, int) + ''' + q = """ +SELECT ?dimension ?unit ?derivedFrom ?exponent +WHERE { + ?dimension rdfs:subClassOf unit:PowerDimension ; + rdfs:subClassOf ?constraint1 ; + rdfs:subClassOf ?constraint2 ; + rdfs:subClassOf ?constraint3 . + ?constraint1 owl:onProperty unit:defaultUnit ; + owl:hasValue ?unit . + ?constraint2 owl:onProperty unit:derivedFrom ; + owl:onClass ?derivedFrom . + ?constraint3 owl:onProperty unit:exponent ; + owl:hasValue ?exponent . +} + """ + for uri_dimension, uri_unit, uri_derivedfrom, exponent in self.ontology.query(q): + yield uri_dimension, uri_unit, uri_derivedfrom, int(exponent) + + def inferProductDimensions(self): + ''' + @summary: extract ProductDimensions and their corresponding units from the model + @return: a generator of the ProductDimension details: (reference, unit, set of derivedfrom references) + @rtype: (URIRef, URIRef, tuple(URIRef)) + ''' + q = """ +SELECT ?dimension ?unit ?derivedFrom +WHERE { + ?dimension rdfs:subClassOf unit:ProductDimension ; + rdfs:subClassOf ?constraint1 ; + rdfs:subClassOf ?constraint2 . + ?constraint1 owl:onProperty unit:defaultUnit ; + owl:hasValue ?unit . + ?constraint2 owl:onProperty unit:derivedFrom ; + owl:onClass ?derivedFrom . +} + """ + container = {} + for uri_dimension, uri_unit, uri_derivedfrom in self.ontology.query(q): + if not container.has_key(uri_dimension): + container[uri_dimension] = (uri_unit, set()) + container[uri_dimension][1].add(uri_derivedfrom) + for uri_dimension, (uri_unit, set_derivedfrom) in container.iteritems(): + yield uri_dimension, uri_unit, tuple(set_derivedfrom) + + def inferRatioDimensions(self): + ''' + @summary: extract RatioDimensions and their corresponding units from the model + @return: a generator of the RatioDimension details: (reference, unit, derivedfrom) + @rtype: (URIRef, URIRef, URIRef) + ''' + q = """ +SELECT ?dimension ?unit ?derivedFrom +WHERE { + ?dimension rdfs:subClassOf unit:RatioDimension ; + rdfs:subClassOf ?constraint1 ; + rdfs:subClassOf ?constraint2 . + ?constraint1 owl:onProperty unit:defaultUnit ; + owl:hasValue ?unit . + ?constraint2 owl:onProperty unit:derivedFrom ; + owl:onClass ?derivedFrom . +} + """ + for uri_dimension, uri_unit, uri_derivedfrom in self.ontology.query(q): + yield uri_dimension, uri_unit, uri_derivedfrom + diff --git a/Monitoring/MonitoringService/Semantics/__init__.py b/Monitoring/MonitoringService/Semantics/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/Monitoring/MonitoringService/Semantics/__init__.py diff --git a/Monitoring/MonitoringService/Semantics/test.py b/Monitoring/MonitoringService/Semantics/test.py new file mode 100644 index 0000000..8784e44 --- /dev/null +++ b/Monitoring/MonitoringService/Semantics/test.py @@ -0,0 +1,336 @@ +''' +Created on Aug 10, 2011 + +@author: steger +''' +import unittest +from rdflib import Graph +from Example.Metrics import RoundTripDelay +from Example.Tools import sonomashortping +from DataProcessing.Parameter import ParameterList, Parameter +from Example.credentials import ple_credentials, fed_credentials +from DataProcessing.DataHeaderCell import CellRequestByName +from DataProcessing.DataError import SamplerError +from DataProcessing.DataReader import DataReader +from Service.mock_framework import Framework +from Example.Platforms import federation +from Semantics.FeatureModel import FeatureModel + +fw = Framework() +for platform, (_, config_owl) in federation.iteritems(): + plif = fw.add(platform, config_owl) + +class Test(unittest.TestCase): + + def setUp(self): + self.MS_planetlab = fw.getInterface(platform = 'PlanetLab').service + self.MS_federica = fw.getInterface(platform = 'FEDERICA').service + + self.substrate_pl = self.MS_planetlab.ontology.ns('task')['Substrate'] + self.slice_pl = self.MS_planetlab.ontology.ns('task')['Slice'] + dim_ipaddress = self.MS_planetlab.dm['IPAddress'] + node = ("157.181.175.243", self.MS_planetlab.um.ipv4dotted) + self.p_src_eltenode = Parameter(name = "SourceAddress", valuetype = str, unitmanager = self.MS_planetlab.um, dimension = dim_ipaddress, default = node) + node = ("147.102.22.66", self.MS_planetlab.um.ipv4dotted) + self.p_dst_ntuanode = Parameter(name = "DestinationAddress", valuetype = str, unitmanager = self.MS_planetlab.um, dimension = dim_ipaddress, default = node) + + self.substrate_fed = self.MS_federica.ontology.ns('task')['Substrate'] + self.slice_fed = self.MS_federica.ontology.ns('task')['Slice'] + dim_ipaddress = self.MS_federica.dm['IPAddress'] + node = ("194.132.52.2", self.MS_federica.um.ipv4dotted) #sw01.erl.de + self.p_src_fednode = Parameter(name = "SourceAddress", valuetype = str, unitmanager = self.MS_federica.um, dimension = dim_ipaddress, default = node) + node = ("194.132.52.4", self.MS_federica.um.ipv4dotted) #sw01.poz.pl + self.p_dst_fednode = Parameter(name = "DestinationAddress", valuetype = str, unitmanager = self.MS_federica.um, dimension = dim_ipaddress, default = node) + + self.substrate_pl = self.MS_planetlab.ontology.ns('task')['Substrate'] + self.slice_pl = self.MS_planetlab.ontology.ns('task')['Slice'] + + self.feat_task_pl = { + 'OnewayDelay': (['SONoMAChirp'], []), + 'RoundtripDelay': (['SONoMAPing'], ['sshpingSlice']), + 'AvailableBandwidth': ([], ['sshabSlice']), + 'AvailableMemory': (['sshMeminfo'], ['sshMeminfoSlice']), + 'FreeMemory': (['sshMeminfo'], ['sshMeminfoSlice']), + 'MemoryUtilization': (['sshMeminfo'], ['sshMeminfoSlice']), + 'Uptime': (['sshuptime'], ['sshuptimeSlice']), + 'CPULoad': (['sshcpuload'], ['sshcpuloadSlice']), + #'CPUCores': (['sshcpuload'], ['sshcpuloadSlice']), + 'CPUCores': (['sshcpuinfo2'], ['sshcpuinfo2Slice']), + 'CPUSockets': (['sshcpuinfo2'], ['sshcpuinfo2Slice']), + 'CPUUtilization': (['sshcpuload'], ['sshcpuloadSlice']), + 'FreeDiskSpace': (['sshdiskinfo'], ['sshdiskinfoSlice']), + 'UsedDiskSpace': (['sshdiskinfo'], ['sshdiskinfoSlice']), + 'DiskUtilization': (['sshdiskinfo'], ['sshdiskinfoSlice']), + 'LinkUtilization': ([], []), + } + + self.feat_task_fed = { +#FIXME: 3 items? +# 'OnewayDelay': (['hadesaggregates', 'hadesaggregates', 'hadesaggregates'], []), +# 'RoundtripDelay': ([], []), +# 'AvailableBandwidth': ([], []), +# 'AvailableMemory': (['sshMeminfo'], []), +# 'FreeMemory': (['sshMeminfo'], []), + 'MemoryUtilization': (['G3 memory utilization look up'], []), +# 'Uptime': (['sshuptime'], []), +# 'CPULoad': (['sshcpuload'], []), + #'CPUCores': ([], []), +# 'CPUCores': (['sshcpuload'], []), +# 'CPUSockets': ([], []), + 'CPUUtilization': (['G3 CPU utilization look up'], []), +# 'FreeDiskSpace': (['sshdiskinfo'], []), +# 'UsedDiskSpace': (['sshdiskinfo'], []), +# 'DiskUtilization': ([], []), + 'LinkUtilization': (['G3 link utilization look up'], []), + } + + dim_nameofsomething = self.MS_planetlab.dm['NameOfSomething'] + self.slicename = Parameter(name = "SliceName", valuetype = str, + unitmanager = self.MS_planetlab.um, dimension = dim_nameofsomething, + default = ('novi_novi', self.MS_planetlab.um.unitless)) + dim_countable = self.MS_planetlab.dm['Countable'] + self.count = Parameter(name = 'Count', valuetype = int, + unitmanager = self.MS_planetlab.um, dimension = dim_countable, + default = (5, self.MS_planetlab.um.piece)) + self.fm = FeatureModel(self.MS_planetlab.dm, self.MS_planetlab.um, self.MS_planetlab.ontology) + + def test_managers(self): + expect = 14 + infer = len(self.MS_planetlab.pm) + self.assertEqual(infer, expect, "Prefix: got %d expect %d" % (infer, expect)) + + expect = 10 + infer = [ s for _, s in self.MS_planetlab.unitmodel.inferBaseUnits() ] + self.assertEqual(expect, len(infer), "BaseUnit: expect %d, got %d\n%s" % (expect, len(infer), str(infer))) + + expect = 1 + infer = [ d for _, d, _ in self.MS_planetlab.unitmodel.inferProductUnits() ] + self.assertEqual(expect, len(infer), "ProductUnit: expect %d, got %d\n%s" % (expect, len(infer), str(infer))) + + expect = 1 + infer = [ d for _, d, _, _ in self.MS_planetlab.unitmodel.inferPowerUnits() ] + self.assertEqual(expect, len(infer), "PowerUnit: expect %d, got %d\n%s" % (expect, len(infer), str(infer))) + + expect = 12 + infer = [ d for _, d, _, _, _ in self.MS_planetlab.unitmodel.inferLinearTransformedUnits() ] + self.assertEqual(expect, len(infer), "LinearTransformedUnit: expect %d, got %d\n%s" % (expect, len(infer), str(infer))) + + expect = 2 + infer = [ d for _, d, _, _, _ in self.MS_planetlab.unitmodel.inferRegexpTransformedUnits() ] + self.assertEqual(expect, len(infer), "RegexpTransformedUnit: expect %d, got %d\n%s" % (expect, len(infer), str(infer))) + + expect = 8 + infer = [ d for d, _, _ in self.MS_planetlab.unitmodel.inferBaseDimensions() ] + self.assertEqual(expect, len(infer), "BaseDimension: expect %d, got %d\n%s" % (expect, len(infer), str(infer))) + + expect = 1 + infer = [ d for d, _, _ in self.MS_planetlab.unitmodel.inferDifferenceDimensions() ] + self.assertEqual(expect, len(infer), "DifferenceDimension: expect %d, got %d\n%s" % (expect, len(infer), str(infer))) + + expect = 1 + infer = [ d for d, _, _, _ in self.MS_planetlab.unitmodel.inferPowerDimensions() ] + self.assertEqual(expect, len(infer), "PowerDimension: expect %d, got %d\n%s" % (expect, len(infer), str(infer))) + + expect = 1 + infer = [ d for d, _, _ in self.MS_planetlab.unitmodel.inferProductDimensions() ] + self.assertEqual(expect, len(infer), "ProductDimension: expect %d, got %d\n%s" % (expect, len(infer), str(infer))) + + expect = 4 + infer = [ d for d, _, _ in self.MS_planetlab.unitmodel.inferRatioDimensions() ] + self.assertEqual(expect, len(infer), "RatioDimension: expect %d, got %d\n%s" % (expect, len(infer), str(infer))) + + + NS = self.MS_planetlab.ontology.ns('unit') + for expect, uri in [(4, NS['second']), (7, NS['Byte']), (3, NS['bit']), (1, NS['unixtimestamp'])]: + infer = [s for s in self.MS_planetlab.unitmodel.inferPossiblePrefixesOf(uri)] + self.assertEqual(expect, len(infer), "inferPossiblePrefixesOf: expect %d, got %d\n%s" % (expect, len(infer), str(infer))) + + + + def test_IM_domainsfeatures(self): + fm = self.fm + + expect = set(['Slice', 'Substrate']) + infer = set([ self.MS_planetlab.ontology._tail(x) for x in fm.inferDomains() ]) + self.assertEqual(expect, infer, "inferDomains: expect %d, got %d\n%s" % (len(expect), len(infer), str(infer))) + + expect = len(self.feat_task_pl) + infer = [ x for x in fm.inferFeatures()] + self.assertEqual(expect, len(infer), "inferFeatures (PL): expect %d, got %d\n%s" % (expect, len(infer), str(infer))) + + def test_IM_featparameters(self): + feature = self.MS_planetlab.ontology.ns('feature')['RoundtripDelay'] + expect = RoundTripDelay.p_obligatory + infer = self.fm.inferObligatoryParametersOf(feature) + self.assertEqual(len(expect), len(infer), "obligatory parameters for %s differ expect: %s got: %s" % (feature, expect.parameter_names(), infer.parameter_names())) + for k, p in expect.parameter.iteritems(): + inf_v = infer.parameter[k].value + exp_v = p.value + inf_v = (inf_v[0], str(inf_v[1])) + exp_v = (exp_v[0], str(exp_v[1])) + self.assertEqual(inf_v, exp_v, "Parameter value differ %s expect:\n%s\ngot:\n%s" % (k, exp_v, inf_v)) + + + def test_IM_task(self): + for feat, (t_subst, t_slice) in self.feat_task_pl.iteritems(): + feature = self.MS_planetlab.ontology.ns('feature')[feat] + infer_t_subst = [ name for _, name in self.MS_planetlab.taskmodel.inferTasks(self.substrate_pl, feature)] + infer_t_slice = [ name for _, name in self.MS_planetlab.taskmodel.inferTasks(self.slice_pl, feature)] + self.assertEqual(infer_t_subst, t_subst, "(PL) feature: %s searchtask (substrate): expect %s, got %s" % (feat, t_subst, infer_t_subst)) + self.assertEqual(infer_t_slice, t_slice, "(PL) feature: %s searchtask (slice): expect %s, got %s" % (feat, t_slice, infer_t_slice)) + + for feat, (t_subst, t_slice) in self.feat_task_fed.iteritems(): + feature = self.MS_federica.ontology.ns('feature')[feat] + infer_t_subst = [ name for _, name in self.MS_federica.taskmodel.inferTasks(self.substrate_fed, feature)] + infer_t_slice = [ name for _, name in self.MS_federica.taskmodel.inferTasks(self.slice_fed, feature)] + self.assertEqual(infer_t_subst, t_subst, "(FED) feature: %s searchtask (substrate): expect %s, got %s" % (feat, t_subst, infer_t_subst)) + self.assertEqual(infer_t_slice, t_slice, "(FED) feature: %s searchtask (slice): expect %s, got %s" % (feat, t_slice, infer_t_slice)) + + + task = self.MS_planetlab.ontology.ns('config')['T_SONoMAPing'] + infer = self.MS_planetlab.taskmodel.inferCredentialOf(task) + expect = set(sonomashortping.authtype) + self.assertEqual(infer, expect, "credentials differ expect: %s got: %s" % (expect, infer)) + + infer = self.MS_planetlab.taskmodel.inferDriverOf(task) + expect = sonomashortping.driver + self.assertEqual(infer, expect, "drivers differ expect: %s got: %s" % (expect, infer)) + + infer = self.MS_planetlab.taskmodel.inferHookparametersOf(task) + expect = sonomashortping.kwargs + self.assertEqual(infer, expect, "hook parameters differ expect: %s got: %s" % (expect, infer)) + + H = self.MS_planetlab.taskmodel.inferHookdefinitionsOf(task) + for k, h in H.iteritems(): + exp = sonomashortping.hooks[k].strip() + h = h.strip() + self.assertEqual(h, exp, "%s hook differs\nexpect:\n%s\ngot:\n%s" % (k, exp, h)) + + #TODO: check feature equality + infer = [ (c.name, str(c._unit), str(c._dimension)) for c in self.MS_planetlab.taskmodel.inferDataheaderOf(task) ] + expect = [ (c.name, str(c._unit), str(c._dimension)) for c in sonomashortping.dataheaderdeclaration ] + self.assertEqual(infer, expect, "output header declarations differ expect:\n%s\ngot:\n%s" % (expect, infer)) + + infer = self.MS_planetlab.taskmodel.inferParametersOf(task) + expect = sonomashortping.parameters + n_inf, n_exp = set(infer.parameter_names()), set(expect.parameter_names()) + self.assertEqual(n_inf, n_exp, "runtime parameters differ expect: %s got: %s" %(n_exp, n_inf)) + for k, p in expect.parameter.iteritems(): + inf_v = infer.parameter[k].value + exp_v = p.value + if exp_v is None: + self.assertFalse(inf_v, "Expected uninitialized value, got %s" % inf_v) + else: + inf_v = (inf_v[0], str(inf_v[1])) + exp_v = (exp_v[0], str(exp_v[1])) + self.assertEqual(inf_v, exp_v, "Parameter value differ %s expect:\n%s\ngot:\n%s" % (k, exp_v, inf_v)) + + + def test_taskBYuri(self): + # PlanetLab + cases = { + 'T_SSHPingSlice': [self.p_src_eltenode, self.slicename, self.count, self.p_dst_ntuanode], + 'T_SSHMemInfo': [self.p_src_eltenode], + 'T_SSHMemInfoSlice': [self.p_src_eltenode, self.slicename], + 'T_SSHCPULoad': [self.p_src_eltenode], + 'T_SSHCPULoadSlice': [self.p_src_eltenode, self.slicename], + 'T_SSHUptime': [self.p_src_eltenode], + 'T_SSHUptimeSlice': [self.p_src_eltenode, self.slicename], + 'T_SSHDiskInfo': [self.p_src_eltenode], + 'T_SSHDiskInfoSlice': [self.p_src_eltenode, self.slicename], + 'T_SSHCPUInfo': [self.p_src_eltenode], + 'T_SSHCPUInfoSlice': [self.p_src_eltenode, self.slicename], +# 'T_SONoMAPing': [self.p_src_eltenode, self.p_dst_ntuanode], + } + for l,p in cases.iteritems(): + task_uri = self.MS_planetlab.ontology.ns('config')[l] + print 1, task_uri + _, task = self.MS_planetlab.newTask(task = task_uri, + cred = ple_credentials, + resource = None, + parameters = ParameterList(p)) +# print task_uri, ParameterList(p) + task.enable() + task.dataAdded.wait( 15 ) + self.assertGreater(len(task.data), 0, "measurement %s yielded empty result" % l) +# print task.data._rawrecords + + def test_taskBYuri2(self): + # FEDERICA + cases = { +# 'T_SSHPingSlice': [self.p_src_eltenode, self.slicename, self.count, self.p_dst_ntuanode], +# 'T_SSHMemInfo': [self.p_src_eltenode], +# 'T_SSHMemInfoSlice': [self.p_src_eltenode, self.slicename], +# 'T_SSHCPULoad': [self.p_src_eltenode], +# 'T_SSHCPULoadSlice': [self.p_src_eltenode, self.slicename], +# 'T_SSHUptime': [self.p_src_eltenode], +# 'T_SSHUptimeSlice': [self.p_src_eltenode, self.slicename], +# 'T_SSHDiskInfo': [self.p_src_eltenode], +# 'T_SSHDiskInfoSlice': [self.p_src_eltenode, self.slicename], +# 'T_SSHCPUInfo': [self.p_src_eltenode], +# 'T_SSHCPUInfoSlice': [self.p_src_eltenode, self.slicename], +# 'T_SONoMAPing': [self.p_src_eltenode, self.p_dst_ntuanode], +# 'T_hadesaggregate': [self.p_src_fednode, self.p_dst_fednode], + 'T_G3linkutil': [self.p_src_fednode, self.p_dst_fednode], + 'T_G3CPUutil': [self.p_src_fednode], + 'T_G3MEMutil': [self.p_src_fednode], + } + for l,p in cases.iteritems(): + task_uri = self.MS_federica.ontology.ns('config')[l] + _, task = self.MS_federica.newTask(task = task_uri, + cred = fed_credentials, + resource = None, + parameters = ParameterList(p)) +# print task_uri, ParameterList(p) + task.enable() + task.dataAdded.wait( 15 ) + self.assertGreater(len(task.data), 0, "measurement %s yielded empty result" % l) +# print task.data._data._rawrecords + + + def test_owlexamples(self): + doc = "../../information-model/monitoring-model/monitoringQuery_example.owl" + g = Graph() + g.parse(source = doc) + qdict = self.MS_planetlab.QI.inferBundleQueries(qgraph = g) + self.assertTrue(len(qdict), "Got empty query") + for q in qdict: + domain = self.MS_planetlab.ontology.ns('task')['Substrate'] + taskgen = self.MS_planetlab.taskmodel.inferTasks(domain, q.feature) + #we are ugly here: use the first tool + for task_uri, _ in taskgen: break + _, task = self.MS_planetlab.newTask(task = task_uri, + cred = ple_credentials, + resource = q.resource[1], + parameters = q.paramlist) + del task.strategy # make sure STRAT_ONDEMAND + task.enable() + task.dataAdded.wait( 15 ) + task.dataAdded.clear() + if q.samplechain: + flow = [] + for skeleton, parlist in q.samplechain: + flow.append((skeleton, parlist.formkeyvaldict())) + _, A = self.MS_planetlab.am.newAggregator(task.datasource, CellRequestByName(name = 'Free Memory'), flow) + while True: + try: + s, a = A.data._rawrecords[0] + self.assertEqual(s, len(task.data), "inconsistency in length len(data)=%d, max of %d samples?" % (len(task.data), s)) + R = DataReader(datasource = task.datasource) + R.extract(cellrequest = [CellRequestByName(name = 'Free Memory')]) + expect = max( [ float(x) for x, in R ] ) + self.assertEqual(expect, a, "inconsistency in aggregare %f <> %f" % (expect, a)) + break + except SamplerError: + print "MEASURE SOME MORE ..." + task.disable() + task.enable() + task.dataAdded.wait( 15 ) + task.dataAdded.clear() + self.assertGreater(len(task.data), 0, "measurement yielded empty result") + + + +if __name__ == "__main__": + #import sys;sys.argv = ['', 'Test.test_IM_domainsfeatures'] + unittest.main() diff --git a/Monitoring/MonitoringService/Service/MonitoringService.py b/Monitoring/MonitoringService/Service/MonitoringService.py new file mode 100644 index 0000000..9b1f87d --- /dev/null +++ b/Monitoring/MonitoringService/Service/MonitoringService.py @@ -0,0 +1,342 @@ +''' +Created on Mar 22, 2012 + +@author: steger +''' +from time import sleep +from Semantics.InformationModel import Ontology +from Semantics.UnitModel import UnitModel +from Semantics.TaskModel import TaskModel +from Semantics.QueryInterpreter import QueryInterpreter +from Task.Task import SubtaskManager, TaskError, STRAT_PERIODICAL,\ + STRAT_ONDEMAND +from DataProcessing.Parameter import ParameterList +from Resource.node import node +from DataProcessing.AggregatorManager import AggregatorManager +from paramiko.ssh_exception import BadAuthenticationType +import logging +from StringIO import StringIO +from DataProcessing.DataFormatter import JsonFormatter +from DataProcessing.DataHeaderCell import CellRequestByFeature,\ + CellRequestByName +from DataProcessing.DataError import SamplerError +from Example.model import owl_unit, owl_param, owl_features, owl_task, owl_query,\ + owl_stat, owl_core +from Service.WatchdogManager import WatchdogManager + +ontology = { + 'unit': (owl_unit, "http://fp7-novi.eu/unit.owl#"), + 'param': (owl_param, "http://fp7-novi.eu/monitoring_parameter.owl#"), + 'feature': (owl_features, "http://fp7-novi.eu/monitoring_features.owl#"), + 'task': (owl_task, "http://fp7-novi.eu/monitoring_task.owl#"), + 'query': (owl_query, "http://fp7-novi.eu/monitoring_query.owl#"), + 'stat': (owl_stat, 'http://fp7-novi.eu/monitoring_stat.owl#'), + 'core': (owl_core, "http://fp7-novi.eu/im.owl#"), +} + + + + +class MonitoringService(object): + ''' + classdocs + ''' + version = "0.0" + + def __str__(self): + return "NOVI Monitoring Service v%s @ %s" % (self.version, self.platform) + + @property + def platform(self): + return self._if.platform + + def __init__(self, interface, config_owl): + ''' + @summary: constructor + @param interface: + @type interface: MSInterface + @param config_owl: platform specific configuration model + @type config_owl: str + ''' + self._if = interface + self.logger = logging.getLogger(name = "NOVI.MS.%s" % self.platform) + self.log = self._if.log # to be removed + self.ontology = Ontology() + for prefix, (owl_url, ns) in ontology.iteritems(): + if owl_url is None: + continue + self.ontology.load(prefix, owl_url, ns) + self.ontology.load('config', config_owl, 'http://fp7-novi.eu/config.owl#') + self.unitmodel = UnitModel(self.ontology) + self.stm = SubtaskManager(self.um) + self.am = AggregatorManager() + self.wm = WatchdogManager(self.am) + + self.taskmodel = TaskModel(self.dm, self.um, self.ontology) + self.QI = QueryInterpreter(self.taskmodel) + + self._nextID = 0 + self.subtaskIDs = {} + self.aggregatorIDs = {} + self.watchdogIDs = {} + self.formatters = {} + + @property + def pm(self): return self.unitmodel.pm + + @property + def um(self): return self.unitmodel.um + + @property + def dm(self): return self.unitmodel.dm + + + def newProcessID(self): + try: + return "%s:process:%d" % (self.platform, self._nextID) + finally: + self._nextID += 1 + + def newAggregateID(self): + try: + return "%s:aggregate:%d" % (self.platform, self._nextID) + finally: + self._nextID += 1 + + def newWatchdogID(self): + try: + return "%s:watchdog:%d" % (self.platform, self._nextID) + finally: + self._nextID += 1 + + + def measure(self, credential, query): + #TODO: docs + ''' + ''' + g = self.ontology.g + sio = StringIO(query) + g.parse(source = sio) + responses = [] + errors = [] + queries = self.QI.inferBundleQueries(qgraph = g) + self.log(shortmsg = "measurements starting...", message = "Attempt to launch %d measurement threads" % len(queries)) + for q in queries: + feature_uri = q.feature + domain = self.ontology.ns('task')['Substrate'] + taskgen = self.taskmodel.inferTasks(domain, feature_uri) + no_tool = True + (resource_uri, resource) = q.resource + #we are ugly here: use the first tool + for task_uri, _ in taskgen: + print task_uri + no_tool = False + _, task = self.newTask(task = task_uri, cred = credential, resource = resource, parameters = q.paramlist) + if q.samplechain: + task.strategy = STRAT_PERIODICAL + task.enable() + # we apply some aggregation to the data + flow = [] + for skeleton, parlist in q.samplechain: + flow.append((skeleton, parlist.formkeyvaldict())) + _, A = self.am.newAggregator(task.datasource, CellRequestByFeature(feature = q.feature), flow) + formatter = JsonFormatter(datasource = A) + while True: + try: + task.dataAdded.wait( 15 ) + responses.append( formatter.serialize() ) + break + except SamplerError: + task.dataAdded.clear() + sleep(1) + else: + task.strategy = STRAT_ONDEMAND + task.enable() + task.dataAdded.wait( 15 ) + formatter = JsonFormatter(datasource = task.datasource) + formatter.reader.extract(cellrequest = [ + CellRequestByName(name = "Run"), + CellRequestByFeature(feature = feature_uri) + ]) + responses.append( formatter.serialize() ) + task.destroy() + if no_tool: + err_description = "No tools to measure %s @ %s" % (feature_uri, resource_uri) + errors.append(err_description) + self.log(shortmsg = "Limited result set", message = err_description) + useful_data = ",\n".join( responses ) + error_data = ",\n".join(errors) + if len(errors): + if len(useful_data): + response = "[%s,\n{\"errors\" : \"%s\"}]" % (useful_data, error_data) + else: + response = "[{\"errors\" : \"%s\"}]" % (error_data) + else: + response = "[%s]" % useful_data + return response + + def launchTasks(self, credential, query): + #TODO: many things in common with measure!!! + g = self.ontology.g + sio = StringIO(query) + g.parse(source = sio) + taskID = self.newProcessID() + idstore = self.subtaskIDs[taskID] = [] + formatters = self.formatters[taskID] = [] + for q in self.QI.inferBundleQueries(qgraph = g): + feature_uri = q.feature + domain = self.ontology.ns('task')['Slice'] + taskgen = self.taskmodel.inferTasks(domain, feature_uri) + #we are ugly here: use the first tool + for task_uri, _ in taskgen: + subtaskID, task = self.newTask(task = task_uri, cred = credential, resource = q.resource[1], parameters = q.paramlist) + task.strategy = STRAT_PERIODICAL + task.enable() + idstore.append(subtaskID) + f = q.formatter(datasource = task.datasource) + formatters.append(f) + if len(idstore): + print "KONYVELT", taskID + return taskID + else: + self.subtaskIDs.pop(taskID) + self.formatters.pop(taskID) + return None + + def newTask(self, task, cred, resource = None, parameters = ParameterList()): + ''' + @summary: initialize a Task object, which is referenced by a uri + @param task: the reference to the task description + @type task: URIRef + @param cred: an iterable over dictionaries, which are used as input parameters to initialize Credential templates passed to the Task object for authentication, authorization purposes + @type cred: dict generator + @param resource: the resource to measure + @type resource: resource or None + @param parameters: the parameter list to refresh the default parameters of the Task object + @type parameters: ParameterList + @return: the tuple of taskID and the initialized measurement Task object + @rtype: int, Task + ''' + name = self.ontology._tail(task) + credset = self.taskmodel.inferCredentialOf(task) + driver = self.taskmodel.inferDriverOf(task) + hdr = self.taskmodel.inferDataheaderOf(task) + hooks = self.taskmodel.inferHookdefinitionsOf(task) + hookpar = self.taskmodel.inferHookparametersOf(task) + taskparameters = self.taskmodel.inferParametersOf(task) + + taskparameters.update_by_list(parameters) + + #TODO: maybe better push resource to the Task as an argument + if isinstance(resource, node): + addr, unit = resource.get_ipaddress("eth0") + taskparameters.update("SourceAddress", addr, unit) + else: + print "EEEEE unhandled resource", resource +# print taskparameters + + while len(credset): + ct = credset.pop() + for c in cred: + try: + credential = ct(**c) + except: + # credential mismatch go on with the next + continue + try: + return self.stm.generate(name = name, driver = driver, dataheader = hdr, + hookimplementations = hooks, parameters = taskparameters, credential = credential, **hookpar) + except BadAuthenticationType: + pass + raise TaskError("Cannot initialize the Task with the credential set provided for %s" % name) + + def delTask(self, taskidentifier): + self.stm.pop( taskidentifier ) + + def getTask(self, taskidentifier): + return self.stm[ taskidentifier ] + + def attachAggregators(self, credential, query): + g = self.ontology.g + sio = StringIO(query) + g.parse(source = sio) + aggregatorID = self.newAggregateID() + idstore = self.aggregatorIDs[aggregatorID] = [] + formatters = self.formatters[aggregatorID] = [] + + for q in self.QI.inferSampleManipulationQueries(qgraph = g): + _, sourcetype, _ = q.sourceid.split(':') + if sourcetype == 'process': + #FIXME: csak egy subprocessre muxik perpill + sourceid = self.subtaskIDs[q.sourceid][0] + + ds = self.stm[ sourceid ].datasource + elif sourcetype == 'aggregate': + sourceid = self.aggregatorIDs[q.sourceid][0] + + ds = self.am[ sourceid ] + else: + raise Exception("Unknown source type %s" % sourcetype) + cr = CellRequestByFeature(feature = q.feature) + aggID, A = self.am.newAggregator(dataSource = ds, cellrequest = cr, commandflow = q.samplechain) + idstore.append(aggID) + f = q.formatter(datasource = A) + formatters.append(f) + if len(idstore): + return aggregatorID + else: + self.aggregatorIDs.pop(aggregatorID) + self.formatters.pop(aggregatorID) + return None + + def delAggregator(self, aggregatoridentifier): + self.am.pop( aggregatoridentifier ) + + def getAggregator(self, aggregatoridentifier): + return self.am[ aggregatoridentifier ] + + def attachWatchdogs(self, credential, query): + g = self.ontology.g + sio = StringIO(query) + g.parse(source = sio) + watchdogID = self.newWatchdogID() + idstore = self.watchdogIDs[watchdogID] = [] + + for q in self.QI.inferConditionQueries(qgraph = g): + _, sourcetype, _ = q.sourceid.split(':') + if sourcetype == 'process': + #FIXME: csak egy subprocessre muxik perpill + sourceid = self.subtaskIDs[q.sourceid][0] + + ds = self.stm[ sourceid ].datasource + elif sourcetype == 'aggregate': + sourceid = self.aggregatorIDs[q.sourceid][0] + + ds = self.am[ sourceid ] + else: + raise Exception("Unknown source type %s" % sourcetype) + + #ITT A TENNIVALO + cr = CellRequestByFeature(feature = q.feature) + watchID, _ = self.wm.newConditional(dataSource = ds, cellrequest = cr, conditiontype = q.conditiontype, operation = q.operation) + + + + + idstore.append(watchID) + if len(idstore): + return watchdogID + else: + self.watchdogIDs.pop(watchdogID) + return None + + def delWatchdog(self, watchdogidentifier): + self.wm.pop( watchdogidentifier ) + + + def checkWatchdog(self, watchdogidentifier): + resp = [] + for watchID in self.watchdogIDs[watchdogidentifier]: + WD = self.wm[watchID] + resp.append("\'%s\': %s" % (WD.name, WD.value)) + return "{\n\t%s\n}" % ",\n\t".join(resp) diff --git a/Monitoring/MonitoringService/Service/WatchdogManager.py b/Monitoring/MonitoringService/Service/WatchdogManager.py new file mode 100644 index 0000000..e40f1c3 --- /dev/null +++ b/Monitoring/MonitoringService/Service/WatchdogManager.py @@ -0,0 +1,46 @@ +''' +Created on Dec 10, 2012 + +@author: steger +''' +from DataProcessing.LinearCombination import LinearCombination +from DataProcessing.Bool import IsPositive, IsNotPositive, IsNegative,\ + IsNotNegative + +class WatchdogManager(object): + def __init__(self, am): + self._id = 0; + self._conditionals = {} + self.am = am + self._dep = {} + + def newConditional(self, dataSource, cellrequest, conditiontype, operation): + deps = [] + if conditiontype in [ IsPositive, IsNegative, IsNotNegative, IsNotPositive ]: + lincomb = LinearCombination() + for factor, commandflow in operation: + Aid, A = self.am.newAggregator(dataSource, cellrequest, commandflow) + lincomb.addTerm(factor, A) + deps.append(Aid) + DS = conditiontype(lincomb) + self._id += 1 + self._conditionals[ self._id ] = DS + self._dep[ self._id ] = deps[:] + return self._id, DS + + def __getitem__(self, watchdogid): + try: + return self._conditionals[ watchdogid ] + except: + raise Exception("Watchdog with id %s not found" % watchdogid) + + def pop(self, watchdogid): + try: + self._conditionals.pop( watchdogid ) + deps = self._dep.pop(watchdogid) + while len(deps): + Aid = deps.pop() + self.am.pop(Aid) + except KeyError: + print "WW: Watchdog with id %s not found" % watchdogid + \ No newline at end of file diff --git a/Monitoring/MonitoringService/Service/__init__.py b/Monitoring/MonitoringService/Service/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/Monitoring/MonitoringService/Service/__init__.py diff --git a/Monitoring/MonitoringService/Service/interface.py b/Monitoring/MonitoringService/Service/interface.py new file mode 100644 index 0000000..2ba917d --- /dev/null +++ b/Monitoring/MonitoringService/Service/interface.py @@ -0,0 +1,344 @@ +''' +Created on 08.08.2011 + +@author: steger, jozsef +''' +from Service.MonitoringService import MonitoringService +import logging + +class InterfaceError(Exception): + pass + +#TODO: add and handle bindings at this level +class MSInterface(object): + ''' + @summary: Implements a thin service layer on top of the MonitoringService instance + to collect methods that need to be exported and mapped in the NOVI API. + It also provides a reference to the framework to be able to communicate with + remote MonitoringService instances. The log() method is a place holder + to sink information to be pushed in the NOVI UserFeedback service. + The emit() method is a place holder to sink signals to be pushed in the NOVI + Policy Service component installed on top of the same platform. + ''' + + def __init__(self, framework, reference, config_owl): + ''' + Constructor + @param framework: a service which provides getService() method to look up MonSrv instances of different reference + @type framework: Framework + @param reference: the name of the platform + @type reference: str + @param config_owl: platform specific configuration model + @type config_owl: str + ''' + self._framework = framework + self.platform = reference + self._ms = MonitoringService(self, config_owl) + self.logger = logging.getLogger(name = "NOVI.MSI.%s" % reference) + + @property + def service(self): + ''' + @return: the underlying monitoring service component + @rtype: MonitoringService + ''' + return self._ms + + @property + def proxy(self): + ''' + @return: a proxy service to look up the rest of the monitoring service components + @rtype: Framework + ''' + return self._framework + + def dispatchID(self, identifier): + ''' + @summary: this method finds the MonitoringService instance that is responsible for handling an identified Task or Aggregate + @param identifier: identifier of a task or aggregate, it follows the form: :: + @type identifier: string + @return: the monitoring service instance + @rtype: MonitoringService + ''' + try: + platform, _, _ = identifier.split(':') + if self.platform == platform: + return self.service + return self.framework.getService(platform) + except ValueError: + raise InterfaceError("Wrong identifier format") + + def log(self, shortmsg, message): + # overridden by the JAVA wrapper + self.logger.info("[%s] %s" % (shortmsg, message)) + + def emit(self, what): + # overridden by the JAVA wrapper + self.framework.getPolicyService(self.platform).trigger(what) + + # Test purpose function + def echo(self, platform): + ''' + @summary: An integration tester function (to be exported public) + @param platform: name of the platform + @type platform: string + @return: messages of the platforms taking part in the message flow + @rtype: string + ''' + self.logger.info("[echo] calling %s" % platform) + otherservice = self._framework.getInterface(platform).service + return "%s -> %s" % (str(self.service), str(otherservice)) + + + # Substrate monitoring function + def measure(self, credential, query): + ''' + @summary: Method to handle substrate monitoring queries (to be exported public) + @param credential: + @type credential: + @param query: an owl document containing several BundleQuery instances + @type query: string + @return: response to the query + @rtype: string + ''' + #TODO: split query and concatenate results + return self.service.measure(credential, query) + + # Slice monitoring functions + def sliceTasks(self, credential, query): + raise InterfaceError("sliceTasks() method is not implemented") + + def addTask(self, credential, query): + ''' + @summary: Method to start slice monitoring tasks (to be exported public) + @param credential: + @type credential: + @param query: an owl document containing several BundleQuery instances + @type query: string + @return: process identifier + @rtype: string + ''' + #TODO: investigate if the service instance under this interface should be the boss + return self.service.launchTasks(credential, query) + + def describeTaskData(self, credential, query): + ''' + @summary: Method to retrieve meta data of task data (to be exported public) + @param credential: + @type credential: + @param query: + @type query: string + @return: serialize the header of the data tables + @rtype: string + ''' + taskID = query + ms = self.dispatchID(identifier = taskID) + #TODO: move this in the MonitoringService + headers = map(lambda x: x.header(), ms.formatters[taskID]) + return "[%s]" % "\n,\n".join(headers) + + def fetchTaskData(self, credential, query): + ''' + @summary: Method to retrieve task data collected since last fetch or the start (to be exported public) + @param credential: + @type credential: + @param query: + @type query: string + @return: serialize the appended content of the data tables + @rtype: string + ''' + taskID = query + ms = self.dispatchID(identifier = taskID) + #TODO: move this in the MonitoringService + response = [] + try: + for f in ms.formatters[taskID]: + response.append( f.serialize() ) + except Exception, e: + print "EEE", e + pass + return "[%s]" % "\n,\n".join(response) + + def modifyTask(self, credential, query): + raise InterfaceError("modifyTask() method is not implemented") + + def removeTask(self, credential, query): + ''' + @summary: Method to remove a slice measurement task (to be exported public) + @param credential: + @type credential: + @param query: + @type query: string + ''' + taskID = query + ms = self.dispatchID(identifier = taskID) + #TODO: move this in the MonitoringService + try: + subtaskids = ms.subtaskIDs.pop(taskID) + ms.formatters.pop(taskID) + while len(subtaskids): + subtaskid = subtaskids.pop() + ms.delTask(taskidentifier = subtaskid) + except KeyError: + # the taskID does not belong to me + pass + + def enableTask(self, credential, query): + ''' + @summary: Method to enable a slice measurement task (to be exported public) + @param credential: + @type credential: + @param query: + @type query: string + ''' + taskID = query + ms = self.dispatchID(identifier = taskID) + try: + for subtaskid in ms.subtaskIDs[taskID]: + t = ms.getTask(taskidentifier = subtaskid) + t.enable() + except KeyError: + # the taskID does not belong to me + pass + + def disableTask(self, credential, query): + ''' + @summary: Method to disable a slice measurement task temporarily (to be exported public) + @param credential: + @type credential: + @param query: + @type query: string + ''' + taskID = query + ms = self.dispatchID(identifier = taskID) + try: + for subtaskid in ms.subtaskIDs[taskID]: + t = ms.getTask(taskidentifier = subtaskid) + t.disable() + except KeyError: + # the taskID does not belong to me + pass + + def getTaskStatus(self, credential, query): + ''' + @summary: Method to check the state of a slice measurement task (to be exported public) + @param credential: + @type credential: + @param query: + @type query: string + @return: True if the tasks are running + @rtype: boolean + ''' + taskID = query + ms = self.dispatchID(identifier = taskID) + try: + for subtaskid in ms.subtaskIDs[taskID]: + t = ms.getTask(taskidentifier = subtaskid) + if t.state == t.STATE_RUNNING: + return True + except KeyError: + # the taskID does not belong to me + pass + return False + + def addAggregator(self, credential, query): + ''' + @summary: Method to define new data manipulation on slice monitoring data (to be exported public) + @param credential: + @type credential: + @param query: an owl document containing several SampleManipulationQuery instances + @type query: string + @return: aggregator identifier + @rtype: string + ''' + #TODO: investigate if the service instance under this interface should be the boss + return self.service.attachAggregators(credential, query) + + def removeAggregator(self, credential, query): + ''' + @summary: Method to remove data manipulation on slice monitoring data (to be exported public) + @param credential: + @type credential: + @param query: + @type query: string + ''' + aggregatorID = query + ms = self.dispatchID(identifier = aggregatorID) + try: + aggregatorids = ms.aggregatorIDs.pop(aggregatorID) + ms.formatters.pop(aggregatorID) + while len(aggregatorids): + aggregatorid = aggregatorids.pop() + ms.delAggregator(aggregatorid) + except KeyError: + # the aggregatorID does not belong to me + pass + + def fetchAggregatorData(self, credential, query): + ''' + @summary: Method to refresh and serialize results of data manipulation on slice monitoring data (to be exported public) + @param credential: + @type credential: + @param query: + @type query: string + @return: result of aggregators + @rtype: string + ''' + aggregatorID = query + ms = self.dispatchID(identifier = aggregatorID) + response = [] + try: + for f in ms.formatters[aggregatorID]: + f.source.process() + print "ALMA", f.source + print "ALMA", f.source.source + print "ALMA", f.source.data + response.append( f.serialize() ) + except Exception, e: + print "EEE", e + pass + return "[%s]" % "\n,\n".join(response) + + def addWatchdog(self, credential, query): + ''' + @summary: + @param credential: + @type credential: + @param query: an owl document containing several SampleManipulationQuery instances + @type query: string + @return: watchdog identifier + @rtype: string + ''' + #TODO: investigate if the service instance under this interface should be the boss + return self.service.attachWatchdogs(credential, query) + + def removeCondition(self, credential, query): + ''' + @summary: Method to remove conditions bound to slice monitoring data (to be exported public) + @param credential: + @type credential: + @param query: + @type query: string + ''' + watchdogID = query + ms = self.dispatchID(identifier = watchdogID) + try: + watchdogids = ms.watchdogIDs.pop(watchdogID) + while len(watchdogids): + watchdogid = watchdogids.pop() + ms.delWatchdog(watchdogid) + except KeyError: + # the aggregatorID does not belong to me + pass + + def checkCondition(self, credential, query): + ''' + @summary: Method to examine a conditions bound to slice monitoring data (to be exported public) + @param credential: + @type credential: + @param query: + @type query: string + ''' + watchdogID = query + ms = self.dispatchID(identifier = watchdogID) + return ms.checkWatchdog(watchdogID) diff --git a/Monitoring/MonitoringService/Service/mock_framework.py b/Monitoring/MonitoringService/Service/mock_framework.py new file mode 100644 index 0000000..8a8874b --- /dev/null +++ b/Monitoring/MonitoringService/Service/mock_framework.py @@ -0,0 +1,284 @@ +''' +Created on Nov 20, 2012 + +@author: steger +''' +from Service.interface import MSInterface +import logging +from logging.handlers import TimedRotatingFileHandler +from os import path, unlink +from flask import Flask, request +from threading import Thread +from Example.Platforms import federation +from select import select +from sys import stdin +import httplib2 +from os import urandom + +#FIXME: DIRTY! +from Example.credentials import novisaCredentialIARGS +import traceback +import sys + +class FrameworkError(Exception): + pass + +class PolicyMock(object): + def __init__(self): + fn = "/tmp/ps.log" + if path.exists(fn): + unlink(fn) + hdlr = TimedRotatingFileHandler(filename = fn) + self.logger = logging.getLogger("NOVI.PS") + self.logger.setLevel(level = logging.DEBUG) + self.logger.addHandler(hdlr = hdlr) + + def trigger(self, what): + self.logger.info(what) + +class Framework(object): + ''' + This class mimics the integration framework. It helps to look up remote Monitoring Service instances + ''' + + def __init__(self): + ''' + Constructor + ''' + self._if = {} + self._pol = PolicyMock() + + def add(self, platform, config_owl): + fn = "/tmp/ms_%s.log" % platform + if path.exists(fn): + unlink(fn) + hdlr = TimedRotatingFileHandler(filename = fn) + l = logging.getLogger("NOVI.MS.%s" % platform) + l.setLevel(level = logging.DEBUG) + l.addHandler(hdlr = hdlr) + l = logging.getLogger("NOVI.MSI.%s" % platform) + l.setLevel(level = logging.DEBUG) + l.addHandler(hdlr = hdlr) + iface = MSInterface(self, platform, config_owl) + self._if[platform] = iface + return iface + + def getInterface(self, platform): + try: + return self._if[platform] + except: + print "EE: %s platform not found" % platform + raise FrameworkError + + def getPolicyService(self, platform): + return self._pol + + def serviceList(self): + return self._if.values() + +def app_launch(plif, port): + app = Flask(__name__) + app.secret_key = urandom(24) + t = Thread(target = run, args = (app, port)) + t.start() + + # these are here for test and control + @app.route("/", methods = ['GET']) + def hello(): + return "Hello world" + + @app.route('/shutdown', methods = ['POST']) + def shutdown(): + shutdown_server() + return 'Server shutting down...' + + + # these are the real service interfaces + @app.route("/echo", methods = ['POST']) + def echo(): + platform = request.form['platform'] + try: + return plif.echo(platform) + except Exception, e: + print "-"*60 + traceback.print_exc(file=sys.stdout) + print "="*60 + return "Error %s" % e + + @app.route("/measure", methods = ['POST']) + def measure(): + q = request.form['query'] + try: + return plif.measure(credential = [novisaCredentialIARGS], query = q) + except Exception, e: + print "-"*60 + traceback.print_exc(file=sys.stdout) + print "="*60 + return "Error %s" % e + + @app.route("/addTask", methods = ['POST']) + def addTask(): + try: + q = request.form['query'] + return plif.addTask(credential = [novisaCredentialIARGS], query = q) + except Exception, e: + print "-"*60 + traceback.print_exc(file=sys.stdout) + print "-"*60 + return "Error %s" % e + + @app.route("/fetchTaskData", methods = ['POST']) + def fetchTaskData(): + q = request.form['query'] + return plif.fetchTaskData(credential = [novisaCredentialIARGS], query = q) + + @app.route("/removeTask", methods = ['POST']) + def removeTask(): + try: + q = request.form['query'] + plif.removeTask(credential = [novisaCredentialIARGS], query = q) + except Exception, e: + print "Error %s" % e + return "OK" + + @app.route("/describeTaskData", methods = ['POST']) + def describeTaskData(): + q = request.form['query'] + return plif.describeTaskData(credential = [novisaCredentialIARGS], query = q) + + @app.route("/getTaskStatus", methods = ['POST']) + def getTaskStatus(): + try: + q = request.form['query'] + return str( plif.getTaskStatus(credential = [novisaCredentialIARGS], query = q) ) + except Exception, e: + print "-"*60 + traceback.print_exc(file=sys.stdout) + print "-"*60 + return "Error %s" % e + + @app.route("/enableTask", methods = ['POST']) + def enableTask(): + q = request.form['query'] + plif.enableTask(credential = [novisaCredentialIARGS], query = q) + return "OK" + + @app.route("/disableTask", methods = ['POST']) + def disableTask(): + q = request.form['query'] + plif.disableTask(credential = [novisaCredentialIARGS], query = q) + return "OK" + + @app.route("/addAggregator", methods = ['POST']) + def addAggregator(): + try: + q = request.form['query'] + return plif.addAggregator(credential = [novisaCredentialIARGS], query = q) + except Exception, e: + print "-"*60 + traceback.print_exc(file=sys.stdout) + print "-"*60 + return "Error %s" % e + + @app.route("/fetchAggregatorData", methods = ['POST']) + def fetchAggregatorData(): + q = request.form['query'] + return plif.fetchAggregatorData(credential = [novisaCredentialIARGS], query = q) + + @app.route("/removeAggregator", methods = ['POST']) + def removeAggregator(): + try: + q = request.form['query'] + plif.removeAggregator(credential = [novisaCredentialIARGS], query = q) + except Exception, e: + print "Error %s" % e + return "OK" + + @app.route("/addCondition", methods = ['POST']) + def addCondition(): + try: + q = request.form['query'] + return plif.addWatchdog(credential = [novisaCredentialIARGS], query = q) + except Exception, e: + print "-"*60 + traceback.print_exc(file=sys.stdout) + print "-"*60 + return "Error %s" % e + + @app.route("/removeCondition", methods = ['POST']) + def removeCondition(): + try: + q = request.form['query'] + plif.removeCondition(credential = [novisaCredentialIARGS], query = q) + except Exception, e: + print "Error %s" % e + return "OK" + + @app.route("/checkCondition", methods = ['POST']) + def checkCondition(): + try: + q = request.form['query'] + return plif.checkCondition(credential = [novisaCredentialIARGS], query = q) + except Exception, e: + print "-"*60 + traceback.print_exc(file=sys.stdout) + print "-"*60 + return "Error %s" % e + + + + return t + + +def run(app, port): + app.run(debug = False, port = port) + +def shutdown_server(): + func = request.environ.get('werkzeug.server.shutdown') + if func is None: + raise RuntimeError('Not running with the Werkzeug Server') + func() + +def emit(port): + try: + h = httplib2.Http(timeout = 10) + url = "http://localhost:%d/shutdown" % port + resp, _ = h.request(uri = url, method = "POST") + if resp.status != 200: + print "Service responded with status %s" % resp.status + except Exception, e: + print "Error contacting server @ localhost:%d: (%s)" % (port, e) + +def start_servers(): + fw = Framework() + t = [] + for platform, (port, config_owl) in federation.iteritems(): + plif = fw.add(platform, config_owl) + t.append( app_launch(plif, port) ) + return fw, t + +def stop_servers(t): + # POST the shutdown methods + for port, _ in federation.values(): + emit(port) + + # join threads + while len(t): + st = t.pop() + st.join() + +if __name__ == "__main__": + print "INIT" + # start services as separate threads + _, t = start_servers() + + # wait for a keyboard interrupt + print "PRESS ^C to stop" + while True: + try: + select([stdin],[],[]) + except KeyboardInterrupt: + break + + stop_servers(t) + print "OK" \ No newline at end of file diff --git a/Monitoring/MonitoringService/Service/test.py b/Monitoring/MonitoringService/Service/test.py new file mode 100644 index 0000000..1b6b78b --- /dev/null +++ b/Monitoring/MonitoringService/Service/test.py @@ -0,0 +1,442 @@ +''' +Created on Aug 10, 2011 + +@author: steger +''' +import unittest +from rdflib import Graph, Namespace, Literal +from Service.mock_framework import start_servers, stop_servers +import httplib2 +from Example.Platforms import federation +from urllib import urlencode +from time import sleep + +fw, t = start_servers() +from MonitoringService import ontology + +class Test(unittest.TestCase): + headers = {'Content-type': 'application/x-www-form-urlencoded'} + proxy = httplib2.Http(cache = "/tmp/ms_client_cache", timeout = 10) + + def setUp(self): + self.MSI_planetlab = fw.getInterface('PlanetLab') + self.PL_O = self.MSI_planetlab.service.ontology + NS = self.ns + self.S = NS('stat')['UnmodifiedExtractOfFeatureSamples'] + self.F = NS('query')['Formatter_JSON'] + sleep (1) + + def tearDown(self): + pass + + def test_echo(self): + p1 = "PlanetLab" + p2 = "FEDERICA" + + h = httplib2.Http(cache = "/tmp/ms_client_cache", timeout = 10) + url = "http://localhost:%d/echo" % federation[p1][0] + data = urlencode( {'platform': p2} ) + resp, response = h.request(uri = url, method = "POST", body = data, headers = self.headers) + self.assertTrue(resp.status == 200, "Service @%s responded with status %s" % (p1, resp.status)) + i, o = response.split("->") + got = (i.split("@")[-1].strip(), o.split("@")[-1].strip()) + expect = (p1, p2) + self.assertEquals(expect, got, "Echo reply differs from expected (%s): %s" % (expect, response)) + + data = urlencode( {'platform': p1} ) + resp, response = h.request(uri = url, method = "POST", body = data, headers = self.headers) + self.assertTrue(resp.status == 200, "Service @%s responded with status %s" % (p1, resp.status)) + i, o = response.split("->") + got = (i.split("@")[-1].strip(), o.split("@")[-1].strip()) + expect = (p1, p1) + self.assertEquals(expect, got, "Echo reply differs from expected (%s): %s" % (expect, response)) + + @staticmethod + def q_enc(q): + return urlencode( {'query': q} ) + + @property + def mns(self): + return Namespace("http://foo.bar/req.owl#") + + @property + def ns(self): + return self.PL_O.ns + + @property + def ns_type(self): + return self.ns('rdf')['type'] + + def measure(self, q, expect = 26): + url = "http://localhost:%d/measure" % federation['PlanetLab'][0] + data = self.q_enc(q) + resp, response = self.proxy.request(uri = url, method = "POST", body = data, headers = self.headers) + self.assertTrue(resp.status == 200, "Service responded with status %s" % (resp.status)) + self.tlen(response, expect) + return response + + def addtask(self, q): + url = "http://localhost:%d/addTask" % federation['PlanetLab'][0] + data = self.q_enc(q) + resp, response = self.proxy.request(uri = url, method = "POST", body = data, headers = self.headers) + self.assertTrue(resp.status == 200, "Service responded with status %s" % (resp.status)) + self.assertTrue(response.startswith('PlanetLab:process:'), "wrong process id %s" % response) + return response + + def fetchtaskdata(self, q, expect = 26): + url = "http://localhost:%d/fetchTaskData" % federation['PlanetLab'][0] + data = self.q_enc(q) + resp, response = self.proxy.request(uri = url, method = "POST", body = data, headers = self.headers) + self.assertTrue(resp.status == 200, "Service responded with status %s" % (resp.status)) + self.tlen(response, expect) + return response + + def deltask(self, q): + url = "http://localhost:%d/removeTask" % federation['PlanetLab'][0] + data = self.q_enc(q) + resp, response = self.proxy.request(uri = url, method = "POST", body = data, headers = self.headers) + self.assertTrue(resp.status == 200, "Service responded with status %s" % (resp.status)) + self.assertEqual(response, "OK", "Got: %s" % response) + + def addaggregator(self, q): + url = "http://localhost:%d/addAggregator" % federation['PlanetLab'][0] + data = self.q_enc(q) + resp, response = self.proxy.request(uri = url, method = "POST", body = data, headers = self.headers) + self.assertTrue(resp.status == 200, "Service responded with status %s" % (resp.status)) + self.assertTrue(response.startswith('PlanetLab:aggregate:'), "wrong process id %s" % response) + return response + + def fetchaggregate(self, q, expect = 26): + url = "http://localhost:%d/fetchAggregatorData" % federation['PlanetLab'][0] + data = self.q_enc(q) + resp, response = self.proxy.request(uri = url, method = "POST", body = data, headers = self.headers) + self.assertTrue(resp.status == 200, "Service responded with status %s" % (resp.status)) + self.tlen(response, expect) + return response + + def delaggregator(self, q): + url = "http://localhost:%d/removeAggregator" % federation['PlanetLab'][0] + data = self.q_enc(q) + resp, response = self.proxy.request(uri = url, method = "POST", body = data, headers = self.headers) + self.assertTrue(resp.status == 200, "Service responded with status %s" % (resp.status)) + self.assertEqual(response, "OK", "Got: %s" % response) + + def addcondition(self, q): + url = "http://localhost:%d/addCondition" % federation['PlanetLab'][0] + data = self.q_enc(q) + resp, response = self.proxy.request(uri = url, method = "POST", body = data, headers = self.headers) + self.assertTrue(resp.status == 200, "Service responded with status %s" % (resp.status)) + self.assertTrue(response.startswith('PlanetLab:watchdog:'), "wrong process id %s" % response) + return response + + def delcondition(self, q): + url = "http://localhost:%d/removeCondition" % federation['PlanetLab'][0] + data = self.q_enc(q) + resp, response = self.proxy.request(uri = url, method = "POST", body = data, headers = self.headers) + self.assertTrue(resp.status == 200, "Service responded with status %s" % (resp.status)) + self.assertEqual(response, "OK", "Got: %s" % response) + + def checkcondition(self, q): + url = "http://localhost:%d/checkCondition" % federation['PlanetLab'][0] + data = self.q_enc(q) + resp, response = self.proxy.request(uri = url, method = "POST", body = data, headers = self.headers) + self.assertTrue(resp.status == 200, "Service responded with status %s" % (resp.status)) + self.assertGreater(len(response.splitlines()), 2, "Got: %s" % response) + return response + + + def tlen(self, response, expect = 26): +# print response + self.assertTrue(response, "Got nothing due to former errors") + if expect > 1: + self.assertGreater(len(response.splitlines()), expect, "got empty measurement response") + else: + self.assertGreater(len(response), 2, "got empty measurement response") + return response + + + + def new_g(self): + g = Graph() + for k, (_, ns) in ontology.iteritems(): + g.bind(k, Namespace(ns)) + mns = self.mns + g.bind('q', mns) + + return g + + def save(self, fn, q): + try: + with open(fn, 'w') as f: + f.write(q) + with open("%s.ue" % fn, 'w') as f: + f.write( self.q_enc(q) ) + except: + pass + + def test_measure(self): + doc = "../../information-model/monitoring-model//monitoringQuery_example.owl" + with open(doc) as fp: + q = fp.read() + self.measure(q) + + def addnode(self, g, resname = 'smilax1', address = '150.254.160.19'): + mns = self.mns + NS = self.ns + TYPE = self.ns_type + R = mns[resname] + I1 = mns['ifin'] + I2 = mns['ifout'] + IPADDR = Literal(address) + ADDR = mns['%s_address' % resname] + g.add((R, TYPE, NS('core')['Node'])) + g.add((R, TYPE, NS('core')['Resource'])) + g.add((R, TYPE, NS('owl')['NamedIndividual'])) + + g.add((R, NS('core')['hasInboundInterface'], I1)) + g.add((R, NS('core')['hasOutboundInterface'], I1)) + g.add((I1, TYPE, NS('core')['Interface'])) + g.add((I2, TYPE, NS('core')['Interface'])) + g.add((I1, NS('core')['hasIPv4Address'], ADDR)) + g.add((I2, NS('core')['hasIPv4Address'], ADDR)) + g.add((ADDR, TYPE, NS('owl')['NamedIndividual'])) + g.add((ADDR, TYPE, NS('unit')['IPAddress'])) + g.add((ADDR, NS('unit')['hasValue'], IPADDR)) + return R + + def addPar(self, g, pname = 'partition', pval = '/', ptype = 'String', pdim = 'NameOfSomething'): + P = self.mns['par_%s' % pname] + NS = self.ns + TYPE = self.ns_type + g.add((P, TYPE, NS('owl')['NamedIndividual'])) + g.add((P, TYPE, NS('query')['QueryParameter'])) + g.add((P, NS('param')['paramName'], Literal(pname))) + g.add((P, NS('unit')['hasValue'], Literal(pval))) + g.add((P, NS('param')['hasType'], NS('param')[ptype])) + g.add((P, TYPE, NS('unit')[pdim])) + return P + + def bindNode(self, g, Q, R): + g.add((Q, self.ns('query')['hasResource'], R)) + + def bindPar(self, g, Q, P): + g.add((Q, self.ns('param')['hasParameter'], P)) + + def newQ(self, g, what = 'MemoryUtilization'): + Q = self.mns['measure_%s' % what] + NS = self.ns + TYPE = NS('rdf')['type'] + g.add((Q, TYPE, NS('owl')['NamedIndividual'])) + g.add((Q, TYPE, NS('query')['BundleQuery'])) + g.add((Q, NS('feature')['hasFeature'], NS('feature')[what])) + + g.add((Q, NS('stat')['hasSample'], self.S)) + g.add((Q, NS('query')['hasFormatter'], self.F)) + return Q + + def createaggregatorquery(self, pid, what = 'MemoryUtilization'): + g = self.new_g() + R = self.addnode(g) + Q = self.mns['aggr_%s' % what] + NS = self.ns + TYPE = NS('rdf')['type'] + g.add((Q, TYPE, NS('owl')['NamedIndividual'])) + g.add((Q, TYPE, NS('query')['SampleManipulationQuery'])) + g.add((Q, NS('feature')['hasFeature'], NS('feature')[what])) + + g.add((Q, NS('query')['hasFormatter'], self.F)) + g.add((Q, NS('query')['hasProcessid'], Literal(pid))) + g.add((Q, NS('query')['hasResource'], R)) + + P = self.mns['par_last5'] + NS = self.ns + TYPE = self.ns_type + g.add((P, TYPE, NS('owl')['NamedIndividual'])) + g.add((P, TYPE, NS('query')['SOP_tail'])) + g.add((P, NS('param')['paramName'], Literal('tail'))) + g.add((P, NS('unit')['hasValue'], Literal('5'))) + g.add((P, NS('param')['hasType'], NS('param')['Integer'])) + g.add((P, TYPE, NS('unit')['Countable'])) + + L5 = self.mns['last5'] + g.add((L5, TYPE, NS('owl')['NamedIndividual'])) + g.add((L5, TYPE, NS('stat')['Tail'])) + g.add((L5, NS('stat')['hasSample'], self.S)) + g.add((L5, NS('param')['hasParameter'], P)) + ML5 = self.mns['maxlast5'] + g.add((ML5, TYPE, NS('owl')['NamedIndividual'])) + g.add((ML5, TYPE, NS('stat')['Maximum'])) + g.add((ML5, NS('stat')['hasSample'], L5)) + g.add((Q, NS('stat')['hasSample'], ML5)) + + return g.serialize() + + + def createconditionquery(self, pid, what = 'MemoryUtilization'): + g = self.new_g() + R = self.addnode(g) + NS = self.ns + TYPE = self.ns_type + + P = self.mns['par_last'] + g.add((P, TYPE, NS('owl')['NamedIndividual'])) + g.add((P, TYPE, NS('query')['SOP_tail'])) + g.add((P, NS('param')['paramName'], Literal('tail'))) + g.add((P, NS('unit')['hasValue'], Literal('1'))) + g.add((P, NS('param')['hasType'], NS('param')['Integer'])) + g.add((P, TYPE, NS('unit')['Countable'])) + + L = self.mns['last'] + g.add((L, TYPE, NS('owl')['NamedIndividual'])) + g.add((L, TYPE, NS('stat')['Tail'])) + g.add((L, NS('stat')['hasSample'], self.S)) + g.add((L, NS('param')['hasParameter'], P)) + + MIN = self.mns['minall'] + g.add((MIN, TYPE, NS('owl')['NamedIndividual'])) + g.add((MIN, TYPE, NS('stat')['Minimum'])) + g.add((MIN, NS('stat')['hasSample'], self.S)) + + ML = self.mns['maxlast1'] + g.add((ML, TYPE, NS('owl')['NamedIndividual'])) + g.add((ML, TYPE, NS('stat')['Maximum'])) + g.add((ML, NS('stat')['hasSample'], L)) + + T1 = self.mns['lastitem_%s' % what] + g.add((T1, TYPE, NS('owl')['NamedIndividual'])) + g.add((T1, TYPE, NS('stat')['SampleTerm'])) +# g.add((T, NS('stat')['hasScale'], Literal(1))) + g.add((T1, NS('stat')['hasSample'], ML)) + + T2 = self.mns['minitem_%s' % what] + g.add((T2, TYPE, NS('owl')['NamedIndividual'])) + g.add((T2, TYPE, NS('stat')['SampleTerm'])) + g.add((T2, NS('stat')['hasScale'], Literal('-1.5'))) + g.add((T2, NS('stat')['hasSample'], MIN)) + + LCS = self.mns['lcs_%s' % what] + g.add((LCS, TYPE, NS('owl')['NamedIndividual'])) + g.add((LCS, TYPE, NS('stat')['LinearCombinedSample'])) + g.add((LCS, NS('stat')['hasTerm'], T1)) + g.add((LCS, NS('stat')['hasTerm'], T2)) + + # condition: "last measurement" > "1.5 * min(all measurements)" + C = self.mns['cond_%s' % what] + g.add((C, TYPE, NS('owl')['NamedIndividual'])) + g.add((C, TYPE, NS('stat')['IsPositive'])) + g.add((C, NS('stat')['hasSample'], LCS)) + + Q = self.mns['condq_%s' % what] + g.add((Q, TYPE, NS('owl')['NamedIndividual'])) + g.add((Q, TYPE, NS('query')['ConditionQuery'])) + g.add((Q, NS('feature')['hasFeature'], NS('feature')[what])) + g.add((Q, NS('stat')['hasCondition'], C)) + g.add((Q, NS('query')['hasProcessid'], Literal(pid))) + g.add((Q, NS('query')['hasResource'], R)) + + return g.serialize() + + + def test_genq_mem(self): + g = self.new_g() + Q = self.newQ(g) + R = self.addnode(g) + self.bindNode(g, Q, R) + query = g.serialize() + self.save(fn = "/tmp/genq_mem.owl", q = query) + self.measure(query, 20) + + def test_genq_cpu(self): + g = self.new_g() + Q = self.newQ(g, what= 'CPUUtilization') + R = self.addnode(g) + self.bindNode(g, Q, R) + query = g.serialize() + self.save(fn = "/tmp/genq_cpu.owl", q = query) + self.measure(query, 16) + + def test_genq_err(self): + g = self.new_g() + Q = self.newQ(g, what = 'AlmaFa') + R = self.addnode(g) + self.bindNode(g, Q, R) + query = g.serialize() + self.save(fn = "/tmp/genq_mem_err.owl", q = query) + response = self.measure(query, 1) + self.assertTrue("error" in response, "no error message! got %s" % response) + + def test_genq_complex(self): + g = self.new_g() + R = self.addnode(g) + P = self.addPar(g) + for feature in ['FreeMemory', 'CPULoad', 'FreeDiskSpace', 'AlmaFa']: + Q = self.newQ(g, what = feature) + self.bindNode(g, Q, R) + if feature == 'FreeDiskSpace': #FIXME: ugly + self.bindPar(g, Q, P) + + query = g.serialize() + self.save(fn = "/tmp/genq_complex.owl", q = query) + response = self.measure(query, 26) + #print response + self.assertTrue("error" in response, "no error message! got %s" % response) + + def test_genq_memslice(self): + g = self.new_g() + Q = self.newQ(g) + R = self.addnode(g) + P = self.addPar(g, pname = 'SliceName', pval = 'novi_novi') + self.bindNode(g, Q, R) + self.bindPar(g, Q, P) + query = g.serialize() + self.save(fn = "/tmp/genq_memslice.owl", q = query) + pid = self.addtask(query) + query = self.createaggregatorquery(pid) + self.save(fn = "/tmp/genq_memaggr.owl", q = query) + aid = self.addaggregator(query) + print "COLLECTING DATA WAIT FOR 10 SECS" + sleep(10) + self.fetchaggregate(q = aid, expect = 21) + print "COLLECTING SOME MORE DATA WAIT FOR 10 SECS" + sleep(10) + self.fetchaggregate(q = aid, expect = 21) + + self.delaggregator(q = aid) + self.deltask(q = pid) + + + + + + def test_condition(self): + g = self.new_g() + Q = self.newQ(g) + R = self.addnode(g) + P = self.addPar(g, pname = 'SliceName', pval = 'novi_novi') + self.bindNode(g, Q, R) + self.bindPar(g, Q, P) + query = g.serialize() + self.save(fn = "/tmp/genq_memslice_c.owl", q = query) + pid = self.addtask(query) + query = self.createconditionquery(pid) + self.save(fn = "/tmp/genq_cond.owl", q = query) + cid = self.addcondition(query) + + sleep(3) + print self.checkcondition(q = cid) + + self.delcondition(q = cid) + self.deltask(q = pid) + + + + + + + +if __name__ == "__main__": + #import sys;sys.argv = ['', 'Test.test_genq'] + try: + unittest.main() + finally: + stop_servers(t) diff --git a/Monitoring/MonitoringService/Task/EventHandler.py b/Monitoring/MonitoringService/Task/EventHandler.py new file mode 100644 index 0000000..40bd8e9 --- /dev/null +++ b/Monitoring/MonitoringService/Task/EventHandler.py @@ -0,0 +1,17 @@ +''' +Created on 08.08.2011 + +@author: csc +''' + +class EventHandler(): + ''' + classdocs + ''' + + + def __init__(self, parent): + ''' + Constructor + ''' + self.parent = parent \ No newline at end of file diff --git a/Monitoring/MonitoringService/Task/Task.py b/Monitoring/MonitoringService/Task/Task.py new file mode 100644 index 0000000..ce1b603 --- /dev/null +++ b/Monitoring/MonitoringService/Task/Task.py @@ -0,0 +1,447 @@ +''' +Created on 08.08.2011 + +@author: steger, jozsef +@organization: ELTE +@contact: steger@complex.elte.hu +''' + +from time import time +from threading import Thread, Event, RLock +from DataProcessing.Data import Data, DataHeader +from DataProcessing.Parameter import ParameterList +from Credential.credentialtypes import Credential +import Driver +from DataProcessing.DataSample import DataSample +import logging + +class TaskError(Exception): + pass + +''' +@var STRAT_PERIODICAL: sampling strategy, when samples are taken periodically controlled by wait +@var STRAT_ONDEMAND: sampling strategy, retrievehook is run once, then automatically Task is disabled +@var STRAT_FUNCTIONAL: sampling strategy, user provides a call back function to generate the time series to wait between consecutive sampling +''' +STRAT_PERIODICAL, STRAT_ONDEMAND, STRAT_FUNCTIONAL = range(3) + + +class SubtaskManager(object): + ''' + @author: Jozsef Steger + @summary: SubtaskManager class provides the basic functionalities: + 1. to generate a new Task + 2. to access a Task referenced by the task identifier + 3. to destroy a Task explicit or referenced by the task identifier + ''' + logger = logging.getLogger("NOVI.MS.STM") + + class Task(object): + ''' + @author: Jozsef Steger + @summary: + This class represents a simplified control plane of a wide variety of monitoring tools. + In order to generate monitoring data tools are manipulated via several steps. + Descendants of the Task class implement these steps. + In the simplified work flow the following steps are abstracted: + - prehook: it is run once to initiate the communication channel to the tool and/or to initialize a monitoring task + - starthook: in case the tool operates in such a manner, this step is run to trigger the start up of a background measurement + - retrievehook: implements data retrieval from the tool, it has to transform measurement data to the internal data representation + - stophook: it takes care for stopping any processes running in the background at the tool + - posthook: provides a means to clean up after a monitoring task is over + + Subclasses must be generated dynamically using the generateSubclass() method + + @cvar timeout: the time to wait for the background thread to finish + + @cvar STATE_SLEEPING: a state, retirevehook does not carry out data collection, monitoring is kept asleep + @cvar STATE_RUNNING: a state, retrievehook carries out data collection + + @note: Only descendants of this class are intended to be instantiated via the static method generateSubclass + + @ivar slock: lock concurrent enable/disable calls + @ivar cleanup: the function to call back after task is deleted + @ivar um: the unit manager + @ivar strategy: indicates how sampling is done, ie. how long to wait between two measurements + @ivar _wait: for periodical sampling (STRAT_PERIODICAL) this constant is yielded by the generator + @ivar gen_functional: for user defined sampling (STRAT_FUNCTIONAL) this is the generator to use + @ivar credential: credential + @ivar parameters: parameters + @ivar sliceID: slice identifier the Task belongs to, -1 means unspecified. + @ivar data: internal representation of monitoring data + @ivar inRetrievehook: Event indicating current data retrieval + @ivar dataAdded: Event indicating new data are produced by former runs + @ivar stopworker: Event indicating that the Task was disabled + @ivar t: Thread to take care of new measurements + @ivar _runcount: integer showing how many data retrieval attempt have been made + @ivar _durationrecord: a list of time stamp and duration pairs showing when data retrievals + happened and how long they took + ''' + timeout = 5 + STATE_SLEEPING, STATE_RUNNING = range(2) + + def __init__(self, taskmanager, credential, parameters = None, samplingfunction = None, **kw): + ''' + @summary: Constructor + @param taskmanager: a reference to the SubtaskManager, which needs to called back + if explicit deletion of the Task happens to keep the list of tasks coherent + @type taskmanager: SubtaskManager + @param credential: the necessary authentication information to access and control the tools + @type credential: Credential + @param parameters: the parameters to control the tools, ie. measurement details + @type parameters: ParameterList + @param samplingfunction: a generator function yielding a number, if the generator provides a + finite series of numbers, after all items are consumed the Task gets disabled + @type samplingfunction: callable + @keyword kw: parameters, which are passed to the prehook method of the Task + @raise TaskError: wrong type of credential + + + @note: deleting the strategy property disables the Task and sets the strategy to STRAT_ONDEMAND + @note: setting the strategy property takes effect only after the Task is (re)enabled + ''' + self.slock = RLock() + self.logger = taskmanager.logger + self.cleanup = taskmanager.removefromtasklist + self.um = taskmanager.um + self._strategy = STRAT_PERIODICAL + self._wait = 1 + self._sliceID = -1 + self.gen_functional = samplingfunction + if isinstance(credential, Credential): + self.credential = credential + else: + raise TaskError("wrong type of credential") + if isinstance(parameters, ParameterList): + self.parameters = parameters + else: + self.parameters = ParameterList() + D = Data(self.um, self.dataheader) + self.datasource = DataSample(table = D) + self.data = self.datasource #FIXME: left here for compatibility + #FIXME: mimic DataSource to look like Data for some methods + self.data.getTemplate = D.getTemplate + def saverecord(record): + D.saveRecord(record) + self.datasource.process() + self.data.saveRecord = saverecord + #FIXME: mimicry over + self.inRetrievehook = Event() + self.dataAdded = Event() + self.stopworker = Event() + self.stopworker.set() + try: + self.prehook(**kw) + except: + raise #TaskError() + self.t = None + self._runcount = 0 + self._durationrecord = [] + self.driver = None + self.dataheader = None + self.logger.debug("+ INIT %s" % self) + + def __del__(self): + print "DEL" + self.destroy() + + def destroy(self): + if self.state == self.STATE_RUNNING: + # let us join the thread if necessary + self.disable() + with self.slock: + self.posthook() + if self.cleanup: + # remove reference in the SubTaskManager + self.cleanup(self) + self.cleanup = None + self.logger.debug("- DESTROY %s, reference removed" % self) + else: + self.logger.debug("- DESTROY %s, reference already removed" % self) + + def wait(self, wait): + ''' + @summary: wait until Task is disabled or the provided waiting time is over + @param wait: requests time to wait for the next sample + @type wait: float + ''' + try: + self.stopworker.wait( max(.1, float(wait)) ) + except: + self.stopworker.wait( 1 ) + + @property + def sliceID(self): + return self._sliceID + @sliceID.setter + def sliceID(self, value): + if (self._sliceID != -1) and (self._sliceID != value): + raise TaskError("you can set sliceID property only once") + self._sliceID = value + @sliceID.deleter + def sliceID(self): + raise TaskError("shan't ever delete this property") + + @property + def runcount(self): + return self._runcount + @runcount.setter + def runcount(self, value): + raise TaskError("shan't ever set this property") + @runcount.deleter + def runcount(self): + raise TaskError("shan't ever delete this property") + + @property + def duration(self): + if len(self._durationrecord): + return self._durationrecord[-1][1] + else: + return -1 + + @property + def state(self): + if self.stopworker.isSet(): + return self.STATE_SLEEPING + else: + return self.STATE_RUNNING + + def state_hrn(self): + """ + @summary: return the state of the task in human readable format + @return: the state of the task + @rtype: string + """ + if self.stopworker.isSet(): + return "SLEEPING" + else: + return "RUNNING" + + @property + def strategy(self): + with self.slock: + return self._strategy + @strategy.setter + def strategy(self, value): + if value in [STRAT_ONDEMAND, STRAT_PERIODICAL, STRAT_FUNCTIONAL]: + with self.slock: + self._strategy = value + @strategy.deleter + def strategy(self): + self.disable() + with self.slock: + self._strategy = STRAT_ONDEMAND + + def gen_ondemand(self): + yield 0 + + def gen_periodic(self): + while True: + yield self._wait + + def enable(self): + """ + @summary: enable task + """ + with self.slock: + if self.t is not None: + raise TaskError("You can enable a perfectly disabled Task") + self.stopworker.clear() + # run starthook + self.starthook() + # initialize working thread + self.t = Thread(target = self._worker, name = str(self)) + self.t.setDaemon(True) + self.t.start() + + def disable(self): + """ + @summary: disable task + """ + with self.slock: + if self.t is None: + self.logger.debug("W %s already disabled" % self) + return + self.stopworker.set() + try: + # wait for working thread to finish + n = 0 + while True: + n += 1 + self.t.join(self.timeout) + if self.t.isAlive(): + self.logger.error("E %s timeout occurred %d times while disabling task" % self) + else: + break + except RuntimeError: + # generator does not provide any more waiting time interval + # thread tries to join itself + pass + # run stophook + try: + self.stophook() + finally: + self.t = None + + def _worker(self): + ''' + @summary: This method is running in a background thread. + It takes care of calling retrievehook. + If new data are produced by the tool it is indicated via dataAdded. + ''' + strategy = self.strategy + if strategy == STRAT_ONDEMAND: + generator = self.gen_ondemand + elif strategy == STRAT_PERIODICAL: + generator = self.gen_periodic + elif strategy == STRAT_FUNCTIONAL: + generator = self.gen_functional + for wait in generator(): + if not self.stopworker.isSet(): + # the task is still running + self._runcount += 1 + self.inRetrievehook.set() + invocation = time() + try: + R = self.retrievehook() + except Exception, e: + self.logger.error("E %s exception in retrievehook() %s" % (self, e)) + break + finally: + self.inRetrievehook.clear() + stopped = time() + self._durationrecord.append( (invocation, stopped - invocation) ) + if R: + self.dataAdded.set() + else: + # the task is disabled + break + self.wait(wait) + if not self.stopworker.isSet(): + # the Task is not disabled + # but there are no more items in the series of waiting time + # so we disable it + self.disable() + + def __init__(self, unitmanager): + self._tasks = {} + self.tasklock = RLock() + self._lastid = -1 + self.um = unitmanager + self.logger.debug("II: INIT %s" % self) + + def __del__(self): + self.logger.debug("II: DEL %s" % self) + + def __str__(self): + return "" % (id(self), len(self)) + + def __len__(self): + return len(self._tasks) + + @property + def uniqid(self): + self._lastid += 1 + return self._lastid + + @staticmethod + def _shiftMethod(implementation): + ''' + @summary: helps indentation of the piece of implemented code + ''' + return "\n\t\t".join( filter(None, implementation.splitlines()) ) + + def generate(self, name, driver, dataheader, hookimplementations, credential, parameters, samplingfunction = None, **kw): + ''' + @summary: This method is responsible for dynamically generate a new Task subclass + @param name: the name of the dynamically generated class + @type name: string + @param driver: driver implementing a communication channel to the tool + @type driver: Driver + @param dataheader: contains information of the data structure of the result + @type dataheader: DataHeader + @param hookimplementations: the work flow steps (hooks) + @type hookimplementations: dict + @param credential: the credential used by the driver + @type credential: dict + @param parameters: list of control parameters to fine tune measurements + @type parameters: ParameterList + @param samplingfunction: a generator yielding the time interval elements of a series to wait between consecutive samples + @type samplingfunction: callable + @keyword kw: extra keyword arguments passed to the prehook of the new Task + @return: identifier and the subclass instance of (Task) + @rtype: int, Task + @raise TaskError: wrong Driver type / wrong DataHeader type / erroneous implementation + ''' + prehook = self._shiftMethod( hookimplementations.get("prehook", "pass") ) + starthook = self._shiftMethod( hookimplementations.get("starthook", "pass") ) + stophook = self._shiftMethod( hookimplementations.get("stophook", "pass") ) + posthook = self._shiftMethod( hookimplementations.get("posthook", "pass") ) + retrievehook = self._shiftMethod( hookimplementations.get("retrievehook", "raise TaskException(\"retrievehook() must be implemented\")") ) + if not issubclass(driver, Driver.Driver.Driver): + raise TaskError("wrong Driver type %s" % driver) + if not isinstance(dataheader, DataHeader): + raise TaskError("wrong DataHeader type %s" % dataheader) + classtemplate = """ +import re +from DataProcessing.Data import DataHeader +class %s(SubtaskManager.Task): +\tdef prehook(self, **kw): +\t\t%s +\tdef starthook(self): +\t\t%s +\tdef retrievehook(self): +\t\t%s +\tdef stophook(self): +\t\t%s +\tdef posthook(self): +\t\t%s""" % (name, prehook, starthook, retrievehook, stophook, posthook) + try: + exec(classtemplate, globals()) + except: + self.logger.error(classtemplate) + raise TaskError("erroneous implementation (%s)" % name) + taskclass = globals()[name] + taskclass.driver = driver + taskclass.dataheader = dataheader + taskid = self.uniqid + task = taskclass(self, credential, parameters, samplingfunction, **kw) + with self.tasklock: + self._tasks[taskid] = task + self.logger.info("++: %s new Task %s is identified by %d" % (self, task, taskid)) + return taskid, task + + def __getitem__(self, taskidentifier): + taskidentifier = int(taskidentifier) + with self.tasklock: + return self._tasks[taskidentifier] + + def getidentifier(self, task): + with self.tasklock: + for tid, t in self._tasks.iteritems(): + if task == t: + return tid + raise TaskError("Task %s is unknown to me" % task) + + def removefromtasklist(self, task): + with self.tasklock: + try: + taskidentifier = self.getidentifier(task) + self.pop(taskidentifier) + except TaskError: + pass + + def pop(self, taskidentifier): + try: + task = self._tasks.pop(taskidentifier) + task.cleanup = None + self.logger.info("--: %s Task %s identified by %d removed" % (self, task, taskidentifier)) + task.destroy() + task = None + except KeyError: + self.logger.warning("WW: %s Task to remove identified by %s is unknown to me" % (self, taskidentifier)) + + def tasks_of_slice(self, sliceID = -1): + for t in self._tasks.values(): + if t.sliceID == sliceID: + yield t diff --git a/Monitoring/MonitoringService/Task/__init__.py b/Monitoring/MonitoringService/Task/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/Monitoring/MonitoringService/Task/__init__.py diff --git a/Monitoring/MonitoringService/Task/test.py b/Monitoring/MonitoringService/Task/test.py new file mode 100644 index 0000000..5bfd188 --- /dev/null +++ b/Monitoring/MonitoringService/Task/test.py @@ -0,0 +1,237 @@ +''' +Created on Sep 1, 2011 + +@author: steger +''' +import unittest +from Task import SubtaskManager, STRAT_FUNCTIONAL, STRAT_ONDEMAND +from Example.Tools import sshping, sshtraceroute, sonomashortping, \ + sshmeminfo, sonomashortchirp, sshdf, sshhades +from Example.credentials import noviCredential, sonomaCredential,\ + novisaCredential, novihadesCredential +from Example.Metrics import RoundTripDelay, HopMeasurement, FreeMemory,\ + OnewayDelay, DiskUsage +from Example.Units import UM, unitless +from Resource.node import node +from Resource.path import path +from Example.Resources import PLdict, PLpaths +from time import sleep, time +from random import shuffle +from logging import FileHandler, NullHandler +import logging + + +class Test(unittest.TestCase): + cred_novi = noviCredential + cred_siteadmin = novisaCredential + cred_sonoma = sonomaCredential + cred_hades = novihadesCredential + + def setUp(self): + pass + + def tearDown(self): + pass + + @staticmethod + def map_resource_to_parameter(tool, metric, iface = "eth0"): + pl = tool.parameters.copy() + if issubclass(metric.resourcetype, node): + resource = PLdict.values()[0] + v, u = resource.get_ipaddress("eth0") + pl.update("SourceAddress", v, u) + elif issubclass(metric.resourcetype, path): + resource = PLpaths[0] + v, u = resource.source.get_ipaddress("eth0") + pl.update("SourceAddress", v, u) + v, u = resource.destination.get_ipaddress(iface) + pl.update("DestinationAddress", v, u) + else: + raise Exception("Unknown resource type") + return pl + + def sshPingFg(self, tool, iface): + tool.parameters.update('Interface', iface, unitless) + pl = self.map_resource_to_parameter(tool, RoundTripDelay, iface) + _, task = TM.generate(name = tool.name, driver = tool.driver, dataheader = tool.dataheaderdeclaration, + hookimplementations = tool.hooks, parameters = pl, credential = self.cred_novi) + task.enable() + task.dataAdded.wait( 15 ) + self.assertGreater(len(task.data), 0, "measurement yielded empty result") + task.destroy() + + def test_sshPingFgNOVI(self): + tool = sshping + self.sshPingFg(tool, 'novi') + + def test_sshPingFgSubstrate(self): + tool = sshping + self.sshPingFg(tool, 'eth0') + + def test_sshTracerouteFg(self): + tool = sshtraceroute + pl = self.map_resource_to_parameter( tool, HopMeasurement ) + _, task = TM.generate(name = tool.name, driver = tool.driver, dataheader = tool.dataheaderdeclaration, + hookimplementations = tool.hooks, parameters = pl, credential = self.cred_novi) + task.enable() + task.dataAdded.wait( 15 ) + self.assertGreater(len(task.data), 0, "measurement yielded empty result") + task.destroy() + + def test_sshMeminfo(self): + tool = sshmeminfo + pl = self.map_resource_to_parameter( tool, FreeMemory ) + _, task = TM.generate(name = tool.name, driver = tool.driver, dataheader = tool.dataheaderdeclaration, + hookimplementations = tool.hooks, parameters = pl, credential = self.cred_novi) + task.enable() + task.dataAdded.wait( 15 ) + self.assertGreater(len(task.data), 0, "measurement yielded empty result") + task.destroy() + + def test_sshDiskinfo(self): + tool = sshdf + pl = self.map_resource_to_parameter( tool, DiskUsage ) + pl.update_by_list( DiskUsage.p_obligatory ) + _, task = TM.generate(name = tool.name, driver = tool.driver, dataheader = tool.dataheaderdeclaration, + hookimplementations = tool.hooks, parameters = pl, credential = self.cred_siteadmin) + task.enable() + task.dataAdded.wait( 15 ) + self.assertGreater(len(task.data), 0, "measurement yielded empty result") + task.destroy() + + def test_sshMeminfoUserDefinedSampling(self): + def sample5(): + for s in [.1, 1, 2, .5, -10]: + yield s + tool = sshmeminfo + pl = self.map_resource_to_parameter( tool, FreeMemory ) + _, task = TM.generate(name = tool.name, driver = tool.driver, dataheader = tool.dataheaderdeclaration, + hookimplementations = tool.hooks, parameters = pl, credential = self.cred_novi, samplingfunction = sample5) + task.strategy = STRAT_FUNCTIONAL + task.enable() + while task.state == task.STATE_RUNNING: + sleep(1) + task.dataAdded.wait( 1 ) + self.assertGreater(len(task.data), 0, "measurement yielded empty result") + task.destroy() + + def test_SONoMAShortPing(self): + #FIXME: + return + tool = sonomashortping + pl = self.map_resource_to_parameter( tool, RoundTripDelay ) + _, task = TM.generate(name = tool.name, driver = tool.driver, dataheader = tool.dataheaderdeclaration, + hookimplementations = tool.hooks, parameters = pl, credential = self.cred_sonoma, **tool.kwargs) + task.enable() + task.dataAdded.wait( 15 ) + self.assertGreater(len(task.data), 0, "measurement yielded empty result") + task.destroy() + + def test_SONoMAShortChirp(self): + #FIXME: + return + tool = sonomashortchirp + pl = self.map_resource_to_parameter( tool, OnewayDelay ) + _, task = TM.generate(name = tool.name, driver = tool.driver, dataheader = tool.dataheaderdeclaration, + hookimplementations = tool.hooks, parameters = pl, credential = self.cred_sonoma, **tool.kwargs) + task.enable() + task.dataAdded.wait( 15 ) + self.assertGreater(len(task.data), 0, "measurement yielded empty result") + task.destroy() + + def test_HADES(self): + #FIXME: + return + tool = sshhades + pl = self.map_resource_to_parameter( tool, RoundTripDelay ) + pl.update('SourceAddress', '192.168.31.1', self.um.ipv4dotted) + pl.update('DestinationAddress', '192.168.31.5', self.um.ipv4dotted) + _, task = TM.generate(name = tool.name, driver = tool.driver, dataheader = tool.dataheaderdeclaration, + hookimplementations = tool.hooks, parameters = pl, credential = self.cred_hades, **tool.kwargs) + task.enable() + task.dataAdded.wait( 15 ) + self.assertGreater(len(task.data), 0, "measurement yielded empty result") + task.destroy() + + def test_toggle(self): + tool = sshmeminfo + pl = self.map_resource_to_parameter( tool, FreeMemory ) + _, task = TM.generate(name = tool.name, driver = tool.driver, dataheader = tool.dataheaderdeclaration, + hookimplementations = tool.hooks, parameters = pl, credential = self.cred_novi) + n = 10 + to = 1.5 + t = time() + while n: + task.enable() + task.dataAdded.wait( to ) + task.disable() + task.dataAdded.clear() + n -= 1 + dt = time()-t + self.assertGreater(len(task.data), 0, "measurement yielded empty result") + self.assertGreater(dt, n*to, "measurement lasted longer than expected %f > %f" % (dt, n*to)) + task.destroy() + + def test_tm(self): + tool = sshmeminfo + pl = self.map_resource_to_parameter( tool, FreeMemory ) + N = len(TM) + # deletion by task.destroy() + n= 10 + tasks = [] + while n: + _, task = TM.generate(name = tool.name, driver = tool.driver, dataheader = tool.dataheaderdeclaration, + hookimplementations = tool.hooks, parameters = pl, credential = self.cred_novi) + n -= 1 + task.enable() + tasks.append(task) + shuffle(tasks) + while len(tasks): + t = tasks.pop() + t.destroy() + # deletion by task using TM method + n= 10 + tasks = [] + while n: + _, task = TM.generate(name = tool.name, driver = tool.driver, dataheader = tool.dataheaderdeclaration, + hookimplementations = tool.hooks, parameters = pl, credential = self.cred_novi) + n -= 1 + task.enable() + tasks.append(task) + shuffle(tasks) + while len(tasks): + t = tasks.pop() + TM.removefromtasklist(t) + # destroy by taskid, these tasks are never enabled, not launching new threads + n= 10 + taskids = [] + while n: + taskid, _ = TM.generate(name = tool.name, driver = tool.driver, dataheader = tool.dataheaderdeclaration, + hookimplementations = tool.hooks, parameters = pl, credential = self.cred_novi) + n -= 1 + taskids.append(taskid) + shuffle(taskids) + while len(taskids): + tid = taskids.pop() + TM.pop(tid) + self.assertEqual(N, len(TM), "some tasks were not removed from the SubtaskManager") + + +if __name__ == "__main__": + #import sys;sys.argv = ['', 'Test.test_toggle'] + l = logging.getLogger("NOVI.DRIVER") + l.addHandler(NullHandler()) + fn = "/tmp/Task_test.log" + hdlr = FileHandler(filename = fn, mode = 'w') + l = logging.getLogger("NOVI.MS.STM") +# l.setLevel(level = logging.DEBUG) + l.setLevel(level = logging.INFO) + l.addHandler(hdlr = hdlr) + l.info("START TEST") + try: + TM = SubtaskManager(UM) + unittest.main() + finally: + del(TM) + l.info("FINISHED TEST") + hdlr.close() diff --git a/Monitoring/MonitoringService/debugger.py b/Monitoring/MonitoringService/debugger.py new file mode 100644 index 0000000..e488fb6 --- /dev/null +++ b/Monitoring/MonitoringService/debugger.py @@ -0,0 +1,13 @@ +from time import time + +class d(object): + def __init__(self, n = None): + self.n = n + self.c = 0 + self.t = time() + def __str__(self): + t = time() + self.c += 1 + r = "%s %d %f" % (self.n, self.c, t-self.t) + self.t = t + return r -- cgit