Package proton :: Module handlers
[frames] | no frames]

Source Code for Module proton.handlers

  1  # 
  2  # Licensed to the Apache Software Foundation (ASF) under one 
  3  # or more contributor license agreements.  See the NOTICE file 
  4  # distributed with this work for additional information 
  5  # regarding copyright ownership.  The ASF licenses this file 
  6  # to you under the Apache License, Version 2.0 (the 
  7  # "License"); you may not use this file except in compliance 
  8  # with the License.  You may obtain a copy of the License at 
  9  # 
 10  #   http://www.apache.org/licenses/LICENSE-2.0 
 11  # 
 12  # Unless required by applicable law or agreed to in writing, 
 13  # software distributed under the License is distributed on an 
 14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
 15  # KIND, either express or implied.  See the License for the 
 16  # specific language governing permissions and limitations 
 17  # under the License. 
 18  # 
 19  import heapq, logging, os, re, socket, time, types, weakref 
 20   
 21  from proton import dispatch, generate_uuid, PN_ACCEPTED, SASL, symbol, ulong, Url 
 22  from proton import Collector, Connection, Delivery, Described, Endpoint, Event, Link, Terminus, Timeout 
 23  from proton import Message, Handler, ProtonException, Transport, TransportException, ConnectionException 
 24  from select import select 
 25   
 26  log = logging.getLogger("proton") 
