Package proton :: Module reactor
[frames] | no frames]

Source Code for Module proton.reactor

  1  # 
  2  # Licensed to the Apache Software Foundation (ASF) under one 
  3  # or more contributor license agreements.  See the NOTICE file 
  4  # distributed with this work for additional information 
  5  # regarding copyright ownership.  The ASF licenses this file 
  6  # to you under the Apache License, Version 2.0 (the 
  7  # "License"); you may not use this file except in compliance 
  8  # with the License.  You may obtain a copy of the License at 
  9  # 
 10  #   http://www.apache.org/licenses/LICENSE-2.0 
 11  # 
 12  # Unless required by applicable law or agreed to in writing, 
 13  # software distributed under the License is distributed on an 
 14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
 15  # KIND, either express or implied.  See the License for the 
 16  # specific language governing permissions and limitations 
 17  # under the License. 
 18  # 
 19  import logging, os, Queue, socket, time, types 
 20  from heapq import heappush, heappop, nsmallest 
 21  from proton import Collector, Connection, ConnectionException, Delivery, Described, dispatch 
 22  from proton import Endpoint, Event, EventBase, EventType, generate_uuid, Handler, Link, Message 
 23  from proton import ProtonException, PN_ACCEPTED, PN_PYREF, SASL, Session, SSL, SSLDomain, SSLUnavailable, symbol 
 24  from proton import Terminus, Timeout, Transport, TransportException, ulong, Url 
 25  from select import select 
 26  from proton.handlers import OutgoingMessageHandler 
 27  from proton import unicode2utf8, utf82unicode 
 28   
 29  import traceback 
 30  from proton import WrappedHandler, _chandler, secs2millis, millis2secs, timeout2millis, millis2timeout, Selectable 
 31  from wrapper import Wrapper, PYCTX 
 32  from cproton import * 
class Task(Wrapper):
    """Python wrapper around a reactor task handle."""

    @staticmethod
    def wrap(impl):
        """Return a Task wrapping ``impl``, or None when ``impl`` is None."""
        return None if impl is None else Task(impl)

    def __init__(self, impl):
        """Bind this wrapper to ``impl`` using the task attachments accessor."""
        Wrapper.__init__(self, impl, pn_task_attachments)

    def _init(self):
        """Initialisation hook; a task has no extra Python-side state."""
        pass
48
class Acceptor(Wrapper):
    """Python wrapper around an acceptor handle returned by the reactor."""

    def __init__(self, impl):
        """Bind this wrapper to the underlying acceptor ``impl``."""
        Wrapper.__init__(self, impl)

    def close(self):
        """Close the underlying acceptor."""
        pn_acceptor_close(self._impl)
