Package proton ::
Module reactor
|
|
1 from __future__ import absolute_import
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 import logging, os, socket, time, types
21 from heapq import heappush, heappop, nsmallest
22 from proton import Collector, Connection, ConnectionException, Delivery, Described, dispatch
23 from proton import Endpoint, Event, EventBase, EventType, generate_uuid, Handler, Link, Message
24 from proton import ProtonException, PN_ACCEPTED, PN_PYREF, SASL, Session, SSL, SSLDomain, SSLUnavailable, symbol
25 from proton import Terminus, Timeout, Transport, TransportException, ulong, Url
26 from select import select
27 from proton.handlers import OutgoingMessageHandler
28 from proton import unicode2utf8, utf82unicode
29
30 import traceback
31 from proton import WrappedHandler, _chandler, secs2millis, millis2secs, timeout2millis, millis2timeout, Selectable
32 from .wrapper import Wrapper, PYCTX
33 from cproton import *
34 from . import _compat
35
36 try:
37 import Queue
38 except ImportError:
39 import queue as Queue
40
41 -class Task(Wrapper):
42
43 @staticmethod
45 if impl is None:
46 return None
47 else:
48 return Task(impl)
49
52
55
57 pn_task_cancel(self._impl)
58
60
63
64 - def set_ssl_domain(self, ssl_domain):
65 pn_acceptor_set_ssl_domain(self._impl, ssl_domain._domain)
66
68 pn_acceptor_close(self._impl)
69
71
72 @staticmethod
74 if impl is None:
75 return None
76 else:
77 record = pn_reactor_attachments(impl)
78 attrs = pn_void2py(pn_record_get(record, PYCTX))
79 if attrs and 'subclass' in attrs:
80 return attrs['subclass'](impl=impl)
81 else:
82 return Reactor(impl=impl)
83
84 - def __init__(self, *handlers, **kwargs):
88
91
93 self.errors.append(info)
94 self.yield_()
95
97 return WrappedHandler.wrap(pn_reactor_get_global_handler(self._impl), self.on_error)
98
100 impl = _chandler(handler, self.on_error)
101 pn_reactor_set_global_handler(self._impl, impl)
102 pn_decref(impl)
103
104 global_handler = property(_get_global, _set_global)
105
107 return millis2timeout(pn_reactor_get_timeout(self._impl))
108
110 return pn_reactor_set_timeout(self._impl, timeout2millis(secs))
111
112 timeout = property(_get_timeout, _set_timeout)
113
115 pn_reactor_yield(self._impl)
116
118 return pn_reactor_mark(self._impl)
119
121 return WrappedHandler.wrap(pn_reactor_get_handler(self._impl), self.on_error)
122
124 impl = _chandler(handler, self.on_error)
125 pn_reactor_set_handler(self._impl, impl)
126 pn_decref(impl)
127
128 handler = property(_get_handler, _set_handler)
129
135
137 n = pn_reactor_wakeup(self._impl)
138 if n: raise IOError(pn_error_text(pn_io_error(pn_reactor_io(self._impl))))
139
141 pn_reactor_start(self._impl)
142
143 @property
145 return pn_reactor_quiesced(self._impl)
146
148 if self.errors:
149 for exc, value, tb in self.errors[:-1]:
150 traceback.print_exception(exc, value, tb)
151 exc, value, tb = self.errors[-1]
152 _compat.raise_(exc, value, tb)
153
155 result = pn_reactor_process(self._impl)
156 self._check_errors()
157 return result
158
160 pn_reactor_stop(self._impl)
161 self._check_errors()
162 self.global_handler = None
163 self.handler = None
164
166 impl = _chandler(task, self.on_error)
167 task = Task.wrap(pn_reactor_schedule(self._impl, secs2millis(delay), impl))
168 pn_decref(impl)
169 return task
170
171 - def acceptor(self, host, port, handler=None):
172 impl = _chandler(handler, self.on_error)
173 aimpl = pn_reactor_acceptor(self._impl, unicode2utf8(host), str(port), impl)
174 pn_decref(impl)
175 if aimpl:
176 return Acceptor(aimpl)
177 else:
178 raise IOError("%s (%s:%s)" % (pn_error_text(pn_io_error(pn_reactor_io(self._impl))), host, port))
179
181 impl = _chandler(handler, self.on_error)
182 result = Connection.wrap(pn_reactor_connection(self._impl, impl))
183 pn_decref(impl)
184 return result
185
187 impl = _chandler(handler, self.on_error)
188 result = Selectable.wrap(pn_reactor_selectable(self._impl))
189 if impl:
190 record = pn_selectable_attachments(result._impl)
191 pn_record_set_handler(record, impl)
192 pn_decref(impl)
193 return result
194
196 pn_reactor_update(self._impl, sel._impl)
197
199 pn_collector_put(pn_reactor_collector(self._impl), PN_PYREF, pn_py2void(obj), etype.number)
200
201 from proton import wrappers as _wrappers
202 _wrappers["pn_reactor"] = lambda x: Reactor.wrap(pn_cast_pn_reactor(x))
203 _wrappers["pn_task"] = lambda x: Task.wrap(pn_cast_pn_task(x))
207 """
208 Can be added to a reactor to allow events to be triggered by an
209 external thread but handled on the event thread associated with
210 the reactor. An instance of this class can be passed to the
211 Reactor.selectable() method of the reactor in order to activate
212 it. The close() method should be called when it is no longer
213 needed, to allow the event loop to end if needed.
214 """
216 self.queue = Queue.Queue()
217 self.pipe = os.pipe()
218 self._closed = False
219
221 """
222 Request that the given event be dispatched on the event thread
223 of the reactor to which this EventInjector was added.
224 """
225 self.queue.put(event)
226 os.write(self.pipe[1], _compat.str2bin("!"))
227
229 """
230 Request that this EventInjector be closed. Existing events
231 will be dispctahed on the reactors event dispactch thread,
232 then this will be removed from the set of interest.
233 """
234 self._closed = True
235 os.write(self.pipe[1], _compat.str2bin("!"))
236
239
245
247 os.read(self.pipe[0], 512)
248 while not self.queue.empty():
249 requested = self.queue.get()
250 event.reactor.push_event(requested.context, requested.type)
251 if self._closed:
252 s = event.context
253 s.terminate()
254 event.reactor.update(s)
255
258 """
259 Application defined event, which can optionally be associated with
260 an engine object and or an arbitrary subject
261 """
262 - def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None):
275
277 objects = [self.connection, self.session, self.link, self.delivery, self.subject]
278 return "%s(%s)" % (self.type, ", ".join([str(o) for o in objects if o is not None]))
279
281 """
282 Class to track state of an AMQP 1.0 transaction.
283 """
284 - def __init__(self, txn_ctrl, handler, settle_before_discharge=False):
285 self.txn_ctrl = txn_ctrl
286 self.handler = handler
287 self.id = None
288 self._declare = None
289 self._discharge = None
290 self.failed = False
291 self._pending = []
292 self.settle_before_discharge = settle_before_discharge
293 self.declare()
294
297
300
302 self._declare = self._send_ctrl(symbol(u'amqp:declare:list'), [None])
303
305 self.failed = failed
306 self._discharge = self._send_ctrl(symbol(u'amqp:discharge:list'), [self.id, failed])
307
309 delivery = self.txn_ctrl.send(Message(body=Described(descriptor, value)))
310 delivery.transaction = self
311 return delivery
312
313 - def send(self, sender, msg, tag=None):
314 dlv = sender.send(msg, tag=tag)
315 dlv.local.data = [self.id]
316 dlv.update(0x34)
317 return dlv
318
320 self.update(delivery, PN_ACCEPTED)
321 if self.settle_before_discharge:
322 delivery.settle()
323 else:
324 self._pending.append(delivery)
325
326 - def update(self, delivery, state=None):
327 if state:
328 delivery.local.data = [self.id, Described(ulong(state), [])]
329 delivery.update(0x34)
330
332 for d in self._pending:
333 d.update(Delivery.RELEASED)
334 d.settle()
335 self._clear_pending()
336
339
362
364 """
365 Abstract interface for link configuration options
366 """
368 """
369 Subclasses will implement any configuration logic in this
370 method
371 """
372 pass
373 - def test(self, link):
374 """
375 Subclasses can override this to selectively apply an option
376 e.g. based on some link criteria
377 """
378 return True
379
382 link.snd_settle_mode = Link.SND_SETTLED
383
386 link.snd_settle_mode = Link.SND_UNSETTLED
387 link.rcv_settle_mode = Link.RCV_FIRST
388
390 - def apply(self, sender): pass
391 - def test(self, link): return link.is_sender
392
394 - def apply(self, receiver): pass
395 - def test(self, link): return link.is_receiver
396
399 self.properties = {}
400 for k in props:
401 if isinstance(k, symbol):
402 self.properties[k] = props[k]
403 else:
404 self.properties[symbol(k)] = props[k]
405
407 if link.is_receiver:
408 link.source.properties.put_dict(self.properties)
409 else:
410 link.target.properties.put_dict(self.properties)
411
414 self.filter_set = filter_set
415
416 - def apply(self, receiver):
417 receiver.source.filter.put_dict(self.filter_set)
418
420 """
421 Configures a link with a message selector filter
422 """
423 - def __init__(self, value, name='selector'):
424 super(Selector, self).__init__({symbol(name): Described(symbol('apache.org:selector-filter:string'), value)})
425
427 - def apply(self, receiver):
428 receiver.source.durability = Terminus.DELIVERIES
429 receiver.source.expiry_policy = Terminus.EXPIRE_NEVER
430
431 -class Move(ReceiverOption):
432 - def apply(self, receiver):
433 receiver.source.distribution_mode = Terminus.DIST_MODE_MOVE
434
435 -class Copy(ReceiverOption):
436 - def apply(self, receiver):
437 receiver.source.distribution_mode = Terminus.DIST_MODE_COPY
438
440 if options:
441 if isinstance(options, list):
442 for o in options:
443 if o.test(link): o.apply(link)
444 else:
445 if options.test(link): options.apply(link)
446
451
454 if hasattr(target, name):
455 return getattr(target, name)
456 else:
457 return None
458
461 self._default_session = None
462
464 if not self._default_session:
465 self._default_session = _create_session(connection)
466 self._default_session.context = self
467 return self._default_session
468
472
474 """
475 Internal handler that triggers the necessary socket connect for an
476 opened connection.
477 """
480
482 if not self._override(event):
483 event.dispatch(self.base)
484
486 conn = event.connection
487 return conn and hasattr(conn, '_overrides') and event.dispatch(conn._overrides)
488
490 """
491 Internal handler that triggers the necessary socket connect for an
492 opened connection.
493 """
495 self.connection = connection
496 self.address = None
497 self.heartbeat = None
498 self.reconnect = None
499 self.ssl_domain = None
500 self.allow_insecure_mechs = True
501 self.allowed_mechs = None
502 self.sasl_enabled = True
503 self.user = None
504 self.password = None
505
507 url = self.address.next()
508
509 connection.hostname = "%s:%s" % (url.host, url.port)
510 logging.info("connecting to %s..." % connection.hostname)
511
512 transport = Transport()
513 if self.sasl_enabled:
514 sasl = transport.sasl()
515 sasl.allow_insecure_mechs = self.allow_insecure_mechs
516 if url.username:
517 connection.user = url.username
518 elif self.user:
519 connection.user = self.user
520 if url.password:
521 connection.password = url.password
522 elif self.password:
523 connection.password = self.password
524 if self.allowed_mechs:
525 sasl.allowed_mechs(self.allowed_mechs)
526 transport.bind(connection)
527 if self.heartbeat:
528 transport.idle_timeout = self.heartbeat
529 if url.scheme == 'amqps' and self.ssl_domain:
530 self.ssl = SSL(transport, self.ssl_domain)
531 self.ssl.peer_hostname = url.host
532
535
537 logging.info("connected to %s" % event.connection.hostname)
538 if self.reconnect:
539 self.reconnect.reset()
540 self.transport = None
541
544
546 if self.connection and self.connection.state & Endpoint.LOCAL_ACTIVE:
547 if self.reconnect:
548 event.transport.unbind()
549 delay = self.reconnect.next()
550 if delay == 0:
551 logging.info("Disconnected, reconnecting...")
552 self._connect(self.connection)
553 else:
554 logging.info("Disconnected will try to reconnect after %s seconds" % delay)
555 event.reactor.schedule(delay, self)
556 else:
557 logging.info("Disconnected")
558 self.connection = None
559
562
565
567 """
568 A reconnect strategy involving an increasing delay between
569 retries, up to a maximum or 10 seconds.
570 """
573
576
578 current = self.delay
579 if current == 0:
580 self.delay = 0.1
581 else:
582 self.delay = min(10, 2*current)
583 return current
584
587 self.values = [Url(v) for v in values]
588 self.i = iter(self.values)
589
592
594 try:
595 return next(self.i)
596 except StopIteration:
597 self.i = iter(self.values)
598 return next(self.i)
599
602 self.client = SSLDomain(SSLDomain.MODE_CLIENT)
603 self.server = SSLDomain(SSLDomain.MODE_SERVER)
604
608
612
615 """A representation of the AMQP concept of a 'container', which
616 lossely speaking is something that establishes links to or from
617 another container, over which messages are transfered. This is
618 an extension to the Reactor class that adds convenience methods
619 for creating connections and sender- or receiver- links.
620 """
621 - def __init__(self, *handlers, **kwargs):
622 super(Container, self).__init__(*handlers, **kwargs)
623 if "impl" not in kwargs:
624 try:
625 self.ssl = SSLConfig()
626 except SSLUnavailable:
627 self.ssl = None
628 self.global_handler = GlobalOverrides(kwargs.get('global_handler', self.global_handler))
629 self.trigger = None
630 self.container_id = str(generate_uuid())
631 self.allow_insecure_mechs = True
632 self.allowed_mechs = None
633 self.sasl_enabled = True
634 self.user = None
635 self.password = None
636 Wrapper.__setattr__(self, 'subclass', self.__class__)
637
638 - def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None, **kwargs):
639 """
640 Initiates the establishment of an AMQP connection. Returns an
641 instance of proton.Connection.
642
643 @param url: URL string of process to connect to
644
645 @param urls: list of URL strings of process to try to connect to
646
647 Only one of url or urls should be specified.
648
649 @param reconnect: A value of False will prevent the library
650 form automatically trying to reconnect if the underlying
651 socket is disconnected before the connection has been closed.
652
653 @param heartbeat: A value in milliseconds indicating the
654 desired frequency of heartbeats used to test the underlying
655 socket is alive.
656
657 @param ssl_domain: SSL configuration in the form of an
658 instance of proton.SSLdomain.
659
660 @param handler: a connection scoped handler that will be
661 called to process any events in the scope of this connection
662 or its child links
663
664 @param kwargs: sasl_enabled, which determines whether a sasl
665 layer is used for the connection; allowed_mechs an optional
666 list of SASL mechanisms to allow if sasl is enabled;
667 allow_insecure_mechs a flag indicating whether insecure
668 mechanisms, such as PLAIN over a non-encrypted socket, are
669 allowed. These options can also be set at container scope.
670
671 """
672 conn = self.connection(handler)
673 conn.container = self.container_id or str(generate_uuid())
674
675 connector = Connector(conn)
676 connector.allow_insecure_mechs = kwargs.get('allow_insecure_mechs', self.allow_insecure_mechs)
677 connector.allowed_mechs = kwargs.get('allowed_mechs', self.allowed_mechs)
678 connector.sasl_enabled = kwargs.get('sasl_enabled', self.sasl_enabled)
679 connector.user = kwargs.get('user', self.user)
680 connector.password = kwargs.get('password', self.password)
681 conn._overrides = connector
682 if url: connector.address = Urls([url])
683 elif urls: connector.address = Urls(urls)
684 elif address: connector.address = address
685 else: raise ValueError("One of url, urls or address required")
686 if heartbeat:
687 connector.heartbeat = heartbeat
688 if reconnect:
689 connector.reconnect = reconnect
690 elif reconnect is None:
691 connector.reconnect = Backoff()
692 connector.ssl_domain = ssl_domain or (self.ssl and self.ssl.client)
693 conn._session_policy = SessionPerConnection()
694 conn.open()
695 return conn
696
697 - def _get_id(self, container, remote, local):
698 if local and remote: "%s-%s-%s" % (container, remote, local)
699 elif local: return "%s-%s" % (container, local)
700 elif remote: return "%s-%s" % (container, remote)
701 else: return "%s-%s" % (container, str(generate_uuid()))
702
704 if isinstance(context, Url):
705 return self._get_session(self.connect(url=context))
706 elif isinstance(context, Session):
707 return context
708 elif isinstance(context, Connection):
709 if hasattr(context, '_session_policy'):
710 return context._session_policy.session(context)
711 else:
712 return _create_session(context)
713 else:
714 return context.session()
715
716 - def create_sender(self, context, target=None, source=None, name=None, handler=None, tags=None, options=None):
717 """
718 Initiates the establishment of a link over which messages can
719 be sent. Returns an instance of proton.Sender.
720
721 There are two patterns of use. (1) A connection can be passed
722 as the first argument, in which case the link is established
723 on that connection. In this case the target address can be
724 specified as the second argument (or as a keyword
725 argument). The source address can also be specified if
726 desired. (2) Alternatively a URL can be passed as the first
727 argument. In this case a new connection will be establised on
728 which the link will be attached. If a path is specified and
729 the target is not, then the path of the URL is used as the
730 target address.
731
732 The name of the link may be specified if desired, otherwise a
733 unique name will be generated.
734
735 Various LinkOptions can be specified to further control the
736 attachment.
737 """
738 if isinstance(context, _compat.STRING_TYPES):
739 context = Url(context)
740 if isinstance(context, Url) and not target:
741 target = context.path
742 session = self._get_session(context)
743 snd = session.sender(name or self._get_id(session.connection.container, target, source))
744 if source:
745 snd.source.address = source
746 if target:
747 snd.target.address = target
748 if handler != None:
749 snd.handler = handler
750 if tags:
751 snd.tag_generator = tags
752 _apply_link_options(options, snd)
753 snd.open()
754 return snd
755
756 - def create_receiver(self, context, source=None, target=None, name=None, dynamic=False, handler=None, options=None):
757 """
758 Initiates the establishment of a link over which messages can
759 be received (aka a subscription). Returns an instance of
760 proton.Receiver.
761
762 There are two patterns of use. (1) A connection can be passed
763 as the first argument, in which case the link is established
764 on that connection. In this case the source address can be
765 specified as the second argument (or as a keyword
766 argument). The target address can also be specified if
767 desired. (2) Alternatively a URL can be passed as the first
768 argument. In this case a new connection will be establised on
769 which the link will be attached. If a path is specified and
770 the source is not, then the path of the URL is used as the
771 target address.
772
773 The name of the link may be specified if desired, otherwise a
774 unique name will be generated.
775
776 Various LinkOptions can be specified to further control the
777 attachment.
778 """
779 if isinstance(context, _compat.STRING_TYPES):
780 context = Url(context)
781 if isinstance(context, Url) and not source:
782 source = context.path
783 session = self._get_session(context)
784 rcv = session.receiver(name or self._get_id(session.connection.container, source, target))
785 if source:
786 rcv.source.address = source
787 if dynamic:
788 rcv.source.dynamic = True
789 if target:
790 rcv.target.address = target
791 if handler != None:
792 rcv.handler = handler
793 _apply_link_options(options, rcv)
794 rcv.open()
795 return rcv
796
798 if not _get_attr(context, '_txn_ctrl'):
799 class InternalTransactionHandler(OutgoingMessageHandler):
800 def __init__(self):
801 super(InternalTransactionHandler, self).__init__(auto_settle=True)
802
803 def on_settled(self, event):
804 if hasattr(event.delivery, "transaction"):
805 event.transaction = event.delivery.transaction
806 event.delivery.transaction.handle_outcome(event)
807 context._txn_ctrl = self.create_sender(context, None, name='txn-ctrl', handler=InternalTransactionHandler())
808 context._txn_ctrl.target.type = Terminus.COORDINATOR
809 context._txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions'))
810 return Transaction(context._txn_ctrl, handler, settle_before_discharge)
811
812 - def listen(self, url, ssl_domain=None):
813 """
814 Initiates a server socket, accepting incoming AMQP connections
815 on the interface and port specified.
816 """
817 url = Url(url)
818 acceptor = self.acceptor(url.host, url.port)
819 ssl_config = ssl_domain
820 if not ssl_config and url.scheme == 'amqps' and self.ssl:
821 ssl_config = self.ssl.server
822 if ssl_config:
823 acceptor.set_ssl_domain(ssl_config)
824 return acceptor
825
830