.stream().collect(Collectors.toMap(A::getKey, B::getKey)
>> .stream().collect(Collectors.toMap(A::getKey, B::getKey, (k1,k2) -> k1)

java8 stream collect  toMap 사용 중 duplicate key 발생 시

Map 반환에 키 중복이 있을때 발생하는데,  오버로딩된 toMap 메소드 중 merge 기능이 있는걸 사용하면 된다. 

<실행되는 Collectors.class >

public static <T, K, U>
Collector<T, ?, Map<K,U>> toMap(Function<? super T, ? extends K> keyMapper,
                                Function<? super T, ? extends U> valueMapper,
                                BinaryOperator<U> mergeFunction) {
    return toMap(keyMapper, valueMapper, mergeFunction, HashMap::new);
}

스프링 배치잡으로 실행하려고 셋팅 하고 (라이브러리셋팅, config셋팅)

kafka consumer 를 poll 할때

ConcurrentModificationException. KafkaConsumer is not safe for multi-threaded access

에러가 발생했다. 

이유가 뭔고 하니 

카프카 컨슈머는 쓰레드세이프하지 않으니 멀티쓰레드 환경에서는 쓰지 말라는 것이다. 

어쩔까 하다 보니 결국 하나씩만 실행하게 환경을 구축해 줘야 한다. 

.yml 파일에 아래와 같이 셋팅해주자.

spring.task.execution.pool.coreSize: 1

관련 javadoc : http://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#multithreaded

 

KafkaConsumer (kafka 2.1.0 API)

Subscribe to all topics matching specified pattern to get dynamically assigned partitions. The pattern matching will be done periodically against topics existing at the time of check. This is a short-hand for subscribe(Pattern, ConsumerRebalanceListener),

kafka.apache.org

 

.yml

# hadoop
  hadoop:
    user: 
    home: /Dev/hadoop-2.6.0
    command: /Dev/hadoop-2.6.0/bin/hadoop.cmd
    confDir: /Dev/hadoop-2.6.0/etc/hadoop
    defaultFS: hdfs://
    yarn:
      resourceManager: 
      hadoopMapreduceJobTracker: 
      hadoopMapreduceJobHistory: 
      yarnQueueName: default
  hive:
    connectionTimeout: 1800000
    maximumPoolSize: 1
    infoUsername: 
    driverClassName: org.apache.hive.jdbc.HiveDriver
    jdbcUrl: jdbc:hive2://

.gradle


    // hadoop-hive

    implementation group: 'org.springframework.data', name: 'spring-data-hadoop', version: '2.2.1.RELEASE'
    implementation ('org.apache.hive:hive-jdbc:2.1.0') {
        [new Tuple('org.eclipse.jetty.orbit', 'javax.servlet')
         , new Tuple('org.eclipse.jetty.aggregate', 'jetty-all')
         , new Tuple('org.json', 'json')
         , new Tuple('org.slf4j', 'slf4j-log4j12')
         , new Tuple('org.apache.logging.log4j', 'log4j-slf4j-impl')
        ].each {
            exclude group: "${it.get(0)}", module: "${it.get(1)}"
        }
    }
    compile group: 'org.apache.parquet', name: 'parquet-hadoop', version: '1.11.1'

    compile group: 'org.apache.hadoop', 'name': 'hadoop-common', version: '2.6.5'
    compile group: 'org.apache.hadoop', 'name': 'hadoop-hdfs', version: '2.6.5'
    compile group: 'org.apache.hadoop', 'name': 'hadoop-mapreduce-client-core', version: '2.6.5'
    compile group: 'org.apache.hadoop', 'name': 'hadoop-hdfs', version: '2.6.5'
    compile group: 'org.apache.hadoop', 'name': 'hadoop-mapreduce-client-core', version: '2.6.5'

    // end-hadoop-hive

 

HadoopHiveConfig.java

package .common.config;

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.transaction.managed.ManagedTransactionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;

@Slf4j
@Configuration
@MapperScan(basePackages = "", sqlSessionFactoryRef = "hiveSqlSessionFactory", annotationClass = HadoopHiveConfig.HiveMapper.class)
public class HadoopHiveConfig {
    @Value("${spring.hadoop.home}")
    private String hadoopHome;
    @Value("${spring.hadoop.user}")
    private String hadoopUser;
    @Value("${spring.hadoop.command}")
    private String hadoopCommand;
    @Value("${spring.hadoop.confDir}")
    private String hadoopConfDir;
    @Value("${spring.hadoop.defaultFS}")
    private String hadoopDefaultFS;
    @Value("${spring.hadoop.yarn.resourceManager}")
    private String yarnResourceManager;
    @Value("${spring.hadoop.yarn.hadoopMapreduceJobTracker}")
    private String hadoopMapreduceJobTracker;
    @Value("${spring.hadoop.yarn.hadoopMapreduceJobHistory}")
    private String hadoopMapreduceJobHistory;
    @Value("${spring.hadoop.yarn.yarnQueueName}")
    private String yarnQueueName;

    @Value("${spring.hive.connectionTimeout}")
    private long hiveInfoConnectionTimeout;
    @Value("${spring.hive.maximumPoolSize}")
    private int hiveInfoMaximumPoolSize;
    @Value("${spring.hive.infoUsername}")
    private String hiveInfoUsername;
    @Value("${spring.hive.driverClassName}")
    private String hiveInfoDriverClassName;
    @Value("${spring.hive.jdbcUrl}")
    private String hiveInfoJdbcUrl;
    @Value("${spring.hive.poolName}")
    private String hivePoolName;

    @Bean(name = "hiveSqlSessionFactory")
    public SqlSessionFactory sqlSessionFactory(@Qualifier("hikariHiveDatasource") DataSource hikariHiveDataSource) throws Exception {
        SqlSessionFactoryBean sessionFactory = new SqlSessionFactoryBean();
        sessionFactory.setDataSource(hikariHiveDataSource);
        sessionFactory.setMapperLocations(
            new PathMatchingResourcePatternResolver().getResources("classpath:mapper_hive/*.xml") );
        sessionFactory.setTransactionFactory(new ManagedTransactionFactory());
        return sessionFactory.getObject();
    }

    @Bean(name = "hikariHiveDatasource")
    public DataSource dataSource() {
        setSystemProperty();
        HikariConfig config = getHiveHikariConfig();
        return new HikariDataSource(config);
    }

    private void setSystemProperty() {
        System.setProperty("hadoop.home.dir", hadoopHome);
        System.setProperty("HADOOP_USER_NAME", hadoopUser);
    }

    protected HikariConfig getHiveHikariConfig() {
        HikariConfig config = new HikariConfig();

        config.setPoolName(hivePoolName);
        config.setJdbcUrl(hiveInfoJdbcUrl);
        config.setDriverClassName(hiveInfoDriverClassName);
        config.setUsername(hiveInfoUsername);
        config.setMaximumPoolSize(hiveInfoMaximumPoolSize);
        config.setConnectionTimeout(hiveInfoConnectionTimeout);

        return config;
    }

    /**
     * hdfs conf
     * @return
     */
    public org.apache.hadoop.conf.Configuration getHdfsConfig() {
        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
        // core-site
        conf.set("fs.defaultFS", hadoopDefaultFS);
        // hdfs-site
        conf.set("dfs.replication", "3");
        conf.set("dfs.permissions", "false");

        return conf;
    }
    /**
     * hdfs conf + yarn conf
     * @return
     */
    public org.apache.hadoop.conf.Configuration getYarnConfig() {
        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration(getHdfsConfig());

        conf.set("mapreduce.job.user.classpath.first", "true");

        conf.set("mapreduce.framework.name", "yarn");
        conf.set("yarn.resourcemanager.hostname", yarnResourceManager);
        if(hadoopMapreduceJobTracker.length() > 0) {

            conf.set("mapreduce.jobtracker.address", hadoopMapreduceJobTracker);
            conf.set("mapreduce.jobhistory.address", hadoopMapreduceJobHistory);
            conf.set("yarn.app.mapreduce.am.staging-dir", "/tmp/mapred-dic/staging");

            // 필요함.
            conf.set("mapreduce.jobhistory.webapp.address", "0.0.0.0:50030");
            conf.set("mapreduce.jobtracker.http.address", "0.0.0.0:51030");
            conf.set("mapreduce.tasktracker.http.address", "0.0.0.0:51060");
        }

        if(yarnQueueName.length() > 0) {
            conf.set("mapreduce.job.queuename", yarnQueueName);
        }


        return conf;
    }

    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    public @interface HiveMapper {
    }

}
import com.google.gson.annotations.SerializedName;
import lombok.Data;

@Data
public class ImageLogJson {
    @SerializedName("field_name")
    public String name;
    @SerializedName("url")
    public String url;
}
#!/bin/bash -li

pwd_path='/Users/a1101225/Dev/git/catalog-script-11st/aa/'
del='find '$pwd_path' -type f -name "a*" -exec rm -v {} +'
echo $del
$del
eval $del

5번째 라인에서 $del 을 호출해도 실행되지 않는다.

eval 을 추가해야 실행이 된다. 

private int bitAndOperator(List<Integer> nums) {
List<String> bits = new ArrayList<>();
int maxLength = 0;

for (Integer num : nums) {
String e = Integer.toBinaryString(num);
if (maxLength < e.length()) {
maxLength = e.length();
}
bits.add(e);
}
List<String> padBits = new ArrayList<>();
for (String bit : bits) {
padBits.add(pad(bit, maxLength));
}
System.out.println(padBits);

Map<Integer, List<String>> posPerBits = new LinkedHashMap<>();
for (int i = 0; i < maxLength; i++) {
posPerBits.put(i, getAndValue(padBits, i));
}
String res = "";
for (Map.Entry<Integer, List<String>> entry : posPerBits.entrySet()) {
if (isAllPositive(entry.getValue())) {
res += "1";
} else {
res += "0";
}
}

return Integer.parseInt(res, 2);
}

private boolean isAllPositive(List<String> value) {
for (String s : value) {
if (s.equals("0")) {
return false;
}
}
return true;
}

private List<String> getAndValue(List<String> padBits, int i) {
List<String> res = new ArrayList<>();
for (String padBit : padBits) {
res.add(String.valueOf(padBit.charAt(i)));
}
return res;
}

private String pad(String input, int maxLength) {
int padLen = maxLength - input.length();
if (padLen > 0) {
for (int i = 0; i < padLen; i++) {
input = ("0" + input);
}
}
return input;
}

iconv -c -f euc-kr -t utf-8 example.txt > example_u.txt

 

#!/bin/bash
#ex) /start_batch.sh {jobname} --param1=1 --param2=2
pwd_path='/deploy/'
jar_nm='batch.jar'
file_nm=$pwd_path$jar_nm
cp_file_nm=$file_nm."$(date +'%y%m%d%H%M%S')"

function delete_filter() {
before_jar_filter=$jar_nm.$1*
for f in $before_jar_filter; do
rm -f $pwd_path$f
done
}

delete_filter "$(date -v-2d +'%y%m%d')" #2day ago
delete_filter "$(date -v-2m +'%y%m')" #2month ago

job_name=$1
cp_cmd='cp '$file_nm' '$cp_file_nm
$cp_cmd
jar_cmd='java -jar -Dspring.profiles.active=prd '$cp_file_nm' --spring.batch.job.names='$job_name

for arg in $*; do
if [[ $arg == *"--"* ]]; then
jar_cmd=${jar_cmd}' '$arg
fi
done
echo $jar_cmd


1. 백업

2. 오래된 백업삭제 jar

3. 실행

:g/^$/d


개발자를 위한 코드리뷰

코드리뷰를 왜 해야 하는가 ?

  • 시장과 비지니스의 요구사항

    • 항상 변한다.
      • 빠르게, 자주, 안정적으로 배포 해야 한다.
  • 릴리즈별 개발자 수(기하급수적 늘어남) , 릴리즈별 생산성 (늘어나지않음) , 릴리즈별 코드작성 비용 (늘어남)

  • 동작 > 복붙 > 공유부족으로 인한 개발 인력에 대한 의존도 높아짐

아키텍처의 중요성

클린코드, 좋은설계, 아키텍처에 대한 중요성

  • 중복이 하나도 없게 하는것도 리소스가 너무 들어감 (3개까지는..인정)

  • Big ball of mud

    • 뚜렷한 아키텍처 없이 구현된 시스템
  • 지속적으로 변화하는 요구사항 수용

코드를 잘짜는게 중요한 설계다.

  • Agile 더 좋은 sw개발 , 단순절자변경 개발 역량
  • Transformation ?

코드리뷰 목적

  • 주목적: 품질문제 검수 (버그/장애)
  • 부가 목적
    • 서로에게 관심
    • 지식공유
    • 집단 코드 오너십 및 결속 증대
  • 컴퓨터가 할수 있는건 컴퓨터에게 맡겨라
  • 스타일같은걸로 힘빼지 말자
  • 기분안상하게 조심

리뷰는 즉시 시작

  • 속도를 위해. PR은 작고, 범위가 좋은 사이즈로
  • 너무 크면 PR을 분리하라
  • 예제코드 제공에 관대해라 .
    • 다 알려줄 필요는 없지만 헤메는걸 보고있을 필요도 없다.

공격적 리뷰 자제

  • 코드작성자에 대한 지적은 제외해라.
  • 명령하지 마라. 요청해라
  • 의견을 줄때는 레퍼런스로 .
  • PR에 포함되지 않은 라인은 리뷰범위가 아님.

칭찬해라

  • 잘못된 부분에 집중하지 말고 좋은변경에 대한 진심어린 칭찬을 해라 (특히 주니어에게)

교착상태를 피해라.

  • 만나서 얘기해라 텍스트로는 한계가 있다.

  • 인정하거나 Escalate 해라. (그냥 승인해라)

  • 다른 리뷰어에게 할당

  • 오프라인 리뷰 최소화 (오프라인상에서 선배가 하는 리뷰는 지시로 받아들일수 있다.)

  • 코드 비난에 대한 두려움을 극복해라 - 그냥 받아들여라


+ Recent posts