Event Brokers¶
Rasa Core allows you to stream events to a message broker. The example implementation we’re going to show you here uses Pika, the Python client library for RabbitMQ.
The event broker emits events into the event queue. It becomes part of the
TrackerStore
which you use when starting an Agent
or launch
rasa_core.run
.
Adding an Event Broker Using the Endpoint Configuration¶
You can use an endpoint configuration file to instruct Rasa Core to stream
all events to your event broker. To do so, add the following section to your
endpoint configuration, e.g. endpoints.yml
:
event_broker:
url: localhost
username: username
password: password
queue: queue
type: pika
Then instruct Rasa Core to use the endpoint configuration by adding
--endpoints <path to your endpoint configuration
when running it.
Adding an Event Broker in Python¶
Here is how you add it using Python code:
from rasa_core.broker import PikaProducer
from rasa_platform.core.tracker_store import InMemoryTrackerStore
pika_broker = PikaProducer('localhost',
'username',
'password',
queue='rasa_core_events')
tracker_store = InMemoryTrackerStore(db=db, event_broker=pika_broker)
Implementing an Event Consumer¶
All events are streamed to RabbitMQ as serialised dictionaries every time
the tracker updates it state. An example event emitted from the default
tracker looks like this:
{
"sender_id": "default",
"timestamp": 1528402837.617099,
"event": "bot",
"text": "what your bot said",
"data": "some data"
}
The event
field takes the event’s type_name
(for more on event
types, check out the Events docs). You need to have a RabbitMQ
server running, as well as another application
that consumes the events. This consumer to needs to implement Pika’s
start_consuming()
method with a callback
action. Here’s a simple
example:
import json
import pika
def _callback(self, ch, method, properties, body):
# Do something useful with your incoming message body here, e.g.
# saving it to a database
print('Received event {}'.format(json.loads(body)))
if __name__ == '__main__':
# RabbitMQ credentials with username and password
credentials = pika.PlainCredentials('username', 'password')
# pika connection to the RabbitMQ host - typically 'rabbit' in a
# docker environment, or 'localhost' in a local environment
connection = pika.BlockingConnection(
pika.ConnectionParameters('rabbit', credentials=credentials))
# start consumption of channel
channel = connection.channel()
channel.basic_consume(_callback,
queue='rasa_core_events',
no_ack=True)
channel.start_consuming()
Have questions or feedback?¶
We have a very active support community on Rasa Community Forum that is happy to help you with your questions. If you have any feedback for us or a specific suggestion for improving the docs, feel free to share it by creating an issue on Rasa Core GitHub repository.