diff options
Diffstat (limited to 'Monitoring/MonitoringService/DataProcessing')
21 files changed, 3350 insertions, 0 deletions
diff --git a/Monitoring/MonitoringService/DataProcessing/Aggregator.py b/Monitoring/MonitoringService/DataProcessing/Aggregator.py new file mode 100644 index 0000000..49855fb --- /dev/null +++ b/Monitoring/MonitoringService/DataProcessing/Aggregator.py @@ -0,0 +1,288 @@ +''' +Created on Aug 10, 2011 + +@author: steger, gombos, matuszka +''' + +from DataProcessing.MeasurementLevel import Ordinal, Ratio, Interval #Nominal +from math import sqrt +from DataProcessing.DataReader import DataReader +from DataProcessing.DataHeader import DataHeader, DataHeaderCell +from DataProcessing.DataSource import DataSource +from DataProcessing.Data import Data +from DataProcessing.DataError import AggregatorError + +#FIXME: docs + +class Aggregator(DataSource): + ''' + classdocs + @cvar cn_count: the name of the column indicating the set size before aggregation + ''' + cn_count = 'Count' + + def __init__(self, datasource, cellrequest): + ''' + Constructor + @param datasource: table of records to manipulate with + @type datasource: DataSource + @param cellrequest: a column wise projection of the table is carried out, this column is kept + @type cellrequest: CellRequest + ''' + if not isinstance(datasource, DataSource): + raise AggregatorError("Wrong type of datasource %s" % datasource) + DataSource.__init__(self, dependency = datasource) + self._inputreader.extract(cellrequest = [cellrequest]) + for c in self._inputreader.headercells(): + break + if not c.dimension.level(self.dimension_compatible): + raise AggregatorError("The measurement level of input (%s) is not compatible with %s" % (c.dimension, self.name)) + header = DataHeader("%sAggregate(%s)" % (self.name, datasource.name)) + dimension = c.dimension + header.addColumn(DataHeaderCell(name = self.cn_count, dimension = dimension.manager["Countable"])) + self.cn_aggr = '%s(%s)' % (self.name, c.name) + header.addColumn(DataHeaderCell(name = self.cn_aggr, dimension = dimension, unit = c.unit)) + self._data = Data(datasource.um, 
header) + self._record = self._data.getTemplate(size = 1) +# self.um = datasource.um + self.source = datasource + + self._aggregate = None + +#FIXME: ez a ketto cucc tenyleg kell? + def __len__(self): + return len(self._data) + @property + def writelock(self): + return self._data.writelock + + + @property + def aggregate(self): + self.process() + return self._aggregate + + @property + def readerClass(self): + return DataReader + + @property + def dimension_compatible(self): + raise AggregatorError("dimension_compatible property is not implemented in %s" % self) + + +class Sum(Aggregator): + def __init__(self, datasource, cellrequest): + Aggregator.__init__(self, datasource, cellrequest) + self._aggregate = 0 + + @property + def dimension_compatible(self): + return Interval + + @property + def name(self): + return "Sum" + + def _process(self): + if self._inputreader.sourceCleared.isSet(): + self._inputreader.sourceCleared.clear() + self._aggregate = 0 + self._inputreader.rewind() + changed = True + else: + changed = False + for (x,) in self._inputreader: + self._aggregate += float(x) + changed = True + if changed: + self._data.clear() + self._record.update(name = self.cn_aggr, values = (self._aggregate,)) + self._record.update(name = self.cn_count, values = (len(self.source),)) + self._data.saveRecord(self._record) + return self.CLEARED | self.EXPANDED + return self.PASS + +class Min(Aggregator): + def __init__(self, datasource, cellrequest): + Aggregator.__init__(self, datasource, cellrequest) + + @property + def dimension_compatible(self): + return Ordinal + + @property + def name(self): + return "Min" + + def _process(self): + if self._inputreader.sourceCleared.isSet(): + self._inputreader.sourceCleared.clear() + self._aggregate = None + self._inputreader.rewind() + if self._aggregate is None: + samples = [] + else: + samples = [self._aggregate] + for (x,) in self._inputreader: + samples.append( float(x) ) + newvalue = min(samples) + if self._aggregate != 
newvalue: + self._aggregate = newvalue + self._data.clear() + self._record.update(name = self.cn_aggr, values = (self._aggregate,)) + self._record.update(name = self.cn_count, values = (len(self.source),)) + self._data.saveRecord(self._record) + return self.CLEARED | self.EXPANDED + return self.PASS + +class Max(Aggregator): + def __init__(self, datasource, cellrequest): + Aggregator.__init__(self, datasource, cellrequest) + + @property + def dimension_compatible(self): + return Ordinal + + @property + def name(self): + return "Max" + + def _process(self): + if self._inputreader.sourceCleared.isSet(): + print "MAX src cleared" + self._inputreader.sourceCleared.clear() + self._aggregate = None + self._inputreader.rewind() + if self._aggregate is None: + samples = [] + else: + samples = [self._aggregate] + for (x,) in self._inputreader: + samples.append( float(x) ) + print "SAMPLES", samples + newvalue = max(samples) + print "MAX", newvalue, "of", len(self.source) + if self._aggregate != newvalue: + print "SETTING" + self._aggregate = newvalue + self._data.clear() + self._record.update(name = self.cn_aggr, values = (self._aggregate,)) + self._record.update(name = self.cn_count, values = (len(self.source),)) + self._data.saveRecord(self._record) + print "XXX", self._record.record + return self.CLEARED | self.EXPANDED + return self.PASS + +class Mean(Aggregator): + def __init__(self, datasource, cellrequest): + Aggregator.__init__(self, datasource, cellrequest) + self._sum = 0 + + @property + def dimension_compatible(self): + return Ratio + + @property + def name(self): + return "Mean" + + def _process(self): + changed = False + for (x,) in self._inputreader: + self._sum += float(x) + changed = True + if changed: + self._aggregate = self._sum / float(len(self.source)) + self._data.clear() + self._record.update(name = self.cn_aggr, values = (self._aggregate,)) + self._record.update(name = self.cn_count, values = (len(self.source),)) + self._data.saveRecord(self._record) 
+ return self.CLEARED | self.EXPANDED + return self.PASS + +class Deviation(Aggregator): + def __init__(self, data, cellrequest): + Aggregator.__init__(self, data, cellrequest) + self._emp = True + + @property + def empirical(self): + return self._emp + @empirical.setter + def empirical(self, emp): + self._emp = bool(emp) + + @property + def dimension_compatible(self): + return Ratio + + @property + def name(self): + return "StdDev" + + def _process(self): + changed = False + aggr = 0 + data = [] + self._inputreader.rewind() + for (x,) in self._inputreader: + x = float(x) + aggr += x + data.append(x) + changed = True + if changed: + n = float(len(data)) + avg = aggr / n + s2 = map(lambda x: (x-avg)*(x-avg), data) + if self.empirical: + self._aggregate = sqrt(sum(s2) / (n+1)) + else: + self._aggregate = sqrt(sum(s2) / n) + self._data.clear() + self._record.update(name = self.cn_aggr, values = (self._aggregate,)) + self._record.update(name = self.cn_count, values = (len(self.source),)) + self._data.saveRecord(self._record) + return self.CLEARED | self.EXPANDED + return self.PASS + +class Percentile(Aggregator): + def __init__(self, data, cellrequest): + self._percentile = .75 + Aggregator.__init__(self, data, cellrequest) + + @property + def percentile(self): + return self._percentile + @percentile.setter + def percentile(self, percentile): + self._percentile = max(0, min(1, float(percentile))) + + @property + def dimension_compatible(self): + return Ordinal + + @property + def name(self): + return "Percentile_%d%%" % int(round(100 * self.percentile)) + + def _process(self): + data = [] + self._inputreader.rewind() + for (x,) in self._inputreader: + data.append(x) + data.sort() + n = len(data) + p = int((n - 1) * self.percentile) + if n % 2: + val = data[p] + else: + val = .5 * (data[p] + data[p+1]) + if self._aggregate != val: + self._aggregate = val + self._data.clear() + self._record.update(name = self.cn_aggr, values = (self._aggregate,)) + 
self._record.update(name = self.cn_count, values = (len(self.source),)) + self._data.saveRecord(self._record) + return self.CLEARED | self.EXPANDED + return self.PASS diff --git a/Monitoring/MonitoringService/DataProcessing/AggregatorManager.py b/Monitoring/MonitoringService/DataProcessing/AggregatorManager.py new file mode 100644 index 0000000..16d78fc --- /dev/null +++ b/Monitoring/MonitoringService/DataProcessing/AggregatorManager.py @@ -0,0 +1,48 @@ +''' +Created on Dec 10, 2012 + +@author: steger +''' +from DataProcessing.Aggregator import AggregatorError, Aggregator +from DataProcessing.Sampler import Sampler +from DataProcessing.Parameter import ParameterList + +class AggregatorManager(object): + def __init__(self): + self._id = 0; + self._aggregators = {} + + def newAggregator(self, dataSource, cellrequest, commandflow): + for c, ca in commandflow: + if issubclass(c, Aggregator): + dataSource = c(dataSource, cellrequest) + if isinstance(ca, ParameterList): + for p in ca: + dataSource.__setattr__(p.name, p.value[0]) + else: + for k, v in ca.iteritems(): + dataSource.__setattr__(k, v) + elif issubclass(c, Sampler): + dataSource = c(dataSource) + if isinstance(ca, ParameterList): + for p in ca: + dataSource.__setattr__(p.name, p.value[0]) + else: + for k, v in ca.iteritems(): + dataSource.__setattr__(k, v) + self._id += 1 + self._aggregators[ self._id ] = dataSource + return self._id, dataSource + + def __getitem__(self, aggregatorid): + try: + return self._aggregators[ aggregatorid ] + except: + raise AggregatorError("Aggregator with id %s not found" % aggregatorid) + + def pop(self, aggregatorid): + try: + self._aggregators.pop( aggregatorid ) + except KeyError: + print "WW: Aggregator with id %s not found" % aggregatorid +
# ==== Monitoring/MonitoringService/DataProcessing/Bool.py ====
'''
Created on Mar 22, 2013

@author: steger
'''
from DataProcessing.DataError import DataError

#FIXME: this is a DataSource?
class Comparator(object):
    '''
    @summary: base class of boolean views over a single-valued data source;
    subclasses compare the source's current value against zero
    @ivar _datasource: the wrapped object, expected to expose .value and .name
    '''
    def __init__(self, datasource):
        '''
        @summary: Constructor
        @param datasource: the object whose .value property is to be tested
        '''
        self._datasource = datasource

    @property
    def value(self):
        '''
        @summary: abstract; subclasses return the outcome of their comparison
        @raise DataError: always, unless overridden
        '''
        raise DataError("Implement value property")

class IsPositive(Comparator):
    '''
    @summary: true iff the source value is strictly greater than zero
    '''
    @property
    def name(self):
        return "IsPositive(%s)" % self._datasource.name

    @property
    def value(self):
        return self._datasource.value > 0

class IsNegative(Comparator):
    '''
    @summary: true iff the source value is strictly less than zero
    '''
    @property
    def name(self):
        return "IsNegative(%s)" % self._datasource.name

    @property
    def value(self):
        return self._datasource.value < 0

class IsNotPositive(Comparator):
    '''
    @summary: true iff the source value is zero or less
    '''
    @property
    def name(self):
        return "IsNotPositive(%s)" % self._datasource.name

    @property
    def value(self):
        return self._datasource.value <= 0

class IsNotNegative(Comparator):
    '''
    @summary: true iff the source value is zero or greater
    '''
    @property
    def name(self):
        return "IsNotNegative(%s)" % self._datasource.name

    @property
    def value(self):
        return self._datasource.value >= 0
