TCP Inbound Endpoint - Mule ESB - multithreading
I have a TCP inbound endpoint that references a TCP connector. It is a request-response endpoint. The TCP client is a third-party application that sends all of its requests on a single socket. This is how I have set up the TCP endpoint. Endpoint:
<tcp:inbound-endpoint exchange-pattern="request-response" responseTimeout="10000" doc:name="tcp" address="${endpoint}" encoding="iso-8859-1" connector-ref="tcp"/>
Connector:
<tcp:connector name="tcp" doc:name="tcp connector"
        clientSoTimeout="${client_so_timeout}" serverSoTimeout="${server_so_timeout}"
        receiveBacklog="0" receiveBufferSize="0" sendBufferSize="0" socketSoLinger="0"
        validateConnections="true" keepAlive="true" sendTcpNoDelay="true">
    <receiver-threading-profile maxThreadsActive="${tcp_maxthreadsactive}" maxThreadsIdle="${tcp_maxthreadsidle}" />
    <reconnect-forever />
    <service-overrides messageReceiver="custommessagereceiver" />
    <tcp:custom-protocol ref="customlengthprotocol" />
</tcp:connector>
The flow works fine. However, when concurrent requests have to be processed, the last few requests time out. As I understand it, messages wait at the receiver (since only one TCP session is used) until the previous request has completed the Mule flow.
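To make the timeout pattern concrete, here is a rough back-of-envelope model in Java. It is only a sketch: the class and method names are hypothetical, and it assumes that all requests arrive at the same moment and that each takes the same fixed time in the flow (neither figure comes from the real system). The 10000 ms timeout mirrors the responseTimeout in the endpoint above.

```java
import java.util.ArrayList;
import java.util.List;

/** Back-of-envelope model of requests queued behind a single TCP session (hypothetical helper). */
final class QueueingModel {

    /**
     * Returns the 0-based indices of requests whose response would arrive after timeoutMs.
     * Assumption: all requests arrive at time 0 and each takes serviceMs on one of `workers` threads.
     */
    static List<Integer> timedOut(int requests, long serviceMs, long timeoutMs, int workers) {
        List<Integer> late = new ArrayList<>();
        for (int i = 0; i < requests; i++) {
            // Request i finishes at the end of the (i / workers + 1)-th round of processing.
            long completionMs = (i / workers + 1) * serviceMs;
            if (completionMs > timeoutMs) {
                late.add(i);
            }
        }
        return late;
    }
}
```

With five requests at an assumed 3000 ms each and a 10000 ms timeout, timedOut(5, 3000, 10000, 1) flags requests 3 and 4 (the last two), while timedOut(5, 3000, 10000, 5) flags none. That matches the symptom that only the last few requests time out.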
To tune this, I am looking for a way to change the Mule flow as follows. After receiving requests from the client, I need to hand them to a Mule flow that may process them asynchronously and push each response back to the same socket. Once a request is received at the endpoint, it should not have to wait for the previous request's flow to complete before it is processed. There is no requirement to preserve the order of requests and responses in the Mule flow. Is there a way to achieve this by extending the Mule TCP endpoint functionality? It would be similar to the queued-asynchronous flow processing strategy, except that the response has to be sent to the original TCP socket.
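Outside of Mule, the behaviour being asked for can be sketched in plain Java as below. This is an illustration of the idea only, not Mule API. The class name SharedSocketServer is hypothetical, the 4-byte length prefix stands in for the custom length protocol (whose real format is not shown here), and plain streams are used in place of the socket's input and output streams.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.UnaryOperator;

/** Reads framed requests from one connection and answers them out of order on the same connection. */
final class SharedSocketServer {
    private final DataInputStream in;
    private final DataOutputStream out;
    private final ExecutorService workers;
    private final UnaryOperator<String> handler;

    SharedSocketServer(InputStream in, OutputStream out, int threads, UnaryOperator<String> handler) {
        this.in = new DataInputStream(in);
        this.out = new DataOutputStream(out);
        this.workers = Executors.newFixedThreadPool(threads);
        this.handler = handler;
    }

    /** Reads frames until end of stream; each request goes to the pool without waiting for earlier ones. */
    void serve() throws IOException, InterruptedException {
        try {
            while (true) {
                int length;
                try {
                    length = in.readInt(); // assumed framing: 4-byte big-endian length
                } catch (EOFException endOfStream) {
                    break;
                }
                byte[] body = new byte[length];
                in.readFully(body);
                String request = new String(body, StandardCharsets.ISO_8859_1);
                workers.execute(() -> reply(handler.apply(request)));
            }
        } finally {
            workers.shutdown();
            workers.awaitTermination(30, TimeUnit.SECONDS);
        }
    }

    /** Writes one framed response; the lock keeps frames from different workers from interleaving. */
    private void reply(String response) {
        byte[] body = response.getBytes(StandardCharsets.ISO_8859_1);
        synchronized (out) {
            try {
                out.writeInt(body.length);
                out.write(body);
                out.flush();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }
}
```

With a real connection, the two streams would come from socket.getInputStream() and socket.getOutputStream(). Because responses can complete in any order, this only works when, as stated above, the client does not depend on response ordering.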
This is how I resolved it:
(i) Instead of using a TCP request-response inbound endpoint, I am using a TCP inbound endpoint (one-way) and a TCP outbound endpoint (one-way).
(ii) The TCP inbound endpoint receives the request; the custom length protocol splits the stream into messages and hands each one to the flow.
(iii) I use a request-reply scope with VM endpoints (inbound and outbound).
(iv) The VM endpoints direct the message to a separate flow whose processingStrategy is "queued-asynchronous". I am planning to set the maximum number of threads at the flow level for thread pooling.
(v) The second flow performs the business logic, and the response is sent back to the main flow to be sent across the socket.
(vi) In order to access the socket from the inbound endpoint, I have overridden the preRouteMuleMessage method in the TcpMessageReceiver class and added an outbound property called "clientsocket" to the MuleMessage. I propagate this property to the end of the main flow, at the outbound endpoint. At the TCP outbound endpoint, I created my own TcpMessageDispatcher and extended the doDispatch method. Instead of using a socket from the outbound endpoint's thread pool, I use the socket object shared as part of the MuleMessage.
Sample flow:
<tcp:connector name="tcp" doc:name="tcp connector"
        clientSoTimeout="70000" serverSoTimeout="70000"
        receiveBacklog="0" receiveBufferSize="0" sendBufferSize="0" socketSoLinger="0"
        validateConnections="true" keepAlive="true" sendTcpNoDelay="true" keepSendSocketOpen="true">
    <receiver-threading-profile maxThreadsActive="1" maxThreadsIdle="1" />
    <reconnect-forever />
    <service-overrides messageReceiver="custommessagereceiver" />
    <tcp:custom-protocol ref="customlengthprotocol" />
</tcp:connector>

<tcp:connector name="tcp2" doc:name="tcp connector"
        clientSoTimeout="70000" serverSoTimeout="70000"
        receiveBacklog="0" receiveBufferSize="0" sendBufferSize="0" socketSoLinger="0"
        validateConnections="true" keepAlive="true" sendTcpNoDelay="true" keepSendSocketOpen="true">
    <receiver-threading-profile maxThreadsActive="1" maxThreadsIdle="1" />
    <reconnect-forever />
    <service-overrides dispatcherFactory="custommessagedispatcherfactory" />
    <tcp:custom-protocol ref="customlengthprotocol" />
</tcp:connector>

<spring:beans>
    <spring:bean id="customlengthprotocol" name="customlengthprotocol" class="customlengthprotocol" />
</spring:beans>

<flow name="tcptestflow" doc:name="tcptestflow">
    <tcp:inbound-endpoint address="tcp://localhost:4444" responseTimeout="100000" doc:name="tcp" connector-ref="tcp" />
    <byte-array-to-string-transformer doc:name="byte array string" />
    <logger level="INFO" category="expression" doc:name="logger" />
    <set-session-variable variableName="socket" value="#[message.outboundProperties['clientsocket']]" doc:name="session variable" />
    <logger message="#[payload] - #[socket]" level="INFO" category="request" doc:name="logger" />
    <request-reply doc:name="request-reply">
        <vm:outbound-endpoint exchange-pattern="one-way" path="/qin" doc:name="vm">
            <message-properties-transformer scope="outbound">
                <delete-message-property key="MULE_REPLYTO"></delete-message-property>
            </message-properties-transformer>
        </vm:outbound-endpoint>
        <vm:inbound-endpoint exchange-pattern="one-way" path="/qout" doc:name="vm" />
    </request-reply>
    <logger message="#[payload]" level="INFO" doc:name="logger" category="response" />
    <string-to-byte-array-transformer doc:name="string byte array" />
    <tcp:outbound-endpoint address="tcp://localhost:4444" responseTimeout="10000" doc:name="tcp" connector-ref="tcp2" />
</flow>

<flow name="tcptestflow1" processingStrategy="queued-asynchronous" doc:name="tcptestflow1">
    <vm:inbound-endpoint exchange-pattern="one-way" path="/qin" doc:name="vm" />
    <logger message="inside vm flow" level="INFO" doc:name="logger" />
    <set-payload value="appended response - #[payload]" doc:name="set payload" />
    <vm:outbound-endpoint exchange-pattern="one-way" path="/qout" doc:name="vm" />
</flow>
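Step (ii) depends on a length-based protocol to cut the single TCP byte stream into individual requests. The actual customlengthprotocol class is not shown, so the following is only a guess at the general technique. It assumes a 4-byte big-endian length prefix, and the class name LengthFrameDecoder is hypothetical. It does not implement Mule's protocol interface; it just shows how complete messages can be recovered when the bytes of several requests arrive in arbitrary chunks.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Splits a byte stream into frames of the assumed form [4-byte length][body]. */
final class LengthFrameDecoder {
    // Bytes received so far that do not yet form a complete frame (kept ready for reading).
    private ByteBuffer pending = ByteBuffer.allocate(0);

    /** Prefixes a body with its length, as the sending side would under the assumed framing. */
    static byte[] encode(byte[] body) {
        return ByteBuffer.allocate(4 + body.length).putInt(body.length).put(body).array();
    }

    /** Accepts the next chunk read from the socket and returns every frame completed by it. */
    List<byte[]> feed(byte[] chunk) {
        ByteBuffer merged = ByteBuffer.allocate(pending.remaining() + chunk.length);
        merged.put(pending).put(chunk);
        merged.flip();

        List<byte[]> frames = new ArrayList<>();
        while (merged.remaining() >= 4) {
            int length = merged.getInt(merged.position()); // peek without consuming
            if (length < 0) {
                throw new IllegalArgumentException("negative frame length: " + length);
            }
            if (merged.remaining() - 4 < length) {
                break; // body not fully received yet
            }
            merged.getInt(); // consume the length prefix
            byte[] body = new byte[length];
            merged.get(body);
            frames.add(body);
        }
        pending = merged; // whatever is left belongs to the next frame
        return frames;
    }
}
```

A receiver would call feed with each chunk it reads from the socket and pass every returned frame to the flow as one message. Since all requests share one connection, keeping the leftover bytes between reads is what prevents two requests from being merged or one from being truncated.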