欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Gstreamer应用开发手册10:线程

程序员文章站 2022-04-13 15:43:41
...

线程

GStreamer本质上是多线程的,并且是完全线程安全的。大多数线程内部结构对应用程序都是隐藏的,这使开发更加容易。但是,在某些情况下,应用程序可能希望对其中的某些部分产生影响。GStreamer允许应用程序在管道的某些部分上强制使用多个线程。

GStreamer还可以在创建线程时通知您,以便您可以配置诸如线程优先级或要使用的线程池之类的东西。

在GStreamer中调度

GStreamer管道中的每个元件都决定了如何调度它。元件可以选择将其衬垫设置为基于推式还是基于拉式。例如,一个元件可以选择启动一个线程以开始从sink衬垫拉数据,或从source衬垫推数据。元件还可以选择使用上游或下游线程分别在推和拉模式下对其数据进行处理。GStreamer对元件选择计划的方式没有任何限制。有关更多详细信息,请参见《插件编写器指南》。

在任何情况下,都会发生某些元件将启动一个线程进行数据处理的情况,称为“流线程”。流式传输线程或GstTask对象是从GstTaskPool元件需要创建流式传输线程时创建的。在下一节中,我们将了解如何接收任务和线程池的通知。

在GStreamer中配置线程

在总线上有一条STREAM_STATUS消息,可以获得有关流线程的状态。您将从消息中获得以下信息:

  • 当即将创建新线程时,通过GST_STREAM_STATUS_TYPE_CREATE类型通知你。然后可以在GstTask中配置GstTaskPool。自定义任务池将为任务提供自定义线程,以实现流线程。
  • 如果要配置自定义任务池,则需要同步处理此消息。如果在此消息返回时未在任务上配置任务池,则该任务将使用其默认池。
  • 进入或离开线程。这是您可以配置线程优先级的时刻。当线程被破坏时,您还会收到通知。
  • 当线程启动,暂停和停止时,您会收到消息。这可以用来可视化gui应用程序中流线程的状态。

现在,我们将在下一部分中查看一些示例。

提高线程的优先级

.----------.    .----------.
| fakesrc  |    | fakesink |
|         src->sink        |
'----------'    '----------'

让我们看一下上面的简单管道。我们想提高流线程的优先级。fakesrc元件将启动流线程以生成伪数据,推到对等的假接收器。更改优先级的流程如下:

  • 从READY状态切换到PAUSED状态时,fakesrc将需要一个流线程来将数据推送到fakesink。它将发布一条 STREAM_STATUS消息,指示其对流线程的要求。
  • 应用程序将STREAM_STATUS使用同步总线处理程序对消息做出反应。然后它会配置一个自定义GstTaskPool的GstTask消息内。定制任务池负责创建线程。在此示例中,我们将使线程具有更高的优先级。
  • 或者,由于在线程上下文中调用了同步消息,因此您可以使用线程ENTER/ LEAVE通知来更改当前线程的优先级或调度策略。

第一步,我们需要实现一个GstTaskPool来在任务上配置。以下是使用pthread创建SCHED_RR实时线程的GstTaskPool的实现。请注意,创建实时线程可能需要额外的特权。

#include <pthread.h>

typedef struct
{
  pthread_t thread;
} TestRTId;

G_DEFINE_TYPE (TestRTPool, test_rt_pool, GST_TYPE_TASK_POOL);

static void
default_prepare (GstTaskPool * pool, GError ** error)
{
  /* we don't do anything here. We could construct a pool of threads here that
   * we could reuse later but we don't */
}

static void
default_cleanup (GstTaskPool * pool)
{
}

static gpointer
default_push (GstTaskPool * pool, GstTaskPoolFunction func, gpointer data,
    GError ** error)
{
  TestRTId *tid;
  gint res;
  pthread_attr_t attr;
  struct sched_param param;

  tid = g_slice_new0 (TestRTId);

  pthread_attr_init (&attr);
  if ((res = pthread_attr_setschedpolicy (&attr, SCHED_RR)) != 0)
    g_warning ("setschedpolicy: failure: %p", g_strerror (res));

  param.sched_priority = 50;
  if ((res = pthread_attr_setschedparam (&attr, &param)) != 0)
    g_warning ("setschedparam: failure: %p", g_strerror (res));

  if ((res = pthread_attr_setinheritsched (&attr, PTHREAD_EXPLICIT_SCHED)) != 0)
    g_warning ("setinheritsched: failure: %p", g_strerror (res));

  res = pthread_create (&tid->thread, &attr, (void *(*)(void *)) func, data);

  if (res != 0) {
    g_set_error (error, G_THREAD_ERROR, G_THREAD_ERROR_AGAIN,
        "Error creating thread: %s", g_strerror (res));
    g_slice_free (TestRTId, tid);
    tid = NULL;
  }

  return tid;
}

static void
default_join (GstTaskPool * pool, gpointer id)
{
  TestRTId *tid = (TestRTId *) id;

  pthread_join (tid->thread, NULL);

  g_slice_free (TestRTId, tid);
}

static void
test_rt_pool_class_init (TestRTPoolClass * klass)
{
  GstTaskPoolClass *gsttaskpool_class;

  gsttaskpool_class = (GstTaskPoolClass *) klass;

  gsttaskpool_class->prepare = default_prepare;
  gsttaskpool_class->cleanup = default_cleanup;
  gsttaskpool_class->push = default_push;
  gsttaskpool_class->join = default_join;
}

static void
test_rt_pool_init (TestRTPool * pool)
{
}

