Package proton :: Module reactor
[frames] | no frames]

Source Code for Module proton.reactor

  1  from __future__ import absolute_import 
  2  # 
  3  # Licensed to the Apache Software Foundation (ASF) under one 
  4  # or more contributor license agreements.  See the NOTICE file 
  5  # distributed with this work for additional information 
  6  # regarding copyright ownership.  The ASF licenses this file 
  7  # to you under the Apache License, Version 2.0 (the 
  8  # "License"); you may not use this file except in compliance 
  9  # with the License.  You may obtain a copy of the License at 
 10  # 
 11  #   http://www.apache.org/licenses/LICENSE-2.0 
 12  # 
 13  # Unless required by applicable law or agreed to in writing, 
 14  # software distributed under the License is distributed on an 
 15  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
 16  # KIND, either express or implied.  See the License for the 
 17  # specific language governing permissions and limitations 
 18  # under the License. 
 19  # 
 20  import logging, os, socket, time, types 
 21  from heapq import heappush, heappop, nsmallest 
 22  from proton import Collector, Connection, ConnectionException, Delivery, Described, dispatch 
 23  from proton import Endpoint, Event, EventBase, EventType, generate_uuid, Handler, Link, Message 
 24  from proton import ProtonException, PN_ACCEPTED, PN_PYREF, SASL, Session, SSL, SSLDomain, SSLUnavailable, symbol 
 25  from proton import Terminus, Timeout, Transport, TransportException, ulong, Url 
 26  from select import select 
 27  from proton.handlers import OutgoingMessageHandler 
 28  from proton import unicode2utf8, utf82unicode 
 29   
 30  import traceback 
 31  from proton import WrappedHandler, _chandler, secs2millis, millis2secs, timeout2millis, millis2timeout, Selectable 
 32  from .wrapper import Wrapper, PYCTX 
 33  from cproton import * 
 34  from . import _compat 
 35   
 36  try: 
 37      import Queue 
 38  except ImportError: 
 39      import queue as Queue 
class Task(Wrapper):
    """
    Python wrapper around a C task object, such as the one returned
    by pn_reactor_schedule (see Reactor.schedule).
    """

    @staticmethod
    def wrap(impl):
        """
        Return a Task wrapping impl, or None when impl is None.
        """
        return None if impl is None else Task(impl)

    def __init__(self, impl):
        """
        @param impl: the underlying task implementation to wrap.
        """
        Wrapper.__init__(self, impl, pn_task_attachments)

    def _init(self):
        """A task sets up no additional state of its own."""
        pass

    def cancel(self):
        """Cancel this task by calling pn_task_cancel on it."""
        pn_task_cancel(self._impl)
58
class Acceptor(Wrapper):
    """
    Python wrapper around a C acceptor object, as created by
    pn_reactor_acceptor (see Reactor.acceptor).
    """

    def __init__(self, impl):
        """
        @param impl: the underlying acceptor implementation to wrap.
        """
        Wrapper.__init__(self, impl)

    def set_ssl_domain(self, ssl_domain):
        """
        Hand the C-level domain held by ssl_domain (its _domain
        attribute) to pn_acceptor_set_ssl_domain for this acceptor.
        """
        domain_impl = ssl_domain._domain
        pn_acceptor_set_ssl_domain(self._impl, domain_impl)

    def close(self):
        """Close this acceptor by calling pn_acceptor_close on it."""
        pn_acceptor_close(self._impl)
