Skip to content

Commit

Permalink
channels/tsmf: remove usage of old thread utils
Browse files Browse the repository at this point in the history
  • Loading branch information
awakecoding committed Mar 21, 2013
1 parent 16ba581 commit 4d240b6
Showing 1 changed file with 61 additions and 43 deletions.
104 changes: 61 additions & 43 deletions channels/tsmf/client/tsmf_media.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,15 @@
#endif

#include <winpr/crt.h>
#include <winpr/synch.h>
#include <winpr/thread.h>
#include <winpr/collections.h>

#include <freerdp/utils/stream.h>
#include <freerdp/utils/list.h>
#include <freerdp/utils/thread.h>
#include <freerdp/utils/event.h>
#include <freerdp/client/tsmf.h>

#include <winpr/crt.h>
#include <winpr/synch.h>

#include "tsmf_constants.h"
#include "tsmf_types.h"
#include "tsmf_decoder.h"
Expand Down Expand Up @@ -83,8 +82,8 @@ struct _TSMF_PRESENTATION
UINT64 audio_start_time;
UINT64 audio_end_time;

/* The stream list could be accessed by different threads and need to be protected. */
HANDLE mutex;
HANDLE thread;

LIST* stream_list;
};
Expand Down Expand Up @@ -112,12 +111,13 @@ struct _TSMF_STREAM
/* Next sample should not start before this system time. */
UINT64 next_start_time;

freerdp_thread* thread;
BOOL started;

LIST* sample_list;
HANDLE thread;
HANDLE stopEvent;

/* The sample ack response queue will be accessed only by the stream thread. */
LIST* sample_ack_list;
wQueue* sample_list;
wQueue* sample_ack_list;
};

struct _TSMF_SAMPLE
Expand Down Expand Up @@ -158,7 +158,7 @@ static TSMF_SAMPLE* tsmf_stream_pop_sample(TSMF_STREAM* stream, int sync)
BOOL pending = FALSE;
TSMF_PRESENTATION* presentation = stream->presentation;

if (list_size(stream->sample_list) == 0)
if (Queue_Count(stream->sample_list) < 1)
return NULL;

if (sync)
Expand Down Expand Up @@ -204,11 +204,9 @@ static TSMF_SAMPLE* tsmf_stream_pop_sample(TSMF_STREAM* stream, int sync)
if (pending)
return NULL;

freerdp_thread_lock(stream->thread);
sample = (TSMF_SAMPLE*) list_dequeue(stream->sample_list);
freerdp_thread_unlock(stream->thread);
sample = (TSMF_SAMPLE*) Queue_Dequeue(stream->sample_list);

if (sample && sample->end_time > stream->last_end_time)
if (sample && (sample->end_time > stream->last_end_time))
stream->last_end_time = sample->end_time;

return sample;
Expand All @@ -218,6 +216,7 @@ static void tsmf_sample_free(TSMF_SAMPLE* sample)
{
if (sample->data)
free(sample->data);

free(sample);
}

Expand All @@ -230,7 +229,7 @@ static void tsmf_sample_queue_ack(TSMF_SAMPLE* sample)
{
TSMF_STREAM* stream = sample->stream;

list_enqueue(stream->sample_ack_list, sample);
Queue_Enqueue(stream->sample_ack_list, sample);
}

static void tsmf_stream_process_ack(TSMF_STREAM* stream)
Expand All @@ -239,13 +238,16 @@ static void tsmf_stream_process_ack(TSMF_STREAM* stream)
UINT64 ack_time;

ack_time = get_current_time();
while (list_size(stream->sample_ack_list) > 0 && !freerdp_thread_is_stopped(stream->thread))

