针对 Azure 事件中心上的 Azure Active Directory 进行身份验证Auth against Azure Active Directory on Azure Event Hubs

Auth against Azure Active Directory on Azure Event Hubs

提问人:Raúl Hernández 提问时间:11/17/2023 更新时间:11/17/2023 访问量:14

问:

我是 Azure Active Directory (Microsoft Entra) 身份验证技术的新手,并尝试以这种方式针对将从 rest API 接收事件的事件中心进行身份验证。只是为了获得一些支持,我正在调查并遵循此 Azure 教程:

https://github.com/Azure/azure-event-hubs-for-kafka/tree/master/tutorials/oauth/java/managedidentity

据我所知,托管和工作负载身份已经创建,并且 kubernetes pod 正在其服务帐户中填充后一个值。

我的问题是:当应用程序启动时,会发生以下错误:

javax.security.auth.login.LoginException: java.lang.RuntimeException: Couldn't acquire access token from IMDS, verify your objectId, clientId or msiResourceId
    at com.microsoft.azure.credentials.MSICredentials.retrieveTokenFromIDMSWithRetry(MSICredentials.java:285)
    at com.microsoft.azure.credentials.MSICredentials.getTokenFromIMDSEndpoint(MSICredentials.java:205)
    at com.microsoft.azure.credentials.MSICredentials.getToken(MSICredentials.java:146)
    at com.atradius.sc.eventpublisher.auth.CustomAuthenticateCallbackHandler.getOAuthBearerToken(CustomAuthenticateCallbackHandler.java:86)
    at com.atradius.sc.eventpublisher.auth.CustomAuthenticateCallbackHandler.handle(CustomAuthenticateCallbackHandler.java:67)
    at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.identifyToken(OAuthBearerLoginModule.java:316)
    at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.login(OAuthBearerLoginModule.java:301)
    at java.base/javax.security.auth.login.LoginContext.invoke(Unknown Source)
    at java.base/javax.security.auth.login.LoginContext$4.run(Unknown Source)
    at java.base/javax.security.auth.login.LoginContext$4.run(Unknown Source)
    at java.base/java.security.AccessController.doPrivileged(Unknown Source)
    at java.base/javax.security.auth.login.LoginContext.invokePriv(Unknown Source)
    at java.base/javax.security.auth.login.LoginContext.login(Unknown Source)
    at org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin.login(ExpiringCredentialRefreshingLogin.java:204)
    at org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerRefreshingLogin.login(OAuthBearerRefreshingLogin.java:150)
    at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:62)
    at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:105)
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:170)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)
    at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81)
    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
    at org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:517)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:460)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291)
    ...
