summaryrefslogtreecommitdiffstats
path: root/Monitoring/src/main/python/Service
diff options
context:
space:
mode:
Diffstat (limited to 'Monitoring/src/main/python/Service')
-rw-r--r--Monitoring/src/main/python/Service/MonSrvImpl.py326
-rw-r--r--Monitoring/src/main/python/Service/MonitoringService$py.classbin0 -> 31900 bytes
-rw-r--r--Monitoring/src/main/python/Service/MonitoringService.py382
-rw-r--r--Monitoring/src/main/python/Service/MonitoringService.py.old354
-rw-r--r--Monitoring/src/main/python/Service/__init__$py.classbin0 -> 2061 bytes
-rw-r--r--Monitoring/src/main/python/Service/__init__.py0
-rw-r--r--Monitoring/src/main/python/Service/__init__.py.old0
-rw-r--r--Monitoring/src/main/python/Service/a38
-rw-r--r--Monitoring/src/main/python/Service/b43
-rw-r--r--Monitoring/src/main/python/Service/interface$py.classbin0 -> 23408 bytes
-rw-r--r--Monitoring/src/main/python/Service/interface.py316
-rw-r--r--Monitoring/src/main/python/Service/interface.py.old308
-rw-r--r--Monitoring/src/main/python/Service/mock_framework$py.classbin0 -> 9540 bytes
-rw-r--r--Monitoring/src/main/python/Service/mock_framework.py69
-rw-r--r--Monitoring/src/main/python/Service/mock_framework.py.old69
-rw-r--r--Monitoring/src/main/python/Service/test.py268
-rw-r--r--Monitoring/src/main/python/Service/test.py.old245
17 files changed, 2418 insertions, 0 deletions
diff --git a/Monitoring/src/main/python/Service/MonSrvImpl.py b/Monitoring/src/main/python/Service/MonSrvImpl.py
new file mode 100644
index 0000000..36da1cc
--- /dev/null
+++ b/Monitoring/src/main/python/Service/MonSrvImpl.py
@@ -0,0 +1,326 @@
+'''
+Created on 08.08.2011
+
+@author: Sandor Laki
+'''
+
+from __future__ import with_statement
+from rdflib import Graph
+from StringIO import StringIO
+from Service.interface import MSInterface
+from eu.novi.monitoring import MonDiscoveryImpl
+import sys
+from Util.MonitoringQueryImpl import MonitoringQueryImpl
+from threading import Lock
+from org.slf4j import Logger
+from org.slf4j import LoggerFactory
+from os import path, access, R_OK
+#import eu.novi.feedback.event.ReportEvent
+import traceback
+import java.lang.StackOverflowError
+import java.lang.Error
+
+try:
+ import site
+ site.addsitedir('../site-packages')
+except ImportError, e:
+ sys.stderr.write("[EXCEPTION] import Site -> %s\n" % e)
+
+
+
+try:
+ from eu.novi.monitoring import MonSrv
+except ImportError:
+ MonSrv = object
+
+try:
+ from eu.novi.monitoring import Wiring
+except ImportError:
+ Wiring = object
+
+try:
+ from eu.novi.im.core import Resource
+except ImportError, e:
+ sys.stderr.write("[EXCEPTION] Resource -> %s\n" % e)
+ Resource = None
+
+
class MonSrvImpl(MonSrv,Wiring):
    '''
    @summary: Jython-side implementation of the NOVI MonSrv service.
    Every public monitoring call is delegated to a lazily constructed
    MSInterface (see getMSI); Java credential objects are adapted to the
    list-of-dictionaries format expected by the Python service layer.
    '''
    # Class-level attributes: shared defaults until shadowed on an instance
    # (setTestbed/setUserFeedback rebind per instance). The Lock is
    # deliberately class-level so all instances serialize MSInterface
    # construction in getMSI().
    testbed = "Undefined"
    userFeedback = None
    lock = Lock()

    log = LoggerFactory.getLogger("eu.novi.monitoring.MonSrv")

    def __init__(self):
        '''
        @summary: start with an empty MSInterface slot; the real interface
        is built on first use by getMSI().
        '''
        #self.testbed = "Unknown"
        self._msi = None
        self.framework = MonDiscoveryImpl()
        self.log.info("MonSrvImpl has started... Testbed=%s" % self.testbed)

    def createQuery(self):
        '''
        @summary: factory for a query builder bound to the underlying
        monitoring service instance
        @return: a fresh query object
        @rtype: MonitoringQueryImpl
        '''
        return MonitoringQueryImpl(self.getMSI()._ms)

    def getMSI(self):
        '''
        @summary: lazily construct (once, under the shared class lock) and
        return the MSInterface for this testbed. The testbed-specific
        configuration model is tried first; on any failure the PlanetLab
        configuration is used as a fallback.
        @return: the measurement service interface
        @rtype: MSInterface
        '''
        print "getMSI %s" % self.getTestbed()
        self.log.info("getMSI %s" % self.getTestbed())
        tbname = self.getTestbed()
        with self.lock:
            if self._msi is None:
                baseurl = ""
                config_owl = "config_%s.owl" % (tbname.lower())
                self.log.info("Testbed specific configuration: %s" % config_owl)
                #config_owl = "config_planetlab.owl"
                try:
                    self._msi = MSInterface(self.framework, self.getTestbed(), baseurl, config_owl)
                except:
                    # fall back to the PlanetLab model; NOTE(review): a bare
                    # except also hides non-configuration errors — confirm
                    # this is intentional
                    self.log.info("Error occured at %s" % config_owl)
                    config_owl = "config_planetlab.owl"
                    self._msi = MSInterface(self.framework, self.getTestbed(), baseurl, config_owl)
                self.log.info("MSInterface has been instanciated... Testbed=%s" % self.getTestbed() )
        return self._msi


    def setPolicy(self, policy):
        # plain setter used by the OSGi wiring layer
        self.policy = policy

    def getPolicy(self):
        return self.policy

    def getPlatform(self):
        # alias of getTestbed(): the platform name equals the testbed name here
        return self.testbed

    def setResource(self, resource):
        self.resource = resource

    def getResource(self):
        return self.resource

    def getTestbed(self):
        return self.testbed

    def setTestbed(self, testbed):
        self.testbed = testbed

    def getUserFeedback(self):
        return self.userFeedback

    def setUserFeedback(self, userFeedback):
        self.userFeedback = userFeedback

    # Test purpose function
    def echo(self, platform):
        '''
        @summary: An integration tester function (to be exported public)
        @param platform: name of the platform
        @type platform: string
        @return: messages of the platforms taking part in the message flow
        @rtype: string
        '''
        return self.getMSI().echo(platform)

    def extractCredential(self, credential):
        '''
        @summary: convert a Java credential object into the list of
        credential dictionaries used by the Python layer
        @param credential: a credential whose getType() is either
        "UsernamePassword" or "UsernameRSAKey"
        @return: list of credential dictionaries; NOTE(review): on an
        unknown credential type a plain error *string* is returned instead
        of a list — callers must cope with both, confirm
        @rtype: list or string
        '''
        cred = []
        if credential.getType()=="UsernamePassword": cred=[{'username' : credential.username, 'password' : credential.password}]
        elif credential.getType()=="UsernameRSAKey": cred=[{'username' : credential.username, 'password' : credential.password, 'rsakey' : credential.RSAKey}]
        else: return "Error - unknown credential...."

        # Hardcoded credential - TODO: FIX IT ASAP!!!
        PATH="/home/novi/apache-servicemix-4.4.1-fuse-01-06/instances/system-tests/etc/root_planetlab_rsa"

        try:
            #PATH="/home/novi/apache-servicemix-4.4.1-fuse-01-06/instances/system-tests/etc/sfademo_key"
            if path.exists(PATH) and path.isfile(PATH) and access(PATH, R_OK):
                # NOTE(review): when the hardcoded key is readable, the
                # credential extracted above is *replaced* by the root key
                cred=[{'username' : "root", 'password' : "", 'rsakey' : PATH}]
                self.log.info("root path exists and readable")
        except:
            # diagnostics only; extraction continues with whatever is in cred
            self.log.info("root key cannot be accessed at %s" % PATH)
            if not path.exists(PATH):
                self.log.info("path doesn't exists")
            if not path.isfile(PATH):
                self.log.info("path is not a file")
            if not access(PATH, R_OK):
                self.log.info("file cannot be accessed, permission issue?")
            #pass
        cred.append({'username':'monitor1','password':'m/n.t,r1'}) # G3 Access
        return cred


    # Substrate monitoring function
    def measure(self, credential, query):
        '''
        @summary: Method to handle substrate monitoring queries (to be exported public)
        @param credential:
        @type credential:
        @param query: an owl document containing several BundleQuery instances
        @type query: string
        @return: response to the query; "[]" if any error was caught
        @rtype: string
        '''
        cred = self.extractCredential( credential )
        self.log.info("New substrate monitoring query has arrived: %s" % query)
        try:
            print "Call measure"
            #TODO: split query and concatenate results
            return self.getMSI().measure(cred, query)
        except Exception, e:
            self.log.info("Exception %s %s" % (e, traceback.format_exc()))
        # NOTE(review): the following clauses assume Jython, where java.lang
        # throwables are not caught by `except Exception` — confirm
        except java.lang.StackOverflowError, se:
            se.printStackTrace()
            self.log.info("unknown %s" % se.toString())
        except java.lang.Error, er:
            er.printStackTrace()
        return "[]"

    def substrateFB(self, credential, query, sessionID):
        '''
        @summary: substrate monitoring with user feedback: report the
        submission to the feedback service (best effort), then measure
        @param sessionID: feedback session identifier
        @return: response to the query, see measure()
        @rtype: string
        '''
        try:
            self.getUserFeedback().instantInfo(sessionID, "MS", "A substrate monitoring task has been submitted.", "http://fp7-novi.eu");
        except:
            # feedback is best effort; the measurement must go on
            self.log.info("Feedback thrown an exception")
        return self.measure(credential, query)

    def substrate(self, credential, query):
        # alias kept for interface compatibility
        return self.measure(credential, query)

    # Slice monitoring functions
    def sliceTasks(self, credential, query):
        return "sliceTasks() method is not implemented"

    def addTask(self, credential, query):
        '''
        @summary: Method to start slice monitoring tasks (to be exported public)
        @param credential:
        @type credential:
        @param query: an owl document containing several BundleQuery instances
        @type query: string
        @return: process identifier
        @rtype: string
        '''
        #TODO: investigate if the service instance under this interface should be the boss
        cred = self.extractCredential( credential )
        return self.getMSI().launchTasks(cred, query)

    def describeTaskData(self, credential, query):
        '''
        @summary: Method to retrieve meta data of task data (to be exported public)
        @param credential:
        @type credential:
        @param query:
        @type query: string
        @return: serialize the header of the data tables
        @rtype: string
        '''
        cred = self.extractCredential( credential )
        return self.getMSI().describeTaskData(cred, query)


    def fetchTaskData(self, credential, query):
        '''
        @summary: Method to retrieve task data collected since last fetch or the start (to be exported public)
        @param credential:
        @type credential:
        @param query:
        @type query: string
        @return: serialize the appended content of the data tables
        @rtype: string
        '''
        cred = self.extractCredential( credential )
        return self.getMSI().fetchTaskData(cred, query)

    def modifyTask(self, credential, query):
        # NOTE(review): InterfaceError is not imported in this module, so
        # this raises NameError at runtime; presumably it should come from
        # Service.interface — confirm
        raise InterfaceError("modifyTask() method is not implemented")

    def removeTask(self, credential, query):
        '''
        @summary: Method to remove a slice measurement task (to be exported public)
        @param credential:
        @type credential:
        @param query:
        @type query: string
        '''
        cred = self.extractCredential( credential )
        return self.getMSI().removeTask(cred, query)

    def enableTask(self, credential, query):
        '''
        @summary: Method to enable a slice measurement task (to be exported public)
        @param credential:
        @type credential:
        @param query:
        @type query: string
        '''
        cred = self.extractCredential( credential )
        return self.getMSI().enableTask(cred, query)

    def disableTask(self, credential, query):
        '''
        @summary: Method to disable a slice measurement task temporarily (to be exported public)
        @param credential:
        @type credential:
        @param query:
        @type query: string
        '''
        cred = self.extractCredential( credential )
        return self.getMSI().disableTask(cred, query)

    def getTaskStatus(self, credential, query):
        '''
        @summary: Method to check the state of a slice measurement task (to be exported public)
        @param credential:
        @type credential:
        @param query:
        @type query: string
        @return: True if the tasks are running
        @rtype: boolean
        '''
        cred = self.extractCredential( credential )
        return self.getMSI().getTaskStatus(cred, query)

    def addAggregator(self, credential, query):
        '''
        @summary: Method to define new data manipulation on slice monitoring data (to be exported public)
        @param credential:
        @type credential:
        @param query: an owl document containing several SampleManipulationQuery instances
        @type query: string
        @return: aggregator identifier
        @rtype: string
        '''
        #TODO: investigate if the service instance under this interface should be the boss
        cred = self.extractCredential( credential )
        return self.getMSI().addAggregator(cred, query)

    def removeAggregator(self, credential, query):
        '''
        @summary: Method to remove data manipulation on slice monitoring data (to be exported public)
        @param credential:
        @type credential:
        @param query:
        @type query: string
        '''
        cred = self.extractCredential( credential )
        return self.getMSI().removeAggregator(cred, query)

    def fetchAggregatorData(self, credential, query):
        '''
        @summary: Method to refresh and serialize results of data manipulation on slice monitoring data (to be exported public)
        @param credential:
        @type credential:
        @param query:
        @type query: string
        @return: result of aggregators
        @rtype: string
        '''
        cred = self.extractCredential( credential )
        return self.getMSI().fetchAggregatorData(cred, query)

    def addCondition(self, credential, query):
        # NOTE(review): InterfaceError is undefined here (see modifyTask)
        raise InterfaceError("addCondition() method is not implemented")

    def modifyCondition(self, credential, query):
        # NOTE(review): InterfaceError is undefined here (see modifyTask)
        raise InterfaceError("modifyCondition() method is not implemented")

    def removeCondition(self, credential, query):
        # NOTE(review): InterfaceError is undefined here (see modifyTask)
        raise InterfaceError("removeCondition() method is not implemented")
