관리 메뉴

밤 늦게까지 여는 카페

Fully Dynamic Algorithms for Maintaining Shortest Paths Trees 리뷰 #4 [완] - Increasing the Weight of an Edge 구현 + 버그 수정... 본문

알고리즘/Path Finding

Fully Dynamic Algorithms for Maintaining Shortest Paths Trees 리뷰 #4 [완] - Increasing the Weight of an Edge 구현 + 버그 수정...

Jㅐ둥이 2025. 4. 21. 00:42

안녕하세요. 이번에는 Fully Dynamic Algorithms for Maintaining Shortest Paths Trees의 Increasing the Weight of an Edge 부분을 구현해봤습니다.

 

작성한 코드와 준비한 시나리오를 같이 봐보시죠!


1. Increasing the Weight of an Edge 구현

코드를 보여드리면 다음과 같습니다.

WHITE = 0
PINK = 1
RED = 2

def increase_edge_weight(
    graph: Graph,
    output_info_dict: Dict[int, OutputInformation],
    edge_id: int,
    amount: int,
):
    edge = graph.get_edge(edge_id=edge_id)
    if amount <= 0:
        raise Exception("amount cannot be zero or negative value")

    edge.set_weight(edge.weight + amount)

    for source_node in graph.nodes.values():
        increase_edge_weight_for_single_source(
            graph,
            output_info_dict[source_node.id],
            edge,
        )

def increase_edge_weight_for_single_source(
    graph: Graph,
    output_info: OutputInformation,
    edge: Edge,
):
    # Step 1
    shortest_tree_dict = output_info.shortest_tree_dict
    start_node = edge.start_node
    end_node = edge.end_node
    
    if start_node.id not in shortest_tree_dict or end_node.id not in [t.value.id for t in shortest_tree_dict[start_node.id].get_children()]:
        return

    color_dict = {}
    m = [
        PrioritizedItem(
            priority=output_info.get_distance(end_node.id),
            item=end_node,
        ),
    ]

    # Step 2
    count = 0
    trace = []
    while len(m) > 0:
        count +=1
        if count > 20:
            break
        current_item = heapq.heappop(m)
        source_node = current_item.item
        trace.append([source_node.id, []])
        is_non_red_neighbor = False
        for adjacent_edge in graph.get_inbound_edges(source_node.id):
            sink_node = adjacent_edge.get_other_node(source_node.id)
            if color_dict.get(sink_node.id, -1) == RED:
                continue
            if output_info.get_distance(sink_node.id) + adjacent_edge.get_weight() == output_info.get_distance(source_node.id):
                color_dict[source_node.id] = PINK
                output_info.update_parent_of_node(node_id=source_node.id, parent_id=sink_node.id)
                is_non_red_neighbor = True
        
        if not is_non_red_neighbor:
            color_dict[source_node.id] = RED
            for child in output_info.get_children(source_node.id):
                trace[-1][1].append(child.value.id)
                item = PrioritizedItem(
                    priority=output_info.get_distance(child.value.id),
                    item=child.value,
                )
                heapq.heappush(m, item)

    # Step 3.a
    q = []
    q_dict = {}
    for node_id, color in color_dict.items():
        if color != RED:
            continue
        source_node = graph.get_node(node_id)

        output_info.set_distance(node_id=source_node.id, distance=float("inf"))
        output_info.remove_parent_of_node(node_id=source_node.id)

        best_distance = float("inf")
        for adjacent_edge in graph.get_inbound_edges(source_node.id):
            sink_node = adjacent_edge.get_other_node(source_node.id)
            if sink_node.id in color_dict and color_dict[sink_node.id] == RED:
                continue
            new_distance = output_info.get_distance(sink_node.id) + adjacent_edge.get_weight()
            if new_distance < best_distance:
                best_distance = new_distance
                output_info.set_distance(source_node.id, best_distance)
                output_info.update_parent_of_node(node_id=source_node.id, parent_id=sink_node.id)

        item = PrioritizedItem(
            priority=output_info.get_distance(source_node.id),
            item=graph.get_node(source_node.id),
        )
        heapq.heappush(q, item)
        q_dict[source_node.id] = item

    # Step 3.b
    while len(q) > 0:
        current_item = heapq.heappop(q)
        source_node = current_item.item
        del q_dict[source_node.id]

        for adjacent_edge in graph.get_outbound_edges(source_node.id):
            sink_node = adjacent_edge.get_other_node(source_node.id)
            if sink_node.id not in color_dict or color_dict[sink_node.id] != RED:
                continue
            new_distance = output_info.get_distance(source_node.id) + adjacent_edge.get_weight()
            if new_distance >= output_info.get_distance(sink_node.id):
                continue
            output_info.set_distance(sink_node.id, new_distance)
            output_info.update_parent_of_node(node_id=sink_node.id, parent_id=source_node.id)
            if sink_node.id in q_dict:
                q_dict[sink_node.id].priority = new_distance
                heapq.heapify(q)
            else:
                item = PrioritizedItem(
                    priority=output_info.get_distance(sink_node.id),
                    item=graph.get_node(sink_node.id),
                )
                heapq.heappush(q, item)
                q_dict[source_node.id] = item

 

 

