class Printer(object):
    """Serial communication with a printer running Marlin firmware.

    Lines queued through `sendline` or `load_file` are stored in an internal
    buffer. A print thread sends them one at a time, and a read thread
    collects the printer's replies and signals every received 'ok' so the
    next line may be sent.
    """

    def __init__(self, port='/dev/tty.usbmodem1411', baudrate=250000):
        """Store the connection settings and initialise all state.

        Parameters
        ----------
        port : str
            Serial port the printer is attached to.
        baudrate : int
            Baudrate used when the port is opened by `connect`.
        """
        # USB port and baudrate for communication with the printer.
        self.port = port
        self.baudrate = baudrate

        # The Serial object that the printer is communicating on.
        self.s = None

        # List of the responses from the printer.
        self.responses = []

        # List of lines that were sent to the printer.
        self.sentlines = []

        # True if the print thread is alive and sending lines.
        self.printing = False

        # Set to True to pause the print.
        self.paused = False

        # If set to True, the read_thread will be closed as soon as possible.
        self.stop_reading = False

        # If set to True, the print_thread will be closed as soon as possible.
        self.stop_printing = False

        # List of all temperature string responses from the printer.
        self.temp_readings = []

        ### Private Attributes ################################################

        # List of all lines to be sent to the printer.
        self._buffer = []

        # Index into the _buffer of the next line to send to the printer.
        self._current_line_idx = 0

        # This thread continuously sends lines as they appear in self._buffer.
        self._print_thread = None

        # This thread continuously reads lines as they appear from the printer.
        self._read_thread = None

        # Flag used to synchronize the print_thread and the read_thread. An
        # 'ok' needs to be returned for every line sent. When the print_thread
        # sends a line this flag is cleared, and when an 'ok' is received it
        # is set.
        self._ok_received = Event()
        self._ok_received.set()

        # Lock used to ensure serial send/receive events are atomic with the
        # setting/clearing of the `_ok_received` flag.
        self._communication_lock = Lock()

        # Lock used to ensure connecting and disconnecting is atomic.
        self._connection_lock = Lock()

        # If False the Printer instance does not own the serial object passed
        # in and it should not be closed when finished with.
        self._owns_serial = True

        # This is set to True when a disconnect was requested. If a sendline
        # is called while this is True an error is raised.
        self._disconnect_pending = False

        # When we reset the line number Marlin's internal number will differ
        # from our own _current_line_idx. This offset is used to keep those
        # two in sync.
        self._reset_offset = 0

    ### Printer Interface ###################################################

    def connect(self, s=None):
        """Instantiate a Serial object using the stored port and baudrate.

        All buffers are cleared and the read thread is started.

        Parameters
        ----------
        s : serial.Serial
            If a serial object is passed in then it will be used instead of
            creating a new one. Such an object is not closed by `disconnect`.
        """
        with self._connection_lock:
            if s is None:
                self.s = serial.Serial(self.port, self.baudrate, timeout=3)
                # Restore ownership in case an earlier connect() was given an
                # external serial object; otherwise this port is never closed.
                self._owns_serial = True
            else:
                self.s = s
                self._owns_serial = False
            self._ok_received.set()
            self._current_line_idx = 0
            self._buffer = []
            self.responses = []
            self.sentlines = []
            self._disconnect_pending = False
            self._start_read_thread()
            if s is None:
                start_time = time()
                while len(self.responses) == 0 and time() < start_time + 0.1:
                    sleep(0.01)  # wait until a start message is received
                self.responses = []
        logger.debug('Connected to {}'.format(self.s))

    def disconnect(self, wait=False):
        """Disconnect from the printer by stopping threads and closing the port.

        Parameters
        ----------
        wait : bool (default: False)
            If True, this method waits until all lines in the buffer have
            been sent and acknowledged before disconnecting. Clearing the
            buffer isn't guaranteed. If the read thread isn't running for
            some reason, this function may return without waiting even when
            wait is set to True.
        """
        with self._connection_lock:
            self._disconnect_pending = True
            if wait:
                buf_len = len(self._buffer)
                while buf_len > len(self.responses) and \
                        self._is_read_thread_running():
                    sleep(0.01)  # wait until all lines in the buffer are sent
            if self._print_thread is not None:
                self.stop_printing = True
                # Give the thread slightly longer than one serial write.
                if self.s is not None and self.s.writeTimeout is not None:
                    timeout = self.s.writeTimeout + 1
                else:
                    timeout = 10
                self._print_thread.join(timeout)
            if self._read_thread is not None:
                self.stop_reading = True
                # Give the thread slightly longer than one serial read.
                if self.s is not None and self.s.timeout is not None:
                    timeout = self.s.timeout + 1
                else:
                    timeout = 10
                self._read_thread.join(timeout)
            if self.s is not None and self._owns_serial is True:
                self.s.close()
                self.s = None
            self.printing = False
            self._current_line_idx = 0
            self._buffer = []
            self.responses = []
            self.sentlines = []
            self._disconnect_pending = False
        logger.debug('Disconnected from printer')

    def load_file(self, filepath):
        """Load the given file into the internal _buffer.

        The lines will not be sent until `self._start_print_thread()` is
        called. Comments (everything after ';') and blank lines are dropped.

        Parameters
        ----------
        filepath : str
            The path to a text file containing lines of GCode to be printed.
        """
        lines = []
        with open(filepath) as f:
            for line in f:
                # Drop the comment, then the whitespace that preceded it.
                line = line.split(';', 1)[0].strip()
                if line:
                    lines.append(line)
        self._buffer.extend(lines)

    def start(self):
        """Start the read_thread and the print_thread, then queue an M110."""
        self._start_read_thread()
        self._start_print_thread()
        self.reset_linenumber(self._current_line_idx)

    def sendline(self, line):
        """Send the given line over serial by appending it to the send buffer.

        Comments (everything after ';') are removed. Empty lines are ignored.

        Parameters
        ----------
        line : str
            A line of GCode to send to the printer.

        Raises
        ------
        RuntimeError
            If a disconnect is currently in progress.
        """
        if self._disconnect_pending:
            msg = 'Attempted to send line after a disconnect was requested: {}'
            raise RuntimeError(msg.format(line))
        if line:
            # Drop the comment, then the whitespace that preceded it.
            line = str(line).split(';', 1)[0].strip()
            if line:
                self._buffer.append(line)

    def get_response(self, line, timeout=0):
        """Send the given line and return the response from the printer.

        Parameters
        ----------
        line : str
            The line to send to the printer.
        timeout : float
            Seconds to wait for the response. Values <= 0 wait indefinitely.

        Returns
        -------
        r : str
            The response from the printer. A blank string is returned on
            timeout, or when `line` held nothing to send.

        Raises
        ------
        RuntimeError
            If more responses than sent lines were received, or if the read
            thread is not running.
        """
        queued_before = len(self._buffer)
        self.sendline(line)
        if len(self._buffer) == queued_before:
            # Nothing was queued (empty or comment-only line), so no response
            # will ever arrive; waiting for one would block forever.
            return ''
        buf_len = queued_before + 1
        start_time = time()
        while len(self.responses) != buf_len:
            if len(self.responses) > buf_len:
                msg = "Received more responses than lines sent"
                raise RuntimeError(msg)
            if timeout > 0 and (time() - start_time) > timeout:
                return ''  # return blank string on timeout.
            if not self._is_read_thread_running():
                raise RuntimeError("can't get response from serial since read thread isn't running")
            sleep(0.01)
        return self.responses[-1]

    def current_position(self):
        """Get the current position of the printer.

        Returns
        -------
        pos : dict
            Dict with keys of 'X', 'Y', 'Z', and 'E' and values of their
            positions.
        """
        # example r: X:0.00 Y:0.00 Z:0.00 E:0.00 Count X: 0.00 Y:0.00 Z:0.00
        r = self.get_response("M114")
        pos = {}
        for field in r.split(' Count')[0].strip().split():
            key, sep, value = field.partition(':')
            if not sep or not value:
                # Not a "key:value" pair (e.g. a trailing 'ok'); skip it
                # rather than failing on the unpack.
                continue
            pos[key] = float(value)
        return pos

    def current_temperature(self):
        """Get the current temperature of the printer.

        Returns
        -------
        temp : dict
            Dict with keys of 'T', 'B', 'T/', 'B/', '@', and 'B@' and values
            of their temperatures and powers.
            T = extruder temperature, can also be T0, T1 ..
            B = bed temperature
            */ = target temperature
            C = chamber temperature
            @ = hotend power
            B@ = bed power
        """
        # example r: T:149.98 /150.00 B:60.00 /60.00 @:72 B@:30
        r = self.get_response("M105")
        temp = {}
        # Glue each target value onto its reading so every token is one item.
        for item in r.replace(' /', '/').strip().split():
            if ':' not in item:
                continue
            name, val = item.split(':', 1)
            if '/' in val:
                val1, val2 = val.split('/')
                temp[name] = float(val1)
                temp[name + '/'] = float(val2)
            else:
                temp[name] = float(val)
        return temp

    def reset_linenumber(self, number=0):
        """Queue an M110 command that sets the printer's line number.

        Parameters
        ----------
        number : int
            The line number to place after 'N' in the M110 command.
        """
        line = "M110 N{}".format(number)
        self.sendline(line)

    ### Private Methods ######################################################

    def _start_print_thread(self):
        """Spawn a thread that sends all lines in the _buffer to the printer.

        This thread can be stopped by setting `stop_printing` to True. If a
        print_thread already exists and is alive, this method does nothing.
        """
        if self._is_print_thread_running():
            return
        self.printing = True
        self.stop_printing = False
        self._print_thread = Thread(target=self._print_worker_entrypoint,
                                    name='Print')
        self._print_thread.daemon = True
        self._print_thread.start()
        logger.debug('print_thread started')

    def _start_read_thread(self):
        """Spawn a thread that continuously reads lines from the printer.

        This thread can be stopped by setting `stop_reading` to True. If a
        read_thread already exists and is alive, this method does nothing.
        """
        if self._is_read_thread_running():
            return
        self.stop_reading = False
        self._read_thread = Thread(target=self._read_worker_entrypoint,
                                   name='Read')
        self._read_thread.daemon = True
        self._read_thread.start()
        logger.debug('read_thread started')

    def _print_worker_entrypoint(self):
        """Run `_print_worker`, logging any exception it raises."""
        try:
            self._print_worker()
        except Exception as e:
            logger.exception("Exception running print worker: " + str(e))

    def _read_worker_entrypoint(self):
        """Run `_read_worker`, logging any exception it raises."""
        try:
            self._read_worker()
        except Exception as e:
            logger.exception("Exception running read worker: " + str(e))

    def _is_print_thread_running(self):
        """Return True if the print thread exists and is alive."""
        return self._print_thread is not None and self._print_thread.is_alive()

    def _is_read_thread_running(self):
        """Return True if the read thread exists and is alive."""
        return self._read_thread is not None and self._read_thread.is_alive()

    def _print_worker(self):
        """Send every line in the _buffer over serial to the printer.

        This method is spawned in the print thread. Each line is only sent
        once the previous one has been acknowledged with an 'ok'.
        """
        while not self.stop_printing:
            _paused = False
            while self.paused is True and not self.stop_printing:
                if _paused is False:
                    logger.debug('Printer.paused is True, waiting...')
                    _paused = True
                sleep(0.01)
            if _paused is True:
                logger.debug('Printer.paused is now False, resuming.')

            if self._current_line_idx < len(self._buffer):
                self.printing = True
                while not self._ok_received.is_set() and not self.stop_printing:
                    self._ok_received.wait(1)
                if self.stop_printing:
                    # A stop was requested while waiting for the 'ok'; do not
                    # push another, unacknowledged line to the printer.
                    break
                line = self._next_line()
                with self._communication_lock:
                    self.s.write(line.encode('utf-8'))
                    self._ok_received.clear()
                    self._current_line_idx += 1
                # Grab the just sent line without line numbers or checksum
                plain_line = self._buffer[self._current_line_idx - 1].strip()
                self.sentlines.append(plain_line)
            else:  # if there aren't new lines wait 10ms and check again
                sleep(0.01)
                self.printing = False

    def _read_worker(self):
        """Continuously read from the printer and check for 'ok's.

        This method is spawned in the read thread.

        Raises
        ------
        RuntimeError
            If a read returns a partial line (no newline), which is treated
            as a communication breakdown. Both threads are flagged to stop.
        """
        full_resp = ''
        while not self.stop_reading:
            if self.s is not None:
                line = self.s.readline()
                if isinstance(line, bytes):
                    # The serial port yields bytes; decode once here so all
                    # of the text comparisons below operate on str.
                    line = line.decode('utf-8', errors='replace')
                if line.startswith('Resend: '):  # example line: "Resend: 143"
                    self._current_line_idx = int(line.split()[1]) - 1 + self._reset_offset
                    logger.debug('Resend Requested - {}'.format(line.strip()))
                    with self._communication_lock:
                        self._ok_received.set()
                    continue
                if line.startswith('T:'):
                    self.temp_readings.append(line)
                if line:
                    full_resp += line
                    # If there is no newline char in the response that means
                    # serial.readline() hit the timeout before a full line.
                    # This means communication has broken down so both
                    # threads need to be closed down.
                    if '\n' not in line:
                        self.printing = False
                        self.stop_printing = True
                        self.stop_reading = True
                        with self._communication_lock:
                            self._ok_received.set()
                        msg = "readline timed out mid-line. \nlast sentline: {} response: {} "
                        raise RuntimeError(msg.format(self.sentlines[-1:],
                                                      full_resp))
                    if 'ok' in line:
                        with self._communication_lock:
                            self._ok_received.set()
                        self.responses.append(full_resp)
                        full_resp = ''
                    if 'start' in line:
                        self.responses.append(line)
                    if line.startswith('echo:'):
                        logger.info(line.rstrip()[len('echo:'):])
            else:  # if no printer is attached, wait 10ms to check again.
                sleep(0.01)

    def _next_line(self):
        """Prepare the next line to be sent to the printer.

        The line number is prepended and a checksum and newline character
        are appended. An M110 line updates `_reset_offset` so that the
        numbers sent afterwards follow the newly requested numbering.
        """
        line = self._buffer[self._current_line_idx].strip()
        if line.startswith('M110 N'):
            new_number = int(line[6:])
            self._reset_offset = self._current_line_idx + 1 - new_number
        elif line.startswith('M110'):
            self._reset_offset = self._current_line_idx + 1
        idx = self._current_line_idx + 1 - self._reset_offset
        line = 'N{}{}'.format(idx, line)
        checksum = self._checksum(line)
        return '{}*{}\n'.format(line, checksum)

    def _checksum(self, line):
        """Calculate the checksum by xor'ing all characters together.

        Raises
        ------
        RuntimeError
            If `line` is empty.
        """
        if not line:
            raise RuntimeError("cannot compute checksum of an empty string")
        # Plain loop instead of `reduce`, which is not a builtin on Python 3.
        checksum = 0
        for char in line:
            checksum ^= ord(char)
        return checksum
