Changeset 860

Show
Ignore:
Timestamp:
01/16/2007 12:26:42 PM (2 years ago)
Author:
chris
Message:

Added support for the Spread messaging system.

Now writes RRD updates using the timestamp supplied in the elvinrrd message, if there is one. Otherwise the current time is used.

Files:
1 modified

Legend:

Unmodified
Added
Removed
  • elvinrrd/trunk/elvinrrd/elvinrrd.py

    r815 r860  
    2525import getopt 
    2626import time 
     27import cPickle 
     28from cStringIO import StringIO 
    2729 
    2830# optparse is only available in 2.3+, but optik provides the same  
     
    4042import rrdtool  # requires py-rrdtool from http://sourceforge.net/projects/py-rrdtool/ 
    4143                #                       or http://www.nongnu.org/py-rrdtool/ 
    42 import elvin    # requires Elvin4 modules from http://elvin.dstc.edu.au/projects/pe4/index.html 
     44 
     45try: 
     46    import elvin    # Elvin4 modules from http://elvin.dstc.edu.au/projects/pe4/index.html 
     47except: 
     48    elvin = None 
     49 
     50try: 
     51    import spread   # Spread module from http://www.python.org/other/spread/ 
     52except: 
     53    spread = None 
     54 
     55if not elvin and not spread: 
     56    raise Exception("You must install either Spread or Elvin modules for Python.") 
    4357 
    4458# Default Elvin URL and SCOPE 
    4559ELVIN_URL='elvin://localhost' 
    4660ELVIN_SCOPE='elvin' 
     61 
     62# Default Spread host/port 
     63SPREAD_SERVER='localhost' 
     64SPREAD_PORT=4803 
     65 
    4766 
    4867################################################################ 
     
    7089     
    7190 
    72 class BaseElvin: 
    73     """Base Elvin class to handle opening and closing Elvin connections. 
    74     This should be sub-classed and consumer and/or producer functionality 
    75     added. 
    76     """ 
    77      
    78     def __init__(self, elvinurl=ELVIN_URL, elvinscope=ELVIN_SCOPE): 
    79         """Initialise connection to Elvin server, using (in order of preference): 
    80         - An Elvin server URL specified by elvinurl; 
    81         - An Elvin scope specified by elvinscope; 
    82         - Auto discovery if the above not set. 
    83         """ 
    84          
    85         self.elvinurl = elvinurl 
    86         self.elvinscope = elvinscope 
    87          
    88         if self.elvinurl and len(self.elvinurl) > 0: 
    89             connect_string=self.elvinurl 
    90         elif self.elvinscope and len(self.elvinscope) > 0: 
    91             connect_string=self.elvinscope 
    92         else: 
    93             connect_string='*'          # auto discovery 
    94          
    95         if options.verbose: 
    96             log( "Trying Elvin connection to %s" % (connect_string) ) 
    97          
    98         try: 
    99             self.elvinc = elvin.connect( connect_string ) 
    100             if options.verbose: 
    101                 log( "Elvin connection succeeded to %s" % (connect_string) ) 
    102         except: 
    103             sys.stderr.write( "Connection to elvin failed - connection string was '%s'\n" % (connect_string) ) 
    104             if options.logfile != None: 
    105                 log( "Connection to elvin failed - connection string was '%s'" % (connect_string) ) 
    106             sys.exit(1) 
    107      
    108     def cleanExit(self): 
    109         """Close the Elvin connection cleanly. 
    110         """ 
    111          
    112         self.elvinc.close()     # close Elvin connection 
    113      
    114  
    115 class storeconsumer(BaseElvin): 
    116     """An Elvin consumer to receive "ELVINRRD" messages from the Elvin network. 
    117     """ 
    118      
    119     def __init__(self, elvinurl=ELVIN_URL, elvinscope=ELVIN_SCOPE): 
    120         apply( BaseElvin.__init__, (self, elvinurl, elvinscope) ) 
    121      
    122     def register(self): 
    123         """Subscribe for Elvin messages containing the key "ELVINRRD". 
    124         """ 
    125          
    126         self.subscription = 'require(ELVINRRD)' 
    127          
    128         sub = self.elvinc.subscribe(self.subscription) 
    129         sub.add_listener(self.deliver) 
    130         sub.register() 
    131      
    132     def deliver(self, sub, msg, insec, rock): 
     91class Base(object): 
     92    def storeRRD(self, msg): 
    13393        """This method handles any received "ELVINRRD" messages. 
    13494        It parses a valid message and stores the information in the appropriate 
     
    168128                create = str(string.replace( r.create, '*', inx.group(1) ))     # replace all '*' with first string from match 
    169129         
     130        # Use timestamp if part of message, otherwise rrdtool will use current time 
     131        if 'timestamp' in msg.keys(): 
     132            timestamp = str(msg[u'timestamp']).strip() 
     133        else: 
     134            timestamp = 'N' 
     135         
    170136        if len(store) == 1: 
    171137            # only one variable to store, use default method 
     
    176142                return 1 
    177143             
    178             u = (rrdfile, "N:%s" % (str(val))) 
     144            u = (rrdfile, "%s:%s" % (timestamp, str(val))) 
    179145        else: 
    180146            # multiple variables to store - must name them 
    181147            ds = "-t" 
    182             n = "N:" 
     148            n = "%s:" %timestamp 
    183149            for s in store: 
    184150                try: 
     
    226192        return 0 
    227193     
     194     
     195 
     196class storeconsumer(Base): 
     197    """An Elvin consumer to receive "ELVINRRD" messages from the Elvin network. 
     198    """ 
     199     
     200    def __init__(self, elvinurl=ELVIN_URL, elvinscope=ELVIN_SCOPE): 
     201        """Initialise connection to Elvin server, using (in order of preference): 
     202        - An Elvin server URL specified by elvinurl; 
     203        - An Elvin scope specified by elvinscope; 
     204        - Auto discovery if the above not set. 
     205        """ 
     206         
     207        self.elvinurl = elvinurl 
     208        self.elvinscope = elvinscope 
     209         
     210        if self.elvinurl and len(self.elvinurl) > 0: 
     211            connect_string=self.elvinurl 
     212        elif self.elvinscope and len(self.elvinscope) > 0: 
     213            connect_string=self.elvinscope 
     214        else: 
     215            connect_string='*'          # auto discovery 
     216         
     217        if options.verbose: 
     218            log( "Trying Elvin connection to %s" % (connect_string) ) 
     219         
     220        try: 
     221            self.elvinc = elvin.connect( connect_string ) 
     222            if options.verbose: 
     223                log( "Elvin connection succeeded to %s" % (connect_string) ) 
     224        except: 
     225            sys.stderr.write( "Connection to elvin failed - connection string was '%s'\n" % (connect_string) ) 
     226            if options.logfile != None: 
     227                log( "Connection to elvin failed - connection string was '%s'" % (connect_string) ) 
     228            sys.exit(1) 
     229     
     230    def run(self): 
     231        self.elvinc.run() 
     232     
     233    def cleanExit(self): 
     234        """Close the Elvin connection cleanly. 
     235        """ 
     236        self.elvinc.close()     # close Elvin connection 
     237     
     238    def register(self): 
     239        """Subscribe for Elvin messages containing the key "ELVINRRD". 
     240        """ 
     241         
     242        self.subscription = 'require(ELVINRRD)' 
     243         
     244        sub = self.elvinc.subscribe(self.subscription) 
     245        sub.add_listener(self.deliver) 
     246        sub.register() 
     247     
     248    def deliver(self, sub, msg, insec, rock): 
     249        """This method handles any received "ELVINRRD" messages. 
     250        Messages are passed to the base class storeRRD() method. 
     251        """ 
     252        return self.storeRRD(msg) 
     253     
     254 
     255 
     256class SpreadStore(Base): 
     257    '''Handles receiving elvinrrd messages from a Spread service. 
     258    ''' 
     259     
     260    def __init__(self, spread_server='localhost', spread_port=spread.DEFAULT_SPREAD_PORT): 
     261        if spread_port is None: 
     262            spread_port = spread.DEFAULT_SPREAD_PORT 
     263        elif type(spread_port) != type(1): 
     264            spread_port = int(spread_port) 
     265         
     266        self.spread_server = spread_server 
     267        self.spread_port = spread_port 
     268         
     269        server = "%d@%s" % (self.spread_port, self.spread_server) 
     270        self._connection = spread.connect(server) 
     271     
     272    def register(self): 
     273        '''Register with Spread service for group "elvinrrd". 
     274        ''' 
     275        self._connection.join('elvinrrd') 
     276     
     277    def run(self): 
     278        while True: 
     279            m = self._connection.receive() 
     280            if m and hasattr(m, 'message'): 
     281                # RegularMsgType 
     282                mio = StringIO(m.message) 
     283                up = cPickle.Unpickler(mio) 
     284                msg = up.load() 
     285                self.storeRRD(msg) 
     286     
     287 
    228288 
    229289 
     
    328388 
    329389def main(): 
    330     usage_short = "[-hvd] [-e elvin_url] [-s elvin_scope] [-l logfile] -c elvinrrd.cf" 
     390    usage_short = "[-hvd] [-e elvin_url] [-s elvin_scope] [-S spread_server] [-P spread_port] [-l logfile] -c elvinrrd.cf" 
    331391    usage = "usage: %s %s" % (sys.argv[0], usage_short) 
    332392     
     
    345405    parser.add_option('-s', '--elvinscope', dest='elvin_scope', \ 
    346406            metavar="SCOPE", help="Use elvin scope SCOPE") 
     407    parser.add_option('-S', '--spreadserver', dest='spread_server',     \ 
     408            metavar="HOSTNAME", help="Use Spread server at HOSTNAME") 
     409    parser.add_option('-P', '--spreadport', dest='spread_port', \ 
     410            metavar="PORT", help="Use Spread port number PORT") 
    347411    parser.add_option('-l', '--logfile', dest='logfile',        \ 
    348412                        metavar="FILE", help="Log to FILE") 
    349413    parser.add_option('-c', '--configfile', dest='configfile',  \ 
    350414                        metavar="FILE", help="Load config from FILE") 
    351     parser.set_defaults(verbose=False, debug=False, elvin_url=ELVIN_URL, elvin_scope=ELVIN_SCOPE) 
     415    #parser.set_defaults(verbose=False, debug=False, elvin_url=ELVIN_URL, elvin_scope=ELVIN_SCOPE) 
     416    parser.set_defaults(verbose=False, debug=False) 
    352417     
    353418    global options 
     
    374439    rrd = rrdtool 
    375440     
    376     e = storeconsumer(options.elvin_url, options.elvin_scope) 
     441    if options.elvin_url or options.elvin_scope: 
     442        e = storeconsumer(options.elvin_url, options.elvin_scope) 
     443    elif options.spread_server or options.spread_port: 
     444        e = SpreadStore(options.spread_server, options.spread_port) 
     445    else: 
     446        raise Exception("Options for either Spread or Elvin messaging must be supplied.") 
     447         
    377448    e.rrd = rrd 
    378449    e.rrddict = rrddict 
    379450    e.register() 
    380451    if options.verbose: 
    381         log( "Starting Elvin main loop" ) 
    382     e.elvinc.run() 
     452        log( "Starting main loop" ) 
     453    e.run() 
    383454 
    384455