+
+
diff --git a/Monitoring/src/main/python/Service/MonitoringService$py.class b/Monitoring/src/main/python/Service/MonitoringService$py.class
new file mode 100644
index 0000000..1d60f70
--- /dev/null
+++ b/Monitoring/src/main/python/Service/MonitoringService$py.class
Binary files differ
diff --git a/Monitoring/src/main/python/Service/MonitoringService.py b/Monitoring/src/main/python/Service/MonitoringService.py
new file mode 100644
index 0000000..f6102dc
--- /dev/null
+++ b/Monitoring/src/main/python/Service/MonitoringService.py
@@ -0,0 +1,382 @@
+from __future__ import with_statement
+'''
+Created on Mar 22, 2012
+
+@author: steger
+'''
+from time import sleep
+from DataProcessing.Prefix import PrefixManager
+from DataProcessing.Unit import UnitManager
+from DataProcessing.Dimension import DimensionManager
+from Semantics.InformationModel import Ontology
+from Semantics.UnitModel import UnitModel
+from Semantics.TaskModel import TaskModel
+from Semantics.QueryInterpreter import QueryInterpreter
+from Task.Task import SubtaskManager, TaskError, STRAT_PERIODICAL,\
+ STRAT_ONDEMAND
+from DataProcessing.Parameter import ParameterList
+from Resource.node import node
+from Resource.link import link
+from DataProcessing.AggregatorManager import AggregatorManager
+from DataProcessing.MeasurementLevel import lut_level
+#from paramiko.ssh_exception import BadAuthenticationType
+import logging
+from rdflib import Graph
+from StringIO import StringIO
+from DataProcessing.DataFormatter import JsonFormatter
+from DataProcessing.DataHeaderCell import CellRequestByFeature,\
+ CellRequestByName
+from DataProcessing.DataError import SamplerError
+import traceback
+
class MonitoringService(object):
    '''
    @summary: The core monitoring service of a platform. At construction
    time it loads the platform ontology, populates the prefix, unit and
    dimension managers from the inferred unit model, and collects the
    known monitoring domains and features. The public entry points
    (measure, launchTasks, ...) parse an RDF query document and drive
    measurement tasks via the subtask and aggregator managers.
    '''
    # service version reported by __str__
    version = "0.0"

    def __str__(self):
        return "NOVI Monitoring Service v%s @ %s" % (self.version, self.platform)

    def _get_platform(self):
        # backing getter for the read-only 'platform' property defined below;
        # the platform name is owned by the enclosing MSInterface
        return self._if.platform

    def __init__(self, interface, baseurl, config_owl):
        '''
        @summary: constructor
        @param interface: the public service interface wrapping this instance
        @type interface: MSInterface
        @param baseurl: the location of the ontology files. Either point to the file system or to a public url
        @type baseurl: str
        @param config_owl: platform specific configuration model
        @type config_owl: str
        '''
        self._if = interface
        self.logger = logging.getLogger(name = "NOVI.MS.%s" % self.platform)
        self.log = self._if.log # to be removed
        self.pm = PrefixManager()
        self.um = UnitManager()
        self.dm = DimensionManager(self.um)
        self.stm = SubtaskManager(self.um)
        self.am = AggregatorManager()
        self.domains = []
        self.features = []
        self.ontology = Ontology(baseurl = baseurl, config_owl = config_owl)
        self.unitmodel = UnitModel(self.ontology)
        self.taskmodel = TaskModel(self.dm, self.um, self.ontology)
        um = self.unitmodel

        # infer and store prefixes
        for (p_reference, p_symbol, base, exponent) in um.inferPrefixes():
            self.pm.newPrefix( self.ontology._tail(p_reference), p_symbol, base, exponent )

        # infer basic units
        for u_reference, u_symbol in um.inferBaseUnits():
            self.storeBasicUnit(u_reference, u_symbol)
        for u_reference, u_symbol, _, _ in um.inferPowerUnits():
            self.storeBasicUnit(u_reference, u_symbol)
        for u_reference, u_symbol, _ in um.inferProductUnits():
            self.storeBasicUnit(u_reference, u_symbol)
        for u_reference, u_symbol, derivedfrom, scale, offset in um.inferLinearTransformedUnits():
            self.storeLinearTransformedUnit(u_reference, u_symbol, derivedfrom, scale, offset)
        for u_reference, u_symbol, derivedfrom, expr_fwd, expr_inv in um.inferRegexpTransformedUnits():
            uref = self.ontology._tail(u_reference)
            ancestor = self.um[ self.ontology._tail(derivedfrom) ]
            self.um.addRegexpTransformedUnit(uref, u_symbol, ancestor, expr_fwd, expr_inv)

        # infer dimensions
        #FIXME: if there is a reference loop an error is raised...
        for d_reference, u_reference, level in um.inferBaseDimensions():
            dref = self.ontology._tail(d_reference)
            uref = self.ontology._tail(u_reference)
            lref = self.ontology._tail(level)
            level = lut_level[lref]
            unit = self.um[uref]
            self.dm.newBaseDimension(dref, dref, unit, level)
        for d_reference, u_reference, d_derivedfrom in um.inferDifferenceDimensions():
            dref = self.ontology._tail(d_reference)
            uref = self.ontology._tail(u_reference)
            daref = self.ontology._tail(d_derivedfrom)
            unit = self.um[uref]
            derivedfrom = self.dm[daref]
            self.dm.newDerivedDimension(dref, dref, unit, derivedfrom, self.dm.DifferenceDimension)
        for d_reference, u_reference, d_derivedfrom, exponent in um.inferPowerDimensions():
            dref = self.ontology._tail(d_reference)
            uref = self.ontology._tail(u_reference)
            daref = self.ontology._tail(d_derivedfrom)
            unit = self.um[uref]
            derivedfrom = self.dm[daref]
            self.dm.newDerivedDimension(dref, dref, unit, derivedfrom, self.dm.PowerDimension, exponent = exponent)
        for d_reference, u_reference, d_derivedfrom in um.inferProductDimensions():
            dref = self.ontology._tail(d_reference)
            uref = self.ontology._tail(u_reference)
            unit = self.um[uref]
            derivedfrom = tuple( self.dm[self.ontology._tail(x)] for x in d_derivedfrom )
            self.dm.newDerivedDimension(dref, dref, unit, derivedfrom, self.dm.ProductDimension)
        for d_reference, u_reference, d_derivedfrom in um.inferRatioDimensions():
            dref = self.ontology._tail(d_reference)
            uref = self.ontology._tail(u_reference)
            daref = self.ontology._tail(d_derivedfrom)
            unit = self.um[uref]
            derivedfrom = self.dm[daref]
            self.dm.newDerivedDimension(dref, dref, unit, derivedfrom, self.dm.RatioDimension)

        # infer domains and features
        for uri_domain in self.taskmodel.inferDomains():
            self.domains.append(uri_domain)
        for uri_feature, _, _ in self.taskmodel.inferFeatures():
            self.features.append(uri_feature)

        self.QI = QueryInterpreter(self.taskmodel)

        # identifier counter shared by process and aggregate IDs, plus the
        # per-identifier registries used by launchTasks/attachAggregators
        self._nextID = 0
        self.subtaskIDs = {}
        self.aggregatorIDs = {}
        self.formatters = {}


    def storeBasicUnit(self, u_reference, u_symbol):
        '''
        @summary: register a base unit in the unit manager and derive its
        prefixed variants (e.g. kilo-, milli-) as linear transformed units
        @param u_reference: ontology reference of the unit
        @param u_symbol: symbol of the unit
        '''
        uref = self.ontology._tail(u_reference)
        bu = self.um.newBasicUnit(uref, u_symbol)
        for p_reference in self.unitmodel.inferPossiblePrefixesOf(u_reference):
            p = self.pm[ self.ontology._tail(p_reference) ]
            puref = "%s_%s" % (p.reference, uref)
            symbol = "%s%s" % (p.symbol, bu.symbol)
            self.um.addLinearTransformedUnit(puref, symbol, bu, p.scale)

    def storeLinearTransformedUnit(self, u_reference, u_symbol, derivedfrom, scale, offset):
        '''
        @summary: register a linearly transformed unit and derive its
        prefixed variants, mirroring storeBasicUnit
        @param derivedfrom: ontology reference of the ancestor unit
        @param scale: multiplicative factor relative to the ancestor
        @param offset: additive offset relative to the ancestor
        '''
        uref = self.ontology._tail(u_reference)
        ancestor = self.um[ self.ontology._tail(derivedfrom) ]
        bu = self.um.addLinearTransformedUnit(uref, u_symbol, ancestor, scale, offset)
        for p_reference in self.unitmodel.inferPossiblePrefixesOf(u_reference):
            p = self.pm[ self.ontology._tail(p_reference) ]
            puref = "%s_%s" % (p.reference, uref)
            symbol = "%s%s" % (p.symbol, bu.symbol)
            self.um.addLinearTransformedUnit(puref, symbol, bu, p.scale)

    def newProcessID(self):
        '''
        @summary: generate a platform-unique process identifier;
        the try/finally increments the counter after the return value
        is computed
        @rtype: str
        '''
        try:
            return "%s:process:%d" % (self.platform, self._nextID)
        finally:
            self._nextID += 1

    def newAggregateID(self, isprocess = True):
        '''
        @summary: generate a platform-unique aggregate identifier
        @param isprocess: NOTE(review): currently unused — confirm intent
        @rtype: str
        '''
        try:
            return "%s:aggregate:%d" % (self.platform, self._nextID)
        finally:
            self._nextID += 1

    def measure(self, credential, query):
        #TODO: docs
        '''
        @summary: run the substrate measurements described by an RDF query
        document: for each inferred bundle query the first applicable tool
        is launched, its data (optionally aggregated via a sample chain) is
        serialized to JSON, and per-query errors are collected rather than
        propagated.
        @param credential: iterable of credential dictionaries
        @param query: RDF document containing BundleQuery instances
        @type query: str
        @return: a JSON array of per-feature results, with an optional
        trailing {"errors": ...} object
        @rtype: str
        '''
        g = Graph()
        g += self.ontology.graph
        sio = StringIO(query)
        g.parse(source = sio)
        responses = []
        errors = []
        queries = self.QI.inferBundleQueries(qgraph = g)
        self.log(shortmsg = "measurements starting...", message = "Attempt to launch %d measurement threads" % len(queries))
        for q in queries:
            feature_uri = q.feature
            domain = self.ontology.ns('task')['Substrate']
            taskgen = self.taskmodel.inferTasks(domain, feature_uri)
            no_tool = True
            (resource_uri, resource) = q.resource
            #we are ugly here: use the first tool
            for task_uri, _ in taskgen:
                try:
                    no_tool = False
                    _, task = self.newTask(task = task_uri, cred = credential, resource = resource, parameters = q.paramlist)
                    if task is None:
                        continue
                    if q.samplechain:
                        # periodic sampling feeding an aggregator chain
                        task.strategy = STRAT_PERIODICAL
                        # we apply some aggregation to the data
                        flow = []
                        for skeleton, parlist in q.samplechain:
                            flow.append((skeleton, parlist.formkeyvaldict()))
                        aid = self.am.newAggregator(task.data, CellRequestByFeature(feature = q.feature), flow)
                        A = self.am[aid]
                        task.enable()
                        # poll until the aggregator has enough samples;
                        # SamplerError means "not yet", so wait and retry
                        while True:
                            try:
                                task.dataAdded.wait( 15 )
                                formatter = JsonFormatter(datasource = A.data)
                                break
                            except SamplerError:
                                task.dataAdded.clear()
                                sleep(1)
                    else:
                        # one-shot on-demand measurement
                        task.strategy = STRAT_ONDEMAND
                        task.enable()
                        task.dataAdded.wait( 15 )
                        formatter = JsonFormatter(datasource = task.data)
                    formatter.extract(cellrequest = [
                        CellRequestByName(name = "Run"),
                        CellRequestByFeature(feature = feature_uri)
                    ])
                    t = formatter.serialize()
                    try:
                        print "Call task.destroy"
                        task.destroy()
                    except:
                        pass
                    #print "retek",t
                    if t is not None:
                        if len(t)>0:
                            responses.append( "{\"%s\" : %s}" %(feature_uri,t) ) #formatter.serialize() )
                except Exception, e:
                    # collect per-tool failures; remaining queries still run
                    tbe = traceback.format_exc()
                    err_desc = "Unexpected exception occured: %s, %s" % (e, tbe)
                    errors.append(err_desc)
            if no_tool:
                err_description = "No tools to measure %s @ %s" % (feature_uri, resource_uri)
                errors.append(err_description)
                self.log(shortmsg = "Limited result set", message = err_description)
        # hand-built JSON: results first, then a single errors object
        useful_data = ",\n".join( responses )
        error_data = "+".join(errors)
        if len(errors):
            if len(useful_data):
                response = "[%s,\n{\"errors\" : \"%s\"}]" % (useful_data, error_data)
            else:
                response = "[{\"errors\" : \"%s\"}]" % (error_data)
        else:
            response = "[%s]" % useful_data
        return response

    def launchTasks(self, credential, query):
        '''
        @summary: start periodic slice monitoring tasks for each bundle
        query in the RDF document and register them under a fresh task
        identifier
        @return: the task identifier, or None if no subtask could be started
        '''
        #TODO: many things in common with measure!!!
        g = Graph()
        g += self.ontology.graph
        sio = StringIO(query)
        g.parse(source = sio)
        # NOTE(review): self.newID() is not defined on this class — this
        # raises NameError; presumably newProcessID() was meant — confirm
        taskID = self.newID()
        idstore = self.subtaskIDs[taskID] = []
        formatters = self.formatters[taskID] = []
        for q in self.QI.getBundleQuery(qgraph = g):
            feature_uri = q.feature

            print "PPPPP", q.paramlist

            domain = self.ontology.ns('task')['Slice']
            taskgen = self.taskmodel.inferTasks(domain, feature_uri)
            #we are ugly here: use the first tool
            for task_uri, _ in taskgen:
                subtaskID, task = self.newTask(task = task_uri, cred = credential, resource = q.resource, parameters = q.paramlist)
                task.strategy = STRAT_PERIODICAL
                task.enable()
                idstore.append(subtaskID)
                f = q.formatter(datasource = task.data)
                formatters.append(f)
        if len(idstore):
            return taskID
        else:
            # nothing started: drop the empty registries again
            self.subtaskIDs.pop(taskID)
            self.formatters.pop(taskID)
            return None


    # read-only property backed by _get_platform (old-style property wiring)
    platform = property(_get_platform,None,None)


    def newTask(self, task, cred, resource = None, parameters = ParameterList()):
        '''
        @summary: initialize a Task object, which is referenced by a uri
        @param task: the reference to the task description
        @type task: URIRef
        @param cred: an iterable over dictionaries, which are used as input parameters to initialize Credential templates passed to the Task object for authentication, authorization purposes
        @type cred: dict generator
        @param resource: the resource to measure
        @type resource: resource or None
        @param parameters: the parameter list to refresh the default parameters of the Task object
        @type parameters: ParameterList
        @return: the tuple of taskID and the initialized measurement Task object
        @rtype: int, Task
        '''
        # NOTE(review): ParameterList() is a mutable default argument shared
        # across calls; safe only if it is never mutated here — confirm
        name = self.ontology._tail(task)
        credset = self.taskmodel.inferCredentialOf(task)
        driver = self.taskmodel.inferDriverOf(task)
        hdr = self.taskmodel.inferDataheaderOf(task)
        hooks = self.taskmodel.inferHookdefinitionsOf(task)
        hookpar = self.taskmodel.inferHookparametersOf(task)
        taskparameters = self.taskmodel.inferParametersOf(task)

        taskparameters.update_by_list(parameters)

        #TODO: maybe better push resource to the Task as an argument
        if isinstance(resource, node):
            addr, unit = resource.get_ipaddress("eth0")
            taskparameters.update("SourceAddress", addr, unit)
        elif isinstance(resource, link):
            addr, unit = resource.source.address
            taskparameters.update("SourceAddress", addr, unit)
            addr, unit = resource.destination.address
            taskparameters.update("DestinationAddress", addr, unit)

