Messaging and scheduled jobs made simple

Marek Goldmann
JBoss

Agenda

  • Introduction to TorqueBox
  • Java Message Service
    • Concepts refresh
  • TorqueBox Messaging
    • Destination management
    • Sending and receiving messages
    • Message management
  • TorqueBox Jobs
    • Scheduled jobs
    • 'At' jobs

A cat

A cat
TorqueBox Logo

Ruby Application Server

Did he just say Ruby at a Java conference?

Yep, it's a

Ruby Application Server

based on JBoss AS

JRuby Logo
WildFly Logo
TorqueBox stack

Application Server

is not only

a Web server

Stuff we can deploy on TorqueBox

  • Web applications (both Ruby and Java)
  • Services
  • Jobs
  • Messaging
  • A mix of above
$ torquebox run

Java Message Service

Let's refresh the knowledge!

Asynchronous!

There are

Producers

and

Consumers

Two communication models

  1. point-to-point (one to one)
  2. publish-subscribe (one to many)
Queues
Topics

Destinations

In the code

# Creates and starts a new queue
@queue = TorqueBox::Messaging::Queue.start("/queues/new_queue")

# Creates and starts a new topic
@topic = TorqueBox::Messaging::Topic.start("/topics/new_topic")

Using deployment descriptors

TorqueBox.configure do
  # Creates and starts the queue with the application
  queue '/queues/one'

  # Creates and starts the topic with the application
  topic '/topics/one'
end
torquebox.rb

Demo 01

Creating and destroying destinations

Message

Message

Sending a simple message

# Sending a text message
@queue.publish("Hello!")

# Sending a Ruby Hash
@queue.publish(:type => :car, :price => 20455.73)

Defining message properties

@queue.publish("Hello!",
  :properties => {
    'type'  => 'CUCUMBER',   # String
    'count' => 126,          # Integer
    'price' => 4367.23,      # Float
    'valid' => true          # Boolean
  }
)

Receiving a message

# Watch out - this call can block
# the thread forever if no new messages arrive
message = @queue.receive

# Wait at most 10 seconds for the message
message = @queue.receive(:timeout => 10_000)

Demo 02

Sending and receiving messages

Message processor

class SimpleProcessor < TorqueBox::Messaging::MessageProcessor

  def on_message(message)
    puts "Received '#{message}'"
  end

end

Message processors usage

TorqueBox.configure do
  # Define the queue
  queue '/queues/stuff' do
    # Specify the message processor implementation
    processor SimpleProcessor
  end
end
torquebox.rb

Demo 03

Using message processors

Message filtering

SQL92-like syntax

Examples

type = 'CUCUMBER' AND price < 1.23
action = 'BUY' AND JMSPriority BETWEEN 5 AND 8
name LIKE 'MAR%'
Priority

Syntax Expressions

  • Headers (but not all!)
  • Properties
  • Operators

Receiving a message with selector

message = @queue.receive(:selector => "type = 'CUCUMBER' AND price < 1.23")

Message selectors in processors

TorqueBox.configure do
  # Define the queue
  queue '/queues/selector' do
    # Specify the message processor implementation
    processor SimpleProcessor do
      # Define the filter expression
      selector "score > 5"
    end
  end
end
torquebox.rb

Demo 04

Using message selectors

Message priority

By default: 4

Message provider delivers messages with higher priority first

Message priority in TorqueBox

  • :low set to 1
  • :normal set to 4
  • :high set to 7
  • :critical set to 9

Examples

# low priority (1)
@queue.publish("Is someone there?", :priority => :low)
# default priority (4)
@queue.publish("Hello!")
# critical priority (9)
@queue.publish("Urgent!", :priority => :critical)

Demo 05

Message priorities

Message scheduling

By default: publish NOW!

Examples

# Message is published immediately (:scheduled set to 0)
@queue.publish("Normal message")

# Message is published 10 seconds after executing the call
@queue.publish("Scheduled message", :scheduled => Time.now + 10)

Demo 06

Message scheduling

Synchronous
messaging

WAT?

You said that messaging is asynchronous!

Example

# Publish a message, then wait for reply
answer = @queue.publish_and_receive("Hello")

# Wait for a message and reply
@queue.receive_and_publish do |message|
  message.upcase
end

Demo 07

Synchronous messaging

Message management

  • Counting messages
  • Moving messages to another queue
    • Expiring messages
    • Sending messages to DLQ
  • Removing messages

Demo 08

Message management

Queue management

  • Listing destinations
  • Pausing / resuming the queue

Demo 09

Queue management

Scheduled jobs

Things you want to execute sometime in the future

Job class

class SimpleJob
  def run
    # Stuff goes here
  end
end
simple_job.rb

Scheduling options

  1. Cron syntax
  2. 'At'
Seconds Minutes Hours Day of month Month Day of week Year
*/5 * * * * ?
0 15 3 1 * ?

Deployment descriptors

TorqueBox.configure do
  job SimpleJob do
    cron '*/5 * * * * ?'
    name 'mail.notifier'      # Optional
    description 'Simple job'  # Optional
    timeout '5s'              # Optional, when the job should timeout
    config do                 # Optional, data injected to constructor
      foo 'bar'
    end
  end
end
torquebox.rb

In the code

TorqueBox::ScheduledJob.schedule(
  "SimpleJob", 
  '*/5 * * * * ?',
  :name         => 'simple.job',
  :description  => 'Simple job',
  :timeout      => '5s',
  :config       => { 'foo' => 'bar' }
)

Demo 10

Cron jobs

'At' jobs

# Run a job every 200 ms for over 5 seconds, from now
TorqueBox::ScheduledJob.at(
  'SimpleJob', :every => 200, :until => Time.now + 5)

# Start in 1 second, then every 200 ms for over 4 seconds
TorqueBox::ScheduledJob.at(
  'SimpleJob', :at => Time.now + 1, :every => 200, :until => Time.now + 5)

# Start in 1 second, then every 200 ms for over 4 seconds
TorqueBox::ScheduledJob.at(
  'SimpleJob', :in => 1_000, :every => 200, :until => Time.now + 5)

# Start in 1 second, then repeat te job 10 times, every 200 ms
TorqueBox::ScheduledJob.at(
  'SimpleJob', :in => 1_000, :repeat => 10, :every => 200)

Demo 11

'At' jobs

Disclaimer

All mentioned features require TorqueBox 3

Or at least latest incrementals

Questions?

Thanks!

http://torquebox.org