Caused by: java.io.IOException: Server returned HTTP response code: 400 for URL: http://<ip>/metadata/identity/oauth2/token?api-version=2018-02-01&resource=https%3A%2F%2Fnull
    at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream0(Unknown Source)
    at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream(Unknown Source)
    at com.microsoft.azure.credentials.MSICredentials.retrieveTokenFromIDMSWithRetry(MSICredentials.java:264)
    ... 170 more

    at java.base/javax.security.auth.login.LoginContext.invoke(Unknown Source)
    at java.base/javax.security.auth.login.LoginContext$4.run(Unknown Source)
    at java.base/javax.security.auth.login.LoginContext$4.run(Unknown Source)
    at java.base/java.security.AccessController.doPrivileged(Unknown Source)
    at java.base/javax.security.auth.login.LoginContext.invokePriv(Unknown Source)
    at java.base/javax.security.auth.login.LoginContext.login(Unknown Source)
    at org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin.login(ExpiringCredentialRefreshingLogin.java:204)
    at org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerRefreshingLogin.login(OAuthBearerRefreshingLogin.java:150)
    at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:62)
    at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:105)
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:170)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)
    at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81)
    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
    at org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:517)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:460)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory.createRawProducer(DefaultKafkaProducerFactory.java:884)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory.createKafkaProducer(DefaultKafkaProducerFactory.java:777)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory.doCreateProducer(DefaultKafkaProducerFactory.java:747)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory.createProducer(DefaultKafkaProducerFactory.java:727)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory.createProducer(DefaultKafkaProducerFactory.java:721)
    at org.springframework.kafka.core.KafkaTemplate.getTheProducer(KafkaTemplate.java:918)
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:772)
    at org.springframework.kafka.core.KafkaTemplate.observeSend(KafkaTemplate.java:754)
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:538)
    at com.atradius.sc.eventpublisher.service.IgwPublishService.send(IgwPublishService.java:44)
    at com.atradius.sc.eventpublisher.controller.IgwPublishApiController.publishEvent(IgwPublishApiController.java:38)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.base/java.lang.reflect.Method.invoke(Unknown Source)
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:207)
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:152)
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:118)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:884)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:797)
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1081)
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:974)
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1011)
    at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:914)
    at jakarta.servlet.http.HttpServlet.service(HttpServlet.java:590)
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:885)
    at jakarta.servlet.http.HttpServlet.service(HttpServlet.java:658)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:205)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149)
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:51)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:110)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149)
    at org.springframework.security.web.FilterChainProxy.lambda$doFilterInternal$3(FilterChainProxy.java:231)
    at org.springframework.security.web.ObservationFilterChainDecorator$FilterObservation$SimpleFilterObservation.lambda$wrap$1(ObservationFilterChainDecorator.java:425)
    at org.springframework.security.web.ObservationFilterChainDecorator$AroundFilterObservation$SimpleAroundFilterObservation.lambda$wrap$1(ObservationFilterChainDecorator.java:285)
    at org.springframework.security.web.ObservationFilterChainDecorator.lambda$wrapSecured$0(ObservationFilterChainDecorator.java:78)
    at org.springframework.security.web.ObservationFilterChainDecorator$VirtualFilterChain.doFilter(ObservationFilterChainDecorator.java:124)
    at org.springframework.security.web.access.intercept.AuthorizationFilter.doFilter(AuthorizationFilter.java:100)
    at org.springframework.security.web.ObservationFilterChainDecorator$ObservationFilter.wrapFilter(ObservationFilterChainDecorator.java:185)
    at org.springframework.security.web.ObservationFilterChainDecorator$ObservationFilter.doFilter(ObservationFilterChainDecorator.java:172)
    at org.springframework.security.web.ObservationFilterChainDecorator$VirtualFilterChain.doFilter(ObservationFilterChainDecorator.java:133)
    at org.springframework.security.web.access.ExceptionTranslationFilter.doFilter(ExceptionTranslationFilter.java:126)
    at org.springframework.security.web.access.ExceptionTranslationFilter.doFilter(ExceptionTranslationFilter.java:120)
    at org.springframework.security.web.ObservationFilterChainDecorator$ObservationFilter.wrapFilter(ObservationFilterChainDecorator.java:185)
    at org.springframework.security.web.ObservationFilterChainDecorator$ObservationFilter.doFilter(ObservationFilterChainDecorator.java:172)
    at org.springframework.security.web.ObservationFilterChainDecorator$VirtualFilterChain.doFilter(ObservationFilterChainDecorator.java:133)
    at org.springframework.security.web.session.SessionManagementFilter.doFilter(SessionManagementFilter.java:131)
    at org.springframework.security.web.session.SessionManagementFilter.doFilter(SessionManagementFilter.java:85)
    at org.springframework.security.web.ObservationFilterChainDecorator$ObservationFilter.wrapFilter(ObservationFilterChainDecorator.java:185)
    at org.springframework.security.web.ObservationFilterChainDecorator$ObservationFilter.doFilter(ObservationFilterChainDecorator.java:172)
    at org.springframework.security.web.ObservationFilterChainDecorator$VirtualFilterChain.doFilter(ObservationFilterChainDecorator.java:133)
    at org.springframework.security.web.authentication.AnonymousAuthenticationFilter.doFilter(AnonymousAuthenticationFilter.java:100)
    at org.springframework.security.web.ObservationFilterChainDecorator$ObservationFilter.wrapFilter(ObservationFilterChainDecorator.java:185)
    at org.springframework.security.web.ObservationFilterChainDecorator$ObservationFilter.doFilter(ObservationFilterChainDecorator.java:172)
    at org.springframework.security.web.ObservationFilterChainDecorator$VirtualFilterChain.doFilter(ObservationFilterChainDecorator.java:133)
    at org.springframework.security.web.servletapi.SecurityContextHolderAwareRequestFilter.doFilter(SecurityContextHolderAwareRequestFilter.java:179)
    at org.springframework.security.web.ObservationFilterChainDecorator$ObservationFilter.wrapFilter(ObservationFilterChainDecorator.java:185)
    at org.springframework.security.web.ObservationFilterChainDecorator$ObservationFilter.doFilter(ObservationFilterChainDecorator.java:172)
    at org.springframework.security.web.ObservationFilterChainDecorator$VirtualFilterChain.doFilter(ObservationFilterChainDecorator.java:133)
    at org.springframework.security.web.savedrequest.RequestCacheAwareFilter.doFilter(RequestCacheAwareFilter.java:63)
    at org.springframework.security.web.ObservationFilterChainDecorator$ObservationFilter.wrapFilter(ObservationFilterChainDecorator.java:185)
    at org.springframework.security.web.ObservationFilterChainDecorator$ObservationFilter.doFilter(ObservationFilterChainDecorator.java:172)
    at org.springframework.security.web.ObservationFilterChainDecorator$VirtualFilterChain.doFilter(ObservationFilterChainDecorator.java:133)
    at com.atradius.boot.web.auth.jwt.OidmJwtAuthenticationFilter.doFilterInternal(OidmJwtAuthenticationFilter.java:55)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116)
    at org.springframework.security.web.ObservationFilterChainDecorator$ObservationFilter.wrapFilter(ObservationFilterChainDecorator.java:185)
    at org.springframework.security.web.ObservationFilterChainDecorator$ObservationFilter.doFilter(ObservationFilterChainDecorator.java:172)
    at org.springframework.security.web.ObservationFilterChainDecorator$VirtualFilterChain.doFilter(ObservationFilterChainDecorator.java:133)
    at org.springframework.security.web.authentication.logout.LogoutFilter.doFilter(LogoutFilter.java:107)
    at org.springframework.security.web.authentication.logout.LogoutFilter.doFilter(LogoutFilter.java:93)
    at org.springframework.security.web.ObservationFilterChainDecorator$ObservationFilter.wrapFilter(ObservationFilterChainDecorator.java:185)
    at org.springframework.security.web.ObservationFilterChainDecorator$ObservationFilter.doFilter(ObservationFilterChainDecorator.java:172)
    at org.springframework.security.web.ObservationFilterChainDecorator$VirtualFilterChain.doFilter(ObservationFilterChainDecorator.java:133)
    at org.springframework.web.filter.CorsFilter.doFilterInternal(CorsFilter.java:91)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116)
    at org.springframework.security.web.ObservationFilterChainDecorator$ObservationFilter.wrapFilter(ObservationFilterChainDecorator.java:185)
    at org.springframework.security.web.ObservationFilterChainDecorator$ObservationFilter.doFilter(ObservationFilterChainDecorator.java:172)
    at org.springframework.security.web.ObservationFilterChainDecorator$VirtualFilterChain.doFilter(ObservationFilterChainDecorator.java:133)
    at org.springframework.security.web.header.HeaderWriterFilter.doHeadersAfter(HeaderWriterFilter.java:90)
    at org.springframework.security.web.header.HeaderWriterFilter.doFilterInternal(HeaderWriterFilter.java:75)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116)
    at org.springframework.security.web.ObservationFilterChainDecorator$ObservationFilter.wrapFilter(ObservationFilterChainDecorator.java:185)
    at org.springframework.security.web.ObservationFilterChainDecorator$ObservationFilter.doFilter(ObservationFilterChainDecorator.java:172)
    at org.springframework.security.web.ObservationFilterChainDecorator$VirtualFilterChain.doFilter(ObservationFilterChainDecorator.java:133)
    at org.springframework.security.web.context.SecurityContextHolderFilter.doFilter(SecurityContextHolderFilter.java:82)
    at org.springframework.security.web.context.SecurityContextHolderFilter.doFilter(SecurityContextHolderFilter.java:69)
    at org.springframework.security.web.ObservationFilterChainDecorator$ObservationFilter.wrapFilter(ObservationFilterChainDecorator.java:185)
    at org.springframework.security.web.ObservationFilterChainDecorator$ObservationFilter.doFilter(ObservationFilterChainDecorator.java:172)
    at org.springframework.security.web.ObservationFilterChainDecorator$VirtualFilterChain.doFilter(ObservationFilterChainDecorator.java:133)
    at org.springframework.security.web.context.request.async.WebAsyncManagerIntegrationFilter.doFilterInternal(WebAsyncManagerIntegrationFilter.java:62)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116)
    at org.springframework.security.web.ObservationFilterChainDecorator$ObservationFilter.wrapFilter(ObservationFilterChainDecorator.java:185)
    at org.springframework.security.web.ObservationFilterChainDecorator$ObservationFilter.doFilter(ObservationFilterChainDecorator.java:172)
    at org.springframework.security.web.ObservationFilterChainDecorator$VirtualFilterChain.doFilter(ObservationFilterChainDecorator.java:133)
    at org.springframework.security.web.session.DisableEncodeUrlFilter.doFilterInternal(DisableEncodeUrlFilter.java:42)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116)
    at org.springframework.security.web.ObservationFilterChainDecorator$ObservationFilter.wrapFilter(ObservationFilterChainDecorator.java:185)
    at org.springframework.security.web.ObservationFilterChainDecorator$AroundFilterObservation$SimpleAroundFilterObservation.lambda$wrap$0(ObservationFilterChainDecorator.java:268)
    at org.springframework.security.web.ObservationFilterChainDecorator$ObservationFilter.doFilter(ObservationFilterChainDecorator.java:169)
    at org.springframework.security.web.ObservationFilterChainDecorator$VirtualFilterChain.doFilter(ObservationFilterChainDecorator.java:133)
    at org.springframework.security.web.FilterChainProxy.doFilterInternal(FilterChainProxy.java:233)
    at org.springframework.security.web.FilterChainProxy.doFilter(FilterChainProxy.java:191)
    at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:352)
    at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:268)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149)
    at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149)
    at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149)
    at org.springframework.web.filter.ServerHttpObservationFilter.doFilterInternal(ServerHttpObservationFilter.java:109)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149)
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149)
    at org.springframework.web.filter.ForwardedHeaderFilter.doFilterInternal(ForwardedHeaderFilter.java:153)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149)
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:166)
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:90)
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:482)
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:115)
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:93)
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74)
    at org.apache.catalina.valves.RemoteIpValve.invoke(RemoteIpValve.java:738)
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:341)
    at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:390)
    at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:63)
    at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:894)
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1741)
    at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:52)
    at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1191)
    at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659)
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
    at java.base/java.lang.Thread.run(Unknown Source)
    