#        print taskparameters

        # try every credential template against every supplied credential
        # dict; the first combination that both instantiates and generates
        # a task wins
        while len(credset):
            ct = credset.pop()
            for c in cred:
                try:
                    credential = ct(**c)
                except:
                    # credential mismatch go on with the next
                    continue
                try:
                    return self.stm.generate(name = name, driver = driver, dataheader = hdr,
                             hookimplementations = hooks, parameters = taskparameters, credential = credential, **hookpar)
                except Exception, e:
                    print "Exception - %s" % e
                    pass
        return None, None
        #raise TaskError("Cannot initialize the Task with the credential set provided for %s" % name)

    def delTask(self, taskidentifier):
        # remove the task from the subtask manager (KeyError if unknown)
        self.stm.pop( taskidentifier )

    def getTask(self, taskidentifier):
        return self.stm[ taskidentifier ]

    def attachAggregators(self, credential, query):
        '''
        @summary: placeholder for attaching aggregators described by an RDF
        query document; currently unimplemented.
        NOTE(review): self.newID() is undefined (see launchTasks), so this
        raises NameError even before the explicit "unimplemented" exception
        — confirm intended behavior.
        '''
        g = Graph()
        g += self.ontology.graph
        sio = StringIO(query)
        g.parse(source = sio)
        aggregatorID = self.newID()
        idstore = self.aggregatorIDs[aggregatorID] = []
        formatters = self.formatters[aggregatorID] = []
        raise Exception("unimplemented")
#        for q in self.QI.getBundleQuery(qgraph = g):
#            feature_uri = q.feature
#
#            print "PPPPP", q.paramlist
#
#            domain = self.ontology.ns('task')['Slice']
#            taskgen = self.taskmodel.inferTasks(domain, feature_uri)
#            #we are ugly here: use the first tool
#            for task_uri, _ in taskgen:
#                subtaskID, task = self.newTask(task = task_uri, cred = credential, resource = q.resource, parameters = q.paramlist)
#                task.strategy = STRAT_PERIODICAL
#                task.enable()
#                idstore.append(subtaskID)
#                f = q.formatter(datasource = task.data)
#                formatters.append(f)
        if len(idstore):
            return aggregatorID
        else:
            # NOTE(review): pops subtaskIDs, not aggregatorIDs — likely a
            # copy/paste slip from launchTasks; confirm before fixing
            self.subtaskIDs.pop(aggregatorID)
            self.formatters.pop(aggregatorID)
            return None

    def newAggregator(self):
        # placeholder: not implemented yet
        pass

    def delAggregator(self, aggregatoridentifier):
        self.am.pop( aggregatoridentifier )

    def getAggregator(self, aggregatoridentifier):
        return self.am[ aggregatoridentifier ]
+
diff --git a/Monitoring/src/main/python/Service/MonitoringService.py.old b/Monitoring/src/main/python/Service/MonitoringService.py.old
new file mode 100644
index 0000000..0939c73
--- /dev/null
+++ b/Monitoring/src/main/python/Service/MonitoringService.py.old
@@ -0,0 +1,354 @@
+'''
+Created on Mar 22, 2012
+
+@author: steger
+'''
+from time import sleep
+from DataProcessing.Prefix import PrefixManager
+from DataProcessing.Unit import UnitManager
+from DataProcessing.Dimension import DimensionManager
+from Semantics.InformationModel import Ontology
+from Semantics.UnitModel import UnitModel
+from Semantics.TaskModel import TaskModel
+from Semantics.QueryInterpreter import QueryInterpreter
+from Task.Task import SubtaskManager, TaskError, STRAT_PERIODICAL,\
+ STRAT_ONDEMAND
+from DataProcessing.Parameter import ParameterList
+from Resource.node import node
+from DataProcessing.AggregatorManager import AggregatorManager
+from DataProcessing.MeasurementLevel import lut_level
+from paramiko.ssh_exception import BadAuthenticationType
+import logging
+from rdflib import Graph
+from StringIO import StringIO
+from DataProcessing.DataFormatter import JsonFormatter
+from DataProcessing.DataHeaderCell import CellRequestByFeature,\
+ CellRequestByName
+from DataProcessing.DataError import SamplerError
+
class MonitoringService(object):
    '''
    @summary: core monitoring component of a platform: builds the prefix,
    unit, dimension and task models from the platform ontology at start-up
    and manages measurement tasks and data aggregators on top of them.
    '''
    # service version string reported by __str__
    version = "0.0"
+
def __str__(self):
    """Return a human readable banner with the service version and platform."""
    details = (self.version, self.platform)
    return "NOVI Monitoring Service v%s @ %s" % details
+
@property
def platform(self):
    """Name of the platform this service belongs to, delegated to the interface."""
    interface = self._if
    return interface.platform
+
def __init__(self, interface, baseurl, config_owl):
    '''
    @summary: constructor: build the unit/dimension/task models from the
    platform ontology and initialize the task/aggregator bookkeeping
    @param interface:
    @type interface: MSInterface
    @param baseurl: the location of the ontology files. Either poin to the file system or to a public url
    @type baseurl: str
    @param config_owl: platform specific configuration model
    @type config_owl: str
    '''
    self._if = interface
    # self.platform reads through self._if, so _if must be bound first
    self.logger = logging.getLogger(name = "NOVI.MS.%s" % self.platform)
    self.log = self._if.log # to be removed
    self.pm = PrefixManager()
    self.um = UnitManager()
    self.dm = DimensionManager(self.um)
    self.stm = SubtaskManager(self.um)
    self.am = AggregatorManager()
    self.domains = []
    self.features = []
    self.ontology = Ontology(baseurl = baseurl, config_owl = config_owl)
    self.unitmodel = UnitModel(self.ontology)
    self.taskmodel = TaskModel(self.dm, self.um, self.ontology)
    um = self.unitmodel

    # infer and store prefixes
    for (p_reference, p_symbol, base, exponent) in um.inferPrefixes():
        self.pm.newPrefix( self.ontology._tail(p_reference), p_symbol, base, exponent )

    # infer basic units; power and product units are registered via
    # storeBasicUnit as well, their extra tuple members are ignored here
    for u_reference, u_symbol in um.inferBaseUnits():
        self.storeBasicUnit(u_reference, u_symbol)
    for u_reference, u_symbol, _, _ in um.inferPowerUnits():
        self.storeBasicUnit(u_reference, u_symbol)
    for u_reference, u_symbol, _ in um.inferProductUnits():
        self.storeBasicUnit(u_reference, u_symbol)
    for u_reference, u_symbol, derivedfrom, scale, offset in um.inferLinearTransformedUnits():
        self.storeLinearTransformedUnit(u_reference, u_symbol, derivedfrom, scale, offset)
    for u_reference, u_symbol, derivedfrom, expr_fwd, expr_inv in um.inferRegexpTransformedUnits():
        uref = self.ontology._tail(u_reference)
        ancestor = self.um[ self.ontology._tail(derivedfrom) ]
        self.um.addRegexpTransformedUnit(uref, u_symbol, ancestor, expr_fwd, expr_inv)

    # infer dimensions: base dimensions first, then each derived kind
    #FIXME: if there is a reference loop an error is raised...
    for d_reference, u_reference, level in um.inferBaseDimensions():
        dref = self.ontology._tail(d_reference)
        uref = self.ontology._tail(u_reference)
        lref = self.ontology._tail(level)
        level = lut_level[lref]
        unit = self.um[uref]
        self.dm.newBaseDimension(dref, dref, unit, level)
    for d_reference, u_reference, d_derivedfrom in um.inferDifferenceDimensions():
        dref = self.ontology._tail(d_reference)
        uref = self.ontology._tail(u_reference)
        daref = self.ontology._tail(d_derivedfrom)
        unit = self.um[uref]
        derivedfrom = self.dm[daref]
        self.dm.newDerivedDimension(dref, dref, unit, derivedfrom, self.dm.DifferenceDimension)
    for d_reference, u_reference, d_derivedfrom, exponent in um.inferPowerDimensions():
        dref = self.ontology._tail(d_reference)
        uref = self.ontology._tail(u_reference)
        daref = self.ontology._tail(d_derivedfrom)
        unit = self.um[uref]
        derivedfrom = self.dm[daref]
        self.dm.newDerivedDimension(dref, dref, unit, derivedfrom, self.dm.PowerDimension, exponent = exponent)
    for d_reference, u_reference, d_derivedfrom in um.inferProductDimensions():
        dref = self.ontology._tail(d_reference)
        uref = self.ontology._tail(u_reference)
        unit = self.um[uref]
        # product dimensions derive from a tuple of ancestor dimensions
        derivedfrom = tuple( self.dm[self.ontology._tail(x)] for x in d_derivedfrom )
        self.dm.newDerivedDimension(dref, dref, unit, derivedfrom, self.dm.ProductDimension)
    for d_reference, u_reference, d_derivedfrom in um.inferRatioDimensions():
        dref = self.ontology._tail(d_reference)
        uref = self.ontology._tail(u_reference)
        daref = self.ontology._tail(d_derivedfrom)
        unit = self.um[uref]
        derivedfrom = self.dm[daref]
        self.dm.newDerivedDimension(dref, dref, unit, derivedfrom, self.dm.RatioDimension)

    # infer domains and features
    for uri_domain in self.taskmodel.inferDomains():
        self.domains.append(uri_domain)
    for uri_feature, _, _ in self.taskmodel.inferFeatures():
        self.features.append(uri_feature)

    self.QI = QueryInterpreter(self.taskmodel)

    # bookkeeping for tasks and aggregators created through this service
    self._nextID = 0
    self.subtaskIDs = {}     # taskID -> list of subtask identifiers
    self.aggregatorIDs = {}  # aggregatorID -> list of aggregator identifiers
    self.formatters = {}     # taskID / aggregatorID -> list of formatters
+
+
def storeBasicUnit(self, u_reference, u_symbol):
    '''
    @summary: register a basic unit in the unit manager, then derive all of
    its prefixed variants (e.g. kilo-, milli-) as linear transformed units.
    @param u_reference: ontology resource identifying the unit
    @param u_symbol: printed symbol of the unit
    '''
    tail = self.ontology._tail
    uref = tail(u_reference)
    baseunit = self.um.newBasicUnit(uref, u_symbol)
    for prefix_ref in self.unitmodel.inferPossiblePrefixesOf(u_reference):
        prefix = self.pm[tail(prefix_ref)]
        self.um.addLinearTransformedUnit("%s_%s" % (prefix.reference, uref),
                                         "%s%s" % (prefix.symbol, baseunit.symbol),
                                         baseunit, prefix.scale)
+
def storeLinearTransformedUnit(self, u_reference, u_symbol, derivedfrom, scale, offset):
    '''
    @summary: register a linear transformed unit derived from an already
    known ancestor unit, then derive its prefixed variants as well.
    @param derivedfrom: ontology resource of the ancestor unit
    @param scale: multiplier of the linear transformation
    @param offset: additive constant of the linear transformation
    '''
    tail = self.ontology._tail
    uref = tail(u_reference)
    ancestor = self.um[tail(derivedfrom)]
    unit = self.um.addLinearTransformedUnit(uref, u_symbol, ancestor, scale, offset)
    for prefix_ref in self.unitmodel.inferPossiblePrefixesOf(u_reference):
        prefix = self.pm[tail(prefix_ref)]
        self.um.addLinearTransformedUnit("%s_%s" % (prefix.reference, uref),
                                         "%s%s" % (prefix.symbol, unit.symbol),
                                         unit, prefix.scale)
+
def newProcessID(self):
    '''
    @summary: generate a platform wide unique process identifier
    @return: identifier of the form <platform>:process:<serial>
    @rtype: str
    '''
    serial = self._nextID
    self._nextID = serial + 1
    return "%s:process:%d" % (self.platform, serial)
+
def newAggregateID(self, isprocess = True):
    '''
    @summary: generate a platform wide unique aggregate identifier
    @param isprocess: NOTE(review): this flag is never read -- confirm intent
    @return: identifier of the form <platform>:aggregate:<serial>
    @rtype: str
    '''
    serial = self._nextID
    self._nextID = serial + 1
    return "%s:aggregate:%d" % (self.platform, serial)