처음에는 모르는 용어가 너무 많아서 과연 내가 이해할 수 있을까 걱정을 많이 했었는데

계속 읽다보니 의외로 간단한? 알고리즘인 것 같았습니다.

 

물론 알고리즘이 정확하게 동작한다는 증명까지 이해한 것은 아니라서 반쪽짜리 공부였지만요 😵😵😵

 

논문에 작성되어 있는 pseudo code를 구현하는데만 집중해서 코드가 복잡하고 효율이 부족한 부분도 있습니다 ㅜ

다른 알고리즘을 구현하면서 차근차근 수정해보겠습니다!

 

2. 확인해보고 싶었던 시나리오

제가 이 논문을 읽었던 이유는 agent가 몰려 있는 상황을 엣지 weight의 변경으로 해소하고 싶었고

이를 위해 distance table을 빠르게 업데이트 하는 방법을 찾아야 했기 때문입니다.

  • distance table은 모든 노드에서 모든 노드까지의 최적 경로에 대한 최소 비용을 저장하고 있는 자료 구조입니다.
  • 특히 Floyd warshall 알고리즘보다 빠르게 최소 비용을 구하고 싶었습니다.

참고: https://goodahn.tistory.com/292

 

그래서 agent가 몰려 있는 상황을 엣지 weight의 변경으로 해소하는 상황을 다음과 같이 정의했습니다.

  1. agent는 전체 노드 개수의 5%만큼 존재한다.
  2. agent가 이동할 때마다 agent가 위치하는 노드의 inbound 엣지들의 weight을 40만큼 증가시킵니다.
  3. agent가 이동하면 2에서 증가시킨 엣지 weight을 다시 감소시킵니다.

위의 상황을 스크립트로 작성한 것입니다.

import sys
import time
import random


from single_agent import floyd_warshall
from single_agent.dynamic_problem import (
    decrease_edge_weight,
    increase_edge_weight,
)

from utils.fixture import make_grid


if __name__ == "__main__":
    width, height = int(sys.argv[1]), int(sys.argv[2])

    # 1. construct a grid map
    start_time = time.time()
    graph = make_grid(width, height, 10)
    end_time = time.time()
    print(
        f"execution time of constructing {width}x{height} grid: {end_time - start_time} s"
    )

    # 2. measure the execution time of floyd warshall
    start_time = time.time()
    output_info_dict = floyd_warshall.make_output_information(graph)
    end_time = time.time()
    print(f"execution time of floyd warshall: {end_time - start_time} s")

    # 3. measure the execution time of dynamic algorithm
    start_time = time.time()
    for test_node_id in random.sample(list(range(width*height)), width*height//20*4):
        test_edges = graph.get_inbound_edges(test_node_id)
        for test_edge in test_edges:
            decrease_edge_weight(graph, output_info_dict, test_edge.id, test_edge.weight - 1)
    for test_node_id in random.sample(list(range(width*height)), width*height//20*4):
        test_edges = graph.get_outbound_edges(test_node_id)
        for test_edge in test_edges:
            increase_edge_weight(graph, output_info_dict, test_edge.id, 40)
    end_time = time.time()
    print(
        f"execution time of dynamic algorithm in updating edge weight: {end_time - start_time} s"
    )

스크립트의 결과는 다음과 같습니다.

Floyd Warshall보다 빠르긴하지만 edge weight의 업데이트 수가 많다보니 10배 이상 빠른 정도는 아니더라고요.

알고리즘 \ 격자지도 크기 10 x 10 20 x 20 30 x 30
Floyd Warshall 0.26초 16.86초 184.68초
논문 알고리즘 0.17초 6.08초 40.58초

10 x 10 실행 시간
20 x 20 실행 시간
30 x 30 실행 시간

 

 

P.S.

Floyd warshall 알고리즘은 모든 노드에서 모든 노드까지의 최적 경로를 구하는 All-pairs shortest paths 알고리즘이고

이 논문에서 제시하는 알고리즘은 single-source shortest paths 알고리즘입니다.

엄밀히 따지면 다른 종류의 알고리즘들을 비교하는 것이지만
dynamic problem을 처음 공부하는 입장에서 가능성을 보기 위한 시도라고 봐주시면 감사하겠습니다.

3. 예상치 못했던 버그들

Increasing the Weight of an Edge 부분을 구현하면서 기존 알고리즘들의 버그를 찾을 수 있었습니다.

 

bellman ford 알고리즘은 loop문이 하나 없는 상태로 실행되고 있었고,

floyd warshall 알고리즘은 shortest path tree를 잘못 만들고 있었습니다...

 

역시 기본이 중요한 것 같습니다😥

4. 다음 공부

앞서 말씀드렸다시피 Fully Dynamic Algorithms for Maintaining Shortest Paths Trees 논문은 single-source shortest paths 알고리즘입니다.

 

그래서 All-pairs shortest paths를 구하는데는 최적의 알고리즘이 아니었을 것입니다.

 

다음에는 All-pairs shortest paths 문제를 효율적으로 dynamic algorithm을 공부해보겠습니다.

 

 

🎁 Fully Dynamic Algorithms for Maintaining Shortest Paths Trees 이전 리뷰들 🎁

반응형