欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  数据库

Avro用php请求python服务的例子

程序员文章站 2022-05-02 13:16:56
...

Avro是Hadoop项目之一。主要用来支持数据密集型应用,它定义了一种数据格式并在多种编程语言中支持这种格式。我们最主要是用来操作Cassandra,其次是以RPC的方式,实现语言之间的相互调用。 架构中有很多轻量的python脚本,比如,PHP接收到一个URL,需要调用

Avro是Hadoop项目之一。主要用来支持数据密集型应用,它定义了一种数据格式并在多种编程语言中支持这种格式。我们最主要是用来操作Cassandra,其次是以RPC的方式,实现语言之间的相互调用。

架构中有很多轻量的python脚本,比如,PHP接收到一个URL,需要调用Python脚本,去取这个URL的标题,然后返回。之前是用cli从命令行直接调用,效果不是很好,经常会卡死,有时还会占用PHP的端口导致php-fpm无法启动。这种场景又不能用RabbitMQ之类的异步处理。所以用Avro实现了一个简单的调用,跑了一个多月,目前一切正常,现在分享相关代码。

原理:PHP和Python使用同一个Schema,用Python的HTTPServer做一个http服务,PHP端将数据以avro/binary的方式编码,传给Python,Python端以同样的方法解码,获得传递的数据,进行处理后返回结果给PHP端。

需要用到avro官方提供的Python模块和PHP库。用pip安装avro时,需要先安装libsnappy。

Schema

创建一个JSON文件,命名为avro-protocol.json,内容如下:

{"namespace": "com.dmyz.avro",
    "protocol": "Post",
    "types": [
        {"name": "Action", "type": "record",
            "fields": [
                {"name": "url","type": "string"},
                {"name": "charset","type": "string"}
            ]
        }
    ],
    "messages": {
        "send": {
            "request": [{"name": "message", "type": "Action"}],
            "response": "string"
        }
    }
}

以上内容定义了一个Avro的protocol,Python端根据它的定义来接受参数,生成服务;PHP端不提供服务,所以实际上只用到了定义中schema(type节点部分)。

Python

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
try:
    from cStringIO import StringIO
except ImportError:
    from StringIO import StringIO
?
import cgi
import json
from avro import io, ipc, protocol, schema
?
PROTOCOL_FILE = 'avro-protocol.json'
PROTOCOL_JSON = json.loads(open(PROTOCOL_FILE).read())
PROTOCOL = protocol.parse(open(PROTOCOL_FILE).read())
SCHEMA = schema.parse(json.dumps(PROTOCOL_JSON['types'][0]))
?
SERVER_ADDR = ('localhost', 9090)
?
class PostRequestHandler(BaseHTTPRequestHandler):
    def do_POST(self):
?
        length       = int(self.headers.getheader('content-length'))
        post_data    = self.rfile.read(length)
?
        data_reader  = StringIO(post_data)
        decoder      = io.BinaryDecoder(data_reader)
?
        datum_reader = io.DatumReader(SCHEMA)
        data         = datum_reader.read(decoder)
        url          = data['url'])
        charset      = data['charset']
        #已经取得从PHP发过来的数据,在这里执行其他逻辑
        self.wfile.write('title') #返回取到的title
?
if __name__ == '__main__':
    server = HTTPServer(SERVER_ADDR, PostRequestHandler)
    server.allow_reuse_address = True
    server.serve_forever()

PHP

?
$avro         = json_decode(file_get_contents('avro-protocol.json'), true);
?
$avroProtocol = new AvroProtocol();
$protocol     = $avroProtocol->real_parse($avro);
$schema       = AvroSchema::real_parse($avro['types'][0]);
?
$datum_writer = new AvroIODatumWriter($schema);
$write_io     = new AvroStringIO();
$encoder      = new AvroIOBinaryEncoder($write_io);
?
$message = array('url' => 'http://dmyz.org', 'charset' => 'utf-8');
$datum_writer->write($message, $encoder);
?
$content      = $write_io->string();
?
$headers = array(
    "POST / HTTP/1.1",
    "Host: " . self::$host,
    "Content-Type: avro/binary",
    "Content-Length: " . strlen($content),
);
?
$socket = stream_socket_client('localhost:9090', $errno, $errstr, 5);
?
if (!$socket)
    throw new Exception($errstr, $errno);
?
fwrite($socket, implode("\r\n", $headers) . "\r\n\r\n");
fwrite($socket, $content);
?
$result = '';
while (!feof($socket)) {
    $result .= fgets($socket);
}
fclose($socket);
?
return $result; #得到Python端返回的数据