while ((Queue_Count(stream->sample_ack_list) > 0) && !(WaitForSingleObject(stream->stopEvent, 0) == WAIT_OBJECT_0))
{
sample = (TSMF_SAMPLE*) list_peek(stream->sample_ack_list);
if (!sample || sample->ack_time > ack_time)
sample = (TSMF_SAMPLE*) Queue_Peek(stream->sample_ack_list);

if (!sample || (sample->ack_time > ack_time))
break;

sample = list_dequeue(stream->sample_ack_list);
sample = Queue_Dequeue(stream->sample_ack_list);

tsmf_sample_ack(sample);
tsmf_sample_free(sample);
}
Expand Down Expand Up @@ -670,7 +672,8 @@ static void* tsmf_stream_playback_func(void* arg)
}
}
}
while (!freerdp_thread_is_stopped(stream->thread))

while (!(WaitForSingleObject(stream->stopEvent, 0) == WAIT_OBJECT_0))
{
tsmf_stream_process_ack(stream);
sample = tsmf_stream_pop_sample(stream, 1);
Expand All @@ -680,18 +683,20 @@ static void* tsmf_stream_playback_func(void* arg)
else
USleep(5000);
}

if (stream->eos || presentation->eos)
{
while ((sample = tsmf_stream_pop_sample(stream, 1)) != NULL)
tsmf_sample_playback(sample);
}

if (stream->audio)
{
stream->audio->Free(stream->audio);
stream->audio = NULL;
}

freerdp_thread_quit(stream->thread);
SetEvent(stream->stopEvent);

DEBUG_DVC("out %d", stream->stream_id);

Expand All @@ -700,9 +705,10 @@ static void* tsmf_stream_playback_func(void* arg)

static void tsmf_stream_start(TSMF_STREAM* stream)
{
if (!freerdp_thread_is_running(stream->thread))
if (!stream->started)
{
freerdp_thread_start(stream->thread, tsmf_stream_playback_func, stream);
ResumeThread(stream->thread);
stream->started = TRUE;
}
}

Expand All @@ -714,10 +720,12 @@ static void tsmf_stream_stop(TSMF_STREAM* stream)
if (!stream->decoder)
return;

if (freerdp_thread_is_running(stream->thread))
if (stream->started)
{
freerdp_thread_stop(stream->thread);
SetEvent(stream->stopEvent);
stream->started = FALSE;
}

if (stream->decoder->Control)
{
stream->decoder->Control(stream->decoder, Control_Flush, NULL);
Expand Down Expand Up @@ -830,30 +838,35 @@ void tsmf_presentation_stop(TSMF_PRESENTATION* presentation)
}

tsmf_presentation_restore_last_video_frame(presentation);

if (presentation->last_rects)
{
free(presentation->last_rects);
presentation->last_rects = NULL;
}

presentation->last_num_rects = 0;

if (presentation->output_rects)
{
free(presentation->output_rects);
presentation->output_rects = NULL;
}

presentation->output_num_rects = 0;
}

