Package proton :: Module utils
[frames | no frames]

Source Code for Module proton.utils

  1  # 
  2  # Licensed to the Apache Software Foundation (ASF) under one 
  3  # or more contributor license agreements.  See the NOTICE file 
  4  # distributed with this work for additional information 
  5  # regarding copyright ownership.  The ASF licenses this file 
  6  # to you under the Apache License, Version 2.0 (the 
  7  # "License"); you may not use this file except in compliance 
  8  # with the License.  You may obtain a copy of the License at 
  9  # 
 10  #   http://www.apache.org/licenses/LICENSE-2.0 
 11  # 
 12  # Unless required by applicable law or agreed to in writing, 
 13  # software distributed under the License is distributed on an 
 14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
 15  # KIND, either express or implied.  See the License for the 
 16  # specific language governing permissions and limitations 
 17  # under the License. 
 18  # 
 19  import collections, Queue, socket, time, threading 
 20  from proton import ConnectionException, Delivery, Endpoint, Handler, LinkException, Message 
 21  from proton import ProtonException, Timeout, Url 
 22  from proton.reactor import Container 
 23  from proton.handlers import MessagingHandler, IncomingMessageHandler 
def utf8(s):
    """
    Return C{s} encoded as UTF-8 when it is a C{unicode} object.

    Any other value is handed back unchanged.
    """
    return s.encode('utf8') if isinstance(s, unicode) else s
30 59
class SendException(ProtonException):
    """
    Exception signalling an exceptional state/condition on a send request.

    @ivar state: The state value passed to the constructor.
    """

    def __init__(self, state):
        # Only the state is recorded here; no message text is built.
        self.state = state
66
class BlockingSender(BlockingLink):
    """Sender wrapper whose send() blocks until the delivery is settled."""

    def __init__(self, connection, sender):
        """
        Wrap C{sender} and check that the peer attached the requested target.

        @param connection: passed to the base class initializer.
        @param sender: passed to the base class initializer.
        @raise LinkException: if a target address was requested and the
            remote target address differs from it.
        """
        super(BlockingSender, self).__init__(connection, sender)
        target = self.link.target
        if target and target.address and target.address != self.link.remote_target.address:
            # This may be followed by a detach, which may contain an error
            # condition, so wait a little...
            self._waitForClose()
            # ...but close ourselves if the peer does not.
            self.link.close()
            raise LinkException("Failed to open sender %s, target does not match" % self.link.name)

    def send(self, msg, timeout=False, error_states=None):
        """
        Send C{msg} and block until its delivery is settled.

        @param msg: handed to the link's send().
        @param timeout: forwarded to the connection's wait().
        @param error_states: remote delivery states treated as failures;
            defaults to C{Delivery.REJECTED} and C{Delivery.RELEASED}.
        @return: the delivery returned by the link's send().
        @raise SendException: if the delivery's remote state is one of the
            error states.
        """
        delivery = self.link.send(msg)
        self.connection.wait(lambda: delivery.settled,
                             msg="Sending on sender %s" % self.link.name,
                             timeout=timeout)
        if error_states is None:
            failures = [Delivery.REJECTED, Delivery.RELEASED]
        else:
            failures = error_states
        if delivery.remote_state in failures:
            raise SendException(delivery.remote_state)
        return delivery
86
87 -class Fetcher(MessagingHandler):
def __init__(self, connection, prefetch):
    """
    Initialise the handler with auto-accept disabled and two empty queues.

    @param connection: stored as C{self.connection}.
    @param prefetch: passed to the base handler as C{prefetch}.
    """
    super(Fetcher, self).__init__(prefetch=prefetch, auto_accept=False)
    self.connection = connection
    # (message, delivery) pairs that have arrived but were not popped yet.
    self.incoming = collections.deque()
    # Deliveries handed out by pop() that still have to be settled.
    self.unsettled = collections.deque()
93
def on_message(self, event):
    """Queue the event's message together with its delivery."""
    entry = (event.message, event.delivery)
    self.incoming.append(entry)
    # Wake up the wait() loop to handle the message.
    self.connection.container.yield_()
97 102
def on_connection_error(self, event):
    """Report a connection error by raising ConnectionClosed for it."""
    failed_connection = event.connection
    raise ConnectionClosed(failed_connection)
@property
def has_message(self):
    """Number of queued incoming messages (zero, i.e. falsy, when empty)."""
    queued = self.incoming
    return len(queued)
