diff --git a/cupshelpers/cupshelpers.py b/cupshelpers/cupshelpers.py index 8fe7ead4a..76eff0f81 100755 --- a/cupshelpers/cupshelpers.py +++ b/cupshelpers/cupshelpers.py @@ -526,6 +526,7 @@ def __init__(self, uri, **kw): self.make_and_model = kw.get('device-make-and-model', '') self.id = kw.get('device-id', '') self.location = kw.get('device-location', '') + self.other_attributes = kw.copy () uri_pieces = uri.split(":") self.type = uri_pieces[0] diff --git a/newprinter.py b/newprinter.py index 3d3b49bfb..e09279965 100644 --- a/newprinter.py +++ b/newprinter.py @@ -239,6 +239,7 @@ def __init__(self): self.recommended_model_selected = False self._searchdialog = None self._installdialog = None + self.web_interface_device = None self.getWidgets({"NewPrinterWindow": ["NewPrinterWindow", @@ -254,6 +255,7 @@ def __init__(self): "tvNPDevices", "ntbkNPType", "lblNPDeviceDescription", + "btnNPOpenWebInterface", "expNPDeviceURIs", "tvNPDeviceURIs", "cmbNPTSerialBaud", @@ -380,7 +382,8 @@ def __init__(self): self.tvNCMembers, m), (_("Others"), Gtk.ListStore(str), self.tvNCNotMembers, m), (_("Devices"), Gtk.ListStore(str), self.tvNPDevices, s), - (_("Connections"), Gtk.ListStore(str), self.tvNPDeviceURIs, s), + (_("Connections"), Gtk.ListStore(str, GObject.TYPE_PYOBJECT, str), + self.tvNPDeviceURIs, s), (_("Makes"), Gtk.ListStore(str, str), self.tvNPMakes,s), (_("Models"), Gtk.ListStore(str, str), self.tvNPModels,s), (_("Drivers"), Gtk.ListStore(str), self.tvNPDrivers,s), @@ -453,11 +456,16 @@ def protect_toggle (toggle_widget): self.tvNPDevices.set_row_separator_func (self.device_row_separator_fn, None) self.tvNPDevices.connect ("row-activated", self.device_row_activated) self.tvNPDevices.connect ("row-expanded", self.device_row_expanded) + self.tvNPDeviceURIs.get_selection ().set_select_function ( + self.device_uri_select_function, None) + self.tvNPDeviceURIs.set_row_separator_func ( + self.device_uri_row_separator_fn, None) # Devices expander self.expNPDeviceURIs.connect ("notify::expanded", self.on_expNPDeviceURIs_expanded) self.expNPDeviceURIs.set_expanded(1) + # self.btnNPOpenWebInterface.set_sensitive (False) # SMB browser self.smb_store = Gtk.TreeStore (GObject.TYPE_PYOBJECT) @@ -2246,19 +2254,29 @@ def fillDeviceTab(self, current_uri=None): 'device-info': _("Find Network Printer") } network = cupshelpers.Device ('network', **network_dict) find_nw_iter = model.append (network_iter, - row=[network_dict['device-info'], + row=[self._manual_network_device_label (network), PhysicalDevice (network), False]) - model.insert_after (network_iter, find_nw_iter, row=['', None, True]) + model.append (network_iter, row=['', None, True]) + ipp_group_iter = model.append (network_iter, + row=[_("IPP Destinations"), + None, + False]) + model.append (network_iter, row=['', None, True]) + legacy_group_iter = model.append (network_iter, + row=[_("Legacy Protocols"), + None, + False]) smbdev_dict = { 'device-class': 'network', 'device-info': _("Windows Printer via SAMBA") } smbdev = cupshelpers.Device ('smb', **smbdev_dict) - find_smb_iter = model.append (network_iter, - row=[smbdev_dict['device-info'], - PhysicalDevice (smbdev), False]) - model.insert_after (find_nw_iter, find_smb_iter, row=['', None, True]) + model.append (legacy_group_iter, + row=[self._manual_network_device_label (smbdev), + PhysicalDevice (smbdev), False]) self.devices_uri_iter = uri_iter self.devices_find_nw_iter = find_nw_iter self.devices_network_iter = network_iter + self.devices_network_ipp_group_iter = ipp_group_iter + self.devices_network_legacy_group_iter = legacy_group_iter self.devices_network_fetched = False self.tvNPDevices.set_model (model) self.entNPTDevice.set_text ('') @@ -2408,6 +2426,7 @@ def replace_generic (device): network_iter = self.devices_network_iter find_nw_iter = self.devices_find_nw_iter + ipp_group_iter = self.devices_network_ipp_group_iter for newdevice in newdevices: device = None try: @@ -2420,13 +2439,15 @@ def replace_generic (device): devs = device.get_devices () network = devs[0].device_class == 'network' info = device.get_info () + if network and devs[0].uri == devs[0].type: + info = self._manual_network_device_label (devs[0]) if device == current_device: info += _(" (Current)") row=[info, device, False] if network: if devs[0].uri != devs[0].type: # An actual network printer device. Put this at the top. - iter = model.insert_before (network_iter, find_nw_iter, + iter = model.insert_before (network_iter, ipp_group_iter, row=row) # If this is the currently selected device we need @@ -2436,8 +2457,10 @@ def replace_generic (device): network_path = model.get_path (network_iter) self.tvNPDevices.expand_row (network_path, False) else: - # Just a method of finding one. - iter = model.append (network_iter, row=row) + # Just a manual connection method. + iter = model.append ( + self._network_group_for_manual_device (devs[0]), + row=row) else: # Insert this local device in order. network_path = model.get_path (network_iter) @@ -2458,27 +2481,23 @@ def replace_generic (device): column = self.tvNPDevices.get_column (0) self.tvNPDevices.set_cursor (device_select_path, column, False) - connection_select_path = 0 + connection_select_path = None if current_uri: - model = self.tvNPDeviceURIs.get_model () - iter = model.get_iter_first () - i = 0 - while iter: - dev = model.get_value (iter, 1) - if current_uri == dev.uri: - connection_select_path = i - break - - iter = model.iter_next (iter) - i += 1 + connection_select_path = self._get_connection_path_for_uri ( + current_uri) elif not self.device_selected: # Select the device. column = self.tvNPDevices.get_column (0) self.tvNPDevices.set_cursor (Gtk.TreePath(), column, False) # Select the connection. + self._set_default_connection_selection ( + self.tvNPDeviceURIs.get_model ()) + + if connection_select_path is not None: column = self.tvNPDeviceURIs.get_column (0) - self.tvNPDeviceURIs.set_cursor (connection_select_path, column, False) + self.tvNPDeviceURIs.set_cursor (connection_select_path, column, + False) ## SMB browsing @@ -2952,6 +2971,243 @@ def on_expNPDeviceURIs_expanded (self, widget, UNUSED): parent.set_child_packing (widget, expand, fill, padding, pack_type) + def device_uri_row_separator_fn (self, model, iter, data): + return model.get_value (iter, 2) == "separator" + + def device_uri_select_function (self, selection, model, path, *UNUSED): + iter = model.get_iter (path) + return model.get_value (iter, 2) == "device" + + def _classify_connection_device (self, device): + + if device.type in ["ipp", "ipps", "https"]: + parsed = urllib.parse.urlparse (device.uri) + print(device.type,"dtttt",parsed.path.startswith ("/printers/")) + if parsed.path.startswith ("/printers/"): + return "queue" + return "ipp" + + if device.type in ["dnssd", "mdns"]: + if device.uri.endswith ("/cups"): + return "queue" + + parsed = urllib.parse.urlparse (device.uri) + service = parsed.netloc or "" + if "._ipp" in service: + return "ipp" + return "queue" + + return "other" + + def _connection_group_title (self, group): + return { "ipp": _("IPP Destinations"), + "queue": _("Queues"), + "other": _("Other Connections") }[group] + + def _group_connection_devices (self, devices): + grouped = { "ipp": [], "queue": [], "other": [] } + for device in devices: + grouped[self._classify_connection_device (device)].append (device) + return grouped + + def _append_connection_group (self, model, title, devices): + if not devices: + return + + if model.iter_n_children (None) > 0: + model.append (["", None, "separator"]) + + model.append ([title, None, "section"]) + for device in devices: + model.append ([device.menuentry, device, "device"]) + + def _set_default_connection_selection (self, model): + iter = model.get_iter_first () + while iter: + if model.get_value (iter, 2) == "device": + path = model.get_path (iter) + column = self.tvNPDeviceURIs.get_column (0) + self.tvNPDeviceURIs.set_cursor (path, column, False) + return + + iter = model.iter_next (iter) + + # self.btnNPOpenWebInterface.set_sensitive (False) + + def _get_connection_path_for_uri (self, uri): + model = self.tvNPDeviceURIs.get_model () + if model is None: + return None + + iter = model.get_iter_first () + while iter: + if model.get_value (iter, 2) == "device": + device = model.get_value (iter, 1) + if device is not None and device.uri == uri: + return model.get_path (iter) + + iter = model.iter_next (iter) + + return None + + def _network_group_for_manual_device (self, device): + if device.type in ["ipp", "ipps", "https"]: + return self.devices_network_ipp_group_iter + + return self.devices_network_legacy_group_iter + + def _manual_network_device_label (self, device): + labels = { + "network": _("Find Network Printer"), + "smb": _("SAMBA"), + "socket": _("JetDirect"), + "lpd": _("LPD/LPR"), + "ipp": _("IPP"), + "ipps": _("IPPS"), + "http": _("HTTP"), + "https": _("HTTPS"), + } + return labels.get (device.type, + getattr (device, "info", None) or device.uri) + + def _get_adminurl_from_avahi(self, device): + import subprocess + import re + + try: + output = subprocess.check_output( + ["avahi-browse", "-rt", "_ipps._tcp"], + text=True + ) + + matches = re.findall(r'adminurl=([^\s"]+)', output) + + if matches: + return matches[0].replace(".local./", ".local/") + + except Exception as e: + print("Avahi error:", e) + + return None + def _get_device_web_interface_url(self, device, physicaldevice=None): + import urllib.parse + + attrs = getattr(device, "other_attributes", {}) + + for key in ["printer-more-info", "device-more-info", "adminurl"]: + url = attrs.get(key) + if isinstance(url, list): + url = url[0] if url else None + if url: + return url.replace(".local./", ".local/") + + if physicaldevice: + txt = getattr(physicaldevice, "txt", None) or \ + getattr(physicaldevice, "dnssd_txt", None) + + if txt: + for entry in txt: + if isinstance(entry, bytes): + entry = entry.decode(errors="ignore") + + if isinstance(entry, str) and entry.startswith("adminurl="): + url = entry.split("=", 1)[1] + return url.replace(".local./", ".local/") + + parsed = urllib.parse.urlparse(device.uri) + raw_host = parsed.hostname or "" + + if "._tcp" in raw_host: + url = self._get_adminurl_from_avahi(device) + print("ggg",url) + if url: + return url + + host = raw_host + + if not host or "._tcp" in host: + host = ( + attrs.get("hostname") or + attrs.get("host") or + attrs.get("address") or + attrs.get("ip-address") + ) + + if not host and physicaldevice: + host = ( + getattr(physicaldevice, "dnssd_hostname", None) or + getattr(physicaldevice, "_network_host", None) or + getattr(physicaldevice, "address", None) + ) + + if not host: + return None + + host = urllib.parse.unquote(host).rstrip(".") + + scheme = "https" if parsed.scheme in ["ipps", "https"] else "http" + + port = parsed.port + if port and port not in [80, 443]: + return f"{scheme}://{host}:{port}/" + + return f"{scheme}://{host}/" + + def _get_preferred_ipp_device (self, physicaldevice): + for device in physicaldevice.get_devices (): + if (self._classify_connection_device (device) == "ipp" and + self._get_device_web_interface_url (device, + physicaldevice=physicaldevice) is not None): + return device + + return None + + def _get_selected_physical_device (self): + path, column = self.tvNPDevices.get_cursor () + if path is None: + return None + + model = self.tvNPDevices.get_model () + if model is None: + return None + + iter = model.get_iter (path) + if iter is None: + return None + + return model.get_value (iter, 1) + + def _get_selected_connection_device (self): + path, column = self.tvNPDeviceURIs.get_cursor () + if path is None: + return None + + model = self.tvNPDeviceURIs.get_model () + if model is None: + return None + + iter = model.get_iter (path) + if iter is None: + return None + + return model.get_value (iter, 1) + + def _update_web_interface_button (self, device=None, physicaldevice=None): + if device is None: + device = self._get_selected_connection_device () + if (device is None or + self._classify_connection_device (device) != "ipp" or + self._get_device_web_interface_url (device, + physicaldevice=physicaldevice) is None): + if physicaldevice is None: + physicaldevice = self._get_selected_physical_device () + if physicaldevice is not None: + device = self._get_preferred_ipp_device (physicaldevice) + else: + device = None + self.web_interface_device = device + self.btnNPOpenWebInterface.set_sensitive (device is not None) + def device_row_separator_fn (self, model, iter, data): return model.get_value (iter, 2) @@ -3132,7 +3388,8 @@ def on_tvNPDevices_cursor_changed(self, widget): device.menuentry = device.uri model = Gtk.ListStore (str, # URI description - GObject.TYPE_PYOBJECT) # cupshelpers.Device + GObject.TYPE_PYOBJECT, # cupshelpers.Device + str) # Row type self.tvNPDeviceURIs.set_model (model) # If this is a network device, check whether HPLIP can drive it. @@ -3218,13 +3475,18 @@ def on_tvNPDevices_cursor_changed(self, widget): device.hp_scannable = getattr (physicaldevice, 'hp_scannable', None) - # Fill the list of connections for this device. - n = 0 - for device in physicaldevice.get_devices (): - model.append ((device.menuentry, device)) - n += 1 - column = self.tvNPDeviceURIs.get_column (0) - self.tvNPDeviceURIs.set_cursor (Gtk.TreePath(), column, False) + # Fill the list of connections for this device with IPP entries + # separated from queue-style endpoints. + grouped_devices = self._group_connection_devices ( + physicaldevice.get_devices ()) + for group in ["ipp", "queue", "other"]: + self._append_connection_group ( + model, self._connection_group_title (group), + grouped_devices[group]) + self._set_default_connection_selection (model) + # Keep main selection in sync with the default connection row. + self.device = self._get_selected_connection_device () + self._update_web_interface_button (physicaldevice=physicaldevice) if show_uris: self.expNPDeviceURIs.show_all () else: @@ -3238,7 +3500,11 @@ def on_tvNPDeviceURIs_cursor_changed(self, widget): model = widget.get_model () iter = model.get_iter (path) device = model.get_value(iter, 1) + if device is None: + return + self.device = device + self._update_web_interface_button (device=device) self.lblNPDeviceDescription.set_text ('') page = self.new_printer_device_tabs.get (device.type, self.PAGE_SELECT_DEVICE) self.ntbkNPType.set_current_page(page) @@ -3395,6 +3661,32 @@ def on_tvNPDeviceURIs_cursor_changed(self, widget): self.setNPButtons() + def on_btnNPOpenWebInterface_clicked(self, button): + import subprocess + + url = self._get_device_web_interface_url( + self.web_interface_device, + physicaldevice=self._get_selected_physical_device() + ) + + print(url) + + if not url: + show_error_dialog( + _("Unable to Open Web Interface"), + _("No web interface URL was provided by the printer."), + parent=self.NewPrinterWindow + ) + return + + try: + subprocess.Popen(["xdg-open", url]) + except Exception as e: + show_error_dialog( + _("Unable to Open Web Interface"), + str(e), + parent=self.NewPrinterWindow + ) def on_entNPTLpdHost_changed(self, ent): hostname = ent.get_text() self.btnNPTLpdProbe.set_sensitive (len (hostname) > 0) @@ -3501,7 +3793,8 @@ def found_network_printer_callback (self, new_device): self.devices.append (dev) self.devices.sort () model = self.tvNPDevices.get_model () - iter = model.insert_before (None, self.devices_find_nw_iter, + iter = model.insert_before (self.devices_network_iter, + self.devices_network_ipp_group_iter, row=[dev.get_info (), dev, False]) # If this is the first one we've found, select it. @@ -3525,6 +3818,8 @@ def found_network_printer_callback (self, new_device): ### def getDeviceURI(self): + if self.device is None: + raise AttributeError if self.dialog_mode in ['printer_with_uri', 'ppd']: return self.device.uri diff --git a/ui/NewPrinterWindow.ui b/ui/NewPrinterWindow.ui index ce9aab4e4..02bf81748 100644 --- a/ui/NewPrinterWindow.ui +++ b/ui/NewPrinterWindow.ui @@ -1436,6 +1436,22 @@ ipp://printer.mydomain/ipp 0 + + + Open Web Interface + True + True + True + start + False + + + + False + False + 1 + + 120 @@ -1472,7 +1488,7 @@ ipp://printer.mydomain/ipp True True end - 1 + 2