[MQTT / Mosca] Mqtt Packet 에 Command Type 추가하기 (for authentication) #1

[MQTT / Mosca] Mqtt Packet 에 Command Type 추가하기 (for authentication) #2



하잇!

아 이어서 정리하려니까 넘 힘들다

언젠가는 내가 이걸 다시 보고 공부를 할까?

왜 자꾸 부질 없는 것 같지... 흑흑


그래도 내가 공부한 거 정리하는 거니까 마저 열씨미 해야징 ㅠㅠ





#MQTT Packet Command 추가하기



Authentication 을 위한 Packet 흐름


1. Connect Packet (Device -> Server)

2. Authenticate Packet (Server -> Device)

3. Authenticate Packet (Device -> Server)

4. Connect ACK Packet (Server -> Device)


2단계를 하고 있었다.

저번 글에서는 #4. mqtt-packet (generate) 에서 모듈 열어보고, authenticate type packet 을 generate 하는 코드 추가해주고 테스트까지 해봤다.


그럼 2단계인 Server -> Device 에서

"Server -" 정도 까지 된 거다.

남은 "> Device" 부분을 이 글에서 볼 거고!



이제 Server 측에서 authenticate type packet 을 보내는 걸 했으니!

Device 에서 받아야한다. 

해당 Packet 을 제대로 받으려면, Authenticate type 을 parsing 할 수 있어야겠지?


Device 쪽 코드에서는 mqtt library 를 쓴다.

그러니 이번에는 mqtt library를 열어볼 거야



#5. mqtt (receive)


Device 에서는 mqtt library를 이용해 client 를 생성하고,Server 에 Connect 한다.


그리고 client 에다 관련 event들을 handling 하는 callback function 들을 달아주는데,

우리는 authenticate type 의 packet 에 대한 event 도 받아야 한다!

그러려면 일단 event를 emit 하는 부분을 찾아봐야 하지 않을까?!


(mqtt/lib/client.js)

MqttClient.prototype._handleConnack = function (packet) {
  var rc = packet.returnCode,
    // TODO: move to protocol
    errors = [
      '',
      'Unacceptable protocol version',
      'Identifier rejected',
      'Server unavailable',
      'Bad username or password',
      'Not authorized'
    ];

  clearTimeout(this.connackTimer);

  if (0 === rc) {
    this.reconnecting = false;
    this.emit('connect', packet);
  } else if (0 < rc) {
    this.emit('error',
        new Error('Connection refused: ' + errors[rc]));
  }
};

그래서 evnet를 emit 하는 부분을 찾아보았다!

저거 말고도 많은데, 일단 connect event 를 emit 하는 부분을 찾아봤더니, 저렇게 되어있었다.


그럼 다음 수순은 뭘까?

저 function 을 호출하는 부분을 찾아야겠지!?


(mqtt/lib/client.js)

MqttClient.prototype._handlePacket = function (packet, done) {
  this.emit('packetreceive', packet);

  switch (packet.cmd) {
    case 'publish':
      this._handlePublish(packet, done);
      break;
    case 'authenticate':
      this.emit('authenticate', packet);
      break;
    case 'puback':
    case 'pubrec':
    case 'pubcomp':
    case 'suback':
    case 'unsuback':
      this._handleAck(packet);
      done();
      break;
    case 'pubrel':
      this._handlePubrel(packet, done);
      break;
    case 'connack':
      this._handleConnack(packet);
      done();
      break;
    case 'pingresp':
      this._handlePingresp(packet);
      done();
      break;
    default:
      // do nothing
      // maybe we should do an error handling
      // or just log it
      break;
  }
};


짜잔!


이제 packet type 에 따라 event를 발생시켜주는 부분을 알았으니,

저기다 authenticate event 를 emit 하는 코드를 추가해주면

client 에서 authenticate event 를 handling 할 수 있을 거야!!


그래서 authenticate case 를 추가해줬다.


이럼 끝일까?


젠젠,, ㅠ


Packet Parsing 이 남았다.

기껏 authenticate packet 을 전송하는 거랑, 해당 event 를 처리할 수 있도록 준비까지 다 마쳐놨으니!

제대로 authenticate packet 타입을 처리해줘야 한다.


parsing 을 어디서 하는지 알아보자.


일단 저 _handlePacket function이 packet 이란 parameter 를 받는 것으로 보아,

어딘가에서 packet을 parsing 해서 _handlePacket function 을 호출해주겠지?



(mqtt/lib/client.js) 요 파일 많이많이 본다

function process () {
  var packet = packets.shift(),
    done = completeParse;
  if (packet) {
    that._handlePacket(packet, process);
  } else {
    completeParse = null;
    done();
  }
}


여기다!

근데 packet 을 packets 라는 변수에서 꺼내오는 걸 보니

packet 은 이 function 이 호출되기 전에 이미 처리되어 packets 에 저장되는 것 같다.

그럼 또 이 process function 이 호출되는 곳을 봐야한다.



(mqtt/lib/client.js)

writable._write = function (buf, enc, done) {
  console.log(buf)
  completeParse = done;
  parser.parse(buf);
  process();
};

this.stream.pipe(writable);

요기 있지!!


여길 보면 stream.pipe 란 애가 있는데, 요주의..

아마 나중에 또 등장할 거다. 한 마지막편쯤에..


어쨌든 저 pipe 란 애의 원래 역할은, 

readable stream 에 pipe 로 writable stream 을 연결해주면,