69
class Reactor(Wrapper):
    """
    Python wrapper around a C reactor, driven through the pn_reactor_*
    functions.

    Error information reported through on_error is collected in
    self.errors and re-raised by process() and stop().
    """

    @staticmethod
    def wrap(impl):
        """
        Return a wrapper for the given reactor implementation, or None
        when impl is None.

        If the Python context stored in the reactor's attachments
        contains a 'subclass' entry, that class is instantiated
        instead of Reactor.
        """
        if impl is None:
            return None
        attachments = pn_reactor_attachments(impl)
        context = pn_void2py(pn_record_get(attachments, PYCTX))
        if context and 'subclass' in context:
            return context['subclass'](impl=impl)
        return Reactor(impl=impl)

    def __init__(self, *handlers, **kwargs):
        """
        @param handlers: handlers that are added, in order, to this
        reactor's handler.

        @param kwargs: 'impl' may supply the underlying
        implementation; when absent pn_reactor is passed to Wrapper
        instead.
        """
        impl = kwargs.get("impl", pn_reactor)
        Wrapper.__init__(self, impl, pn_reactor_attachments)
        for handler in handlers:
            self.handler.add(handler)

    def _init(self):
        """Start with an empty list of recorded errors."""
        self.errors = []

    def on_error(self, info):
        """
        Record the given error information and yield the reactor.

        Recorded entries are later unpacked as (exc, value, tb)
        triples by _check_errors.
        """
        self.errors.append(info)
        self.yield_()

    def _get_global(self):
        c_handler = pn_reactor_get_global_handler(self._impl)
        return WrappedHandler.wrap(c_handler, self.on_error)

    def _set_global(self, handler):
        c_handler = _chandler(handler, self.on_error)
        pn_reactor_set_global_handler(self._impl, c_handler)
        # Release our reference to the C handler once it has been set.
        pn_decref(c_handler)

    global_handler = property(_get_global, _set_global)

    def _get_timeout(self):
        millis = pn_reactor_get_timeout(self._impl)
        return millis2timeout(millis)

    def _set_timeout(self, secs):
        millis = timeout2millis(secs)
        return pn_reactor_set_timeout(self._impl, millis)

    timeout = property(_get_timeout, _set_timeout)

    def yield_(self):
        """Call pn_reactor_yield on the underlying reactor."""
        pn_reactor_yield(self._impl)

    def mark(self):
        """Return the result of pn_reactor_mark for this reactor."""
        return pn_reactor_mark(self._impl)

    def _get_handler(self):
        c_handler = pn_reactor_get_handler(self._impl)
        return WrappedHandler.wrap(c_handler, self.on_error)

    def _set_handler(self, handler):
        c_handler = _chandler(handler, self.on_error)
        pn_reactor_set_handler(self._impl, c_handler)
        # Release our reference to the C handler once it has been set.
        pn_decref(c_handler)

    handler = property(_get_handler, _set_handler)

    def run(self):
        """
        Set the timeout, start the reactor, call process() until it
        returns a false value, then stop the reactor.
        """
        self.timeout = 3.14159265359
        self.start()
        while True:
            if not self.process():
                break
        self.stop()

    def wakeup(self):
        """
        Call pn_reactor_wakeup; a non-zero result is raised as an
        IOError carrying the reactor's IO error text.
        """
        status = pn_reactor_wakeup(self._impl)
        if status:
            io_error = pn_io_error(pn_reactor_io(self._impl))
            raise IOError(pn_error_text(io_error))

    def start(self):
        """Call pn_reactor_start on the underlying reactor."""
        pn_reactor_start(self._impl)

    @property
    def quiesced(self):
        """The value reported by pn_reactor_quiesced."""
        return pn_reactor_quiesced(self._impl)

    def _check_errors(self):
        """
        If any errors were recorded, print the traceback of all but
        the last one and re-raise the last one.
        """
        if not self.errors:
            return
        for exc, value, tb in self.errors[:-1]:
            traceback.print_exception(exc, value, tb)
        exc, value, tb = self.errors[-1]
        _compat.raise_(exc, value, tb)

    def process(self):
        """
        Call pn_reactor_process, re-raise any recorded error, and
        return the value pn_reactor_process produced.
        """
        outcome = pn_reactor_process(self._impl)
        self._check_errors()
        return outcome

    def stop(self):
        """
        Stop the reactor, re-raise any recorded error, then clear the
        global handler and the handler.
        """
        pn_reactor_stop(self._impl)
        self._check_errors()
        self.global_handler = None
        self.handler = None

    def schedule(self, delay, task):
        """
        Schedule the handler 'task' after 'delay' seconds (converted
        with secs2millis) and return the resulting Task.
        """
        c_handler = _chandler(task, self.on_error)
        scheduled = pn_reactor_schedule(self._impl, secs2millis(delay), c_handler)
        task = Task.wrap(scheduled)
        pn_decref(c_handler)
        return task

    def acceptor(self, host, port, handler=None):
        """
        Create an acceptor for host:port and return it as an Acceptor.

        Raises IOError, including the reactor's IO error text and the
        host and port, if pn_reactor_acceptor yields no acceptor.
        """
        c_handler = _chandler(handler, self.on_error)
        aimpl = pn_reactor_acceptor(self._impl, unicode2utf8(host), str(port), c_handler)
        pn_decref(c_handler)
        if not aimpl:
            reason = pn_error_text(pn_io_error(pn_reactor_io(self._impl)))
            raise IOError("%s (%s:%s)" % (reason, host, port))
        return Acceptor(aimpl)

    def connection(self, handler=None):
        """
        Create a connection through pn_reactor_connection, passing it
        the given handler, and return it wrapped as a Connection.
        """
        c_handler = _chandler(handler, self.on_error)
        conn = Connection.wrap(pn_reactor_connection(self._impl, c_handler))
        pn_decref(c_handler)
        return conn

    def selectable(self, handler=None):
        """
        Create a selectable through pn_reactor_selectable and return
        it wrapped as a Selectable.

        When _chandler yields a handler for the given argument, it is
        set on the record attached to the new selectable.
        """
        c_handler = _chandler(handler, self.on_error)
        sel = Selectable.wrap(pn_reactor_selectable(self._impl))
        if c_handler:
            attachments = pn_selectable_attachments(sel._impl)
            pn_record_set_handler(attachments, c_handler)
            pn_decref(c_handler)
        return sel

    def update(self, sel):
        """Call pn_reactor_update for the given selectable."""
        pn_reactor_update(self._impl, sel._impl)

    def push_event(self, obj, etype):
        """
        Put an event of type etype for the Python object obj on the
        reactor's collector.
        """
        collector = pn_reactor_collector(self._impl)
        pn_collector_put(collector, PN_PYREF, pn_py2void(obj), etype.number)