GstTaskPool *
test_rt_pool_new (void)
{
  GstTaskPool *pool;

  pool = g_object_new (TEST_TYPE_RT_POOL, NULL);

  return pool;
}

编写任务池时要实现的重要功能是“推送”功能。我们启动一个调用给定函数的线程,涉及更多的实现可能希望在线程池中保留一些线程,因为创建和销毁线程并不总是最快的操作。

下一步,我们需要在fakesrc需要时配置自定义任务池。为此,我们使用同步处理程序获取STREAM_STATUS消息。

static GMainLoop* loop;

static void
on_stream_status (GstBus     *bus,
                  GstMessage *message,
                  gpointer    user_data)
{
  GstStreamStatusType type;
  GstElement *owner;
  const GValue *val;
  GstTask *task = NULL;

  gst_message_parse_stream_status (message, &type, &owner);

  val = gst_message_get_stream_status_object (message);

  /* see if we know how to deal with this object */
  if (G_VALUE_TYPE (val) == GST_TYPE_TASK) {
    task = g_value_get_object (val);
  }

  switch (type) {
    case GST_STREAM_STATUS_TYPE_CREATE:
      if (task) {
        GstTaskPool *pool;

        pool = test_rt_pool_new();

        gst_task_set_pool (task, pool);
      }
      break;
    default:
      break;
  }
}

static void
on_error (GstBus     *bus,
          GstMessage *message,
          gpointer    user_data)
{
  g_message ("received ERROR");
  g_main_loop_quit (loop);
}

static void
on_eos (GstBus     *bus,
        GstMessage *message,
        gpointer    user_data)
{
  g_main_loop_quit (loop);
}

int
main (int argc, char *argv[])
{
  GstElement *bin, *fakesrc, *fakesink;
  GstBus *bus;
  GstStateChangeReturn ret;

  gst_init (&argc, &argv);

  /* create a new bin to hold the elements */
  bin = gst_pipeline_new ("pipeline");
  g_assert (bin);

  /* create a source */
  fakesrc = gst_element_factory_make ("fakesrc", "fakesrc");
  g_assert (fakesrc);
  g_object_set (fakesrc, "num-buffers", 50, NULL);

  /* and a sink */
  fakesink = gst_element_factory_make ("fakesink", "fakesink");
  g_assert (fakesink);

  /* add objects to the main pipeline */
  gst_bin_add_many (GST_BIN (bin), fakesrc, fakesink, NULL);

  /* link the elements */
  gst_element_link (fakesrc, fakesink);

  loop = g_main_loop_new (NULL, FALSE);

  /* get the bus, we need to install a sync handler */
  bus = gst_pipeline_get_bus (GST_PIPELINE (bin));
  gst_bus_enable_sync_message_emission (bus);
  gst_bus_add_signal_watch (bus);

  g_signal_connect (bus, "sync-message::stream-status",
      (GCallback) on_stream_status, NULL);
  g_signal_connect (bus, "message::error",
      (GCallback) on_error, NULL);
  g_signal_connect (bus, "message::eos",
      (GCallback) on_eos, NULL);

  /* start playing */
  ret = gst_element_set_state (bin, GST_STATE_PLAYING);
  if (ret != GST_STATE_CHANGE_SUCCESS) {
    g_message ("failed to change state");
    return -1;
  }

  /* Run event loop listening for bus messages until EOS or ERROR */
  g_main_loop_run (loop);

  /* stop the bin */
  gst_element_set_state (bin, GST_STATE_NULL);
  gst_object_unref (bus);
  g_main_loop_unref (loop);

  return 0;
}

请注意,该程序可能需要root权限才能创建实时线程。当无法创建线程时,状态更改功能将失败,这是我们在上面的应用程序中捕获的。

当管道中有多个线程时,您将收到多条STREAM_STATUS消息。您应该使用消息的所有者(可能是启动线程的衬垫或元件)来确定该线程在应用程序上下文中的功能。

什么时候强制线程?

我们已经看到线程是由元件创建的,但是将元件插入管道中时我们想要在管道中强制使用新线程。

强制使用线程有多种原因。但是,出于性能原因,您永远不希望对其中的每个元件使用一个线程,因为那样会增加一些开销。现在让我们列出一些线程可能特别有用的情况:

  • 数据缓冲,例如在处理网络流或从实时流(例如视频或音频卡)记录数据时。管道中其他地方的简短更新不会导致数据丢失。另请参阅关于使用queue2进行网络缓冲的流缓冲。

Gstreamer应用开发手册10:线程

 

  • 同步输出设备,例如在播放包含视频和音频数据的流时。通过将线程用于两个输出,它们的同步效果会更好。

Gstreamer应用开发手册10:线程

我们已经多次提到“队列”元件。队列是线程边界元件,通过它可以强制使用线程。它是通过使用经典的提供者/消费者模型来实现的,该模型是在世界各地的大学的线程课程中学习的。这样,它既可以使线程之间的数据吞吐量成为线程安全的方式,又可以充当缓冲区。队列具有几个要配置用于特定用途的 GObject属性。例如,您可以为元件设置上下阈值。如果数据少于下限阈值(默认值:禁用),它将阻止输出。如果数据多于上限,它将阻止输入或(如果配置为这样做)丢弃数据。

要使用队列(并因此在管道中强制使用两个不同的线程),可以简单地创建一个“队列”元件并将其作为管道的一部分放入。GStreamer将在内部处理所有线程细节。

相关标签: gstreamer开发手册