Isolating problematic Cgo code
Streaming video decoding via file descriptor passing
Introduction
KCTV_bot watches an HLS video stream and posts screengrabs to Twitter. Because the video source (North Korean state television) is not regularly available, some image processing must be performed to recognize when the channel is live.
Although the code is written in Go, the native options for decoding segments of video to get at individual frames are underwhelming. Fortunately, Cgo bindings (charlestamz/goav) exist for the popular audio/video library ffmpeg.
The process of decoding an MPEG video segment to iterate over each individual video frame is a bit involved. To get an idea of the complexity, begin reading HandleSegment at the call to AvformatAllocContext().
Although the program was stable over short time intervals, it frequently crashed while running unsupervised. Furthermore, I would often return to my computer to see that the memory usage had ballooned to many gigabytes. Typical memory usage is around a gigabyte, with roughly half of that accounted for by memory allocated inside Go — as reported by ReadMemStats().
After a few attempts at tracing memory leaks
in Cgo proved mostly fruitless, I decided to try separating the program into two processes.
What is HLS (HTTP Live Streaming)?
Briefly, HLS is a popular format for streaming content. A piece of content, be it live or on-demand, is represented as a series of short (~10 second) media files contained in a playlist. A live playlist will be repeatedly fetched by a player to discover new media segments.
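For illustration, a live playlist might look like the following (URIs hypothetical). On the next fetch, a new segment appears at the bottom and the oldest is dropped:

#EXTM3U
#EXT-X-VERSION:3
#EXT-X-TARGETDURATION:10
#EXT-X-MEDIA-SEQUENCE:2680
#EXTINF:10.0,
segment2680.ts
#EXTINF:10.0,
segment2681.ts
#EXTINF:10.0,
segment2682.ts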
Architecture
- Parent process, directly invoked from the command line to handle the majority of the tasks:
  - Downloading the playlist: every Target Duration, fetch the playlist and look for new segments (see the sketch after this list).
  - Fetching segments.
  - Image pattern recognition: Is this an image of color bars or a test pattern? Is it a black screen? Is the image moving?
  - Maintenance of a state machine: Has the image been mostly moving for the last 30 seconds? If so, begin storing images to post.
  - Posting tweets.
- Child process, invoked by the parent process:
  - An rpc service that, when provided with a raw segment (a bunch of MPEG bytes), returns a slice of frames ([]image.Image) to the caller.
  - The child process is completely stateless — there is no dependency on previous rpc calls; a freshly restarted instance of the child process is ready to serve requests.
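The playlist polling in the first bullet is simple in outline. A minimal sketch; fetchPlaylist and handleSegment are hypothetical stand-ins for the real code:

func pollPlaylist(ctx context.Context, url string, targetDuration time.Duration) error {
	ticker := time.NewTicker(targetDuration)
	defer ticker.Stop()
	seen := make(map[string]bool) // segment URIs already processed
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
		uris, err := fetchPlaylist(ctx, url) // hypothetical: GET + parse the playlist
		if err != nil {
			return err
		}
		for _, uri := range uris {
			if seen[uri] {
				continue
			}
			seen[uri] = true
			handleSegment(ctx, uri) // hypothetical: fetch, decode, recognize
		}
	}
}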
Implementation
Some of the code below has been edited for clarity.
Starting the child process
Spawning the child process is handled by spawnChild
within internal/worker/parent.go
.
First, the parent process listens on a Unix domain socket and retrieves an os.File struct corresponding to this socket. This struct contains the file descriptor of the socket.
It may be surprising to see an empty UnixAddr passed into ListenUnix instead of a path to a file. This is a Linuxism (the kernel autobinds the socket to a name in the abstract namespace) that allows us to use a Unix socket on a read-only file system.
ul, err := net.ListenUnix("unix", &net.UnixAddr{})
if err != nil {
	return err
}
f, err := ul.File()
if err != nil {
	return err
}
A special flag is appended to the slice of arguments passed to the child process. The go1.18+ fuzzing system uses a very similar approach.
We prepare the child process for execution and add our socket to the ExtraFiles slice on the exec.Cmd struct:
args := append([]string{}, os.Args[1:]...)
args = append(args, "-worker")
cmd := exec.CommandContext(ctx, os.Args[0], args...)
cmd.ExtraFiles = append(cmd.ExtraFiles, f)
Passing the listening socket's FD via ExtraFiles allows the child process to Accept() connections. While it is possible for the child process to call ListenUnix directly & avoid passing ExtraFiles, the parent's DialUnix call may occur before the child process has begun listening. Because the parent's socket is listening before the child is even spawned, dialed connections simply queue until the child calls Accept().
The parent process next dials two connections and creates a net/rpc client. One connection handles rpc requests and responses; the second passes newly opened file descriptors to the child process. See Why two client connections? below.
connRPC, err := net.DialUnix("unix", nil, ul.Addr().(*net.UnixAddr))
if err != nil {
	return err
}
connFD, err := net.DialUnix("unix", nil, ul.Addr().(*net.UnixAddr))
if err != nil {
	return err
}
p.conn = connFD // retained for passing file descriptors via SendFd
client := rpc.NewClient(connRPC)
Child startup
The child process next translates the file descriptor passed via ExtraFiles back into a Listener. The first 3 file descriptors are reserved for standard i/o, so ExtraFiles[0] corresponds to an FD of 3.
Simplified from internal/worker/child.go
:
fd := uintptr(3) // ExtraFiles[0]
f := os.NewFile(fd, "unix")
if f == nil {
	return fmt.Errorf("nil for fd %d", fd)
}
listener, err := net.FileListener(f)
if err != nil {
	return fmt.Errorf("net.FileListener: %w", err)
}
The Accept loop for the child process is contained in runWorker (internal/worker/child.go).
server := rpc.NewServer()
// pointer to a struct holding the goav code
server.Register(segApi)
conn, err := listener.Accept()
if err != nil {
	return errors.Wrap(err, "listener.Accept")
}
server.ServeConn(conn)
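Only a single Accept is shown above. Since the parent dials two connections, the child presumably accepts a second connection reserved for FD passing; a sketch, assuming the parent dials the rpc connection first:

rpcConn, err := listener.Accept() // the parent's first dial: rpc traffic
if err != nil {
	return errors.Wrap(err, "listener.Accept")
}
fdConn, err := listener.Accept() // the second dial: passed file descriptors
if err != nil {
	return errors.Wrap(err, "listener.Accept")
}
go server.ServeConn(rpcConn)
// fdConn is later read by unixmsg.RecvFd as segment requests arrive.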
The child has now attached an RPC server to the Unix socket connection from the parent. segApi's methods, specifically HandleSegment, may now be invoked from the parent process.
Making calls to the child process
Again, the net/rpc documentation may be helpful. The parent process is able to make calls to the child like so:
return client.Call("GoAV.HandleSegment", request, resp)
request and resp are pointers to:
type Request struct {
	FD uintptr
}

type Response struct {
	RawImages []image.Image
}
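For context, net/rpc requires service methods of the form func (t *T) MethodName(args *Args, reply *Reply) error, so the child-side method has roughly this shape (a sketch; the decoding body is elided):

// Sketch: the shape net/rpc demands of the child's method.
func (g *GoAV) HandleSegment(req *Request, resp *Response) error {
	// Resolve the segment's file descriptor on the child's side, decode
	// it with goav, and fill resp.RawImages. Elided here; see
	// HandleSegment in the repository.
	return nil
}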
Why are you passing file descriptors between processes?
Request contains an integer file descriptor for each segment.
I explored passing the segment to goav
in a number of ways.
- A path to a temporary file.
  - This had the disadvantage of disk i/o, and could leave files around when the program crashed.
  - goav would not wait for the rest of the data to download once the end of a partially downloaded file was reached.
- A path to a temporary FIFO.
  - This allows forward progress on a partially downloaded file.
  - Same problems as temporary files, and a bit complex.
- Passing the http Body(), an instance of the io.ReadCloser interface.
  - It wasn't obvious to me how to call goav on data already in memory.
  - This does not work across process boundaries.
- The serialized byte slice of the entire segment (~10 MB for our stream).
  - goav cannot begin decoding the segment until it has been fully read from the http response.
  - It wasn't obvious to me how to call goav on data already in memory.
  - When moving to process separation, this added additional copies:
    - Copy from Body to a []byte.
    - Serialize each Request using encoding/gob.
    - Deserialize the Request using encoding/gob.
Instead of any of these approaches, I ended up passing segments' file descriptors to the child process along with the RPC call. Previous code examples were simplified to hide this complexity.
The implementation of sending and receiving file descriptors can be found in
pkg/unixmsg/send_fd.go
.
func SendFd(conn *net.UnixConn, fd uintptr) error {
	// Encode the descriptor as an SCM_RIGHTS control message.
	rights := syscall.UnixRights(int(fd))
	// At least one byte of in-band data must accompany a control
	// message on a stream socket, hence the dummy payload.
	dummy := []byte("x")
	n, oobn, err := conn.WriteMsgUnix(dummy, rights, nil)
	if err != nil {
		return fmt.Errorf("err %v", err)
	}
	if n != len(dummy) {
		return fmt.Errorf("short write %v", conn)
	}
	if oobn != len(rights) {
		return fmt.Errorf("short oob write %v", conn)
	}
	return nil
}
func RecvFd(conn *net.UnixConn) (uintptr, error) {
	// buf receives the in-band dummy byte; oob is sized generously for
	// a single SCM_RIGHTS message carrying one descriptor.
	buf := make([]byte, 32)
	oob := make([]byte, 32)
	_, oobn, _, _, err := conn.ReadMsgUnix(buf, oob)
	if err != nil {
		return 0, err
	}
	scms, err := syscall.ParseSocketControlMessage(oob[:oobn])
	if err != nil {
		return 0, err
	}
	if len(scms) != 1 {
		return 0, fmt.Errorf("count not 1: %v", len(scms))
	}
	scm := scms[0]
	// The kernel has already installed the descriptor into this
	// process; ParseUnixRights just extracts its (new) number.
	fds, err := syscall.ParseUnixRights(&scm)
	if err != nil {
		return 0, err
	}
	if len(fds) != 1 {
		return 0, fmt.Errorf("fd count not 1: %v", len(fds))
	}
	return uintptr(fds[0]), nil
}
Under the hood, this sends an SCM_RIGHTS control message with the sendmsg(2) system call on one of the Unix socket connections the parent process stood up earlier.
Because file descriptors are scoped to a process, the receiving process must read a structure
out of the connection to determine the integer value of the file descriptor it has been passed.
The values passed from the parent will not be the same as the values received in the child,
despite corresponding to the same resource.
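To see the renumbering concretely, here is a self-contained sketch (not from the repository) that exercises the SendFd and RecvFd helpers above across a socketpair(2) within a single process; the received descriptor comes back with a fresh number even though it refers to the same open file:

package main

import (
	"fmt"
	"log"
	"net"
	"os"
	"syscall"
)

// toUnixConn wraps a raw socket FD in a *net.UnixConn.
func toUnixConn(fd int) *net.UnixConn {
	c, err := net.FileConn(os.NewFile(uintptr(fd), "socketpair"))
	if err != nil {
		log.Fatal(err)
	}
	return c.(*net.UnixConn)
}

func main() {
	// A connected pair of Unix stream sockets stands in for the
	// parent<->child FD-passing connection.
	fds, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_STREAM, 0)
	if err != nil {
		log.Fatal(err)
	}
	sender, receiver := toUnixConn(fds[0]), toUnixConn(fds[1])

	f, err := os.Open("/dev/null")
	if err != nil {
		log.Fatal(err)
	}
	if err := SendFd(sender, f.Fd()); err != nil {
		log.Fatal(err)
	}
	got, err := RecvFd(receiver)
	if err != nil {
		log.Fatal(err)
	}
	// Different numbers, same underlying open file.
	fmt.Printf("sent fd %d, received fd %d\n", f.Fd(), got)
}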
The FD passed to the child process corresponds to the read end of a pipe returned from os.Pipe(). The HTTP body is streamed to the write end as it downloads, enabling the goav calls to begin decoding without waiting for the entire response to be read into memory.
Simplified version of handling a segment body from
internal/stream/seg_consumer.go
:
resp, err := s.httpGet(ctx, url)
if err != nil {
	return errors.Wrap(err, "httpGet")
}
defer resp.Body.Close()
r, w, err := os.Pipe()
if err != nil {
	return errors.Wrap(err, "os.Pipe")
}
defer r.Close()
defer w.Close()
// Stream the response body into the pipe; closing w signals EOF to
// the decoder reading the other end in the child.
go func() {
	if _, err := io.Copy(w, resp.Body); err != nil {
		log.WithError(err).Warn("io.Copy")
	}
	w.Close()
}()
request := &segment.Request{FD: r.Fd()}
return s.ProcessSegment(ctx, request)
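ProcessSegment is where the FD send is paired with the rpc call. Roughly (a sketch; the receiver and the p.conn / p.client field names are assumptions):

// Sketch: send the pipe's read end to the child, then ask it to decode.
func (p *Parent) ProcessSegment(ctx context.Context, req *segment.Request) error {
	// Hand the child a copy of the descriptor first...
	if err := unixmsg.SendFd(p.conn, req.FD); err != nil {
		return err
	}
	// ...then make the call. The child resolves the descriptor on its
	// own side; the integer in req.FD is only meaningful to the parent.
	resp := &segment.Response{}
	return p.client.Call("GoAV.HandleSegment", req, resp)
}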
In the child process, the ffmpeg calls are passed a filename that corresponds to an open file descriptor returned by RecvFd
.
file := fmt.Sprintf("/proc/self/fd/%d", fd) // This is a Linuxism
pFormatContext := avformat.AvformatAllocContext()
avformat.AvformatOpenInput(&pFormatContext, file, nil, nil)
Why two client connections?
Socket control messages are considered out-of-band (OOB) data and are read into
a separate slice by ReadMsgUnix
.
Attempting to read available OOB data will always discard at least
1 byte of in-band data.
This dropped byte would cause problems for the rpc server we attached to the parent-to-child connection, so we ended up using two connections.
It may be possible to build an abstraction on top of UnixConn that multiplexes both kinds of traffic over a single connection. But for a project this frivolous, this is good enough for now.
See the proposal "net: add ability to read OOB data without discarding a byte" for more detail.
Perhaps removing net/rpc
entirely and returning individual frames immediately after decoding would be a cleaner solution?
Conclusion
This code has some warts resulting from its origins as an ANSI HLS player. Nevertheless, I'm pleased that it is now able to run fairly stably without constant care & feeding. I plan on moving this into my local Kubernetes cluster & expect plenty of new problems from the limited resources.
- Source:
WIZARDISHUNGRY/hls-await
- Twitter: KCTV_bot