Details
- Type: Bug
- Status: Open
- Priority: Major
- Resolution: Unresolved
- Affects Version/s: JRuby 1.5.1
- Fix Version/s: None
- Component/s: Core Classes/Modules
- Labels: None
- Environment: OS X / JVM 1.6
- Patch Submitted: Yes
Description
The Stomp consumer times out after 5 seconds and raises the following error:
consumer.rb:18:in `_receive': execution expired (Stomp::Error::PacketParsingTimeout)
    from consumer.rb:12:in `_receive'
    from consumer.rb:7:in `_receive'
    from /Users/victori/Documents/Java/jruby-1.4/lib/ruby/gems/1.8/gems/stomp-1.1.6/lib/stomp/connection.rb:325:in `__old_receive'
    from /Users/victori/Documents/Java/jruby-1.4/lib/ruby/gems/1.8/gems/stomp-1.1.6/lib/stomp/connection.rb:335:in `receive'
    from /Users/victori/Documents/Java/jruby-1.4/lib/ruby/gems/1.8/gems/stomp-1.1.6/lib/stomp/client.rb:289:in `start_listeners'
    from /Users/victori/Documents/Java/jruby-1.4/lib/ruby/gems/1.8/gems/stomp-1.1.6/lib/stomp/client.rb:287:in `start'
    from /Users/victori/Documents/Java/jruby-1.4/lib/ruby/gems/1.8/gems/stomp-1.1.6/lib/stomp/client.rb:287:in `start_listeners'
    from /Users/victori/Documents/Java/jruby-1.4/lib/ruby/gems/1.8/gems/stomp-1.1.6/lib/stomp/client.rb:96:in `initialize'
    from consumer.rb:53:in `new'
    from consumer.rb:53
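For readers unfamiliar with how this kind of error arises, the following is a minimal sketch of Ruby's standard Timeout module raising a caller-supplied exception class once a block exceeds its time limit. The PacketParsingTimeout class and the read_with_limit helper here are hypothetical stand-ins for illustration only; they are not the stomp gem's code, and the sleep merely simulates a socket read that does not return in time.

```ruby
require 'timeout'

# Hypothetical stand-in for Stomp::Error::PacketParsingTimeout.
class PacketParsingTimeout < RuntimeError; end

# Hypothetical helper: runs the given block under a time limit and returns
# either the block's value or a description of the timeout error.
def read_with_limit(seconds)
  Timeout.timeout(seconds, PacketParsingTimeout) { yield }
rescue PacketParsingTimeout => e
  "#{e.message} (#{e.class})"
end

# Simulates a read that blocks for longer than the limit.
slow = read_with_limit(0.1) { sleep 1; :frame }
# Simulates a read that completes before the limit.
fast = read_with_limit(0.1) { :frame }

puts slow
puts fast.inspect
```

In the report above the limit is 5 seconds and wraps the whole frame-parsing step, so any read that stalls inside it surfaces as PacketParsingTimeout.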
Here is the code that reproduces the issue:
consumer.rb
require 'rubygems'
require 'stomp'

client = Stomp::Client.new "failover:(stomp://localhost:61613)?initialReconnectDelay=100"
client.subscribe('/queue/test_queue', {:ack => "client", "activemq.prefetchSize" => 1, "activemq.exclusive" => true }) do |msg|
  p msg.body
  client.acknowledge(msg)
end
client.join
producer.rb
require 'rubygems'
require 'stomp'

client = Stomp::Client.new "failover:(stomp://localhost:61613)?initialReconnectDelay=100"
client.publish("/queue/test_queue", "hello world!",{:persistent=>true})
And here is the monkey patch that fixes the issue by commenting out the Timeout::timeout wrapper in _receive:
# for stomp subscriber
if defined?(JRUBY_VERSION)
  module Stomp
    class Connection
      def _receive( read_socket )
        @read_semaphore.synchronize do
          line = read_socket.gets
          return nil if line.nil?

          # If the reading hangs for more than 5 seconds, abort the parsing process
          #Timeout::timeout(5, Stomp::Error::PacketParsingTimeout) do
            # Reads the beginning of the message until it runs into a empty line
            message_header = ''
            begin
              message_header += line
              begin
                line = read_socket.gets
              rescue
                p read_socket
              end
            end until line =~ /^\s?\n$/

            # Checks if it includes content_length header
            content_length = message_header.match /content-length\s?:\s?(\d+)\s?\n/
            message_body = ''

            # If it does, reads the specified amount of bytes
            char = ''
            if content_length
              message_body = read_socket.read content_length[1].to_i
              raise Stomp::Error::InvalidMessageLength unless parse_char(read_socket.getc) == "\0"
            # Else reads, the rest of the message until the first \0
            else
              message_body += char while read_socket.ready? && (char = parse_char(read_socket.getc)) != "\0"
            end

            # If the buffer isn't empty, reads the next char and returns it to the buffer
            # unless it's a \n
            if read_socket.ready?
              last_char = read_socket.getc
              read_socket.ungetc(last_char) if parse_char(last_char) != "\n"
            end

            # Adds the excluded \n and \0 and tries to create a new message with it
            Message.new(message_header + "\n" + message_body + "\0")
        end
          #end
      end
    end
  end
end
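The patch above relies on two Ruby mechanisms: a defined?(JRUBY_VERSION) guard, and reopening an existing class to replace one of its methods. A minimal sketch of that pattern follows, using a hypothetical Connection class with a hypothetical receive method rather than the real Stomp::Connection. On interpreters other than JRuby the guard is false and the original method stays in place.

```ruby
# Hypothetical class standing in for a library class such as Stomp::Connection.
class Connection
  def receive
    :with_timeout
  end
end

# JRUBY_VERSION is predefined only when running on JRuby,
# so defined? returns nil on other interpreters.
if defined?(JRUBY_VERSION)
  # Reopening the class replaces the method for all instances,
  # including ones created before this point.
  class Connection
    def receive
      :without_timeout
    end
  end
end

puts Connection.new.receive.inspect
```

Because the override is applied only under the guard, the same script can be loaded unchanged on other Ruby implementations without altering their behavior.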