readable stream 이 읽어들인 data buffer 를 writable stream 으로 토스해준다.

그리고 그 buffer 를 처리해주는 부분이 writable._write 로 정의된 저 function 인 것 같다.

자세한 건 document 참조


여튼 그렇게 읽어들인 buffer 를 처리하는 부분에

내가 찾던 키워드가 있다!


parser.parse(buf)...


저 parser 는 또 뭘까 해서 보니

2편에 나왔던 그 아이다

mqtt-packet..




#6. mqtt-packet (parse)


그럼 다시 봐야지 뭐..


(mqtt-packet/parser.js)

Parser.prototype._parsePayload = function () {
  var result = false

  // Do we have a payload? Do we have enough data to complete the payload?
  // PINGs have no payload
  if (this.packet.length === 0 || this._list.length >= this.packet.length) {

    this._pos = 0

    switch (this.packet.cmd) {
      case 'connect':
        this._parseConnect()
        break
      case 'authenticate':
        this._parseAuth()
        break
      case 'connack':
        this._parseConnack()
        break
      case 'publish':
        this._parsePublish()
        break
      case 'puback':
      case 'pubrec':
      case 'pubrel':
      case 'pubcomp':
        this._parseMessageId()
        break
      case 'subscribe':
        this._parseSubscribe()
        break
      case 'suback':
        this._parseSuback()
        break
      case 'unsubscribe':
        this._parseUnsubscribe()
        break
      case 'unsuback':
        this._parseUnsuback()
        break
      case 'pingreq':
      case 'pingresp':
      case 'disconnect':
        // these are empty, nothing to do
        break
      default:
        this._emitError(new Error('not supported'))
    }

    result = true
  }

  return result
}

흐흐 나 코드 따라가는 것 좀 잘 하는 것 같다.


여튼 여기 등장! 빠밤

그럼 여기다 또 언제나 그래왔듯 authenticate case 를 추가해주면 된다.


근데 또 못보던 function 이 은근슬쩍 등장했다.

그렇다. 만들어줘야한다.

진짜 개귀찮다. 내가 왜 이 짓을 시작했지

역시 코딩은 삽질의 연속이다

인정? 어인정~ 동의? 어 보감~ 동휘? 어 보검~~~~~~~~~~~~~~~



(mqtt-packet/parser.js)

Parser.prototype._parseAuth = function() {
  var protocolId // constants id
    , clientId // Client id
    , password // Password
    , username // Username
    , flags = {}
    , packet = this.packet

  // Parse constants id
  protocolId = this._parseString()
  if (protocolId === null)
    return this._emitError(new Error('cannot parse protocol id'))

  if (protocolId != 'MQTT' && protocolId != 'MQIsdp') {

    return this._emitError(new Error('invalid protocol id'))
  }

  packet.protocolId = protocolId

  // Parse constants version number
  if(this._pos >= this._list.length)
    return this._emitError(new Error('packet too short'))

  packet.protocolVersion = this._list.readUInt8(this._pos)

  if(packet.protocolVersion != 3 && packet.protocolVersion != 4) {

    return this._emitError(new Error('invalid protocol version'))
  }

  this._pos++
  if(this._pos >= this._list.length)
    return this._emitError(new Error('packet too short'))

  // Parse connect flags
  flags.username  = (this._list.readUInt8(this._pos) & constants.USERNAME_MASK)
  flags.password  = (this._list.readUInt8(this._pos) & constants.PASSWORD_MASK)

  packet.clean = (this._list.readUInt8(this._pos) & constants.CLEAN_SESSION_MASK) !== 0
  this._pos++

  // Parse keepalive
  packet.keepalive = this._parseNum()
  if(packet.keepalive === -1)
    return this._emitError(new Error('packet too short'))

  // Parse client ID
  clientId = this._parseString()
  if(clientId === null)
    return this._emitError(new Error('packet too short'))
  packet.clientId = clientId

  // Parse username
  if (flags.username) {
    username = this._parseString()
    if(username === null)
      return this._emitError(new Error('cannot parse username'))
    packet.username = username
  }

  // Parse password
  if(flags.password) {
    password = this._parseBuffer()
    if(password === null)
      return this._emitError(new Error('cannot parse username'))
    packet.password = password
  }

  return packet
}


이번에도 역시 _parseConnect 를 참고했다.

참고하되 저번에 만들어둔 Authenticate Generate Function 을 꼭 고려하면서 짜깁기해야한다!

그때 넣었던 데이터들 잘 참고해서 고쳐줘야함!


요까지 해주면 device 에서 authenticate packet 을 받는 것도 매우 잘 된다!


아래는 받는 코드

client.on('authenticate', function(packet) {
  console.log(packet);
});


아 근데 주의할 점이 있다.


혹여나 (가 아니고 대개 그렇겠지만)

Server process 를 돌리는 project directory 와

Device process 를 돌리는 project directory 가 다르다면

각각 node_modules directory 에서 적합한 library 를 수정해줘야 하는데,


지금까지 한 Authenticate Packet (Server -> Device) 단계에서

Server / Device 둘 모두에서 고쳐줘야 하는 부분이 있다.


constants.js !!


authenticate 타입을

Server 에선 추가해줬는데 Device 에서 안 해줬거나

혹은 그 반대의 상황이어도

보내거나 받는 게 제대로 안 된다.


꼭 확인!


ㅠㅠ 다음편엔 3단계로 넘어갈 예정.

정리 넘 힘들당 

+ Recent posts