1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 import logging, os, Queue, socket, time, types
20 from heapq import heappush, heappop, nsmallest
21 from proton import Collector, Connection, ConnectionException, Delivery, Described, dispatch
22 from proton import Endpoint, Event, EventBase, EventType, generate_uuid, Handler, Link, Message
23 from proton import ProtonException, PN_ACCEPTED, PN_PYREF, SASL, Session, SSL, SSLDomain, SSLUnavailable, symbol
24 from proton import Terminus, Timeout, Transport, TransportException, ulong, Url
25 from select import select
26 from proton.handlers import OutgoingMessageHandler
27 from proton import unicode2utf8, utf82unicode
28
29 import traceback
30 from proton import WrappedHandler, _chandler, secs2millis, millis2secs, timeout2millis, millis2timeout, Selectable
31 from wrapper import Wrapper, PYCTX
32 from cproton import *
33
34 -class Task(Wrapper):
35
36 @staticmethod
38 if impl is None:
39 return None
40 else:
41 return Task(impl)
42
45
48
50
53
55 pn_acceptor_close(self._impl)
56
58
59 @staticmethod
61 if impl is None:
62 return None
63 else:
64 record = pn_reactor_attachments(impl)
65 attrs = pn_void2py(pn_record_get(record, PYCTX))
66 if attrs and 'subclass' in attrs:
67 return attrs['subclass'](impl=impl)
68 else:
69 return Reactor(impl=impl)
70
71 - def __init__(self, *handlers, **kwargs):
75
78
80 self.errors.append(info)
81 self.yield_()
82
84 return WrappedHandler.wrap(pn_reactor_get_global_handler(self._impl), self.on_error)
85
87 impl = _chandler(handler, self.on_error)
88 pn_reactor_set_global_handler(self._impl, impl)
89 pn_decref(impl)
90
91 global_handler = property(_get_global, _set_global)
92
94 return millis2timeout(pn_reactor_get_timeout(self._impl))
95
97 return pn_reactor_set_timeout(self._impl, timeout2millis(secs))
98
99 timeout = property(_get_timeout, _set_timeout)
100
102 pn_reactor_yield(self._impl)
103
105 pn_reactor_mark(self._impl)
106
108 return WrappedHandler.wrap(pn_reactor_get_handler(self._impl), self.on_error)
109
111 impl = _chandler(handler, self.on_error)
112 pn_reactor_set_handler(self._impl, impl)
113 pn_decref(impl)
114
115 handler = property(_get_handler, _set_handler)
116
122
124 n = pn_reactor_wakeup(self._impl)
125 if n: raise IOError(pn_error_text(pn_io_error(pn_reactor_io(self._impl))))
126
128 pn_reactor_start(self._impl)
129
130 @property
132 return pn_reactor_quiesced(self._impl)
133
135 if self.errors:
136 for exc, value, tb in self.errors[:-1]:
137 traceback.print_exception(exc, value, tb)
138 exc, value, tb = self.errors[-1]
139 raise exc, value, tb
140
142 result = pn_reactor_process(self._impl)
143 self._check_errors()
144 return result
145
147 pn_reactor_stop(self._impl)
148 self._check_errors()
149 self.global_handler = None
150 self.handler = None
151
153 impl = _chandler(task, self.on_error)
154 task = Task.wrap(pn_reactor_schedule(self._impl, secs2millis(delay), impl))
155 pn_decref(impl)
156 return task
157
def acceptor(self, host, port, handler=None):
    """
    Create an acceptor listening on the given host and port.

    The handler is converted with _chandler (with self.on_error as the
    error callback) and handed to pn_reactor_acceptor together with
    the UTF-8 encoded host and the port rendered as a string.

    Returns an Acceptor wrapping the native acceptor. Raises IOError,
    carrying the reactor's I/O error text plus the host and port, when
    no native acceptor was produced.
    """
    chandler = _chandler(handler, self.on_error)
    native = pn_reactor_acceptor(self._impl, unicode2utf8(host), str(port), chandler)
    # Release the local reference to the converted handler once the
    # call has been made, whether or not it succeeded.
    pn_decref(chandler)
    if not native:
        reason = pn_error_text(pn_io_error(pn_reactor_io(self._impl)))
        raise IOError("%s (%s:%s)" % (reason, host, port))
    return Acceptor(native)
