class Stomp::Client
Typical Stomp client class. Uses a listener thread to receive frames from the server, any thread can send.
Receives all happen in one thread, so consider not doing much processing in that thread if you have much message volume.
Attributes
Parameters hash
Public Class Methods
A new Client object can be initialized using three forms:
Hash (this is the recommended Client initialization method):
hash = { :hosts => [ {:login => "login1", :passcode => "passcode1", :host => "localhost", :port => 61616, :ssl => false}, {:login => "login2", :passcode => "passcode2", :host => "remotehost", :port => 61617, :ssl => false} ], # These are the default parameters and do not need to be set :reliable => true, # reliable (use failover) :initial_reconnect_delay => 0.01, # initial delay before reconnect (secs) :max_reconnect_delay => 30.0, # max delay before reconnect :use_exponential_back_off => true, # increase delay between reconnect attpempts :back_off_multiplier => 2, # next delay multiplier :max_reconnect_attempts => 0, # retry forever, use # for maximum attempts :randomize => false, # do not radomize hosts hash before reconnect :connect_timeout => 0, # Timeout for TCP/TLS connects, use # for max seconds :connect_headers => {}, # user supplied CONNECT headers (req'd for Stomp 1.1+) :parse_timeout => 5, # IO::select wait time on socket reads :logger => nil, # user suplied callback logger instance :dmh => false, # do not support multihomed IPV4 / IPV6 hosts during failover :closed_check => true, # check first if closed in each protocol method :hbser => false, # raise on heartbeat send exception :stompconn => false, # Use STOMP instead of CONNECT :usecrlf => false, # Use CRLF command and header line ends (1.2+) :max_hbread_fails => 0, # Max HB read fails before retry. 0 => never retry :max_hbrlck_fails => 0, # Max HB read lock obtain fails before retry. 0 => never retry :fast_hbs_adjust => 0.0, # Fast heartbeat senders sleep adjustment, seconds, needed ... # For fast heartbeat senders. 'fast' == YMMV. If not # correct for your environment, expect unnecessary fail overs :connread_timeout => 0, # Timeout during CONNECT for read of CONNECTED/ERROR, secs :tcp_nodelay => true, # Turns on the TCP_NODELAY socket option; disables Nagle's algorithm :start_timeout => 0, # Timeout around Stomp::Client initialization :sslctx_newparm => nil, # Param for SSLContext.new :ssl_post_conn_check => true, # Further verify broker identity } e.g. c = Stomp::Client.new(hash)
Positional parameters:
login (String, default : '') passcode (String, default : '') host (String, default : 'localhost') port (Integer, default : 61613) reliable (Boolean, default : false) e.g. c = Stomp::Client.new('login', 'passcode', 'localhost', 61613, true)
Stomp URL :
A Stomp URL must begin with 'stomp://' and can be in one of the following forms: stomp://host:port stomp://host.domain.tld:port stomp://login:passcode@host:port stomp://login:passcode@host.domain.tld:port e.g. c = Stomp::Client.new(urlstring)
# File lib/stomp/client.rb, line 82 def initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, autoflush = false) parse_hash_params(login) || parse_stomp_url(login) || parse_failover_url(login) || parse_positional_params(login, passcode, host, port, reliable) @logger = @parameters[:logger] ||= Stomp::NullLogger.new @start_timeout = @parameters[:start_timeout] || 0 check_arguments!() # p [ "cldbg01", @parameters ] begin Timeout::timeout(@start_timeout) { create_error_handler create_connection(autoflush) start_listeners() } rescue TimeoutError # p [ "cldbg02" ] ex = Stomp::Error::StartTimeoutException.new(@start_timeout) raise ex end end
open is syntactic sugar for 'Client.new', see 'initialize' for usage.
# File lib/stomp/client.rb, line 133 def self.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false) Client.new(login, passcode, host, port, reliable) end
Public Instance Methods
Abort aborts work in a transaction by name.
# File lib/stomp/client.rb, line 149 def abort(name, headers = {}) @connection.abort(name, headers) # replay any ack'd messages in this transaction replay_list = @replay_messages_by_txn[name] if replay_list replay_list.each do |message| find_listener(message) # find_listener also calls the listener end end end
Acknowledge a message, used when a subscription has specified client acknowledgement ( connection.subscribe(“/queue/a”,{:ack => 'client'}). Accepts a transaction header ( :transaction => 'some_transaction_id' ).
# File lib/stomp/client.rb, line 194 def ack(message, headers = {}) txn_id = headers[:transaction] if txn_id # lets keep around messages ack'd in this transaction in case we rollback replay_list = @replay_messages_by_txn[txn_id] if replay_list.nil? replay_list = [] @replay_messages_by_txn[txn_id] = replay_list end replay_list << message end if block_given? headers = headers.merge(:receipt => register_receipt_listener(lambda {|r| yield r})) end context = ack_context_for(message, headers) @connection.ack context[:message_id], context[:headers] end
# File lib/stomp/client.rb, line 222 def ack_context_for(message, headers) id = case protocol when Stomp::SPL_12 'ack' when Stomp::SPL_11 headers = headers.merge(:subscription => message.headers['subscription']) 'message-id' else 'message-id' end {:message_id => message.headers[id], :headers => headers} end
autoflush returns the current connection's autoflush setting.
# File lib/stomp/client.rb, line 346 def autoflush() @connection.autoflush() end
autoflush= sets the current connection's autoflush setting.
# File lib/stomp/client.rb, line 341 def autoflush=(af) @connection.autoflush = af end
Begin starts work in a a transaction by name.
# File lib/stomp/client.rb, line 144 def begin(name, headers = {}) @connection.begin(name, headers) end
close frees resources in use by this client. The listener thread is terminated, and disconnect on the connection is called.
# File lib/stomp/client.rb, line 278 def close(headers={}) @listener_thread.exit @connection.disconnect(headers) end
close? tests if this client connection is closed.
# File lib/stomp/client.rb, line 267 def closed?() @connection.closed?() end
Commit commits work in a transaction by name.
# File lib/stomp/client.rb, line 162 def commit(name, headers = {}) txn_id = headers[:transaction] @replay_messages_by_txn.delete(txn_id) @connection.commit(name, headers) end
Return the broker's CONNECTED frame to the client. Misnamed.
# File lib/stomp/client.rb, line 252 def connection_frame() @connection.connection_frame end
# File lib/stomp/client.rb, line 107 def create_error_handler client_thread = Thread.current @error_listener = lambda do |error| exception = case error.body when /ResourceAllocationException/i Stomp::Error::ProducerFlowControlException.new(error) when /ProtocolException/i Stomp::Error::ProtocolException.new(error) else Stomp::Error::BrokerException.new(error) end @receipt_listeners.delete(error.headers['receipt-id']) if error.headers['receipt-id'] client_thread.raise exception end end
Return any RECEIPT frame received by DISCONNECT.
# File lib/stomp/client.rb, line 257 def disconnect_receipt() @connection.disconnect_receipt end
#hbrecv_count returns the current connection's heartbeat receive count.
# File lib/stomp/client.rb, line 330 def hbrecv_count() @connection.hbrecv_count() end
#hbrecv_interval returns the connection's heartbeat receive interval.
# File lib/stomp/client.rb, line 320 def hbrecv_interval() @connection.hbrecv_interval() end
#hbsend_count returns the current connection's heartbeat send count.
# File lib/stomp/client.rb, line 325 def hbsend_count() @connection.hbsend_count() end
#hbsend_interval returns the connection's heartbeat send interval.
# File lib/stomp/client.rb, line 315 def hbsend_interval() @connection.hbsend_interval() end
join the listener thread for this client, generally used to wait for a quit signal.
# File lib/stomp/client.rb, line 139 def join(limit = nil) @listener_thread.join(limit) end
jruby? tests if the connection has detcted a JRuby environment
# File lib/stomp/client.rb, line 272 def jruby?() @connection.jruby end
Stomp 1.1+ NACK.
# File lib/stomp/client.rb, line 216 def nack(message, headers = {}) context = ack_context_for(message, headers) @connection.nack context[:message_id], context[:headers] end
open? tests if this client connection is open.
# File lib/stomp/client.rb, line 262 def open? @connection.open?() end
Poll for asynchronous messages issued by broker. Return nil of no message available, else the message
# File lib/stomp/client.rb, line 336 def poll() @connection.poll() end
protocol returns the current client's protocol level.
# File lib/stomp/client.rb, line 295 def protocol() @connection.protocol() end
Publishes message to destination. If a block is given a receipt will be requested and passed to the block on receipt. Accepts a transaction header ( :transaction => 'some_transaction_id' ).
# File lib/stomp/client.rb, line 244 def publish(destination, message, headers = {}) if block_given? headers = headers.merge(:receipt => register_receipt_listener(lambda {|r| yield r})) end @connection.publish(destination, message, headers) end
running checks if the thread was created and is not dead.
# File lib/stomp/client.rb, line 284 def running() @listener_thread && !!@listener_thread.status end
#set_logger identifies a new callback logger.
# File lib/stomp/client.rb, line 289 def set_logger(logger) @logger = logger @connection.set_logger(logger) end
sha1 returns a SHA1 sum of a given string.
# File lib/stomp/client.rb, line 305 def sha1(data) @connection.sha1(data) end
Subscribe to a destination, must be passed a block which will be used as a callback listener. Accepts a transaction header ( :transaction => 'some_transaction_id' ).
# File lib/stomp/client.rb, line 171 def subscribe(destination, headers = {}) raise "No listener given" unless block_given? # use subscription id to correlate messages to subscription. As described in # the SUBSCRIPTION section of the protocol: http://stomp.github.com/. # If no subscription id is provided, generate one. headers = headers.merge(:id => build_subscription_id(destination, headers)) if @listeners[headers[:id]] raise "attempting to subscribe to a queue with a previous subscription" end @listeners[headers[:id]] = lambda {|msg| yield msg} @connection.subscribe(destination, headers) end
Unreceive a message, sending it back to its queue or to the DLQ.
# File lib/stomp/client.rb, line 236 def unreceive(message, options = {}) @connection.unreceive(message, options) end
Unsubscribe from a subscription by name.
# File lib/stomp/client.rb, line 185 def unsubscribe(name, headers = {}) headers = headers.merge(:id => build_subscription_id(name, headers)) @connection.unsubscribe(name, headers) @listeners[headers[:id]] = nil end
uuid returns a type 4 UUID.
# File lib/stomp/client.rb, line 310 def uuid() @connection.uuid() end
valid_utf8? validates any given string for UTF8 compliance.
# File lib/stomp/client.rb, line 300 def valid_utf8?(s) @connection.valid_utf8?(s) end
Private Instance Methods
# File lib/client/utils.rb, line 77 def build_subscription_id(destination, headers) return headers[:id] until headers[:id].nil? return headers['id'] until headers['id'].nil? Digest::SHA1.hexdigest(destination) end
A sanity check of required arguments.
# File lib/client/utils.rb, line 102 def check_arguments!() raise ArgumentError.new("missing :hosts parameter") unless @parameters[:hosts] raise ArgumentError.new("invalid :hosts type") unless @parameters[:hosts].is_a?(Array) @parameters[:hosts].each do |hv| # Validate port requested raise ArgumentError.new("empty :port value in #{hv.inspect}") if hv[:port] == '' unless hv[:port].nil? tpv = hv[:port].to_i raise ArgumentError.new("invalid :port value=#{tpv} from #{hv.inspect}") if tpv < 1 || tpv > 65535 end # Validate host requested (no validation here. if nil or '', localhost will # be used in #Connection.) end raise ArgumentError unless @parameters[:reliable].is_a?(TrueClass) || @parameters[:reliable].is_a?(FalseClass) # if @parameters[:reliable] && @start_timeout > 0 warn "WARN detected :reliable == true and :start_timeout > 0" warn "WARN this may cause incorrect fail-over behavior" warn "WARN use :start_timeout => 0 to correct fail-over behavior" end end
# File lib/stomp/client.rb, line 125 def create_connection(autoflush) # p [ "ccon01", @parameters ] @connection = Connection.new(@parameters) @connection.autoflush = autoflush end
# File lib/client/utils.rb, line 167 def create_listener_maps @listeners = {} @receipt_listeners = {} @replay_messages_by_txn = {} @listener_map = Hash.new do |message| unless @connection.slog(:on_miscerr, @connection.log_params, "Received unknown frame type: '#{message.command}'\n") warn "Received unknown frame type: '#{message.command}'\n" end end @listener_map[Stomp::CMD_MESSAGE] = lambda {|message| find_listener(message) } @listener_map[Stomp::CMD_RECEIPT] = lambda {|message| find_receipt_listener(message) } @listener_map[Stomp::CMD_ERROR] = @error_listener end
#filter_options returns a new Hash of filtered options.
# File lib/client/utils.rb, line 125 def filter_options(options) new_options = {} new_options[:initial_reconnect_delay] = (options["initialReconnectDelay"] || 10).to_f / 1000 # In ms new_options[:max_reconnect_delay] = (options["maxReconnectDelay"] || 30000 ).to_f / 1000 # In ms new_options[:use_exponential_back_off] = !(options["useExponentialBackOff"] == "false") # Default: true new_options[:back_off_multiplier] = (options["backOffMultiplier"] || 2 ).to_i new_options[:max_reconnect_attempts] = (options["maxReconnectAttempts"] || 0 ).to_i new_options[:randomize] = options["randomize"] == "true" # Default: false new_options[:connect_timeout] = 0 new_options end
#find_listener returns the listener for a given subscription in a given message.
# File lib/client/utils.rb, line 139 def find_listener(message) subscription_id = message.headers['subscription'] if subscription_id == nil # For backward compatibility, some messages may already exist with no # subscription id, in which case we can attempt to synthesize one. set_subscription_id_if_missing(message.headers['destination'], message.headers) subscription_id = message.headers[:id] end listener = @listeners[subscription_id] listener.call(message) if listener end
# File lib/client/utils.rb, line 159 def find_receipt_listener(message) listener = @receipt_listeners[message.headers['receipt-id']] if listener listener.call(message) @receipt_listeners.delete(message.headers['receipt-id']) end end
e.g. failover://(stomp://login1:passcode1@localhost:61616,stomp://login2:passcode2@remotehost:61617)?option1=param
# File lib/client/utils.rb, line 37 def parse_failover_url(login) rval = nil original_verbose, $VERBOSE = $VERBOSE, nil # shut off warnings md = FAILOVER_REGEX.match(login) $VERBOSE = original_verbose if md finhosts = parse_hosts(login) options = {} if md_last = md[-1] parts = md_last.split(/&|=/) raise Stomp::Error::MalformedFailoverOptionsError unless ( parts.size % 2 ) == 0 options = Hash[*parts] end @parameters = {:hosts => finhosts}.merge!(filter_options(options)) @parameters[:reliable] = true rval = true end rval end
# File lib/client/utils.rb, line 11 def parse_hash_params(params) return false unless params.is_a?(Hash) @parameters = params # Do not override user choice of false. @parameters[:reliable] = true unless @parameters[:reliable] == false true end
Parse a stomp URL.
# File lib/client/utils.rb, line 84 def parse_hosts(url) hosts = [] original_verbose, $VERBOSE = $VERBOSE, nil # shut off warnings host_match = /stomp(\+ssl)?:\/\/#{URL_REPAT}/ url.scan(host_match).each do |match| host = {} host[:ssl] = match[0] == "+ssl" ? true : false host[:login] = match[3] || "" host[:passcode] = match[4] || "" host[:host] = match[5] host[:port] = match[6].to_i hosts << host end $VERBOSE = original_verbose hosts end
# File lib/client/utils.rb, line 60 def parse_positional_params(login, passcode, host, port, reliable) @parameters = { :reliable => reliable, :hosts => [ { :login => login, :passcode => passcode, :host => host, :port => port.to_i } ] } true end
# File lib/client/utils.rb, line 21 def parse_stomp_url(login) original_verbose, $VERBOSE = $VERBOSE, nil # shut off warnings regexp = /^stomp:\/\/#{URL_REPAT}/ url = regexp.match(login) $VERBOSE = original_verbose return false unless url @parameters = { :reliable => false, :hosts => [ { :login => url[3] || "", :passcode => url[4] || "", :host => url[5], :port => url[6].to_i} ] } true end
Register a receipt listener.
# File lib/client/utils.rb, line 153 def register_receipt_listener(listener) id = uuid @receipt_listeners[id] = listener id end
Set a subscription id in the headers hash if one does not already exist. For simplicities sake, all subscriptions have a subscription ID. setting an id in the SUBSCRIPTION header is described in the stomp protocol docs: stomp.github.com/
# File lib/client/utils.rb, line 73 def set_subscription_id_if_missing(destination, headers) headers[:id] = build_subscription_id(destination, headers) end
Start a single listener thread. Misnamed I think.
# File lib/client/utils.rb, line 184 def start_listeners() create_listener_maps @listener_thread = Thread.start do loop do message = @connection.receive # AMQ specific behavior if message.nil? && (!@parameters[:reliable]) raise Stomp::Error::NilMessageError end next unless message # message can be nil on rapid AMQ stop/start sequences @listener_map[message.command].call(message) end end end