+
def measure(self, credential, query):
    '''
    @summary: parse a substrate monitoring query, launch a measurement task
    for each bundle found in it and serialize the collected results
    @param credential: an iterable over credential descriptions, passed on to newTask()
    @param query: an owl document containing several BundleQuery instances
    @type query: str
    @return: a JSON-like string of the serialized results, optionally with an "errors" entry
    @rtype: str
    '''
    # merge the query into a copy of the ontology graph so the interpreter
    # can resolve the model resources the query refers to
    g = Graph()
    g += self.ontology.graph
    sio = StringIO(query)
    g.parse(source = sio)
    responses = []
    errors = []
    queries = self.QI.inferBundleQueries(qgraph = g)
    self.log(shortmsg = "measurements starting...", message = "Attempt to launch %d measurement threads" % len(queries))
    for q in queries:
        feature_uri = q.feature
        domain = self.ontology.ns('task')['Substrate']
        taskgen = self.taskmodel.inferTasks(domain, feature_uri)
        no_tool = True
        (resource_uri, resource) = q.resource
        #we are ugly here: use the first tool
        for task_uri, _ in taskgen:
            no_tool = False
            _, task = self.newTask(task = task_uri, cred = credential, resource = resource, parameters = q.paramlist)
            if q.samplechain:
                # periodic sampling with an aggregation chain applied on top
                task.strategy = STRAT_PERIODICAL
                # we apply some aggregation to the data
                flow = []
                for skeleton, parlist in q.samplechain:
                    flow.append((skeleton, parlist.formkeyvaldict()))
                aid = self.am.newAggregator(task.data, CellRequestByFeature(feature = q.feature), flow)
                A = self.am[aid]
                task.enable()
                # poll until the aggregate can be formatted; SamplerError
                # signals there are not enough samples yet, so retry
                while True:
                    try:
                        task.dataAdded.wait( 15 )
                        formatter = JsonFormatter(datasource = A.data)
                        break
                    except SamplerError:
                        task.dataAdded.clear()
                        sleep(1)
            else:
                # one-shot, on-demand measurement of the raw task data
                task.strategy = STRAT_ONDEMAND
                task.enable()
                task.dataAdded.wait( 15 )
                formatter = JsonFormatter(datasource = task.data)
            formatter.extract(cellrequest = [
                CellRequestByName(name = "Run"),
                CellRequestByFeature(feature = feature_uri)
            ])
            responses.append( formatter.serialize() )
        if no_tool:
            err_description = "No tools to measure %s @ %s" % (feature_uri, resource_uri)
            errors.append(err_description)
            self.log(shortmsg = "Limited result set", message = err_description)
    # hand-crafted JSON assembly of the partial results and error report
    useful_data = ",\n".join( responses )
    error_data = "+".join(errors)
    if len(errors):
        if len(useful_data):
            response = "{%s,\n\"errors\" : \"%s\"}" % (useful_data, error_data)
        else:
            response = "{\"errors\" : \"%s\"}" % (error_data)
    else:
        response = "{%s}" % useful_data
    return response
+
def launchTasks(self, credential, query):
    '''
    @summary: parse a bundle query document and start a periodical measurement
    task for each bundle; results are collected via the stored formatters
    @param credential: an iterable over credential descriptions
    @param query: an owl document containing several BundleQuery instances
    @type query: str
    @return: the task identifier if at least one subtask was started, otherwise None
    @rtype: str or None
    '''
    #TODO: many things in common with measure!!!
    g = Graph()
    g += self.ontology.graph
    sio = StringIO(query)
    g.parse(source = sio)
    # FIX: the class defines no newID(); newProcessID() yields the
    # <platform>:process:<id> form expected by MSInterface.dispatchID()
    taskID = self.newProcessID()
    idstore = self.subtaskIDs[taskID] = []
    formatters = self.formatters[taskID] = []
    for q in self.QI.getBundleQuery(qgraph = g):
        feature_uri = q.feature
        domain = self.ontology.ns('task')['Slice']
        taskgen = self.taskmodel.inferTasks(domain, feature_uri)
        # we are ugly here: use the first tool only
        for task_uri, _ in taskgen:
            subtaskID, task = self.newTask(task = task_uri, cred = credential, resource = q.resource, parameters = q.paramlist)
            task.strategy = STRAT_PERIODICAL
            task.enable()
            idstore.append(subtaskID)
            formatters.append(q.formatter(datasource = task.data))
    if len(idstore):
        return taskID
    # no tool could serve any bundle: roll back the bookkeeping
    self.subtaskIDs.pop(taskID)
    self.formatters.pop(taskID)
    return None
+
+
+
def newTask(self, task, cred, resource = None, parameters = ParameterList()):
    '''
    @summary: initialize a Task object, which is referenced by a uri
    @param task: the reference to the task description
    @type task: URIRef
    @param cred: an iterable over dictionaries, which are used as input parameters to initialize Credential templates passed to the Task object for authentication, authorization purposes
    @type cred: dict generator
    @param resource: the resource to measure
    @type resource: resource or None
    @param parameters: the parameter list to refresh the default parameters of the Task object
    @type parameters: ParameterList
    @return: the tuple of taskID and the initialized measurement Task object
    @rtype: int, Task
    @raise TaskError: if no inferred credential template / supplied credential
    combination yields a working task
    '''
    # NOTE(review): ParameterList() is a shared mutable default argument --
    # confirm it is never mutated through this code path
    name = self.ontology._tail(task)
    credset = self.taskmodel.inferCredentialOf(task)
    driver = self.taskmodel.inferDriverOf(task)
    hdr = self.taskmodel.inferDataheaderOf(task)
    hooks = self.taskmodel.inferHookdefinitionsOf(task)
    hookpar = self.taskmodel.inferHookparametersOf(task)
    taskparameters = self.taskmodel.inferParametersOf(task)

    # caller supplied parameters override the inferred defaults
    taskparameters.update_by_list(parameters)

    #TODO: maybe better push resource to the Task as an argument
    if isinstance(resource, node):
        addr, unit = resource.get_ipaddress("eth0")
        taskparameters.update("SourceAddress", addr, unit)
#            print taskparameters

    # try every inferred credential template against every supplied
    # credential dictionary until one pair produces a task
    while len(credset):
        ct = credset.pop()
        for c in cred:
            try:
                credential = ct(**c)
            except:
                # credential mismatch go on with the next
                continue
            try:
                return self.stm.generate(name = name, driver = driver, dataheader = hdr,
                    hookimplementations = hooks, parameters = taskparameters, credential = credential, **hookpar)
            except BadAuthenticationType:
                # this credential authenticates the wrong way; try the next
                pass
    raise TaskError("Cannot initialize the Task with the credential set provided for %s" % name)
+
def delTask(self, taskidentifier):
    """Remove the task registered under *taskidentifier* from the subtask manager."""
    manager = self.stm
    manager.pop(taskidentifier)
+
def getTask(self, taskidentifier):
    """Return the task registered under *taskidentifier*."""
    manager = self.stm
    return manager[taskidentifier]
+
def attachAggregators(self, credential, query):
    '''
    @summary: parse the query document and attach aggregators to slice
    monitoring data; the draft implementation below is still disabled
    @param credential: an iterable over credential descriptions
    @param query: an owl document containing SampleManipulationQuery instances
    @type query: str
    @return: the aggregator identifier, or None if nothing got attached
    @rtype: str or None
    @raise Exception: always -- the method body is not implemented yet
    '''
    g = Graph()
    g += self.ontology.graph
    sio = StringIO(query)
    g.parse(source = sio)
    # FIX: the class defines no newID(); newAggregateID() yields the
    # <platform>:aggregate:<id> form expected by MSInterface.dispatchID()
    aggregatorID = self.newAggregateID()
    idstore = self.aggregatorIDs[aggregatorID] = []
    formatters = self.formatters[aggregatorID] = []
    raise Exception("unimplemented")
#        for q in self.QI.getBundleQuery(qgraph = g):
#            feature_uri = q.feature
#
#            print "PPPPP", q.paramlist
#
#            domain = self.ontology.ns('task')['Slice']
#            taskgen = self.taskmodel.inferTasks(domain, feature_uri)
#            #we are ugly here: use the first tool
#            for task_uri, _ in taskgen:
#                subtaskID, task = self.newTask(task = task_uri, cred = credential, resource = q.resource, parameters = q.paramlist)
#                task.strategy = STRAT_PERIODICAL
#                task.enable()
#                idstore.append(subtaskID)
#                f = q.formatter(datasource = task.data)
#                formatters.append(f)
    if len(idstore):
        return aggregatorID
    else:
        # FIX: roll back the aggregator bookkeeping; the original popped
        # self.subtaskIDs here, a dict this method never writes to
        self.aggregatorIDs.pop(aggregatorID)
        self.formatters.pop(aggregatorID)
        return None
+
def newAggregator(self):
    """Placeholder; creating aggregators this way is not implemented (returns None)."""
    return None
+
def delAggregator(self, aggregatoridentifier):
    """Remove the aggregator registered under *aggregatoridentifier*."""
    registry = self.am
    registry.pop(aggregatoridentifier)
+
def getAggregator(self, aggregatoridentifier):
    """Look up and return the aggregator registered under *aggregatoridentifier*."""
    registry = self.am
    return registry[aggregatoridentifier]
+
diff --git a/Monitoring/src/main/python/Service/__init__$py.class b/Monitoring/src/main/python/Service/__init__$py.class
new file mode 100644
index 0000000..7dcd0ad
--- /dev/null
+++ b/Monitoring/src/main/python/Service/__init__$py.class
Binary files differ
diff --git a/Monitoring/src/main/python/Service/__init__.py b/Monitoring/src/main/python/Service/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/Monitoring/src/main/python/Service/__init__.py
diff --git a/Monitoring/src/main/python/Service/__init__.py.old b/Monitoring/src/main/python/Service/__init__.py.old
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/Monitoring/src/main/python/Service/__init__.py.old
diff --git a/Monitoring/src/main/python/Service/a b/Monitoring/src/main/python/Service/a
new file mode 100644
index 0000000..7af859c
--- /dev/null
+++ b/Monitoring/src/main/python/Service/a
@@ -0,0 +1,38 @@
+>
+ <core:hasInboundInterface rdf:resource="http://foo.bar/req.owl#ifin"/>
+ <core:hasIPv4Address rdf:resource="http://foo.bar/req.owl#smilax_address"/>
+ <core:hasIPv4Address rdf:resource="http://foo.bar/req.owl#smilax_address"/>
+ <core:hasOutboundInterface rdf:resource="http://foo.bar/req.owl#ifin"/>
+ <feature:hasFeature rdf:resource="http://fp7-novi.eu/monitoring_features.owl#MemoryUtilization"/>
+ <query:hasFormatter rdf:resource="http://fp7-novi.eu/monitoring_query.owl#Formatter_JSON"/>
+ <query:hasResource rdf:resource="http://foo.bar/req.owl#smilax1"/>
+ </rdf:Description>
+ </rdf:Description>
+ </rdf:Description>
+ </rdf:Description>
+ </rdf:Description>
+ <rdf:Description rdf:about="http://foo.bar/req.owl#ifin">
+ <rdf:Description rdf:about="http://foo.bar/req.owl#ifout">
+ <rdf:Description rdf:about="http://foo.bar/req.owl#measureMemoryInformation">
+ <rdf:Description rdf:about="http://foo.bar/req.owl#smilax1">
+ <rdf:Description rdf:about="http://foo.bar/req.owl#smilax_address">
+</rdf:RDF>
+<rdf:RDF
+ <rdf:type rdf:resource="http://fp7-novi.eu/im.owl#Interface"/>
+ <rdf:type rdf:resource="http://fp7-novi.eu/im.owl#Interface"/>
+ <rdf:type rdf:resource="http://fp7-novi.eu/im.owl#Node"/>
+ <rdf:type rdf:resource="http://fp7-novi.eu/im.owl#Resource"/>
+ <rdf:type rdf:resource="http://fp7-novi.eu/monitoring_query.owl#BundleQuery"/>
+ <rdf:type rdf:resource="http://fp7-novi.eu/unit.owl#IPAddress"/>
+ <rdf:type rdf:resource="http://www.w3.org/2002/07/owl#NamedIndividual"/>
+ <rdf:type rdf:resource="http://www.w3.org/2002/07/owl#NamedIndividual"/>
+ <rdf:type rdf:resource="http://www.w3.org/2002/07/owl#NamedIndividual"/>
+ <stat:hasSample rdf:resource="http://fp7-novi.eu/monitoring_stat.owl#UnmodifiedExtractOfFeatureSamples"/>
+ <unit:hasValue>150.254.160.19</unit:hasValue>
+ xmlns:core="http://fp7-novi.eu/im.owl#"
+ xmlns:feature="http://fp7-novi.eu/monitoring_features.owl#"
+ xmlns:query="http://fp7-novi.eu/monitoring_query.owl#"
+ xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
+ xmlns:stat="http://fp7-novi.eu/monitoring_stat.owl#"
+ xmlns:unit="http://fp7-novi.eu/unit.owl#"
+<?xml version="1.0" encoding="UTF-8"?>
diff --git a/Monitoring/src/main/python/Service/b b/Monitoring/src/main/python/Service/b
new file mode 100644
index 0000000..59b1a36
--- /dev/null
+++ b/Monitoring/src/main/python/Service/b
@@ -0,0 +1,43 @@
+>
+ <core:hasInboundInterface rdf:resource="http://foo.bar/req.owl#ifin"/>
+ <core:hasIPv4Address rdf:resource="http://foo.bar/req.owl#ifin_address"/>
+ <core:hasIPv4Address rdf:resource="http://foo.bar/req.owl#ifout_address"/>
+ <core:hasOutboundInterface rdf:resource="http://foo.bar/req.owl#ifout"/>
+ <feature:hasFeature rdf:resource="http://fp7-novi.eu/monitoring_features.owl#MemoryUtilization"/>
+ <query:hasFormatter rdf:resource="http://fp7-novi.eu/monitoring_query.owl#Formatter_JSON"/>
+ <query:hasResource rdf:resource="http://foo.bar/req.owl#smilax1"/>
+ </rdf:Description>
+ </rdf:Description>
+ </rdf:Description>
+ </rdf:Description>
+ </rdf:Description>
+ </rdf:Description>
+ <rdf:Description rdf:about="http://foo.bar/req.owl#ifin">
+ <rdf:Description rdf:about="http://foo.bar/req.owl#ifin_address">
+ <rdf:Description rdf:about="http://foo.bar/req.owl#ifout">
+ <rdf:Description rdf:about="http://foo.bar/req.owl#ifout_address">
+ <rdf:Description rdf:about="http://foo.bar/req.owl#measureMemoryInfo">
+ <rdf:Description rdf:about="http://foo.bar/req.owl#smilax1">
+</rdf:RDF>
+<rdf:RDF
+ <rdf:type rdf:resource="http://fp7-novi.eu/im.owl#Interface"/>
+ <rdf:type rdf:resource="http://fp7-novi.eu/im.owl#Interface"/>
+ <rdf:type rdf:resource="http://fp7-novi.eu/im.owl#Node"/>
+ <rdf:type rdf:resource="http://fp7-novi.eu/im.owl#Resource"/>
+ <rdf:type rdf:resource="http://fp7-novi.eu/monitoring_query.owl#BundleQuery"/>
+ <rdf:type rdf:resource="http://fp7-novi.eu/unit.owl#IPAddress"/>
+ <rdf:type rdf:resource="http://fp7-novi.eu/unit.owl#IPAddress"/>
+ <rdf:type rdf:resource="http://www.w3.org/2002/07/owl#NamedIndividual"/>
+ <rdf:type rdf:resource="http://www.w3.org/2002/07/owl#NamedIndividual"/>
+ <rdf:type rdf:resource="http://www.w3.org/2002/07/owl#NamedIndividual"/>
+ <rdf:type rdf:resource="http://www.w3.org/2002/07/owl#NamedIndividual"/>
+ <stat:hasSample rdf:resource="http://fp7-novi.eu/monitoring_stat.owl#UnmodifiedExtractOfFeatureSamples"/>
+ <unit:hasValue>150.254.160.19</unit:hasValue>
+ <unit:hasValue>150.254.160.19</unit:hasValue>
+ xmlns:core="http://fp7-novi.eu/im.owl#"
+ xmlns:feature="http://fp7-novi.eu/monitoring_features.owl#"
+ xmlns:query="http://fp7-novi.eu/monitoring_query.owl#"
+ xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
+ xmlns:stat="http://fp7-novi.eu/monitoring_stat.owl#"
+ xmlns:unit="http://fp7-novi.eu/unit.owl#"
+<?xml version="1.0" encoding="UTF-8"?>
diff --git a/Monitoring/src/main/python/Service/interface$py.class b/Monitoring/src/main/python/Service/interface$py.class
new file mode 100644
index 0000000..2524e39
--- /dev/null
+++ b/Monitoring/src/main/python/Service/interface$py.class
Binary files differ
diff --git a/Monitoring/src/main/python/Service/interface.py b/Monitoring/src/main/python/Service/interface.py
new file mode 100644
index 0000000..104815f
--- /dev/null
+++ b/Monitoring/src/main/python/Service/interface.py
@@ -0,0 +1,316 @@
+'''
+Created on 08.08.2011
+
+@author: steger, jozsef
+'''
+
+from rdflib import Graph
+from StringIO import StringIO
+from Service.MonitoringService import MonitoringService
+import logging
+
class InterfaceError(Exception):
    '''Raised for malformed identifiers and for not implemented interface methods.'''
    pass