56
57 -class Reactor(Wrapper):
58 59 @staticmethod
60 - def wrap(impl):
61 if impl is None: 62 return None 63 else: 64 record = pn_reactor_attachments(impl) 65 attrs = pn_void2py(pn_record_get(record, PYCTX)) 66 if attrs and 'subclass' in attrs: 67 return attrs['subclass'](impl=impl) 68 else: 69 return Reactor(impl=impl)
70
71 - def __init__(self, *handlers, **kwargs):
72 Wrapper.__init__(self, kwargs.get("impl", pn_reactor), pn_reactor_attachments) 73 for h in handlers: 74 self.handler.add(h)
75
76 - def _init(self):
77 self.errors = []
78
79 - def on_error(self, info):
80 self.errors.append(info) 81 self.yield_()
82
83 - def _get_global(self):
84 return WrappedHandler.wrap(pn_reactor_get_global_handler(self._impl), self.on_error)
85
86 - def _set_global(self, handler):
87 impl = _chandler(handler, self.on_error) 88 pn_reactor_set_global_handler(self._impl, impl) 89 pn_decref(impl)
90 91 global_handler = property(_get_global, _set_global) 92
93 - def _get_timeout(self):
94 return millis2timeout(pn_reactor_get_timeout(self._impl))
95
96 - def _set_timeout(self, secs):
97 return pn_reactor_set_timeout(self._impl, timeout2millis(secs))
98 99 timeout = property(_get_timeout, _set_timeout) 100
101 - def yield_(self):
102 pn_reactor_yield(self._impl)
103
104 - def mark(self):
105 pn_reactor_mark(self._impl)
106
107 - def _get_handler(self):
108 return WrappedHandler.wrap(pn_reactor_get_handler(self._impl), self.on_error)
109
110 - def _set_handler(self, handler):
111 impl = _chandler(handler, self.on_error) 112 pn_reactor_set_handler(self._impl, impl) 113 pn_decref(impl)
114 115 handler = property(_get_handler, _set_handler) 116
117 - def run(self):
118 self.timeout = 3.14159265359 119 self.start() 120 while self.process(): pass 121 self.stop()
122
123 - def wakeup(self):
124 n = pn_reactor_wakeup(self._impl) 125 if n: raise IOError(pn_error_text(pn_io_error(pn_reactor_io(self._impl))))
126
127 - def start(self):
128 pn_reactor_start(self._impl)
129 130 @property
131 - def quiesced(self):
132 return pn_reactor_quiesced(self._impl)
133
134 - def _check_errors(self):
135 if self.errors: 136 for exc, value, tb in self.errors[:-1]: 137 traceback.print_exception(exc, value, tb) 138 exc, value, tb = self.errors[-1] 139 raise exc, value, tb
140
141 - def process(self):
142 result = pn_reactor_process(self._impl) 143 self._check_errors() 144 return result
145
146 - def stop(self):
147 pn_reactor_stop(self._impl) 148 self._check_errors() 149 self.global_handler = None 150 self.handler = None
151
152 - def schedule(self, delay, task):
153 impl = _chandler(task, self.on_error) 154 task = Task.wrap(pn_reactor_schedule(self._impl, secs2millis(delay), impl)) 155 pn_decref(impl) 156 return task
157
158 - def acceptor(self, host, port, handler=None):
159 impl = _chandler(handler, self.on_error) 160 aimpl = pn_reactor_acceptor(self._impl, unicode2utf8(host), str(port), impl) 161 pn_decref(impl) 162 if aimpl: 163 return Acceptor(aimpl) 164 else: 165 raise IOError("%s (%s:%s)" % (pn_error_text(pn_io_error(pn_reactor_io(self._impl))), host, port))
166
167 - def connection(self, handler=None):
168 impl = _chandler(handler, self.on_error) 169 result = Connection.wrap(pn_reactor_connection(self._impl, impl)) 170 pn_decref(impl) 171 return result
172
173 - def selectable(self, handler=None):
174 impl = _chandler(handler, self.on_error) 175 result = Selectable.wrap(pn_reactor_selectable(self._impl)) 176 if impl: 177 record = pn_selectable_attachments(result._impl) 178 pn_record_set_handler(record, impl) 179 pn_decref(impl) 180 return result
181
182 - def update(self, sel):
183 pn_reactor_update(self._impl, sel._impl)
184
185 - def push_event(self, obj, etype):
186 pn_collector_put(pn_reactor_collector(self._impl), PN_PYREF, pn_py2void(obj), etype.number)
from proton import wrappers as _wrappers

# Register converters that turn the C reactor and task objects into their
# Python wrapper classes.
_wrappers["pn_reactor"] = lambda obj: Reactor.wrap(pn_cast_pn_reactor(obj))
_wrappers["pn_task"] = lambda obj: Task.wrap(pn_cast_pn_task(obj))
class EventInjector(object):
    """
    Can be added to a reactor to allow events to be triggered by an
    external thread but handled on the event thread associated with
    the reactor. An instance of this class can be passed to the
    Reactor.selectable() method of the reactor in order to activate
    it. The close() method should be called when it is no longer
    needed, to allow the event loop to end if needed.
    """

    def __init__(self):
        # Pending events, plus a pipe whose read end is handed to the
        # selectable in on_selectable_init().
        self.queue = Queue.Queue()
        self.pipe = os.pipe()
        self._closed = False

    def trigger(self, event):
        """
        Request that the given event be dispatched on the event thread
        of the reactor to which this EventInjector was added.
        """
        self.queue.put(event)
        write_end = self.pipe[1]
        os.write(write_end, "!")

    def close(self):
        """
        Request that this EventInjector be closed. Existing events
        will be dispatched on the reactor's event dispatch thread,
        then this will be removed from the set of interest.
        """
        self._closed = True
        write_end = self.pipe[1]
        os.write(write_end, "!")

    def fileno(self):
        """Return the read end of the pipe."""
        return self.pipe[0]

    def on_selectable_init(self, event):
        """Give the selectable the pipe's read end and mark it as reading."""
        selectable = event.context
        selectable.fileno(self.fileno())
        selectable.reading = True
        event.reactor.update(selectable)

    def on_selectable_readable(self, event):
        """
        Read from the pipe, push every queued event onto the reactor,
        and terminate the selectable once close() has been requested.
        """
        read_end = self.pipe[0]
        os.read(read_end, 512)
        pending = self.queue
        while not pending.empty():
            requested = pending.get()
            event.reactor.push_event(requested.context, requested.type)
        if self._closed:
            selectable = event.context
            selectable.terminate()
            event.reactor.update(selectable)
