Skip to content

printer

Printer

Bases: object

The Printer object is responsible for serial communications with a printer. The printer is expected to be running Marlin firmware.

Source code in mecode/printer.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
class Printer(object):
    """ The Printer object is responsible for serial communications with a
    printer. The printer is expected to be running Marlin firmware.

    Lines of GCode are queued in an internal buffer (see `sendline` and
    `load_file`). A print thread sends them one at a time, each prefixed with
    a line number and suffixed with a checksum, while a read thread collects
    the printer's replies and releases the print thread for every 'ok'.

    """

    def __init__(self, port='/dev/tty.usbmodem1411', baudrate=250000):
        """ Store the connection settings and initialise all state.

        No serial port is opened here; call `connect` to do that.

        Parameters
        ----------
        port : str
            Name of the serial port the printer is attached to.
        baudrate : int
            Baudrate used when the serial port is opened.

        """

        # USB port and baudrate for communication with the printer.
        self.port = port
        self.baudrate = baudrate

        # The Serial object that the printer is communicating on.
        self.s = None

        # List of the responses from the printer.
        self.responses = []

        # List of lines that were sent to the printer.
        self.sentlines = []

        # True if the print thread is alive and sending lines.
        self.printing = False

        # Set to True to pause the print.
        self.paused = False

        # If set to True, the read_thread will be closed as soon as possible.
        self.stop_reading = False

        # If set to True, the print_thread will be closed as soon as possible.
        self.stop_printing = False

        # List of all temperature string responses from the printer.
        self.temp_readings = []

        ### Private Attributes  ################################################

        # List of all lines to be sent to the printer.
        self._buffer = []

        # Index into the _buffer of the next line to send to the printer.
        self._current_line_idx = 0

        # This thread continuously sends lines as they appear in self._buffer.
        self._print_thread = None

        # This thread continuously reads lines as they appear from the printer.
        self._read_thread = None

        # Flag used to synchronize the print_thread and the read_thread. An 'ok'
        # needs to be returned for every line sent. When the print_thread sends
        # a line this flag is cleared, and when an 'ok' is received it is set.
        self._ok_received = Event()
        self._ok_received.set()

        # Lock used to ensure serial send/receive events are atomic with the
        # setting/clearing of the `_ok_received` flag.
        self._communication_lock = Lock()

        # Lock used to ensure connecting and disconnecting is atomic.
        self._connection_lock = Lock()

        # If False the Printer instance does not own the serial object passed
        # in and it should not be closed when finished with.
        self._owns_serial = True

        # This is set to true when a disconnect was requested. If a sendline is
        # called while this is true an error is raised.
        self._disconnect_pending = False

        # When we reset the line number Marlin's internal number will differ
        # from our own _current_line_idx. This offset is used to keep those two
        # in sync.
        self._reset_offset = 0

    ###  Printer Interface  ###################################################

    def connect(self, s=None):
        """ Instantiate a Serial object using the stored port and baudrate.

        All buffers and counters are reset and the read thread is started.

        Parameters
        ----------
        s : serial.Serial
            If a serial object is passed in then it will be used instead of
            creating a new one. The Printer will not close it on disconnect.

        """
        with self._connection_lock:
            if s is None:
                self.s = serial.Serial(self.port, self.baudrate, timeout=3)
                # We opened this port, so we must close it on disconnect.
                # Without this reset an earlier connect(s=...) would leave the
                # flag False and the port would never be closed.
                self._owns_serial = True
            else:
                self.s = s
                self._owns_serial = False
            self._ok_received.set()
            self._current_line_idx = 0
            self._buffer = []
            self.responses = []
            self.sentlines = []
            self._disconnect_pending = False
            self._start_read_thread()
            if s is None:
                start_time = time()
                while len(self.responses) == 0 and time() < start_time + 0.1:
                    sleep(0.01)  # wait until a start message is received
                # Discard the start message so that responses line up with
                # the lines sent from here on.
                self.responses = []
        logger.debug('Connected to {}'.format(self.s))

    def disconnect(self, wait=False):
        """ Disconnect from the printer by stopping threads and closing the port

        Parameters
        ----------
        wait : Bool (default: False)
            If true, this method waits until all lines in the buffer have been
            sent and acknowledged before disconnecting.  Clearing the buffer
            isn't guaranteed.  If the read thread isn't running for some reason,
            this function may return without waiting even when wait is set to
            True.

        """
        with self._connection_lock:
            self._disconnect_pending = True
            if wait:
                buf_len = len(self._buffer)
                while buf_len > len(self.responses) and \
                      self._is_read_thread_running():
                    sleep(0.01)  # wait until all lines in the buffer are sent
            if self._print_thread is not None:
                self.stop_printing = True
                # Give the thread slightly longer than one blocking write.
                if self.s is not None and self.s.writeTimeout is not None:
                    timeout = self.s.writeTimeout + 1
                else:
                    timeout = 10
                self._print_thread.join(timeout)
            if self._read_thread is not None:
                self.stop_reading = True
                # Give the thread slightly longer than one blocking read.
                if self.s is not None and self.s.timeout is not None:
                    timeout = self.s.timeout + 1
                else:
                    timeout = 10
                self._read_thread.join(timeout)
            # Only close ports that this instance opened itself.
            if self.s is not None and self._owns_serial is True:
                self.s.close()
                self.s = None
            self.printing = False
            self._current_line_idx = 0
            self._buffer = []
            self.responses = []
            self.sentlines = []
        self._disconnect_pending = False
        logger.debug('Disconnected from printer')

    def load_file(self, filepath):
        """ Load the given file into an internal _buffer. The lines will not be
        sent until `self._start_print_thread()` is called.

        Comments (everything from the first ';' on) and blank lines are
        dropped.

        Parameters
        ----------
        filepath : str
            The path to a text file containing lines of GCode to be printed.

        """
        lines = []
        with open(filepath) as f:
            for line in f:
                line = line.strip()
                if ';' in line:  # clear out the comments
                    line = line.split(';')[0]
                if line:
                    lines.append(line)
        self._buffer.extend(lines)

    def start(self):
        """ Starts the read_thread and the _print_thread, then queues a line
        number reset to the current line index.
        """
        self._start_read_thread()
        self._start_print_thread()
        self.reset_linenumber(self._current_line_idx)

    def sendline(self, line):
        """ Send the given line over serial by appending it to the send buffer

        Comments (everything from the first ';' on) are removed; lines that
        end up empty are not queued.

        Parameters
        ----------
        line : str
            A line of GCode to send to the printer.

        Raises
        ------
        RuntimeError
            If a disconnect has been requested.

        """
        if self._disconnect_pending:
            msg = 'Attempted to send line after a disconnect was requested: {}'
            raise RuntimeError(msg.format(line))
        if line:
            line = str(line).strip()
            if ';' in line:  # clear out the comments
                line = line.split(';')[0]
            if line:
                self._buffer.append(line)

    def get_response(self, line, timeout=0):
        """ Send the given line and return the response from the printer.

        Parameters
        ----------
        line : str
            The line to send to the printer
        timeout : float
            Maximum number of seconds to wait for the response. A value of 0
            (the default) waits indefinitely.

        Returns
        -------
        r : str
            The response from the printer. A blank string is returned on
            timeout, or when `line` held nothing to send (empty or only a
            comment).

        Raises
        ------
        RuntimeError
            If more responses than sent lines are seen, or if the read thread
            is not running.

        """
        buf_len = len(self._buffer) + 1
        self.sendline(line)
        # sendline drops empty and comment-only lines. Nothing was queued in
        # that case, so no response will ever arrive for it.
        if len(self._buffer) < buf_len:
            return ''
        start_time = time()
        while len(self.responses) != buf_len:
            if len(self.responses) > buf_len:
                msg = "Received more responses than lines sent"
                raise RuntimeError(msg)
            if timeout > 0 and (time() - start_time) > timeout:
                return ''  # return blank string on timeout.
            if not self._is_read_thread_running():
                raise RuntimeError("can't get response from serial since read thread isn't running")
            sleep(0.01)
        return self.responses[-1]

    def current_position(self):
        """ Get the current position of the printer.

        Returns
        -------
        pos : dict
            Dict with keys of 'X', 'Y', 'Z', and 'E' and values of their
            positions

        """
        # example r: X:0.00 Y:0.00 Z:0.00 E:0.00 Count X: 0.00 Y:0.00 Z:0.00
        r = self.get_response("M114")
        r = r.split(' Count')[0].strip().split()
        pos = {}
        for item in r:
            # Skip tokens that are not "axis:value" pairs, such as the 'ok'
            # that terminates the response.
            if ':' in item:
                k, v = item.split(':', 1)
                pos[k] = float(v)
        return pos

    def current_temperature(self):
        """ Get the current temperature of the printer.

        Returns
        -------
        temp : dict
            Dict with keys of 'T', 'B', 'T/', 'B/', '@', and 'B@'
            and values of their temperatures and powers.
            T = extruder temperature, can also be T0, T1 ..
            B = bed temperature
            */ = target temperature
            C = chamber temperature
            @ = hotend power
            B@ = bed power
        """
        # example r: T:149.98 /150.00 B:60.00 /60.00 @:72 B@:30
        r = self.get_response("M105")
        # Join each target onto its reading so they form a single token.
        r = r.replace(' /', '/').strip().split()
        temp = {}
        for item in r:
            if ':' in item:
                name, val = item.split(':', 1)
                if '/' in val:
                    val1, val2 = val.split('/')
                    temp[name] = float(val1)
                    temp[name + '/'] = float(val2)
                else:
                    temp[name] = float(val)
        return temp

    def reset_linenumber(self, number=0):
        """ Queue an "M110 N<number>" line, which resets the line number.

        Parameters
        ----------
        number : int
            The line number to reset to.

        """
        line = "M110 N{}".format(number)
        self.sendline(line)

    ###  Private Methods  ######################################################

    def _start_print_thread(self):
        """ Spawns a new thread that will send all lines in the _buffer over
        serial to the printer. This thread can be stopped by setting
        `stop_printing` to True. If a print_thread already exists and is alive,
        this method does nothing.

        """
        if self._is_print_thread_running():
            return
        self.printing = True
        self.stop_printing = False
        self._print_thread = Thread(target=self._print_worker_entrypoint, name='Print')
        self._print_thread.daemon = True
        self._print_thread.start()
        logger.debug('print_thread started')

    def _start_read_thread(self):
        """ Spawns a new thread that will continuously read lines from the
        printer. This thread can be stopped by setting `stop_reading` to True.
        If a read_thread already exists and is alive, this method does
        nothing.

        """
        if self._is_read_thread_running():
            return
        self.stop_reading = False
        self._read_thread = Thread(target=self._read_worker_entrypoint, name='Read')
        self._read_thread.daemon = True
        self._read_thread.start()
        logger.debug('read_thread started')

    def _print_worker_entrypoint(self):
        """ Thread target that runs `_print_worker` and logs any exception.
        """
        try:
            self._print_worker()
        except Exception as e:
            logger.exception("Exception running print worker: " + str(e))

    def _read_worker_entrypoint(self):
        """ Thread target that runs `_read_worker` and logs any exception.
        """
        try:
            self._read_worker()
        except Exception as e:
            logger.exception("Exception running read worker: " + str(e))

    def _is_print_thread_running(self):
        """ Return True if a print thread exists and is alive.
        """
        return self._print_thread is not None and self._print_thread.is_alive()

    def _is_read_thread_running(self):
        """ Return True if a read thread exists and is alive.
        """
        return self._read_thread is not None and self._read_thread.is_alive()

    def _print_worker(self):
        """ This method is spawned in the print thread. It loops over every line
        in the _buffer and sends it over serial to the printer.

        """
        while not self.stop_printing:
            _paused = False
            while self.paused is True and not self.stop_printing:
                if _paused is False:
                    logger.debug('Printer.paused is True, waiting...')
                    _paused = True
                sleep(0.01)
            if _paused is True:
                logger.debug('Printer.paused is now False, resuming.')
            if self._current_line_idx < len(self._buffer):
                self.printing = True
                while not self._ok_received.is_set() and not self.stop_printing:
                    self._ok_received.wait(1)
                # A stop may have been requested while waiting for the 'ok';
                # do not send another line in that case.
                if self.stop_printing:
                    break
                line = self._next_line()
                with self._communication_lock:
                    self.s.write(line.encode('utf-8'))
                    self._ok_received.clear()
                    self._current_line_idx += 1
                # Grab the just sent line without line numbers or checksum
                plain_line = self._buffer[self._current_line_idx - 1].strip()
                self.sentlines.append(plain_line)
            else:  # if there aren't new lines wait 10ms and check again
                sleep(0.01)
                self.printing = False

    def _read_worker(self):
        """ This method is spawned in the read thread. It continuously reads
        from the printer over serial and checks for 'ok's.

        Raises
        ------
        RuntimeError
            If a read returns a partial line (no newline), which is taken to
            mean that communication has broken down.

        """
        full_resp = ''
        while not self.stop_reading:
            if self.s is not None:
                line = self.s.readline()
                # Serial ports hand back bytes on Python 3, while all of the
                # parsing below works on text.
                if isinstance(line, bytes):
                    line = line.decode('utf-8', 'replace')
                if line.startswith('Resend: '):  # example line: "Resend: 143"
                    self._current_line_idx = int(line.split()[1]) - 1 + self._reset_offset
                    logger.debug('Resend Requested - {}'.format(line.strip()))
                    with self._communication_lock:
                        self._ok_received.set()
                    continue
                if line.startswith('T:'):
                    self.temp_readings.append(line)
                if line:
                    full_resp += line
                    # If there is no newline char in the response that means
                    # serial.readline() hit the timeout before a full line. This
                    # means communication has broken down so both threads need
                    # to be closed down.
                    if '\n' not in line:
                        self.printing = False
                        self.stop_printing = True
                        self.stop_reading = True
                        with self._communication_lock:
                            self._ok_received.set()
                        msg = """readline timed out mid-line.
                            last sentline:  {}
                            response:       {}
                        """
                        raise RuntimeError(msg.format(self.sentlines[-1:],
                                                      full_resp))
                if 'ok' in line:
                    with self._communication_lock:
                        self._ok_received.set()
                    # Everything read since the previous 'ok' forms one
                    # response.
                    self.responses.append(full_resp)
                    full_resp = ''
                if 'start' in line:
                    self.responses.append(line)
                if line.startswith('echo:'):
                    logger.info(line.rstrip()[len('echo:'):])
            else:  # if no printer is attached, wait 10ms to check again.
                sleep(0.01)

    def _next_line(self):
        """ Prepares the next line to be sent to the printer by prepending the
        line number and appending a checksum and newline character.

        """
        line = self._buffer[self._current_line_idx].strip()
        # An M110 changes the printer's line counter, so remember how far our
        # buffer index is from the number the printer now expects.
        if line.startswith('M110 N'):
            new_number = int(line[6:])
            self._reset_offset = self._current_line_idx + 1 - new_number
        elif line.startswith('M110'):
            self._reset_offset = self._current_line_idx + 1
        idx = self._current_line_idx + 1 - self._reset_offset
        line = 'N{} {}'.format(idx, line)
        checksum = self._checksum(line)
        return '{}*{}\n'.format(line, checksum)

    def _checksum(self, line):
        """ Calculate the checksum by xor'ing all characters together.

        Raises
        ------
        RuntimeError
            If `line` is empty.

        """
        if not line:
            raise RuntimeError("cannot compute checksum of an empty string")
        # A plain loop avoids `reduce`, which is not a builtin on Python 3.
        checksum = 0
        for char in line:
            checksum ^= ord(char)
        return checksum