connect(s=None)
Instantiate a Serial object using the stored port and baudrate.
Parameters:
Name
Type
Description
Default
s
Serial
If a serial object is passed in then it will be used instead of
creating a new one.
def connect(self, s=None):
    """Instantiate a Serial object using the stored port and baudrate.

    All buffers are cleared and the read thread is started.

    Parameters
    ----------
    s : serial.Serial
        If a serial object is passed in then it will be used instead of
        creating a new one. Such an object is marked as not owned by this
        instance.
    """
    with self._connection_lock:
        if s is None:
            self.s = serial.Serial(self.port, self.baudrate, timeout=3)
            # Restore ownership in case an earlier connect() was given an
            # external serial object; otherwise the flag would stay False.
            self._owns_serial = True
        else:
            self.s = s
            self._owns_serial = False
        self._ok_received.set()
        self._current_line_idx = 0
        self._buffer = []
        self.responses = []
        self.sentlines = []
        self._disconnect_pending = False
        self._start_read_thread()
        if s is None:
            start_time = time()
            while len(self.responses) == 0 and time() < start_time + 0.1:
                sleep(0.01)  # wait until a start message is received
            self.responses = []
    logger.debug('Connected to {}'.format(self.s))
current_position()
Get the current position of the printer.
Returns:
Name
Type
Description
pos
dict
Dict with keys of 'X', 'Y', 'Z', and 'E' and values of their
positions
def current_position(self):
    """Get the current position of the printer.

    Returns
    -------
    pos : dict
        Dict with keys of 'X', 'Y', 'Z', and 'E' and values of their
        positions.
    """
    # example r: X:0.00 Y:0.00 Z:0.00 E:0.00 Count X: 0.00 Y:0.00 Z:0.00
    r = self.get_response("M114")
    pos = {}
    for field in r.split(' Count')[0].strip().split():
        key, sep, value = field.partition(':')
        if not sep or not value:
            # Not a "key:value" pair (e.g. a trailing 'ok'); skip it rather
            # than failing on the unpack.
            continue
        pos[key] = float(value)
    return pos