109
def pop(self):
    """
    Remove and return the oldest queued message.

    If its delivery is not settled yet, the delivery is appended to
    C{self.unsettled} so a later settle() call can act on it.
    """
    message, delivery = self.incoming.popleft()
    if delivery.settled:
        return message
    self.unsettled.append(delivery)
    return message
115
def settle(self, state=None):
    """
    Settle the oldest delivery in C{self.unsettled}.

    @param state: when truthy, applied through the delivery's update()
        before the delivery is settled.
    """
    oldest = self.unsettled.popleft()
    if state:
        oldest.update(state)
    oldest.settle()
121
122 123 -class BlockingReceiver(BlockingLink):
def __init__(self, connection, receiver, fetcher, credit=1):
    """
    Wrap C{receiver} and check that the peer attached the requested source.

    @param connection: passed to the base class initializer.
    @param receiver: passed to the base class initializer; also given the
        initial credit.
    @param fetcher: stored as C{self.fetcher}.
    @param credit: when truthy, issued to C{receiver} via flow().
    @raise LinkException: if a source address was requested and the
        remote source address differs from it.
    """
    super(BlockingReceiver, self).__init__(connection, receiver)
    source = self.link.source
    if source and source.address and source.address != self.link.remote_source.address:
        # This may be followed by a detach, which may contain an error
        # condition, so wait a little...
        self._waitForClose()
        # ...but close ourselves if the peer does not.
        self.link.close()
        raise LinkException("Failed to open receiver %s, source does not match" % self.link.name)
    if credit:
        receiver.flow(credit)
    self.fetcher = fetcher
134
def __del__(self):
    """Drop the fetcher and clear the link's handler reference."""
    # Order kept as-is: the fetcher reference is dropped first.
    self.fetcher = None
    self.link.handler = None
138
def receive(self, timeout=False):
    """
    Block until a message is available and return it.

    One unit of credit is issued first if the link currently has none.

    @param timeout: forwarded to the connection's wait().
    @return: the value returned by the fetcher's pop().
    @raise Exception: if this receiver has no fetcher.
    """
    if not self.fetcher:
        raise Exception("Can't call receive on this receiver as a handler was provided")
    if not self.link.credit:
        self.link.flow(1)

    def message_ready():
        return self.fetcher.has_message

    self.connection.wait(message_ready,
                         msg="Receiving on receiver %s" % self.link.name,
                         timeout=timeout)
    return self.fetcher.pop()
146
147 - def accept(self):
149
150 - def reject(self):
152
def release(self, delivered=True):
    """
    Settle the current delivery as modified or released.

    @param delivered: when true settle with C{Delivery.MODIFIED},
        otherwise with C{Delivery.RELEASED}.
    """
    outcome = Delivery.MODIFIED if delivered else Delivery.RELEASED
    self.settle(outcome)
158
def settle(self, state=None):
    """
    Delegate settlement to the fetcher.

    @param state: passed through to the fetcher's settle().
    @raise Exception: if this receiver has no fetcher.
    """
    fetcher = self.fetcher
    if not fetcher:
        raise Exception("Can't call accept/reject etc on this receiver as a handler was provided")
    fetcher.settle(state)
163
class LinkDetached(LinkException):
    """
    Exception describing a link that was closed.

    @ivar link: the link passed to the constructor.
    @ivar condition: name of the link's remote condition, or C{None} when
        the link has no remote condition.
    """

    def __init__(self, link):
        self.link = link
        # Pick the wording and the terminus according to the link direction.
        if link.is_sender:
            role, preposition, terminus = "sender", "to", link.target
        else:
            role, preposition, terminus = "receiver", "from", link.source
        txt = "%s %s %s %s closed" % (role, link.name, preposition, terminus.address)
        if link.remote_condition:
            txt += " due to: %s" % link.remote_condition
            self.condition = link.remote_condition.name
        else:
            txt += " by peer"
            self.condition = None
        LinkException.__init__(self, txt)
179
class ConnectionClosed(ConnectionException):
    """
    Exception describing a connection that was closed.

    @ivar connection: the connection passed to the constructor.
    @ivar condition: name of the connection's remote condition, or C{None}
        when the connection has no remote condition.
    """

    def __init__(self, connection):
        self.connection = connection
        # Everything is read from the ``connection`` argument: this exception
        # has no ``url`` attribute and there is no ``event`` in scope here.
        txt = "Connection %s closed" % connection.hostname
        remote_condition = connection.remote_condition
        if remote_condition:
            txt += " due to: %s" % remote_condition
            self.condition = remote_condition.name
        else:
            txt += " by peer"
            self.condition = None
        ConnectionException.__init__(self, txt)