+
+#TODO: add and handle bindings at this level
class MSInterface(object):
    '''
    @summary: Implements a thin service layer on top of the MonitoringService
    instance to collect the methods that need to be exported and mapped in the
    NOVI API. It also holds a reference to the framework so that remote
    MonitoringService instances can be looked up. The log() method is a place
    holder to sink information to be pushed in the NOVI UserFeedback service.
    The emit() method is a place holder to sink signals to be pushed in the
    NOVI Policy Service component installed on top of the same platform.
    '''
+
def __init__(self, framework, reference, baseurl, config_owl):
    '''
    Constructor
    @param framework: a service which provides getService() method to look up MonSrv instances of different reference
    @type framework: Framework
    @param reference: the name of the platform
    @type reference: str
    @param baseurl: the location of the ontology files. Either point to the file system or to a public url
    @type baseurl: str
    @param config_owl: platform specific configuration model
    @type config_owl: str
    '''
    self.framework = framework
    # platform must be bound before MonitoringService is constructed,
    # because its constructor reads it back through this interface
    self.platform = reference
    self._ms = MonitoringService(self, baseurl, config_owl)
    self.logger = logging.getLogger(name = "NOVI.MSI.%s" % reference)
+
def _get_service(self):
    '''
    @return: the underlying monitoring service component
    @rtype: MonitoringService
    '''
    component = self._ms
    return component
+
def _get_proxy(self):
    '''
    @return: a proxy service to look up the rest of the monitoring service components
    @rtype: Framework
    '''
    # FIX: the constructor stores the framework as self.framework;
    # self._framework never exists, so the original raised AttributeError
    return self.framework
+
def dispatchID(self, identifier):
    '''
    @summary: find the MonitoringService instance responsible for the
    identified Task or Aggregate
    @param identifier: of the form <platform>:<process|aggregate>:<id>
    @type identifier: string
    @return: the monitoring service instance
    @rtype: MonitoringService
    @raise InterfaceError: if the identifier is malformed
    '''
    try:
        platform, _kind, _serial = identifier.split(':')
        if platform == self.platform:
            return self.service
        return self.framework.getService(platform)
    except ValueError:
        raise InterfaceError("Wrong identifier format")
+
def log(self, shortmsg, message):
    # overridden by the JAVA wrapper
    entry = "[%s] %s" % (shortmsg, message)
    self.logger.info(entry)
+
def emit(self, what):
    # overridden by the JAVA wrapper
    policyservice = self.framework.getPolicyService(self.platform)
    policyservice.trigger(what)
+
# Test purpose function
def echo(self, platform):
    '''
    @summary: An integration tester function (to be exported public)
    @param platform: name of the platform
    @type platform: string
    @return: messages of the platforms taking part in the message flow
    @rtype: string
    '''
    self.logger.info("[echo] calling %s" % platform)
    try:
        otherservice = self.framework.getService(platform).getPlatform()
        return "%s -> %s" % (str(self.platform), str(otherservice)), ""
    except Exception:
        # FIX: narrowed from a bare except (which also swallowed
        # SystemExit/KeyboardInterrupt); log and keep the best-effort reply
        self.logger.exception("[echo] failed to reach %s" % platform)
        return "Exception: %s" % str(self.platform), ""
+
+
# Substrate monitoring function
def measure(self, credential, query):
    '''
    @summary: Method to handle substrate monitoring queries (to be exported public)
    @param credential: credentials passed through to the service
    @param query: an owl document containing several BundleQuery instances
    @type query: string
    @return: response to the query
    @rtype: string
    '''
    #TODO: split query and concatenate results
    substrate_service = self.service
    return substrate_service.measure(credential, query)
+
# Slice monitoring functions
def sliceTasks(self, credential, query):
    """Not implemented on this platform; always raises InterfaceError."""
    raise InterfaceError("sliceTasks() method is not implemented")
+
def addTask(self, credential, query):
    '''
    @summary: Method to start slice monitoring tasks (to be exported public)
    @param credential: credentials passed through to the service
    @param query: an owl document containing several BundleQuery instances
    @type query: string
    @return: process identifier
    @rtype: string
    '''
    #TODO: investigate if the service instance under this interface should be the boss
    slice_service = self.service
    return slice_service.launchTasks(credential, query)
+
def describeTaskData(self, credential, query):
    '''
    @summary: Method to retrieve meta data of task data (to be exported public)
    @param credential: credentials passed through to the service
    @param query: the task identifier
    @type query: string
    @return: serialize the header of the data tables
    @rtype: string
    '''
    taskID = query
    ms = self.dispatchID(identifier = taskID)
    #TODO: move this in the MonitoringService
    headers = [formatter.header() for formatter in ms.formatters[taskID]]
    return "[%s]" % "\n,\n".join(headers)
+
def fetchTaskData(self, credential, query):
    '''
    @summary: Method to retrieve task data collected since last fetch or the start (to be exported public)
    @param credential:
    @type credential:
    @param query: the task identifier
    @type query: string
    @return: serialize the appended content of the data tables
    @rtype: string
    '''
    taskID = query
    ms = self.dispatchID(identifier = taskID)
    #TODO: move this in the MonitoringService
    response = []
    try:
        for f in ms.formatters[taskID]:
            response.append( f.serialize() )
    except Exception, e:
        # NOTE(review): errors are only printed and swallowed -- the caller
        # receives whatever was serialized before the failure
        print "EEE", e
        pass
    return "[%s]" % "\n,\n".join(response)
+
def modifyTask(self, credential, query):
    """Not implemented on this platform; always raises InterfaceError."""
    raise InterfaceError("modifyTask() method is not implemented")
+
def removeTask(self, credential, query):
    '''
    @summary: Method to remove a slice measurement task (to be exported public)
    @param credential: credentials passed through to the service
    @param query: the task identifier
    @type query: string
    '''
    taskID = query
    ms = self.dispatchID(identifier = taskID)
    #TODO: move this in the MonitoringService
    try:
        subtaskids = ms.subtaskIDs.pop(taskID)
        ms.formatters.pop(taskID)
        # tear the subtasks down in the same (last-first) order the
        # original pop() based loop used
        for subtaskid in reversed(subtaskids):
            ms.delTask(taskidentifier = subtaskid)
    except KeyError:
        # the taskID does not belong to me
        pass
+
def enableTask(self, credential, query):
    '''
    @summary: Method to enable a slice measurement task (to be exported public)
    @param credential: credentials passed through to the service
    @param query: the task identifier
    @type query: string
    '''
    taskID = query
    ms = self.dispatchID(identifier = taskID)
    try:
        for subtaskid in ms.subtaskIDs[taskID]:
            ms.getTask(taskidentifier = subtaskid).enable()
    except KeyError:
        # the taskID does not belong to me
        pass
+
def disableTask(self, credential, query):
    '''
    @summary: Method to disable a slice measurement task temporarily (to be exported public)
    @param credential: credentials passed through to the service
    @param query: the task identifier
    @type query: string
    '''
    taskID = query
    ms = self.dispatchID(identifier = taskID)
    try:
        for subtaskid in ms.subtaskIDs[taskID]:
            ms.getTask(taskidentifier = subtaskid).disable()
    except KeyError:
        # the taskID does not belong to me
        pass
+
def getTaskStatus(self, credential, query):
    '''
    @summary: Method to check the state of a slice measurement task (to be exported public)
    @param credential: credentials passed through to the service
    @param query: the task identifier
    @type query: string
    @return: True if any of the subtasks is running
    @rtype: boolean
    '''
    taskID = query
    ms = self.dispatchID(identifier = taskID)
    try:
        subtasks = (ms.getTask(taskidentifier = sid) for sid in ms.subtaskIDs[taskID])
        return any(t.state == t.STATE_RUNNING for t in subtasks)
    except KeyError:
        # the taskID does not belong to me
        return False
+
def addAggregator(self, credential, query):
    '''
    @summary: Method to define new data manipulation on slice monitoring data (to be exported public)
    @param credential: credentials passed through to the service
    @param query: an owl document containing several SampleManipulationQuery instances
    @type query: string
    @return: aggregator identifier
    @rtype: string
    '''
    #TODO: investigate if the service instance under this interface should be the boss
    slice_service = self.service
    return slice_service.attachAggregators(credential, query)
+
def removeAggregator(self, credential, query):
    '''
    @summary: Method to remove data manipulation on slice monitoring data (to be exported public)
    @param credential: credentials passed through to the service
    @param query: the aggregator identifier
    @type query: string
    '''
    aggregatorID = query
    ms = self.dispatchID(identifier = aggregatorID)
    try:
        aggregatorids = ms.aggregatorIDs.pop(aggregatorID)
        ms.formatters.pop(aggregatorID)
        # tear the aggregators down in the same (last-first) order the
        # original pop() based loop used
        for aggregatorid in reversed(aggregatorids):
            ms.delAggregator(aggregatorid)
    except KeyError:
        # the aggregatorID does not belong to me
        pass
+
def fetchAggregatorData(self, credential, query):
    '''
    @summary: Method to refresh and serialize results of data manipulation on slice monitoring data (to be exported public)
    @param credential:
    @type credential:
    @param query: the aggregator identifier
    @type query: string
    @return: result of aggregators
    @rtype: string
    '''
    aggregatorID = query
    ms = self.dispatchID(identifier = aggregatorID)
    response = []
    try:
        for f in ms.formatters[aggregatorID]:
            response.append( f.serialize() )
    except Exception, e:
        # NOTE(review): errors are only printed and swallowed -- the caller
        # receives whatever was serialized before the failure
        print "EEE", e
        pass
    return "[%s]" % "\n,\n".join(response)
+
def addCondition(self, credential, query):
    """Not implemented on this platform; always raises InterfaceError."""
    raise InterfaceError("addCondition() method is not implemented")
+
def modifyCondition(self, credential, query):
    """Not implemented on this platform; always raises InterfaceError."""
    raise InterfaceError("modifyCondition() method is not implemented")
+
def removeCondition(self, credential, query):
    """Not implemented on this platform; always raises InterfaceError."""
    raise InterfaceError("removeCondition() method is not implemented")
+
+
# read-only properties exposing the private _get_* accessors defined above
proxy = property(_get_proxy,None,None)

service = property(_get_service,None,None)
diff --git a/Monitoring/src/main/python/Service/interface.py.old b/Monitoring/src/main/python/Service/interface.py.old
new file mode 100644
index 0000000..c914bc1
--- /dev/null
+++ b/Monitoring/src/main/python/Service/interface.py.old
@@ -0,0 +1,308 @@
+'''
+Created on 08.08.2011
+
+@author: steger, jozsef
+'''
+from Service.MonitoringService import MonitoringService
+import logging
+
class InterfaceError(Exception):
    '''
    @summary: Raised when a public interface method receives a malformed request
    (e.g. a badly formatted identifier) or is not implemented.
    '''
    pass
