1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 """
21 The proton module defines a suite of APIs that implement the AMQP 1.0
22 protocol.
23
24 The proton APIs consist of the following classes:
25
26 - L{Messenger} -- A messaging endpoint.
27 - L{Message} -- A class for creating and/or accessing AMQP message content.
28 - L{Data} -- A class for creating and/or accessing arbitrary AMQP encoded
29 data.
30
31 """
32 from __future__ import absolute_import
33
34 from cproton import *
35 from .wrapper import Wrapper
36 from proton import _compat
37
38 import logging, weakref, socket, sys, threading
39
40 try:
41 handler = logging.NullHandler()
42 except AttributeError:
46
47 - def emit(self, record):
49
52
53 handler = NullHandler()
54
55 log = logging.getLogger("proton")
56 log.addHandler(handler)
57
58 try:
59 import uuid
63
64 except ImportError:
65 """
66 No 'native' UUID support. Provide a very basic UUID type that is a compatible subset of the uuid type provided by more modern python releases.
67 """
68 import struct
71 - def __init__(self, hex=None, bytes=None):
72 if [hex, bytes].count(None) != 1:
73 raise TypeError("need one of hex or bytes")
74 if bytes is not None:
75 self.bytes = bytes
76 elif hex is not None:
77 fields=hex.split("-")
78 fields[4:5] = [fields[4][:4], fields[4][4:]]
79 self.bytes = struct.pack("!LHHHHL", *[int(x,16) for x in fields])
80
82 if isinstance(other, uuid.UUID):
83 return cmp(self.bytes, other.bytes)
84 else:
85 return -1
86
88 return "%08x-%04x-%04x-%04x-%04x%08x" % struct.unpack("!LHHHHL", self.bytes)
89
91 return "UUID(%r)" % str(self)
92
95
96 import os, random, time
97 rand = random.Random()
98 rand.seed((os.getpid(), time.time(), socket.gethostname()))
100 data = [rand.randint(0, 255) for i in xrange(16)]
101
102
103 data[6] &= 0x0F
104 data[6] |= 0x40
105
106
107 data[8] &= 0x3F
108 data[8] |= 0x80
109 return "".join(map(chr, data))
110
112 return uuid.UUID(bytes=random_uuid())
113
116
117
118
119
120 try:
121 bytes()
122 except NameError:
123 bytes = str
124 try:
125 long()
126 except NameError:
127 long = int
128 try:
129 unicode()
130 except NameError:
131 unicode = str
132
133
134 VERSION_MAJOR = PN_VERSION_MAJOR
135 VERSION_MINOR = PN_VERSION_MINOR
136 VERSION_POINT = PN_VERSION_POINT
137 VERSION = (VERSION_MAJOR, VERSION_MINOR, VERSION_POINT)
138 API_LANGUAGE = "C"
139 IMPLEMENTATION_LANGUAGE = "C"
148
150 """
151 The root of the proton exception hierarchy. All proton exception
152 classes derive from this exception.
153 """
154 pass
155
157 """
158 A timeout exception indicates that a blocking operation has timed
159 out.
160 """
161 pass
162
164 """
165 An interrupt exception indicates that a blocking operation was interrupted.
166 """
167 pass
168
170 """
171 The root of the messenger exception hierarchy. All exceptions
172 generated by the messenger class derive from this exception.
173 """
174 pass
175
177 """
178 The MessageException class is the root of the message exception
179 hierarchy. All exceptions generated by the Message class derive from
180 this exception.
181 """
182 pass
183
184 EXCEPTIONS = {
185 PN_TIMEOUT: Timeout,
186 PN_INTR: Interrupt
187 }
188
189 PENDING = Constant("PENDING")
190 ACCEPTED = Constant("ACCEPTED")
191 REJECTED = Constant("REJECTED")
192 RELEASED = Constant("RELEASED")
193 MODIFIED = Constant("MODIFIED")
194 ABORTED = Constant("ABORTED")
195 SETTLED = Constant("SETTLED")
196
197 STATUSES = {
198 PN_STATUS_ABORTED: ABORTED,
199 PN_STATUS_ACCEPTED: ACCEPTED,
200 PN_STATUS_REJECTED: REJECTED,
201 PN_STATUS_RELEASED: RELEASED,
202 PN_STATUS_MODIFIED: MODIFIED,
203 PN_STATUS_PENDING: PENDING,
204 PN_STATUS_SETTLED: SETTLED,
205 PN_STATUS_UNKNOWN: None
206 }
207
208 AUTOMATIC = Constant("AUTOMATIC")
209 MANUAL = Constant("MANUAL")
212 """
213 The L{Messenger} class defines a high level interface for sending
214 and receiving L{Messages<Message>}. Every L{Messenger} contains a
215 single logical queue of incoming messages and a single logical queue
216 of outgoing messages. These messages in these queues may be destined
217 for, or originate from, a variety of addresses.
218
219 The messenger interface is single-threaded. All methods
220 except one (L{interrupt}) are intended to be used from within
221 the messenger thread.
222
223
224 Address Syntax
225 ==============
226
227 An address has the following form::
228
229 [ amqp[s]:// ] [user[:password]@] domain [/[name]]
230
231 Where domain can be one of::
232
233 host | host:port | ip | ip:port | name
234
235 The following are valid examples of addresses:
236
237 - example.org
238 - example.org:1234
239 - amqp://example.org
240 - amqps://example.org
241 - example.org/incoming
242 - amqps://example.org/outgoing
243 - amqps://fred:trustno1@example.org
244 - 127.0.0.1:1234
245 - amqps://127.0.0.1:1234
246
247 Sending & Receiving Messages
248 ============================
249
250 The L{Messenger} class works in conjunction with the L{Message} class. The
251 L{Message} class is a mutable holder of message content.
252
253 The L{put} method copies its L{Message} to the outgoing queue, and may
254 send queued messages if it can do so without blocking. The L{send}
255 method blocks until it has sent the requested number of messages,
256 or until a timeout interrupts the attempt.
257
258
259 >>> message = Message()
260 >>> for i in range(3):
261 ... message.address = "amqp://host/queue"
262 ... message.subject = "Hello World %i" % i
263 ... messenger.put(message)
264 >>> messenger.send()
265
266 Similarly, the L{recv} method receives messages into the incoming
267 queue, and may block as it attempts to receive the requested number
268 of messages, or until timeout is reached. It may receive fewer
269 than the requested number. The L{get} method pops the
270 eldest L{Message} off the incoming queue and copies it into the L{Message}
271 object that you supply. It will not block.
272
273
274 >>> message = Message()
275 >>> messenger.recv(10):
276 >>> while messenger.incoming > 0:
277 ... messenger.get(message)
278 ... print message.subject
279 Hello World 0
280 Hello World 1
281 Hello World 2
282
283 The blocking flag allows you to turn off blocking behavior entirely,
284 in which case L{send} and L{recv} will do whatever they can without
285 blocking, and then return. You can then look at the number
286 of incoming and outgoing messages to see how much outstanding work
287 still remains.
288 """
289
291 """
292 Construct a new L{Messenger} with the given name. The name has
293 global scope. If a NULL name is supplied, a UUID based name will
294 be chosen.
295
296 @type name: string
297 @param name: the name of the messenger or None
298
299 """
300 self._mng = pn_messenger(name)
301 self._selectables = {}
302
304 """
305 Destroy the L{Messenger}. This will close all connections that
306 are managed by the L{Messenger}. Call the L{stop} method before
307 destroying the L{Messenger}.
308 """
309 if hasattr(self, "_mng"):
310 pn_messenger_free(self._mng)
311 del self._mng
312
314 if err < 0:
315 if (err == PN_INPROGRESS):
316 return
317 exc = EXCEPTIONS.get(err, MessengerException)
318 raise exc("[%s]: %s" % (err, pn_error_text(pn_messenger_error(self._mng))))
319 else:
320 return err
321
322 @property
324 """
325 The name of the L{Messenger}.
326 """
327 return pn_messenger_name(self._mng)
328
330 return pn_messenger_get_certificate(self._mng)
331
333 self._check(pn_messenger_set_certificate(self._mng, value))
334
335 certificate = property(_get_certificate, _set_certificate,
336 doc="""
337 Path to a certificate file for the L{Messenger}. This certificate is
338 used when the L{Messenger} accepts or establishes SSL/TLS connections.
339 This property must be specified for the L{Messenger} to accept
340 incoming SSL/TLS connections and to establish client authenticated
341 outgoing SSL/TLS connection. Non client authenticated outgoing SSL/TLS
342 connections do not require this property.
343 """)
344
346 return pn_messenger_get_private_key(self._mng)
347
349 self._check(pn_messenger_set_private_key(self._mng, value))
350
351 private_key = property(_get_private_key, _set_private_key,
352 doc="""
353 Path to a private key file for the L{Messenger's<Messenger>}
354 certificate. This property must be specified for the L{Messenger} to
355 accept incoming SSL/TLS connections and to establish client
356 authenticated outgoing SSL/TLS connection. Non client authenticated
357 SSL/TLS connections do not require this property.
358 """)
359
361 return pn_messenger_get_password(self._mng)
362
364 self._check(pn_messenger_set_password(self._mng, value))
365
366 password = property(_get_password, _set_password,
367 doc="""
368 This property contains the password for the L{Messenger.private_key}
369 file, or None if the file is not encrypted.
370 """)
371
373 return pn_messenger_get_trusted_certificates(self._mng)
374
376 self._check(pn_messenger_set_trusted_certificates(self._mng, value))
377
378 trusted_certificates = property(_get_trusted_certificates,
379 _set_trusted_certificates,
380 doc="""
381 A path to a database of trusted certificates for use in verifying the
382 peer on an SSL/TLS connection. If this property is None, then the peer
383 will not be verified.
384 """)
385
387 t = pn_messenger_get_timeout(self._mng)
388 if t == -1:
389 return None
390 else:
391 return millis2secs(t)
392
394 if value is None:
395 t = -1
396 else:
397 t = secs2millis(value)
398 self._check(pn_messenger_set_timeout(self._mng, t))
399
400 timeout = property(_get_timeout, _set_timeout,
401 doc="""
402 The timeout property contains the default timeout for blocking
403 operations performed by the L{Messenger}.
404 """)
405
407 return pn_messenger_is_blocking(self._mng)
408
410 self._check(pn_messenger_set_blocking(self._mng, b))
411
412 blocking = property(_is_blocking, _set_blocking,
413 doc="""
414 Enable or disable blocking behavior during L{Message} sending
415 and receiving. This affects every blocking call, with the
416 exception of L{work}. Currently, the affected calls are
417 L{send}, L{recv}, and L{stop}.
418 """)
419
421 return pn_messenger_is_passive(self._mng)
422
424 self._check(pn_messenger_set_passive(self._mng, b))
425
426 passive = property(_is_passive, _set_passive,
427 doc="""
428 When passive is set to true, Messenger will not attempt to perform I/O
429 internally. In this mode it is necessary to use the selectables API to
430 drive any I/O needed to perform requested actions. In this mode
431 Messenger will never block.
432 """)
433
435 return pn_messenger_get_incoming_window(self._mng)
436
438 self._check(pn_messenger_set_incoming_window(self._mng, window))
439
440 incoming_window = property(_get_incoming_window, _set_incoming_window,
441 doc="""
442 The incoming tracking window for the messenger. The messenger will
443 track the remote status of this many incoming deliveries after they
444 have been accepted or rejected. Defaults to zero.
445
446 L{Messages<Message>} enter this window only when you take them into your application
447 using L{get}. If your incoming window size is I{n}, and you get I{n}+1 L{messages<Message>}
448 without explicitly accepting or rejecting the oldest message, then the
449 message that passes beyond the edge of the incoming window will be assigned
450 the default disposition of its link.
451 """)
452
454 return pn_messenger_get_outgoing_window(self._mng)
455
457 self._check(pn_messenger_set_outgoing_window(self._mng, window))
458
459 outgoing_window = property(_get_outgoing_window, _set_outgoing_window,
460 doc="""
461 The outgoing tracking window for the messenger. The messenger will
462 track the remote status of this many outgoing deliveries after calling
463 send. Defaults to zero.
464
465 A L{Message} enters this window when you call the put() method with the
466 message. If your outgoing window size is I{n}, and you call L{put} I{n}+1
467 times, status information will no longer be available for the
468 first message.
469 """)
470
472 """
473 Currently a no-op placeholder.
474 For future compatibility, do not L{send} or L{recv} messages
475 before starting the L{Messenger}.
476 """
477 self._check(pn_messenger_start(self._mng))
478
480 """
481 Transitions the L{Messenger} to an inactive state. An inactive
482 L{Messenger} will not send or receive messages from its internal
483 queues. A L{Messenger} should be stopped before being discarded to
484 ensure a clean shutdown handshake occurs on any internally managed
485 connections.
486 """
487 self._check(pn_messenger_stop(self._mng))
488
489 @property
491 """
492 Returns true iff a L{Messenger} is in the stopped state.
493 This function does not block.
494 """
495 return pn_messenger_stopped(self._mng)
496
498 """
499 Subscribes the L{Messenger} to messages originating from the
500 specified source. The source is an address as specified in the
501 L{Messenger} introduction with the following addition. If the
502 domain portion of the address begins with the '~' character, the
503 L{Messenger} will interpret the domain as host/port, bind to it,
504 and listen for incoming messages. For example "~0.0.0.0",
505 "amqp://~0.0.0.0", and "amqps://~0.0.0.0" will all bind to any
506 local interface and listen for incoming messages with the last
507 variant only permitting incoming SSL connections.
508
509 @type source: string
510 @param source: the source of messages to subscribe to
511 """
512 sub_impl = pn_messenger_subscribe(self._mng, source)
513 if not sub_impl:
514 self._check(pn_error_code(pn_messenger_error(self._mng)))
515 raise MessengerException("Cannot subscribe to %s"%source)
516 return Subscription(sub_impl)
517
518 - def put(self, message):
519 """
520 Places the content contained in the message onto the outgoing
521 queue of the L{Messenger}. This method will never block, however
522 it will send any unblocked L{Messages<Message>} in the outgoing
523 queue immediately and leave any blocked L{Messages<Message>}
524 remaining in the outgoing queue. The L{send} call may be used to
525 block until the outgoing queue is empty. The L{outgoing} property
526 may be used to check the depth of the outgoing queue.
527
528 When the content in a given L{Message} object is copied to the outgoing
529 message queue, you may then modify or discard the L{Message} object
530 without having any impact on the content in the outgoing queue.
531
532 This method returns an outgoing tracker for the L{Message}. The tracker
533 can be used to determine the delivery status of the L{Message}.
534
535 @type message: Message
536 @param message: the message to place in the outgoing queue
537 @return: a tracker
538 """
539 message._pre_encode()
540 self._check(pn_messenger_put(self._mng, message._msg))
541 return pn_messenger_outgoing_tracker(self._mng)
542
544 """
545 Gets the last known remote state of the delivery associated with
546 the given tracker.
547
548 @type tracker: tracker
549 @param tracker: the tracker whose status is to be retrieved
550
551 @return: one of None, PENDING, REJECTED, MODIFIED, or ACCEPTED
552 """
553 disp = pn_messenger_status(self._mng, tracker);
554 return STATUSES.get(disp, disp)
555
557 """
558 Checks if the delivery associated with the given tracker is still
559 waiting to be sent.
560
561 @type tracker: tracker
562 @param tracker: the tracker whose status is to be retrieved
563
564 @return: true if delivery is still buffered
565 """
566 return pn_messenger_buffered(self._mng, tracker);
567
568 - def settle(self, tracker=None):
569 """
570 Frees a L{Messenger} from tracking the status associated with a given
571 tracker. If you don't supply a tracker, all outgoing L{messages<Message>} up
572 to the most recent will be settled.
573 """
574 if tracker is None:
575 tracker = pn_messenger_outgoing_tracker(self._mng)
576 flags = PN_CUMULATIVE
577 else:
578 flags = 0
579 self._check(pn_messenger_settle(self._mng, tracker, flags))
580
581 - def send(self, n=-1):
582 """
583 This call will block until the indicated number of L{messages<Message>}
584 have been sent, or until the operation times out. If n is -1 this call will
585 block until all outgoing L{messages<Message>} have been sent. If n is 0 then
586 this call will send whatever it can without blocking.
587 """
588 self._check(pn_messenger_send(self._mng, n))
589
590 - def recv(self, n=None):
591 """
592 Receives up to I{n} L{messages<Message>} into the incoming queue. If no value
593 for I{n} is supplied, this call will receive as many L{messages<Message>} as it
594 can buffer internally. If the L{Messenger} is in blocking mode, this
595 call will block until at least one L{Message} is available in the
596 incoming queue.
597 """
598 if n is None:
599 n = -1
600 self._check(pn_messenger_recv(self._mng, n))
601
602 - def work(self, timeout=None):
603 """
604 Sends or receives any outstanding L{messages<Message>} queued for a L{Messenger}.
605 This will block for the indicated timeout.
606 This method may also do I/O work other than sending and receiving
607 L{messages<Message>}. For example, closing connections after messenger.L{stop}()
608 has been called.
609 """
610 if timeout is None:
611 t = -1
612 else:
613 t = secs2millis(timeout)
614 err = pn_messenger_work(self._mng, t)
615 if (err == PN_TIMEOUT):
616 return False
617 else:
618 self._check(err)
619 return True
620
621 @property
623 return pn_messenger_receiving(self._mng)
624
626 """
627 The L{Messenger} interface is single-threaded.
628 This is the only L{Messenger} function intended to be called
629 from outside of the L{Messenger} thread.
630 Call this from a non-messenger thread to interrupt
631 a L{Messenger} that is blocking.
632 This will cause any in-progress blocking call to throw
633 the L{Interrupt} exception. If there is no currently blocking
634 call, then the next blocking call will be affected, even if it
635 is within the same thread that interrupt was called from.
636 """
637 self._check(pn_messenger_interrupt(self._mng))
638
639 - def get(self, message=None):
640 """
641 Moves the message from the head of the incoming message queue into
642 the supplied message object. Any content in the message will be
643 overwritten.
644
645 A tracker for the incoming L{Message} is returned. The tracker can
646 later be used to communicate your acceptance or rejection of the
647 L{Message}.
648
649 If None is passed in for the L{Message} object, the L{Message}
650 popped from the head of the queue is discarded.
651
652 @type message: Message
653 @param message: the destination message object
654 @return: a tracker
655 """
656 if message is None:
657 impl = None
658 else:
659 impl = message._msg
660 self._check(pn_messenger_get(self._mng, impl))
661 if message is not None:
662 message._post_decode()
663 return pn_messenger_incoming_tracker(self._mng)
664
665 - def accept(self, tracker=None):
666 """
667 Signal the sender that you have acted on the L{Message}
668 pointed to by the tracker. If no tracker is supplied,
669 then all messages that have been returned by the L{get}
670 method are accepted, except those that have already been
671 auto-settled by passing beyond your incoming window size.
672
673 @type tracker: tracker
674 @param tracker: a tracker as returned by get
675 """
676 if tracker is None:
677 tracker = pn_messenger_incoming_tracker(self._mng)
678 flags = PN_CUMULATIVE
679 else:
680 flags = 0
681 self._check(pn_messenger_accept(self._mng, tracker, flags))
682
683 - def reject(self, tracker=None):
684 """
685 Rejects the L{Message} indicated by the tracker. If no tracker
686 is supplied, all messages that have been returned by the L{get}
687 method are rejected, except those that have already been auto-settled
688 by passing beyond your outgoing window size.
689
690 @type tracker: tracker
691 @param tracker: a tracker as returned by get
692 """
693 if tracker is None:
694 tracker = pn_messenger_incoming_tracker(self._mng)
695 flags = PN_CUMULATIVE
696 else:
697 flags = 0
698 self._check(pn_messenger_reject(self._mng, tracker, flags))
699
700 @property
702 """
703 The outgoing queue depth.
704 """
705 return pn_messenger_outgoing(self._mng)
706
707 @property
709 """
710 The incoming queue depth.
711 """
712 return pn_messenger_incoming(self._mng)
713
714 - def route(self, pattern, address):
715 """
716 Adds a routing rule to a L{Messenger's<Messenger>} internal routing table.
717
718 The route procedure may be used to influence how a L{Messenger} will
719 internally treat a given address or class of addresses. Every call
720 to the route procedure will result in L{Messenger} appending a routing
721 rule to its internal routing table.
722
723 Whenever a L{Message} is presented to a L{Messenger} for delivery, it
724 will match the address of this message against the set of routing
725 rules in order. The first rule to match will be triggered, and
726 instead of routing based on the address presented in the message,
727 the L{Messenger} will route based on the address supplied in the rule.
728
729 The pattern matching syntax supports two types of matches, a '%'
730 will match any character except a '/', and a '*' will match any
731 character including a '/'.
732
733 A routing address is specified as a normal AMQP address, however it
734 may additionally use substitution variables from the pattern match
735 that triggered the rule.
736
737 Any message sent to "foo" will be routed to "amqp://foo.com":
738
739 >>> messenger.route("foo", "amqp://foo.com");
740
741 Any message sent to "foobar" will be routed to
742 "amqp://foo.com/bar":
743
744 >>> messenger.route("foobar", "amqp://foo.com/bar");
745
746 Any message sent to bar/<path> will be routed to the corresponding
747 path within the amqp://bar.com domain:
748
749 >>> messenger.route("bar/*", "amqp://bar.com/$1");
750
751 Route all L{messages<Message>} over TLS:
752
753 >>> messenger.route("amqp:*", "amqps:$1")
754
755 Supply credentials for foo.com:
756
757 >>> messenger.route("amqp://foo.com/*", "amqp://user:password@foo.com/$1");
758
759 Supply credentials for all domains:
760
761 >>> messenger.route("amqp://*", "amqp://user:password@$1");
762
763 Route all addresses through a single proxy while preserving the
764 original destination:
765
766 >>> messenger.route("amqp://%/*", "amqp://user:password@proxy/$1/$2");
767
768 Route any address through a single broker:
769
770 >>> messenger.route("*", "amqp://user:password@broker/$1");
771 """
772 self._check(pn_messenger_route(self._mng, unicode2utf8(pattern), unicode2utf8(address)))
773
774 - def rewrite(self, pattern, address):
775 """
776 Similar to route(), except that the destination of
777 the L{Message} is determined before the message address is rewritten.
778
779 The outgoing address is only rewritten after routing has been
780 finalized. If a message has an outgoing address of
781 "amqp://0.0.0.0:5678", and a rewriting rule that changes its
782 outgoing address to "foo", it will still arrive at the peer that
783 is listening on "amqp://0.0.0.0:5678", but when it arrives there,
784 the receiver will see its outgoing address as "foo".
785
786 The default rewrite rule removes username and password from addresses
787 before they are transmitted.
788 """
789 self._check(pn_messenger_rewrite(self._mng, unicode2utf8(pattern), unicode2utf8(address)))
790
792 return Selectable.wrap(pn_messenger_selectable(self._mng))
793
794 @property
796 tstamp = pn_messenger_deadline(self._mng)
797 if tstamp:
798 return millis2secs(tstamp)
799 else:
800 return None
801
803 """The L{Message} class is a mutable holder of message content.
804
805 @ivar instructions: delivery instructions for the message
806 @type instructions: dict
807 @ivar annotations: infrastructure defined message annotations
808 @type annotations: dict
809 @ivar properties: application defined message properties
810 @type properties: dict
811 @ivar body: message body
812 @type body: bytes | unicode | dict | list | int | long | float | UUID
813 """
814
815 DEFAULT_PRIORITY = PN_DEFAULT_PRIORITY
816
817 - def __init__(self, body=None, **kwargs):
818 """
819 @param kwargs: Message property name/value pairs to initialise the Message
820 """
821 self._msg = pn_message()
822 self._id = Data(pn_message_id(self._msg))
823 self._correlation_id = Data(pn_message_correlation_id(self._msg))
824 self.instructions = None
825 self.annotations = None
826 self.properties = None
827 self.body = body
828 for k,v in _compat.iteritems(kwargs):
829 getattr(self, k)
830 setattr(self, k, v)
831
833 if hasattr(self, "_msg"):
834 pn_message_free(self._msg)
835 del self._msg
836
838 if err < 0:
839 exc = EXCEPTIONS.get(err, MessageException)
840 raise exc("[%s]: %s" % (err, pn_error_text(pn_message_error(self._msg))))
841 else:
842 return err
843
862
863 - def _post_decode(self):
864 inst = Data(pn_message_instructions(self._msg))
865 ann = Data(pn_message_annotations(self._msg))
866 props = Data(pn_message_properties(self._msg))
867 body = Data(pn_message_body(self._msg))
868
869 if inst.next():
870 self.instructions = inst.get_object()
871 else:
872 self.instructions = None
873 if ann.next():
874 self.annotations = ann.get_object()
875 else:
876 self.annotations = None
877 if props.next():
878 self.properties = props.get_object()
879 else:
880 self.properties = None
881 if body.next():
882 self.body = body.get_object()
883 else:
884 self.body = None
885
887 """
888 Clears the contents of the L{Message}. All fields will be reset to
889 their default values.
890 """
891 pn_message_clear(self._msg)
892 self.instructions = None
893 self.annotations = None
894 self.properties = None
895 self.body = None
896
898 return pn_message_is_inferred(self._msg)
899
901 self._check(pn_message_set_inferred(self._msg, bool(value)))
902
903 inferred = property(_is_inferred, _set_inferred, doc="""
904 The inferred flag for a message indicates how the message content
905 is encoded into AMQP sections. If inferred is true then binary and
906 list values in the body of the message will be encoded as AMQP DATA
907 and AMQP SEQUENCE sections, respectively. If inferred is false,
908 then all values in the body of the message will be encoded as AMQP
909 VALUE sections regardless of their type.
910 """)
911
913 return pn_message_is_durable(self._msg)
914
916 self._check(pn_message_set_durable(self._msg, bool(value)))
917
918 durable = property(_is_durable, _set_durable,
919 doc="""
920 The durable property indicates that the message should be held durably
921 by any intermediaries taking responsibility for the message.
922 """)
923
925 return pn_message_get_priority(self._msg)
926
928 self._check(pn_message_set_priority(self._msg, value))
929
930 priority = property(_get_priority, _set_priority,
931 doc="""
932 The priority of the message.
933 """)
934
936 return millis2secs(pn_message_get_ttl(self._msg))
937
939 self._check(pn_message_set_ttl(self._msg, secs2millis(value)))
940
941 ttl = property(_get_ttl, _set_ttl,
942 doc="""
943 The time to live of the message measured in seconds. Expired messages
944 may be dropped.
945 """)
946
948 return pn_message_is_first_acquirer(self._msg)
949
951 self._check(pn_message_set_first_acquirer(self._msg, bool(value)))
952
953 first_acquirer = property(_is_first_acquirer, _set_first_acquirer,
954 doc="""
955 True iff the recipient is the first to acquire the message.
956 """)
957
959 return pn_message_get_delivery_count(self._msg)
960
962 self._check(pn_message_set_delivery_count(self._msg, value))
963
964 delivery_count = property(_get_delivery_count, _set_delivery_count,
965 doc="""
966 The number of delivery attempts made for this message.
967 """)
968
969
977 id = property(_get_id, _set_id,
978 doc="""
979 The id of the message.
980 """)
981
983 return pn_message_get_user_id(self._msg)
984
986 self._check(pn_message_set_user_id(self._msg, value))
987
988 user_id = property(_get_user_id, _set_user_id,
989 doc="""
990 The user id of the message creator.
991 """)
992
994 return utf82unicode(pn_message_get_address(self._msg))
995
997 self._check(pn_message_set_address(self._msg, unicode2utf8(value)))
998
999 address = property(_get_address, _set_address,
1000 doc="""
1001 The address of the message.
1002 """)
1003
1005 return utf82unicode(pn_message_get_subject(self._msg))
1006
1008 self._check(pn_message_set_subject(self._msg, unicode2utf8(value)))
1009
1010 subject = property(_get_subject, _set_subject,
1011 doc="""
1012 The subject of the message.
1013 """)
1014
1016 return utf82unicode(pn_message_get_reply_to(self._msg))
1017
1019 self._check(pn_message_set_reply_to(self._msg, unicode2utf8(value)))
1020
1021 reply_to = property(_get_reply_to, _set_reply_to,
1022 doc="""
1023 The reply-to address for the message.
1024 """)
1025
1033
1034 correlation_id = property(_get_correlation_id, _set_correlation_id,
1035 doc="""
1036 The correlation-id for the message.
1037 """)
1038
1040 return symbol(utf82unicode(pn_message_get_content_type(self._msg)))
1041
1042 - def _set_content_type(self, value):
1043 self._check(pn_message_set_content_type(self._msg, unicode2utf8(value)))
1044
1045 content_type = property(_get_content_type, _set_content_type,
1046 doc="""
1047 The content-type of the message.
1048 """)
1049
1051 return symbol(utf82unicode(pn_message_get_content_encoding(self._msg)))
1052
1053 - def _set_content_encoding(self, value):
1054 self._check(pn_message_set_content_encoding(self._msg, unicode2utf8(value)))
1055
1056 content_encoding = property(_get_content_encoding, _set_content_encoding,
1057 doc="""
1058 The content-encoding of the message.
1059 """)
1060
1062 return millis2secs(pn_message_get_expiry_time(self._msg))
1063
1065 self._check(pn_message_set_expiry_time(self._msg, secs2millis(value)))
1066
1067 expiry_time = property(_get_expiry_time, _set_expiry_time,
1068 doc="""
1069 The expiry time of the message.
1070 """)
1071
1073 return millis2secs(pn_message_get_creation_time(self._msg))
1074
1076 self._check(pn_message_set_creation_time(self._msg, secs2millis(value)))
1077
1078 creation_time = property(_get_creation_time, _set_creation_time,
1079 doc="""
1080 The creation time of the message.
1081 """)
1082
1084 return utf82unicode(pn_message_get_group_id(self._msg))
1085
1087 self._check(pn_message_set_group_id(self._msg, unicode2utf8(value)))
1088
1089 group_id = property(_get_group_id, _set_group_id,
1090 doc="""
1091 The group id of the message.
1092 """)
1093
1095 return pn_message_get_group_sequence(self._msg)
1096
1098 self._check(pn_message_set_group_sequence(self._msg, value))
1099
1100 group_sequence = property(_get_group_sequence, _set_group_sequence,
1101 doc="""
1102 The sequence of the message within its group.
1103 """)
1104
1106 return utf82unicode(pn_message_get_reply_to_group_id(self._msg))
1107
1109 self._check(pn_message_set_reply_to_group_id(self._msg, unicode2utf8(value)))
1110
1111 reply_to_group_id = property(_get_reply_to_group_id, _set_reply_to_group_id,
1112 doc="""
1113 The group-id for any replies.
1114 """)
1115
1117 self._pre_encode()
1118 sz = 16
1119 while True:
1120 err, data = pn_message_encode(self._msg, sz)
1121 if err == PN_OVERFLOW:
1122 sz *= 2
1123 continue
1124 else:
1125 self._check(err)
1126 return data
1127
1129 self._check(pn_message_decode(self._msg, data))
1130 self._post_decode()
1131
1132 - def send(self, sender, tag=None):
1140
1141 - def recv(self, link):
1142 """
1143 Receives and decodes the message content for the current delivery
1144 from the link. Upon success it will return the current delivery
1145 for the link. If there is no current delivery, or if the current
1146 delivery is incomplete, or if the link is not a receiver, it will
1147 return None.
1148
1149 @type link: Link
1150 @param link: the link to receive a message from
1151 @return the delivery associated with the decoded message (or None)
1152
1153 """
1154 if link.is_sender: return None
1155 dlv = link.current
1156 if not dlv or dlv.partial: return None
1157 dlv.encoded = link.recv(dlv.pending)
1158 link.advance()
1159
1160
1161 if link.remote_snd_settle_mode == Link.SND_SETTLED:
1162 dlv.settle()
1163 self.decode(dlv.encoded)
1164 return dlv
1165
1167 props = []
1168 for attr in ("inferred", "address", "reply_to", "durable", "ttl",
1169 "priority", "first_acquirer", "delivery_count", "id",
1170 "correlation_id", "user_id", "group_id", "group_sequence",
1171 "reply_to_group_id", "instructions", "annotations",
1172 "properties", "body"):
1173 value = getattr(self, attr)
1174 if value: props.append("%s=%r" % (attr, value))
1175 return "Message(%s)" % ", ".join(props)
1176
1178 tmp = pn_string(None)
1179 err = pn_inspect(self._msg, tmp)
1180 result = pn_string_get(tmp)
1181 pn_free(tmp)
1182 self._check(err)
1183 return result
1184
1186
1189
1190 @property
1192 return pn_subscription_address(self._impl)
1193
1194 _DEFAULT = object()
1197
1198 @staticmethod
1200 if impl is None:
1201 return None
1202 else:
1203 return Selectable(impl)
1204
1207
1210
1212 if fd is _DEFAULT:
1213 return pn_selectable_get_fd(self._impl)
1214 elif fd is None:
1215 pn_selectable_set_fd(self._impl, PN_INVALID_SOCKET)
1216 else:
1217 pn_selectable_set_fd(self._impl, fd)
1218
1220 return pn_selectable_is_reading(self._impl)
1221
1223 pn_selectable_set_reading(self._impl, bool(val))
1224
1225 reading = property(_is_reading, _set_reading)
1226
1228 return pn_selectable_is_writing(self._impl)
1229
1231 pn_selectable_set_writing(self._impl, bool(val))
1232
1233 writing = property(_is_writing, _set_writing)
1234
1236 tstamp = pn_selectable_get_deadline(self._impl)
1237 if tstamp:
1238 return millis2secs(tstamp)
1239 else:
1240 return None
1241
1243 pn_selectable_set_deadline(self._impl, secs2millis(deadline))
1244
1245 deadline = property(_get_deadline, _set_deadline)
1246
1248 pn_selectable_readable(self._impl)
1249
1251 pn_selectable_writable(self._impl)
1252
1254 pn_selectable_expired(self._impl)
1255
1257 return pn_selectable_is_registered(self._impl)
1258
1260 pn_selectable_set_registered(self._impl, registered)
1261
1262 registered = property(_is_registered, _set_registered,
1263 doc="""
1264 The registered property may be get/set by an I/O polling system to
1265 indicate whether the fd has been registered or not.
1266 """)
1267
1268 @property
1270 return pn_selectable_is_terminal(self._impl)
1271
1273 pn_selectable_terminate(self._impl)
1274
1276 pn_selectable_release(self._impl)
1277
1279 """
1280 The DataException class is the root of the Data exception hierarchy.
1281 All exceptions raised by the Data class extend this exception.
1282 """
1283 pass
1284
1286
1289
1291 return "UnmappedType(%s)" % self.msg
1292
1294
1296 return "ulong(%s)" % long.__repr__(self)
1297
1299
1301 return "timestamp(%s)" % long.__repr__(self)
1302
1304
1306 return "symbol(%s)" % unicode.__repr__(self)
1307
1308 -class char(unicode):
1309
1311 return "char(%s)" % unicode.__repr__(self)
1312
1317
1322
1327
1332
1337
1339
1341 return "uint(%s)" % long.__repr__(self)
1342
1344
1346 return "float32(%s)" % float.__repr__(self)
1347
1352
1354
1356 return "decimal64(%s)" % long.__repr__(self)
1357
1359
1361 return "decimal128(%s)" % bytes.__repr__(self)
1362
1364
1365 - def __init__(self, descriptor, value):
1366 self.descriptor = descriptor
1367 self.value = value
1368
1370 return "Described(%r, %r)" % (self.descriptor, self.value)
1371
1373 if isinstance(o, Described):
1374 return self.descriptor == o.descriptor and self.value == o.value
1375 else:
1376 return False
1377
1378 UNDESCRIBED = Constant("UNDESCRIBED")
1379
1380 -class Array(object):
1381
1382 - def __init__(self, descriptor, type, *elements):
1383 self.descriptor = descriptor
1384 self.type = type
1385 self.elements = elements
1386
1388 return iter(self.elements)
1389
1391 if self.elements:
1392 els = ", %s" % (", ".join(map(repr, self.elements)))
1393 else:
1394 els = ""
1395 return "Array(%r, %r%s)" % (self.descriptor, self.type, els)
1396
1398 if isinstance(o, Array):
1399 return self.descriptor == o.descriptor and \
1400 self.type == o.type and self.elements == o.elements
1401 else:
1402 return False
1403
1405 """
1406 The L{Data} class provides an interface for decoding, extracting,
1407 creating, and encoding arbitrary AMQP data. A L{Data} object
1408 contains a tree of AMQP values. Leaf nodes in this tree correspond
1409 to scalars in the AMQP type system such as L{ints<INT>} or
1410 L{strings<STRING>}. Non-leaf nodes in this tree correspond to
1411 compound values in the AMQP type system such as L{lists<LIST>},
1412 L{maps<MAP>}, L{arrays<ARRAY>}, or L{described values<DESCRIBED>}.
1413 The root node of the tree is the L{Data} object itself and can have
1414 an arbitrary number of children.
1415
1416 A L{Data} object maintains the notion of the current sibling node
1417 and a current parent node. Siblings are ordered within their parent.
1418 Values are accessed and/or added by using the L{next}, L{prev},
1419 L{enter}, and L{exit} methods to navigate to the desired location in
1420 the tree and using the supplied variety of put_*/get_* methods to
1421 access or add a value of the desired type.
1422
1423 The put_* methods will always add a value I{after} the current node
1424 in the tree. If the current node has a next sibling the put_* method
1425 will overwrite the value on this node. If there is no current node
1426 or the current node has no next sibling then one will be added. The
1427 put_* methods always set the added/modified node to the current
1428 node. The get_* methods read the value of the current node and do
1429 not change which node is current.
1430
1431 The following types of scalar values are supported:
1432
1433 - L{NULL}
1434 - L{BOOL}
1435 - L{UBYTE}
1436 - L{USHORT}
1437 - L{SHORT}
1438 - L{UINT}
1439 - L{INT}
1440 - L{ULONG}
1441 - L{LONG}
1442 - L{FLOAT}
1443 - L{DOUBLE}
1444 - L{BINARY}
1445 - L{STRING}
1446 - L{SYMBOL}
1447
1448 The following types of compound values are supported:
1449
1450 - L{DESCRIBED}
1451 - L{ARRAY}
1452 - L{LIST}
1453 - L{MAP}
1454 """
1455
1456 NULL = PN_NULL; "A null value."
1457 BOOL = PN_BOOL; "A boolean value."
1458 UBYTE = PN_UBYTE; "An unsigned byte value."
1459 BYTE = PN_BYTE; "A signed byte value."
1460 USHORT = PN_USHORT; "An unsigned short value."
1461 SHORT = PN_SHORT; "A short value."
1462 UINT = PN_UINT; "An unsigned int value."
1463 INT = PN_INT; "A signed int value."
1464 CHAR = PN_CHAR; "A character value."
1465 ULONG = PN_ULONG; "An unsigned long value."
1466 LONG = PN_LONG; "A signed long value."
1467 TIMESTAMP = PN_TIMESTAMP; "A timestamp value."
1468 FLOAT = PN_FLOAT; "A float value."
1469 DOUBLE = PN_DOUBLE; "A double value."
1470 DECIMAL32 = PN_DECIMAL32; "A DECIMAL32 value."
1471 DECIMAL64 = PN_DECIMAL64; "A DECIMAL64 value."
1472 DECIMAL128 = PN_DECIMAL128; "A DECIMAL128 value."
1473 UUID = PN_UUID; "A UUID value."
1474 BINARY = PN_BINARY; "A binary string."
1475 STRING = PN_STRING; "A unicode string."
1476 SYMBOL = PN_SYMBOL; "A symbolic string."
1477 DESCRIBED = PN_DESCRIBED; "A described value."
1478 ARRAY = PN_ARRAY; "An array value."
1479 LIST = PN_LIST; "A list value."
1480 MAP = PN_MAP; "A map value."
1481
1482 type_names = {
1483 NULL: "null",
1484 BOOL: "bool",
1485 BYTE: "byte",
1486 UBYTE: "ubyte",
1487 SHORT: "short",
1488 USHORT: "ushort",
1489 INT: "int",
1490 UINT: "uint",
1491 CHAR: "char",
1492 LONG: "long",
1493 ULONG: "ulong",
1494 TIMESTAMP: "timestamp",
1495 FLOAT: "float",
1496 DOUBLE: "double",
1497 DECIMAL32: "decimal32",
1498 DECIMAL64: "decimal64",
1499 DECIMAL128: "decimal128",
1500 UUID: "uuid",
1501 BINARY: "binary",
1502 STRING: "string",
1503 SYMBOL: "symbol",
1504 DESCRIBED: "described",
1505 ARRAY: "array",
1506 LIST: "list",
1507 MAP: "map"
1508 }
1509
1510 @classmethod
1512
1520
1522 if self._free and hasattr(self, "_data"):
1523 pn_data_free(self._data)
1524 del self._data
1525
1527 if err < 0:
1528 exc = EXCEPTIONS.get(err, DataException)
1529 raise exc("[%s]: %s" % (err, pn_error_text(pn_data_error(self._data))))
1530 else:
1531 return err
1532
1534 """
1535 Clears the data object.
1536 """
1537 pn_data_clear(self._data)
1538
1540 """
1541 Clears current node and sets the parent to the root node. Clearing the
1542 current node sets it _before_ the first node, calling next() will advance to
1543 the first node.
1544 """
1545 assert self._data is not None
1546 pn_data_rewind(self._data)
1547
1549 """
1550 Advances the current node to its next sibling and returns its
1551 type. If there is no next sibling the current node remains
1552 unchanged and None is returned.
1553 """
1554 found = pn_data_next(self._data)
1555 if found:
1556 return self.type()
1557 else:
1558 return None
1559
1561 """
1562 Advances the current node to its previous sibling and returns its
1563 type. If there is no previous sibling the current node remains
1564 unchanged and None is returned.
1565 """
1566 found = pn_data_prev(self._data)
1567 if found:
1568 return self.type()
1569 else:
1570 return None
1571
1573 """
1574 Sets the parent node to the current node and clears the current node.
1575 Clearing the current node sets it _before_ the first child,
1576 call next() advances to the first child.
1577 """
1578 return pn_data_enter(self._data)
1579
1581 """
1582 Sets the current node to the parent node and the parent node to
1583 its own parent.
1584 """
1585 return pn_data_exit(self._data)
1586
1588 return pn_data_lookup(self._data, name)
1589
1591 pn_data_narrow(self._data)
1592
1594 pn_data_widen(self._data)
1595
1597 """
1598 Returns the type of the current node.
1599 """
1600 dtype = pn_data_type(self._data)
1601 if dtype == -1:
1602 return None
1603 else:
1604 return dtype
1605
1607 """
1608 Returns the size in bytes needed to encode the data in AMQP format.
1609 """
1610 return pn_data_encoded_size(self._data)
1611
1613 """
1614 Returns a representation of the data encoded in AMQP format.
1615 """
1616 size = 1024
1617 while True:
1618 cd, enc = pn_data_encode(self._data, size)
1619 if cd == PN_OVERFLOW:
1620 size *= 2
1621 elif cd >= 0:
1622 return enc
1623 else:
1624 self._check(cd)
1625
1627 """
1628 Decodes the first value from supplied AMQP data and returns the
1629 number of bytes consumed.
1630
1631 @type encoded: binary
1632 @param encoded: AMQP encoded binary data
1633 """
1634 return self._check(pn_data_decode(self._data, encoded))
1635
1637 """
1638 Puts a list value. Elements may be filled by entering the list
1639 node and putting element values.
1640
1641 >>> data = Data()
1642 >>> data.put_list()
1643 >>> data.enter()
1644 >>> data.put_int(1)
1645 >>> data.put_int(2)
1646 >>> data.put_int(3)
1647 >>> data.exit()
1648 """
1649 self._check(pn_data_put_list(self._data))
1650
1652 """
1653 Puts a map value. Elements may be filled by entering the map node
1654 and putting alternating key value pairs.
1655
1656 >>> data = Data()
1657 >>> data.put_map()
1658 >>> data.enter()
1659 >>> data.put_string("key")
1660 >>> data.put_string("value")
1661 >>> data.exit()
1662 """
1663 self._check(pn_data_put_map(self._data))
1664
1665 - def put_array(self, described, element_type):
1666 """
1667 Puts an array value. Elements may be filled by entering the array
1668 node and putting the element values. The values must all be of the
1669 specified array element type. If an array is described then the
1670 first child value of the array is the descriptor and may be of any
1671 type.
1672
1673 >>> data = Data()
1674 >>>
1675 >>> data.put_array(False, Data.INT)
1676 >>> data.enter()
1677 >>> data.put_int(1)
1678 >>> data.put_int(2)
1679 >>> data.put_int(3)
1680 >>> data.exit()
1681 >>>
1682 >>> data.put_array(True, Data.DOUBLE)
1683 >>> data.enter()
1684 >>> data.put_symbol("array-descriptor")
1685 >>> data.put_double(1.1)
1686 >>> data.put_double(1.2)
1687 >>> data.put_double(1.3)
1688 >>> data.exit()
1689
1690 @type described: bool
1691 @param described: specifies whether the array is described
1692 @type element_type: int
1693 @param element_type: the type of the array elements
1694 """
1695 self._check(pn_data_put_array(self._data, described, element_type))
1696
1698 """
1699 Puts a described value. A described node has two children, the
1700 descriptor and the value. These are specified by entering the node
1701 and putting the desired values.
1702
1703 >>> data = Data()
1704 >>> data.put_described()
1705 >>> data.enter()
1706 >>> data.put_symbol("value-descriptor")
1707 >>> data.put_string("the value")
1708 >>> data.exit()
1709 """
1710 self._check(pn_data_put_described(self._data))
1711
1713 """
1714 Puts a null value.
1715 """
1716 self._check(pn_data_put_null(self._data))
1717
1719 """
1720 Puts a boolean value.
1721
1722 @param b: a boolean value
1723 """
1724 self._check(pn_data_put_bool(self._data, b))
1725
1727 """
1728 Puts an unsigned byte value.
1729
1730 @param ub: an integral value
1731 """
1732 self._check(pn_data_put_ubyte(self._data, ub))
1733
1735 """
1736 Puts a signed byte value.
1737
1738 @param b: an integral value
1739 """
1740 self._check(pn_data_put_byte(self._data, b))
1741
1743 """
1744 Puts an unsigned short value.
1745
1746 @param us: an integral value.
1747 """
1748 self._check(pn_data_put_ushort(self._data, us))
1749
1751 """
1752 Puts a signed short value.
1753
1754 @param s: an integral value
1755 """
1756 self._check(pn_data_put_short(self._data, s))
1757
1759 """
1760 Puts an unsigned int value.
1761
1762 @param ui: an integral value
1763 """
1764 self._check(pn_data_put_uint(self._data, ui))
1765
1767 """
1768 Puts a signed int value.
1769
1770 @param i: an integral value
1771 """
1772 self._check(pn_data_put_int(self._data, i))
1773
1775 """
1776 Puts a char value.
1777
1778 @param c: a single character
1779 """
1780 self._check(pn_data_put_char(self._data, ord(c)))
1781
1783 """
1784 Puts an unsigned long value.
1785
1786 @param ul: an integral value
1787 """
1788 self._check(pn_data_put_ulong(self._data, ul))
1789
1791 """
1792 Puts a signed long value.
1793
1794 @param l: an integral value
1795 """
1796 self._check(pn_data_put_long(self._data, l))
1797
1799 """
1800 Puts a timestamp value.
1801
1802 @param t: an integral value
1803 """
1804 self._check(pn_data_put_timestamp(self._data, t))
1805
1807 """
1808 Puts a float value.
1809
1810 @param f: a floating point value
1811 """
1812 self._check(pn_data_put_float(self._data, f))
1813
1815 """
1816 Puts a double value.
1817
1818 @param d: a floating point value.
1819 """
1820 self._check(pn_data_put_double(self._data, d))
1821
1823 """
1824 Puts a decimal32 value.
1825
1826 @param d: a decimal32 value
1827 """
1828 self._check(pn_data_put_decimal32(self._data, d))
1829
1831 """
1832 Puts a decimal64 value.
1833
1834 @param d: a decimal64 value
1835 """
1836 self._check(pn_data_put_decimal64(self._data, d))
1837
1839 """
1840 Puts a decimal128 value.
1841
1842 @param d: a decimal128 value
1843 """
1844 self._check(pn_data_put_decimal128(self._data, d))
1845
1847 """
1848 Puts a UUID value.
1849
1850 @param u: a uuid value
1851 """
1852 self._check(pn_data_put_uuid(self._data, u.bytes))
1853
1855 """
1856 Puts a binary value.
1857
1858 @type b: binary
1859 @param b: a binary value
1860 """
1861 self._check(pn_data_put_binary(self._data, b))
1862
1864 """Put a python memoryview object as an AMQP binary value"""
1865 self.put_binary(mv.tobytes())
1866
1868 """Put a python buffer object as an AMQP binary value"""
1869 self.put_binary(bytes(buff))
1870
1872 """
1873 Puts a unicode value.
1874
1875 @type s: unicode
1876 @param s: a unicode value
1877 """
1878 self._check(pn_data_put_string(self._data, s.encode("utf8")))
1879
1881 """
1882 Puts a symbolic value.
1883
1884 @type s: string
1885 @param s: the symbol name
1886 """
1887 self._check(pn_data_put_symbol(self._data, s.encode('ascii')))
1888
1890 """
1891 If the current node is a list, return the number of elements,
1892 otherwise return zero. List elements can be accessed by entering
1893 the list.
1894
1895 >>> count = data.get_list()
1896 >>> data.enter()
1897 >>> for i in range(count):
1898 ... type = data.next()
1899 ... if type == Data.STRING:
1900 ... print data.get_string()
1901 ... elif type == ...:
1902 ... ...
1903 >>> data.exit()
1904 """
1905 return pn_data_get_list(self._data)
1906
1908 """
1909 If the current node is a map, return the number of child elements,
1910 otherwise return zero. Key value pairs can be accessed by entering
1911 the map.
1912
1913 >>> count = data.get_map()
1914 >>> data.enter()
1915 >>> for i in range(count/2):
1916 ... type = data.next()
1917 ... if type == Data.STRING:
1918 ... print data.get_string()
1919 ... elif type == ...:
1920 ... ...
1921 >>> data.exit()
1922 """
1923 return pn_data_get_map(self._data)
1924
1926 """
1927 If the current node is an array, return a tuple of the element
1928 count, a boolean indicating whether the array is described, and
1929 the type of each element, otherwise return (0, False, None). Array
1930 data can be accessed by entering the array.
1931
1932 >>> # read an array of strings with a symbolic descriptor
1933 >>> count, described, type = data.get_array()
1934 >>> data.enter()
1935 >>> data.next()
1936 >>> print "Descriptor:", data.get_symbol()
1937 >>> for i in range(count):
1938 ... data.next()
1939 ... print "Element:", data.get_string()
1940 >>> data.exit()
1941 """
1942 count = pn_data_get_array(self._data)
1943 described = pn_data_is_array_described(self._data)
1944 type = pn_data_get_array_type(self._data)
1945 if type == -1:
1946 type = None
1947 return count, described, type
1948
1950 """
1951 Checks if the current node is a described value. The descriptor
1952 and value may be accessed by entering the described value.
1953
1954 >>> # read a symbolically described string
1955 >>> assert data.is_described() # will error if the current node is not described
1956 >>> data.enter()
1957 >>> data.next()
1958 >>> print data.get_symbol()
1959 >>> data.next()
1960 >>> print data.get_string()
1961 >>> data.exit()
1962 """
1963 return pn_data_is_described(self._data)
1964
1966 """
1967 Checks if the current node is a null.
1968 """
1969 return pn_data_is_null(self._data)
1970
1972 """
1973 If the current node is a boolean, returns its value, returns False
1974 otherwise.
1975 """
1976 return pn_data_get_bool(self._data)
1977
1979 """
1980 If the current node is an unsigned byte, returns its value,
1981 returns 0 otherwise.
1982 """
1983 return ubyte(pn_data_get_ubyte(self._data))
1984
1986 """
1987 If the current node is a signed byte, returns its value, returns 0
1988 otherwise.
1989 """
1990 return byte(pn_data_get_byte(self._data))
1991
1993 """
1994 If the current node is an unsigned short, returns its value,
1995 returns 0 otherwise.
1996 """
1997 return ushort(pn_data_get_ushort(self._data))
1998
2000 """
2001 If the current node is a signed short, returns its value, returns
2002 0 otherwise.
2003 """
2004 return short(pn_data_get_short(self._data))
2005
2007 """
2008 If the current node is an unsigned int, returns its value, returns
2009 0 otherwise.
2010 """
2011 return uint(pn_data_get_uint(self._data))
2012
2014 """
2015 If the current node is a signed int, returns its value, returns 0
2016 otherwise.
2017 """
2018 return int32(pn_data_get_int(self._data))
2019
2021 """
2022 If the current node is a char, returns its value, returns 0
2023 otherwise.
2024 """
2025 return char(_compat.unichar(pn_data_get_char(self._data)))
2026
2028 """
2029 If the current node is an unsigned long, returns its value,
2030 returns 0 otherwise.
2031 """
2032 return ulong(pn_data_get_ulong(self._data))
2033
2035 """
2036 If the current node is an signed long, returns its value, returns
2037 0 otherwise.
2038 """
2039 return long(pn_data_get_long(self._data))
2040
2042 """
2043 If the current node is a timestamp, returns its value, returns 0
2044 otherwise.
2045 """
2046 return timestamp(pn_data_get_timestamp(self._data))
2047
2049 """
2050 If the current node is a float, returns its value, raises 0
2051 otherwise.
2052 """
2053 return float32(pn_data_get_float(self._data))
2054
2056 """
2057 If the current node is a double, returns its value, returns 0
2058 otherwise.
2059 """
2060 return pn_data_get_double(self._data)
2061
2062
2064 """
2065 If the current node is a decimal32, returns its value, returns 0
2066 otherwise.
2067 """
2068 return decimal32(pn_data_get_decimal32(self._data))
2069
2070
2072 """
2073 If the current node is a decimal64, returns its value, returns 0
2074 otherwise.
2075 """
2076 return decimal64(pn_data_get_decimal64(self._data))
2077
2078
2080 """
2081 If the current node is a decimal128, returns its value, returns 0
2082 otherwise.
2083 """
2084 return decimal128(pn_data_get_decimal128(self._data))
2085
2087 """
2088 If the current node is a UUID, returns its value, returns None
2089 otherwise.
2090 """
2091 if pn_data_type(self._data) == Data.UUID:
2092 return uuid.UUID(bytes=pn_data_get_uuid(self._data))
2093 else:
2094 return None
2095
2097 """
2098 If the current node is binary, returns its value, returns ""
2099 otherwise.
2100 """
2101 return pn_data_get_binary(self._data)
2102
2104 """
2105 If the current node is a string, returns its value, returns ""
2106 otherwise.
2107 """
2108 return pn_data_get_string(self._data).decode("utf8")
2109
2111 """
2112 If the current node is a symbol, returns its value, returns ""
2113 otherwise.
2114 """
2115 return symbol(pn_data_get_symbol(self._data).decode('ascii'))
2116
2117 - def copy(self, src):
2118 self._check(pn_data_copy(self._data, src._data))
2119
2130
2132 pn_data_dump(self._data)
2133
2143
2145 if self.enter():
2146 try:
2147 result = {}
2148 while self.next():
2149 k = self.get_object()
2150 if self.next():
2151 v = self.get_object()
2152 else:
2153 v = None
2154 result[k] = v
2155 finally:
2156 self.exit()
2157 return result
2158
2167
2169 if self.enter():
2170 try:
2171 result = []
2172 while self.next():
2173 result.append(self.get_object())
2174 finally:
2175 self.exit()
2176 return result
2177
2188
2197
2199 """
2200 If the current node is an array, return an Array object
2201 representing the array and its contents. Otherwise return None.
2202 This is a convenience wrapper around get_array, enter, etc.
2203 """
2204
2205 count, described, type = self.get_array()
2206 if type is None: return None
2207 if self.enter():
2208 try:
2209 if described:
2210 self.next()
2211 descriptor = self.get_object()
2212 else:
2213 descriptor = UNDESCRIBED
2214 elements = []
2215 while self.next():
2216 elements.append(self.get_object())
2217 finally:
2218 self.exit()
2219 return Array(descriptor, type, *elements)
2220
2232
2233 put_mappings = {
2234 None.__class__: lambda s, _: s.put_null(),
2235 bool: put_bool,
2236 ubyte: put_ubyte,
2237 ushort: put_ushort,
2238 uint: put_uint,
2239 ulong: put_ulong,
2240 byte: put_byte,
2241 short: put_short,
2242 int32: put_int,
2243 long: put_long,
2244 float32: put_float,
2245 float: put_double,
2246 decimal32: put_decimal32,
2247 decimal64: put_decimal64,
2248 decimal128: put_decimal128,
2249 char: put_char,
2250 timestamp: put_timestamp,
2251 uuid.UUID: put_uuid,
2252 bytes: put_binary,
2253 unicode: put_string,
2254 symbol: put_symbol,
2255 list: put_sequence,
2256 tuple: put_sequence,
2257 dict: put_dict,
2258 Described: put_py_described,
2259 Array: put_py_array
2260 }
2261
2262
2263 if int not in put_mappings:
2264 put_mappings[int] = put_int
2265
2266 try: put_mappings[memoryview] = put_memoryview
2267 except NameError: pass
2268 try: put_mappings[buffer] = put_buffer
2269 except NameError: pass
2270 get_mappings = {
2271 NULL: lambda s: None,
2272 BOOL: get_bool,
2273 BYTE: get_byte,
2274 UBYTE: get_ubyte,
2275 SHORT: get_short,
2276 USHORT: get_ushort,
2277 INT: get_int,
2278 UINT: get_uint,
2279 CHAR: get_char,
2280 LONG: get_long,
2281 ULONG: get_ulong,
2282 TIMESTAMP: get_timestamp,
2283 FLOAT: get_float,
2284 DOUBLE: get_double,
2285 DECIMAL32: get_decimal32,
2286 DECIMAL64: get_decimal64,
2287 DECIMAL128: get_decimal128,
2288 UUID: get_uuid,
2289 BINARY: get_binary,
2290 STRING: get_string,
2291 SYMBOL: get_symbol,
2292 DESCRIBED: get_py_described,
2293 ARRAY: get_py_array,
2294 LIST: get_sequence,
2295 MAP: get_dict
2296 }
2297
2298
2300 putter = self.put_mappings[obj.__class__]
2301 putter(self, obj)
2302
2304 type = self.type()
2305 if type is None: return None
2306 getter = self.get_mappings.get(type)
2307 if getter:
2308 return getter(self)
2309 else:
2310 return UnmappedType(str(type))
2311
2314
2316
2317 LOCAL_UNINIT = PN_LOCAL_UNINIT
2318 REMOTE_UNINIT = PN_REMOTE_UNINIT
2319 LOCAL_ACTIVE = PN_LOCAL_ACTIVE
2320 REMOTE_ACTIVE = PN_REMOTE_ACTIVE
2321 LOCAL_CLOSED = PN_LOCAL_CLOSED
2322 REMOTE_CLOSED = PN_REMOTE_CLOSED
2323
2326
2328 obj2cond(self.condition, self._get_cond_impl())
2329
2330 @property
2332 return cond2obj(self._get_remote_cond_impl())
2333
2334
2336 assert False, "Subclass must override this!"
2337
2339 assert False, "Subclass must override this!"
2340
2350
2362
2363 handler = property(_get_handler, _set_handler)
2364
2365 @property
2368
2370
2371 - def __init__(self, name, description=None, info=None):
2372 self.name = name
2373 self.description = description
2374 self.info = info
2375
2377 return "Condition(%s)" % ", ".join([repr(x) for x in
2378 (self.name, self.description, self.info)
2379 if x])
2380
2382 if not isinstance(o, Condition): return False
2383 return self.name == o.name and \
2384 self.description == o.description and \
2385 self.info == o.info
2386
2388 pn_condition_clear(cond)
2389 if obj:
2390 pn_condition_set_name(cond, str(obj.name))
2391 pn_condition_set_description(cond, obj.description)
2392 info = Data(pn_condition_info(cond))
2393 if obj.info:
2394 info.put_object(obj.info)
2395
2397 if pn_condition_is_set(cond):
2398 return Condition(pn_condition_get_name(cond),
2399 pn_condition_get_description(cond),
2400 dat2obj(pn_condition_info(cond)))
2401 else:
2402 return None
2403
2412
2417
2419 return long(secs*1000)
2420
2422 return float(millis)/1000.0
2423
2425 if secs is None: return PN_MILLIS_MAX
2426 return secs2millis(secs)
2427
2429 if millis == PN_MILLIS_MAX: return None
2430 return millis2secs(millis)
2431
2433 """Some Proton APIs expect a null terminated string. Convert python text
2434 types to UTF8 to avoid zero bytes introduced by other multi-byte encodings.
2435 This method will throw if the string cannot be converted.
2436 """
2437 if string is None:
2438 return None
2439 if _compat.IS_PY2:
2440 if isinstance(string, unicode):
2441 return string.encode('utf-8')
2442 elif isinstance(string, str):
2443 return string
2444 else:
2445
2446 if isinstance(string, str):
2447 string = string.encode('utf-8')
2448
2449 if isinstance(string, bytes):
2450 return string.decode('utf-8')
2451 raise TypeError("Unrecognized string type: %r (%s)" % (string, type(string)))
2452
2454 """Covert C strings returned from proton-c into python unicode"""
2455 if string is None:
2456 return None
2457 if isinstance(string, _compat.TEXT_TYPES):
2458
2459 return string
2460 elif isinstance(string, _compat.BINARY_TYPES):
2461 return string.decode('utf8')
2462 else:
2463 raise TypeError("Unrecognized string type")
2464
2466 """
2467 A representation of an AMQP connection
2468 """
2469
2470 @staticmethod
2472 if impl is None:
2473 return None
2474 else:
2475 return Connection(impl)
2476
2477 - def __init__(self, impl = pn_connection):
2479
2481 Endpoint._init(self)
2482 self.offered_capabilities = None
2483 self.desired_capabilities = None
2484 self.properties = None
2485
2487 return pn_connection_attachments(self._impl)
2488
2489 @property
2492
2493 @property
2496
2498 if err < 0:
2499 exc = EXCEPTIONS.get(err, ConnectionException)
2500 raise exc("[%s]: %s" % (err, pn_connection_error(self._impl)))
2501 else:
2502 return err
2503
2505 return pn_connection_condition(self._impl)
2506
2508 return pn_connection_remote_condition(self._impl)
2509
2511 if collector is None:
2512 pn_connection_collect(self._impl, None)
2513 else:
2514 pn_connection_collect(self._impl, collector._impl)
2515 self._collector = weakref.ref(collector)
2516
2518 return utf82unicode(pn_connection_get_container(self._impl))
2520 return pn_connection_set_container(self._impl, unicode2utf8(name))
2521
2522 container = property(_get_container, _set_container)
2523
2525 return utf82unicode(pn_connection_get_hostname(self._impl))
2527 return pn_connection_set_hostname(self._impl, unicode2utf8(name))
2528
2529 hostname = property(_get_hostname, _set_hostname,
2530 doc="""
2531 Set the name of the host (either fully qualified or relative) to which this
2532 connection is connecting to. This information may be used by the remote
2533 peer to determine the correct back-end service to connect the client to.
2534 This value will be sent in the Open performative, and will be used by SSL
2535 and SASL layers to identify the peer.
2536 """)
2537
2539 return utf82unicode(pn_connection_get_user(self._impl))
2541 return pn_connection_set_user(self._impl, unicode2utf8(name))
2542
2543 user = property(_get_user, _set_user)
2544
2548 return pn_connection_set_password(self._impl, unicode2utf8(name))
2549
2550 password = property(_get_password, _set_password)
2551
2552 @property
2554 """The container identifier specified by the remote peer for this connection."""
2555 return pn_connection_remote_container(self._impl)
2556
2557 @property
2559 """The hostname specified by the remote peer for this connection."""
2560 return pn_connection_remote_hostname(self._impl)
2561
2562 @property
2564 """The capabilities offered by the remote peer for this connection."""
2565 return dat2obj(pn_connection_remote_offered_capabilities(self._impl))
2566
2567 @property
2569 """The capabilities desired by the remote peer for this connection."""
2570 return dat2obj(pn_connection_remote_desired_capabilities(self._impl))
2571
2572 @property
2574 """The properties specified by the remote peer for this connection."""
2575 return dat2obj(pn_connection_remote_properties(self._impl))
2576
2578 """
2579 Opens the connection.
2580
2581 In more detail, this moves the local state of the connection to
2582 the ACTIVE state and triggers an open frame to be sent to the
2583 peer. A connection is fully active once both peers have opened it.
2584 """
2585 obj2dat(self.offered_capabilities,
2586 pn_connection_offered_capabilities(self._impl))
2587 obj2dat(self.desired_capabilities,
2588 pn_connection_desired_capabilities(self._impl))
2589 obj2dat(self.properties, pn_connection_properties(self._impl))
2590 pn_connection_open(self._impl)
2591
2593 """
2594 Closes the connection.
2595
2596 In more detail, this moves the local state of the connection to
2597 the CLOSED state and triggers a close frame to be sent to the
2598 peer. A connection is fully closed once both peers have closed it.
2599 """
2600 self._update_cond()
2601 pn_connection_close(self._impl)
2602 if hasattr(self, '_session_policy'):
2603
2604 del self._session_policy
2605
2606 @property
2608 """
2609 The state of the connection as a bit field. The state has a local
2610 and a remote component. Each of these can be in one of three
2611 states: UNINIT, ACTIVE or CLOSED. These can be tested by masking
2612 against LOCAL_UNINIT, LOCAL_ACTIVE, LOCAL_CLOSED, REMOTE_UNINIT,
2613 REMOTE_ACTIVE and REMOTE_CLOSED.
2614 """
2615 return pn_connection_state(self._impl)
2616
2618 """
2619 Returns a new session on this connection.
2620 """
2621 ssn = pn_session(self._impl)
2622 if ssn is None:
2623 raise(SessionException("Session allocation failed."))
2624 else:
2625 return Session(ssn)
2626
2628 return Session.wrap(pn_session_head(self._impl, mask))
2629
2631 return Link.wrap(pn_link_head(self._impl, mask))
2632
2633 @property
2636
2637 @property
2639 return pn_error_code(pn_connection_error(self._impl))
2640
2642 pn_connection_release(self._impl)
2643
2646
2648
2649 @staticmethod
2651 if impl is None:
2652 return None
2653 else:
2654 return Session(impl)
2655
2658
2660 return pn_session_attachments(self._impl)
2661
2663 return pn_session_condition(self._impl)
2664
2666 return pn_session_remote_condition(self._impl)
2667
2669 return pn_session_get_incoming_capacity(self._impl)
2670
2672 pn_session_set_incoming_capacity(self._impl, capacity)
2673
2674 incoming_capacity = property(_get_incoming_capacity, _set_incoming_capacity)
2675
2677 return pn_session_get_outgoing_window(self._impl)
2678
2680 pn_session_set_outgoing_window(self._impl, window)
2681
2682 outgoing_window = property(_get_outgoing_window, _set_outgoing_window)
2683
2684 @property
2686 return pn_session_outgoing_bytes(self._impl)
2687
2688 @property
2690 return pn_session_incoming_bytes(self._impl)
2691
2693 pn_session_open(self._impl)
2694
2696 self._update_cond()
2697 pn_session_close(self._impl)
2698
2699 - def next(self, mask):
2700 return Session.wrap(pn_session_next(self._impl, mask))
2701
2702 @property
2704 return pn_session_state(self._impl)
2705
2706 @property
2709
2711 return Sender(pn_sender(self._impl, unicode2utf8(name)))
2712
2714 return Receiver(pn_receiver(self._impl, unicode2utf8(name)))
2715
2717 pn_session_free(self._impl)
2718
2721
2722 -class Link(Wrapper, Endpoint):
2723 """
2724 A representation of an AMQP link, of which there are two concrete
2725 implementations, Sender and Receiver.
2726 """
2727
2728 SND_UNSETTLED = PN_SND_UNSETTLED
2729 SND_SETTLED = PN_SND_SETTLED
2730 SND_MIXED = PN_SND_MIXED
2731
2732 RCV_FIRST = PN_RCV_FIRST
2733 RCV_SECOND = PN_RCV_SECOND
2734
2735 @staticmethod
2737 if impl is None: return None
2738 if pn_link_is_sender(impl):
2739 return Sender(impl)
2740 else:
2741 return Receiver(impl)
2742
2745
2747 return pn_link_attachments(self._impl)
2748
2750 if err < 0:
2751 exc = EXCEPTIONS.get(err, LinkException)
2752 raise exc("[%s]: %s" % (err, pn_error_text(pn_link_error(self._impl))))
2753 else:
2754 return err
2755
2757 return pn_link_condition(self._impl)
2758
2760 return pn_link_remote_condition(self._impl)
2761
2763 """
2764 Opens the link.
2765
2766 In more detail, this moves the local state of the link to the
2767 ACTIVE state and triggers an attach frame to be sent to the
2768 peer. A link is fully active once both peers have attached it.
2769 """
2770 pn_link_open(self._impl)
2771
2773 """
2774 Closes the link.
2775
2776 In more detail, this moves the local state of the link to the
2777 CLOSED state and triggers an detach frame (with the closed flag
2778 set) to be sent to the peer. A link is fully closed once both
2779 peers have detached it.
2780 """
2781 self._update_cond()
2782 pn_link_close(self._impl)
2783
2784 @property
2786 """
2787 The state of the link as a bit field. The state has a local
2788 and a remote component. Each of these can be in one of three
2789 states: UNINIT, ACTIVE or CLOSED. These can be tested by masking
2790 against LOCAL_UNINIT, LOCAL_ACTIVE, LOCAL_CLOSED, REMOTE_UNINIT,
2791 REMOTE_ACTIVE and REMOTE_CLOSED.
2792 """
2793 return pn_link_state(self._impl)
2794
2795 @property
2797 """The source of the link as described by the local peer."""
2798 return Terminus(pn_link_source(self._impl))
2799
2800 @property
2802 """The target of the link as described by the local peer."""
2803 return Terminus(pn_link_target(self._impl))
2804
2805 @property
2807 """The source of the link as described by the remote peer."""
2808 return Terminus(pn_link_remote_source(self._impl))
2809 @property
2811 """The target of the link as described by the remote peer."""
2812 return Terminus(pn_link_remote_target(self._impl))
2813
2814 @property
2817
2818 @property
2820 """The connection on which this link was attached."""
2821 return self.session.connection
2822
2825
2826 @property
2829
2831 return pn_link_advance(self._impl)
2832
2833 @property
2835 return pn_link_unsettled(self._impl)
2836
2837 @property
2839 """The amount of outstanding credit on this link."""
2840 return pn_link_credit(self._impl)
2841
2842 @property
2844 return pn_link_available(self._impl)
2845
2846 @property
2848 return pn_link_queued(self._impl)
2849
2850 - def next(self, mask):
2851 return Link.wrap(pn_link_next(self._impl, mask))
2852
2853 @property
2855 """Returns the name of the link"""
2856 return utf82unicode(pn_link_name(self._impl))
2857
2858 @property
2860 """Returns true if this link is a sender."""
2861 return pn_link_is_sender(self._impl)
2862
2863 @property
2865 """Returns true if this link is a receiver."""
2866 return pn_link_is_receiver(self._impl)
2867
2868 @property
2870 return pn_link_remote_snd_settle_mode(self._impl)
2871
2872 @property
2874 return pn_link_remote_rcv_settle_mode(self._impl)
2875
2877 return pn_link_snd_settle_mode(self._impl)
2879 pn_link_set_snd_settle_mode(self._impl, mode)
2880 snd_settle_mode = property(_get_snd_settle_mode, _set_snd_settle_mode)
2881
2883 return pn_link_rcv_settle_mode(self._impl)
2885 pn_link_set_rcv_settle_mode(self._impl, mode)
2886 rcv_settle_mode = property(_get_rcv_settle_mode, _set_rcv_settle_mode)
2887
2889 return pn_link_get_drain(self._impl)
2890
2892 pn_link_set_drain(self._impl, bool(b))
2893
2894 drain_mode = property(_get_drain, _set_drain)
2895
2897 return pn_link_drained(self._impl)
2898
2899 @property
2901 return pn_link_remote_max_message_size(self._impl)
2902
2904 return pn_link_max_message_size(self._impl)
2906 pn_link_set_max_message_size(self._impl, mode)
2907 max_message_size = property(_get_max_message_size, _set_max_message_size)
2908
2910 return pn_link_detach(self._impl)
2911
2913 pn_link_free(self._impl)
2914
2916
2917 UNSPECIFIED = PN_UNSPECIFIED
2918 SOURCE = PN_SOURCE
2919 TARGET = PN_TARGET
2920 COORDINATOR = PN_COORDINATOR
2921
2922 NONDURABLE = PN_NONDURABLE
2923 CONFIGURATION = PN_CONFIGURATION
2924 DELIVERIES = PN_DELIVERIES
2925
2926 DIST_MODE_UNSPECIFIED = PN_DIST_MODE_UNSPECIFIED
2927 DIST_MODE_COPY = PN_DIST_MODE_COPY
2928 DIST_MODE_MOVE = PN_DIST_MODE_MOVE
2929
2930 EXPIRE_WITH_LINK = PN_EXPIRE_WITH_LINK
2931 EXPIRE_WITH_SESSION = PN_EXPIRE_WITH_SESSION
2932 EXPIRE_WITH_CONNECTION = PN_EXPIRE_WITH_CONNECTION
2933 EXPIRE_NEVER = PN_EXPIRE_NEVER
2934
2937
2939 if err < 0:
2940 exc = EXCEPTIONS.get(err, LinkException)
2941 raise exc("[%s]" % err)
2942 else:
2943 return err
2944
2946 return pn_terminus_get_type(self._impl)
2948 self._check(pn_terminus_set_type(self._impl, type))
2949 type = property(_get_type, _set_type)
2950
2952 """The address that identifies the source or target node"""
2953 return utf82unicode(pn_terminus_get_address(self._impl))
2955 self._check(pn_terminus_set_address(self._impl, unicode2utf8(address)))
2956 address = property(_get_address, _set_address)
2957
2959 return pn_terminus_get_durability(self._impl)
2961 self._check(pn_terminus_set_durability(self._impl, seconds))
2962 durability = property(_get_durability, _set_durability)
2963
2965 return pn_terminus_get_expiry_policy(self._impl)
2967 self._check(pn_terminus_set_expiry_policy(self._impl, seconds))
2968 expiry_policy = property(_get_expiry_policy, _set_expiry_policy)
2969
2971 return pn_terminus_get_timeout(self._impl)
2973 self._check(pn_terminus_set_timeout(self._impl, seconds))
2974 timeout = property(_get_timeout, _set_timeout)
2975
2977 """Indicates whether the source or target node was dynamically
2978 created"""
2979 return pn_terminus_is_dynamic(self._impl)
2981 self._check(pn_terminus_set_dynamic(self._impl, dynamic))
2982 dynamic = property(_is_dynamic, _set_dynamic)
2983
2985 return pn_terminus_get_distribution_mode(self._impl)
2987 self._check(pn_terminus_set_distribution_mode(self._impl, mode))
2988 distribution_mode = property(_get_distribution_mode, _set_distribution_mode)
2989
2990 @property
2992 """Properties of a dynamic source or target."""
2993 return Data(pn_terminus_properties(self._impl))
2994
2995 @property
2997 """Capabilities of the source or target."""
2998 return Data(pn_terminus_capabilities(self._impl))
2999
3000 @property
3002 return Data(pn_terminus_outcomes(self._impl))
3003
3004 @property
3006 """A filter on a source allows the set of messages transfered over
3007 the link to be restricted"""
3008 return Data(pn_terminus_filter(self._impl))
3009
3010 - def copy(self, src):
3011 self._check(pn_terminus_copy(self._impl, src._impl))
3012
3014 """
3015 A link over which messages are sent.
3016 """
3017
3019 pn_link_offered(self._impl, n)
3020
3022 """
3023 Send specified data as part of the current delivery
3024
3025 @type data: binary
3026 @param data: data to send
3027 """
3028 return self._check(pn_link_send(self._impl, data))
3029
3030 - def send(self, obj, tag=None):
3031 """
3032 Send specified object over this sender; the object is expected to
3033 have a send() method on it that takes the sender and an optional
3034 tag as arguments.
3035
3036 Where the object is a Message, this will send the message over
3037 this link, creating a new delivery for the purpose.
3038 """
3039 if hasattr(obj, 'send'):
3040 return obj.send(self, tag=tag)
3041 else:
3042
3043 return self.stream(obj)
3044
3046 if not hasattr(self, 'tag_generator'):
3047 def simple_tags():
3048 count = 1
3049 while True:
3050 yield str(count)
3051 count += 1
3052 self.tag_generator = simple_tags()
3053 return next(self.tag_generator)
3054
3056 """
3057 A link over which messages are received.
3058 """
3059
3060 - def flow(self, n):
3061 """Increases the credit issued to the remote sender by the specified number of messages."""
3062 pn_link_flow(self._impl, n)
3063
3064 - def recv(self, limit):
3065 n, binary = pn_link_recv(self._impl, limit)
3066 if n == PN_EOS:
3067 return None
3068 else:
3069 self._check(n)
3070 return binary
3071
3073 pn_link_drain(self._impl, n)
3074
3076 return pn_link_draining(self._impl)
3077
3079
3080 values = {}
3081
3083 ni = super(NamedInt, cls).__new__(cls, i)
3084 cls.values[i] = ni
3085 return ni
3086
3089
3092
3095
3096 @classmethod
3098 return cls.values.get(i, i)
3099
3102
3104
3105 RECEIVED = DispositionType(PN_RECEIVED, "RECEIVED")
3106 ACCEPTED = DispositionType(PN_ACCEPTED, "ACCEPTED")
3107 REJECTED = DispositionType(PN_REJECTED, "REJECTED")
3108 RELEASED = DispositionType(PN_RELEASED, "RELEASED")
3109 MODIFIED = DispositionType(PN_MODIFIED, "MODIFIED")
3110
3112 self._impl = impl
3113 self.local = local
3114 self._data = None
3115 self._condition = None
3116 self._annotations = None
3117
3118 @property
3120 return DispositionType.get(pn_disposition_type(self._impl))
3121
3123 return pn_disposition_get_section_number(self._impl)
3125 pn_disposition_set_section_number(self._impl, n)
3126 section_number = property(_get_section_number, _set_section_number)
3127
3129 return pn_disposition_get_section_offset(self._impl)
3131 pn_disposition_set_section_offset(self._impl, n)
3132 section_offset = property(_get_section_offset, _set_section_offset)
3133
3135 return pn_disposition_is_failed(self._impl)
3137 pn_disposition_set_failed(self._impl, b)
3138 failed = property(_get_failed, _set_failed)
3139
3141 return pn_disposition_is_undeliverable(self._impl)
3143 pn_disposition_set_undeliverable(self._impl, b)
3144 undeliverable = property(_get_undeliverable, _set_undeliverable)
3145
3147 if self.local:
3148 return self._data
3149 else:
3150 return dat2obj(pn_disposition_data(self._impl))
3152 if self.local:
3153 self._data = obj
3154 else:
3155 raise AttributeError("data attribute is read-only")
3156 data = property(_get_data, _set_data)
3157
3159 if self.local:
3160 return self._annotations
3161 else:
3162 return dat2obj(pn_disposition_annotations(self._impl))
3164 if self.local:
3165 self._annotations = obj
3166 else:
3167 raise AttributeError("annotations attribute is read-only")
3168 annotations = property(_get_annotations, _set_annotations)
3169
3171 if self.local:
3172 return self._condition
3173 else:
3174 return cond2obj(pn_disposition_condition(self._impl))
3176 if self.local:
3177 self._condition = obj
3178 else:
3179 raise AttributeError("condition attribute is read-only")
3180 condition = property(_get_condition, _set_condition)
3181
3183 """
3184 Tracks and/or records the delivery of a message over a link.
3185 """
3186
3187 RECEIVED = Disposition.RECEIVED
3188 ACCEPTED = Disposition.ACCEPTED
3189 REJECTED = Disposition.REJECTED
3190 RELEASED = Disposition.RELEASED
3191 MODIFIED = Disposition.MODIFIED
3192
3193 @staticmethod
3195 if impl is None:
3196 return None
3197 else:
3198 return Delivery(impl)
3199
3202
3204 self.local = Disposition(pn_delivery_local(self._impl), True)
3205 self.remote = Disposition(pn_delivery_remote(self._impl), False)
3206
3207 @property
3209 """The identifier for the delivery."""
3210 return pn_delivery_tag(self._impl)
3211
3212 @property
3214 """Returns true for an outgoing delivery to which data can now be written."""
3215 return pn_delivery_writable(self._impl)
3216
3217 @property
3219 """Returns true for an incoming delivery that has data to read."""
3220 return pn_delivery_readable(self._impl)
3221
3222 @property
3224 """Returns true if the state of the delivery has been updated
3225 (e.g. it has been settled and/or accepted, rejected etc)."""
3226 return pn_delivery_updated(self._impl)
3227
3229 """
3230 Set the local state of the delivery e.g. ACCEPTED, REJECTED, RELEASED.
3231 """
3232 obj2dat(self.local._data, pn_disposition_data(self.local._impl))
3233 obj2dat(self.local._annotations, pn_disposition_annotations(self.local._impl))
3234 obj2cond(self.local._condition, pn_disposition_condition(self.local._impl))
3235 pn_delivery_update(self._impl, state)
3236
3237 @property
3239 return pn_delivery_pending(self._impl)
3240
3241 @property
3243 """
3244 Returns true for an incoming delivery if not all the data is
3245 yet available.
3246 """
3247 return pn_delivery_partial(self._impl)
3248
3249 @property
3251 """Returns the local state of the delivery."""
3252 return DispositionType.get(pn_delivery_local_state(self._impl))
3253
3254 @property
3256 """
3257 Returns the state of the delivery as indicated by the remote
3258 peer.
3259 """
3260 return DispositionType.get(pn_delivery_remote_state(self._impl))
3261
3262 @property
3264 """
3265 Returns true if the delivery has been settled by the remote peer.
3266 """
3267 return pn_delivery_settled(self._impl)
3268
3270 """
3271 Settles the delivery locally. This indicates the application
3272 considers the delivery complete and does not wish to receive any
3273 further events about it. Every delivery should be settled locally.
3274 """
3275 pn_delivery_settle(self._impl)
3276
3277 @property
3279 """Returns true if the delivery has been aborted."""
3280 return pn_delivery_aborted(self._impl)
3281
3283 """
3284 Aborts the delivery. This indicates the application wishes to
3285 invalidate any data that may have already been sent on this delivery.
3286 The delivery cannot be aborted after it has been completely delivered.
3287 """
3288 pn_delivery_abort(self._impl)
3289
3290 @property
3293
3294 @property
3296 """
3297 Returns the link on which the delivery was sent or received.
3298 """
3299 return Link.wrap(pn_delivery_link(self._impl))
3300
3301 @property
3303 """
3304 Returns the session over which the delivery was sent or received.
3305 """
3306 return self.link.session
3307
3308 @property
3310 """
3311 Returns the connection over which the delivery was sent or received.
3312 """
3313 return self.session.connection
3314
3315 @property
3318
3321
3323
3326
3327 - def __call__(self, trans_impl, message):
3329
3331
3332 TRACE_OFF = PN_TRACE_OFF
3333 TRACE_DRV = PN_TRACE_DRV
3334 TRACE_FRM = PN_TRACE_FRM
3335 TRACE_RAW = PN_TRACE_RAW
3336
3337 CLIENT = 1
3338 SERVER = 2
3339
3340 @staticmethod
3342 if impl is None:
3343 return None
3344 else:
3345 return Transport(_impl=impl)
3346
3347 - def __init__(self, mode=None, _impl = pn_transport):
3355
3357 self._sasl = None
3358 self._ssl = None
3359
3361 if err < 0:
3362 exc = EXCEPTIONS.get(err, TransportException)
3363 raise exc("[%s]: %s" % (err, pn_error_text(pn_transport_error(self._impl))))
3364 else:
3365 return err
3366
3368 pn_transport_set_pytracer(self._impl, TraceAdapter(tracer));
3369
3371 adapter = pn_transport_get_pytracer(self._impl)
3372 if adapter:
3373 return adapter.tracer
3374 else:
3375 return None
3376
3377 tracer = property(_get_tracer, _set_tracer,
3378 doc="""
3379 A callback for trace logging. The callback is passed the transport and log message.
3380 """)
3381
3382 - def log(self, message):
3383 pn_transport_log(self._impl, message)
3384
3386 pn_transport_require_auth(self._impl, bool)
3387
3388 @property
3390 return pn_transport_is_authenticated(self._impl)
3391
3393 pn_transport_require_encryption(self._impl, bool)
3394
3395 @property
3397 return pn_transport_is_encrypted(self._impl)
3398
3399 @property
3401 return pn_transport_get_user(self._impl)
3402
3403 - def bind(self, connection):
3404 """Assign a connection to the transport"""
3405 self._check(pn_transport_bind(self._impl, connection._impl))
3406
3408 """Release the connection"""
3409 self._check(pn_transport_unbind(self._impl))
3410
3412 pn_transport_trace(self._impl, n)
3413
3414 - def tick(self, now):
3415 """Process any timed events (like heartbeat generation).
3416 now = seconds since epoch (float).
3417 """
3418 return millis2secs(pn_transport_tick(self._impl, secs2millis(now)))
3419
3421 c = pn_transport_capacity(self._impl)
3422 if c >= PN_EOS:
3423 return c
3424 else:
3425 return self._check(c)
3426
3427 - def push(self, binary):
3428 n = self._check(pn_transport_push(self._impl, binary))
3429 if n != len(binary):
3430 raise OverflowError("unable to process all bytes: %s, %s" % (n, len(binary)))
3431
3433 self._check(pn_transport_close_tail(self._impl))
3434
3436 p = pn_transport_pending(self._impl)
3437 if p >= PN_EOS:
3438 return p
3439 else:
3440 return self._check(p)
3441
3442 - def peek(self, size):
3443 cd, out = pn_transport_peek(self._impl, size)
3444 if cd == PN_EOS:
3445 return None
3446 else:
3447 self._check(cd)
3448 return out
3449
3450 - def pop(self, size):
3451 pn_transport_pop(self._impl, size)
3452
3454 self._check(pn_transport_close_head(self._impl))
3455
3456 @property
3458 return pn_transport_closed(self._impl)
3459
3460
3462 return pn_transport_get_max_frame(self._impl)
3463
3465 pn_transport_set_max_frame(self._impl, value)
3466
3467 max_frame_size = property(_get_max_frame_size, _set_max_frame_size,
3468 doc="""
3469 Sets the maximum size for received frames (in bytes).
3470 """)
3471
3472 @property
3474 return pn_transport_get_remote_max_frame(self._impl)
3475
3477 return pn_transport_get_channel_max(self._impl)
3478
3480 if pn_transport_set_channel_max(self._impl, value):
3481 raise SessionException("Too late to change channel max.")
3482
3483 channel_max = property(_get_channel_max, _set_channel_max,
3484 doc="""
3485 Sets the maximum channel that may be used on the transport.
3486 """)
3487
3488 @property
3490 return pn_transport_remote_channel_max(self._impl)
3491
3492
3494 return millis2secs(pn_transport_get_idle_timeout(self._impl))
3495
3497 pn_transport_set_idle_timeout(self._impl, secs2millis(sec))
3498
3499 idle_timeout = property(_get_idle_timeout, _set_idle_timeout,
3500 doc="""
3501 The idle timeout of the connection (float, in seconds).
3502 """)
3503
3504 @property
3506 return millis2secs(pn_transport_get_remote_idle_timeout(self._impl))
3507
3508 @property
3510 return pn_transport_get_frames_output(self._impl)
3511
3512 @property
3515
3518
3519 - def ssl(self, domain=None, session_details=None):
3520
3521 if not self._ssl:
3522 self._ssl = SSL(self, domain, session_details)
3523 return self._ssl
3524
3525 @property
3527 return cond2obj(pn_transport_condition(self._impl))
3528
3529 @property
3532
3535
3536 -class SASL(Wrapper):
3537
3538 OK = PN_SASL_OK
3539 AUTH = PN_SASL_AUTH
3540 SYS = PN_SASL_SYS
3541 PERM = PN_SASL_PERM
3542 TEMP = PN_SASL_TEMP
3543
3544 @staticmethod
3546 return pn_sasl_extended()
3547
3551
3553 if err < 0:
3554 exc = EXCEPTIONS.get(err, SASLException)
3555 raise exc("[%s]" % (err))
3556 else:
3557 return err
3558
3559 @property
3561 return pn_sasl_get_user(self._sasl)
3562
3563 @property
3565 return pn_sasl_get_mech(self._sasl)
3566
3567 @property
3569 outcome = pn_sasl_outcome(self._sasl)
3570 if outcome == PN_SASL_NONE:
3571 return None
3572 else:
3573 return outcome
3574
3576 pn_sasl_allowed_mechs(self._sasl, unicode2utf8(mechs))
3577
3579 return pn_sasl_get_allow_insecure_mechs(self._sasl)
3580
3582 pn_sasl_set_allow_insecure_mechs(self._sasl, insecure)
3583
3584 allow_insecure_mechs = property(_get_allow_insecure_mechs, _set_allow_insecure_mechs,
3585 doc="""
3586 Allow unencrypted cleartext passwords (PLAIN mech)
3587 """)
3588
3589 - def done(self, outcome):
3590 pn_sasl_done(self._sasl, outcome)
3591
3593 pn_sasl_config_name(self._sasl, name)
3594
3596 pn_sasl_config_path(self._sasl, path)
3597
3600
3603
3604 -class SSLDomain(object):
3605
3606 MODE_CLIENT = PN_SSL_MODE_CLIENT
3607 MODE_SERVER = PN_SSL_MODE_SERVER
3608 VERIFY_PEER = PN_SSL_VERIFY_PEER
3609 VERIFY_PEER_NAME = PN_SSL_VERIFY_PEER_NAME
3610 ANONYMOUS_PEER = PN_SSL_ANONYMOUS_PEER
3611
3612 - def __init__(self, mode):
3613 self._domain = pn_ssl_domain(mode)
3614 if self._domain is None:
3615 raise SSLUnavailable()
3616
3617 - def _check(self, err):
3618 if err < 0:
3619 exc = EXCEPTIONS.get(err, SSLException)
3620 raise exc("SSL failure.")
3621 else:
3622 return err
3623
3624 - def set_credentials(self, cert_file, key_file, password):
3625 return self._check( pn_ssl_domain_set_credentials(self._domain,
3626 cert_file, key_file,
3627 password) )
3628 - def set_trusted_ca_db(self, certificate_db):
3629 return self._check( pn_ssl_domain_set_trusted_ca_db(self._domain,
3630 certificate_db) )
3631 - def set_peer_authentication(self, verify_mode, trusted_CAs=None):
3632 return self._check( pn_ssl_domain_set_peer_authentication(self._domain,
3633 verify_mode,
3634 trusted_CAs) )
3635
3637 return self._check( pn_ssl_domain_allow_unsecured_client(self._domain) )
3638
3639 - def __del__(self):
3640 pn_ssl_domain_free(self._domain)
3641
3643
3644 @staticmethod
3646 return pn_ssl_present()
3647
3654
3655 - def __new__(cls, transport, domain, session_details=None):
3656 """Enforce a singleton SSL object per Transport"""
3657 if transport._ssl:
3658
3659
3660
3661 ssl = transport._ssl
3662 if (domain and (ssl._domain is not domain) or
3663 session_details and (ssl._session_details is not session_details)):
3664 raise SSLException("Cannot re-configure existing SSL object!")
3665 else:
3666 obj = super(SSL, cls).__new__(cls)
3667 obj._domain = domain
3668 obj._session_details = session_details
3669 session_id = None
3670 if session_details:
3671 session_id = session_details.get_session_id()
3672 obj._ssl = pn_ssl( transport._impl )
3673 if obj._ssl is None:
3674 raise SSLUnavailable()
3675 if domain:
3676 pn_ssl_init( obj._ssl, domain._domain, session_id )
3677 transport._ssl = obj
3678 return transport._ssl
3679
3681 rc, name = pn_ssl_get_cipher_name( self._ssl, 128 )
3682 if rc:
3683 return name
3684 return None
3685
3687 rc, name = pn_ssl_get_protocol_name( self._ssl, 128 )
3688 if rc:
3689 return name
3690 return None
3691
3692 SHA1 = PN_SSL_SHA1
3693 SHA256 = PN_SSL_SHA256
3694 SHA512 = PN_SSL_SHA512
3695 MD5 = PN_SSL_MD5
3696
3697 CERT_COUNTRY_NAME = PN_SSL_CERT_SUBJECT_COUNTRY_NAME
3698 CERT_STATE_OR_PROVINCE = PN_SSL_CERT_SUBJECT_STATE_OR_PROVINCE
3699 CERT_CITY_OR_LOCALITY = PN_SSL_CERT_SUBJECT_CITY_OR_LOCALITY
3700 CERT_ORGANIZATION_NAME = PN_SSL_CERT_SUBJECT_ORGANIZATION_NAME
3701 CERT_ORGANIZATION_UNIT = PN_SSL_CERT_SUBJECT_ORGANIZATION_UNIT
3702 CERT_COMMON_NAME = PN_SSL_CERT_SUBJECT_COMMON_NAME
3703
3705 subfield_value = pn_ssl_get_remote_subject_subfield(self._ssl, subfield_name)
3706 return subfield_value
3707
3709 subject = pn_ssl_get_remote_subject(self._ssl)
3710 return subject
3711
3715
3716
3719
3722
3725
3728
3731
3734
3736 rc, fingerprint_str = pn_ssl_get_cert_fingerprint(self._ssl, fingerprint_length, digest_name)
3737 if rc == PN_OK:
3738 return fingerprint_str
3739 return None
3740
3741
3744
3747
3751
3755
3758
3759 @property
3761 return pn_ssl_get_remote_subject( self._ssl )
3762
3763 RESUME_UNKNOWN = PN_SSL_RESUME_UNKNOWN
3764 RESUME_NEW = PN_SSL_RESUME_NEW
3765 RESUME_REUSED = PN_SSL_RESUME_REUSED
3766
3768 return pn_ssl_resume_status( self._ssl )
3769
3771 self._check(pn_ssl_set_peer_hostname( self._ssl, unicode2utf8(hostname) ))
3773 err, name = pn_ssl_get_peer_hostname( self._ssl, 1024 )
3774 self._check(err)
3775 return utf82unicode(name)
3776 peer_hostname = property(_get_peer_hostname, _set_peer_hostname,
3777 doc="""
3778 Manage the expected name of the remote peer. Used to authenticate the remote.
3779 """)
3780
3783 """ Unique identifier for the SSL session. Used to resume previous session on a new
3784 SSL connection.
3785 """
3786
3788 self._session_id = session_id
3789
3791 return self._session_id
3792
3793
3794 wrappers = {
3795 "pn_void": lambda x: pn_void2py(x),
3796 "pn_pyref": lambda x: pn_void2py(x),
3797 "pn_connection": lambda x: Connection.wrap(pn_cast_pn_connection(x)),
3798 "pn_session": lambda x: Session.wrap(pn_cast_pn_session(x)),
3799 "pn_link": lambda x: Link.wrap(pn_cast_pn_link(x)),
3800 "pn_delivery": lambda x: Delivery.wrap(pn_cast_pn_delivery(x)),
3801 "pn_transport": lambda x: Transport.wrap(pn_cast_pn_transport(x)),
3802 "pn_selectable": lambda x: Selectable.wrap(pn_cast_pn_selectable(x))
3803 }
3806
3808 self._impl = pn_collector()
3809
3810 - def put(self, obj, etype):
3811 pn_collector_put(self._impl, PN_PYREF, pn_py2void(obj), etype.number)
3812
3814 return Event.wrap(pn_collector_peek(self._impl))
3815
3817 ev = self.peek()
3818 pn_collector_pop(self._impl)
3819
3821 pn_collector_free(self._impl)
3822 del self._impl
3823
3824 if "TypeExtender" not in globals():
3827 self.number = number
3829 try:
3830 return self.number
3831 finally:
3832 self.number += 1
3833
3835
3836 _lock = threading.Lock()
3837 _extended = TypeExtender(10000)
3838 TYPES = {}
3839
3840 - def __init__(self, name=None, number=None, method=None):
3841 if name is None and number is None:
3842 raise TypeError("extended events require a name")
3843 try:
3844 self._lock.acquire()
3845 if name is None:
3846 name = pn_event_type_name(number)
3847
3848 if number is None:
3849 number = self._extended.next()
3850
3851 if method is None:
3852 method = "on_%s" % name
3853
3854 self.name = name
3855 self.number = number
3856 self.method = method
3857
3858 self.TYPES[number] = self
3859 finally:
3860 self._lock.release()
3861
3864
3871
3873
3874 - def __init__(self, clazz, context, type):
3878
3881
3882 -def _none(x): return None
3883
3884 DELEGATED = Constant("DELEGATED")
3885
3886 -def _core(number, method):
3887 return EventType(number=number, method=method)
3888
3889 -class Event(Wrapper, EventBase):
3890
3891 REACTOR_INIT = _core(PN_REACTOR_INIT, "on_reactor_init")
3892 REACTOR_QUIESCED = _core(PN_REACTOR_QUIESCED, "on_reactor_quiesced")
3893 REACTOR_FINAL = _core(PN_REACTOR_FINAL, "on_reactor_final")
3894
3895 TIMER_TASK = _core(PN_TIMER_TASK, "on_timer_task")
3896
3897 CONNECTION_INIT = _core(PN_CONNECTION_INIT, "on_connection_init")
3898 CONNECTION_BOUND = _core(PN_CONNECTION_BOUND, "on_connection_bound")
3899 CONNECTION_UNBOUND = _core(PN_CONNECTION_UNBOUND, "on_connection_unbound")
3900 CONNECTION_LOCAL_OPEN = _core(PN_CONNECTION_LOCAL_OPEN, "on_connection_local_open")
3901 CONNECTION_LOCAL_CLOSE = _core(PN_CONNECTION_LOCAL_CLOSE, "on_connection_local_close")
3902 CONNECTION_REMOTE_OPEN = _core(PN_CONNECTION_REMOTE_OPEN, "on_connection_remote_open")
3903 CONNECTION_REMOTE_CLOSE = _core(PN_CONNECTION_REMOTE_CLOSE, "on_connection_remote_close")
3904 CONNECTION_FINAL = _core(PN_CONNECTION_FINAL, "on_connection_final")
3905
3906 SESSION_INIT = _core(PN_SESSION_INIT, "on_session_init")
3907 SESSION_LOCAL_OPEN = _core(PN_SESSION_LOCAL_OPEN, "on_session_local_open")
3908 SESSION_LOCAL_CLOSE = _core(PN_SESSION_LOCAL_CLOSE, "on_session_local_close")
3909 SESSION_REMOTE_OPEN = _core(PN_SESSION_REMOTE_OPEN, "on_session_remote_open")
3910 SESSION_REMOTE_CLOSE = _core(PN_SESSION_REMOTE_CLOSE, "on_session_remote_close")
3911 SESSION_FINAL = _core(PN_SESSION_FINAL, "on_session_final")
3912
3913 LINK_INIT = _core(PN_LINK_INIT, "on_link_init")
3914 LINK_LOCAL_OPEN = _core(PN_LINK_LOCAL_OPEN, "on_link_local_open")
3915 LINK_LOCAL_CLOSE = _core(PN_LINK_LOCAL_CLOSE, "on_link_local_close")
3916 LINK_LOCAL_DETACH = _core(PN_LINK_LOCAL_DETACH, "on_link_local_detach")
3917 LINK_REMOTE_OPEN = _core(PN_LINK_REMOTE_OPEN, "on_link_remote_open")
3918 LINK_REMOTE_CLOSE = _core(PN_LINK_REMOTE_CLOSE, "on_link_remote_close")
3919 LINK_REMOTE_DETACH = _core(PN_LINK_REMOTE_DETACH, "on_link_remote_detach")
3920 LINK_FLOW = _core(PN_LINK_FLOW, "on_link_flow")
3921 LINK_FINAL = _core(PN_LINK_FINAL, "on_link_final")
3922
3923 DELIVERY = _core(PN_DELIVERY, "on_delivery")
3924
3925 TRANSPORT = _core(PN_TRANSPORT, "on_transport")
3926 TRANSPORT_ERROR = _core(PN_TRANSPORT_ERROR, "on_transport_error")
3927 TRANSPORT_HEAD_CLOSED = _core(PN_TRANSPORT_HEAD_CLOSED, "on_transport_head_closed")
3928 TRANSPORT_TAIL_CLOSED = _core(PN_TRANSPORT_TAIL_CLOSED, "on_transport_tail_closed")
3929 TRANSPORT_CLOSED = _core(PN_TRANSPORT_CLOSED, "on_transport_closed")
3930
3931 SELECTABLE_INIT = _core(PN_SELECTABLE_INIT, "on_selectable_init")
3932 SELECTABLE_UPDATED = _core(PN_SELECTABLE_UPDATED, "on_selectable_updated")
3933 SELECTABLE_READABLE = _core(PN_SELECTABLE_READABLE, "on_selectable_readable")
3934 SELECTABLE_WRITABLE = _core(PN_SELECTABLE_WRITABLE, "on_selectable_writable")
3935 SELECTABLE_EXPIRED = _core(PN_SELECTABLE_EXPIRED, "on_selectable_expired")
3936 SELECTABLE_ERROR = _core(PN_SELECTABLE_ERROR, "on_selectable_error")
3937 SELECTABLE_FINAL = _core(PN_SELECTABLE_FINAL, "on_selectable_final")
3938
3939 @staticmethod
3940 - def wrap(impl, number=None):
3941 if impl is None:
3942 return None
3943
3944 if number is None:
3945 number = pn_event_type(impl)
3946
3947 event = Event(impl, number)
3948
3949
3950
3951 if pn_event_class(impl) == PN_PYREF and \
3952 isinstance(event.context, EventBase):
3953 return event.context
3954 else:
3955 return event
3956
3960
3963
3967
3968 @property
3970 cls = pn_event_class(self._impl)
3971 if cls:
3972 return pn_class_name(cls)
3973 else:
3974 return None
3975
3976 @property
3978 return WrappedHandler.wrap(pn_event_root(self._impl))
3979
3980 @property
3981 - def context(self):
3982 """Returns the context object associated with the event. The type of this depend on the type of event."""
3983 return wrappers[self.clazz](pn_event_context(self._impl))
3984
3985 - def dispatch(self, handler, type=None):
3994
3995
3996 @property
3998 """Returns the reactor associated with the event."""
3999 return wrappers.get("pn_reactor", _none)(pn_event_reactor(self._impl))
4000
4002 r = self.reactor
4003 if r and hasattr(r, 'subclass') and r.subclass.__name__.lower() == name:
4004 return r
4005 else:
4006 return super(Event, self).__getattr__(name)
4007
4008 @property
4010 """Returns the transport associated with the event, or null if none is associated with it."""
4011 return Transport.wrap(pn_event_transport(self._impl))
4012
4013 @property
4015 """Returns the connection associated with the event, or null if none is associated with it."""
4016 return Connection.wrap(pn_event_connection(self._impl))
4017
4018 @property
4020 """Returns the session associated with the event, or null if none is associated with it."""
4021 return Session.wrap(pn_event_session(self._impl))
4022
4023 @property
4025 """Returns the link associated with the event, or null if none is associated with it."""
4026 return Link.wrap(pn_event_link(self._impl))
4027
4028 @property
4030 """Returns the sender link associated with the event, or null if
4031 none is associated with it. This is essentially an alias for
4032 link(), that does an additional checkon the type of the
4033 link."""
4034 l = self.link
4035 if l and l.is_sender:
4036 return l
4037 else:
4038 return None
4039
4040 @property
4042 """Returns the receiver link associated with the event, or null if
4043 none is associated with it. This is essentially an alias for
4044 link(), that does an additional checkon the type of the link."""
4045 l = self.link
4046 if l and l.is_receiver:
4047 return l
4048 else:
4049 return None
4050
4051 @property
4053 """Returns the delivery associated with the event, or null if none is associated with it."""
4054 return Delivery.wrap(pn_event_delivery(self._impl))
4055
4058
4061 if obj is None:
4062 return self
4063 ret = []
4064 obj.__dict__['handlers'] = ret
4065 return ret
4066
4072
4074
4075 - def __init__(self, handler, on_error=None):
4078
4082
4088
4091 self.handlers = []
4092 self.delegate = weakref.ref(delegate)
4093
4095 delegate = self.delegate()
4096 if delegate:
4097 dispatch(delegate, method, event)
4098
4102 if obj is None:
4103 return None
4104 return self.surrogate(obj).handlers
4105
4107 self.surrogate(obj).handlers = value
4108
4110 key = "_surrogate"
4111 objdict = obj.__dict__
4112 surrogate = objdict.get(key, None)
4113 if surrogate is None:
4114 objdict[key] = surrogate = WrappedHandlersChildSurrogate(obj)
4115 obj.add(surrogate)
4116 return surrogate
4117
4119
4120 handlers = WrappedHandlersProperty()
4121
4122 @classmethod
4123 - def wrap(cls, impl, on_error=None):
4130
4131 - def __init__(self, impl_or_constructor):
4132 Wrapper.__init__(self, impl_or_constructor)
4133 if list(self.__class__.__mro__).index(WrappedHandler) > 1:
4134
4135 self.handlers.extend([])
4136
4143
4144 - def add(self, handler, on_error=None):
4150
4152 pn_handler_clear(self._impl)
4153
4155 if obj is None:
4156 return None
4157 elif isinstance(obj, WrappedHandler):
4158 impl = obj._impl
4159 pn_incref(impl)
4160 return impl
4161 else:
4162 return pn_pyhandler(_cadapter(obj, on_error))
4163
4165 """
4166 Simple URL parser/constructor, handles URLs of the form:
4167
4168 <scheme>://<user>:<password>@<host>:<port>/<path>
4169
4170 All components can be None if not specified in the URL string.
4171
4172 The port can be specified as a service name, e.g. 'amqp' in the
4173 URL string but Url.port always gives the integer value.
4174
4175 Warning: The placement of user and password in URLs is not
4176 recommended. It can result in credentials leaking out in program
4177 logs. Use connection configuration attributes instead.
4178
4179 @ivar scheme: Url scheme e.g. 'amqp' or 'amqps'
4180 @ivar user: Username
4181 @ivar password: Password
4182 @ivar host: Host name, ipv6 literal or ipv4 dotted quad.
4183 @ivar port: Integer port.
4184 @ivar host_port: Returns host:port
4185 """
4186
4187 AMQPS = "amqps"
4188 AMQP = "amqp"
4189
4191 """An integer port number that can be constructed from a service name string"""
4192
4194 """@param value: integer port number or string service name."""
4195 port = super(Url.Port, cls).__new__(cls, cls._port_int(value))
4196 setattr(port, 'name', str(value))
4197 return port
4198
4199 - def __eq__(self, x): return str(self) == x or int(self) == x
4200 - def __ne__(self, x): return not self == x
4202
4203 @staticmethod
4205 """Convert service, an integer or a service name, into an integer port number."""
4206 try:
4207 return int(value)
4208 except ValueError:
4209 try:
4210 return socket.getservbyname(value)
4211 except socket.error:
4212
4213 if value == Url.AMQPS: return 5671
4214 elif value == Url.AMQP: return 5672
4215 else:
4216 raise ValueError("Not a valid port number or service name: '%s'" % value)
4217
4218 - def __init__(self, url=None, defaults=True, **kwargs):
4219 """
4220 @param url: URL string to parse.
4221 @param defaults: If true, fill in missing default values in the URL.
4222 If false, you can fill them in later by calling self.defaults()
4223 @param kwargs: scheme, user, password, host, port, path.
4224 If specified, replaces corresponding part in url string.
4225 """
4226 if url:
4227 self._url = pn_url_parse(unicode2utf8(str(url)))
4228 if not self._url: raise ValueError("Invalid URL '%s'" % url)
4229 else:
4230 self._url = pn_url()
4231 for k in kwargs:
4232 getattr(self, k)
4233 setattr(self, k, kwargs[k])
4234 if defaults: self.defaults()
4235
4238 self.getter = globals()["pn_url_get_%s" % part]
4239 self.setter = globals()["pn_url_set_%s" % part]
4240 - def __get__(self, obj, type=None): return self.getter(obj._url)
4241 - def __set__(self, obj, value): return self.setter(obj._url, str(value))
4242
4243 scheme = PartDescriptor('scheme')
4244 username = PartDescriptor('username')
4245 password = PartDescriptor('password')
4246 host = PartDescriptor('host')
4247 path = PartDescriptor('path')
4248
4250 portstr = pn_url_get_port(self._url)
4251 return portstr and Url.Port(portstr)
4252
4254 if value is None: pn_url_set_port(self._url, None)
4255 else: pn_url_set_port(self._url, str(Url.Port(value)))
4256
4257 port = property(_get_port, _set_port)
4258
4259 - def __str__(self): return pn_url_str(self._url)
4260
4263
4264 - def __eq__(self, x): return str(self) == str(x)
4265 - def __ne__(self, x): return not self == x
4266
4268 pn_url_free(self._url);
4269 del self._url
4270
4272 """
4273 Fill in missing values (scheme, host or port) with defaults
4274 @return: self
4275 """
4276 self.scheme = self.scheme or self.AMQP
4277 self.host = self.host or '0.0.0.0'
4278 self.port = self.port or self.Port(self.scheme)
4279 return self
4280
4281 __all__ = [
4282 "API_LANGUAGE",
4283 "IMPLEMENTATION_LANGUAGE",
4284 "ABORTED",
4285 "ACCEPTED",
4286 "AUTOMATIC",
4287 "PENDING",
4288 "MANUAL",
4289 "REJECTED",
4290 "RELEASED",
4291 "MODIFIED",
4292 "SETTLED",
4293 "UNDESCRIBED",
4294 "Array",
4295 "Collector",
4296 "Condition",
4297 "Connection",
4298 "Data",
4299 "Delivery",
4300 "Disposition",
4301 "Described",
4302 "Endpoint",
4303 "Event",
4304 "EventType",
4305 "Handler",
4306 "Link",
4307 "Message",
4308 "MessageException",
4309 "Messenger",
4310 "MessengerException",
4311 "ProtonException",
4312 "VERSION_MAJOR",
4313 "VERSION_MINOR",
4314 "Receiver",
4315 "SASL",
4316 "Sender",
4317 "Session",
4318 "SessionException",
4319 "SSL",
4320 "SSLDomain",
4321 "SSLSessionDetails",
4322 "SSLUnavailable",
4323 "SSLException",
4324 "Terminus",
4325 "Timeout",
4326 "Interrupt",
4327 "Transport",
4328 "TransportException",
4329 "Url",
4330 "char",
4331 "dispatch",
4332 "symbol",
4333 "timestamp",
4334 "ulong",
4335 "byte",
4336 "short",
4337 "int32",
4338 "ubyte",
4339 "ushort",
4340 "uint",
4341 "float32",
4342 "decimal32",
4343 "decimal64",
4344 "decimal128"
4345 ]
4346