192
193 194 -class BlockingConnection(Handler):
195 """ 196 A synchronous style connection wrapper. 197 """
def __init__(self, url, timeout=None, container=None, ssl_domain=None, heartbeat=None):
    """
    Connect to C{url} and block until the connection's remote state is no
    longer C{Endpoint.REMOTE_UNINIT}.

    @param url: converted with utf8() and wrapped in a L{Url}.
    @param timeout: default timeout for wait(); also set on the container.
    @param container: container to use; a new L{Container} is created
        when this is falsy.
    @param ssl_domain: passed to the container's connect().
    @param heartbeat: passed to the container's connect().
    """
    self.disconnected = False
    self.timeout = timeout
    if not container:
        container = Container()
    self.container = container
    self.container.timeout = self.timeout
    self.container.start()
    self.url = Url(utf8(url)).defaults()
    self.conn = self.container.connect(url=self.url,
                                       handler=self,
                                       ssl_domain=ssl_domain,
                                       reconnect=False,
                                       heartbeat=heartbeat)

    def remote_initialised():
        return not (self.conn.state & Endpoint.REMOTE_UNINIT)

    self.wait(remote_initialised, msg="Opening connection")
208
def create_sender(self, address, handler=None, name=None, options=None):
    """
    Create a sender on this connection and wrap it in a L{BlockingSender}.

    @param address: target address; converted with utf8().
    @param handler: passed to the container's create_sender().
    @param name: link name; converted with utf8().
    @param options: passed to the container's create_sender().
    @return: the new L{BlockingSender}.
    """
    link = self.container.create_sender(self.conn,
                                        utf8(address),
                                        name=utf8(name),
                                        handler=handler,
                                        options=options)
    return BlockingSender(self, link)
211
def create_receiver(self, address, credit=None, dynamic=False, handler=None, name=None, options=None):
    """
    Create a receiver on this connection and wrap it in a
    L{BlockingReceiver}.

    When no C{handler} is given, a L{Fetcher} is created and used as the
    link's handler so that messages can be read through the receiver.

    @param address: source address; converted with utf8().
    @param credit: initial credit; with a C{handler} and no credit given,
        1 is used.
    @param dynamic: passed to the container's create_receiver().
    @param handler: optional handler that receives the link's events.
    @param name: link name; converted with utf8().
    @param options: passed to the container's create_receiver().
    @return: the new L{BlockingReceiver}.
    """
    if handler:
        # The caller's handler consumes the messages, so no fetcher exists.
        fetcher = None
        prefetch = 1 if credit is None else credit
    else:
        fetcher = Fetcher(self, credit)
        prefetch = credit
    link = self.container.create_receiver(self.conn,
                                          utf8(address),
                                          name=utf8(name),
                                          dynamic=dynamic,
                                          handler=handler or fetcher,
                                          options=options)
    return BlockingReceiver(self, link, fetcher, credit=prefetch)
222
def close(self):
    """
    Close the connection and block until its remote state is no longer
    C{Endpoint.REMOTE_ACTIVE}.

    The connection and container references are dropped afterwards, even
    if waiting fails. Calling close() again after that is a no-op.
    """
    if self.conn is None:
        # Already closed: an earlier call released the connection, so there
        # is nothing left to close or wait for.
        return
    self.conn.close()
    try:
        self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_ACTIVE),
                  msg="Closing connection")
    finally:
        self.conn = None
        self.container = None
231
def _is_closed(self):
    """
    Return the connection state masked to its closed bits.

    The result is non-zero when C{Endpoint.LOCAL_CLOSED} or
    C{Endpoint.REMOTE_CLOSED} is set in the state.
    """
    closed_bits = Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED
    return self.conn.state & closed_bits
234
def run(self):
    """
    Hand control over to the event loop (e.g. if waiting indefinitely for
    incoming messages).

    Keeps calling the container's process() until it returns a false value.
    """
    more = self.container.process()
    while more:
        more = self.container.process()