242
class ApplicationEvent(EventBase):
    """
    Application defined event, which can optionally be associated with
    an engine object and or an arbitrary subject
    """

    def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None):
        """
        Create an event of type ``typename``.

        Only the most specific engine object needs to be supplied: a
        delivery supplies its link, a link its session, and a session
        its connection, overriding any value passed explicitly.
        """
        super(ApplicationEvent, self).__init__(PN_PYREF, self, EventType(typename))
        self.connection = connection
        self.session = session
        self.link = link
        self.delivery = delivery
        # Fill in the less specific objects from the most specific one given.
        if self.delivery:
            self.link = self.delivery.link
        if self.link:
            self.session = self.link.session
        if self.session:
            self.connection = self.session.connection
        self.subject = subject

    def __repr__(self):
        """Return the event type followed by the associated objects that are set."""
        objects = [self.connection, self.session, self.link, self.delivery, self.subject]
        # ``typename`` is only a local of __init__; referencing it here raised
        # NameError. The event type handed to the base class is used instead
        # (assumes EventBase exposes it as ``self.type``, which is how
        # EventInjector reads it from triggered events).
        return "%s(%s)" % (self.type, ", ".join([str(o) for o in objects if o is not None]))
266
class Transaction(object):
    """
    Class to track state of an AMQP 1.0 transaction.
    """

    def __init__(self, txn_ctrl, handler, settle_before_discharge=False):
        """
        Store the control link ``txn_ctrl`` and the ``handler`` that is
        notified of outcomes, then immediately declare the transaction.
        """
        self.txn_ctrl = txn_ctrl
        self.handler = handler
        self.settle_before_discharge = settle_before_discharge
        self.id = None
        self.failed = False
        self._declare = None
        self._discharge = None
        self._pending = []
        self.declare()

    def commit(self):
        """Discharge the transaction as successful."""
        self.discharge(False)

    def abort(self):
        """Discharge the transaction as failed."""
        self.discharge(True)

    def declare(self):
        """Send the declare control message and remember its delivery."""
        self._declare = self._send_ctrl(symbol(u'amqp:declare:list'), [None])

    def discharge(self, failed):
        """Send the discharge control message carrying the ``failed`` flag."""
        self.failed = failed
        self._discharge = self._send_ctrl(symbol(u'amqp:discharge:list'), [self.id, failed])

    def _send_ctrl(self, descriptor, value):
        """Send a described control message and tag its delivery with this transaction."""
        control = Message(body=Described(descriptor, value))
        delivery = self.txn_ctrl.send(control)
        delivery.transaction = self
        return delivery

    def send(self, sender, msg, tag=None):
        """Send ``msg`` on ``sender`` with the transaction id set as local state data."""
        delivery = sender.send(msg, tag=tag)
        delivery.local.data = [self.id]
        delivery.update(0x34)
        return delivery

    def accept(self, delivery):
        """
        Mark ``delivery`` as accepted under this transaction; it is
        settled now or kept pending, depending on settle_before_discharge.
        """
        self.update(delivery, PN_ACCEPTED)
        if not self.settle_before_discharge:
            self._pending.append(delivery)
        else:
            delivery.settle()

    def update(self, delivery, state=None):
        """When ``state`` is given, record it with the transaction id on ``delivery``."""
        if state:
            delivery.local.data = [self.id, Described(ulong(state), [])]
            delivery.update(0x34)

    def _release_pending(self):
        """Release and settle every pending delivery, then forget them."""
        for held in self._pending:
            held.update(Delivery.RELEASED)
            held.settle()
        self._clear_pending()

    def _clear_pending(self):
        """Forget all pending deliveries."""
        self._pending = []

    def handle_outcome(self, event):
        """
        Route the outcome of a declare or discharge delivery to the
        matching on_transaction_* callback of the handler.
        """
        delivery = event.delivery
        if delivery == self._declare:
            if delivery.remote.data:
                # The first element of the returned data is kept as the id.
                self.id = delivery.remote.data[0]
                self.handler.on_transaction_declared(event)
            elif delivery.remote_state == Delivery.REJECTED:
                self.handler.on_transaction_declare_failed(event)
            else:
                logging.warning("Unexpected outcome for declare: %s" % delivery.remote_state)
                self.handler.on_transaction_declare_failed(event)
        elif delivery == self._discharge:
            rejected = delivery.remote_state == Delivery.REJECTED
            if rejected:
                if not self.failed:
                    self.handler.on_transaction_commit_failed(event)
                    self._release_pending() # make this optional?
            elif self.failed:
                self.handler.on_transaction_aborted(event)
                self._release_pending()
            else:
                self.handler.on_transaction_committed(event)
            self._clear_pending()