\ No newline at end of file diff --git a/Monitoring/MonitoringService/DataProcessing/Data.py b/Monitoring/MonitoringService/DataProcessing/Data.py new file mode 100644 index 0000000..60ace66 --- /dev/null +++ b/Monitoring/MonitoringService/DataProcessing/Data.py @@ -0,0 +1,281 @@ +''' +Created on Sep 1, 2011 + +@author: steger, jozsef +@organization: ELTE +@contact: steger@complex.elte.hu +@author: laki, sandor +''' + +from threading import RLock, Event +from DataProcessing.DataHeader import DataHeaderCell, DataHeader, DataError +from DataProcessing.Unit import UnitManager + + +class Data(object): + ''' + @author: steger, jozsef + @summary: + This class implements the representation of data provided by a tool or other sort of data source. + This class contains the data in a tabular format. + The meta information of the columns are dictated by a DataHeader instance. + All items in the same column are data of the same kind, + whereas all data in the same record (row) are correlated. + + Contents of cells of a given column are either single items or Data objects + as prescribed by the header of the table. + + New records can be added using the Record class, for which a template generator is provided by the class. + + DataReaders access the content of this class, they need to register Events to get notified + of new data insertion or of content erasure. + ''' + + class Record(object): + ''' + @author: steger, jozsef + @summary: + This class represents a given set of records that can be appended to a Data table. + It provides useful methods manipulate data within the record. 
+ ''' + def __init__(self, unitmanager, dataheader, size = 1): + ''' + @summary: Constructor + @param unitmanager: necessary to handle conversion + @type unitmanager: UnitManager + @param dataheader: the record conforms to the data header provided here + @type dataheader: DataHeader + @param size: the number of items to handle at once, default is 1 + @type size: integer + ''' + self.um = unitmanager + self.record = {} + self.units = {} + self.subheaders = {} + self.subrecords = {} + self.size = size + self.names = dataheader._cellnames + for name, cell in dataheader._cells.iteritems(): + if isinstance(cell, DataHeaderCell): + self.record[name] = [ None ] * self.size + self.units[name] = cell.unit + elif isinstance(cell, DataHeader): + self.subheaders[name] = cell + else: + raise DataError("Data header declaration is wrong") + + def __str__(self): + return "<DataRecord %s, size: %d (%s; %s)>: " % (id(self), self.size, ','.join(self.record.keys()), ','.join(self.subheaders.keys())) + + def clear(self, size = None): + ''' + @summary: Clean the record containers and optionally resize the container + @note: if DataRecord container is resized, sub record pointers are invalidated + @param size: the requested new size of the container, default is None, which means keep the original size + @type size: integer + ''' + if size is None: + for name in self.record.keys(): + self.record[name] = [ None ] * self.size + if self.subrecords.has_key(name): + for r in self.subrecords[name]: + r.clear() + else: + self.size = size + for name in self.record.keys(): + self.record[name] = [ None ] * self.size + self.subrecords.clear() + + def getRecordTemplates(self, name, sizes = None): + ''' + @summary: Sub record templates are pointing to table valued cells. This method allocates container to those data structures. + @param name: the column name, that point to table valued columns + @type name: string + @param sizes: a list of integers that indicate the sizes of each sub tables. 
Default is None, which means the allocation of single row containers + @type sizes: list/tuple of integers or None + @return: a list of Record containers with size items + @rtype: a list of Record + @raise DataError: column name not found / wrong record sizes + ''' + if sizes == None: + sizes = [1] * self.size + if len(sizes) != self.size: + raise DataError("wrong record sizes requested") + if not self.subheaders.has_key(name): + raise DataError("Cannot find column name: %s" % name) + hdr = self.subheaders[name] + self.subrecords[name] = [] + while len(sizes): + self.subrecords[name].append( Data.Record(unitmanager = self.um, dataheader = hdr, size = sizes.pop(0)) ) + return self.subrecords[name] + + def update(self, name, values, unit = None): + ''' + @summary: Update a the column with the new value and make sure the unit is converted to the current unit of the model + @param name: the name of the column + @type name: string + @param values: a list of data values to update the cells + @type values: list + @param unit: the unit of the values in the list, default is None, which means it is the same as the current unit stated in the unit model + @type unit: string or None + @raise DataError: missing column name / table valued cells / size mismatch + ''' + if not self.record.has_key(name): + raise DataError("Record has no column named %s" % name) + if not self.units.has_key(name): + raise DataError("Cannot update column named %s (table valued cells)" % name) + if len(values) != self.size: + raise DataError("The size of values don't match expected %d and got %d" % (len(values), self.size)) + if unit is None: + self.record[name] = values[:] + elif isinstance(unit, UnitManager.Unit): + myunit = self.units[name] + if unit == myunit: + self.record[name] = values[:] + else: + self.record[name] = [ self.um.convert(value = quantity, from_unit = unit, to_unit = myunit) for quantity in values ] + else: + raise DataError("wrong type of unit") + + def updateMany(self, names, 
values, units = None): + ''' + @summary: Update more columns with a single call + @param names: a list of the non-table valued columns to update + @type names: list/tuple of string + @param values: a matrix of data values + @type values: list of list of value + @param units: a list of units corresponding to each columns, default is None, meaning everything is expected to be in the current unit + @type units: list/tuple of sting or None + @raise DataError: size mismatch / unknown column name + ''' + names = list(names) + if len(values) != self.size: + raise DataError("The size of values don't match %d" % self.size) + for name in names: + if not self.record.has_key(name): + raise DataError("Record has no column named %s" % name) + transpose = dict( map(lambda n: (n, []), names) ) + s = len(names) + idxs = range(s) + while len(values): + value = values.pop(0) + if len(value) == s: + for idx in idxs: + transpose[names[idx]].append(value.pop(0)) + else: + raise DataError("Record size does not match") + if units is None: + units = [ None ] * s + else: + units = list(units) + while len(names): + name = names.pop(0) + unit = units.pop(0) + self.update(name = name, values = transpose[name], unit = unit) + + def extract(self): + ''' + @summary: Extract values stored in this record represented in a list in the order of names + @return: a list of values + @rtype: list + ''' + retval = [] + idx = 0 + while idx < self.size: + rec = [] + for name in self.names: + if self.record.has_key(name): + rec.append( self.record[name][idx] ) + elif self.subrecords.has_key(name): + rec.append( self.subrecords[name][idx].extract() ) + idx += 1 + retval.append(tuple(rec)) + return retval + + def __init__(self, unitmanager, header): + ''' + @summary: Constructor + @param unitmanager: necessary to handle conversion + @type unitmanager: UnitManager + @param header: the header declaration of the data table + @type header: DataHeader + @raise DataError: raised upon wrong table header is given + ''' 
+ if not isinstance(header, DataHeader): + raise DataError("attempt to allocate table with a wrong header") + self.um = unitmanager + self.header = header + self._rawrecords = [] + self._chunks = [] + self.readlock = RLock() + self.writelock = RLock() + self.evExpanded = Event() + self.evCleared = Event() + + def __str__(self): + ''' + @summary: returns the name of the table and the python object id + @return: abbreviated representation of the table + @rtype: string + ''' + return "<Data: %s %s>" % (self.header.name, id(self)) + + def __len__(self): + return len(self._rawrecords) + + def __getitem__(self, k): + return self._rawrecords.__getitem__(k) + + @property + def name(self): + ''' + @summary: the name of the data is defined by the header + @return: the name of the header + @rtype: string + ''' + return self.header.name + + @property + def tail(self): + ''' + @summary: Tail property indicates how many new records have been saved to the table in the last call + @return: number of new records + @rtype: integer + ''' + try: + return self._chunks[-1] + except IndexError: + return 0 + + def getTemplate(self, size = 1): + ''' + @summary: Generate a helper class to extend the table with new values + @param size: the size of the new records wished to handle together, default is 1 + @type size: integer + @return: an empty row with the structure dictated by the header of the table + @rtype: Record + ''' + return self.Record(unitmanager = self.um, dataheader = self.header.getHeader(self.header.name), size = size) + + def saveRecord(self, record): + ''' + @summary: append values stored in the record to the table + @param record: a record with new data values + @type record: DataRecord + ''' + #TODO: check if record is not corrupted + newrecords = record.extract() + with self.writelock: + self._rawrecords.extend( newrecords ) + self._chunks.append( len(newrecords) ) + self.evExpanded.set() + + def clear(self): + ''' + @summary: delete all data records stored + ''' + with 
self.writelock: + self._rawrecords = [] + self._chunks = [] + self.evCleared.set() + diff --git a/Monitoring/MonitoringService/DataProcessing/DataError.py b/Monitoring/MonitoringService/DataProcessing/DataError.py new file mode 100644 index 0000000..4933f85 --- /dev/null +++ b/Monitoring/MonitoringService/DataProcessing/DataError.py @@ -0,0 +1,29 @@ +''' +Created on Dec 20, 2012 + +@author: steger, jozsef +@organization: ELTE +@contact: steger@complex.elte.hu +''' + +class DataError(Exception): + pass + +class PrefixError(DataError): + pass + +class UnitError(DataError): + pass + +class DimensionError(DataError): + pass + +class ParameterError(DataError): + pass + +class SamplerError(DataError): + pass + +class AggregatorError(DataError): + pass + diff --git a/Monitoring/MonitoringService/DataProcessing/DataFormatter.py b/Monitoring/MonitoringService/DataProcessing/DataFormatter.py new file mode 100644 index 0000000..1871ab4 --- /dev/null +++ b/Monitoring/MonitoringService/DataProcessing/DataFormatter.py @@ -0,0 +1,155 @@ +''' +Created on 08.08.2011 + +@author: steger +''' +from DataProcessing.DataReader import DataReader +from DataProcessing.Data import Data +from DataProcessing.DataError import DataError + +class Formatter(object): + def __init__(self, datasource): + ''' + Constructor + ''' + self.source = datasource + self.reader = DataReader(datasource) + + def _cell(self): + raise DataError("Implement _cell() method") + + def header(self): + raise DataError("Implement header() method") + + def serialize(self): + raise DataError("Implement serialize() method") + + @property + def name(self): + return self.source.name + + @property + def sourceExpanded(self): + return self.reader.sourceExpanded.isSet() + + @property + def sourceCleared(self): + return self.reader.sourceCleared.isSet() + +class JsonFormatter(Formatter): + ''' + @summary: + Serialize Data in JSON format + ''' + + def _cell(self, c): + ''' + @summary: serialize a column in JSON format + ''' + try: 
+ feature = "\n \"FEATURE\": \"%s\"," % c.feature + except: + feature = "" + score = c.unit.reference.count('_') + if score == 0: + ret = """{%s + "NAME" : "%s", + "DIMENTSION" : "%s", + "UNIT" : "%s" + }""" % (feature, c.name, c.dimension.name, c.unit.reference) + elif score == 1: + prefix, base = c.unit.reference.split('_') + ret = """{%s + "NAME" : "%s", + "DIMENTSION" : "%s", + "PREFIX" : "%s", + "UNIT" : "%s" + }""" % (feature, c.name, c.dimension.name, prefix, base) + else: + ret = "ERROR: %s" % c + return ret + + def header(self): + ''' + @summary: serialize full header + ''' + return """{ + "NAME" : "DataHeader %s", + "HDRINFO" : [ + %s + ] + }""" % (id(self.source._data.header), ",\n ".join([ self._cell(c) for c in self.reader.headercells() ])) + + def serialize(self): + ''' + @summary: serialize the header and the new lines of the table into JSON format + @return: formatted string representation of the table + @rtype: string + ''' + self.source.process() + if self.sourceCleared: + self.reader.rewind() + if not self.sourceExpanded: + return "" + r = [] + for rec in self.reader: + st = [] + for d in rec: + if isinstance(d, Data): + #FIXME: + st.append( d._dump() ) + else: + st.append( str(d) ) + r.append("[ %s ]" % ", ".join(st)) + return """{ + "TYPE" : "%s", + "ID" : %d, + "HDR" : %s, + "DATA" : [ + %s + ] +}""" % (self.source._data.header.name, id(self), self.header(), ",\n ".join(r)) + +class DumbFormatter(Formatter): + ''' + @summary: + Serialize Data in a trivial format + ''' + + def _cell(self, c): + ''' + @summary: serialize column + ''' + try: + return "%s (%s/%s) [%s]" % (c.name, c.feature, c.dimension.name, c.unit) + except: + return "%s (/%s) [%s]" % (c.name, c.dimension.name, c.unit) + + def header(self): + ''' + @summary: serialize full header + ''' + return "<DataHeader %s>: {%s: [%s]}" % (id(self.source._data.header), self.name, ", ".join([ self._cell(c) for c in self.reader.headercells() ])) + + def serialize(self): + ''' + @summary: 
serialize the header and the new lines of the table + @return: formatted string representation of the table, if no new data are ready the empty string is returned + @rtype: string + ''' + self.source.process() + if self.sourceCleared: + self.reader.rewind() + if not self.sourceExpanded: + return "" + r = [] + for rec in self.reader: + st = [] + for d in rec: + if isinstance(d, Data): + #FIXME: + st.append( d._dump() ) + else: + st.append( str(d) ) + r.append("(%s)" % ", ".join(st)) + return "{%s:\nHDR:%s\n DATA:[\n%s\n]}" % (str(self), self.header(), ", \n".join(r)) diff --git a/Monitoring/MonitoringService/DataProcessing/DataHeader.py b/Monitoring/MonitoringService/DataProcessing/DataHeader.py new file mode 100644 index 0000000..722094e --- /dev/null +++ b/Monitoring/MonitoringService/DataProcessing/DataHeader.py @@ -0,0 +1,200 @@ +''' +Created on Sep 1, 2011 + +@author: steger, jozsef +@organization: ELTE +@contact: steger@complex.elte.hu +@author: laki, sandor +''' + +from DataProcessing.Dimension import DimensionManager +from DataProcessing.DataError import DataError +from DataProcessing.DataHeaderCell import DataHeaderCell, CellRequestByName,\ + CellRequestByFeature + +class DataHeader(object): + ''' + @author: steger, jozsef + @summary: + This class represents the full header of a table. + One can construct the header as a single step, + if they provide a header description or they can use + methods to add new columns. + + In order to be able to represent a wide variety of data and relationship + between them, a column can refer to another table. + In that latter case a specific column refer to another DataHeader. 
+ ''' + + def __init__(self, name): + ''' + @summary: Constructor + @param name: the name of the table + @type name: string + @raise DataError: corrupt header description + ''' + self._name = name + self._cellnames = [] + self._cells = {} + + def __iter__(self): + for cn in self._cellnames: + yield self._cells[cn] + + def __len__(self): + ''' + @summary: Return the number of columns + @return: the number of columns currently set + @rtype: integer + ''' + return len(self._cellnames) + + def __eq__(self, header): + ''' + @summary: Comparison operator of table headers. + Two tables are declared equal, if all the columns' names and their unit models are the same. + Two headers are still regarded equal if the order of their columns are different + or the current unit of the corresponding columns are not the same. + @raise DataError: if not DataHeader instances are compared + @return: True if both the header name and all columns match, their order may vary + @rtype: boolean + ''' + if not isinstance(header, DataHeader): + raise DataError("wrong type to compare") + if self.name != header.name: + return False + if len(self._cellnames) != len(header._cellnames): + return False + if self._cells.keys() != header._cells.keys(): + return False + for n in self._cellnames: + if self._cells[n] != header._cells[n]: + return False + return True + + def __ne__(self, header): + ''' + @summary: comparison operator of table headers. 
+ @return: True if tables headers differ + @rtype: boolean + ''' + return not self.__eq__(header) + + @property + def name(self): + return self._name + + def has_name(self, name): + ''' + @summary: Check for the existence of a given column name + @param name: the name of the column looking for + @type name: string + @return: true if such a name exists + @rtype: boolean + ''' + return name in self._cellnames + + def addColumn(self, cell): + ''' + @summary: Append a new column at the end of the current table header structure + @param cell: pointer to the header of the new column + @type cell: DataHeader or DataHeaderCell + @raise DataError: cell is of a wrong type + ''' + ishdr = isinstance(cell, DataHeader) + if ishdr or isinstance(cell, DataHeaderCell): + name = cell.name + if self.has_name(name): + raise DataError("attempt to add a column with an already existing name (%s)" % cell.name) + self._cells[name] = cell + self._cellnames.append(name) + else: + raise DataError("attempt to add a wrong type of header cell") + + def removeColumn(self, name): + ''' + @summary: remove a named column if it exists in the header. 
Otherwise do silently nothing + @param name: the name of the column to remove + @type name: string + ''' + if self.has_name(name): + self._cells.pop(name) + self._cellnames.pop(self._cellnames.index(name)) + + def getHeader(self, name): + ''' + @summary: Return a pointer to the named sub header in the naming hierarchy + @param name: the name of the sub header searched + @type name: string + @raise DataError: name not found + ''' + if name.count('.') == 0: + if name == self.name: + return self + if self.has_name(name) and isinstance(self._cells[name], DataHeader): + return self._cells[name] + elif name.count('.') == 1: + n_pre, n_post = name.split('.', 1) + if n_pre == self.name and self.has_name(n_post) and isinstance(self._cells[n_post], DataHeader): + return self._cells[n_post] + else: + n_pre, n, n_post = name.split('.', 2) + if n_pre == self.name and self.has_name(n) and isinstance(self._cells[n], DataHeader): + return self._cells[n].getHeader(n_post) + raise DataError("Lost in the naming hierarchy: %s < %s" % (self.name, name)) + +#FIXME: complex table lookup is not implemented + def getCell(self, cellrequest): + ''' + @summary: Return the index and the cell referenced by a name + @param cellrequest: + @type name: CellRequest + @return: index and the cell + @rtype: (int, Cell) + @raise DataError: name not found + ''' + if isinstance(cellrequest, CellRequestByName): + name = cellrequest.name + try: + yield (self._cellnames.index(name), self._cells[name]) + except: + DataError("Cell with name %s not found" % name) + elif isinstance(cellrequest, CellRequestByFeature): + for c in self: + try: + if cellrequest == c: + yield (self._cellnames.index(c.name), c) + except DataError: + continue + else: + raise DataError("wrong request type") + + + +class DataHeaderGeneratedByDescription(DataHeader): + def __init__(self, name, headerdescription): + ''' + @summary: Constructor + @param name: the name of the table + @type name: string + @param headerdescription: the 
description of a full table header + @param headerdescription: list or None + @raise DataError: corrupt header description + ''' + DataHeader.__init__(self, name) + for item in headerdescription: + if len(item) == 2: + name, description = item + unit = None + else: + name, description, unit = item + if self.has_name(name): + raise DataError("Duplicate column name declaration (%s)" % name) + if description is None or isinstance(description, DimensionManager.Dimension): + cell = DataHeaderCell(name = name, dimension = description, unit = unit) + self.addColumn(cell) + elif isinstance(description, list): + hdr = DataHeaderGeneratedByDescription(name = name, headerdescription = description) + self.addColumn(hdr) + else: + raise DataError("corrupt header description (%s)" % name) diff --git a/Monitoring/MonitoringService/DataProcessing/DataHeaderCell.py b/Monitoring/MonitoringService/DataProcessing/DataHeaderCell.py new file mode 100644 index 0000000..f3bdb16 --- /dev/null +++ b/Monitoring/MonitoringService/DataProcessing/DataHeaderCell.py @@ -0,0 +1,180 @@ +''' +Created on Dec 20, 2012 + +@author: steger, jozsef +@organization: ELTE +@contact: steger@complex.elte.hu +''' + +from DataProcessing.Dimension import DimensionManager +from DataProcessing.DataError import DataError +from DataProcessing.Unit import UnitManager + +class Cell(object): + ''' + @summary: This class is a skeleton to represent the meta information of a single table column. 
class Cell(object):
    '''
    @summary: Meta information describing a single table column.
    It combines the column name, the monitored feature, the dimension and
    the unit of the underlying data.  The name, dimension and feature
    properties are write-once; the unit falls back to the dimension's
    default unit until it is set explicitly.
    @ivar name: the name of the cell (must not contain a period)
    @type name: str
    @ivar dimension: the dimension of the cell
    @type dimension: L{Dimension}
    @ivar unit: the unit of the cell, defaults to the dimension's unit
    @type unit: L{Unit}
    @ivar feature: the metric of the column
    @type feature: str
    '''
    def __init__(self):
        self._name = self._dimension = self._unit = self._feature = None

    @property
    def name(self):
        if self._name is not None:
            return self._name
        raise DataError("name property is not set")
    @name.setter
    def name(self, name):
        if not isinstance(name, basestring):
            raise DataError("name is not a string")
        if '.' in name:
            raise DataError("name must not contain any periods (%s)" % name)
        # write-once: re-assigning the same value is tolerated
        if self._name not in (None, name):
            raise DataError("name property cannot be modified")
        self._name = name

    @property
    def dimension(self):
        if not self._dimension:
            raise DataError("dimension property is not set")
        return self._dimension
    @dimension.setter
    def dimension(self, dimension):
        if not isinstance(dimension, DimensionManager.Dimension):
            raise DataError("dimension is invalid")
        # a previously chosen unit must stay valid for the new dimension
        if self._unit is not None and not dimension.containsUnit(self._unit):
            raise DataError("unit %s is not in the basin of dimension %s" % (self.unit, dimension))
        self._dimension = dimension

    @property
    def unit(self):
        # fall back to the default unit of the dimension when unset
        return self._unit if self._unit is not None else self.dimension.unit
    @unit.setter
    def unit(self, unit):
        if not isinstance(unit, UnitManager.Unit):
            raise DataError("unit is invalid")
        if self._dimension is not None and not self.dimension.containsUnit(unit):
            raise DataError("unit %s is not in the basin of dimension %s" % (unit, self.dimension))
        self._unit = unit

    @property
    def feature(self):
        if self._feature is not None:
            return self._feature
        raise DataError("feature property is not set")
    @feature.setter
    def feature(self, feature):
        # write-once: re-assigning the same value is tolerated
        if self._feature not in (None, feature):
            raise DataError("feature property cannot be modified")
        self._feature = feature

    def __eq__(self, cell):
        '''
        @summary: comparison operator of two columns' meta
        @return: True if column names, features, units and dimensions match
        @rtype: bool
        @raise DataError: the operand is not a Cell
        '''
        if not isinstance(cell, Cell):
            raise DataError("type error expecting Cell for comparison")
        return ((self._name, self._feature, self._unit, self._dimension) ==
                (cell._name, cell._feature, cell._unit, cell._dimension))

    def __ne__(self, cell):
        '''
        @summary: comparison operator of two columns' meta
        @return: True if column names or their units differ
        @rtype: bool
        '''
        return not self.__eq__(cell)