connect(s=None)

Instantiate a Serial object using the stored port and baudrate.

Parameters:

Name Type Description Default
s Serial

If a serial object is passed in then it will be used instead of creating a new one.

None
Source code in mecode/printer.py
def connect(self, s=None):
    """ Instantiate a Serial object using the stored port and baudrate.

    All buffers and counters are reset and the read thread is started.

    Parameters
    ----------
    s : serial.Serial
        If a serial object is passed in then it will be used instead of
        creating a new one. The Printer will not close it on disconnect.

    """
    with self._connection_lock:
        if s is None:
            self.s = serial.Serial(self.port, self.baudrate, timeout=3)
            # We opened this port, so we must close it on disconnect.
            # Without this reset an earlier connect(s=...) would leave the
            # flag False and the port would never be closed.
            self._owns_serial = True
        else:
            self.s = s
            self._owns_serial = False
        self._ok_received.set()
        self._current_line_idx = 0
        self._buffer = []
        self.responses = []
        self.sentlines = []
        self._disconnect_pending = False
        self._start_read_thread()
        if s is None:
            start_time = time()
            while len(self.responses) == 0 and time() < start_time + 0.1:
                sleep(0.01)  # wait until a start message is received
            # Discard the start message so that responses line up with the
            # lines sent from here on.
            self.responses = []
    logger.debug('Connected to {}'.format(self.s))