349
class LinkOption(object):
    """
    Abstract interface for link configuration options
    """

    def apply(self, link):
        """
        Hook for subclasses: perform the configuration of ``link``.
        The base implementation does nothing.
        """
        pass

    def test(self, link):
        """
        Hook for subclasses: decide whether the option should be applied
        to ``link``. The base implementation always returns True.
        """
        return True
366
367 -class AtMostOnce(LinkOption):
368 - def apply(self, link):
370
371 -class AtLeastOnce(LinkOption):
372 - def apply(self, link):
375
class SenderOption(LinkOption):
    """Link option whose test() passes only for sender links."""

    def apply(self, sender):
        """No-op by default; subclasses configure ``sender`` here."""
        pass

    def test(self, link):
        """Return ``link.is_sender``."""
        return link.is_sender
379
class ReceiverOption(LinkOption):
    """Link option whose test() passes only for receiver links."""

    def apply(self, receiver):
        """No-op by default; subclasses configure ``receiver`` here."""
        pass

    def test(self, link):
        """Return ``link.is_receiver``."""
        return link.is_receiver
383
384 -class DynamicNodeProperties(LinkOption):
def __init__(self, props={}):
    """
    Copy ``props`` into ``self.properties``, converting every key that
    is not already a symbol into one.
    """
    self.properties = {}
    for key in props:
        name = key if isinstance(key, symbol) else symbol(key)
        self.properties[name] = props[key]
392
393 - def apply(self, link):
398
class Filter(ReceiverOption):
    """Receiver option that installs a filter set on the link's source."""

    def __init__(self, filter_set=None):
        """
        Remember the filter set to install.

        ``filter_set`` is a dict of filters; when omitted, each instance
        gets its own empty dict. (The former ``{}`` default was a single
        dict shared by every instance created without an argument, so a
        change made through one instance leaked into the others.)
        """
        self.filter_set = {} if filter_set is None else filter_set

    def apply(self, receiver):
        """Put the stored filter set into the receiver's source filter."""
        receiver.source.filter.put_dict(self.filter_set)
405
class Selector(Filter):
    """
    Configures a link with a message selector filter
    """

    def __init__(self, value, name='selector'):
        """Build a one-entry filter set mapping ``name`` to the selector ``value``."""
        selector = Described(symbol('apache.org:selector-filter:string'), value)
        filter_set = {symbol(name): selector}
        super(Selector, self).__init__(filter_set)
