Package org.consensusj.rx.zeromq
Class RxZmqContext
java.lang.Object
org.consensusj.rx.zeromq.RxZmqContext
- All Implemented Interfaces:
Closeable, AutoCloseable
ZMQ Context with a single socket and one Processor<ZMsg> per topic.
It subscribes (SUB) to one or more topics using ZMQ and receives a multiplexed stream of ZMsg from the ZMsgSocketFlowable. In onNext(org.zeromq.ZMsg) each ZMsg is de-multiplexed by its topic String and placed in the matching per-topic Processor<ZMsg>.
Topics (at least for now) must be passed to the constructor. It would be nice to be able to subscribe to topics at the ZMQ level when we first get subscriptions at the Rx level. This will probably require using a ZPoller or something like that.
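The de-multiplexing described above can be sketched with JDK classes alone. This is a minimal illustration, not the implementation of RxZmqContext: the class TopicDemux, its onNext(String, String) signature, and the use of java.util.concurrent.SubmissionPublisher in place of an RxJava Processor<ZMsg> are all assumptions made for the example, and plain strings stand in for ZMsg frames.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

/**
 * Hypothetical, JDK-only stand-in for the per-topic de-multiplexing idea.
 * It is NOT the RxZmqContext implementation.
 */
final class TopicDemux implements AutoCloseable {
    // One publisher per configured topic; the map is never mutated after construction.
    private final Map<String, SubmissionPublisher<String>> perTopic = new HashMap<>();

    /** Topics are fixed up front, mirroring the constructor contract described above. */
    TopicDemux(List<String> topics) {
        for (String topic : topics) {
            perTopic.put(topic, new SubmissionPublisher<>());
        }
    }

    /** Route one message from the multiplexed stream to the publisher for its topic. */
    void onNext(String topic, String payload) {
        SubmissionPublisher<String> sink = perTopic.get(topic);
        if (sink != null) {
            sink.submit(payload);
        }
        // Assumption of this sketch: messages for unconfigured topics are ignored.
    }

    /** Look up the publisher for a topic, rejecting topics that were never configured. */
    Flow.Publisher<String> topicPublisher(String topic) {
        SubmissionPublisher<String> sink = perTopic.get(topic);
        if (sink == null) {
            throw new IllegalArgumentException("Topic not configured: " + topic);
        }
        return sink;
    }

    /** Complete every per-topic stream. */
    @Override
    public void close() {
        perTopic.values().forEach(SubmissionPublisher::close);
    }
}
```

A caller would subscribe to topicPublisher("rawtx") (a hypothetical topic name) before messages are routed, because SubmissionPublisher discards items submitted while it has no subscribers. Whether the real per-topic Processor buffers or drops in that situation is not stated here.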
Constructor Summary
Constructors:
RxZmqContext(URI tcpAddress, List<String> topics)
RxZmqContext(URI tcpAddress, List<String> topics, ThreadFactory threadFactory)
Method Summary
Modifier and Type, Method, Description:
void close()
org.reactivestreams.Publisher<org.zeromq.ZMsg> topicPublisher(String topic) - Get a Publisher (internally an RxJava3 Flowable) for a given topic
Constructor Details
RxZmqContext(URI tcpAddress, List<String> topics)
RxZmqContext(URI tcpAddress, List<String> topics, ThreadFactory threadFactory)
- Parameters:
tcpAddress - Address of ZMQ server to connect to
topics - a list of topics to subscribe to
threadFactory - factory to create a thread for the receive loop (see ZMsgSocketFlowable.createFromSocket(ZMQ.Socket, ThreadFactory))
Method Details
topicPublisher
Get a Publisher (internally an RxJava3 Flowable) for a given topic.
- Parameters:
topic - a valid topic as passed to the constructor
- Returns:
A Publisher for the topic
- Throws:
IllegalArgumentException - if topic wasn't configured in the constructor
close
- Specified by:
close in interface AutoCloseable
- Specified by:
close in interface Closeable