버튼 수집상

[Elastic Search] 한글 문장 벡터화해서 유사한 문장 찾기 알고리즘 구현 - 2 본문

TIL - Elastic Search

[Elastic Search] 한글 문장 벡터화해서 유사한 문장 찾기 알고리즘 구현 - 2

cocokaribou 2023. 11. 21. 16:59

Elastic Search Search API 결과

{
  'took': 5,
  'timed_out': False,
  '_shards': {
    'total': 1,
    'successful': 1,
    'skipped': 0,
    'failed': 0
  },
  'hits': {
    'total': {
      'value': 1,
      'relation': 'eq'
    },
    'max_score': 1.0,
    'hits': [
      {
        '_index': 'INDEX_NAME',
        '_id': 'ID',
        '_score': 1.0,
        '_source': {
          'item_idx': 4561,
          'subject': '글 제목 예시',
          'contents': "글 내용 예시입니다. 보통 이것보다도 줄줄이 길게 들어갑니다.",
          'author_nick': '작성자명',
          'created_at': '2009-08-08 18:44:45',
          'replies': [
            '댓글들입니다',
            '테스트 댓글'
          ]
        }
      }
    ]
  }
}

took: 걸린 시간, milliseconds 단위

timed_out: 리스폰스 타임아웃 여부

_shards: 검색에 관연된 샤드 수, 엘라스틱 서치 하나의 인덱스는 여러개의 샤드로 나뉜다.

hits: 실제 검색결과에 관한 정보

 

 

동적 매핑

인덱스에 도큐멘트를 추가하면 매핑이 생성된다.

매핑(필드) 확인하기

# python elasticsearch 설치
!pip install elasticsearch

# elasticsearch 연결
es = Elasticsearch(hosts = [{'host': 'URL', 'port': 8080, 'scheme' : "https"}],
    request_timeout=300, max_retries=10, retry_on_timeout=True,
    basic_auth=('USER', 'PW')
)

# 인덱스의 mappings 가져오기
result = es.indices.get_mapping(index="INDEX")
print(result["INDEX"]["mappings"]["properties"])

 

 

특정 필드에서 검색하기

term VS match

query_dsl = {
  "term": {
    "field_name": "exact_value"
  }
}
query_dsl = {
  "match": {
    "field_name": "text to search"
  }
}
정확히 일치하는 값을 찾는다. 유연한 검색
부분일치, 오타보정, 유사값 등을 찾을 수 있다.

 

전체 record 개수 검색할 때

search API의 match_all 쿼리를 쓰지 말고 count API를 쓴다

search API의 검색 결과 개수에 상한선이 존재하기 때문.

def get_total():
  query_dsl = {
    "match_all" : {}
  }
  result = es.search(index="INDEX", query=query_dsl)
  return result['hits']['total']['value']
def get_total():
  result = es.count(index="INDEX")
  return result['count']

검색결과 최대 10000개

전체 개수가 제대로 출력됨

 

검색 결과 Paging하기

from과 size 값을 정한다.

 

전체 레코드를 검색

search API 말고 scroll API를 사용한다

_scroll_id 값을 다음 배치에 넘겨야 한다.

scroll_query = {
  "query": {
    "match_all": {}
  }
}

# 위 코드블럭에서 연결한 Elasticsearch 객체 es
result = es.search(index="INDEX", scroll="2m", size=1000, body=scroll_query)
scroll_id = result['_scroll_id'] # 다음 배치를 돌리기 위해서 필요

while len(result['hits']['hits']) > 0:
  result = es.scroll(srcoll_id=scroll_id, scroll='2m')
  # ...

 

 

한글 형태소 분석기 koNLPy

"코-엔엘파이" 라고 읽는다.

다양한 형태소 분석 클래스를 제공한다.

Hannanum, Kkma, Komoran, Mecab, Okt 중에서 Okt로 실습해봤다.

각각의 클래스는 대략 아래와 같은 분석 함수를 제공한다.

noun() 단어를 리턴
pos() Part of Speech의 약자
문장 구성요소로 분석해주는듯 (동사, 명사, 조사 등 구분코드)
phrases() 구절을 리턴
morphs() 형태소 morpheme의 약자
문장을 형태소로 분석

 

 

Doc2Vec 모델 gensim 생성하기

"겐심"이라고 읽는다.

1. 문장의 단어마다 인덱스를 달아서 TaggedDocument를 생성

2. TaggedDocument 넣어서 Doc2Vec 객체 초기화

3. 단어장 생성

4. 모델 훈련

5. seed 값은 0으로 설정

6. 모델 한번 생성할 때 오래걸림, 생성해둔 모델은 디스크에 저장해둔다!
7. 데이터가 바뀔 때마다 한 번씩 배치로 모델 훈련.

 

지금 한번 돌려봤다가 실행시간 장장 50분을 향해 달려가고 있다;;

 

예시 코드