class DataHeaderCell(Cell):
    '''
    @summary: meta information of a single column, fully specified at construction time
    '''
    def __init__(self, name, dimension, feature = None, unit = None):
        '''
        @summary: constructor
        @param name: the name of the cell
        @type name: str
        @param dimension: the dimension of the cell
        @type dimension: L{Dimension}
        @param feature: pointer if it is a monitoring feature
        @param unit: the unit of the column when it differs from the dimension's default
        @type unit: L{Unit}
        '''
        Cell.__init__(self)
        self.name = name
        self.dimension = dimension
        if unit is not None and unit != dimension.unit:
            self.unit = unit
        if feature is not None:
            self.feature = feature

class CellRequest(Cell):
    '''
    @summary: a partially specified L{Cell} used as a search pattern among column meta information
    '''
    pass
class CellRequestByName(CellRequest):
    '''
    @summary: user request for a data column selected by the column name,
    optionally asking for a unit conversion
    '''
    def __init__(self, name, unit = None):
        '''
        @summary: Constructor
        @param name: the name of the requested column
        @type name: str
        @param unit: the requested unit, default is None, which means no conversion request
        @type unit: L{Unit} or None
        '''
        Cell.__init__(self)
        self.name = name
        if unit is not None:
            self.unit = unit

    def __eq__(self, cell):
        # two requests match iff the column names are equal
        return self.name == cell.name

class CellRequestByFeature(CellRequest):
    '''
    @summary: user request for data column(s) selected by the monitored
    feature, optionally asking for a unit conversion
    '''
    def __init__(self, feature, unit = None):
        '''
        @summary: Constructor
        @param feature: the feature of the requested column
        @type feature: str
        @param unit: the requested unit, default is None, which means no conversion request
        @type unit: L{Unit} or None
        '''
        Cell.__init__(self)
        self.feature = feature
        if unit is not None:
            self.unit = unit

    def __eq__(self, cell):
        # two requests match iff the features are equal
        return self.feature == cell.feature

class DataIndex(DataReader):
    '''
    @summary: a DataReader extension that maintains a lookup table mapping
    the tuple of key column values to the row offset in the underlying data.
    The index is rebuilt lazily when the source signals a clear or an expand.
    '''

    def __init__(self, datasource, key):
        '''
        @summary: Constructor
        @param datasource: the data source to index
        @type datasource: DataSource
        @param key: the cell request selecting the key column(s)
        @type key: CellRequest
        '''
        DataReader.__init__(self, datasource)
        self.indexmap = {}
        self.extract(cellrequest = key)

    def buildindex(self):
        '''
        @summary: extend the index map with the rows not processed yet
        @note: assumes key values are unique per row -- TODO confirm;
        duplicate keys would map several rows to stale offsets
        '''
        i = len(self.indexmap)
        for k in self:
            self.indexmap[tuple(k)] = i
            i += 1

    def __getitem__(self, k):
        '''
        @summary: look up a raw data record by its key value(s)
        @param k: the key; a scalar is treated as a 1-tuple
        @return: the raw record stored at the indexed row
        @raise KeyError: the key is not present in the data
        '''
        if self.sourceCleared.isSet():
            # the source was flushed: all stored offsets are stale
            self.sourceCleared.clear()
            self.indexmap.clear()
            self.buildindex()
        try:
            iter(k)
        except TypeError:
            k = (k,)
        # dict.has_key() was removed in Python 3; membership test instead
        if k not in self.indexmap and self.sourceExpanded.isSet():
            self.sourceExpanded.clear()
            self.buildindex()
        return self.source._rawrecords[ self.indexmap[k] ]
