用Python的Django框架完成视频处理任务的教程
我过去曾经参与过一个项目,客户需要视频转码功能,这实在不是个容易达成的需求。需要大量的读取每一个视频、音讯与视频容器的格式再输出符合网页使用与喜好的视频格式。
考虑到这一点,我们决定将转码的工作交给 Encoding.com 。这个网站可以免费让你编码1GB大小的视频,超过1GB容量的文件将采取分级计价收费。
开发的代码如下,我上传了一个178KB容量的两秒视频来测试代码是否成功运作。当测试过程没有发生任何的例外错误后,我继续测试其它更大的外部文件。
阶段一:用户上传视频文件
现在这的新的代码段提供了一个基于 HTML5且可以快速上手的 的上传机制。用CoffeeScript撰写的代码,可以从客户端上传文件到服务器端。
$scope.upload_slide = (upload_slide_form) -> file = document.getElementById("slide_file").files[0] reader = new FileReader() reader.readAsDataURL file reader.onload = (event) -> result = event.target.result fileName = document.getElementById("slide_file").files[0].name $.post "/world/upload_slide", data: result name: fileName room_id: $scope.room.id (response_data) -> if response_data.success? is not yes console.error "There was an error uploading the file", response_data else console.log "Upload successful", response_data reader.onloadstart = -> console.log "onloadstart" reader.onprogress = (event) -> console.log "onprogress", event.total, event.loaded, (event.loaded / event.total) * 100 reader.onabort = -> console.log "onabort" reader.onerror = -> console.log "onerror" reader.onloadend = (event) -> console.log "onloadend", event
最好可以通过 (“slide_file”).files 且经由独立的POST上传每个文件,而不是由一个POST需求上传所有文件。稍后我们会解释这点。
阶段二:验证并上传至 Amazon S3
后端我们运行了Django与RabbitMQ。主要的模块如下:
$ pip install 'Django>=1.5.2' 'django-celery>=3.0.21' \ 'django-storages>=1.1.8' 'lxml>=3.2.3' 'python-magic>=0.4.3' 我建立了两个模块:SlideUploadQueue 用来储存每一次上传的数据,SlideVideoMedia 则是用来储存每个要上传影片的数据。 class SlideUploadQueue(models.Model): created_by = models.ForeignKey(User) created_time = models.DateTimeField(db_index=True) original_file = models.FileField( upload_to=filename_sanitiser, blank=True, default='') media_type = models.ForeignKey(MediaType) encoding_com_tracking_code = models.CharField( default='', max_length=24, blank=True) STATUS_AWAITING_DATA = 0 STATUS_AWAITING_PROCESSING = 1 STATUS_PROCESSING = 2 STATUS_AWAITING_3RD_PARTY_PROCESSING = 5 STATUS_FINISHED = 3 STATUS_FAILED = 4 STATUS_LIST = ( (STATUS_AWAITING_DATA, 'Awaiting Data'), (STATUS_AWAITING_PROCESSING, 'Awaiting processing'), (STATUS_PROCESSING, 'Processing'), (STATUS_AWAITING_3RD_PARTY_PROCESSING, 'Awaiting 3rd-party processing'), (STATUS_FINISHED, 'Finished'), (STATUS_FAILED, 'Failed'), ) status = models.PositiveSmallIntegerField( default=STATUS_AWAITING_DATA, choices=STATUS_LIST) class Meta: verbose_name = 'Slide' verbose_name_plural = 'Slide upload queue' def save(self, *args, **kwargs): if not self.created_time: self.created_time = \ datetime.utcnow().replace(tzinfo=pytz.utc) return super(SlideUploadQueue, self).save(*args, **kwargs) def __unicode__(self): if self.id is None: return 'new' return ' %d' % self.id class SlideVideoMedia(models.Model): converted_file = models.FileField( upload_to=filename_sanitiser, blank=True, default='') FORMAT_MP4 = 0 FORMAT_WEBM = 1 FORMAT_OGG = 2 FORMAT_FL9 = 3 FORMAT_THUMB = 4 supported_formats = ( (FORMAT_MP4, 'MPEG 4'), (FORMAT_WEBM, 'WebM'), (FORMAT_OGG, 'OGG'), (FORMAT_FL9, 'Flash 9 Video'), (FORMAT_THUMB, 'Thumbnail'), ) mime_types = ( (FORMAT_MP4, 'video/mp4'), (FORMAT_WEBM, 'video/webm'), (FORMAT_OGG, 'video/ogg'), (FORMAT_FL9, 'video/mp4'), (FORMAT_THUMB, 'image/jpeg'), ) format = models.PositiveSmallIntegerField( default=FORMAT_MP4, choices=supported_formats) class Meta: verbose_name = 'Slide video' verbose_name_plural = 'Slide videos' def __unicode__(self): if self.id is None: return 'new ' return ' %d' % self.id
我们的模块皆使用 filename_sanitiser。FileField 自动的将文件名调整成
def filename_sanitiser(instance, filename): folder = instance.__class__.__name__.lower() ext = 'jpg' if '.' in filename: t_ext = filename.split('.')[-1].strip().lower() if t_ext != '': ext = t_ext return '%s/%s.%s' % (folder, str(uuid.uuid4()), ext)
拿来测试的文件 testing.mov 将会转换成以下网址:https://our-bucket.s3.amazonaws.com/slideuploadqueue/3fe27193-e87f-4244-9aa2-66409f70ebd3.mov 并经由Django Storages 模块上传。
我们通过 Magic 验证从使用者端浏览器上传的文件。Magic可以从文件内容侦测是何种类型的文件。
@verify_auth_token @return_json def upload_slide(request): file_data = request.POST.get('data', '') file_data = base64.b64decode(file_data.split(';base64,')[1]) description = magic.from_buffer(file_data)
如果文件类型符合MPEG v4 系统或是Apple QuickTime 电影,我们就知道该文件转码不会有太大问题。如果格式不是上述所提的几种,我们会标志给用户知悉。
接着,我们将通过SlideUploadQueue 模块将视频储存到队列并发送一个需求给 RabbitMQ。因为我们使用了Django Storages 模块,文件将自动被上传到 Amazon S3。
slide_upload = SlideUploadQueue() ... slide_upload.status = SlideUploadQueue.STATUS_AWAITING_PROCESSING slide_upload.save() slide_upload.original_file.\ save('anything.%s' % file_ext, ContentFile(file_data)) slide_upload.save() task = ConvertRawSlideToSlide() task.delay(slide_upload)
阶段3:发送视频到第三方.
RabbitMQ 将控管 task.delay(slide_upload) 的呼叫。
我们现在只需要发送视频档网址与输出格式给Encoding.com。该网站会回复我们一个工作码让我们检查视频转码的进度。
class ConvertRawSlideToSlide(Task): queue = 'backend_convert_raw_slides' ... def _handle_video(self, slide_upload): mp4 = { 'output': 'mp4', 'size': '320x240', 'bitrate': '256k', 'audio_bitrate': '64k', 'audio_channels_number': '2', 'keep_aspect_ratio': 'yes', 'video_codec': 'mpeg4', 'profile': 'main', 'vcodecparameters': 'no', 'audio_codec': 'libfaac', 'two_pass': 'no', 'cbr': 'no', 'deinterlacing': 'no', 'keyframe': '300', 'audio_volume': '100', 'file_extension': 'mp4', 'hint': 'no', } webm = { 'output': 'webm', 'size': '320x240', 'bitrate': '256k', 'audio_bitrate': '64k', 'audio_sample_rate': '44100', 'audio_channels_number': '2', 'keep_aspect_ratio': 'yes', 'video_codec': 'libvpx', 'profile': 'baseline', 'vcodecparameters': 'no', 'audio_codec': 'libvorbis', 'two_pass': 'no', 'cbr': 'no', 'deinterlacing': 'no', 'keyframe': '300', 'audio_volume': '100', 'preset': '6', 'file_extension': 'webm', 'acbr': 'no', } ogg = { 'output': 'ogg', 'size': '320x240', 'bitrate': '256k', 'audio_bitrate': '64k', 'audio_sample_rate': '44100', 'audio_channels_number': '2', 'keep_aspect_ratio': 'yes', 'video_codec': 'libtheora', 'profile': 'baseline', 'vcodecparameters': 'no', 'audio_codec': 'libvorbis', 'two_pass': 'no', 'cbr': 'no', 'deinterlacing': 'no', 'keyframe': '300', 'audio_volume': '100', 'file_extension': 'ogg', 'acbr': 'no', } flv = { 'output': 'fl9', 'size': '320x240', 'bitrate': '256k', 'audio_bitrate': '64k', 'audio_channels_number': '2', 'keep_aspect_ratio': 'yes', 'video_codec': 'libx264', 'profile': 'high', 'vcodecparameters': 'no', 'audio_codec': 'libfaac', 'two_pass': 'no', 'cbr': 'no', 'deinterlacing': 'no', 'keyframe': '300', 'audio_volume': '100', 'file_extension': 'mp4', } thumbnail = { 'output': 'thumbnail', 'time': '5', 'video_codec': 'mjpeg', 'keep_aspect_ratio': 'yes', 'file_extension': 'jpg', } encoder = Encoding(settings.ENCODING_API_USER_ID, settings.ENCODING_API_USER_KEY) resp = encoder.add_media(source=[slide_upload.original_file.url], formats=[mp4, webm, ogg, flv, thumbnail]) media_id = None if resp is not None and resp.get('response') is not None: media_id = resp.get('response').get('MediaID') if media_id is None: slide_upload.status = SlideUploadQueue.STATUS_FAILED slide_upload.save() log.error('Unable to communicate with encoding.com') return False slide_upload.encoding_com_tracking_code = media_id slide_upload.status = \ SlideUploadQueue.STATUS_AWAITING_3RD_PARTY_PROCESSING slide_upload.save() return True
Encoding.com 推荐一些堪用的Python程序,可用来与它们的服务沟通。我修改了模块一些地方,但还需要修改一些功能才能达到我满意的状态。以下是修改过后目前正在使用的程序代码:
import httplib from lxml import etree import urllib from xml.parsers.expat import ExpatError import xmltodict ENCODING_API_URL = 'manage.encoding.com:80' class Encoding(object): def __init__(self, userid, userkey, url=ENCODING_API_URL): self.url = url self.userid = userid self.userkey = userkey def get_media_info(self, action='GetMediaInfo', ids=[], headers={'Content-Type': 'application/x-www-form-urlencoded'}): query = etree.Element('query') nodes = { 'userid': self.userid, 'userkey': self.userkey, 'action': action, 'mediaid': ','.join(ids), } query = self._build_tree(etree.Element('query'), nodes) results = self._execute_request(query, headers) return self._parse_results(results) def get_status(self, action='GetStatus', ids=[], extended='no', headers={'Content-Type': 'application/x-www-form-urlencoded'}): query = etree.Element('query') nodes = { 'userid': self.userid, 'userkey': self.userkey, 'action': action, 'extended': extended, 'mediaid': ','.join(ids), } query = self._build_tree(etree.Element('query'), nodes) results = self._execute_request(query, headers) return self._parse_results(results) def add_media(self, action='AddMedia', source=[], notify='', formats=[], instant='no', headers={'Content-Type': 'application/x-www-form-urlencoded'}): query = etree.Element('query') nodes = { 'userid': self.userid, 'userkey': self.userkey, 'action': action, 'source': source, 'notify': notify, 'instant': instant, } query = self._build_tree(etree.Element('query'), nodes) for format in formats: format_node = self._build_tree(etree.Element('format'), format) query.append(format_node) results = self._execute_request(query, headers) return self._parse_results(results) def _build_tree(self, node, data): for k, v in data.items(): if isinstance(v, list): for item in v: element = etree.Element(k) element.text = item node.append(element) else: element = etree.Element(k) element.text = v node.append(element) return node def _execute_request(self, xml, headers, path='', method='POST'): params = urllib.urlencode({'xml': etree.tostring(xml)}) conn = httplib.HTTPConnection(self.url) conn.request(method, path, params, headers) response = conn.getresponse() data = response.read() conn.close() return data def _parse_results(self, results): try: return xmltodict.parse(results) except ExpatError, e: print 'Error parsing encoding.com response' print e return None
其他待完成事项包括通过HTTPS-only (加密联机) 使用Encoding.com 严谨的SSL验证,还有一些单元测试。
阶段4:下载所有新的视频档格式
我们有个定期执行的程序,通过RabbitMQ每15秒检查视频转码的进度:
class CheckUpOnThirdParties(PeriodicTask): run_every = timedelta(seconds=settings.THIRD_PARTY_CHECK_UP_INTERVAL) ... def _handle_encoding_com(self, slides): format_lookup = { 'mp4': SlideVideoMedia.FORMAT_MP4, 'webm': SlideVideoMedia.FORMAT_WEBM, 'ogg': SlideVideoMedia.FORMAT_OGG, 'fl9': SlideVideoMedia.FORMAT_FL9, 'thumbnail': SlideVideoMedia.FORMAT_THUMB, } encoder = Encoding(settings.ENCODING_API_USER_ID, settings.ENCODING_API_USER_KEY) job_ids = [item.encoding_com_tracking_code for item in slides] resp = encoder.get_status(ids=job_ids) if resp is None: log.error('Unable to check up on encoding.com') return False
检查Encoding.com的响应来验证每个部分是否正确以利我们继续下去。
if resp.get('response') is None: log.error('Unable to get response node from encoding.com') return False resp_id = resp.get('response').get('id') if resp_id is None: log.error('Unable to get media id from encoding.com') return False slide = SlideUploadQueue.objects.filter( status=SlideUploadQueue.STATUS_AWAITING_3RD_PARTY_PROCESSING, encoding_com_tracking_code=resp_id) if len(slide) != 1: log.error('Unable to find a single record for %s' % resp_id) return False resp_status = resp.get('response').get('status') if resp_status is None: log.error('Unable to get status from encoding.com') return False if resp_status != u'Finished': log.debug("%s isn't finished, will check back later" % resp_id) return True formats = resp.get('response').get('format') if formats is None: log.error("No output formats were found. Something's wrong.") return False for format in formats: try: assert format.get('status') == u'Finished', \ "%s is not finished. Something's wrong." % format.get('id') output = format.get('output') assert output in ('mp4', 'webm', 'ogg', 'fl9', 'thumbnail'), 'Unknown output format %s' % output s3_dest = format.get('s3_destination') assert 'http://encoding.com.result.s3.amazonaws.com/'\ in s3_dest, 'Suspicious S3 url: %s' % s3_dest https_link = \ 'https://s3.amazonaws.com/encoding.com.result/%s' %\ s3_dest.split('/')[-1] file_ext = https_link.split('.')[-1].strip() assert len(file_ext) > 0,\ 'Unable to get file extension from %s' % https_link count = SlideVideoMedia.objects.filter(slide_upload=slide, format=format_lookup[output]).count() if count != 0: print 'There is already a %s file for this slide' % output continue content = self.download_content(https_link) assert content is not None,\ 'There is no content for %s' % format.get('id') except AssertionError, e: log.error('A format did not pass all assertions: %s' % e) continue
到这里我们已确认所有事项皆正常,所以我们可以储存所有的视频档了:
media = SlideVideoMedia() media.format = format_lookup[output] media.converted_file.save('blah.%s' % file_ext, ContentFile(content)) media.save()
阶段5:经由HTML5播放视频档
在我们的前端网页已经新增了一个有HTML5的影像单元的网页。并采用对每个浏览器都有最佳支持的video.js来显示视频。
? bower install video.js bower caching git://github.com/videojs/video.js-component.git bower cloning git://github.com/videojs/video.js-component.git bower fetching video.js bower checking out video.js#v4.0.3 bower copying /home/mark/.bower/cache/video.js/5ab058cd60c5615aa38e8e706cd0f307 bower installing video.js#4.0.3
在我们的首页有包含其他相依的文件:
!!! 5 html(lang="en", class="no-js") head meta(http-equiv='Content-Type', content='text/html; charset=UTF-8') ... link(rel='stylesheet', type='text/css', href='/components/video-js-4.1.0/video-js.css') script(type='text/javascript', src='/components/video-js-4.1.0/video.js')
在Angular.js/JADE-based 框架下的模块,我们引入
#main.span12 video#example_video_1.video-js.vjs-default-skin(controls, preload="auto", width="640", height="264", poster="{{video_thumbnail}}", data-setup='{"example_option":true}', ng-show="videos") source(ng-repeat="video in videos", src="{{video.src}}", type="{{video.type}}")
还会显示出我们转换的每个视频文件格式,并使用在
我们仍然有许多工作需要完成,建立单元测试与加强和Encoding.com服务沟通的程序。如果你对这些工作感兴趣请与我连络。
推荐阅读
-
详尽讲述用Python的Django框架测试驱动开发的教程
-
用Python的Django框架完成视频处理任务的教程
-
用Python的Django框架完成视频处理任务的教程
-
用Python的Django框架完成视频处理任务的教程
-
详尽讲述用Python的Django框架测试驱动开发的教程
-
Python 初学者想通过 Django 框架写一个博客,一个月内完成任务,大致的学习路线怎么安排?
-
Python 初学者想通过 Django 框架写一个博客,一个月内完成任务,大致的学习路线怎么安排?
-
详尽讲述用Python的Django框架测试驱动开发的教程
-
详尽讲述用Python的Django框架测试驱动开发的教程