166
168 impl = _chandler(handler, self.on_error)
169 result = Connection.wrap(pn_reactor_connection(self._impl, impl))
170 pn_decref(impl)
171 return result
172
174 impl = _chandler(handler, self.on_error)
175 result = Selectable.wrap(pn_reactor_selectable(self._impl))
176 if impl:
177 record = pn_selectable_attachments(result._impl)
178 pn_record_set_handler(record, impl)
179 pn_decref(impl)
180 return result
181
183 pn_reactor_update(self._impl, sel._impl)
184
186 pn_collector_put(pn_reactor_collector(self._impl), PN_PYREF, pn_py2void(obj), etype.number)
187
# Register factories under the "pn_reactor" and "pn_task" keys of proton's
# `wrappers` object. Each factory casts its argument with the matching
# pn_cast_* function and passes the result to the Python class's wrap().
# NOTE(review): presumably proton consults these entries to turn native
# objects into Python wrappers -- confirm against proton's own code.
from proton import wrappers as _wrappers
_wrappers["pn_reactor"] = lambda x: Reactor.wrap(pn_cast_pn_reactor(x))
_wrappers["pn_task"] = lambda x: Task.wrap(pn_cast_pn_task(x))
194 """
195 Can be added to a reactor to allow events to be triggered by an
196 external thread but handled on the event thread associated with
197 the reactor. An instance of this class can be passed to the
198 Reactor.selectable() method of the reactor in order to activate
199 it. The close() method should be called when it is no longer
200 needed, to allow the event loop to end if needed.
201 """
203 self.queue = Queue.Queue()
204 self.pipe = os.pipe()
205 self._closed = False
206
208 """
209 Request that the given event be dispatched on the event thread
210 of the reactor to which this EventInjector was added.
211 """
212 self.queue.put(event)
213 os.write(self.pipe[1], "!")
214
216 """
217 Request that this EventInjector be closed. Existing events
218 will be dispatched on the reactor's event dispatch thread,
219 then this will be removed from the set of interest.
220 """
221 self._closed = True
222 os.write(self.pipe[1], "!")
223
226
232
242
245 """
246 Application defined event, which can optionally be associated with
247 an engine object and or an arbitrary subject
248 """
249 - def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None):
262
266
268 """
269 Class to track state of an AMQP 1.0 transaction.
270 """
def __init__(self, txn_ctrl, handler, settle_before_discharge=False):
    """
    Store the arguments, reset the tracking state and call declare().

    txn_ctrl, handler and settle_before_discharge are kept on the
    instance unchanged. The id and the declare/discharge markers start
    as None, the failed flag as False and the pending list empty.
    """
    self.txn_ctrl, self.handler = txn_ctrl, handler
    self.settle_before_discharge = settle_before_discharge
    self.id = None
    self._declare = self._discharge = None
    self.failed = False
    self._pending = []
    # All state must be in place before declare() runs.
    self.declare()
281
284
287
289 self._declare = self._send_ctrl(symbol(u'amqp:declare:list'), [None])
290
294
299
300 - def send(self, sender, msg, tag=None):
305
312
313 - def update(self, delivery, state=None):
317
323
326
349
351 """
352 Abstract interface for link configuration options
353 """
355 """
356 Subclasses will implement any configuration logic in this
357 method
358 """
359 pass
def test(self, link):
    """
    Subclasses can override this to selectively apply an option
    e.g. based on some link criteria.

    This base implementation ignores the link and always returns True.
    NOTE(review): presumably a true result means the option's apply()
    will be invoked for the link -- confirm against the caller.
    """
    return True