current_temperature()
Get the current temperature of the printer.
Returns:
Name
Type
Description
temp
dict
Dict with keys of 'T', 'B', 'T/', 'B/', '@', and 'B@'
and values of their temperatures and powers.
T = extruder temperature, can also be T0, T1 ..
B = bed temperature
*/ = target temperature
C = chamber temperature
@ = hotend power
B@ = bed power
def current_temperature(self):
    """Query the printer with M105 and parse the reported temperatures.

    Returns
    -------
    temp : dict
        Dict with keys of 'T', 'B', 'T/', 'B/', '@', and 'B@' and values of
        their temperatures and powers.
        T = extruder temperature, can also be T0, T1 ..
        B = bed temperature
        */ = target temperature
        C = chamber temperature
        @ = hotend power
        B@ = bed power
    """
    # example reply: T:149.98 /150.00 B:60.00 /60.00 @:72 B@:30
    reply = self.get_response("M105")
    # Glue each target value onto its reading so every token is one item.
    tokens = reply.replace(' /', '/').strip().split()
    readings = {}
    for token in tokens:
        if ':' not in token:
            continue
        name, val = token.split(':', 1)
        if '/' not in val:
            readings[name] = float(val)
            continue
        current, target = val.split('/')
        readings[name] = float(current)
        readings[name + '/'] = float(target)
    return readings