class DataReader(object):
    '''
    @summary: an iterator over the rows / records of a DataSource.
    When the iterator is invoked several times only new records are yielded.
    In order to access rows, which have already been iterated, use the
    rewind() method to move the pointer to the first record.

    By default iteration yields all columns.  In case the user is interested
    in a specific slice of the table (or wants a different column order
    and/or unit conversion), they can invoke the extract() method with an
    ordered list of CellRequest items.

    DataReader objects register themselves at the underlying DataSource in
    order to catch the signals fired upon new data insertion or deletion.
    @ivar source: the data source this reader consumes
    @type source: DataSource
    @ivar sourceCleared: set by the source when its content was flushed
    @type sourceCleared: Event
    @ivar sourceExpanded: set by the source when new records were appended
    @type sourceExpanded: Event
    '''

    def __init__(self, datasource):
        '''
        @summary: Constructor
        @param datasource: the data source to iterate over
        @type datasource: DataSource
        @raise DataError: wrong type of datasource
        '''
        if not isinstance(datasource, DataSource):
            raise DataError("Expect DataSource, got %s" % datasource)
        self.source = datasource
        self.sourceCleared = Event()
        self.sourceExpanded = Event()
        self.readlock = Lock()
        datasource.registerReader(self)
        self._seq = 0
        self._extractmap = None
        self._conversionmap = None
        self.extract()

    def __del__(self):
        # detach from the source so it stops signalling this reader
        self.source.deregisterReader(self)

    @property
    def processedrecords(self):
        '''
        @summary: This property indicates how many records are processed by this reader
        @return: the index of the record iterator
        @rtype: integer
        @note: the current value may be unreliable if an iteration is currently carried out
        '''
        return self._seq
    @processedrecords.setter
    def processedrecords(self, index):
        '''
        @summary: set the iterator to a given position, clamped to the table
        size.  A negative index means rewinding by that many rows.
        @param index: position description
        @type index: integer
        '''
        index = int(index)
        if index < 0:
            self._seq = max(0, self._seq + index)
        else:
            self._seq = min(index, len(self.source))
    @processedrecords.deleter
    def processedrecords(self):
        '''
        @summary: rewind to the first record row
        '''
        self._seq = 0

    def rewind(self):
        '''
        @summary: sets the next row record to the first item.
        '''
        del self.processedrecords

    def __iter__(self):
        '''
        @summary: yield the records not consumed yet, sliced and converted
        according to the current extract() mapping
        @raise DataError: the source was cleared while the iteration ran
        '''
        with self.readlock:
            self.sourceCleared.clear()
            while self._seq < len(self.source):
                if self.sourceCleared.isSet():
                    raise DataError("Data cleared while reading records %s %s" % (self, self.source))
                self._seq += 1
                yield self._extract(self._seq - 1)
            self.sourceExpanded.clear()
            # NOTE: the original code raised StopIteration here explicitly;
            # since PEP 479 that surfaces as a RuntimeError inside a
            # generator -- falling off the end terminates iteration anyway

    def sourcecleared(self):
        # callback fired by the source: remember that the table was flushed
        with self.source.writelock:
            self.sourceCleared.set()

    def sourceexpanded(self):
        # callback fired by the source: remember that new records arrived
        with self.source.writelock:
            self.sourceExpanded.set()

    def headercells(self):
        '''
        @summary: iterator over those columns of the Data which are relevant (i.e. which are extracted)
        @return: generator
        @rtype: DataHeaderCell
        '''
        meta = self.source._data.header
        for i in self._extractmap:
            cellname = meta._cellnames[i]
            yield meta._cells[cellname]

    def extract(self, cellrequest = None):
        '''
        @summary: Presets the iterator to the first row record and selects only those columns to show and convert who are referenced in the cell request.
        This method works in a best effort manner, those column names that are not in this data table are silently omitted.
        @param cellrequest: the list of the column names and the corresponding unit to show during iteration, default is None which means show all columns without unit conversion
        @type cellrequest: list of CellRequest
        '''
        self._seq = 0
        meta = self.source._data.header
        if cellrequest is None:
            # no need to copy the name list just to measure its length
            s = len(meta._cellnames)
            self._extractmap = range(s)
            self._conversionmap = [(None, None)] * s
        else:
            self._extractmap = []
            self._conversionmap = []
            for cellreq in cellrequest:
                for (colidx, cell) in meta.getCell( cellreq ):
                    try:
                        unit = cell.unit
                        dimension = cell.dimension
                        if cellreq.unit == unit:
                            unitmap = (None, None)
                        elif dimension.containsUnit(cellreq.unit):
                            unitmap = (unit, cellreq.unit)
                        else:
                            # FIXME: this generic Exception is NOT caught by
                            # the except clause below, so an impossible
                            # conversion propagates to the caller although the
                            # docstring promises silent ignore -- confirm intent
                            raise Exception("unit %s is not in the basin of dimension %s" % (unit, cell.dimension))
                    except DataError:
                        # the request carries no unit: show without conversion
                        unitmap = (None, None)
                    self._extractmap.append( colidx )
                    self._conversionmap.append( unitmap )

    def _extract(self, idx):
        '''
        @summary: an internal helper method that takes care of extracting and
        ordering the columns in the order predefined by calling extract().
        @param idx: the row index
        @type idx: integer
        @return: a list of the cell data slice from the row pointed by the row index
        @rtype: list
        '''
        ret = []
        D = self.source._data
        row = D[idx]
        for c, (sourceunit, targetunit) in zip(self._extractmap, self._conversionmap):
            celldata = row[c]
            if sourceunit is None:
                ret.append( celldata )
            else:
                ret.append( D.um.convert(celldata, sourceunit, targetunit) )
        return ret
+ @param idx: the row index + @type idx: integer + @return: a list of the cell data slice from the row pointed by the row index + @rtype: list + ''' + ret = [] + i = 0 + s = len(self._extractmap) + D = self.source._data + while i < s: + c = self._extractmap[i] + celldata = D[idx][c] + sourceunit, targetunit = self._conversionmap[i] + if sourceunit is None: + ret.append( celldata ) + else: + ret.append( D.um.convert(celldata, sourceunit, targetunit) ) + i += 1 + return ret diff --git a/Monitoring/MonitoringService/DataProcessing/DataSample.py b/Monitoring/MonitoringService/DataProcessing/DataSample.py new file mode 100644 index 0000000..ee7245d --- /dev/null +++ b/Monitoring/MonitoringService/DataProcessing/DataSample.py @@ -0,0 +1,54 @@ +''' +Created on Mar 4, 2013 + +@author: steger +''' +from DataProcessing.DataSource import DataSource +from DataProcessing.DataReader import DataReader + +class DataSample(DataSource): + pass + + def __init__(self, table): + ''' + Constructor + ''' + DataSource.__init__(self) + self._data = table + + def __len__(self): + return len(self._data) + + def __getitem__(self, k): + return None + + @property + def name(self): + return "Original(%s)" % self._data.name + + @property + def readerClass(self): + return DataReader + + def _process(self): + status = 0 + with self._data.readlock: + if self._data.evCleared.isSet(): + self._sourcecleared() + self._data.evCleared.clear() + status |= self.CLEARED + if self._data.evExpanded.isSet(): + self._sourceexpanded() + self._data.evExpanded.clear() + status |= self.EXPANDED + return status + + @property + def writelock(self): + return self._data.writelock + + + @property + def um(self): + return self._data.um + diff --git a/Monitoring/MonitoringService/DataProcessing/DataSource.py b/Monitoring/MonitoringService/DataProcessing/DataSource.py new file mode 100644 index 0000000..a384df0 --- /dev/null +++ b/Monitoring/MonitoringService/DataProcessing/DataSource.py @@ -0,0 +1,116 @@ +''' +Created on 
class DataSource(object):
    '''
    @summary: a template to represent any data generated by a tool or derived via various operations on data
    @ivar source: pointer to the origin of the data
    @note: DataSource instances reference their ancestor via the source property, in a recursive manner, the last item needs to implement a read and a write lock
    @ivar data: pointer to the actual data container
    @note: results of operations on data yield their result in the container referenced by the data property
    @ivar name: a name for the data source class
    @ivar um: reference to the unit model of the ancient data source
    @cvar PASS, CLEARED, EXPANDED: bit flags reported by _process()
    '''
    PASS = 0
    CLEARED = 1
    EXPANDED = 2

    def __init__(self, dependency = None):
        '''
        @summary: constructor
        @param dependency: the ancestor data source this instance consumes via a reader, if any
        @type dependency: DataSource or None
        '''
        self._readers = set()
        if dependency is None:
            self._inputreader = None
        else:
            self._inputreader = self.readerClass(dependency)
        self._processlock = RLock()
        self._data = None

    def registerReader(self, reader):
        '''
        @summary: registers a reader to catch clear and update events
        @param reader: data consumer
        @type reader: DataReader
        @raise DataError: wrong argument
        '''
        # An isinstance(reader, DataReader) check would cause a circular
        # import, so the reader is duck-typed by its two notification
        # callbacks.  (The original code unconditionally raised DataError
        # *after* a successful registration -- leftover from the commented
        # out type check; the error is now raised only for unsuitable ones.)
        if not (hasattr(reader, 'sourcecleared') and hasattr(reader, 'sourceexpanded')):
            raise DataError("Expecting a DataReader, got %s" % reader)
        self._readers.add(reader)
        try:
            # bring the fresh reader in sync with the current content
            if len(self):
                reader.sourceexpanded()
            else:
                reader.sourcecleared()
        except SamplerError:
            # best effort: the source may not be able to report yet
            pass

    def deregisterReader(self, reader):
        '''
        @summary: removes a registered reader; unknown readers are ignored
        @param reader: data consumer
        @type reader: DataReader
        '''
        self._readers.discard(reader)

    def __len__(self):
        raise DataError("%s must implement __len__ method" % self)

    def __getitem__(self, k):
        raise DataError("%s must implement __getitem__ method" % self)

    @property
    def name(self):
        raise DataError("%s must implement name property" % self)

    @property
    def readerClass(self):
        raise DataError("%s must implement readerClass property" % self)

    @property
    def data(self):
        # refresh the container before exposing it
        self.process()
        return self._data

    def process(self):
        '''
        @summary: recursively process data records of the source chain
        @return: status of the data processing
        @rtype: int
        '''
        with self._processlock:
            if self._inputreader:
                # bring the ancestor up to date before consuming it
                self._inputreader.source.process()
            status = self._process()
            if status & self.CLEARED:
                self._sourcecleared()
            if status & self.EXPANDED:
                self._sourceexpanded()
            return status

    def _process(self):
        raise DataError("%s must implement _process method returning process status" % self)

    def _sourcecleared(self):
        # broadcast the clear event to all registered readers
        for r in self._readers:
            r.sourcecleared()

    def _sourceexpanded(self):
        # broadcast the expand event to all registered readers
        for r in self._readers:
            r.sourceexpanded()