412
413 -class Move(ReceiverOption):
414 - def apply(self, receiver):
416
417 -class Copy(ReceiverOption):
418 - def apply(self, receiver):
420 428
429 -def _create_session(connection, handler=None):
430 session = connection.session() 431 session.open() 432 return session
433
434 435 -def _get_attr(target, name):
436 if hasattr(target, name): 437 return getattr(target, name) 438 else: 439 return None
440
class SessionPerConnection(object):
    """Session policy that creates one session lazily and then reuses it."""

    def __init__(self):
        self._default_session = None

    def session(self, connection):
        """Return the cached session, creating and opening it on first use."""
        if self._default_session:
            return self._default_session
        self._default_session = _create_session(connection)
        self._default_session.context = self
        return self._default_session

    def on_session_remote_close(self, event):
        """Close the event's connection and drop the cached session."""
        event.connection.close()
        self._default_session = None
454
class GlobalOverrides(object):
    """
    Internal handler that first offers each unhandled event to the
    ``_overrides`` handler of the event's connection (if it has one) and
    only dispatches to the wrapped base handler when that does not
    report the event as handled.
    """

    def __init__(self, base):
        self.base = base

    def on_unhandled(self, name, event):
        """Dispatch ``event`` to the base handler unless an override took it."""
        overridden = self._override(event)
        if not overridden:
            event.dispatch(self.base)

    def _override(self, event):
        """
        Dispatch ``event`` to the connection's overrides and return the
        result; returns a false value when there is no connection or the
        connection has no ``_overrides``.
        """
        connection = event.connection
        if not connection:
            return connection
        if not hasattr(connection, '_overrides'):
            return False
        return event.dispatch(connection._overrides)
470
class Connector(Handler):
    """
    Internal handler that triggers the necessary socket connect for an
    opened connection.
    """

    def __init__(self, connection):
        self.connection = connection
        # The remaining settings are filled in by whoever creates the connector.
        self.address = None
        self.heartbeat = None
        self.reconnect = None
        self.ssl_domain = None

    def _connect(self, connection):
        """
        Take the next URL from ``self.address`` and bind a fresh
        transport, configured for heartbeat, SSL and SASL as requested,
        to ``connection``.
        """
        url = self.address.next()
        # IoHandler uses the hostname to determine where to try to connect to
        connection.hostname = "%s:%i" % (url.host, url.port)
        logging.info("connecting to %s..." % connection.hostname)

        transport = Transport()
        transport.bind(connection)
        if self.heartbeat:
            transport.idle_timeout = self.heartbeat
        if url.scheme == 'amqps' and self.ssl_domain:
            self.ssl = SSL(transport, self.ssl_domain)
            self.ssl.peer_hostname = url.host
        if url.username:
            sasl = transport.sasl()
            if url.username != 'anonymous':
                sasl.plain(url.username, url.password)
            else:
                sasl.mechanisms('ANONYMOUS')

    def on_connection_local_open(self, event):
        """Connect as soon as the connection is opened locally."""
        self._connect(event.connection)

    def on_connection_remote_open(self, event):
        """Log the successful connect and reset the reconnect strategy."""
        logging.info("connected to %s" % event.connection.hostname)
        if self.reconnect:
            self.reconnect.reset()
            self.transport = None

    def on_transport_tail_closed(self, event):
        """Handled exactly like a fully closed transport."""
        self.on_transport_closed(event)

    def on_transport_closed(self, event):
        """
        While the connection is still locally active, either reconnect
        (immediately or after the delay given by the reconnect strategy)
        or, without a strategy, give up on the connection.
        """
        if not (self.connection and self.connection.state & Endpoint.LOCAL_ACTIVE):
            return
        if not self.reconnect:
            logging.info("Disconnected")
            self.connection = None
            return
        event.transport.unbind()
        delay = self.reconnect.next()
        if delay == 0:
            logging.info("Disconnected, reconnecting...")
            self._connect(self.connection)
        else:
            logging.info("Disconnected will try to reconnect after %s seconds" % delay)
            event.reactor.schedule(delay, self)

    def on_timer_task(self, event):
        """Timer callback: attempt the connect again."""
        self._connect(self.connection)

    def on_connection_remote_close(self, event):
        """Forget the connection once the peer has closed it."""
        self.connection = None
535
class Backoff(object):
    """
    A reconnect strategy involving an increasing delay between
    retries, up to a maximum of 10 seconds.
    """

    def __init__(self):
        self.delay = 0

    def reset(self):
        """Go back to retrying without delay."""
        self.delay = 0

    def next(self):
        """
        Return the delay to use now and advance to the following one:
        0 first, then 0.1, doubling each time and capped at 10.
        """
        upcoming = self.delay
        self.delay = 0.1 if upcoming == 0 else min(10, 2 * upcoming)
        return upcoming