238
def wait(self, condition, timeout=False, msg=None):
    """
    Call the container's process() until C{condition()} is true.

    @param condition: zero-argument callable checked before each
        process() call.
    @param timeout: seconds to wait; C{False} (the default) means use
        C{self.timeout}, C{None} means wait without a deadline.
    @param msg: optional text appended to the timeout message.
    @raise Timeout: if the deadline has passed after a process() call.
    @raise ConnectionException: if the transport was disconnected and the
        connection is not closed.
    """
    def still_waiting():
        # condition() is evaluated first; the flag only matters if it is false.
        return not (condition() or self.disconnected)

    if timeout is False:
        timeout = self.timeout
    if timeout is None:
        while still_waiting():
            self.container.process()
    else:
        # Temporarily apply the timeout to the container; restored below.
        saved_timeout = self.container.timeout
        self.container.timeout = timeout
        try:
            deadline = time.time() + timeout
            while still_waiting():
                self.container.process()
                if deadline < time.time():
                    text = "Connection %s timed out" % self.url
                    if msg:
                        text += ": " + msg
                    raise Timeout(text)
        finally:
            self.container.timeout = saved_timeout
    if self.disconnected or self._is_closed():
        self.container.stop()
        self.conn.handler = None  # break cyclical reference
    if self.disconnected and not self._is_closed():
        raise ConnectionException("Connection %s disconnected" % self.url)
264 269
def on_connection_remote_close(self, event):
    """
    React to the peer closing the connection.

    If the local end is still active it is closed and
    L{ConnectionClosed} is raised; otherwise nothing happens.

    @raise ConnectionClosed: if the local end was still active.
    """
    connection = event.connection
    if not (connection.state & Endpoint.LOCAL_ACTIVE):
        # Local end is not active: nothing to close and nothing to report.
        return
    connection.close()
    raise ConnectionClosed(connection)
274
def on_transport_tail_closed(self, event):
    """Treat a closed transport tail like a closed transport."""
    handle_closed = self.on_transport_closed
    handle_closed(event)
277
def on_transport_head_closed(self, event):
    """Treat a closed transport head like a closed transport."""
    handle_closed = self.on_transport_closed
    handle_closed(event)
280
def on_transport_closed(self, event):
    """Record that the transport is gone by setting C{self.disconnected}."""
    # The event itself is not inspected; only the flag changes.
    self.disconnected = True
283
class AtomicCount(object):
    """Counter whose next() updates the count while holding a lock."""

    def __init__(self, start=0, step=1):
        """Thread-safe atomic counter. Start at start, increment by step."""
        self.count, self.step = start, step
        self.lock = threading.Lock()

    def next(self):
        """
        Get the next value.

        @return: the count after C{step} has been added to it.
        """
        # The with-statement releases the lock even if the addition raises,
        # so a failing call cannot leave the counter locked forever.
        with self.lock:
            self.count += self.step
            return self.count
297
class SyncRequestResponse(IncomingMessageHandler):
    """
    Implementation of the synchronous request-response (aka RPC) pattern.
    @ivar address: Address for all requests, may be None.
    @ivar connection: Connection for requests and responses.
    """

    # Shared by all instances: source of correlation ids for requests.
    correlation_id = AtomicCount()

    def __init__(self, connection, address=None):
        """
        Send requests and receive responses. A single instance can send many
        requests to the same or different addresses.

        @param connection: A L{BlockingConnection}
        @param address: Address for all requests.
            If not specified, each request must have the address property set.
            Successive messages may have different addresses.
        """
        super(SyncRequestResponse, self).__init__()
        self.connection = connection
        self.address = address
        self.sender = self.connection.create_sender(self.address)
        # dynamic=True generates a unique address dynamically for this receiver.
        # credit=1 because we want to receive 1 response message initially.
        self.receiver = self.connection.create_receiver(
            None, dynamic=True, credit=1, handler=self)
        self.response = None

    def call(self, request):
        """
        Send a request message, wait for and return the response message.

        @param request: A L{proton.Message}. If L{self.address} is not set,
            the request's own address must be set.
        @return: the response whose correlation id matches the request.
        @raise ValueError: if neither L{self.address} nor the request's
            address is set.
        """
        if not (self.address or request.address):
            raise ValueError("Request message has no address: %s" % request)
        request.reply_to = self.reply_to
        expected_id = self.correlation_id.next()
        request.correlation_id = expected_id
        self.sender.send(request)

        def response_arrived():
            return self.response and (self.response.correlation_id == expected_id)

        self.connection.wait(response_arrived, msg="Waiting for response")
        answer = self.response
        self.response = None  # Ready for next response.
        self.receiver.flow(1)  # Set up credit for the next response.
        return answer

    @property
    def reply_to(self):
        """Return the dynamic address of our receiver."""
        return self.receiver.remote_source.address

    def on_message(self, event):
        """Called when we receive a message for our receiver."""
        self.response = event.message
        # Wake up the wait() loop to handle the message.
        self.connection.container.yield_()
355