| 72 | | class BaseElvin: |
| 73 | | """Base Elvin class to handle opening and closing Elvin connections. |
| 74 | | This should be sub-classed and consumer and/or producer functionality |
| 75 | | added. |
| 76 | | """ |
| 77 | | |
| 78 | | def __init__(self, elvinurl=ELVIN_URL, elvinscope=ELVIN_SCOPE): |
| 79 | | """Initialise connection to Elvin server, using (in order of preference): |
| 80 | | - An Elvin server URL specified by elvinurl; |
| 81 | | - An Elvin scope specified by elvinscope; |
| 82 | | - Auto discovery if the above not set. |
| 83 | | """ |
| 84 | | |
| 85 | | self.elvinurl = elvinurl |
| 86 | | self.elvinscope = elvinscope |
| 87 | | |
| 88 | | if self.elvinurl and len(self.elvinurl) > 0: |
| 89 | | connect_string=self.elvinurl |
| 90 | | elif self.elvinscope and len(self.elvinscope) > 0: |
| 91 | | connect_string=self.elvinscope |
| 92 | | else: |
| 93 | | connect_string='*' # auto discovery |
| 94 | | |
| 95 | | if options.verbose: |
| 96 | | log( "Trying Elvin connection to %s" % (connect_string) ) |
| 97 | | |
| 98 | | try: |
| 99 | | self.elvinc = elvin.connect( connect_string ) |
| 100 | | if options.verbose: |
| 101 | | log( "Elvin connection succeeded to %s" % (connect_string) ) |
| 102 | | except: |
| 103 | | sys.stderr.write( "Connection to elvin failed - connection string was '%s'\n" % (connect_string) ) |
| 104 | | if options.logfile != None: |
| 105 | | log( "Connection to elvin failed - connection string was '%s'" % (connect_string) ) |
| 106 | | sys.exit(1) |
| 107 | | |
| 108 | | def cleanExit(self): |
| 109 | | """Close the Elvin connection cleanly. |
| 110 | | """ |
| 111 | | |
| 112 | | self.elvinc.close() # close Elvin connection |
| 113 | | |
| 114 | | |
| 115 | | class storeconsumer(BaseElvin): |
| 116 | | """An Elvin consumer to receive "ELVINRRD" messages from the Elvin network. |
| 117 | | """ |
| 118 | | |
| 119 | | def __init__(self, elvinurl=ELVIN_URL, elvinscope=ELVIN_SCOPE): |
| 120 | | apply( BaseElvin.__init__, (self, elvinurl, elvinscope) ) |
| 121 | | |
| 122 | | def register(self): |
| 123 | | """Subscribe for Elvin messages containing the key "ELVINRRD". |
| 124 | | """ |
| 125 | | |
| 126 | | self.subscription = 'require(ELVINRRD)' |
| 127 | | |
| 128 | | sub = self.elvinc.subscribe(self.subscription) |
| 129 | | sub.add_listener(self.deliver) |
| 130 | | sub.register() |
| 131 | | |
| 132 | | def deliver(self, sub, msg, insec, rock): |
| | 91 | class Base(object): |
| | 92 | def storeRRD(self, msg): |
| | 194 | |
| | 195 | |
| | 196 | class storeconsumer(Base): |
| | 197 | """An Elvin consumer to receive "ELVINRRD" messages from the Elvin network. |
| | 198 | """ |
| | 199 | |
| | 200 | def __init__(self, elvinurl=ELVIN_URL, elvinscope=ELVIN_SCOPE): |
| | 201 | """Initialise connection to Elvin server, using (in order of preference): |
| | 202 | - An Elvin server URL specified by elvinurl; |
| | 203 | - An Elvin scope specified by elvinscope; |
| | 204 | - Auto discovery if the above not set. |
| | 205 | """ |
| | 206 | |
| | 207 | self.elvinurl = elvinurl |
| | 208 | self.elvinscope = elvinscope |
| | 209 | |
| | 210 | if self.elvinurl and len(self.elvinurl) > 0: |
| | 211 | connect_string=self.elvinurl |
| | 212 | elif self.elvinscope and len(self.elvinscope) > 0: |
| | 213 | connect_string=self.elvinscope |
| | 214 | else: |
| | 215 | connect_string='*' # auto discovery |
| | 216 | |
| | 217 | if options.verbose: |
| | 218 | log( "Trying Elvin connection to %s" % (connect_string) ) |
| | 219 | |
| | 220 | try: |
| | 221 | self.elvinc = elvin.connect( connect_string ) |
| | 222 | if options.verbose: |
| | 223 | log( "Elvin connection succeeded to %s" % (connect_string) ) |
| | 224 | except: |
| | 225 | sys.stderr.write( "Connection to elvin failed - connection string was '%s'\n" % (connect_string) ) |
| | 226 | if options.logfile != None: |
| | 227 | log( "Connection to elvin failed - connection string was '%s'" % (connect_string) ) |
| | 228 | sys.exit(1) |
| | 229 | |
| | 230 | def run(self): |
| | 231 | self.elvinc.run() |
| | 232 | |
| | 233 | def cleanExit(self): |
| | 234 | """Close the Elvin connection cleanly. |
| | 235 | """ |
| | 236 | self.elvinc.close() # close Elvin connection |
| | 237 | |
| | 238 | def register(self): |
| | 239 | """Subscribe for Elvin messages containing the key "ELVINRRD". |
| | 240 | """ |
| | 241 | |
| | 242 | self.subscription = 'require(ELVINRRD)' |
| | 243 | |
| | 244 | sub = self.elvinc.subscribe(self.subscription) |
| | 245 | sub.add_listener(self.deliver) |
| | 246 | sub.register() |
| | 247 | |
| | 248 | def deliver(self, sub, msg, insec, rock): |
| | 249 | """This method handles any received "ELVINRRD" messages. |
| | 250 | Messages are passed to the base class storeRRD() method. |
| | 251 | """ |
| | 252 | return self.storeRRD(msg) |
| | 253 | |
| | 254 | |
| | 255 | |
| | 256 | class SpreadStore(Base): |
| | 257 | '''Handles receiving elvinrrd messages from a Spread service. |
| | 258 | ''' |
| | 259 | |
| | 260 | def __init__(self, spread_server='localhost', spread_port=spread.DEFAULT_SPREAD_PORT): |
| | 261 | if spread_port is None: |
| | 262 | spread_port = spread.DEFAULT_SPREAD_PORT |
| | 263 | elif type(spread_port) != type(1): |
| | 264 | spread_port = int(spread_port) |
| | 265 | |
| | 266 | self.spread_server = spread_server |
| | 267 | self.spread_port = spread_port |
| | 268 | |
| | 269 | server = "%d@%s" % (self.spread_port, self.spread_server) |
| | 270 | self._connection = spread.connect(server) |
| | 271 | |
| | 272 | def register(self): |
| | 273 | '''Register with Spread service for group "elvinrrd". |
| | 274 | ''' |
| | 275 | self._connection.join('elvinrrd') |
| | 276 | |
| | 277 | def run(self): |
| | 278 | while True: |
| | 279 | m = self._connection.receive() |
| | 280 | if m and hasattr(m, 'message'): |
| | 281 | # RegularMsgType |
| | 282 | mio = StringIO(m.message) |
| | 283 | up = cPickle.Unpickler(mio) |
| | 284 | msg = up.load() |
| | 285 | self.storeRRD(msg) |
| | 286 | |
| | 287 | |