下面代码路径 :source->rocketmq->common->selector
DefaultTopicSelector.java 类
public class DefaultTopicSelector<T> implements TopicSelector<T> {
private final String topicName;
private final String tagName;
public DefaultTopicSelector(final String topicName) {
this(topicName, "");
}
public DefaultTopicSelector(String topicName, String tagName) {
this.topicName = topicName;
this.tagName = tagName;
}
@Override
public String getTopic(T tuple) {
return topicName;
}
@Override
public String getTag(T tuple) {
return tagName;
}
}
TopicSelector.java
import java.io.Serializable;
public interface TopicSelector<T> extends Serializable {
String getTopic(T tuple);
String getTag(T tuple);
}
下面代码路径:source->rocketmq->common->serialization
AlarmEventSerializationSchema.java
import com.alibaba.fastjson.JSON;
import com.bsj.flinkRisk.model.AlarmEvent;
import com.bsj.flinkRisk.utils.NumberToByteUtil;
import java.nio.charset.StandardCharsets;
public class AlarmEventSerializationSchema implements KeyValueSerializationSchema<AlarmEvent> {
public static final String DEFAULT_KEY_FIELD = "key";
public static final String DEFAULT_VALUE_FIELD = "value";
public String keyField;
public String valueField;
public AlarmEventSerializationSchema() {
this(DEFAULT_KEY_FIELD, DEFAULT_VALUE_FIELD);
}
public AlarmEventSerializationSchema(String keyField, String valueField) {
this.keyField = keyField;
this.valueField = valueField;
}
@Override
public byte[] serializeKey(AlarmEvent event) {
return NumberToByteUtil.long2Bytes(event.getVehicleId());
}
@Override
public byte[] serializeValue(AlarmEvent event) {
String data = JSON.toJSONString(event);
return data.getBytes(StandardCharsets.UTF_8);
}
}
KeyValueDeserializationSchema.java
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import java.io.Serializable;
public interface KeyValueDeserializationSchema<T> extends ResultTypeQueryable<T>, Serializable {
T deserializeKeyAndValue(byte[] key, byte[] value);
}
KeyValueSerializationSchema.java
import java.io.Serializable;
public interface KeyValueSerializationSchema<T> extends Serializable {
byte[] serializeKey(T tuple);
byte[] serializeValue(T tuple);
}
VehiclePosInfoDeserializationSchema.java
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@Slf4j
public class VehiclePosInfoDeserializationSchema implements KeyValueDeserializationSchema<VehiclePosInfo> {
public VehiclePosInfoDeserializationSchema() {
}
@Override
public VehiclePosInfo deserializeKeyAndValue(byte[] key, byte[] value) {
SpeedInfo speedInfo = null;
YunDataSerialize yunDataSerialize = new YunDataSerialize();
try {
speedInfo =yunDataSerialize.SpeedinfoFromArray(value);
// 2表示为1402的位置数据,传过来的不处理。
if (speedInfo.getSource()==2){
return null;
}
} catch (Exception e) {
log.error("数据解析异常,数据{}", HexStr.toStr(value),e);
}
return convertToVehicleState(speedInfo);
}
/**
* 类型转换
*
* @param speedInfo
* @return
*/
private VehiclePosInfo convertToVehicleState(SpeedInfo speedInfo) {
if(speedInfo==null){
return null;
}
VehiclePosInfo vehicleState = new VehiclePosInfo();
vehicleState.setVehicleId(speedInfo.getVehicleId());
vehicleState.setGroupId(speedInfo.getGroupId());
vehicleState.setPlateColor(speedInfo.getPlateColor());
vehicleState.setPlate(speedInfo.getPlate());
vehicleState.setVehicleShape(speedInfo.getVehicleShape());
vehicleState.setVehicleState(speedInfo.getVehicleState());
vehicleState.setTerminalNo(speedInfo.getTerminalNo());
vehicleState.setProtocolType(speedInfo.getProtocolType());
vehicleState.setTerminalType(speedInfo.getTerminalType());
vehicleState.setDevTime(speedInfo.getPos().getDevTime());
vehicleState.setRecvTime(speedInfo.getPos().getRecvTime());
vehicleState.setLon(speedInfo.getPos().getLon());
ehicleState.setLat(speedInfo.getPos().getLat());
vehicleState.setSpeed(speedInfo.getPos().getSpeed());
vehicleState.setDirect(speedInfo.getPos().getDirect());
vehicleState.setHigh(speedInfo.getPos().getHigh());
vehicleState.setMileage(speedInfo.getPos().getMileage());
vehicleState.setIsAcc(speedInfo.getPos().getIsAcc());
vehicleState.setExtend(speedInfo.getPos().getExtend());
vehicleState.setIsPos(speedInfo.getPos().getIsPos());
vehicleState.setPosSource(speedInfo.getSource());
vehicleState.setCarhrough(speedInfo.getCarhrough());
}
}
下面代码路径:source->rocketmq->example 这个只是例子可以不用
RocketMQFlinkExample.java
public class RocketMQFlinkExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(3000);
Properties consumerProps = new Properties();
consumerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, "localhost:9876");
consumerProps.setProperty(RocketMQConfig.CONSUMER_GROUP, "c002");
consumerProps.setProperty(RocketMQConfig.CONSUMER_TOPIC, "gg");
Properties producerProps = new Properties();
producerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, "localhost:9876");
int msgDelayLevel = RocketMQConfig.MSG_DELAY_LEVEL00;
producerProps.setProperty(RocketMQConfig.MSG_DELAY_LEVEL, String.valueOf(msgDelayLevel));
// TimeDelayLevel is not supported for batching
boolean batchFlag = msgDelayLevel <= 0;
DataStream<SpeedInfo> stream = env
.addSource(new RocketMQSource(new VehiclePosInfoDeserializationSchema(), consumerProps))
.name("transactions");
stream.keyBy(SpeedInfo::getVehicleId)
.process(new KeyedProcessFunction<Long, SpeedInfo, AlarmEvent>() {
@Override
public void processElement(SpeedInfo in, Context ctx, Collector<AlarmEvent> out) throws Exception {
AlarmEvent event=new AlarmEvent();
out.collect(event);
}
})
.name("upper-processor")
.setParallelism(2)
.addSink(new RocketMQSink(new AlarmEventSerializationSchema("id", "province"),
new DefaultTopicSelector("zhisheng"), producerProps).withBatchFlushOnCheckpoint(batchFlag))
.name("rocketmq-sink")
.setParallelism(2);
env.execute("rocketmq-flink-example");
}
}
SimpleConsumer.java
public class SimpleConsumer {
public static void main(String[] args) {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("g00003");
consumer.setNamesrvAddr("localhost:9876");
try {
consumer.subscribe("zhisheng", "*");
} catch (MQClientException e) {
e.printStackTrace();
}
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(msg.getKeys() + ":" + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
try {
consumer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
}
SimpleProducer.java
public class SimpleProducer {
public static void main(String[] args) {
DefaultMQProducer producer = new DefaultMQProducer("p001");
producer.setNamesrvAddr("localhost:9876");
try {
producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
for (int i = 0; i < 10000; i++) {
Message msg = new Message("zhisheng", "", "id_" + i, ("country_X province_" + i).getBytes());
try {
producer.send(msg);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("send " + i);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
下面代码路径:source->rocketmq
RocketMQConfig.java
public class RocketMQConfig {
// Server Config
public static final String NAME_SERVER_ADDR = "nameserver.address"; // 必须
public static final String NAME_SERVER_POLL_INTERVAL = "nameserver.poll.interval";
public static final int DEFAULT_NAME_SERVER_POLL_INTERVAL = 30000; // 30 seconds
public static final String BROKER_HEART_BEAT_INTERVAL = "brokerserver.heartbeat.interval";
public static final int DEFAULT_BROKER_HEART_BEAT_INTERVAL = 30000; // 30 seconds
// Producer related config
public static final String PRODUCER_GROUP = "producer.group";
public static final String PRODUCER_RETRY_TIMES = "producer.retry.times";
public static final int DEFAULT_PRODUCER_RETRY_TIMES = 3;
public static final String PRODUCER_TIMEOUT = "producer.timeout";
public static final int DEFAULT_PRODUCER_TIMEOUT = 3000; // 3 seconds
// Consumer related config
public static final String CONSUMER_GROUP = "consumer.group"; // 必须
public static final String CONSUMER_TOPIC = "consumer.topic"; // 必须
public static final String CONSUMER_TAG = "consumer.tag";
public static final String DEFAULT_CONSUMER_TAG = "*";
public static final String CONSUMER_OFFSET_RESET_TO = "consumer.offset.reset.to"; // offset 重制到
public static final String CONSUMER_OFFSET_LATEST = "latest";
public static final String CONSUMER_OFFSET_EARLIEST = "earliest";
public static final String CONSUMER_OFFSET_TIMESTAMP = "timestamp";
public static final String CONSUMER_OFFSET_FROM_TIMESTAMP = "consumer.offset.from.timestamp"; // offset 重制到某个时间点
public static final String CONSUMER_OFFSET_PERSIST_INTERVAL = "consumer.offset.persist.interval";
public static final int DEFAULT_CONSUMER_OFFSET_PERSIST_INTERVAL = 5000; // 5 seconds
public static final String CONSUMER_PULL_POOL_SIZE = "consumer.pull.thread.pool.size";
public static final int DEFAULT_CONSUMER_PULL_POOL_SIZE = 20;
public static final String CONSUMER_BATCH_SIZE = "consumer.batch.size";
public static final int DEFAULT_CONSUMER_BATCH_SIZE = 500;
public static final String CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND = "consumer.delay.when.message.not.found";
public static final int DEFAULT_CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND = 10;
public static final String MSG_DELAY_LEVEL = "msg.delay.level";
public static final int MSG_DELAY_LEVEL00 = 0; // no delay
public static final int MSG_DELAY_LEVEL01 = 1; // 1s
public static final int MSG_DELAY_LEVEL02 = 2; // 5s
public static final int MSG_DELAY_LEVEL03 = 3; // 10s
public static final int MSG_DELAY_LEVEL04 = 4; // 30s
public static final int MSG_DELAY_LEVEL05 = 5; // 1min
public static final int MSG_DELAY_LEVEL06 = 6; // 2min
public static final int MSG_DELAY_LEVEL07 = 7; // 3min
public static final int MSG_DELAY_LEVEL08 = 8; // 4min
public static final int MSG_DELAY_LEVEL09 = 9; // 5min
public static final int MSG_DELAY_LEVEL10 = 10; // 6min
public static final int MSG_DELAY_LEVEL11 = 11; // 7min
public static final int MSG_DELAY_LEVEL12 = 12; // 8min
public static final int MSG_DELAY_LEVEL13 = 13; // 9min
public static final int MSG_DELAY_LEVEL14 = 14; // 10min
public static final int MSG_DELAY_LEVEL15 = 15; // 20min
public static final int MSG_DELAY_LEVEL16 = 16; // 30min
public static final int MSG_DELAY_LEVEL17 = 17; // 1h
public static final int MSG_DELAY_LEVEL18 = 18; // 2h
/**
* 构建 producer 配置
*
* @param props Properties
* @param producer DefaultMQProducer
*/
public static void buildProducerConfigs(Properties props, DefaultMQProducer producer) {
buildCommonConfigs(props, producer);
String group = props.getProperty(PRODUCER_GROUP);
if (StringUtils.isEmpty(group)) {
group = UUID.randomUUID().toString();
}
producer.setProducerGroup(props.getProperty(PRODUCER_GROUP, group));
producer.setRetryTimesWhenSendFailed(getInteger(props,
PRODUCER_RETRY_TIMES, DEFAULT_PRODUCER_RETRY_TIMES));
producer.setRetryTimesWhenSendAsyncFailed(getInteger(props,
PRODUCER_RETRY_TIMES, DEFAULT_PRODUCER_RETRY_TIMES));
producer.setSendMsgTimeout(getInteger(props,
PRODUCER_TIMEOUT, DEFAULT_PRODUCER_TIMEOUT));
}
/**
* 构建 Consumer 配置
*
* @param props Properties
* @param consumer DefaultMQPushConsumer
*/
public static void buildConsumerConfigs(Properties props, DefaultMQPullConsumer consumer) {
buildCommonConfigs(props, consumer);
//消费模式
consumer.setMessageModel(MessageModel.CLUSTERING);
// consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.setPersistConsumerOffsetInterval(getInteger(props,
CONSUMER_OFFSET_PERSIST_INTERVAL, DEFAULT_CONSUMER_OFFSET_PERSIST_INTERVAL));
}
/**
* 构建通用的配置
*
* @param props Properties
* @param client ClientConfig
*/
private static void buildCommonConfigs(Properties props, ClientConfig client) {
String nameServers = props.getProperty(NAME_SERVER_ADDR);
Validate.notEmpty(nameServers);
client.setNamesrvAddr(nameServers);
client.setPollNameServerInterval(getInteger(props,
NAME_SERVER_POLL_INTERVAL, DEFAULT_NAME_SERVER_POLL_INTERVAL));
client.setHeartbeatBrokerInterval(getInteger(props,
BROKER_HEART_BEAT_INTERVAL, DEFAULT_BROKER_HEART_BEAT_INTERVAL));
}
}
RocketMQSink.java
public class RocketMQSink<IN> extends RichSinkFunction<IN> implements CheckpointedFunction {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(RocketMQSink.class);
private transient DefaultMQProducer producer;
private boolean async; // false by default
private Properties props;
private TopicSelector<IN> topicSelector;
private KeyValueSerializationSchema<IN> serializationSchema;
private boolean batchFlushOnCheckpoint; // false by default
private int batchSize = 1000;
private List<Message> batchList;
private int messageDeliveryDelayLevel = RocketMQConfig.MSG_DELAY_LEVEL00;
public RocketMQSink(KeyValueSerializationSchema<IN> schema, TopicSelector<IN> topicSelector, Properties props) {
this.serializationSchema = schema;
this.topicSelector = topicSelector;
this.props = props;
if (this.props != null) {
this.messageDeliveryDelayLevel = RocketMQUtils.getInteger(this.props, RocketMQConfig.MSG_DELAY_LEVEL,
RocketMQConfig.MSG_DELAY_LEVEL00);
if (this.messageDeliveryDelayLevel < RocketMQConfig.MSG_DELAY_LEVEL00) {
this.messageDeliveryDelayLevel = RocketMQConfig.MSG_DELAY_LEVEL00;
} else if (this.messageDeliveryDelayLevel > RocketMQConfig.MSG_DELAY_LEVEL18) {
this.messageDeliveryDelayLevel = RocketMQConfig.MSG_DELAY_LEVEL18;
}
}
}
@Override
public void open(Configuration parameters) throws Exception {
Validate.notEmpty(props, "Producer properties can not be empty");
Validate.notNull(topicSelector, "TopicSelector can not be null");
Validate.notNull(serializationSchema, "KeyValueSerializationSchema can not be null");
producer = new DefaultMQProducer();
producer.setInstanceName(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()));
RocketMQConfig.buildProducerConfigs(props, producer);
batchList = new LinkedList<>();
if (batchFlushOnCheckpoint && !((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled()) {
LOG.warn("Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing.");
batchFlushOnCheckpoint = false;
}
try {
producer.start();
} catch (MQClientException e) {
throw new RuntimeException(e);
}
}
Override
public void invoke(IN input, Context context) throws Exception {
Message msg = prepareMessage(input);
if (batchFlushOnCheckpoint) {
batchList.add(msg);
if (batchList.size() >= batchSize) {
flushSync();
}
return;
}
if (async) {
try {
producer.send(msg,new MessageQueueSelector() {
// 每个节点轮着来
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// arg就是send的第3个参数
long vehicleId = (Long) arg;
return mqs.get((int) (vehicleId % mqs.size()));
}
},msg.getKeys(), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
LOG.debug("Async send message success! result: {}", sendResult);
}
@Override
public void onException(Throwable throwable) {
if (throwable != null) {
LOG.error("Async send message failure!", throwable);
}
}
});
}catch (Exception e) {
LOG.error("Async send message failure!", e);
}
}else {
try {
SendResult result = producer.send(msg);
LOG.debug("Sync send message result: {}", result);
} catch (Exception e) {
LOG.error("Sync send message failure!", e);
}
}
}
/**
* 解析消息
*
* @param input
* @return
*/
private Message prepareMessage(IN input) {
String topic = topicSelector.getTopic(input);
String tag = topicSelector.getTag(input) != null ? topicSelector.getTag(input) : "";
byte[] k = serializationSchema.serializeKey(input);
String key = k != null ? String.valueOf(NumberToByteUtil.bytes2Long(k)) : "";
byte[] value = serializationSchema.serializeValue(input);
Validate.notNull(topic, "the message topic is null");
Validate.notNull(value, "the message body is null");
Message msg = new Message(topic, tag, key, value);
if (this.messageDeliveryDelayLevel > RocketMQConfig.MSG_DELAY_LEVEL00) {
msg.setDelayTimeLevel(this.messageDeliveryDelayLevel);
}
return msg;
}
public RocketMQSink<IN> withAsync(boolean async) {
this.async = async;
return this;
}
public RocketMQSink<IN> withBatchFlushOnCheckpoint(boolean batchFlushOnCheckpoint) {
this.batchFlushOnCheckpoint = batchFlushOnCheckpoint;
return this;
}
public RocketMQSink<IN> withBatchSize(int batchSize) {
this.batchSize = batchSize;
return this;
}
@Override
public void close() throws Exception {
if (producer != null) {
flushSync();
producer.shutdown();
}
}
private void flushSync() throws Exception {
if (batchFlushOnCheckpoint) {
synchronized (batchList) {
if (batchList.size() > 0) {
producer.send(batchList);
batchList.clear();
}
}
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
flushSync();
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
// Nothing to do
}
}
RocketMQSource.java
public class RocketMQSource <OUT> extends RichParallelSourceFunction<OUT>
implements CheckpointedFunction, ResultTypeQueryable<OUT> {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(RocketMQSource.class);
private transient MQPullConsumerScheduleService pullConsumerScheduleService;
private DefaultMQPullConsumer consumer;
private KeyValueDeserializationSchema<OUT> schema;
private RunningChecker runningChecker;
private transient ListState<Tuple2<MessageQueue, Long>> unionOffsetStates;
private Map<MessageQueue, Long> offsetTable;
private Map<MessageQueue, Long> restoredOffsets;
private Properties props;
private String topic;
private String group;
private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states";
private transient volatile boolean restored;
public RocketMQSource(KeyValueDeserializationSchema<OUT> schema, Properties props) {
this.schema = schema;
this.props = props;
}
@Override
public void open(Configuration parameters) throws Exception {
LOG.info("启动定位数据消费Mq");
Validate.notEmpty(props, "Consumer properties can not be empty");
Validate.notNull(schema, "KeyValueDeserializationSchema can not be null");
this.topic = props.getProperty(RocketMQConfig.CONSUMER_TOPIC);
this.group = props.getProperty(RocketMQConfig.CONSUMER_GROUP);
Validate.notEmpty(topic, "Consumer topic can not be empty");
Validate.notEmpty(group, "Consumer group can not be empty");
if (offsetTable == null) {
offsetTable = new ConcurrentHashMap<>();
}
if (restoredOffsets == null) {
restoredOffsets = new ConcurrentHashMap<>();
}
runningChecker = new RunningChecker();
pullConsumerScheduleService = new MQPullConsumerScheduleService(group);
consumer = pullConsumerScheduleService.getDefaultMQPullConsumer();
consumer.setInstanceName(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()));
RocketMQConfig.buildConsumerConfigs(props, consumer);
}
@Override
public void run(SourceContext<OUT> context) throws Exception {
LOG.info("开始发送数据");
// The lock that guarantees that record emission and state updates are atomic,
// from the view of taking a checkpoint.
final Object lock = context.getCheckpointLock();
int delayWhenMessageNotFound = getInteger(props, RocketMQConfig.CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND,
RocketMQConfig.DEFAULT_CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND);
String tag = props.getProperty(RocketMQConfig.CONSUMER_TAG, RocketMQConfig.DEFAULT_CONSUMER_TAG);
int pullPoolSize = getInteger(props, RocketMQConfig.CONSUMER_PULL_POOL_SIZE,
RocketMQConfig.DEFAULT_CONSUMER_PULL_POOL_SIZE);
int pullBatchSize = getInteger(props, RocketMQConfig.CONSUMER_BATCH_SIZE,
RocketMQConfig.DEFAULT_CONSUMER_BATCH_SIZE);
pullConsumerScheduleService.setPullThreadNums(pullPoolSize);
pullConsumerScheduleService.registerPullTaskCallback(topic, new PullTaskCallback() {
@Override
public void doPullTask(MessageQueue mq, PullTaskContext pullTaskContext) {
try {
long offset = getMessageQueueOffset(mq);
if (offset < 0) {
return;
}
PullResult pullResult = consumer.pull(mq, tag, offset, pullBatchSize);
boolean found = false;
switch (pullResult.getPullStatus()) {
case FOUND:
List<MessageExt> messages = pullResult.getMsgFoundList();
for (MessageExt msg : messages) {
byte[] key = msg.getKeys() != null ? msg.getKeys().getBytes(StandardCharsets.UTF_8) : null;
byte[] value = msg.getBody();
OUT data = schema.deserializeKeyAndValue(key, value);
// output and state update are atomic
synchronized (lock) {
if(data!=null){
context.collectWithTimestamp(data, msg.getBornTimestamp());
}
}
}
found = true;
break;
case NO_MATCHED_MSG:
LOG.info("No matched message after offset {} for queue {}", offset, mq);
break;
case NO_NEW_MSG:
break;
case OFFSET_ILLEGAL:
LOG.info("Offset {} is illegal for queue {}", offset, mq);
break;
default:
break;
}
synchronized (lock) {
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
}
if (found) {
pullTaskContext.setPullNextDelayTimeMillis(0); // no delay when messages were found
} else {
pullTaskContext.setPullNextDelayTimeMillis(delayWhenMessageNotFound);
}
}catch (Exception e) {
throw new RuntimeException(e);
}
}
});
try {
pullConsumerScheduleService.start();
} catch (MQClientException e) {
throw new RuntimeException(e);
}
runningChecker.setRunning(true);
awaitTermination();
}
private void awaitTermination() throws InterruptedException {
while (runningChecker.isRunning()) {
Thread.sleep(50);
}
}
private long getMessageQueueOffset(MessageQueue mq) throws MQClientException {
Long offset = offsetTable.get(mq);
// restoredOffsets(unionOffsetStates) is the restored global union state;
// should only snapshot mqs that actually belong to us
if (restored && offset == null) {
offset = restoredOffsets.get(mq);
}
if (offset == null) {
offset = consumer.fetchConsumeOffset(mq, false);
if (offset < 0) {
String initialOffset = props.getProperty(RocketMQConfig.CONSUMER_OFFSET_RESET_TO, CONSUMER_OFFSET_LATEST);
switch (initialOffset) {
case CONSUMER_OFFSET_EARLIEST:
offset = consumer.minOffset(mq);
break;
case CONSUMER_OFFSET_LATEST:
offset = consumer.maxOffset(mq);
break;
case CONSUMER_OFFSET_TIMESTAMP:
offset = consumer.searchOffset(mq, getLong(props,
RocketMQConfig.CONSUMER_OFFSET_FROM_TIMESTAMP, System.currentTimeMillis()));
break;
default:
throw new IllegalArgumentException("Unknown value for CONSUMER_OFFSET_RESET_TO.");
}
}
}
offsetTable.put(mq, offset);
return offsetTable.get(mq);
}
private void putMessageQueueOffset(MessageQueue mq, long offset) throws MQClientException {
offsetTable.put(mq, offset);
consumer.updateConsumeOffset(mq, offset);
}
@Override
public void cancel() {
LOG.info("cancel ...");
runningChecker.setRunning(false);
if (pullConsumerScheduleService != null) {
pullConsumerScheduleService.shutdown();
}
offsetTable.clear();
restoredOffsets.clear();
}
@Override
public void close() throws Exception {
LOG.info("close ...");
try {
cancel();
} finally {
super.close();
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
if (!runningChecker.isRunning()) {
LOG.info("snapshotState() called on closed source; returning null.");
return;
}
if (LOG.isDebugEnabled()) {
LOG.info("Snapshotting state {} ...", context.getCheckpointId());
}
unionOffsetStates.clear();
if (LOG.isDebugEnabled()) {
LOG.info("Snapshotted state, last processed offsets: {}, checkpoint id: {}, timestamp: {}",
offsetTable, context.getCheckpointId(), context.getCheckpointTimestamp());
}
}
// remove the unassigned queues in order to avoid read the wrong offset when the source restart
Set<MessageQueue> assignedQueues = consumer.fetchMessageQueuesInBalance(topic);
offsetTable.entrySet().removeIf(item -> !assignedQueues.contains(item.getKey()));
for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) {
unionOffsetStates.add(Tuple2.of(entry.getKey(), entry.getValue()));
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
// called every time the user-defined function is initialized,
// be that when the function is first initialized or be that
// when the function is actually recovering from an earlier checkpoint.
// Given this, initializeState() is not only the place where different types of state are initialized,
// but also where state recovery logic is included.
LOG.info("initialize State ...");
this.unionOffsetStates = context.getOperatorStateStore().getUnionListState(new ListStateDescriptor<>(
OFFSETS_STATE_NAME, TypeInformation.of(new TypeHint<Tuple2<MessageQueue, Long>>() {
})));
this.restored = context.isRestored();
if (restored) {
if (restoredOffsets == null) {
restoredOffsets = new ConcurrentHashMap<>();
}
for (Tuple2<MessageQueue, Long> mqOffsets : unionOffsetStates.get()) {
if (!restoredOffsets.containsKey(mqOffsets.f0) || restoredOffsets.get(mqOffsets.f0) < mqOffsets.f1) {
restoredOffsets.put(mqOffsets.f0, mqOffsets.f1);
}
}
LOG.info("Setting restore state in the consumer. Using the following offsets: {}", restoredOffsets);
} else {
LOG.info("No restore state for the consumer.");
}
}
@Override
public TypeInformation<OUT> getProducedType() {
return schema.getProducedType();
}
}
RocketMQUtils.java
public final class RocketMQUtils {
public static int getInteger(Properties props, String key, int defaultValue) {
return Integer.parseInt(props.getProperty(key, String.valueOf(defaultValue)));
}
public static long getLong(Properties props, String key, long defaultValue) {
return Long.parseLong(props.getProperty(key, String.valueOf(defaultValue)));
}
public static boolean getBoolean(Properties props, String key, boolean defaultValue) {
return Boolean.parseBoolean(props.getProperty(key, String.valueOf(defaultValue)));
}
}
RunningChecker.java文章来源:https://uudwc.com/A/22dN
public class RunningChecker implements Serializable {
private volatile boolean isRunning = false;
public boolean isRunning() {
return isRunning;
}
public void setRunning(boolean running) {
isRunning = running;
}
}
以下代码路径:source
RocketMQPosInfoSource.java文章来源地址https://uudwc.com/A/22dN
/**
* rocketmq 定位数据流源
*/
public class RocketMQPosInfoSource extends RocketMQSource<VehiclePosInfo> implements ResultTypeQueryable<VehiclePosInfo> {
public RocketMQPosInfoSource(KeyValueDeserializationSchema<VehiclePosInfo> schema, Properties props) {
super(schema, props);
}
}