disconnect(wait=False)
Disconnect from the printer by stopping threads and closing the port
Parameters:
Name
Type
Description
Default
wait
Bool (default: False)
If true, this method waits until all lines in the buffer have been
sent and acknowledged before disconnecting. Clearing the buffer
isn't guaranteed. If the read thread isn't running for some reason,
this function may return without waiting even when wait is set to
True.
def disconnect(self, wait=False):
    """Stop the worker threads, close the port and reset all buffers.

    Parameters
    ----------
    wait : bool (default: False)
        If True, this method waits until all lines in the buffer have been
        sent and acknowledged before disconnecting. Clearing the buffer
        isn't guaranteed. If the read thread isn't running for some reason,
        this function may return without waiting even when wait is set to
        True.
    """
    with self._connection_lock:
        self._disconnect_pending = True

        if wait:
            pending = len(self._buffer)
            # wait until all lines in the buffer are sent
            while pending > len(self.responses) and \
                    self._is_read_thread_running():
                sleep(0.01)

        if self._print_thread is not None:
            self.stop_printing = True
            # Allow slightly longer than one serial write, or 10s if unknown.
            timeout = 10
            if self.s is not None and self.s.writeTimeout is not None:
                timeout = self.s.writeTimeout + 1
            self._print_thread.join(timeout)

        if self._read_thread is not None:
            self.stop_reading = True
            # Allow slightly longer than one serial read, or 10s if unknown.
            timeout = 10
            if self.s is not None and self.s.timeout is not None:
                timeout = self.s.timeout + 1
            self._read_thread.join(timeout)

        # Only close a port that this instance owns.
        if self.s is not None and self._owns_serial is True:
            self.s.close()
            self.s = None

        self.printing = False
        self._current_line_idx = 0
        self._buffer = []
        self.responses = []
        self.sentlines = []
        self._disconnect_pending = False
    logger.debug('Disconnected from printer')