+
+#TODO: add and handle bindings at this level
+class MSInterface(object):
+ '''
+ @summary: Implements a thin service layer on top of the MonitoringService instance
+ to collect methods that need to be exported and mapped in the NOVI API.
+ It also provides a reference to the framework to be able to communicate with
+ remote MonitoringService instances. The log() method is a place holder
+ to sink information to be pushed in the NOVI UserFeedback service.
+ The emit() method is a place holder to sink signals to be pushed in the NOVI
+ Policy Service component installed on top of the same platform.
+ '''
+
+ def __init__(self, framework, reference, baseurl, config_owl):
+ '''
+ Constructor
+ @param framework: a service which provides getService() method to look up MonSrv instances of different reference
+ @type framework: Framework
+ @param reference: the name of the platform
+ @type reference: str
+ @param baseurl: the location of the ontology files. Either point to the file system or to a public url
+ @type baseurl: str
+ @param config_owl: platform specific configuration model
+ @type config_owl: str
+ '''
+ self.framework = framework
+ self.platform = reference
+ self._ms = MonitoringService(self, baseurl, config_owl)
+ self.logger = logging.getLogger(name = "NOVI.MSI.%s" % reference)
+
+ @property
+ def service(self):
+ '''
+ @return: the underlying monitoring service component
+ @rtype: MonitoringService
+ '''
+ return self._ms
+
+ @property
+ def proxy(self):
+ '''
+ @return: a proxy service to look up the rest of the monitoring service components
+ @rtype: Framework
+ '''
+ return self._framework
+
+ def dispatchID(self, identifier):
+ '''
+ @summary: this method finds the MonitoringService instance that is responsible for handling an identified Task or Aggregate
+ @param identifier: identifier of a task or aggregate, it follows the form: <platform>:<process|aggregate>:<id>
+ @type identifier: string
+ @return: the monitoring service instance
+ @rtype: MonitoringService
+ '''
+ try:
+ platform, _, _ = identifier.split(':')
+ if self.platform == platform:
+ return self.service
+ return self.framework.getService(platform)
+ except ValueError:
+ raise InterfaceError("Wrong identifier format")
+
+ def log(self, shortmsg, message):
+ # overridden by the JAVA wrapper
+ self.logger.info("[%s] %s" % (shortmsg, message))
+
+ def emit(self, what):
+ # overridden by the JAVA wrapper
+ self.framework.getPolicyService(self.platform).trigger(what)
+
+ # Test purpose function
+ def echo(self, platform):
+ '''
+ @summary: An integration tester function (to be exported public)
+ @param platform: name of the platform
+ @type platform: string
+ @return: messages of the platforms taking part in the message flow
+ @rtype: string
+ '''
+ self.logger.info("[echo] calling %s" % platform)
+ otherservice = self.framework.getService(platform)
+ return "%s -> %s" % (str(self.service), str(otherservice))
+
+
+ # Substrate monitoring function
+ def measure(self, credential, query):
+ '''
+ @summary: Method to handle substrate monitoring queries (to be exported public)
+ @param credential:
+ @type credential:
+ @param query: an owl document containing several BundleQuery instances
+ @type query: string
+ @return: response to the query
+ @rtype: string
+ '''
+ #TODO: split query and concatenate results
+ return self.service.measure(credential, query)
+
+ # Slice monitoring functions
+ def sliceTasks(self, credential, query):
+ raise InterfaceError("sliceTasks() method is not implemented")
+
+ def addTask(self, credential, query):
+ '''
+ @summary: Method to start slice monitoring tasks (to be exported public)
+ @param credential:
+ @type credential:
+ @param query: an owl document containing several BundleQuery instances
+ @type query: string
+ @return: process identifier
+ @rtype: string
+ '''
+ #TODO: investigate if the service instance under this interface should be the boss
+ return self.service.launchTasks(credential, query)
+
+ def describeTaskData(self, credential, query):
+ '''
+ @summary: Method to retrieve meta data of task data (to be exported public)
+ @param credential:
+ @type credential:
+ @param query:
+ @type query: string
+ @return: serialize the header of the data tables
+ @rtype: string
+ '''
+ taskID = query
+ ms = self.dispatchID(identifier = taskID)
+ #TODO: move this in the MonitoringService
+ headers = map(lambda x: x.header(), ms.formatters[taskID])
+ return "[%s]" % "\n,\n".join(headers)
+
+ def fetchTaskData(self, credential, query):
+ '''
+ @summary: Method to retrieve task data collected since last fetch or the start (to be exported public)
+ @param credential:
+ @type credential:
+ @param query:
+ @type query: string
+ @return: serialize the appended content of the data tables
+ @rtype: string
+ '''
+ taskID = query
+ ms = self.dispatchID(identifier = taskID)
+ #TODO: move this in the MonitoringService
+ response = []
+ try:
+ for f in ms.formatters[taskID]:
+ response.append( f.serialize() )
+ except Exception, e:
+ print "EEE", e
+ pass
+ return "[%s]" % "\n,\n".join(response)
+
+ def modifyTask(self, credential, query):
+ raise InterfaceError("modifyTask() method is not implemented")
+
+ def removeTask(self, credential, query):
+ '''
+ @summary: Method to remove a slice measurement task (to be exported public)
+ @param credential:
+ @type credential:
+ @param query:
+ @type query: string
+ '''
+ taskID = query
+ ms = self.dispatchID(identifier = taskID)
+ #TODO: move this in the MonitoringService
+ try:
+ subtaskids = ms.subtaskIDs.pop(taskID)
+ ms.formatters.pop(taskID)
+ while len(subtaskids):
+ subtaskid = subtaskids.pop()
+ ms.delTask(taskidentifier = subtaskid)
+ except KeyError:
+ # the taskID does not belong to me
+ pass
+
+ def enableTask(self, credential, query):
+ '''
+ @summary: Method to enable a slice measurement task (to be exported public)
+ @param credential:
+ @type credential:
+ @param query:
+ @type query: string
+ '''
+ taskID = query
+ ms = self.dispatchID(identifier = taskID)
+ try:
+ for subtaskid in ms.subtaskIDs[taskID]:
+ t = ms.getTask(taskidentifier = subtaskid)
+ t.enable()
+ except KeyError:
+ # the taskID does not belong to me
+ pass
+
+ def disableTask(self, credential, query):
+ '''
+ @summary: Method to disable a slice measurement task temporarily (to be exported public)
+ @param credential:
+ @type credential:
+ @param query:
+ @type query: string
+ '''
+ taskID = query
+ ms = self.dispatchID(identifier = taskID)
+ try:
+ for subtaskid in ms.subtaskIDs[taskID]:
+ t = ms.getTask(taskidentifier = subtaskid)
+ t.disable()
+ except KeyError:
+ # the taskID does not belong to me
+ pass
+
+ def getTaskStatus(self, credential, query):
+ '''
+ @summary: Method to check the state of a slice measurement task (to be exported public)
+ @param credential:
+ @type credential:
+ @param query:
+ @type query: string
+ @return: True if the tasks are running
+ @rtype: boolean
+ '''
+ taskID = query
+ ms = self.dispatchID(identifier = taskID)
+ try:
+ for subtaskid in ms.subtaskIDs[taskID]:
+ t = ms.getTask(taskidentifier = subtaskid)
+ if t.state == t.STATE_RUNNING:
+ return True
+ except KeyError:
+ # the taskID does not belong to me
+ pass
+ return False
+
+ def addAggregator(self, credential, query):
+ '''
+ @summary: Method to define new data manipulation on slice monitoring data (to be exported public)
+ @param credential:
+ @type credential:
+ @param query: an owl document containing several SampleManipulationQuery instances
+ @type query: string
+ @return: aggregator identifier
+ @rtype: string
+ '''
+ #TODO: investigate if the service instance under this interface should be the boss
+ return self.service.attachAggregators(credential, query)
+
+ def removeAggregator(self, credential, query):
+ '''
+ @summary: Method to remove data manipulation on slice monitoring data (to be exported public)
+ @param credential:
+ @type credential:
+ @param query:
+ @type query: string
+ '''
+ aggregatorID = query
+ ms = self.dispatchID(identifier = aggregatorID)
+ try:
+ aggregatorids = ms.aggregatorIDs.pop(aggregatorID)
+ ms.formatters.pop(aggregatorID)
+ while len(aggregatorids):
+ aggregatorid = aggregatorids.pop()
+ ms.delAggregator(aggregatorid)
+ except KeyError:
+ # the taskID does not belong to me
+ pass
+
+ def fetchAggregatorData(self, credential, query):
+ '''
+ @summary: Method to refresh and serialize results of data manipulation on slice monitoring data (to be exported public)
+ @param credential:
+ @type credential:
+ @param query:
+ @type query: string
+ @return: result of aggregators
+ @rtype: string
+ '''
+ aggregatorID = query
+ ms = self.dispatchID(identifier = aggregatorID)
+ response = []
+ try:
+ for f in ms.formatters[aggregatorID]:
+ response.append( f.serialize() )
+ except Exception, e:
+ print "EEE", e
+ pass
+ return "[%s]" % "\n,\n".join(response)
+
+ def addCondition(self, credential, query):
+ raise InterfaceError("addCondition() method is not implemented")
+
+ def modifyCondition(self, credential, query):
+ raise InterfaceError("modifyCondition() method is not implemented")
+
+ def removeCondition(self, credential, query):
+ raise InterfaceError("removeCondition() method is not implemented")
+
diff --git a/Monitoring/src/main/python/Service/mock_framework$py.class b/Monitoring/src/main/python/Service/mock_framework$py.class
new file mode 100644
index 0000000..a5818a0
--- /dev/null
+++ b/Monitoring/src/main/python/Service/mock_framework$py.class
Binary files differ
diff --git a/Monitoring/src/main/python/Service/mock_framework.py b/Monitoring/src/main/python/Service/mock_framework.py
new file mode 100644
index 0000000..3f00945
--- /dev/null
+++ b/Monitoring/src/main/python/Service/mock_framework.py
@@ -0,0 +1,69 @@
+'''
+Created on Nov 20, 2012
+
+@author: steger
+'''
+from Service.interface import MSInterface
+import logging
+from logging.handlers import TimedRotatingFileHandler
+from os import path, unlink
+
class FrameworkError(Exception):
    '''
    @summary: Raised when a platform name cannot be resolved by the mock framework.
    '''
    pass
+
class PolicyMock(object):
    '''
    @summary: Stand-in for the NOVI Policy Service: every triggered event is
    written to /tmp/ps.log, which is recreated empty on construction.
    '''
    def __init__(self):
        logfile = "/tmp/ps.log"
        # start each run with a fresh log file
        if path.exists(logfile):
            unlink(logfile)
        handler = TimedRotatingFileHandler(filename = logfile)
        self.logger = logging.getLogger("NOVI.PS")
        self.logger.setLevel(level = logging.DEBUG)
        self.logger.addHandler(hdlr = handler)

    def trigger(self, what):
        # a real Policy Service would evaluate the event; the mock just records it
        self.logger.info(what)
+
+class Framework(object):
+ '''
+ This class mimics the integration framework. It helps to look up remote Monitoring Service instances
+ '''
+
+ def __init__(self, baseurl, conf):
+ '''
+ Constructor
+ '''
+ self._if = {}
+ self._pol = PolicyMock()
+ for platform, config_owl in conf.iteritems():
+ fn = "/tmp/ms_%s.log" % platform
+ if path.exists(fn):
+ unlink(fn)
+ hdlr = TimedRotatingFileHandler(filename = fn)
+ l = logging.getLogger("NOVI.MS.%s" % platform)
+ l.setLevel(level = logging.DEBUG)
+ l.addHandler(hdlr = hdlr)
+ l = logging.getLogger("NOVI.MSI.%s" % platform)
+ l.setLevel(level = logging.DEBUG)
+ l.addHandler(hdlr = hdlr)
+ self._if[platform] = MSInterface(self, platform, baseurl, config_owl)
+
+ def _getInterface(self, platform):
+ try:
+ return self._if[platform]
+ except:
+ print "EE: %s platform not found" % platform
+ raise FrameworkError
+
+ def getService(self, platform):
+ try:
+ return self._if[platform]
+ except:
+ print "EE: %s platform not found" % platform
+ raise FrameworkError
+
+ def _getPolicyService(self, platform):
+ return self._pol
+
+ def _serviceList(self):
+ return self._if.values()
diff --git a/Monitoring/src/main/python/Service/mock_framework.py.old b/Monitoring/src/main/python/Service/mock_framework.py.old
new file mode 100644
index 0000000..6bbbce9
--- /dev/null
+++ b/Monitoring/src/main/python/Service/mock_framework.py.old
@@ -0,0 +1,69 @@
+'''
+Created on Nov 20, 2012
+
+@author: steger
+'''
+from Service.interface import MSInterface
+import logging
+from logging.handlers import TimedRotatingFileHandler
+from os import path, unlink
+
class FrameworkError(Exception):
    '''
    @summary: Raised when a platform name cannot be resolved by the mock framework.
    '''
    pass
+
class PolicyMock(object):
    '''
    @summary: Minimal stand-in for the NOVI Policy Service: trigger() logs each
    event into /tmp/ps.log, which is recreated empty on construction.
    '''
    def __init__(self):
        fn = "/tmp/ps.log"
        # start each run with a fresh log file
        if path.exists(fn):
            unlink(fn)
        hdlr = TimedRotatingFileHandler(filename = fn)
        self.logger = logging.getLogger("NOVI.PS")
        self.logger.setLevel(level = logging.DEBUG)
        self.logger.addHandler(hdlr = hdlr)

    def trigger(self, what):
        # sink for policy events (MSInterface.emit() routes here)
        self.logger.info(what)