if __name__ == "__main__":
    # korean word tokenizer
    okt = Okt()

    # fetch data from Elasticsearch
    scroll_query = {
        "query" : {
            "match_all" : {}
        }
    }
    scroll_size = 1000
    result = es.search(index=INDEX, scroll='2m', size=scroll_size, body=scroll_query)
    scroll_id = result['_scroll_id']

    # extract "contents" field from documents
    documents = [hit['_source']['contents'] for hit in result['hits']['hits']]

    # scroll api
    while len(result['hits']['hits']) > 0:
        result = es.scroll(scroll_id=scroll_id, scroll='2m')
        documents.extend([hit['_source']['contents'] for hit in result['hits']['hits']])

    es.clear_scroll(scroll_id=scroll_id)

    # convert documents into TaggedDocument
    tagged_data = [TaggedDocument(words=okt.morphs(doc), tags=[i]) for i, doc in enumerate(documents)]

    # train Doc2Vec
    model = Doc2Vec(vector_size=256, window=2, min_count=1, workers=4, epochs=100)
    model.build_vocab(tagged_data)
    model.train(tagged_data, total_examples=model.corpus_count, epochs=model.epochs)
    model.random.seed(0)

    # save model to persistent disk
    model.save(model_save_path)

    # Use a scroll query to iterate through all documents in the index
    scroll_query = {
        "query": {
            "match_all": {}
        }
    }

    result = es.search(index=index_name, scroll='2m', size=scroll_size, body=scroll_query)
    scroll_id = result['_scroll_id']

    # Update documents with the inferred vectors
    for hit in result['hits']['hits']:
        contents_value = hit['_source']['contents']
        vector_value = model.infer_vector(okt.morphs(contents_value))

        update_query = {
            "doc": {
                "d2v_vector": vector_value.tolist()
            }
        }

        es.update(index=index_name, id=hit['_id'], body=update_query)

    # Continue scrolling to update all documents
    while len(result['hits']['hits']) > 0:
        result = es.scroll(scroll_id=scroll_id, scroll='2m')

        for hit in result['hits']['hits']:
            contents_value = hit['_source']['contents']
            vector_value = model.infer_vector(okt.morphs(contents_value))

            update_query = {
                "doc": {
                    "d2v_vector": vector_value.tolist()
                }
            }

            es.update(index=index_name, id=hit['_id'], body=update_query)

    # Clear the scroll
    es.clear_scroll(scroll_id=scroll_id)

전체 document 검색 스크롤을 두 번 돌린다.. -> 피드백1번 '학습'과 '추론'을 분리해야 함

이것보다 잘 짤 수 있을 것 같은데 그냥 되게만 만드느라 급급한 느낌.

추론하면서 벡터 뽑을 때는 search api의 filter 기능을 써봐야겠다.

 

 

피드백

1. 학습과 추론은 별도의 함수로 구분한다.

2. 학습시킬 때 런타임 환경을 체크해서 GPU로 돌린다.

3. Doc2Vec 모델 객체를 만들때의 파라미터 값을 잘 조정해서 넣어야 한다. (하이퍼 파라미터)

모델 명세서를 읽고 하나하나 조정해가면서 시간 체크.

4. Training Loss와 Validation Loss가 있다.

Training Loss는 훈련시킬 때 데이터간의 오차범위, Validation Loss는 모델에 인풋을 넣었을 때의 오차범위.

이게 0으로 떨어지는 시점에 훈련을 stop 해도 ok.

보통은 Validation Loss가 더 낮다.

오버핏, 언더핏 주의.

5. gensim Doc2Vec 은 epoch 돌 때마다 training loss를 찍는 모델이 구현이 안 돼있다.

from gensim.models.callbacks import CallbackAny2Vec

class LossCallback(CallbackAny2Vec):
    '''Callback to print loss after each epoch.'''

    def __init__(self):
        self.epoch = 0
        self.previous_loss = 0

    def on_epoch_end(self, model):
        loss = model.get_latest_training_loss() - self.previous_loss
        self.previous_loss = model.get_latest_training_loss()
        print('Loss after epoch {}: {}'.format(self.epoch, loss))
        self.epoch += 1

CallbackAny2Vec은 gensim의 Word2Vec, Doc2Vec 모델에 모두 지원.

model = Doc2Vec(vector_size=50, window=2, min_count=1, dm=1, workers=4)
model.train(tagged_data, total_examples=model.corpus_count, epochs=100, callbacks=[LossCallback()])

 

Doc2Vec 모델 만들때 callback 파라미터에 넣으면 로그를 볼 수 있다.

그런데 gensim의 CallbackAny2Vec 클래스의 get_latest_training_loss() 함수가 Doc2Vec에선 지원이 안 된다는 모양이다.

https://github.com/piskvorky/gensim/issues/2983

 

track training loss while using doc2vec issue. · Issue #2983 · piskvorky/gensim

Problem description I am trying to track training loss using doc2vec algorithm. And it failed. Is there a way to track training loss in doc2vec? Also, I didnt find any documentation related to perf...

github.com

 

 

ElasticSearch에 저장한 벡터는 cosine_similarity 를 돌려서 유사한 문장을 검색할 수 있음.

GET cos_sim_test/_search?filter_path=hits.total,hits.hits._score,hits.hits._source.text_name
{
  "size": 5, 
  "query": {
    "script_score": {
      "query": {"match_all": {}},
      "script": {
        "source": "cosineSimilarity(params.query_vector, 'vec') + 1.0",
        "params": {"query_vector": [0.4,0,0,0.2,0.6] }
      }
    }
  }
}

 

*계속 수정됩니다.

728x90