class DimensionManager(object):
    '''
    @summary: the dimension container
    @ivar dimensions: the container of the known dimensions
    @type dimensions: dict(str: L{Dimension})
    @ivar unitmanager: reference to the unit manager
    @type unitmanager: L{UnitManager}
    '''
    class Dimension(object):
        '''
        @summary: a skeleton class for all the dimensions handled by L{DimensionManager}
        @ivar manager: back reference to the dimension manager
        @type manager: L{DimensionManager}
        @ivar unitmanager: a reference to the unit manager
        @type unitmanager: L{UnitManager}
        @ivar reference: the unique identifier of the dimension
        @ivar name: the name of the dimension
        @type name: str
        @ivar unit: the default unit of the dimension
        @type unit: L{Unit}
        @ivar basin: the set of units which are valid for this dimension
        @type basin: set(L{Unit})
        '''
        def __init__(self, dimensionmanager, reference, name, unit, level):
            '''
            @summary: constructor
            @param dimensionmanager: reference to the dimension manager
            @type dimensionmanager: L{DimensionManager}
            @param reference: the reference to the dimension
            @type reference: str
            @param name: the name of the dimension
            @type name: str
            @param unit: the default unit of the dimension
            @type unit: L{Unit}
            @param level: the measurement level of the dimension
            @type level: L{MeasurementLevel}
            @note: the level is not a class instance
            @raise L{DimensionError}: Wrong type of unit / Wrong type of level
            '''
            if not isinstance(unit, UnitManager.Unit):
                raise DimensionError("Wrong type of unit %s" % unit)
            try:
                if not issubclass(level, MeasurementLevel):
                    raise DimensionError("Wrong type of level %s" % level)
            except TypeError:
                # issubclass() raises TypeError when level is not a class
                raise DimensionError("Wrong type of level %s" % level)
            self._data = (dimensionmanager, reference, name, unit)
            self._level = level
        @property
        def manager(self):
            return self._data[0]
        @property
        def unitmanager(self):
            return self._data[0].unitmanager
        @property
        def reference(self):
            return self._data[1]
        @property
        def name(self):
            return self._data[2]
        @property
        def unit(self):
            return self._data[3]
        @property
        def basin(self):
            return self.unitmanager.getBasinByUnit(self.unit)
        def level(self, level):
            '''
            @summary: check measurement level against the given level
            @param level: measurement level
            @type level: L{MeasurementLevel}
            @return: True if the measurement level given as a parameter
            is the same or looser than the level of the dimension
            @rtype: bool
            @raise L{DimensionError}: Wrong type of level
            '''
            if not issubclass(level, MeasurementLevel):
                raise DimensionError("Wrong type of level %s" % level)
            return issubclass(self._level, level)
        def __str__(self):
            return "%s [%s]" % (self.name, self.unit)
        def __eq__(self, d):
            if not isinstance(d, DimensionManager.Dimension):
                raise DimensionError("wrong type")
            return self._level == d._level and self.containsUnit(d.unit)
        def containsUnit(self, unit):
            '''
            @summary: checks if a given unit is in the basin of this dimension
            @param unit: the unit to check
            @type unit: L{Unit}
            @return: true if the unit is applicable for this dimension
            @rtype: bool
            '''
            return unit in self.unitmanager.getBasinByUnit(self.unit)

    class BaseDimension(Dimension):
        '''
        @summary: a dimension axiom
        '''
        pass

    class DerivedDimension(Dimension):
        '''
        @summary: a skeleton for dimensions, which are deriving from other already known dimensions
        '''
        def ancestors(self):
            '''
            @summary: iterate over all ancestors this dimension is derived from
            @return: generator over ancestors
            @rtype: L{Dimension}
            '''
            for d in self._ancestor:
                yield d

    class DifferenceDimension(DerivedDimension):
        '''
        @summary: a dimension defined by subtracting two individuals of a known dimension
        '''
        def __init__(self, dimensionmanager, reference, name, unit, derivedfrom):
            '''
            @summary: constructor
            @param dimensionmanager: reference to the dimension manager
            @type dimensionmanager: L{DimensionManager}
            @param reference: the reference to the dimension
            @param name: the name of the dimension
            @type name: str
            @param unit: the default unit of the dimension
            @type unit: L{Unit}
            @param derivedfrom: the ancestor dimension this dimension is derived from
            @type derivedfrom: L{Dimension}
            @raise L{DimensionError}: Wrong type of derivedfrom / Cannot subtract
            '''
            if not isinstance(derivedfrom, DimensionManager.Dimension):
                raise DimensionError("Wrong type of derivedfrom")
            if not derivedfrom.level(Interval):
                raise DimensionError("Cannot subtract %s" % derivedfrom)
            DimensionManager.Dimension.__init__(self, dimensionmanager, reference, name, unit, Ratio)
            # store as a 1-tuple so that ancestors() can iterate over it;
            # the original stored the bare Dimension, which broke iteration
            # and was inconsistent with the sibling derived dimensions
            self._ancestor = (derivedfrom,)

    class PowerDimension(DerivedDimension):
        '''
        @summary: a dimension defined by raising an existing dimension to a given power
        @ivar exponent: the power
        @type exponent: int
        '''
        def __init__(self, dimensionmanager, reference, name, unit, derivedfrom, exponent):
            '''
            @summary: constructor
            @param dimensionmanager: reference to the dimension manager
            @type dimensionmanager: L{DimensionManager}
            @param reference: the reference to the dimension
            @param name: the name of the dimension
            @type name: str
            @param unit: the default unit of the dimension
            @type unit: L{Unit}
            @param derivedfrom: the ancestor dimension this dimension is derived from
            @type derivedfrom: L{Dimension}
            @param exponent: dimension is a derivative of the derivedfrom dimension, by raising to power exponent
            @type exponent: int
            @raise DimensionError: Wrong type of derivedfrom / Cannot power
            '''
            if not isinstance(derivedfrom, DimensionManager.Dimension):
                raise DimensionError("Wrong type of derivedfrom")
            if not derivedfrom.level(Ratio):
                raise DimensionError("Cannot power %s" % derivedfrom)
            DimensionManager.Dimension.__init__(self, dimensionmanager, reference, name, unit, Ratio)
            self._ancestor = (derivedfrom,)
            self._exponent = exponent
        @property
        def exponent(self):
            return self._exponent

    class ProductDimension(DerivedDimension):
        '''
        @summary: dimension defined by multiplying at least two known different dimensions
        '''
        def __init__(self, dimensionmanager, reference, name, unit, derivedfrom):
            '''
            @summary: constructor
            @param dimensionmanager: reference to the dimension manager
            @type dimensionmanager: L{DimensionManager}
            @param reference: the reference to the dimension
            @param name: the name of the dimension
            @type name: str
            @param unit: the default unit of the dimension
            @type unit: L{Unit}
            @param derivedfrom: the set of dimensions that compose this dimension
            @type derivedfrom: tuple(L{Dimension})
            @raise L{DimensionError}: Wrong type of derivedfrom / too few factors / Cannot be a factor
            '''
            if not isinstance(derivedfrom, tuple):
                raise DimensionError("Wrong type of derivedfrom")
            if len(derivedfrom) < 2:
                # original message claimed "more than 2", contradicting the check
                raise DimensionError("ProductDimension must be derived from at least 2 Dimensions, got %d instead" % len(derivedfrom))
            for d in derivedfrom:
                if not d.level(Ratio):
                    raise DimensionError("%s cannot be a factor" % d)
            DimensionManager.Dimension.__init__(self, dimensionmanager, reference, name, unit, Ratio)
            self._ancestor = derivedfrom

    class RatioDimension(DerivedDimension):
        '''
        @summary: dimension defined by dividing two known dimensions
        '''
        def __init__(self, dimensionmanager, reference, name, unit, derivedfrom):
            '''
            @summary: constructor
            @param dimensionmanager: reference to the dimension manager
            @type dimensionmanager: L{DimensionManager}
            @param reference: the reference to the dimension
            @param name: the name of the dimension
            @type name: str
            @param unit: the default unit of the dimension
            @type unit: L{Unit}
            @param derivedfrom: the dimension this dimension is derived from
            @type derivedfrom: L{Dimension}
            @raise L{DimensionError}: Wrong type of derivedfrom / Cannot be a factor
            '''
            if not isinstance(derivedfrom, DimensionManager.Dimension):
                raise DimensionError("Wrong type of derivedfrom")
            if not derivedfrom.level(Ratio):
                raise DimensionError("%s cannot be a factor" % derivedfrom)
            DimensionManager.Dimension.__init__(self, dimensionmanager, reference, name, unit, Ratio)
            self._ancestor = (derivedfrom,)

    def __init__(self, unitmanager):
        '''
        @summary: constructor
        @param unitmanager: the unit manager needs to be referenced, to check the basins of a unit
        @type unitmanager: L{UnitManager}
        '''
        self.dimensions = {}
        self.unitmanager = unitmanager

    def __len__(self):
        '''
        @summary: the number of dimension known by the L{DimensionManager}
        @return: the number of dimension known by the L{DimensionManager}
        @rtype: int
        '''
        return len(self.dimensions)

    def __iter__(self):
        '''
        @summary: an iterator over known dimensions
        @return: the next known dimension
        @rtype: L{Dimension}
        '''
        for d in self.dimensions.values():
            yield d

    def newBaseDimension(self, reference, name, unit, level):
        '''
        @summary: generate a new base dimension
        @param reference: the reference to the dimension
        @type reference: str
        @param name: the name of the dimension
        @type name: str
        @param unit: the default unit of the dimension
        @type unit: L{Unit}
        @param level: the measurement level of the dimension
        @type level: L{MeasurementLevel}
        @note: the level is not a class instance
        @return: the new dimension
        @rtype: L{Dimension}
        @raise L{DimensionError}: Dimension with reference already exists / Wrong type of unit / Wrong type of level
        '''
        # dict.has_key() was removed in Python 3; membership test instead
        if reference in self.dimensions:
            raise DimensionError("Dimension with reference %s already exists" % reference)
        dimension = self.BaseDimension(self, reference, name, unit, level)
        self.dimensions[reference] = dimension
        return dimension

    def newDerivedDimension(self, reference, name, unit, derivedfrom, dimtype, **kw):
        '''
        @summary: generate a new derived dimension
        @param reference: the reference to the dimension
        @param name: the name of the dimension
        @type name: str
        @param unit: the default unit of the dimension
        @type unit: L{Unit}
        @param derivedfrom: the set of dimensions that compose this dimension
        @type derivedfrom: tuple(L{Dimension}) or L{Dimension}
        @param dimtype: possible dimension types are L{DifferenceDimension}, L{PowerDimension}, L{ProductDimension}, L{RatioDimension}
        @note: dimtype parameter is not an instance, but a class scheme
        @type dimtype: L{Dimension}
        @return: the new dimension
        @rtype: L{Dimension}
        @keyword kw: L{PowerDimension} expects an integer valued parameter: exponent
        @raise L{DimensionError}: Dimension with reference already exists / Wrong type of dimension
        '''
        if reference in self.dimensions:
            raise DimensionError("Dimension with reference %s already exists" % reference)
        if issubclass(dimtype, self.DifferenceDimension) or issubclass(dimtype, self.ProductDimension) or issubclass(dimtype, self.RatioDimension):
            dimension = dimtype(self, reference, name, unit, derivedfrom)
        elif issubclass(dimtype, self.PowerDimension):
            dimension = dimtype(self, reference, name, unit, derivedfrom, kw.get('exponent'))
        else:
            raise DimensionError("Wrong type of dimension %s" % dimtype)
        self.dimensions[reference] = dimension
        return dimension

    def __getitem__(self, reference):
        '''
        @summary: look up a dimension in the DimensionManager based on its reference
        @param reference: the reference to the dimension
        @return: the dimension if found
        @rtype: L{Dimension}
        @raise L{DimensionError}: Dimension with reference not found
        '''
        if reference not in self.dimensions:
            raise DimensionError("Dimension with reference %s not found" % reference)
        return self.dimensions[reference]
term") + self._terms.append((factor, aggregate)) + + @property + def name(self): + return "BLA" + + @property + def value(self): + self.process() + return self._value + + def process(self): + result = 0 + for factor, aggregate in self._terms: + term = aggregate.aggregate + result += factor * term + self._value = result
class MeasurementLevel:
    '''
    @summary: Common scheme for the measurement levels (aka measurement
    scales) defined by Stanley Smith Stevens; only the subclasses carry a
    meaning and none of them is ever meant to be instantiated.
    '''

class Nominal(MeasurementLevel):
    '''
    @summary: Values of this kind of measurement are mere elements of a set.
    '''

class Ordinal(Nominal):
    '''
    @summary: Besides being set elements, values of this kind of measurement can be ranked.
    '''

class Interval(Ordinal):
    '''
    @summary: The difference of any two values of this kind of measurement is meaningful.
    '''

class Ratio(Interval):
    '''
    @summary: A reference value exists for this kind of measurement, that is "zero" has a meaning.
    '''