这是 auth 类:

public class CustomAuthenticateCallbackHandler implements AuthenticateCallbackHandler {

    final static ScheduledExecutorService EXECUTOR_SERVICE = Executors.newScheduledThreadPool(1);
    final static MSICredentials CREDENTIALS = new MSICredentials();
    // Use AppServiceMSICredentials instead for App Service deployment.
    // final static AppServiceMSICredentials CREDENTIALS = new AppServiceMSICredentials(AzureEnvironment.AZURE);
    
    private String sbUri;

    @Override
    public void configure(Map<String, ?> configs, String mechanism, List<AppConfigurationEntry> jaasConfigEntries) {
        String bootstrapServer = Arrays.asList(configs.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)).get(0).toString();
        bootstrapServer = bootstrapServer.replaceAll("\\[|\\]", "");
        URI uri = URI.create("https://" + bootstrapServer);
        this.sbUri = uri.getScheme() + "://" + uri.getHost();
    }

    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
        for (Callback callback: callbacks) {
            if (callback instanceof OAuthBearerTokenCallback) {
                try {
                    OAuthBearerToken token = getOAuthBearerToken();
                    OAuthBearerTokenCallback oauthCallback = (OAuthBearerTokenCallback) callback;
                    oauthCallback.token(token);
                } catch (InterruptedException | ExecutionException | TimeoutException | ParseException e) {
                    e.printStackTrace();
                }
            } else {
                throw new UnsupportedCallbackException(callback);
            }
        }
    }

    OAuthBearerToken getOAuthBearerToken() throws InterruptedException, ExecutionException, TimeoutException, IOException, ParseException
    {
        String accesToken = CREDENTIALS.getToken(sbUri);
        JWT jwt = JWTParser.parse(accesToken);
        JWTClaimsSet claims = jwt.getJWTClaimsSet();
        
        return new OAuthBearerTokenImp(accesToken, claims.getExpirationTime());
    }
    
    public void close() throws KafkaException {
        // NOOP
    }
}

身份验证描述符aplication.yml

spring:
  kafka:
    bootstrap-servers: ${EventHubNamespace}.servicebus.windows.net:9093
    security:
      protocol: SASL_SSL
    properties:
      sasl.mechanism: OAUTHBEARER
      sasl.jaas.config: org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
      sasl.login.callback.handler.class: CustomAuthenticateCallbackHandler

依赖关系build.gradle

implementation("com.microsoft.azure:azure-client-authentication:1.7.14")

其他项目特点:

  • Spring Boot 3.0.6 (英语)
  • Spring Kafka 3.0.6
  • 爪哇 17

有人在这里有提示吗?谢谢和问候!

azure-active-directory spring-kafka

评论


答: 暂无答案