我正在构建一个小的
ruby程序来运行与
MQTT服务器的连接并订阅一个频道.我正在使用
mosquitto宝石,它只是
libmosquitto C库的桥梁.
我创建了一个非常简单的程序实现,可以使用ruby my_prog.rb运行:
# Dependencies require File.expand_path(File.join('..','environment'),__FILE__) # MQTT Application module Pulsr class MQTT attr_reader :host,:port,:alive def initialize(host = 'iot.eclipse.org',port = 1883,alive = 60) @client ||= Mosquitto::Client.new SecureRandom.hex(8) Signal.trap(Signal.list.has_key?('INT') ? 'SIGINT' : 'SIGTERM') do @client.log 'Shutdown' shutdown end @host = host @port = port @alive = alive start end private def on_connect Proc.new { |return_code| @client.log "Connected RC #{return_code}" @client.subscribe(nil,'/pulsr',Mosquitto::EXACTLY_ONCE) } end def on_disconnect Proc.new { |return_code| @client.log "Disconnected RC #{return_code}" } end def on_subscribe Proc.new { |message_id,granted_qos| @client.log "Subscribed MID #{message_id} QoS #{granted_qos}" } end def on_unsubscribe Proc.new { |message_id| @client.log "Unsubscribed MID #{message_id}" } end def on_message Proc.new { |message| Pulsr::Workers::TrackingEvent.perform_async message.to_s } end def configure @client.logger = Logger.new(STDOUT) @client.on_connect &on_connect @client.on_disconnect &on_disconnect @client.on_subscribe &on_subscribe @client.on_unsubscribe &on_unsubscribe @client.on_message &on_message end def connect @client.connect_async(@host,@port,@alive) end def start @client.loop_start configure connect sleep end def shutdown @client.loop_stop(true) Process.exit end end end # MQTT Start Pulsr::MQTT.new :host => 'iot.eclipse.org',:port => 1883,:alive => 60
我想知道,如果我想使用Celluloid或EventMachine运行mosquitto gem提供的循环,我该怎么做?
mosquitto gem提供了一个很好的documentation,并提供了一些可以使用的循环方法,但我不知道从哪里开始或如何做,我既没有使用EM或赛璐珞.
任何人都可以帮助开始这个,我认为它可以为社区带来一些价值,它最终可以作为一个开源项目,蚊子宝石的一小部分?
解决方法
我认为这并不难.
Mosquitto有一个很好的图书馆.
Mosquitto有一个很好的图书馆.
你需要连接这些功能:
mosquitto_loop_misc() <-> EventMachine::PeriodicTimer.new mosquitto_read() <-> EventMachine.watch mosquitto_write() <-> EventMachine.watch