# lookup table resolving level names to the corresponding classes
lut_level = {
    'NominalLevel': Nominal,
    'OrdinalLevel': Ordinal,
    'IntervalLevel': Interval,
    'RatioLevel': Ratio,
}
ParameterError("Unit %s is not in the basin of the dimension %s" % (unit, self.dimension)) + self._value = tuple(value) + @value.deleter + def value(self): + self._value = None + + def copy(self): + return Parameter(name = self.name, valuetype = self.valuetype, unitmanager = self.um, dimension = self.dimension, default = self.value) + + def convert(self, unit): + ''' + @summary: returns the value of the given parameter in the required unit + @param unit: the requested unit, which must adhere to the unit model of this parameter + @type unit: Unit + @return: the parameter value represented in the requested units + @rtype: + @raise ParameterError: Unit not in dimension basin / Unit is not initialized + ''' + if not self.dimension.containsUnit(unit): + raise ParameterError("Unit %s is not in the basin of the dimension %s" % (unit, self.dimension)) + if self._value is None: + raise ParameterError("%s is not initialized" % self) + val, un = self._value + if unit == un: + return self.valuetype(val) + else: + return self.valuetype( self.um.convert(value = val, from_unit = un, to_unit = unit) ) + + def convertToReferencedUnit(self, unitreference): + ''' + @summary: returns the parameter value in units, where the unit is referenced + @param unitreference: the reference to the requested unit, which must adhere to the unit model of this parameter + @type unit: str + ''' + return self.convert( self.um[unitreference] ) + + + + + + + + + + + + + + + + + + + + + + + + + + + +class ParameterList(object): + ''' + @author: steger, jozsef + @summary: + This class represents a list of control parameters of a monitoring task. 
class ParameterList(object):
    '''
    @author: steger, jozsef
    @summary:
    This class represents a list of control parameters of a monitoring task,
    indexed by their names.
    '''

    def __init__(self, parameterlist = None):
        '''
        @summary: Constructor
        @param parameterlist: a list of parameters to handle together; defaults to an empty collection
        @type parameterlist: list(Parameter) or ParameterList or None
        '''
        # NOTE: None (instead of a mutable []) as the default avoids the shared
        # mutable default argument pitfall
        self.parameter = {}
        if parameterlist is not None:
            self.extend(parameterlist)

    def __str__(self):
        '''
        @summary: human readable representation of the list
        @return: the formatted list of parameters
        @rtype: str
        '''
        return "<ParameterList> [%s\n\t]" % "\n\t\t".join([ "%s," % (p) for p in self.parameter.values() ])

    def __len__(self):
        '''
        @summary: return the size of the parameter list
        @return: the size of the parameter list
        @rtype: integer
        '''
        return len(self.parameter)

    def __iter__(self):
        '''
        @summary: provide an iterator over all the parameter elements
        @return: the next parameter
        @rtype: Parameter
        '''
        for p in self.parameter.values():
            yield p

    def __getitem__(self, key):
        '''
        @summary: provide the value of a parameter without unit conversion
        @param key: the name of the parameter
        @type key: str
        @return: current value
        @rtype: (str, unit)
        '''
        return self.parameter[key].value

    def append(self, p):
        '''
        @summary: append a new Parameter to the parameter list. If a wrong type of parameter is given, silently discard it.
        In case a parameter with the same name exists overwrite its value only.
        @param p: a new parameter to add or an existing parameter to update former values
        @type p: Parameter
        '''
        if not isinstance(p, Parameter):
            print("WW: %s is not a parameter" % str(p))
            return
        if self.has_key(p.name):
            print("WW: parameter with name %s is updated" % p.name)
            self.parameter[p.name].value = p.value
        else:
            self.parameter[p.name] = p

    def has_key(self, name):
        '''
        @summary: Check if a parameter with a given name is already in the list
        @param name: the name of the parameter looking for
        @type name: str
        @return: True if a parameter with the given name is present
        @rtype: bool
        '''
        # dict.has_key was removed in Python 3; the in operator works in both 2 and 3
        return name in self.parameter

    def get(self, name, unit):
        '''
        @summary: Read the parameter pointed by a given name in the required unit
        @param name: the name of the parameter
        @type name: str
        @param unit: the target unit the caller wants the named parameter to be expressed in
        @type unit: Unit
        @raise ParameterError: no such parameter name
        '''
        if not self.has_key(name):
            raise ParameterError("No Parameter with name: %s" % name)
        return self.parameter[name].convert(unit)

    def getInReferencedUnits(self, name, unitreference):
        '''
        @summary: Read the parameter pointed by a given name in the required unit
        @param name: the name of the parameter
        @type name: str
        @param unitreference: the target unit the caller wants the named parameter to be expressed in
        @type unitreference: str
        @raise ParameterError: no such parameter name
        '''
        if not self.has_key(name):
            raise ParameterError("No Parameter with name: %s" % name)
        return self.parameter[name].convertToReferencedUnit(unitreference)

    def update(self, name, value, unit):
        '''
        @summary: reset the value of the parameter with the given name
        @param name: the name of the parameter to update
        @type name: str
        @param value: the new value
        @type value: depends on the Parameter.type
        @param unit: the new unit
        @type unit: Unit
        '''
        self.parameter[name].value = value, unit

    def updateInReferencedUnits(self, name, value, unitreference):
        '''
        @summary: reset the value of the parameter with the given name
        @param name: the name of the parameter to update
        @type name: str
        @param value: the new value
        @type value: depends on the Parameter.type
        @param unitreference: the new unit
        @type unitreference: str
        '''
        p = self.parameter[name]
        p.value = value, p.um[unitreference]

    def update_by_list(self, p_updating):
        '''
        @summary: update parameter list with matching elements of another parameter list
        @param p_updating: parameter list, whose matching elements update the element of this list
        @type p_updating: ParameterList
        @raise ParameterError: wrong argument type
        '''
        if not isinstance(p_updating, ParameterList):
            raise ParameterError("wrong argument type")
        for name in p_updating.parameter_names():
            if self.has_key(name):
                v = p_updating.parameter[name].value
                if v is not None:
                    self.parameter[name].value = v

    def clear(self):
        '''
        @summary: Empty the parameter list
        '''
        self.parameter.clear()

    def copy(self):
        '''
        @summary: create a deep copy of this list (the parameters themselves are cloned)
        @return: the cloned parameter list
        @rtype: ParameterList
        '''
        return ParameterList([ p.copy() for p in self ])

    def extend(self, parameterlist):
        '''
        @summary: extends this parameter list with the items of another parameter list
        @param parameterlist: the list of parameter items to extend with
        @type parameterlist: ParameterList
        '''
        for p in parameterlist:
            self.append(p)

    def parameter_names(self):
        '''
        @summary: List the names of the currently hold parameters
        @return: list of Parameter.name
        @rtype: list
        '''
        # materialize a real list so Python 3 callers do not receive a view object
        return list(self.parameter.keys())

    def formkeyvaldict(self):
        '''
        @summary: flatten the list into a name to bare value (unit dropped) dictionary
        @return: mapping of parameter names to their raw values
        @rtype: dict
        '''
        return dict( (name, p.value[0]) for (name, p) in self.parameter.items() )
class PrefixManager(object):
    '''
    @summary: acts as a unit prefix container
    @ivar prefixes: the container of the known prefixes, keyed by their reference
    @type prefixes: dict(str: L{Prefix})
    @ivar duplicatesymbols: the set of symbols that more than one prefix maps to
    @type duplicatesymbols: set(str)
    '''
    class Prefix(object):
        '''
        @summary: represents a unit prefix and the scaling information
        @ivar reference: a uniq prefix identifier
        @ivar symbol: a short representation of the prefix
        @type symbol: str
        @ivar base: the base of the scaling factor
        @type base: int
        @ivar exponent: the exponent of the scaling factor
        @type exponent: int
        @ivar scale: the scaling factor, which is base ** exponent
        @type scale: float
        '''
        def __init__(self, reference, symbol, base, exponent):
            '''
            @summary: constructor
            @param reference: the reference to the unit prefix
            @type reference: str
            @param symbol: a short form of the unit prefix
            @type symbol: str
            @param base: the base of the unit prefix, typically 2 or 10
            @type base: int
            @param exponent: the exponent of the unit prefix
            @type exponent: int
            '''
            # the scale is precomputed once; all attributes are exposed read-only
            scale = base ** exponent
            self._data = (reference, symbol, base, exponent, scale)
        def __str__(self):
            return self.symbol
        @property
        def reference(self):
            return self._data[0]
        @property
        def symbol(self):
            return self._data[1]
        @property
        def base(self):
            return self._data[2]
        @property
        def exponent(self):
            return self._data[3]
        @property
        def scale(self):
            return self._data[4]

    def __init__(self):
        '''
        @summary: constructor
        '''
        self.prefixes = {}
        self.duplicatesymbols = set()

    def __contains__(self, item):
        '''
        @summary: check the existence of a unit prefix
        @param item: a prefix or a prefix symbol
        @type item: L{Prefix} or str
        @return: True if the prefix is known by the L{PrefixManager}
        @rtype: bool
        @raise L{PrefixError}: Wrong item type
        '''
        if isinstance(item, self.Prefix):
            return item in self.prefixes.values()
        elif isinstance(item, str):
            # a symbol matches if any registered prefix bears it
            return any(prefix.symbol == item for prefix in self.prefixes.values())
        else:
            raise PrefixError("Wrong item type %s" % item)

    def __len__(self):
        '''
        @summary: the number of prefixes known by the L{PrefixManager}
        @return: the number of prefixes known by the L{PrefixManager}
        @rtype: int
        '''
        return len(self.prefixes)

    def newPrefix(self, reference, symbol, base, exponent):
        '''
        @summary: generate a new unit prefix
        @param reference: the reference to the unit prefix
        @type reference: str
        @param symbol: a short form of the unit prefix
        @type symbol: str
        @param base: the base of the unit prefix, typically 2 or 10
        @type base: int
        @param exponent: the exponent of the unit prefix
        @type exponent: int
        @return: the new unit prefix
        @rtype: L{Prefix}
        @raise L{PrefixError}: Prefix with reference exists
        '''
        # 'in' replaces the Python-2-only dict.has_key
        if reference in self.prefixes:
            raise PrefixError("Prefix with reference %s already exists" % reference)
        if PrefixManager.__contains__(self, symbol):
            # remember symbol collisions: looking these up by symbol is ambiguous
            self.duplicatesymbols.add(symbol)
        prefix = self.Prefix(reference, symbol, base, exponent)
        self.prefixes[reference] = prefix
        return prefix

    def __getitem__(self, reference):
        '''
        @summary: look up the prefix in the L{PrefixManager} based on its reference
        @param reference: the reference to the unit prefix
        @type reference: str
        @return: the unit prefix found
        @rtype: L{Prefix}
        @raise L{PrefixError}: Prefix with reference not found
        '''
        if reference in self.prefixes:
            return self.prefixes[reference]
        raise PrefixError("Prefix with reference %s not found" % reference)
class Sampler(DataSource):
    '''
    @summary: base class of data sources that re-sample the records of another
    data source (taking the head, the tail or a sorted view of a table).
    The output table reuses the column description of the input source.
    @note: concrete subclasses provide a name property and a _process() method;
    PASS / CLEARED / EXPANDED used below are status flags presumably defined by
    DataSource -- confirm their semantics there.
    '''

    def __init__(self, datasource):
        '''
        Constructor
        @param datasource: table of records to manipulate with
        @type datasource: DataSource
        @raise SamplerError: wrong type of datasource
        '''
        if not isinstance(datasource, DataSource):
            raise SamplerError("Wrong type of datasource %s" % datasource)
        # register this instance as a dependant of the input source;
        # NOTE(review): self._inputreader appears to be set up by DataSource.__init__ -- confirm
        DataSource.__init__(self, dependency = datasource)
        self.source = datasource
        # the output header is a flat copy of the input header cells
        header = DataHeader("%sSample(%s)" % (self.name, self.source.name))
        for c in self._inputreader.headercells():
            header.addColumn(c)
        self.um = self.source.um
        self._data = Data(self.um, header)

    @property
    def readerClass(self):
        # the reader implementation used to iterate over this source
        return DataReader

    @property
    def header(self):
        return self._data.header

    def __len__(self):
        # trigger processing first so the length reflects an up-to-date sample
        self.process()
        return len(self._data)

    def __getitem__(self, k):
        return self._data._rawrecords.__getitem__(k)

    @property
    def writelock(self):
        return self._data.writelock


class Head(Sampler):
    '''
    @summary: serves the first 'head' records of the input data source
    '''
    def __init__(self, datasource, head = 10):
        '''
        Constructor
        @param datasource: table of records to manipulate with
        @type datasource: DataSource
        @param head: the top n records of the table
        @type head: int
        '''
        Sampler.__init__(self, datasource)
        self._head = head

    @property
    def name(self):
        return "Head"

    @property
    def head(self):
        # the requested sample size
        return self._head
    @head.setter
    def head(self, head):
        # changing the sample size invalidates the current sample
        self._head = int(head)
        self._data.clear()
        self._inputreader.rewind()

    def _process(self):
        '''
        @summary: (re)fill the sample with the first 'head' records of the input
        @return: combination of the PASS / CLEARED / EXPANDED status bits
        @raise SamplerError: the input holds fewer records than requested
        '''
        status = self.PASS
        if self._inputreader.sourceCleared.isSet():
            # the input was rebuilt from scratch: drop the current sample and start over
            self._inputreader.sourceCleared.clear()
            self._inputreader.rewind()
            self._data.clear()
            status |= self.CLEARED
        if len(self._data) == self.head:
            # sample already complete; further input records are irrelevant for Head
            return status
        for x in self._inputreader:
            self._data._rawrecords.append(x)
            if len(self._data) == self.head:
                status |= self.EXPANDED
                return status
        # ran out of input before the requested sample size was reached
        raise SamplerError("Not enough sample %d/%d" % (len(self._data), self.head))

class Tail(Sampler):
    '''
    @summary: serves the last 'tail' records of the input data source
    '''
    def __init__(self, datasource, tail = 10):
        '''
        Constructor
        @param datasource: table of records to manipulate with
        @type datasource: DataSource
        @param tail: the last n records of the table
        @type tail: int
        '''
        Sampler.__init__(self, datasource)
        self._tail = tail

    @property
    def name(self):
        return "Tail"

    @property
    def tail(self):
        # the requested sample size
        return self._tail
    @tail.setter
    def tail(self, tail):
        # changing the sample size invalidates the current sample
        self._tail = int(tail)
        self._data.clear()
        self._inputreader.rewind()

    def _process(self):
        '''
        @summary: maintain a sliding window holding the last 'tail' input records
        @return: combination of the PASS / CLEARED / EXPANDED status bits
        @raise SamplerError: the input holds fewer records than requested
        '''
        status = self.PASS
        clear = False
        if self._inputreader.sourceCleared.isSet():
            self._inputreader.sourceCleared.clear()
            self._inputreader.rewind()
            self._data.clear()
        for x in self._inputreader:
            if len(self._data) == self.tail:
                # the window is full: evict the oldest record to make room
                self._data._rawrecords.pop(0)
                clear = True
            self._data._rawrecords.append(x)
        if clear:
            # records were evicted, so consumers must re-read the whole sample
            status |= self.CLEARED
        if len(self._data) == self.tail:
            status |= self.EXPANDED
            return status
        else:
            raise SamplerError("Not enough sample %d/%d" % (len(self._data), self.tail))

class Sorter(Sampler):
    '''
    @summary: serves the records of the input data source sorted by a key column
    '''
    def __init__(self, datasource, keycell = None, ascending = True):
        '''
        Constructor
        @param datasource: table of records to manipulate with
        @type datasource: DataSource
        @param keycell: the key column to use for sorting
        @type keycell: CellRequest or None
        @param ascending: indicate the sortin order
        @type ascending: bool
        '''
        Sampler.__init__(self, datasource)
        self._asc = ascending
        # default key is the first column until a keycell request resolves another index
        self._key = 0
        if keycell:
            self.keycell = keycell

    @property
    def name(self):
        return "Sort"

    @property
    def ascending(self):
        return self._asc
    @ascending.setter
    def ascending(self, ascending):
        # only a real change of direction invalidates the current sample
        if bool(ascending) != self._asc:
            self._asc = bool(ascending)
            self._data.clear()
            self._inputreader.rewind()

    @property
    def key(self):
        # the column index records are compared by
        return self._key
    @key.setter
    def key(self, key):
        self._key = int(key)

    @property
    def keycell(self):
        # write-only property: the request is resolved to a column index immediately
        raise SamplerError("don't read this property")
    @keycell.setter
    def keycell(self, cellrequest):
        # resolve the cell request against the input header; only the first match is used
        for idx, _ in self.source._data.header.getCell(cellrequest):
            self._key = idx
            break

    def _process(self):
        '''
        @summary: rebuild the sorted sample whenever the input changed
        @return: combination of the PASS / CLEARED / EXPANDED status bits
        @raise SamplerError: the input holds no records
        '''
        status = self.PASS
        # any input change (clear or append) forces a full re-sort,
        # hence CLEARED is flagged in both cases
        if self._inputreader.sourceCleared.isSet():
            self._inputreader.sourceCleared.clear()
            status |= self.CLEARED
        if self._inputreader.sourceExpanded.isSet():
            self._inputreader.sourceExpanded.clear()
            status |= self.CLEARED
        if status == self.PASS:
            # nothing changed since the last run
            return self.PASS
        self._inputreader.rewind()
        self._data.clear()
        self._data._rawrecords = sorted(self._inputreader.source._data._rawrecords, key=lambda r: r[self.key], reverse = not self.ascending)
        if len(self._data):
            status |= self.EXPANDED
            return status
        else:
            raise SamplerError("Not enough sample...")
class UnitManager(object):
    '''
    @summary: the unit container

    @note: The relationship between various unit, describing the derivation paths are not stored in this model,
    because this information can be inferred from the dimension derivations, represented in the L{DimensionManager}.
    @note: Units that are formed by prepending a unit prefix (L{Prefix}) are dealt as a L{DerivedUnit}.

    @ivar units: container of known units
    @type units: dict(str: L{Unit})
    @ivar conversionpaths: is a map of operations to carry out from a unit to get a different unit
    @type conversionpaths: dict((L{Unit}, L{Unit}): (callable, args))
    @ivar basins: indicates the derivatives of a basic unit
    @type basins: dict(L{BasicUnit}: set(L{Unit}))
    @ivar duplicatesymbols: collection of unit symbols, which more than one unit may bare
    @type duplicatesymbols: set(str)
    '''

    class Unit(object):
        '''
        @summary: common skeleton of all units
        @ivar manager: reference to the unit manager
        @type manager: L{UnitManager}
        @ivar reference: unique reference of the unit
        @ivar symbol: short form of the unit
        @type symbol: str
        '''
        def __init__(self, manager, reference, symbol, ancestor):
            '''
            @summary: bind and store common information of the unit
            @param manager: the unit manager
            @type manager: L{UnitManager}
            @param reference: a unique identifier
            @param symbol: short human readable representation of the unit
            @type symbol: str
            @param ancestor: the ancestor of this unit is deriving from
            @type ancestor: L{Unit}
            '''
            self._data = (manager, reference, symbol)
            self._ancestor = ancestor
        @property
        def manager(self):
            return self._data[0]
        @property
        def reference(self):
            return self._data[1]
        @property
        def symbol(self):
            return self._data[2]
        def __str__(self):
            return self.symbol
        def __eq__(self, u):
            return self._data == u._data
        def __ne__(self, u):
            # Python 2 does not derive != from __eq__; keep the two operators consistent
            return not self.__eq__(u)
        def __hash__(self):
            # units are dict keys in conversionpaths and set members in basins;
            # a class defining __eq__ without __hash__ is unhashable on Python 3
            return hash(self._data)

    class BasicUnit(Unit):
        '''
        @summary: a unit axiom
        '''
        def __init__(self, manager, reference, symbol):
            '''
            @summary: constructor
            A BasicUnit is an instance of either set of BaseUnit, ProductUnit and PowerUnit as of the information model.
            @param manager: a reference to the unit manager
            @type manager: L{UnitManager}
            @param reference: the reference to the unit
            @param symbol: an abbreviation for the unit
            @type symbol: str
            '''
            UnitManager.Unit.__init__(self, manager, reference, symbol, None)

    class DerivedUnit(Unit):
        '''
        @summary: a unit deriving from various known units
        '''
        def __init__(self, manager, reference, symbol, ancestor):
            '''
            @summary: constructor
            A DerivedUnit is an instance of either set of LinearTransformedUnit and RegexpScaledUnit as of the information model.
            Also units that have any unit prefix fall in this set.
            @param manager: a reference to the unit manager
            @type manager: L{UnitManager}
            @param reference: the reference to the unit
            @param symbol: an abbreviation for the unit
            @type symbol: str
            @param ancestor: the neighbor unit, whose derivative this instance is.
            @type ancestor: L{Unit}
            '''
            UnitManager.Unit.__init__(self, manager, reference, symbol, ancestor)


    def __init__(self):
        '''
        @summary: constructor
        '''
        self.units = {}
        self.conversionpaths = {}
        self.basins = {}
        self.duplicatesymbols = set()

    def __contains__(self, item):
        '''
        @summary: check the existence of a unit
        @param item: a unit or its symbol
        @type item: L{Unit} or str
        @return: True if the unit is known by the L{UnitManager}
        @rtype: bool
        @raise L{UnitError}: Wrong item type
        '''
        units = set(self.units.values())
        if isinstance(item, self.Unit):
            return item in units
        elif isinstance(item, str):
            return any(unit.symbol == item for unit in units)
        else:
            raise UnitError("Wrong item type %s" % item)

    def __len__(self):
        '''
        @summary: the number of units known by the L{UnitManager}
        @return: the number of units known by the L{UnitManager}
        @rtype: int
        '''
        return len(self.units)

    @staticmethod
    def intORfloat(x):
        '''
        @summary: a conversion helper to read out a value as a number
        @param x: a number
        @type x: str
        @return: the number converted to integer or floating point decimal
        @rtype: int or float
        '''
        if isinstance(x, str):
            try:
                return int(x)
            except ValueError:
                return float(x)
        else:
            return float(x)

    def __getitem__(self, reference):
        '''
        @summary: look up the unit in the L{UnitManager} using its reference
        @param reference: the reference to the unit
        @return: the unit found
        @rtype: L{Unit}
        @raise L{UnitError}: Unit with reference not found
        '''
        if reference in self.units:
            return self.units[reference]
        raise UnitError("Unit with reference %s not found" % reference)

    def newBasicUnit(self, reference, symbol):
        '''
        @summary: generate a new basic unit
        @param reference: the reference to the unit
        @param symbol: a short form of the unit
        @type symbol: str
        @return: the new unit
        @rtype: L{BasicUnit}
        @raise L{UnitError}: Unit with reference exists
        '''
        if reference in self.units:
            raise UnitError("Unit with reference %s exists" % reference)
        if UnitManager.__contains__(self, symbol):
            self.duplicatesymbols.add(symbol)
        unit = self.BasicUnit(self, reference, symbol)
        self.units[reference] = unit
        # a basic unit owns its own basin of compatible (derived) units
        self.basins[unit] = set([unit])
        # expose the unit as an attribute for convenient access
        self.__dict__[reference] = unit
        return unit

    def _registerDerivedUnit(self, reference, symbol, derivedfrom):
        '''
        @summary: validation and bookkeeping shared by all derived unit factories
        @param reference: the reference to the unit
        @param symbol: a short form of the unit
        @type symbol: str
        @param derivedfrom: the neighbor unit
        @type derivedfrom: L{Unit}
        @return: the new unit and the basic unit whose basin it extends
        @rtype: (L{DerivedUnit}, L{BasicUnit})
        @raise L{UnitError}: Wrong type of derivedfrom / Unit not found / Unit with reference exists / Cannot extend basin with unit, because Unit not found
        '''
        if not isinstance(derivedfrom, self.Unit):
            raise UnitError("Wrong type of derivedfrom %s" % derivedfrom)
        if not UnitManager.__contains__(self, str(derivedfrom)):
            raise UnitError("Unit %s not found" % derivedfrom)
        if reference in self.units:
            raise UnitError("Unit with reference %s exists" % reference)
        unit = self.DerivedUnit(self, reference, symbol, derivedfrom)
        # walk up the derivation chain to locate the owning basic unit
        basic = derivedfrom
        while basic._ancestor:
            basic = basic._ancestor
        if basic not in self.basins:
            raise UnitError("Cannot extend basin with unit %s, because Unit %s not found" % (unit, basic))
        if UnitManager.__contains__(self, symbol):
            self.duplicatesymbols.add(symbol)
        self.units[reference] = unit
        return unit, basic

    def addLinearTransformedUnit(self, reference, symbol, derivedfrom, scale, offset = 0):
        '''
        @summary: generate a derived unit
        @param reference: the reference to the unit
        @param symbol: a short form of the unit
        @type symbol: str
        @param derivedfrom: the neighbor unit
        @type derivedfrom: L{Unit}
        @param scale: scaling factor for the linear transformation
        @type scale: float
        @param offset: the shift in the linear transformation, defaults to 0
        @type offset: float
        @return: the new unit
        @rtype: L{DerivedUnit}
        @raise L{UnitError}: Wrong type of derivedfrom / Unit not found / Unit with reference exists / Cannot extend basin with unit, because Unit not found
        '''
        unit, basic = self._registerDerivedUnit(reference, symbol, derivedfrom)
        self.conversionpaths[(unit, derivedfrom)] = (self.op_lt_forward, (scale, offset))
        self.conversionpaths[(derivedfrom, unit)] = (self.op_lt_inverse, (scale, offset))
        self.basins[basic].add(unit)
        self.__dict__[reference] = unit
        return unit

    def addRegexpTransformedUnit(self, reference, symbol, derivedfrom, expr_forward, expr_inverse):
        '''
        @summary: generate a derived unit
        @param reference: the reference to the unit
        @param symbol: a short form of the unit
        @type symbol: str
        @param derivedfrom: the neighbor unit
        @type derivedfrom: L{Unit}
        @param expr_forward: the expression driving the forward transformation
        @type expr_forward: str
        @param expr_inverse: the expression driving the inverse transformation
        @type expr_inverse: str
        @return: the new unit
        @rtype: L{DerivedUnit}
        @raise L{UnitError}: Wrong type of derivedfrom / Unit not found / Unit with reference exists / Cannot extend basin with unit, because Unit not found
        '''
        unit, basic = self._registerDerivedUnit(reference, symbol, derivedfrom)
        self.conversionpaths[(unit, derivedfrom)] = (self.op_rt_forward, expr_forward)
        self.conversionpaths[(derivedfrom, unit)] = (self.op_rt_inverse, expr_inverse)
        self.basins[basic].add(unit)
        self.__dict__[reference] = unit
        return unit

    def getBasinByUnit(self, unit):
        '''
        @summary: return the set of units, which are compatible with a given unit
        @param unit: the unit to look up
        @type unit: L{Unit}
        @return: the set of compatible units
        @rtype: set(L{Unit})
        @raise L{UnitError}: not found
        '''
        for basin in self.basins.values():
            if unit in basin:
                return basin
        raise UnitError("Basin for unit %s not found" % unit)

    def getBasinByReference(self, reference):
        '''
        @summary: look up the compatible units of a given unit with the calling reference
        @param reference: the reference of the unit
        @return: the set of compatible units
        @rtype: set(L{Unit})
        @raise L{UnitError}: not found
        '''
        try:
            unit = self[reference]
            return self.getBasinByUnit(unit)
        except UnitError:
            raise UnitError("Basin for unit reference %s not found" % reference)

    def op_lt_forward(self, value, so):
        '''
        @summary: apply a linear transformation: value * scale + offset
        @param value: a scalar (possibly a numeric string) or a list of such values
        @param so: the (scale, offset) pair of the transformation
        @return: the transformed scalar or list
        '''
        (scale, offset) = so
        def op(v):
            return scale * self.intORfloat( v ) + offset
        if isinstance(value, list):
            # list comprehension keeps the Python 2 map() behaviour of returning a list
            return [ op(v) for v in value ]
        return op(value)

    def op_lt_inverse(self, value, so):
        '''
        @summary: apply the inverse linear transformation: (value - offset) / scale
        @param value: a scalar (possibly a numeric string) or a list of such values
        @param so: the (scale, offset) pair of the transformation
        @return: the transformed scalar or list
        '''
        (scale, offset) = so
        def op(v):
            return (self.intORfloat( v ) - offset) / float(scale)
        if isinstance(value, list):
            return [ op(v) for v in value ]
        return op(value)

    def op_rt_forward(self, value, expression):
        '''
        @summary: placeholder for regexp driven transformations
        @raise L{UnitError}: not implemented
        '''
        def op(v):
            raise UnitError("not implemented")
        if isinstance(value, list):
            return [ op(v) for v in value ]
        return op(value)

    op_rt_inverse = op_rt_forward

    def convert(self, value, from_unit, to_unit):
        '''
        @summary: convert a value of one unit to the other
        @param value: input value in from_unit
        @param from_unit: the original unit of the input value
        @type from_unit: L{Unit}
        @param to_unit: the requested new unit
        @type to_unit: L{Unit}
        @return: the value expressed in to_unit
        @raise L{UnitError}: unknown unit / incompatible units
        '''
        if not UnitManager.__contains__(self, str(from_unit)):
            raise UnitError("Unknown from_unit")
        if not UnitManager.__contains__(self, str(to_unit)):
            raise UnitError("Unknown to_unit")
        if from_unit == to_unit:
            return value
        # walk from the source unit up to its basic unit applying forward operations
        while from_unit._ancestor:
            op, oparg = self.conversionpaths[(from_unit, from_unit._ancestor)]
            value = op(value, oparg)
            from_unit = from_unit._ancestor
        # collect the operations leading from the basic unit down to the target unit
        heap = []
        while to_unit._ancestor:
            op, oparg = self.conversionpaths[(to_unit._ancestor, to_unit)]
            heap.append((op, oparg))
            to_unit = to_unit._ancestor
        if from_unit != to_unit:
            raise UnitError("Different base units %s %s" % (from_unit, to_unit))
        while len(heap):
            op, oparg = heap.pop(0)
            value = op(value, oparg)
        return value
pico_second, dozen, micro_second,\ + piece, nano_second, second +from DataProcessing.Data import Data +from DataProcessing.Aggregator import Sum, Max, Min, Mean, Deviation, Percentile +from DataProcessing.DataReader import DataReader +from DataProcessing.DataFormatter import JsonFormatter, DumbFormatter +from DataProcessing.Sampler import Head, Tail, Sorter +from DataProcessing.AggregatorManager import AggregatorManager +from DataProcessing.DataHeader import DataHeaderGeneratedByDescription,\ + DataHeader +from DataProcessing.DataHeaderCell import DataHeaderCell, CellRequestByName,\ + CellRequestByFeature +from DataProcessing.DataError import DataError, SamplerError +from DataProcessing.DataSample import DataSample + + +class Test(unittest.TestCase): + eps = 1e-15 + + def different(self, expect, got): + return abs(expect - got) / float(expect) < self.eps + + def setUp(self): + pass + + def test_PM(self): + for ref, symbol, base, exponent in prefixes: + scale = base ** exponent + p = PM[ref] + self.assertEqual(str(p), symbol, "symbol cannot be read back %s %s" % (p, symbol)) + self.assertEqual(p.scale, scale, "prefix %s scale error got: %f expect: %f" % (p, p.scale, scale)) + + self.assertTrue('p' in PM, "cannot find symbol") + self.assertFalse('pico' in PM, "found a symbol, which I shouldn't") + + + def test_UM(self): + s = randint(1, 10000) + expect = s * 1e-3 + got = UM.convert(s, milli_second, second) + self.assertTrue(self.different(expect, got), "Different (%d ms) expect %f s got %f s" % (s, expect, got)) + expect = s * 1e9 + got = UM.convert(s, milli_second, pico_second) + self.assertTrue(self.different(expect, got), "Different (%d ms) expect %f ps got %f ps" % (s, expect, got)) + + kilobit = UM["kilo_bit"] + megaByte = UM["mega_Byte"] + b = randint(1, 1000) + expect = b * 1e-3 / 8. 
+ got = UM.convert(b, kilobit, megaByte) + self.assertTrue(self.different(expect, got), "Different (%d kbit) expect %f MB got %f MB" % (b, expect, got)) + + def test_D(self): + dim = DM['TimeInterval'] + for u in [second, milli_second]: + self.assertTrue(dim.containsUnit(u), "piece %s not in dim" % u) + bu = UM.getBasinByUnit(UM['second']) + br = UM.getBasinByReference('micro_second') + self.assertTrue(bu == br, "basins differ") + + def test_parameter(self): + n = randint(0, 1000) + parameter = Parameter(name = 'testparameter', valuetype = float, unitmanager = UM, dimension = countable, default = (n, dozen)) + v1 = 12 * parameter.value[0] + v2 = parameter.convert(piece) + self.assertTrue(abs(v1 - v2) < self.eps, "%d dozen and %d are not equal (type 1)" % (n, v2)) + n = randint(0, 1000) + parameter.value = (n, piece) + v = parameter.convert(dozen) + self.assertTrue(abs(12 * v - n) < self.eps, "%f dozen and %d are not equal (type 2)" % (v, n)) + + def test_addcolumn(self): + ''' + ''' + c1 = DataHeaderCell(name = "oszlop", dimension = timeinterval, unit = milli_second) + c2 = DataHeaderCell(name = "oszlop2", dimension = timeinterval, unit = second, feature = "kutyafule") + h = DataHeader(name = "proba") + h.addColumn(c1) + h.addColumn(c2) + self.assertRaises(DataError, h.addColumn, c1) + cr1 = CellRequestByName(name = "oszlop2") + cr2 = CellRequestByFeature(feature = "kutyafule") + qr1 = [ x for x in h.getCell(cellrequest = cr1) ] + qr2 = [ x for x in h.getCell(cellrequest = cr2) ] + self.assertEqual(qr1, qr2, "getCell oopses 1") + qr = [ x for x in h.getCell(cellrequest = CellRequestByFeature(feature = "macskanyelv")) ] + self.assertEqual(len(qr), 0, "getCell oopses 2") + + + def test_createheadertemplate(self): + header = DataHeader(name = "traceroute") + cell = DataHeaderCell(name = "idx", dimension = cardinal) + header.addColumn(cell) + iphdr = DataHeader(name = "info") + cell = DataHeaderCell(name = "address", dimension = ipaddress) + iphdr.addColumn(cell) + 
rtthdr = DataHeader(name = "rttinfo") + cell = DataHeaderCell(name = "roundtripdelay", dimension = timeinterval, unit = milli_second) + rtthdr.addColumn(cell) + iphdr.addColumn(rtthdr) + header.addColumn(iphdr) + header2 = DataHeaderGeneratedByDescription("traceroute", [('idx', cardinal), ("info", [('address', ipaddress), ("rttinfo", [('roundtripdelay', timeinterval, milli_second)])])]) + self.assertTrue(header == header2, "headers differ:\n%s\n%s" % (header, header2)) + + def test_complex_table(self): + ''' + ''' + header = DataHeaderGeneratedByDescription("traceroute", [('idx', cardinal), ("info", [('address', ipaddress), ("rttinfo", [('roundtripdelay', timeinterval, milli_second)])])]) + + D = Data(UM, header) + hoprecord = D.getTemplate(size = 2) + iprec1, iprec2 = hoprecord.getRecordTemplates(name = "info") + (rttrec1,) = iprec1.getRecordTemplates(name = "rttinfo", sizes = [3,]) + (rttrec2,) = iprec2.getRecordTemplates(name = "rttinfo", sizes = [3,]) + + rttrec1.update(name = 'roundtripdelay', values = [2.925, 3.279, 3.758], unit = milli_second) + iprec1.update(name = 'address', values = ['192.168.1.1']) + + rttrec2.update(name = 'roundtripdelay', values = [.008634, .008857, .009054], unit = second) + iprec2.update(name = 'address', values = ['157.181.172.126']) + + hoprecord.update(name = 'idx', values = [1,2]) + + D.saveRecord(hoprecord) + + def test_iteratorNextractor(self): + N = 1000 + header = DataHeaderGeneratedByDescription("temptable", [('idx', cardinal), ('RoundTripDelay', timeinterval, milli_second)]) + milli = map(lambda x: randint(1, 100000), range(N)) + micro = map(lambda x: 1000*x, milli) + nano = map(lambda x: 1000000*x, milli) + D = Data(UM, header) + hoprecord = D.getTemplate(size = N) + hoprecord.update(name = 'RoundTripDelay', values = milli, unit = milli_second) + hoprecord.update(name = 'idx', values = range(N)) + D.saveRecord(hoprecord) + + DS = DataSample(table = D) + + DR = DataReader(datasource = DS) + DR.extract(cellrequest = 
[CellRequestByName(name = 'RoundTripDelay'), CellRequestByName(name = 'RoundTripDelay', unit = micro_second), CellRequestByName(name = 'RoundTripDelay', unit = nano_second)]) + for x in DR: + mill, mic, nan = milli.pop(0), micro.pop(0), nano.pop(0) + delta = [(x[0]-mill)/mill, (x[1]-mic)/mic, (x[2]-nan)/nan] + mask = map(lambda d: abs(d)< self.eps, delta) + self.assertFalse((False in mask), "Conversion introduced a huge error GOT: %s EXPECTED: %s %s %s DELTA: %s MASK: %s" % (x, mill,mic,nan, delta, mask)) + + + @staticmethod + def randheader(): + return DataHeaderGeneratedByDescription("temptable", [('idx', cardinal), ('rnd', countable)]) + + def randtable(self, N = 10): + n = map(lambda x: randint(1, 100000), range(N)) + D = Data(UM, self.randheader()) + hoprecord = D.getTemplate(size = N) + hoprecord.update(name = 'rnd', values = n) + hoprecord.update(name = 'idx', values = range(N)) + D.saveRecord(hoprecord) + S = DataSample(table = D) + return n, D, S + + def test_reader(self): + N = 10 + header = self.randheader() + n1 = map(lambda x: randint(1, 100000), range(N)) + n2 = map(lambda x: randint(1, 100000), range(N)) + D = Data(UM, header) + hoprecord = D.getTemplate(size = N) + hoprecord.update(name = 'rnd', values = n1) + hoprecord.update(name = 'idx', values = range(N)) + DS = DataSample(table = D) + DR = DataReader(datasource = DS) + self.assertFalse(DR.sourceExpanded.isSet(), "dataready, howcome?") + D.saveRecord(hoprecord) + DS.process() + self.assertTrue(DR.sourceExpanded.isSet(), "data not ready, howcome?") + for _ in DR: + pass + self.assertFalse(DR.sourceExpanded.isSet(), "data still ready, howcome?") + hoprecord.update(name = 'rnd', values = n2) + D.saveRecord(hoprecord) + DS.process() + self.assertTrue(DR.sourceExpanded.isSet(), "data not ready, howcome?") + DR.rewind() + got = len([x for x in DR]) + self.assertEqual(2*N, got, "Expected %d items and got %d" % (2*N, got)) + + def test_formatter(self): + _, _, DS = self.randtable() + DF = 
DumbFormatter(datasource = DS) + res = DF.serialize() + #print res + self.assertGreater(len(res), 2, "empty? %s" % res) + JF = JsonFormatter(datasource = DS) + JF.reader.extract(cellrequest = [CellRequestByName(name = 'rnd')]) + res = JF.serialize() + #print res + self.assertGreater(len(res), 2, "empty? %s" % res) + + def test_aggregator(self): + N = 10 + n, _, DS = self.randtable(N) +# self.assertRaises(AggregatorError, Aggregator(D, CellRequestByName(name = 'rnd'))) + s = Sum(DS, CellRequestByName(name = 'rnd')) + mn = Min(DS, CellRequestByName(name = 'rnd')) + mx = Max(DS, CellRequestByName(name = 'rnd')) + avg = Mean(DS, CellRequestByName(name = 'rnd')) + S = sum(n) + self.assertEqual(s.data._rawrecords[0], (N, S), "sum %f != %f" % (s._aggregate, S)) + self.assertEqual(mn.data._rawrecords[0], (N, min(n)), "min %f != %f" % (mn._aggregate, min(n))) + self.assertEqual(mx.data._rawrecords[0], (N, max(n)), "max %f != %f" % (mx._aggregate, max(n))) + self.assertEqual(avg.data._rawrecords[0], (N, S/float(N)), "avg %f != %f" % (avg._aggregate, S/N)) + + def test_sampler(self): + header = self.randheader() + D = Data(UM, header) + DS = DataSample(table = D) + T = Tail(datasource = DS, tail = 10) + self.assertRaises(SamplerError, T.process) + + n, D, DS = self.randtable() + + H = Head(datasource = DS, head = 5) + DR = DataReader(datasource = H) + expect = n[:5] + got = [ x for _, x in DR ] + self.assertEqual(got, expect, "head %s != %s" % (got, expect)) + + T = Tail(datasource = DS) + T.tail = 5 + DR = DataReader(datasource = T) + expect = n[-5:] + got = [ x for _, x in DR ] + self.assertEqual(got, expect, "tail %s != %s" % (got, expect)) + + expect = n[:] + expect.sort() + S = Sorter(datasource = DS, keycell = CellRequestByName(name = 'rnd')) + DR = DataReader(datasource = S) + got = [ x for _, x in DR ] + self.assertEqual(got, expect, "sort %s != %s" % (got, expect)) + + expect.reverse() + S = Sorter(datasource = DS, keycell = CellRequestByName(name = 'rnd'), ascending 
= False) + DR = DataReader(datasource = S) + got = [ x for _, x in DR ] + self.assertEqual(got, expect, "sort %s != %s" % (got, expect)) + + + def test_DispersionOK(self): + header = self.randheader() + items = [55,56,57,63,67,68] + D = Data(UM, header) + hoprecord = D.getTemplate(size = len(items)) + hoprecord.update(name = 'rnd', values = items) + hoprecord.update(name = 'idx', values = range(len(items))) + D.saveRecord(hoprecord) + DS = DataSample(table = D) + a = Deviation(DS, CellRequestByName(name = 'rnd')) + a.empirical = False + a.data + self.assertTrue((5.26 == round(a._aggregate,2) ), "Dispersion FAILED 5.26 = "+str(a._aggregate)) + + def test_PercentOK(self): + header = self.randheader() + items = [4.0,5.0,5.0,4.0] + D = Data(UM, header) + hoprecord = D.getTemplate(size = len(items)) + hoprecord.update(name = 'rnd', values = items) + hoprecord.update(name = 'idx', values = range(len(items))) + D.saveRecord(hoprecord) + DS = DataSample(table = D) + a = Percentile(DS, CellRequestByName(name = 'rnd')) + a.percentile = .5 + a.data + self.assertTrue((4.5 == a._aggregate ), "Percent is FAILED 4.5 = "+str(a._aggregate)) + + def test_Pipe(self): + header = self.randheader() + items = [55,56,57,63,67,68] + D = Data(UM, header) + hoprecord = D.getTemplate(size = len(items)) + hoprecord.update(name = 'rnd', values = items) + hoprecord.update(name = 'idx', values = range(len(items))) + D.saveRecord(hoprecord) + DS = DataSample(table = D) + a = Mean(datasource = Tail(datasource = Head(datasource = DS, head = 4), tail = 2), cellrequest = CellRequestByName(name = 'rnd')) + a.data + res = a._aggregate + self.assertTrue((60 == res ), "Pipe FAILED 60 = "+str(res)) + + def test_aggregatorManager(self): + N = 12 + n, D, DS = self.randtable(N) + + AM = AggregatorManager() + _, A = AM.newAggregator(DS, CellRequestByName(name = 'rnd'), [(Tail, {'tail': 10}), (Head, {'head': 5}), (Sum, {})]) + + A.process() + expected = sum(n[-10:][:5]) + got = A._aggregate + 
self.assertEqual(expected, got, "sum (exp) %f != (got) %f" % (expected, got)) + + hoprecord = D.getTemplate(size = N) + n = map(lambda x: randint(1, 100000), range(N)) + hoprecord.update(name = 'rnd', values = n) + D.saveRecord(hoprecord) + + A.process() + + got = A._aggregate + expected = sum(n[-10:][:5]) + self.assertEqual(expected, got, "2 sum (exp) %f != (got) %f" % (expected, got)) + + + def test_aggrformatter(self): + n, _, DS = self.randtable(15) + AM = AggregatorManager() + _, A = AM.newAggregator(DS, CellRequestByName(name = 'rnd'), [(Tail, {'tail': 10}), (Head, {'head': 5}), (Sum, {})]) +# _, A = AM.newAggregator(DS, CellRequestByName(name = 'rnd'), [(Tail, {'tail': 10})]) + DF = JsonFormatter(datasource = A) + res = DF.serialize() +# print "_"*10 +# print res +# print "^"*10 + self.assertGreater(len(res), 2, "empty? %s" % res) + expected = sum(n[-10:][:5]) + got = A._aggregate + self.assertEqual(expected, got, "2 sum(head(tail())) (exp) %f != (got) %f" % (expected, got)) + + + def test_ComplexaggregateOK(self): + ''' + ''' + header = DataHeaderGeneratedByDescription("traceroute", [('idx', cardinal), ("info", [("rttinfo", countable)])]) + + D = Data(UM, header) + hoprecord = D.getTemplate(size = 5) + inf1, inf2, inf3, inf4, inf5 = hoprecord.getRecordTemplates(name = "info") + + inf1.update(name = 'rttinfo', values = [10]) + inf2.update(name = 'rttinfo', values = [15]) + inf3.update(name = 'rttinfo', values = [16]) + inf4.update(name = 'rttinfo', values = [18]) + inf5.update(name = 'rttinfo', values = [20]) + + hoprecord.update(name = 'idx', values = [1,2,3,4,5]) + + D.saveRecord(hoprecord) + #a = Aggregator(D, ['info','rttinfo']) + + +if __name__ == "__main__": + #import sys;sys.argv = ['', 'Test.test_UM'] + unittest.main() |