void tsmf_presentation_set_geometry_info(TSMF_PRESENTATION* presentation,
UINT32 x, UINT32 y, UINT32 width, UINT32 height,
int num_rects, RDP_RECT* rects)
UINT32 x, UINT32 y, UINT32 width, UINT32 height, int num_rects, RDP_RECT* rects)
{
presentation->output_x = x;
presentation->output_y = y;
presentation->output_width = width;
presentation->output_height = height;

if (presentation->output_rects)
free(presentation->output_rects);

presentation->output_rects = rects;
presentation->output_num_rects = num_rects;
}
Expand All @@ -868,18 +881,15 @@ static void tsmf_stream_flush(TSMF_STREAM* stream)
{
TSMF_SAMPLE* sample;

while ((sample = tsmf_stream_pop_sample(stream, 0)) != NULL)
tsmf_sample_free(sample);

while ((sample = list_dequeue(stream->sample_ack_list)) != NULL)
tsmf_sample_free(sample);
/* TODO: free lists */

if (stream->audio)
stream->audio->Flush(stream->audio);

stream->eos = 0;
stream->last_end_time = 0;
stream->next_start_time = 0;

if (stream->major_type == TSMF_MAJOR_TYPE_AUDIO)
{
stream->presentation->audio_start_time = 0;
Expand Down Expand Up @@ -929,6 +939,7 @@ TSMF_STREAM* tsmf_stream_new(TSMF_PRESENTATION* presentation, UINT32 stream_id)
TSMF_STREAM* stream;

stream = tsmf_stream_find_by_id(presentation, stream_id);

if (stream)
{
DEBUG_WARN("duplicated stream id %d!", stream_id);
Expand All @@ -940,9 +951,14 @@ TSMF_STREAM* tsmf_stream_new(TSMF_PRESENTATION* presentation, UINT32 stream_id)

stream->stream_id = stream_id;
stream->presentation = presentation;
stream->thread = freerdp_thread_new();
stream->sample_list = list_new();
stream->sample_ack_list = list_new();

stream->started = FALSE;

stream->stopEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
stream->thread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE) tsmf_stream_playback_func, stream, CREATE_SUSPENDED, NULL);

stream->sample_list = Queue_New(TRUE, -1, -1);
stream->sample_ack_list = Queue_New(TRUE, -1, -1);

WaitForSingleObject(presentation->mutex, INFINITE);
list_enqueue(presentation->stream_list, stream);
Expand All @@ -959,9 +975,11 @@ TSMF_STREAM* tsmf_stream_find_by_id(TSMF_PRESENTATION* presentation, UINT32 stre
for (item = presentation->stream_list->head; item; item = item->next)
{
stream = (TSMF_STREAM*) item->data;

if (stream->stream_id == stream_id)
return stream;
}

return NULL;
}

Expand All @@ -981,17 +999,19 @@ void tsmf_stream_set_format(TSMF_STREAM* stream, const char* name, STREAM* s)
{
DEBUG_DVC("video width %d height %d bit_rate %d frame_rate %f codec_data %d",
mediatype.Width, mediatype.Height, mediatype.BitRate,
(double)mediatype.SamplesPerSecond.Numerator / (double)mediatype.SamplesPerSecond.Denominator,
(double) mediatype.SamplesPerSecond.Numerator / (double) mediatype.SamplesPerSecond.Denominator,
mediatype.ExtraDataSize);
}
else if (mediatype.MajorType == TSMF_MAJOR_TYPE_AUDIO)
{
DEBUG_DVC("audio channel %d sample_rate %d bits_per_sample %d codec_data %d",
mediatype.Channels, mediatype.SamplesPerSecond.Numerator, mediatype.BitsPerSample,
mediatype.ExtraDataSize);

stream->sample_rate = mediatype.SamplesPerSecond.Numerator;
stream->channels = mediatype.Channels;
stream->bits_per_sample = mediatype.BitsPerSample;

if (stream->bits_per_sample == 0)
stream->bits_per_sample = 16;
}
Expand Down Expand Up @@ -1019,16 +1039,16 @@ void tsmf_stream_free(TSMF_STREAM* stream)
list_remove(presentation->stream_list, stream);
ReleaseMutex(presentation->mutex);

list_free(stream->sample_list);
list_free(stream->sample_ack_list);
Queue_Free(stream->sample_list);
Queue_Free(stream->sample_ack_list);

if (stream->decoder)
{
stream->decoder->Free(stream->decoder);
stream->decoder = 0;
}

freerdp_thread_free(stream->thread);
SetEvent(stream->thread);

free(stream);
stream = 0;
Expand Down Expand Up @@ -1063,11 +1083,9 @@ void tsmf_stream_push_sample(TSMF_STREAM* stream, IWTSVirtualChannelCallback* pC
sample->data_size = data_size;
sample->data = malloc(data_size + TSMF_BUFFER_PADDING_SIZE);
ZeroMemory(sample->data, data_size + TSMF_BUFFER_PADDING_SIZE);
memcpy(sample->data, data, data_size);
CopyMemory(sample->data, data, data_size);

freerdp_thread_lock(stream->thread);
list_enqueue(stream->sample_list, sample);
freerdp_thread_unlock(stream->thread);
Queue_Enqueue(stream->sample_list, sample);
}

#ifndef _WIN32
Expand Down

0 comments on commit 4d240b6

Please sign in to comment.