27 28 -class OutgoingMessageHandler(Handler):
29 """ 30 A utility for simpler and more intuitive handling of delivery 31 events related to outgoing i.e. sent messages. 32 """
33 - def __init__(self, auto_settle=True, delegate=None):
34 self.auto_settle = auto_settle 35 self.delegate = delegate
36 42
43 - def on_delivery(self, event):
44 dlv = event.delivery 45 if dlv.link.is_sender and dlv.updated: 46 if dlv.remote_state == Delivery.ACCEPTED: 47 self.on_accepted(event) 48 elif dlv.remote_state == Delivery.REJECTED: 49 self.on_rejected(event) 50 elif dlv.remote_state == Delivery.RELEASED or dlv.remote_state == Delivery.MODIFIED: 51 self.on_released(event) 52 if dlv.settled: 53 self.on_settled(event) 54 if self.auto_settle: 55 dlv.settle()
56
57 - def on_sendable(self, event):
58 """ 59 Called when the sender link has credit and messages can 60 therefore be transferred. 61 """ 62 if self.delegate != None: 63 dispatch(self.delegate, 'on_sendable', event)
64
65 - def on_accepted(self, event):
66 """ 67 Called when the remote peer accepts an outgoing message. 68 """ 69 if self.delegate != None: 70 dispatch(self.delegate, 'on_accepted', event)
71
72 - def on_rejected(self, event):
73 """ 74 Called when the remote peer rejects an outgoing message. 75 """ 76 if self.delegate != None: 77 dispatch(self.delegate, 'on_rejected', event)
78
79 - def on_released(self, event):
80 """ 81 Called when the remote peer releases an outgoing message. Note 82 that this may be in response to either the RELEASE or MODIFIED 83 state as defined by the AMQP specification. 84 """ 85 if self.delegate != None: 86 dispatch(self.delegate, 'on_released', event)
87
88 - def on_settled(self, event):
89 """ 90 Called when the remote peer has settled the outgoing 91 message. This is the point at which it should never be 92 retransmitted. 93 """ 94 if self.delegate != None: 95 dispatch(self.delegate, 'on_settled', event)
96
97 -def recv_msg(delivery):
98 msg = Message() 99 msg.decode(delivery.link.recv(delivery.pending)) 100 delivery.link.advance() 101 return msg
102
103 -class Reject(ProtonException):
104 """ 105 An exception that indicate a message should be rejected 106 """ 107 pass
108
109 -class Release(ProtonException):
110 """ 111 An exception that indicate a message should be rejected 112 """ 113 pass
114
115 -class Acking(object):
116 - def accept(self, delivery):
117 """ 118 Accepts a received message. 119 """ 120 self.settle(delivery, Delivery.ACCEPTED)
121
122 - def reject(self, delivery):
123 """ 124 Rejects a received message that is considered invalid or 125 unprocessable. 126 """ 127 self.settle(delivery, Delivery.REJECTED)
128
129 - def release(self, delivery, delivered=True):
130 """ 131 Releases a received message, making it available at the source 132 for any (other) interested receiver. The ``delivered`` 133 parameter indicates whether this should be considered a 134 delivery attempt (and the delivery count updated) or not. 135 """ 136 if delivered: 137 self.settle(delivery, Delivery.MODIFIED) 138 else: 139 self.settle(delivery, Delivery.RELEASED)
140
141 - def settle(self, delivery, state=None):
145
146 -class IncomingMessageHandler(Handler, Acking):
147 """ 148 A utility for simpler and more intuitive handling of delivery 149 events related to incoming i.e. received messages. 150 """ 151
152 - def __init__(self, auto_accept=True, delegate=None):
153 self.delegate = delegate 154 self.auto_accept = auto_accept
155
156 - def on_delivery(self, event):
157 dlv = event.delivery 158 if not dlv.link.is_receiver: return 159 if dlv.aborted: 160 self.on_aborted(event) 161 dlv.settle() 162 elif dlv.readable and not dlv.partial: 163 event.message = recv_msg(dlv) 164 if event.link.state & Endpoint.LOCAL_CLOSED: 165 if self.auto_accept: 166 dlv.update(Delivery.RELEASED) 167 dlv.settle() 168 else: 169 try: 170 self.on_message(event) 171 if self.auto_accept: 172 dlv.update(Delivery.ACCEPTED) 173 dlv.settle() 174 except Reject: 175 dlv.update(Delivery.REJECTED) 176 dlv.settle() 177 except Release: 178 dlv.update(Delivery.MODIFIED) 179 dlv.settle() 180 elif dlv.updated and dlv.settled: 181 self.on_settled(event)
182
183 - def on_message(self, event):
184 """ 185 Called when a message is received. The message itself can be 186 obtained as a property on the event. For the purpose of 187 referring to this message in further actions (e.g. if 188 explicitly accepting it, the ``delivery`` should be used, also 189 obtainable via a property on the event. 190 """ 191 if self.delegate != None: 192 dispatch(self.delegate, 'on_message', event)
193
194 - def on_settled(self, event):
195 if self.delegate != None: 196 dispatch(self.delegate, 'on_settled', event)
197
198 - def on_aborted(self, event):
199 if self.delegate != None: 200 dispatch(self.delegate, 'on_aborted', event)
201
202 -class EndpointStateHandler(Handler):
203 """ 204 A utility that exposes 'endpoint' events i.e. the open/close for 205 links, sessions and connections in a more intuitive manner. A 206 XXX_opened method will be called when both local and remote peers 207 have opened the link, session or connection. This can be used to 208 confirm a locally initiated action for example. A XXX_opening 209 method will be called when the remote peer has requested an open 210 that was not initiated locally. By default this will simply open 211 locally, which then triggers the XXX_opened call. The same applies 212 to close. 213 """ 214
215 - def __init__(self, peer_close_is_error=False, delegate=None):
216 self.delegate = delegate 217 self.peer_close_is_error = peer_close_is_error
218 219 @classmethod
220 - def is_local_open(cls, endpoint):
221 return endpoint.state & Endpoint.LOCAL_ACTIVE
222 223 @classmethod
224 - def is_local_uninitialised(cls, endpoint):
225 return endpoint.state & Endpoint.LOCAL_UNINIT
226 227 @classmethod
228 - def is_local_closed(cls, endpoint):
229 return endpoint.state & Endpoint.LOCAL_CLOSED
230 231 @classmethod
232 - def is_remote_open(cls, endpoint):
233 return endpoint.state & Endpoint.REMOTE_ACTIVE
234 235 @classmethod
236 - def is_remote_closed(cls, endpoint):
237 return endpoint.state & Endpoint.REMOTE_CLOSED
238 239 @classmethod
240 - def print_error(cls, endpoint, endpoint_type):
241 if endpoint.remote_condition: 242 log.error(endpoint.remote_condition.description or endpoint.remote_condition.name) 243 elif cls.is_local_open(endpoint) and cls.is_remote_closed(endpoint): 244 log.error("%s closed by peer" % endpoint_type)
245 254
255 - def on_session_remote_close(self, event):
256 if event.session.remote_condition: 257 self.on_session_error(event) 258 elif self.is_local_closed(event.session): 259 self.on_session_closed(event) 260 else: 261 self.on_session_closing(event) 262 event.session.close()
263
264 - def on_connection_remote_close(self, event):
265 if event.connection.remote_condition: 266 if event.connection.remote_condition.name == "amqp:connection:forced": 267 # Treat this the same as just having the transport closed by the peer without 268 # sending any events. Allow reconnection to happen transparently. 269 return 270 self.on_connection_error(event) 271 elif self.is_local_closed(event.connection): 272 self.on_connection_closed(event) 273 else: 274 self.on_connection_closing(event) 275 event.connection.close()
276
277 - def on_connection_local_open(self, event):
278 if self.is_remote_open(event.connection): 279 self.on_connection_opened(event)
280
281 - def on_connection_remote_open(self, event):
282 if self.is_local_open(event.connection): 283 self.on_connection_opened(event) 284 elif self.is_local_uninitialised(event.connection): 285 self.on_connection_opening(event) 286 event.connection.open()
287
288 - def on_session_local_open(self, event):
289 if self.is_remote_open(event.session): 290 self.on_session_opened(event)
291
292 - def on_session_remote_open(self, event):
293 if self.is_local_open(event.session): 294 self.on_session_opened(event) 295 elif self.is_local_uninitialised(event.session): 296 self.on_session_opening(event) 297 event.session.open()
298 302 309
310 - def on_connection_opened(self, event):
311 if self.delegate != None: 312 dispatch(self.delegate, 'on_connection_opened', event)
313
314 - def on_session_opened(self, event):
315 if self.delegate != None: 316 dispatch(self.delegate, 'on_session_opened', event)
317 321
322 - def on_connection_opening(self, event):
323 if self.delegate != None: 324 dispatch(self.delegate, 'on_connection_opening', event)
325
326 - def on_session_opening(self, event):
327 if self.delegate != None: 328 dispatch(self.delegate, 'on_session_opening', event)
329 333
334 - def on_connection_error(self, event):
335 if self.delegate != None: 336 dispatch(self.delegate, 'on_connection_error', event) 337 else: 338 self.log_error(event.connection, "connection")
339
340 - def on_session_error(self, event):
341 if self.delegate != None: 342 dispatch(self.delegate, 'on_session_error', event) 343 else: 344 self.log_error(event.session, "session") 345 event.connection.close()
346 353
354 - def on_connection_closed(self, event):
355 if self.delegate != None: 356 dispatch(self.delegate, 'on_connection_closed', event)
357
358 - def on_session_closed(self, event):
359 if self.delegate != None: 360 dispatch(self.delegate, 'on_session_closed', event)
361 365
366 - def on_connection_closing(self, event):
367 if self.delegate != None: 368 dispatch(self.delegate, 'on_connection_closing', event) 369 elif self.peer_close_is_error: 370 self.on_connection_error(event)
371
372 - def on_session_closing(self, event):
373 if self.delegate != None: 374 dispatch(self.delegate, 'on_session_closing', event) 375 elif self.peer_close_is_error: 376 self.on_session_error(event)
377 383
384 - def on_transport_tail_closed(self, event):
385 self.on_transport_closed(event)
386
387 - def on_transport_closed(self, event):
388 if self.delegate != None and event.connection and self.is_local_open(event.connection): 389 dispatch(self.delegate, 'on_disconnected', event)
390
391 -class MessagingHandler(Handler, Acking):
392 """ 393 A general purpose handler that makes the proton-c events somewhat 394 simpler to deal with and/or avoids repetitive tasks for common use 395 cases. 396 """
397 - def __init__(self, prefetch=10, auto_accept=True, auto_settle=True, peer_close_is_error=False):
398 self.handlers = [] 399 if prefetch: 400 self.handlers.append(CFlowController(prefetch)) 401 self.handlers.append(EndpointStateHandler(peer_close_is_error, weakref.proxy(self))) 402 self.handlers.append(IncomingMessageHandler(auto_accept, weakref.proxy(self))) 403 self.handlers.append(OutgoingMessageHandler(auto_settle, weakref.proxy(self))) 404 self.fatal_conditions = ["amqp:unauthorized-access"]
405
406 - def on_transport_error(self, event):
407 """ 408 Called when some error is encountered with the transport over 409 which the AMQP connection is to be established. This includes 410 authentication errors as well as socket errors. 411 """ 412 if event.transport.condition: 413 if event.transport.condition.info: 414 log.error("%s: %s: %s" % (event.transport.condition.name, event.transport.condition.description, event.transport.condition.info)) 415 else: 416 log.error("%s: %s" % (event.transport.condition.name, event.transport.condition.description)) 417 if event.transport.condition.name in self.fatal_conditions: 418 event.connection.close() 419 else: 420 logging.error("Unspecified transport error")
421
422 - def on_connection_error(self, event):
423 """ 424 Called when the peer closes the connection with an error condition. 425 """ 426 EndpointStateHandler.print_error(event.connection, "connection")
427
428 - def on_session_error(self, event):
429 """ 430 Called when the peer closes the session with an error condition. 431 """ 432 EndpointStateHandler.print_error(event.session, "session") 433 event.connection.close()
434 441
442 - def on_reactor_init(self, event):
443 """ 444 Called when the event loop - the reactor - starts. 445 """ 446 if hasattr(event.reactor, 'subclass'): 447 setattr(event, event.reactor.subclass.__name__.lower(), event.reactor) 448 self.on_start(event)
449
450 - def on_start(self, event):
451 """ 452 Called when the event loop starts. (Just an alias for on_reactor_init) 453 """ 454 pass
455 - def on_connection_closed(self, event):
456 """ 457 Called when the connection is closed. 458 """ 459 pass
460 - def on_session_closed(self, event):
461 """ 462 Called when the session is closed. 463 """ 464 pass
470 - def on_connection_closing(self, event):
471 """ 472 Called when the peer initiates the closing of the connection. 473 """ 474 pass
475 - def on_session_closing(self, event):
476 """ 477 Called when the peer initiates the closing of the session. 478 """ 479 pass
485 - def on_disconnected(self, event):
486 """ 487 Called when the socket is disconnected. 488 """ 489 pass
490
491 - def on_sendable(self, event):
492 """ 493 Called when the sender link has credit and messages can 494 therefore be transferred. 495 """ 496 pass
497
498 - def on_accepted(self, event):
499 """ 500 Called when the remote peer accepts an outgoing message. 501 """ 502 pass
503
504 - def on_rejected(self, event):
505 """ 506 Called when the remote peer rejects an outgoing message. 507 """ 508 pass
509
510 - def on_released(self, event):
511 """ 512 Called when the remote peer releases an outgoing message. Note 513 that this may be in response to either the RELEASE or MODIFIED 514 state as defined by the AMQP specification. 515 """ 516 pass
517
518 - def on_settled(self, event):
519 """ 520 Called when the remote peer has settled the outgoing 521 message. This is the point at which it should never be 522 retransmitted. 523 """ 524 pass
525 - def on_message(self, event):
526 """ 527 Called when a message is received. The message itself can be 528 obtained as a property on the event. For the purpose of 529 referring to this message in further actions (e.g. if 530 explicitly accepting it, the ``delivery`` should be used, also 531 obtainable via a property on the event. 532 """ 533 pass
534
535 -class TransactionHandler(object):
536 """ 537 The interface for transaction handlers, i.e. objects that want to 538 be notified of state changes related to a transaction. 539 """
540 - def on_transaction_declared(self, event):
541 pass
542
543 - def on_transaction_committed(self, event):
544 pass
545
546 - def on_transaction_aborted(self, event):
547 pass
548
549 - def on_transaction_declare_failed(self, event):
550 pass
551
552 - def on_transaction_commit_failed(self, event):
553 pass
554
555 -class TransactionalClientHandler(MessagingHandler, TransactionHandler):
556 """ 557 An extension to the MessagingHandler for applications using 558 transactions. 559 """ 560
561 - def __init__(self, prefetch=10, auto_accept=False, auto_settle=True, peer_close_is_error=False):
562 super(TransactionalClientHandler, self).__init__(prefetch, auto_accept, auto_settle, peer_close_is_error)
563
564 - def accept(self, delivery, transaction=None):
565 if transaction: 566 transaction.accept(delivery) 567 else: 568 super(TransactionalClientHandler, self).accept(delivery)
569 570 from proton import WrappedHandler 571 from cproton import pn_flowcontroller, pn_handshaker, pn_iohandler
572 573 -class CFlowController(WrappedHandler):
574
575 - def __init__(self, window=1024):
576 WrappedHandler.__init__(self, lambda: pn_flowcontroller(window))
577
578 -class CHandshaker(WrappedHandler):
579
580 - def __init__(self):
581 WrappedHandler.__init__(self, pn_handshaker)
582
583 -class IOHandler(WrappedHandler):
584
585 - def __init__(self):
586 WrappedHandler.__init__(self, pn_iohandler)
587
588 -class PythonIO:
589
590 - def __init__(self):
591 self.selectables = [] 592 self.delegate = IOHandler()
593
594 - def on_unhandled(self, method, event):
595 event.dispatch(self.delegate)
596
597 - def on_selectable_init(self, event):
598 self.selectables.append(event.context)
599
600 - def on_selectable_updated(self, event):
601 pass
602
603 - def on_selectable_final(self, event):
604 sel = event.context 605 if sel.is_terminal: 606 self.selectables.remove(sel) 607 sel.release()
608
609 - def on_reactor_quiesced(self, event):
610 reactor = event.reactor 611 # check if we are still quiesced, other handlers of 612 # on_reactor_quiesced could have produced events to process 613 if not reactor.quiesced: return 614 615 reading = [] 616 writing = [] 617 deadline = None 618 for sel in self.selectables: 619 if sel.reading: 620 reading.append(sel) 621 if sel.writing: 622 writing.append(sel) 623 if sel.deadline: 624 if deadline is None: 625 deadline = sel.deadline 626 else: 627 deadline = min(sel.deadline, deadline) 628 629 if deadline is not None: 630 timeout = deadline - time.time() 631 else: 632 timeout = reactor.timeout 633 if (timeout < 0): timeout = 0 634 timeout = min(timeout, reactor.timeout) 635 readable, writable, _ = select(reading, writing, [], timeout) 636 637 reactor.mark() 638 639 now = time.time() 640 641 for s in readable: 642 s.readable() 643 for s in writable: 644 s.writable() 645 for s in self.selectables: 646 if s.deadline and now > s.deadline: 647 s.expired() 648 649 reactor.yield_()
650