Spring Integration - duplicate messages processed when polling files from S3
I am using the s3 module to poll files from S3. It downloads a file to the local system and starts processing it. I am running this on a 3-node cluster with a module count of 1. Now let's assume a file has been downloaded from S3 to the local system and XD is processing it. If the XD node goes down, I will have processed only half of the messages. When the server comes back up, it starts processing the file again, hence the duplicate messages. I am trying to switch to the idempotent pattern with a message store and change the module count to 3, but the duplicate message issue is still there.
<int:poller fixed-delay="${fixeddelay}" default="true">
    <int:advice-chain>
        <ref bean="polladvise"/>
    </int:advice-chain>
</int:poller>

<bean id="polladvise" class="org.springframework.integration.scheduling.PollSkipAdvice">
    <constructor-arg ref="healthcheckstrategy"/>
</bean>

<bean id="healthcheckstrategy" class="ServiceHealthCheckPollSkipStrategy">
    <property name="url" value="${url}"/>
    <property name="doHealthCheck" value="${dohealthcheck}"/>
</bean>

<bean id="credentials" class="org.springframework.integration.aws.core.BasicAWSCredentials">
    <property name="accessKey" value="${accesskey}"/>
    <property name="secretKey" value="${secretkey}"/>
</bean>

<bean id="clientconfiguration" class="com.amazonaws.ClientConfiguration">
    <property name="proxyHost" value="${proxyhost}"/>
    <property name="proxyPort" value="${proxyport}"/>
    <property name="preemptiveBasicProxyAuth" value="false"/>
</bean>

<bean id="s3operations" class="org.springframework.integration.aws.s3.core.CustomC1AmazonS3Operations">
    <constructor-arg index="0" ref="credentials"/>
    <constructor-arg index="1" ref="clientconfiguration"/>
    <property name="awsEndpoint" value="s3.amazonaws.com"/>
    <property name="temporaryDirectory" value="${temporarydirectory}"/>
    <property name="awsSecurityKey" value=""/>
</bean>

<!-- aws-endpoint="https://s3.amazonaws.com" -->
<int-aws:s3-inbound-channel-adapter aws-endpoint="s3.amazonaws.com"
        bucket="${bucket}"
        s3-operations="s3operations"
        credentials-ref="credentials"
        file-name-wildcard="${filenamewildcard}"
        remote-directory="${remotedirectory}"
        channel="splitchannel"
        local-directory="${localdirectory}"
        accept-sub-folders="false"
        delete-source-files="true"
        archive-bucket="${archivebucket}"
        archive-directory="${archivedirectory}">
</int-aws:s3-inbound-channel-adapter>

<int-file:splitter input-channel="splitchannel" output-channel="output" markers="false" charset="UTF-8">
    <int-file:request-handler-advice-chain>
        <bean class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice">
            <property name="onSuccessExpression" value="payload.delete()"/>
        </bean>
    </int-file:request-handler-advice-chain>
</int-file:splitter>

<int:idempotent-receiver id="expressioninterceptor" endpoint="output"
        metadata-store="redismessagestore"
        discard-channel="nullChannel"
        throw-exception-on-rejection="false"
        key-expression="payload"/>

<bean id="redismessagestore" class="o.s.i.redis.store.RedisChannelMessageStore">
    <constructor-arg ref="redisconnectionfactory"/>
</bean>

<bean id="redisconnectionfactory" class="o.s.data.redis.connection.jedis.JedisConnectionFactory">
    <property name="port" value="7379"/>
</bean>

<int:channel id="output"/>
Update 2: this configuration worked for me. Thanks for the help.
<int:idempotent-receiver id="s3interceptor" endpoint="s3splitter"
        metadata-store="redismessagestore"
        discard-channel="nullChannel"
        throw-exception-on-rejection="false"
        key-expression="payload.name"/>

<bean id="redismessagestore" class="org.springframework.integration.redis.metadata.RedisMetadataStore">
    <constructor-arg ref="redisconnectionfactory"/>
</bean>

<bean id="redisconnectionfactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
    <property name="port" value="6379"/>
</bean>

<int:bridge id="batchbridge" input-channel="bridge" output-channel="output">
</int:bridge>

<int:idempotent-receiver id="splitterinterceptor" endpoint="batchbridge"
        metadata-store="redismessagestore"
        discard-channel="nullChannel"
        throw-exception-on-rejection="false"
        key-expression="payload"/>

<int:channel id="output"/>
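I still have a few doubts that I would like to clarify, to check whether I am doing this right.
1) As you can see, I have an ExpressionEvaluatingRequestHandlerAdvice to delete the file. Will the file be deleted after it has been read into Redis, or after the last record has been read?
2) I explored Redis using a desktop manager, and I see that I have MetaData as the main key.
Both sets of metadata store keys and values (file and payload) are going into the same table. Is that fine, or should I use a different metadata store for each?
Can I use a hash of the payload instead of the payload itself as the key? Is there something like payload.hash?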
This looks like a continuation of "Multiple message processed", but unfortunately I don't see the <idempotent-receiver> configuration in your case.
According to your comment there, it looks like you either continue to use SimpleMetadataStore, or you clean the shared one (Redis/Mongo) too often.
You should share more info so that we know where to dig. Some logs and DEBUG investigation would be good, too.
UPDATE
The Idempotent Receiver applies to an endpoint. In your config it is pointed at a MessageChannel. That's why it does not work properly: the MessageChannel is simply ignored by the IdempotentReceiverInterceptor.
You should add an id to your <int-file:splitter> and use that id in the endpoint attribute. I'm not sure it is a good idea to use the File itself as the key for idempotency. Its name sounds better.
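A minimal sketch of that suggestion, reusing the channel names from the question. The id s3splitter is only an illustrative name, and the sketch assumes that the redismessagestore bean is a MetadataStore implementation such as RedisMetadataStore, not a message store.

```xml
<!-- The splitter now has an id, so it can be referenced as an endpoint. -->
<int-file:splitter id="s3splitter"
        input-channel="splitchannel"
        output-channel="output"
        markers="false"
        charset="UTF-8"/>

<!-- endpoint names the splitter (a consumer endpoint), not a channel.
     The payload arriving at the splitter is a java.io.File, so
     payload.name resolves to File.getName(). -->
<int:idempotent-receiver id="s3interceptor"
        endpoint="s3splitter"
        metadata-store="redismessagestore"
        discard-channel="nullChannel"
        throw-exception-on-rejection="false"
        key-expression="payload.name"/>
```

With this in place, a file whose name is already recorded in the metadata store is routed to nullChannel and is not split again.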
UPDATE 2
"If a node goes down and, let's assume, the file has been downloaded to the XD node (a file with millions of records, maybe several GB in size), and I have processed half of the records when the node crashes. When the server comes back up, I think we will process the same records again?"
OK. I've got your point finally! Your issue is with the lines that have already been split from the file.
I'd still use the Idempotent Receiver on the <splitter> as well, to avoid duplicate files from S3.
To fix your use case, you should place one more endpoint, a <bridge>, between the <splitter> and the output channel, and use the Idempotent Receiver on it to skip duplicate lines.
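The chain described above could look roughly like the following sketch. The ids and the intermediate channel name bridge are illustrative. Keying on an MD5 digest of the line rather than on the whole line is an assumption added here to keep the stored keys short; key-expression="payload" would work the same way. DigestUtils.md5DigestAsHex is a static utility in spring-core.

```xml
<!-- Split each file into lines and send them to an intermediate channel. -->
<int-file:splitter id="s3splitter"
        input-channel="splitchannel"
        output-channel="bridge"
        markers="false"
        charset="UTF-8"/>

<int:channel id="bridge"/>

<!-- The bridge does no work itself; it only gives the second
     idempotent receiver an endpoint to attach to. -->
<int:bridge id="batchbridge" input-channel="bridge" output-channel="output"/>

<!-- Lines whose key is already in the metadata store go to nullChannel.
     Assumption: the key is a digest of the line text, not the text itself. -->
<int:idempotent-receiver id="splitterinterceptor"
        endpoint="batchbridge"
        metadata-store="redismessagestore"
        discard-channel="nullChannel"
        throw-exception-on-rejection="false"
        key-expression="T(org.springframework.util.DigestUtils).md5DigestAsHex(payload.getBytes())"/>

<int:channel id="output"/>
```

Note that with either key, two lines with identical content are treated as duplicates even when they legitimately occur twice in a file, so this approach fits data where every record is unique.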