I was setting things up to run as a Spring Batch job (library setup, config setup),

and when polling the Kafka consumer,

ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access

this error occurred.

The reason, it turns out:

the Kafka consumer is not thread-safe, so it must not be used from multiple threads.

After weighing the options, in the end the environment has to be set up so that only one task runs at a time.

Set it up in the .yml file as below.

spring.task.execution.pool.coreSize: 1
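
If you would rather do this in code than in .yml, declaring the application task executor bean yourself gives the same single-threaded behavior (Spring Boot backs off its auto-configured executor when you define your own). A rough sketch; the class name and thread-name prefix are my own choices:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
public class SingleThreadTaskExecutorConfig {

    // Same effect as spring.task.execution.pool.coreSize: 1 in the .yml:
    // a pool with exactly one thread, so nothing that touches the KafkaConsumer
    // ever runs concurrently.
    @Bean("applicationTaskExecutor")
    public ThreadPoolTaskExecutor applicationTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(1);
        executor.setMaxPoolSize(1);
        executor.setThreadNamePrefix("batch-task-");
        executor.initialize();
        return executor;
    }
}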

Related javadoc: http://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#multithreaded
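
For reference, the pattern that javadoc recommends is to confine the consumer to a single thread and only ever call wakeup() from other threads. A minimal sketch of that pattern; the class name, topic name, and record types are placeholders, not anything from this job:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class SingleThreadedConsumerLoop implements Runnable {

    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final KafkaConsumer<String, String> consumer;

    public SingleThreadedConsumerLoop(Properties props) {
        // The consumer must only be polled from the thread that runs run().
        this.consumer = new KafkaConsumer<>(props);
    }

    @Override
    public void run() {
        try {
            consumer.subscribe(Collections.singletonList("my-topic")); // placeholder topic
            while (!closed.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // process the record on this same thread
                }
            }
        } catch (WakeupException e) {
            // thrown when shutdown() calls wakeup(); only rethrow if it was unexpected
            if (!closed.get()) {
                throw e;
            }
        } finally {
            consumer.close();
        }
    }

    // wakeup() is the one KafkaConsumer method that is safe to call from another thread.
    public void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }
}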

 


 

.yml

spring:
  # hadoop
  hadoop:
    user: 
    home: /Dev/hadoop-2.6.0
    command: /Dev/hadoop-2.6.0/bin/hadoop.cmd
    confDir: /Dev/hadoop-2.6.0/etc/hadoop
    defaultFS: hdfs://
    yarn:
      resourceManager: 
      hadoopMapreduceJobTracker: 
      hadoopMapreduceJobHistory: 
      yarnQueueName: default
  # hive
  hive:
    poolName: 
    connectionTimeout: 1800000
    maximumPoolSize: 1
    infoUsername: 
    driverClassName: org.apache.hive.jdbc.HiveDriver
    jdbcUrl: jdbc:hive2://

.gradle


    // hadoop-hive

    implementation group: 'org.springframework.data', name: 'spring-data-hadoop', version: '2.2.1.RELEASE'
    implementation ('org.apache.hive:hive-jdbc:2.1.0') {
        [new Tuple('org.eclipse.jetty.orbit', 'javax.servlet')
         , new Tuple('org.eclipse.jetty.aggregate', 'jetty-all')
         , new Tuple('org.json', 'json')
         , new Tuple('org.slf4j', 'slf4j-log4j12')
         , new Tuple('org.apache.logging.log4j', 'log4j-slf4j-impl')
        ].each {
            exclude group: "${it.get(0)}", module: "${it.get(1)}"
        }
    }
    implementation group: 'org.apache.parquet', name: 'parquet-hadoop', version: '1.11.1'

    implementation group: 'org.apache.hadoop', name: 'hadoop-common', version: '2.6.5'
    implementation group: 'org.apache.hadoop', name: 'hadoop-hdfs', version: '2.6.5'
    implementation group: 'org.apache.hadoop', name: 'hadoop-mapreduce-client-core', version: '2.6.5'

    // end-hadoop-hive

 

HadoopHiveConfig.java

package .common.config;

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.transaction.managed.ManagedTransactionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;

@Slf4j
@Configuration
@MapperScan(basePackages = "", sqlSessionFactoryRef = "hiveSqlSessionFactory", annotationClass = HadoopHiveConfig.HiveMapper.class)
public class HadoopHiveConfig {
    @Value("${spring.hadoop.home}")
    private String hadoopHome;
    @Value("${spring.hadoop.user}")
    private String hadoopUser;
    @Value("${spring.hadoop.command}")
    private String hadoopCommand;
    @Value("${spring.hadoop.confDir}")
    private String hadoopConfDir;
    @Value("${spring.hadoop.defaultFS}")
    private String hadoopDefaultFS;
    @Value("${spring.hadoop.yarn.resourceManager}")
    private String yarnResourceManager;
    @Value("${spring.hadoop.yarn.hadoopMapreduceJobTracker}")
    private String hadoopMapreduceJobTracker;
    @Value("${spring.hadoop.yarn.hadoopMapreduceJobHistory}")
    private String hadoopMapreduceJobHistory;
    @Value("${spring.hadoop.yarn.yarnQueueName}")
    private String yarnQueueName;

    @Value("${spring.hive.connectionTimeout}")
    private long hiveInfoConnectionTimeout;
    @Value("${spring.hive.maximumPoolSize}")
    private int hiveInfoMaximumPoolSize;
    @Value("${spring.hive.infoUsername}")
    private String hiveInfoUsername;
    @Value("${spring.hive.driverClassName}")
    private String hiveInfoDriverClassName;
    @Value("${spring.hive.jdbcUrl}")
    private String hiveInfoJdbcUrl;
    @Value("${spring.hive.poolName}")
    private String hivePoolName;

    @Bean(name = "hiveSqlSessionFactory")
    public SqlSessionFactory sqlSessionFactory(@Qualifier("hikariHiveDatasource") DataSource hikariHiveDataSource) throws Exception {
        SqlSessionFactoryBean sessionFactory = new SqlSessionFactoryBean();
        sessionFactory.setDataSource(hikariHiveDataSource);
        sessionFactory.setMapperLocations(
            new PathMatchingResourcePatternResolver().getResources("classpath:mapper_hive/*.xml") );
        sessionFactory.setTransactionFactory(new ManagedTransactionFactory());
        return sessionFactory.getObject();
    }

    @Bean(name = "hikariHiveDatasource")
    public DataSource dataSource() {
        setSystemProperty();
        HikariConfig config = getHiveHikariConfig();
        return new HikariDataSource(config);
    }

    private void setSystemProperty() {
        System.setProperty("hadoop.home.dir", hadoopHome);
        System.setProperty("HADOOP_USER_NAME", hadoopUser);
    }

    protected HikariConfig getHiveHikariConfig() {
        HikariConfig config = new HikariConfig();

        config.setPoolName(hivePoolName);
        config.setJdbcUrl(hiveInfoJdbcUrl);
        config.setDriverClassName(hiveInfoDriverClassName);
        config.setUsername(hiveInfoUsername);
        config.setMaximumPoolSize(hiveInfoMaximumPoolSize);
        config.setConnectionTimeout(hiveInfoConnectionTimeout);

        return config;
    }

    /**
     * hdfs conf
     * @return
     */
    public org.apache.hadoop.conf.Configuration getHdfsConfig() {
        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
        // core-site
        conf.set("fs.defaultFS", hadoopDefaultFS);
        // hdfs-site
        conf.set("dfs.replication", "3");
        conf.set("dfs.permissions", "false");

        return conf;
    }
    /**
     * hdfs conf + yarn conf
     * @return
     */
    public org.apache.hadoop.conf.Configuration getYarnConfig() {
        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration(getHdfsConfig());

        conf.set("mapreduce.job.user.classpath.first", "true");

        conf.set("mapreduce.framework.name", "yarn");
        conf.set("yarn.resourcemanager.hostname", yarnResourceManager);
        if(hadoopMapreduceJobTracker.length() > 0) {

            conf.set("mapreduce.jobtracker.address", hadoopMapreduceJobTracker);
            conf.set("mapreduce.jobhistory.address", hadoopMapreduceJobHistory);
            conf.set("yarn.app.mapreduce.am.staging-dir", "/tmp/mapred-dic/staging");

            // Required.
            conf.set("mapreduce.jobhistory.webapp.address", "0.0.0.0:50030");
            conf.set("mapreduce.jobtracker.http.address", "0.0.0.0:51030");
            conf.set("mapreduce.tasktracker.http.address", "0.0.0.0:51060");
        }

        if(yarnQueueName.length() > 0) {
            conf.set("mapreduce.job.queuename", yarnQueueName);
        }


        return conf;
    }

    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    public @interface HiveMapper {
    }

}
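
For completeness, this is roughly how a mapper gets picked up by the @MapperScan above: the interface has to live under the (redacted) basePackages and carry the HiveMapper marker annotation, and its statements then run through hiveSqlSessionFactory / hikariHiveDatasource. The interface, query, and table names below are made up for illustration; the actual job uses the XML mappers under mapper_hive/.

import java.util.List;
import java.util.Map;
import org.apache.ibatis.annotations.Select;

// Registered by @MapperScan because of the HiveMapper marker annotation.
@HadoopHiveConfig.HiveMapper
public interface SampleHiveMapper {

    // sample_table is a placeholder table name
    @Select("SELECT * FROM sample_table LIMIT 10")
    List<Map<String, Object>> selectSample();
}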
ImageLogJson.java

import com.google.gson.annotations.SerializedName;
import lombok.Data;

@Data
public class ImageLogJson {
    @SerializedName("field_name")
    public String name;
    @SerializedName("url")
    public String url;
}
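
A quick check of the mapping, assuming Gson is on the classpath (the JSON string here is a made-up example):

import com.google.gson.Gson;

public class ImageLogJsonExample {
    public static void main(String[] args) {
        // Hypothetical log line; the keys match the @SerializedName values above.
        String json = "{\"field_name\":\"thumbnail\",\"url\":\"http://example.com/a.jpg\"}";
        ImageLogJson log = new Gson().fromJson(json, ImageLogJson.class);
        System.out.println(log.getName() + " / " + log.getUrl()); // lombok @Data generates the getters
    }
}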