# Register, under the C type names, callables that cast a raw object
# and wrap it in the matching Python class defined in this module.
from proton import wrappers as _wrappers

_wrappers["pn_reactor"] = lambda x: Reactor.wrap(pn_cast_pn_reactor(x))
_wrappers["pn_task"] = lambda x: Task.wrap(pn_cast_pn_task(x))
class EventInjector(object):
    """
    Can be added to a reactor to allow events to be triggered by an
    external thread but handled on the event thread associated with
    the reactor. An instance of this class can be passed to the
    Reactor.selectable() method of the reactor in order to activate
    it. The close() method should be called when it is no longer
    needed, to allow the event loop to end if needed.
    """

    def __init__(self):
        self._closed = False
        # (read fd, write fd); a byte written to the write end makes
        # the read end readable, which is what fileno() exposes.
        self.pipe = os.pipe()
        self.queue = Queue.Queue()

    def trigger(self, event):
        """
        Request that the given event be dispatched on the event thread
        of the reactor to which this EventInjector was added.
        """
        self.queue.put(event)
        self._notify()

    def close(self):
        """
        Request that this EventInjector be closed. Existing events
        will be dispatched on the reactor's event dispatch thread,
        then this will be removed from the set of interest.
        """
        self._closed = True
        self._notify()

    def _notify(self):
        # Write a single marker byte to the write end of the pipe.
        os.write(self.pipe[1], _compat.str2bin("!"))

    def fileno(self):
        """Return the read end of the pipe."""
        read_fd = self.pipe[0]
        return read_fd

    def on_selectable_init(self, event):
        """
        Point the selectable at the pipe's read end, mark it as
        reading and tell the reactor about the change.
        """
        selectable = event.context
        selectable.fileno(self.fileno())
        selectable.reading = True
        event.reactor.update(selectable)

    def on_selectable_readable(self, event):
        """
        Read pending marker bytes from the pipe, push every queued
        event onto the reactor and, once close() has been requested,
        terminate the selectable.
        """
        os.read(self.pipe[0], 512)
        pending = self.queue
        reactor = event.reactor
        while not pending.empty():
            requested = pending.get()
            reactor.push_event(requested.context, requested.type)
        if not self._closed:
            return
        selectable = event.context
        selectable.terminate()
        event.reactor.update(selectable)
255
class ApplicationEvent(EventBase):
    """
    Application defined event, which can optionally be associated with
    an engine object and or an arbitrary subject
    """

    def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None):
        """
        The most specific engine object given determines the others:
        a delivery supplies the link, a link supplies the session and
        a session supplies the connection.
        """
        super(ApplicationEvent, self).__init__(PN_PYREF, self, EventType(typename))
        if delivery:
            link = delivery.link
        if link:
            session = link.session
        if session:
            connection = session.connection
        self.connection = connection
        self.session = session
        self.link = link
        self.delivery = delivery
        self.subject = subject

    def __repr__(self):
        """
        Render as "type(obj, ...)", listing the associated objects
        that are not None.
        """
        candidates = (self.connection, self.session, self.link, self.delivery, self.subject)
        shown = [str(item) for item in candidates if item is not None]
        return "%s(%s)" % (self.type, ", ".join(shown))