+
class Framework(object):
    '''
    This class mimics the integration framework. It helps to look up remote Monitoring Service instances
    '''

    def __init__(self, baseurl, conf):
        '''
        Constructor
        @param baseurl: location of the ontology files, handed to each MSInterface
        @param conf: maps platform name to its platform specific configuration model (owl)
        '''
        self._if = {}
        self._pol = PolicyMock()
        for platform, config_owl in conf.iteritems():
            fn = "/tmp/ms_%s.log" % platform
            # recreate the per-platform log file and route both loggers into it
            if path.exists(fn):
                unlink(fn)
            hdlr = TimedRotatingFileHandler(filename = fn)
            l = logging.getLogger("NOVI.MS.%s" % platform)
            l.setLevel(level = logging.DEBUG)
            l.addHandler(hdlr = hdlr)
            l = logging.getLogger("NOVI.MSI.%s" % platform)
            l.setLevel(level = logging.DEBUG)
            l.addHandler(hdlr = hdlr)
            self._if[platform] = MSInterface(self, platform, baseurl, config_owl)

    def getInterface(self, platform):
        '''
        @return: the MSInterface registered for the platform
        @raise FrameworkError: if the platform is unknown
        '''
        try:
            return self._if[platform]
        except:
            # NOTE(review): bare except also hides non-lookup errors; KeyError would suffice
            print "EE: %s platform not found" % platform
            raise FrameworkError

    def getService(self, platform):
        '''
        @return: the MonitoringService component behind the platform's interface
        @raise FrameworkError: if the platform is unknown
        '''
        try:
            return self._if[platform].service
        except:
            # NOTE(review): bare except also hides non-lookup errors; KeyError would suffice
            print "EE: %s platform not found" % platform
            raise FrameworkError

    def getPolicyService(self, platform):
        # a single shared policy service is used for every platform
        return self._pol

    def serviceList(self):
        # all registered MSInterface instances
        return self._if.values()
diff --git a/Monitoring/src/main/python/Service/test.py b/Monitoring/src/main/python/Service/test.py
new file mode 100644
index 0000000..efc888d
--- /dev/null
+++ b/Monitoring/src/main/python/Service/test.py
@@ -0,0 +1,268 @@
+from __future__ import with_statement
+'''
+Created on Aug 10, 2011
+
+@author: steger
+'''
+import site
+site.addsitedir('../site-packages')
+
+import unittest2
+from rdflib import Graph, Namespace, Literal
+from Example.credentials import noviCredentialIARGS
+from Example.Platforms import FRAMEWORK
+from Util.MonitoringQueryImpl import MonitoringQueryImpl
+
+
class Test(unittest2.TestCase):
    '''
    @summary: Integration tests that drive the PlanetLab monitoring service through
    the mock FRAMEWORK: echo round-trips, a canned substrate measurement, and
    several generated OWL bundle queries (memory, cpu, error case, helper, complex).
    '''

    def setUp(self):
        # shorthands: S = sample set individual, F = JSON formatter individual
        self.MSI_planetlab = FRAMEWORK.getService('PlanetLab')
        self.PL_O = self.MSI_planetlab.service.ontology
        NS = self.PL_O.ns
        self.S = NS('stat')['UnmodifiedExtractOfFeatureSamples']
        self.F = NS('query')['Formatter_JSON']

    def tearDown(self):
        pass

    def test_echo(self):
        # echo via the local platform, then via a remote platform
        p1 = "PlanetLab"
        p2 = "FEDERICA"
        response = FRAMEWORK.getService(p1).echo(p1)
        print response
        i, o = response.split("->")
        got = (i.split("@")[-1].strip(), o.split("@")[-1].strip())
        expect = (p1, p1)
        self.assertEquals(expect, got, "Echo reply differs from expected (%s): %s" % (expect, response))
        response = FRAMEWORK.getService(p2).echo(p1)
        i, o = response.split("->")
        got = (i.split("@")[-1].strip(), o.split("@")[-1].strip())
        expect = (p2, p1)
        self.assertEquals(expect, got, "Echo reply differs from expected (%s): %s" % (expect, response))

    def test_measure(self):
        # run the canned example query shipped with the monitoring model
        doc = "../monitoringmodel/monitoringQuery_example.owl" # % self.PL_O.baseurl
        with open(doc) as fp:
            q = fp.read()
#        response = self.MS_planetlab.measure(credential = mykeyring, query = q)
        response = self.MSI_planetlab.measure(credential = [noviCredentialIARGS], query = q)
        print response
        self.assertTrue(response, "Got nothing due to former errors")
        self.assertGreater(len(response.splitlines()), 26, "got empty measurement response")

    def new_g(self):
        # helper: empty rdflib graph with all ontology prefixes bound
        g = Graph()
        for k, (_, ns) in self.PL_O.ontology.iteritems():
            g.bind(k, Namespace(ns))
        return g

    def save(self, fn, q):
        # helper: best-effort dump of the generated query for manual inspection
        try:
            with open(fn, 'w') as f:
                f.write(q)
        except:
            pass

    def test_genq_mem(self):
        g = self.new_g()
        mns = Namespace("http://foo.bar/req.owl#")
        g.bind('q', mns)
        NS = self.PL_O.ns
        TYPE = NS('rdf')['type']
        Q = mns['measureMemoryInformation']
# Q2 = mns['measureMemoryInformation2']
        R = mns['smilax1']
        I1 = mns['ifin']
        I2 = mns['ifout']
        IPADDR = Literal('150.254.160.19')
        ADDR = mns['smilax_address']
        g.add((Q, TYPE, NS('owl')['NamedIndividual']))
        g.add((Q, TYPE, NS('query')['BundleQuery']))
# g.add((Q, NS('feature')['hasFeature'], NS('feature')['FreeMemory']))
        g.add((Q, NS('feature')['hasFeature'], NS('feature')['MemoryUtilization']))
        g.add((Q, NS('query')['hasResource'], R))

# g.add((Q2, TYPE, NS('owl')['NamedIndividual']))
# g.add((Q2, TYPE, NS('query')['BundleQuery']))
# g.add((Q2, NS('feature')['hasFeature'], NS('feature')['AvailableMemory']))
# g.add((Q2, NS('query')['hasResource'], R))

        g.add((Q, NS('stat')['hasSample'], self.S))
        g.add((Q, NS('query')['hasFormatter'], self.F))

# g.add((Q2, NS('stat')['hasSample'], self.S))
# g.add((Q2, NS('query')['hasFormatter'], self.F))


        g.add((R, TYPE, NS('core')['Node']))
        g.add((R, TYPE, NS('core')['Resource']))
        g.add((R, TYPE, NS('owl')['NamedIndividual']))

        # NOTE(review): both triples attach I1; I2 is typed but never linked to R -- confirm intent
        g.add((R, NS('core')['hasInboundInterface'], I1))
        g.add((R, NS('core')['hasOutboundInterface'], I1))
        g.add((I1, TYPE, NS('core')['Interface']))
        g.add((I2, TYPE, NS('core')['Interface']))
        g.add((I1, NS('core')['hasIPv4Address'], ADDR))
        g.add((I2, NS('core')['hasIPv4Address'], ADDR))
        g.add((ADDR, TYPE, NS('owl')['NamedIndividual']))
        g.add((ADDR, TYPE, NS('unit')['IPAddress']))
        g.add((ADDR, NS('unit')['hasValue'], IPADDR))
        query = g.serialize()
        self.save(fn = "/tmp/genq_mem.owl", q = query)
        response = self.MSI_planetlab.measure(credential = [noviCredentialIARGS], query = query)
        print response
        self.assertGreater(len(response.splitlines()), 20, "got empty measurement response")

    def test_genq_cpu(self):
        g = self.new_g()
        mns = Namespace("http://foo.bar/req.owl#")
        g.bind('q', mns)
        NS = self.PL_O.ns
        TYPE = NS('rdf')['type']
        Q = mns['measureCPUInformation']
        R = mns['smilax1']
        I1 = mns['ifin']
        I2 = mns['ifout']
        IPADDR = Literal('150.254.160.19')
        ADDR = mns['smilax_address']
        g.add((Q, TYPE, NS('owl')['NamedIndividual']))
        g.add((Q, TYPE, NS('query')['BundleQuery']))
        g.add((Q, NS('feature')['hasFeature'], NS('feature')['CPUUtilization']))
        g.add((Q, NS('query')['hasResource'], R))

        g.add((Q, NS('stat')['hasSample'], self.S))
        g.add((Q, NS('query')['hasFormatter'], self.F))

        g.add((R, TYPE, NS('core')['Node']))
        g.add((R, TYPE, NS('core')['Resource']))
        g.add((R, TYPE, NS('owl')['NamedIndividual']))

        g.add((R, NS('core')['hasInboundInterface'], I1))
        g.add((R, NS('core')['hasOutboundInterface'], I1))
        g.add((I1, TYPE, NS('core')['Interface']))
        g.add((I2, TYPE, NS('core')['Interface']))
        g.add((I1, NS('core')['hasIPv4Address'], ADDR))
        g.add((I2, NS('core')['hasIPv4Address'], ADDR))
        g.add((ADDR, TYPE, NS('owl')['NamedIndividual']))
        g.add((ADDR, TYPE, NS('unit')['IPAddress']))
        g.add((ADDR, NS('unit')['hasValue'], IPADDR))
        query = g.serialize()
        self.save(fn = "/tmp/genq_cpu.owl", q = query)
        response = self.MSI_planetlab.measure(credential = [noviCredentialIARGS], query = query)
        print response
        self.assertGreater(len(response.splitlines()), 16, "got empty measurement response")

    def test_genq_err(self):
        # queries an unmeasurable feature (UsedMemory) and expects an error in the response
        g = self.new_g()
        mns = Namespace("http://foo.bar/req.owl#")
        g.bind('q', mns)
        NS = self.PL_O.ns
        TYPE = NS('rdf')['type']
        Q = mns['cantmeasureright']
        R = mns['smilax1']
        I1 = mns['ifin']
        I2 = mns['ifout']
        IPADDR = Literal('150.254.160.19')
        ADDR = mns['smilax_address']
        g.add((Q, TYPE, NS('owl')['NamedIndividual']))
        g.add((Q, TYPE, NS('query')['BundleQuery']))
        g.add((Q, NS('feature')['hasFeature'], NS('feature')['UsedMemory']))
        g.add((Q, NS('query')['hasResource'], R))

        g.add((Q, NS('stat')['hasSample'], self.S))
        g.add((Q, NS('query')['hasFormatter'], self.F))

        g.add((R, TYPE, NS('core')['Node']))
        g.add((R, TYPE, NS('core')['Resource']))
        g.add((R, TYPE, NS('owl')['NamedIndividual']))

        g.add((R, NS('core')['hasInboundInterface'], I1))
        g.add((R, NS('core')['hasOutboundInterface'], I1))
        g.add((I1, TYPE, NS('core')['Interface']))
        g.add((I2, TYPE, NS('core')['Interface']))
        g.add((I1, NS('core')['hasIPv4Address'], ADDR))
        g.add((I2, NS('core')['hasIPv4Address'], ADDR))
        g.add((ADDR, TYPE, NS('owl')['NamedIndividual']))
        g.add((ADDR, TYPE, NS('unit')['IPAddress']))
        g.add((ADDR, NS('unit')['hasValue'], IPADDR))
        query = g.serialize()
        # NOTE(review): overwrites /tmp/genq_cpu.owl -- looks like a copy-paste leftover; genq_err.owl was probably intended
        self.save(fn = "/tmp/genq_cpu.owl", q = query)
        response = self.MSI_planetlab.measure(credential = [noviCredentialIARGS], query = query)
        print response
        self.assertTrue("error" in response, "no error message! got %s" % response)

    def test_genq_helper(self):
        # builds the same memory query via the MonitoringQueryImpl convenience API
        q = MonitoringQueryImpl(self.MSI_planetlab._ms)
        q.addFeature('measureMemoryInfo', 'MemoryUtilization')
        q.addResource('measureMemoryInfo', 'smilax1', 'Node')
        q.addInterface('smilax1', 'ifin', 'hasInboundInterface')
        q.addInterface('smilax1', 'ifout', 'hasOutboundInterface')
        q.defineInterface('ifin','150.254.160.19', 'hasIPv4Address')
        q.defineInterface('ifout','150.254.160.19', 'hasIPv4Address')
        query = q.serialize()
        self.save(fn = "/tmp/genq_mem2.owl", q = query)
        print query
        response = self.MSI_planetlab.measure(credential = [noviCredentialIARGS], query = query)
        print response
        self.assertGreater(len(response.splitlines()), 20, "got empty measurement response")


    def test_genq_complex(self):
        # four bundle queries in one document; Q4 (UsedMemory) should produce an error,
        # the others should produce data -- the response must contain both
        g = self.new_g()
        mns = Namespace("http://foo.bar/req.owl#")
        g.bind('q', mns)
        NS = self.PL_O.ns
        TYPE = NS('rdf')['type']
        Q1 = mns['measureMemoryInformation']
        Q2 = mns['measureCPUInformation']
        Q3 = mns['measureDiskInformation']
        Q4 = mns['measureUsedMemory'] # should generate error
        R = mns['smilax1']
        I1 = mns['ifin']
        I2 = mns['ifout']
        P = mns['part']
        IPADDR = Literal('150.254.160.19')
        ADDR = mns['smilax_address']
        for Q, feature in [(Q1, 'FreeMemory'), (Q2, 'CPULoad'), (Q3, 'FreeDiskSpace'), (Q4, 'UsedMemory')]:
            g.add((Q, TYPE, NS('owl')['NamedIndividual']))
            g.add((Q, TYPE, NS('query')['BundleQuery']))
            g.add((Q, NS('feature')['hasFeature'], NS('feature')[feature]))
            g.add((Q, NS('query')['hasResource'], R))

            g.add((Q, NS('stat')['hasSample'], self.S))
            g.add((Q, NS('query')['hasFormatter'], self.F))

        g.add((R, TYPE, NS('core')['Node']))
        g.add((R, TYPE, NS('core')['Resource']))
        g.add((R, TYPE, NS('owl')['NamedIndividual']))

        g.add((R, NS('core')['hasInboundInterface'], I1))
        g.add((R, NS('core')['hasOutboundInterface'], I1))
        g.add((I1, TYPE, NS('core')['Interface']))
        g.add((I2, TYPE, NS('core')['Interface']))
        g.add((I1, NS('core')['hasIPv4Address'], ADDR))
        g.add((I2, NS('core')['hasIPv4Address'], ADDR))
        g.add((ADDR, TYPE, NS('owl')['NamedIndividual']))
        g.add((ADDR, TYPE, NS('unit')['IPAddress']))
        g.add((ADDR, NS('unit')['hasValue'], IPADDR))
        # the disk query additionally carries a "partition" string parameter
        g.add((Q3, NS('param')['hasParameter'], P))
        g.add((P, TYPE, NS('owl')['NamedIndividual']))
        g.add((P, TYPE, NS('query')['QueryParameter']))
        g.add((P, NS('param')['paramName'], Literal("partition")))
        g.add((P, NS('unit')['hasValue'], Literal("/")))
        g.add((P, NS('param')['hasType'], NS('param')['String']))
        g.add((P, TYPE, NS('unit')['NameOfSomething']))
        query = g.serialize()
        self.save(fn = "/tmp/genq_complex.owl", q = query)
        response = self.MSI_planetlab.measure(credential = [noviCredentialIARGS], query = query)
        print response
        self.assertTrue(response, "got nothing")
        self.assertGreater(len(response.splitlines()), 26, "got empty measurement response")
        self.assertTrue("error" in response, "no error message! got %s" % response)
