TCP Inbound Endpoint - Mule ESB - multithreading -


i have tcp inbound endpoint references tcp connector. request-response endpoint. tcp client 3rd party application sends requests on 1 socket. how have set tcp endpoint. endpoint:

<tcp:inbound-endpoint exchange-pattern="request-response"             responsetimeout="10000" doc:name="tcp" address="${endpoint}" encoding="iso-8859-1" connector-ref="tcp"/> 

connector:

<tcp:connector name="tcp" doc:name="tcp connector"     clientsotimeout="${client_so_timeout}" receivebacklog="0" receivebuffersize="0"     sendbuffersize="0" serversotimeout="${server_so_timeout}" socketsolinger="0"     validateconnections="true" keepalive="true" sendtcpnodelay="true">             <receiver-threading-profile maxthreadsactive="${tcp_maxthreadsactive}" maxthreadsidle = "${tcp_maxthreadsidle}" />     <reconnect-forever />     <service-overrides messagereceiver="custommessagereceiver" />     <tcp:custom-protocol ref="customlengthprotocol" /> </tcp:connector> 

the flow working fine. when concurrent requests have processed, last few requests timing out. understand messages waiting @ receiver processed (since 1 tcp session used) until previous request completed mule flow.

in order tune this, looking way change mule flow below: after receiving requests client, need send mule-flow may process asynchronously , push response same socket. once request received @ endpoint, not need wait previous request's flow complete before processing next request. there no requirement keep sequence of request/responses mule flow. there way achieve extending mule tcp endpoint functionality? similar queued asynchronous flow processing strategy, except response has sent original tcp socket.

this how resolved : (i) instead of using tcp request-response inbound endpoint, using tcp inbound (one-way) , tcp outbound (one-way) (ii) tcp-inbound receives request, custom length protocol splits message , provides flow. (iii) use request-reply scope vm endpoints (inbound & outbound) (iv) vm endpoints direct message separate flow has processingstrategy "queued-asynchronous". planning have maxthreads set on flow level thread-pooling. (v) second flow performs business logic , response sent main-flow sent across socket. (vi) inorder access socket inbound endpoint, have overriden preroutemulemessage method in tcpmessagereceiver class , added outbound property called "clientsocket" mulemessage. propagate property end of main flow outbound endpoint. @ tcpoutbound endpoint, created own tcpmessagedispatcher , extended dodispatch method. instead of using socket outbound endpoint's thread pool, use socket object shared part of mulemessage. sample flow:

<tcp:connector name="tcp" doc:name="tcp connector"     clientsotimeout="70000" receivebacklog="0" receivebuffersize="0"     sendbuffersize="0" serversotimeout="70000" socketsolinger="0"     validateconnections="true" keepalive="true" sendtcpnodelay="true" keepsendsocketopen="true">     <receiver-threading-profile         maxthreadsactive="1" maxthreadsidle="1" />     <reconnect-forever />     <service-overrides messagereceiver="custommessagereceiver" />      <tcp:custom-protocol ref="customlengthprotocol" /> </tcp:connector>  <tcp:connector name="tcp2" doc:name="tcp connector"     clientsotimeout="70000" receivebacklog="0" receivebuffersize="0"     sendbuffersize="0" serversotimeout="70000" socketsolinger="0"     validateconnections="true" keepalive="true" sendtcpnodelay="true"     keepsendsocketopen="true">     <receiver-threading-profile         maxthreadsactive="1" maxthreadsidle="1" />     <reconnect-forever />     <service-overrides dispatcherfactory="custommessagedispatcherfactory"/>     <tcp:custom-protocol ref="customlengthprotocol" />  </tcp:connector>   <spring:beans>     <spring:bean id="customlengthprotocol" name="customlengthprotocol"         class="customlengthprotocol" /> </spring:beans> <flow name="tcptestflow" doc:name="tcptestflow">     <tcp:inbound-endpoint address="tcp://localhost:4444"          responsetimeout="100000"  doc:name="tcp" connector-ref="tcp" />     <byte-array-to-string-transformer         doc:name="byte array string" />     <logger level="info" category="expression" doc:name="logger" />      <set-session-variable variablename="socket"         value="#[message.outboundproperties['clientsocket']]"         doc:name="session variable" />     <logger message="#[payload] - #[socket]" level="info" category="request"         doc:name="logger" />      <request-reply doc:name="request-reply">         <vm:outbound-endpoint exchange-pattern="one-way"             path="/qin" doc:name="vm" >             <message-properties-transformer scope="outbound">                  <delete-message-property key="mule_replyto"></delete-message-property>               </message-properties-transformer>            </vm:outbound-endpoint>         <vm:inbound-endpoint exchange-pattern="one-way"             path="/qout" doc:name="vm" />     </request-reply>     <logger message="#[payload]" level="info" doc:name="logger"         category="response" />     <string-to-byte-array-transformer         doc:name="string byte array" />     <tcp:outbound-endpoint address="tcp://localhost:4444"          responsetimeout="10000" doc:name="tcp" connector-ref="tcp2" />  </flow> <flow name="tcptestflow1" processingstrategy="queued-asynchronous"     doc:name="tcptestflow1">      <vm:inbound-endpoint exchange-pattern="one-way"         path="/qin" doc:name="vm" />     <logger message="inside vm flow" level="info" doc:name="logger" />     <set-payload value="appended response - #[payload]"         doc:name="set payload" />     <vm:outbound-endpoint exchange-pattern="one-way"         path="/qout" doc:name="vm" />  </flow> 

Comments

Popular posts from this blog

OpenCV OpenCL: Convert Mat to Bitmap in JNI Layer for Android -

android - org.xmlpull.v1.XmlPullParserException: expected: START_TAG {http://schemas.xmlsoap.org/soap/envelope/}Envelope -

python - How to remove the Xframe Options header in django? -