Package proton
[frames] | no frames]

Source Code for Package proton

   1  # 
   2  # Licensed to the Apache Software Foundation (ASF) under one 
   3  # or more contributor license agreements.  See the NOTICE file 
   4  # distributed with this work for additional information 
   5  # regarding copyright ownership.  The ASF licenses this file 
   6  # to you under the Apache License, Version 2.0 (the 
   7  # "License"); you may not use this file except in compliance 
   8  # with the License.  You may obtain a copy of the License at 
   9  # 
  10  #   http://www.apache.org/licenses/LICENSE-2.0 
  11  # 
  12  # Unless required by applicable law or agreed to in writing, 
  13  # software distributed under the License is distributed on an 
  14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
  15  # KIND, either express or implied.  See the License for the 
  16  # specific language governing permissions and limitations 
  17  # under the License. 
  18  # 
  19   
  20  """ 
  21  The proton module defines a suite of APIs that implement the AMQP 1.0 
  22  protocol. 
  23   
  24  The proton APIs consist of the following classes: 
  25   
  26   - L{Messenger} -- A messaging endpoint. 
  27   - L{Message}   -- A class for creating and/or accessing AMQP message content. 
  28   - L{Data}      -- A class for creating and/or accessing arbitrary AMQP encoded 
  29                    data. 
  30   
  31  """ 
  32   
  33  from cproton import * 
  34  from wrapper import Wrapper 
  35   
  36  import weakref, socket, sys, threading 
  37  try: 
  38    import uuid 