279
class Transaction(object):
    """
    Class to track state of an AMQP 1.0 transaction.
    """

    def __init__(self, txn_ctrl, handler, settle_before_discharge=False):
        """
        Store the control link and handler, then immediately send the
        declare request.

        @param txn_ctrl: sender used for the declare and discharge
        control messages.

        @param handler: object whose on_transaction_* methods are
        called from handle_outcome.

        @param settle_before_discharge: when true, accept() settles
        deliveries straight away instead of holding them as pending.
        """
        self.txn_ctrl = txn_ctrl
        self.handler = handler
        self.settle_before_discharge = settle_before_discharge
        self.id = None
        self.failed = False
        self._declare = None
        self._discharge = None
        self._pending = []
        self.declare()

    def commit(self):
        """Discharge the transaction with failed=False."""
        self.discharge(False)

    def abort(self):
        """Discharge the transaction with failed=True."""
        self.discharge(True)

    def declare(self):
        """Send the declare control message and remember its delivery."""
        descriptor = symbol(u'amqp:declare:list')
        self._declare = self._send_ctrl(descriptor, [None])

    def discharge(self, failed):
        """
        Record the failed flag, send the discharge control message and
        remember its delivery.
        """
        self.failed = failed
        descriptor = symbol(u'amqp:discharge:list')
        self._discharge = self._send_ctrl(descriptor, [self.id, failed])

    def _send_ctrl(self, descriptor, value):
        # Send a described value on the control link and tag the
        # resulting delivery with this transaction.
        message = Message(body=Described(descriptor, value))
        delivery = self.txn_ctrl.send(message)
        delivery.transaction = self
        return delivery

    def send(self, sender, msg, tag=None):
        """
        Send msg on sender, set the delivery's local data to this
        transaction's id and update it with state 0x34.
        """
        delivery = sender.send(msg, tag=tag)
        delivery.local.data = [self.id]
        delivery.update(0x34)
        return delivery

    def accept(self, delivery):
        """
        Update the delivery with PN_ACCEPTED under this transaction,
        then either settle it or keep it pending, depending on
        settle_before_discharge.
        """
        self.update(delivery, PN_ACCEPTED)
        if not self.settle_before_discharge:
            self._pending.append(delivery)
        else:
            delivery.settle()

    def update(self, delivery, state=None):
        """
        When state is truthy, set the delivery's local data to this
        transaction's id plus the described state and update it with
        state 0x34; otherwise do nothing.
        """
        if not state:
            return
        outcome = Described(ulong(state), [])
        delivery.local.data = [self.id, outcome]
        delivery.update(0x34)

    def _release_pending(self):
        # Release and settle every pending delivery, then forget them.
        for delivery in self._pending:
            delivery.update(Delivery.RELEASED)
            delivery.settle()
        self._clear_pending()

    def _clear_pending(self):
        self._pending = []

    def handle_outcome(self, event):
        """
        React to the outcome of the declare or discharge delivery by
        calling the matching on_transaction_* method of the handler.
        Events for any other delivery are ignored.
        """
        delivery = event.delivery
        if delivery == self._declare:
            self._handle_declare_outcome(event)
        elif delivery == self._discharge:
            self._handle_discharge_outcome(event)

    def _handle_declare_outcome(self, event):
        # Remote data on the declare delivery carries the transaction id.
        if event.delivery.remote.data:
            self.id = event.delivery.remote.data[0]
            self.handler.on_transaction_declared(event)
            return
        if event.delivery.remote_state != Delivery.REJECTED:
            logging.warning("Unexpected outcome for declare: %s" % event.delivery.remote_state)
        self.handler.on_transaction_declare_failed(event)

    def _handle_discharge_outcome(self, event):
        rejected = event.delivery.remote_state == Delivery.REJECTED
        if rejected:
            if not self.failed:
                self.handler.on_transaction_commit_failed(event)
                self._release_pending() # make this optional?
        elif self.failed:
            self.handler.on_transaction_aborted(event)
            self._release_pending()
        else:
            self.handler.on_transaction_committed(event)
        self._clear_pending()
362
class LinkOption(object):
    """
    Abstract interface for link configuration options
    """

    def apply(self, link):
        """
        Hook for the option's configuration logic; does nothing here
        and is meant to be overridden by subclasses.
        """

    def test(self, link):
        """
        Tell whether this option should be applied to the given link.
        Always True here; subclasses can override this to apply an
        option selectively, e.g. based on some link criteria.
        """
        return True
379
class AtMostOnce(LinkOption):
    """
    Link option that sets the sender settle mode to Link.SND_SETTLED.
    """

    def apply(self, link):
        settled = Link.SND_SETTLED
        link.snd_settle_mode = settled
383
class AtLeastOnce(LinkOption):
    """
    Link option that sets the sender settle mode to
    Link.SND_UNSETTLED and the receiver settle mode to Link.RCV_FIRST.
    """

    def apply(self, link):
        snd_mode, rcv_mode = Link.SND_UNSETTLED, Link.RCV_FIRST
        link.snd_settle_mode = snd_mode
        link.rcv_settle_mode = rcv_mode
388
class SenderOption(LinkOption):
    """
    Base for options that only apply to links whose is_sender
    attribute is true.
    """

    def apply(self, sender):
        """Does nothing here; meant to be overridden."""
        pass

    def test(self, link):
        """Return the link's is_sender attribute."""
        return link.is_sender
392
class ReceiverOption(LinkOption):
    """
    Base for options that only apply to links whose is_receiver
    attribute is true.
    """

    def apply(self, receiver):
        """Does nothing here; meant to be overridden."""
        pass

    def test(self, link):
        """Return the link's is_receiver attribute."""
        return link.is_receiver