+
+
+
if __name__ == "__main__":
    # run the full suite; edit sys.argv (below) to select a single test
    #import sys;sys.argv = ['', 'Test.test_genq']
    unittest2.main()
diff --git a/Monitoring/src/main/python/Service/test.py.old b/Monitoring/src/main/python/Service/test.py.old
new file mode 100644
index 0000000..ea329a4
--- /dev/null
+++ b/Monitoring/src/main/python/Service/test.py.old
@@ -0,0 +1,245 @@
+'''
+Created on Aug 10, 2011
+
+@author: steger
+'''
+import unittest
+from rdflib import Graph, Namespace, Literal
+from Example.credentials import noviCredentialIARGS
+from Example.Platforms import FRAMEWORK
+
class Test(unittest.TestCase):
    '''
    @summary: Integration tests that drive the PlanetLab monitoring service through
    the mock FRAMEWORK: echo round-trips, a canned substrate measurement, and
    several generated OWL bundle queries (memory, cpu, error case, complex).
    '''

    def setUp(self):
        # shorthands: S = sample set individual, F = JSON formatter individual
        self.MSI_planetlab = FRAMEWORK.getInterface('PlanetLab')
        self.PL_O = self.MSI_planetlab.service.ontology
        NS = self.PL_O.ns
        self.S = NS('stat')['UnmodifiedExtractOfFeatureSamples']
        self.F = NS('query')['Formatter_JSON']

    def tearDown(self):
        pass

    def test_echo(self):
        # echo via the local platform, then via a remote platform
        p1 = "PlanetLab"
        p2 = "FEDERICA"
        response = FRAMEWORK.getInterface(p1).echo(p1)
        i, o = response.split("->")
        got = (i.split("@")[-1].strip(), o.split("@")[-1].strip())
        expect = (p1, p1)
        self.assertEquals(expect, got, "Echo reply differs from expected (%s): %s" % (expect, response))
        response = FRAMEWORK.getInterface(p2).echo(p1)
        i, o = response.split("->")
        got = (i.split("@")[-1].strip(), o.split("@")[-1].strip())
        expect = (p2, p1)
        self.assertEquals(expect, got, "Echo reply differs from expected (%s): %s" % (expect, response))

    def test_measure(self):
        # run the canned example query shipped next to the ontology files
        doc = "%s/monitoringQuery_example.owl" % self.PL_O.baseurl
        with open(doc) as fp:
            q = fp.read()
#        response = self.MS_planetlab.measure(credential = mykeyring, query = q)
        response = self.MSI_planetlab.measure(credential = [noviCredentialIARGS], query = q)
        #print response
        self.assertTrue(response, "Got nothing due to former errors")
        self.assertGreater(len(response.splitlines()), 26, "got empty measurement response")

    def new_g(self):
        # helper: empty rdflib graph with all ontology prefixes bound
        g = Graph()
        for k, (_, ns) in self.PL_O.ontology.iteritems():
            g.bind(k, Namespace(ns))
        return g

    def save(self, fn, q):
        # helper: best-effort dump of the generated query for manual inspection
        try:
            with open(fn, 'w') as f:
                f.write(q)
        except:
            pass

    def test_genq_mem(self):
        g = self.new_g()
        mns = Namespace("http://foo.bar/req.owl#")
        g.bind('q', mns)
        NS = self.PL_O.ns
        TYPE = NS('rdf')['type']
        Q = mns['measureMemoryInformation']
# Q2 = mns['measureMemoryInformation2']
        R = mns['smilax1']
        I1 = mns['ifin']
        I2 = mns['ifout']
        IPADDR = Literal('150.254.160.19')
        ADDR = mns['smilax_address']
        g.add((Q, TYPE, NS('owl')['NamedIndividual']))
        g.add((Q, TYPE, NS('query')['BundleQuery']))
# g.add((Q, NS('feature')['hasFeature'], NS('feature')['FreeMemory']))
        g.add((Q, NS('feature')['hasFeature'], NS('feature')['MemoryUtilization']))
        g.add((Q, NS('query')['hasResource'], R))

# g.add((Q2, TYPE, NS('owl')['NamedIndividual']))
# g.add((Q2, TYPE, NS('query')['BundleQuery']))
# g.add((Q2, NS('feature')['hasFeature'], NS('feature')['AvailableMemory']))
# g.add((Q2, NS('query')['hasResource'], R))

        g.add((Q, NS('stat')['hasSample'], self.S))
        g.add((Q, NS('query')['hasFormatter'], self.F))

# g.add((Q2, NS('stat')['hasSample'], self.S))
# g.add((Q2, NS('query')['hasFormatter'], self.F))


        g.add((R, TYPE, NS('core')['Node']))
        g.add((R, TYPE, NS('core')['Resource']))
        g.add((R, TYPE, NS('owl')['NamedIndividual']))

        # NOTE(review): both triples attach I1; I2 is typed but never linked to R -- confirm intent
        g.add((R, NS('core')['hasInboundInterface'], I1))
        g.add((R, NS('core')['hasOutboundInterface'], I1))
        g.add((I1, TYPE, NS('core')['Interface']))
        g.add((I2, TYPE, NS('core')['Interface']))
        g.add((I1, NS('core')['hasIPv4Address'], ADDR))
        g.add((I2, NS('core')['hasIPv4Address'], ADDR))
        g.add((ADDR, TYPE, NS('owl')['NamedIndividual']))
        g.add((ADDR, TYPE, NS('unit')['IPAddress']))
        g.add((ADDR, NS('unit')['hasValue'], IPADDR))
        query = g.serialize()
        self.save(fn = "/tmp/genq_mem.owl", q = query)
        response = self.MSI_planetlab.measure(credential = [noviCredentialIARGS], query = query)
        #print response
        self.assertGreater(len(response.splitlines()), 20, "got empty measurement response")

    def test_genq_cpu(self):
        g = self.new_g()
        mns = Namespace("http://foo.bar/req.owl#")
        g.bind('q', mns)
        NS = self.PL_O.ns
        TYPE = NS('rdf')['type']
        Q = mns['measureCPUInformation']
        R = mns['smilax1']
        I1 = mns['ifin']
        I2 = mns['ifout']
        IPADDR = Literal('150.254.160.19')
        ADDR = mns['smilax_address']
        g.add((Q, TYPE, NS('owl')['NamedIndividual']))
        g.add((Q, TYPE, NS('query')['BundleQuery']))
        g.add((Q, NS('feature')['hasFeature'], NS('feature')['CPUUtilization']))
        g.add((Q, NS('query')['hasResource'], R))

        g.add((Q, NS('stat')['hasSample'], self.S))
        g.add((Q, NS('query')['hasFormatter'], self.F))

        g.add((R, TYPE, NS('core')['Node']))
        g.add((R, TYPE, NS('core')['Resource']))
        g.add((R, TYPE, NS('owl')['NamedIndividual']))

        g.add((R, NS('core')['hasInboundInterface'], I1))
        g.add((R, NS('core')['hasOutboundInterface'], I1))
        g.add((I1, TYPE, NS('core')['Interface']))
        g.add((I2, TYPE, NS('core')['Interface']))
        g.add((I1, NS('core')['hasIPv4Address'], ADDR))
        g.add((I2, NS('core')['hasIPv4Address'], ADDR))
        g.add((ADDR, TYPE, NS('owl')['NamedIndividual']))
        g.add((ADDR, TYPE, NS('unit')['IPAddress']))
        g.add((ADDR, NS('unit')['hasValue'], IPADDR))
        query = g.serialize()
        self.save(fn = "/tmp/genq_cpu.owl", q = query)
        response = self.MSI_planetlab.measure(credential = [noviCredentialIARGS], query = query)
        #print response
        self.assertGreater(len(response.splitlines()), 16, "got empty measurement response")

    def test_genq_err(self):
        # queries an unmeasurable feature (UsedMemory) and expects an error in the response
        g = self.new_g()
        mns = Namespace("http://foo.bar/req.owl#")
        g.bind('q', mns)
        NS = self.PL_O.ns
        TYPE = NS('rdf')['type']
        Q = mns['cantmeasureright']
        R = mns['smilax1']
        I1 = mns['ifin']
        I2 = mns['ifout']
        IPADDR = Literal('150.254.160.19')
        ADDR = mns['smilax_address']
        g.add((Q, TYPE, NS('owl')['NamedIndividual']))
        g.add((Q, TYPE, NS('query')['BundleQuery']))
        g.add((Q, NS('feature')['hasFeature'], NS('feature')['UsedMemory']))
        g.add((Q, NS('query')['hasResource'], R))

        g.add((Q, NS('stat')['hasSample'], self.S))
        g.add((Q, NS('query')['hasFormatter'], self.F))

        g.add((R, TYPE, NS('core')['Node']))
        g.add((R, TYPE, NS('core')['Resource']))
        g.add((R, TYPE, NS('owl')['NamedIndividual']))

        g.add((R, NS('core')['hasInboundInterface'], I1))
        g.add((R, NS('core')['hasOutboundInterface'], I1))
        g.add((I1, TYPE, NS('core')['Interface']))
        g.add((I2, TYPE, NS('core')['Interface']))
        g.add((I1, NS('core')['hasIPv4Address'], ADDR))
        g.add((I2, NS('core')['hasIPv4Address'], ADDR))
        g.add((ADDR, TYPE, NS('owl')['NamedIndividual']))
        g.add((ADDR, TYPE, NS('unit')['IPAddress']))
        g.add((ADDR, NS('unit')['hasValue'], IPADDR))
        query = g.serialize()
        # NOTE(review): overwrites /tmp/genq_cpu.owl -- looks like a copy-paste leftover; genq_err.owl was probably intended
        self.save(fn = "/tmp/genq_cpu.owl", q = query)
        response = self.MSI_planetlab.measure(credential = [noviCredentialIARGS], query = query)
        #print response
        self.assertTrue("error" in response, "no error message! got %s" % response)

    def test_genq_complex(self):
        # four bundle queries in one document; Q4 (UsedMemory) should produce an error,
        # the others should produce data -- the response must contain both
        g = self.new_g()
        mns = Namespace("http://foo.bar/req.owl#")
        g.bind('q', mns)
        NS = self.PL_O.ns
        TYPE = NS('rdf')['type']
        Q1 = mns['measureMemoryInformation']
        Q2 = mns['measureCPUInformation']
        Q3 = mns['measureDiskInformation']
        Q4 = mns['measureUsedMemory'] # should generate error
        R = mns['smilax1']
        I1 = mns['ifin']
        I2 = mns['ifout']
        P = mns['part']
        IPADDR = Literal('150.254.160.19')
        ADDR = mns['smilax_address']
        for Q, feature in [(Q1, 'FreeMemory'), (Q2, 'CPULoad'), (Q3, 'FreeDiskSpace'), (Q4, 'UsedMemory')]:
            g.add((Q, TYPE, NS('owl')['NamedIndividual']))
            g.add((Q, TYPE, NS('query')['BundleQuery']))
            g.add((Q, NS('feature')['hasFeature'], NS('feature')[feature]))
            g.add((Q, NS('query')['hasResource'], R))

            g.add((Q, NS('stat')['hasSample'], self.S))
            g.add((Q, NS('query')['hasFormatter'], self.F))

        g.add((R, TYPE, NS('core')['Node']))
        g.add((R, TYPE, NS('core')['Resource']))
        g.add((R, TYPE, NS('owl')['NamedIndividual']))

        g.add((R, NS('core')['hasInboundInterface'], I1))
        g.add((R, NS('core')['hasOutboundInterface'], I1))
        g.add((I1, TYPE, NS('core')['Interface']))
        g.add((I2, TYPE, NS('core')['Interface']))
        g.add((I1, NS('core')['hasIPv4Address'], ADDR))
        g.add((I2, NS('core')['hasIPv4Address'], ADDR))
        g.add((ADDR, TYPE, NS('owl')['NamedIndividual']))
        g.add((ADDR, TYPE, NS('unit')['IPAddress']))
        g.add((ADDR, NS('unit')['hasValue'], IPADDR))
        # the disk query additionally carries a "partition" string parameter
        g.add((Q3, NS('param')['hasParameter'], P))
        g.add((P, TYPE, NS('owl')['NamedIndividual']))
        g.add((P, TYPE, NS('query')['QueryParameter']))
        g.add((P, NS('param')['paramName'], Literal("partition")))
        g.add((P, NS('unit')['hasValue'], Literal("/")))
        g.add((P, NS('param')['hasType'], NS('param')['String']))
        g.add((P, TYPE, NS('unit')['NameOfSomething']))
        query = g.serialize()
        self.save(fn = "/tmp/genq_complex.owl", q = query)
        response = self.MSI_planetlab.measure(credential = [noviCredentialIARGS], query = query)
        #print response
        self.assertTrue(response, "got nothing")
        self.assertGreater(len(response.splitlines()), 26, "got empty measurement response")
        self.assertTrue("error" in response, "no error message! got %s" % response)
+
+
+
if __name__ == "__main__":
    # run the full suite; edit sys.argv (below) to select a single test
    #import sys;sys.argv = ['', 'Test.test_genq']
    unittest.main()