summaryrefslogtreecommitdiffstats
path: root/Monitoring/MonitoringService/Service
diff options
context:
space:
mode:
Diffstat (limited to 'Monitoring/MonitoringService/Service')
-rw-r--r--Monitoring/MonitoringService/Service/MonitoringService.py342
-rw-r--r--Monitoring/MonitoringService/Service/WatchdogManager.py46
-rw-r--r--Monitoring/MonitoringService/Service/__init__.py0
-rw-r--r--Monitoring/MonitoringService/Service/interface.py344
-rw-r--r--Monitoring/MonitoringService/Service/mock_framework.py284
-rw-r--r--Monitoring/MonitoringService/Service/test.py442
6 files changed, 1458 insertions, 0 deletions
diff --git a/Monitoring/MonitoringService/Service/MonitoringService.py b/Monitoring/MonitoringService/Service/MonitoringService.py
new file mode 100644
index 0000000..9b1f87d
--- /dev/null
+++ b/Monitoring/MonitoringService/Service/MonitoringService.py
@@ -0,0 +1,342 @@
+'''
+Created on Mar 22, 2012
+
+@author: steger
+'''
+from time import sleep
+from Semantics.InformationModel import Ontology
+from Semantics.UnitModel import UnitModel
+from Semantics.TaskModel import TaskModel
+from Semantics.QueryInterpreter import QueryInterpreter
+from Task.Task import SubtaskManager, TaskError, STRAT_PERIODICAL,\
+ STRAT_ONDEMAND
+from DataProcessing.Parameter import ParameterList
+from Resource.node import node
+from DataProcessing.AggregatorManager import AggregatorManager
+from paramiko.ssh_exception import BadAuthenticationType
+import logging
+from StringIO import StringIO
+from DataProcessing.DataFormatter import JsonFormatter
+from DataProcessing.DataHeaderCell import CellRequestByFeature,\
+ CellRequestByName
+from DataProcessing.DataError import SamplerError
+from Example.model import owl_unit, owl_param, owl_features, owl_task, owl_query,\
+ owl_stat, owl_core
+from Service.WatchdogManager import WatchdogManager
+
+ontology = {
+ 'unit': (owl_unit, "http://fp7-novi.eu/unit.owl#"),
+ 'param': (owl_param, "http://fp7-novi.eu/monitoring_parameter.owl#"),
+ 'feature': (owl_features, "http://fp7-novi.eu/monitoring_features.owl#"),
+ 'task': (owl_task, "http://fp7-novi.eu/monitoring_task.owl#"),
+ 'query': (owl_query, "http://fp7-novi.eu/monitoring_query.owl#"),
+ 'stat': (owl_stat, 'http://fp7-novi.eu/monitoring_stat.owl#'),
+ 'core': (owl_core, "http://fp7-novi.eu/im.owl#"),
+}
+
+
+
+
+class MonitoringService(object):
+ '''
+ classdocs
+ '''
+ version = "0.0"
+
+ def __str__(self):
+ return "NOVI Monitoring Service v%s @ %s" % (self.version, self.platform)
+
+ @property
+ def platform(self):
+ return self._if.platform
+
+ def __init__(self, interface, config_owl):
+ '''
+ @summary: constructor
+ @param interface:
+ @type interface: MSInterface
+ @param config_owl: platform specific configuration model
+ @type config_owl: str
+ '''
+ self._if = interface
+ self.logger = logging.getLogger(name = "NOVI.MS.%s" % self.platform)
+ self.log = self._if.log # to be removed
+ self.ontology = Ontology()
+ for prefix, (owl_url, ns) in ontology.iteritems():
+ if owl_url is None:
+ continue
+ self.ontology.load(prefix, owl_url, ns)
+ self.ontology.load('config', config_owl, 'http://fp7-novi.eu/config.owl#')
+ self.unitmodel = UnitModel(self.ontology)
+ self.stm = SubtaskManager(self.um)
+ self.am = AggregatorManager()
+ self.wm = WatchdogManager(self.am)
+
+ self.taskmodel = TaskModel(self.dm, self.um, self.ontology)
+ self.QI = QueryInterpreter(self.taskmodel)
+
+ self._nextID = 0
+ self.subtaskIDs = {}
+ self.aggregatorIDs = {}
+ self.watchdogIDs = {}
+ self.formatters = {}
+
+ @property
+ def pm(self): return self.unitmodel.pm
+
+ @property
+ def um(self): return self.unitmodel.um
+
+ @property
+ def dm(self): return self.unitmodel.dm
+
+
+ def newProcessID(self):
+ try:
+ return "%s:process:%d" % (self.platform, self._nextID)
+ finally:
+ self._nextID += 1
+
+ def newAggregateID(self):
+ try:
+ return "%s:aggregate:%d" % (self.platform, self._nextID)
+ finally:
+ self._nextID += 1
+
+ def newWatchdogID(self):
+ try:
+ return "%s:watchdog:%d" % (self.platform, self._nextID)
+ finally:
+ self._nextID += 1
+
+
+ def measure(self, credential, query):
+ #TODO: docs
+ '''
+ '''
+ g = self.ontology.g
+ sio = StringIO(query)
+ g.parse(source = sio)
+ responses = []
+ errors = []
+ queries = self.QI.inferBundleQueries(qgraph = g)
+ self.log(shortmsg = "measurements starting...", message = "Attempt to launch %d measurement threads" % len(queries))
+ for q in queries:
+ feature_uri = q.feature
+ domain = self.ontology.ns('task')['Substrate']
+ taskgen = self.taskmodel.inferTasks(domain, feature_uri)
+ no_tool = True
+ (resource_uri, resource) = q.resource
+ #we are ugly here: use the first tool
+ for task_uri, _ in taskgen:
+ print task_uri
+ no_tool = False
+ _, task = self.newTask(task = task_uri, cred = credential, resource = resource, parameters = q.paramlist)
+ if q.samplechain:
+ task.strategy = STRAT_PERIODICAL
+ task.enable()
+ # we apply some aggregation to the data
+ flow = []
+ for skeleton, parlist in q.samplechain:
+ flow.append((skeleton, parlist.formkeyvaldict()))
+ _, A = self.am.newAggregator(task.datasource, CellRequestByFeature(feature = q.feature), flow)
+ formatter = JsonFormatter(datasource = A)
+ while True:
+ try:
+ task.dataAdded.wait( 15 )
+ responses.append( formatter.serialize() )
+ break
+ except SamplerError:
+ task.dataAdded.clear()
+ sleep(1)
+ else:
+ task.strategy = STRAT_ONDEMAND
+ task.enable()
+ task.dataAdded.wait( 15 )
+ formatter = JsonFormatter(datasource = task.datasource)
+ formatter.reader.extract(cellrequest = [
+ CellRequestByName(name = "Run"),
+ CellRequestByFeature(feature = feature_uri)
+ ])
+ responses.append( formatter.serialize() )
+ task.destroy()
+ if no_tool:
+ err_description = "No tools to measure %s @ %s" % (feature_uri, resource_uri)
+ errors.append(err_description)
+ self.log(shortmsg = "Limited result set", message = err_description)
+ useful_data = ",\n".join( responses )
+ error_data = ",\n".join(errors)
+ if len(errors):
+ if len(useful_data):
+ response = "[%s,\n{\"errors\" : \"%s\"}]" % (useful_data, error_data)
+ else:
+ response = "[{\"errors\" : \"%s\"}]" % (error_data)
+ else:
+ response = "[%s]" % useful_data
+ return response
+
+ def launchTasks(self, credential, query):
+ #TODO: many things in common with measure!!!
+ g = self.ontology.g
+ sio = StringIO(query)
+ g.parse(source = sio)
+ taskID = self.newProcessID()
+ idstore = self.subtaskIDs[taskID] = []
+ formatters = self.formatters[taskID] = []
+ for q in self.QI.inferBundleQueries(qgraph = g):
+ feature_uri = q.feature
+ domain = self.ontology.ns('task')['Slice']
+ taskgen = self.taskmodel.inferTasks(domain, feature_uri)
+ #we are ugly here: use the first tool
+ for task_uri, _ in taskgen:
+ subtaskID, task = self.newTask(task = task_uri, cred = credential, resource = q.resource[1], parameters = q.paramlist)
+ task.strategy = STRAT_PERIODICAL
+ task.enable()
+ idstore.append(subtaskID)
+ f = q.formatter(datasource = task.datasource)
+ formatters.append(f)
+ if len(idstore):
+ print "KONYVELT", taskID
+ return taskID
+ else:
+ self.subtaskIDs.pop(taskID)
+ self.formatters.pop(taskID)
+ return None
+
+ def newTask(self, task, cred, resource = None, parameters = ParameterList()):
+ '''
+ @summary: initialize a Task object, which is referenced by a uri
+ @param task: the reference to the task description
+ @type task: URIRef
+ @param cred: an iterable over dictionaries, which are used as input parameters to initialize Credential templates passed to the Task object for authentication, authorization purposes
+ @type cred: dict generator
+ @param resource: the resource to measure
+ @type resource: resource or None
+ @param parameters: the parameter list to refresh the default parameters of the Task object
+ @type parameters: ParameterList
+ @return: the tuple of taskID and the initialized measurement Task object
+ @rtype: int, Task
+ '''
+ name = self.ontology._tail(task)
+ credset = self.taskmodel.inferCredentialOf(task)
+ driver = self.taskmodel.inferDriverOf(task)
+ hdr = self.taskmodel.inferDataheaderOf(task)
+ hooks = self.taskmodel.inferHookdefinitionsOf(task)
+ hookpar = self.taskmodel.inferHookparametersOf(task)
+ taskparameters = self.taskmodel.inferParametersOf(task)
+
+ taskparameters.update_by_list(parameters)
+
+ #TODO: maybe better push resource to the Task as an argument
+ if isinstance(resource, node):
+ addr, unit = resource.get_ipaddress("eth0")
+ taskparameters.update("SourceAddress", addr, unit)
+ else:
+ print "EEEEE unhandled resource", resource
+# print taskparameters
+
+ while len(credset):
+ ct = credset.pop()
+ for c in cred:
+ try:
+ credential = ct(**c)
+ except:
+ # credential mismatch go on with the next
+ continue
+ try:
+ return self.stm.generate(name = name, driver = driver, dataheader = hdr,
+ hookimplementations = hooks, parameters = taskparameters, credential = credential, **hookpar)
+ except BadAuthenticationType:
+ pass
+ raise TaskError("Cannot initialize the Task with the credential set provided for %s" % name)
+
+ def delTask(self, taskidentifier):
+ self.stm.pop( taskidentifier )
+
+ def getTask(self, taskidentifier):
+ return self.stm[ taskidentifier ]
+
+ def attachAggregators(self, credential, query):
+ g = self.ontology.g
+ sio = StringIO(query)
+ g.parse(source = sio)
+ aggregatorID = self.newAggregateID()
+ idstore = self.aggregatorIDs[aggregatorID] = []
+ formatters = self.formatters[aggregatorID] = []
+
+ for q in self.QI.inferSampleManipulationQueries(qgraph = g):
+ _, sourcetype, _ = q.sourceid.split(':')
+ if sourcetype == 'process':
+ #FIXME: csak egy subprocessre muxik perpill
+ sourceid = self.subtaskIDs[q.sourceid][0]
+
+ ds = self.stm[ sourceid ].datasource
+ elif sourcetype == 'aggregate':
+ sourceid = self.aggregatorIDs[q.sourceid][0]
+
+ ds = self.am[ sourceid ]
+ else:
+ raise Exception("Unknown source type %s" % sourcetype)
+ cr = CellRequestByFeature(feature = q.feature)
+ aggID, A = self.am.newAggregator(dataSource = ds, cellrequest = cr, commandflow = q.samplechain)
+ idstore.append(aggID)
+ f = q.formatter(datasource = A)
+ formatters.append(f)
+ if len(idstore):
+ return aggregatorID
+ else:
+ self.aggregatorIDs.pop(aggregatorID)
+ self.formatters.pop(aggregatorID)
+ return None
+
+ def delAggregator(self, aggregatoridentifier):
+ self.am.pop( aggregatoridentifier )
+
+ def getAggregator(self, aggregatoridentifier):
+ return self.am[ aggregatoridentifier ]
+
+ def attachWatchdogs(self, credential, query):
+ g = self.ontology.g
+ sio = StringIO(query)
+ g.parse(source = sio)
+ watchdogID = self.newWatchdogID()
+ idstore = self.watchdogIDs[watchdogID] = []
+
+ for q in self.QI.inferConditionQueries(qgraph = g):
+ _, sourcetype, _ = q.sourceid.split(':')
+ if sourcetype == 'process':
+ #FIXME: csak egy subprocessre muxik perpill
+ sourceid = self.subtaskIDs[q.sourceid][0]
+
+ ds = self.stm[ sourceid ].datasource
+ elif sourcetype == 'aggregate':
+ sourceid = self.aggregatorIDs[q.sourceid][0]
+
+ ds = self.am[ sourceid ]
+ else:
+ raise Exception("Unknown source type %s" % sourcetype)
+
+ #ITT A TENNIVALO
+ cr = CellRequestByFeature(feature = q.feature)
+ watchID, _ = self.wm.newConditional(dataSource = ds, cellrequest = cr, conditiontype = q.conditiontype, operation = q.operation)
+
+
+
+
+ idstore.append(watchID)
+ if len(idstore):
+ return watchdogID
+ else:
+ self.watchdogIDs.pop(watchdogID)
+ return None
+
+ def delWatchdog(self, watchdogidentifier):
+ self.wm.pop( watchdogidentifier )
+
+
+ def checkWatchdog(self, watchdogidentifier):
+ resp = []
+ for watchID in self.watchdogIDs[watchdogidentifier]:
+ WD = self.wm[watchID]
+ resp.append("\'%s\': %s" % (WD.name, WD.value))
+ return "{\n\t%s\n}" % ",\n\t".join(resp)
diff --git a/Monitoring/MonitoringService/Service/WatchdogManager.py b/Monitoring/MonitoringService/Service/WatchdogManager.py
new file mode 100644
index 0000000..e40f1c3
--- /dev/null
+++ b/Monitoring/MonitoringService/Service/WatchdogManager.py
@@ -0,0 +1,46 @@
+'''
+Created on Dec 10, 2012
+
+@author: steger
+'''
+from DataProcessing.LinearCombination import LinearCombination
+from DataProcessing.Bool import IsPositive, IsNotPositive, IsNegative,\
+ IsNotNegative
+
+class WatchdogManager(object):
+ def __init__(self, am):
+ self._id = 0;
+ self._conditionals = {}
+ self.am = am
+ self._dep = {}
+
+ def newConditional(self, dataSource, cellrequest, conditiontype, operation):
+ deps = []
+ if conditiontype in [ IsPositive, IsNegative, IsNotNegative, IsNotPositive ]:
+ lincomb = LinearCombination()
+ for factor, commandflow in operation:
+ Aid, A = self.am.newAggregator(dataSource, cellrequest, commandflow)
+ lincomb.addTerm(factor, A)
+ deps.append(Aid)
+ DS = conditiontype(lincomb)
+ self._id += 1
+ self._conditionals[ self._id ] = DS
+ self._dep[ self._id ] = deps[:]
+ return self._id, DS
+
+ def __getitem__(self, watchdogid):
+ try:
+ return self._conditionals[ watchdogid ]
+ except:
+ raise Exception("Watchdog with id %s not found" % watchdogid)
+
+ def pop(self, watchdogid):
+ try:
+ self._conditionals.pop( watchdogid )
+ deps = self._dep.pop(watchdogid)
+ while len(deps):
+ Aid = deps.pop()
+ self.am.pop(Aid)
+ except KeyError:
+ print "WW: Watchdog with id %s not found" % watchdogid
+ \ No newline at end of file
diff --git a/Monitoring/MonitoringService/Service/__init__.py b/Monitoring/MonitoringService/Service/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/Monitoring/MonitoringService/Service/__init__.py
diff --git a/Monitoring/MonitoringService/Service/interface.py b/Monitoring/MonitoringService/Service/interface.py
new file mode 100644
index 0000000..2ba917d
--- /dev/null
+++ b/Monitoring/MonitoringService/Service/interface.py
@@ -0,0 +1,344 @@
+'''
+Created on 08.08.2011
+
+@author: steger, jozsef
+'''
+from Service.MonitoringService import MonitoringService
+import logging
+
+class InterfaceError(Exception):
+ pass
+
+#TODO: add and handle bindings at this level
+class MSInterface(object):
+ '''
+ @summary: Implements a thin service layer on top of the MonitoringService instance
+ to collect methods that need to be exported and mapped in the NOVI API.
+ It also provides a reference to the framework to be able to communicate with
+ remote MonitoringService instances. The log() method is a place holder
+ to sink information to be pushed in the NOVI UserFeedback service.
+ The emit() method is a place holder to sink signals to be pushed in the NOVI
+ Policy Service component installed on top of the same platform.
+ '''
+
+ def __init__(self, framework, reference, config_owl):
+ '''
+ Constructor
+ @param framework: a service which provides getService() method to look up MonSrv instances of different reference
+ @type framework: Framework
+ @param reference: the name of the platform
+ @type reference: str
+ @param config_owl: platform specific configuration model
+ @type config_owl: str
+ '''
+ self._framework = framework
+ self.platform = reference
+ self._ms = MonitoringService(self, config_owl)
+ self.logger = logging.getLogger(name = "NOVI.MSI.%s" % reference)
+
+ @property
+ def service(self):
+ '''
+ @return: the underlying monitoring service component
+ @rtype: MonitoringService
+ '''
+ return self._ms
+
+ @property
+ def proxy(self):
+ '''
+ @return: a proxy service to look up the rest of the monitoring service components
+ @rtype: Framework
+ '''
+ return self._framework
+
+ def dispatchID(self, identifier):
+ '''
+ @summary: this method finds the MonitoringService instance that is responsible for handling an identified Task or Aggregate
+ @param identifier: identifier of a task or aggregate, it follows the form: <platform>:<process|aggregate>:<id>
+ @type identifier: string
+ @return: the monitoring service instance
+ @rtype: MonitoringService
+ '''
+ try:
+ platform, _, _ = identifier.split(':')
+ if self.platform == platform:
+ return self.service
+ return self.framework.getService(platform)
+ except ValueError:
+ raise InterfaceError("Wrong identifier format")
+
+ def log(self, shortmsg, message):
+ # overridden by the JAVA wrapper
+ self.logger.info("[%s] %s" % (shortmsg, message))
+
+ def emit(self, what):
+ # overridden by the JAVA wrapper
+ self.framework.getPolicyService(self.platform).trigger(what)
+
+ # Test purpose function
+ def echo(self, platform):
+ '''
+ @summary: An integration tester function (to be exported public)
+ @param platform: name of the platform
+ @type platform: string
+ @return: messages of the platforms taking part in the message flow
+ @rtype: string
+ '''
+ self.logger.info("[echo] calling %s" % platform)
+ otherservice = self._framework.getInterface(platform).service
+ return "%s -> %s" % (str(self.service), str(otherservice))
+
+
+ # Substrate monitoring function
+ def measure(self, credential, query):
+ '''
+ @summary: Method to handle substrate monitoring queries (to be exported public)
+ @param credential:
+ @type credential:
+ @param query: an owl document containing several BundleQuery instances
+ @type query: string
+ @return: response to the query
+ @rtype: string
+ '''
+ #TODO: split query and concatenate results
+ return self.service.measure(credential, query)
+
+ # Slice monitoring functions
+ def sliceTasks(self, credential, query):
+ raise InterfaceError("sliceTasks() method is not implemented")
+
+ def addTask(self, credential, query):
+ '''
+ @summary: Method to start slice monitoring tasks (to be exported public)
+ @param credential:
+ @type credential:
+ @param query: an owl document containing several BundleQuery instances
+ @type query: string
+ @return: process identifier
+ @rtype: string
+ '''
+ #TODO: investigate if the service instance under this interface should be the boss
+ return self.service.launchTasks(credential, query)
+
+ def describeTaskData(self, credential, query):
+ '''
+ @summary: Method to retrieve meta data of task data (to be exported public)
+ @param credential:
+ @type credential:
+ @param query:
+ @type query: string
+ @return: serialize the header of the data tables
+ @rtype: string
+ '''
+ taskID = query
+ ms = self.dispatchID(identifier = taskID)
+ #TODO: move this in the MonitoringService
+ headers = map(lambda x: x.header(), ms.formatters[taskID])
+ return "[%s]" % "\n,\n".join(headers)
+
+ def fetchTaskData(self, credential, query):
+ '''
+ @summary: Method to retrieve task data collected since last fetch or the start (to be exported public)
+ @param credential:
+ @type credential:
+ @param query:
+ @type query: string
+ @return: serialize the appended content of the data tables
+ @rtype: string
+ '''
+ taskID = query
+ ms = self.dispatchID(identifier = taskID)
+ #TODO: move this in the MonitoringService
+ response = []
+ try:
+ for f in ms.formatters[taskID]:
+ response.append( f.serialize() )
+ except Exception, e:
+ print "EEE", e
+ pass
+ return "[%s]" % "\n,\n".join(response)
+
+ def modifyTask(self, credential, query):
+ raise InterfaceError("modifyTask() method is not implemented")
+
+ def removeTask(self, credential, query):
+ '''
+ @summary: Method to remove a slice measurement task (to be exported public)
+ @param credential:
+ @type credential:
+ @param query:
+ @type query: string
+ '''
+ taskID = query
+ ms = self.dispatchID(identifier = taskID)
+ #TODO: move this in the MonitoringService
+ try:
+ subtaskids = ms.subtaskIDs.pop(taskID)
+ ms.formatters.pop(taskID)
+ while len(subtaskids):
+ subtaskid = subtaskids.pop()
+ ms.delTask(taskidentifier = subtaskid)
+ except KeyError:
+ # the taskID does not belong to me
+ pass
+
+ def enableTask(self, credential, query):
+ '''
+ @summary: Method to enable a slice measurement task (to be exported public)
+ @param credential:
+ @type credential:
+ @param query:
+ @type query: string
+ '''
+ taskID = query
+ ms = self.dispatchID(identifier = taskID)
+ try:
+ for subtaskid in ms.subtaskIDs[taskID]:
+ t = ms.getTask(taskidentifier = subtaskid)
+ t.enable()
+ except KeyError:
+ # the taskID does not belong to me
+ pass
+
+ def disableTask(self, credential, query):
+ '''
+ @summary: Method to disable a slice measurement task temporarily (to be exported public)
+ @param credential:
+ @type credential:
+ @param query:
+ @type query: string
+ '''
+ taskID = query
+ ms = self.dispatchID(identifier = taskID)
+ try:
+ for subtaskid in ms.subtaskIDs[taskID]:
+ t = ms.getTask(taskidentifier = subtaskid)
+ t.disable()
+ except KeyError:
+ # the taskID does not belong to me
+ pass
+
+ def getTaskStatus(self, credential, query):
+ '''
+ @summary: Method to check the state of a slice measurement task (to be exported public)
+ @param credential:
+ @type credential:
+ @param query:
+ @type query: string
+ @return: True if the tasks are running
+ @rtype: boolean
+ '''
+ taskID = query
+ ms = self.dispatchID(identifier = taskID)
+ try:
+ for subtaskid in ms.subtaskIDs[taskID]:
+ t = ms.getTask(taskidentifier = subtaskid)
+ if t.state == t.STATE_RUNNING:
+ return True
+ except KeyError:
+ # the taskID does not belong to me
+ pass
+ return False
+
+ def addAggregator(self, credential, query):
+ '''
+ @summary: Method to define new data manipulation on slice monitoring data (to be exported public)
+ @param credential:
+ @type credential:
+ @param query: an owl document containing several SampleManipulationQuery instances
+ @type query: string
+ @return: aggregator identifier
+ @rtype: string
+ '''
+ #TODO: investigate if the service instance under this interface should be the boss
+ return self.service.attachAggregators(credential, query)
+
+ def removeAggregator(self, credential, query):
+ '''
+ @summary: Method to remove data manipulation on slice monitoring data (to be exported public)
+ @param credential:
+ @type credential:
+ @param query:
+ @type query: string
+ '''
+ aggregatorID = query
+ ms = self.dispatchID(identifier = aggregatorID)
+ try:
+ aggregatorids = ms.aggregatorIDs.pop(aggregatorID)
+ ms.formatters.pop(aggregatorID)
+ while len(aggregatorids):
+ aggregatorid = aggregatorids.pop()
+ ms.delAggregator(aggregatorid)
+ except KeyError:
+ # the aggregatorID does not belong to me
+ pass
+
+ def fetchAggregatorData(self, credential, query):
+ '''
+ @summary: Method to refresh and serialize results of data manipulation on slice monitoring data (to be exported public)
+ @param credential:
+ @type credential:
+ @param query:
+ @type query: string
+ @return: result of aggregators
+ @rtype: string
+ '''
+ aggregatorID = query
+ ms = self.dispatchID(identifier = aggregatorID)
+ response = []
+ try:
+ for f in ms.formatters[aggregatorID]:
+ f.source.process()
+ print "ALMA", f.source
+ print "ALMA", f.source.source
+ print "ALMA", f.source.data
+ response.append( f.serialize() )
+ except Exception, e:
+ print "EEE", e
+ pass
+ return "[%s]" % "\n,\n".join(response)
+
+ def addWatchdog(self, credential, query):
+ '''
+ @summary:
+ @param credential:
+ @type credential:
+ @param query: an owl document containing several SampleManipulationQuery instances
+ @type query: string
+ @return: watchdog identifier
+ @rtype: string
+ '''
+ #TODO: investigate if the service instance under this interface should be the boss
+ return self.service.attachWatchdogs(credential, query)
+
+ def removeCondition(self, credential, query):
+ '''
+ @summary: Method to remove conditions bound to slice monitoring data (to be exported public)
+ @param credential:
+ @type credential:
+ @param query:
+ @type query: string
+ '''
+ watchdogID = query
+ ms = self.dispatchID(identifier = watchdogID)
+ try:
+ watchdogids = ms.watchdogIDs.pop(watchdogID)
+ while len(watchdogids):
+ watchdogid = watchdogids.pop()
+ ms.delWatchdog(watchdogid)
+ except KeyError:
+ # the aggregatorID does not belong to me
+ pass
+
+ def checkCondition(self, credential, query):
+ '''
+ @summary: Method to examine a conditions bound to slice monitoring data (to be exported public)
+ @param credential:
+ @type credential:
+ @param query:
+ @type query: string
+ '''
+ watchdogID = query
+ ms = self.dispatchID(identifier = watchdogID)
+ return ms.checkWatchdog(watchdogID)
diff --git a/Monitoring/MonitoringService/Service/mock_framework.py b/Monitoring/MonitoringService/Service/mock_framework.py
new file mode 100644
index 0000000..8a8874b
--- /dev/null
+++ b/Monitoring/MonitoringService/Service/mock_framework.py
@@ -0,0 +1,284 @@
+'''
+Created on Nov 20, 2012
+
+@author: steger
+'''
+from Service.interface import MSInterface
+import logging
+from logging.handlers import TimedRotatingFileHandler
+from os import path, unlink
+from flask import Flask, request
+from threading import Thread
+from Example.Platforms import federation
+from select import select
+from sys import stdin
+import httplib2
+from os import urandom
+
+#FIXME: DIRTY!
+from Example.credentials import novisaCredentialIARGS
+import traceback
+import sys
+
+class FrameworkError(Exception):
+ pass
+
+class PolicyMock(object):
+ def __init__(self):
+ fn = "/tmp/ps.log"
+ if path.exists(fn):
+ unlink(fn)
+ hdlr = TimedRotatingFileHandler(filename = fn)
+ self.logger = logging.getLogger("NOVI.PS")
+ self.logger.setLevel(level = logging.DEBUG)
+ self.logger.addHandler(hdlr = hdlr)
+
+ def trigger(self, what):
+ self.logger.info(what)
+
+class Framework(object):
+ '''
+ This class mimics the integration framework. It helps to look up remote Monitoring Service instances
+ '''
+
+ def __init__(self):
+ '''
+ Constructor
+ '''
+ self._if = {}
+ self._pol = PolicyMock()
+
+ def add(self, platform, config_owl):
+ fn = "/tmp/ms_%s.log" % platform
+ if path.exists(fn):
+ unlink(fn)
+ hdlr = TimedRotatingFileHandler(filename = fn)
+ l = logging.getLogger("NOVI.MS.%s" % platform)
+ l.setLevel(level = logging.DEBUG)
+ l.addHandler(hdlr = hdlr)
+ l = logging.getLogger("NOVI.MSI.%s" % platform)
+ l.setLevel(level = logging.DEBUG)
+ l.addHandler(hdlr = hdlr)
+ iface = MSInterface(self, platform, config_owl)
+ self._if[platform] = iface
+ return iface
+
+ def getInterface(self, platform):
+ try:
+ return self._if[platform]
+ except:
+ print "EE: %s platform not found" % platform
+ raise FrameworkError
+
+ def getPolicyService(self, platform):
+ return self._pol
+
+ def serviceList(self):
+ return self._if.values()
+
+def app_launch(plif, port):
+ app = Flask(__name__)
+ app.secret_key = urandom(24)
+ t = Thread(target = run, args = (app, port))
+ t.start()
+
+ # these are here for test and control
+ @app.route("/", methods = ['GET'])
+ def hello():
+ return "Hello world"
+
+ @app.route('/shutdown', methods = ['POST'])
+ def shutdown():
+ shutdown_server()
+ return 'Server shutting down...'
+
+
+ # these are the real service interfaces
+ @app.route("/echo", methods = ['POST'])
+ def echo():
+ platform = request.form['platform']
+ try:
+ return plif.echo(platform)
+ except Exception, e:
+ print "-"*60
+ traceback.print_exc(file=sys.stdout)
+ print "="*60
+ return "Error %s" % e
+
+ @app.route("/measure", methods = ['POST'])
+ def measure():
+ q = request.form['query']
+ try:
+ return plif.measure(credential = [novisaCredentialIARGS], query = q)
+ except Exception, e:
+ print "-"*60
+ traceback.print_exc(file=sys.stdout)
+ print "="*60
+ return "Error %s" % e
+
+ @app.route("/addTask", methods = ['POST'])
+ def addTask():
+ try:
+ q = request.form['query']
+ return plif.addTask(credential = [novisaCredentialIARGS], query = q)
+ except Exception, e:
+ print "-"*60
+ traceback.print_exc(file=sys.stdout)
+ print "-"*60
+ return "Error %s" % e
+
+ @app.route("/fetchTaskData", methods = ['POST'])
+ def fetchTaskData():
+ q = request.form['query']
+ return plif.fetchTaskData(credential = [novisaCredentialIARGS], query = q)
+
+ @app.route("/removeTask", methods = ['POST'])
+ def removeTask():
+ try:
+ q = request.form['query']
+ plif.removeTask(credential = [novisaCredentialIARGS], query = q)
+ except Exception, e:
+ print "Error %s" % e
+ return "OK"
+
+ @app.route("/describeTaskData", methods = ['POST'])
+ def describeTaskData():
+ q = request.form['query']
+ return plif.describeTaskData(credential = [novisaCredentialIARGS], query = q)
+
+ @app.route("/getTaskStatus", methods = ['POST'])
+ def getTaskStatus():
+ try:
+ q = request.form['query']
+ return str( plif.getTaskStatus(credential = [novisaCredentialIARGS], query = q) )
+ except Exception, e:
+ print "-"*60
+ traceback.print_exc(file=sys.stdout)
+ print "-"*60
+ return "Error %s" % e
+
+ @app.route("/enableTask", methods = ['POST'])
+ def enableTask():
+ q = request.form['query']
+ plif.enableTask(credential = [novisaCredentialIARGS], query = q)
+ return "OK"
+
+ @app.route("/disableTask", methods = ['POST'])
+ def disableTask():
+ q = request.form['query']
+ plif.disableTask(credential = [novisaCredentialIARGS], query = q)
+ return "OK"
+
+ @app.route("/addAggregator", methods = ['POST'])
+ def addAggregator():
+ try:
+ q = request.form['query']
+ return plif.addAggregator(credential = [novisaCredentialIARGS], query = q)
+ except Exception, e:
+ print "-"*60
+ traceback.print_exc(file=sys.stdout)
+ print "-"*60
+ return "Error %s" % e
+
+ @app.route("/fetchAggregatorData", methods = ['POST'])
+ def fetchAggregatorData():
+ q = request.form['query']
+ return plif.fetchAggregatorData(credential = [novisaCredentialIARGS], query = q)
+
+ @app.route("/removeAggregator", methods = ['POST'])
+ def removeAggregator():
+ try:
+ q = request.form['query']
+ plif.removeAggregator(credential = [novisaCredentialIARGS], query = q)
+ except Exception, e:
+ print "Error %s" % e
+ return "OK"
+
+ @app.route("/addCondition", methods = ['POST'])
+ def addCondition():
+ try:
+ q = request.form['query']
+ return plif.addWatchdog(credential = [novisaCredentialIARGS], query = q)
+ except Exception, e:
+ print "-"*60
+ traceback.print_exc(file=sys.stdout)
+ print "-"*60
+ return "Error %s" % e
+
+ @app.route("/removeCondition", methods = ['POST'])
+ def removeCondition():
+ try:
+ q = request.form['query']
+ plif.removeCondition(credential = [novisaCredentialIARGS], query = q)
+ except Exception, e:
+ print "Error %s" % e
+ return "OK"
+
+ @app.route("/checkCondition", methods = ['POST'])
+ def checkCondition():
+ try:
+ q = request.form['query']
+ return plif.checkCondition(credential = [novisaCredentialIARGS], query = q)
+ except Exception, e:
+ print "-"*60
+ traceback.print_exc(file=sys.stdout)
+ print "-"*60
+ return "Error %s" % e
+
+
+
+ return t
+
+
+def run(app, port):
+ app.run(debug = False, port = port)
+
+def shutdown_server():
+ func = request.environ.get('werkzeug.server.shutdown')
+ if func is None:
+ raise RuntimeError('Not running with the Werkzeug Server')
+ func()
+
+def emit(port):
+ try:
+ h = httplib2.Http(timeout = 10)
+ url = "http://localhost:%d/shutdown" % port
+ resp, _ = h.request(uri = url, method = "POST")
+ if resp.status != 200:
+ print "Service responded with status %s" % resp.status
+ except Exception, e:
+ print "Error contacting server @ localhost:%d: (%s)" % (port, e)
+
+def start_servers():
+ fw = Framework()
+ t = []
+ for platform, (port, config_owl) in federation.iteritems():
+ plif = fw.add(platform, config_owl)
+ t.append( app_launch(plif, port) )
+ return fw, t
+
+def stop_servers(t):
+ # POST the shutdown methods
+ for port, _ in federation.values():
+ emit(port)
+
+ # join threads
+ while len(t):
+ st = t.pop()
+ st.join()
+
+if __name__ == "__main__":
+ print "INIT"
+ # start services as separate threads
+ _, t = start_servers()
+
+ # wait for a keyboard interrupt
+ print "PRESS ^C to stop"
+ while True:
+ try:
+ select([stdin],[],[])
+ except KeyboardInterrupt:
+ break
+
+ stop_servers(t)
+ print "OK" \ No newline at end of file
diff --git a/Monitoring/MonitoringService/Service/test.py b/Monitoring/MonitoringService/Service/test.py
new file mode 100644
index 0000000..1b6b78b
--- /dev/null
+++ b/Monitoring/MonitoringService/Service/test.py
@@ -0,0 +1,442 @@
+'''
+Created on Aug 10, 2011
+
+@author: steger
+'''
+import unittest
+from rdflib import Graph, Namespace, Literal
+from Service.mock_framework import start_servers, stop_servers
+import httplib2
+from Example.Platforms import federation
+from urllib import urlencode
+from time import sleep
+
+fw, t = start_servers()
+from MonitoringService import ontology
+
+class Test(unittest.TestCase):
+ headers = {'Content-type': 'application/x-www-form-urlencoded'}
+ proxy = httplib2.Http(cache = "/tmp/ms_client_cache", timeout = 10)
+
+ def setUp(self):
+ self.MSI_planetlab = fw.getInterface('PlanetLab')
+ self.PL_O = self.MSI_planetlab.service.ontology
+ NS = self.ns
+ self.S = NS('stat')['UnmodifiedExtractOfFeatureSamples']
+ self.F = NS('query')['Formatter_JSON']
+ sleep (1)
+
+ def tearDown(self):
+ pass
+
+ def test_echo(self):
+ p1 = "PlanetLab"
+ p2 = "FEDERICA"
+
+ h = httplib2.Http(cache = "/tmp/ms_client_cache", timeout = 10)
+ url = "http://localhost:%d/echo" % federation[p1][0]
+ data = urlencode( {'platform': p2} )
+ resp, response = h.request(uri = url, method = "POST", body = data, headers = self.headers)
+ self.assertTrue(resp.status == 200, "Service @%s responded with status %s" % (p1, resp.status))
+ i, o = response.split("->")
+ got = (i.split("@")[-1].strip(), o.split("@")[-1].strip())
+ expect = (p1, p2)
+ self.assertEquals(expect, got, "Echo reply differs from expected (%s): %s" % (expect, response))
+
+ data = urlencode( {'platform': p1} )
+ resp, response = h.request(uri = url, method = "POST", body = data, headers = self.headers)
+ self.assertTrue(resp.status == 200, "Service @%s responded with status %s" % (p1, resp.status))
+ i, o = response.split("->")
+ got = (i.split("@")[-1].strip(), o.split("@")[-1].strip())
+ expect = (p1, p1)
+ self.assertEquals(expect, got, "Echo reply differs from expected (%s): %s" % (expect, response))
+
+ @staticmethod
+ def q_enc(q):
+ return urlencode( {'query': q} )
+
+ @property
+ def mns(self):
+ return Namespace("http://foo.bar/req.owl#")
+
+ @property
+ def ns(self):
+ return self.PL_O.ns
+
+ @property
+ def ns_type(self):
+ return self.ns('rdf')['type']
+
+ def measure(self, q, expect = 26):
+ url = "http://localhost:%d/measure" % federation['PlanetLab'][0]
+ data = self.q_enc(q)
+ resp, response = self.proxy.request(uri = url, method = "POST", body = data, headers = self.headers)
+ self.assertTrue(resp.status == 200, "Service responded with status %s" % (resp.status))
+ self.tlen(response, expect)
+ return response
+
+ def addtask(self, q):
+ url = "http://localhost:%d/addTask" % federation['PlanetLab'][0]
+ data = self.q_enc(q)
+ resp, response = self.proxy.request(uri = url, method = "POST", body = data, headers = self.headers)
+ self.assertTrue(resp.status == 200, "Service responded with status %s" % (resp.status))
+ self.assertTrue(response.startswith('PlanetLab:process:'), "wrong process id %s" % response)
+ return response
+
+ def fetchtaskdata(self, q, expect = 26):
+ url = "http://localhost:%d/fetchTaskData" % federation['PlanetLab'][0]
+ data = self.q_enc(q)
+ resp, response = self.proxy.request(uri = url, method = "POST", body = data, headers = self.headers)
+ self.assertTrue(resp.status == 200, "Service responded with status %s" % (resp.status))
+ self.tlen(response, expect)
+ return response
+
+ def deltask(self, q):
+ url = "http://localhost:%d/removeTask" % federation['PlanetLab'][0]
+ data = self.q_enc(q)
+ resp, response = self.proxy.request(uri = url, method = "POST", body = data, headers = self.headers)
+ self.assertTrue(resp.status == 200, "Service responded with status %s" % (resp.status))
+ self.assertEqual(response, "OK", "Got: %s" % response)
+
+ def addaggregator(self, q):
+ url = "http://localhost:%d/addAggregator" % federation['PlanetLab'][0]
+ data = self.q_enc(q)
+ resp, response = self.proxy.request(uri = url, method = "POST", body = data, headers = self.headers)
+ self.assertTrue(resp.status == 200, "Service responded with status %s" % (resp.status))
+ self.assertTrue(response.startswith('PlanetLab:aggregate:'), "wrong process id %s" % response)
+ return response
+
+ def fetchaggregate(self, q, expect = 26):
+ url = "http://localhost:%d/fetchAggregatorData" % federation['PlanetLab'][0]
+ data = self.q_enc(q)
+ resp, response = self.proxy.request(uri = url, method = "POST", body = data, headers = self.headers)
+ self.assertTrue(resp.status == 200, "Service responded with status %s" % (resp.status))
+ self.tlen(response, expect)
+ return response
+
+ def delaggregator(self, q):
+ url = "http://localhost:%d/removeAggregator" % federation['PlanetLab'][0]
+ data = self.q_enc(q)
+ resp, response = self.proxy.request(uri = url, method = "POST", body = data, headers = self.headers)
+ self.assertTrue(resp.status == 200, "Service responded with status %s" % (resp.status))
+ self.assertEqual(response, "OK", "Got: %s" % response)
+
+ def addcondition(self, q):
+ url = "http://localhost:%d/addCondition" % federation['PlanetLab'][0]
+ data = self.q_enc(q)
+ resp, response = self.proxy.request(uri = url, method = "POST", body = data, headers = self.headers)
+ self.assertTrue(resp.status == 200, "Service responded with status %s" % (resp.status))
+ self.assertTrue(response.startswith('PlanetLab:watchdog:'), "wrong process id %s" % response)
+ return response
+
+ def delcondition(self, q):
+ url = "http://localhost:%d/removeCondition" % federation['PlanetLab'][0]
+ data = self.q_enc(q)
+ resp, response = self.proxy.request(uri = url, method = "POST", body = data, headers = self.headers)
+ self.assertTrue(resp.status == 200, "Service responded with status %s" % (resp.status))
+ self.assertEqual(response, "OK", "Got: %s" % response)
+
+ def checkcondition(self, q):
+ url = "http://localhost:%d/checkCondition" % federation['PlanetLab'][0]
+ data = self.q_enc(q)
+ resp, response = self.proxy.request(uri = url, method = "POST", body = data, headers = self.headers)
+ self.assertTrue(resp.status == 200, "Service responded with status %s" % (resp.status))
+ self.assertGreater(len(response.splitlines()), 2, "Got: %s" % response)
+ return response
+
+
+ def tlen(self, response, expect = 26):
+# print response
+ self.assertTrue(response, "Got nothing due to former errors")
+ if expect > 1:
+ self.assertGreater(len(response.splitlines()), expect, "got empty measurement response")
+ else:
+ self.assertGreater(len(response), 2, "got empty measurement response")
+ return response
+
+
+
+ def new_g(self):
+ g = Graph()
+ for k, (_, ns) in ontology.iteritems():
+ g.bind(k, Namespace(ns))
+ mns = self.mns
+ g.bind('q', mns)
+
+ return g
+
+ def save(self, fn, q):
+ try:
+ with open(fn, 'w') as f:
+ f.write(q)
+ with open("%s.ue" % fn, 'w') as f:
+ f.write( self.q_enc(q) )
+ except:
+ pass
+
+ def test_measure(self):
+ doc = "../../information-model/monitoring-model//monitoringQuery_example.owl"
+ with open(doc) as fp:
+ q = fp.read()
+ self.measure(q)
+
+ def addnode(self, g, resname = 'smilax1', address = '150.254.160.19'):
+ mns = self.mns
+ NS = self.ns
+ TYPE = self.ns_type
+ R = mns[resname]
+ I1 = mns['ifin']
+ I2 = mns['ifout']
+ IPADDR = Literal(address)
+ ADDR = mns['%s_address' % resname]
+ g.add((R, TYPE, NS('core')['Node']))
+ g.add((R, TYPE, NS('core')['Resource']))
+ g.add((R, TYPE, NS('owl')['NamedIndividual']))
+
+ g.add((R, NS('core')['hasInboundInterface'], I1))
+ g.add((R, NS('core')['hasOutboundInterface'], I1))
+ g.add((I1, TYPE, NS('core')['Interface']))
+ g.add((I2, TYPE, NS('core')['Interface']))
+ g.add((I1, NS('core')['hasIPv4Address'], ADDR))
+ g.add((I2, NS('core')['hasIPv4Address'], ADDR))
+ g.add((ADDR, TYPE, NS('owl')['NamedIndividual']))
+ g.add((ADDR, TYPE, NS('unit')['IPAddress']))
+ g.add((ADDR, NS('unit')['hasValue'], IPADDR))
+ return R
+
+ def addPar(self, g, pname = 'partition', pval = '/', ptype = 'String', pdim = 'NameOfSomething'):
+ P = self.mns['par_%s' % pname]
+ NS = self.ns
+ TYPE = self.ns_type
+ g.add((P, TYPE, NS('owl')['NamedIndividual']))
+ g.add((P, TYPE, NS('query')['QueryParameter']))
+ g.add((P, NS('param')['paramName'], Literal(pname)))
+ g.add((P, NS('unit')['hasValue'], Literal(pval)))
+ g.add((P, NS('param')['hasType'], NS('param')[ptype]))
+ g.add((P, TYPE, NS('unit')[pdim]))
+ return P
+
+ def bindNode(self, g, Q, R):
+ g.add((Q, self.ns('query')['hasResource'], R))
+
+ def bindPar(self, g, Q, P):
+ g.add((Q, self.ns('param')['hasParameter'], P))
+
+ def newQ(self, g, what = 'MemoryUtilization'):
+ Q = self.mns['measure_%s' % what]
+ NS = self.ns
+ TYPE = NS('rdf')['type']
+ g.add((Q, TYPE, NS('owl')['NamedIndividual']))
+ g.add((Q, TYPE, NS('query')['BundleQuery']))
+ g.add((Q, NS('feature')['hasFeature'], NS('feature')[what]))
+
+ g.add((Q, NS('stat')['hasSample'], self.S))
+ g.add((Q, NS('query')['hasFormatter'], self.F))
+ return Q
+
+ def createaggregatorquery(self, pid, what = 'MemoryUtilization'):
+ g = self.new_g()
+ R = self.addnode(g)
+ Q = self.mns['aggr_%s' % what]
+ NS = self.ns
+ TYPE = NS('rdf')['type']
+ g.add((Q, TYPE, NS('owl')['NamedIndividual']))
+ g.add((Q, TYPE, NS('query')['SampleManipulationQuery']))
+ g.add((Q, NS('feature')['hasFeature'], NS('feature')[what]))
+
+ g.add((Q, NS('query')['hasFormatter'], self.F))
+ g.add((Q, NS('query')['hasProcessid'], Literal(pid)))
+ g.add((Q, NS('query')['hasResource'], R))
+
+ P = self.mns['par_last5']
+ NS = self.ns
+ TYPE = self.ns_type
+ g.add((P, TYPE, NS('owl')['NamedIndividual']))
+ g.add((P, TYPE, NS('query')['SOP_tail']))
+ g.add((P, NS('param')['paramName'], Literal('tail')))
+ g.add((P, NS('unit')['hasValue'], Literal('5')))
+ g.add((P, NS('param')['hasType'], NS('param')['Integer']))
+ g.add((P, TYPE, NS('unit')['Countable']))
+
+ L5 = self.mns['last5']
+ g.add((L5, TYPE, NS('owl')['NamedIndividual']))
+ g.add((L5, TYPE, NS('stat')['Tail']))
+ g.add((L5, NS('stat')['hasSample'], self.S))
+ g.add((L5, NS('param')['hasParameter'], P))
+ ML5 = self.mns['maxlast5']
+ g.add((ML5, TYPE, NS('owl')['NamedIndividual']))
+ g.add((ML5, TYPE, NS('stat')['Maximum']))
+ g.add((ML5, NS('stat')['hasSample'], L5))
+ g.add((Q, NS('stat')['hasSample'], ML5))
+
+ return g.serialize()
+
+
+ def createconditionquery(self, pid, what = 'MemoryUtilization'):
+ g = self.new_g()
+ R = self.addnode(g)
+ NS = self.ns
+ TYPE = self.ns_type
+
+ P = self.mns['par_last']
+ g.add((P, TYPE, NS('owl')['NamedIndividual']))
+ g.add((P, TYPE, NS('query')['SOP_tail']))
+ g.add((P, NS('param')['paramName'], Literal('tail')))
+ g.add((P, NS('unit')['hasValue'], Literal('1')))
+ g.add((P, NS('param')['hasType'], NS('param')['Integer']))
+ g.add((P, TYPE, NS('unit')['Countable']))
+
+ L = self.mns['last']
+ g.add((L, TYPE, NS('owl')['NamedIndividual']))
+ g.add((L, TYPE, NS('stat')['Tail']))
+ g.add((L, NS('stat')['hasSample'], self.S))
+ g.add((L, NS('param')['hasParameter'], P))
+
+ MIN = self.mns['minall']
+ g.add((MIN, TYPE, NS('owl')['NamedIndividual']))
+ g.add((MIN, TYPE, NS('stat')['Minimum']))
+ g.add((MIN, NS('stat')['hasSample'], self.S))
+
+ ML = self.mns['maxlast1']
+ g.add((ML, TYPE, NS('owl')['NamedIndividual']))
+ g.add((ML, TYPE, NS('stat')['Maximum']))
+ g.add((ML, NS('stat')['hasSample'], L))
+
+ T1 = self.mns['lastitem_%s' % what]
+ g.add((T1, TYPE, NS('owl')['NamedIndividual']))
+ g.add((T1, TYPE, NS('stat')['SampleTerm']))
+# g.add((T, NS('stat')['hasScale'], Literal(1)))
+ g.add((T1, NS('stat')['hasSample'], ML))
+
+ T2 = self.mns['minitem_%s' % what]
+ g.add((T2, TYPE, NS('owl')['NamedIndividual']))
+ g.add((T2, TYPE, NS('stat')['SampleTerm']))
+ g.add((T2, NS('stat')['hasScale'], Literal('-1.5')))
+ g.add((T2, NS('stat')['hasSample'], MIN))
+
+ LCS = self.mns['lcs_%s' % what]
+ g.add((LCS, TYPE, NS('owl')['NamedIndividual']))
+ g.add((LCS, TYPE, NS('stat')['LinearCombinedSample']))
+ g.add((LCS, NS('stat')['hasTerm'], T1))
+ g.add((LCS, NS('stat')['hasTerm'], T2))
+
+ # condition: "last measurement" > "1.5 * min(all measurements)"
+ C = self.mns['cond_%s' % what]
+ g.add((C, TYPE, NS('owl')['NamedIndividual']))
+ g.add((C, TYPE, NS('stat')['IsPositive']))
+ g.add((C, NS('stat')['hasSample'], LCS))
+
+ Q = self.mns['condq_%s' % what]
+ g.add((Q, TYPE, NS('owl')['NamedIndividual']))
+ g.add((Q, TYPE, NS('query')['ConditionQuery']))
+ g.add((Q, NS('feature')['hasFeature'], NS('feature')[what]))
+ g.add((Q, NS('stat')['hasCondition'], C))
+ g.add((Q, NS('query')['hasProcessid'], Literal(pid)))
+ g.add((Q, NS('query')['hasResource'], R))
+
+ return g.serialize()
+
+
+ def test_genq_mem(self):
+ g = self.new_g()
+ Q = self.newQ(g)
+ R = self.addnode(g)
+ self.bindNode(g, Q, R)
+ query = g.serialize()
+ self.save(fn = "/tmp/genq_mem.owl", q = query)
+ self.measure(query, 20)
+
+ def test_genq_cpu(self):
+ g = self.new_g()
+ Q = self.newQ(g, what= 'CPUUtilization')
+ R = self.addnode(g)
+ self.bindNode(g, Q, R)
+ query = g.serialize()
+ self.save(fn = "/tmp/genq_cpu.owl", q = query)
+ self.measure(query, 16)
+
+ def test_genq_err(self):
+ g = self.new_g()
+ Q = self.newQ(g, what = 'AlmaFa')
+ R = self.addnode(g)
+ self.bindNode(g, Q, R)
+ query = g.serialize()
+ self.save(fn = "/tmp/genq_mem_err.owl", q = query)
+ response = self.measure(query, 1)
+ self.assertTrue("error" in response, "no error message! got %s" % response)
+
+ def test_genq_complex(self):
+ g = self.new_g()
+ R = self.addnode(g)
+ P = self.addPar(g)
+ for feature in ['FreeMemory', 'CPULoad', 'FreeDiskSpace', 'AlmaFa']:
+ Q = self.newQ(g, what = feature)
+ self.bindNode(g, Q, R)
+ if feature == 'FreeDiskSpace': #FIXME: ugly
+ self.bindPar(g, Q, P)
+
+ query = g.serialize()
+ self.save(fn = "/tmp/genq_complex.owl", q = query)
+ response = self.measure(query, 26)
+ #print response
+ self.assertTrue("error" in response, "no error message! got %s" % response)
+
+ def test_genq_memslice(self):
+ g = self.new_g()
+ Q = self.newQ(g)
+ R = self.addnode(g)
+ P = self.addPar(g, pname = 'SliceName', pval = 'novi_novi')
+ self.bindNode(g, Q, R)
+ self.bindPar(g, Q, P)
+ query = g.serialize()
+ self.save(fn = "/tmp/genq_memslice.owl", q = query)
+ pid = self.addtask(query)
+ query = self.createaggregatorquery(pid)
+ self.save(fn = "/tmp/genq_memaggr.owl", q = query)
+ aid = self.addaggregator(query)
+ print "COLLECTING DATA WAIT FOR 10 SECS"
+ sleep(10)
+ self.fetchaggregate(q = aid, expect = 21)
+ print "COLLECTING SOME MORE DATA WAIT FOR 10 SECS"
+ sleep(10)
+ self.fetchaggregate(q = aid, expect = 21)
+
+ self.delaggregator(q = aid)
+ self.deltask(q = pid)
+
+
+
+
+
+ def test_condition(self):
+ g = self.new_g()
+ Q = self.newQ(g)
+ R = self.addnode(g)
+ P = self.addPar(g, pname = 'SliceName', pval = 'novi_novi')
+ self.bindNode(g, Q, R)
+ self.bindPar(g, Q, P)
+ query = g.serialize()
+ self.save(fn = "/tmp/genq_memslice_c.owl", q = query)
+ pid = self.addtask(query)
+ query = self.createconditionquery(pid)
+ self.save(fn = "/tmp/genq_cond.owl", q = query)
+ cid = self.addcondition(query)
+
+ sleep(3)
+ print self.checkcondition(q = cid)
+
+ self.delcondition(q = cid)
+ self.deltask(q = pid)
+
+
+
+
+
+
+
+if __name__ == "__main__":
+ #import sys;sys.argv = ['', 'Test.test_genq']
+ try:
+ unittest.main()
+ finally:
+ stop_servers(t)