396
class DynamicNodeProperties(LinkOption):
    """
    Link option that puts a dictionary of properties on the link's
    source (for a receiver) or target (otherwise).
    """

    def __init__(self, props=None):
        """
        @param props: mapping of property names to values. Keys that
        are not already symbol instances are converted with symbol().
        Omitting it (or passing None) gives an empty property set.
        """
        # A None default avoids sharing one mutable dict object as the
        # default argument across every call.
        if props is None:
            props = {}
        self.properties = {}
        for k in props:
            key = k if isinstance(k, symbol) else symbol(k)
            self.properties[key] = props[k]

    def apply(self, link):
        """
        Put the properties on the source of a receiver link, or on the
        target of any other link.
        """
        terminus = link.source if link.is_receiver else link.target
        terminus.properties.put_dict(self.properties)
411
class Filter(ReceiverOption):
    """
    Receiver option that puts a filter set on the receiver's source.
    """

    def __init__(self, filter_set=None):
        """
        @param filter_set: dictionary put on the source's filter by
        apply(). Omitting it (or passing None) gives this instance
        its own empty dictionary.
        """
        # The previous {} default was stored on the instance, so every
        # Filter built without arguments shared (and could mutate) the
        # same dictionary object.
        self.filter_set = {} if filter_set is None else filter_set

    def apply(self, receiver):
        """Put the filter set on the receiver's source filter."""
        receiver.source.filter.put_dict(self.filter_set)
418
class Selector(Filter):
    """
    Configures a link with a message selector filter
    """

    def __init__(self, value, name='selector'):
        """
        @param value: the selector, wrapped in a Described value with
        the 'apache.org:selector-filter:string' descriptor.

        @param name: key (converted to a symbol) under which the
        selector is placed in the filter set.
        """
        descriptor = symbol('apache.org:selector-filter:string')
        filter_set = {symbol(name): Described(descriptor, value)}
        super(Selector, self).__init__(filter_set)
425
class DurableSubscription(ReceiverOption):
    """
    Receiver option that sets the source's durability to
    Terminus.DELIVERIES and its expiry policy to
    Terminus.EXPIRE_NEVER.
    """

    def apply(self, receiver):
        source = receiver.source
        source.durability = Terminus.DELIVERIES
        source.expiry_policy = Terminus.EXPIRE_NEVER
430
class Move(ReceiverOption):
    """
    Receiver option that sets the source's distribution mode to
    Terminus.DIST_MODE_MOVE.
    """

    def apply(self, receiver):
        mode = Terminus.DIST_MODE_MOVE
        receiver.source.distribution_mode = mode
434
class Copy(ReceiverOption):
    """
    Receiver option that sets the source's distribution mode to
    Terminus.DIST_MODE_COPY.
    """

    def apply(self, receiver):
        mode = Terminus.DIST_MODE_COPY
        receiver.source.distribution_mode = mode
438 446
447 -def _create_session(connection, handler=None):
448 session = connection.session() 449 session.open() 450 return session
451
452 453 -def _get_attr(target, name):
454 if hasattr(target, name): 455 return getattr(target, name) 456 else: 457 return None
458
class SessionPerConnection(object):
    """
    Session policy that creates one session on first use and hands
    out that same session on every later request.
    """

    def __init__(self):
        self._default_session = None

    def session(self, connection):
        """
        Return the default session, creating it on the given
        connection (and setting its context to this policy) if there
        is none yet.
        """
        current = self._default_session
        if current:
            return current
        created = _create_session(connection)
        self._default_session = created
        created.context = self
        return created

    def on_session_remote_close(self, event):
        """
        Close the event's connection and forget the default session.
        """
        event.connection.close()
        self._default_session = None
472
class GlobalOverrides(object):
    """
    Internal handler that first offers each otherwise unhandled event
    to the '_overrides' handler attached to the event's connection (if
    any), and falls back to dispatching it to the base handler when
    that does not yield a true result.
    """

    def __init__(self, base):
        """
        @param base: the handler events fall back to.
        """
        self.base = base

    def on_unhandled(self, name, event):
        """
        Dispatch the event to the base handler unless the connection's
        overrides took care of it.
        """
        overridden = self._override(event)
        if overridden:
            return
        event.dispatch(self.base)

    def _override(self, event):
        # Falsy when there is no connection or it carries no
        # '_overrides'; otherwise whatever dispatching to them returns.
        conn = event.connection
        if not conn:
            return conn
        if not hasattr(conn, '_overrides'):
            return False
        return event.dispatch(conn._overrides)