current_position()

Get the current position of the printer.

Returns:

Name Type Description
pos dict

Dict with keys of 'X', 'Y', 'Z', and 'E' and values of their positions

Source code in mecode/printer.py
def current_position(self):
    """ Get the current position of the printer.

    Returns
    -------
    pos : dict
        Dict with keys of 'X', 'Y', 'Z', and 'E' and values of their
        positions

    """
    # example r: X:0.00 Y:0.00 Z:0.00 E:0.00 Count X: 0.00 Y:0.00 Z:0.00
    r = self.get_response("M114")
    r = r.split(' Count')[0].strip().split()
    pos = {}
    for item in r:
        # Skip tokens that are not "axis:value" pairs, such as the 'ok' that
        # terminates the response.
        if ':' in item:
            k, v = item.split(':', 1)
            pos[k] = float(v)
    return pos

current_temperature()

Get the current temperature of the printer.

Returns:

Name Type Description
temp dict

Dict with keys of 'T', 'B', 'T/', 'B/', '@', and 'B@' and values of their temperatures and powers. T = extruder temperature (can also be T0, T1, ...); B = bed temperature; */ = target temperature; C = chamber temperature; @ = hotend power; B@ = bed power.

Source code in mecode/printer.py
def current_temperature(self):
    """ Query the printer with M105 and parse the temperature report.

    Returns
    -------
    temp : dict
        Maps each reported name to a float. A reading that carries a
        target is stored twice: the reading under its own name and the
        target under the name followed by '/'.
        T = extruder temperature, can also be T0, T1 ..
        B = bed temperature
        */ = target temperature
        C = chamber temperature
        @ = hotend power
        B@ = bed power
    """
    # example reply: T:149.98 /150.00 B:60.00 /60.00 @:72 B@:30
    reply = self.get_response("M105")
    # Join each target onto its reading: "149.98 /150.00" -> "149.98/150.00"
    tokens = reply.replace(' /', '/').strip().split()
    readings = {}
    for token in tokens:
        if ':' not in token:
            continue  # not a "name:value" pair
        key, value = token.split(':', 1)
        if '/' not in value:
            readings[key] = float(value)
            continue
        actual, target = value.split('/')
        readings[key] = float(actual)
        readings[key + '/'] = float(target)
    return readings

