Class RxZmqContext

java.lang.Object
org.consensusj.rx.zeromq.RxZmqContext
All Implemented Interfaces:
Closeable, AutoCloseable

public class RxZmqContext extends Object implements Closeable
A ZMQ context with a single socket and one Processor<ZMsg> per topic. It subscribes a ZMQ SUB socket to one or more topics and receives a multiplexed stream of ZMsg from the ZMsgSocketFlowable. In onNext(org.zeromq.ZMsg), each ZMsg is de-multiplexed by its topic String and placed in the Processor<ZMsg> for that topic.

For now, topics must be passed to the constructor. Ideally, the ZMQ-level subscription for a topic would be made lazily, when the first Rx-level subscription for that topic arrives. This will probably require a ZPoller or a similar mechanism.
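
The per-topic de-multiplexing described above can be illustrated with a minimal, JDK-only sketch. This is not the RxZmqContext implementation: it assumes java.util.concurrent.SubmissionPublisher as a stand-in for the RxJava Processor<ZMsg>, plain String payloads instead of ZMsg, and illustrative topic names. The class name TopicDemuxSketch and the collect helper are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the de-multiplexing step; not the consensusj code. */
public class TopicDemuxSketch implements AutoCloseable {
    // One publisher per configured topic, fixed at construction time.
    private final Map<String, SubmissionPublisher<String>> byTopic = new ConcurrentHashMap<>();

    public TopicDemuxSketch(List<String> topics) {
        for (String topic : topics) {
            byTopic.put(topic, new SubmissionPublisher<>());
        }
    }

    /** Analogue of topicPublisher(String): rejects topics not given to the constructor. */
    public Flow.Publisher<String> topicPublisher(String topic) {
        SubmissionPublisher<String> publisher = byTopic.get(topic);
        if (publisher == null) {
            throw new IllegalArgumentException("Topic not configured: " + topic);
        }
        return publisher;
    }

    /** Analogue of onNext: route one message from the multiplexed stream by its topic. */
    public void onNext(String topic, String payload) {
        SubmissionPublisher<String> publisher = byTopic.get(topic);
        if (publisher != null) {
            publisher.submit(payload);
        }
    }

    /** Completes every per-topic stream. */
    @Override
    public void close() {
        byTopic.values().forEach(SubmissionPublisher::close);
    }

    /** Subscribes and returns a future holding all received items once the stream completes. */
    public static CompletableFuture<List<String>> collect(Flow.Publisher<String> publisher) {
        CompletableFuture<List<String>> done = new CompletableFuture<>();
        List<String> items = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(String item) { items.add(item); }
            @Override public void onError(Throwable t) { done.completeExceptionally(t); }
            @Override public void onComplete() { done.complete(items); }
        });
        return done;
    }

    public static void main(String[] args) {
        CompletableFuture<List<String>> blocks;
        CompletableFuture<List<String>> txs;
        try (TopicDemuxSketch demux = new TopicDemuxSketch(List.of("hashblock", "rawtx"))) {
            // Subscribe first: SubmissionPublisher drops items that have no subscriber.
            blocks = collect(demux.topicPublisher("hashblock"));
            txs = collect(demux.topicPublisher("rawtx"));
            demux.onNext("hashblock", "block-1");
            demux.onNext("rawtx", "tx-1");
            demux.onNext("hashblock", "block-2");
        }
        System.out.println("hashblock: " + blocks.orTimeout(5, TimeUnit.SECONDS).join());
        System.out.println("rawtx: " + txs.orTimeout(5, TimeUnit.SECONDS).join());
    }
}
```

As in the class described here, the set of topics is fixed when the object is constructed, so a request for any other topic fails immediately rather than yielding an empty stream.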

  • Constructor Details

    • RxZmqContext

      public RxZmqContext(URI tcpAddress, List<String> topics)
    • RxZmqContext

      public RxZmqContext(URI tcpAddress, List<String> topics, ThreadFactory threadFactory)
      Parameters:
      tcpAddress - Address of ZMQ server to connect to
      topics - a list of topics to subscribe to
      threadFactory - factory to create a thread for the receive loop (see ZMsgSocketFlowable.createFromSocket(ZMQ.Socket, ThreadFactory))
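
      A minimal sketch of a ThreadFactory that could be supplied as the threadFactory argument. The class name NamedDaemonThreadFactory, the name prefix, and the choice of daemon threads are assumptions for illustration; whether a daemon thread suits the receive loop depends on the application.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical factory that gives each created thread a recognizable name. */
public class NamedDaemonThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger();

    public NamedDaemonThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable task) {
        // Names follow the pattern "<prefix>-<n>", starting at 1.
        Thread thread = new Thread(task, prefix + "-" + counter.incrementAndGet());
        // Assumption: the receive loop should not keep the JVM alive on shutdown.
        thread.setDaemon(true);
        return thread;
    }
}
```

      With the three-argument constructor above, this would be passed as new RxZmqContext(tcpAddress, topics, new NamedDaemonThreadFactory("zmq-receive")).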
  • Method Details

    • topicPublisher

      public org.reactivestreams.Publisher<org.zeromq.ZMsg> topicPublisher(String topic)
      Get a Publisher (internally an RxJava3 Flowable) for a given topic.
      Parameters:
      topic - a valid topic as passed to the constructor
      Returns:
      A Publisher for the topic
      Throws:
      IllegalArgumentException - if topic wasn't configured in the constructor
    • close

      public void close()
      Specified by:
      close in interface AutoCloseable
      Specified by:
      close in interface Closeable