Home | Trees | Indices | Help |
---|
|
1 # 2 # Licensed to the Apache Software Foundation (ASF) under one 3 # or more contributor license agreements. See the NOTICE file 4 # distributed with this work for additional information 5 # regarding copyright ownership. The ASF licenses this file 6 # to you under the Apache License, Version 2.0 (the 7 # "License"); you may not use this file except in compliance 8 # with the License. You may obtain a copy of the License at 9 # 10 # http://www.apache.org/licenses/LICENSE-2.0 11 # 12 # Unless required by applicable law or agreed to in writing, 13 # software distributed under the License is distributed on an 14 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 # KIND, either express or implied. See the License for the 16 # specific language governing permissions and limitations 17 # under the License. 18 # 19 import collections, Queue, socket, time, threading 20 from proton import ConnectionException, Delivery, Endpoint, Handler, LinkException, Message 21 from proton import ProtonException, Timeout, Url 22 from proton.reactor import Container 23 from proton.handlers import MessagingHandler, IncomingMessageHandler 3059 6633 self.connection = connection 34 self.link = link 35 self.connection.wait(lambda: not (self.link.state & Endpoint.REMOTE_UNINIT), 36 msg="Opening link %s" % link.name) 37 self._checkClosed()3840 try: 41 self.connection.wait(lambda: self.link.state & Endpoint.REMOTE_CLOSED, 42 timeout=timeout, 43 msg="Opening link %s" % self.link.name) 44 except Timeout, e: pass 45 self._checkClosed()4648 if self.link.state & Endpoint.REMOTE_CLOSED: 49 self.link.close() 50 raise LinkDetached(self.link)5153 self.link.close() 54 self.connection.wait(lambda: not (self.link.state & Endpoint.REMOTE_ACTIVE), 55 msg="Closing link %s" % self.link.name)56 57 # Access to other link attributes.8669 super(BlockingSender, self).__init__(connection, sender) 70 if self.link.target and self.link.target.address and self.link.target.address != 
self.link.remote_target.address: 71 #this may be followed by a detach, which may contain an error condition, so wait a little... 72 self._waitForClose() 73 #...but close ourselves if peer does not 74 self.link.close() 75 raise LinkException("Failed to open sender %s, target does not match" % self.link.name)7678 delivery = self.link.send(msg) 79 self.connection.wait(lambda: delivery.settled, msg="Sending on sender %s" % self.link.name, timeout=timeout) 80 bad = error_states 81 if bad is None: 82 bad = [Delivery.REJECTED, Delivery.RELEASED] 83 if delivery.remote_state in bad: 84 raise SendException(delivery.remote_state) 85 return delivery12189 super(Fetcher, self).__init__(prefetch=prefetch, auto_accept=False) 90 self.connection = connection 91 self.incoming = collections.deque([]) 92 self.unsettled = collections.deque([])9395 self.incoming.append((event.message, event.delivery)) 96 self.connection.container.yield_() # Wake up the wait() loop to handle the message.9799 if event.link.state & Endpoint.LOCAL_ACTIVE: 100 event.link.close() 101 raise LinkDetached(event.link)102 105 106 @property108 return len(self.incoming)109111 message, delivery = self.incoming.popleft() 112 if not delivery.settled: 113 self.unsettled.append(delivery) 114 return message115163125 super(BlockingReceiver, self).__init__(connection, receiver) 126 if self.link.source and self.link.source.address and self.link.source.address != self.link.remote_source.address: 127 #this may be followed by a detach, which may contain an error condition, so wait a little... 
128 self._waitForClose() 129 #...but close ourselves if peer does not 130 self.link.close() 131 raise LinkException("Failed to open receiver %s, source does not match" % self.link.name) 132 if credit: receiver.flow(credit) 133 self.fetcher = fetcher134 138140 if not self.fetcher: 141 raise Exception("Can't call receive on this receiver as a handler was provided") 142 if not self.link.credit: 143 self.link.flow(1) 144 self.connection.wait(lambda: self.fetcher.has_message, msg="Receiving on receiver %s" % self.link.name, timeout=timeout) 145 return self.fetcher.pop()146 149 152 158179167 self.link = link 168 if link.is_sender: 169 txt = "sender %s to %s closed" % (link.name, link.target.address) 170 else: 171 txt = "receiver %s from %s closed" % (link.name, link.source.address) 172 if link.remote_condition: 173 txt += " due to: %s" % link.remote_condition 174 self.condition = link.remote_condition.name 175 else: 176 txt += " by peer" 177 self.condition = None 178 LinkException.__init__(self, txt)192183 self.connection = connection 184 txt = "Connection %s closed" % self.url 185 if event.connection.remote_condition: 186 txt += " due to: %s" % event.connection.remote_condition 187 self.condition = connection.remote_condition.name 188 else: 189 txt += " by peer" 190 self.condition = None 191 ConnectionException.__init__(self, txt)195 """ 196 A synchronous style connection wrapper. 
197 """283199 self.disconnected = False 200 self.timeout = timeout 201 self.container = container or Container() 202 self.container.timeout = self.timeout 203 self.container.start() 204 self.url = Url(utf8(url)).defaults() 205 self.conn = self.container.connect(url=self.url, handler=self, ssl_domain=ssl_domain, reconnect=False, heartbeat=heartbeat) 206 self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT), 207 msg="Opening connection")208210 return BlockingSender(self, self.container.create_sender(self.conn, utf8(address), name=utf8(name), handler=handler, options=options))211212 - def create_receiver(self, address, credit=None, dynamic=False, handler=None, name=None, options=None):213 prefetch = credit 214 if handler: 215 fetcher = None 216 if prefetch is None: 217 prefetch = 1 218 else: 219 fetcher = Fetcher(self, credit) 220 return BlockingReceiver( 221 self, self.container.create_receiver(self.conn, utf8(address), name=utf8(name), dynamic=dynamic, handler=handler or fetcher, options=options), fetcher, credit=prefetch)222224 self.conn.close() 225 try: 226 self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_ACTIVE), 227 msg="Closing connection") 228 finally: 229 self.conn = None 230 self.container = None231 234236 """ Hand control over to the event loop (e.g. 
if waiting indefinitely for incoming messages) """ 237 while self.container.process(): pass238240 """Call process until condition() is true""" 241 if timeout is False: 242 timeout = self.timeout 243 if timeout is None: 244 while not condition() and not self.disconnected: 245 self.container.process() 246 else: 247 container_timeout = self.container.timeout 248 self.container.timeout = timeout 249 try: 250 deadline = time.time() + timeout 251 while not condition() and not self.disconnected: 252 self.container.process() 253 if deadline < time.time(): 254 txt = "Connection %s timed out" % self.url 255 if msg: txt += ": " + msg 256 raise Timeout(txt) 257 finally: 258 self.container.timeout = container_timeout 259 if self.disconnected or self._is_closed(): 260 self.container.stop() 261 self.conn.handler = None # break cyclical reference 262 if self.disconnected and not self._is_closed(): 263 raise ConnectionException("Connection %s disconnected" % self.url)264266 if event.link.state & Endpoint.LOCAL_ACTIVE: 267 event.link.close() 268 raise LinkDetached(event.link)269271 if event.connection.state & Endpoint.LOCAL_ACTIVE: 272 event.connection.close() 273 raise ConnectionClosed(event.connection)274276 self.on_transport_closed(event)277279 self.on_transport_closed(event)280297286 """Thread-safe atomic counter. Start at start, increment by step.""" 287 self.count, self.step = start, step 288 self.lock = threading.Lock()289291 """Get the next value""" 292 self.lock.acquire() 293 self.count += self.step; 294 result = self.count 295 self.lock.release() 296 return result299 """ 300 Implementation of the synchronous request-response (aka RPC) pattern. 301 @ivar address: Address for all requests, may be None. 302 @ivar connection: Connection for requests and responses. 303 """ 304 305 correlation_id = AtomicCount() 306345 346 @property308 """ 309 Send requests and receive responses. A single instance can send many requests 310 to the same or different addresses. 
311 312 @param connection: A L{BlockingConnection} 313 @param address: Address for all requests. 314 If not specified, each request must have the address property set. 315 Successive messages may have different addresses. 316 """ 317 super(SyncRequestResponse, self).__init__() 318 self.connection = connection 319 self.address = address 320 self.sender = self.connection.create_sender(self.address) 321 # dynamic=true generates a unique address dynamically for this receiver. 322 # credit=1 because we want to receive 1 response message initially. 323 self.receiver = self.connection.create_receiver(None, dynamic=True, credit=1, handler=self) 324 self.response = None325327 """ 328 Send a request message, wait for and return the response message. 329 330 @param request: A L{proton.Message}. If L{self.address} is not set the 331 request's own address must be set and will be used. 332 """ 333 if not self.address and not request.address: 334 raise ValueError("Request message has no address: %s" % request) 335 request.reply_to = self.reply_to 336 request.correlation_id = correlation_id = self.correlation_id.next() 337 self.sender.send(request) 338 def wakeup(): 339 return self.response and (self.response.correlation_id == correlation_id)340 self.connection.wait(wakeup, msg="Waiting for response") 341 response = self.response 342 self.response = None # Ready for next response. 343 self.receiver.flow(1) # Set up credit for the next response. 344 return response348 """Return the dynamic address of our receiver.""" 349 return self.receiver.remote_source.address350352 """Called when we receive a message for our receiver.""" 353 self.response = event.message 354 self.connection.container.yield_() # Wake up the wait() loop to handle the message.355
Home | Trees | Indices | Help |
---|
Generated by Epydoc 3.0.1 on Fri Jan 8 15:33:32 2016 | http://epydoc.sourceforge.net |