disconnect(wait=False)

Disconnect from the printer by stopping threads and closing the port

Parameters:

Name Type Description Default
wait Bool (default: False)

If true, this method waits until all lines in the buffer have been sent and acknowledged before disconnecting. Clearing the buffer isn't guaranteed. If the read thread isn't running for some reason, this function may return without waiting even when wait is set to True.

False
Source code in mecode/printer.py
def disconnect(self, wait=False):
    """ Stop both worker threads, close the port and clear all buffers.

    Parameters
    ----------
    wait : Bool (default: False)
        If true, block until the number of responses has caught up with
        the number of buffered lines before shutting down. The wait ends
        early if the read thread is not running, so a fully drained
        buffer is not guaranteed.

    """
    with self._connection_lock:
        # From here on sendline() refuses new lines.
        self._disconnect_pending = True
        if wait:
            pending = len(self._buffer)
            while len(self.responses) < pending and \
                  self._is_read_thread_running():
                sleep(0.01)  # wait until all lines in the buffer are sent

        if self._print_thread is not None:
            self.stop_printing = True
            # Allow slightly longer than one blocking write to finish.
            if self.s is None or self.s.writeTimeout is None:
                join_timeout = 10
            else:
                join_timeout = self.s.writeTimeout + 1
            self._print_thread.join(join_timeout)

        if self._read_thread is not None:
            self.stop_reading = True
            # Allow slightly longer than one blocking read to finish.
            if self.s is None or self.s.timeout is None:
                join_timeout = 10
            else:
                join_timeout = self.s.timeout + 1
            self._read_thread.join(join_timeout)

        # A serial object that was handed in is left open for its owner.
        if self._owns_serial is True and self.s is not None:
            self.s.close()
            self.s = None

        self.printing = False
        self._current_line_idx = 0
        self._buffer = []
        self.responses = []
        self.sentlines = []
    self._disconnect_pending = False
    logger.debug('Disconnected from printer')

