Login or register    

Salt Stack Piggy Back Encrypted ZMQ Event Bus

Some code examples that piggy back on Salt Stack's encrypted ZMQ event bus. These code examples show how 3rd party scripts on each side may communicate over secure and fast transport!

Requires:

Fire event to minion bus and listen to minion bus

In this example we write a listener/subscriber script that connects to the minion bus. We then write a emitter/publisher script that connects to the minion bus and posts events.

listen_to_minion_bus.py

3rd Party Subscriber / Listener:

# needed for config to opts processing
import os
import salt.syspaths as syspaths
import salt.config

# get opts from minion config file, this function also looks in drop dir!
opts = salt.config.minion_config(os.path.join(syspaths.CONFIG_DIR, 'minion'))

# debug to STDOUT
import salt.log
salt.log.setup_console_logger('all')

# event libary for events over ZMQ
import salt.utils.event

# opts must have the valid minion ID else it binds to invalid socket
event = salt.utils.event.MinionEvent(**opts)

tag = 'mytag'

print('Listening for events tagged \'{}\' on Salt Minion bus.'.format(tag))

# generator iterator yields events forever, we filter on tag
for data in event.iter_events(tag=tag):
    print(data)

emit_to_minion_bus.py

3rd Party Publisher / Emitter:

# needed for config to opts processing
import os
import salt.syspaths as syspaths
import salt.config

# get opts from minion config file, this function also looks in drop dir!
opts = salt.config.minion_config(os.path.join(syspaths.CONFIG_DIR, 'minion'))

# debug to STDOUT
import salt.log
salt.log.setup_console_logger('all')

# event libary for events over ZMQ
import salt.utils.event

tag = 'mytag'
data = {'message':'a drop in the bucket'}

payload = {
  'id': opts['id'],
  'tag': tag,
  'data': data,
}

print (payload)

# opts must valid minion ID else it binds to invalid socket
event = salt.utils.event.SaltEvent('minion', **opts)

# Fire event payload with tag
event.fire_event(payload, tag)

Notes on the implementation

Notice how I use event.MinionEvent in listener and event.SaltEvent in emitter? The event.MinionEvent is just a very light weight subclass of event.SaltEvent so you can use either or...

The configuration code could be reduced to static variables, for instance replace -

# needed for config to opts processing
import os
import salt.syspaths as syspaths
import salt.config

# get opts from minion config file, this function also looks in drop dir!
opts = salt.config.minion_config(os.path.join(syspaths.CONFIG_DIR, 'minion'))

with this

opts = {'sock_dir':'/var/run/salt/minion', 'id':'id-of-this-minion'}

Fire event to master bus and listen to master bus

In this example we write a listener/subscriber script that connects to the master bus. We then write a emitter/publisher script that connects to the master bus and posts events.

listen_to_master_bus.py

# debug logging to STDOUT
import salt.log
salt.log.setup_console_logger('all')

# event libary for events over ZMQ
import salt.utils.event

# create event object, attach to master socket ...
event = salt.utils.event.MasterEvent('/var/run/salt/master')

tag = 'mytag'

print('Listening for events tagged \'{}\' on Salt Master bus.'.format(tag))

# generator iterator yields events forever, we filter on tag
for data in event.iter_events(tag=tag):
    print(data)

emit_to_master_bus.py

# debug logging to STDOUT
import salt.log
salt.log.setup_console_logger('all')

# event libary for events over ZMQ
import salt.utils.event

payload = {'sample-msg': 'this is a test',
           'example': 'this is the same test'}

sock_dir = '/var/run/salt/master'

# create event object, attach to master socket ...
event = salt.utils.event.SaltEvent('master', sock_dir)

# post the event
event.fire_event(payload, 'mytag')

Fire events between minion and master

There is where I get stuck ... I want to allow minions to fire events which end up on the master bus ...

I'm able to do this with salt-call, for example:

salt-call event.fire_master '{"data": "message for the master"}' 'mytag'

From my understanding the minion to master communication could work like this:

  1. A 3rd party script publishes to minion bus
  2. salt-minion daemon sees the properly formatted event tagged 'fire_master' and runs _fire_master() method
  3. the event ends up on master bus
  4. A 3rd party script listens to the master bus and subscribes (filters) on the nested tag and gains access to the message

The code in emit-to-minion-bus.py does the first step, but it seems my event "package" format is incorrect so it never gets forwarded to the master ...

This area in the salt-minion daemon loop has conditionals which determine if the package/event should be forwarded. It seems to look for the 'fire_master' tag:

emit_to_minion_bus_forward_to_master_bus.py

# needed for config to opts processing
import os
import salt.syspaths as syspaths
import salt.config

# get opts from minion config file, this function also looks in drop dir!
opts = salt.config.minion_config(os.path.join(syspaths.CONFIG_DIR, 'minion'))

# debug to STDOUT
import salt.log
salt.log.setup_console_logger('all')

# event libary for events over ZMQ
import salt.utils.event

tag = 'mytag'

# a list of event ret objects
events = [
  {'message':'a drap in the bucket', 'tag': tag },
  {'message':'a drep in the bucket', 'tag': tag },
  {'message':'a drip in the bucket', 'tag': tag },
  {'message':'a drop in the bucket', 'tag': tag },
  {'message':'a drup in the bucket', 'tag': tag },
]

# multi event example, supports a list of event ret objects
payload = {
  'id': opts['id'],
  'events': events,
  'tag': None,
  'pretag': None,
  'data': None
}

print (payload)

# opts must valid minion ID else it binds to invalid socket
event = salt.utils.event.SaltEvent('minion', **opts)

# Fire event payload with 'fire_master' tag which the
# salt-minion daemon will forward to the master event bus!
event.fire_event(payload, 'fire_master')

ideas

I think it could be neat to build a returner which accepted a ZMQ "channel" tag. Then after a remote execution the ret data could be passed from minion to master on a special channel. Then we could have a 3rd party script on the salt master that subscribes to this special channel and performs further processing or presents the data in a neat way.

for example:

salt '*' --return=master_bus --tag=monitoring cmd.run_all '/usr/lib/nagios/plugins/check_procs -w 150 -c 200'

The minion would see the remote execution command, run the job, get data back, and return to master on a special channel tag.

On the master we could have a 3rd party script subscribed to the special channel tag and perform further manipulation or persist the output somewhere.

UPDATE: I wrote a zeromq returner to return tagged data back to the master's zeromq bus here.

Comments

Leave a comment

Please login or register to leave a comment!