Metadata-Version: 2.1
Name: geeteventbus
Version: 1.0
Summary: An eventbus for highly concurrent system
Home-page: https://github.com/nipuntalukdar/geeteventbus
Author: Nipun Talukdar
Author-email: nipunmlist@gmail.com
Maintainer: Nipun Talukdar
Maintainer-email: nipunmlist@gmail.com
License: MIT
Platform: all
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 2
Classifier: Programming Language :: Python :: 2.6
Classifier: Programming Language :: Python :: 2.7
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.2
Classifier: Programming Language :: Python :: 3.3
Classifier: Programming Language :: Python :: 3.4

geeteventbus
============
An eventbus for concurrent programming
--------------------------------------

geeteventbus is a library that allows publish-subscribe-style communication. There is no need for the components to register to each-other. It is inspired by a Java library, Guava eventbus from Google. But it is not exactly same as the Guava eventbus library.

- geeteventbus simplifies handling events from publishers and subscribers.
- publisher and subscribers don't need to create threads to concurrently process the events.
- the eventbus can be synchronus, where the events are delivered from the same thread posting the events
- events can be delivered to subscibers in the same order they are posted
- subscribers may be declared as thread-safe, in that case same subscriber may be invoked concurrently for processing multiple events
- events for which there are no subscribers are registered yet are simply discared by the eventbus.
- the eventbus is not to be used for inter process communication. Publishers and subsribers must run on the same process

Basic working
-------------

1) We create an eventbus

    .. code:: python

        from geeteventbus.eventbus import eventbus
        eb = eventbus()

   This will create an eventbus with the defaults. The default eventbus will have below characteristics:

        1) the maximum queued event limit is set to 10000
        2) number of executor thread is 8
        3) the subscribers will be called asynchronously
        4) subscibers are treated as thread-safe and hence same subscribers may be invoked simultaneously  on different threads
    
2) Create a subsclass of subscriber and override the process method. Create an object of this class and register it to the eventbus for receiving messages with certain topics:
    
    .. code:: python

        from geeteventbus.subscriber import subscriber
        from geeteventbus.eventbus import eventbus
        from geeteventbus.event import event

        class mysubscriber(subscriber):
            def process(self, eventobj):
                if not isinstance(eventobj, event):
                    print('Invalid object type is passed.')
                    return
                topic = eventobj.get_topic()
                data = eventobj.get_data()
                print('Processing event with TOPIC: %s, DATA: %s' % (topic, data))
        
        subscr = mysubscriber()
        eb.register_consumer(subscr, 'an_important_topic')

3) Post some events to the eventbus with the topic "an_important_topic".

    .. code:: python

        from geeteventbus.event import event

        eobj1 = ('an_important_topic', 'This is some data for the event 1')
        eobj2 = ('an_important_topic', 'This is some data for the event 2')
        eobj3 = ('an_important_topic', 'This is some data for the event 3')
        eobj3 = ('an_important_topic', 'This is some data for the event 4')
    
        eb.post(eobj1)
        eb.post(eobj2)
        eb.post(eobj3)
        eb.post(eobj4)

4) We may gracefully shutdown the eventbus before exiting the process

    .. code:: python
        
        eb.shutdown()


The complete example is below:
    
    .. code:: python
        
        from time import sleep
        from geeteventbus.subscriber import subscriber
        from geeteventbus.eventbus import eventbus
        from geeteventbus.event import event

        class mysubscriber(subscriber):
            def process(self, eventobj):
                if not isinstance(eventobj, event):
                    print('Invalid object type is passed.')
                    return
                topic = eventobj.get_topic()
                data = eventobj.get_data()
                print('Processing event with TOPIC: %s, DATA: %s' % (topic, data))
        
        
        eb = eventbus()
        subscr = mysubscriber()
        eb.register_consumer(subscr, 'an_important_topic')
        

        eobj1 = event('an_important_topic', 'This is some data for the event 1')
        eobj2 = event('an_important_topic', 'This is some data for the event 2')
        eobj3 = event('an_important_topic', 'This is some data for the event 3')
        eobj4 = event('an_important_topic', 'This is some data for the event 4')
    
        eb.post(eobj1)
        eb.post(eobj2)
        eb.post(eobj3)
        eb.post(eobj4)

        eb.shutdown()
        sleep(2)


A more detailed example is given below. A subscriber (counter_aggregator) aggregates the values for 
a set of counters. It registers itself to an eventbus for receiving events for the 
counters(topics). A set of producers update the values for the counters and post events describing
the counter to the eventbus:
    
    .. code:: python

        from threading import Lock, Thread
        from time import sleep, time
        from geeteventbus.eventbus import eventbus
        from geeteventbus.event import event
        from geeteventbus.subscriber import subscriber
        from random import randint


        class counter_aggregator(subscriber, Thread):
            '''
            Aggregator for a set of counters. Multiple threads updates the counts which
            are aggregated by this class and output the aggregated value periodically.
            '''
            def __init__(self, counter_names):
                Thread.__init__(self)
                self.counter_names = counter_names
                self.locks = {}
                self.counts = {}
                self.keep_running = True
                self.collect_times = {}
                for counter in counter_names:
                    self.locks[counter] = Lock()
                    self.counts[counter] = 0
                    self.collect_times[counter] = time()

            def process(self, eobj):
                '''
                Process method calls with the event object eobj. eobj has the counter name as the topic
                and an int count as the value for the counter.
                '''
                counter_name = eobj.get_topic()
                if counter_name not in self.counter_names:
                    return
                count = eobj.get_data()
                with self.locks[counter_name]:
                    self.counts[counter_name] += count

            def stop(self):
                self.keep_running = False

            def __call__(self):
                '''
                Keep outputing the aggregated counts every 2 seconds
                '''
                while self.keep_running:
                    sleep(2)
                    for counter_name in self.counter_names:
                        with self.locks[counter_name]:
                            print('Change for counter %s = %d, in last %f secs' % (counter_name,
                                  self.counts[counter_name], time() - self.collect_times[counter_name]))
                            self.counts[counter_name] = 0
                            self.collect_times[counter_name] = time()
                print('Aggregator exited')


        class count_producer:
            '''
            Producer for counters. Every 0.02 seconds post the "updated" value for a
            counter randomly
            '''
            def __init__(self, counters, ebus):
                self.counters = counters
                self.ebus = ebus
                self.keep_running = True
                self.num_counter = len(counters)

            def stop(self):
                self.keep_running = False

            def __call__(self):
                while self.keep_running:
                    ev = event(self.counters[randint(0, self.num_counter - 1)], randint(1, 100))
                    ebus.post(ev)
                    sleep(0.02)
                print('producer exited')

        if __name__ == '__main__':
            ebus = eventbus()
            counters = ['c1', 'c2', 'c3', 'c4']
            subcr = counter_aggregator(counters)
            producer = count_producer(counters, ebus)
            for counter in counters:
                ebus.register_consumer(subcr, counter)
            threads = []
            i = 30
            while i > 0:
                threads.append(Thread(target=producer))
                i -= 1

            aggregator_thread = Thread(target=subcr)
            aggregator_thread.start()
            for thrd in threads:
                thrd.start()
            sleep(20)
            producer.stop()
            subcr.stop()
            sleep(2)
            ebus.shutdown()