get_response(line, timeout=0)

Send the given line and return the response from the printer.

Parameters:

Name Type Description Default
line str

The line to send to the printer

required

Returns:

Name Type Description
r str

The response from the printer.

Source code in mecode/printer.py
def get_response(self, line, timeout=0):
    """ Send the given line and return the response from the printer.

    Parameters
    ----------
    line : str
        The line to send to the printer
    timeout : float
        Maximum number of seconds to wait for the response. A value of 0
        (the default) waits indefinitely.

    Returns
    -------
    r : str
        The response from the printer. A blank string is returned on
        timeout, or when `line` held nothing to send (empty or only a
        comment).

    Raises
    ------
    RuntimeError
        If more responses than sent lines are seen, or if the read thread is
        not running.

    """
    buf_len = len(self._buffer) + 1
    self.sendline(line)
    # sendline drops empty and comment-only lines. Nothing was queued in that
    # case, so no response will ever arrive for it.
    if len(self._buffer) < buf_len:
        return ''
    start_time = time()
    while len(self.responses) != buf_len:
        if len(self.responses) > buf_len:
            msg = "Received more responses than lines sent"
            raise RuntimeError(msg)
        if timeout > 0 and (time() - start_time) > timeout:
            return ''  # return blank string on timeout.
        if not self._is_read_thread_running():
            raise RuntimeError("can't get response from serial since read thread isn't running")
        sleep(0.01)
    return self.responses[-1]

