spring integration - Duplicate message processed when polling files from S3 -


I am using the S3 module to poll files from S3. It downloads the file to the local system and starts processing it. I am running this on a 3-node cluster with a module count of 1. Now let's assume a file has been downloaded from S3 to the local system and XD is processing it. If the XD node goes down after having processed half the messages, then when the server comes back up it starts processing the file again, and hence I get duplicate messages. I tried the idempotent receiver pattern with a message store and also changing the module count to 3, but the duplicate message issue is still there.

<int:poller fixed-delay="${fixedDelay}" default="true">
    <int:advice-chain>
        <ref bean="pollAdvise"/>
    </int:advice-chain>
</int:poller>

<bean id="pollAdvise" class="org.springframework.integration.scheduling.PollSkipAdvice">
    <constructor-arg ref="healthCheckStrategy"/>
</bean>

<bean id="healthCheckStrategy" class="ServiceHealthCheckPollSkipStrategy">
    <property name="url" value="${url}"/>
    <property name="doHealthCheck" value="${doHealthCheck}"/>
</bean>

<bean id="credentials" class="org.springframework.integration.aws.core.BasicAWSCredentials">
    <property name="accessKey" value="${accessKey}"/>
    <property name="secretKey" value="${secretKey}"/>
</bean>

<bean id="clientConfiguration" class="com.amazonaws.ClientConfiguration">
    <property name="proxyHost" value="${proxyHost}"/>
    <property name="proxyPort" value="${proxyPort}"/>
    <property name="preemptiveBasicProxyAuth" value="false"/>
</bean>

<bean id="s3Operations" class="org.springframework.integration.aws.s3.core.CustomC1AmazonS3Operations">
    <constructor-arg index="0" ref="credentials"/>
    <constructor-arg index="1" ref="clientConfiguration"/>
    <property name="awsEndpoint" value="s3.amazonaws.com"/>
    <property name="temporaryDirectory" value="${temporaryDirectory}"/>
    <property name="awsSecurityKey" value=""/>
</bean>

<!-- aws-endpoint="https://s3.amazonaws.com" -->
<int-aws:s3-inbound-channel-adapter aws-endpoint="s3.amazonaws.com"
                                    bucket="${bucket}"
                                    s3-operations="s3Operations"
                                    credentials-ref="credentials"
                                    file-name-wildcard="${fileNameWildcard}"
                                    remote-directory="${remoteDirectory}"
                                    channel="splitChannel"
                                    local-directory="${localDirectory}"
                                    accept-sub-folders="false"
                                    delete-source-files="true"
                                    archive-bucket="${archiveBucket}"
                                    archive-directory="${archiveDirectory}">
</int-aws:s3-inbound-channel-adapter>

<int-file:splitter input-channel="splitChannel" output-channel="output" markers="false" charset="UTF-8">
    <int-file:request-handler-advice-chain>
        <bean class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice">
            <property name="onSuccessExpression" value="payload.delete()"/>
        </bean>
    </int-file:request-handler-advice-chain>
</int-file:splitter>

<int:idempotent-receiver id="expressionInterceptor" endpoint="output"
                         metadata-store="redisMessageStore"
                         discard-channel="nullChannel"
                         throw-exception-on-rejection="false"
                         key-expression="payload"/>

<bean id="redisMessageStore" class="org.springframework.integration.redis.store.RedisChannelMessageStore">
    <constructor-arg ref="redisConnectionFactory"/>
</bean>

<bean id="redisConnectionFactory"
      class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
    <property name="port" value="7379"/>
</bean>

<int:channel id="output"/>

UPDATE 2: The configuration below worked for me, with help from the answer.

<int:idempotent-receiver id="s3interceptor" endpoint="s3splitter"                              metadata-store="redismessagestore"                              discard-channel="nullchannel"                              throw-exception-on-rejection="false"                               key-expression="payload.name"/>       <bean id="redismessagestore" class="org.springframework.integration.redis.metadata.redismetadatastore">         <constructor-arg ref="redisconnectionfactory"/>     </bean>      <bean id="redisconnectionfactory"           class="org.springframework.data.redis.connection.jedis.jedisconnectionfactory">         <property name="port" value="6379" />     </bean>      <int:bridge id="batchbridge"  input-channel="bridge" output-channel="output">  </int:bridge>       <int:idempotent-receiver id="splitterinterceptor" endpoint="batchbridge"                              metadata-store="redismessagestore"                              discard-channel="nullchannel"                              throw-exception-on-rejection="false"                               key-expression="payload"/>       <int:channel id="output"/> 

I have a few doubts and wanted to clarify whether I am doing this right.

1) As you can see, I have an ExpressionEvaluatingRequestHandlerAdvice to delete the file. Will the file be deleted after its entry is written to Redis, or only after the last record has been read?

2) I explored Redis using Redis Desktop Manager and can see the metadata keys there (screenshot not shown).

Both the file and the payload metadata store keys and values are going into the same table. Is that fine, or should each use a different metadata store?
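If separate stores are needed, I assume they could each get their own Redis hash key via the second constructor argument of RedisMetadataStore, something like this (just a sketch; the bean names and key names are only illustrative):

<!-- Sketch: one metadata store per concern, each backed by its own Redis hash key -->
<bean id="fileMetadataStore" class="org.springframework.integration.redis.metadata.RedisMetadataStore">
    <constructor-arg ref="redisConnectionFactory"/>
    <constructor-arg value="S3-FILE-METADATA"/>
</bean>

<bean id="lineMetadataStore" class="org.springframework.integration.redis.metadata.RedisMetadataStore">
    <constructor-arg ref="redisConnectionFactory"/>
    <constructor-arg value="S3-LINE-METADATA"/>
</bean>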

Also, can I use a hash of the payload instead of the payload itself as the key? Is there something like payload.hash?
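Something like the following is what I have in mind (just a sketch, not tested; I'm assuming Spring's DigestUtils is acceptable here, and batchBridge is the endpoint from my UPDATE 2 config):

<!-- Sketch: hash each split line via SpEL instead of storing the raw payload as the key -->
<int:idempotent-receiver id="splitterInterceptor" endpoint="batchBridge"
                         metadata-store="redisMessageStore"
                         discard-channel="nullChannel"
                         throw-exception-on-rejection="false"
                         key-expression="T(org.springframework.util.DigestUtils).md5DigestAsHex(payload.getBytes('UTF-8'))"/>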

This looks like a continuation of "multiple message processed", but unfortunately I don't see an <idempotent-receiver> configuration in this case.

According to your comment there, it looks like you are still using the SimpleMetadataStore, or you clean the shared one (Redis/Mongo) too often.

You should share more info for us to dig into. Logs and debug investigation would be good, too.

UPDATE

The Idempotent Receiver is for an endpoint. In your config it points at a MessageChannel. That's why it doesn't work properly: a MessageChannel is ignored by the IdempotentReceiverInterceptor.

You should add an id to the <int-file:splitter> and use that id in the endpoint attribute. I'm not sure you should use the whole file as the idempotency key, though; the file name sounds better.
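For example, something along these lines (a sketch only, based on your config; adjust the channel names to yours):

<!-- Sketch: give the splitter an id and guard that endpoint, keyed on the file name -->
<int-file:splitter id="s3Splitter" input-channel="splitChannel" output-channel="output"
                   markers="false" charset="UTF-8"/>

<int:idempotent-receiver id="s3Interceptor" endpoint="s3Splitter"
                         metadata-store="redisMessageStore"
                         discard-channel="nullChannel"
                         throw-exception-on-rejection="false"
                         key-expression="payload.name"/>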

UPDATE 2

"If the node goes down, let's assume the file has been downloaded (the file may have a million records and be GBs in size) to the XD node and we have processed half the records and the node crashes. When the server comes back up, won't it process the same records again?"

OK, I finally got your point! You have an issue with lines from a file that has already been split.

Also, I'd use an Idempotent Receiver for the <splitter> to avoid duplicate files from S3.

To fix your use case, you should place one more endpoint, a <bridge>, between the <splitter> and the output channel, and skip duplicate lines there with an Idempotent Receiver.
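Roughly like this (a sketch only, with placeholder channel names; compare with the UPDATE 2 configuration in the question):

<!-- Sketch: splitter emits lines to "bridge"; the bridge endpoint is guarded by an Idempotent Receiver -->
<int-file:splitter id="s3Splitter" input-channel="splitChannel" output-channel="bridge"
                   markers="false" charset="UTF-8"/>

<int:channel id="bridge"/>

<int:bridge id="batchBridge" input-channel="bridge" output-channel="output"/>

<int:idempotent-receiver id="splitterInterceptor" endpoint="batchBridge"
                         metadata-store="redisMessageStore"
                         discard-channel="nullChannel"
                         throw-exception-on-rejection="false"
                         key-expression="payload"/>

<int:channel id="output"/>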

