I am using Java and the protoc 3.0 compiler. My proto file is given below.
https://github.com/openconfig/public/blob/master/release/models/rpc/openconfig-rpc-api.yang

    syntax = "proto3";

    package Telemetry;

    // Interface exported by Agent
    service OpenConfigTelemetry {
        // Request an inline subscription for data at the specified path.
        // The device should send telemetry data back on the same
        // connection as the subscription request.
        rpc telemetrySubscribe(SubscriptionRequest) returns (stream OpenConfigData) {}

        // Terminates and removes an existing telemetry subscription
        rpc cancelTelemetrySubscription(CancelSubscriptionRequest) returns (CancelSubscriptionReply) {}

        // Get the list of current telemetry subscriptions from the
        // target. This command returns a list of existing subscriptions
        // not including those that are established via configuration.
        rpc getTelemetrySubscriptions(GetSubscriptionsRequest) returns (GetSubscriptionsReply) {}

        // Get Telemetry Agent Operational States
        rpc getTelemetryOperationalState(GetOperationalStateRequest) returns (GetOperationalStateReply) {}

        // Return the set of data encodings supported by the device for
        // telemetry data
        rpc getDataEncodings(DataEncodingRequest) returns (DataEncodingReply) {}
    }

    // Message sent for a telemetry subscription request
    message SubscriptionRequest {
        // Data associated with a telemetry subscription
        SubscriptionInput input = 1;

        // List of data models paths and filters
        // which are used in a telemetry operation.
        repeated Path path_list = 2;

        // The below configuration is not defined in Openconfig RPC.
        // It is a proposed extension to configure additional
        // subscription request features.
        SubscriptionAdditionalConfig additional_config = 3;
    }

    // Data associated with a telemetry subscription
    message SubscriptionInput {
        // List of optional collector endpoints to send data for
        // this subscription.
        // If no collector destinations are specified, the collector
        // destination is assumed to be the requester on the rpc channel.
        repeated Collector collector_list = 1;
    }

    // Collector endpoints to send data specified as an ip+port combination.
    message Collector {
        // IP address of collector endpoint
        string address = 1;

        // Transport protocol port number for the collector destination.
        uint32 port = 2;
    }

    // Data model path
    message Path {
        // Data model path of interest
        // Path specification for elements of OpenConfig data models
        string path = 1;

        // Regular expression to be used in filtering state leaves
        string filter = 2;

        // If this is set to true, the target device will only send
        // updates to the collector upon a change in data value
        bool suppress_unchanged = 3;

        // Maximum time in ms the target device may go without sending
        // a message to the collector. If this time expires with
        // suppress-unchanged set, the target device must send an update
        // message regardless if the data values have changed.
        uint32 max_silent_interval = 4;

        // Time in ms between collection and transmission of the
        // specified data to the collector platform. The target device
        // will sample the corresponding data (e.g., a counter) and
        // immediately send to the collector destination.
        //
        // If sample-frequency is set to 0, then the network device
        // must emit an update upon every datum change.
        uint32 sample_frequency = 5;
    }

    // Configure subscription request additional features.
    message SubscriptionAdditionalConfig {
        // limit the number of records sent in the stream
        int32 limit_records = 1;

        // limit the time the stream remains open
        int32 limit_time_seconds = 2;
    }

    // Reply to inline subscription for data at the specified path is done in
    // two-folds.
    // 1. Reply data message sent out using out-of-band channel.
    // 2. Telemetry data sent back on the same connection as the
    //    subscription request.

    // 1. Reply data message sent out using out-of-band channel.
    message SubscriptionReply {
        // Response message to a telemetry subscription creation or
        // get request.
        SubscriptionResponse response = 1;

        // List of data models paths and filters
        // which are used in a telemetry operation.
        repeated Path path_list = 2;
    }

    // Response message to a telemetry subscription creation or get request.
    message SubscriptionResponse {
        // Unique id for the subscription on the device. This is
        // generated by the device and returned in a subscription
        // request or when listing existing subscriptions
        uint32 subscription_id = 1;
    }

    // 2. Telemetry data sent back on the same connection as the
    //    subscription request.
    message OpenConfigData {
        // router name:export IP address
        string system_id = 1;

        // line card / RE (slot number)
        uint32 component_id = 2;

        // PFE (if applicable)
        uint32 sub_component_id = 3;

        // Path specification for elements of OpenConfig data models
        string path = 4;

        // Sequence number, monotonically increasing for each
        // system_id, component_id, sub_component_id + path.
        uint64 sequence_number = 5;

        // timestamp (milliseconds since epoch)
        uint64 timestamp = 6;

        // List of key-value pairs
        repeated KeyValue kv = 7;
    }

    // Simple Key-value, where value could be one of scalar types
    message KeyValue {
        // Key
        string key = 1;

        // One of possible values
        oneof value {
            double double_value = 5;
            int64 int_value = 6;
            uint64 uint_value = 7;
            sint64 sint_value = 8;
            bool bool_value = 9;
            string str_value = 10;
            bytes bytes_value = 11;
        }
    }

    // Message sent for a telemetry subscription cancellation request
    message CancelSubscriptionRequest {
        // Subscription identifier as returned by the device when
        // subscription was requested
        uint32 subscription_id = 1;
    }

    // Reply to telemetry subscription cancellation request
    message CancelSubscriptionReply {
        // Return code
        ReturnCode code = 1;

        // Return code string
        string code_str = 2;
    };

    // Result of the operation
    enum ReturnCode {
        SUCCESS = 0;
        NO_SUBSCRIPTION_ENTRY = 1;
        UNKNOWN_ERROR = 2;
    }

    // Message sent for a telemetry get request
    message GetSubscriptionsRequest {
        // Subscription identifier as returned by the device when
        // subscription was requested
        // --- or ---
        // 0xFFFFFFFF for all subscription identifiers
        uint32 subscription_id = 1;
    }

    // Reply to telemetry subscription get request
    message GetSubscriptionsReply {
        // List of current telemetry subscriptions
        repeated SubscriptionReply subscription_list = 1;
    }

    // Message sent for telemetry agent operational states request
    message GetOperationalStateRequest {
        // Per-subscription_id level operational state can be requested.
        //
        // Subscription identifier as returned by the device when
        // subscription was requested
        // --- or ---
        // 0xFFFFFFFF for all subscription identifiers including agent-level
        // operational stats
        // --- or ---
        // If subscription_id is not present then sent only agent-level
        // operational stats
        uint32 subscription_id = 1;

        // Control verbosity of the output
        VerbosityLevel verbosity = 2;
    }

    // Verbosity Level
    enum VerbosityLevel {
        DETAIL = 0;
        TERSE = 1;
        BRIEF = 2;
    }

    // Reply to telemetry agent operational states request
    message GetOperationalStateReply {
        // List of key-value pairs where
        // key = operational state definition
        // value = operational state value
        repeated KeyValue kv = 1;
    }

    // Message sent for a data encoding request
    message DataEncodingRequest {
    }

    // Reply to data encodings supported request
    message DataEncodingReply {
        repeated EncodingType encoding_list = 1;
    }

    // Encoding Type Supported
    enum EncodingType {
        UNDEFINED = 0;
        XML = 1;
        JSON_IETF = 2;
        PROTO3 = 3;
    }

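To make the service call (rpc telemetrySubscribe), I first need to read the header that carries the subscription ID and only then start reading messages. From Java I can connect to the service, and I have added an interceptor, but when I print or retrieve the header, the value is null. The code that installs my interceptor and the interceptor itself are as follows: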

    ClientInterceptor interceptor = new HeaderClientInterceptor();
    originChannel = OkHttpChannelBuilder.forAddress(host, port)
            .usePlaintext(true)
            .build();
    Channel channel = ClientInterceptors.intercept(originChannel, interceptor);
    telemetryStub = OpenConfigTelemetryGrpc.newStub(channel);

    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
            MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
        return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
            @Override
            public void start(Listener<RespT> responseListener, Metadata headers) {
                super.start(new SimpleForwardingClientCallListener<RespT>(responseListener) {
                    @Override
                    public void onHeaders(Metadata headers) {
                        Key<String> CUSTOM_HEADER_KEY =
                                Metadata.Key.of("responseKEY", Metadata.ASCII_STRING_MARSHALLER);
                        System.out.println("Contains Key?? " + headers.containsKey(CUSTOM_HEADER_KEY));
                        // Pass the response headers on to the wrapped listener.
                        super.onHeaders(headers);
                    }
                }, headers);
            }
        };
    }

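Is there any other way to read the metadata, or the first message that carries the subscription ID? All I need is to read that first message containing the subscription ID and send the same subscription ID back to the server so that the stream can start. For reference, I use the same proto file from Python, and the following code communicates with the server without problems: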

    sub_req = SubscribeRequestMsg("host", port)
    data_itr = stub.telemetrySubscribe(sub_req, _TIMEOUT_SECONDS)
    metadata = data_itr.initial_metadata()

    # The first metadata entry carries the reply in protobuf text format.
    if metadata[0][0] == "responseKey":
        metainfo = metadata[0][1]
        print(metainfo)

        subreply = agent_pb2.SubscriptionReply()
        subreply.SetInParent()
        google.protobuf.text_format.Merge(metainfo, subreply)

        if subreply.response.subscription_id:
            SUB_ID = subreply.response.subscription_id

With the Python code above I can easily retrieve the metadata object, but I don't know how to retrieve it in Java.
After reading the Metadata in Java, all I get is: Metadata({content-type=[application/grpc], grpc-encoding=[identity], grpc-accept-encoding=[identity,deflate,gzip]})
But I know the metadata should contain one more entry, namely:
response { subscription_id: 2 }
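
For illustration, here is a minimal sketch of how the subscription ID could be pulled out of that text-format value once it is available as a Java string. It uses only the standard library, and the class name SubscriptionIdParser is hypothetical. With protobuf-java on the classpath, the closer equivalent of the Python text_format.Merge call would probably be TextFormat.merge(value, SubscriptionReply.newBuilder()); the regex below is just a stand-in so the example stays self-contained.

```java
import java.util.OptionalLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of gRPC or protobuf.
public class SubscriptionIdParser {
    // Matches `subscription_id: <digits>` anywhere in a text-format string.
    private static final Pattern SUBSCRIPTION_ID =
            Pattern.compile("subscription_id\\s*:\\s*(\\d+)");

    /** Returns the subscription ID found in the text, or empty if there is none. */
    public static OptionalLong parse(String textFormat) {
        if (textFormat == null) {
            return OptionalLong.empty();
        }
        Matcher m = SUBSCRIPTION_ID.matcher(textFormat);
        if (!m.find()) {
            return OptionalLong.empty();
        }
        // subscription_id is a uint32 in the proto, so a long always holds it.
        return OptionalLong.of(Long.parseLong(m.group(1)));
    }

    public static void main(String[] args) {
        OptionalLong id = parse("response { subscription_id: 2 }");
        System.out.println(id.isPresent()
                ? "subscription_id = " + id.getAsLong()
                : "no subscription_id found");
    }
}
```

This only covers the parsing step; it assumes the header value has already been read from the response Metadata, which is the part that is not working yet.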