488
class Connector(Handler):
    """
    Internal handler that triggers the necessary socket connect for an
    opened connection, and connects again through the configured
    reconnect strategy when the transport closes while the connection
    is still locally active.
    """

    def __init__(self, connection):
        """
        @param connection: the connection this connector looks after.
        All other settings start at their defaults and are expected to
        be assigned by the caller (see Container.connect).
        """
        self.connection = connection
        # Where to connect: an object whose next() yields the URL to try.
        self.address = None
        self.reconnect = None
        self.heartbeat = None
        self.ssl_domain = None
        # SASL settings.
        self.sasl_enabled = True
        self.allow_insecure_mechs = True
        self.allowed_mechs = None
        self.user = None
        self.password = None

    def _connect(self, connection):
        """
        Take the next URL from the address, then create and bind a
        Transport for the connection, configured with the SASL,
        heartbeat and SSL settings of this connector.
        """
        url = self.address.next()
        # IoHandler uses the hostname to determine where to try to connect to
        connection.hostname = "%s:%s" % (url.host, url.port)
        logging.info("connecting to %s..." % connection.hostname)

        transport = Transport()
        if self.sasl_enabled:
            sasl = transport.sasl()
            sasl.allow_insecure_mechs = self.allow_insecure_mechs
            # Credentials in the URL take precedence over the ones
            # configured on the connector.
            user = url.username or self.user
            if user:
                connection.user = user
            password = url.password or self.password
            if password:
                connection.password = password
            if self.allowed_mechs:
                sasl.allowed_mechs(self.allowed_mechs)
        transport.bind(connection)
        if self.heartbeat:
            transport.idle_timeout = self.heartbeat
        use_ssl = url.scheme == 'amqps' and self.ssl_domain
        if use_ssl:
            self.ssl = SSL(transport, self.ssl_domain)
            self.ssl.peer_hostname = url.host

    def on_connection_local_open(self, event):
        """Connect as soon as the connection is opened locally."""
        self._connect(event.connection)

    def on_connection_remote_open(self, event):
        """
        Log the successful connect and, when a reconnect strategy is
        configured, reset it.
        """
        logging.info("connected to %s" % event.connection.hostname)
        if not self.reconnect:
            return
        self.reconnect.reset()
        self.transport = None

    def on_transport_tail_closed(self, event):
        """Handled exactly like on_transport_closed."""
        self.on_transport_closed(event)

    def on_transport_closed(self, event):
        """
        When the connection is still locally active, either reconnect
        (immediately for a zero delay, otherwise through a scheduled
        timer task) or, without a reconnect strategy, give up on the
        connection.
        """
        conn = self.connection
        if not (conn and conn.state & Endpoint.LOCAL_ACTIVE):
            return
        if not self.reconnect:
            logging.info("Disconnected")
            self.connection = None
            return
        event.transport.unbind()
        delay = self.reconnect.next()
        if delay == 0:
            logging.info("Disconnected, reconnecting...")
            self._connect(self.connection)
        else:
            logging.info("Disconnected will try to reconnect after %s seconds" % delay)
            event.reactor.schedule(delay, self)

    def on_timer_task(self, event):
        """Perform the reconnect scheduled by on_transport_closed."""
        self._connect(self.connection)

    def on_connection_remote_close(self, event):
        """Forget the connection once the remote end closes it."""
        self.connection = None
565
class Backoff(object):
    """
    A reconnect strategy involving an increasing delay between
    retries, up to a maximum of 10 seconds.
    """

    def __init__(self):
        self.delay = 0

    def reset(self):
        """Go back to an initial delay of zero."""
        self.delay = 0

    def next(self):
        """
        Return the delay to use now and advance to the following one:
        0 is followed by 0.1, after which the delay doubles each time
        until it is capped at 10.
        """
        following = 0.1 if self.delay == 0 else min(10, 2*self.delay)
        current, self.delay = self.delay, following
        return current
584
class Urls(object):
    """
    Endless round-robin over a list of URLs: once the last one has
    been handed out, next() starts again from the first.
    """

    def __init__(self, values):
        """
        @param values: the URLs to cycle through; each one is
        converted with Url().
        """
        self.values = [Url(v) for v in values]
        self.i = iter(self.values)

    def __iter__(self):
        return self

    def next(self):
        """
        Return the next URL, wrapping around to the first one after
        the last. With no URLs at all, StopIteration is raised.
        """
        try:
            return next(self.i)
        except StopIteration:
            # Exhausted: restart from the beginning of the list.
            self.i = iter(self.values)
            return next(self.i)

    # __iter__ returns self, so the Python 3 iterator protocol method
    # is needed as well; without it iter() and the next() builtin
    # reject this object on Python 3.
    __next__ = next
