// Copyright (c) 2014 The SurgeMQ Authors. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package message import ( "bytes" "fmt" "sync/atomic" ) // The SUBSCRIBE Packet is sent from the Client to the Server to create one or more // Subscriptions. Each Subscription registers a Client’s interest in one or more // Topics. The Server sends PUBLISH Packets to the Client in order to forward // Application Messages that were published to Topics that match these Subscriptions. // The SUBSCRIBE Packet also specifies (for each Subscription) the maximum QoS with // which the Server can send Application Messages to the Client. type SubscribeMessage struct { header topics [][]byte qos []byte } var _ Message = (*SubscribeMessage)(nil) // NewSubscribeMessage creates a new SUBSCRIBE message. func NewSubscribeMessage() *SubscribeMessage { msg := &SubscribeMessage{} msg.SetType(SUBSCRIBE) return msg } func (this SubscribeMessage) String() string { msgstr := fmt.Sprintf("%s, Packet ID=%d", this.header, this.PacketId()) for i, t := range this.topics { msgstr = fmt.Sprintf("%s, Topic[%d]=%q/%d", msgstr, i, string(t), this.qos[i]) } return msgstr } // Topics returns a list of topics sent by the Client. func (this *SubscribeMessage) Topics() [][]byte { return this.topics } // AddTopic adds a single topic to the message, along with the corresponding QoS. // An error is returned if QoS is invalid. func (this *SubscribeMessage) AddTopic(topic []byte, qos byte) error { if !ValidQos(qos) { return fmt.Errorf("Invalid QoS %d", qos) } var i int var t []byte var found bool for i, t = range this.topics { if bytes.Equal(t, topic) { found = true break } } if found { this.qos[i] = qos return nil } this.topics = append(this.topics, topic) this.qos = append(this.qos, qos) this.dirty = true return nil } // RemoveTopic removes a single topic from the list of existing ones in the message. // If topic does not exist it just does nothing. func (this *SubscribeMessage) RemoveTopic(topic []byte) { var i int var t []byte var found bool for i, t = range this.topics { if bytes.Equal(t, topic) { found = true break } } if found { this.topics = append(this.topics[:i], this.topics[i+1:]...) this.qos = append(this.qos[:i], this.qos[i+1:]...) } this.dirty = true } // TopicExists checks to see if a topic exists in the list. func (this *SubscribeMessage) TopicExists(topic []byte) bool { for _, t := range this.topics { if bytes.Equal(t, topic) { return true } } return false } // TopicQos returns the QoS level of a topic. If topic does not exist, QosFailure // is returned. func (this *SubscribeMessage) TopicQos(topic []byte) byte { for i, t := range this.topics { if bytes.Equal(t, topic) { return this.qos[i] } } return QosFailure } // Qos returns the list of QoS current in the message. func (this *SubscribeMessage) Qos() []byte { return this.qos } func (this *SubscribeMessage) Len() int { if !this.dirty { return len(this.dbuf) } ml := this.msglen() if err := this.SetRemainingLength(int32(ml)); err != nil { return 0 } return this.header.msglen() + ml } func (this *SubscribeMessage) Decode(src []byte) (int, error) { total := 0 hn, err := this.header.decode(src[total:]) total += hn if err != nil { return total, err } //this.packetId = binary.BigEndian.Uint16(src[total:]) this.packetId = src[total : total+2] total += 2 remlen := int(this.remlen) - (total - hn) for remlen > 0 { t, n, err := readLPBytes(src[total:]) total += n if err != nil { return total, err } this.topics = append(this.topics, t) this.qos = append(this.qos, src[total]) total++ remlen = remlen - n - 1 } if len(this.topics) == 0 { return 0, fmt.Errorf("subscribe/Decode: Empty topic list") } this.dirty = false return total, nil } func (this *SubscribeMessage) Encode(dst []byte) (int, error) { if !this.dirty { if len(dst) < len(this.dbuf) { return 0, fmt.Errorf("subscribe/Encode: Insufficient buffer size. Expecting %d, got %d.", len(this.dbuf), len(dst)) } return copy(dst, this.dbuf), nil } hl := this.header.msglen() ml := this.msglen() if len(dst) < hl+ml { return 0, fmt.Errorf("subscribe/Encode: Insufficient buffer size. Expecting %d, got %d.", hl+ml, len(dst)) } if err := this.SetRemainingLength(int32(ml)); err != nil { return 0, err } total := 0 n, err := this.header.encode(dst[total:]) total += n if err != nil { return total, err } if this.PacketId() == 0 { this.SetPacketId(uint16(atomic.AddUint64(&gPacketId, 1) & 0xffff)) //this.packetId = uint16(atomic.AddUint64(&gPacketId, 1) & 0xffff) } n = copy(dst[total:], this.packetId) //binary.BigEndian.PutUint16(dst[total:], this.packetId) total += n for i, t := range this.topics { n, err := writeLPBytes(dst[total:], t) total += n if err != nil { return total, err } dst[total] = this.qos[i] total++ } return total, nil } func (this *SubscribeMessage) msglen() int { // packet ID total := 2 for _, t := range this.topics { total += 2 + len(t) + 1 } return total }