39 40 - def generate_uuid():
41 return uuid.uuid4()
42 43 except ImportError: 44 """ 45 No 'native' UUID support. Provide a very basic UUID type that is a compatible subset of the uuid type provided by more modern python releases. 46 """ 47 import struct
48 - class uuid:
49 - class UUID:
50 - def __init__(self, hex=None, bytes=None):
51 if [hex, bytes].count(None) != 1: 52 raise TypeError("need one of hex or bytes") 53 if bytes is not None: 54 self.bytes = bytes 55 elif hex is not None: 56 fields=hex.split("-") 57 fields[4:5] = [fields[4][:4], fields[4][4:]] 58 self.bytes = struct.pack("!LHHHHL", *[int(x,16) for x in fields])
59
60 - def __cmp__(self, other):
61 if isinstance(other, uuid.UUID): 62 return cmp(self.bytes, other.bytes) 63 else: 64 return -1
65
66 - def __str__(self):
67 return "%08x-%04x-%04x-%04x-%04x%08x" % struct.unpack("!LHHHHL", self.bytes)
68
69 - def __repr__(self):
70 return "UUID(%r)" % str(self)
71
72 - def __hash__(self):
73 return self.bytes.__hash__()
74 75 import os, random, time 76 rand = random.Random() 77 rand.seed((os.getpid(), time.time(), socket.gethostname()))
78 - def random_uuid():
79 bytes = [rand.randint(0, 255) for i in xrange(16)] 80 81 # From RFC4122, the version bits are set to 0100 82 bytes[7] &= 0x0F 83 bytes[7] |= 0x40 84 85 # From RFC4122, the top two bits of byte 8 get set to 01 86 bytes[8] &= 0x3F 87 bytes[8] |= 0x80 88 return "".join(map(chr, bytes))
89
90 - def uuid4():
91 return uuid.UUID(bytes=random_uuid())
92
93 - def generate_uuid():
94 return uuid4()
95 96 try: 97 bytes() 98 except NameError: 99 bytes = str 100 101 VERSION_MAJOR = PN_VERSION_MAJOR 102 VERSION_MINOR = PN_VERSION_MINOR 103 API_LANGUAGE = "C" 104 IMPLEMENTATION_LANGUAGE = "C"
105 106 -class Constant(object):
107
108 - def __init__(self, name):
109 self.name = name
110
111 - def __repr__(self):
112 return self.name
113
114 -class ProtonException(Exception):
115 """ 116 The root of the proton exception hierarchy. All proton exception 117 classes derive from this exception. 118 """ 119 pass
120
121 -class Timeout(ProtonException):
122 """ 123 A timeout exception indicates that a blocking operation has timed 124 out. 125 """ 126 pass
127
128 -class Interrupt(ProtonException):
129 """ 130 An interrupt exception indicaes that a blocking operation was interrupted. 131 """ 132 pass
133
134 -class MessengerException(ProtonException):
135 """ 136 The root of the messenger exception hierarchy. All exceptions 137 generated by the messenger class derive from this exception. 138 """ 139 pass
140
141 -class MessageException(ProtonException):
142 """ 143 The MessageException class is the root of the message exception 144 hierarhcy. All exceptions generated by the Message class derive from 145 this exception. 146 """ 147 pass
148 149 EXCEPTIONS = { 150 PN_TIMEOUT: Timeout, 151 PN_INTR: Interrupt 152 } 153 154 PENDING = Constant("PENDING") 155 ACCEPTED = Constant("ACCEPTED") 156 REJECTED = Constant("REJECTED") 157 RELEASED = Constant("RELEASED") 158 MODIFIED = Constant("MODIFIED") 159 ABORTED = Constant("ABORTED") 160 SETTLED = Constant("SETTLED") 161 162 STATUSES = { 163 PN_STATUS_ABORTED: ABORTED, 164 PN_STATUS_ACCEPTED: ACCEPTED, 165 PN_STATUS_REJECTED: REJECTED, 166 PN_STATUS_RELEASED: RELEASED, 167 PN_STATUS_MODIFIED: MODIFIED, 168 PN_STATUS_PENDING: PENDING, 169 PN_STATUS_SETTLED: SETTLED, 170 PN_STATUS_UNKNOWN: None 171 } 172 173 AUTOMATIC = Constant("AUTOMATIC") 174 MANUAL = Constant("MANUAL")
175 176 -class Messenger(object):
177 """ 178 The L{Messenger} class defines a high level interface for sending 179 and receiving L{Messages<Message>}. Every L{Messenger} contains a 180 single logical queue of incoming messages and a single logical queue 181 of outgoing messages. These messages in these queues may be destined 182 for, or originate from, a variety of addresses. 183 184 The messenger interface is single-threaded. All methods 185 except one (L{interrupt}) are intended to be used from within 186 the messenger thread. 187 188 189 Address Syntax 190 ============== 191 192 An address has the following form:: 193 194 [ amqp[s]:// ] [user[:password]@] domain [/[name]] 195 196 Where domain can be one of:: 197 198 host | host:port | ip | ip:port | name 199 200 The following are valid examples of addresses: 201 202 - example.org 203 - example.org:1234 204 - amqp://example.org 205 - amqps://example.org 206 - example.org/incoming 207 - amqps://example.org/outgoing 208 - amqps://fred:trustno1@example.org 209 - 127.0.0.1:1234 210 - amqps://127.0.0.1:1234 211 212 Sending & Receiving Messages 213 ============================ 214 215 The L{Messenger} class works in conjuction with the L{Message} class. The 216 L{Message} class is a mutable holder of message content. 217 218 The L{put} method copies its L{Message} to the outgoing queue, and may 219 send queued messages if it can do so without blocking. The L{send} 220 method blocks until it has sent the requested number of messages, 221 or until a timeout interrupts the attempt. 222 223 224 >>> message = Message() 225 >>> for i in range(3): 226 ... message.address = "amqp://host/queue" 227 ... message.subject = "Hello World %i" % i 228 ... messenger.put(message) 229 >>> messenger.send() 230 231 Similarly, the L{recv} method receives messages into the incoming 232 queue, and may block as it attempts to receive the requested number 233 of messages, or until timeout is reached. It may receive fewer 234 than the requested number. The L{get} method pops the 235 eldest L{Message} off the incoming queue and copies it into the L{Message} 236 object that you supply. It will not block. 237 238 239 >>> message = Message() 240 >>> messenger.recv(10): 241 >>> while messenger.incoming > 0: 242 ... messenger.get(message) 243 ... print message.subject 244 Hello World 0 245 Hello World 1 246 Hello World 2 247 248 The blocking flag allows you to turn off blocking behavior entirely, 249 in which case L{send} and L{recv} will do whatever they can without 250 blocking, and then return. You can then look at the number 251 of incoming and outgoing messages to see how much outstanding work 252 still remains. 253 """ 254
255 - def __init__(self, name=None):
256 """ 257 Construct a new L{Messenger} with the given name. The name has 258 global scope. If a NULL name is supplied, a UUID based name will 259 be chosen. 260 261 @type name: string 262 @param name: the name of the messenger or None 263 264 """ 265 self._mng = pn_messenger(name) 266 self._selectables = {}
267
268 - def __del__(self):
269 """ 270 Destroy the L{Messenger}. This will close all connections that 271 are managed by the L{Messenger}. Call the L{stop} method before 272 destroying the L{Messenger}. 273 """ 274 if hasattr(self, "_mng"): 275 pn_messenger_free(self._mng) 276 del self._mng
277
278 - def _check(self, err):
279 if err < 0: 280 if (err == PN_INPROGRESS): 281 return 282 exc = EXCEPTIONS.get(err, MessengerException) 283 raise exc("[%s]: %s" % (err, pn_error_text(pn_messenger_error(self._mng)))) 284 else: 285 return err
286 287 @property
288 - def name(self):
289 """ 290 The name of the L{Messenger}. 291 """ 292 return pn_messenger_name(self._mng)
293
294 - def _get_certificate(self):
295 return pn_messenger_get_certificate(self._mng)
296
297 - def _set_certificate(self, value):
298 self._check(pn_messenger_set_certificate(self._mng, value))
299 300 certificate = property(_get_certificate, _set_certificate, 301 doc=""" 302 Path to a certificate file for the L{Messenger}. This certificate is 303 used when the L{Messenger} accepts or establishes SSL/TLS connections. 304 This property must be specified for the L{Messenger} to accept 305 incoming SSL/TLS connections and to establish client authenticated 306 outgoing SSL/TLS connection. Non client authenticated outgoing SSL/TLS 307 connections do not require this property. 308 """) 309
310 - def _get_private_key(self):
311 return pn_messenger_get_private_key(self._mng)
312
313 - def _set_private_key(self, value):
314 self._check(pn_messenger_set_private_key(self._mng, value))
315 316 private_key = property(_get_private_key, _set_private_key, 317 doc=""" 318 Path to a private key file for the L{Messenger's<Messenger>} 319 certificate. This property must be specified for the L{Messenger} to 320 accept incoming SSL/TLS connections and to establish client 321 authenticated outgoing SSL/TLS connection. Non client authenticated 322 SSL/TLS connections do not require this property. 323 """) 324
325 - def _get_password(self):
326 return pn_messenger_get_password(self._mng)
327
328 - def _set_password(self, value):
329 self._check(pn_messenger_set_password(self._mng, value))
330 331 password = property(_get_password, _set_password, 332 doc=""" 333 This property contains the password for the L{Messenger.private_key} 334 file, or None if the file is not encrypted. 335 """) 336
337 - def _get_trusted_certificates(self):
338 return pn_messenger_get_trusted_certificates(self._mng)
339
340 - def _set_trusted_certificates(self, value):
341 self._check(pn_messenger_set_trusted_certificates(self._mng, value))
342 343 trusted_certificates = property(_get_trusted_certificates, 344 _set_trusted_certificates, 345 doc=""" 346 A path to a database of trusted certificates for use in verifying the 347 peer on an SSL/TLS connection. If this property is None, then the peer 348 will not be verified. 349 """) 350
351 - def _get_timeout(self):
352 t = pn_messenger_get_timeout(self._mng) 353 if t == -1: 354 return None 355 else: 356 return millis2secs(t)
357
358 - def _set_timeout(self, value):
359 if value is None: 360 t = -1 361 else: 362 t = secs2millis(value) 363 self._check(pn_messenger_set_timeout(self._mng, t))
364 365 timeout = property(_get_timeout, _set_timeout, 366 doc=""" 367 The timeout property contains the default timeout for blocking 368 operations performed by the L{Messenger}. 369 """) 370
371 - def _is_blocking(self):
372 return pn_messenger_is_blocking(self._mng)
373
374 - def _set_blocking(self, b):
375 self._check(pn_messenger_set_blocking(self._mng, b))
376 377 blocking = property(_is_blocking, _set_blocking, 378 doc=""" 379 Enable or disable blocking behavior during L{Message} sending 380 and receiving. This affects every blocking call, with the 381 exception of L{work}. Currently, the affected calls are 382 L{send}, L{recv}, and L{stop}. 383 """) 384
385 - def _is_passive(self):
386 return pn_messenger_is_passive(self._mng)
387
388 - def _set_passive(self, b):
389 self._check(pn_messenger_set_passive(self._mng, b))
390 391 passive = property(_is_passive, _set_passive, 392 doc=""" 393 When passive is set to true, Messenger will not attempt to perform I/O 394 internally. In this mode it is necessary to use the selectables API to 395 drive any I/O needed to perform requested actions. In this mode 396 Messenger will never block. 397 """) 398
399 - def _get_incoming_window(self):
400 return pn_messenger_get_incoming_window(self._mng)
401
402 - def _set_incoming_window(self, window):
403 self._check(pn_messenger_set_incoming_window(self._mng, window))
404 405 incoming_window = property(_get_incoming_window, _set_incoming_window, 406 doc=""" 407 The incoming tracking window for the messenger. The messenger will 408 track the remote status of this many incoming deliveries after they 409 have been accepted or rejected. Defaults to zero. 410 411 L{Messages<Message>} enter this window only when you take them into your application 412 using L{get}. If your incoming window size is I{n}, and you get I{n}+1 L{messages<Message>} 413 without explicitly accepting or rejecting the oldest message, then the 414 message that passes beyond the edge of the incoming window will be assigned 415 the default disposition of its link. 416 """) 417
418 - def _get_outgoing_window(self):
419 return pn_messenger_get_outgoing_window(self._mng)
420
421 - def _set_outgoing_window(self, window):
422 self._check(pn_messenger_set_outgoing_window(self._mng, window))
423 424 outgoing_window = property(_get_outgoing_window, _set_outgoing_window, 425 doc=""" 426 The outgoing tracking window for the messenger. The messenger will 427 track the remote status of this many outgoing deliveries after calling 428 send. Defaults to zero. 429 430 A L{Message} enters this window when you call the put() method with the 431 message. If your outgoing window size is I{n}, and you call L{put} I{n}+1 432 times, status information will no longer be available for the 433 first message. 434 """) 435
436 - def start(self):
437 """ 438 Currently a no-op placeholder. 439 For future compatibility, do not L{send} or L{recv} messages 440 before starting the L{Messenger}. 441 """ 442 self._check(pn_messenger_start(self._mng))
443
444 - def stop(self):
445 """ 446 Transitions the L{Messenger} to an inactive state. An inactive 447 L{Messenger} will not send or receive messages from its internal 448 queues. A L{Messenger} should be stopped before being discarded to 449 ensure a clean shutdown handshake occurs on any internally managed 450 connections. 451 """ 452 self._check(pn_messenger_stop(self._mng))
453 454 @property
455 - def stopped(self):
456 """ 457 Returns true iff a L{Messenger} is in the stopped state. 458 This function does not block. 459 """ 460 return pn_messenger_stopped(self._mng)
461
462 - def subscribe(self, source):
463 """ 464 Subscribes the L{Messenger} to messages originating from the 465 specified source. The source is an address as specified in the 466 L{Messenger} introduction with the following addition. If the 467 domain portion of the address begins with the '~' character, the 468 L{Messenger} will interpret the domain as host/port, bind to it, 469 and listen for incoming messages. For example "~0.0.0.0", 470 "amqp://~0.0.0.0", and "amqps://~0.0.0.0" will all bind to any 471 local interface and listen for incoming messages with the last 472 variant only permitting incoming SSL connections. 473 474 @type source: string 475 @param source: the source of messages to subscribe to 476 """ 477 sub_impl = pn_messenger_subscribe(self._mng, source) 478 if not sub_impl: 479 self._check(pn_error_code(pn_messenger_error(self._mng))) 480 raise MessengerException("Cannot subscribe to %s"%source) 481 return Subscription(sub_impl)
482
483 - def put(self, message):
484 """ 485 Places the content contained in the message onto the outgoing 486 queue of the L{Messenger}. This method will never block, however 487 it will send any unblocked L{Messages<Message>} in the outgoing 488 queue immediately and leave any blocked L{Messages<Message>} 489 remaining in the outgoing queue. The L{send} call may be used to 490 block until the outgoing queue is empty. The L{outgoing} property 491 may be used to check the depth of the outgoing queue. 492 493 When the content in a given L{Message} object is copied to the outgoing 494 message queue, you may then modify or discard the L{Message} object 495 without having any impact on the content in the outgoing queue. 496 497 This method returns an outgoing tracker for the L{Message}. The tracker 498 can be used to determine the delivery status of the L{Message}. 499 500 @type message: Message 501 @param message: the message to place in the outgoing queue 502 @return: a tracker 503 """ 504 message._pre_encode() 505 self._check(pn_messenger_put(self._mng, message._msg)) 506 return pn_messenger_outgoing_tracker(self._mng)
507
508 - def status(self, tracker):
509 """ 510 Gets the last known remote state of the delivery associated with 511 the given tracker. 512 513 @type tracker: tracker 514 @param tracker: the tracker whose status is to be retrieved 515 516 @return: one of None, PENDING, REJECTED, MODIFIED, or ACCEPTED 517 """ 518 disp = pn_messenger_status(self._mng, tracker); 519 return STATUSES.get(disp, disp)
520
521 - def buffered(self, tracker):
522 """ 523 Checks if the delivery associated with the given tracker is still 524 waiting to be sent. 525 526 @type tracker: tracker 527 @param tracker: the tracker whose status is to be retrieved 528 529 @return: true if delivery is still buffered 530 """ 531 return pn_messenger_buffered(self._mng, tracker);
532
533 - def settle(self, tracker=None):
534 """ 535 Frees a L{Messenger} from tracking the status associated with a given 536 tracker. If you don't supply a tracker, all outgoing L{messages<Message>} up 537 to the most recent will be settled. 538 """ 539 if tracker is None: 540 tracker = pn_messenger_outgoing_tracker(self._mng) 541 flags = PN_CUMULATIVE 542 else: 543 flags = 0 544 self._check(pn_messenger_settle(self._mng, tracker, flags))
545
546 - def send(self, n=-1):
547 """ 548 This call will block until the indicated number of L{messages<Message>} 549 have been sent, or until the operation times out. If n is -1 this call will 550 block until all outgoing L{messages<Message>} have been sent. If n is 0 then 551 this call will send whatever it can without blocking. 552 """ 553 self._check(pn_messenger_send(self._mng, n))
554
555 - def recv(self, n=None):
556 """ 557 Receives up to I{n} L{messages<Message>} into the incoming queue. If no value 558 for I{n} is supplied, this call will receive as many L{messages<Message>} as it 559 can buffer internally. If the L{Messenger} is in blocking mode, this 560 call will block until at least one L{Message} is available in the 561 incoming queue. 562 """ 563 if n is None: 564 n = -1 565 self._check(pn_messenger_recv(self._mng, n))
566
567 - def work(self, timeout=None):
568 """ 569 Sends or receives any outstanding L{messages<Message>} queued for a L{Messenger}. 570 This will block for the indicated timeout. 571 This method may also do I/O work other than sending and receiving 572 L{messages<Message>}. For example, closing connections after messenger.L{stop}() 573 has been called. 574 """ 575 if timeout is None: 576 t = -1 577 else: 578 t = secs2millis(timeout) 579 err = pn_messenger_work(self._mng, t) 580 if (err == PN_TIMEOUT): 581 return False 582 else: 583 self._check(err) 584 return True
585 586 @property
587 - def receiving(self):
588 return pn_messenger_receiving(self._mng)
589
590 - def interrupt(self):
591 """ 592 The L{Messenger} interface is single-threaded. 593 This is the only L{Messenger} function intended to be called 594 from outside of the L{Messenger} thread. 595 Call this from a non-messenger thread to interrupt 596 a L{Messenger} that is blocking. 597 This will cause any in-progress blocking call to throw 598 the L{Interrupt} exception. If there is no currently blocking 599 call, then the next blocking call will be affected, even if it 600 is within the same thread that interrupt was called from. 601 """ 602 self._check(pn_messenger_interrupt(self._mng))
603
604 - def get(self, message=None):
605 """ 606 Moves the message from the head of the incoming message queue into 607 the supplied message object. Any content in the message will be 608 overwritten. 609 610 A tracker for the incoming L{Message} is returned. The tracker can 611 later be used to communicate your acceptance or rejection of the 612 L{Message}. 613 614 If None is passed in for the L{Message} object, the L{Message} 615 popped from the head of the queue is discarded. 616 617 @type message: Message 618 @param message: the destination message object 619 @return: a tracker 620 """ 621 if message is None: 622 impl = None 623 else: 624 impl = message._msg 625 self._check(pn_messenger_get(self._mng, impl)) 626 if message is not None: 627 message._post_decode() 628 return pn_messenger_incoming_tracker(self._mng)
629
630 - def accept(self, tracker=None):
631 """ 632 Signal the sender that you have acted on the L{Message} 633 pointed to by the tracker. If no tracker is supplied, 634 then all messages that have been returned by the L{get} 635 method are accepted, except those that have already been 636 auto-settled by passing beyond your incoming window size. 637 638 @type tracker: tracker 639 @param tracker: a tracker as returned by get 640 """ 641 if tracker is None: 642 tracker = pn_messenger_incoming_tracker(self._mng) 643 flags = PN_CUMULATIVE 644 else: 645 flags = 0 646 self._check(pn_messenger_accept(self._mng, tracker, flags))
647
648 - def reject(self, tracker=None):
649 """ 650 Rejects the L{Message} indicated by the tracker. If no tracker 651 is supplied, all messages that have been returned by the L{get} 652 method are rejected, except those that have already been auto-settled 653 by passing beyond your outgoing window size. 654 655 @type tracker: tracker 656 @param tracker: a tracker as returned by get 657 """ 658 if tracker is None: 659 tracker = pn_messenger_incoming_tracker(self._mng) 660 flags = PN_CUMULATIVE 661 else: 662 flags = 0 663 self._check(pn_messenger_reject(self._mng, tracker, flags))
664 665 @property
666 - def outgoing(self):
667 """ 668 The outgoing queue depth. 669 """ 670 return pn_messenger_outgoing(self._mng)
671 672 @property
673 - def incoming(self):
674 """ 675 The incoming queue depth. 676 """ 677 return pn_messenger_incoming(self._mng)
678
679 - def route(self, pattern, address):
680 """ 681 Adds a routing rule to a L{Messenger's<Messenger>} internal routing table. 682 683 The route procedure may be used to influence how a L{Messenger} will 684 internally treat a given address or class of addresses. Every call 685 to the route procedure will result in L{Messenger} appending a routing 686 rule to its internal routing table. 687 688 Whenever a L{Message} is presented to a L{Messenger} for delivery, it 689 will match the address of this message against the set of routing 690 rules in order. The first rule to match will be triggered, and 691 instead of routing based on the address presented in the message, 692 the L{Messenger} will route based on the address supplied in the rule. 693 694 The pattern matching syntax supports two types of matches, a '%' 695 will match any character except a '/', and a '*' will match any 696 character including a '/'. 697 698 A routing address is specified as a normal AMQP address, however it 699 may additionally use substitution variables from the pattern match 700 that triggered the rule. 701 702 Any message sent to "foo" will be routed to "amqp://foo.com": 703 704 >>> messenger.route("foo", "amqp://foo.com"); 705 706 Any message sent to "foobar" will be routed to 707 "amqp://foo.com/bar": 708 709 >>> messenger.route("foobar", "amqp://foo.com/bar"); 710 711 Any message sent to bar/<path> will be routed to the corresponding 712 path within the amqp://bar.com domain: 713 714 >>> messenger.route("bar/*", "amqp://bar.com/$1"); 715 716 Route all L{messages<Message>} over TLS: 717 718 >>> messenger.route("amqp:*", "amqps:$1") 719 720 Supply credentials for foo.com: 721 722 >>> messenger.route("amqp://foo.com/*", "amqp://user:password@foo.com/$1"); 723 724 Supply credentials for all domains: 725 726 >>> messenger.route("amqp://*", "amqp://user:password@$1"); 727 728 Route all addresses through a single proxy while preserving the 729 original destination: 730 731 >>> messenger.route("amqp://%/*", "amqp://user:password@proxy/$1/$2"); 732 733 Route any address through a single broker: 734 735 >>> messenger.route("*", "amqp://user:password@broker/$1"); 736 """ 737 self._check(pn_messenger_route(self._mng, unicode2utf8(pattern), unicode2utf8(address)))
738
739 - def rewrite(self, pattern, address):
740 """ 741 Similar to route(), except that the destination of 742 the L{Message} is determined before the message address is rewritten. 743 744 The outgoing address is only rewritten after routing has been 745 finalized. If a message has an outgoing address of 746 "amqp://0.0.0.0:5678", and a rewriting rule that changes its 747 outgoing address to "foo", it will still arrive at the peer that 748 is listening on "amqp://0.0.0.0:5678", but when it arrives there, 749 the receiver will see its outgoing address as "foo". 750 751 The default rewrite rule removes username and password from addresses 752 before they are transmitted. 753 """ 754 self._check(pn_messenger_rewrite(self._mng, unicode2utf8(pattern), unicode2utf8(address)))
755
756 - def selectable(self):
757 return Selectable.wrap(pn_messenger_selectable(self._mng))
758 759 @property
760 - def deadline(self):
761 tstamp = pn_messenger_deadline(self._mng) 762 if tstamp: 763 return millis2secs(tstamp) 764 else: 765 return None
766
767 -class Message(object):
768 """The L{Message} class is a mutable holder of message content. 769 770 @ivar instructions: delivery instructions for the message 771 @type instructions: dict 772 @ivar annotations: infrastructure defined message annotations 773 @type annotations: dict 774 @ivar properties: application defined message properties 775 @type properties: dict 776 @ivar body: message body 777 @type body: bytes | unicode | dict | list | int | long | float | UUID 778 """ 779 780 DEFAULT_PRIORITY = PN_DEFAULT_PRIORITY 781
782 - def __init__(self, body=None, **kwargs):
783 """ 784 @param kwargs: Message property name/value pairs to initialise the Message 785 """ 786 self._msg = pn_message() 787 self._id = Data(pn_message_id(self._msg)) 788 self._correlation_id = Data(pn_message_correlation_id(self._msg)) 789 self.instructions = None 790 self.annotations = None 791 self.properties = None 792 self.body = body 793 for k,v in kwargs.iteritems(): 794 getattr(self, k) # Raise exception if it's not a valid attribute. 795 setattr(self, k, v)
796
797 - def __del__(self):
798 if hasattr(self, "_msg"): 799 pn_message_free(self._msg) 800 del self._msg
801
802 - def _check(self, err):
803 if err < 0: 804 exc = EXCEPTIONS.get(err, MessageException) 805 raise exc("[%s]: %s" % (err, pn_error_text(pn_message_error(self._msg)))) 806 else: 807 return err
808
809 - def _pre_encode(self):
810 inst = Data(pn_message_instructions(self._msg)) 811 ann = Data(pn_message_annotations(self._msg)) 812 props = Data(pn_message_properties(self._msg)) 813 body = Data(pn_message_body(self._msg)) 814 815 inst.clear() 816 if self.instructions is not None: 817 inst.put_object(self.instructions) 818 ann.clear() 819 if self.annotations is not None: 820 ann.put_object(self.annotations) 821 props.clear() 822 if self.properties is not None: 823 props.put_object(self.properties) 824 body.clear() 825 if self.body is not None: 826 body.put_object(self.body)
827
828 - def _post_decode(self):
829 inst = Data(pn_message_instructions(self._msg)) 830 ann = Data(pn_message_annotations(self._msg)) 831 props = Data(pn_message_properties(self._msg)) 832 body = Data(pn_message_body(self._msg)) 833 834 if inst.next(): 835 self.instructions = inst.get_object() 836 else: 837 self.instructions = None 838 if ann.next(): 839 self.annotations = ann.get_object() 840 else: 841 self.annotations = None 842 if props.next(): 843 self.properties = props.get_object() 844 else: 845 self.properties = None 846 if body.next(): 847 self.body = body.get_object() 848 else: 849 self.body = None
850
851 - def clear(self):
852 """ 853 Clears the contents of the L{Message}. All fields will be reset to 854 their default values. 855 """ 856 pn_message_clear(self._msg) 857 self.instructions = None 858 self.annotations = None 859 self.properties = None 860 self.body = None
861
862 - def _is_inferred(self):
863 return pn_message_is_inferred(self._msg)
864
865 - def _set_inferred(self, value):
866 self._check(pn_message_set_inferred(self._msg, bool(value)))
867 868 inferred = property(_is_inferred, _set_inferred, doc=""" 869 The inferred flag for a message indicates how the message content 870 is encoded into AMQP sections. If inferred is true then binary and 871 list values in the body of the message will be encoded as AMQP DATA 872 and AMQP SEQUENCE sections, respectively. If inferred is false, 873 then all values in the body of the message will be encoded as AMQP 874 VALUE sections regardless of their type. 875 """) 876
877 - def _is_durable(self):
878 return pn_message_is_durable(self._msg)
879
880 - def _set_durable(self, value):
881 self._check(pn_message_set_durable(self._msg, bool(value)))
882 883 durable = property(_is_durable, _set_durable, 884 doc=""" 885 The durable property indicates that the message should be held durably 886 by any intermediaries taking responsibility for the message. 887 """) 888
889 - def _get_priority(self):
890 return pn_message_get_priority(self._msg)
891
892 - def _set_priority(self, value):
893 self._check(pn_message_set_priority(self._msg, value))
894 895 priority = property(_get_priority, _set_priority, 896 doc=""" 897 The priority of the message. 898 """) 899
900 - def _get_ttl(self):
901 return millis2secs(pn_message_get_ttl(self._msg))
902
903 - def _set_ttl(self, value):
904 self._check(pn_message_set_ttl(self._msg, secs2millis(value)))
905 906 ttl = property(_get_ttl, _set_ttl, 907 doc=""" 908 The time to live of the message measured in seconds. Expired messages 909 may be dropped. 910 """) 911
912 - def _is_first_acquirer(self):
913 return pn_message_is_first_acquirer(self._msg)
914
915 - def _set_first_acquirer(self, value):
916 self._check(pn_message_set_first_acquirer(self._msg, bool(value)))
917 918 first_acquirer = property(_is_first_acquirer, _set_first_acquirer, 919 doc=""" 920 True iff the recipient is the first to acquire the message. 921 """) 922
923 - def _get_delivery_count(self):
924 return pn_message_get_delivery_count(self._msg)
925
926 - def _set_delivery_count(self, value):
927 self._check(pn_message_set_delivery_count(self._msg, value))
928 929 delivery_count = property(_get_delivery_count, _set_delivery_count, 930 doc=""" 931 The number of delivery attempts made for this message. 932 """) 933 934
935 - def _get_id(self):
936 return self._id.get_object()
937 - def _set_id(self, value):
938 if type(value) in (int, long): 939 value = ulong(value) 940 self._id.rewind() 941 self._id.put_object(value)
942 id = property(_get_id, _set_id, 943 doc=""" 944 The id of the message. 945 """) 946
947 - def _get_user_id(self):
948 return pn_message_get_user_id(self._msg)
949
950 - def _set_user_id(self, value):
951 self._check(pn_message_set_user_id(self._msg, value))
952 953 user_id = property(_get_user_id, _set_user_id, 954 doc=""" 955 The user id of the message creator. 956 """) 957
958 - def _get_address(self):
959 return utf82unicode(pn_message_get_address(self._msg))
960
961 - def _set_address(self, value):
962 self._check(pn_message_set_address(self._msg, unicode2utf8(value)))
963 964 address = property(_get_address, _set_address, 965 doc=""" 966 The address of the message. 967 """) 968
969 - def _get_subject(self):
970 return pn_message_get_subject(self._msg)
971
972 - def _set_subject(self, value):
973 self._check(pn_message_set_subject(self._msg, value))
974 975 subject = property(_get_subject, _set_subject, 976 doc=""" 977 The subject of the message. 978 """) 979
980 - def _get_reply_to(self):
981 return utf82unicode(pn_message_get_reply_to(self._msg))
982
983 - def _set_reply_to(self, value):
984 self._check(pn_message_set_reply_to(self._msg, unicode2utf8(value)))
985 986 reply_to = property(_get_reply_to, _set_reply_to, 987 doc=""" 988 The reply-to address for the message. 989 """) 990
991 - def _get_correlation_id(self):
992 return self._correlation_id.get_object()
993 - def _set_correlation_id(self, value):
994 if type(value) in (int, long): 995 value = ulong(value) 996 self._correlation_id.rewind() 997 self._correlation_id.put_object(value)
998 999 correlation_id = property(_get_correlation_id, _set_correlation_id, 1000 doc=""" 1001 The correlation-id for the message. 1002 """) 1003
1004 - def _get_content_type(self):
1005 return pn_message_get_content_type(self._msg)
1006
1007 - def _set_content_type(self, value):
1008 self._check(pn_message_set_content_type(self._msg, value))
1009 1010 content_type = property(_get_content_type, _set_content_type, 1011 doc=""" 1012 The content-type of the message. 1013 """) 1014
1015 - def _get_content_encoding(self):
1016 return pn_message_get_content_encoding(self._msg)
1017
1018 - def _set_content_encoding(self, value):
1019 self._check(pn_message_set_content_encoding(self._msg, value))
1020 1021 content_encoding = property(_get_content_encoding, _set_content_encoding, 1022 doc=""" 1023 The content-encoding of the message. 1024 """) 1025
1026 - def _get_expiry_time(self):
1027 return millis2secs(pn_message_get_expiry_time(self._msg))
1028
1029 - def _set_expiry_time(self, value):
1030 self._check(pn_message_set_expiry_time(self._msg, secs2millis(value)))
1031 1032 expiry_time = property(_get_expiry_time, _set_expiry_time, 1033 doc=""" 1034 The expiry time of the message. 1035 """) 1036
1037 - def _get_creation_time(self):
1038 return millis2secs(pn_message_get_creation_time(self._msg))
1039
1040 - def _set_creation_time(self, value):
1041 self._check(pn_message_set_creation_time(self._msg, secs2millis(value)))
1042 1043 creation_time = property(_get_creation_time, _set_creation_time, 1044 doc=""" 1045 The creation time of the message. 1046 """) 1047
1048 - def _get_group_id(self):
1049 return pn_message_get_group_id(self._msg)
1050
1051 - def _set_group_id(self, value):
1052 self._check(pn_message_set_group_id(self._msg, value))
1053 1054 group_id = property(_get_group_id, _set_group_id, 1055 doc=""" 1056 The group id of the message. 1057 """) 1058
1059 - def _get_group_sequence(self):
1060 return pn_message_get_group_sequence(self._msg)
1061
1062 - def _set_group_sequence(self, value):
1063 self._check(pn_message_set_group_sequence(self._msg, value))
1064 1065 group_sequence = property(_get_group_sequence, _set_group_sequence, 1066 doc=""" 1067 The sequence of the message within its group. 1068 """) 1069
1070 - def _get_reply_to_group_id(self):
1071 return pn_message_get_reply_to_group_id(self._msg)
1072
1073 - def _set_reply_to_group_id(self, value):
1074 self._check(pn_message_set_reply_to_group_id(self._msg, value))
1075 1076 reply_to_group_id = property(_get_reply_to_group_id, _set_reply_to_group_id, 1077 doc=""" 1078 The group-id for any replies. 1079 """) 1080
1081 - def encode(self):
1082 self._pre_encode() 1083 sz = 16 1084 while True: 1085 err, data = pn_message_encode(self._msg, sz) 1086 if err == PN_OVERFLOW: 1087 sz *= 2 1088 continue 1089 else: 1090 self._check(err) 1091 return data
1092
1093 - def decode(self, data):
1094 self._check(pn_message_decode(self._msg, data, len(data))) 1095 self._post_decode()
1096
1097 - def send(self, sender, tag=None):
1098 dlv = sender.delivery(tag or sender.delivery_tag()) 1099 encoded = self.encode() 1100 sender.stream(encoded) 1101 sender.advance() 1102 if sender.snd_settle_mode == Link.SND_SETTLED: 1103 dlv.settle() 1104 return dlv
1105
1106 - def recv(self, link):
1107 """ 1108 Receives and decodes the message content for the current delivery 1109 from the link. Upon success it will return the current delivery 1110 for the link. If there is no current delivery, or if the current 1111 delivery is incomplete, or if the link is not a receiver, it will 1112 return None. 1113 1114 @type link: Link 1115 @param link: the link to receive a message from 1116 @return the delivery associated with the decoded message (or None) 1117 1118 """ 1119 if link.is_sender: return None 1120 dlv = link.current 1121 if not dlv or dlv.partial: return None 1122 encoded = link.recv(dlv.pending) 1123 link.advance() 1124 # the sender has already forgotten about the delivery, so we might 1125 # as well too 1126 if link.remote_snd_settle_mode == Link.SND_SETTLED: 1127 dlv.settle() 1128 self.decode(encoded) 1129 return dlv
1130
1131 - def __repr2__(self):
1132 props = [] 1133 for attr in ("inferred", "address", "reply_to", "durable", "ttl", 1134 "priority", "first_acquirer", "delivery_count", "id", 1135 "correlation_id", "user_id", "group_id", "group_sequence", 1136 "reply_to_group_id", "instructions", "annotations", 1137 "properties", "body"): 1138 value = getattr(self, attr) 1139 if value: props.append("%s=%r" % (attr, value)) 1140 return "Message(%s)" % ", ".join(props)
1141
1142 - def __repr__(self):
1143 tmp = pn_string(None) 1144 err = pn_inspect(self._msg, tmp) 1145 result = pn_string_get(tmp) 1146 pn_free(tmp) 1147 self._check(err) 1148 return result
1149
1150 -class Subscription(object):
1151
1152 - def __init__(self, impl):
1153 self._impl = impl
1154 1155 @property
1156 - def address(self):
1157 return pn_subscription_address(self._impl)
1158 1159 _DEFAULT = object()
1160 1161 -class Selectable(Wrapper):
1162 1163 @staticmethod
1164 - def wrap(impl):
1165 if impl is None: 1166 return None 1167 else: 1168 return Selectable(impl)
1169
1170 - def __init__(self, impl):
1171 Wrapper.__init__(self, impl, pn_selectable_attachments)
1172
1173 - def _init(self):
1174 pass
1175
1176 - def fileno(self, fd = _DEFAULT):
1177 if fd is _DEFAULT: 1178 return pn_selectable_get_fd(self._impl) 1179 elif fd is None: 1180 pn_selectable_set_fd(self._impl, PN_INVALID_SOCKET) 1181 else: 1182 pn_selectable_set_fd(self._impl, fd)
1183
1184 - def _is_reading(self):
1185 return pn_selectable_is_reading(self._impl)
1186
1187 - def _set_reading(self, val):
1188 pn_selectable_set_reading(self._impl, bool(val))
1189 1190 reading = property(_is_reading, _set_reading) 1191
1192 - def _is_writing(self):
1193 return pn_selectable_is_writing(self._impl)
1194
1195 - def _set_writing(self, val):
1196 pn_selectable_set_writing(self._impl, bool(val))
1197 1198 writing = property(_is_writing, _set_writing) 1199
1200 - def _get_deadline(self):
1201 tstamp = pn_selectable_get_deadline(self._impl) 1202 if tstamp: 1203 return millis2secs(tstamp) 1204 else: 1205 return None
1206
1207 - def _set_deadline(self, deadline):
1208 pn_selectable_set_deadline(self._impl, secs2millis(deadline))
1209 1210 deadline = property(_get_deadline, _set_deadline) 1211
1212 - def readable(self):
1213 pn_selectable_readable(self._impl)
1214
1215 - def writable(self):
1216 pn_selectable_writable(self._impl)
1217
1218 - def expired(self):
1219 pn_selectable_expired(self._impl)
1220
1221 - def _is_registered(self):
1222 return pn_selectable_is_registered(self._impl)
1223
1224 - def _set_registered(self, registered):
1225 pn_selectable_set_registered(self._impl, registered)
1226 1227 registered = property(_is_registered, _set_registered, 1228 doc=""" 1229 The registered property may be get/set by an I/O polling system to 1230 indicate whether the fd has been registered or not. 1231 """) 1232 1233 @property
1234 - def is_terminal(self):
1235 return pn_selectable_is_terminal(self._impl)
1236
1237 - def terminate(self):
1238 pn_selectable_terminate(self._impl)
1239
1240 - def release(self):
1241 pn_selectable_release(self._impl)
1242
1243 -class DataException(ProtonException):
1244 """ 1245 The DataException class is the root of the Data exception hierarchy. 1246 All exceptions raised by the Data class extend this exception. 1247 """ 1248 pass
1249
1250 -class UnmappedType:
1251
1252 - def __init__(self, msg):
1253 self.msg = msg
1254
1255 - def __repr__(self):
1256 return "UnmappedType(%s)" % self.msg
1257
1258 -class ulong(long):
1259
1260 - def __repr__(self):
1261 return "ulong(%s)" % long.__repr__(self)
1262
1263 -class timestamp(long):
1264
1265 - def __repr__(self):
1266 return "timestamp(%s)" % long.__repr__(self)
1267
1268 -class symbol(unicode):
1269
1270 - def __repr__(self):
1271 return "symbol(%s)" % unicode.__repr__(self)
1272
1273 -class char(unicode):
1274
1275 - def __repr__(self):
1276 return "char(%s)" % unicode.__repr__(self)
1277
1278 -class Described(object):
1279
1280 - def __init__(self, descriptor, value):
1281 self.descriptor = descriptor 1282 self.value = value
1283
1284 - def __repr__(self):
1285 return "Described(%r, %r)" % (self.descriptor, self.value)
1286
1287 - def __eq__(self, o):
1288 if isinstance(o, Described): 1289 return self.descriptor == o.descriptor and self.value == o.value 1290 else: 1291 return False
1292 1293 UNDESCRIBED = Constant("UNDESCRIBED")
1294 1295 -class Array(object):
1296
1297 - def __init__(self, descriptor, type, *elements):
1298 self.descriptor = descriptor 1299 self.type = type 1300 self.elements = elements
1301
1302 - def __repr__(self):
1303 if self.elements: 1304 els = ", %s" % (", ".join(map(repr, self.elements))) 1305 else: 1306 els = "" 1307 return "Array(%r, %r%s)" % (self.descriptor, self.type, els)
1308
1309 - def __eq__(self, o):
1310 if isinstance(o, Array): 1311 return self.descriptor == o.descriptor and \ 1312 self.type == o.type and self.elements == o.elements 1313 else: 1314 return False
1315
1316 -class Data:
1317 """ 1318 The L{Data} class provides an interface for decoding, extracting, 1319 creating, and encoding arbitrary AMQP data. A L{Data} object 1320 contains a tree of AMQP values. Leaf nodes in this tree correspond 1321 to scalars in the AMQP type system such as L{ints<INT>} or 1322 L{strings<STRING>}. Non-leaf nodes in this tree correspond to 1323 compound values in the AMQP type system such as L{lists<LIST>}, 1324 L{maps<MAP>}, L{arrays<ARRAY>}, or L{described values<DESCRIBED>}. 1325 The root node of the tree is the L{Data} object itself and can have 1326 an arbitrary number of children. 1327 1328 A L{Data} object maintains the notion of the current sibling node 1329 and a current parent node. Siblings are ordered within their parent. 1330 Values are accessed and/or added by using the L{next}, L{prev}, 1331 L{enter}, and L{exit} methods to navigate to the desired location in 1332 the tree and using the supplied variety of put_*/get_* methods to 1333 access or add a value of the desired type. 1334 1335 The put_* methods will always add a value I{after} the current node 1336 in the tree. If the current node has a next sibling the put_* method 1337 will overwrite the value on this node. If there is no current node 1338 or the current node has no next sibling then one will be added. The 1339 put_* methods always set the added/modified node to the current 1340 node. The get_* methods read the value of the current node and do 1341 not change which node is current. 1342 1343 The following types of scalar values are supported: 1344 1345 - L{NULL} 1346 - L{BOOL} 1347 - L{UBYTE} 1348 - L{USHORT} 1349 - L{SHORT} 1350 - L{UINT} 1351 - L{INT} 1352 - L{ULONG} 1353 - L{LONG} 1354 - L{FLOAT} 1355 - L{DOUBLE} 1356 - L{BINARY} 1357 - L{STRING} 1358 - L{SYMBOL} 1359 1360 The following types of compound values are supported: 1361 1362 - L{DESCRIBED} 1363 - L{ARRAY} 1364 - L{LIST} 1365 - L{MAP} 1366 """ 1367 1368 NULL = PN_NULL; "A null value." 1369 BOOL = PN_BOOL; "A boolean value." 1370 UBYTE = PN_UBYTE; "An unsigned byte value." 1371 BYTE = PN_BYTE; "A signed byte value." 1372 USHORT = PN_USHORT; "An unsigned short value." 1373 SHORT = PN_SHORT; "A short value." 1374 UINT = PN_UINT; "An unsigned int value." 1375 INT = PN_INT; "A signed int value." 1376 CHAR = PN_CHAR; "A character value." 1377 ULONG = PN_ULONG; "An unsigned long value." 1378 LONG = PN_LONG; "A signed long value." 1379 TIMESTAMP = PN_TIMESTAMP; "A timestamp value." 1380 FLOAT = PN_FLOAT; "A float value." 1381 DOUBLE = PN_DOUBLE; "A double value." 1382 DECIMAL32 = PN_DECIMAL32; "A DECIMAL32 value." 1383 DECIMAL64 = PN_DECIMAL64; "A DECIMAL64 value." 1384 DECIMAL128 = PN_DECIMAL128; "A DECIMAL128 value." 1385 UUID = PN_UUID; "A UUID value." 1386 BINARY = PN_BINARY; "A binary string." 1387 STRING = PN_STRING; "A unicode string." 1388 SYMBOL = PN_SYMBOL; "A symbolic string." 1389 DESCRIBED = PN_DESCRIBED; "A described value." 1390 ARRAY = PN_ARRAY; "An array value." 1391 LIST = PN_LIST; "A list value." 1392 MAP = PN_MAP; "A map value." 1393 1394 type_names = { 1395 NULL: "null", 1396 BOOL: "bool", 1397 BYTE: "byte", 1398 UBYTE: "ubyte", 1399 SHORT: "short", 1400 USHORT: "ushort", 1401 INT: "int", 1402 UINT: "uint", 1403 CHAR: "char", 1404 LONG: "long", 1405 ULONG: "ulong", 1406 TIMESTAMP: "timestamp", 1407 FLOAT: "float", 1408 DOUBLE: "double", 1409 DECIMAL32: "decimal32", 1410 DECIMAL64: "decimal64", 1411 DECIMAL128: "decimal128", 1412 UUID: "uuid", 1413 BINARY: "binary", 1414 STRING: "string", 1415 SYMBOL: "symbol", 1416 DESCRIBED: "described", 1417 ARRAY: "array", 1418 LIST: "list", 1419 MAP: "map" 1420 } 1421 1422 @classmethod
1423 - def type_name(type): return Data.type_names[type]
1424
1425 - def __init__(self, capacity=16):
1426 if type(capacity) in (int, long): 1427 self._data = pn_data(capacity) 1428 self._free = True 1429 else: 1430 self._data = capacity 1431 self._free = False
1432
1433 - def __del__(self):
1434 if self._free and hasattr(self, "_data"): 1435 pn_data_free(self._data) 1436 del self._data
1437
1438 - def _check(self, err):
1439 if err < 0: 1440 exc = EXCEPTIONS.get(err, DataException) 1441 raise exc("[%s]: %s" % (err, pn_error_text(pn_data_error(self._data)))) 1442 else: 1443 return err
1444
1445 - def clear(self):
1446 """ 1447 Clears the data object. 1448 """ 1449 pn_data_clear(self._data)
1450
1451 - def rewind(self):
1452 """ 1453 Clears current node and sets the parent to the root node. Clearing the 1454 current node sets it _before_ the first node, calling next() will advance to 1455 the first node. 1456 """ 1457 assert self._data is not None 1458 pn_data_rewind(self._data)
1459
1460 - def next(self):
1461 """ 1462 Advances the current node to its next sibling and returns its 1463 type. If there is no next sibling the current node remains 1464 unchanged and None is returned. 1465 """ 1466 found = pn_data_next(self._data) 1467 if found: 1468 return self.type() 1469 else: 1470 return None
1471
1472 - def prev(self):
1473 """ 1474 Advances the current node to its previous sibling and returns its 1475 type. If there is no previous sibling the current node remains 1476 unchanged and None is returned. 1477 """ 1478 found = pn_data_prev(self._data) 1479 if found: 1480 return self.type() 1481 else: 1482 return None
1483
1484 - def enter(self):
1485 """ 1486 Sets the parent node to the current node and clears the current node. 1487 Clearing the current node sets it _before_ the first child, 1488 call next() advances to the first child. 1489 """ 1490 return pn_data_enter(self._data)
1491
1492 - def exit(self):
1493 """ 1494 Sets the current node to the parent node and the parent node to 1495 its own parent. 1496 """ 1497 return pn_data_exit(self._data)
1498
1499 - def lookup(self, name):
1500 return pn_data_lookup(self._data, name)
1501
1502 - def narrow(self):
1503 pn_data_narrow(self._data)
1504
1505 - def widen(self):
1506 pn_data_widen(self._data)
1507
1508 - def type(self):
1509 """ 1510 Returns the type of the current node. 1511 """ 1512 dtype = pn_data_type(self._data) 1513 if dtype == -1: 1514 return None 1515 else: 1516 return dtype
1517
1518 - def encode(self):
1519 """ 1520 Returns a representation of the data encoded in AMQP format. 1521 """ 1522 size = 1024 1523 while True: 1524 cd, enc = pn_data_encode(self._data, size) 1525 if cd == PN_OVERFLOW: 1526 size *= 2 1527 elif cd >= 0: 1528 return enc 1529 else: 1530 self._check(cd)
1531
1532 - def decode(self, encoded):
1533 """ 1534 Decodes the first value from supplied AMQP data and returns the 1535 number of bytes consumed. 1536 1537 @type encoded: binary 1538 @param encoded: AMQP encoded binary data 1539 """ 1540 return self._check(pn_data_decode(self._data, encoded))
1541
1542 - def put_list(self):
1543 """ 1544 Puts a list value. Elements may be filled by entering the list 1545 node and putting element values. 1546 1547 >>> data = Data() 1548 >>> data.put_list() 1549 >>> data.enter() 1550 >>> data.put_int(1) 1551 >>> data.put_int(2) 1552 >>> data.put_int(3) 1553 >>> data.exit() 1554 """ 1555 self._check(pn_data_put_list(self._data))
1556
1557 - def put_map(self):
1558 """ 1559 Puts a map value. Elements may be filled by entering the map node 1560 and putting alternating key value pairs. 1561 1562 >>> data = Data() 1563 >>> data.put_map() 1564 >>> data.enter() 1565 >>> data.put_string("key") 1566 >>> data.put_string("value") 1567 >>> data.exit() 1568 """ 1569 self._check(pn_data_put_map(self._data))
1570
1571 - def put_array(self, described, element_type):
1572 """ 1573 Puts an array value. Elements may be filled by entering the array 1574 node and putting the element values. The values must all be of the 1575 specified array element type. If an array is described then the 1576 first child value of the array is the descriptor and may be of any 1577 type. 1578 1579 >>> data = Data() 1580 >>> 1581 >>> data.put_array(False, Data.INT) 1582 >>> data.enter() 1583 >>> data.put_int(1) 1584 >>> data.put_int(2) 1585 >>> data.put_int(3) 1586 >>> data.exit() 1587 >>> 1588 >>> data.put_array(True, Data.DOUBLE) 1589 >>> data.enter() 1590 >>> data.put_symbol("array-descriptor") 1591 >>> data.put_double(1.1) 1592 >>> data.put_double(1.2) 1593 >>> data.put_double(1.3) 1594 >>> data.exit() 1595 1596 @type described: bool 1597 @param described: specifies whether the array is described 1598 @type element_type: int 1599 @param element_type: the type of the array elements 1600 """ 1601 self._check(pn_data_put_array(self._data, described, element_type))
1602
1603 - def put_described(self):
1604 """ 1605 Puts a described value. A described node has two children, the 1606 descriptor and the value. These are specified by entering the node 1607 and putting the desired values. 1608 1609 >>> data = Data() 1610 >>> data.put_described() 1611 >>> data.enter() 1612 >>> data.put_symbol("value-descriptor") 1613 >>> data.put_string("the value") 1614 >>> data.exit() 1615 """ 1616 self._check(pn_data_put_described(self._data))
1617
1618 - def put_null(self):
1619 """ 1620 Puts a null value. 1621 """ 1622 self._check(pn_data_put_null(self._data))
1623
1624 - def put_bool(self, b):
1625 """ 1626 Puts a boolean value. 1627 1628 @param b: a boolean value 1629 """ 1630 self._check(pn_data_put_bool(self._data, b))
1631
1632 - def put_ubyte(self, ub):
1633 """ 1634 Puts an unsigned byte value. 1635 1636 @param ub: an integral value 1637 """ 1638 self._check(pn_data_put_ubyte(self._data, ub))
1639
1640 - def put_byte(self, b):
1641 """ 1642 Puts a signed byte value. 1643 1644 @param b: an integral value 1645 """ 1646 self._check(pn_data_put_byte(self._data, b))
1647
1648 - def put_ushort(self, us):
1649 """ 1650 Puts an unsigned short value. 1651 1652 @param us: an integral value. 1653 """ 1654 self._check(pn_data_put_ushort(self._data, us))
1655
1656 - def put_short(self, s):
1657 """ 1658 Puts a signed short value. 1659 1660 @param s: an integral value 1661 """ 1662 self._check(pn_data_put_short(self._data, s))
1663
1664 - def put_uint(self, ui):
1665 """ 1666 Puts an unsigned int value. 1667 1668 @param ui: an integral value 1669 """ 1670 self._check(pn_data_put_uint(self._data, ui))
1671
1672 - def put_int(self, i):
1673 """ 1674 Puts a signed int value. 1675 1676 @param i: an integral value 1677 """ 1678 self._check(pn_data_put_int(self._data, i))
1679
1680 - def put_char(self, c):
1681 """ 1682 Puts a char value. 1683 1684 @param c: a single character 1685 """ 1686 self._check(pn_data_put_char(self._data, ord(c)))
1687
1688 - def put_ulong(self, ul):
1689 """ 1690 Puts an unsigned long value. 1691 1692 @param ul: an integral value 1693 """ 1694 self._check(pn_data_put_ulong(self._data, ul))
1695
1696 - def put_long(self, l):
1697 """ 1698 Puts a signed long value. 1699 1700 @param l: an integral value 1701 """ 1702 self._check(pn_data_put_long(self._data, l))
1703
1704 - def put_timestamp(self, t):
1705 """ 1706 Puts a timestamp value. 1707 1708 @param t: an integral value 1709 """ 1710 self._check(pn_data_put_timestamp(self._data, t))
1711
1712 - def put_float(self, f):
1713 """ 1714 Puts a float value. 1715 1716 @param f: a floating point value 1717 """ 1718 self._check(pn_data_put_float(self._data, f))
1719
1720 - def put_double(self, d):
1721 """ 1722 Puts a double value. 1723 1724 @param d: a floating point value. 1725 """ 1726 self._check(pn_data_put_double(self._data, d))
1727
1728 - def put_decimal32(self, d):
1729 """ 1730 Puts a decimal32 value. 1731 1732 @param d: a decimal32 value 1733 """ 1734 self._check(pn_data_put_decimal32(self._data, d))
1735
1736 - def put_decimal64(self, d):
1737 """ 1738 Puts a decimal64 value. 1739 1740 @param d: a decimal64 value 1741 """ 1742 self._check(pn_data_put_decimal64(self._data, d))
1743
1744 - def put_decimal128(self, d):
1745 """ 1746 Puts a decimal128 value. 1747 1748 @param d: a decimal128 value 1749 """ 1750 self._check(pn_data_put_decimal128(self._data, d))
1751
1752 - def put_uuid(self, u):
1753 """ 1754 Puts a UUID value. 1755 1756 @param u: a uuid value 1757 """ 1758 self._check(pn_data_put_uuid(self._data, u.bytes))
1759
1760 - def put_binary(self, b):
1761 """ 1762 Puts a binary value. 1763 1764 @type b: binary 1765 @param b: a binary value 1766 """ 1767 self._check(pn_data_put_binary(self._data, b))
1768
1769 - def put_string(self, s):
1770 """ 1771 Puts a unicode value. 1772 1773 @type s: unicode 1774 @param s: a unicode value 1775 """ 1776 self._check(pn_data_put_string(self._data, s.encode("utf8")))
1777
1778 - def put_symbol(self, s):
1779 """ 1780 Puts a symbolic value. 1781 1782 @type s: string 1783 @param s: the symbol name 1784 """ 1785 self._check(pn_data_put_symbol(self._data, s))
1786
1787 - def get_list(self):
1788 """ 1789 If the current node is a list, return the number of elements, 1790 otherwise return zero. List elements can be accessed by entering 1791 the list. 1792 1793 >>> count = data.get_list() 1794 >>> data.enter() 1795 >>> for i in range(count): 1796 ... type = data.next() 1797 ... if type == Data.STRING: 1798 ... print data.get_string() 1799 ... elif type == ...: 1800 ... ... 1801 >>> data.exit() 1802 """ 1803 return pn_data_get_list(self._data)
1804
1805 - def get_map(self):
1806 """ 1807 If the current node is a map, return the number of child elements, 1808 otherwise return zero. Key value pairs can be accessed by entering 1809 the map. 1810 1811 >>> count = data.get_map() 1812 >>> data.enter() 1813 >>> for i in range(count/2): 1814 ... type = data.next() 1815 ... if type == Data.STRING: 1816 ... print data.get_string() 1817 ... elif type == ...: 1818 ... ... 1819 >>> data.exit() 1820 """ 1821 return pn_data_get_map(self._data)
1822
1823 - def get_array(self):
1824 """ 1825 If the current node is an array, return a tuple of the element 1826 count, a boolean indicating whether the array is described, and 1827 the type of each element, otherwise return (0, False, None). Array 1828 data can be accessed by entering the array. 1829 1830 >>> # read an array of strings with a symbolic descriptor 1831 >>> count, described, type = data.get_array() 1832 >>> data.enter() 1833 >>> data.next() 1834 >>> print "Descriptor:", data.get_symbol() 1835 >>> for i in range(count): 1836 ... data.next() 1837 ... print "Element:", data.get_string() 1838 >>> data.exit() 1839 """ 1840 count = pn_data_get_array(self._data) 1841 described = pn_data_is_array_described(self._data) 1842 type = pn_data_get_array_type(self._data) 1843 if type == -1: 1844 type = None 1845 return count, described, type
1846
1847 - def is_described(self):
1848 """ 1849 Checks if the current node is a described value. The descriptor 1850 and value may be accessed by entering the described value. 1851 1852 >>> # read a symbolically described string 1853 >>> assert data.is_described() # will error if the current node is not described 1854 >>> data.enter() 1855 >>> data.next() 1856 >>> print data.get_symbol() 1857 >>> data.next() 1858 >>> print data.get_string() 1859 >>> data.exit() 1860 """ 1861 return pn_data_is_described(self._data)
1862
1863 - def is_null(self):
1864 """ 1865 Checks if the current node is a null. 1866 """ 1867 return pn_data_is_null(self._data)
1868
1869 - def get_bool(self):
1870 """ 1871 If the current node is a boolean, returns its value, returns False 1872 otherwise. 1873 """ 1874 return pn_data_get_bool(self._data)
1875
1876 - def get_ubyte(self):
1877 """ 1878 If the current node is an unsigned byte, returns its value, 1879 returns 0 otherwise. 1880 """ 1881 return pn_data_get_ubyte(self._data)
1882
1883 - def get_byte(self):
1884 """ 1885 If the current node is a signed byte, returns its value, returns 0 1886 otherwise. 1887 """ 1888 return pn_data_get_byte(self._data)
1889
1890 - def get_ushort(self):
1891 """ 1892 If the current node is an unsigned short, returns its value, 1893 returns 0 otherwise. 1894 """ 1895 return pn_data_get_ushort(self._data)
1896
1897 - def get_short(self):
1898 """ 1899 If the current node is a signed short, returns its value, returns 1900 0 otherwise. 1901 """ 1902 return pn_data_get_short(self._data)
1903
1904 - def get_uint(self):
1905 """ 1906 If the current node is an unsigned int, returns its value, returns 1907 0 otherwise. 1908 """ 1909 return pn_data_get_uint(self._data)
1910
1911 - def get_int(self):
1912 """ 1913 If the current node is a signed int, returns its value, returns 0 1914 otherwise. 1915 """ 1916 return pn_data_get_int(self._data)
1917
1918 - def get_char(self):
1919 """ 1920 If the current node is a char, returns its value, returns 0 1921 otherwise. 1922 """ 1923 return char(unichr(pn_data_get_char(self._data)))
1924
1925 - def get_ulong(self):
1926 """ 1927 If the current node is an unsigned long, returns its value, 1928 returns 0 otherwise. 1929 """ 1930 return ulong(pn_data_get_ulong(self._data))
1931
1932 - def get_long(self):
1933 """ 1934 If the current node is an signed long, returns its value, returns 1935 0 otherwise. 1936 """ 1937 return pn_data_get_long(self._data)
1938
1939 - def get_timestamp(self):
1940 """ 1941 If the current node is a timestamp, returns its value, returns 0 1942 otherwise. 1943 """ 1944 return timestamp(pn_data_get_timestamp(self._data))
1945
1946 - def get_float(self):
1947 """ 1948 If the current node is a float, returns its value, raises 0 1949 otherwise. 1950 """ 1951 return pn_data_get_float(self._data)
1952
1953 - def get_double(self):
1954 """ 1955 If the current node is a double, returns its value, returns 0 1956 otherwise. 1957 """ 1958 return pn_data_get_double(self._data)
1959 1960 # XXX: need to convert
1961 - def get_decimal32(self):
1962 """ 1963 If the current node is a decimal32, returns its value, returns 0 1964 otherwise. 1965 """ 1966 return pn_data_get_decimal32(self._data)
1967 1968 # XXX: need to convert
1969 - def get_decimal64(self):
1970 """ 1971 If the current node is a decimal64, returns its value, returns 0 1972 otherwise. 1973 """ 1974 return pn_data_get_decimal64(self._data)
1975 1976 # XXX: need to convert
1977 - def get_decimal128(self):
1978 """ 1979 If the current node is a decimal128, returns its value, returns 0 1980 otherwise. 1981 """ 1982 return pn_data_get_decimal128(self._data)
1983
1984 - def get_uuid(self):
1985 """ 1986 If the current node is a UUID, returns its value, returns None 1987 otherwise. 1988 """ 1989 if pn_data_type(self._data) == Data.UUID: 1990 return uuid.UUID(bytes=pn_data_get_uuid(self._data)) 1991 else: 1992 return None
1993
1994 - def get_binary(self):
1995 """ 1996 If the current node is binary, returns its value, returns "" 1997 otherwise. 1998 """ 1999 return pn_data_get_binary(self._data)
2000
2001 - def get_string(self):
2002 """ 2003 If the current node is a string, returns its value, returns "" 2004 otherwise. 2005 """ 2006 return pn_data_get_string(self._data).decode("utf8")
2007
2008 - def get_symbol(self):
2009 """ 2010 If the current node is a symbol, returns its value, returns "" 2011 otherwise. 2012 """ 2013 return symbol(pn_data_get_symbol(self._data))
2014
2015 - def copy(self, src):
2016 self._check(pn_data_copy(self._data, src._data))
2017
2018 - def format(self):
2019 sz = 16 2020 while True: 2021 err, result = pn_data_format(self._data, sz) 2022 if err == PN_OVERFLOW: 2023 sz *= 2 2024 continue 2025 else: 2026 self._check(err) 2027 return result
2028
2029 - def dump(self):
2030 pn_data_dump(self._data)
2031
2032 - def put_dict(self, d):
2033 self.put_map() 2034 self.enter() 2035 try: 2036 for k, v in d.items(): 2037 self.put_object(k) 2038 self.put_object(v) 2039 finally: 2040 self.exit()
2041
2042 - def get_dict(self):
2043 if self.enter(): 2044 try: 2045 result = {} 2046 while self.next(): 2047 k = self.get_object() 2048 if self.next(): 2049 v = self.get_object() 2050 else: 2051 v = None 2052 result[k] = v 2053 finally: 2054 self.exit() 2055 return result
2056
2057 - def put_sequence(self, s):
2058 self.put_list() 2059 self.enter() 2060 try: 2061 for o in s: 2062 self.put_object(o) 2063 finally: 2064 self.exit()
2065
2066 - def get_sequence(self):
2067 if self.enter(): 2068 try: 2069 result = [] 2070 while self.next(): 2071 result.append(self.get_object()) 2072 finally: 2073 self.exit() 2074 return result
2075
2076 - def get_py_described(self):
2077 if self.enter(): 2078 try: 2079 self.next() 2080 descriptor = self.get_object() 2081 self.next() 2082 value = self.get_object() 2083 finally: 2084 self.exit() 2085 return Described(descriptor, value)
2086
2087 - def put_py_described(self, d):
2088 self.put_described() 2089 self.enter() 2090 try: 2091 self.put_object(d.descriptor) 2092 self.put_object(d.value) 2093 finally: 2094 self.exit()
2095
2096 - def get_py_array(self):
2097 """ 2098 If the current node is an array, return an Array object 2099 representing the array and its contents. Otherwise return None. 2100 This is a convenience wrapper around get_array, enter, etc. 2101 """ 2102 2103 count, described, type = self.get_array() 2104 if type is None: return None 2105 if self.enter(): 2106 try: 2107 if described: 2108 self.next() 2109 descriptor = self.get_object() 2110 else: 2111 descriptor = UNDESCRIBED 2112 elements = [] 2113 while self.next(): 2114 elements.append(self.get_object()) 2115 finally: 2116 self.exit() 2117 return Array(descriptor, type, *elements)
2118
2119 - def put_py_array(self, a):
2120 described = a.descriptor != UNDESCRIBED 2121 self.put_array(described, a.type) 2122 self.enter() 2123 try: 2124 if described: 2125 self.put_object(a.descriptor) 2126 for e in a.elements: 2127 self.put_object(e) 2128 finally: 2129 self.exit()
2130 2131 put_mappings = { 2132 None.__class__: lambda s, _: s.put_null(), 2133 bool: put_bool, 2134 dict: put_dict, 2135 list: put_sequence, 2136 tuple: put_sequence, 2137 unicode: put_string, 2138 bytes: put_binary, 2139 symbol: put_symbol, 2140 int: put_long, 2141 char: put_char, 2142 long: put_long, 2143 ulong: put_ulong, 2144 timestamp: put_timestamp, 2145 float: put_double, 2146 uuid.UUID: put_uuid, 2147 Described: put_py_described, 2148 Array: put_py_array 2149 } 2150 get_mappings = { 2151 NULL: lambda s: None, 2152 BOOL: get_bool, 2153 BYTE: get_byte, 2154 UBYTE: get_ubyte, 2155 SHORT: get_short, 2156 USHORT: get_ushort, 2157 INT: get_int, 2158 UINT: get_uint, 2159 CHAR: get_char, 2160 LONG: get_long, 2161 ULONG: get_ulong, 2162 TIMESTAMP: get_timestamp, 2163 FLOAT: get_float, 2164 DOUBLE: get_double, 2165 DECIMAL32: get_decimal32, 2166 DECIMAL64: get_decimal64, 2167 DECIMAL128: get_decimal128, 2168 UUID: get_uuid, 2169 BINARY: get_binary, 2170 STRING: get_string, 2171 SYMBOL: get_symbol, 2172 DESCRIBED: get_py_described, 2173 ARRAY: get_py_array, 2174 LIST: get_sequence, 2175 MAP: get_dict 2176 } 2177 2178
2179 - def put_object(self, obj):
2180 putter = self.put_mappings[obj.__class__] 2181 putter(self, obj)
2182
2183 - def get_object(self):
2184 type = self.type() 2185 if type is None: return None 2186 getter = self.get_mappings.get(type) 2187 if getter: 2188 return getter(self) 2189 else: 2190 return UnmappedType(str(type))
2191
2192 -class ConnectionException(ProtonException):
2193 pass
2194
2195 -class Endpoint(object):
2196 2197 LOCAL_UNINIT = PN_LOCAL_UNINIT 2198 REMOTE_UNINIT = PN_REMOTE_UNINIT 2199 LOCAL_ACTIVE = PN_LOCAL_ACTIVE 2200 REMOTE_ACTIVE = PN_REMOTE_ACTIVE 2201 LOCAL_CLOSED = PN_LOCAL_CLOSED 2202 REMOTE_CLOSED = PN_REMOTE_CLOSED 2203
2204 - def _init(self):
2205 self.condition = None
2206
2207 - def _update_cond(self):
2208 obj2cond(self.condition, self._get_cond_impl())
2209 2210 @property
2211 - def remote_condition(self):
2212 return cond2obj(self._get_remote_cond_impl())
2213 2214 # the following must be provided by subclasses
2215 - def _get_cond_impl(self):
2216 assert False, "Subclass must override this!"
2217
2218 - def _get_remote_cond_impl(self):
2219 assert False, "Subclass must override this!"
2220
2221 - def _get_handler(self):
2222 import reactor 2223 ractor = reactor.Reactor.wrap(pn_object_reactor(self._impl)) 2224 if ractor: 2225 on_error = ractor.on_error 2226 else: 2227 on_error = None 2228 record = self._get_attachments() 2229 return WrappedHandler.wrap(pn_record_get_handler(record), on_error)
2230
2231 - def _set_handler(self, handler):
2232 import reactor 2233 ractor = reactor.Reactor.wrap(pn_object_reactor(self._impl)) 2234 if ractor: 2235 on_error = ractor.on_error 2236 else: 2237 on_error = None 2238 impl = _chandler(handler, on_error) 2239 record = self._get_attachments() 2240 pn_record_set_handler(record, impl) 2241 pn_decref(impl)
2242 2243 handler = property(_get_handler, _set_handler) 2244 2245 @property
2246 - def transport(self):
2247 return self.connection.transport
2248
2249 -class Condition:
2250
2251 - def __init__(self, name, description=None, info=None):
2252 self.name = name 2253 self.description = description 2254 self.info = info
2255
2256 - def __repr__(self):
2257 return "Condition(%s)" % ", ".join([repr(x) for x in 2258 (self.name, self.description, self.info) 2259 if x])
2260
2261 - def __eq__(self, o):
2262 if not isinstance(o, Condition): return False 2263 return self.name == o.name and \ 2264 self.description == o.description and \ 2265 self.info == o.info
2266
2267 -def obj2cond(obj, cond):
2268 pn_condition_clear(cond) 2269 if obj: 2270 pn_condition_set_name(cond, str(obj.name)) 2271 pn_condition_set_description(cond, obj.description) 2272 info = Data(pn_condition_info(cond)) 2273 if obj.info: 2274 info.put_object(obj.info)
2275
2276 -def cond2obj(cond):
2277 if pn_condition_is_set(cond): 2278 return Condition(pn_condition_get_name(cond), 2279 pn_condition_get_description(cond), 2280 dat2obj(pn_condition_info(cond))) 2281 else: 2282 return None
2283
2284 -def dat2obj(dimpl):
2285 if dimpl: 2286 d = Data(dimpl) 2287 d.rewind() 2288 d.next() 2289 obj = d.get_object() 2290 d.rewind() 2291 return obj
2292
2293 -def obj2dat(obj, dimpl):
2294 if obj is not None: 2295 d = Data(dimpl) 2296 d.put_object(obj)
2297
2298 -def secs2millis(secs):
2299 return long(secs*1000)
2300
2301 -def millis2secs(millis):
2302 return float(millis)/1000.0
2303
2304 -def timeout2millis(secs):
2305 if secs is None: return PN_MILLIS_MAX 2306 return secs2millis(secs)
2307
2308 -def millis2timeout(millis):
2309 if millis == PN_MILLIS_MAX: return None 2310 return millis2secs(millis)
2311
2312 -def unicode2utf8(string):
2313 if string is None: 2314 return None 2315 if isinstance(string, unicode): 2316 return string.encode('utf8') 2317 elif isinstance(string, str): 2318 return string 2319 else: 2320 raise TypeError("Unrecognized string type: %r" % string)
2321
2322 -def utf82unicode(string):
2323 if string is None: 2324 return None 2325 if isinstance(string, unicode): 2326 return string 2327 elif isinstance(string, str): 2328 return string.decode('utf8') 2329 else: 2330 raise TypeError("Unrecognized string type")
2331
2332 -class Connection(Wrapper, Endpoint):
2333 """ 2334 A representation of an AMQP connection 2335 """ 2336 2337 @staticmethod
2338 - def wrap(impl):
2339 if impl is None: 2340 return None 2341 else: 2342 return Connection(impl)
2343
2344 - def __init__(self, impl = pn_connection):
2345 Wrapper.__init__(self, impl, pn_connection_attachments)
2346
2347 - def _init(self):
2348 Endpoint._init(self) 2349 self.offered_capabilities = None 2350 self.desired_capabilities = None 2351 self.properties = None
2352
2353 - def _get_attachments(self):
2354 return pn_connection_attachments(self._impl)
2355 2356 @property
2357 - def connection(self):
2358 return self
2359 2360 @property
2361 - def transport(self):
2362 return Transport.wrap(pn_connection_transport(self._impl))
2363
2364 - def _check(self, err):
2365 if err < 0: 2366 exc = EXCEPTIONS.get(err, ConnectionException) 2367 raise exc("[%s]: %s" % (err, pn_connection_error(self._impl))) 2368 else: 2369 return err
2370
2371 - def _get_cond_impl(self):
2372 return pn_connection_condition(self._impl)
2373
2374 - def _get_remote_cond_impl(self):
2375 return pn_connection_remote_condition(self._impl)
2376
2377 - def collect(self, collector):
2378 if collector is None: 2379 pn_connection_collect(self._impl, None) 2380 else: 2381 pn_connection_collect(self._impl, collector._impl) 2382 self._collector = weakref.ref(collector)
2383
2384 - def _get_container(self):
2385 return utf82unicode(pn_connection_get_container(self._impl))
2386 - def _set_container(self, name):
2387 return pn_connection_set_container(self._impl, unicode2utf8(name))
2388 2389 container = property(_get_container, _set_container) 2390
2391 - def _get_hostname(self):
2392 return utf82unicode(pn_connection_get_hostname(self._impl))
2393 - def _set_hostname(self, name):
2394 return pn_connection_set_hostname(self._impl, unicode2utf8(name))
2395 2396 hostname = property(_get_hostname, _set_hostname) 2397 2398 @property
2399 - def remote_container(self):
2400 """The container identifier specified by the remote peer for this connection.""" 2401 return pn_connection_remote_container(self._impl)
2402 2403 @property
2404 - def remote_hostname(self):
2405 """The hostname specified by the remote peer for this connection.""" 2406 return pn_connection_remote_hostname(self._impl)
2407 2408 @property
2410 """The capabilities offered by the remote peer for this connection.""" 2411 return dat2obj(pn_connection_remote_offered_capabilities(self._impl))
2412 2413 @property
2415 """The capabilities desired by the remote peer for this connection.""" 2416 return dat2obj(pn_connection_remote_desired_capabilities(self._impl))
2417 2418 @property
2419 - def remote_properties(self):
2420 """The properties specified by the remote peer for this connection.""" 2421 return dat2obj(pn_connection_remote_properties(self._impl))
2422
2423 - def open(self):
2424 """ 2425 Opens the connection. 2426 2427 In more detail, this moves the local state of the connection to 2428 the ACTIVE state and triggers an open frame to be sent to the 2429 peer. A connection is fully active once both peers have opened it. 2430 """ 2431 obj2dat(self.offered_capabilities, 2432 pn_connection_offered_capabilities(self._impl)) 2433 obj2dat(self.desired_capabilities, 2434 pn_connection_desired_capabilities(self._impl)) 2435 obj2dat(self.properties, pn_connection_properties(self._impl)) 2436 pn_connection_open(self._impl)
2437
2438 - def close(self):
2439 """ 2440 Closes the connection. 2441 2442 In more detail, this moves the local state of the connection to 2443 the CLOSED state and triggers a close frame to be sent to the 2444 peer. A connection is fully closed once both peers have closed it. 2445 """ 2446 self._update_cond() 2447 pn_connection_close(self._impl)
2448 2449 @property
2450 - def state(self):
2451 """ 2452 The state of the connection as a bit field. The state has a local 2453 and a remote component. Each of these can be in one of three 2454 states: UNINIT, ACTIVE or CLOSED. These can be tested by masking 2455 against LOCAL_UNINIT, LOCAL_ACTIVE, LOCAL_CLOSED, REMOTE_UNINIT, 2456 REMOTE_ACTIVE and REMOTE_CLOSED. 2457 """ 2458 return pn_connection_state(self._impl)
2459
2460 - def session(self):
2461 """ 2462 Returns a new session on this connection. 2463 """ 2464 return Session(pn_session(self._impl))
2465
2466 - def session_head(self, mask):
2467 return Session.wrap(pn_session_head(self._impl, mask))
2468 2471 2472 @property
2473 - def work_head(self):
2474 return Delivery.wrap(pn_work_head(self._impl))
2475 2476 @property
2477 - def error(self):
2478 return pn_error_code(pn_connection_error(self._impl))
2479
2480 - def free(self):
2481 pn_connection_release(self._impl)
2482
2483 -class SessionException(ProtonException):
2484 pass
2485
2486 -class Session(Wrapper, Endpoint):
2487 2488 @staticmethod
2489 - def wrap(impl):
2490 if impl is None: 2491 return None 2492 else: 2493 return Session(impl)
2494
2495 - def __init__(self, impl):
2496 Wrapper.__init__(self, impl, pn_session_attachments)
2497
2498 - def _get_attachments(self):
2499 return pn_session_attachments(self._impl)
2500
2501 - def _get_cond_impl(self):
2502 return pn_session_condition(self._impl)
2503
2504 - def _get_remote_cond_impl(self):
2505 return pn_session_remote_condition(self._impl)
2506
2507 - def _get_incoming_capacity(self):
2508 return pn_session_get_incoming_capacity(self._impl)
2509
2510 - def _set_incoming_capacity(self, capacity):
2511 pn_session_set_incoming_capacity(self._impl, capacity)
2512 2513 incoming_capacity = property(_get_incoming_capacity, _set_incoming_capacity) 2514 2515 @property
2516 - def outgoing_bytes(self):
2517 return pn_session_outgoing_bytes(self._impl)
2518 2519 @property
2520 - def incoming_bytes(self):
2521 return pn_session_incoming_bytes(self._impl)
2522
2523 - def open(self):
2524 pn_session_open(self._impl)
2525
2526 - def close(self):
2527 self._update_cond() 2528 pn_session_close(self._impl)
2529
2530 - def next(self, mask):
2531 return Session.wrap(pn_session_next(self._impl, mask))
2532 2533 @property
2534 - def state(self):
2535 return pn_session_state(self._impl)
2536 2537 @property
2538 - def connection(self):
2539 return Connection.wrap(pn_session_connection(self._impl))
2540
2541 - def sender(self, name):
2542 return Sender(pn_sender(self._impl, unicode2utf8(name)))
2543
2544 - def receiver(self, name):
2545 return Receiver(pn_receiver(self._impl, unicode2utf8(name)))
2546
2547 - def free(self):
2548 pn_session_free(self._impl)
2549
2550 -class LinkException(ProtonException):
2551 pass
2552 2735
2736 -class Terminus(object):
2737 2738 UNSPECIFIED = PN_UNSPECIFIED 2739 SOURCE = PN_SOURCE 2740 TARGET = PN_TARGET 2741 COORDINATOR = PN_COORDINATOR 2742 2743 NONDURABLE = PN_NONDURABLE 2744 CONFIGURATION = PN_CONFIGURATION 2745 DELIVERIES = PN_DELIVERIES 2746 2747 DIST_MODE_UNSPECIFIED = PN_DIST_MODE_UNSPECIFIED 2748 DIST_MODE_COPY = PN_DIST_MODE_COPY 2749 DIST_MODE_MOVE = PN_DIST_MODE_MOVE 2750
2751 - def __init__(self, impl):
2752 self._impl = impl
2753
2754 - def _check(self, err):
2755 if err < 0: 2756 exc = EXCEPTIONS.get(err, LinkException) 2757 raise exc("[%s]" % err) 2758 else: 2759 return err
2760
2761 - def _get_type(self):
2762 return pn_terminus_get_type(self._impl)
2763 - def _set_type(self, type):
2764 self._check(pn_terminus_set_type(self._impl, type))
2765 type = property(_get_type, _set_type) 2766
2767 - def _get_address(self):
2768 """The address that identifies the source or target node""" 2769 return utf82unicode(pn_terminus_get_address(self._impl))
2770 - def _set_address(self, address):
2771 self._check(pn_terminus_set_address(self._impl, unicode2utf8(address)))
2772 address = property(_get_address, _set_address) 2773
2774 - def _get_durability(self):
2775 return pn_terminus_get_durability(self._impl)
2776 - def _set_durability(self, seconds):
2777 self._check(pn_terminus_set_durability(self._impl, seconds))
2778 durability = property(_get_durability, _set_durability) 2779
2780 - def _get_expiry_policy(self):
2781 return pn_terminus_get_expiry_policy(self._impl)
2782 - def _set_expiry_policy(self, seconds):
2783 self._check(pn_terminus_set_expiry_policy(self._impl, seconds))
2784 expiry_policy = property(_get_expiry_policy, _set_expiry_policy) 2785
2786 - def _get_timeout(self):
2787 return pn_terminus_get_timeout(self._impl)
2788 - def _set_timeout(self, seconds):
2789 self._check(pn_terminus_set_timeout(self._impl, seconds))
2790 timeout = property(_get_timeout, _set_timeout) 2791
2792 - def _is_dynamic(self):
2793 """Indicates whether the source or target node was dynamically 2794 created""" 2795 return pn_terminus_is_dynamic(self._impl)
2796 - def _set_dynamic(self, dynamic):
2797 self._check(pn_terminus_set_dynamic(self._impl, dynamic))
2798 dynamic = property(_is_dynamic, _set_dynamic) 2799
2800 - def _get_distribution_mode(self):
2801 return pn_terminus_get_distribution_mode(self._impl)
2802 - def _set_distribution_mode(self, mode):
2803 self._check(pn_terminus_set_distribution_mode(self._impl, mode))
2804 distribution_mode = property(_get_distribution_mode, _set_distribution_mode) 2805 2806 @property
2807 - def properties(self):
2808 """Properties of a dynamic source or target.""" 2809 return Data(pn_terminus_properties(self._impl))
2810 2811 @property
2812 - def capabilities(self):
2813 """Capabilities of the source or target.""" 2814 return Data(pn_terminus_capabilities(self._impl))
2815 2816 @property
2817 - def outcomes(self):
2818 return Data(pn_terminus_outcomes(self._impl))
2819 2820 @property
2821 - def filter(self):
2822 """A filter on a source allows the set of messages transfered over 2823 the link to be restricted""" 2824 return Data(pn_terminus_filter(self._impl))
2825
2826 - def copy(self, src):
2827 self._check(pn_terminus_copy(self._impl, src._impl))
2828
2829 -class Sender(Link):
2830 """ 2831 A link over which messages are sent. 2832 """ 2833
2834 - def offered(self, n):
2835 pn_link_offered(self._impl, n)
2836
2837 - def stream(self, bytes):
2838 """ 2839 Send specified bytes as part of the current delivery 2840 """ 2841 return self._check(pn_link_send(self._impl, bytes))
2842
2843 - def send(self, obj, tag=None):
2844 """ 2845 Send specified object over this sender; the object is expected to 2846 have a send() method on it that takes the sender and an optional 2847 tag as arguments. 2848 2849 Where the object is a Message, this will send the message over 2850 this link, creating a new delivery for the purpose. 2851 """ 2852 if hasattr(obj, 'send'): 2853 return obj.send(self, tag=tag) 2854 else: 2855 # treat object as bytes 2856 return self.stream(obj)
2857
2858 - def delivery_tag(self):
2859 if not hasattr(self, 'tag_generator'): 2860 def simple_tags(): 2861 count = 1 2862 while True: 2863 yield str(count) 2864 count += 1
2865 self.tag_generator = simple_tags() 2866 return self.tag_generator.next()
2867
2868 -class Receiver(Link):
2869 """ 2870 A link over which messages are received. 2871 """ 2872
2873 - def flow(self, n):
2874 """Increases the credit issued to the remote sender by the specified number of messages.""" 2875 pn_link_flow(self._impl, n)
2876
2877 - def recv(self, limit):
2878 n, bytes = pn_link_recv(self._impl, limit) 2879 if n == PN_EOS: 2880 return None 2881 else: 2882 self._check(n) 2883 return bytes
2884
2885 - def drain(self, n):
2886 pn_link_drain(self._impl, n)
2887
2888 - def draining(self):
2889 return pn_link_draining(self._impl)
2890
2891 -class NamedInt(int):
2892 2893 values = {} 2894
2895 - def __new__(cls, i, name):
2896 ni = super(NamedInt, cls).__new__(cls, i) 2897 cls.values[i] = ni 2898 return ni
2899
2900 - def __init__(self, i, name):
2901 self.name = name
2902
2903 - def __repr__(self):
2904 return self.name
2905
2906 - def __str__(self):
2907 return self.name
2908 2909 @classmethod
2910 - def get(cls, i):
2911 return cls.values.get(i, i)
2912
2913 -class DispositionType(NamedInt):
2914 values = {}
2915
2916 -class Disposition(object):
2917 2918 RECEIVED = DispositionType(PN_RECEIVED, "RECEIVED") 2919 ACCEPTED = DispositionType(PN_ACCEPTED, "ACCEPTED") 2920 REJECTED = DispositionType(PN_REJECTED, "REJECTED") 2921 RELEASED = DispositionType(PN_RELEASED, "RELEASED") 2922 MODIFIED = DispositionType(PN_MODIFIED, "MODIFIED") 2923
2924 - def __init__(self, impl, local):
2925 self._impl = impl 2926 self.local = local 2927 self._data = None 2928 self._condition = None 2929 self._annotations = None
2930 2931 @property
2932 - def type(self):
2933 return DispositionType.get(pn_disposition_type(self._impl))
2934
2935 - def _get_section_number(self):
2936 return pn_disposition_get_section_number(self._impl)
2937 - def _set_section_number(self, n):
2938 pn_disposition_set_section_number(self._impl, n)
2939 section_number = property(_get_section_number, _set_section_number) 2940
2941 - def _get_section_offset(self):
2942 return pn_disposition_get_section_offset(self._impl)
2943 - def _set_section_offset(self, n):
2944 pn_disposition_set_section_offset(self._impl, n)
2945 section_offset = property(_get_section_offset, _set_section_offset) 2946
2947 - def _get_failed(self):
2948 return pn_disposition_is_failed(self._impl)
2949 - def _set_failed(self, b):
2950 pn_disposition_set_failed(self._impl, b)
2951 failed = property(_get_failed, _set_failed) 2952
2953 - def _get_undeliverable(self):
2954 return pn_disposition_is_undeliverable(self._impl)
2955 - def _set_undeliverable(self, b):
2956 pn_disposition_set_undeliverable(self._impl, b)
2957 undeliverable = property(_get_undeliverable, _set_undeliverable) 2958
2959 - def _get_data(self):
2960 if self.local: 2961 return self._data 2962 else: 2963 return dat2obj(pn_disposition_data(self._impl))
2964 - def _set_data(self, obj):
2965 if self.local: 2966 self._data = obj 2967 else: 2968 raise AttributeError("data attribute is read-only")
2969 data = property(_get_data, _set_data) 2970
2971 - def _get_annotations(self):
2972 if self.local: 2973 return self._annotations 2974 else: 2975 return dat2obj(pn_disposition_annotations(self._impl))
2976 - def _set_annotations(self, obj):
2977 if self.local: 2978 self._annotations = obj 2979 else: 2980 raise AttributeError("annotations attribute is read-only")
2981 annotations = property(_get_annotations, _set_annotations) 2982
2983 - def _get_condition(self):
2984 if self.local: 2985 return self._condition 2986 else: 2987 return cond2obj(pn_disposition_condition(self._impl))
2988 - def _set_condition(self, obj):
2989 if self.local: 2990 self._condition = obj 2991 else: 2992 raise AttributeError("condition attribute is read-only")
2993 condition = property(_get_condition, _set_condition)
2994
2995 -class Delivery(Wrapper):
2996 """ 2997 Tracks and/or records the delivery of a message over a link. 2998 """ 2999 3000 RECEIVED = Disposition.RECEIVED 3001 ACCEPTED = Disposition.ACCEPTED 3002 REJECTED = Disposition.REJECTED 3003 RELEASED = Disposition.RELEASED 3004 MODIFIED = Disposition.MODIFIED 3005 3006 @staticmethod
3007 - def wrap(impl):
3008 if impl is None: 3009 return None 3010 else: 3011 return Delivery(impl)
3012
3013 - def __init__(self, impl):
3014 Wrapper.__init__(self, impl, pn_delivery_attachments)
3015
3016 - def _init(self):
3017 self.local = Disposition(pn_delivery_local(self._impl), True) 3018 self.remote = Disposition(pn_delivery_remote(self._impl), False)
3019 3020 @property
3021 - def tag(self):
3022 """The identifier for the delivery.""" 3023 return pn_delivery_tag(self._impl)
3024 3025 @property
3026 - def writable(self):
3027 """Returns true for an outgoing delivery to which data can now be written.""" 3028 return pn_delivery_writable(self._impl)
3029 3030 @property
3031 - def readable(self):
3032 """Returns true for an incoming delivery that has data to read.""" 3033 return pn_delivery_readable(self._impl)
3034 3035 @property
3036 - def updated(self):
3037 """Returns true if the state of the delivery has been updated 3038 (e.g. it has been settled and/or accepted, rejected etc).""" 3039 return pn_delivery_updated(self._impl)
3040
3041 - def update(self, state):
3042 """ 3043 Set the local state of the delivery e.g. ACCEPTED, REJECTED, RELEASED. 3044 """ 3045 obj2dat(self.local._data, pn_disposition_data(self.local._impl)) 3046 obj2dat(self.local._annotations, pn_disposition_annotations(self.local._impl)) 3047 obj2cond(self.local._condition, pn_disposition_condition(self.local._impl)) 3048 pn_delivery_update(self._impl, state)
3049 3050 @property
3051 - def pending(self):
3052 return pn_delivery_pending(self._impl)
3053 3054 @property
3055 - def partial(self):
3056 """ 3057 Returns true for an incoming delivery if not all the data is 3058 yet available. 3059 """ 3060 return pn_delivery_partial(self._impl)
3061 3062 @property
3063 - def local_state(self):
3064 """Returns the local state of the delivery.""" 3065 return DispositionType.get(pn_delivery_local_state(self._impl))
3066 3067 @property
3068 - def remote_state(self):
3069 """ 3070 Returns the state of the delivery as indicated by the remote 3071 peer. 3072 """ 3073 return DispositionType.get(pn_delivery_remote_state(self._impl))
3074 3075 @property
3076 - def settled(self):
3077 """ 3078 Returns true if the delivery has been settled by the remote peer. 3079 """ 3080 return pn_delivery_settled(self._impl)
3081
3082 - def settle(self):
3083 """ 3084 Settles the delivery locally. This indicates the aplication 3085 considers the delivery complete and does not wish to receive any 3086 further events about it. Every delivery should be settled locally. 3087 """ 3088 pn_delivery_settle(self._impl)
3089 3090 @property
3091 - def work_next(self):
3092 return Delivery.wrap(pn_work_next(self._impl))
3093 3094 @property 3100 3101 @property
3102 - def session(self):
3103 """ 3104 Returns the session over which the delivery was sent or received. 3105 """ 3106 return self.link.session
3107 3108 @property
3109 - def connection(self):
3110 """ 3111 Returns the connection over which the delivery was sent or received. 3112 """ 3113 return self.session.connection
3114 3115 @property
3116 - def transport(self):
3117 return self.connection.transport
3118
3119 -class TransportException(ProtonException):
3120 pass
3121
3122 -class Transport(Wrapper):
3123 3124 TRACE_OFF = PN_TRACE_OFF 3125 TRACE_DRV = PN_TRACE_DRV 3126 TRACE_FRM = PN_TRACE_FRM 3127 TRACE_RAW = PN_TRACE_RAW 3128 3129 CLIENT = 1 3130 SERVER = 2 3131 3132 @staticmethod
3133 - def wrap(impl):
3134 if impl is None: 3135 return None 3136 else: 3137 return Transport(_impl=impl)
3138
3139 - def __init__(self, mode=None, _impl = pn_transport):
3140 Wrapper.__init__(self, _impl, pn_transport_attachments) 3141 if mode == Transport.SERVER: 3142 pn_transport_set_server(self._impl) 3143 elif mode is None or mode==Transport.CLIENT: 3144 pass 3145 else: 3146 raise TransportException("Cannot initialise Transport from mode: %s" % str(mode))
3147
3148 - def _init(self):
3149 self._sasl = None 3150 self._ssl = None
3151
3152 - def _check(self, err):
3153 if err < 0: 3154 exc = EXCEPTIONS.get(err, TransportException) 3155 raise exc("[%s]: %s" % (err, pn_error_text(pn_transport_error(self._impl)))) 3156 else: 3157 return err
3158
3159 - def bind(self, connection):
3160 """Assign a connection to the transport""" 3161 self._check(pn_transport_bind(self._impl, connection._impl))
3162
3163 - def unbind(self):
3164 """Release the connection""" 3165 self._check(pn_transport_unbind(self._impl))
3166
3167 - def trace(self, n):
3168 pn_transport_trace(self._impl, n)
3169
3170 - def tick(self, now):
3171 """Process any timed events (like heartbeat generation). 3172 now = seconds since epoch (float). 3173 """ 3174 return millis2secs(pn_transport_tick(self._impl, secs2millis(now)))
3175
3176 - def capacity(self):
3177 c = pn_transport_capacity(self._impl) 3178 if c >= PN_EOS: 3179 return c 3180 else: 3181 return self._check(c)
3182
3183 - def push(self, bytes):
3184 n = self._check(pn_transport_push(self._impl, bytes)) 3185 if n != len(bytes): 3186 raise OverflowError("unable to process all bytes")
3187
3188 - def close_tail(self):
3189 self._check(pn_transport_close_tail(self._impl))
3190
3191 - def pending(self):
3192 p = pn_transport_pending(self._impl) 3193 if p >= PN_EOS: 3194 return p 3195 else: 3196 return self._check(p)
3197
3198 - def peek(self, size):
3199 cd, out = pn_transport_peek(self._impl, size) 3200 if cd == PN_EOS: 3201 return None 3202 else: 3203 self._check(cd) 3204 return out
3205
3206 - def pop(self, size):
3207 pn_transport_pop(self._impl, size)
3208
3209 - def close_head(self):
3210 self._check(pn_transport_close_head(self._impl))
3211 3212 @property
3213 - def closed(self):
3214 return pn_transport_closed(self._impl)
3215 3216 # AMQP 1.0 max-frame-size
3217 - def _get_max_frame_size(self):
3218 return pn_transport_get_max_frame(self._impl)
3219
3220 - def _set_max_frame_size(self, value):
3221 pn_transport_set_max_frame(self._impl, value)
3222 3223 max_frame_size = property(_get_max_frame_size, _set_max_frame_size, 3224 doc=""" 3225 Sets the maximum size for received frames (in bytes). 3226 """) 3227 3228 @property
3229 - def remote_max_frame_size(self):
3230 return pn_transport_get_remote_max_frame(self._impl)
3231
3232 - def _get_channel_max(self):
3233 return pn_transport_get_channel_max(self._impl)
3234
3235 - def _set_channel_max(self, value):
3236 pn_transport_set_channel_max(self._impl, value)
3237 3238 channel_max = property(_get_channel_max, _set_channel_max, 3239 doc=""" 3240 Sets the maximum channel that may be used on the transport. 3241 """) 3242 3243 @property
3244 - def remote_channel_max(self):
3245 return pn_transport_remote_channel_max(self._impl)
3246 3247 # AMQP 1.0 idle-time-out
3248 - def _get_idle_timeout(self):
3249 return millis2secs(pn_transport_get_idle_timeout(self._impl))
3250
3251 - def _set_idle_timeout(self, sec):
3252 pn_transport_set_idle_timeout(self._impl, secs2millis(sec))
3253 3254 idle_timeout = property(_get_idle_timeout, _set_idle_timeout, 3255 doc=""" 3256 The idle timeout of the connection (float, in seconds). 3257 """) 3258 3259 @property
3260 - def remote_idle_timeout(self):
3261 return millis2secs(pn_transport_get_remote_idle_timeout(self._impl))
3262 3263 @property
3264 - def frames_output(self):
3265 return pn_transport_get_frames_output(self._impl)
3266 3267 @property
3268 - def frames_input(self):
3269 return pn_transport_get_frames_input(self._impl)
3270
3271 - def sasl(self):
3272 return SASL(self)
3273
3274 - def ssl(self, domain=None, session_details=None):
3275 # SSL factory (singleton for this transport) 3276 if not self._ssl: 3277 self._ssl = SSL(self, domain, session_details) 3278 return self._ssl
3279 3280 @property
3281 - def condition(self):
3282 return cond2obj(pn_transport_condition(self._impl))
3283 3284 @property
3285 - def connection(self):
3286 return Connection.wrap(pn_transport_connection(self._impl))
3287
3288 -class SASLException(TransportException):
3289 pass
3290
3291 -class SASL(Wrapper):
3292 3293 OK = PN_SASL_OK 3294 AUTH = PN_SASL_AUTH 3295 SKIPPED = PN_SASL_SKIPPED 3296
3297 - def __init__(self, transport):
3298 Wrapper.__init__(self, transport._impl, pn_transport_attachments) 3299 self._sasl = pn_sasl(transport._impl)
3300
3301 - def _check(self, err):
3302 if err < 0: 3303 exc = EXCEPTIONS.get(err, SASLException) 3304 raise exc("[%s]" % (err)) 3305 else: 3306 return err
3307
3308 - def mechanisms(self, mechs):
3309 pn_sasl_mechanisms(self._sasl, mechs)
3310 3311 # @deprecated
3312 - def client(self):
3313 pn_sasl_client(self._sasl)
3314 3315 # @deprecated
3316 - def server(self):
3317 pn_sasl_server(self._sasl)
3318
3319 - def allow_skip(self, allow):
3320 pn_sasl_allow_skip(self._sasl, allow)
3321
3322 - def plain(self, user, password):
3323 pn_sasl_plain(self._sasl, user, password)
3324
3325 - def send(self, data):
3326 self._check(pn_sasl_send(self._sasl, data, len(data)))
3327
3328 - def recv(self):
3329 sz = 16 3330 while True: 3331 n, data = pn_sasl_recv(self._sasl, sz) 3332 if n == PN_OVERFLOW: 3333 sz *= 2 3334 continue 3335 elif n == PN_EOS: 3336 return None 3337 else: 3338 self._check(n) 3339 return data
3340 3341 @property
3342 - def outcome(self):
3343 outcome = pn_sasl_outcome(self._sasl) 3344 if outcome == PN_SASL_NONE: 3345 return None 3346 else: 3347 return outcome
3348
3349 - def done(self, outcome):
3350 pn_sasl_done(self._sasl, outcome)
3351 3352 STATE_IDLE = PN_SASL_IDLE 3353 STATE_STEP = PN_SASL_STEP 3354 STATE_PASS = PN_SASL_PASS 3355 STATE_FAIL = PN_SASL_FAIL 3356 3357 @property
3358 - def state(self):
3359 return pn_sasl_state(self._sasl)
3360
3361 3362 -class SSLException(TransportException):
3363 pass
3364
3365 -class SSLUnavailable(SSLException):
3366 pass
3367
3368 -class SSLDomain(object):
3369 3370 MODE_CLIENT = PN_SSL_MODE_CLIENT 3371 MODE_SERVER = PN_SSL_MODE_SERVER 3372 VERIFY_PEER = PN_SSL_VERIFY_PEER 3373 VERIFY_PEER_NAME = PN_SSL_VERIFY_PEER_NAME 3374 ANONYMOUS_PEER = PN_SSL_ANONYMOUS_PEER 3375
3376 - def __init__(self, mode):
3377 self._domain = pn_ssl_domain(mode) 3378 if self._domain is None: 3379 raise SSLUnavailable()
3380
3381 - def _check(self, err):
3382 if err < 0: 3383 exc = EXCEPTIONS.get(err, SSLException) 3384 raise exc("SSL failure.") 3385 else: 3386 return err
3387
3388 - def set_credentials(self, cert_file, key_file, password):
3389 return self._check( pn_ssl_domain_set_credentials(self._domain, 3390 cert_file, key_file, 3391 password) )
3392 - def set_trusted_ca_db(self, certificate_db):
3393 return self._check( pn_ssl_domain_set_trusted_ca_db(self._domain, 3394 certificate_db) )
3395 - def set_peer_authentication(self, verify_mode, trusted_CAs=None):
3396 return self._check( pn_ssl_domain_set_peer_authentication(self._domain, 3397 verify_mode, 3398 trusted_CAs) )
3399
3400 - def allow_unsecured_client(self):
3401 return self._check( pn_ssl_domain_allow_unsecured_client(self._domain) )
3402
3403 - def __del__(self):
3404 pn_ssl_domain_free(self._domain)
3405
3406 -class SSL(object):
3407 3408 @staticmethod
3409 - def present():
3410 return pn_ssl_present()
3411
3412 - def _check(self, err):
3413 if err < 0: 3414 exc = EXCEPTIONS.get(err, SSLException) 3415 raise exc("SSL failure.") 3416 else: 3417 return err
3418
3419 - def __new__(cls, transport, domain, session_details=None):
3420 """Enforce a singleton SSL object per Transport""" 3421 if transport._ssl: 3422 # unfortunately, we've combined the allocation and the configuration in a 3423 # single step. So catch any attempt by the application to provide what 3424 # may be a different configuration than the original (hack) 3425 ssl = transport._ssl 3426 if (domain and (ssl._domain is not domain) or 3427 session_details and (ssl._session_details is not session_details)): 3428 raise SSLException("Cannot re-configure existing SSL object!") 3429 else: 3430 obj = super(SSL, cls).__new__(cls) 3431 obj._domain = domain 3432 obj._session_details = session_details 3433 session_id = None 3434 if session_details: 3435 session_id = session_details.get_session_id() 3436 obj._ssl = pn_ssl( transport._impl ) 3437 if obj._ssl is None: 3438 raise SSLUnavailable() 3439 pn_ssl_init( obj._ssl, domain._domain, session_id ) 3440 transport._ssl = obj 3441 return transport._ssl
3442
3443 - def cipher_name(self):
3444 rc, name = pn_ssl_get_cipher_name( self._ssl, 128 ) 3445 if rc: 3446 return name 3447 return None
3448
3449 - def protocol_name(self):
3450 rc, name = pn_ssl_get_protocol_name( self._ssl, 128 ) 3451 if rc: 3452 return name 3453 return None
3454 3455 RESUME_UNKNOWN = PN_SSL_RESUME_UNKNOWN 3456 RESUME_NEW = PN_SSL_RESUME_NEW 3457 RESUME_REUSED = PN_SSL_RESUME_REUSED 3458
3459 - def resume_status(self):
3460 return pn_ssl_resume_status( self._ssl )
3461
3462 - def _set_peer_hostname(self, hostname):
3463 self._check(pn_ssl_set_peer_hostname( self._ssl, unicode2utf8(hostname) ))
3464 - def _get_peer_hostname(self):
3465 err, name = pn_ssl_get_peer_hostname( self._ssl, 1024 ) 3466 self._check(err) 3467 return utf82unicode(name)
3468 peer_hostname = property(_get_peer_hostname, _set_peer_hostname, 3469 doc=""" 3470 Manage the expected name of the remote peer. Used to authenticate the remote. 3471 """)
3472
3473 3474 -class SSLSessionDetails(object):
3475 """ Unique identifier for the SSL session. Used to resume previous session on a new 3476 SSL connection. 3477 """ 3478
3479 - def __init__(self, session_id):
3480 self._session_id = session_id
3481
3482 - def get_session_id(self):
3483 return self._session_id
3484 3485 3486 wrappers = { 3487 "pn_void": lambda x: pn_void2py(x), 3488 "pn_pyref": lambda x: pn_void2py(x), 3489 "pn_connection": lambda x: Connection.wrap(pn_cast_pn_connection(x)), 3490 "pn_session": lambda x: Session.wrap(pn_cast_pn_session(x)), 3491 "pn_link": lambda x: Link.wrap(pn_cast_pn_link(x)), 3492 "pn_delivery": lambda x: Delivery.wrap(pn_cast_pn_delivery(x)), 3493 "pn_transport": lambda x: Transport.wrap(pn_cast_pn_transport(x)), 3494 "pn_selectable": lambda x: Selectable.wrap(pn_cast_pn_selectable(x)) 3495 }
3496 3497 -class Collector:
3498
3499 - def __init__(self):
3500 self._impl = pn_collector()
3501
3502 - def put(self, obj, etype):
3503 pn_collector_put(self._impl, PN_PYREF, pn_py2void(obj), etype.number)
3504
3505 - def peek(self):
3506 return Event.wrap(pn_collector_peek(self._impl))
3507
3508 - def pop(self):
3509 ev = self.peek() 3510 pn_collector_pop(self._impl)
3511
3512 - def __del__(self):
3513 pn_collector_free(self._impl) 3514 del self._impl
3515
3516 -class EventType(object):
3517 3518 _lock = threading.Lock() 3519 _extended = 10000 3520 TYPES = {} 3521
3522 - def __init__(self, name=None, number=None, method=None):
3523 if name is None and number is None: 3524 raise TypeError("extended events require a name") 3525 try: 3526 self._lock.acquire() 3527 if name is None: 3528 name = pn_event_type_name(number) 3529 3530 if number is None: 3531 number = EventType._extended 3532 EventType._extended += 1 3533 3534 if method is None: 3535 method = "on_%s" % name 3536 3537 self.name = name 3538 self.number = number 3539 self.method = method 3540 3541 self.TYPES[number] = self 3542 finally: 3543 self._lock.release()
3544
3545 - def __repr__(self):
3546 return self.name
3547
3548 -def dispatch(handler, method, *args):
3549 m = getattr(handler, method, None) 3550 if m: 3551 return m(*args) 3552 elif hasattr(handler, "on_unhandled"): 3553 return handler.on_unhandled(method, *args)
3554
3555 -class EventBase(object):
3556
3557 - def __init__(self, clazz, context, type):
3558 self.clazz = clazz 3559 self.context = context 3560 self.type = type
3561
3562 - def dispatch(self, handler):
3563 return dispatch(handler, self.type.method, self)
3564
3565 -def _none(x): return None
3566 3567 DELEGATED = Constant("DELEGATED")
3568 3569 -def _core(number, method):
3570 return EventType(number=number, method=method)
3571
3572 -class Event(Wrapper, EventBase):
3573 3574 REACTOR_INIT = _core(PN_REACTOR_INIT, "on_reactor_init") 3575 REACTOR_QUIESCED = _core(PN_REACTOR_QUIESCED, "on_reactor_quiesced") 3576 REACTOR_FINAL = _core(PN_REACTOR_FINAL, "on_reactor_final") 3577 3578 TIMER_TASK = _core(PN_TIMER_TASK, "on_timer_task") 3579 3580 CONNECTION_INIT = _core(PN_CONNECTION_INIT, "on_connection_init") 3581 CONNECTION_BOUND = _core(PN_CONNECTION_BOUND, "on_connection_bound") 3582 CONNECTION_UNBOUND = _core(PN_CONNECTION_UNBOUND, "on_connection_unbound") 3583 CONNECTION_LOCAL_OPEN = _core(PN_CONNECTION_LOCAL_OPEN, "on_connection_local_open") 3584 CONNECTION_LOCAL_CLOSE = _core(PN_CONNECTION_LOCAL_CLOSE, "on_connection_local_close") 3585 CONNECTION_REMOTE_OPEN = _core(PN_CONNECTION_REMOTE_OPEN, "on_connection_remote_open") 3586 CONNECTION_REMOTE_CLOSE = _core(PN_CONNECTION_REMOTE_CLOSE, "on_connection_remote_close") 3587 CONNECTION_FINAL = _core(PN_CONNECTION_FINAL, "on_connection_final") 3588 3589 SESSION_INIT = _core(PN_SESSION_INIT, "on_session_init") 3590 SESSION_LOCAL_OPEN = _core(PN_SESSION_LOCAL_OPEN, "on_session_local_open") 3591 SESSION_LOCAL_CLOSE = _core(PN_SESSION_LOCAL_CLOSE, "on_session_local_close") 3592 SESSION_REMOTE_OPEN = _core(PN_SESSION_REMOTE_OPEN, "on_session_remote_open") 3593 SESSION_REMOTE_CLOSE = _core(PN_SESSION_REMOTE_CLOSE, "on_session_remote_close") 3594 SESSION_FINAL = _core(PN_SESSION_FINAL, "on_session_final") 3595 3596 LINK_INIT = _core(PN_LINK_INIT, "on_link_init") 3597 LINK_LOCAL_OPEN = _core(PN_LINK_LOCAL_OPEN, "on_link_local_open") 3598 LINK_LOCAL_CLOSE = _core(PN_LINK_LOCAL_CLOSE, "on_link_local_close") 3599 LINK_LOCAL_DETACH = _core(PN_LINK_LOCAL_DETACH, "on_link_local_detach") 3600 LINK_REMOTE_OPEN = _core(PN_LINK_REMOTE_OPEN, "on_link_remote_open") 3601 LINK_REMOTE_CLOSE = _core(PN_LINK_REMOTE_CLOSE, "on_link_remote_close") 3602 LINK_REMOTE_DETACH = _core(PN_LINK_REMOTE_DETACH, "on_link_remote_detach") 3603 LINK_FLOW = _core(PN_LINK_FLOW, "on_link_flow") 3604 LINK_FINAL = _core(PN_LINK_FINAL, "on_link_final") 3605 3606 DELIVERY = _core(PN_DELIVERY, "on_delivery") 3607 3608 TRANSPORT = _core(PN_TRANSPORT, "on_transport") 3609 TRANSPORT_ERROR = _core(PN_TRANSPORT_ERROR, "on_transport_error") 3610 TRANSPORT_HEAD_CLOSED = _core(PN_TRANSPORT_HEAD_CLOSED, "on_transport_head_closed") 3611 TRANSPORT_TAIL_CLOSED = _core(PN_TRANSPORT_TAIL_CLOSED, "on_transport_tail_closed") 3612 TRANSPORT_CLOSED = _core(PN_TRANSPORT_CLOSED, "on_transport_closed") 3613 3614 SELECTABLE_INIT = _core(PN_SELECTABLE_INIT, "on_selectable_init") 3615 SELECTABLE_UPDATED = _core(PN_SELECTABLE_UPDATED, "on_selectable_updated") 3616 SELECTABLE_READABLE = _core(PN_SELECTABLE_READABLE, "on_selectable_readable") 3617 SELECTABLE_WRITABLE = _core(PN_SELECTABLE_WRITABLE, "on_selectable_writable") 3618 SELECTABLE_EXPIRED = _core(PN_SELECTABLE_EXPIRED, "on_selectable_expired") 3619 SELECTABLE_ERROR = _core(PN_SELECTABLE_ERROR, "on_selectable_error") 3620 SELECTABLE_FINAL = _core(PN_SELECTABLE_FINAL, "on_selectable_final") 3621 3622 @staticmethod
3623 - def wrap(impl, number=None):
3624 if impl is None: 3625 return None 3626 3627 if number is None: 3628 number = pn_event_type(impl) 3629 3630 event = Event(impl, number) 3631 3632 if isinstance(event.context, EventBase): 3633 return event.context 3634 else: 3635 return event
3636
3637 - def __init__(self, impl, number):
3638 Wrapper.__init__(self, impl, pn_event_attachments) 3639 self.__dict__["type"] = EventType.TYPES[number]
3640
3641 - def _init(self):
3642 pass
3643 3644 @property
3645 - def clazz(self):
3646 cls = pn_event_class(self._impl) 3647 if cls: 3648 return pn_class_name(cls) 3649 else: 3650 return None
3651 3652 @property
3653 - def context(self):
3654 """Returns the context object associated with the event. The type of this depend on the type of event.""" 3655 return wrappers[self.clazz](pn_event_context(self._impl))
3656
3657 - def dispatch(self, handler, type=None):
3658 type = type or self.type 3659 if isinstance(handler, WrappedHandler): 3660 pn_handler_dispatch(handler._impl, self._impl, type.number) 3661 else: 3662 result = dispatch(handler, type.method, self) 3663 if result != DELEGATED and hasattr(handler, "handlers"): 3664 for h in handler.handlers: 3665 self.dispatch(h, type)
3666 3667 3668 @property
3669 - def reactor(self):
3670 """Returns the reactor associated with the event.""" 3671 return wrappers.get("pn_reactor", _none)(pn_event_reactor(self._impl))
3672 3673 @property
3674 - def transport(self):
3675 """Returns the transport associated with the event, or null if none is associated with it.""" 3676 return Transport.wrap(pn_event_transport(self._impl))
3677 3678 @property
3679 - def connection(self):
3680 """Returns the connection associated with the event, or null if none is associated with it.""" 3681 return Connection.wrap(pn_event_connection(self._impl))
3682 3683 @property
3684 - def session(self):
3685 """Returns the session associated with the event, or null if none is associated with it.""" 3686 return Session.wrap(pn_event_session(self._impl))
3687 3688 @property 3692 3693 @property
3694 - def sender(self):
3695 """Returns the sender link associated with the event, or null if 3696 none is associated with it. This is essentially an alias for 3697 link(), that does an additional checkon the type of the 3698 link.""" 3699 l = self.link 3700 if l and l.is_sender: 3701 return l 3702 else: 3703 return None
3704 3705 @property
3706 - def receiver(self):
3707 """Returns the receiver link associated with the event, or null if 3708 none is associated with it. This is essentially an alias for 3709 link(), that does an additional checkon the type of the link.""" 3710 l = self.link 3711 if l and l.is_receiver: 3712 return l 3713 else: 3714 return None
3715 3716 @property
3717 - def delivery(self):
3718 """Returns the delivery associated with the event, or null if none is associated with it.""" 3719 return Delivery.wrap(pn_event_delivery(self._impl))
3720
3721 - def __repr__(self):
3722 return "%s(%s)" % (self.type, self.context)
3723
3724 -class Handler(object):
3725
3726 - def on_unhandled(self, method, *args):
3727 pass
3728
3729 -class _cadapter:
3730
3731 - def __init__(self, handler, on_error=None):
3732 self.handler = handler 3733 self.on_error = on_error
3734
3735 - def dispatch(self, cevent, ctype):
3736 ev = Event.wrap(cevent, ctype) 3737 ev.dispatch(self.handler)
3738
3739 - def exception(self, exc, val, tb):
3740 if self.on_error is None: 3741 raise exc, val, tb 3742 else: 3743 self.on_error((exc, val, tb))
3744
3745 -class WrappedHandler(Wrapper):
3746 3747 @staticmethod
3748 - def wrap(impl, on_error=None):
3749 if impl is None: 3750 return None 3751 else: 3752 handler = WrappedHandler(impl) 3753 handler.__dict__["on_error"] = on_error 3754 return handler
3755
3756 - def __init__(self, impl_or_constructor):
3757 Wrapper.__init__(self, impl_or_constructor)
3758
3759 - def _on_error(self, info):
3760 on_error = getattr(self, "on_error", None) 3761 if on_error is None: 3762 raise info[0], info[1], info[2] 3763 else: 3764 on_error(info)
3765
3766 - def add(self, handler):
3767 if handler is None: return 3768 impl = _chandler(handler, self._on_error) 3769 pn_handler_add(self._impl, impl) 3770 pn_decref(impl)
3771
3772 - def clear(self):
3773 pn_handler_clear(self._impl)
3774
3775 -def _chandler(obj, on_error=None):
3776 if obj is None: 3777 return None 3778 elif isinstance(obj, WrappedHandler): 3779 impl = obj._impl 3780 pn_incref(impl) 3781 return impl 3782 else: 3783 return pn_pyhandler(_cadapter(obj, on_error))
3784
3785 -class Url(object):
3786 """ 3787 Simple URL parser/constructor, handles URLs of the form: 3788 3789 <scheme>://<user>:<password>@<host>:<port>/<path> 3790 3791 All components can be None if not specifeid in the URL string. 3792 3793 The port can be specified as a service name, e.g. 'amqp' in the 3794 URL string but Url.port always gives the integer value. 3795 3796 @ivar scheme: Url scheme e.g. 'amqp' or 'amqps' 3797 @ivar user: Username 3798 @ivar password: Password 3799 @ivar host: Host name, ipv6 literal or ipv4 dotted quad. 3800 @ivar port: Integer port. 3801 @ivar host_port: Returns host:port 3802 """ 3803 3804 AMQPS = "amqps" 3805 AMQP = "amqp" 3806
3807 - class Port(int):
3808 """An integer port number that can be constructed from a service name string""" 3809
3810 - def __new__(cls, value):
3811 """@param value: integer port number or string service name.""" 3812 port = super(Url.Port, cls).__new__(cls, cls._port_int(value)) 3813 setattr(port, 'name', str(value)) 3814 return port
3815
3816 - def __eq__(self, x): return str(self) == x or int(self) == x
3817 - def __ne__(self, x): return not self == x
3818 - def __str__(self): return str(self.name)
3819 3820 @staticmethod
3821 - def _port_int(value):
3822 """Convert service, an integer or a service name, into an integer port number.""" 3823 try: 3824 return int(value) 3825 except ValueError: 3826 try: 3827 return socket.getservbyname(value) 3828 except socket.error: 3829 # Not every system has amqp/amqps defined as a service 3830 if value == Url.AMQPS: return 5671 3831 elif value == Url.AMQP: return 5672 3832 else: 3833 raise ValueError("Not a valid port number or service name: '%s'" % value)
3834
3835 - def __init__(self, url=None, defaults=True, **kwargs):
3836 """ 3837 @param url: URL string to parse. 3838 @param defaults: If true, fill in missing default values in the URL. 3839 If false, you can fill them in later by calling self.defaults() 3840 @param kwargs: scheme, user, password, host, port, path. 3841 If specified, replaces corresponding part in url string. 3842 """ 3843 if url: 3844 self._url = pn_url_parse(str(url)) 3845 if not self._url: raise ValueError("Invalid URL '%s'" % url) 3846 else: 3847 self._url = pn_url() 3848 for k in kwargs: # Let kwargs override values parsed from url 3849 getattr(self, k) # Check for invalid kwargs 3850 setattr(self, k, kwargs[k]) 3851 if defaults: self.defaults()
3852
3853 - class PartDescriptor(object):
3854 - def __init__(self, part):
3855 self.getter = globals()["pn_url_get_%s" % part] 3856 self.setter = globals()["pn_url_set_%s" % part]
3857 - def __get__(self, obj, type=None): return self.getter(obj._url)
3858 - def __set__(self, obj, value): return self.setter(obj._url, str(value))
3859 3860 scheme = PartDescriptor('scheme') 3861 username = PartDescriptor('username') 3862 password = PartDescriptor('password') 3863 host = PartDescriptor('host') 3864 path = PartDescriptor('path') 3865
3866 - def _get_port(self):
3867 portstr = pn_url_get_port(self._url) 3868 return portstr and Url.Port(portstr)
3869
3870 - def _set_port(self, value):
3871 if value is None: pn_url_set_port(self._url, None) 3872 else: pn_url_set_port(self._url, str(Url.Port(value)))
3873 3874 port = property(_get_port, _set_port) 3875
3876 - def __str__(self): return pn_url_str(self._url)
3877
3878 - def __repr__(self): return "Url(%r)" % str(self)
3879
3880 - def __eq__(self, x): return str(self) == str(x)
3881 - def __ne__(self, x): return not self == x
3882
3883 - def __del__(self):
3884 pn_url_free(self._url); 3885 del self._url
3886
3887 - def defaults(self):
3888 """ 3889 Fill in missing values (scheme, host or port) with defaults 3890 @return: self 3891 """ 3892 self.scheme = self.scheme or self.AMQP 3893 self.host = self.host or '0.0.0.0' 3894 self.port = self.port or self.Port(self.scheme) 3895 return self
3896 3897 __all__ = [ 3898 "API_LANGUAGE", 3899 "IMPLEMENTATION_LANGUAGE", 3900 "ABORTED", 3901 "ACCEPTED", 3902 "AUTOMATIC", 3903 "PENDING", 3904 "MANUAL", 3905 "REJECTED", 3906 "RELEASED", 3907 "MODIFIED", 3908 "SETTLED", 3909 "UNDESCRIBED", 3910 "Array", 3911 "Collector", 3912 "Condition", 3913 "Connection", 3914 "Data", 3915 "Delivery", 3916 "Disposition", 3917 "Described", 3918 "Endpoint", 3919 "Event", 3920 "Handler", 3921 "Link", 3922 "Message", 3923 "MessageException", 3924 "Messenger", 3925 "MessengerException", 3926 "ProtonException", 3927 "VERSION_MAJOR", 3928 "VERSION_MINOR", 3929 "Receiver", 3930 "SASL", 3931 "Sender", 3932 "Session", 3933 "SSL", 3934 "SSLDomain", 3935 "SSLSessionDetails", 3936 "SSLUnavailable", 3937 "SSLException", 3938 "Terminus", 3939 "Timeout", 3940 "Interrupt", 3941 "Transport", 3942 "TransportException", 3943 "Url", 3944 "char", 3945 "dispatch", 3946 "symbol", 3947 "timestamp", 3948 "ulong" 3949 ] 3950