554
555 -class Urls(object):
556 - def __init__(self, values):
557 self.values = [Url(v) for v in values] 558 self.i = iter(self.values)
559
560 - def __iter__(self):
561 return self
562
563 - def next(self):
564 try: 565 return self.i.next() 566 except StopIteration: 567 self.i = iter(self.values) 568 return self.i.next()
569
570 -class SSLConfig(object):
571 - def __init__(self):
574
def set_credentials(self, cert_file, key_file, password):
    """Apply the same credentials to the client domain, then the server domain."""
    for role in ("client", "server"):
        getattr(self, role).set_credentials(cert_file, key_file, password)
578
def set_trusted_ca_db(self, certificate_db):
    """Apply the same trusted CA database to the client domain, then the server domain."""
    for role in ("client", "server"):
        getattr(self, role).set_trusted_ca_db(certificate_db)
582
class Container(Reactor):
    """A representation of the AMQP concept of a 'container', which
       loosely speaking is something that establishes links to or from
       another container, over which messages are transferred. This is
       an extension to the Reactor class that adds convenience methods
       for creating connections and sender- or receiver- links.
    """

    def __init__(self, *handlers, **kwargs):
        """
        Create a container. Unless an existing reactor is being wrapped
        (``impl`` keyword), set up the SSL configuration, install the
        global override handler and generate a container id.
        """
        super(Container, self).__init__(*handlers, **kwargs)
        if "impl" not in kwargs:
            try:
                self.ssl = SSLConfig()
            except SSLUnavailable:
                self.ssl = None
            self.global_handler = GlobalOverrides(kwargs.get('global_handler', self.global_handler))
            self.trigger = None
            self.container_id = str(generate_uuid())
            # Recorded so that Reactor.wrap() re-creates this subclass.
            Wrapper.__setattr__(self, 'subclass', self.__class__)

    def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None):
        """
        Initiates the establishment of an AMQP connection. Returns an
        instance of proton.Connection.

        Exactly one source of addresses is used, in this order of
        preference: ``url``, ``urls``, ``address``; ValueError is raised
        when none is given. ``reconnect`` left as None selects the
        default Backoff strategy; any other false value disables
        reconnecting.
        """
        conn = self.connection(handler)
        conn.container = self.container_id or str(generate_uuid())

        connector = Connector(conn)
        conn._overrides = connector
        if url: connector.address = Urls([url])
        elif urls: connector.address = Urls(urls)
        elif address: connector.address = address
        else: raise ValueError("One of url, urls or address required")
        if heartbeat:
            connector.heartbeat = heartbeat
        if reconnect:
            connector.reconnect = reconnect
        elif reconnect is None:
            connector.reconnect = Backoff()
        connector.ssl_domain = ssl_domain or (self.ssl and self.ssl.client)
        conn._session_policy = SessionPerConnection() #todo: make configurable
        conn.open()
        return conn

    def _get_id(self, container, remote, local):
        """
        Build a link name from the container id and whichever of the
        remote and local addresses are given; a generated uuid is used
        when neither is.
        """
        # The first branch previously evaluated the string without
        # returning it, so links with both addresses got the name None.
        if local and remote: return "%s-%s-%s" % (container, remote, local)
        elif local: return "%s-%s" % (container, local)
        elif remote: return "%s-%s" % (container, remote)
        else: return "%s-%s" % (container, str(generate_uuid()))

    def _get_session(self, context):
        """
        Resolve ``context`` (a Url, Session, Connection or any object
        with a session() method) to a session.
        """
        if isinstance(context, Url):
            return self._get_session(self.connect(url=context))
        elif isinstance(context, Session):
            return context
        elif isinstance(context, Connection):
            if hasattr(context, '_session_policy'):
                return context._session_policy.session(context)
            else:
                return _create_session(context)
        else:
            return context.session()

    def create_sender(self, context, target=None, source=None, name=None, handler=None, tags=None, options=None):
        """
        Initiates the establishment of a link over which messages can
        be sent. Returns an instance of proton.Sender.

        There are two patterns of use. (1) A connection can be passed
        as the first argument, in which case the link is established
        on that connection. In this case the target address can be
        specified as the second argument (or as a keyword
        argument). The source address can also be specified if
        desired. (2) Alternatively a URL can be passed as the first
        argument. In this case a new connection will be established on
        which the link will be attached. If a path is specified and
        the target is not, then the path of the URL is used as the
        target address.

        The name of the link may be specified if desired, otherwise a
        unique name will be generated.

        Various LinkOptions can be specified to further control the
        attachment.
        """
        if isinstance(context, basestring):
            context = Url(context)
        if isinstance(context, Url) and not target:
            target = context.path
        session = self._get_session(context)
        snd = session.sender(name or self._get_id(session.connection.container, target, source))
        if source:
            snd.source.address = source
        if target:
            snd.target.address = target
        if handler:
            snd.handler = handler
        if tags:
            snd.tag_generator = tags
        _apply_link_options(options, snd)
        snd.open()
        return snd

    def create_receiver(self, context, source=None, target=None, name=None, dynamic=False, handler=None, options=None):
        """
        Initiates the establishment of a link over which messages can
        be received (aka a subscription). Returns an instance of
        proton.Receiver.

        There are two patterns of use. (1) A connection can be passed
        as the first argument, in which case the link is established
        on that connection. In this case the source address can be
        specified as the second argument (or as a keyword
        argument). The target address can also be specified if
        desired. (2) Alternatively a URL can be passed as the first
        argument. In this case a new connection will be established on
        which the link will be attached. If a path is specified and
        the source is not, then the path of the URL is used as the
        source address.

        The name of the link may be specified if desired, otherwise a
        unique name will be generated.

        Various LinkOptions can be specified to further control the
        attachment.
        """
        if isinstance(context, basestring):
            context = Url(context)
        if isinstance(context, Url) and not source:
            source = context.path
        session = self._get_session(context)
        rcv = session.receiver(name or self._get_id(session.connection.container, source, target))
        if source:
            rcv.source.address = source
        if dynamic:
            rcv.source.dynamic = True
        if target:
            rcv.target.address = target
        if handler:
            rcv.handler = handler
        _apply_link_options(options, rcv)
        rcv.open()
        return rcv

    def declare_transaction(self, context, handler=None, settle_before_discharge=False):
        """
        Declare a new transaction on ``context`` and return the
        Transaction object tracking it. The 'txn-ctrl' sender used for
        control messages is created on first use and cached on
        ``context`` as ``_txn_ctrl``.
        """
        if not _get_attr(context, '_txn_ctrl'):
            class InternalTransactionHandler(OutgoingMessageHandler):
                def __init__(self):
                    super(InternalTransactionHandler, self).__init__(auto_settle=True)

                def on_settled(self, event):
                    # Only deliveries sent by a Transaction carry this attribute.
                    if hasattr(event.delivery, "transaction"):
                        event.transaction = event.delivery.transaction
                        event.delivery.transaction.handle_outcome(event)
            context._txn_ctrl = self.create_sender(context, None, name='txn-ctrl', handler=InternalTransactionHandler())
            context._txn_ctrl.target.type = Terminus.COORDINATOR
            context._txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions'))
        return Transaction(context._txn_ctrl, handler, settle_before_discharge)

    def listen(self, url, ssl_domain=None):
        """
        Initiates a server socket, accepting incoming AMQP connections
        on the interface and port specified.

        Returns the acceptor created for the host and port of ``url``.
        ``ssl_domain`` is accepted for interface compatibility but is
        not applied to the listener.
        """
        url = Url(url)
        # NOTE(review): an SSL domain used to be resolved here through
        # ``self.ssl_domain``, an attribute this class never sets, so
        # listening on an 'amqps' URL without an explicit ssl_domain raised
        # AttributeError; the resolved value was then never used. The lookup
        # is dropped; acceptor() takes no SSL domain, so SSL still has to be
        # wired into the listener separately.
        return self.acceptor(url.host, url.port)

    def do_work(self, timeout=None):
        """
        Run a single processing step, first applying ``timeout`` as the
        reactor timeout when one is given. Returns the result of process().
        """
        if timeout:
            self.timeout = timeout
        return self.process()
758