异步广度优先搜索算法

Published on 1/15/2017 9:21:44 AM

为什么要异步?

CPU的工艺越来越小,Cannon Lake架构的Intel CPU已经达到10nm技术,因此在面积不变的情况下,核心数可以明显提升。单纯的提升主频将造成发热量大、需要的电压大、功耗大的问题。而传统的算法与数据结构是针对单核心单线程同步而言的,因此,传统的算法无法将CPU利用率达到最大。

广度优先搜索

首先我们先了解一下与之对应的深度优先搜索(DFS),深度优先搜索即像走迷宫时,始终保持左手与左侧墙壁接触,换言之即遇到岔路时永远向左拐,从而寻找出口。而广度优先搜索,则在每个岔路时,变出一个分身,继续前进。

但是,实际上是这样吗?答案是否定的,刚刚已经讲到,传统的算法与数据结构是建立在单线程同步基础上的,因此传统算法只能够模拟分身在同时前进,这时就要引入队列来保存和展开岔路节点(当遇到新岔路时,将这个节点放入队列。队列头部元素进行展开寻找新的岔路并放入队列尾部)。

基于Parallel的并行广度优先搜索

而在并行或异步以及多线程的环境下,我们可以真的让“分身”们同时前进。首先使用并行广度优先搜索的前提是你不在意真的保证了广度是同步的,虽然并行广度优先搜索能够寻找到全部解,但是无法保证同一时刻进行搜索的任务是在同一深度的。

在这一前提下,我们以遍历图为例,首先定义邻接表的数据结构:

public class Node
{
    public string Value { get; set; }

    public LinkedList<Node> Nodes { get; set; } = new LinkedList<Node>();
}

假设我们的图结构如下:

file

进行数据的初始化:

public static void Main(string[] args)
{
    var A = new Node { Value = "A" };
    var B = new Node { Value = "B" };
    var C = new Node { Value = "C" };
    var D = new Node { Value = "D" };
    var E = new Node { Value = "E" };
    var F = new Node { Value = "F" };
    var G = new Node { Value = "G" };
    A.Nodes.AddLast(B);
    A.Nodes.AddLast(C);
    A.Nodes.AddLast(D);
    B.Nodes.AddLast(A);
    B.Nodes.AddLast(D);
    C.Nodes.AddLast(A);
    D.Nodes.AddLast(A);
    D.Nodes.AddLast(B);
    D.Nodes.AddLast(E);
    E.Nodes.AddLast(D);
    E.Nodes.AddLast(F);
    F.Nodes.AddLast(E);
    F.Nodes.AddLast(G);
    G.Nodes.AddLast(F);

    // TODO: Async visit
}

在此处,姑且认为Node.GetHashCode()可以作为Node的唯一标识,我们来定义一个HashSet来存储已经访问过的Node标识:

private static HashSet<int> Visited = new HashSet<int>();

此时,我们只需要按照正常编写深度优先搜索的递归方法编写即可,但其中的循环使用Parallel提供的循环方法,这样即可实现广度搜索:

public void Visit(Node n)
{
    lock (Visited)
    {
        if (Visited.Contains(n.GetHashCode()))
        {
            return;
        }
        Visited.Add(n.GetHashCode());
    }
    Console.WriteLine($"{ n.Value } ");
    Parallel.ForEach(n.Nodes, x => {
        Visit(x);
    });
}

基于Task的异步广度优先搜索

如果我们需要在进行搜索时,保持同一个时间点的任务所涉及到的节点的深度一致,我们就需要将上述方法改写成异步方式,并使用异步信号量来使处于同一深度的Task等待同深度其他Task完成:

首先定义一个异步信号控制器类AsyncSemaphore,其中包含一个公共构造方法和两个公共方法:

public class AsyncSemaphore
{
    public void AddTaskCount(int)
    public Task WaitAsync();
    public void Release();
}

该类被初始化时,认为需要等待的任务数量为0,通过调用AddTaskCount来增加需要等待的任务数,WaitAsync被调用时,将先判断需要等待的任务数量是否与已经完成的任务数量相等,如果相等则不等待,不相等则返回一个等待信号。当Release被调用后,判断需要等待的任务数量是否与已经完成的任务数量相等,如果相等则置所有等待信号放行。

因此这个类的具体实现如下:

public class AsyncSemaphore
{
    private int m_totalCount = 0;
    private int m_finishedCount = 0;
    private readonly List<TaskCompletionSource<bool>> m_waiters = new List<TaskCompletionSource<bool>>();
    private readonly static Task s_completed = Task.FromResult(true);
    public void AddTaskCount(int count)
    {
        m_totalCount += count;
    }
    public Task WaitAsync()
    {
        lock (m_waiters)
        {
            if (m_finishedCount == m_totalCount)
            {
                return s_completed;
            }
            else
            {
                var waiter = new TaskCompletionSource<bool>();
                m_waiters.Add(waiter);
                return waiter.Task;
            }
        }
    }
    public void Release()
    {
        lock (m_waiters)
        {
            m_finishedCount++;
            if (m_finishedCount == m_totalCount)
            {
                Parallel.ForEach(m_waiters, x => {
                    x.SetResult(true);
                });
                m_waiters.Clear();
            }
        }
    }
}

在编写Visit方法之前,我们需要对每个深度设置一个锁,因此我们需要定义一个Dictionary来存储各个深度(或叫层)的锁:

private static Dictionary<int, AsyncSemaphore> Lockers = new Dictionary<int, AsyncSemaphore>();

同时,需要为起点层预设一个锁:

Lockers.Add(0, new AsyncSemaphore());
Lockers[0].AddTaskCount(1);

接下来编写VisitAsync方法,该方法是一个异步函数,第一个参数接收节点,第二个参数为当前深度,起点深度为0。

public static async Task VisitAsync(Node n, int deep = 0)

在VisitAsync方法被调用时,应先检查该节点是否被访问:

lock (Visited)
{
    if (Visited.Contains(n.GetHashCode()))
    {
        Lockers[deep].Release();
        return;
    }
    Visited.Add(n.GetHashCode());
}

这里需要额外说明的一点就是,如果这个节点被访问过,也是需要释放锁的。因为在后面的节点展开代码中,我们并没有过滤节点是否被访问过,因此访问过的节点也包含在了AddTaskCount()的参数中。

接下来,我们需要检查下一层的锁有无被初始化:

lock(Lockers)
{
    if (!Lockers.ContainsKey(deep + 1))
    {
        Lockers.Add(deep + 1, new AsyncSemaphore());
    }
}

这些准备工作完成后,即可输出当前节点的Value,输出后,我们计算一下当前节点有多少子节点,将这个数值累加到下一层的异步锁中,添加完毕后,通知本层锁已经完成了一个任务并等待本层其他任务完成后,继续展开本节点:

Console.Write($"{ n.Value } ");
Lockers[deep + 1].AddTaskCount(n.Nodes.Count);
Lockers[deep].Release();
await Lockers[deep].WaitAsync();
Parallel.ForEach(n.Nodes, x => {
    VisitAsync(x, deep + 1);
});

运行调试时,我们可以观察到A永远是第一个输出的,结尾顺序永远是EFG,而第二层的顺序是不固定的。因此证明了广度优先搜索是成功的。以上即为基于异步实现的广度优先搜索。

file

分享到:

Comments

使用微信扫码