366
370
375
# No-op: this implementation leaves the sender untouched.
def apply(self, sender): pass
379
# No-op: this implementation leaves the receiver untouched.
def apply(self, receiver): pass
383
398
401 self.filter_set = filter_set
402
403 - def apply(self, receiver):
405
407 """
408 Configures a link with a message selector filter
409 """
410 - def __init__(self, value, name='selector'):
412
413 -class Move(ReceiverOption):
414 - def apply(self, receiver):
416
417 -class Copy(ReceiverOption):
418 - def apply(self, receiver):
420
428
433
440
443 self._default_session = None
444
446 if not self._default_session:
447 self._default_session = _create_session(connection)
448 self._default_session.context = self
449 return self._default_session
450
454
456 """
457 Internal handler that triggers the necessary socket connect for an
458 opened connection.
459 """
462
464 if not self._override(event):
465 event.dispatch(self.base)
466
468 conn = event.connection
469 return conn and hasattr(conn, '_overrides') and event.dispatch(conn._overrides)
470
472 """
473 Internal handler that triggers the necessary socket connect for an
474 opened connection.
475 """
477 self.connection = connection
478 self.address = None
479 self.heartbeat = None
480 self.reconnect = None
481 self.ssl_domain = None
482
502
505
511
514
529
532
535
537 """
538 A reconnect strategy involving an increasing delay between
539 retries, up to a maximum of 10 seconds.
540 """
543
546
554
557 self.values = [Url(v) for v in values]
558 self.i = iter(self.values)
559
562
564 try:
565 return self.i.next()
566 except StopIteration:
567 self.i = iter(self.values)
568 return self.i.next()
569
582
585 """A representation of the AMQP concept of a 'container', which
586 loosely speaking is something that establishes links to or from
587 another container, over which messages are transferred. This is
588 an extension to the Reactor class that adds convenience methods
589 for creating connections and sender- or receiver- links.
590 """
591 - def __init__(self, *handlers, **kwargs):
602
def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None):
    """
    Begin opening an AMQP connection and return the proton.Connection.

    The peer address comes from the first truthy value among url
    (wrapped as a single-entry Urls), urls (wrapped as Urls) and
    address (used as given). ValueError is raised when all three are
    empty; by then the connection object has already been created.

    heartbeat is recorded on the connector only when truthy. A truthy
    reconnect value is stored as given, None selects a new Backoff(),
    and any other falsy value leaves the connector's reconnect setting
    untouched. The SSL domain is ssl_domain when truthy, otherwise
    self.ssl and self.ssl.client.
    """
    connection = self.connection(handler)
    connection.container = self.container_id or str(generate_uuid())

    overrides = Connector(connection)
    connection._overrides = overrides

    # Select the address source; the first truthy argument wins.
    if url:
        overrides.address = Urls([url])
    elif urls:
        overrides.address = Urls(urls)
    elif address:
        overrides.address = address
    else:
        raise ValueError("One of url, urls or address required")

    if heartbeat:
        overrides.heartbeat = heartbeat

    # None means "use the default strategy"; other falsy values
    # (e.g. False) leave the connector's reconnect attribute as it is.
    if reconnect is None:
        overrides.reconnect = Backoff()
    elif reconnect:
        overrides.reconnect = reconnect

    if ssl_domain:
        overrides.ssl_domain = ssl_domain
    else:
        overrides.ssl_domain = self.ssl and self.ssl.client

    connection._session_policy = SessionPerConnection()
    connection.open()
    return connection
