• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

第二次提交Flink作业报No suitable driver found for jdbc

武飞扬头像
ambitfly
帮助1

这个BUG是这样的,在flink集群测试(session mode)实时把pulsar的数据写入TDengine的时候,第一次提交任务可以运行成功,然后把任务cancel掉后,再次启动就会报No suitable driver found for jdbc:TAOS://hadoop001:6030?user=root&password=taosdata错误,导致任务提交失败,后续提交任务都会失败,报同样的错误。windows本地环境和linux本地环境都没有问题。

一开始思考问题的方向是:

  1. jar包没打进去
  2. Tdengine的客户端和服务端版本不一致
  3. jdbc的url路径写的不对
  4. driver驱动的版本不一致
  5. driver类没有加载进去
  6. flink环境问题
  7. 手动Cancel后没有调用Flink的Sink方法

在一个个都验证过之后发现都不行,好几天都毫无进展,直到我手动Class.forname的时候,出现了一个Native Library xxx is being loaded in another classloader的异常信息。

我开始换一个方向尝试解决这个Bug:

  1. 之前Driver没卸载,尝试手动卸载
  2. 看看有没有办法卸载之前的Driver对应的类加载器(我想太多,只能通过垃圾回收器同意下载)

一同尝试之后还是不行。

最后通过查阅资料,发现Flink为了解决版本的兼容性问题,没有使用双亲委派模型加载Class,用户代码中的类先由 Flink 框架的类加载器加载,再由用户代码的类加载器加载。

这是 FLink 的类加载器继承结构:
学新通

FlinkUserCodeClassLoader 继承自 URLClassLoader 类,其 loadClass() 方法实现如下:

@Override
public final Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
	try {
		synchronized (getClassLoadingLock(name)) {
			return loadClassWithoutExceptionHandling(name, resolve);
		}
	} catch (Throwable classLoadingException) {
		classLoadingExceptionHandler.accept(classLoadingException);
		throw classLoadingException;
	}
}

protected Class<?> loadClassWithoutExceptionHandling(String name, boolean resolve)
		throws ClassNotFoundException {
	// 本质上还是调用的 ClassLoader 的 loadClass() 方法
	// 即 FlinkUserCodeClassLoader 仍然遵循双亲委派模型
	return super.loadClass(name, resolve);
}

学新通

FlinkUserCodeClassLoader 有2个子类,分别为 ParentFirstClassLoader 和 ChildFirstClassLoader。

public static class ParentFirstClassLoader extends FlinkUserCodeClassLoader {

	ParentFirstClassLoader(
			URL[] urls, ClassLoader parent, Consumer<Throwable> classLoadingExceptionHandler) {
		// 直接复用父类 FlinkUserCodeClassLoader 的相关方法逻辑
		super(urls, parent, classLoadingExceptionHandler);
	}

	static {
		ClassLoader.registerAsParallelCapable();
	}
}

可以看到,Parent-First 类加载策略照搬双亲委派模型,也就是说,用户代码的类加载器是User ClassLoader,Flink 框架本身的类加载器是 Application ClassLoader,用户代码中的类先由 Flink 框架的类加载器加载,再由用户代码的类加载器加载。

双亲委派模型的好处是随着加载器的层次关系保证了被加载类的层次关系,从而保证了 Java 运行环境的安全性。但是在 Flink App 这种依赖纷繁复杂的环境中,双亲委派模型可能并不适用。例如,程序中引入的 Flink-Kafka Connector 总是依赖于固定的 Kafka 版本,用户代码中为了兼容实际使用的 Kafka 版本,会引入一个更低或更高的依赖。而同一个组件不同版本的类定义可能会不同(即使类的全限定名是相同的),如果仍然用双亲委派模型,就会因为 Flink 框架指定版本的类先加载,而出现莫名其妙的兼容性问题,如 NoSuchMethodError、IllegalAccessError等。

鉴于此,Flink 实现了 ChildFirstClassLoader 类加载器并作为默认策略,它打破了双亲委派模型,使得用户代码的类先加载,官方文档中将这个操作称为 “Inverted Class Loading”。

ChildFirstClassLoader

ChildFirstClassLoader 是通过覆写 FlinkUserCodeClassLoader 的 loadClassWithoutExceptionHandling() 方法来实现 Child-First 逻辑的。

