Haskell 并发编程:快速、简单、正确
Haskell 并发编程:快速、简单、正确
2025年4月13日
在 C、C++ 和 Rust 中构建嵌入式系统近十年之后,我不知何故最终以编写 Haskell 为生。 如果几年前你问我函数式编程,我会告诉你那是自我放纵的学院派胡说八道——然后我偶然发现有人将它用于实时系统,在这些系统中,微秒可能意味着真正的生死。
我已经太老了,无法试图说服人们应该使用什么工具,但 Haskell 有一些功能可能会让任何关心快速、正确代码的人感兴趣。 让我们来谈谈它们。
我们将从并发开始。
有些人遇到问题时,会想,“我知道了,我要用正则表达式”。 现在他们有两个问题了。 —Jamie Zawinski 有些人遇到问题时,会想,“我知道了,我要用线程”,然后他们就有了两个问题。 —Ned Batchelder
正如我们之前讨论过的,我们在追求速度时主要关注两个方面:
- 你的计算机(即使是口袋里的那台)也有多个核心。 要使用整台计算机,你需要将工作分配到这些核心上。
- 外部世界很慢——网络和磁盘 IO 比计算慢数千倍。 在等待时保持计算!
因此,我们需要将工作分解为独立的任务,通常有两种方式1:
- 将程序组合成多个执行_线程_,传统上由操作系统调度和运行。
- 将程序组合为一系列回调或_延续_,这些回调或延续在其他一些动作(例如,IO)完成后运行。
选项 2 具有一些不错的性能优势,尤其是在与事件驱动 IO 结合使用时。 观看 Ryan Dhall 向世界介绍 Node.js——他并不特别关心 JavaScript;他只是试图让这种并发更容易访问。 但是,延续传递有其自身的问题。 即使像 async/await
这样的语法糖使其_看起来_是按顺序运行的,但调试也可能是一种令人沮丧的体验。 传统的堆栈跟踪消失了,你可能会问自己,“好吧,我是怎么到这里来的?”
线程和您
Haskell 试图兼顾两者的优点:线程是它的并发原语,但它们是_绿色_线程,由运行时在(OS)线程池上调度,并由事件驱动 IO 提供支持。
让我们快速了解一下基础知识,以便我们可以了解一些很酷的东西。 我们可以使用 forkIO
(https://hackage.haskell.org/package/base/docs/Control-Concurrent.html#v:forkIO))生成线程,它在新线程中运行给定的动作并返回线程 ID:
import Control.Concurrent
main :: IO ()
main = do
_tid <- forkIO $ putStrLn "Hello from thread 2!"
putStrLn "Look ma, concurrent prints!"
这是一个开始,但我们如何等待线程完成或查看它返回的内容? 除了杀死它之外,我们无法对线程的 ID 做太多事情。 我们在 async 包中找到答案,它为我们的新线程提供了一个_promise_:
async :: IO a -> IO (Async a)
…我们可以 wait
等待! 或者取消,如果我们心情不好:
wait :: Async a -> IO a
cancel :: Async a -> IO ()
所以,
import Control.Concurrent.Async
main :: IO ()
main = do
hFut <- async $ readFile "hello.txt"
putStrLn "Reading file..."
helloContents <- wait hFut
putStrLn helloContents
但是,有时我们不想等待我们生成的线程。 考虑一个服务器,它为每个连接的客户端分出一个线程。 它可以将这些新线程链接到自身,以便失败2向上回溯。
serveLoop :: Socket -> (Socket -> SockAddr -> IO ()) -> IO ()
serveLoop listener clientHandler = do
(clientSock, clientAddr) <- accept listener
-- Handle each client in their own thread
clientThread <- async $ clientHandler clientSock clientAddr
-- Silently swallowing errors is bad, mmk?
link clientThread
serveLoop listener clientHandler
没有线程,只有并发
我们还有很多要弄清楚。 我们应该如何等待多个线程? 如果一个线程失败,我们可以取消其他线程吗? 如果_我们_(调用者)被取消了会发生什么?3
使用正确的工具,正确的答案是“不用担心”。
-- Runs each action in its own thread and returns the results
concurrently :: IO a -> IO b -> IO (a, b)
-- Runs each action in its own thread,
-- returning whichever finishes first.
race :: IO a -> IO b -> IO (Either a b)
-- Run a function (mapping a to b) in a separate thread
-- for each element of a data structure
mapConcurrently :: Traversable t => (a -> IO b) -> t a -> IO (t b)
-- And much more...
在这些中的每一个,如果一个线程失败,其余的线程将被取消。 如果父线程失败,所有子线程都将被取消。 这非常具有声明性——工作并发进行,一旦应该停止就停止,我们不关心生成和连接单个线程。
还有一个 Concurrently
类型,我们可以将其应用于我们自己的抽象。 想要一个并发评估的元组?
(page1, page2, page3) <- runConcurrently $ (,,)
<$> Concurrently (getURL "url1")
<*> Concurrently (getURL "url2")
<*> Concurrently (getURL "url3")
或者一次运行整个动作集合并收集结果?
runAll :: (Foldable f, Monoid m) => f (IO m) -> IO m
runAll = runConcurrently . foldMap Concurrently
(Haskell 构建通用代码的能力,这些代码可以处理“任何可折叠的”或“任何可遍历的”事物,是另一项超能力值得讨论,但让我们今天略过 FP 术语。)
STM 和等待的艺术
因为受伤的人将会是那些停滞不前的人。 —Bob Dylan
太棒了,我们有线程了! 接下来,我们需要它们相互通信——这是人们在说并发很难时想到的部分。 进入 Haskell 的真正魔力:STM。
STM 是“软件事务内存”的缩写,它定义了一些特殊的类型。 基础类型是 TVar
(https://hackage.haskell.org/package/stm/docs/Control-Concurrent-STM-TVar.html#t:TVar)):
-- A "transactional variable"
data TVar a
-- Create a TVar holding any type at all, then...
newTVarIO :: a -> IO (TVar a)
-- ...atomically read...
readTVar :: TVar a -> STM a
-- ...and atomically write.
writeTVar :: TVar a -> a -> STM ()
该库使用它来构建其他有用的类型,例如有界队列:
data TBQueue a
-- Create one of the given length
newTBQueueIO :: Natural -> IO (TBQueue a)
-- Write to the queue, blocking if full
writeTBQueue :: TBQueue a -> a -> STM ()
-- Read from the queue, blocking if empty
readTBQueue :: TBQueue a -> STM a
-- Read from the queue, returning Nothing if empty
tryReadTBQueue :: TBQueue a -> STM (Maybe a)
-- And so on...
你会注意到读取和写入不是 IO 动作——它们是 STM 动作。 我们如何使用它们? 作为原子事务的一部分,当然。
atomically :: STM a -> IO a
顾名思义,atomically
充当临界区——其中的一切都会一次性发生。 最无聊的是,我们可以使用它来读取和写入我们的 STM 类型:
-- A silly concurrent cat:
-- read stdin in one thread, write to stdout in the other.
main :: IO ()
main = do
q <- newTBQueueIO 1024
let reader = do
l <- getLine
atomically $ writeTBQueue q l
reader -- loop!
let printer = do
l <- atomically $ readTBQueue q
putStrLn l
printer -- loop!
-- Run each in their own thread:
concurrently_ reader printer
但真正的力量在于 STM 函数如何组合。
假设我们想要一个可以_关闭_的队列。 我们的小程序只有在数据停止之前才能很好地工作——将文件通过管道传输到它或点击 Ctrl+D 并:
cat.hs: <stdin>: hGetLine: end of file
让我们修复它。
-- The C is for Closeable!
data TBCQueue a = TBCQueue {
queue :: TBQueue a,
open :: TVar Bool
}
-- Make a new closeable queue with the given capacity.
newTBCQueueIO :: Natural -> IO (TBCQueue a)
newTBCQueueIO n = TBCQueue <$> newTBQueueIO n <*> newTVarIO True
-- Closing means it's no longer open.
closeTBCQueue :: TBCQueue a -> STM ()
closeTBCQueue q = writeTVar q.open False
如果通道已关闭,我们将使写入变为无操作。(返回 open
将是另一个可行的选择。)
writeTBCQueue :: TBCQueue a -> a -> STM ()
writeTBCQueue q v = do
stillOpen <- readTVar q.open
when stillOpen $ writeTBQueue q.queue v
读取更有趣一些——我们希望在队列打开时等待一个值,然后在它关闭_(且为空!)_后,返回 Nothing
。
readTBCQueue :: TBCQueue a -> STM (Maybe a)
readTBCQueue q = do
-- Try to read from the queue
maybeV <- tryReadTBQueue q.queue
case maybeV of
-- If there was a value in the queue, just return it.
Just v -> pure $ Just v
-- If the queue was empty...
Nothing -> do
-- ...Is the queue still open?
-- If so we need to wait,
-- otherwise return Nothing to indicate it's closed.
stillOpen <- readTVar q.open
if stillOpen
then retry
else pure Nothing
你可能会问 retry
是什么? 它会中止整个事务并重试。
添加一些逻辑来检查派对何时结束,我们可以优雅地处理 EOF:
main :: IO ()
main = do
q <- newTBCQueueIO 1024
let reader = do
eof <- isEOF
if eof
then atomically $ closeTBCQueue q
else do
l <- getLine
atomically $ writeTBCQueue q l
reader -- loop!
let printer = do
maybeL <- atomically $ readTBCQueue q
case maybeL of
Nothing -> pure ()
Just l -> do
putStrLn l
printer -- loop!
concurrently_ reader printer
如果您想自己尝试一下,TBCQueue
和一些相关的好东西可以在这里找到。
但首先,停下来欣赏一下魔力。 我们正在原子地操作队列_和_ open
标志,并且没有看到互斥锁。 此外,readTBCQueue
看起来像通过调用 retry
进行忙等待循环,但是当我们运行程序时,没有核心受到损害! Haskell 运行时跟踪每个事务中涉及的 TVar,并且仅在写入器更改其中一个时才唤醒 retry
的线程。
想象一下,你将如何使用条件变量、CAS 和 futex、事件组或你所知道和喜欢的任何其他原语来实现这种等待/唤醒行为。 至少可以这么说,这将是棘手的。 在这里,无需担心虚假唤醒或死锁。 而且,由于只有 STM
动作可以进入 atomically
,因此我们不会不小心将任意 IO 拉入这些临界区。 正如 Rust 使大多数内存错误在类型级别上不可能发生4一样,STM 消除了整个类别的并发问题。
我认为这非常棒。
- …至少在用户空间程序中。 在裸机嵌入式系统或操作系统的_内部_,将计算手动分解为由中断驱动的状态机并不罕见。 但那是一场完全不同的比赛。↩
- 通过_失败_,我的意思是_异常_。 Haskell 有它们。 我通常不太喜欢它们,因为它们会创建函数类型未表达的“隐形”控制流。 (我们喜欢类型,对吗?)但是使 IO 失败_异常_确实给了我们一个明确的信号,表明发生了超出我们控制的事情。 它也使快乐的道路更加清洁。↩
- 它变得更奇怪:在 Haskell 中,异常可以被抛_给_其他线程! 这当然是非正统的,但它是一种干净的带外信令机制。 点击 Ctrl+C 只是向主线程发送一个
UserInterrupt
异常,并且cancel
函数所做的只是向目标线程抛出一个AsyncCancelled
异常并等待它死亡。↩ - 当然,两种语言都有转义舱口,并且一个充分激励的笨蛋可以搬起石头砸自己的脚,但我很乐意接受一个使搞砸事情变得更加困难的系统。↩
-
联系方式:
-
matt bitbashing.io