get_response(line,timeout=0)
Send the given line and return the response from the printer.
def get_response(self, line, timeout=0):
    """Send the given line and return the response from the printer.

    Parameters
    ----------
    line : str
        The line to send to the printer.
    timeout : float
        Seconds to wait for the response. Values <= 0 wait indefinitely.

    Returns
    -------
    r : str
        The response from the printer. A blank string is returned on
        timeout, or when `line` held nothing to send.

    Raises
    ------
    RuntimeError
        If more responses than sent lines were received, or if the read
        thread is not running.
    """
    queued_before = len(self._buffer)
    self.sendline(line)
    if len(self._buffer) == queued_before:
        # Nothing was queued (e.g. an empty line), so no response will ever
        # arrive; waiting for one would block forever.
        return ''
    buf_len = queued_before + 1
    start_time = time()
    while len(self.responses) != buf_len:
        if len(self.responses) > buf_len:
            msg = "Received more responses than lines sent"
            raise RuntimeError(msg)
        if timeout > 0 and (time() - start_time) > timeout:
            return ''  # return blank string on timeout.
        if not self._is_read_thread_running():
            raise RuntimeError("can't get response from serial since read thread isn't running")
        sleep(0.01)
    return self.responses[-1]
load_file(filepath)
Load the given file into an internal _buffer. The lines will not be
sent until self._start_print_thread() is called.
Parameters:
Name
Type
Description
Default
filepath
str
The path to a text file containing lines of GCode to be printed.
def load_file(self, filepath):
    """Load the given file into the internal _buffer.

    The lines will not be sent until `self._start_print_thread()` is
    called. Comments (everything after ';') and blank lines are dropped.

    Parameters
    ----------
    filepath : str
        The path to a text file containing lines of GCode to be printed.
    """
    lines = []
    with open(filepath) as f:
        for line in f:
            # Drop the comment, then the whitespace that preceded it, so the
            # buffer never holds lines with trailing blanks.
            line = line.split(';', 1)[0].strip()
            if line:
                lines.append(line)
    self._buffer.extend(lines)
sendline(line)
Send the given line over serial by appending it to the send buffer
def sendline(self, line):
    """Send the given line over serial by appending it to the send buffer.

    Comments (everything after ';') are removed. Empty lines are ignored.

    Parameters
    ----------
    line : str
        A line of GCode to send to the printer.

    Raises
    ------
    RuntimeError
        If a disconnect is currently in progress.
    """
    if self._disconnect_pending:
        msg = 'Attempted to send line after a disconnect was requested: {}'
        raise RuntimeError(msg.format(line))
    if line:
        # Drop the comment, then the whitespace that preceded it, so the
        # buffer never holds lines with trailing blanks.
        line = str(line).split(';', 1)[0].strip()
        if line:
            self._buffer.append(line)
def start(self):
    """Launch the read thread and the print thread, then queue an M110.

    The line-number reset is issued with the current value of
    `_current_line_idx`.
    """
    self._start_read_thread()
    self._start_print_thread()
    next_index = self._current_line_idx
    self.reset_linenumber(next_index)