@Override
protected Class<?> loadClassWithoutExceptionHandling(String name, boolean resolve)
		throws ClassNotFoundException {

	// 首先调用 findLoadedClass 方法检查全限定名 name 对应的类是否已加载过
	Class<?> c = findLoadedClass(name);

	if (c == null) {
		// 检查要加载的类是否以 alwaysParentFirstPatterns 集合中的前缀开头。如果是,则调用父类的 loadClassWithoutExceptionHandling 方法,以 Parent-First 的方式加载它
		for (String alwaysParentFirstPattern : alwaysParentFirstPatterns) {
			if (name.startsWith(alwaysParentFirstPattern)) {
				return super.loadClassWithoutExceptionHandling(name, resolve);
			}
		}

		try {
			// 若加载的类不以 alwaysParentFirstPatterns 集合中的前缀开头,则调用父类 URLClassLoader 的 findClass 方法进行类加载
			c = findClass(name);
		} catch (ClassNotFoundException e) {
			// 若调用 findClass() 方法失败
			// 最终再调用父类的 loadClassWithoutExceptionHandling 方法,以 Parent-First 的方式加载它
			c = super.loadClassWithoutExceptionHandling(name, resolve);
		}
	} else if (resolve) {
		resolveClass(c);
	}

	return c;
}
学新通

可见,用户如果仍然希望某些类"遵循祖制",采用双亲委派模型来加载,则需要借助 alwaysParentFirstPatterns 集合来实现。

而该集合主要由以下2个参数来指定:

  • classloader.parent-first-patterns.default,默认值如下(该值一般不推荐修改):
@Documentation.Section(Documentation.Sections.EXPERT_CLASS_LOADING)
public static final ConfigOption<List<String>> ALWAYS_PARENT_FIRST_LOADER_PATTERNS =
		ConfigOptions.key("classloader.parent-first-patterns.default")
				.stringType()
				.asList()
				.defaultValues(
						ArrayUtils.concat(
								new String[] {
									"java.",
									"scala.",
									"org.apache.flink.",
									"com.esotericsoftware.kryo",
									"org.apache.hadoop.",
									"javax.annotation.",
									"org.xml",
									"javax.xml",
									"org.apache.xerces",
									"org.w3c",
									"org.rocksdb."
								},
								PARENT_FIRST_LOGGING_PATTERNS))
				.withDeprecatedKeys("classloader.parent-first-patterns")
				.withDescription(
						"A (semicolon-separated) list of patterns that specifies which classes should always be"
								  " resolved through the parent ClassLoader first. A pattern is a simple prefix that is checked against"
								  " the fully qualified class name. This setting should generally not be modified. To add another pattern we"
								  " recommend to use \"classloader.parent-first-patterns.additional\" instead.");
								
@Internal
public static final String[] PARENT_FIRST_LOGGING_PATTERNS =
		new String[] {
			"org.slf4j",
			"org.apache.log4j",
			"org.apache.logging",
			"org.apache.commons.logging",
			"ch.qos.logback"
		};

学新通
  • classloader.parent-first-patterns.additional

用户如果仍然希望某些类"遵循祖制",采用双亲委派模型来加载,则可以通过该参数额外指定,并以分号分隔。

@Documentation.Section(Documentation.Sections.EXPERT_CLASS_LOADING)
public static final ConfigOption<List<String>> ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL =
		ConfigOptions.key("classloader.parent-first-patterns.additional")
				.stringType()
				.asList()
				.defaultValues()
				.withDescription(
						"A (semicolon-separated) list of patterns that specifies which classes should always be"
								  " resolved through the parent ClassLoader first. A pattern is a simple prefix that is checked against"
								  " the fully qualified class name. These patterns are appended to \""
								  ALWAYS_PARENT_FIRST_LOADER_PATTERNS.key()
								  "\".");

若有需要,在 conf/flink-conf.yaml 中配置 classloader.parent-first-patterns.additional即可。

Flink 会将上述2种配置合并在一起,作为 alwaysParentFirstPatterns 集合:

public static String[] getParentFirstLoaderPatterns(Configuration config) {
	List<String> base = config.get(ALWAYS_PARENT_FIRST_LOADER_PATTERNS);
	List<String> append = config.get(ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL);
	return mergeListsToArray(base, append);
}

基于这种情况,想到了两种解决方案:

第一种:使用 单作业模式提交flink作业
第二种:修改flink配置文件,使得com.taosdata.jdbc.路径下的类加载的时候通过双亲委派机制加载,这样只有一个类加载器了。

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhiagagb
系列文章
更多 icon
同类精品
更多 icon
继续加载