load_file(filepath)

Load the given file into an internal _buffer. The lines will not be sent until self._start_print_thread() is called.

Parameters:

Name Type Description Default
filepath str

The path to a text file containing lines of GCode to be printed.

required
Source code in mecode/printer.py
def load_file(self, filepath):
    """ Queue every GCode line of a file on the internal _buffer.

    Each line is stripped and cut at its first ';' (the comment marker);
    lines with nothing left are skipped. Nothing is sent until
    `self._start_print_thread()` is called.

    Parameters
    ----------
    filepath : str
        The path to a text file containing lines of GCode to be printed.

    """
    with open(filepath) as gcode_file:
        # Splitting on ';' is a no-op for lines without a comment.
        stripped = (raw.strip().split(';')[0] for raw in gcode_file)
        commands = [code for code in stripped if code]
    self._buffer.extend(commands)

sendline(line)

Send the given line over serial by appending it to the send buffer

Parameters:

Name Type Description Default
line str

A line of GCode to send to the printer.

required
Source code in mecode/printer.py
def sendline(self, line):
    """ Queue a line of GCode on the send buffer.

    The line is converted to a string, stripped and cut at its first ';'
    (the comment marker). Falsy input and lines with nothing left are
    ignored.

    Parameters
    ----------
    line : str
        A line of GCode to send to the printer.

    Raises
    ------
    RuntimeError
        If a disconnect has been requested.

    """
    if self._disconnect_pending:
        raise RuntimeError(
            'Attempted to send line after a disconnect was requested: {}'
            .format(line))
    if not line:
        return
    # Splitting on ';' is a no-op for lines without a comment.
    code = str(line).strip().split(';')[0]
    if code:
        self._buffer.append(code)

start()

Starts the read_thread and the _print_thread.

Source code in mecode/printer.py
def start(self):
    """ Start the read and print threads, then queue a line number reset.

    The reset uses the value of `_current_line_idx` read after both
    threads have been started.
    """
    self._start_read_thread()
    self._start_print_thread()
    resume_from = self._current_line_idx
    self.reset_linenumber(resume_from)