599
class SSLConfig(object):
    """
    Holds one client-mode and one server-mode SSLDomain and applies
    the same settings to both.
    """

    def __init__(self):
        self.client = SSLDomain(SSLDomain.MODE_CLIENT)
        self.server = SSLDomain(SSLDomain.MODE_SERVER)

    def set_credentials(self, cert_file, key_file, password):
        """Set the same credentials on the client and server domains."""
        for domain in (self.client, self.server):
            domain.set_credentials(cert_file, key_file, password)

    def set_trusted_ca_db(self, certificate_db):
        """Set the same trusted CA database on both domains."""
        for domain in (self.client, self.server):
            domain.set_trusted_ca_db(certificate_db)
612
class Container(Reactor):
    """A representation of the AMQP concept of a 'container', which
       loosely speaking is something that establishes links to or from
       another container, over which messages are transferred. This is
       an extension to the Reactor class that adds convenience methods
       for creating connections and sender- or receiver- links.
    """

    def __init__(self, *handlers, **kwargs):
        """
        @param handlers: handlers passed on to Reactor.

        @param kwargs: 'impl' wraps an existing reactor
        implementation, in which case none of the container level
        defaults below are (re)initialised; 'global_handler' replaces
        the handler that GlobalOverrides falls back to.
        """
        super(Container, self).__init__(*handlers, **kwargs)
        if "impl" not in kwargs:
            try:
                self.ssl = SSLConfig()
            except SSLUnavailable:
                self.ssl = None
            self.global_handler = GlobalOverrides(kwargs.get('global_handler', self.global_handler))
            self.trigger = None
            self.container_id = str(generate_uuid())
            self.allow_insecure_mechs = True
            self.allowed_mechs = None
            self.sasl_enabled = True
            self.user = None
            self.password = None
            # Record the concrete class so that Reactor.wrap() can
            # instantiate it again for the same implementation.
            Wrapper.__setattr__(self, 'subclass', self.__class__)

    def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None, **kwargs):
        """
        Initiates the establishment of an AMQP connection. Returns an
        instance of proton.Connection.

        @param url: URL string of process to connect to

        @param urls: list of URL strings of process to try to connect to

        Only one of url or urls should be specified.

        @param address: an object whose next() method returns the URL
        to connect to; used when neither url nor urls is given.

        @param reconnect: A value of False will prevent the library
        from automatically trying to reconnect if the underlying
        socket is disconnected before the connection has been closed.
        When left as None a Backoff strategy is used.

        @param heartbeat: the desired frequency of heartbeats used to
        test the underlying socket is alive. The value is assigned
        unchanged to the idle_timeout of the Transport.
        NOTE(review): previously documented as milliseconds - confirm
        the unit against Transport.idle_timeout.

        @param ssl_domain: SSL configuration in the form of an
        instance of proton.SSLdomain.

        @param handler: a connection scoped handler that will be
        called to process any events in the scope of this connection
        or its child links

        @param kwargs: sasl_enabled, which determines whether a sasl
        layer is used for the connection; allowed_mechs an optional
        list of SASL mechanisms to allow if sasl is enabled;
        allow_insecure_mechs a flag indicating whether insecure
        mechanisms, such as PLAIN over a non-encrypted socket, are
        allowed; user and password, the credentials to use when the
        URL carries none. These options can also be set at container
        scope.

        Raises ValueError if none of url, urls or address is given.
        """
        conn = self.connection(handler)
        conn.container = self.container_id or str(generate_uuid())

        connector = Connector(conn)
        # Per-connection options fall back to the container defaults.
        connector.allow_insecure_mechs = kwargs.get('allow_insecure_mechs', self.allow_insecure_mechs)
        connector.allowed_mechs = kwargs.get('allowed_mechs', self.allowed_mechs)
        connector.sasl_enabled = kwargs.get('sasl_enabled', self.sasl_enabled)
        connector.user = kwargs.get('user', self.user)
        connector.password = kwargs.get('password', self.password)
        conn._overrides = connector
        if url:
            connector.address = Urls([url])
        elif urls:
            connector.address = Urls(urls)
        elif address:
            connector.address = address
        else:
            raise ValueError("One of url, urls or address required")
        if heartbeat:
            connector.heartbeat = heartbeat
        if reconnect:
            connector.reconnect = reconnect
        elif reconnect is None:
            connector.reconnect = Backoff()
        connector.ssl_domain = ssl_domain or (self.ssl and self.ssl.client)
        conn._session_policy = SessionPerConnection() #todo: make configurable
        conn.open()
        return conn

    def _get_id(self, container, remote, local):
        """
        Build a link name from the container id and whichever of the
        remote and local addresses are given; a generated uuid is
        used when neither is.
        """
        if local and remote:
            # This branch used to build the name without returning
            # it, so such links ended up with the name None.
            return "%s-%s-%s" % (container, remote, local)
        elif local:
            return "%s-%s" % (container, local)
        elif remote:
            return "%s-%s" % (container, remote)
        else:
            return "%s-%s" % (container, str(generate_uuid()))

    def _get_session(self, context):
        """
        Resolve a Url, Session, Connection or any other object with a
        session() method to a session on which a link can be created.
        """
        if isinstance(context, Url):
            return self._get_session(self.connect(url=context))
        elif isinstance(context, Session):
            return context
        elif isinstance(context, Connection):
            if hasattr(context, '_session_policy'):
                return context._session_policy.session(context)
            else:
                return _create_session(context)
        else:
            return context.session()

    def create_sender(self, context, target=None, source=None, name=None, handler=None, tags=None, options=None):
        """
        Initiates the establishment of a link over which messages can
        be sent. Returns an instance of proton.Sender.

        There are two patterns of use. (1) A connection can be passed
        as the first argument, in which case the link is established
        on that connection. In this case the target address can be
        specified as the second argument (or as a keyword
        argument). The source address can also be specified if
        desired. (2) Alternatively a URL can be passed as the first
        argument. In this case a new connection will be established on
        which the link will be attached. If a path is specified and
        the target is not, then the path of the URL is used as the
        target address.

        The name of the link may be specified if desired, otherwise a
        unique name will be generated.

        Various LinkOptions can be specified to further control the
        attachment.
        """
        if isinstance(context, _compat.STRING_TYPES):
            context = Url(context)
        if isinstance(context, Url) and not target:
            target = context.path
        session = self._get_session(context)
        snd = session.sender(name or self._get_id(session.connection.container, target, source))
        if source:
            snd.source.address = source
        if target:
            snd.target.address = target
        if handler is not None:
            snd.handler = handler
        if tags:
            snd.tag_generator = tags
        _apply_link_options(options, snd)
        snd.open()
        return snd

    def create_receiver(self, context, source=None, target=None, name=None, dynamic=False, handler=None, options=None):
        """
        Initiates the establishment of a link over which messages can
        be received (aka a subscription). Returns an instance of
        proton.Receiver.

        There are two patterns of use. (1) A connection can be passed
        as the first argument, in which case the link is established
        on that connection. In this case the source address can be
        specified as the second argument (or as a keyword
        argument). The target address can also be specified if
        desired. (2) Alternatively a URL can be passed as the first
        argument. In this case a new connection will be established on
        which the link will be attached. If a path is specified and
        the source is not, then the path of the URL is used as the
        source address.

        The name of the link may be specified if desired, otherwise a
        unique name will be generated.

        Various LinkOptions can be specified to further control the
        attachment.
        """
        if isinstance(context, _compat.STRING_TYPES):
            context = Url(context)
        if isinstance(context, Url) and not source:
            source = context.path
        session = self._get_session(context)
        rcv = session.receiver(name or self._get_id(session.connection.container, source, target))
        if source:
            rcv.source.address = source
        if dynamic:
            rcv.source.dynamic = True
        if target:
            rcv.target.address = target
        if handler is not None:
            rcv.handler = handler
        _apply_link_options(options, rcv)
        rcv.open()
        return rcv

    def declare_transaction(self, context, handler=None, settle_before_discharge=False):
        """
        Declare a transaction on the given context and return the
        Transaction tracking it.

        The first call for a context creates a 'txn-ctrl' sender with
        a coordinator target and caches it on the context as
        _txn_ctrl; later calls reuse it.
        """
        if not _get_attr(context, '_txn_ctrl'):
            class InternalTransactionHandler(OutgoingMessageHandler):
                def __init__(self):
                    super(InternalTransactionHandler, self).__init__(auto_settle=True)

                def on_settled(self, event):
                    # Route outcomes of control deliveries to the
                    # transaction that sent them.
                    if hasattr(event.delivery, "transaction"):
                        event.transaction = event.delivery.transaction
                        event.delivery.transaction.handle_outcome(event)
            context._txn_ctrl = self.create_sender(context, None, name='txn-ctrl', handler=InternalTransactionHandler())
            context._txn_ctrl.target.type = Terminus.COORDINATOR
            context._txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions'))
        return Transaction(context._txn_ctrl, handler, settle_before_discharge)

    def listen(self, url, ssl_domain=None):
        """
        Initiates a server socket, accepting incoming AMQP connections
        on the interface and port specified.

        Without an explicit ssl_domain, the container's server SSL
        configuration is used for 'amqps' URLs when available.
        """
        url = Url(url)
        acceptor = self.acceptor(url.host, url.port)
        ssl_config = ssl_domain
        if not ssl_config and url.scheme == 'amqps' and self.ssl:
            ssl_config = self.ssl.server
        if ssl_config:
            acceptor.set_ssl_domain(ssl_config)
        return acceptor

    def do_work(self, timeout=None):
        """
        Run one process() step and return its result, first setting
        the reactor timeout when a truthy timeout is given.
        """
        if timeout:
            self.timeout = timeout
        return self.process()
830