627
628 - def _get_id(self, container, remote, local):
629 if local and remote: "%s-%s-%s" % (container, remote, local)
630 elif local: return "%s-%s" % (container, local)
631 elif remote: return "%s-%s" % (container, remote)
632 else: return "%s-%s" % (container, str(generate_uuid()))
633
646
def create_sender(self, context, target=None, source=None, name=None, handler=None, tags=None, options=None):
    """
    Start establishing a link for sending messages and return the
    sender obtained from the session.

    context may be given in two ways:

    (1) A connection: the link is created on that connection. The
        target address can be passed as the second argument (or by
        keyword), and a source address may also be given.
    (2) A URL, either a string or a Url: the session is looked up
        through self._get_session for that URL. When no target is
        given, the URL's path is used as the target address.

    If name is not supplied, one is derived via self._get_id from the
    connection's container and the target and source addresses.

    handler and tags, when truthy, are attached to the sender as its
    handler and tag_generator. options is passed to
    _apply_link_options before the link is opened.
    """
    if isinstance(context, basestring):
        context = Url(context)
    # Fall back to the URL path only when the caller gave no target.
    if not target and isinstance(context, Url):
        target = context.path

    session = self._get_session(context)
    link_name = name or self._get_id(session.connection.container, target, source)
    sender = session.sender(link_name)

    if source:
        sender.source.address = source
    if target:
        sender.target.address = target
    if handler:
        sender.handler = handler
    if tags:
        sender.tag_generator = tags

    _apply_link_options(options, sender)
    sender.open()
    return sender
686
def create_receiver(self, context, source=None, target=None, name=None, dynamic=False, handler=None, options=None):
    """
    Start establishing a link for receiving messages (a subscription)
    and return the receiver obtained from the session.

    context may be given in two ways:

    (1) A connection: the link is created on that connection. The
        source address can be passed as the second argument (or by
        keyword), and a target address may also be given.
    (2) A URL, either a string or a Url: the session is looked up
        through self._get_session for that URL. When no source is
        given, the URL's path is used as the source address.

    If name is not supplied, one is derived via self._get_id from the
    connection's container and the source and target addresses.

    A truthy dynamic sets the source's dynamic flag. A truthy handler
    is attached to the receiver. options is passed to
    _apply_link_options before the link is opened.
    """
    if isinstance(context, basestring):
        context = Url(context)
    # Fall back to the URL path only when the caller gave no source.
    if not source and isinstance(context, Url):
        source = context.path

    session = self._get_session(context)
    link_name = name or self._get_id(session.connection.container, source, target)
    receiver = session.receiver(link_name)

    if source:
        receiver.source.address = source
    if dynamic:
        receiver.source.dynamic = True
    if target:
        receiver.target.address = target
    if handler:
        receiver.handler = handler

    _apply_link_options(options, receiver)
    receiver.open()
    return receiver
727
729 if not _get_attr(context, '_txn_ctrl'):
730 class InternalTransactionHandler(OutgoingMessageHandler):
731 def __init__(self):
732 super(InternalTransactionHandler, self).__init__(auto_settle=True)
733
734 def on_settled(self, event):
735 if hasattr(event.delivery, "transaction"):
736 event.transaction = event.delivery.transaction
737 event.delivery.transaction.handle_outcome(event)
738 context._txn_ctrl = self.create_sender(context, None, name='txn-ctrl', handler=InternalTransactionHandler())
739 context._txn_ctrl.target.type = Terminus.COORDINATOR
740 context._txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions'))
741 return Transaction(context._txn_ctrl, handler, settle_before_discharge)
742
def listen(self, url, ssl_domain=None):
    """
    Open a server socket that accepts incoming AMQP connections on the
    host and port named by url, and return the result of
    self.acceptor() for that host and port.
    """
    parsed = Url(url)
    # NOTE(review): the SSL configuration resolved below is never
    # passed to the acceptor, so it currently has no effect on the
    # listener -- confirm whether it should be applied.
    ssl_config = ssl_domain
    if not ssl_config:
        if parsed.scheme == 'amqps':
            ssl_config = self.ssl_domain
    return self.acceptor(parsed.host, parsed.port)
753
758