前言
配置中心,通过key=value的形式存储环境变量。配置中心的属性做了修改,项目中可以通过配置中心的依赖(sdk)立即感知到。需要做的就是如何在属性发生变化时,改变带有@ConfigurationProperties的bean的相关属性。
配置中心
在读配置中心源码的时候发现,里面维护了一个Environment,以及ZookeeperPropertySource。当配置中心属性发生变化的时候,清空ZookeeperPropertySource,并放入最新的属性值。
public class ZookeeperPropertySource extends EnumerablePropertySource
ZookeeperPropertySource重写了equals和hashCode方法,根据这两个方法可以判定配置中心是否修改了属性。
配置中心定义的属性变量
message.center.channels[0].type=HELIUYAN
message.center.channels[0].desc=和留言系统
message.center.channels[1].type=EC_BACKEND
message.center.channels[1].desc=电商后台
message.center.channels[2].type=BILL_FLOW
message.center.channels[2].desc=话费和流量提醒
message.center.channels[3].type=INTEGRATED_CASHIER
message.center.channels[3].desc=综合收银台
message.center.businesses[0].type=BIZ_EXP_REMINDER
message.center.businesses[0].desc=业务到期提醒
message.center.businesses[0].topic=message-center-biz-expiration-reminder-topic
message.center.businesses[1].type=RECHARGE_TRANSACTION_PUSH
message.center.businesses[1].desc=充值交易实时推送
message.center.businesses[1].topic=message-center-recharge-transaction-push-topic
message.center.businesses2Channels[BIZ_EXP_REMINDER]=EC_BACKEND
message.center.businesses2Channels[RECHARGE_TRANSACTION_PUSH]=INTEGRATED_CASHIER
message.center.bizTypeForMsgType[RECHARGE_TRANSACTION_PUSH]=data.type:pay-finish,data.type:rechr-finish,data.type:refund-finish
java属性配置映射类
import org.springframework.boot.context.properties.ConfigurationProperties;import java.util.List;import java.util.Map;import java.util.Objects;/** * @author hujunzheng * @create 2018-06-28 11:37 **/@ConfigurationProperties(prefix = "message.center")public class MessageCenterConstants { private Listbusinesses; private List channels; private Map businesses2Channels; private Map bizTypeForMsgType; public void setBusinesses(List businesses) { this.businesses = businesses; } public void setChannels(List channels) { this.channels = channels; } public List getBusinesses() { return businesses; } public List getChannels() { return channels; } public Map getBusinesses2Channels() { return businesses2Channels; } public void setBusinesses2Channels(Map businesses2Channels) { this.businesses2Channels = businesses2Channels; } public Map getBizTypeForMsgType() { return bizTypeForMsgType; } public void setBizTypeForMsgType(Map bizTypeForMsgType) { this.bizTypeForMsgType = bizTypeForMsgType; } public static class Business implements Comparable { //业务类型 private String type; //业务描述 private String desc; //对应 kafka 的 topic private String topic; public String getType() { return type; } public void setType(String type) { this.type = type; } public String getDesc() { return desc; } public void setDesc(String desc) { this.desc = desc; } public String getTopic() { return topic; } public void setTopic(String topic) { this.topic = topic; } @Override public int compareTo(Business o) { if (type.compareTo(o.type) == 0 || topic.compareTo(o.topic) == 0) { return 0; } return Objects.hash(type, topic); } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Business business = (Business) o; return Objects.equals(type, business.type) || Objects.equals(topic, business.topic); } @Override public int hashCode() { return Objects.hash(type, topic); } @Override public String toString() { return "Business{" + "type='" + type + '\'' + 
", desc='" + desc + '\'' + ", topic='" + topic + '\'' + '}'; } } public static class Channel implements Comparable { //渠道类型 private String type; //渠道描述 private String desc; public String getType() { return type; } public void setType(String type) { this.type = type; } public String getDesc() { return desc; } public void setDesc(String desc) { this.desc = desc; } @Override public int compareTo(Channel o) { return this.type.compareTo(o.type); } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Channel channel = (Channel) o; return Objects.equals(type, channel.type); } @Override public int hashCode() { return Objects.hash(type); } @Override public String toString() { return "Channel{" + "type='" + type + '\'' + ", desc='" + desc + '\'' + '}'; } }}
属性刷新方案
@Beanpublic MergedProperties kafkaMessageMergedProperties() { return ConfigCenterUtils.createToRefreshPropertiesBean(MergedProperties.class);}public static class MergedProperties { private Mapbusinesses; private Map channels; //业务映射渠道 private Map businesses2Channels; //消息类型映射业务类型 private Map msgType2BizType; public MergedProperties() throws GeneralException { this.refreshProperties(); } private void refreshProperties() throws GeneralException { //获取到配置中心最新的propertySource ZookeeperPropertySource propertySource = ConfigHelper.getZookeeperPropertySource(); MessageCenterConstants messageCenterConstants = null; //判断属性是否刷新 if (ConfigCenterUtils.propertySourceRefresh(propertySource)) { //将属性binding到带有@ConfigurationProperties注解的类中 messageCenterConstants = RelaxedConfigurationBinder .with(MessageCenterConstants.class) .setPropertySources(propertySource) .doBind(); } //以下是自定义处理,可忽略 if (!Objects.isNull(messageCenterConstants)) { //Business.type <-> Business this.setBusinesses(Maps.newHashMap( Maps.uniqueIndex(Sets.newHashSet(messageCenterConstants.getBusinesses()), business -> business.getType()) )); //Channel.type <-> Channel this.setChannels(Maps.newHashMap( Maps.uniqueIndex(Sets.newHashSet(messageCenterConstants.getChannels()), channel -> channel.getType()) )); //business <-> channels this.setBusinesses2Channels(messageCenterConstants.getBusinesses2Channels()); //消息类型映射业务类型 this.setMsgType2BizType( messageCenterConstants.getBizTypeForMsgType().entrySet() .stream().map(entry -> { Map tmpMap = Maps.newHashMap(); if (StringUtils.isBlank(entry.getValue())) { return tmpMap; } Arrays.stream(entry.getValue().split(",")).forEach(value -> tmpMap.put(value, entry.getKey())); return tmpMap; }).flatMap(map -> map.entrySet().stream()).collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue())) ); } } //刷新方法 private void catchRefreshProperties() { try { this.refreshProperties(); } catch (Exception e) { LOGGER.error("KafkaMessageConfig 配置中心属性刷新失败", e); } } 
//get方法上指定刷新属性 @ToRefresh(method = "catchRefreshProperties") public Map getBusinesses() { return businesses; } public void setBusinesses(Map businesses) { this.businesses = businesses; } @ToRefresh(method = "catchRefreshProperties") public Map getChannels() { return channels; } public void setChannels(Map channels) { this.channels = channels; } @ToRefresh(method = "catchRefreshProperties") public Map getBusinesses2Channels() { return businesses2Channels; } public void setBusinesses2Channels(Map businesses2Channels) { this.businesses2Channels = businesses2Channels; } @ToRefresh(method = "catchRefreshProperties") public Map getMsgType2BizType() { return msgType2BizType; } public void setMsgType2BizType(Map msgType2BizType) { this.msgType2BizType = msgType2BizType; }}
工具类
ConfigCenterUtils
import com.cmos.cfg.core.ConfigHelper;import com.cmos.cfg.zookeeper.ZookeeperPropertySource;import org.apache.commons.lang3.StringUtils;import org.springframework.cglib.proxy.Enhancer;import org.springframework.cglib.proxy.MethodInterceptor;import org.springframework.cglib.proxy.MethodProxy;import org.springframework.core.BridgeMethodResolver;import org.springframework.core.annotation.AnnotationUtils;import org.springframework.util.ReflectionUtils;import java.lang.reflect.Method;import java.util.Objects;/** * @author hujunzheng * @create 2018-07-04 15:45 **/public class ConfigCenterUtils { private static ZookeeperPropertySource propertySource = ConfigHelper.getZookeeperPropertySource(); //判断配置中心属性是否刷新 public synchronized static boolean propertySourceRefresh(ZookeeperPropertySource newPropertySource) { if (propertySource.equals(newPropertySource)) { return false; } if (propertySource.hashCode() == newPropertySource.hashCode()) { return false; } propertySource = newPropertySource; return true; } //创建代理类,代理@ToRefresh注解的方法,调用相应的刷新方法 public staticT createToRefreshPropertiesBean(Class clazz) { Enhancer enhancer = new Enhancer(); // 设置代理对象父类 enhancer.setSuperclass(clazz); // 设置增强 enhancer.setCallback(new MethodInterceptor() { @Override public Object intercept(Object target, Method method, Object[] args, MethodProxy methodProxy) throws Throwable { ToRefresh toRefresh = AnnotationUtils.findAnnotation(method, ToRefresh.class); if (Objects.isNull(toRefresh) || StringUtils.isBlank(toRefresh.method())) { return methodProxy.invokeSuper(target, args); } Method refreshMethod = ReflectionUtils.findMethod(target.getClass(), toRefresh.method()); if (Objects.isNull(refreshMethod)) { return methodProxy.invokeSuper(target, args); } refreshMethod = BridgeMethodResolver.findBridgedMethod(refreshMethod); refreshMethod.setAccessible(true); refreshMethod.invoke(target, null); return methodProxy.invokeSuper(target, args); } }); return (T) enhancer.create();// 创建代理对象 }}
import org.apache.commons.lang3.StringUtils;import java.lang.annotation.Documented;import java.lang.annotation.Retention;import java.lang.annotation.Target;import static java.lang.annotation.ElementType.METHOD;import static java.lang.annotation.RetentionPolicy.RUNTIME;/** * @author hujunzheng * @create 2018-07-06 9:59 **/@Target({METHOD})@Retention(RUNTIME)@Documentedpublic @interface ToRefresh { //刷新方法 String method() default StringUtils.EMPTY;}
RelaxedConfigurationBinder
动态将propertysource绑定到带有@ConfigurationProperties注解的bean中
参考:org.springframework.boot.context.properties.ConfigurationPropertiesBindingPostProcessor
import com.cmos.common.exception.GeneralException;import org.springframework.boot.bind.PropertiesConfigurationFactory;import org.springframework.boot.context.properties.ConfigurationProperties;import org.springframework.core.convert.ConversionService;import org.springframework.core.convert.support.DefaultConversionService;import org.springframework.core.env.*;import org.springframework.validation.Validator;import org.springframework.validation.beanvalidation.SpringValidatorAdapter;import javax.validation.Validation;import static org.springframework.core.annotation.AnnotatedElementUtils.getMergedAnnotation;/** * @author hujunzheng * @create 2018-07-03 18:01 * * 不强依赖ConfigurationProperties,进行配置注入 **/public class RelaxedConfigurationBinder{ private final PropertiesConfigurationFactory factory; public RelaxedConfigurationBinder(T object) { this(new PropertiesConfigurationFactory<>(object)); } public RelaxedConfigurationBinder(Class type) { this(new PropertiesConfigurationFactory<>(type)); } public static RelaxedConfigurationBinder with(T object) { return new RelaxedConfigurationBinder<>(object); } public static RelaxedConfigurationBinder with(Class type) { return new RelaxedConfigurationBinder<>(type); } public RelaxedConfigurationBinder(PropertiesConfigurationFactory factory) { this.factory = factory; ConfigurationProperties properties = getMergedAnnotation(factory.getObjectType(), ConfigurationProperties.class); javax.validation.Validator validator = Validation.buildDefaultValidatorFactory().getValidator(); factory.setValidator(new SpringValidatorAdapter(validator)); factory.setConversionService(new DefaultConversionService()); if (null != properties) { factory.setIgnoreNestedProperties(properties.ignoreNestedProperties()); factory.setIgnoreInvalidFields(properties.ignoreInvalidFields()); factory.setIgnoreUnknownFields(properties.ignoreUnknownFields()); factory.setTargetName(properties.prefix()); factory.setExceptionIfInvalid(properties.exceptionIfInvalid()); } } 
public RelaxedConfigurationBinder setTargetName(String targetName) { factory.setTargetName(targetName); return this; } public RelaxedConfigurationBinder setPropertySources(PropertySource ... propertySources) { MutablePropertySources sources = new MutablePropertySources(); for (PropertySource propertySource : propertySources) { sources.addLast(propertySource); } factory.setPropertySources(sources); return this; } public RelaxedConfigurationBinder setPropertySources(Environment environment) { factory.setPropertySources(((ConfigurableEnvironment) environment).getPropertySources()); return this; } public RelaxedConfigurationBinder setPropertySources(PropertySources propertySources) { factory.setPropertySources(propertySources); return this; } public RelaxedConfigurationBinder setConversionService(ConversionService conversionService) { factory.setConversionService(conversionService); return this; } public RelaxedConfigurationBinder setValidator(Validator validator) { factory.setValidator(validator); return this; } public RelaxedConfigurationBinder setResolvePlaceholders(boolean resolvePlaceholders) { factory.setResolvePlaceholders(resolvePlaceholders); return this; } public T doBind() throws GeneralException { try { return factory.getObject(); } catch (Exception ex) { throw new GeneralException("配置绑定失败!", ex); } }}